feat: Use validated local storage for data uploads #1612

Merged
merged 12 commits on Apr 14, 2025
11 changes: 9 additions & 2 deletions bigframes/core/array_value.py
@@ -97,6 +97,7 @@ def from_table(
at_time: Optional[datetime.datetime] = None,
primary_key: Sequence[str] = (),
offsets_col: Optional[str] = None,
n_rows: Optional[int] = None,
):
if offsets_col and primary_key:
raise ValueError("must set at most one of 'offsets', 'primary_key'")
@@ -126,7 +127,11 @@ def from_table(
)
)
source_def = nodes.BigqueryDataSource(
table=table_def, at_time=at_time, sql_predicate=predicate, ordering=ordering
table=table_def,
at_time=at_time,
sql_predicate=predicate,
ordering=ordering,
n_rows=n_rows,
)
node = nodes.ReadTableNode(
source=source_def,
@@ -176,7 +181,9 @@ def as_cached(
Replace the node with an equivalent one that references a table where the value has been materialized to.
"""
table = nodes.GbqTable.from_table(cache_table)
source = nodes.BigqueryDataSource(table, ordering=ordering)
source = nodes.BigqueryDataSource(
table, ordering=ordering, n_rows=cache_table.num_rows
)
# Assumption: GBQ cached table uses field name as bq column name
scan_list = nodes.ScanList(
tuple(
4 changes: 3 additions & 1 deletion bigframes/core/blocks.py
@@ -2713,11 +2713,13 @@ def _get_rows_as_json_values(self) -> Block:
)
)

dest_table = self.session.bqclient.get_table(destination)
expr = core.ArrayValue.from_table(
self.session.bqclient.get_table(destination),
dest_table,
schema=new_schema,
session=self.session,
offsets_col=ordering_column_name,
n_rows=dest_table.num_rows,
).drop_columns([ordering_column_name])
block = Block(
expr,
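For context, a condensed sketch of the pattern this blocks.py change applies: fetch the destination table once, then reuse the same Table object for both the schema source and the exact row count. The helper name and import path below are assumptions for illustration, not code from this PR:

import google.cloud.bigquery as bigquery

from bigframes.core.array_value import ArrayValue  # assumed module path, per the diff above


def read_destination(session, destination, new_schema, ordering_column_name: str):
    # One metadata fetch serves both purposes.
    dest_table: bigquery.Table = session.bqclient.get_table(destination)
    return ArrayValue.from_table(
        dest_table,
        schema=new_schema,
        session=session,
        offsets_col=ordering_column_name,
        n_rows=dest_table.num_rows,  # new parameter introduced by this PR
    ).drop_columns([ordering_column_name])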
205 changes: 178 additions & 27 deletions bigframes/core/local_data.py
@@ -18,13 +18,17 @@

import dataclasses
import functools
from typing import cast, Union
import io
import itertools
import json
from typing import Any, Callable, cast, Generator, Iterable, Literal, Optional, Union
import uuid

import geopandas # type: ignore
import numpy as np
import pandas
import pyarrow as pa
import pyarrow.parquet # type: ignore

import bigframes.core.schema as schemata
import bigframes.dtypes
@@ -42,7 +46,9 @@ def from_arrow(cls, table: pa.Table) -> LocalTableMetadata:

_MANAGED_STORAGE_TYPES_OVERRIDES: dict[bigframes.dtypes.Dtype, pa.DataType] = {
# wkt to be precise
bigframes.dtypes.GEO_DTYPE: pa.string()
bigframes.dtypes.GEO_DTYPE: pa.string(),
# Just json as string
bigframes.dtypes.JSON_DTYPE: pa.string(),
}


@@ -90,6 +96,50 @@ def from_pyarrow(self, table: pa.Table) -> ManagedArrowTable:
schemata.ArraySchema(tuple(fields)),
)

def to_parquet(
self,
dst: Union[str, io.IOBase],
*,
offsets_col: Optional[str] = None,
geo_format: Literal["wkb", "wkt"] = "wkt",
duration_type: Literal["int", "duration"] = "duration",
json_type: Literal["string"] = "string",
):
pa_table = self.data
if offsets_col is not None:
pa_table = pa_table.append_column(
offsets_col, pa.array(range(pa_table.num_rows), type=pa.int64())
)
if geo_format != "wkt":
raise NotImplementedError(f"geo format {geo_format} not yet implemented")
if duration_type != "duration":
raise NotImplementedError(
f"duration as {duration_type} not yet implemented"
)
assert json_type == "string"
pyarrow.parquet.write_table(pa_table, where=dst)

def itertuples(
self,
*,
geo_format: Literal["wkb", "wkt"] = "wkt",
duration_type: Literal["int", "timedelta"] = "timedelta",
json_type: Literal["string", "object"] = "string",
) -> Iterable[tuple]:
"""
Yield each row as an unlabeled tuple.

Row-wise iteration of columnar data is slow, avoid if possible.
"""
for row_dict in _iter_table(
self.data,
self.schema,
geo_format=geo_format,
duration_type=duration_type,
json_type=json_type,
):
yield tuple(row_dict.values())

def validate(self):
# TODO: Content-based validation for some datatypes (eg json, wkt, list) where logical domain is smaller than pyarrow type
for bf_field, arrow_field in zip(self.schema.items, self.data.schema):
@@ -101,11 +151,78 @@ def validate(self):
)


def _get_managed_storage_type(dtype: bigframes.dtypes.Dtype) -> pa.DataType:
if dtype in _MANAGED_STORAGE_TYPES_OVERRIDES.keys():
return _MANAGED_STORAGE_TYPES_OVERRIDES[dtype]
else:
return bigframes.dtypes.bigframes_dtype_to_arrow_dtype(dtype)
# Sequential iterator, but could split into batches and leverage parallelism for speed
def _iter_table(
table: pa.Table,
schema: schemata.ArraySchema,
*,
geo_format: Literal["wkb", "wkt"] = "wkt",
duration_type: Literal["int", "timedelta"] = "timedelta",
json_type: Literal["string", "object"] = "string",
) -> Generator[dict[str, Any], None, None]:
"""For when you feel like iterating row-wise over a column store. Don't expect speed."""

if geo_format != "wkt":
raise NotImplementedError(f"geo format {geo_format} not yet implemented")

@functools.singledispatch
def iter_array(
array: pa.Array, dtype: bigframes.dtypes.Dtype
) -> Generator[Any, None, None]:
values = array.to_pylist()
if dtype == bigframes.dtypes.JSON_DTYPE:
if json_type == "object":
yield from map(lambda x: json.loads(x) if x is not None else x, values)
else:
yield from values
elif dtype == bigframes.dtypes.TIMEDELTA_DTYPE:
if duration_type == "int":
yield from map(
lambda x: ((x.days * 3600 * 24) + x.seconds) * 1_000_000
+ x.microseconds
if x is not None
else x,
values,
)
else:
yield from values
else:
yield from values

@iter_array.register
def _(
array: pa.ListArray, dtype: bigframes.dtypes.Dtype
) -> Generator[Any, None, None]:
value_generator = iter_array(
array.flatten(), bigframes.dtypes.get_array_inner_type(dtype)
)
for (start, end) in itertools.pairwise(array.offsets):
arr_size = end.as_py() - start.as_py()
yield list(itertools.islice(value_generator, arr_size))

@iter_array.register
def _(
array: pa.StructArray, dtype: bigframes.dtypes.Dtype
) -> Generator[Any, None, None]:
# yield from each subarray
sub_generators: dict[str, Generator[Any, None, None]] = {}
for field_name, dtype in bigframes.dtypes.get_struct_fields(dtype).items():
sub_generators[field_name] = iter_array(array.field(field_name), dtype)

keys = list(sub_generators.keys())
for row_values in zip(*sub_generators.values()):
yield {key: value for key, value in zip(keys, row_values)}

for batch in table.to_batches():
sub_generators: dict[str, Generator[Any, None, None]] = {}
for field in schema.items:
sub_generators[field.column] = iter_array(
batch.column(field.column), field.dtype
)

keys = list(sub_generators.keys())
for row_values in zip(*sub_generators.values()):
yield {key: value for key, value in zip(keys, row_values)}
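# (Editor's note, illustrative only, not part of this diff.) A worked example of
# the duration_type="int" branch above: for datetime.timedelta(days=1, seconds=2,
# microseconds=3), the conversion yields
#   ((1 * 3600 * 24) + 2) * 1_000_000 + 3 == 86_402_000_003
# i.e. the timedelta is flattened to integer microseconds.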


def _adapt_pandas_series(
@@ -117,32 +234,63 @@ def _adapt_pandas_series(
return pa.array(series, type=pa.string()), bigframes.dtypes.GEO_DTYPE
try:
return _adapt_arrow_array(pa.array(series))
except Exception as e:
except pa.ArrowInvalid as e:
if series.dtype == np.dtype("O"):
try:
series = series.astype(bigframes.dtypes.GEO_DTYPE)
return _adapt_pandas_series(series.astype(bigframes.dtypes.GEO_DTYPE))
except TypeError:
# Prefer original error
pass
raise e


def _adapt_arrow_array(
array: Union[pa.ChunkedArray, pa.Array]
) -> tuple[Union[pa.ChunkedArray, pa.Array], bigframes.dtypes.Dtype]:
target_type = _arrow_type_replacements(array.type)
target_type = _logical_type_replacements(array.type)
if target_type != array.type:
# TODO: Maybe warn if lossy conversion?
array = array.cast(target_type)
bf_type = bigframes.dtypes.arrow_dtype_to_bigframes_dtype(target_type)

storage_type = _get_managed_storage_type(bf_type)
if storage_type != array.type:
raise TypeError(
f"Expected {bf_type} to use arrow {storage_type}, instead got {array.type}"
)
array = array.cast(storage_type)
return array, bf_type


def _arrow_type_replacements(type: pa.DataType) -> pa.DataType:
def _get_managed_storage_type(dtype: bigframes.dtypes.Dtype) -> pa.DataType:
if dtype in _MANAGED_STORAGE_TYPES_OVERRIDES.keys():
return _MANAGED_STORAGE_TYPES_OVERRIDES[dtype]
return _physical_type_replacements(
bigframes.dtypes.bigframes_dtype_to_arrow_dtype(dtype)
)


def _recursive_map_types(
f: Callable[[pa.DataType], pa.DataType]
) -> Callable[[pa.DataType], pa.DataType]:
@functools.wraps(f)
def recursive_f(type: pa.DataType) -> pa.DataType:
if pa.types.is_list(type):
new_field_t = recursive_f(type.value_type)
if new_field_t != type.value_type:
return pa.list_(new_field_t)
return type
if pa.types.is_struct(type):
struct_type = cast(pa.StructType, type)
new_fields: list[pa.Field] = []
for i in range(struct_type.num_fields):
field = struct_type.field(i)
new_fields.append(field.with_type(recursive_f(field.type)))
return pa.struct(new_fields)
return f(type)

return recursive_f


@_recursive_map_types
def _logical_type_replacements(type: pa.DataType) -> pa.DataType:
if pa.types.is_timestamp(type):
# This is potentially lossy, but BigFrames doesn't support ns
new_tz = "UTC" if (type.tz is not None) else None
Expand All @@ -160,21 +308,24 @@ def _arrow_type_replacements(type: pa.DataType) -> pa.DataType:
if pa.types.is_large_string(type):
# simple string type can handle the largest strings needed
return pa.string()
if pa.types.is_dictionary(type):
return _logical_type_replacements(type.value_type)
if pa.types.is_null(type):
# null as a type not allowed, default type is float64 for bigframes
return pa.float64()
if pa.types.is_list(type):
new_field_t = _arrow_type_replacements(type.value_type)
if new_field_t != type.value_type:
return pa.list_(new_field_t)
return type
if pa.types.is_struct(type):
struct_type = cast(pa.StructType, type)
new_fields: list[pa.Field] = []
for i in range(struct_type.num_fields):
field = struct_type.field(i)
field.with_type(_arrow_type_replacements(field.type))
new_fields.append(field.with_type(_arrow_type_replacements(field.type)))
return pa.struct(new_fields)
else:
return type


_ARROW_MANAGED_STORAGE_OVERRIDES = {
bigframes.dtypes._BIGFRAMES_TO_ARROW[bf_dtype]: arrow_type
for bf_dtype, arrow_type in _MANAGED_STORAGE_TYPES_OVERRIDES.items()
if bf_dtype in bigframes.dtypes._BIGFRAMES_TO_ARROW
}


@_recursive_map_types
def _physical_type_replacements(dtype: pa.DataType) -> pa.DataType:
if dtype in _ARROW_MANAGED_STORAGE_OVERRIDES:
return _ARROW_MANAGED_STORAGE_OVERRIDES[dtype]
return dtype
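To make the new local_data surface concrete, a hedged usage sketch follows. It assumes ManagedArrowTable.from_pyarrow behaves as a constructor-style classmethod (as the hunk header above suggests) and that plain int64/string columns map cleanly onto bigframes dtypes; it is an illustration, not code taken from this PR:

import io

import pyarrow as pa

from bigframes.core.local_data import ManagedArrowTable

arrow_table = pa.table({"x": [1, 2, 3], "y": ["a", "b", "c"]})

# Build validated managed storage from an arrow table.
mat = ManagedArrowTable.from_pyarrow(arrow_table)
mat.validate()  # checks each bigframes dtype against its managed arrow storage type

# Row-wise access (the slow path the docstring warns about).
rows = list(mat.itertuples())  # e.g. [(1, 'a'), (2, 'b'), (3, 'c')]

# Upload-oriented serialization: append an offsets column and write parquet.
buf = io.BytesIO()
mat.to_parquet(buf, offsets_col="_bf_offsets")

If the constructor is exposed differently (for example via from_pandas), the rest of the sketch is unchanged.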
5 changes: 2 additions & 3 deletions bigframes/core/nodes.py
@@ -654,7 +654,6 @@ class GbqTable:
dataset_id: str = dataclasses.field()
table_id: str = dataclasses.field()
physical_schema: Tuple[bq.SchemaField, ...] = dataclasses.field()
n_rows: int = dataclasses.field()
is_physically_stored: bool = dataclasses.field()
cluster_cols: typing.Optional[Tuple[str, ...]]

@@ -670,7 +669,6 @@ def from_table(table: bq.Table, columns: Sequence[str] = ()) -> GbqTable:
dataset_id=table.dataset_id,
table_id=table.table_id,
physical_schema=schema,
n_rows=table.num_rows,
is_physically_stored=(table.table_type in ["TABLE", "MATERIALIZED_VIEW"]),
cluster_cols=None
if table.clustering_fields is None
@@ -696,6 +694,7 @@ class BigqueryDataSource:
# Added for backwards compatibility, not validated
sql_predicate: typing.Optional[str] = None
ordering: typing.Optional[orderings.RowOrdering] = None
n_rows: Optional[int] = None


## Put ordering in here or just add order_by node above?
@@ -773,7 +772,7 @@ def variables_introduced(self) -> int:
@property
def row_count(self) -> typing.Optional[int]:
if self.source.sql_predicate is None and self.source.table.is_physically_stored:
return self.source.table.n_rows
return self.source.n_rows
return None

@property
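The practical effect of moving n_rows from GbqTable onto BigqueryDataSource shows up in the row_count property above: the count now describes the snapshot that was actually read, including cached and uploaded results, rather than live table metadata. A paraphrase with simplified types, for illustration only:

from typing import Optional


def row_count(source) -> Optional[int]:
    """Paraphrase of ReadTableNode.row_count after this change (not verbatim)."""
    # Only trust the stored count when no filter was pushed down and the
    # source is a physically stored table rather than a view.
    if source.sql_predicate is None and source.table.is_physically_stored:
        return source.n_rows  # captured when the table or cache was fetched
    return None  # otherwise unknown without running a query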
8 changes: 6 additions & 2 deletions bigframes/core/schema.py
@@ -67,9 +67,13 @@ def dtypes(self) -> typing.Tuple[bigframes.dtypes.Dtype, ...]:
def _mapping(self) -> typing.Dict[ColumnIdentifierType, bigframes.dtypes.Dtype]:
return {item.column: item.dtype for item in self.items}

def to_bigquery(self) -> typing.Tuple[google.cloud.bigquery.SchemaField, ...]:
def to_bigquery(
self, overrides: dict[bigframes.dtypes.Dtype, str] = {}
) -> typing.Tuple[google.cloud.bigquery.SchemaField, ...]:
return tuple(
bigframes.dtypes.convert_to_schema_field(item.column, item.dtype)
bigframes.dtypes.convert_to_schema_field(
item.column, item.dtype, overrides=overrides
)
for item in self.items
)

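Finally, the new overrides parameter on to_bigquery lets a load path force a particular BigQuery field type for selected bigframes dtypes, for example writing JSON columns as plain STRING when the upload format cannot express JSON natively. A hedged sketch; the "STRING" mapping and the array_schema variable are assumptions for illustration:

import bigframes.dtypes

# array_schema is assumed to be a bigframes.core.schema.ArraySchema instance.
bq_fields = array_schema.to_bigquery(
    overrides={bigframes.dtypes.JSON_DTYPE: "STRING"}  # dtype -> BigQuery type name
)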