feat: add bigframes.bigquery.sql_scalar() to apply SQL syntax on Series objects #1293

Merged
14 commits, merged Jan 17, 2025
3 changes: 3 additions & 0 deletions bigframes/bigquery/__init__.py
@@ -30,6 +30,7 @@
parse_json,
)
from bigframes.bigquery._operations.search import create_vector_index, vector_search
from bigframes.bigquery._operations.sql import sql_scalar
from bigframes.bigquery._operations.struct import struct

__all__ = [
@@ -48,6 +49,8 @@
# search ops
"create_vector_index",
"vector_search",
# sql ops
"sql_scalar",
# struct ops
"struct",
]
94 changes: 94 additions & 0 deletions bigframes/bigquery/_operations/sql.py
@@ -0,0 +1,94 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""SQL escape hatch features."""

from __future__ import annotations

from typing import Sequence

import google.cloud.bigquery

import bigframes.core.sql
import bigframes.dataframe
import bigframes.dtypes
import bigframes.operations
import bigframes.series


def sql_scalar(
sql_template: str,
columns: Sequence[bigframes.series.Series],
) -> bigframes.series.Series:
"""Create a Series from a SQL template.

**Examples:**

>>> import bigframes.pandas as bpd
>>> import bigframes.bigquery as bbq
>>> import pandas as pd
>>> import pyarrow as pa
>>> bpd.options.display.progress_bar = None

>>> s = bpd.Series(["1.5", "2.5", "3.5"])
>>> s = s.astype(pd.ArrowDtype(pa.decimal128(38, 9)))
>>> bbq.sql_scalar("ROUND({0}, 0, 'ROUND_HALF_EVEN')", [s])
0 2.000000000
1 2.000000000
2 4.000000000
dtype: decimal128(38, 9)[pyarrow]

Args:
sql_template (str):
A SQL format string with Python-style {0} placeholders for each of
the Series objects in ``columns``.
columns (Sequence[bigframes.pandas.Series]):
Series objects representing the column inputs to the
``sql_template``. Must contain at least one Series.

Returns:
bigframes.pandas.Series:
A Series with the SQL applied.

Raises:
ValueError: If ``columns`` is empty.
"""
if len(columns) == 0:
raise ValueError("Must provide at least one column in columns")

# To integrate this into our expression trees, we need to get the output
# type, so we do some manual compilation and a dry run query to get that.
# Another benefit of this is that if there is a syntax error in the SQL
# template, then this will fail with an error earlier in the process,
# aiding users in debugging.
base_series = columns[0]
literals = [
bigframes.dtypes.bigframes_dtype_to_literal(column.dtype) for column in columns
]
literals_sql = [bigframes.core.sql.simple_literal(literal) for literal in literals]

# Substitute representative literals for the column inputs so the template
# compiles to a standalone SELECT with no table references.
select_sql = sql_template.format(*literals_sql)
dry_run_sql = f"SELECT {select_sql}"
bqclient = base_series._session.bqclient
job = bqclient.query(
dry_run_sql, job_config=google.cloud.bigquery.QueryJobConfig(dry_run=True)
)
_, output_type = bigframes.dtypes.convert_schema_field(job.schema[0])

op = bigframes.operations.SqlScalarOp(
_output_type=output_type, sql_template=sql_template
)
return base_series._apply_nary_op(op, columns[1:])
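
The docstring example covers a single input column. As a hedged sketch of the multi-column case (the data and names here are illustrative, not part of this PR), each {i} placeholder expands to the SQL for the Series at that position in ``columns``:

import bigframes.pandas as bpd
import bigframes.bigquery as bbq

a = bpd.Series([1, 2, 3])
b = bpd.Series([10, 20, 30])

# {0} expands to the SQL for `a`, {1} to the SQL for `b`; GREATEST is a
# standard BigQuery function returning the row-wise maximum.
result = bbq.sql_scalar("GREATEST({0}, {1})", [a, b])

Note that the dry-run step above doubles as validation: a malformed template raises a BigQuery error at call time rather than later, when the expression tree executes.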
11 changes: 11 additions & 0 deletions bigframes/core/compile/scalar_op_compiler.py
@@ -1840,6 +1840,17 @@ def nary_remote_function_op_impl(
return result


@scalar_op_compiler.register_nary_op(ops.SqlScalarOp, pass_op=True)
def sql_scalar_op_impl(*operands: ibis_types.Value, op: ops.SqlScalarOp):
return ibis_generic.SqlScalar(
op.sql_template,
values=tuple(typing.cast(ibis_generic.Value, expr.op()) for expr in operands),
output_type=bigframes.core.compile.ibis_types.bigframes_dtype_to_ibis_dtype(
op.output_type()
),
).to_expr()


@scalar_op_compiler.register_nary_op(ops.StructOp, pass_op=True)
def struct_op_impl(
*values: ibis_types.Value, op: ops.StructOp
28 changes: 24 additions & 4 deletions bigframes/core/sql.py
@@ -18,10 +18,13 @@
"""

import datetime
import decimal
import json
import math
from typing import cast, Collection, Iterable, Mapping, Optional, TYPE_CHECKING, Union

import shapely # type: ignore

import bigframes.core.compile.googlesql as googlesql

if TYPE_CHECKING:
@@ -31,12 +34,16 @@


### Writing SQL Values (literals, column references, table references, etc.)
def simple_literal(value: str | int | bool | float | datetime.datetime):
def simple_literal(value: bytes | str | int | bool | float | datetime.datetime | None):
"""Return quoted input string."""
# https://cloud.google.com/bigquery/docs/reference/standard-sql/lexical#literals
if isinstance(value, str):
if value is None:
return "NULL"
elif isinstance(value, str):
# Single quoting seems to work nicer with ibis than double quoting
return f"'{googlesql._escape_chars(value)}'"
elif isinstance(value, bytes):
return repr(value)
elif isinstance(value, (bool, int)):
return str(value)
elif isinstance(value, float):
@@ -48,8 +55,21 @@ def simple_literal(value: str | int | bool | float | datetime.datetime):
if value == -math.inf:
return 'CAST("-inf" as FLOAT)'
return str(value)
if isinstance(value, datetime.datetime):
return f"TIMESTAMP('{value.isoformat()}')"
# Check datetime first as it is a subclass of date
elif isinstance(value, datetime.datetime):
if value.tzinfo is None:
return f"DATETIME('{value.isoformat()}')"
else:
return f"TIMESTAMP('{value.isoformat()}')"
elif isinstance(value, datetime.date):
return f"DATE('{value.isoformat()}')"
elif isinstance(value, datetime.time):
return f"TIME(DATETIME('1970-01-01 {value.isoformat()}'))"
elif isinstance(value, shapely.Geometry):
return f"ST_GEOGFROMTEXT({simple_literal(shapely.to_wkt(value))})"
elif isinstance(value, decimal.Decimal):
# TODO: disambiguate BIGNUMERIC based on scale and/or precision
return f"CAST('{str(value)}' AS NUMERIC)"
else:
raise ValueError(f"Cannot produce literal for {value}")
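
For reference, a sketch of what the new branches emit; the expected strings in the comments are read off the code above, and the import path matches this diff:

import datetime
import decimal

from bigframes.core.sql import simple_literal

simple_literal(None)                       # NULL
simple_literal(b"\x01")                    # b'\x01' (Python bytes repr)
simple_literal(datetime.date(2025, 1, 1))  # DATE('2025-01-01')
simple_literal(datetime.time(1, 2, 3))     # TIME(DATETIME('1970-01-01 01:02:03'))
simple_literal(decimal.Decimal("1.5"))     # CAST('1.5' AS NUMERIC)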

71 changes: 70 additions & 1 deletion bigframes/dtypes.py
@@ -18,14 +18,15 @@
import datetime
import decimal
import typing
from typing import Dict, List, Literal, Union
from typing import Any, Dict, List, Literal, Union

import bigframes_vendored.constants as constants
import geopandas as gpd # type: ignore
import google.cloud.bigquery
import numpy as np
import pandas as pd
import pyarrow as pa
import shapely # type: ignore

# Type hints for Pandas dtypes supported by BigQuery DataFrame
Dtype = Union[
@@ -450,6 +451,74 @@ def bigframes_dtype_to_arrow_dtype(
)


def bigframes_dtype_to_literal(
bigframes_dtype: Dtype,
) -> Any:
"""Create a representative literal value for a bigframes dtype.

The inverse of infer_literal_type().
"""
if isinstance(bigframes_dtype, pd.ArrowDtype):
arrow_type = bigframes_dtype.pyarrow_dtype
return arrow_type_to_literal(arrow_type)

if isinstance(bigframes_dtype, pd.Float64Dtype):
return 1.0
if isinstance(bigframes_dtype, pd.Int64Dtype):
return 1
if isinstance(bigframes_dtype, pd.BooleanDtype):
return True
if isinstance(bigframes_dtype, pd.StringDtype):
return "string"
if isinstance(bigframes_dtype, gpd.array.GeometryDtype):
return shapely.Point((0, 0))

raise ValueError(
f"No literal conversion for {bigframes_dtype}. {constants.FEEDBACK_LINK}"
)


def arrow_type_to_literal(
arrow_type: pa.DataType,
) -> Any:
"""Create a representative literal value for an arrow type."""
if pa.types.is_list(arrow_type):
return [arrow_type_to_literal(arrow_type.value_type)]
if pa.types.is_struct(arrow_type):
return {
field.name: arrow_type_to_literal(field.type) for field in arrow_type.fields
}
if pa.types.is_string(arrow_type):
return "string"
if pa.types.is_binary(arrow_type):
return b"bytes"
if pa.types.is_floating(arrow_type):
return 1.0
if pa.types.is_integer(arrow_type):
return 1
if pa.types.is_boolean(arrow_type):
return True
if pa.types.is_date(arrow_type):
return datetime.date(2025, 1, 1)
if pa.types.is_timestamp(arrow_type):
return datetime.datetime(
2025,
1,
1,
1,
1,
tzinfo=datetime.timezone.utc if arrow_type.tz is not None else None,
)
if pa.types.is_decimal(arrow_type):
return decimal.Decimal("1.0")
if pa.types.is_time(arrow_type):
return datetime.time(1, 1, 1)

raise ValueError(
f"No literal conversion for {arrow_type}. {constants.FEEDBACK_LINK}"
)
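
Together these helpers produce a stand-in value for any supported dtype, which sql_scalar() then renders into SQL literals for its dry-run query. A minimal sketch (the constructors below are standard pandas/pyarrow, not new to this PR):

import pandas as pd
import pyarrow as pa

import bigframes.dtypes

bigframes.dtypes.bigframes_dtype_to_literal(pd.Int64Dtype())   # 1
bigframes.dtypes.bigframes_dtype_to_literal(pd.StringDtype())  # "string"
# Arrow-backed types recurse structurally: list<int64> yields [1].
bigframes.dtypes.arrow_type_to_literal(pa.list_(pa.int64()))   # [1]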


def infer_literal_type(literal) -> typing.Optional[Dtype]:
# Maybe also normalize literal to canonical python representation to remove this burden from compilers?
if pd.api.types.is_list_like(literal):
2 changes: 2 additions & 0 deletions bigframes/operations/__init__.py
@@ -78,6 +78,7 @@
minimum_op,
notnull_op,
RowKey,
SqlScalarOp,
where_op,
)
from bigframes.operations.geo_ops import geo_x_op, geo_y_op
@@ -190,6 +191,7 @@
"minimum_op",
"notnull_op",
"RowKey",
"SqlScalarOp",
"where_op",
# String ops
"capitalize_op",
12 changes: 12 additions & 0 deletions bigframes/operations/generic_ops.py
@@ -160,3 +160,15 @@ def is_bijective(self) -> bool:
@property
def deterministic(self) -> bool:
return False


@dataclasses.dataclass(frozen=True)
class SqlScalarOp(base_ops.NaryOp):
"""An escape to SQL, representing a single column."""

name: typing.ClassVar[str] = "sql_scalar"
_output_type: dtypes.ExpressionType
sql_template: str

def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType:
return self._output_type
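
Because the SQL template is opaque to the type system, the op pins its output type at construction and output_type() ignores its input types. An illustrative check (the INT_DTYPE and FLOAT_DTYPE constants are assumed to exist in bigframes.dtypes):

import bigframes.dtypes
import bigframes.operations as ops

op = ops.SqlScalarOp(
    _output_type=bigframes.dtypes.INT_DTYPE,  # assumed dtype constant
    sql_template="MOD({0}, 2)",
)
# The pinned type is returned regardless of the inputs.
assert op.output_type(bigframes.dtypes.FLOAT_DTYPE) == bigframes.dtypes.INT_DTYPE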