feat: JSON dtype support for read_pandas and Series constructor #1391

Merged · 3 commits · Feb 13, 2025
Changes from all commits
15 changes: 14 additions & 1 deletion bigframes/core/compile/compiler.py
@@ -20,16 +20,17 @@

import bigframes_vendored.ibis.backends.bigquery as ibis_bigquery
import bigframes_vendored.ibis.expr.api as ibis_api
import bigframes_vendored.ibis.expr.datatypes as ibis_dtypes
import bigframes_vendored.ibis.expr.types as ibis_types
import google.cloud.bigquery
import pandas as pd

from bigframes import dtypes
from bigframes.core import utils
import bigframes.core.compile.compiled as compiled
import bigframes.core.compile.concat as concat_impl
import bigframes.core.compile.explode
import bigframes.core.compile.ibis_types
import bigframes.core.compile.scalar_op_compiler
import bigframes.core.compile.scalar_op_compiler as compile_scalar
import bigframes.core.compile.schema_translator
import bigframes.core.expression as ex
@@ -224,6 +225,18 @@ def compile_read_table_unordered(
ibis_table = self.read_table_as_unordered_ibis(
source, scan_cols=[col.source_id for col in scan.items]
)

# TODO(b/395912450): Remove this workaround once b/374784249 is resolved.
for scan_item in scan.items:
if (
scan_item.dtype == dtypes.JSON_DTYPE
and ibis_table[scan_item.source_id].type() == ibis_dtypes.string
):
json_column = compile_scalar.parse_json(
ibis_table[scan_item.source_id]
).name(scan_item.source_id)
ibis_table = ibis_table.mutate(json_column)

return compiled.UnorderedIR(
ibis_table,
tuple(
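For context, the hunk above re-parses JSON columns that come back from the load job as plain strings. Below is a minimal standalone sketch of that round trip using only the standard library; it stands in for the vendored ibis path (compile_scalar.parse_json) and is an illustration of the workaround, not the code this PR ships:

import json

import pandas as pd

# Hypothetical illustration of the workaround's round trip: JSON values are
# serialized to strings so the load job accepts them (b/374784249), then
# parsed back into JSON objects on the read path.
records = [{"a": 1}, {"b": [1, 2]}, None]

# Write side: store JSON as strings.
as_strings = pd.Series(
    [json.dumps(r) if r is not None else None for r in records],
    dtype="string[pyarrow]",
)

# Read side: parse the strings back, mirroring what parse_json does in SQL.
round_tripped = [json.loads(s) if pd.notna(s) else None for s in as_strings]
assert round_tripped == records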
22 changes: 22 additions & 0 deletions bigframes/core/utils.py
@@ -24,6 +24,7 @@
import pandas.api.types as pdtypes
import typing_extensions

import bigframes.dtypes as dtypes
import bigframes.exceptions as bfe

UNNAMED_COLUMN_ID = "bigframes_unnamed_column"
@@ -226,3 +227,24 @@ def replace_timedeltas_with_micros(dataframe: pd.DataFrame) -> List[str]:
updated_columns.append(dataframe.index.name)

return updated_columns


def replace_json_with_string(dataframe: pd.DataFrame) -> List[str]:
"""
BigQuery IO cannot load JSON data from Parquet files (b/374784249), so as a
workaround we store JSON values as strings and parse them back into JSON
objects after loading.
TODO(b/395912450): Remove this workaround once b/374784249 is resolved.
"""
updated_columns = []

for col in dataframe.columns:
if dataframe[col].dtype == dtypes.JSON_DTYPE:
dataframe[col] = dataframe[col].astype(dtypes.STRING_DTYPE)
updated_columns.append(col)

if dataframe.index.dtype == dtypes.JSON_DTYPE:
dataframe.index = dataframe.index.astype(dtypes.STRING_DTYPE)
updated_columns.append(dataframe.index.name)

return updated_columns
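A short usage sketch of the new helper. The concrete dtype objects (db_dtypes.JSONDtype() for dtypes.JSON_DTYPE, pyarrow-backed strings for dtypes.STRING_DTYPE) are assumptions inferred from the tests in this PR:

import db_dtypes
import pandas as pd

from bigframes.core import utils

df = pd.DataFrame(
    {
        "id": pd.Series([1, 2], dtype="Int64"),
        "payload": pd.Series([{"a": 1}, {"b": [2, 3]}], dtype=db_dtypes.JSONDtype()),
    }
)

# Mutates df in place: the JSON column is converted to strings so the Parquet
# load job accepts it; the returned names record which columns to restore.
json_cols = utils.replace_json_with_string(df)
assert json_cols == ["payload"]
assert df["payload"].dtype == pd.StringDtype(storage="pyarrow")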
1 change: 1 addition & 0 deletions bigframes/dtypes.py
@@ -301,6 +301,7 @@ def is_object_like(type_: Union[ExpressionType, str]) -> bool:
return type_ in ("object", "O") or (
getattr(type_, "kind", None) == "O"
and getattr(type_, "storage", None) != "pyarrow"
and getattr(type_, "name", None) != "dbjson"
)


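The one-line guard above is needed because db_dtypes.JSONDtype looks like a plain object dtype to pandas. A small sketch of the distinction, assuming the attribute values the new check relies on (kind "O", name "dbjson"):

import db_dtypes
import numpy as np

from bigframes import dtypes

json_dtype = db_dtypes.JSONDtype()

# JSONDtype reports kind "O" just like numpy's object dtype, so only its
# name distinguishes it from a generic object column.
assert json_dtype.kind == "O"
assert json_dtype.name == "dbjson"

assert dtypes.is_object_like(np.dtype("object"))  # unchanged behavior
assert not dtypes.is_object_like(json_dtype)  # excluded by the new guard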
12 changes: 9 additions & 3 deletions bigframes/session/_io/pandas.py
@@ -14,7 +14,8 @@
from __future__ import annotations

import dataclasses
from typing import Collection, List, Union
import typing
from typing import Collection, Union

import bigframes_vendored.constants as constants
import db_dtypes # type: ignore
@@ -38,7 +39,7 @@ class DataFrameAndLabels:
column_labels: Collection
index_labels: Collection
ordering_col: str
timedelta_cols: List[str]
col_type_overrides: typing.Dict[str, bigframes.dtypes.Dtype]


def _arrow_to_pandas_arrowdtype(
@@ -165,11 +166,16 @@ def pandas_to_bq_compatible(pandas_dataframe: pandas.DataFrame) -> DataFrameAndLabels:
pandas_dataframe_copy[ordering_col] = np.arange(pandas_dataframe_copy.shape[0])

timedelta_cols = utils.replace_timedeltas_with_micros(pandas_dataframe_copy)
json_cols = utils.replace_json_with_string(pandas_dataframe_copy)
col_type_overrides: typing.Dict[str, bigframes.dtypes.Dtype] = {
**{col: bigframes.dtypes.TIMEDELTA_DTYPE for col in timedelta_cols},
**{col: bigframes.dtypes.JSON_DTYPE for col in json_cols},
}

return DataFrameAndLabels(
df=pandas_dataframe_copy,
column_labels=col_labels,
index_labels=idx_labels,
ordering_col=ordering_col,
timedelta_cols=timedelta_cols,
col_type_overrides=col_type_overrides,
)
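With this rename, timedelta and JSON columns flow through a single dtype-override map that the loader (next file) forwards to ArraySchema.from_bq_table. A usage sketch, under the assumption that pandas_to_bq_compatible can be exercised directly like this:

import datetime

import db_dtypes
import pandas as pd

import bigframes.dtypes
import bigframes.session._io.pandas as io_pandas

df = pd.DataFrame(
    {
        "delta": pd.Series([datetime.timedelta(seconds=1)]),
        "doc": pd.Series([{"a": 1}], dtype=db_dtypes.JSONDtype()),
    }
)

result = io_pandas.pandas_to_bq_compatible(df)
# Both special columns were rewritten to load-compatible types; the overrides
# record how to restore them when building the BigQuery-backed schema.
assert result.col_type_overrides == {
    "delta": bigframes.dtypes.TIMEDELTA_DTYPE,
    "doc": bigframes.dtypes.JSON_DTYPE,
}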
13 changes: 2 additions & 11 deletions bigframes/session/loader.py
@@ -176,15 +176,11 @@ def read_pandas_load_job(
self._start_generic_job(load_job)

destination_table = self._bqclient.get_table(load_table_destination)
col_type_overrides: typing.Dict[str, bigframes.dtypes.Dtype] = {
col: bigframes.dtypes.TIMEDELTA_DTYPE
for col in df_and_labels.timedelta_cols
}
array_value = core.ArrayValue.from_table(
table=destination_table,
# TODO (b/394156190): Generate this directly from original pandas df.
schema=schemata.ArraySchema.from_bq_table(
destination_table, col_type_overrides
destination_table, df_and_labels.col_type_overrides
),
session=self._session,
offsets_col=ordering_col,
@@ -234,16 +230,11 @@ def read_pandas_streaming(
raise ValueError(
f"Problem loading at least one row from DataFrame: {errors}. {constants.FEEDBACK_LINK}"
)

col_type_overrides: typing.Dict[str, bigframes.dtypes.Dtype] = {
col: bigframes.dtypes.TIMEDELTA_DTYPE
for col in df_and_labels.timedelta_cols
}
array_value = (
core.ArrayValue.from_table(
table=destination_table,
schema=schemata.ArraySchema.from_bq_table(
destination_table, col_type_overrides
destination_table, df_and_labels.col_type_overrides
),
session=self._session,
# Don't set the offsets column because we want to group by it.
64 changes: 31 additions & 33 deletions tests/system/small/bigquery/test_json.py
@@ -12,8 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import json

import db_dtypes # type: ignore
import geopandas as gpd # type: ignore
import pandas as pd
import pyarrow as pa
@@ -24,19 +23,6 @@
import bigframes.pandas as bpd


def _get_series_from_json(json_data):
# Note: converts None to sql "null" and not to json none.
values = [
f"JSON '{json.dumps(data)}'" if data is not None else "NULL"
for data in json_data
]
sql = " UNION ALL ".join(
[f"SELECT {id} AS id, {value} AS data" for id, value in enumerate(values)]
)
df = bpd.read_gbq(sql).set_index("id").sort_index()
return df["data"]


@pytest.mark.parametrize(
("json_path", "expected_json"),
[
@@ -45,10 +31,11 @@ def _get_series_from_json(json_data):
],
)
def test_json_set_at_json_path(json_path, expected_json):
s = _get_series_from_json([{"a": {"b": {"c": "tester", "d": []}}}])
original_json = [{"a": {"b": {"c": "tester", "d": []}}}]
s = bpd.Series(original_json, dtype=db_dtypes.JSONDtype())
actual = bbq.json_set(s, json_path_value_pairs=[(json_path, 10)])

expected = _get_series_from_json(expected_json)
expected = bpd.Series(expected_json, dtype=db_dtypes.JSONDtype())
pd.testing.assert_series_equal(
actual.to_pandas(),
expected.to_pandas(),
@@ -65,41 +52,43 @@ def test_json_set_at_json_path(json_path, expected_json):
],
)
def test_json_set_at_json_value_type(json_value, expected_json):
s = _get_series_from_json([{"a": {"b": "dev"}}, {"a": {"b": [1, 2]}}])
original_json = [{"a": {"b": "dev"}}, {"a": {"b": [1, 2]}}]
s = bpd.Series(original_json, dtype=db_dtypes.JSONDtype())
actual = bbq.json_set(s, json_path_value_pairs=[("$.a.b", json_value)])

expected = _get_series_from_json(expected_json)
expected = bpd.Series(expected_json, dtype=db_dtypes.JSONDtype())
pd.testing.assert_series_equal(
actual.to_pandas(),
expected.to_pandas(),
)


def test_json_set_w_more_pairs():
s = _get_series_from_json([{"a": 2}, {"b": 5}, {"c": 1}])
original_json = [{"a": 2}, {"b": 5}, {"c": 1}]
s = bpd.Series(original_json, dtype=db_dtypes.JSONDtype())
actual = bbq.json_set(
s, json_path_value_pairs=[("$.a", 1), ("$.b", 2), ("$.a", [3, 4, 5])]
)
expected = _get_series_from_json(
[{"a": 3, "b": 2}, {"a": 4, "b": 2}, {"a": 5, "b": 2, "c": 1}]
)

expected_json = [{"a": 3, "b": 2}, {"a": 4, "b": 2}, {"a": 5, "b": 2, "c": 1}]
expected = bpd.Series(expected_json, dtype=db_dtypes.JSONDtype())
pd.testing.assert_series_equal(
actual.to_pandas(),
expected.to_pandas(),
)


def test_json_set_w_invalid_json_path_value_pairs():
s = bpd.Series([{"a": 10}], dtype=db_dtypes.JSONDtype())
with pytest.raises(ValueError):
bbq.json_set(
_get_series_from_json([{"a": 10}]), json_path_value_pairs=[("$.a", 1, 100)] # type: ignore
)
bbq.json_set(s, json_path_value_pairs=[("$.a", 1, 100)]) # type: ignore


def test_json_set_w_invalid_value_type():
s = bpd.Series([{"a": 10}], dtype=db_dtypes.JSONDtype())
with pytest.raises(TypeError):
bbq.json_set(
_get_series_from_json([{"a": 10}]),
s,
json_path_value_pairs=[
(
"$.a",
@@ -117,19 +106,25 @@ def test_json_set_w_invalid_series_type():


def test_json_extract_from_json():
s = _get_series_from_json([{"a": {"b": [1, 2]}}, {"a": {"c": 1}}, {"a": {"b": 0}}])
s = bpd.Series(
[{"a": {"b": [1, 2]}}, {"a": {"c": 1}}, {"a": {"b": 0}}],
dtype=db_dtypes.JSONDtype(),
)
actual = bbq.json_extract(s, "$.a.b").to_pandas()
expected = _get_series_from_json([[1, 2], None, 0]).to_pandas()
expected = bpd.Series([[1, 2], None, 0], dtype=db_dtypes.JSONDtype()).to_pandas()
pd.testing.assert_series_equal(
actual,
expected,
)


def test_json_extract_from_string():
s = bpd.Series(['{"a": {"b": [1, 2]}}', '{"a": {"c": 1}}', '{"a": {"b": 0}}'])
s = bpd.Series(
['{"a": {"b": [1, 2]}}', '{"a": {"c": 1}}', '{"a": {"b": 0}}'],
dtype=pd.StringDtype(storage="pyarrow"),
)
actual = bbq.json_extract(s, "$.a.b")
expected = bpd.Series(["[1,2]", None, "0"])
expected = bpd.Series(["[1,2]", None, "0"], dtype=pd.StringDtype(storage="pyarrow"))
pd.testing.assert_series_equal(
actual.to_pandas(),
expected.to_pandas(),
@@ -142,8 +137,9 @@ def test_json_extract_w_invalid_series_type():


def test_json_extract_array_from_json():
s = _get_series_from_json(
[{"a": ["ab", "2", "3 xy"]}, {"a": []}, {"a": ["4", "5"]}, {}]
s = bpd.Series(
[{"a": ["ab", "2", "3 xy"]}, {"a": []}, {"a": ["4", "5"]}, {}],
dtype=db_dtypes.JSONDtype(),
)
actual = bbq.json_extract_array(s, "$.a")

@@ -160,6 +156,8 @@ def test_json_extract_array_from_json():
"""
df = bpd.read_gbq(sql).set_index("id").sort_index()
expected = df["data"]
expected.index.name = None
expected.name = None

pd.testing.assert_series_equal(
actual.to_pandas(),
15 changes: 15 additions & 0 deletions tests/system/small/test_dataframe_io.py
@@ -658,6 +658,21 @@ def test_to_gbq_w_invalid_destination_table(scalars_df_index):
scalars_df_index.to_gbq("table_id")


def test_to_gbq_w_json(bigquery_client):
"""Test the `to_gbq` API can get a JSON column."""
s1 = bpd.Series([1, 2, 3, 4])
s2 = bpd.Series(
["a", 1, False, ["a", {"b": 1}], {"c": [1, 2, 3]}], dtype=db_dtypes.JSONDtype()
)

df = bpd.DataFrame({"id": s1, "json_col": s2})
destination_table = df.to_gbq()
table = bigquery_client.get_table(destination_table)

assert table.schema[1].name == "json_col"
assert table.schema[1].field_type == "JSON"


@pytest.mark.parametrize(
("index"),
[True, False],
22 changes: 21 additions & 1 deletion tests/system/small/test_series.py
@@ -237,7 +237,7 @@ def test_series_construct_geodata():
pytest.param(pd.StringDtype(storage="pyarrow"), id="string"),
],
)
def test_series_construct_w_dtype_for_int(dtype):
def test_series_construct_w_dtype(dtype):
data = [1, 2, 3]
expected = pd.Series(data, dtype=dtype)
expected.index = expected.index.astype("Int64")
@@ -302,6 +302,26 @@ def test_series_construct_w_dtype_for_array_struct():
)


def test_series_construct_w_dtype_for_json():
data = [
1,
"str",
False,
["a", {"b": 1}, None],
None,
{"a": {"b": [1, 2, 3], "c": True}},
]
s = bigframes.pandas.Series(data, dtype=db_dtypes.JSONDtype())

assert s[0] == 1
assert s[1] == "str"
assert s[2] is False
assert s[3][0] == "a"
assert s[3][1]["b"] == 1
assert pd.isna(s[4])
assert s[5]["a"] == {"b": [1, 2, 3], "c": True}


def test_series_keys(scalars_dfs):
scalars_df, scalars_pandas_df = scalars_dfs
bf_result = scalars_df["int64_col"].keys().to_pandas()