Skip to content

feat: add 'index', 'pad', 'nearest' interpolate methods #162

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Nov 8, 2023
207 changes: 160 additions & 47 deletions bigframes/core/block_transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import bigframes.core.blocks as blocks
import bigframes.core.ordering as ordering
import bigframes.core.window_spec as windows
import bigframes.dtypes as dtypes
import bigframes.operations as ops
import bigframes.operations.aggregations as agg_ops

Expand Down Expand Up @@ -106,67 +107,59 @@ def indicate_duplicates(


def interpolate(block: blocks.Block, method: str = "linear") -> blocks.Block:
if method != "linear":
supported_methods = [
"linear",
"values",
"index",
"nearest",
"zero",
"slinear",
]
if method not in supported_methods:
raise NotImplementedError(
f"Only 'linear' interpolate method supported. {constants.FEEDBACK_LINK}"
f"Method {method} not supported, following interpolate methods supported: {', '.join(supported_methods)}. {constants.FEEDBACK_LINK}"
)
backwards_window = windows.WindowSpec(following=0)
forwards_window = windows.WindowSpec(preceding=0)

output_column_ids = []

original_columns = block.value_columns
original_labels = block.column_labels
block, offsets = block.promote_offsets()

if method == "linear": # Assumes evenly spaced, ignore index
block, xvalues = block.promote_offsets()
else:
index_columns = block.index_columns
if len(index_columns) != 1:
raise ValueError("only method 'linear' supports multi-index")
xvalues = block.index_columns[0]
if block.index_dtypes[0] not in dtypes.NUMERIC_BIGFRAMES_TYPES:
raise ValueError("Can only interpolate on numeric index.")

for column in original_columns:
# null in same places column is null
should_interpolate = block._column_type(column) in [
pd.Float64Dtype(),
pd.Int64Dtype(),
]
if should_interpolate:
block, notnull = block.apply_unary_op(column, ops.notnull_op)
block, masked_offsets = block.apply_binary_op(
offsets, notnull, ops.partial_arg3(ops.where_op, None)
)

block, previous_value = block.apply_window_op(
column, agg_ops.LastNonNullOp(), backwards_window
)
block, next_value = block.apply_window_op(
column, agg_ops.FirstNonNullOp(), forwards_window
)
block, previous_value_offset = block.apply_window_op(
masked_offsets,
agg_ops.LastNonNullOp(),
backwards_window,
skip_reproject_unsafe=True,
)
block, next_value_offset = block.apply_window_op(
masked_offsets,
agg_ops.FirstNonNullOp(),
forwards_window,
skip_reproject_unsafe=True,
)

block, prediction_id = _interpolate(
interpolate_method_map = {
"linear": "linear",
"values": "linear",
"index": "linear",
"slinear": "linear",
"zero": "ffill",
"nearest": "nearest",
}
extrapolating_methods = ["linear", "values", "index"]
interpolate_method = interpolate_method_map[method]
do_extrapolate = method in extrapolating_methods
block, interpolated = _interpolate_column(
block,
previous_value_offset,
previous_value,
next_value_offset,
next_value,
offsets,
column,
xvalues,
interpolate_method=interpolate_method,
do_extrapolate=do_extrapolate,
)

block, interpolated_column = block.apply_binary_op(
column, prediction_id, ops.fillna_op
)
# Pandas performs ffill-like behavior to extrapolate forwards
block, interpolated_and_ffilled = block.apply_binary_op(
interpolated_column, previous_value, ops.fillna_op
)

output_column_ids.append(interpolated_and_ffilled)
output_column_ids.append(interpolated)
else:
output_column_ids.append(column)

Expand All @@ -175,7 +168,80 @@ def interpolate(block: blocks.Block, method: str = "linear") -> blocks.Block:
return block.with_column_labels(original_labels)


def _interpolate(
def _interpolate_column(
    block: blocks.Block,
    column: str,
    x_values: str,
    interpolate_method: str,
    do_extrapolate: bool = True,
) -> typing.Tuple[blocks.Block, str]:
    """Fill the nulls of one column by interpolating along ``x_values``.

    Args:
        block: Block holding both ``column`` and ``x_values``.
        column: Id of the value column whose nulls are filled.
        x_values: Id of the column used as the x coordinate; it also orders
            the windows that look up neighbouring non-null values.
        interpolate_method: One of 'linear', 'nearest' or 'ffill'.
        do_extrapolate: When true, rows that are still null after the
            point-wise interpolation fall back to the previous non-null value.

    Returns:
        The updated block and the id of the new, filled column.

    Raises:
        ValueError: If ``interpolate_method`` is not one of the three above.
    """
    if interpolate_method not in ("linear", "nearest", "ffill"):
        raise ValueError("interpolate method not supported")

    # Both windows are ordered by the x column. NOTE(review): presumably
    # `following=0` spans the rows up to the current one and `preceding=0`
    # the rows from the current one onwards -- confirm against WindowSpec.
    x_ordering = (ordering.OrderingColumnReference(x_values),)
    lookback = windows.WindowSpec(following=0, ordering=x_ordering)
    lookahead = windows.WindowSpec(preceding=0, ordering=x_ordering)

    # x coordinate kept only where the value column is non-null, so the
    # window ops below return the x of the neighbouring *known* points.
    block, has_value = block.apply_unary_op(column, ops.notnull_op)
    block, known_x = block.apply_binary_op(
        x_values, has_value, ops.partial_arg3(ops.where_op, None)
    )

    # Neighbouring known y values on either side of each row ...
    block, y_prev = block.apply_window_op(
        column, agg_ops.LastNonNullOp(), lookback
    )
    block, y_next = block.apply_window_op(
        column, agg_ops.FirstNonNullOp(), lookahead
    )
    # ... and the x coordinates those values sit at.
    # NOTE(review): skip_reproject_unsafe semantics are defined by
    # Block.apply_window_op -- confirm before reordering these calls.
    block, x_prev = block.apply_window_op(
        known_x,
        agg_ops.LastNonNullOp(),
        lookback,
        skip_reproject_unsafe=True,
    )
    block, x_next = block.apply_window_op(
        known_x,
        agg_ops.FirstNonNullOp(),
        lookahead,
        skip_reproject_unsafe=True,
    )

    # Every point-wise helper takes (block, x0, y0, x1, y1, x_to_predict).
    point_interpolators = {
        "linear": _interpolate_points_linear,
        "nearest": _interpolate_points_nearest,
        "ffill": _interpolate_points_ffill,
    }
    block, estimate_id = point_interpolators[interpolate_method](
        block,
        x_prev,
        y_prev,
        x_next,
        y_next,
        x_values,
    )

    if do_extrapolate:
        # Forward-fill style extrapolation: where no estimate was produced,
        # reuse the previous known value.
        block, estimate_id = block.apply_binary_op(
            estimate_id, y_prev, ops.fillna_op
        )

    # Original values win; estimates only fill the nulls.
    return block.apply_binary_op(column, estimate_id, ops.fillna_op)


def _interpolate_points_linear(
block: blocks.Block,
x0_id: str,
y0_id: str,
Expand All @@ -196,6 +262,53 @@ def _interpolate(
return block, prediction_id


def _interpolate_points_nearest(
    block: blocks.Block,
    x0_id: str,
    y0_id: str,
    x1_id: str,
    y1_id: str,
    xpredict_id: str,
) -> typing.Tuple[blocks.Block, str]:
    """Interpolate by taking the y value of whichever known x is closest.

    Args:
        block: Block containing all referenced columns.
        x0_id: Id of the column with the x of the preceding known point.
        y0_id: Id of the column with the y of the preceding known point.
        x1_id: Id of the column with the x of the following known point.
        y1_id: Id of the column with the y of the following known point.
        xpredict_id: Id of the column with the x to predict at.

    Returns:
        The updated block and the id of the prediction column. The
        prediction is null unless both y0 and y1 are non-null.
    """
    block, dist_to_prev = block.apply_binary_op(xpredict_id, x0_id, ops.sub_op)
    block, dist_to_next = block.apply_binary_op(x1_id, xpredict_id, ops.sub_op)

    # `<=` means an exact tie resolves to the preceding point.
    block, prev_is_closer = block.apply_binary_op(
        dist_to_prev, dist_to_next, ops.le_op
    )
    # A null comparison result is replaced by False; rows lacking a
    # neighbour are nulled out at the end regardless.
    block, prefer_prev = block.apply_unary_op(
        prev_is_closer, ops.partial_right(ops.fillna_op, False)
    )

    block, closest_y = block.apply_ternary_op(
        y0_id, prefer_prev, y1_id, ops.where_op
    )

    # Only rows with a known value on both sides count as interpolation.
    block, has_prev = block.apply_unary_op(y0_id, ops.notnull_op)
    block, has_next = block.apply_unary_op(y1_id, ops.notnull_op)
    block, is_bracketed = block.apply_binary_op(has_prev, has_next, ops.and_op)

    return block.apply_binary_op(
        closest_y, is_bracketed, ops.partial_arg3(ops.where_op, None)
    )


def _interpolate_points_ffill(
    block: blocks.Block,
    x0_id: str,
    y0_id: str,
    x1_id: str,
    y1_id: str,
    xpredict_id: str,
) -> typing.Tuple[blocks.Block, str]:
    """Interpolate by carrying the preceding known value forward.

    The x-coordinate arguments (``x0_id``, ``x1_id``, ``xpredict_id``) are
    not read here; the signature mirrors the other point-wise helpers.

    Returns:
        The updated block and the id of the prediction column, which holds
        y0 where y1 is non-null and null elsewhere.
    """
    # Without a following known value (y1) the row would be extrapolated
    # rather than interpolated, so it is left null.
    block, has_next = block.apply_unary_op(y1_id, ops.notnull_op)
    return block.apply_binary_op(
        y0_id, has_next, ops.partial_arg3(ops.where_op, None)
    )


def drop_duplicates(
block: blocks.Block, columns: typing.Sequence[str], keep: str = "first"
) -> blocks.Block:
Expand Down
2 changes: 2 additions & 0 deletions bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1450,6 +1450,8 @@ def reindex_like(self, other: DataFrame, *, validate: typing.Optional[bool] = No
return self.reindex(index=other.index, columns=other.columns, validate=validate)

def interpolate(self, method: str = "linear") -> DataFrame:
    # "pad" is served by a plain forward fill on this frame; every other
    # method is delegated to the block-level interpolate implementation.
    if method == "pad":
        return self.ffill()
    return DataFrame(block_ops.interpolate(self._block, method))

Expand Down
2 changes: 2 additions & 0 deletions bigframes/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,8 @@ def replace(
return Series(block.select_column(result_col))

def interpolate(self, method: str = "linear") -> Series:
    # "pad" is served by a plain forward fill on this series; every other
    # method is delegated to the block-level interpolate implementation.
    if method == "pad":
        return self.ffill()
    return Series(block_ops.interpolate(self._block, method))

Expand Down
21 changes: 13 additions & 8 deletions tests/system/small/test_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,21 +273,26 @@ def test_series_replace_list_scalar(scalars_dfs):


@pytest.mark.parametrize(
("values",),
("method",),
(
([None, 1, 2, None, None, 16, None],),
([None, None, 3.6, None],),
([403.2, None, 352.1, None, None, 111.9],),
("linear",),
("values",),
("slinear",),
("nearest",),
("zero",),
("pad",),
),
)
def test_series_interpolate(values):
pd_series = pd.Series(values)
def test_series_interpolate(method):
values = [None, 1, 2, None, None, 16, None]
index = [-3.2, 11.4, 3.56, 4, 4.32, 5.55, 76.8]
pd_series = pd.Series(values, index)
bf_series = series.Series(pd_series)

# Pandas can only interpolate on "float64" columns
# https://github.com/pandas-dev/pandas/issues/40252
pd_result = pd_series.astype("float64").interpolate()
bf_result = bf_series.interpolate().to_pandas()
pd_result = pd_series.astype("float64").interpolate(method=method)
bf_result = bf_series.interpolate(method=method).to_pandas()

# pd uses non-null types, while bf uses nullable types
pd.testing.assert_series_equal(
Expand Down
51 changes: 32 additions & 19 deletions third_party/bigframes_vendored/pandas/core/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -2872,17 +2872,6 @@ def interpolate(self, method: str = "linear"):
"""
Fill NaN values using an interpolation method.

Args:
method (str, default 'linear'):
Interpolation technique to use. Only 'linear' supported.
'linear': Ignore the index and treat the values as equally spaced.
This is the only method supported on MultiIndexes.

Returns:
DataFrame:
Returns the same object type as the caller, interpolated at
some or all ``NaN`` values

**Examples:**

>>> import bigframes.pandas as bpd
Expand All @@ -2891,17 +2880,41 @@ def interpolate(self, method: str = "linear"):
>>> df = bpd.DataFrame({
... 'A': [1, 2, 3, None, None, 6],
... 'B': [None, 6, None, 2, None, 3],
... })
... }, index=[0, 0.1, 0.3, 0.7, 0.9, 1.0])
>>> df.interpolate()
A B
0 1.0 <NA>
1 2.0 6.0
2 3.0 4.0
3 4.0 2.0
4 5.0 2.5
5 6.0 3.0
A B
0.0 1.0 <NA>
0.1 2.0 6.0
0.3 3.0 4.0
0.7 4.0 2.0
0.9 5.0 2.5
1.0 6.0 3.0
<BLANKLINE>
[6 rows x 2 columns]
>>> df.interpolate(method="values")
A B
0.0 1.0 <NA>
0.1 2.0 6.0
0.3 3.0 4.666667
0.7 4.714286 2.0
0.9 5.571429 2.666667
1.0 6.0 3.0
<BLANKLINE>
[6 rows x 2 columns]

Args:
method (str, default 'linear'):
Interpolation technique to use. Supported methods:
'linear': Ignore the index and treat the values as equally spaced.
This is the only method supported on MultiIndexes.
'index', 'values': use the actual numerical values of the index.
'pad': Fill in NaNs using existing values.
'nearest', 'zero', 'slinear': Emulates `scipy.interpolate.interp1d`

Returns:
DataFrame:
Returns the same object type as the caller, interpolated at
some or all ``NaN`` values
"""
raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

Expand Down
Loading