使用 Dataflow 建立變更串流連線

本頁面將說明如何使用變更串流建立 Dataflow 管道,以便取用及轉送 Spanner 變更資料。您可以使用本頁面的範例程式碼來建構自訂管道。

核心概念

以下是變更資料流的 Dataflow 管道的一些核心概念。

Dataflow

Dataflow 是一項無伺服器、快速且具成本效益的服務,可同時支援串流和批次處理作業。這項服務可為使用開放原始碼 Apache Beam 程式庫編寫的處理工作提供可攜性,並自動佈建基礎架構和叢集管理。從變更串流讀取資料時,Dataflow 會提供近乎即時的串流。

您可以使用 Dataflow 搭配 SpannerIO 連接器使用 Spanner 變更串流,這可提供 Spanner API 的抽象概念,用於查詢變更串流。使用這個連接器,您就不需要管理變更串流分割作業生命週期,而這項作業是直接使用 Spanner API 時的必要步驟。連接器會提供一���串的������變更���錄,讓您可以專注於應用程式邏輯,而非特定 API 詳細資料和動態變更串流分割作業。在需要讀取變更串流資料的大多數情況下,建議使用 SpannerIO 連接器,而非 Spanner API。

Dataflow 範本是預先建構的 Dataflow 管道,可用於實作常見用途。如需總覽,請參閱「Dataflow 範本」。

Dataflow 管道

Spanner 變更串流 Dataflow 管道由四個主要部分組成:

  1. 含有變更串流的 Spanner 資料庫
  2. SpannerIO 連接器
  3. 使用者定義的轉換和匯出目的
  4. Apache Beam 接收器輸入/輸出寫入器

圖片

Spanner 變更串流

如要進一步瞭解如何建立變更串流,請參閱「建立變更串流」。

Apache Beam SpannerIO 連接器

這是 前文「Dataflow」一節所述的 SpannerIO 連接器。這是一個來源 I/O 連接器,可將資料變更記錄的 PCollection 傳送至管道的後續階段。每個產生的資料變更記錄的事件時間會是提交時間戳記。請注意,產生的記錄是未排序的,而 SpannerIO 連接器會保證不會有延遲記錄

使用變更串流時,Dataflow 會使用檢查點。因此,每個 worker 可能會等待最多設定的檢查點間隔,以便緩衝變更,然後再將變更送出以便進一步處理。

使用者定義的轉換

使用者定義的轉換可讓使用者在 Dataflow 管道中匯總、轉換或修改處理資料。這類常見用途包括移除個人識別資訊、滿足下游資料格式需求,以及排序。請參閱官方 Apache Beam 說明文件,瞭解轉換作業的程式設計指南。

Apache Beam 接收器 I/O 寫入器

Apache Beam 包含內建 I/O 連接器,可用於將資料從 Dataflow 管道寫入 BigQuery 等資料接收器。支援大部分常見的資料接收器。

Dataflow 範本

Dataflow 範本提供一種方法,可根據預先建構的 Docker 映像檔建立 Dataflow 工作,以便使用 Google Cloud 控制台、 Google Cloud CLI 或 Rest API 呼叫,用於常見的用途。

針對 Spanner 變更串流,我們提供三個 Dataflow 彈性範本:

使用 Spanner 變更串流至 Pub/Sub 範本時,請注意下列限制:

設定 Dataflow 範本的 IAM 權限

在建立含有三個彈性範本的 Dataflow 工作之前,請確認您已取得下列服務帳戶的必要 IAM 權限

如果您沒有必要的 IAM 權限,就必須指定使用者代管的工作者服務帳戶,才能建立 Dataflow 工作。詳情請參閱「Dataflow 安全性與權限」。

如果您嘗試在沒有所有必要權限的情況下,透過 Dataflow 彈性範本執行工作,工作可能會失敗,並顯示無法讀取結果檔案錯誤資源拒絕權限錯誤。詳情請參閱「排解 Flex 範本問題」。

建構 Dataflow pipeline

本節將說明連接器的初始設定,並提供與 Spanner 變更串流功能的常見整合範例。

如要按照這些步驟操作,您需要 Dataflow 的 Java 開發環境。詳情請參閱「使用 Java 建立 Dataflow 管道」。

建立變更串流

如要進一步瞭解如何建立變更串流,請參閱「建立變更串流」一文。如要繼續進行下一個步驟,您必須擁有已設定變更串流的 Spanner 資料庫。

授予精細的存取權控管權限

如果您希望任何精細存取權控管使用者都能執行資料流作業,請務必為使用者授予資料庫角色存取權,讓他們在變更資料流中擁有 SELECT 權限,並在變更資料流的資料表值函式中擁有 EXECUTE 權限。此外,請確認授權者在 SpannerIO 設定或 Dataflow 彈性範本中指定資料庫角色。

詳情請參閱「關於精細存取權控管機制」。

將 SpannerIO 連接器新增為依附元件

Apache Beam SpannerIO 連接器會封裝使用 Cloud Spanner API 直接使用變更資料流的複雜性,並將變更資料流資料記錄的 PCollection 傳送至管道的後續階段。

這些物件可在使用者資料流管線的其他階段中使用。變更串流整合是 SpannerIO 連接器的一部分。如要使用 SpannerIO 連接器,請將依附元件新增至 pom.xml 檔案:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam-version}</version> <!-- available from version 2.38.0 -->
</dependency>

建立中繼資料資料庫

執行 Apache Beam 管道時,連接器需要追蹤每個區隔。這項服務會將中繼資料保留在連接器在初始化期間建立的 Spanner 資料表中。您可以在設定連接器時,指定要建立這個資料表的資料庫。

變更串流最佳做法所述,我們建議您為此目的建立新的資料庫,而不是讓連接器使用應用程式的資料庫來儲存其中繼資料表。

使用 SpannerIO 連接器的 Dataflow 工作擁有者,需要具備以下 IAM 權限,並與此中繼資料資料庫搭配使用:

  • spanner.databases.updateDdl
  • spanner.databases.beginReadOnlyTransaction
  • spanner.databases.beginOrRollbackReadWriteTransaction
  • spanner.databases.read
  • spanner.databases.select
  • spanner.databases.write
  • spanner.sessions.create
  • spanner.sessions.get

設定連接器

您可以按照下列方式設定 Spanner 變更串流連接器:

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");    // Needed for fine-grained access control only

Timestamp startTime = Timestamp.now();
Timestamp endTime = Timestamp.ofTimeSecondsAndNanos(
   startTime.getSeconds() + (10 * 60),
   startTime.getNanos()
);

SpannerIO
  .readChangeStream()
  .withSpannerConfig(spannerConfig)
  .withChangeStreamName("my-change-stream")
  .withMetadataInstance("my-meta-instance-id")
  .withMetadataDatabase("my-meta-database-id")
  .withMetadataTable("my-meta-table-name")
  .withRpcPriority(RpcPriority.MEDIUM)
  .withInclusiveStartAt(startTime)
  .withInclusiveEndAt(endTime);

以下是 readChangeStream() 選項的說明:

Spanner 設定 (必填)

用於設定建立變更串流的專案、例項和資料庫,並從中查詢。您也可以選擇在執行 Dataflow 工作時,指定要使用的資料庫角色,以便精細存取權控管使用者使用。工作會假設這個資料庫角色,以便存取變更串流。詳情請參閱「關於精細存取權控管機制」。

變更串流名稱 (必填)

這個名稱可用來唯一識別變更串流。這個名稱必須與建立時使用的名稱相同。

中繼資料執行個體 ID (選填)

這是用來儲存連接器用於控制變更串流 API 資料用途的中繼資料例項。

中繼資料資料庫 ID (必填)

這是用來儲存連接器用於控制變更串流 API 資料用途的中繼資料資料庫。

中繼資料表名稱 (選填)

只有在更新現有管道時,才應使用此選項。

這是連接器要使用的現有中繼資料表名稱。連接器會使用這個值儲存中繼資料,以便控制變更串流 API 資料的用量。如果省略這個選項,Spanner 會在連接器初始化時,使用產生的名稱建立新資料表。

RPC 優先順序 (選填)

用於變更串流查詢的要求優先順序。如果省略這個參數,系統會使用 high priority

InclusiveStartAt (必填)

系統會將指定時間戳記的變更內容傳回給呼叫端。

InclusiveEndAt (選填)

系統會將指定時間戳記之前的變更傳回給呼叫端。如果省略這項參數,系統會無限期發出變更。

新增轉換和接收器來處理變更資料

完成前述步驟後,已設定的 SpannerIO 連接器即可發出 DataChangeRecord 物件的 PCollection。如需透過不同方式處理這類串流資料的管道設定範例,請參閱「轉換和接收器範例」。

請注意,SpannerIO 連接器產生的變更串流記錄並未排序。這是因為 PCollection 不會提供任何排序保證。如果您需要有順序的串流,就必須在管道中將記錄分組並排序為轉換:請參閱「範例:依鍵排序」。您可以擴充這個範例,根據記錄的任何欄位 (例如交易 ID) 排序記錄。

轉換和接收器範例

您可以自行定義轉換作業,並指定要寫入資料的接收器。Apache Beam 文件提供可套用的各種轉換,以及可用來將資料寫入外部系統的 I/O 連接器

範例:依鍵排序

此程式碼範例會使用 Dataflow 連接器,依據提交時間戳記排序資料變更記錄,並依主要索引鍵分組。

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new BreakRecordByModFn()))
  .apply(ParDo.of(new KeyByIdFn()))
  .apply(ParDo.of(new BufferKeyUntilOutputTimestamp()))
  // Subsequent processing goes here

這個程式碼範例會使用狀態和計時器來緩衝每個鍵的記錄,並將計時器的到期時間設為使用者在未來設定的時間 T (在 BufferKeyUntilOutputTimestamp 函式中定義)。當 Dataflow 浮水印時間超過 T 時,這個程式碼會清除緩衝區中時間戳記小於 T 的所有記錄,並依提交時間戳記排序這些記錄,然後輸出鍵/值組合,其中:

  • 鍵是輸入鍵,也就是將主鍵散列至大小為 1000 的桶陣列。
  • 這個值是為鍵緩衝的資料變更記錄排序。

我們針對每個鍵提供下列保證:

  • 計時器保證會依到期時間戳記的順序觸發。
  • 保證會以產生的順序,將元素傳送至下游階段。

舉例來說,如果鍵的值為 100,計時器會分別在 T1T10 觸發,並在每個時間戳記產生一組資料變更記錄。由於在 T1 輸出的資料變更記錄會在 T10 輸出的資料變更記錄之前產生,因此系統也保證會在 T10 輸出的資料變更記錄之前,先由下個階段接收 T1 輸出的資料變更記錄。這項機制有助於我們確保每個主索引鍵的提交時間戳記排序嚴謹,以利後續處理。

這個程序會重複執行,直到管道結束並處理���所有資料變更記錄為止 (如果未指定結束時間,則會無限重複)。

請注意,這個程式碼範例使用狀態和計時器,而非視窗,來執行每個鍵的排序。原因是我們無法保證會依序處理視窗。也就是說,較舊的時間區間可能會比較新的時間區間晚處理,這可能會導致處理順序錯亂。

BreakRecordByModFn

每個資料變更記錄可能包含多個修改項目。每個 mod 代表對單一主鍵值的插入、更新或刪除作業。這個函式會將每筆資料變更記錄分割為個別的資料變更記錄,每個模組各有一筆。

private static class BreakRecordByModFn extends DoFn<DataChangeRecord,
                                                     DataChangeRecord>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record, OutputReceiver<DataChangeRecord>
    outputReceiver) {
    record.getMods().stream()
      .map(
          mod ->
              new DataChangeRecord(
                  record.getPartitionToken(),
                  record.getCommitTimestamp(),
                  record.getServerTransactionId(),
                  record.isLastRecordInTransactionInPartition(),
                  record.getRecordSequence(),
                  record.getTableName(),
                  record.getRowType(),
                  Collections.singletonList(mod),
                  record.getModType(),
                  record.getValueCaptureType(),
                  record.getNumberOfRecordsInTransaction(),
                  record.getNumberOfPartitionsInTransaction(),
                  record.getTransactionTag(),
                  record.isSystemTransaction(),
                  record.getMetadata()))
      .forEach(outputReceiver::output);
  }
}

KeyByIdFn

這個函式會接收 DataChangeRecord,並輸出以雜湊為整數值的 Spanner 主鍵做為索引的 DataChangeRecord

private static class KeyByIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  // NUMBER_OF_BUCKETS should be configured by the user to match their key cardinality
  // Here, we are choosing to hash the Spanner primary keys to a bucket index, in order to have a deterministic number
  // of states and timers for performance purposes.
  // Note that having too many buckets might have undesirable effects if it results in a low number of records per bucket
  // On the other hand, having too few buckets might also be problematic, since many keys will be contained within them.
  private static final int NUMBER_OF_BUCKETS = 1000;

  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    int hashCode = (int) record.getMods().get(0).getKeysJson().hashCode();
    // Hash the received keys into a bucket in order to have a
    // deterministic number of buffers and timers.
    String bucketIndex = String.valueOf(hashCode % NUMBER_OF_BUCKETS);

    outputReceiver.output(KV.of(bucketIndex, record));
  }
}

BufferKeyUntilOutputTimestamp

計時器和緩衝區是按鍵計算。這個函式會將每筆資料變更記錄緩衝,直到浮水印通過我們要輸出緩衝資料變更記錄的時間戳記為止。

這段程式碼會使用迴圈計時器,判斷要何時將緩衝區刷出:

  1. 第一次看到某個鍵的資料變更記錄時,就會設定計時器,在該資料變更記錄的提交時間戳記加上 incrementIntervalSeconds (使用者可設定的選項) 時觸發。
  2. 計時器啟動時,會將緩衝區中時間戳記小於計時器到期時間的所有資料變更記錄新增至 recordsToOutput。如果緩衝區內有資料變更記錄,且其時間戳記大於或等於計時器的到期時間,則會將這些資料變更記錄新增回緩衝區,而非輸出。然後將下一個計時器設為目前計時器的到期時間加上 incrementIntervalInSeconds
  3. 如果 recordsToOutput 非空值,函式會依據修訂時間戳記和交易 ID,排序 recordsToOutput 中的資料變更記錄,然後輸出這些記錄。
private static class BufferKeyUntilOutputTimestamp extends
    DoFn<KV<String, DataChangeRecord>, KV<String, Iterable<DataChangeRecord>>>  {
  private static final Logger LOG =
      LoggerFactory.getLogger(BufferKeyUntilOutputTimestamp.class);

  private final long incrementIntervalInSeconds = 2;

  private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
    this.incrementIntervalInSeconds = incrementIntervalInSeconds;
  }

  @SuppressWarnings("unused")
  @TimerId("timer")
  private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("keyString")
  private final StateSpec<ValueState<String>> keyString =
      StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void process(
      @Element KV<String, DataChangeRecord> element,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    buffer.add(element.getValue());

    // Only set the timer if this is the first time we are receiving a data change
    // record with this key.
    String elementKey = keyString.read();
    if (elementKey == null) {
      Instant commitTimestamp =
          new Instant(element.getValue().getCommitTimestamp().toSqlTimestamp());
      Instant outputTimestamp =
          commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
      timer.set(outputTimestamp);
      keyString.write(element.getKey());
    }
  }

  @OnTimer("timer")
  public void onExpiry(
      OnTimerContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    if (!buffer.isEmpty().read()) {
      String elementKey = keyString.read();

      final List<DataChangeRecord> records =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .collect(Collectors.toList());
      buffer.clear();

      List<DataChangeRecord> recordsToOutput = new ArrayList<>();
      for (DataChangeRecord record : records) {
        Instant recordCommitTimestamp =
            new Instant(record.getCommitTimestamp().toSqlTimestamp());
        final String recordString =
            record.getMods().get(0).getNewValuesJson().isEmpty()
                ? "Deleted record"
                : record.getMods().get(0).getNewValuesJson();
        // When the watermark passes time T, this means that all records with
        // event time < T have been processed and successfully committed. Since the
        // timer fires when the watermark passes the expiration time, we should
        // only output records with event time < expiration time.
        if (recordCommitTimestamp.isBefore(context.timestamp())) {
          LOG.info(
             "Outputting record with key {} and value {} at expiration " +
             "timestamp {}",
              elementKey,
              recordString,
              context.timestamp().toString());
          recordsToOutput.add(record);
        } else {
          LOG.info(
              "Expired at {} but adding record with key {} and value {} back to " +
              "buffer due to commit timestamp {}",
              context.timestamp().toString(),
              elementKey,
              recordString,
              recordCommitTimestamp.toString());
          buffer.add(record);
        }
      }

      // Output records, if there are any to output.
      if (!recordsToOutput.isEmpty()) {
        // Order the records in place, and output them. The user would need
        // to implement DataChangeRecordComparator class that sorts the
        // data change records by commit timestamp and transaction ID.
        Collections.sort(recordsToOutput, new DataChangeRecordComparator());
        context.outputWithTimestamp(
            KV.of(elementKey, recordsToOutput), context.timestamp());
        LOG.info(
            "Expired at {}, outputting records for key {}",
            context.timestamp().toString(),
            elementKey);
      } else {
        LOG.info("Expired at {} with no records", context.timestamp().toString());
      }
    }

    Instant nextTimer = context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
    if (buffer.isEmpty() != null && !buffer.isEmpty().read()) {
      LOG.info("Setting next timer to {}", nextTimer.toString());
      timer.set(nextTimer);
    } else {
      LOG.info(
          "Timer not being set since the buffer is empty: ");
      keyString.clear();
    }
  }
}

排序交易

這個管道可以變更為依交易 ID 和提交時間戳記排序。為此,請為每個交易 ID / 提交時間戳記組合緩衝記錄,而不是為每個 Spanner 索引鍵緩衝記錄。這需要修改 KeyByIdFn 中的程式碼。

範例:組合交易

這個程式碼範例會讀取資料變更記錄,將屬於同一筆交易的所有資料變更記錄組合成單一元素,並輸出該元素。請注意,這個範例程式碼輸出的交易並未依提交時間戳記排序。

這個程式碼範例會使用緩衝區,從資料變更記錄中組合交易。第一次收到屬於交易的資料變更記錄時,系統會讀取資料變更記錄中的 numberOfRecordsInTransaction 欄位,該欄位會說明屬於該交易的預期資料變更記錄數量。它會將該交易的資料變更記錄緩衝,直到緩衝記錄數量與 numberOfRecordsInTransaction 相符為止,然後輸出已組合的資料變更記錄。

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new KeyByTransactionIdFn()))
  .apply(ParDo.of(new TransactionBoundaryFn()))
  // Subsequent processing goes here

KeyByTransactionIdFn

這個函式會擷取 DataChangeRecord,並輸出以交易 ID 做為索引的 DataChangeRecord

private static class KeyByTransactionIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    outputReceiver.output(KV.of(record.getServerTransactionId(), record));
  }
}

TransactionBoundaryFn

TransactionBoundaryFn 會從 KeyByTransactionIdFn 緩衝收到的 {TransactionId, DataChangeRecord} 鍵/值組合,並根據 TransactionId 將這些組合分組緩衝。當緩衝的記錄數量等於整個交易中包含的記錄數量時,這個函式會依記錄序號排序群組中的 DataChangeRecord 物件,並輸出 {CommitTimestamp, TransactionId}Iterable<DataChangeRecord> 的鍵/值組合。

我們假設 SortKey 是代表 {CommitTimestamp, TransactionId} 組合的使用者定義類別。如要進一步瞭解 SortKey,請參閱範例導入程序

private static class TransactionBoundaryFn extends DoFn<KV<String, DataChangeRecord>, KV<SortKey, Iterable<DataChangeRecord>>>  {
  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @StateId("count") ValueState<Integer> countState) {
    final KV<String, DataChangeRecord> element = context.element();
    final DataChangeRecord record = element.getValue();

    buffer.add(record);
    int count = (countState.read() != null ? countState.read() : 0);
    count = count + 1;
    countState.write(count);

    if (count == record.getNumberOfRecordsInTransaction()) {
      final List<DataChangeRecord> sortedRecords =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
              .collect(Collectors.toList());

      final Instant commitInstant =
          new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp()
              .getTime());
      context.outputWithTimestamp(
          KV.of(
              new SortKey(sortedRecords.get(0).getCommitTimestamp(),
                          sortedRecords.get(0).getServerTransactionId()),
              sortedRecords),
          commitInstant);
      buffer.clear();
      countState.clear();
    }
  }
}

範例:依據交易標記篩選

當修改使用者資料的交易標記時,對應的標記及其類型會儲存在 DataChangeRecord 中。以下範例說明如何根據使用者定義的交易代碼和系統代碼,篩選變更串流記錄:

my-tx-tag 的使用者定義標記篩選:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           !record.isSystemTransaction()
           && record.getTransactionTag().equalsIgnoreCase("my-tx-tag")))
  // Subsequent processing goes here

系統標記篩選/TTL稽核:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           record.isSystemTransaction()
           && record.getTransactionTag().equals("RowDeletionPolicy")))
  // Subsequent processing goes here

範例:擷取完整資料列

這個範例適用於名為 Singer 的 Spanner 資料表,其定義如下:

CREATE TABLE Singers (
  SingerId INT64 NOT NULL,
  FirstName STRING(1024),
  LastName STRING(1024)
) PRIMARY KEY (SingerId);

在變更資料流的預設 OLD_AND_NEW_VALUES 值擷取模式下,如果 Spanner 資料列有更新,則收到的資料變更記錄只會包含已變更的資料欄。系統不會將已追蹤但未變更的欄納入記錄。您可以使用 mod 的主鍵,在資料變更記錄的認可時間戳記執行 Spanner 快照讀取作業,以便擷取未變更的資料欄,甚至是擷取整個資料列。

請注意,資料庫保留政策可能需要變更為大於或等於變更資料流保留政策的值,才能順利讀取快照。

另請注意,使用 NEW_ROW 值擷取類型是建議且更有效率的方式,因為預設會傳回資料列的所有追蹤欄,且不需要額外的快照讀取至 Spanner。

SpannerConfig spannerConfig = SpannerConfig
   .create()
   .withProjectId("my-project-id")
   .withInstanceId("my-instance-id")
   .withDatabaseId("my-database-id")
   .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
   .apply(SpannerIO
       .readChangeStream()
       .withSpannerConfig(spannerConfig)
       // Assume we have a change stream "my-change-stream" that watches Singers table.
       .withChangeStreamName("my-change-stream")
       .withMetadataInstance("my-metadata-instance-id")
       .withMetadataDatabase("my-metadata-database-id")
       .withInclusiveStartAt(Timestamp.now()))
   .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
   // Subsequent processing goes here

ToFullRowJsonFn

這個轉換作業會在收到的每筆記錄提交時間戳記時執行陳舊讀取作業,並將整個資料列對應至 JSON。

public class ToFullRowJsonFn extends DoFn<DataChangeRecord, String> {
 // Since each instance of this DoFn will create its own session pool and will
 // perform calls to Spanner sequentially, we keep the number of sessions in
 // the pool small. This way, we avoid wasting resources.
 private static final int MIN_SESSIONS = 1;
 private static final int MAX_SESSIONS = 5;
 private final String projectId;
 private final String instanceId;
 private final String databaseId;

 private transient DatabaseClient client;
 private transient Spanner spanner;

 public ToFullRowJsonFn(SpannerConfig spannerConfig) {
   this.projectId = spannerConfig.getProjectId().get();
   this.instanceId = spannerConfig.getInstanceId().get();
   this.databaseId = spannerConfig.getDatabaseId().get();
 }

 @Setup
 public void setup() {
   SessionPoolOptions sessionPoolOptions = SessionPoolOptions
      .newBuilder()
      .setMinSessions(MIN_SESSIONS)
      .setMaxSessions(MAX_SESSIONS)
      .build();
   SpannerOptions options = SpannerOptions
       .newBuilder()
       .setProjectId(projectId)
       .setSessionPoolOption(sessionPoolOptions)
       .build();
   DatabaseId id = DatabaseId.of(projectId, instanceId, databaseId);
   spanner = options.getService();
   client = spanner.getDatabaseClient(id);
 }

 @Teardown
 public void teardown() {
   spanner.close();
 }

 @ProcessElement
 public void process(
   @Element DataChangeRecord element,
   OutputReceiver<String> output) {
   com.google.cloud.Timestamp commitTimestamp = element.getCommitTimestamp();
   element.getMods().forEach(mod -> {
     JSONObject keysJson = new JSONObject(mod.getKeysJson());
     JSONObject newValuesJson = new JSONObject(mod.getNewValuesJson());
     ModType modType = element.getModType();
     JSONObject jsonRow = new JSONObject();
     long singerId = keysJson.getLong("SingerId");
     jsonRow.put("SingerId", singerId);
     if (modType == ModType.INSERT) {
       // For INSERT mod, get non-primary key columns from mod.
       jsonRow.put("FirstName", newValuesJson.get("FirstName"));
       jsonRow.put("LastName", newValuesJson.get("LastName"));
     } else if (modType == ModType.UPDATE) {
       // For UPDATE mod, get non-primary key columns by doing a snapshot read using the primary key column from mod.
       try (ResultSet resultSet = client
         .singleUse(TimestampBound.ofReadTimestamp(commitTimestamp))
         .read(
           "Singers",
           KeySet.singleKey(com.google.cloud.spanner.Key.of(singerId)),
             Arrays.asList("FirstName", "LastName"))) {
         if (resultSet.next()) {
           jsonRow.put("FirstName", resultSet.isNull("FirstName") ?
             JSONObject.NULL : resultSet.getString("FirstName"));
           jsonRow.put("LastName", resultSet.isNull("LastName") ?
             JSONObject.NULL : resultSet.getString("LastName"));
         }
       }
     } else {
       // For DELETE mod, there is nothing to do, as we already set SingerId.
     }

     output.output(jsonRow.toString());
   });
 }
}

這段程式碼會建立 Spanner 資料庫用戶端,以便執行完整的資料列擷取作業,並將工作階段集區設為只包含少數工作階段,以便依序在 ToFullReowJsonFn 的一個例項中執行讀取作業。Dataflow 會確保產生此函式的多個例項,每個例項都有自己的用戶端集區。

範例:從 Spanner 傳送至 Pub/Sub

在這種情況下,呼叫端會盡可能快速地將記錄串流至 Pub/Sub,而不會進行任何分組或匯總。這非常適合觸發下游處理程序,因為所有新插入 Spanner 資料表的資料列都會串流至 Pub/Sub,以利後續處理。

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(PubsubIO.writeStrings().to("my-topic"));

請注意,Pub/Sub 接收器可設定為確保「僅限一次」語意。

範例:從 Spanner 到 Cloud Storage

在這種情況下,呼叫端會將指定時間範圍內的所有記錄分組,並將分組儲存在個別的 Cloud Storage 檔案中。這非常適合用於數據分析和時間點封存作業,且不受 Spanner 的保留期限影響。

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
  .apply(TextIO
    .write()
    .to("gs://my-bucket/change-stream-results-")
    .withSuffix(".txt")
    .withWindowedWrites()
    .withNumShards(1));

請注意,Cloud Storage 匯出端��設會提供至少一次的語義。透過額外處理,可以修改為採用一次性語意。

我們也為這個用途提供 Dataflow 範本:請參閱「將變更串流連結至 Cloud Storage」。

範例:從 Spanner 匯出至 BigQuery (分錄表)

在此例中,呼叫端會將變更記錄串流至 BigQuery。每筆資料變更記錄都會在 BigQuery 中顯示為一列。這類資料非常適合用於數據分析。這段程式碼會使用先前在「擷取完整資料列」一節中定義的函式,擷取記錄的完整資料列,並將其寫入 BigQuery。

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(spannerConfig)
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
  .apply(BigQueryIO
    .<String>write()
    .to("my-bigquery-table")
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
    .withSchema(new TableSchema().setFields(Arrays.asList(
      new TableFieldSchema()
        .setName("SingerId")
        .setType("INT64")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("FirstName")
        .setType("STRING")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("LastName")
        .setType("STRING")
        .setMode("REQUIRED")
    )))
    .withAutoSharding()
    .optimizedWrites()
    .withFormatFunction((String element) -> {
      ObjectMapper objectMapper = new ObjectMapper();
      JsonNode jsonNode = null;
      try {
        jsonNode = objectMapper.readTree(element);
      } catch (IOException e) {
        e.printStackTrace();
      }
      return new TableRow()
        .set("SingerId", jsonNode.get("SingerId").asInt())
        .set("FirstName", jsonNode.get("FirstName").asText())
        .set("LastName", jsonNode.get("LastName").asText());
    }
  )
);

請注意,BigQuery 匯出端預設會提供至少一次的語義。透過額外處理,可以修改為採用一次性語意。

我們也為這個用途提供 Dataflow 範本,請參閱「將變更串流連結至 BigQuery」一文。

監控管道

您可以使用兩種指標類別監控變更串流 Dataflow 管道。

標準 Dataflow 指標

Dataflow 提供多項指標,可確保工作正常運作,例如資料更新間隔、系統延遲、工作處理量、工作站 CPU 使用率等。詳情請參閱「使用 Dataflow 管道監控功能」。

對於變更串流管道,您應考量兩個主要指標:系統延遲時間資料更新間隔

系統延遲時間會顯示資料項目目前處理或等待處理的時間上限 (以秒為單位)。

資料更新間隔會顯示目前 (即時) 與輸出浮水印之間的時間長度。輸出浮水印的時間為 T,表示所有元素的事件時間 (嚴格) 早於 T,且已完成運算處理。換句話說,資料更新間隔是指管道處理已收到事件的最新狀態。

如果管道資源不足,這兩項指標就會顯示這項影響。系統延遲時間會增加,因為項目需要等待更長的時間才能處理。資料更新頻率也會提高,因為管道無法追上收到的資料量。

自訂變更串流指標

這些指標會在 Cloud Monitoring 中顯示,包括:

  • 從在 Spanner 中提交記錄到連接器將記錄發布至 PCollection 之間的延遲時間 (分割區 (直方圖))。這個指標可用於查看管道的任何效能 (延遲) 問題。
  • 讀取的資料記錄總數。這是��接器產生的記錄總數。這個數字應會持續增加,反映底層 Spanner 資料庫中的寫入趨勢。
  • 正在讀取的分區數量。應一律讀取分區。如果這個數字為零,表示管道中發生錯誤。
  • 執行連接器時發出的查詢總數。這是在執行管道時,對 Spanner 執行個體提出的變更串流查詢的整體指標。這可用於估算從連接器到 Spanner 資料庫的負載。

更新現有管道

如果工作相容性檢查通過,您可以更新使用 SpannerIO 連接器來處理變更串流的執行中管道。如要執行這項操作,您必須在更新新工作時明確設定中繼資料表格名稱參數。請使用要更新的工作的 metadataTable 管道選項值。

如果您使用的是 Google 提供的 Dataflow 範本,請使用 spannerMetadataTableName 參數設定資料表名稱。您也可以修改現有的工作,在連接器設定中使用 withMetadataTable(your-metadata-table-name) 方法,明確使用中繼資料表。完成後,您可以按照 Dataflow 說明文件中的「啟動替換工作」操作說明,更新執行中的工作。

變更串流和 Dataflow 的最佳做法

以下是使用 Dataflow 建構變更串流連線的最佳做法。

使用獨立的中繼資料資料庫

建議您為 SpannerIO 連接器建立專屬資料庫,用於儲存中繼資料,而非將其設為使用應用程式資料庫。

詳情請參閱「考慮使用獨立的中繼資料資料庫」。

調整叢集大小

在 Spanner 變更串流工作中,工作站的初始數量大致為每秒 1,000 次寫入作業一個工作站。請注意,這項預估值可能會因多項因素而有所不同,例如每筆交易的大小、單筆交易產生的變更串流記錄數量,以及管道中使用的其他轉換、匯總或匯出項目。

完成初始資源重整後,請務必追蹤「監控管道」一文中提到的指標,確保管道運作正常。建議您嘗試使用初始 worker 集區大小,並監控管道處理負載的方式,視需要增加節點數量。CPU 使用率是檢查負載是否正常,以及是否需要更多節點的重要指標。

已知限制

使用 Dataflow 搭配 Spanner 變更串流時,有幾項已知限制:

自動調度資源

如要為包含 SpannerIO.readChangeStream 的管道提供自動調度資源支援,就必須使用 Apache Beam 2.39.0 以上版本。

如果您使用的是 2.39.0 之前的 Apache Beam 版本,包含 SpannerIO.readChangeStream 的管道需要明確將自動調度資源演算法指定為 NONE,如水平自動調度資源所述。

如要手動調度 Dataflow 管道資源,而非使用自動調度資源功能,請參閱「手動調度串流管道資源」。

Runner V2

Spanner 變更串流連接器需要使用 Dataflow Runner 2。您必須在執行期間手動指定這項值,否則系統會擲回錯誤。您可以使用 --experiments=use_unified_worker,use_runner_v2 設定工作,指定 Runner V2

快照

Spanner 變更串流連接器不支援 Dataflow 快照

排除中

Spanner 變更串流連接器不支援排空工作。只能取消現有的工作。

您也可以更新現有的管道,而無需停止管道。

OpenCensus

如要使用 OpenCensus 監控管道,請指定 0.28.3 以上版本。

NullPointerException 在管道開始時

在特定情況下,Apache Beam 版本 2.38.0 中的錯誤可能會導致管道啟動時發生 NullPointerException。這會導致工作無法啟動,並顯示以下錯誤訊息:

java.lang.NullPointerException: null value in entry: Cloud Storage_PROJECT_ID=null

如要解決這個問題,請使用 Apache Beam 2.39.0 以上版本,或手動將 beam-sdks-java-core 版本指定為 2.37.0

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.37.0</version>
</dependency>

更多資訊