PostgreSQL CDC

The PostgreSQL CDC publisher captures row-level changes (inserts, updates, deletes) from a PostgreSQL database using WAL logical replication. Changes are decoded by the wal2json output plugin (format version 2), buffered in memory, and staged in controlled batches.

Note: PostgreSQL CDC is currently marked as unstable and may undergo API changes in future releases.

Example

from typing import Tuple
import tabsdata as td

conn = td.PostgresCdcConn(
    uri="postgresql://localhost:5432/ecommerce",
    credentials=td.UserPasswordCredentials(
        user=td.EnvironmentSecret("PG_USER"),
        password=td.EnvironmentSecret("PG_PASS"),
    ),
)

trigger = td.PostgresCdcTrigger(
    conn=conn,
    tables=["public.orders", "public.order_items"],
    start_from="tail",
    replication_slot="ecommerce_cdc_slot",
    slot_behavior="create",
)

@td.publisher(
    trigger=trigger,
    tables=["orders", "order_items"],
)
def capture_ecommerce(
    orders: list[td.TableFrame],
    order_items: list[td.TableFrame],
) -> Tuple[td.TableFrame, td.TableFrame]:
    return td.concat(orders), td.concat(order_items)

This example publishes CDC data for the orders and order_items tables, capturing only changes that occur after the publisher has been first registered.

After defining the function, register it with a Tabsdata collection and trigger its execution.


Setup

PostgreSQL must be configured to enable CDC before using this publisher. See PostgreSQL Setup to Enable CDC.

Connection: PostgresCdcConn

PostgresCdcConn defines how to connect to the PostgreSQL server. It accepts a standard PostgreSQL URI and optional credentials.

conn = td.PostgresCdcConn(
    uri="postgresql://localhost:5432/my_database",
    credentials=td.UserPasswordCredentials(
        user=td.EnvironmentSecret("PG_CDC_USER"),
        password=td.EnvironmentSecret("PG_CDC_PASSWORD"),
    ),
)

Parameter

Type

Description

uri

str

PostgreSQL connection URI (postgresql://host:port/database). If the port is omitted, defaults to 5432. If the database is omitted, defaults to "postgres".

credentials

UserPasswordCredentials | None

Optional user/password credentials. If None, credentials from the URI are used.

cx_src_configs_postgres

dict | None

Optional PostgreSQL-specific connection parameters passed to the underlying driver.

Trigger: PostgresCdcTrigger

PostgresCdcTrigger connects to PostgreSQL, reads WAL events for the specified tables via logical replication, and stages batches of changes.

trigger = td.PostgresCdcTrigger(
    conn=conn,
    tables=["public.orders", "public.order_items"],
    start_from="tail",
    replication_slot="ecommerce_cdc_slot",
    slot_behavior="create",
)

tables

Specifies which database tables to monitor. Tables must be fully qualified as schema.table. Accepts a single string or a list of strings.

# Single table
tables="public.orders"

# Multiple tables
tables=["public.orders", "public.order_items"]

All tables must exist in the source database before the trigger starts. Tables created after the trigger is running will not be captured.

Note: Schema changes (such as ALTER TABLE, ADD COLUMN, DROP COLUMN) on tracked tables are handled automatically. No additional configuration is required — the connector detects the change and adjusts its output accordingly.

start_from

Determines where the connector begins reading from the WAL. On subsequent runs, the connector resumes automatically from its last committed offset.

Value

Type

Behavior

"head"

str

Start from the earliest available position in the WAL.

"tail"

str

Start from the current end of the WAL, capturing only new events.

LsnPosition(lsn=...)

LsnPosition

Start reading from a specific Log Sequence Number (LSN).

Note: LsnPosition is for initial positioning only — it is distinct from the confirmed LSN that the connector sends to the server for WAL pruning.

Replication Slot Configuration

Parameter

Type

Default

Description

replication_slot

str | None

None

Name of the logical replication slot to use.

publication_name

str | None

None

Name of the PostgreSQL publication to subscribe to for server-side table filtering.

slot_behavior

"create" | "reuse"

"reuse"

If "create", creates the slot if it does not exist. If "reuse", the slot must already exist.


Advanced Configuration

CDC Output Format (cdc_format)

The cdc_format parameter controls how change data is structured in the output TableFrames, configured via CdcFormat.

from tabsdata.connector.cdc.common.typing import CdcFormat

cdc_format=CdcFormat(values_format="columns", flatten_values=True)

Parameter

Type

Default

Description

values_format

"columns" | "struct" | "map"

"columns"

Controls how old and new row values are laid out in the output.

flatten_values

bool

True

When True, new values are promoted to individual top-level columns instead of being packed into a container column.

Metadata columns (always present)

Every output row includes the following metadata columns regardless of values_format:

Column

Type

Description

@td.cdc.meta.op

str

Operation type: "i" (insert), "u" (update), or "d" (delete).

@td.cdc.meta.tx

str

Transaction identifier from the source database.

@td.cdc.meta.sq

int

Sequence number ordering changes within a transaction.

values_format = "columns"

Each source table column is represented as two explicit output columns — one for the old value and one for the new value:

Column

Description

@td.cdc.data.col.old.<COL_NAME>

Value before the change.

@td.cdc.data.col.new.<COL_NAME>

Value after the change. Present when flatten_values=False.

<COL_NAME>

Value after the change. Present when flatten_values=True (replaces the new prefixed column).

Semantics by operation:

Operation

@td.cdc.data.col.old.<COL_NAME>

New value column

Insert (i)

null

Inserted data

Update (u)

Value prior to the update

Value after the update

Delete (d)

null

Deleted data

values_format = "map"

Old and new values are packed into map columns keyed by table column name:

Column

Type

Description

@td.cdc.data.map.old

Map<str, str>

Old values. Present when flatten_values=False or for old values always.

@td.cdc.data.map.new

Map<str, str>

New values packed as a map. Present when flatten_values=False.

<COL_NAME>

New values as individual columns. Present when flatten_values=True (replaces @td.cdc.data.map.new).

Semantics by operation:

Operation

@td.cdc.data.map.old

New value column(s)

Insert (i)

null

Inserted data

Update (u)

Values prior to the update

Values after the update

Delete (d)

null

Deleted data

values_format = "struct"

Identical to "map" but old and new values are packed into struct fields instead of map columns:

Column

Type

Description

@td.cdc.data.row.old

struct

Old values. Present when flatten_values=False or for old values always.

@td.cdc.data.row.new

struct

New values packed as a struct. Present when flatten_values=False.

<COL_NAME>

New values as individual columns. Present when flatten_values=True (replaces @td.cdc.data.row.new).

Semantics by operation are identical to "map" above.

Output Examples

values_format="columns", flatten_values=True

@td.cdc.meta.op

@td.cdc.meta.tx

@td.cdc.meta.sq

@td.cdc.meta.fmt

@td.cdc.meta.flat

id

username

first_name

last_name

email

@td.cdc.data.col.old.id

@td.cdc.data.col.old.username

@td.cdc.data.col.old.first_name

@td.cdc.data.col.old.last_name

@td.cdc.data.col.old.email

i

225e1410-…:18

1

columns

true

1

deals_1914

Johnny

Woods

replaced1800@gmail.com

null

null

null

null

null

u

225e1410-…:19

1

columns

true

7

filename_2073

Gerardo

Mcintosh

surgery1995@duck.com

7

filename_2073

Maren

Puckett

examinations2009@yahoo.com

d

225e1410-…:20

1

columns

true

2

incl_1972

Emery

Reilly

exposed2025@example.com

null

null

null

null

null

values_format="columns", flatten_values=False

@td.cdc.meta.op

@td.cdc.meta.tx

@td.cdc.meta.sq

@td.cdc.meta.fmt

@td.cdc.meta.flat

@td.cdc.data.col.new.id

@td.cdc.data.col.new.username

@td.cdc.data.col.new.first_name

@td.cdc.data.col.new.last_name

@td.cdc.data.col.new.email

@td.cdc.data.col.old.id

@td.cdc.data.col.old.username

@td.cdc.data.col.old.first_name

@td.cdc.data.col.old.last_name

@td.cdc.data.col.old.email

i

225e1410-…:22

1

columns

false

1

beat_1843

Kathyrn

Stokes

true1875@outlook.com

null

null

null

null

null

u

225e1410-…:23

1

columns

false

7

douglas_1901

Lawrence

Bauer

submission2025@yahoo.com

7

douglas_1901

Hermine

Preston

commodities1921@outlook.com

d

225e1410-…:24

1

columns

false

7

douglas_1901

Lawrence

Bauer

submission2025@yahoo.com

null

null

null

null

null

values_format="struct", flatten_values=True

@td.cdc.meta.op

@td.cdc.meta.tx

@td.cdc.meta.sq

@td.cdc.meta.fmt

@td.cdc.meta.flat

id

username

first_name

last_name

email

@td.cdc.data.row.old

i

225e1410-…:26

1

struct

true

1

loops_1939

Agueda

Duncan

clinical2027@protonmail.com

{null,null,null,null,null}

u

225e1410-…:27

1

struct

true

8

evaluating_1979

Carletta

Deleon

wrapping1938@yandex.com

{8,”evaluating_1979”,”Marlen”,”Estrada”,”hitachi1882@example.org”}

d

225e1410-…:28

1

struct

true

4

majority_1865

Eulah

Whitney

touched1819@yahoo.com

{null,null,null,null,null}

values_format="struct", flatten_values=False

@td.cdc.meta.op

@td.cdc.meta.tx

@td.cdc.meta.sq

@td.cdc.meta.fmt

@td.cdc.meta.flat

@td.cdc.data.row.new

@td.cdc.data.row.old

i

225e1410-…:30

1

struct

false

{1,”processes_2081”,”Leon”,”Pollard”,”browse1909@duck.com”}

{null,null,null,null,null}

u

225e1410-…:31

1

struct

false

{5,”virtually_1823”,”Gavin”,”Macdonald”,”rocky2058@yandex.com”}

{5,”virtually_1823”,”Erich”,”Hood”,”skin2004@gmail.com”}

d

225e1410-…:32

1

struct

false

{7,”thank_1865”,”Lashawna”,”Petty”,”classical2074@yandex.com”}

{null,null,null,null,null}

values_format="map", flatten_values=True

@td.cdc.meta.op

@td.cdc.meta.tx

@td.cdc.meta.sq

@td.cdc.meta.fmt

@td.cdc.meta.flat

id

username

first_name

last_name

email

@td.cdc.data.map.old

i

225e1410-…:34

1

map

true

1

uni_2028

Sandy

Hinton

husband1960@example.org

{“id”:null,”username”:null,”first_name”:null,”last_name”:null,”email”:null}

u

225e1410-…:35

1

map

true

1

uni_2028

Kelle

Noel

see2021@example.com

{“id”:1,”username”:”uni_2028”,”first_name”:”Sandy”,”last_name”:”Hinton”,”email”:”husband1960@example.org”}

d

225e1410-…:36

1

map

true

1

uni_2028

Kelle

Noel

see2021@example.com

{“id”:null,”username”:null,”first_name”:null,”last_name”:null,”email”:null}

values_format="map", flatten_values=False

@td.cdc.meta.op

@td.cdc.meta.tx

@td.cdc.meta.sq

@td.cdc.meta.fmt

@td.cdc.meta.flat

@td.cdc.data.map.new

@td.cdc.data.map.old

i

a4a17b92-…:38

1

map

false

{“id”:1,”username”:”vacancies_2045”,”first_name”:”Tony”,”last_name”:”Oliver”,”email”:”rec1977@yandex.com”}

{“id”:null,”username”:null,”first_name”:null,”last_name”:null,”email”:null}

u

a4a17b92-…:39

1

map

false

{“id”:7,”username”:”strategies_1852”,”first_name”:”Foster”,”last_name”:”Nolan”,”email”:”ambient1829@example.com”}

{“id”:7,”username”:”strategies_1852”,”first_name”:”Doreatha”,”last_name”:”Mclaughlin”,”email”:”buffalo2065@yandex.com”}

d

a4a17b92-…:40

1

map

false

{“id”:8,”username”:”boc_1991”,”first_name”:”Peg”,”last_name”:”Vang”,”email”:”blacks1939@yandex.com”}

{“id”:null,”username”:null,”first_name”:null,”last_name”:null,”email”:null}

Start Position Examples

from tabsdata.connector.cdc.postgres.typing import LsnPosition
from datetime import datetime, timezone

# Start from the end — capture only new changes going forward
start_from="tail"

# Start from the beginning of the WAL
start_from="head"

# Start from a specific LSN
start_from=LsnPosition(lsn=23456789)

Buffer and Trigger Thresholds

The CDC connector uses a two-stage pipeline: changes accumulate in memory (buffer), are flushed to the working directory, then staged to the output location.

Buffer thresholds (memory → working directory)

Parameter

Type

Default

Description

buffer_max_rows

int

10,000

Flush to disk when row count in memory reaches this limit.

buffer_max_bytes

int | None

None

Flush to disk when byte size in memory reaches this limit.

buffer_max_sec

float

60.0

Flush to disk when this many seconds have elapsed since the last flush.

Trigger thresholds (working directory → stage location)

Parameter

Type

Default

Description

trigger_max_rows

int | None

None

Stage when total rows on disk reach this limit.

trigger_max_bytes

int | None

None

Stage when total bytes on disk reach this limit.

trigger_max_sec

float

60.0

Stage when this many seconds have elapsed since the last stage.

Other Parameters

Parameter

Type

Default

Description

poll_interval_sec

float

1.0

Seconds between polling cycles when no new events are available.

blocking_timeout_sec

float

1.0

Timeout in seconds for blocking reads from the replication stream.

start

datetime | None

None

Delay trigger execution until this datetime (UTC).

end

datetime | None

None

Stop the trigger at this datetime (UTC).


Limitations

  • TRUNCATE: TRUNCATE TABLE operations are not captured. A truncate on a tracked table will not produce any change events.

  • Large/Blob types: BLOB, CLOB, LONGBLOB, BYTEA, and TEXT (in some configurations) column types are not currently supported. Tables containing these types should exclude them from capture or use alternative ingestion methods.

  • Static table list: All tables in the tables parameter must exist before the trigger starts. The connector does not perform runtime table discovery.


PostgreSQL Setup to Enable CDC

The steps below are provided for convenience. Refer to the PostgreSQL documentation for comprehensive and up-to-date configuration instructions.

Server Configuration

Add the following to postgresql.conf and restart the server:

wal_level = logical
max_replication_slots = 4    # at least 1 per CDC consumer
max_wal_senders = 4          # at least 1 per CDC consumer

Install wal2json

The wal2json extension must be installed on the PostgreSQL server. The connector uses wal2json format version 2 by default, which produces a JSON object per tuple (row change) with optional transaction boundary markers.

On Debian/Ubuntu, wal2json is available as a system package:

# Adjust version number to match your PostgreSQL version
apt-get install postgresql-17-wal2json

For installation instructions and detailed documentation, see the wal2json project.

Create a CDC User

CREATE ROLE cdc_user WITH LOGIN REPLICATION PASSWORD 'cdc_password';
GRANT CONNECT ON DATABASE my_database TO cdc_user;
GRANT USAGE ON SCHEMA public TO cdc_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO cdc_user;

Set Replica Identity

For full before-image data on updates and deletes, set REPLICA IDENTITY FULL on each table you want to capture. Without this, only primary key columns are included in the before-image.

ALTER TABLE orders REPLICA IDENTITY FULL;
ALTER TABLE order_items REPLICA IDENTITY FULL;

Create a Replication Slot (optional)

You can pre-create a logical replication slot, or let the connector manage it via the slot_behavior parameter.

SELECT pg_create_logical_replication_slot('my_cdc_slot', 'wal2json');

Warning: Abandoned replication slots cause unbounded WAL growth. Monitor slot lag via pg_replication_slots and drop unused slots promptly.