Postgres CDC Connector #

The Postgres CDC connector reads snapshot data and incremental data from a PostgreSQL database. This document describes how to set up the Postgres CDC connector to run SQL queries against PostgreSQL databases.

Dependencies #

To set up the Postgres CDC connector, the following sections provide dependency information for projects that use a build automation tool (such as Maven or SBT) and for the SQL Client with SQL JAR bundles.

Maven dependency #

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-postgres-cdc</artifactId>
    <version>3.4.0</version>
</dependency>

SQL Client JAR #

Download link is available only for stable releases.

Download flink-sql-connector-postgres-cdc-3.4.0.jar and put it under <FLINK_HOME>/lib/.

Note: Refer to flink-sql-connector-postgres-cdc for other released versions, which are available in the Maven Central repository.

How to create a Postgres CDC table #

The Postgres CDC table can be defined as follows:

-- register a PostgreSQL table 'shipments' in Flink SQL
CREATE TABLE shipments (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'localhost',
  'port' = '5432',
  'username' = 'postgres',
  'password' = 'postgres',
  'database-name' = 'postgres',
  'schema-name' = 'public',
  'table-name' = 'shipments',
  'slot.name' = 'flink',
   -- experimental feature: incremental snapshot (default off)
  'scan.incremental.snapshot.enabled' = 'true'
);

-- read the snapshot and subsequent WAL changes from the shipments table
SELECT * FROM shipments;

Connector Options #

Option Required Default Type Description
connector required (none) String Specify what connector to use, here should be 'postgres-cdc'.
hostname required (none) String IP address or hostname of the PostgreSQL database server.
username required (none) String Name of the PostgreSQL user to use when connecting to the PostgreSQL database server.
password required (none) String Password to use when connecting to the PostgreSQL database server.
database-name required (none) String Database name of the PostgreSQL server to monitor.
schema-name required (none) String Schema name of the PostgreSQL database to monitor.
table-name required (none) String Table name of the PostgreSQL database to monitor.
port optional 5432 Integer Integer port number of the PostgreSQL database server.
slot.name required (none) String The name of the PostgreSQL logical decoding slot that was created for streaming changes from a particular plug-in for a particular database/schema. The server uses this slot to stream events to the connector that you are configuring.
Slot names must conform to PostgreSQL replication slot naming rules, which state: "Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character."
decoding.plugin.name optional decoderbufs String The name of the Postgres logical decoding plug-in installed on the server. Supported values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming and pgoutput.
changelog-mode optional all String The changelog mode used for encoding streaming changes. Supported values are all (which encodes changes as retract stream using all RowKinds) and upsert (which encodes changes as upsert stream that describes idempotent updates on a key).
upsert mode can be used for tables with primary keys when replica identity FULL is not an option. Primary keys must be set to use upsert mode.
heartbeat.interval.ms optional 30s Duration The interval at which heartbeat events are sent to track the latest available replication slot offsets.
debezium.* optional (none) String Passes Debezium properties through to the Debezium Embedded Engine, which is used to capture data changes from the Postgres server. For example: 'debezium.snapshot.mode' = 'never'. See more about the Debezium's Postgres Connector properties.
debezium.snapshot.select.statement.overrides optional (none) String If a table holds a large amount of data and you don't need all of the historical data, you can use this underlying Debezium setting to select the data range you want to snapshot. This parameter only affects snapshots and does not affect subsequent reading of changes.
Note: PostgreSQL must use schema name and table name.
For example: 'debezium.snapshot.select.statement.overrides' = 'schema.table'.
After specifying the above attributes, you must also add the following attributes: debezium.snapshot.select.statement.overrides.[schema].[table]
debezium.snapshot.select.statement.overrides.[schema].[table] optional (none) String You can specify SQL statements to limit the data range of snapshot.
Note1: The schema and table must be specified in the SQL statement, and the SQL must conform to the syntax of the data source.
For example: 'debezium.snapshot.select.statement.overrides.schema.table' = 'select * from schema.table where 1 != 1'.
Note2: A job submitted through the Flink SQL client does not support statements whose content contains single quotation marks, such as function arguments.
For example, the following is not supported: 'debezium.snapshot.select.statement.overrides.schema.table' = 'select * from schema.table where to_char(rq, 'yyyy-MM-dd')'.
scan.incremental.snapshot.enabled optional false Boolean Incremental snapshot is a new mechanism to read the snapshot of a table. Compared to the old snapshot mechanism, the incremental snapshot has many advantages, including: (1) the source can read in parallel during snapshot reading, (2) the source can perform checkpoints at chunk granularity during snapshot reading, (3) the source doesn't need to acquire a global read lock before snapshot reading. Please see the Incremental Snapshot Reading section for more detailed information.
scan.incremental.close-idle-reader.enabled optional false Boolean Whether to close idle readers at the end of the snapshot phase.
This requires Flink 1.14 or later with 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' set to true.
From Flink 1.15 onward, the default value of 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' is true, so it does not need to be configured explicitly.
scan.lsn-commit.checkpoints-num-delay optional 3 Integer The number of checkpoints to delay before starting to commit the LSN offsets.
The checkpoint LSN offsets are committed in a rolling fashion: among the delayed checkpoints, the one with the earliest checkpoint identifier is committed first.
scan.incremental.snapshot.unbounded-chunk-first.enabled optional false Boolean Whether to assign the unbounded chunks first during snapshot reading phase.
This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.
Experimental option, defaults to false.

Notes #

slot.name option #

It is recommended to set a different slot.name for each table to avoid the potential PSQLException: ERROR: replication slot "flink" is active for PID 974 error. See more here.
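
As a rough illustration of the recommendation above, the following sketch derives one slot name per table and checks it against the naming rule quoted in the slot.name option description (lower-case letters, numbers, and the underscore character). The class SlotNames and the prefix_schema_table naming scheme are hypothetical and not part of the connector; the sketch also does not check PostgreSQL's limit on name length.

```java
import java.util.Locale;
import java.util.regex.Pattern;

// Sketch only: SlotNames is a hypothetical helper, not connector API.
final class SlotNames {
    // Rule quoted in the option table: lower-case letters, numbers, underscore.
    private static final Pattern VALID = Pattern.compile("[a-z0-9_]+");

    /** Returns true when the name uses only the allowed characters. */
    static boolean isValid(String slotName) {
        return slotName != null && VALID.matcher(slotName).matches();
    }

    /** Hypothetical scheme: prefix_schema_table, lower-cased, other characters replaced by '_'. */
    static String forTable(String prefix, String schema, String table) {
        String raw = prefix + "_" + schema + "_" + table;
        return raw.toLowerCase(Locale.ROOT).replaceAll("[^a-z0-9_]", "_");
    }

    public static void main(String[] args) {
        System.out.println(forTable("flink", "public", "shipments")); // flink_public_shipments
        System.out.println(isValid("Flink-Slot"));                    // false
    }
}
```

The derived name could then be passed as the 'slot.name' value of each table definition, so that two tables never compete for the same slot.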

scan.lsn-commit.checkpoints-num-delay option #

When consuming PostgreSQL logs, the LSN offset must be committed to trigger the log data cleanup for the corresponding slot. However, once the LSN offset is committed, earlier offsets become invalid. To ensure access to earlier LSN offsets for job recovery, we delay the LSN commit by scan.lsn-commit.checkpoints-num-delay (default value is 3) checkpoints. This feature is available when config option scan.incremental.snapshot.enabled is set to true.
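
The delayed, rolling commit described above can be sketched roughly as follows. This is a simplified model rather than the connector's code: DelayedLsnCommitter is a hypothetical class, and "committing" is represented by recording the checkpoint identifier whose LSN offset would be committed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Sketch only: DelayedLsnCommitter is a hypothetical name, not connector API.
final class DelayedLsnCommitter {
    private final int checkpointsDelay;
    // Completed checkpoints whose LSN offsets have not been committed yet, smallest id first.
    private final PriorityQueue<Long> pendingCheckpoints = new PriorityQueue<>();
    private final List<Long> committed = new ArrayList<>();

    DelayedLsnCommitter(int checkpointsDelay) {
        this.checkpointsDelay = checkpointsDelay;
    }

    /** Called when a checkpoint completes; commits the earliest pending one once the delay is exceeded. */
    void onCheckpointComplete(long checkpointId) {
        pendingCheckpoints.add(checkpointId);
        if (pendingCheckpoints.size() <= checkpointsDelay) {
            return; // still inside the delay window, keep earlier offsets available for recovery
        }
        committed.add(pendingCheckpoints.poll());
    }

    List<Long> committedCheckpoints() {
        return committed;
    }

    public static void main(String[] args) {
        DelayedLsnCommitter committer = new DelayedLsnCommitter(3); // default delay from the option table
        for (long id = 1; id <= 5; id++) {
            committer.onCheckpointComplete(id);
        }
        System.out.println(committer.committedCheckpoints()); // [1, 2]
    }
}
```

With a delay of 3, the offsets of checkpoints 3, 4 and 5 are still uncommitted after five checkpoints, which is what keeps them usable for job recovery.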

Incremental Snapshot Options #

The following options are available only when scan.incremental.snapshot.enabled=true:

Option Required Default Type Description
scan.incremental.snapshot.chunk.size optional 8096 Integer The chunk size (number of rows) of the table snapshot. Captured tables are split into multiple chunks when the table snapshot is read.
scan.startup.mode optional initial String Optional startup mode for Postgres CDC consumer, valid enumerations are "initial", "latest-offset", "committed-offset" and "snapshot". Please see Startup Reading Position section for more detailed information.
chunk-meta.group.size optional 1000 Integer The group size of chunk meta, if the meta size exceeds the group size, the meta will be divided into multiple groups.
connect.timeout optional 30s Duration The maximum time that the connector should wait after trying to connect to the PostgreSQL database server before timing out.
connect.pool.size optional 30 Integer The connection pool size.
connect.max-retries optional 3 Integer The maximum number of times the connector retries building a connection to the database server.
scan.snapshot.fetch.size optional 1024 Integer The maximum fetch size per poll when reading the table snapshot.
scan.incremental.snapshot.chunk.key-column optional (none) String The chunk key of the table snapshot. Captured tables are split into multiple chunks by the chunk key when the table snapshot is read. By default, the chunk key is the first column of the primary key. A column that is not part of the primary key can be used as a chunk key, but this may lead to slower query performance.
Warning: Using a non-primary key column as a chunk key may lead to data inconsistencies. Please see Warning for details.
chunk-key.even-distribution.factor.lower-bound optional 0.05d Double The lower bound of the chunk key distribution factor. The distribution factor is used to determine whether the table is evenly distributed or not. Table chunks are computed with the even-distribution optimization when the data distribution is even, and a splitting query is issued when it is uneven. The distribution factor is calculated as (MAX(id) - MIN(id) + 1) / rowCount.
chunk-key.even-distribution.factor.upper-bound optional 1000.0d Double The upper bound of the chunk key distribution factor. The distribution factor is used to determine whether the table is evenly distributed or not. Table chunks are computed with the even-distribution optimization when the data distribution is even, and a splitting query is issued when it is uneven. The distribution factor is calculated as (MAX(id) - MIN(id) + 1) / rowCount.
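
To make the distribution factor concrete, here is a minimal sketch that evaluates the formula from the two options above and compares the result with the default bounds. ChunkDistribution is a hypothetical class, and treating both bounds as inclusive is an assumption of this sketch.

```java
// Sketch only: ChunkDistribution is a hypothetical name, not connector API.
final class ChunkDistribution {
    // Default bounds taken from the option table above.
    static final double LOWER_BOUND = 0.05d;
    static final double UPPER_BOUND = 1000.0d;

    /** Distribution factor = (MAX(id) - MIN(id) + 1) / rowCount. */
    static double distributionFactor(long minId, long maxId, long rowCount) {
        if (rowCount <= 0) {
            throw new IllegalArgumentException("rowCount must be positive");
        }
        return (double) (maxId - minId + 1) / rowCount;
    }

    /** Assumption: the table counts as evenly distributed when the factor lies within [lower, upper]. */
    static boolean isEvenlyDistributed(double factor) {
        return factor >= LOWER_BOUND && factor <= UPPER_BOUND;
    }

    public static void main(String[] args) {
        double dense = distributionFactor(1, 1_000, 1_000);       // ids 1..1000, 1000 rows: 1.0
        double sparse = distributionFactor(1, 10_000_000, 1_000); // same row count, huge id range: 10000.0
        System.out.println(dense + " even=" + isEvenlyDistributed(dense));
        System.out.println(sparse + " even=" + isEvenlyDistributed(sparse));
    }
}
```

In this sketch the dense table falls inside the default bounds, while the sparse one exceeds the upper bound and would therefore be split by querying the table.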

Available Metadata #

The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.

Key DataType Description
table_name STRING NOT NULL Name of the table that contains the row.
schema_name STRING NOT NULL Name of the schema that contains the row.
database_name STRING NOT NULL Name of the database that contains the row.
op_ts TIMESTAMP_LTZ(3) NOT NULL It indicates the time that the change was made in the database.
If the record is read from snapshot of the table instead of the change stream, the value is always 0.
row_kind STRING NOT NULL It indicates the row kind of the changelog. '+I' means INSERT message, '-D' means DELETE message, '-U' means UPDATE_BEFORE message and '+U' means UPDATE_AFTER message.
Note: If the source operator outputs the 'row_kind' column for each record, the downstream SQL operator may fail to match rows when processing retractions because of this newly added column. It is recommended to use this metadata column only in simple synchronization jobs.

Limitation #

Can’t perform checkpoint during scanning snapshot of tables when incremental snapshot is disabled #

When scan.incremental.snapshot.enabled=false, we have the following limitation.

While the snapshot of database tables is being scanned, there is no recoverable position, so checkpoints cannot be performed. To avoid performing a checkpoint, the Postgres CDC source keeps the checkpoint waiting until it times out. A timed-out checkpoint is treated as a failed checkpoint, and by default this triggers a failover of the Flink job. If the database table is large, it is therefore recommended to add the following Flink configurations to avoid failovers caused by timed-out checkpoints:

execution.checkpointing.interval: 10min
execution.checkpointing.tolerable-failed-checkpoints: 100
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647

The following extended CREATE TABLE example demonstrates the syntax for exposing the metadata fields listed under Available Metadata:

CREATE TABLE products (
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    table_name STRING METADATA FROM 'table_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    shipment_id INT,
    order_id INT,
    origin STRING,
    destination STRING,
    is_arrived BOOLEAN
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'localhost',
  'port' = '5432',
  'username' = 'postgres',
  'password' = 'postgres',
  'database-name' = 'postgres',
  'schema-name' = 'public',
  'table-name' = 'shipments',
  'slot.name' = 'flink'
);

Features #

Incremental Snapshot Reading (Experimental) #

Incremental snapshot reading is a new mechanism to read the snapshot of a table. Compared to the old snapshot mechanism, the incremental snapshot has many advantages, including:

  • (1) PostgreSQL CDC Source can read in parallel during snapshot reading
  • (2) PostgreSQL CDC Source can perform checkpoints at chunk granularity during snapshot reading
  • (3) PostgreSQL CDC Source doesn’t need to acquire a global read lock before snapshot reading

During incremental snapshot reading, the PostgreSQL CDC Source first splits the table into snapshot chunks (splits) by the user-specified chunk key, and then assigns the chunks to multiple readers, each of which reads the data of its snapshot chunk.
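
The splitting step can be pictured with the following simplified sketch for a numeric chunk key whose values are spread evenly. It is an illustration under stated assumptions, not the connector's implementation: ChunkSplitter and Chunk are hypothetical names, chunks are modeled as half-open ranges, the first and last chunk are left unbounded, and key values are assumed to be far from the limits of long.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: ChunkSplitter and Chunk are hypothetical names, not connector classes.
final class ChunkSplitter {

    /** Half-open key range [start, end); a null bound means unbounded on that side. */
    static final class Chunk {
        final Long start;
        final Long end;

        Chunk(Long start, Long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + (start == null ? "-inf" : start) + ", " + (end == null ? "+inf" : end) + ")";
        }
    }

    /** Splits the key range minKey..maxKey into consecutive chunks of chunkSize key values each. */
    static List<Chunk> splitEvenly(long minKey, long maxKey, int chunkSize) {
        if (chunkSize <= 0 || minKey > maxKey) {
            throw new IllegalArgumentException("chunkSize must be positive and minKey <= maxKey");
        }
        List<Chunk> chunks = new ArrayList<>();
        Long start = null;              // first chunk is unbounded below
        long end = minKey + chunkSize;
        while (end <= maxKey) {
            chunks.add(new Chunk(start, end));
            start = end;
            end += chunkSize;
        }
        chunks.add(new Chunk(start, null)); // last chunk is unbounded above
        return chunks;
    }

    public static void main(String[] args) {
        // Keys 1..10 with a chunk size of 4 give three chunks.
        System.out.println(splitEvenly(1, 10, 4)); // [[-inf, 5), [5, 9), [9, +inf)]
    }
}
```

Each resulting chunk could then be handed to a different reader, which is what allows the snapshot phase to run in parallel.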

Exactly-Once Processing #

The Postgres CDC connector is a Flink Source connector that reads the database snapshot first and then continues to read the change stream (WAL) with exactly-once processing, even when failures happen. Please read How the connector works.

Startup Reading Position #

The config option scan.startup.mode specifies the startup mode for PostgreSQL CDC consumer. The valid enumerations are:

  • initial (default): Performs an initial snapshot of the monitored database tables upon first startup, and then continues to read from the replication slot.
  • latest-offset: Never performs a snapshot of the monitored database tables upon first startup; it reads only from the end of the replication stream, which means it only captures changes made since the connector was started.
  • committed-offset: Skips the snapshot phase and starts reading events from the confirmed_flush_lsn offset of the replication slot.
  • snapshot: Performs only the snapshot phase and exits once snapshot reading is complete.
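
The four modes listed above differ only in which phases they run. The following sketch encodes that as a small lookup, which can be handy when validating a job configuration before submitting it. StartupModeSketch is a hypothetical enum written for this illustration and is not the connector's own type.

```java
import java.util.Locale;

// Sketch only: StartupModeSketch is a hypothetical enum, not the connector's own type.
enum StartupModeSketch {
    INITIAL("initial", true, true),                    // snapshot, then change stream
    LATEST_OFFSET("latest-offset", false, true),       // change stream only, from the end
    COMMITTED_OFFSET("committed-offset", false, true), // change stream only, from confirmed_flush_lsn
    SNAPSHOT("snapshot", true, false);                 // snapshot only, then exit

    final String optionValue;
    final boolean readsSnapshot;
    final boolean readsStream;

    StartupModeSketch(String optionValue, boolean readsSnapshot, boolean readsStream) {
        this.optionValue = optionValue;
        this.readsSnapshot = readsSnapshot;
        this.readsStream = readsStream;
    }

    /** Resolves a scan.startup.mode value, rejecting anything outside the four documented modes. */
    static StartupModeSketch fromOption(String value) {
        String normalized = value.trim().toLowerCase(Locale.ROOT);
        for (StartupModeSketch mode : values()) {
            if (mode.optionValue.equals(normalized)) {
                return mode;
            }
        }
        throw new IllegalArgumentException("Unsupported scan.startup.mode: " + value);
    }

    public static void main(String[] args) {
        StartupModeSketch mode = fromOption("committed-offset");
        System.out.println(mode + " snapshot=" + mode.readsSnapshot + " stream=" + mode.readsStream);
    }
}
```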

DataStream Source #

The Postgres CDC connector can also be a DataStream source. There are two modes for the DataStream source:

  • incremental snapshot based, which allows parallel reading
  • SourceFunction based, which only supports single thread reading

Incremental Snapshot based DataStream (Experimental) #

import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PostgresParallelSourceExample {

    public static void main(String[] args) throws Exception {

        DebeziumDeserializationSchema<String> deserializer =
                new JsonDebeziumDeserializationSchema();

        JdbcIncrementalSource<String> postgresIncrementalSource =
                PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
                        .hostname("localhost")
                        .port(5432)
                        .database("postgres")
                        .schemaList("inventory")
                        .tableList("inventory.products")
                        .username("postgres")
                        .password("postgres")
                        .slotName("flink")
                        .decodingPluginName("decoderbufs") // use pgoutput for PostgreSQL 10+
                        .deserializer(deserializer)
                        .splitSize(2) // the split size of each snapshot split
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(3000);

        env.fromSource(
                        postgresIncrementalSource,
                        WatermarkStrategy.noWatermarks(),
                        "PostgresParallelSource")
                .setParallelism(2)
                .print();

        env.execute("Output Postgres Snapshot");
    }
}

SourceFunction-based DataStream #

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.cdc.connectors.postgres.PostgreSQLSource;

public class PostgreSQLSourceExample {
  public static void main(String[] args) throws Exception {
    SourceFunction<String> sourceFunction = PostgreSQLSource.<String>builder()
      .hostname("localhost")
      .port(5432)
      .database("postgres") // monitor postgres database
      .schemaList("inventory")  // monitor inventory schema
      .tableList("inventory.products") // monitor products table
      .username("flinkuser")
      .password("flinkpw")
      .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
      .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env
      .addSource(sourceFunction)
      .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

    env.execute();
  }
}

Available Source metrics #

Metrics can help understand the progress of assignments, and the following are the supported Flink metrics:

Group Name Type Description
namespace.schema.table isSnapshotting Gauge Whether the table is snapshotting or not
namespace.schema.table isStreamReading Gauge Whether the table is stream reading or not
namespace.schema.table numTablesSnapshotted Gauge The number of tables that have been snapshotted
namespace.schema.table numTablesRemaining Gauge The number of tables that have not been snapshotted
namespace.schema.table numSnapshotSplitsProcessed Gauge The number of splits that are being processed
namespace.schema.table numSnapshotSplitsRemaining Gauge The number of splits that have not been processed
namespace.schema.table numSnapshotSplitsFinished Gauge The number of splits that have been processed
namespace.schema.table snapshotStartTime Gauge The time when the snapshot started
namespace.schema.table snapshotEndTime Gauge The time when the snapshot ended

Notice:

  1. The group name is namespace.schema.table, where namespace is the actual database name, schema is the actual schema name, and table is the actual table name.
  2. For PostgreSQL, the group name will be like test_database.test_schema.test_table.

Tables Without Primary Keys #

Starting from version 3.4.0, Postgres CDC supports tables that do not have a primary key. To use a table without primary keys, you must configure the scan.incremental.snapshot.chunk.key-column option and specify one non-null field.

There are two things to take care of.

  1. If the table has an index, try to use a column contained in that index as scan.incremental.snapshot.chunk.key-column. This speeds up the select statements.
  2. The processing semantics of a Postgres CDC table without primary keys depend on the behavior of the column specified by scan.incremental.snapshot.chunk.key-column.
  • If no update operation is performed on the specified column, exactly-once semantics are ensured.
  • If update operations are performed on the specified column, only at-least-once semantics are ensured. However, you can specify primary keys downstream and perform idempotent operations to ensure data correctness.

Warning #

Using a non-primary key column as the scan.incremental.snapshot.chunk.key-column for a Postgres table with primary keys may lead to data inconsistencies. Below is a scenario illustrating this issue and recommendations to mitigate potential problems.

Problem Scenario #

  • Table Structure:

    • Primary Key: id
    • Chunk Key Column: pid (Not a primary key)
  • Snapshot Splits:

    • Split 0: 1 < pid <= 3
    • Split 1: 3 < pid <= 5
  • Operation:

    • Two different subtasks are reading Split 0 and Split 1 concurrently.
    • An update operation changes pid from 2 to 4 for id=0 while both splits are being read. This update occurs between the low and high watermark of both splits.
  • Result:

    • Split 0: Contains the record [id=0, pid=2]
    • Split 1: Contains the record [id=0, pid=4]

Since the order of processing these records cannot be guaranteed, the final value of pid for id=0 may end up being either 2 or 4, leading to potential data inconsistencies.
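
The effect of the ordering can be reproduced with a small simulation. The sketch below models the downstream as an upsert keyed by the primary key id, which is a simplifying assumption, and applies the two records from the scenario in both possible arrival orders. ChunkKeyRaceDemo is a hypothetical class written for this illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: ChunkKeyRaceDemo is a hypothetical class that models the scenario above.
final class ChunkKeyRaceDemo {

    /** Applies {id, pid} records in arrival order as upserts keyed by id and returns the final pid of id=0. */
    static int finalPidForIdZero(List<int[]> recordsInArrivalOrder) {
        Map<Integer, Integer> pidById = new HashMap<>();
        for (int[] record : recordsInArrivalOrder) {
            pidById.put(record[0], record[1]); // a later record overwrites an earlier one
        }
        return pidById.get(0);
    }

    public static void main(String[] args) {
        int[] fromSplit0 = {0, 2}; // [id=0, pid=2], read by the subtask handling Split 0
        int[] fromSplit1 = {0, 4}; // [id=0, pid=4], read by the subtask handling Split 1

        System.out.println(finalPidForIdZero(List.of(fromSplit0, fromSplit1))); // 4
        System.out.println(finalPidForIdZero(List.of(fromSplit1, fromSplit0))); // 2
    }
}
```

Only the first ordering leaves the value that the database actually holds after the update (pid=4); the second leaves the stale value.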

Data Type Mapping #

PostgreSQL type                                              Flink SQL type
(none listed)                                                TINYINT
SMALLINT, INT2, SMALLSERIAL, SERIAL2                         SMALLINT
INTEGER, SERIAL                                              INT
BIGINT, BIGSERIAL                                            BIGINT
(none listed)                                                DECIMAL(20, 0)
BIGINT                                                       BIGINT
REAL, FLOAT4                                                 FLOAT
FLOAT8, DOUBLE PRECISION                                     DOUBLE
NUMERIC(p, s), DECIMAL(p, s)                                 DECIMAL(p, s)
BOOLEAN                                                      BOOLEAN
DATE                                                         DATE
TIME [(p)] [WITHOUT TIMEZONE]                                TIME [(p)] [WITHOUT TIMEZONE]
TIMESTAMP [(p)] [WITHOUT TIMEZONE]                           TIMESTAMP [(p)] [WITHOUT TIMEZONE]
CHAR(n), CHARACTER(n), VARCHAR(n), CHARACTER VARYING(n), TEXT  STRING
BYTEA                                                        BYTES
