
Insert data into BigQuery using a For Each Parallel task

In this tutorial, you create a main integration and a sub-integration to process a series of records. For each record, the main integration asynchronously invokes the sub-integration, which takes the data for that record and inserts it as a row in a table in a BigQuery dataset.

In this tutorial, you complete the following tasks:

  • Set up a BigQuery connection.
  • Set up a sub-integration.
  • Set up the main integration.
  • Test the integration.

Before you begin

  • Make sure that you have access to Application Integration.
  • Do the following in your Google Cloud project:

    • Grant the following roles to the service account that you want to use to create the connection:
      • roles/bigquery.dataEditor
      • roles/bigquery.readSessionUser
      • roles/secretmanager.viewer
      • roles/secretmanager.secretAccessor
    • Enable the following services:
      • secretmanager.googleapis.com (Secret Manager API)
      • connectors.googleapis.com (Connectors API)

      If these services have not been enabled for your project previously, you are prompted to enable them on the Create Connection page. If you prefer the command line, see the sketch after this list.
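
The following is a minimal command-line sketch of this setup. It assumes that the gcloud CLI is authenticated against your project and that PROJECT_ID and SERVICE_ACCOUNT_EMAIL are placeholders that you replace with your project ID and the email address of the connection's service account:

    # Enable the services required by the BigQuery connection.
    gcloud services enable --project "${PROJECT_ID}" \
        secretmanager.googleapis.com \
        connectors.googleapis.com

    # Grant the required roles to the connection's service account.
    # SERVICE_ACCOUNT_EMAIL is a placeholder, for example:
    #   export SERVICE_ACCOUNT_EMAIL="my-connection-sa@${PROJECT_ID}.iam.gserviceaccount.com"
    for role in roles/bigquery.dataEditor \
                roles/bigquery.readSessionUser \
                roles/secretmanager.viewer \
                roles/secretmanager.secretAccessor; do
      gcloud projects add-iam-policy-binding "${PROJECT_ID}" \
          --member="serviceAccount:${SERVICE_ACCOUNT_EMAIL}" \
          --role="${role}"
    done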

Set up a BigQuery connection

First, create the BigQuery dataset and table to use in this tutorial. After you create the dataset and table, create a BigQuery connection. You will use this connection in an integration later in this tutorial.

Set up a BigQuery dataset and table

To set up the BigQuery dataset and table, complete the following steps:

  1. In the Cloud console page, select your Google Cloud project.
  2. To launch a Cloud Shell session from the Google Cloud console, click the Activate Cloud Shell icon in the Cloud console. A session launches in the bottom pane of the Google Cloud console.
  3. To enable the BigQuery APIs, enter the following commands in your Cloud Shell terminal:
    export PROJECT_ID=project_id
    export REGION=region
    gcloud services enable --project "${PROJECT_ID}" \
        bigquery.googleapis.com \
        bigquerystorage.googleapis.com
    In this command, replace:
    • project_id with your Google Cloud project ID.
    • region with the region that you want to use to create the BigQuery dataset.
  4. To create a BigQuery dataset with the name bq_tutorial, enter the following command in your Cloud Shell terminal:
          bq  --project_id ${PROJECT_ID} --location ${REGION} mk bq_tutorial
        
  5. To create a BigQuery table with the name tutorial, enter the following command in your Cloud Shell terminal:
          bq --project_id ${PROJECT_ID} \
            query  \
            --nouse_legacy_sql \
          'create table bq_tutorial.tutorial (
          unique_key STRING NOT NULL,
          created_date STRING,
          closed_date STRING,
          agency STRING,
          agency_name STRING,
          complaint_type STRING,
          descriptor STRING,
          location_type STRING,
          incident_zip STRING,
          incident_address STRING,
          street_name STRING,
          cross_street_1 STRING,
          cross_street_2 STRING,
          intersection_street_1 STRING,
          intersection_street_2 STRING,
          address_type STRING,
          city STRING,
          landmark STRING,
          facility_type STRING,
          status STRING,
          due_date STRING,
          resolution_action_updated_date STRING,
          community_board STRING,
          borough STRING,
          x_coordinate_state_plane STRING,
          y_coordinate_state_plane STRING,
          park_facility_name STRING,
          park_borough STRING,
          school_name STRING,
          school_number STRING,
          school_region STRING,
          school_code STRING,
          school_phone_number STRING,
          school_address STRING,
          school_city STRING,
          school_state STRING,
          school_zip STRING,
          school_not_found STRING,
          school_or_citywide_complaint STRING,
          vehicle_type STRING,
          taxi_company_borough STRING,
          taxi_pick_up_location STRING,
          bridge_highway_name STRING,
          bridge_highway_direction STRING,
          bridge_highway_segment STRING,
          road_ramp STRING,
          garage_lot_name STRING,
          ferry_direction STRING,
          ferry_terminal_name STRING,
          latitude STRING,
          longitude STRING,
          location STRING
          ) '
      
  6. Verify that your BigQuery table is created.
    1. In the Cloud console page, click the Navigation menu.
    2. In the Analytics section, click BigQuery.
    3. Expand your project and confirm that the bq_tutorial dataset is listed.
    4. Expand the bq_tutorial dataset and confirm that the tutorial table is listed.
    5. Click the tutorial table to view the schema. A command-line check is also sketched after these steps.
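
As an alternative to the console verification above, you can inspect the dataset and table from the Cloud Shell terminal. This is a sketch that reuses the PROJECT_ID variable exported in the earlier steps:

    bq --project_id "${PROJECT_ID}" ls bq_tutorial
    bq --project_id "${PROJECT_ID}" show --schema --format=prettyjson bq_tutorial.tutorial

The first command lists the tables in the bq_tutorial dataset; the second prints the schema of the tutorial table.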

Create a BigQuery connection

Next, you'll create a BigQuery connection. A BigQuery connection lets you insert, read, update and delete rows in a BigQuery table and use the resulting output in an integration. After creating the BigQuery connection, you'll use this connection in an integration later in this tutorial to add rows to the BigQuery table.

To create a BigQuery connection, complete the following steps:

  1. In the Cloud console page, select your Google Cloud project.
  2. Open the connections page.
  3. Click + CREATE NEW to open the Create Connection page.
  4. Configure the connection:
    1. In the Create Connection section, complete the following:
      • Connector: Select BigQuery from the drop down list of available Connectors.
      • Connector version: Select the latest Connector version from the drop down list of available versions.
      • In the Connection Name field, enter a name for the Connection instance. For this tutorial, enter connector-bq-tutorial.
      • Optionally, add a Description of the connection instance.
      • Service Account: Select a service account that has the required roles.
      • Project ID: Enter the ID of the Google Cloud project where the BigQuery data resides.
      • Dataset ID: Enter the ID of the BigQuery dataset that you want to use. For this tutorial, enter bq_tutorial.
      • Optionally, click + ADD LABEL to add a label in the form of a key/value pair.
      • Click Next.
    2. Location: Select a region from where the connection will run. For the list of all the supported regions, see Locations.

      • Click Next.
    3. Authentication: The BigQuery connection does not require authentication configuration. Click Next.
    4. Review: Review your connection's configuration details. In this section, the connection and authentication details of the new connection are displayed for your review.
  5. Click Create.

Set up a sub-integration

In this tutorial, the sub-integration takes each record sent to it by the main integration and inserts it as a row in the tutorial table in the bq_tutorial dataset.

Create a sub-integration

To create the sub-integration, complete the following steps:

  1. In the Google Cloud console, go to the Application Integration page.

    Go to Application Integration

  2. Click Integrations from the left navigation menu to open the Integrations page.
  3. Click Create integration.
  4. In the Create Integration dialog, do the following:
    • Enter a name, for example, enter Process-each-record
    • Optionally, enter a description. For example, enter API Trigger to process each record (sub-integration)
    • Select the region where you want to create your integration.
  5. Click Create to open the integration editor.

Add an API Trigger

To add an API Trigger to the integration, do the following:

  1. In the integration editor, select Add a task/trigger > Triggers to display a list of available triggers.
  2. Drag the API Trigger element to the integration editor.

Add a Data Mapping task

To add a Data Mapping task in the integration, complete the following steps:

  1. Select +Add a task/trigger > Tasks in the integration editor to display the list of available tasks.
  2. Drag the Data Mapping element to the integration editor.

Configure the BigQuery connection

Now you are ready to use the BigQuery connection that you created earlier in the sub-integration. To configure the BigQuery connection in this integration, complete the following steps:

  1. Select +Add a task/trigger > Tasks in the integration editor to display the list of available tasks.
  2. Drag the Connectors element to the integration editor.
  3. Click the Connectors task element on the designer to view the task configuration pane.
  4. Click the edit icon on the right panel and update the Label to Insert row to BigQuery.
  5. Click Configure task.

    The Configure connector task dialog appears.

  6. In the Configure connector task dialog, do the following:
    1. Select the connection region where you created your BigQuery connection.
    2. Select the BigQuery connection that you want to use. For this tutorial, select connector-bq-tutorial.
    3. Once a connection is chosen, the Type column appears. Select Entities and then tutorial from the list of available entities.
    4. Once a type is chosen, the Operation column appears. Select Create.
    5. Click Done to complete the connection configuration and close the dialog.

Connect the integration elements

Next, add edge connections to connect the API Trigger to the Data Mapping task and the Data Mapping task to the Connectors task. An edge connection is a connection between any two elements in an integration. For more information on edges and edge conditions, see Edges.

To add the edge connections, complete the following steps:

  1. Click the Fork control point at the bottom of the API Trigger element. Drag and drop the edge connection at the Join control point at the top of the Data Mapping element.
  2. Click the Fork control point at the bottom of the Data Mapping element. Drag and drop the edge connection at the Join control point at the top of the Connectors element.

Configure the Data Mapping task

To configure the Data Mapping task, complete the following steps:

  1. In the integration editor, click the Data Mapping task to view the task configuration pane.
  2. Click Open Data Mapping Editor.
  3. In the Data Mapping Editor, click Add to add a new variable.
  4. In the Create Variable dialog, enter the following information:
    • Name: Enter record.
    • Data Type: Select JSON.
    • Schema: Select Infer from a sample JSON payload. Enter the following sample JSON payload:
                  {
                    "unique_key":"304271",
                    "created_date":"02/06/2007 12:00:00 AM",
                    "closed_date":"03/01/2007 12:00:00 AM",
                    "agency":"TLC",
                    "agency_name":"Taxi and Limousine Commission",
                    "complaint_type":"Taxi Complaint",
                    "descriptor":"Driver Complaint",
                    "location_type":"Street",
                    "incident_zip":"10001",
                    "incident_address":"",
                    "street_name":"",
                    "cross_street_1":"",
                    "cross_street_2":"",
                    "intersection_street_1":"WEST 29 STREET",
                    "intersection_street_2":"7 AVENUE",
                    "address_type":"INTERSECTION",
                    "city":"NEW YORK",
                    "landmark":"",
                    "facility_type":"N/A",
                    "status":"Closed",
                    "due_date":"02/28/2007 12:00:00 AM",
                    "resolution_action_updated_date":"03/01/2007 12:00:00 AM",
                    "community_board":"05 MANHATTAN",
                    "borough":"MANHATTAN",
                    "x_coordinate_state_plane":"986215",
                    "y_coordinate_state_plane":"211740",
                    "park_facility_name":"",
                    "park_borough":"MANHATTAN",
                    "school_name":"",
                    "school_number":"",
                    "school_region":"",
                    "school_code":"",
                    "school_phone_number":"",
                    "school_address":"",
                    "school_city":"",
                    "school_state":"",
                    "school_zip":"",
                    "school_not_found":"",
                    "school_or_citywide_complaint":"",
                    "vehicle_type":"",
                    "taxi_company_borough":"",
                    "taxi_pick_up_location":"Other",
                    "bridge_highway_name":"",
                    "bridge_highway_direction":"",
                    "road_ramp":"",
                    "bridge_highway_segment":"",
                    "garage_lot_name":"",
                    "ferry_direction":"",
                    "ferry_terminal_name":"",
                    "latitude":"40.74785373937869",
                    "longitude":"-73.99290823133913",
                    "location":"(40.74785373937869, -73.99290823133913)"
                  }
                
    • Click Create.
    • After the variable is created, complete the following steps in the Data Mapping Editor:
      • Drag the new record variable to the Input column.
      • Drag the connectorInputPayload variable to the Output column.
    • Close the Data Mapping Editor to return to the integration editor.

Publish the sub-integration

To publish the sub-integration, click Publish in the integration editor.

Set up the main integration

In this section, you set up the main integration, which uses a For Each Parallel task to process each record. This main integration then invokes the sub-integration once for each record.

Create the main integration

To create the main integration, complete the following steps:

  1. In the Google Cloud console, go to the Application Integration page.

    Go to Application Integration

  2. Click Integrations from the left navigation menu to open the Integrations page.
  3. Click Create integration.
  4. In the Create Integration dialog, do the following:
    • Enter a name, for example, enter process-records
    • Optionally, enter a description. For example, enter API Trigger to process records (main integration)
    • Select the region where you want to create your integration.
  5. Click Create to open the integration editor.

Add an API Trigger

To add an API Trigger to the integration, do the following:

  1. In the integration editor, select Add a task/trigger > Triggers to display a list of available triggers.
  2. Drag the API Trigger element to the integration editor.

Add a For Each Parallel task

To add a For Each Parallel task in the integration, complete the following steps:

  1. Select +Add a task/trigger > Tasks in the integration editor to display the list of available tasks.
  2. Drag the For Each Parallel element to the integration editor.

Connect the integration elements

Next, add an edge connection to connect the API Trigger to the For Each Parallel task.

To add the edge connection, click the Fork control point at the bottom of the API Trigger element. Drag and drop the edge connection at the Join control point at the top of the For Each Parallel task element.

Configure the For Each Parallel task

To configure the For Each Parallel task, complete the following steps:

  1. In the integration editor, click the For Each Parallel task to view the task configuration pane.
  2. Under Array Selection > List to iterate, click Add new variable to add a new variable.
  3. In the Create Variable dialog, enter the following information:
    • Name: Enter records
    • Data Type: Select JSON
    • Schema: Select Infer from a sample JSON payload. Enter the following sample JSON payload:
                    [{
                      "unique_key":"304271",
                      "created_date":"02/06/2007 12:00:00 AM",
                      "closed_date":"03/01/2007 12:00:00 AM",
                      "agency":"TLC",
                      "agency_name":"Taxi and Limousine Commission",
                      "complaint_type":"Taxi Complaint",
                      "descriptor":"Driver Complaint",
                      "location_type":"Street",
                      "incident_zip":"10001",
                      "incident_address":"",
                      "street_name":"",
                      "cross_street_1":"",
                      "cross_street_2":"",
                      "intersection_street_1":"WEST 29 STREET",
                      "intersection_street_2":"7 AVENUE",
                      "address_type":"INTERSECTION",
                      "city":"NEW YORK",
                      "landmark":"",
                      "facility_type":"N/A",
                      "status":"Closed",
                      "due_date":"02/28/2007 12:00:00 AM",
                      "resolution_action_updated_date":"03/01/2007 12:00:00 AM",
                      "community_board":"05 MANHATTAN",
                      "borough":"MANHATTAN",
                      "x_coordinate_state_plane":"986215",
                      "y_coordinate_state_plane":"211740",
                      "park_facility_name":"",
                      "park_borough":"MANHATTAN",
                      "school_name":"",
                      "school_number":"",
                      "school_region":"",
                      "school_code":"",
                      "school_phone_number":"",
                      "school_address":"",
                      "school_city":"",
                      "school_state":"",
                      "school_zip":"",
                      "school_not_found":"",
                      "school_or_citywide_complaint":"",
                      "vehicle_type":"",
                      "taxi_company_borough":"",
                      "taxi_pick_up_location":"Other",
                      "bridge_highway_name":"",
                      "bridge_highway_direction":"",
                      "road_ramp":"",
                      "bridge_highway_segment":"",
                      "garage_lot_name":"",
                      "ferry_direction":"",
                      "ferry_terminal_name":"",
                      "latitude":"40.74785373937869",
                      "longitude":"-73.99290823133913",
                      "location":"(40.74785373937869, -73.99290823133913)"
                    }]
                  
  4. Click Create.
  5. In the Sub-integration Details section, enter the following information:
    • API Trigger ID: Select the API Trigger element in the sub-integration. For example, select Process-each-record_API_1.
    • Execution Strategy: Select ASYNC
    • Select Run a single integration
  6. In the On each execution section, for Where to map individual array elements, enter the name of the variable in the Data Mapping task in the sub-integration. In this case, enter record. Sub-integration variables are listed only for published integrations. If the variables are not listed, refresh the page, because it takes some time for the variables to become visible after the sub-integration is published.

Publish the main integration

To publish the main integration, click Publish in the integration editor.

Test the integration

To test the integration, complete the following steps:

  1. Download the sample data to your Cloud Shell:
    1. To launch a Cloud Shell session from the Google Cloud console, click the Activate Cloud Shell icon in the Cloud console. A session launches in the bottom pane of the Google Cloud console.
    2. Enter the following command in your Cloud Shell terminal:
      wget https://raw.githubusercontent.com/GoogleCloudPlatform/application-integration-samples/main/assets/bq-sample-dataset.json
              
    3. To verify that the sample data is downloaded, enter the following command in your Cloud Shell terminal:
      ls -la bq-sample-dataset.json
      The downloaded file is listed in the Cloud Shell terminal.
  2. To select three entries at random from the sample dataset and store them in a format that can be passed to the integration, enter the following commands in your Cloud Shell terminal:
    AUTH=$(gcloud auth print-access-token)
    export SAMPLE_DOCS=$(jq $(r=$((RANDOM % 1000)) ; echo ".[$r:$((r + 3))]") < bq-sample-dataset.json | jq -Rs '.')
                
    generate_post_data()
      {
        cat <<EOF
          {
            "triggerId": "api_trigger/process-records_API_1",
            "inputParameters": 
              {
                "records": 
                  {
                    "jsonValue": $SAMPLE_DOCS
                  }
              }
          }
EOF
      }
  3. To start the test, enter the following command in your Cloud Shell terminal:
    curl -X POST \
      https://integrations.googleapis.com/v1/projects/project_id/locations/region/integrations/process-records:execute \
      -H "Authorization: Bearer $AUTH" \
      -H "Content-Type: application/json" \
      -d "$(generate_post_data)"
    In this command, replace:
    • project_id with your Google Cloud project ID.
    • region with the region where you created your integration.
    This command invokes the main integration and passes the entries from the sample dataset to the main integration. The main integration then passes each entry to the sub-integration, which adds the data as a row in the BigQuery table.
  4. To verify that your BigQuery table now contains these records, perform the following steps (a command-line check is also sketched after this list):
    1. In the Cloud console page, click the Navigation menu.
    2. In the Analytics section, click BigQuery.
    3. Expand your project and click the bq_tutorial dataset.
    4. Expand the bq_tutorial dataset and click the tutorial table.
    5. Click the Table Explorer tab to view the inserted records.
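
Alternatively, you can check the inserted rows from the Cloud Shell terminal. This is a sketch that reuses the PROJECT_ID variable exported earlier and returns a few of the inserted rows:

    bq --project_id "${PROJECT_ID}" query \
        --nouse_legacy_sql \
        'SELECT unique_key, agency, complaint_type, status FROM bq_tutorial.tutorial LIMIT 10'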

What's next

Try building integrations with other connectors. For the list of all supported connectors, see the Connector reference.