# MySQL CDC
The MySQL CDC publisher captures row-level changes (inserts, updates, deletes) from a MySQL database by reading its binary log (binlog) in real time.
> **Note:** MySQL CDC is currently marked as unstable and may undergo API changes in future releases.
## Installing the Connector Dependencies

```shell
pip install mysql-connector-python==9.3.0
```
## Example
```python
from typing import Tuple

import tabsdata as td

conn = td.MySQLCdcConn(
    uri="mysql://localhost:3306/ecommerce",
    credentials=td.UserPasswordCredentials(
        user=td.EnvironmentSecret("MYSQL_USER"),
        password=td.EnvironmentSecret("MYSQL_PASS"),
    ),
)

trigger = td.MySQLCdcTrigger(
    conn=conn,
    tables=["ecommerce.orders", "ecommerce.order_items"],
    start_from="tail",
)

@td.publisher(
    trigger=trigger,
    tables=["orders", "order_items"],
)
def capture_ecommerce(
    orders: list[td.TableFrame],
    order_items: list[td.TableFrame],
) -> Tuple[td.TableFrame, td.TableFrame]:
    return td.concat(orders), td.concat(order_items)
```
This example publishes CDC data for the `orders` and `order_items` tables, capturing only changes that occur after the publisher is first registered (because `start_from="tail"`). After defining the function, register it with a Tabsdata collection and trigger its execution.
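To make the change semantics concrete, here is a minimal sketch of how a downstream consumer might fold a batch of CDC rows back into table state. The `op`/`old`/`new` keys and the operation names are illustrative stand-ins, not the connector's actual column names:

```python
# Sketch: fold CDC change rows into a key -> row mapping.
# "op", "old", and "new" are hypothetical field names for illustration.

def apply_changes(state: dict, changes: list[dict], key: str = "id") -> dict:
    for change in changes:
        if change["op"] in ("insert", "update"):
            row = change["new"]
            state[row[key]] = row
        elif change["op"] == "delete":
            # use whichever side carries the deleted row image
            row = change["old"] or change["new"]
            state.pop(row[key], None)
    return state

state = apply_changes(
    {},
    [
        {"op": "insert", "old": None, "new": {"id": 1, "total": 10}},
        {"op": "update", "old": {"id": 1, "total": 10}, "new": {"id": 1, "total": 15}},
        {"op": "insert", "old": None, "new": {"id": 2, "total": 7}},
        {"op": "delete", "old": {"id": 2, "total": 7}, "new": None},
    ],
)
# state == {1: {"id": 1, "total": 15}}
```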
## Setup

### Configuring MySQL for CDC
Before using the MySQL CDC publisher, the source MySQL database must be configured to enable row-based binary logging and GTID support.
#### Server Configuration

Add the following to your MySQL configuration file (`my.cnf` or `my.ini`) and restart the server:

```ini
[mysqld]
server-id = 1
log_bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
binlog_row_metadata = FULL
gtid_mode = ON
enforce_gtid_consistency = ON
```
| Parameter | Description |
|---|---|
| `server-id`, `log_bin` | Enables binary logging with a unique server identifier. |
| `binlog_format = ROW` | Row-based format captures individual column values rather than SQL statements. |
| `binlog_row_image = FULL`, `binlog_row_metadata = FULL` | Logs all columns for every change, not just modified ones. |
| `gtid_mode`, `enforce_gtid_consistency` | Required for resuming from a precise position across server restarts. |
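The value-style settings above can be sanity-checked before starting the connector. The sketch below validates a `SHOW VARIABLES` result represented as a plain dict (`server-id` and `log_bin` are installation-specific and omitted); the helper name is hypothetical:

```python
# Sketch: compare server variables against the CDC requirements above.
# `variables` would come from running SHOW VARIABLES with any MySQL client.

REQUIRED = {
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
    "binlog_row_metadata": "FULL",
    "gtid_mode": "ON",
    "enforce_gtid_consistency": "ON",
}

def missing_cdc_settings(variables: dict[str, str]) -> dict[str, str]:
    """Return the required settings that are absent or mis-set."""
    return {
        name: expected
        for name, expected in REQUIRED.items()
        if variables.get(name, "").upper() != expected
    }

bad = missing_cdc_settings({"binlog_format": "STATEMENT", "gtid_mode": "ON"})
# bad lists binlog_format, binlog_row_image, binlog_row_metadata,
# and enforce_gtid_consistency with their expected values
```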
#### Create a CDC User

Create a dedicated MySQL user with the privileges required for binlog replication:

```sql
CREATE USER 'cdc_user'@'%' IDENTIFIED BY 'cdc_password';
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc_user'@'%';
GRANT SELECT ON my_database.* TO 'cdc_user'@'%';
FLUSH PRIVILEGES;
```

`REPLICATION SLAVE` and `REPLICATION CLIENT` are the minimum privileges needed to connect as a binlog reader. `SELECT` is required for the initial table schema discovery.
## Connection: MySQLCdcConn

`MySQLCdcConn` defines how to connect to the MySQL server. It accepts a standard MySQL URI and optional credentials.
```python
conn = td.MySQLCdcConn(
    uri="mysql://localhost:3306/my_database",
    credentials=td.UserPasswordCredentials(
        user=td.EnvironmentSecret("MYSQL_CDC_USER"),
        password=td.EnvironmentSecret("MYSQL_CDC_PASSWORD"),
    ),
)
```
| Parameter | Type | Description |
|---|---|---|
| `uri` | `str` | MySQL connection URI (`mysql://host:port/database`). |
| `credentials` | `UserPasswordCredentials` | Optional user/password credentials. |
| — | — | Optional MySQL-specific connection parameters passed to the underlying driver. |
## Trigger: MySQLCdcTrigger

`MySQLCdcTrigger` connects to MySQL, reads binlog events for the specified tables, and stages batches of changes.
```python
trigger = td.MySQLCdcTrigger(
    conn=conn,
    tables=["ecommerce.orders", "ecommerce.order_items"],
    start_from="tail",
)
```
### `tables`

Specifies which database tables to monitor. Tables must be fully qualified as `schema.table`. Accepts a single string or a list of strings.

```python
# Single table
tables="my_database.orders"

# Multiple tables
tables=["my_database.orders", "my_database.order_items"]
```
All tables must exist in the source database before the trigger starts. Tables created after the trigger is running will not be captured.
### `start_from`

Determines where the connector begins reading the binlog. On subsequent runs, the connector resumes automatically from its last committed position.
| Value | Type | Behavior |
|---|---|---|
| `"head"` | `str` | Start from the earliest available position in the binlog. |
| `"tail"` | `str` | Start from the current end of the binlog, capturing only new events. |
| `GtidPosition(...)` | `GtidPosition` | Resume from a specific Global Transaction ID. |
| `BinlogPosition(...)` | `BinlogPosition` | Resume from a specific binlog file name and byte offset. |
| `TimestampPosition(...)` | `TimestampPosition` | Start from the first event at or after the given timestamp. |
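When working with `GtidPosition`, it can help to pick the GTID string apart, for example to log or compare resume points. This is a sketch for single-interval GTID sets of the `uuid:start-end` form shown in the examples below; it is not part of the connector API:

```python
# Sketch: split a "uuid:start-end" GTID string into its parts.

def parse_gtid(gtid: str) -> tuple[str, int, int]:
    """Return (source_uuid, first_txn, last_txn) for a single-interval GTID set."""
    uuid, _, interval = gtid.partition(":")
    start, _, end = interval.partition("-")
    # a bare "uuid:N" set has no range; treat it as N-N
    return uuid, int(start), int(end or start)

parse_gtid("3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5")
# ("3E11FA47-71CA-11E1-9E33-C80AA9429562", 1, 5)
```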
## Advanced Configuration

### CDC Output Format (`cdc_format`)

The `cdc_format` parameter controls how change data is structured in the output TableFrames, configured via `CdcFormat`:

```python
from tabsdata.connector.cdc.common.typing import CdcFormat

cdc_format=CdcFormat(values_format="columns", flatten_values=True)
```
| Parameter | Type | Default | Description |
|---|---|---|---|
| `values_format` | `str` | — | Controls how old and new row values are laid out in the output: `"columns"`, `"map"`, or `"struct"`. |
| `flatten_values` | `bool` | — | When `True`, new values are emitted as individual columns instead of being packed into a map or struct. |
#### Metadata columns (always present)

Every output row includes the following metadata columns regardless of `values_format`:
| Column | Type | Description |
|---|---|---|
| — | — | Operation type: insert, update, or delete. |
| — | — | Transaction identifier from the source database. |
| — | — | Sequence number ordering changes within a transaction. |
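Together, the transaction identifier and sequence number let a consumer restore commit order within a batch. The sketch below uses hypothetical field names (`txn`, `seq`), not the connector's actual column names:

```python
# Sketch: order a batch of change rows by transaction, then by the
# sequence number within each transaction.

def in_commit_order(rows: list[dict]) -> list[dict]:
    return sorted(rows, key=lambda r: (r["txn"], r["seq"]))

rows = [
    {"txn": 7, "seq": 1, "op": "update"},
    {"txn": 3, "seq": 2, "op": "insert"},
    {"txn": 3, "seq": 1, "op": "insert"},
]
ordered = in_commit_order(rows)
# ordered: (3, 1), (3, 2), (7, 1)
```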
#### `values_format = "columns"`

Each source table column is represented as two explicit output columns, one for the old value and one for the new value:

| Column | Description |
|---|---|
| — | Value before the change. |
| — | Value after the change. |
Semantics by operation:

| Operation | Old value column | New value column |
|---|---|---|
| Insert | — | Inserted data |
| Update | Value prior to the update | Value after the update |
| Delete | — | Deleted data |
#### `values_format = "map"`

Old and new values are packed into map columns keyed by table column name:

| Column | Type | Description |
|---|---|---|
| — | map | Old values. |
| — | map | New values packed as a map. Present when `flatten_values` is `False`. |
| — | — | New values as individual columns. Present when `flatten_values` is `True`. |
Semantics by operation:

| Operation | Old value column | New value column(s) |
|---|---|---|
| Insert | — | Inserted data |
| Update | Values prior to the update | Values after the update |
| Delete | — | Deleted data |
#### `values_format = "struct"`

Identical to `"map"`, but old and new values are packed into struct fields instead of map columns:

| Column | Type | Description |
|---|---|---|
| — | struct | Old values. |
| — | struct | New values packed as a struct. Present when `flatten_values` is `False`. |
| — | — | New values as individual columns. Present when `flatten_values` is `True`. |

Semantics by operation are identical to `"map"` above.
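As a concrete illustration, here is one update event (a `total` changing from 10 to 15) laid out under each format. The column names (`id_old`, `old_values`, and so on) are hypothetical; the connector's exact naming is not specified here:

```python
# Sketch: the same update rendered under the three values_format layouts.

old = {"id": 1, "total": 10}
new = {"id": 1, "total": 15}

# values_format="columns": one old/new column pair per source column
columns_row = {
    "id_old": old["id"], "id_new": new["id"],
    "total_old": old["total"], "total_new": new["total"],
}

# values_format="map" with flatten_values=False: packed map columns
map_row = {"old_values": old, "new_values": new}

# values_format="map" with flatten_values=True: new values flattened
# into individual columns alongside the packed old values
flat_row = {"old_values": old, "id": new["id"], "total": new["total"]}
```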
### Start Position Examples

```python
from datetime import datetime, timezone

from tabsdata.connector.cdc.common.typing import TimestampPosition
from tabsdata.connector.cdc.mysql.typing import GtidPosition, BinlogPosition

# Start from the beginning of the binlog
start_from="head"

# Start from the end: capture only new changes going forward
start_from="tail"

# Resume from a specific GTID
start_from=GtidPosition(gtid="3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5")

# Resume from a binlog file and byte offset
start_from=BinlogPosition(file="mysql-bin.000003", pos=154)

# Start from a specific timestamp
start_from=TimestampPosition(ts=datetime(2026, 1, 15, tzinfo=timezone.utc))
```
### Buffer and Trigger Thresholds

The CDC connector uses a two-stage pipeline: changes accumulate in an in-memory buffer, are flushed to the working directory, and are then staged to the output location.

#### Buffer thresholds (memory → working directory)
| Parameter | Type | Default | Description |
|---|---|---|---|
| — | — | — | Flush to disk when the row count in memory reaches this limit. |
| — | — | — | Flush to disk when the byte size in memory reaches this limit. |
| — | — | — | Flush to disk when this many seconds have elapsed since the last flush. |
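The thresholds combine with "first one reached wins" semantics. A minimal sketch of that decision, with hypothetical parameter names and example limits (none of the defaults are stated here):

```python
# Sketch: flush when ANY threshold is reached, whichever comes first.

def should_flush(rows: int, size: int, elapsed: float,
                 max_rows: int, max_bytes: int, max_seconds: float) -> bool:
    return rows >= max_rows or size >= max_bytes or elapsed >= max_seconds

# under all limits: keep buffering
should_flush(10, 2_000, 1.0, max_rows=1_000, max_bytes=1_048_576, max_seconds=5.0)

# time limit exceeded: flush even though rows/bytes are small
should_flush(10, 2_000, 6.0, max_rows=1_000, max_bytes=1_048_576, max_seconds=5.0)
```

The same logic applies to the trigger thresholds below, with the working directory's totals in place of the in-memory counts.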
#### Trigger thresholds (working directory → stage location)

| Parameter | Type | Default | Description |
|---|---|---|---|
| — | — | — | Stage when total rows on disk reach this limit. |
| — | — | — | Stage when total bytes on disk reach this limit. |
| — | — | — | Stage when this many seconds have elapsed since the last stage. |
### Other Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| — | — | — | Seconds between polling cycles when no new events are available. |
| — | — | — | Timeout in seconds for blocking reads from the binlog stream. |
| — | — | — | MySQL server ID for binlog replication. Must be unique across all replication clients. |
| — | — | — | Delay trigger execution until this datetime (UTC). |
| — | — | — | Stop the trigger at this datetime (UTC). |
## Limitations

- **Schema changes:** `ALTER TABLE`, `ADD`/`DROP COLUMN`, and similar DDL operations on tracked tables are not detected or handled. If the source schema changes, the connector must be stopped and reconfigured.
- **`TRUNCATE`:** `TRUNCATE TABLE` operations are not captured. A truncate on a tracked table will not produce any change events.
- **Large/blob types:** `BLOB`, `CLOB`, `LONGBLOB`, `BYTEA`, and `TEXT` (in some configurations) column types are not currently supported. Tables containing these types should exclude them from capture or use alternative ingestion methods.
- **Static table list:** All tables in the `tables` parameter must exist before the trigger starts. The connector does not perform runtime table discovery.
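To catch the large/blob-type limitation before a trigger starts, a table's declared column types can be screened up front. The helper below is a sketch, not part of the connector (`TEXT` is left out of the blocklist since it is only unsupported in some configurations):

```python
# Sketch: flag columns whose declared type cannot be captured.

UNSUPPORTED = {"BLOB", "CLOB", "LONGBLOB", "BYTEA"}

def unsupported_columns(schema: dict[str, str]) -> list[str]:
    """Return column names whose type is on the unsupported list."""
    return [
        name for name, col_type in schema.items()
        if col_type.upper() in UNSUPPORTED
    ]

unsupported_columns({"id": "INT", "payload": "LONGBLOB", "note": "VARCHAR"})
# ["payload"]
```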