Skip to content

perf: Directly read gbq table for simple plans #1607

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 51 additions & 1 deletion bigframes/core/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,16 @@
import functools
import itertools
import typing
from typing import Callable, cast, Iterable, Mapping, Optional, Sequence, Tuple
from typing import (
AbstractSet,
Callable,
cast,
Iterable,
Mapping,
Optional,
Sequence,
Tuple,
)

import google.cloud.bigquery as bq

Expand Down Expand Up @@ -572,8 +581,39 @@ def with_id(self, id: identifiers.ColumnId) -> ScanItem:

@dataclasses.dataclass(frozen=True)
class ScanList:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we get a docstring for ScanList, please?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

"""
Defines the set of columns to scan from a source, along with the variable to bind the columns to.
"""

items: typing.Tuple[ScanItem, ...]

def filter_cols(
    self,
    ids: AbstractSet[identifiers.ColumnId],
) -> ScanList:
    """Return a ScanList keeping only the items whose id is in ``ids``.

    If nothing matches, the first item is kept anyway, because the
    generated SQL must select at least one column.
    """
    kept = tuple(item for item in self.items if item.id in ids)
    if not kept:
        # SQL requires at least one selected column; fall back to the first.
        kept = self.items[:1]
    return ScanList(kept)

def project(
    self,
    selections: Mapping[identifiers.ColumnId, identifiers.ColumnId],
) -> ScanList:
    """Rebind the given ids from the scan list, dropping all other bindings.

    Each (old_id -> new_id) entry in ``selections`` re-emits the matching
    scan item under its new id. If ``selections`` is empty, the first item
    is kept anyway, because the generated SQL must select at least one
    column.
    """
    by_id = {item.id: item for item in self.items}
    projected = tuple(
        by_id[src].with_id(dst) for src, dst in selections.items()
    )
    if not projected:
        # SQL requires at least one selected column; fall back to the first.
        projected = self.items[:1]
    return ScanList(projected)


@dataclasses.dataclass(frozen=True, eq=False)
class ReadLocalNode(LeafNode):
Expand Down Expand Up @@ -675,6 +715,11 @@ def from_table(table: bq.Table, columns: Sequence[str] = ()) -> GbqTable:
else tuple(table.clustering_fields),
)

def get_table_ref(self) -> bq.TableReference:
    """Build a BigQuery TableReference from this table's project, dataset and table ids."""
    dataset_ref = bq.DatasetReference(self.project_id, self.dataset_id)
    return bq.TableReference(dataset_ref, self.table_id)

@property
@functools.cache
def schema_by_id(self):
Expand Down Expand Up @@ -1068,6 +1113,11 @@ def variables_introduced(self) -> int:
# This operation only renames variables, doesn't actually create new ones
return 0

@property
def has_multi_referenced_ids(self) -> bool:
    """True if any source id is referenced by more than one output binding."""
    seen = set()
    for pair in self.input_output_pairs:
        source_id = pair.ref.id
        if source_id in seen:
            # Early exit: a duplicate source reference was found.
            return True
        seen.add(source_id)
    return False

# TODO: Reuse parent namespace
# Currently, Selection node allows renaming an reusing existing names, so it must establish a
# new namespace.
Expand Down
2 changes: 2 additions & 0 deletions bigframes/core/rewrite/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from bigframes.core.rewrite.legacy_align import legacy_join_as_projection
from bigframes.core.rewrite.order import pull_up_order
from bigframes.core.rewrite.pruning import column_pruning
from bigframes.core.rewrite.scan_reduction import try_reduce_to_table_scan
from bigframes.core.rewrite.slices import pullup_limit_from_slice, rewrite_slice
from bigframes.core.rewrite.timedeltas import rewrite_timedelta_expressions
from bigframes.core.rewrite.windows import rewrite_range_rolling
Expand All @@ -31,4 +32,5 @@
"pull_up_order",
"column_pruning",
"rewrite_range_rolling",
"try_reduce_to_table_scan",
]
17 changes: 2 additions & 15 deletions bigframes/core/rewrite/pruning.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def prune_readlocal(
node: bigframes.core.nodes.ReadLocalNode,
selection: AbstractSet[identifiers.ColumnId],
) -> bigframes.core.nodes.ReadLocalNode:
new_scan_list = filter_scanlist(node.scan_list, selection)
new_scan_list = node.scan_list.filter_cols(selection)
return dataclasses.replace(
node,
scan_list=new_scan_list,
Expand All @@ -183,18 +183,5 @@ def prune_readtable(
node: bigframes.core.nodes.ReadTableNode,
selection: AbstractSet[identifiers.ColumnId],
) -> bigframes.core.nodes.ReadTableNode:
new_scan_list = filter_scanlist(node.scan_list, selection)
new_scan_list = node.scan_list.filter_cols(selection)
return dataclasses.replace(node, scan_list=new_scan_list)


def filter_scanlist(
scanlist: bigframes.core.nodes.ScanList,
ids: AbstractSet[identifiers.ColumnId],
):
result = bigframes.core.nodes.ScanList(
tuple(item for item in scanlist.items if item.id in ids)
)
if len(result.items) == 0:
# We need to select something, or stuff breaks
result = bigframes.core.nodes.ScanList(scanlist.items[:1])
return result
47 changes: 47 additions & 0 deletions bigframes/core/rewrite/scan_reduction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import dataclasses
import functools
from typing import Optional

from bigframes.core import nodes


def try_reduce_to_table_scan(root: nodes.BigFrameNode) -> Optional[nodes.ReadTableNode]:
    """Collapse a plan made only of selections over a table read into one scan.

    Returns the merged ReadTableNode, or None when the tree contains any
    other node type or the bottom-up merge does not end in a single table read.
    """
    all_scan_like = all(
        isinstance(node, (nodes.ReadTableNode, nodes.SelectionNode))
        for node in root.unique_nodes()
    )
    if not all_scan_like:
        return None
    merged = root.bottom_up(merge_scan)
    return merged if isinstance(merged, nodes.ReadTableNode) else None


@functools.singledispatch
def merge_scan(node: nodes.BigFrameNode) -> nodes.BigFrameNode:
    """Default case: leave any node other than a registered type unchanged.

    SelectionNode has a registered overload below that folds the selection
    into its child table scan when possible.
    """
    return node


@merge_scan.register
def _(node: nodes.SelectionNode) -> nodes.BigFrameNode:
    """Fold a SelectionNode into its child ReadTableNode when possible.

    The fold is skipped when the child is not a plain table read, or when a
    single source column feeds multiple outputs (a scan list can bind each
    source column at most once).
    """
    child = node.child
    if not isinstance(child, nodes.ReadTableNode):
        return node
    if node.has_multi_referenced_ids:
        return node

    mapping = {
        aliased_ref.ref.id: aliased_ref.id for aliased_ref in node.input_output_pairs
    }
    merged_scan_list = child.scan_list.project(mapping)
    return dataclasses.replace(child, scan_list=merged_scan_list)
17 changes: 7 additions & 10 deletions bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,10 @@
# to register new and replacement ops with the Ibis BigQuery backend.
import bigframes.functions._function_session as bff_session
import bigframes.functions.function as bff
from bigframes.session import bigquery_session
from bigframes.session import bigquery_session, bq_caching_executor, executor
import bigframes.session._io.bigquery as bf_io_bigquery
import bigframes.session.anonymous_dataset
import bigframes.session.clients
import bigframes.session.executor
import bigframes.session.loader
import bigframes.session.metrics
import bigframes.session.validation
Expand Down Expand Up @@ -245,14 +244,12 @@ def __init__(
self._temp_storage_manager = (
self._session_resource_manager or self._anon_dataset_manager
)
self._executor: bigframes.session.executor.Executor = (
bigframes.session.executor.BigQueryCachingExecutor(
bqclient=self._clients_provider.bqclient,
bqstoragereadclient=self._clients_provider.bqstoragereadclient,
storage_manager=self._temp_storage_manager,
strictly_ordered=self._strictly_ordered,
metrics=self._metrics,
)
self._executor: executor.Executor = bq_caching_executor.BigQueryCachingExecutor(
bqclient=self._clients_provider.bqclient,
bqstoragereadclient=self._clients_provider.bqstoragereadclient,
storage_manager=self._temp_storage_manager,
strictly_ordered=self._strictly_ordered,
metrics=self._metrics,
)
self._loader = bigframes.session.loader.GbqDataLoader(
session=self,
Expand Down
Loading