MySQL CDC

The MySQL CDC publisher captures row-level changes (inserts, updates, deletes) from a MySQL database by reading its binary log (binlog) in real time.

Note: MySQL CDC is currently marked as unstable and may undergo API changes in future releases.

Installing the Connector Dependencies

pip install mysql-connector-python==9.3.0

Example

from typing import Tuple
import tabsdata as td

conn = td.MySQLCdcConn(
    uri="mysql://localhost:3306/ecommerce",
    credentials=td.UserPasswordCredentials(
        user=td.EnvironmentSecret("MYSQL_USER"),
        password=td.EnvironmentSecret("MYSQL_PASS"),
    ),
)

trigger = td.MySQLCdcTrigger(
    conn=conn,
    tables=["ecommerce.orders", "ecommerce.order_items"],
    start_from="tail",
)

@td.publisher(
    trigger=trigger,
    tables=["orders", "order_items"],
)
def capture_ecommerce(
    orders: list[td.TableFrame],
    order_items: list[td.TableFrame],
) -> Tuple[td.TableFrame, td.TableFrame]:
    return td.concat(orders), td.concat(order_items)

This example publishes CDC data for the orders and order_items tables, capturing only changes that occur after the publisher is first registered (because start_from="tail").

After defining the function, register it with a Tabsdata collection and trigger its execution.


Setup

Configuring MySQL for CDC

Before using the MySQL CDC publisher, the source MySQL database must be configured to enable row-based binary logging and GTID support.

Server Configuration

Add the following to your MySQL configuration file (my.cnf or my.ini) and restart the server:

[mysqld]
server-id                = 1
log_bin                  = mysql-bin
binlog_format            = ROW
binlog_row_image         = FULL
binlog_row_metadata      = FULL
gtid_mode                = ON
enforce_gtid_consistency = ON

  • server-id: Enables binary logging with a unique server identifier.

  • binlog_format = ROW: Row-based format captures individual column values rather than SQL statements.

  • binlog_row_image = FULL: Logs all columns for every change, not just the modified ones.

  • gtid_mode = ON: Required for resuming from a precise position across server restarts.
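Once the server has restarted, you can confirm the settings by running SHOW VARIABLES and comparing the results against the requirements above. The helper below is an illustrative sketch (not part of tabsdata): it takes a dict of variable names to values, as you would build from a SHOW VARIABLES result set, and reports which CDC prerequisites are missing or misconfigured.

```python
# Hypothetical helper (not part of tabsdata): checks that values returned by
# `SHOW VARIABLES` meet the CDC prerequisites listed above.

REQUIRED = {
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
    "binlog_row_metadata": "FULL",
    "gtid_mode": "ON",
    "enforce_gtid_consistency": "ON",
}

def missing_cdc_settings(server_vars: dict[str, str]) -> list[str]:
    """Return the names of variables that are absent or misconfigured."""
    return [
        name
        for name, expected in REQUIRED.items()
        if server_vars.get(name, "").upper() != expected
    ]

# Example: a server still using statement-based logging
server_vars = {
    "binlog_format": "STATEMENT",
    "binlog_row_image": "FULL",
    "binlog_row_metadata": "FULL",
    "gtid_mode": "ON",
    "enforce_gtid_consistency": "ON",
}
print(missing_cdc_settings(server_vars))  # -> ['binlog_format']
```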

Create a CDC User

Create a dedicated MySQL user with the privileges required for binlog replication:

CREATE USER 'cdc_user'@'%' IDENTIFIED BY 'cdc_password';
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc_user'@'%';
GRANT SELECT ON my_database.* TO 'cdc_user'@'%';
FLUSH PRIVILEGES;

REPLICATION SLAVE and REPLICATION CLIENT are the minimum privileges needed to connect as a binlog reader. SELECT is required for the initial table schema discovery.

Connection: MySQLCdcConn

MySQLCdcConn defines how to connect to the MySQL server. It accepts a standard MySQL URI and optional credentials.

conn = td.MySQLCdcConn(
    uri="mysql://localhost:3306/my_database",
    credentials=td.UserPasswordCredentials(
        user=td.EnvironmentSecret("MYSQL_CDC_USER"),
        password=td.EnvironmentSecret("MYSQL_CDC_PASSWORD"),
    ),
)

  • uri (str): MySQL connection URI (mysql://host:port/database). If the port is omitted, it defaults to 3306. If the database is omitted, it defaults to "mysql".

  • credentials (UserPasswordCredentials | None): Optional user/password credentials. If None, credentials from the URI are used.

  • cx_src_configs_mysql (dict | None): Optional MySQL-specific connection parameters passed to the underlying driver.
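The defaulting rules for the uri parameter can be sketched with the standard library. This is an illustration of the behavior described above, not the tabsdata implementation:

```python
# Illustrative sketch: how a mysql:// URI resolves to host, port, and
# database, applying the defaults described above (3306 and "mysql").
from urllib.parse import urlparse

def resolve_mysql_uri(uri: str) -> tuple[str, int, str]:
    parsed = urlparse(uri)
    if parsed.scheme != "mysql":
        raise ValueError(f"expected a mysql:// URI, got {uri!r}")
    host = parsed.hostname or "localhost"
    port = parsed.port or 3306                      # default port
    database = parsed.path.lstrip("/") or "mysql"   # default database
    return host, port, database

print(resolve_mysql_uri("mysql://localhost:3306/my_database"))
print(resolve_mysql_uri("mysql://db.example.com"))  # -> ('db.example.com', 3306, 'mysql')
```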

Trigger: MySQLCdcTrigger

MySQLCdcTrigger connects to MySQL, reads binlog events for the specified tables, and stages batches of changes.

trigger = td.MySQLCdcTrigger(
    conn=conn,
    tables=["ecommerce.orders", "ecommerce.order_items"],
    start_from="tail",
)

tables

Specifies which database tables to monitor. Tables must be fully qualified as schema.table. Accepts a single string or a list of strings.

# Single table
tables="my_database.orders"

# Multiple tables
tables=["my_database.orders", "my_database.order_items"]

All tables must exist in the source database before the trigger starts. Tables created after the trigger is running will not be captured.

start_from

Determines where the connector begins reading the binlog. On subsequent runs, the connector resumes automatically from its last committed position.

  • "head" (str): Start from the earliest available position in the binlog.

  • "tail" (str): Start from the current end of the binlog, capturing only new events.

  • GtidPosition(gtid="...") (GtidPosition): Resume from a specific Global Transaction ID.

  • BinlogPosition(file="...", pos=...) (BinlogPosition): Resume from a specific binlog file name and byte offset.

  • TimestampPosition(ts=datetime(...)) (TimestampPosition): Start from the first event at or after the given timestamp.


Advanced Configuration

CDC Output Format (cdc_format)

The cdc_format parameter controls how change data is structured in the output TableFrames, configured via CdcFormat.

from tabsdata.connector.cdc.common.typing import CdcFormat

cdc_format=CdcFormat(values_format="columns", flatten_values=True)

  • values_format ("columns" | "struct" | "map", default "columns"): Controls how old and new row values are laid out in the output.

  • flatten_values (bool, default True): When True, new values are promoted to individual top-level columns instead of being packed into a container column.

Metadata columns (always present)

Every output row includes the following metadata columns regardless of values_format:

  • @td.cdc.meta.op (str): Operation type: "i" (insert), "u" (update), or "d" (delete).

  • @td.cdc.meta.tx (str): Transaction identifier from the source database.

  • @td.cdc.meta.sq (int): Sequence number ordering changes within a transaction.
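The metadata columns make it easy to split a change batch by operation. For illustration only, the snippet below uses plain dicts to stand in for output rows (the real output is a td.TableFrame):

```python
# Illustration only: plain dicts stand in for CDC output rows. The
# @td.cdc.meta.op column distinguishes inserts, updates, and deletes.

rows = [
    {"@td.cdc.meta.op": "i", "@td.cdc.meta.tx": "tx1", "@td.cdc.meta.sq": 0, "id": 1},
    {"@td.cdc.meta.op": "u", "@td.cdc.meta.tx": "tx2", "@td.cdc.meta.sq": 0, "id": 1},
    {"@td.cdc.meta.op": "d", "@td.cdc.meta.tx": "tx2", "@td.cdc.meta.sq": 1, "id": 2},
]

inserts = [r for r in rows if r["@td.cdc.meta.op"] == "i"]
deletes = [r for r in rows if r["@td.cdc.meta.op"] == "d"]
print(len(inserts), len(deletes))  # -> 1 1
```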

values_format = "columns"

Each source table column is represented as two explicit output columns — one for the old value and one for the new value:

  • @td.cdc.data.col.old.<COL_NAME>: Value before the change.

  • @td.cdc.data.col.new.<COL_NAME>: Value after the change. Present when flatten_values=False.

  • <COL_NAME>: Value after the change. Present when flatten_values=True (replaces the new-prefixed column).

Semantics by operation:

  • Insert (i): the old column is null; the new value column holds the inserted data.

  • Update (u): the old column holds the value prior to the update; the new value column holds the value after the update.

  • Delete (d): the old column is null; the new value column holds the deleted data.

values_format = "map"

Old and new values are packed into map columns keyed by table column name:

  • @td.cdc.data.map.old (Map<str, str>): Old values packed as a map. Always present; flatten_values does not affect old values.

  • @td.cdc.data.map.new (Map<str, str>): New values packed as a map. Present when flatten_values=False.

  • <COL_NAME>: New values as individual columns. Present when flatten_values=True (replaces @td.cdc.data.map.new).

Semantics by operation:

  • Insert (i): @td.cdc.data.map.old is null; the new value column(s) hold the inserted data.

  • Update (u): the old map holds values prior to the update; the new value column(s) hold values after the update.

  • Delete (d): @td.cdc.data.map.old is null; the new value column(s) hold the deleted data.

values_format = "struct"

Identical to "map" but old and new values are packed into struct fields instead of map columns:

  • @td.cdc.data.row.old (struct): Old values packed as a struct. Always present; flatten_values does not affect old values.

  • @td.cdc.data.row.new (struct): New values packed as a struct. Present when flatten_values=False.

  • <COL_NAME>: New values as individual columns. Present when flatten_values=True (replaces @td.cdc.data.row.new).

Semantics by operation are identical to "map" above.

Start Position Examples

from tabsdata.connector.cdc.mysql.typing import GtidPosition, BinlogPosition
from tabsdata.connector.cdc.common.typing import TimestampPosition
from datetime import datetime, timezone

# Start from the beginning of the binlog
start_from="head"

# Start from the end — capture only new changes going forward
start_from="tail"

# Resume from a specific GTID
start_from=GtidPosition(gtid="3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5")

# Resume from a binlog file and byte offset
start_from=BinlogPosition(file="mysql-bin.000003", pos=154)

# Start from a specific timestamp
start_from=TimestampPosition(ts=datetime(2026, 1, 15, tzinfo=timezone.utc))

Buffer and Trigger Thresholds

The CDC connector uses a two-stage pipeline: changes accumulate in an in-memory buffer, are flushed to a working directory on disk, and are then staged to the output location.

Buffer thresholds (memory → working directory)

  • buffer_max_rows (int, default 10,000): Flush to disk when the row count in memory reaches this limit.

  • buffer_max_bytes (int | None, default None): Flush to disk when the byte size in memory reaches this limit.

  • buffer_max_sec (float, default 60.0): Flush to disk when this many seconds have elapsed since the last flush.

Trigger thresholds (working directory → stage location)

  • trigger_max_rows (int | None, default None): Stage when the total rows on disk reach this limit.

  • trigger_max_bytes (int | None, default None): Stage when the total bytes on disk reach this limit.

  • trigger_max_sec (float, default 60.0): Stage when this many seconds have elapsed since the last stage.

Other Parameters

  • poll_interval_sec (float, default 1.0): Seconds between polling cycles when no new events are available.

  • blocking_timeout_sec (float, default 1.0): Timeout in seconds for blocking reads from the binlog stream.

  • server_id (int, default 512): MySQL server ID for binlog replication. Must be unique across all replication clients.

  • start (datetime | None, default None): Delay trigger execution until this datetime (UTC).

  • end (datetime | None, default None): Stop the trigger at this datetime (UTC).


Limitations

  • Schema changes: ALTER TABLE, ADD/DROP COLUMN, and similar DDL operations on tracked tables are not detected or handled. If the source schema changes, the connector must be stopped and reconfigured.

  • TRUNCATE: TRUNCATE TABLE operations are not captured. A truncate on a tracked table will not produce any change events.

  • Large/Blob types: BLOB, CLOB, LONGBLOB, BYTEA, and TEXT (in some configurations) column types are not currently supported. Tables containing these types should exclude them from capture or use alternative ingestion methods.

  • Static table list: All tables in the tables parameter must exist before the trigger starts. The connector does not perform runtime table discovery.