Postgres CDC Connector

The Postgres CDC connector reads snapshot data and incremental data from a PostgreSQL database. This document describes how to set up the Postgres CDC connector to run SQL queries against PostgreSQL databases.


To set up the Postgres CDC connector, use the dependency information below, which covers both projects that use a build automation tool (such as Maven or SBT) and the SQL Client with SQL JAR bundles.

Maven dependency

  <!-- The dependency is available only for stable releases; SNAPSHOT dependencies need to be built by yourself. -->

SQL Client JAR

The download link is available only for stable releases.

Download flink-sql-connector-postgres-cdc-2.4-SNAPSHOT.jar and put it under <FLINK_HOME>/lib/.

Note: flink-sql-connector-postgres-cdc-XXX-SNAPSHOT versions correspond to the development branch; to use one, download the source code and compile the jar yourself. Users should prefer a released version, such as flink-sql-connector-postgres-cdc-2.2.1.jar, which is available from the Maven Central repository.

How to create a Postgres CDC table

The Postgres CDC table can be defined as follows:

-- register a PostgreSQL table 'shipments' in Flink SQL
CREATE TABLE shipments (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'localhost',
  'port' = '5432',
  'username' = 'postgres',
  'password' = 'postgres',
  'database-name' = 'postgres',
  'schema-name' = 'public',
  'table-name' = 'shipments'
);

-- read the snapshot and subsequent change events from the shipments table
SELECT * FROM shipments;

Connector Options

Option Required Default Type Description
connector required (none) String Specify what connector to use, here should be 'postgres-cdc'.
hostname required (none) String IP address or hostname of the PostgreSQL database server.
username required (none) String Name of the PostgreSQL user to use when connecting to the PostgreSQL database server.
password required (none) String Password to use when connecting to the PostgreSQL database server.
database-name required (none) String Database name of the PostgreSQL server to monitor.
schema-name required (none) String Schema name of the PostgreSQL database to monitor.
table-name required (none) String Table name of the PostgreSQL database to monitor.
port optional 5432 Integer Integer port number of the PostgreSQL database server.
decoding.plugin.name optional decoderbufs String The name of the Postgres logical decoding plug-in installed on the server. Supported values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming and pgoutput.
slot.name optional flink String The name of the PostgreSQL logical decoding slot that was created for streaming changes from a particular plug-in for a particular database/schema. The server uses this slot to stream events to the connector that you are configuring.
Slot names must conform to PostgreSQL replication slot naming rules, which state: "Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character."
changelog-mode optional all String The changelog mode used for encoding streaming changes. Supported values are all (which encodes changes as a retract stream using all RowKinds) and upsert (which encodes changes as an upsert stream that describes idempotent updates on a key).
Upsert mode can be used for tables with primary keys when replica identity FULL is not an option. A primary key must be defined to use upsert mode.
debezium.* optional (none) String Passes Debezium properties through to the Debezium Embedded Engine, which is used to capture data changes from the Postgres server. For example: 'debezium.snapshot.mode' = 'never'. See the Debezium Postgres connector properties for more.
debezium.snapshot.select.statement.overrides optional (none) String If a table holds a large amount of data and you don't need all of its historical data, you can use this underlying Debezium setting to select the data range you want to snapshot. This parameter only affects snapshots and does not affect subsequent change reading.
Note: PostgreSQL must use both the schema name and the table name.
For example: 'debezium.snapshot.select.statement.overrides' = 'schema.table'.
After specifying the above attribute, you must also add the following attribute: debezium.snapshot.select.statement.overrides.[schema].[table]
debezium.snapshot.select.statement.overrides.[schema].[table] optional (none) String You can specify SQL statements to limit the data range of the snapshot.
Note 1: The schema and table must be specified in the SQL statement, and the SQL must conform to the syntax of the data source.
For example: 'debezium.snapshot.select.statement.overrides.schema.table' = 'select * from schema.table where 1 != 1'.
Note 2: Tasks submitted through the Flink SQL client do not support functions whose content contains single quotation marks.
For example, the following is not supported: 'debezium.snapshot.select.statement.overrides.schema.table' = 'select * from schema.table where to_char(rq, 'yyyy-MM-dd')'.

Note: It is recommended to set a different slot.name for each table to avoid the potential error PSQLException: ERROR: replication slot "flink" is active for PID 974.
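
The slot naming rule and the one-slot-per-table recommendation above can be sketched as a small helper. This is only an illustration: the class SlotNames, its methods, and the "flink_<schema>_<table>" naming scheme are hypothetical and not part of the connector, and the 63-character limit assumes PostgreSQL's default identifier length.

```java
import java.util.Locale;
import java.util.regex.Pattern;

/** Hypothetical helper (not part of the connector): one replication slot name per table. */
final class SlotNames {
    // Slot names may contain only lower-case letters, numbers, and the underscore character.
    private static final Pattern VALID = Pattern.compile("[a-z0-9_]+");
    // Assumption: PostgreSQL's default identifier limit of 63 bytes applies to slot names.
    private static final int MAX_LENGTH = 63;

    private SlotNames() {}

    /** Returns true if the name satisfies the naming rule and the assumed length limit. */
    static boolean isValid(String name) {
        return name != null && name.length() <= MAX_LENGTH && VALID.matcher(name).matches();
    }

    /** Builds a name such as "flink_public_shipments" (an illustrative scheme). */
    static String forTable(String schema, String table) {
        String raw = ("flink_" + schema + "_" + table).toLowerCase(Locale.ROOT);
        // Replace every character that the naming rule does not allow with an underscore.
        String cleaned = raw.replaceAll("[^a-z0-9_]", "_");
        return cleaned.length() <= MAX_LENGTH ? cleaned : cleaned.substring(0, MAX_LENGTH);
    }
}
```

The result of SlotNames.forTable("public", "shipments") could then be passed as the value of the slot name option in each table definition. Truncating long names can make two tables collide on the same slot name, so a real scheme would need to check for duplicates.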

Available Metadata

The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.

Key DataType Description
table_name STRING NOT NULL Name of the table that contains the row.
schema_name STRING NOT NULL Name of the schema that contains the row.
database_name STRING NOT NULL Name of the database that contains the row.
op_ts TIMESTAMP_LTZ(3) NOT NULL It indicates the time that the change was made in the database.
If the record is read from snapshot of the table instead of the change stream, the value is always 0.
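
Because op_ts is 0 for snapshot records, a consumer can tell snapshot rows from change-stream rows by comparing against the epoch. The sketch below assumes the column reaches Java code as a java.time.Instant, which is Flink's default conversion class for TIMESTAMP_LTZ; the class and method names are hypothetical.

```java
import java.time.Instant;

/** Hypothetical helper: classifies a row by its op_ts metadata value. */
final class OpTs {
    private OpTs() {}

    /** Returns true when op_ts is 0 (the epoch), i.e. the row was read during the snapshot. */
    static boolean isFromSnapshot(Instant opTs) {
        return Instant.EPOCH.equals(opTs);
    }
}
```

For example, OpTs.isFromSnapshot(Instant.ofEpochMilli(0)) returns true, while any timestamp taken from a real change event returns false.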


Can’t perform checkpoint during scanning snapshot of tables

While the snapshot of the database tables is being scanned, there is no recoverable position, so checkpoints can’t be performed. To avoid performing them, the Postgres CDC source keeps each checkpoint waiting until it times out. A timed-out checkpoint is treated as a failed checkpoint, which by default triggers a failover of the Flink job. If the database table is large, it is therefore recommended to add the following Flink configuration to avoid failovers caused by timed-out checkpoints:

execution.checkpointing.interval: 10min
execution.checkpointing.tolerable-failed-checkpoints: 100
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647

The following extended CREATE TABLE example demonstrates the syntax for exposing the metadata fields listed under Available Metadata:

CREATE TABLE products (
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    table_name STRING METADATA FROM 'table_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    shipment_id INT,
    order_id INT,
    origin STRING,
    destination STRING,
    is_arrived BOOLEAN
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'localhost',
  'port' = '5432',
  'username' = 'postgres',
  'password' = 'postgres',
  'database-name' = 'postgres',
  'schema-name' = 'public',
  'table-name' = 'shipments'
);

Exactly-Once Processing

The Postgres CDC connector is a Flink Source connector that reads a database snapshot first and then continues to read change events with exactly-once processing, even if failures occur. Please read How the connector works.

Single Thread Reading

The Postgres CDC source can’t read in parallel, because only one task can receive change events.

DataStream Source

The Postgres CDC connector can also be a DataStream source. You can create a SourceFunction as follows:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.postgres.PostgreSQLSource;

public class PostgreSQLSourceExample {
  public static void main(String[] args) throws Exception {
    SourceFunction<String> sourceFunction = PostgreSQLSource.<String>builder()
      .hostname("localhost") // placeholder connection settings; adjust for your server
      .port(5432)
      .database("postgres") // monitor postgres database
      .schemaList("inventory")  // monitor inventory schema
      .tableList("inventory.products") // monitor products table
      .username("postgres") // placeholder credentials
      .password("postgres")
      .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
      .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env
      .addSource(sourceFunction)
      .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

    env.execute();
  }
}

Note: Please refer to Deserialization for more details about the JSON deserialization.

Data Type Mapping

PostgreSQL type Flink SQL type
DECIMAL(20, 0)