Python 中的使用者定義函式

Python 使用者定義函式 (UDF) 可讓您在 Python 中實作純量函式,並在 SQL 查詢中使用。Python UDF 與 SQL 和 JavaScript UDF 類似,但具備額外功能。您可以使用 Python UDF 從 Python Package Index (PyPI) 安裝第三方程式庫,並透過 Cloud 資源連線存取外部服務。

Python UDF 會在 BigQuery 管理的資源上建構及執行。

限制

  • 目前僅支援 python-3.11 執行階段。
  • 您無法建立暫時性 Python UDF。
  • 您無法搭配具體化檢視表使用 Python UDF。
  • 由於系統一律會假設 Python UDF 的傳回值不具決定性,因此不會快取呼叫 Python UDF 的查詢結果。
  • INFORMATION_SCHEMA 檢視區塊不完全支援 Python UDF。
  • 您無法使用 Routine API 建立或更新 Python UDF。
  • 不支援 VPC Service Controls
  • 不支援客戶自行管理的加密金鑰 (CMEK)
  • 不支援的資料類型:JSONRANGEINTERVALGEOGRAPHY
  • 執行 Python UDF 的容器最多只能設定 2 個 vCPU 和 8 Gi

必要 IAM 角色

所需 IAM 角色取決於您是 Python UDF 擁有者還是使用者。Python UDF 擁有者通常會建立或更新 UDF。Python UDF 使用者會叫用他人建立的 UDF。

如果您建立或執行參照 Cloud 資源連線的 Python UDF,也需要其他角色。

UDF 擁有者

如要建立或更新 Python UDF,請在適當的資源上授予下列預先定義的 IAM 角色:

角色 所需權限 資源
BigQuery 資料編輯者 (roles/bigquery.dataEditor)
  • bigquery.routines.create,使用 CREATE FUNCTION 陳述式建立 Python UDF。
  • bigquery.routines.update,使用 CREATE FUNCTION 陳述式更新 Python UDF。
建立或更新 Python UDF 的資料集。
BigQuery 工作使用者 (roles/bigquery.jobUser)
  • bigquery.jobs.create:執行 CREATE FUNCTION 陳述式查詢工作。
您執行 CREATE FUNCTION 陳述式的專案。
BigQuery Connection 管理員 (roles/bigquery.connectionAdmin)
  • 只有在bigquery.connections.create建立新的 Cloud 資源連線時,才需要此角色。
  • bigquery.connections.delegate,在 CREATE FUNCTION 陳述式中使用連線。
您要授予外部資源存取權的連線。只有在 UDF 使用 WITH CONNECTION 子句存取外部服務時,才需要這個連線。

UDF 使用者

如果您要叫用 Python UDF,請在適當的資源上授予下列預先定義的 IAM 角色:

角色 所需權限 資源
BigQuery 使用者 (roles/bigquery.user) bigquery.jobs.create:執行參照 UDF 的查詢工作。 您要執行查詢工作的專案,該工作會叫用 Python UDF。
BigQuery 資料檢視者 (roles/bigquery.dataViewer) bigquery.routines.get 執行他人建立的 UDF。 儲存 Python UDF 的資料集。
BigQuery Connection 使用者 (roles/bigquery.connectionUser) bigquery.connections.use,執行參照 Cloud 資源連線的 Python UDF。 Python UDF 參照的 Cloud 資源連線。只有在 UDF 參照連線時,才需要這個連線。

如要進一步瞭解 BigQuery 中的角色,請參閱「預先定義的 IAM 角色」。

建立永久 Python UDF

建立 Python UDF 時,請遵守下列規則:

  • Python UDF 的主體必須是代表 Python 程式碼的引號字串常值。如要進一步瞭解以引號括住的字串文字,請參閱以引號括住的文字格式

  • Python UDF 的主體必須包含 Python 函式,該函式會用於 Python UDF 選項清單的 entry_point 引數中。

  • 您需要在 runtime_version 選項中指定 Python 執行階段版本。目前僅支援 Python 執行階段版本 python-3.11。如需可用選項的完整清單,請參閱 CREATE FUNCTION 陳述式的函式選項清單

如要建立永久性 Python UDF,請使用 CREATE FUNCTION 陳述式,但不要使用 TEMPTEMPORARY 關鍵字。如要刪除永久 Python UDF,請使用 DROP FUNCTION 陳述式。

使用 CREATE FUNCTION 陳述式建立 Python UDF 時,BigQuery 會根據基礎映像檔建立或更新容器映像檔。容器會使用您的程式碼和任何指定的套件依附元件,以基本映像檔為基礎建構而成。建立容器是長時間執行的程序。執行 CREATE FUNCTION 陳述式後的第一個查詢可能會自動等待圖片完成。通常不需要任何外部依附元件,容器映像檔應可在不到一分鐘內建立完成。

範例

如要查看建立永久性 Python UDF 的範例,請選擇下列任一選項:

主控台

下列範例會建立名為 multiplyInputs 的永久性 Python UDF,並從 SELECT 陳述式中呼叫該 UDF:

  1. 前往「BigQuery」頁面

    前往 BigQuery

  2. 在查詢編輯器中,輸入下列CREATE FUNCTION陳述式:

    CREATE FUNCTION `PROJECT_ID.DATASET_ID`.multiplyInputs(x FLOAT64, y FLOAT64)
    RETURNS FLOAT64
    LANGUAGE python
    OPTIONS(runtime_version="python-3.11", entry_point="multiply")
    AS r'''
    
    def multiply(x, y):
      return x * y
    
    ''';
    
    -- Call the Python UDF.
    WITH numbers AS
      (SELECT 1 AS x, 5 as y
      UNION ALL
      SELECT 2 AS x, 10 as y
      UNION ALL
      SELECT 3 as x, 15 as y)
    SELECT x, y,
    `PROJECT_ID.DATASET_ID`.multiplyInputs(x, y) AS product
    FROM numbers;

    取代 PROJECT_ID。將 DATASET_ID 替換為專案 ID 和資料集 ID。

  3. 按一下「執行」

    這個範例會產生下列輸出內容:

    +-----+-----+--------------+
    | x   | y   | product      |
    +-----+-----+--------------+
    | 1   | 5   |  5.0         |
    | 2   | 10  | 20.0         |
    | 3   | 15  | 45.0         |
    +-----+-----+--------------+
    

BigQuery DataFrames

下列範例使用 BigQuery DataFrames,將自訂函式轉換為 Python UDF:

import bigframes.pandas as bpd

# Set BigQuery DataFrames options
bpd.options.bigquery.project = your_gcp_project_id
bpd.options.bigquery.location = "US"

# BigQuery DataFrames gives you the ability to turn your custom functions
# into a BigQuery Python UDF. One can find more details about the usage and
# the requirements via `help` command.
help(bpd.udf)

# Read a table and inspect the column of interest.
df = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins")
df["body_mass_g"].peek(10)

# Define a custom function, and specify the intent to turn it into a
# BigQuery Python UDF. Let's try a `pandas`-like use case in which we want
# to apply a user defined function to every value in a `Series`, more
# specifically bucketize the `body_mass_g` value of the penguins, which is a
# real number, into a category, which is a string.
@bpd.udf(
    dataset=your_bq_dataset_id,
    name=your_bq_routine_id,
)
def get_bucket(num: float) -> str:
    if not num:
        return "NA"
    boundary = 4000
    return "at_or_above_4000" if num >= boundary else "below_4000"

# Then we can apply the udf on the `Series` of interest via
# `apply` API and store the result in a new column in the DataFrame.
df = df.assign(body_mass_bucket=df["body_mass_g"].apply(get_bucket))

# This will add a new column `body_mass_bucket` in the DataFrame. You can
# preview the original value and the bucketized value side by side.
df[["body_mass_g", "body_mass_bucket"]].peek(10)

# The above operation was possible by doing all the computation on the
# cloud through an underlying BigQuery Python UDF that was created to
# support the user's operations in the Python code.

# The BigQuery Python UDF created to support the BigQuery DataFrames
# udf can be located via a property `bigframes_bigquery_function`
# set in the udf object.
print(f"Created BQ Python UDF: {get_bucket.bigframes_bigquery_function}")

# If you have already defined a custom function in BigQuery, either via the
# BigQuery Google Cloud Console or with the `udf` decorator,
# or otherwise, you may use it with BigQuery DataFrames with the
# `read_gbq_function` method. More details are available via the `help`
# command.
help(bpd.read_gbq_function)

existing_get_bucket_bq_udf = get_bucket.bigframes_bigquery_function

# Here is an example of using `read_gbq_function` to load an existing
# BigQuery Python UDF.
df = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins")
get_bucket_function = bpd.read_gbq_function(existing_get_bucket_bq_udf)

df = df.assign(body_mass_bucket=df["body_mass_g"].apply(get_bucket_function))
df.peek(10)

# Let's continue trying other potential use cases of udf. Let's say we
# consider the `species`, `island` and `sex` of the penguins sensitive
# information and want to redact that by replacing with their hash code
# instead. Let's define another scalar custom function and decorate it
# as a udf. The custom function in this example has external package
# dependency, which can be specified via `packages` parameter.
@bpd.udf(
    dataset=your_bq_dataset_id,
    name=your_bq_routine_id,
    packages=["cryptography"],
)
def get_hash(input: str) -> str:
    from cryptography.fernet import Fernet

    # handle missing value
    if input is None:
        input = ""

    key = Fernet.generate_key()
    f = Fernet(key)
    return f.encrypt(input.encode()).decode()

# We can use this udf in another `pandas`-like API `map` that
# can be applied on a DataFrame
df_redacted = df[["species", "island", "sex"]].map(get_hash)
df_redacted.peek(10)

# If the BigQuery routine is no longer needed, we can clean it up
# to free up any cloud quota
session = bpd.get_global_session()
session.bqclient.delete_routine(f"{your_bq_dataset_id}.{your_bq_routine_id}")

建立向量化 Python UDF

您可以實作 Python UDF,使用向量化處理一批資料列,而非單一資料列。向量化可提高查詢效能。

如要控管批次處理行為,請在CREATE OR REPLACE FUNCTION 選項清單中使用 max_batching_rows 選項,指定每個批次中的最大列數。 如果您指定 max_batching_rows,BigQuery 會決定批次中的資料列數,最多為 max_batching_rows 限制。如未指定 max_batching_rows,系統會自動判斷要批次處理的資料列數量。

向量化 Python UDF 具有單一pandas.DataFrame引數,必須加上註解。pandas.DataFrame 引數的資料欄數量與 CREATE FUNCTION 陳述式中定義的 Python UDF 參數相同。pandas.DataFrame 引數中的資料欄名稱與 UDF 的參數名稱相同。

函式必須傳回 pandas.Series 或單欄 pandas.DataFrame,且列數與輸入內容相同。

下列範例會建立名為 multiplyInputs 的向量化 Python UDF,其中包含 xy 這兩個參數:

  1. 前往「BigQuery」頁面

    前往 BigQuery

  2. 在查詢編輯器中,輸入下列 CREATE FUNCTION 陳述式:

    CREATE FUNCTION `PROJECT_ID.DATASET_ID`.multiplyVectorized(x FLOAT64, y FLOAT64)
    RETURNS FLOAT64
    LANGUAGE python
    OPTIONS(runtime_version="python-3.11", entry_point="vectorized_multiply")
    AS r'''
    import pandas as pd
    
    def vectorized_multiply(df: pd.DataFrame):
      return df['x'] * df['y']
    
    ''';

    取代 PROJECT_ID。將 DATASET_ID 替換為專案 ID 和資料集 ID。

    呼叫 UDF 的方式與上一個範例相同。

  3. 按一下「執行」

支援的 Python UDF 資料類型

下表定義 BigQuery 資料類型、Python 資料類型和 Pandas 資料類型之間的對應關係:

BigQuery 資料類型 標準 UDF 使用的 Python 內建資料型別 向量化 UDF 使用的 Pandas 資料類型 用於向量化 UDF 中 ARRAY 和 STRUCT 的 PyArrow 資料類型
BOOL bool BooleanDtype DataType(bool)
INT64 int Int64Dtype DataType(int64)
FLOAT64 float FloatDtype DataType(double)
STRING str StringDtype DataType(string)
BYTES bytes binary[pyarrow] DataType(binary)
TIMESTAMP

函式參數:datetime.datetime (已設定世界標準時間時區)

函式傳回值:datetime.datetime (已設定任何時區)

函式參數:timestamp[us, tz=UTC][pyarrow]

函式傳回值:timestamp[us, tz=*][pyarrow]\(any timezone\)

TimestampType(timestamp[us]) (含時區)
DATE datetime.date date32[pyarrow] DataType(date32[day])
TIME datetime.time time64[pyarrow] Time64Type(time64[us])
DATETIME datetime.datetime (不含時區) timestamp[us][pyarrow] TimestampType(timestamp[us]),不含時區
ARRAY list list<...>[pyarrow],其中元素資料類型為 pandas.ArrowDtype ListType
STRUCT dict ,其中欄位資料類型為 pandas.ArrowDtypestruct<...>[pyarrow] StructType

支援的執行階段版本

BigQuery Python UDF 支援 python-3.11 執行階段。這個 Python 版本包含一些額外預先安裝的套件。如果是系統程式庫,請檢查執行階段基本映像檔。

執行階段版本 Python 版本 收錄 執行階段基礎映像檔
python-3.11 Python 3.11 numpy 1.26.3
pyarrow 14.0.2
pandas 2.1.4
python-dateutil 2.8.2
google-22-full/python311

使用第三方套件

如要使用 Python 標準程式庫和預先安裝套件以外的模組,請使用 CREATE FUNCTION 選項清單。您可以從 Python Package Index (PyPI) 安裝套件,也可以從 Cloud Storage 匯入 Python 檔案。

從 Python 套件索引安裝套件

安裝套件時,您必須提供套件名稱,也可以使用 Python 套件版本指定碼提供套件版本。如果套件位於執行階段,系統會使用該套件,除非 CREATE FUNCTION 選項清單中指定了特定版本。如果未指定套件版本,且套件不在執行階段中,系統會使用最新版本。僅支援輪子二進位格式的套件。

以下範例說明如何建立 Python UDF,使用 CREATE OR REPLACE FUNCTION 選項清單安裝 scipy 套件:

  1. 前往「BigQuery」頁面

    前往 BigQuery

  2. 在查詢編輯器中,輸入下列CREATE FUNCTION陳述式:

    CREATE FUNCTION `PROJECT_ID.DATASET_ID`.area(radius FLOAT64)
    RETURNS FLOAT64 LANGUAGE python
    OPTIONS (entry_point='area_handler', runtime_version='python-3.11', packages=['scipy==1.15.3'])
    AS r"""
    import scipy
    
    def area_handler(radius):
      return scipy.constants.pi*radius*radius
    """;
    
    SELECT `PROJECT_ID.DATASET_ID`.area(4.5);

    取代 PROJECT_ID。將 DATASET_ID 替換為專案 ID 和資料集 ID。

  3. 按一下「執行」

將其他 Python 檔案匯入為程式庫

您可以從 Cloud Storage 匯入 Python 檔案,藉此使用函式選項清單擴充 Python UDF。

在 UDF 的 Python 程式碼中,您可以使用 import 陳述式,後面加上 Cloud Storage 物件的路徑,將 Cloud Storage 中的 Python 檔案匯入為模組。舉例來說,如果您要匯入 gs://BUCKET_NAME/path/to/lib1.py,匯入陳述式就會是 import path.to.lib1

Python 檔案名稱必須是 Python ID。物件名稱中的每個 folder 名稱 (/ 後方) 都應是有效的 Python ID。在 ASCII 範圍 (U+0001..U+007F) 內,以下字元可用於 ID:

  • 大寫和大小寫字母 A 到 Z。
  • 底線。
  • 數字 0 到 9,但 ID 的第一個字元不得為數字。

以下範例說明如何建立 Python UDF,從名為 my_bucket 的 Cloud Storage bucket 匯入 lib1.py 用戶端程式庫套件:

  1. 前往「BigQuery」頁面

    前往 BigQuery

  2. 在查詢編輯器中,輸入下列CREATE FUNCTION陳述式:

    CREATE FUNCTION `PROJECT_ID.DATASET_ID`.myFunc(a FLOAT64, b STRING)
    RETURNS STRING LANGUAGE python
    OPTIONS (
    entry_point='compute', runtime_version='python-3.11',
    library=['gs://my_bucket/path/to/lib1.py'])
    AS r"""
    import path.to.lib1 as lib1
    
    def compute(a, b):
      # doInterestingStuff is a function defined in
      # gs://my_bucket/path/to/lib1.py
      return lib1.doInterestingStuff(a, b);
    
    """;

    取代 PROJECT_ID。將 DATASET_ID 替換為專案 ID 和資料集 ID。

  3. 按一下「執行」

設定 Python UDF 的容器限制

您可以透過CREATE FUNCTION 選項清單,為執行 Python UDF 的容器指定 CPU 和記憶體限制。

根據預設,分配給每個容器執行個體的記憶體為 512 MiB,分配的 CPU 為 0.33 個 vCPU。

下列範例會使用 CREATE FUNCTION 選項清單指定容器限制,藉此建立 Python UDF:

  1. 前往「BigQuery」頁面

    前往 BigQuery

  2. 在查詢編輯器中,輸入下列CREATE FUNCTION陳述式:

    CREATE FUNCTION `PROJECT_ID.DATASET_ID`.resizeImage(image BYTES)
    RETURNS BYTES LANGUAGE python
    OPTIONS (entry_point='resize_image', runtime_version='python-3.11',
    packages=['Pillow==11.2.1'], container_memory='2Gi', container_cpu=1)
    AS r"""
    import io
    from PIL import Image
    
    def resize_image(image_bytes):
      img = Image.open(io.BytesIO(image_bytes))
    
      resized_img = img.resize((256, 256), Image.Resampling.LANCZOS)
      output_stream = io.BytesIO()
      resized_img.convert('RGB').save(output_stream, format='JPEG')
      return output_stream.getvalue()
    """;

    取代 PROJECT_ID。將 DATASET_ID 替換為專案 ID 和資料集 ID。

  3. 按一下「執行」

支援的 CPU 值

Python UDF 支援介於 0.331.0 之間的 CPU 分數值,以及 12 的 CPU 非分數值。系統會先將分數輸入值四捨五入至小數點後兩位,再套用至容器。

支援的記憶體值

Python UDF 容器支援下列格式的記憶體值: <integer_number><unit>。單位必須是下列其中一個值:MiMGiG。可設定的記憶體量下限為 256 MiB (256 Mi)。您最多可設定 8 Gibibyte (8 Gi) 的記憶體。

根據您選擇的記憶體值,您也必須指定 CPU 的最低數量。下表顯示每個記憶體值的最低 CPU 值:

記憶體 最低 CPU
512 MiB or less 0.33
More than 512 MiB 0.5
More than 1 GiB 1
More than 4 GiB 2

在 Python 程式碼中呼叫 Google Cloud 或線上服務

Python UDF 會使用雲端資源連線服務帳戶,存取 Google Cloud 服務或外部服務。必須將存取服務的權限授予連線的服務帳戶。視存取的服務和從 Python 程式碼呼叫的 API 而定,所需權限也會有所不同。

如果您建立 Python UDF 時未使用 Cloud 資源連線,系統會在禁止網路存取的環境中執行函式。如果 UDF 會存取線上服務,您必須使用 Cloud 資源連線建立 UDF。否則,在達到內部連線逾時前,UDF 會遭到封鎖,無法存取網路。

以下範例說明如何從 Python UDF 存取 Cloud Translation 服務。這個範例有兩個專案:一個是名為 my_query_project 的專案,您可以在其中建立 UDF 和 Cloud 資源連線;另一個是名為 my_translate_project 的專案,您可以在其中執行 Cloud Translation。

建立 Cloud 資源連線

首先,您要在 my_query_project 中建立 Cloud 資源連線。如要建立 Cloud 資源連線,請按照「建立 Cloud 資源連線」頁面的步驟操作。

建立連線後,請開啟連線,然後在「連線資訊」窗格中複製服務帳戶 ID。設定連線權限時,需要這個 ID。建立連線資源時,BigQuery 會建立專屬的系統服務帳戶,並將其與連線建立關聯。

將存取權授予連線的服務帳戶

如要授予 Cloud 資源連線服務帳戶專案存取權,請在 my_query_project 中授予服務帳戶服務使用情形個人使用者角色 (roles/serviceusage.serviceUsageConsumer),並在 my_translate_project 中授予Cloud Translation API 使用者角色 (roles/cloudtranslate.user)。

  1. 前往身分與存取權管理頁面。

    前往身分與存取權管理頁面

  2. 確認已選取 my_query_project

  3. 按一下 「授予存取權」

  4. 在「新增主體」欄位,輸入先前複製的 Cloud 資源連結服務帳戶 ID。

  5. 在「Select a role」(請選擇角色) 欄位中,依序選取「Service usage」(服務用量) 和「Service usage consumer」(服務用量消費者)

  6. 按一下 [儲存]

  7. 在專案選取器中,選擇 my_translate_project

  8. 前往身分與存取權管理頁面。

    前往身分與存取權管理頁面

  9. 按一下 「授予存取權」

  10. 在「新增主體」欄位,輸入先前複製的 Cloud 資源連結服務帳戶 ID。

  11. 在「Select a role」(請選擇角色) 欄位中,依序選取「Cloud translation」(Cloud Translation) 和「Cloud Translation API user」(Cloud Translation API 使用者)

  12. 按一下 [儲存]

建立呼叫 Cloud Translation 服務的 Python UDF

my_query_project 中,使用 Cloud 資源連線建立呼叫 Cloud Translation 服務的 Python UDF。

  1. 前往「BigQuery」頁面

    前往 BigQuery

  2. 在查詢編輯器中輸入下列 CREATE FUNCTION 陳述式:

    CREATE FUNCTION `PROJECT_ID.DATASET_ID`.translate_to_es(x STRING)
    RETURNS STRING LANGUAGE python
    WITH CONNECTION `PROJECT_ID.REGION.CONNECTION_ID`
    OPTIONS (entry_point='do_translate', runtime_version='python-3.11', packages=['google-cloud-translate>=3.11', 'google-api-core'])
    AS r"""
    
    from google.api_core.retry import Retry
    from google.cloud import translate
    
    project = "my_translate_project"
    translate_client = translate.TranslationServiceClient()
    
    def do_translate(x : str) -> str:
    
        response = translate_client.translate_text(
            request={
                "parent": f"projects/{project}/locations/us-central1",
                "contents": [x],
                "target_language_code": "es",
                "mime_type": "text/plain",
            },
            retry=Retry(),
        )
        return response.translations[0].translated_text
    
    """;
    
    -- Call the UDF.
    WITH text_table AS
      (SELECT "Hello" AS text
      UNION ALL
      SELECT "Good morning" AS text
      UNION ALL
      SELECT "Goodbye" AS text)
    SELECT text,
    `PROJECT_ID.DATASET_ID`.translate_to_es(text) AS translated_text
    FROM text_table;

    更改下列內容:

    • PROJECT_ID.DATASET_ID: 您的專案 ID 和資料集 ID
    • REGION.CONNECTION_ID: 連線的區域和連線 ID
  3. 按一下「執行」

    輸出內容應如下所示:

    +--------------------------+-------------------------------+
    | text                     | translated_text               |
    +--------------------------+-------------------------------+
    | Hello                    | Hola                          |
    | Good morning             | Buen dia                      |
    | Goodbye                  | Adios                         |
    +--------------------------+-------------------------------+
    

支援的地區

所有 BigQuery 多區域和區域位置都支援 Python UDF。

定價

Python UDF 免費提供,不另外收費。

啟用計費功能後,系統會套用下列設定:

  • 系統會使用 BigQuery 服務 SKU,收取 Python UDF 費用。
  • 費用會與叫用 Python UDF 時消耗的運算和記憶體量成正比。
  • 此外,系統也會向 Python UDF 客戶收取建構或重建 UDF 容器映像檔的費用。這筆費用會根據使用客戶程式碼和依附元件建構映像檔時所用的資源比例計算。
  • 如果 Python UDF 導致外部或網際網路網路輸出,您也會看到 Cloud Networking 的進階級網際網路輸出費用。