PostgreSQL CDC
The PostgreSQL CDC publisher captures row-level changes (inserts, updates, deletes) from a PostgreSQL database using WAL logical replication. Changes are decoded by the wal2json output plugin (format version 2), buffered in memory, and staged in controlled batches.
Note: PostgreSQL CDC is currently marked as unstable and may undergo API changes in future releases.
Example
from typing import Tuple

import tabsdata as td

conn = td.PostgresCdcConn(
    uri="postgresql://localhost:5432/ecommerce",
    credentials=td.UserPasswordCredentials(
        user=td.EnvironmentSecret("PG_USER"),
        password=td.EnvironmentSecret("PG_PASS"),
    ),
)

trigger = td.PostgresCdcTrigger(
    conn=conn,
    tables=["public.orders", "public.order_items"],
    start_from="tail",
    replication_slot="ecommerce_cdc_slot",
    slot_behavior="create",
)


@td.publisher(
    trigger=trigger,
    tables=["orders", "order_items"],
)
def capture_ecommerce(
    orders: list[td.TableFrame],
    order_items: list[td.TableFrame],
) -> Tuple[td.TableFrame, td.TableFrame]:
    return td.concat(orders), td.concat(order_items)
This example publishes CDC data for the orders and order_items tables. Because start_from is "tail", it captures only changes that occur after the publisher is first registered.
After defining the function, register it with a Tabsdata collection and trigger its execution.
Setup
PostgreSQL must be configured to enable CDC before using this publisher. See PostgreSQL Setup to Enable CDC.
Connection: PostgresCdcConn
PostgresCdcConn defines how to connect to the PostgreSQL server. It accepts a standard PostgreSQL URI and optional credentials.
conn = td.PostgresCdcConn(
    uri="postgresql://localhost:5432/my_database",
    credentials=td.UserPasswordCredentials(
        user=td.EnvironmentSecret("PG_CDC_USER"),
        password=td.EnvironmentSecret("PG_CDC_PASSWORD"),
    ),
)
| Parameter | Type | Description |
|---|---|---|
| | | PostgreSQL connection URI ( |
| | | Optional user/password credentials. If |
| | | Optional PostgreSQL-specific connection parameters passed to the underlying driver. |
Trigger: PostgresCdcTrigger
PostgresCdcTrigger connects to PostgreSQL, reads WAL events for the specified tables via logical replication, and stages batches of changes.
trigger = td.PostgresCdcTrigger(
    conn=conn,
    tables=["public.orders", "public.order_items"],
    start_from="tail",
    replication_slot="ecommerce_cdc_slot",
    slot_behavior="create",
)
tables
Specifies which database tables to monitor. Tables must be fully qualified as schema.table. Accepts a single string or a list of strings.
# Single table
tables="public.orders"
# Multiple tables
tables=["public.orders", "public.order_items"]
All tables must exist in the source database before the trigger starts. Tables created after the trigger is running will not be captured.
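The schema.table requirement can be checked before the trigger is built. The following is a minimal sketch, not part of the tabsdata API: normalize_tables is a hypothetical helper, and it only recognizes unquoted PostgreSQL identifiers (quoted names with spaces or dots would need extra handling).

```python
import re

# Unquoted PostgreSQL identifiers: a letter or underscore, then letters,
# digits, underscores, or dollar signs.
_IDENT = r"[A-Za-z_][A-Za-z0-9_$]*"
_QUALIFIED = re.compile(rf"^{_IDENT}\.{_IDENT}$")


def normalize_tables(tables):
    """Return a list of schema.table names (hypothetical helper).

    Accepts a single string or a list of strings, mirroring the tables
    parameter, and raises ValueError for names that are not schema.table.
    """
    names = [tables] if isinstance(tables, str) else list(tables)
    bad = [name for name in names if not _QUALIFIED.match(name)]
    if bad:
        raise ValueError(f"not fully qualified as schema.table: {bad}")
    return names


print(normalize_tables("public.orders"))
print(normalize_tables(["public.orders", "public.order_items"]))
```

Passing an unqualified name such as "orders" raises ValueError, which surfaces the mistake before any connection is attempted.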
Note: Schema changes (such as ALTER TABLE, ADD COLUMN, DROP COLUMN) on tracked tables are handled automatically. No additional configuration is required: the connector detects the change and adjusts its output accordingly.
start_from
Determines where the connector begins reading from the WAL. On subsequent runs, the connector resumes automatically from its last committed offset.
| Value | Type | Behavior |
|---|---|---|
| | | Start from the earliest available position in the WAL. |
| | | Start from the current end of the WAL, capturing only new events. |
| | | Start reading from a specific Log Sequence Number (LSN). |
Note: LsnPosition is for initial positioning only. It is distinct from the confirmed LSN that the connector sends to the server for WAL pruning.
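PostgreSQL usually displays an LSN as two hexadecimal numbers separated by a slash (for example 16/B374D848), where the first number is the high 32 bits and the second the low 32 bits of a 64-bit position. The sketch below converts between that text form and an integer. It assumes, which this page does not state, that the integer accepted by LsnPosition is this same 64-bit value; lsn_to_int and int_to_lsn are hypothetical helper names.

```python
def lsn_to_int(text: str) -> int:
    """Convert a PostgreSQL LSN in 'HI/LO' hexadecimal text form to an integer."""
    high, low = text.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def int_to_lsn(value: int) -> str:
    """Convert a 64-bit integer position back to the 'HI/LO' text form."""
    return f"{value >> 32:X}/{value & 0xFFFFFFFF:X}"


position = lsn_to_int("16/B374D848")
print(position, int_to_lsn(position))
```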
Replication Slot Configuration
| Parameter | Type | Default | Description |
|---|---|---|---|
| | | | Name of the logical replication slot to use. |
| | | | Name of the PostgreSQL publication to subscribe to for server-side table filtering. |
| | | | If |
Advanced Configuration
CDC Output Format (cdc_format)
The cdc_format parameter controls how change data is structured in the output TableFrames, configured via CdcFormat.
from tabsdata.connector.cdc.common.typing import CdcFormat
cdc_format=CdcFormat(values_format="columns", flatten_values=True)
| Parameter | Type | Default | Description |
|---|---|---|---|
| | | | Controls how old and new row values are laid out in the output. |
| | | | When |
Metadata columns (always present)
Every output row includes the following metadata columns regardless of values_format:
| Column | Type | Description |
|---|---|---|
| | | Operation type: |
| | | Transaction identifier from the source database. |
| | | Sequence number ordering changes within a transaction. |
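Because the sequence number orders changes within a transaction, a consumer can group rows by transaction and sort each group. The sketch below works on plain dictionaries rather than TableFrames and assumes the metadata column names that appear in the output examples on this page (@td.cdc.meta.tx and @td.cdc.meta.sq). The function name and the transaction ids "tx-a" and "tx-b" are placeholders.

```python
from collections import defaultdict

TX = "@td.cdc.meta.tx"
SQ = "@td.cdc.meta.sq"


def group_by_transaction(rows):
    """Group change rows by transaction id, ordering each group by sequence number."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[TX]].append(row)
    # Sort inside each transaction only; ordering across transactions is
    # left as encountered because this page does not define it.
    return {tx: sorted(items, key=lambda r: r[SQ]) for tx, items in groups.items()}


rows = [
    {TX: "tx-a", SQ: 2, "@td.cdc.meta.op": "u"},
    {TX: "tx-b", SQ: 1, "@td.cdc.meta.op": "i"},
    {TX: "tx-a", SQ: 1, "@td.cdc.meta.op": "i"},
]
for tx, items in group_by_transaction(rows).items():
    print(tx, [(r[SQ], r["@td.cdc.meta.op"]) for r in items])
```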
values_format = "columns"
Each source table column is represented as two explicit output columns — one for the old value and one for the new value:
| Column | Description |
|---|---|
| | Value before the change. |
| | Value after the change. Present when |
| | Value after the change. Present when |
Semantics by operation:
| Operation | | New value column |
|---|---|---|
| Insert ( | | Inserted data |
| Update ( | Value prior to the update | Value after the update |
| Delete ( | | Deleted data |
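For update rows, comparing each old value with its new counterpart shows which source columns changed. The sketch below operates on a plain dictionary and assumes the non-flattened column names used in the output examples on this page (@td.cdc.data.col.old.<column> and @td.cdc.data.col.new.<column>, with "u" as the update operation code). changed_columns is a hypothetical helper, not a tabsdata function.

```python
OLD_PREFIX = "@td.cdc.data.col.old."
NEW_PREFIX = "@td.cdc.data.col.new."


def changed_columns(row: dict) -> list:
    """List source columns whose old and new values differ in an update row."""
    if row["@td.cdc.meta.op"] != "u":
        return []  # inserts and deletes carry no populated old values
    changed = []
    for key, new_value in row.items():
        if key.startswith(NEW_PREFIX):
            column = key[len(NEW_PREFIX):]
            if row.get(OLD_PREFIX + column) != new_value:
                changed.append(column)
    return changed


update_row = {
    "@td.cdc.meta.op": "u",
    "@td.cdc.data.col.new.id": 7,
    "@td.cdc.data.col.new.username": "douglas_1901",
    "@td.cdc.data.col.new.first_name": "Lawrence",
    "@td.cdc.data.col.old.id": 7,
    "@td.cdc.data.col.old.username": "douglas_1901",
    "@td.cdc.data.col.old.first_name": "Hermine",
}
print(changed_columns(update_row))  # ['first_name']
```

Note that without REPLICA IDENTITY FULL the old values contain only key columns, so every non-key column would appear as changed in this comparison.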
values_format = "map"
Old and new values are packed into map columns keyed by table column name:
| Column | Type | Description |
|---|---|---|
| | | Old values. Present when |
| | | New values packed as a map. Present when |
| | - | New values as individual columns. Present when |
Semantics by operation:
| Operation | | New value column(s) |
|---|---|---|
| Insert ( | | Inserted data |
| Update ( | Values prior to the update | Values after the update |
| Delete ( | | Deleted data |
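These semantics are enough to replay changes onto a local copy of a table. The following sketch applies map-format rows (non-flattened, as in the output examples on this page) to an in-memory dictionary keyed by primary key. apply_change and the choice of "id" as the key column are assumptions for illustration only.

```python
OP = "@td.cdc.meta.op"
NEW = "@td.cdc.data.map.new"
OLD = "@td.cdc.data.map.old"


def apply_change(replica: dict, row: dict, key: str = "id") -> None:
    """Apply one map-format change row to an in-memory replica keyed by `key`."""
    op = row[OP]
    new_values = row[NEW]
    if op == "i":
        replica[new_values[key]] = dict(new_values)
    elif op == "u":
        old_key = row[OLD].get(key)
        # If the key itself changed, drop the entry stored under the old key.
        if old_key is not None and old_key != new_values[key]:
            replica.pop(old_key, None)
        replica[new_values[key]] = dict(new_values)
    elif op == "d":
        # For deletes, the new value column carries the deleted data.
        replica.pop(new_values[key], None)
    else:
        raise ValueError(f"unknown operation code: {op!r}")


empty = {"id": None, "first_name": None}
changes = [
    {OP: "i", NEW: {"id": 1, "first_name": "Sandy"}, OLD: empty},
    {OP: "u", NEW: {"id": 1, "first_name": "Kelle"}, OLD: {"id": 1, "first_name": "Sandy"}},
    {OP: "i", NEW: {"id": 2, "first_name": "Tony"}, OLD: empty},
    {OP: "d", NEW: {"id": 2, "first_name": "Tony"}, OLD: empty},
]
replica = {}
for change in changes:
    apply_change(replica, change)
print(replica)
```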
values_format = "struct"
Identical to "map" but old and new values are packed into struct fields instead of map columns:
| Column | Type | Description |
|---|---|---|
| | struct | Old values. Present when |
| | struct | New values packed as a struct. Present when |
| | - | New values as individual columns. Present when |
Semantics by operation are identical to "map" above.
Output Examples
values_format="columns", flatten_values=True
| @td.cdc.meta.op | @td.cdc.meta.tx | @td.cdc.meta.sq | @td.cdc.meta.fmt | @td.cdc.meta.flat | id | username | first_name | last_name | email | @td.cdc.data.col.old.id | @td.cdc.data.col.old.username | @td.cdc.data.col.old.first_name | @td.cdc.data.col.old.last_name | @td.cdc.data.col.old.email |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| i | 225e1410-…:18 | 1 | columns | true | 1 | deals_1914 | Johnny | Woods | replaced1800@gmail.com | null | null | null | null | null |
| u | 225e1410-…:19 | 1 | columns | true | 7 | filename_2073 | Gerardo | Mcintosh | surgery1995@duck.com | 7 | filename_2073 | Maren | Puckett | examinations2009@yahoo.com |
| d | 225e1410-…:20 | 1 | columns | true | 2 | incl_1972 | Emery | Reilly | exposed2025@example.com | null | null | null | null | null |
values_format="columns", flatten_values=False
| @td.cdc.meta.op | @td.cdc.meta.tx | @td.cdc.meta.sq | @td.cdc.meta.fmt | @td.cdc.meta.flat | @td.cdc.data.col.new.id | @td.cdc.data.col.new.username | @td.cdc.data.col.new.first_name | @td.cdc.data.col.new.last_name | @td.cdc.data.col.new.email | @td.cdc.data.col.old.id | @td.cdc.data.col.old.username | @td.cdc.data.col.old.first_name | @td.cdc.data.col.old.last_name | @td.cdc.data.col.old.email |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| i | 225e1410-…:22 | 1 | columns | false | 1 | beat_1843 | Kathyrn | Stokes | true1875@outlook.com | null | null | null | null | null |
| u | 225e1410-…:23 | 1 | columns | false | 7 | douglas_1901 | Lawrence | Bauer | submission2025@yahoo.com | 7 | douglas_1901 | Hermine | Preston | commodities1921@outlook.com |
| d | 225e1410-…:24 | 1 | columns | false | 7 | douglas_1901 | Lawrence | Bauer | submission2025@yahoo.com | null | null | null | null | null |
values_format="struct", flatten_values=True
| @td.cdc.meta.op | @td.cdc.meta.tx | @td.cdc.meta.sq | @td.cdc.meta.fmt | @td.cdc.meta.flat | id | username | first_name | last_name | email | @td.cdc.data.row.old |
|---|---|---|---|---|---|---|---|---|---|---|
| i | 225e1410-…:26 | 1 | struct | true | 1 | loops_1939 | Agueda | Duncan | clinical2027@protonmail.com | {null,null,null,null,null} |
| u | 225e1410-…:27 | 1 | struct | true | 8 | evaluating_1979 | Carletta | Deleon | wrapping1938@yandex.com | {8,"evaluating_1979","Marlen","Estrada","hitachi1882@example.org"} |
| d | 225e1410-…:28 | 1 | struct | true | 4 | majority_1865 | Eulah | Whitney | touched1819@yahoo.com | {null,null,null,null,null} |
values_format="struct", flatten_values=False
| @td.cdc.meta.op | @td.cdc.meta.tx | @td.cdc.meta.sq | @td.cdc.meta.fmt | @td.cdc.meta.flat | @td.cdc.data.row.new | @td.cdc.data.row.old |
|---|---|---|---|---|---|---|
| i | 225e1410-…:30 | 1 | struct | false | {1,"processes_2081","Leon","Pollard","browse1909@duck.com"} | {null,null,null,null,null} |
| u | 225e1410-…:31 | 1 | struct | false | {5,"virtually_1823","Gavin","Macdonald","rocky2058@yandex.com"} | {5,"virtually_1823","Erich","Hood","skin2004@gmail.com"} |
| d | 225e1410-…:32 | 1 | struct | false | {7,"thank_1865","Lashawna","Petty","classical2074@yandex.com"} | {null,null,null,null,null} |
values_format="map", flatten_values=True
| @td.cdc.meta.op | @td.cdc.meta.tx | @td.cdc.meta.sq | @td.cdc.meta.fmt | @td.cdc.meta.flat | id | username | first_name | last_name | email | @td.cdc.data.map.old |
|---|---|---|---|---|---|---|---|---|---|---|
| i | 225e1410-…:34 | 1 | map | true | 1 | uni_2028 | Sandy | Hinton | husband1960@example.org | {"id":null,"username":null,"first_name":null,"last_name":null,"email":null} |
| u | 225e1410-…:35 | 1 | map | true | 1 | uni_2028 | Kelle | Noel | see2021@example.com | {"id":1,"username":"uni_2028","first_name":"Sandy","last_name":"Hinton","email":"husband1960@example.org"} |
| d | 225e1410-…:36 | 1 | map | true | 1 | uni_2028 | Kelle | Noel | see2021@example.com | {"id":null,"username":null,"first_name":null,"last_name":null,"email":null} |
values_format="map", flatten_values=False
| @td.cdc.meta.op | @td.cdc.meta.tx | @td.cdc.meta.sq | @td.cdc.meta.fmt | @td.cdc.meta.flat | @td.cdc.data.map.new | @td.cdc.data.map.old |
|---|---|---|---|---|---|---|
| i | a4a17b92-…:38 | 1 | map | false | {"id":1,"username":"vacancies_2045","first_name":"Tony","last_name":"Oliver","email":"rec1977@yandex.com"} | {"id":null,"username":null,"first_name":null,"last_name":null,"email":null} |
| u | a4a17b92-…:39 | 1 | map | false | {"id":7,"username":"strategies_1852","first_name":"Foster","last_name":"Nolan","email":"ambient1829@example.com"} | {"id":7,"username":"strategies_1852","first_name":"Doreatha","last_name":"Mclaughlin","email":"buffalo2065@yandex.com"} |
| d | a4a17b92-…:40 | 1 | map | false | {"id":8,"username":"boc_1991","first_name":"Peg","last_name":"Vang","email":"blacks1939@yandex.com"} | {"id":null,"username":null,"first_name":null,"last_name":null,"email":null} |
Start Position Examples
from tabsdata.connector.cdc.postgres.typing import LsnPosition
from datetime import datetime, timezone
# Start from the end — capture only new changes going forward
start_from="tail"
# Start from the beginning of the WAL
start_from="head"
# Start from a specific LSN
start_from=LsnPosition(lsn=23456789)
Buffer and Trigger Thresholds
The CDC connector uses a two-stage pipeline: changes accumulate in memory (buffer), are flushed to the working directory, then staged to the output location.
Buffer thresholds (memory → working directory)
| Parameter | Type | Default | Description |
|---|---|---|---|
| | | | Flush to disk when row count in memory reaches this limit. |
| | | | Flush to disk when byte size in memory reaches this limit. |
| | | | Flush to disk when this many seconds have elapsed since the last flush. |
Trigger thresholds (working directory → stage location)
| Parameter | Type | Default | Description |
|---|---|---|---|
| | | | Stage when total rows on disk reach this limit. |
| | | | Stage when total bytes on disk reach this limit. |
| | | | Stage when this many seconds have elapsed since the last stage. |
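Both stages follow the same pattern: accumulate data and act as soon as a row, byte, or time limit is reached. The sketch below illustrates that pattern for a single stage. It is not the connector's implementation, and ThresholdBuffer and its parameter names (max_rows, max_bytes, max_seconds) are hypothetical rather than the connector's actual parameters.

```python
import time


class ThresholdBuffer:
    """Accumulate rows and report when a row, byte, or time limit is reached."""

    def __init__(self, max_rows, max_bytes, max_seconds, clock=time.monotonic):
        self.max_rows = max_rows
        self.max_bytes = max_bytes
        self.max_seconds = max_seconds
        self._clock = clock  # injectable so the time limit can be tested
        self._rows = []
        self._bytes = 0
        self._last_flush = clock()

    def add(self, row: bytes) -> None:
        self._rows.append(row)
        self._bytes += len(row)

    def should_flush(self) -> bool:
        if not self._rows:
            return False  # nothing to write, even if the time limit has passed
        elapsed = self._clock() - self._last_flush
        return (
            len(self._rows) >= self.max_rows
            or self._bytes >= self.max_bytes
            or elapsed >= self.max_seconds
        )

    def flush(self) -> list:
        """Return the buffered rows and reset the counters and the timer."""
        rows, self._rows = self._rows, []
        self._bytes = 0
        self._last_flush = self._clock()
        return rows


buffer = ThresholdBuffer(max_rows=2, max_bytes=1024, max_seconds=30)
for payload in (b"row-1", b"row-2", b"row-3"):
    buffer.add(payload)
    if buffer.should_flush():
        print("flushing", buffer.flush())
```

Whichever limit is hit first triggers the flush, so a low time limit bounds latency on quiet tables while the row and byte limits bound memory on busy ones.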
Other Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| | | | Seconds between polling cycles when no new events are available. |
| | | | Timeout in seconds for blocking reads from the replication stream. |
| | | | Delay trigger execution until this datetime (UTC). |
| | | | Stop the trigger at this datetime (UTC). |
Limitations
TRUNCATE:
TRUNCATE TABLE operations are not captured. A truncate on a tracked table will not produce any change events.
Large/Blob types: BLOB, CLOB, LONGBLOB, BYTEA, and TEXT (in some configurations) column types are not currently supported. Tables containing these types should exclude them from capture or use alternative ingestion methods.
Static table list: All tables in the tables parameter must exist before the trigger starts. The connector does not perform runtime table discovery.
PostgreSQL Setup to Enable CDC
The steps below are provided for convenience. Refer to the PostgreSQL documentation for comprehensive and up-to-date configuration instructions.
Server Configuration
Add the following to postgresql.conf and restart the server:
wal_level = logical
max_replication_slots = 4 # at least 1 per CDC consumer
max_wal_senders = 4 # at least 1 per CDC consumer
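A configuration fragment like the one above can be sanity-checked before restarting the server. The sketch below is a simplified reader, not a full postgresql.conf parser: it assumes name = value lines with # comments and no # inside quoted values, and it cannot know server defaults for settings that are absent. parse_conf and cdc_problems are hypothetical helpers.

```python
def parse_conf(text: str) -> dict:
    """Parse simple 'name = value' lines, ignoring blank lines and # comments."""
    settings = {}
    for line in text.splitlines():
        line = line.split("#", 1)[0].strip()
        if not line:
            continue
        name, _, value = line.partition("=")
        settings[name.strip()] = value.strip().strip("'")
    return settings


def cdc_problems(settings: dict, consumers: int = 1) -> list:
    """Return human-readable problems that would prevent logical-replication CDC."""
    problems = []
    if settings.get("wal_level") != "logical":
        problems.append("wal_level must be set to logical")
    for name in ("max_replication_slots", "max_wal_senders"):
        if name not in settings:
            problems.append(f"{name} is not set in this fragment")
        elif int(settings[name]) < consumers:
            problems.append(f"{name} must be at least {consumers}")
    return problems


conf = """
wal_level = logical
max_replication_slots = 4   # at least 1 per CDC consumer
max_wal_senders = 4         # at least 1 per CDC consumer
"""
print(cdc_problems(parse_conf(conf), consumers=2))  # []
```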
Install wal2json
The wal2json extension must be installed on the PostgreSQL server. The connector uses wal2json format version 2 by default, which produces a JSON object per tuple (row change) with optional transaction boundary markers.
On Debian/Ubuntu, wal2json is available as a system package:
# Adjust version number to match your PostgreSQL version
apt-get install postgresql-17-wal2json
For installation instructions and detailed documentation, see the wal2json project.
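To give a feel for the per-tuple JSON objects mentioned above, the sketch below decodes one message in the shape documented by the wal2json project for format version 2: an action code, schema and table names, a columns array for new values, and an identity array for old key values. It is an illustration only, not how the connector decodes events; decode_change and the status column are hypothetical.

```python
import json


def _as_dict(items):
    """Turn a wal2json list of {name, type, value} entries into a name -> value dict."""
    return {item["name"]: item.get("value") for item in items}


def decode_change(message: str):
    """Return (action, table, old_values, new_values), or None for non-row messages."""
    event = json.loads(message)
    action = event["action"]
    if action not in ("I", "U", "D"):
        return None  # for example, transaction begin ("B") and commit ("C") markers
    table = f'{event["schema"]}.{event["table"]}'
    old_values = _as_dict(event.get("identity", []))
    new_values = _as_dict(event.get("columns", []))
    return action, table, old_values, new_values


sample = (
    '{"action":"U","schema":"public","table":"orders",'
    '"columns":[{"name":"id","type":"integer","value":7},'
    '{"name":"status","type":"text","value":"shipped"}],'
    '"identity":[{"name":"id","type":"integer","value":7}]}'
)
print(decode_change(sample))
```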
Create a CDC User
CREATE ROLE cdc_user WITH LOGIN REPLICATION PASSWORD 'cdc_password';
GRANT CONNECT ON DATABASE my_database TO cdc_user;
GRANT USAGE ON SCHEMA public TO cdc_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO cdc_user;
Set Replica Identity
For full before-image data on updates and deletes, set REPLICA IDENTITY FULL on each table you want to capture. Without this, only primary key columns are included in the before-image.
ALTER TABLE orders REPLICA IDENTITY FULL;
ALTER TABLE order_items REPLICA IDENTITY FULL;
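When many tables are captured, the statements can be generated from the same schema.table list that is passed to the trigger. This is a small sketch with hypothetical helper names; it quotes identifiers by wrapping them in double quotes and doubling any embedded double quote, and it assumes the schema and table names themselves contain no dots.

```python
def quote_ident(name: str) -> str:
    """Quote a PostgreSQL identifier, doubling embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def replica_identity_statements(tables):
    """Build one ALTER TABLE ... REPLICA IDENTITY FULL statement per schema.table name."""
    statements = []
    for qualified in tables:
        schema, table = qualified.split(".", 1)
        statements.append(
            f"ALTER TABLE {quote_ident(schema)}.{quote_ident(table)} REPLICA IDENTITY FULL;"
        )
    return statements


for statement in replica_identity_statements(["public.orders", "public.order_items"]):
    print(statement)
```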
Create a Replication Slot (optional)
You can pre-create a logical replication slot, or let the connector manage it via the slot_behavior parameter.
SELECT pg_create_logical_replication_slot('my_cdc_slot', 'wal2json');
Warning: Abandoned replication slots cause unbounded WAL growth. Monitor slot lag via pg_replication_slots and drop unused slots promptly.
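One way to monitor lag is to compare each slot's restart_lsn with the server's current WAL position, for example from a query such as SELECT slot_name, active, restart_lsn FROM pg_replication_slots together with pg_current_wal_lsn(). The sketch below performs only the arithmetic on values already fetched as text. lagging_slots, the sample slot names, and the 16 MiB limit are illustrative assumptions.

```python
def lsn_to_int(text: str) -> int:
    """Convert a PostgreSQL LSN in 'HI/LO' hexadecimal text form to an integer."""
    high, low = text.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def lagging_slots(slots, current_lsn: str, max_lag_bytes: int) -> list:
    """Return (slot_name, active, lag_bytes) for slots retaining more WAL than allowed.

    slots is an iterable of (slot_name, active, restart_lsn) tuples, with
    restart_lsn as text or None when the slot has not reserved any WAL.
    """
    current = lsn_to_int(current_lsn)
    report = []
    for name, active, restart_lsn in slots:
        if restart_lsn is None:
            continue
        lag = current - lsn_to_int(restart_lsn)
        if lag > max_lag_bytes:
            report.append((name, active, lag))
    return report


slots = [
    ("my_cdc_slot", True, "0/3000000"),
    ("old_slot", False, "0/1000000"),
]
print(lagging_slots(slots, current_lsn="0/3000100", max_lag_bytes=16 * 1024 * 1024))
```

An inactive slot with large lag, like old_slot in this sample, is a candidate for pg_drop_replication_slot once it is confirmed to be unused.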