
Commit 6308bbb

Merge remote-tracking branch 'github/main' into client_rechunk
2 parents: ce1aa67 + 5c125c9

44 files changed (+3035, -661 lines)


.pre-commit-config.yaml

Lines changed: 1 addition & 0 deletions
```diff
@@ -20,6 +20,7 @@ repos:
     hooks:
     -   id: trailing-whitespace
     -   id: end-of-file-fixer
+        exclude: "^tests/unit/core/compile/sqlglot/snapshots"
     -   id: check-yaml
 -   repo: https://github.com/pycqa/isort
     rev: 5.12.0
```

CHANGELOG.md

Lines changed: 26 additions & 0 deletions
```diff
@@ -4,6 +4,32 @@
 
 [1]: https://pypi.org/project/bigframes/#history
 
+## [2.3.0](https://github.com/googleapis/python-bigquery-dataframes/compare/v2.2.0...v2.3.0) (2025-05-06)
+
+
+### Features
+
+* Add dry_run parameter to `read_gbq()`, `read_gbq_table()` and `read_gbq_query()` ([#1674](https://github.com/googleapis/python-bigquery-dataframes/issues/1674)) ([4c5dee5](https://github.com/googleapis/python-bigquery-dataframes/commit/4c5dee5e6f4b30deb01e258670aa21dbf3ac9aa5))
+
+
+### Bug Fixes
+
+* Guarantee guid thread safety across threads ([#1684](https://github.com/googleapis/python-bigquery-dataframes/issues/1684)) ([cb0267d](https://github.com/googleapis/python-bigquery-dataframes/commit/cb0267deea227ea85f20d6dbef8c29cf03526d7a))
+* Support large lists of lists in bpd.Series() constructor ([#1662](https://github.com/googleapis/python-bigquery-dataframes/issues/1662)) ([0f4024c](https://github.com/googleapis/python-bigquery-dataframes/commit/0f4024c84508c17657a9104ef1f8718094827ada))
+* Use value equality to check types for unix epoch functions and timestamp diff ([#1690](https://github.com/googleapis/python-bigquery-dataframes/issues/1690)) ([81e8fb8](https://github.com/googleapis/python-bigquery-dataframes/commit/81e8fb8627f1d35423dbbdcc99d02ab0ad362d11))
+
+
+### Performance Improvements
+
+* `to_datetime()` now avoids caching inputs unless data is inspected to infer format ([#1667](https://github.com/googleapis/python-bigquery-dataframes/issues/1667)) ([dd08857](https://github.com/googleapis/python-bigquery-dataframes/commit/dd08857f65140cbe5c524050d2d538949897c3cc))
+
+
+### Documentation
+
+* Add a visualization notebook to BigFrame samples ([#1675](https://github.com/googleapis/python-bigquery-dataframes/issues/1675)) ([ee062bf](https://github.com/googleapis/python-bigquery-dataframes/commit/ee062bfc29c27949205ca21d6c1dcd6125300e5e))
+* Fix spacing of k-means code snippet ([#1687](https://github.com/googleapis/python-bigquery-dataframes/issues/1687)) ([99f45dd](https://github.com/googleapis/python-bigquery-dataframes/commit/99f45dd14bd9632d209389a5fef009f18c57adbf))
+* Update snippet for `Create a k-means` model tutorial ([#1664](https://github.com/googleapis/python-bigquery-dataframes/issues/1664)) ([761c364](https://github.com/googleapis/python-bigquery-dataframes/commit/761c364f4df045b9e9d8d3d5fee91d9a87b772db))
+
 ## [2.2.0](https://github.com/googleapis/python-bigquery-dataframes/compare/v2.1.0...v2.2.0) (2025-04-30)
 
 
```
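The headline `dry_run` feature is easiest to see in use. A minimal sketch, assuming an authenticated BigQuery session; the public table name is illustrative, and the stat keys shown follow the dry-run refactor in `bigframes/core/blocks.py` below:

```python
import bigframes.pandas as bpd

# With dry_run=True, BigQuery validates the query and estimates its cost
# without executing it. Instead of a DataFrame, read_gbq returns a pandas
# Series of job statistics.
stats = bpd.read_gbq(
    "SELECT name, number FROM `bigquery-public-data.usa_names.usa_1910_2013`",
    dry_run=True,
)
print(stats["totalBytesProcessed"])  # estimated bytes the query would scan
print(stats["columnDtypes"])         # inferred BigFrames dtype per column
```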

bigframes/core/blocks.py

Lines changed: 28 additions & 68 deletions
```diff
@@ -22,25 +22,14 @@
 from __future__ import annotations
 
 import ast
-import copy
 import dataclasses
 import datetime
 import functools
 import itertools
 import random
 import textwrap
 import typing
-from typing import (
-    Any,
-    Iterable,
-    List,
-    Literal,
-    Mapping,
-    Optional,
-    Sequence,
-    Tuple,
-    Union,
-)
+from typing import Iterable, List, Literal, Mapping, Optional, Sequence, Tuple, Union
 import warnings
 
 import bigframes_vendored.constants as constants
```
```diff
@@ -69,6 +58,8 @@
 import bigframes.exceptions as bfe
 import bigframes.operations as ops
 import bigframes.operations.aggregations as agg_ops
+from bigframes.session import dry_runs
+from bigframes.session import executor as executors
 
 # Type constraint for wherever column labels are used
 Label = typing.Hashable
```
```diff
@@ -821,59 +812,18 @@ def _compute_dry_run(
         if sampling.enable_downsampling:
             raise NotImplementedError("Dry run with sampling is not supported")
 
-        index: List[Any] = []
-        values: List[Any] = []
-
-        index.append("columnCount")
-        values.append(len(self.value_columns))
-        index.append("columnDtypes")
-        values.append(
-            {
-                col: self.expr.get_column_type(self.resolve_label_exact_or_error(col))
-                for col in self.column_labels
-            }
-        )
-
-        index.append("indexLevel")
-        values.append(self.index.nlevels)
-        index.append("indexDtypes")
-        values.append(self.index.dtypes)
-
         expr = self._apply_value_keys_to_expr(value_keys=value_keys)
         query_job = self.session._executor.dry_run(expr, ordered)
-        job_api_repr = copy.deepcopy(query_job._properties)
-
-        job_ref = job_api_repr["jobReference"]
-        for key, val in job_ref.items():
-            index.append(key)
-            values.append(val)
-
-        index.append("jobType")
-        values.append(job_api_repr["configuration"]["jobType"])
-
-        query_config = job_api_repr["configuration"]["query"]
-        for key in ("destinationTable", "useLegacySql"):
-            index.append(key)
-            values.append(query_config.get(key))
-
-        query_stats = job_api_repr["statistics"]["query"]
-        for key in (
-            "referencedTables",
-            "totalBytesProcessed",
-            "cacheHit",
-            "statementType",
-        ):
-            index.append(key)
-            values.append(query_stats.get(key))
 
-        index.append("creationTime")
-        values.append(
-            pd.Timestamp(
-                job_api_repr["statistics"]["creationTime"], unit="ms", tz="UTC"
-            )
-        )
+        column_dtypes = {
+            col: self.expr.get_column_type(self.resolve_label_exact_or_error(col))
+            for col in self.column_labels
+        }
 
-        return pd.Series(values, index=index), query_job
+        dry_run_stats = dry_runs.get_query_stats_with_dtypes(
+            query_job, column_dtypes, self.index.dtypes
+        )
+        return dry_run_stats, query_job
 
     def _apply_value_keys_to_expr(self, value_keys: Optional[Iterable[str]] = None):
         expr = self._expr
```
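The hunk above collapses roughly fifty lines of inline Series assembly into a single call to `dry_runs.get_query_stats_with_dtypes`. The helper itself is added elsewhere in this commit and is not shown in this excerpt; a sketch of what it plausibly returns, reconstructed from the deleted code:

```python
import pandas as pd

def get_query_stats_with_dtypes(query_job, column_dtypes, index_dtypes) -> pd.Series:
    # Schema-derived stats, previously built inline in Block._compute_dry_run.
    stats = {
        "columnCount": len(column_dtypes),
        "columnDtypes": column_dtypes,
        "indexLevel": len(index_dtypes),
        "indexDtypes": index_dtypes,
    }
    # Job-derived stats, read from the dry-run QueryJob's API representation.
    props = query_job._properties
    stats.update(props["jobReference"])  # projectId, jobId, location
    stats["jobType"] = props["configuration"]["jobType"]
    query_config = props["configuration"]["query"]
    for key in ("destinationTable", "useLegacySql"):
        stats[key] = query_config.get(key)
    query_stats = props["statistics"]["query"]
    for key in ("referencedTables", "totalBytesProcessed", "cacheHit", "statementType"):
        stats[key] = query_stats.get(key)
    stats["creationTime"] = pd.Timestamp(
        props["statistics"]["creationTime"], unit="ms", tz="UTC"
    )
    return pd.Series(stats)
```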
```diff
@@ -1560,12 +1510,19 @@ def retrieve_repr_request_results(
         """
 
         # head caches full underlying expression, so row_count will be free after
-        head_result = self.session._executor.head(self.expr, max_results)
+        executor = self.session._executor
+        executor.cached(
+            array_value=self.expr,
+            config=executors.CacheConfig(optimize_for="head", if_cached="reuse-strict"),
+        )
+        head_result = self.session._executor.execute(
+            self.expr.slice(start=None, stop=max_results, step=None)
+        )
         row_count = self.session._executor.execute(self.expr.row_count()).to_py_scalar()
 
-        df = head_result.to_pandas()
-        self._copy_index_to_pandas(df)
-        return df, row_count, head_result.query_job
+        head_df = head_result.to_pandas()
+        self._copy_index_to_pandas(head_df)
+        return head_df, row_count, head_result.query_job
 
     def promote_offsets(self, label: Label = None) -> typing.Tuple[Block, str]:
         expr, result_id = self._expr.promote_offsets()
```
```diff
@@ -2535,9 +2492,12 @@ def cached(self, *, force: bool = False, session_aware: bool = False) -> None:
         # use a heuristic for whether something needs to be cached
         self.session._executor.cached(
             self.expr,
-            force=force,
-            use_session=session_aware,
-            cluster_cols=self.index_columns,
+            config=executors.CacheConfig(
+                optimize_for="auto"
+                if session_aware
+                else executors.HierarchicalKey(tuple(self.index_columns)),
+                if_cached="replace" if force else "reuse-any",
+            ),
         )
 
     def _is_monotonic(
```
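Both caching call sites in this file (the repr path above and `Block.cached`) now pass a structured `CacheConfig` instead of loose keyword flags. The real definition lives in `bigframes/session/executor.py` and is not part of this excerpt; a minimal sketch of the shape the call sites assume, with field names and literals inferred from this diff:

```python
import dataclasses
from typing import Literal, Tuple, Union

@dataclasses.dataclass(frozen=True)
class HierarchicalKey:
    """Cluster the cached result by an explicit column hierarchy."""
    columns: Tuple[str, ...]

@dataclasses.dataclass(frozen=True)
class CacheConfig:
    # "auto": session-aware heuristic (old use_session=True);
    # "head": optimize for reading the first rows (the repr path above);
    # HierarchicalKey(...): cluster by given columns (old cluster_cols=...).
    optimize_for: Union[Literal["auto", "head"], HierarchicalKey] = "auto"
    # "replace" corresponds to the old force=True, "reuse-any" to force=False;
    # "reuse-strict" presumably reuses only an exactly matching cache entry.
    if_cached: Literal["reuse-strict", "reuse-any", "replace"] = "reuse-any"
```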

bigframes/core/compile/sqlglot/compiler.py

Lines changed: 57 additions & 54 deletions
```diff
@@ -15,24 +15,28 @@
 
 import dataclasses
 import functools
-import itertools
 import typing
 
 from google.cloud import bigquery
 import pyarrow as pa
 import sqlglot.expressions as sge
 
-from bigframes.core import expression, identifiers, nodes, rewrite
+from bigframes.core import expression, guid, identifiers, nodes, rewrite
 from bigframes.core.compile import configs
 import bigframes.core.compile.sqlglot.scalar_compiler as scalar_compiler
 import bigframes.core.compile.sqlglot.sqlglot_ir as ir
 import bigframes.core.ordering as bf_ordering
 
 
-@dataclasses.dataclass(frozen=True)
 class SQLGlotCompiler:
     """Compiles BigFrame nodes into SQL using SQLGlot."""
 
+    uid_gen: guid.SequentialUIDGenerator
+    """Generator for unique identifiers."""
+
+    def __init__(self):
+        self.uid_gen = guid.SequentialUIDGenerator()
+
     def compile(
         self,
         node: nodes.BigFrameNode,
```
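`SequentialUIDGenerator` replaces the module-level `itertools.count` generator removed further down, giving each compiler instance its own deterministic ID stream (`bfcol_0`, `bfcol_1`, ...). Its implementation is not part of this diff; a sketch of the assumed behavior, with locking per the guid thread-safety fix (#1684) noted in the changelog:

```python
import itertools
import threading
from typing import Dict, Generator, Iterator

class SequentialUIDGenerator:
    """Yields "{prefix}0", "{prefix}1", ... with an independent counter per
    prefix; a lock keeps concurrent callers from receiving duplicate IDs."""

    def __init__(self) -> None:
        self._counters: Dict[str, Iterator[int]] = {}
        self._lock = threading.Lock()

    def get_uid_stream(self, prefix: str) -> Generator[str, None, None]:
        while True:
            with self._lock:
                counter = self._counters.setdefault(prefix, itertools.count())
                next_id = next(counter)
            yield f"{prefix}{next_id}"
```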
```diff
@@ -82,7 +86,7 @@ def _compile_sql(self, request: configs.CompileRequest) -> configs.CompileResult
             result_node = typing.cast(
                 nodes.ResultNode, rewrite.column_pruning(result_node)
             )
-            result_node = _remap_variables(result_node)
+            result_node = self._remap_variables(result_node)
             sql = self._compile_result_node(result_node)
             return configs.CompileResult(
                 sql, result_node.schema.to_bigquery(), result_node.order_by
```
```diff
@@ -92,7 +96,7 @@ def _compile_sql(self, request: configs.CompileRequest) -> configs.CompileResult
         result_node = dataclasses.replace(result_node, order_by=None)
         result_node = typing.cast(nodes.ResultNode, rewrite.column_pruning(result_node))
 
-        result_node = _remap_variables(result_node)
+        result_node = self._remap_variables(result_node)
         sql = self._compile_result_node(result_node)
         # Return the ordering iff no extra columns are needed to define the row order
         if ordering is not None:
```
```diff
@@ -106,63 +110,62 @@ def _compile_sql(self, request: configs.CompileRequest) -> configs.CompileResult
             sql, result_node.schema.to_bigquery(), output_order
         )
 
+    def _remap_variables(self, node: nodes.ResultNode) -> nodes.ResultNode:
+        """Remaps `ColumnId`s in the BFET of a `ResultNode` to produce deterministic UIDs."""
+
+        result_node, _ = rewrite.remap_variables(
+            node, map(identifiers.ColumnId, self.uid_gen.get_uid_stream("bfcol_"))
+        )
+        return typing.cast(nodes.ResultNode, result_node)
+
     def _compile_result_node(self, root: nodes.ResultNode) -> str:
-        sqlglot_ir = compile_node(root.child)
+        sqlglot_ir = self.compile_node(root.child)
         # TODO: add order_by, limit, and selections to sqlglot_expr
         return sqlglot_ir.sql
 
+    @functools.lru_cache(maxsize=5000)
+    def compile_node(self, node: nodes.BigFrameNode) -> ir.SQLGlotIR:
+        """Compiles node into CompileArrayValue. Caches result."""
+        return node.reduce_up(
+            lambda node, children: self._compile_node(node, *children)
+        )
 
-def _replace_unsupported_ops(node: nodes.BigFrameNode):
-    node = nodes.bottom_up(node, rewrite.rewrite_slice)
-    node = nodes.bottom_up(node, rewrite.rewrite_timedelta_expressions)
-    node = nodes.bottom_up(node, rewrite.rewrite_range_rolling)
-    return node
-
-
-def _remap_variables(node: nodes.ResultNode) -> nodes.ResultNode:
-    """Remaps `ColumnId`s in the BFET of a `ResultNode` to produce deterministic UIDs."""
-
-    def anonymous_column_ids() -> typing.Generator[identifiers.ColumnId, None, None]:
-        for i in itertools.count():
-            yield identifiers.ColumnId(name=f"bfcol_{i}")
-
-    result_node, _ = rewrite.remap_variables(node, anonymous_column_ids())
-    return typing.cast(nodes.ResultNode, result_node)
-
-
-@functools.lru_cache(maxsize=5000)
-def compile_node(node: nodes.BigFrameNode) -> ir.SQLGlotIR:
-    """Compiles node into CompileArrayValue. Caches result."""
-    return node.reduce_up(lambda node, children: _compile_node(node, *children))
-
-
-@functools.singledispatch
-def _compile_node(
-    node: nodes.BigFrameNode, *compiled_children: ir.SQLGlotIR
-) -> ir.SQLGlotIR:
-    """Defines transformation but isn't cached, always use compile_node instead"""
-    raise ValueError(f"Can't compile unrecognized node: {node}")
+    @functools.singledispatchmethod
+    def _compile_node(
+        self, node: nodes.BigFrameNode, *compiled_children: ir.SQLGlotIR
+    ) -> ir.SQLGlotIR:
+        """Defines transformation but isn't cached, always use compile_node instead"""
+        raise ValueError(f"Can't compile unrecognized node: {node}")
+
+    @_compile_node.register
+    def compile_readlocal(self, node: nodes.ReadLocalNode, *args) -> ir.SQLGlotIR:
+        pa_table = node.local_data_source.data
+        pa_table = pa_table.select([item.source_id for item in node.scan_list.items])
+        pa_table = pa_table.rename_columns(
+            [item.id.sql for item in node.scan_list.items]
+        )
 
+        offsets = node.offsets_col.sql if node.offsets_col else None
+        if offsets:
+            pa_table = pa_table.append_column(
+                offsets, pa.array(range(pa_table.num_rows), type=pa.int64())
+            )
 
-@_compile_node.register
-def compile_readlocal(node: nodes.ReadLocalNode, *args) -> ir.SQLGlotIR:
-    pa_table = node.local_data_source.data
-    pa_table = pa_table.select([item.source_id for item in node.scan_list.items])
-    pa_table = pa_table.rename_columns([item.id.sql for item in node.scan_list.items])
+        return ir.SQLGlotIR.from_pyarrow(pa_table, node.schema, uid_gen=self.uid_gen)
 
-    offsets = node.offsets_col.sql if node.offsets_col else None
-    if offsets:
-        pa_table = pa_table.append_column(
-            offsets, pa.array(range(pa_table.num_rows), type=pa.int64())
+    @_compile_node.register
+    def compile_selection(
+        self, node: nodes.SelectionNode, child: ir.SQLGlotIR
+    ) -> ir.SQLGlotIR:
+        selected_cols: tuple[tuple[str, sge.Expression], ...] = tuple(
+            (id.sql, scalar_compiler.compile_scalar_expression(expr))
+            for expr, id in node.input_output_pairs
        )
+        return child.select(selected_cols)
 
-    return ir.SQLGlotIR.from_pyarrow(pa_table, node.schema)
 
-
-@_compile_node.register
-def compile_selection(node: nodes.SelectionNode, child: ir.SQLGlotIR) -> ir.SQLGlotIR:
-    select_cols: typing.Dict[str, sge.Expression] = {
-        id.name: scalar_compiler.compile_scalar_expression(expr)
-        for expr, id in node.input_output_pairs
-    }
-    return child.select(select_cols)
+def _replace_unsupported_ops(node: nodes.BigFrameNode):
+    node = nodes.bottom_up(node, rewrite.rewrite_slice)
+    node = nodes.bottom_up(node, rewrite.rewrite_timedelta_expressions)
+    node = nodes.bottom_up(node, rewrite.rewrite_range_rolling)
+    return node
```
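The structural change here is that the compiler's dispatch table moves from module-level `functools.singledispatch` functions to `functools.singledispatchmethod` methods, so handlers can reach per-instance state such as `uid_gen`. A self-contained sketch of the pattern, with toy node types standing in for the `BigFrameNode` subclasses:

```python
import functools

class TinyCompiler:
    @functools.singledispatchmethod
    def _compile_node(self, node, *compiled_children):
        # Fallback, as in the diff: unrecognized node types are an error.
        raise ValueError(f"Can't compile unrecognized node: {node!r}")

    # register() dispatches on the annotated type of the first non-self argument.
    @_compile_node.register
    def _compile_int(self, node: int, *compiled_children):
        return f"INT({node})"

    @_compile_node.register
    def _compile_str(self, node: str, *compiled_children):
        return f"STR({node!r})"

compiler = TinyCompiler()
print(compiler._compile_node(42))    # INT(42)
print(compiler._compile_node("ab"))  # STR('ab')
```

One subtlety worth noting: `@functools.lru_cache` on the `compile_node` method keys the cache on `(self, node)`, so cached compilations are per compiler instance and live as long as the instance is referenced.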
