feat: add ml PCA.detect_anomalies method #422

Merged · 4 commits · Mar 12, 2024
18 changes: 13 additions & 5 deletions bigframes/ml/core.py
@@ -128,14 +128,12 @@ def model(self) -> bigquery.Model:
return self._model

def predict(self, input_data: bpd.DataFrame) -> bpd.DataFrame:
# TODO: validate input data schema
return self._apply_sql(
input_data,
self._model_manipulation_sql_generator.ml_predict,
)

def transform(self, input_data: bpd.DataFrame) -> bpd.DataFrame:
# TODO: validate input data schema
return self._apply_sql(
input_data,
self._model_manipulation_sql_generator.ml_transform,
@@ -146,7 +144,6 @@ def generate_text(
input_data: bpd.DataFrame,
options: Mapping[str, int | float],
) -> bpd.DataFrame:
# TODO: validate input data schema
return self._apply_sql(
input_data,
lambda source_df: self._model_manipulation_sql_generator.ml_generate_text(
@@ -160,7 +157,6 @@ def generate_text_embedding(
input_data: bpd.DataFrame,
options: Mapping[str, int | float],
) -> bpd.DataFrame:
# TODO: validate input data schema
return self._apply_sql(
input_data,
lambda source_df: self._model_manipulation_sql_generator.ml_generate_text_embedding(
@@ -169,12 +165,24 @@
),
)

def detect_anomalies(
self, input_data: bpd.DataFrame, options: Mapping[str, int | float]
) -> bpd.DataFrame:
assert self._model.model_type in ("PCA", "KMEANS", "ARIMA_PLUS")

return self._apply_sql(
input_data,
lambda source_df: self._model_manipulation_sql_generator.ml_detect_anomalies(
source_df=source_df,
struct_options=options,
),
)

def forecast(self, options: Mapping[str, int | float]) -> bpd.DataFrame:
sql = self._model_manipulation_sql_generator.ml_forecast(struct_options=options)
return self._session.read_gbq(sql, index_col="forecast_timestamp").reset_index()

def evaluate(self, input_data: Optional[bpd.DataFrame] = None):
# TODO: validate input data schema
sql = self._model_manipulation_sql_generator.ml_evaluate(input_data)

return self._session.read_gbq(sql)
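For orientation, a minimal sketch of how this new low-level wrapper is meant to be driven; the model and DataFrame names below are illustrative (they mirror the system test further down), and per the assertion above only PCA, KMEANS and ARIMA_PLUS models are accepted:

    # Illustrative only: pca_bqml_model stands in for a fitted core.BqmlModel
    # whose model_type is PCA, KMEANS or ARIMA_PLUS.
    anomalies = pca_bqml_model.detect_anomalies(
        new_penguins_df,                  # bigframes DataFrame with the model's feature columns
        options={"contamination": 0.25},  # forwarded into the STRUCT(...) of ML.DETECT_ANOMALIES
    )
    anomalies[["is_anomaly", "mean_squared_error"]].to_pandas()
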
28 changes: 28 additions & 0 deletions bigframes/ml/decomposition.py
@@ -110,6 +110,34 @@ def predict(self, X: Union[bpd.DataFrame, bpd.Series]) -> bpd.DataFrame:

return self._bqml_model.predict(X)

def detect_anomalies(
self, X: Union[bpd.DataFrame, bpd.Series], *, contamination=0.1
) -> bpd.DataFrame:
"""Detect the anomaly data points of the input.

Args:
X (bigframes.dataframe.DataFrame or bigframes.series.Series):
Series or a DataFrame to detect anomalies.
contamination (float, default 0.1):
Identifies the proportion of anomalies in the training dataset that are used to create the model.
The value must be in the range [0, 0.5].

Returns:
bigframes.dataframe.DataFrame: detected DataFrame."""
if contamination < 0.0 or contamination > 0.5:
raise ValueError(
f"contamination must be [0.0, 0.5], but is {contamination}."
Contributor
must be "in [0.0, 0.5]"

Contributor Author
I think "[]" already means "in" / "from ... to", but I'm not a native English speaker.

)

if not self._bqml_model:
raise RuntimeError("A model must be fitted before detect_anomalies")

(X,) = utils.convert_to_dataframe(X)

return self._bqml_model.detect_anomalies(
X, options={"contamination": contamination}
)

def to_gbq(self, model_name: str, replace: bool = False) -> PCA:
"""Save the model to BigQuery.

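A minimal end-to-end usage sketch of the new PCA.detect_anomalies; the table and feature columns are illustrative and not part of this change, and, following BQML's documented behavior, rows whose mean_squared_error exceeds the threshold derived from contamination come back with is_anomaly set to True:

    import bigframes.pandas as bpd
    from bigframes.ml.decomposition import PCA

    # Illustrative input: any numeric-feature DataFrame works the same way.
    df = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins")
    features = df[["culmen_length_mm", "culmen_depth_mm", "flipper_length_mm"]].dropna()

    model = PCA(n_components=3)
    model.fit(features)

    # contamination must lie in [0, 0.5]; larger values lower the anomaly
    # threshold, so more rows are flagged.
    anomalies = model.detect_anomalies(features, contamination=0.25)
    anomalies[["is_anomaly", "mean_squared_error"]].to_pandas()
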
5 changes: 2 additions & 3 deletions bigframes/ml/imported.py
@@ -23,7 +23,6 @@
import bigframes
from bigframes.core import log_adapter
from bigframes.ml import base, core, globals, utils
from bigframes.ml.globals import _SUPPORTED_DTYPES
import bigframes.pandas as bpd


@@ -236,9 +235,9 @@ def _create_bqml_model(self):
else:
for io in (self.input, self.output):
for v in io.values():
if v not in _SUPPORTED_DTYPES:
if v not in globals._SUPPORTED_DTYPES:
raise ValueError(
f"field_type {v} is not supported. We only support {', '.join(_SUPPORTED_DTYPES)}."
f"field_type {v} is not supported. We only support {', '.join(globals._SUPPORTED_DTYPES)}."
)

return self._bqml_model_factory.create_xgboost_imported_model(
5 changes: 2 additions & 3 deletions bigframes/ml/remote.py
@@ -23,7 +23,6 @@
from bigframes import clients
from bigframes.core import log_adapter
from bigframes.ml import base, core, globals, utils
from bigframes.ml.globals import _SUPPORTED_DTYPES
import bigframes.pandas as bpd

_REMOTE_MODEL_STATUS = "remote_model_status"
@@ -102,9 +101,9 @@ def standardize_type(v: str):
v = v.lower()
v = v.replace("boolean", "bool")

if v not in _SUPPORTED_DTYPES:
if v not in globals._SUPPORTED_DTYPES:
raise ValueError(
f"Data type {v} is not supported. We only support {', '.join(_SUPPORTED_DTYPES)}."
f"Data type {v} is not supported. We only support {', '.join(globals._SUPPORTED_DTYPES)}."
)

return v
8 changes: 8 additions & 0 deletions bigframes/ml/sql.py
@@ -276,6 +276,14 @@ def ml_generate_text_embedding(
return f"""SELECT * FROM ML.GENERATE_TEXT_EMBEDDING(MODEL `{self._model_name}`,
({self._source_sql(source_df)}), {struct_options_sql})"""

def ml_detect_anomalies(
self, source_df: bpd.DataFrame, struct_options: Mapping[str, Union[int, float]]
) -> str:
"""Encode ML.DETECT_ANOMALIES for BQML"""
struct_options_sql = self.struct_options(**struct_options)
return f"""SELECT * FROM ML.DETECT_ANOMALIES(MODEL `{self._model_name}`,
{struct_options_sql}, ({self._source_sql(source_df)}))"""

# ML evaluation TVFs
def ml_evaluate(self, source_df: Optional[bpd.DataFrame] = None) -> str:
"""Encode ML.EVALUATE for BQML"""
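One detail worth noting in the generator above: for ML.DETECT_ANOMALIES the STRUCT options precede the input subquery, the reverse of the ML.GENERATE_TEXT_EMBEDDING call just before it. A sketch of the rendered statement, with placeholder identifiers (the exact shape is pinned by test_ml_detect_anomalies_correct_sql at the bottom of this diff):

    # What ml_detect_anomalies(source_df, {"contamination": 0.25}) is expected to
    # render, using placeholder project/dataset/model names.
    sql = """SELECT * FROM ML.DETECT_ANOMALIES(MODEL `my_project.my_dataset.my_pca_model`,
    STRUCT(
      0.25 AS contamination), (SELECT * FROM `my_project.my_dataset.new_penguins_table`))"""
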
30 changes: 28 additions & 2 deletions tests/system/small/ml/test_core.py
@@ -289,6 +289,29 @@ def test_model_predict_with_unnamed_index(
)


def test_model_detect_anomalies(
penguins_bqml_pca_model: core.BqmlModel, new_penguins_df
):
options = {"contamination": 0.25}
anomalies = penguins_bqml_pca_model.detect_anomalies(
new_penguins_df, options
).to_pandas()
expected = pd.DataFrame(
{
"is_anomaly": [True, True, True],
"mean_squared_error": [0.254188, 0.731243, 0.298889],
},
index=pd.Index([1633, 1672, 1690], name="tag_number", dtype="Int64"),
)
pd.testing.assert_frame_equal(
anomalies[["is_anomaly", "mean_squared_error"]].sort_index(),
expected,
check_exact=False,
check_dtype=False,
rtol=0.1,
)


def test_remote_model_predict(
bqml_linear_remote_model: core.BqmlModel, new_penguins_df
):
@@ -367,16 +390,19 @@ def test_model_forecast(time_series_bqml_arima_plus_model: core.BqmlModel):
)


def test_model_register(ephemera_penguins_bqml_linear_model):
def test_model_register(ephemera_penguins_bqml_linear_model: core.BqmlModel):
model = ephemera_penguins_bqml_linear_model
model.register()

assert model.model.model_id is not None
model_name = "bigframes_" + model.model.model_id
# Only registered model contains the field, and the field includes project/dataset. Here only check model_id.
assert model_name in model.model.training_runs[-1]["vertexAiModelId"]


def test_model_register_with_params(ephemera_penguins_bqml_linear_model):
def test_model_register_with_params(
ephemera_penguins_bqml_linear_model: core.BqmlModel,
):
model_name = "bigframes_system_test_model"
model = ephemera_penguins_bqml_linear_model
model.register(model_name)
26 changes: 25 additions & 1 deletion tests/system/small/ml/test_decomposition.py
@@ -15,10 +15,13 @@
import pandas as pd

from bigframes.ml import decomposition
import bigframes.pandas as bpd
import tests.system.utils


def test_pca_predict(penguins_pca_model, new_penguins_df):
def test_pca_predict(
penguins_pca_model: decomposition.PCA, new_penguins_df: bpd.DataFrame
):
predictions = penguins_pca_model.predict(new_penguins_df).to_pandas()
expected = pd.DataFrame(
{
@@ -35,6 +38,27 @@ def test_pca_predict(penguins_pca_model, new_penguins_df):
)


def test_pca_detect_anomalies(
penguins_pca_model: decomposition.PCA, new_penguins_df: bpd.DataFrame
):
anomalies = penguins_pca_model.detect_anomalies(new_penguins_df).to_pandas()
expected = pd.DataFrame(
{
"is_anomaly": [False, True, False],
"mean_squared_error": [0.254188, 0.731243, 0.298889],
},
index=pd.Index([1633, 1672, 1690], name="tag_number", dtype="Int64"),
)

pd.testing.assert_frame_equal(
anomalies[["is_anomaly", "mean_squared_error"]].sort_index(),
expected,
check_exact=False,
check_dtype=False,
rtol=0.1,
)


def test_pca_score(penguins_pca_model: decomposition.PCA):
result = penguins_pca_model.score().to_pandas()
expected = pd.DataFrame(
20 changes: 18 additions & 2 deletions tests/unit/ml/test_sql.py
@@ -341,9 +341,8 @@ def test_ml_centroids_correct(
)


def test_forecast_correct_sql(
def test_ml_forecast_correct_sql(
model_manipulation_sql_generator: ml_sql.ModelManipulationSqlGenerator,
mock_df: bpd.DataFrame,
):
sql = model_manipulation_sql_generator.ml_forecast(
struct_options={"option_key1": 1, "option_key2": 2.2},
@@ -391,6 +390,23 @@ def test_ml_generate_text_embedding_correct(
)


def test_ml_detect_anomalies_correct_sql(
model_manipulation_sql_generator: ml_sql.ModelManipulationSqlGenerator,
mock_df: bpd.DataFrame,
):
sql = model_manipulation_sql_generator.ml_detect_anomalies(
source_df=mock_df,
struct_options={"option_key1": 1, "option_key2": 2.2},
)
assert (
sql
== """SELECT * FROM ML.DETECT_ANOMALIES(MODEL `my_project_id.my_dataset_id.my_model_id`,
STRUCT(
1 AS option_key1,
2.2 AS option_key2), (input_X_sql))"""
)


def test_ml_principal_components_correct(
model_manipulation_sql_generator: ml_sql.ModelManipulationSqlGenerator,
):