fix: Fix issues where duration type is returned as int #1875


Merged · 9 commits · Jul 1, 2025
13 changes: 4 additions & 9 deletions bigframes/core/local_data.py
@@ -30,6 +30,7 @@
import pyarrow as pa
import pyarrow.parquet # type: ignore

from bigframes.core import pyarrow_utils
import bigframes.core.schema as schemata
import bigframes.dtypes

@@ -113,7 +114,9 @@ def to_arrow(
schema = self.data.schema
if duration_type == "int":
schema = _schema_durations_to_ints(schema)
batches = map(functools.partial(_cast_pa_batch, schema=schema), batches)
batches = map(
functools.partial(pyarrow_utils.cast_batch, schema=schema), batches
)

if offsets_col is not None:
return schema.append(pa.field(offsets_col, pa.int64())), _append_offsets(
@@ -468,14 +471,6 @@ def _schema_durations_to_ints(schema: pa.Schema) -> pa.Schema:
)


# TODO: Use RecordBatch.cast once min pyarrow>=16.0
def _cast_pa_batch(batch: pa.RecordBatch, schema: pa.Schema) -> pa.RecordBatch:
return pa.record_batch(
[arr.cast(type) for arr, type in zip(batch.columns, schema.types)],
schema=schema,
)


def _pairwise(iterable):
do_yield = False
a = None
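For context, the duration_type="int" path in to_arrow rewrites every duration field in the schema to int64 and then casts each batch to match. A minimal standalone sketch of that schema rewrite, assuming only pyarrow (durations_to_ints is an illustrative name, not the bigframes-internal _schema_durations_to_ints):

import pyarrow as pa

def durations_to_ints(schema: pa.Schema) -> pa.Schema:
    # Replace each duration field with an int64 field of the same name so
    # consumers that cannot handle durations get raw microsecond counts.
    return pa.schema(
        pa.field(f.name, pa.int64()) if pa.types.is_duration(f.type) else f
        for f in schema
    )

schema = pa.schema([("duration_col", pa.duration("us")), ("s", pa.string())])
print(durations_to_ints(schema))
# duration_col: int64
# s: string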
10 changes: 10 additions & 0 deletions bigframes/core/pyarrow_utils.py
@@ -74,6 +74,16 @@ def chunk_by_row_count(
yield buffer.take_as_batches(len(buffer))


def cast_batch(batch: pa.RecordBatch, schema: pa.Schema) -> pa.RecordBatch:
if batch.schema == schema:
return batch
# TODO: Use RecordBatch.cast once min pyarrow>=16.0
return pa.record_batch(
[arr.cast(type) for arr, type in zip(batch.columns, schema.types)],
schema=schema,
)


def truncate_pyarrow_iterable(
batches: Iterable[pa.RecordBatch], max_results: int
) -> Iterator[pa.RecordBatch]:
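A usage sketch of the relocated cast_batch helper on toy data; the list-comprehension fallback mirrors the hunk above and is only needed while the minimum supported pyarrow is below 16.0, after which batch.cast(schema) does the same:

import pyarrow as pa

batch = pa.record_batch(
    [pa.array([4, None, -1_000_000], type=pa.duration("us"))],
    names=["duration_col"],
)
target = pa.schema([("duration_col", pa.int64())])

# Equivalent to pyarrow_utils.cast_batch(batch, target).
cast = pa.record_batch(
    [arr.cast(typ) for arr, typ in zip(batch.columns, target.types)],
    schema=target,
)
print(cast.column(0))  # [4, null, -1000000], now int64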
3 changes: 3 additions & 0 deletions bigframes/dtypes.py
@@ -247,6 +247,7 @@ class SimpleDtypeInfo:
"decimal128(38, 9)[pyarrow]",
"decimal256(76, 38)[pyarrow]",
"binary[pyarrow]",
"duration[us][pyarrow]",
]

DTYPE_STRINGS = typing.get_args(DtypeString)
@@ -421,6 +422,8 @@ def is_bool_coercable(type_: ExpressionType) -> bool:
# special case - both "Int64" and "int64[pyarrow]" are accepted
BIGFRAMES_STRING_TO_BIGFRAMES["int64[pyarrow]"] = INT_DTYPE

BIGFRAMES_STRING_TO_BIGFRAMES["duration[us][pyarrow]"] = TIMEDELTA_DTYPE

# For the purposes of dataframe.memory_usage
DTYPE_BYTE_SIZES = {
type_info.dtype: type_info.logical_bytes for type_info in SIMPLE_TYPES
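These two dtypes.py additions register "duration[us][pyarrow]" as an accepted dtype string and map it to the timedelta dtype. A hedged sketch of the user-visible effect, assuming TIMEDELTA_DTYPE is pd.ArrowDtype(pa.duration("us")) as the mapping implies:

import pandas as pd
import pyarrow as pa

# The string alias resolves to the Arrow-backed duration dtype, so it can
# now be passed anywhere bigframes accepts a dtype string.
s = pd.Series([4, None, 432_000_000_000], dtype="duration[us][pyarrow]")
assert s.dtype == pd.ArrowDtype(pa.duration("us"))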
1 change: 1 addition & 0 deletions bigframes/session/executor.py
@@ -50,6 +50,7 @@ def arrow_batches(self) -> Iterator[pyarrow.RecordBatch]:
result_rows = 0

for batch in self._arrow_batches:
batch = pyarrow_utils.cast_batch(batch, self.schema.to_pyarrow())
result_rows += batch.num_rows

maximum_result_rows = bigframes.options.compute.maximum_result_rows
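This executor change is the heart of the fix: BigQuery hands back the stored INTEGER microseconds, so each streamed batch is now cast to the logical schema before being counted and yielded. A minimal sketch of that defensive loop, inlining cast_batch from above:

import pyarrow as pa

def cast_batch(batch: pa.RecordBatch, schema: pa.Schema) -> pa.RecordBatch:
    if batch.schema == schema:
        return batch
    return pa.record_batch(
        [arr.cast(typ) for arr, typ in zip(batch.columns, schema.types)],
        schema=schema,
    )

# A batch arrives with raw int64 microseconds...
raw = pa.record_batch([pa.array([4, -1_000_000])], names=["duration_col"])
# ...but the logical schema says duration[us]; the cast restores it.
logical = pa.schema([("duration_col", pa.duration("us"))])
print(cast_batch(raw, logical).schema)  # duration_col: duration[us]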
10 changes: 10 additions & 0 deletions bigframes/testing/utils.py
@@ -185,6 +185,16 @@ def convert_pandas_dtypes(df: pd.DataFrame, bytes_col: bool):
"timestamp_col"
]

if not isinstance(df["duration_col"].dtype, pd.ArrowDtype):
df["duration_col"] = df["duration_col"].astype(pd.Int64Dtype())
arrow_table = pa.Table.from_pandas(
pd.DataFrame(df, columns=["duration_col"]),
schema=pa.schema([("duration_col", pa.duration("us"))]),
)
df["duration_col"] = arrow_table.to_pandas(types_mapper=pd.ArrowDtype)[
"duration_col"
]

# Convert geography types columns.
if "geography_col" in df.columns:
df["geography_col"] = df["geography_col"].astype(
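The test utility converts JSON-loaded integer microseconds into the Arrow duration dtype by round-tripping through a pyarrow table. The same conversion on standalone toy data:

import pandas as pd
import pyarrow as pa

df = pd.DataFrame({"duration_col": [4, None, 432_000_000_000]})
df["duration_col"] = df["duration_col"].astype(pd.Int64Dtype())
arrow_table = pa.Table.from_pandas(
    df[["duration_col"]],
    schema=pa.schema([("duration_col", pa.duration("us"))]),
)
df["duration_col"] = arrow_table.to_pandas(types_mapper=pd.ArrowDtype)[
    "duration_col"
]
print(df["duration_col"].dtype)  # duration[us][pyarrow]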
18 changes: 9 additions & 9 deletions tests/data/scalars.jsonl
@@ -1,9 +1,9 @@
{"bool_col": true, "bytes_col": "SGVsbG8sIFdvcmxkIQ==", "date_col": "2021-07-21", "datetime_col": "2021-07-21 11:39:45", "geography_col": "POINT(-122.0838511 37.3860517)", "int64_col": "123456789", "int64_too": "0", "numeric_col": "1.23456789", "float64_col": "1.25", "rowindex": 0, "rowindex_2": 0, "string_col": "Hello, World!", "time_col": "11:41:43.076160", "timestamp_col": "2021-07-21T17:43:43.945289Z"}
{"bool_col": false, "bytes_col": "44GT44KT44Gr44Gh44Gv", "date_col": "1991-02-03", "datetime_col": "1991-01-02 03:45:06", "geography_col": "POINT(-71.104 42.315)", "int64_col": "-987654321", "int64_too": "1", "numeric_col": "1.23456789", "float64_col": "2.51", "rowindex": 1, "rowindex_2": 1, "string_col": "こんにちは", "time_col": "11:14:34.701606", "timestamp_col": "2021-07-21T17:43:43.945289Z"}
{"bool_col": true, "bytes_col": "wqFIb2xhIE11bmRvIQ==", "date_col": "2023-03-01", "datetime_col": "2023-03-01 10:55:13", "geography_col": "POINT(-0.124474760143016 51.5007826749545)", "int64_col": "314159", "int64_too": "0", "numeric_col": "101.1010101", "float64_col": "2.5e10", "rowindex": 2, "rowindex_2": 2, "string_col": " ¡Hola Mundo! ", "time_col": "23:59:59.999999", "timestamp_col": "2023-03-01T10:55:13.250125Z"}
{"bool_col": null, "bytes_col": null, "date_col": null, "datetime_col": null, "geography_col": null, "int64_col": null, "int64_too": "1", "numeric_col": null, "float64_col": null, "rowindex": 3, "rowindex_2": 3, "string_col": null, "time_col": null, "timestamp_col": null}
{"bool_col": false, "bytes_col": "44GT44KT44Gr44Gh44Gv", "date_col": "2021-07-21", "datetime_col": null, "geography_col": null, "int64_col": "-234892", "int64_too": "-2345", "numeric_col": null, "float64_col": null, "rowindex": 4, "rowindex_2": 4, "string_col": "Hello, World!", "time_col": null, "timestamp_col": null}
{"bool_col": false, "bytes_col": "R8O8dGVuIFRhZw==", "date_col": "1980-03-14", "datetime_col": "1980-03-14 15:16:17", "geography_col": null, "int64_col": "55555", "int64_too": "0", "numeric_col": "5.555555", "float64_col": "555.555", "rowindex": 5, "rowindex_2": 5, "string_col": "Güten Tag!", "time_col": "15:16:17.181921", "timestamp_col": "1980-03-14T15:16:17.181921Z"}
{"bool_col": true, "bytes_col": "SGVsbG8JQmlnRnJhbWVzIQc=", "date_col": "2023-05-23", "datetime_col": "2023-05-23 11:37:01", "geography_col": "LINESTRING(-0.127959 51.507728, -0.127026 51.507473)", "int64_col": "101202303", "int64_too": "2", "numeric_col": "-10.090807", "float64_col": "-123.456", "rowindex": 6, "rowindex_2": 6, "string_col": "capitalize, This ", "time_col": "01:02:03.456789", "timestamp_col": "2023-05-23T11:42:55.000001Z"}
{"bool_col": true, "bytes_col": null, "date_col": "2038-01-20", "datetime_col": "2038-01-19 03:14:08", "geography_col": null, "int64_col": "-214748367", "int64_too": "2", "numeric_col": "11111111.1", "float64_col": "42.42", "rowindex": 7, "rowindex_2": 7, "string_col": " سلام", "time_col": "12:00:00.000001", "timestamp_col": "2038-01-19T03:14:17.999999Z"}
{"bool_col": false, "bytes_col": null, "date_col": null, "datetime_col": null, "geography_col": null, "int64_col": "2", "int64_too": "1", "numeric_col": null, "float64_col": "6.87", "rowindex": 8, "rowindex_2": 8, "string_col": "T", "time_col": null, "timestamp_col": null}
{"bool_col": true, "bytes_col": "SGVsbG8sIFdvcmxkIQ==", "date_col": "2021-07-21", "datetime_col": "2021-07-21 11:39:45", "geography_col": "POINT(-122.0838511 37.3860517)", "int64_col": "123456789", "int64_too": "0", "numeric_col": "1.23456789", "float64_col": "1.25", "rowindex": 0, "rowindex_2": 0, "string_col": "Hello, World!", "time_col": "11:41:43.076160", "timestamp_col": "2021-07-21T17:43:43.945289Z", "duration_col": 4}
{"bool_col": false, "bytes_col": "44GT44KT44Gr44Gh44Gv", "date_col": "1991-02-03", "datetime_col": "1991-01-02 03:45:06", "geography_col": "POINT(-71.104 42.315)", "int64_col": "-987654321", "int64_too": "1", "numeric_col": "1.23456789", "float64_col": "2.51", "rowindex": 1, "rowindex_2": 1, "string_col": "こんにちは", "time_col": "11:14:34.701606", "timestamp_col": "2021-07-21T17:43:43.945289Z", "duration_col": -1000000}
{"bool_col": true, "bytes_col": "wqFIb2xhIE11bmRvIQ==", "date_col": "2023-03-01", "datetime_col": "2023-03-01 10:55:13", "geography_col": "POINT(-0.124474760143016 51.5007826749545)", "int64_col": "314159", "int64_too": "0", "numeric_col": "101.1010101", "float64_col": "2.5e10", "rowindex": 2, "rowindex_2": 2, "string_col": " ¡Hola Mundo! ", "time_col": "23:59:59.999999", "timestamp_col": "2023-03-01T10:55:13.250125Z", "duration_col": 0}
{"bool_col": null, "bytes_col": null, "date_col": null, "datetime_col": null, "geography_col": null, "int64_col": null, "int64_too": "1", "numeric_col": null, "float64_col": null, "rowindex": 3, "rowindex_2": 3, "string_col": null, "time_col": null, "timestamp_col": null, "duration_col": null}
{"bool_col": false, "bytes_col": "44GT44KT44Gr44Gh44Gv", "date_col": "2021-07-21", "datetime_col": null, "geography_col": null, "int64_col": "-234892", "int64_too": "-2345", "numeric_col": null, "float64_col": null, "rowindex": 4, "rowindex_2": 4, "string_col": "Hello, World!", "time_col": null, "timestamp_col": null, "duration_col": 31540000000000}
{"bool_col": false, "bytes_col": "R8O8dGVuIFRhZw==", "date_col": "1980-03-14", "datetime_col": "1980-03-14 15:16:17", "geography_col": null, "int64_col": "55555", "int64_too": "0", "numeric_col": "5.555555", "float64_col": "555.555", "rowindex": 5, "rowindex_2": 5, "string_col": "Güten Tag!", "time_col": "15:16:17.181921", "timestamp_col": "1980-03-14T15:16:17.181921Z", "duration_col": 4}
{"bool_col": true, "bytes_col": "SGVsbG8JQmlnRnJhbWVzIQc=", "date_col": "2023-05-23", "datetime_col": "2023-05-23 11:37:01", "geography_col": "LINESTRING(-0.127959 51.507728, -0.127026 51.507473)", "int64_col": "101202303", "int64_too": "2", "numeric_col": "-10.090807", "float64_col": "-123.456", "rowindex": 6, "rowindex_2": 6, "string_col": "capitalize, This ", "time_col": "01:02:03.456789", "timestamp_col": "2023-05-23T11:42:55.000001Z", "duration_col": null}
{"bool_col": true, "bytes_col": null, "date_col": "2038-01-20", "datetime_col": "2038-01-19 03:14:08", "geography_col": null, "int64_col": "-214748367", "int64_too": "2", "numeric_col": "11111111.1", "float64_col": "42.42", "rowindex": 7, "rowindex_2": 7, "string_col": " سلام", "time_col": "12:00:00.000001", "timestamp_col": "2038-01-19T03:14:17.999999Z", "duration_col": 4}
{"bool_col": false, "bytes_col": null, "date_col": null, "datetime_col": null, "geography_col": null, "int64_col": "2", "int64_too": "1", "numeric_col": null, "float64_col": "6.87", "rowindex": 8, "rowindex_2": 8, "string_col": "T", "time_col": null, "timestamp_col": null, "duration_col": 432000000000}
6 changes: 6 additions & 0 deletions tests/data/scalars_schema.json
@@ -71,5 +71,11 @@
"mode": "NULLABLE",
"name": "timestamp_col",
"type": "TIMESTAMP"
},
{
"mode": "NULLABLE",
"name": "duration_col",
"type": "INTEGER",
"description": "#microseconds"
}
]
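BigQuery has no native duration type, so the test schema stores the column as INTEGER and tags its description with "#microseconds", which appears to be the marker bigframes relies on to restore duration[us] on read. A hedged sketch of declaring such a column with the google-cloud-bigquery client (field values mirror the JSON above):

from google.cloud import bigquery

duration_field = bigquery.SchemaField(
    "duration_col",
    "INTEGER",
    mode="NULLABLE",
    description="#microseconds",  # convention consumed on read, per the schema above
)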
8 changes: 7 additions & 1 deletion tests/system/small/pandas/core/methods/test_describe.py
@@ -21,7 +21,13 @@ def test_df_describe_non_temporal(scalars_dfs):
pytest.importorskip("pandas", minversion="2.0.0")
scalars_df, scalars_pandas_df = scalars_dfs
# excluding temporal columns here because BigFrames cannot perform percentile operations on them
unsupported_columns = ["datetime_col", "timestamp_col", "time_col", "date_col"]
unsupported_columns = [
"datetime_col",
"timestamp_col",
"time_col",
"date_col",
"duration_col",
]
bf_result = scalars_df.drop(columns=unsupported_columns).describe().to_pandas()

modified_pd_df = scalars_pandas_df.drop(columns=unsupported_columns)
12 changes: 9 additions & 3 deletions tests/system/small/test_dataframe.py
@@ -553,7 +553,7 @@ def test_df_info(scalars_dfs):
expected = (
"<class 'bigframes.dataframe.DataFrame'>\n"
"Index: 9 entries, 0 to 8\n"
"Data columns (total 13 columns):\n"
"Data columns (total 14 columns):\n"
" # Column Non-Null Count Dtype\n"
"--- ------------- ---------------- ------------------------------\n"
" 0 bool_col 8 non-null boolean\n"
@@ -569,8 +569,9 @@
" 10 string_col 8 non-null string\n"
" 11 time_col 6 non-null time64[us][pyarrow]\n"
" 12 timestamp_col 6 non-null timestamp[us, tz=UTC][pyarrow]\n"
"dtypes: Float64(1), Int64(3), binary[pyarrow](1), boolean(1), date32[day][pyarrow](1), decimal128(38, 9)[pyarrow](1), geometry(1), string(1), time64[us][pyarrow](1), timestamp[us, tz=UTC][pyarrow](1), timestamp[us][pyarrow](1)\n"
"memory usage: 1269 bytes\n"
" 13 duration_col 7 non-null duration[us][pyarrow]\n"
"dtypes: Float64(1), Int64(3), binary[pyarrow](1), boolean(1), date32[day][pyarrow](1), decimal128(38, 9)[pyarrow](1), duration[us][pyarrow](1), geometry(1), string(1), time64[us][pyarrow](1), timestamp[us, tz=UTC][pyarrow](1), timestamp[us][pyarrow](1)\n"
"memory usage: 1341 bytes\n"
)

scalars_df, _ = scalars_dfs
@@ -1694,6 +1695,7 @@ def test_get_dtypes(scalars_df_default_index):
"string_col": pd.StringDtype(storage="pyarrow"),
"time_col": pd.ArrowDtype(pa.time64("us")),
"timestamp_col": pd.ArrowDtype(pa.timestamp("us", tz="UTC")),
"duration_col": pd.ArrowDtype(pa.duration("us")),
}
pd.testing.assert_series_equal(
dtypes,
@@ -4771,6 +4773,9 @@ def test_df_to_json_local_str(scalars_df_index, scalars_pandas_df_index):
def test_df_to_json_local_file(scalars_df_index, scalars_pandas_df_index):
# TODO: supply a reason why this isn't compatible with pandas 1.x
pytest.importorskip("pandas", minversion="2.0.0")
# duration not fully supported at pandas level
scalars_df_index = scalars_df_index.drop(columns="duration_col")
scalars_pandas_df_index = scalars_pandas_df_index.drop(columns="duration_col")
with tempfile.TemporaryFile() as bf_result_file, tempfile.TemporaryFile() as pd_result_file:
scalars_df_index.to_json(bf_result_file, orient="table")
# default_handler for arrow types that have no default conversion
@@ -4882,6 +4887,7 @@ def test_df_to_orc(scalars_df_index, scalars_pandas_df_index):
"time_col",
"timestamp_col",
"geography_col",
"duration_col",
]

bf_result_file = tempfile.TemporaryFile()
38 changes: 25 additions & 13 deletions tests/system/small/test_dataframe_io.py
@@ -55,7 +55,7 @@ def test_sql_executes(scalars_df_default_index, bigquery_client):
"""
# Do some operations to make for more complex SQL.
df = (
scalars_df_default_index.drop(columns=["geography_col"])
scalars_df_default_index.drop(columns=["geography_col", "duration_col"])
.groupby("string_col")
.max()
)
@@ -87,7 +87,7 @@ def test_sql_executes_and_includes_named_index(
"""
# Do some operations to make for more complex SQL.
df = (
scalars_df_default_index.drop(columns=["geography_col"])
scalars_df_default_index.drop(columns=["geography_col", "duration_col"])
.groupby("string_col")
.max()
)
@@ -120,7 +120,7 @@ def test_sql_executes_and_includes_named_multiindex(
"""
# Do some operations to make for more complex SQL.
df = (
scalars_df_default_index.drop(columns=["geography_col"])
scalars_df_default_index.drop(columns=["geography_col", "duration_col"])
.groupby(["string_col", "bool_col"])
.max()
)
@@ -999,14 +999,16 @@ def test_to_sql_query_unnamed_index_included(
scalars_df_default_index: bpd.DataFrame,
scalars_pandas_df_default_index: pd.DataFrame,
):
bf_df = scalars_df_default_index.reset_index(drop=True)
bf_df = scalars_df_default_index.reset_index(drop=True).drop(columns="duration_col")
sql, idx_ids, idx_labels = bf_df._to_sql_query(include_index=True)
assert len(idx_labels) == 1
assert len(idx_ids) == 1
assert idx_labels[0] is None
assert idx_ids[0].startswith("bigframes")

pd_df = scalars_pandas_df_default_index.reset_index(drop=True)
pd_df = scalars_pandas_df_default_index.reset_index(drop=True).drop(
columns="duration_col"
)
roundtrip = session.read_gbq(sql, index_col=idx_ids)
roundtrip.index.names = [None]
utils.assert_pandas_df_equal(roundtrip.to_pandas(), pd_df, check_index_type=False)
@@ -1017,14 +1019,18 @@ def test_to_sql_query_named_index_included(
scalars_df_default_index: bpd.DataFrame,
scalars_pandas_df_default_index: pd.DataFrame,
):
bf_df = scalars_df_default_index.set_index("rowindex_2", drop=True)
bf_df = scalars_df_default_index.set_index("rowindex_2", drop=True).drop(
columns="duration_col"
)
sql, idx_ids, idx_labels = bf_df._to_sql_query(include_index=True)
assert len(idx_labels) == 1
assert len(idx_ids) == 1
assert idx_labels[0] == "rowindex_2"
assert idx_ids[0] == "rowindex_2"

pd_df = scalars_pandas_df_default_index.set_index("rowindex_2", drop=True)
pd_df = scalars_pandas_df_default_index.set_index("rowindex_2", drop=True).drop(
columns="duration_col"
)
roundtrip = session.read_gbq(sql, index_col=idx_ids)
utils.assert_pandas_df_equal(roundtrip.to_pandas(), pd_df)

@@ -1034,12 +1040,14 @@ def test_to_sql_query_unnamed_index_excluded(
scalars_df_default_index: bpd.DataFrame,
scalars_pandas_df_default_index: pd.DataFrame,
):
bf_df = scalars_df_default_index.reset_index(drop=True)
bf_df = scalars_df_default_index.reset_index(drop=True).drop(columns="duration_col")
sql, idx_ids, idx_labels = bf_df._to_sql_query(include_index=False)
assert len(idx_labels) == 0
assert len(idx_ids) == 0

pd_df = scalars_pandas_df_default_index.reset_index(drop=True)
pd_df = scalars_pandas_df_default_index.reset_index(drop=True).drop(
columns="duration_col"
)
roundtrip = session.read_gbq(sql)
utils.assert_pandas_df_equal(
roundtrip.to_pandas(), pd_df, check_index_type=False, ignore_order=True
@@ -1051,14 +1059,18 @@ def test_to_sql_query_named_index_excluded(
scalars_df_default_index: bpd.DataFrame,
scalars_pandas_df_default_index: pd.DataFrame,
):
bf_df = scalars_df_default_index.set_index("rowindex_2", drop=True)
bf_df = scalars_df_default_index.set_index("rowindex_2", drop=True).drop(
columns="duration_col"
)
sql, idx_ids, idx_labels = bf_df._to_sql_query(include_index=False)
assert len(idx_labels) == 0
assert len(idx_ids) == 0

pd_df = scalars_pandas_df_default_index.set_index(
"rowindex_2", drop=True
).reset_index(drop=True)
pd_df = (
scalars_pandas_df_default_index.set_index("rowindex_2", drop=True)
.reset_index(drop=True)
.drop(columns="duration_col")
)
roundtrip = session.read_gbq(sql)
utils.assert_pandas_df_equal(
roundtrip.to_pandas(), pd_df, check_index_type=False, ignore_order=True