PostgreSQL CDC

The PostgreSQL CDC publisher captures row-level changes (inserts, updates, deletes) from a PostgreSQL database using WAL logical replication. Changes are decoded by the wal2json output plugin (format version 2), buffered in memory, and staged in controlled batches.

Note: PostgreSQL CDC is currently marked as unstable and may undergo API changes in future releases.

Example

from typing import Tuple
import tabsdata as td

conn = td.PostgresCdcConn(
    uri="postgresql://localhost:5432/ecommerce",
    credentials=td.UserPasswordCredentials(
        user=td.EnvironmentSecret("PG_USER"),
        password=td.EnvironmentSecret("PG_PASS"),
    ),
)

trigger = td.PostgresCdcTrigger(
    conn=conn,
    tables=["public.orders", "public.order_items"],
    start_from="tail",
    replication_slot="ecommerce_cdc_slot",
    slot_behavior="create",
)

@td.publisher(
    trigger=trigger,
    tables=["orders", "order_items"],
)
def capture_ecommerce(
    orders: list[td.TableFrame],
    order_items: list[td.TableFrame],
) -> Tuple[td.TableFrame, td.TableFrame]:
    return td.concat(orders), td.concat(order_items)

This example publishes CDC data for the orders and order_items tables, capturing only changes that occur after the publisher has been first registered.

After defining the function, register it with a Tabsdata collection and trigger its execution.


Setup

Configuring PostgreSQL for CDC

Before using the PostgreSQL CDC publisher, the source database must be configured for logical replication with the wal2json output plugin installed.

Server Configuration

Add the following to postgresql.conf and restart the server:

wal_level = logical
max_replication_slots = 4    # at least 1 per CDC consumer
max_wal_senders = 4          # at least 1 per CDC consumer

Install wal2json

The wal2json extension must be installed on the PostgreSQL server. The connector uses wal2json format version 2 by default, which produces a JSON object per tuple (row change) with optional transaction boundary markers.

On Debian/Ubuntu, wal2json is available as a system package:

# Adjust version number to match your PostgreSQL version
apt-get install postgresql-17-wal2json

For installation instructions and detailed documentation, see the wal2json project.

Create a CDC User

CREATE ROLE cdc_user WITH LOGIN REPLICATION PASSWORD 'cdc_password';
GRANT CONNECT ON DATABASE my_database TO cdc_user;
GRANT USAGE ON SCHEMA public TO cdc_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO cdc_user;

Set Replica Identity

For full before-image data on updates and deletes, set REPLICA IDENTITY FULL on each table you want to capture. Without this, only primary key columns are included in the before-image.

ALTER TABLE orders REPLICA IDENTITY FULL;
ALTER TABLE order_items REPLICA IDENTITY FULL;

Create a Replication Slot (optional)

You can pre-create a logical replication slot, or let the connector manage it via the slot_behavior parameter.

SELECT pg_create_logical_replication_slot('my_cdc_slot', 'wal2json');

Warning: Abandoned replication slots cause unbounded WAL growth. Monitor slot lag via pg_replication_slots and drop unused slots promptly.

Connection: PostgresCdcConn

PostgresCdcConn defines how to connect to the PostgreSQL server. It accepts a standard PostgreSQL URI and optional credentials.

conn = td.PostgresCdcConn(
    uri="postgresql://localhost:5432/my_database",
    credentials=td.UserPasswordCredentials(
        user=td.EnvironmentSecret("PG_CDC_USER"),
        password=td.EnvironmentSecret("PG_CDC_PASSWORD"),
    ),
)

Parameter Type Description uri str PostgreSQL connection URI (postgresql://host:port/database). If the port is omitted, defaults to 5432. If the database is omitted, defaults to "postgres". credentials UserPasswordCredentials | None Optional user/password credentials. If None, credentials from the URI are used. cx_src_configs_postgres dict | None Optional PostgreSQL-specific connection parameters passed to the underlying driver.

Trigger: PostgresCdcTrigger

PostgresCdcTrigger connects to PostgreSQL, reads WAL events for the specified tables via logical replication, and stages batches of changes.

trigger = td.PostgresCdcTrigger(
    conn=conn,
    tables=["public.orders", "public.order_items"],
    start_from="tail",
    replication_slot="ecommerce_cdc_slot",
    slot_behavior="create",
)

tables

Specifies which database tables to monitor. Tables must be fully qualified as schema.table. Accepts a single string or a list of strings.

# Single table
tables="public.orders"

# Multiple tables
tables=["public.orders", "public.order_items"]

All tables must exist in the source database before the trigger starts. Tables created after the trigger is running will not be captured.

start_from

Determines where the connector begins reading from the WAL. On subsequent runs, the connector resumes automatically from its last committed offset.

Value Type Behavior "head" str Start from the earliest available position in the WAL. "tail" str Start from the current end of the WAL, capturing only new events. LsnPosition(lsn=...) LsnPosition Start reading from a specific Log Sequence Number (LSN).

Note: LsnPosition is for initial positioning only — it is distinct from the confirmed LSN that the connector sends to the server for WAL pruning.

Replication Slot Configuration

Parameter Type Default Description replication_slot str | None None Name of the logical replication slot to use. publication_name str | None None Name of the PostgreSQL publication to subscribe to for server-side table filtering. slot_behavior "create" | "reuse" "reuse" If "create", creates the slot if it does not exist. If "reuse", the slot must already exist.


Advanced Configuration

CDC Output Format (cdc_format)

The cdc_format parameter controls how change data is structured in the output TableFrames, configured via CdcFormat. The available options are identical to those of the MySQL CDC publisher — see the MySQL CDC Publisher documentation for the full breakdown of values_format options, flatten_values behaviour, metadata columns, and per-operation semantics.

from tabsdata.connector.cdc.common.typing import CdcFormat

cdc_format=CdcFormat(values_format="columns", flatten_values=True)

Parameter Type Default Description values_format "columns" | "struct" | "map" "columns" Controls how old and new row values are laid out in the output. flatten_values bool True When True, new values are promoted to individual top-level columns instead of being packed into a container column.

Start Position Examples

from tabsdata.connector.cdc.postgres.typing import LsnPosition
from datetime import datetime, timezone

# Start from the end — capture only new changes going forward
start_from="tail"

# Start from the beginning of the WAL
start_from="head"

# Start from a specific LSN
start_from=LsnPosition(lsn=23456789)

Buffer and Trigger Thresholds

The CDC connector uses a two-stage pipeline: changes accumulate in memory (buffer), are flushed to the working directory, then staged to the output location.

Buffer thresholds (memory → working directory)

Parameter Type Default Description buffer_max_rows int 10,000 Flush to disk when row count in memory reaches this limit. buffer_max_bytes int | None None Flush to disk when byte size in memory reaches this limit. buffer_max_sec float 60.0 Flush to disk when this many seconds have elapsed since the last flush.

Trigger thresholds (working directory → stage location)

Parameter Type Default Description trigger_max_rows int | None None Stage when total rows on disk reach this limit. trigger_max_bytes int | None None Stage when total bytes on disk reach this limit. trigger_max_sec float 60.0 Stage when this many seconds have elapsed since the last stage.

Other Parameters

Parameter Type Default Description poll_interval_sec float 1.0 Seconds between polling cycles when no new events are available. blocking_timeout_sec float 1.0 Timeout in seconds for blocking reads from the replication stream. start datetime | None None Delay trigger execution until this datetime (UTC). end datetime | None None Stop the trigger at this datetime (UTC).


Limitations

  • Schema changes: ALTER TABLE, ADD/DROP COLUMN, and similar DDL operations on tracked tables are not detected or handled. If the source schema changes, the connector must be stopped and reconfigured.

  • TRUNCATE: TRUNCATE TABLE operations are not captured. A truncate on a tracked table will not produce any change events.

  • Large/Blob types: BLOB, CLOB, LONGBLOB, BYTEA, and TEXT (in some configurations) column types are not currently supported. Tables containing these types should exclude them from capture or use alternative ingestion methods.

  • Static table list: All tables in the tables parameter must exist before the trigger starts. The connector does not perform runtime table discovery.