MySQL CDC
The MySQL CDC publisher captures row-level changes (inserts, updates, deletes) from a MySQL database by reading its binary log (binlog) in real time.
Note: MySQL CDC is currently marked as unstable and may undergo API changes in future releases.
Installing the Connector Dependencies
pip install mysql-connector-python==9.3.0
Example
from typing import Tuple

import tabsdata as td

# Connection to the source MySQL server; credentials come from environment secrets.
conn = td.MySQLCdcConn(
    uri="mysql://localhost:3306/ecommerce",
    credentials=td.UserPasswordCredentials(
        user=td.EnvironmentSecret("MYSQL_USER"),
        password=td.EnvironmentSecret("MYSQL_PASS"),
    ),
)

# Track two tables, starting from the current end of the binlog.
trigger = td.MySQLCdcTrigger(
    conn=conn,
    tables=["ecommerce.orders", "ecommerce.order_items"],
    start_from="tail",
)


@td.publisher(
    trigger=trigger,
    tables=["orders", "order_items"],
)
def capture_ecommerce(
    orders: list[td.TableFrame],
    order_items: list[td.TableFrame],
) -> Tuple[td.TableFrame, td.TableFrame]:
    # Each argument is a list of staged batches; concatenate them per table.
    return td.concat(orders), td.concat(order_items)
This example publishes CDC data for the orders and order_items tables. Because start_from is "tail", it captures only changes that occur after the publisher is first registered.
After defining the function, register it with a Tabsdata collection and trigger its execution.
Setup
MySQL must be configured to enable CDC before using this publisher. See MySQL Setup to Enable CDC.
Connection: MySQLCdcConn
MySQLCdcConn defines how to connect to the MySQL server. It accepts a standard MySQL URI and optional credentials.
conn = td.MySQLCdcConn(
    uri="mysql://localhost:3306/my_database",
    credentials=td.UserPasswordCredentials(
        user=td.EnvironmentSecret("MYSQL_CDC_USER"),
        password=td.EnvironmentSecret("MYSQL_CDC_PASSWORD"),
    ),
)
| Parameter | Type | Description |
|---|---|---|
| uri | | MySQL connection URI ( |
| credentials | | Optional user/password credentials. If |
| | | Optional MySQL-specific connection parameters passed to the underlying driver. |
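The URI carries the host, the port, and the database name in a single string. As a minimal sketch of how such a URI decomposes, the snippet below uses only the Python standard library. The helper describe_mysql_uri is hypothetical and is not part of tabsdata; it illustrates the shape of the URI, not how MySQLCdcConn parses it internally.

```python
from urllib.parse import urlsplit


def describe_mysql_uri(uri: str) -> dict:
    """Split a mysql:// URI into host, port and database.

    Hypothetical helper for illustration; not part of the tabsdata API.
    """
    parts = urlsplit(uri)
    if parts.scheme != "mysql":
        raise ValueError(f"expected a mysql:// URI, got scheme {parts.scheme!r}")
    return {
        "host": parts.hostname,
        "port": parts.port,
        # The path is "/my_database"; strip the leading slash.
        "database": parts.path.lstrip("/") or None,
    }


info = describe_mysql_uri("mysql://localhost:3306/my_database")
print(info)
```

A check like this can catch a mistyped scheme or a missing port before the connection object is built.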
Trigger: MySQLCdcTrigger
MySQLCdcTrigger connects to MySQL, reads binlog events for the specified tables, and stages batches of changes.
trigger = td.MySQLCdcTrigger(
    conn=conn,
    tables=["ecommerce.orders", "ecommerce.order_items"],
    start_from="tail",
)
tables
Specifies which database tables to monitor. Tables must be fully qualified as schema.table. Accepts a single string or a list of strings.
# Single table
tables="my_database.orders"
# Multiple tables
tables=["my_database.orders", "my_database.order_items"]
All tables must exist in the source database before the trigger starts. Tables created after the trigger is running will not be captured.
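Since the parameter accepts either one string or a list, and every entry must be qualified as schema.table, it can help to validate the value before building the trigger. The following is a small sketch under that assumption. normalize_tables is a hypothetical helper, not a tabsdata function, and it does not handle backtick-quoted identifiers that contain dots.

```python
def normalize_tables(tables):
    """Return a list of schema.table names from a string or a list of strings.

    Hypothetical helper for illustration; not part of the tabsdata API.
    """
    names = [tables] if isinstance(tables, str) else list(tables)
    for name in names:
        schema, sep, table = name.partition(".")
        # Require exactly one dot with non-empty text on both sides.
        if not sep or not schema or not table or "." in table:
            raise ValueError(f"table must be qualified as schema.table: {name!r}")
    return names


single = normalize_tables("my_database.orders")
multiple = normalize_tables(["my_database.orders", "my_database.order_items"])
print(single, multiple)
```

This only checks the naming rule. Whether each table actually exists still has to be verified against the source database.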
Note: Schema changes (such as ALTER TABLE, ADD COLUMN, DROP COLUMN) on tracked tables are handled automatically. No additional configuration is required: the connector detects the change and adjusts its output accordingly.
start_from
Determines where the connector begins reading the binlog. On subsequent runs, the connector resumes automatically from its last committed position.
| Value | Type | Behavior |
|---|---|---|
| "head" | str | Start from the earliest available position in the binlog. |
| "tail" | str | Start from the current end of the binlog, capturing only new events. |
| GtidPosition(gtid=...) | GtidPosition | Resume from a specific Global Transaction ID. |
| BinlogPosition(file=..., pos=...) | BinlogPosition | Resume from a specific binlog file name and byte offset. |
| TimestampPosition(ts=...) | TimestampPosition | Start from the first event at or after the given timestamp. |
Advanced Configuration
CDC Output Format (cdc_format)
The cdc_format parameter controls how change data is structured in the output TableFrames, configured via CdcFormat.
from tabsdata.connector.cdc.common.typing import CdcFormat
cdc_format=CdcFormat(values_format="columns", flatten_values=True)
| Parameter | Type | Default | Description |
|---|---|---|---|
| values_format | | | Controls how old and new row values are laid out in the output. |
| flatten_values | | | When |
Metadata columns (always present)
Every output row includes the following metadata columns regardless of values_format:
| Column | Type | Description |
|---|---|---|
| @td.cdc.meta.op | | Operation type: i (insert), u (update), d (delete). |
| @td.cdc.meta.tx | | Transaction identifier from the source database. |
| @td.cdc.meta.sq | | Sequence number ordering changes within a transaction. |
values_format = "columns"
Each source table column is represented as two explicit output columns: one for the old value and one for the new value.
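| Column | Description |
|---|---|
| @td.cdc.data.col.old.{column} | Value before the change. |
| @td.cdc.data.col.new.{column} | Value after the change. Present when flatten_values=False. |
| {column} | Value after the change. Present when flatten_values=True. |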
Semantics by operation:
| Operation | Old value column | New value column |
|---|---|---|
| Insert (i) | null | Inserted data |
| Update (u) | Value prior to the update | Value after the update |
| Delete (d) | null | Deleted data |
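To make the old/new pairing concrete, here is a minimal sketch that splits one output row (with flatten_values=False) into its old and new halves and lists which columns an update actually changed. It assumes the row is available as a plain Python dict; the column prefixes are the ones shown in the Output Examples section of this page. split_old_new and changed_columns are hypothetical helpers, not part of tabsdata.

```python
OLD_PREFIX = "@td.cdc.data.col.old."
NEW_PREFIX = "@td.cdc.data.col.new."


def split_old_new(row: dict) -> tuple[dict, dict]:
    """Return (old_values, new_values) keyed by source column name."""
    old = {k[len(OLD_PREFIX):]: v for k, v in row.items() if k.startswith(OLD_PREFIX)}
    new = {k[len(NEW_PREFIX):]: v for k, v in row.items() if k.startswith(NEW_PREFIX)}
    return old, new


def changed_columns(row: dict) -> list[str]:
    """List the columns whose old and new values differ (meaningful for updates)."""
    old, new = split_old_new(row)
    return sorted(col for col in new if old.get(col) != new[col])


# An update row, abbreviated from the output examples on this page.
update_row = {
    "@td.cdc.meta.op": "u",
    "@td.cdc.data.col.new.id": 7,
    "@td.cdc.data.col.new.first_name": "Lawrence",
    "@td.cdc.data.col.new.last_name": "Bauer",
    "@td.cdc.data.col.old.id": 7,
    "@td.cdc.data.col.old.first_name": "Hermine",
    "@td.cdc.data.col.old.last_name": "Preston",
}
changed = changed_columns(update_row)
print(changed)
```

For insert and delete rows the old-value columns are null, so a comparison like this is only useful when the operation is an update.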
values_format = "map"
Old and new values are packed into map columns keyed by table column name:
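| Column | Type | Description |
|---|---|---|
| @td.cdc.data.map.old | | Old values. Present when |
| @td.cdc.data.map.new | | New values packed as a map. Present when flatten_values=False. |
| {column} | - | New values as individual columns. Present when flatten_values=True. |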
Semantics by operation:
| Operation | Old value column | New value column(s) |
|---|---|---|
| Insert (i) | All entries null | Inserted data |
| Update (u) | Values prior to the update | Values after the update |
| Delete (d) | All entries null | Deleted data |
values_format = "struct"
Identical to "map" but old and new values are packed into struct fields instead of map columns:
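| Column | Type | Description |
|---|---|---|
| @td.cdc.data.row.old | struct | Old values. Present when |
| @td.cdc.data.row.new | struct | New values packed as a struct. Present when flatten_values=False. |
| {column} | - | New values as individual columns. Present when flatten_values=True. |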
Semantics by operation are identical to "map" above.
Output Examples
values_format="columns", flatten_values=True
| @td.cdc.meta.op | @td.cdc.meta.tx | @td.cdc.meta.sq | @td.cdc.meta.fmt | @td.cdc.meta.flat | id | username | first_name | last_name | email | @td.cdc.data.col.old.id | @td.cdc.data.col.old.username | @td.cdc.data.col.old.first_name | @td.cdc.data.col.old.last_name | @td.cdc.data.col.old.email |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| i | 225e1410-…:18 | 1 | columns | true | 1 | deals_1914 | Johnny | Woods | replaced1800@gmail.com | null | null | null | null | null |
| u | 225e1410-…:19 | 1 | columns | true | 7 | filename_2073 | Gerardo | Mcintosh | surgery1995@duck.com | 7 | filename_2073 | Maren | Puckett | examinations2009@yahoo.com |
| d | 225e1410-…:20 | 1 | columns | true | 2 | incl_1972 | Emery | Reilly | exposed2025@example.com | null | null | null | null | null |
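One common use of this flattened layout is to keep a current-state copy of the source table. The sketch below applies rows shaped like the ones above to an in-memory dict. It assumes the rows are plain Python dicts, that they arrive in the order the changes occurred, and that id is the primary key; apply_changes is a hypothetical helper, not part of tabsdata.

```python
def apply_changes(snapshot: dict, rows: list[dict], key: str = "id") -> dict:
    """Apply flattened CDC rows to a dict keyed by primary key (illustrative only)."""
    for row in rows:
        op = row["@td.cdc.meta.op"]
        # With flatten_values=True the plain columns carry the new (or deleted) row.
        data = {k: v for k, v in row.items() if not k.startswith("@td.cdc.")}
        if op in ("i", "u"):
            snapshot[data[key]] = data
        elif op == "d":
            snapshot.pop(data[key], None)
        else:
            raise ValueError(f"unknown operation code: {op!r}")
    return snapshot


# Starting state and changes, abbreviated from the example table above.
snapshot = {
    2: {"id": 2, "first_name": "Emery"},
    7: {"id": 7, "first_name": "Maren"},
}
rows = [
    {"@td.cdc.meta.op": "i", "id": 1, "first_name": "Johnny"},
    {"@td.cdc.meta.op": "u", "id": 7, "first_name": "Gerardo",
     "@td.cdc.data.col.old.first_name": "Maren"},
    {"@td.cdc.meta.op": "d", "id": 2, "first_name": "Emery"},
]
apply_changes(snapshot, rows)
print(sorted(snapshot))
```

The old-value columns are ignored here because the new values alone are enough to rebuild the current state.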
values_format="columns", flatten_values=False
| @td.cdc.meta.op | @td.cdc.meta.tx | @td.cdc.meta.sq | @td.cdc.meta.fmt | @td.cdc.meta.flat | @td.cdc.data.col.new.id | @td.cdc.data.col.new.username | @td.cdc.data.col.new.first_name | @td.cdc.data.col.new.last_name | @td.cdc.data.col.new.email | @td.cdc.data.col.old.id | @td.cdc.data.col.old.username | @td.cdc.data.col.old.first_name | @td.cdc.data.col.old.last_name | @td.cdc.data.col.old.email |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| i | 225e1410-…:22 | 1 | columns | false | 1 | beat_1843 | Kathyrn | Stokes | true1875@outlook.com | null | null | null | null | null |
| u | 225e1410-…:23 | 1 | columns | false | 7 | douglas_1901 | Lawrence | Bauer | submission2025@yahoo.com | 7 | douglas_1901 | Hermine | Preston | commodities1921@outlook.com |
| d | 225e1410-…:24 | 1 | columns | false | 7 | douglas_1901 | Lawrence | Bauer | submission2025@yahoo.com | null | null | null | null | null |
values_format="struct", flatten_values=True
| @td.cdc.meta.op | @td.cdc.meta.tx | @td.cdc.meta.sq | @td.cdc.meta.fmt | @td.cdc.meta.flat | id | username | first_name | last_name | email | @td.cdc.data.row.old |
|---|---|---|---|---|---|---|---|---|---|---|
| i | 225e1410-…:26 | 1 | struct | true | 1 | loops_1939 | Agueda | Duncan | clinical2027@protonmail.com | {null,null,null,null,null} |
| u | 225e1410-…:27 | 1 | struct | true | 8 | evaluating_1979 | Carletta | Deleon | wrapping1938@yandex.com | {8,"evaluating_1979","Marlen","Estrada","hitachi1882@example.org"} |
| d | 225e1410-…:28 | 1 | struct | true | 4 | majority_1865 | Eulah | Whitney | touched1819@yahoo.com | {null,null,null,null,null} |
values_format="struct", flatten_values=False
| @td.cdc.meta.op | @td.cdc.meta.tx | @td.cdc.meta.sq | @td.cdc.meta.fmt | @td.cdc.meta.flat | @td.cdc.data.row.new | @td.cdc.data.row.old |
|---|---|---|---|---|---|---|
| i | 225e1410-…:30 | 1 | struct | false | {1,"processes_2081","Leon","Pollard","browse1909@duck.com"} | {null,null,null,null,null} |
| u | 225e1410-…:31 | 1 | struct | false | {5,"virtually_1823","Gavin","Macdonald","rocky2058@yandex.com"} | {5,"virtually_1823","Erich","Hood","skin2004@gmail.com"} |
| d | 225e1410-…:32 | 1 | struct | false | {7,"thank_1865","Lashawna","Petty","classical2074@yandex.com"} | {null,null,null,null,null} |
values_format="map", flatten_values=True
| @td.cdc.meta.op | @td.cdc.meta.tx | @td.cdc.meta.sq | @td.cdc.meta.fmt | @td.cdc.meta.flat | id | username | first_name | last_name | email | @td.cdc.data.map.old |
|---|---|---|---|---|---|---|---|---|---|---|
| i | 225e1410-…:34 | 1 | map | true | 1 | uni_2028 | Sandy | Hinton | husband1960@example.org | {"id":null,"username":null,"first_name":null,"last_name":null,"email":null} |
| u | 225e1410-…:35 | 1 | map | true | 1 | uni_2028 | Kelle | Noel | see2021@example.com | {"id":1,"username":"uni_2028","first_name":"Sandy","last_name":"Hinton","email":"husband1960@example.org"} |
| d | 225e1410-…:36 | 1 | map | true | 1 | uni_2028 | Kelle | Noel | see2021@example.com | {"id":null,"username":null,"first_name":null,"last_name":null,"email":null} |
values_format="map", flatten_values=False
| @td.cdc.meta.op | @td.cdc.meta.tx | @td.cdc.meta.sq | @td.cdc.meta.fmt | @td.cdc.meta.flat | @td.cdc.data.map.new | @td.cdc.data.map.old |
|---|---|---|---|---|---|---|
| i | a4a17b92-…:38 | 1 | map | false | {"id":1,"username":"vacancies_2045","first_name":"Tony","last_name":"Oliver","email":"rec1977@yandex.com"} | {"id":null,"username":null,"first_name":null,"last_name":null,"email":null} |
| u | a4a17b92-…:39 | 1 | map | false | {"id":7,"username":"strategies_1852","first_name":"Foster","last_name":"Nolan","email":"ambient1829@example.com"} | {"id":7,"username":"strategies_1852","first_name":"Doreatha","last_name":"Mclaughlin","email":"buffalo2065@yandex.com"} |
| d | a4a17b92-…:40 | 1 | map | false | {"id":8,"username":"boc_1991","first_name":"Peg","last_name":"Vang","email":"blacks1939@yandex.com"} | {"id":null,"username":null,"first_name":null,"last_name":null,"email":null} |
Start Position Examples
from tabsdata.connector.cdc.mysql.typing import GtidPosition, BinlogPosition
from tabsdata.connector.cdc.common.typing import TimestampPosition
from datetime import datetime, timezone
# Start from the beginning of the binlog
start_from="head"
# Start from the end: capture only new changes going forward
start_from="tail"
# Resume from a specific GTID
start_from=GtidPosition(gtid="3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5")
# Resume from a binlog file and byte offset
start_from=BinlogPosition(file="mysql-bin.000003", pos=154)
# Start from a specific timestamp
start_from=TimestampPosition(ts=datetime(2026, 1, 15, tzinfo=timezone.utc))
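A GTID string like the one above has the form source_uuid:first-last, and a typo in it is easy to miss. The following sketch validates and splits such a string before it is handed to GtidPosition. It follows the classic MySQL GTID notation for a single source UUID and does not handle comma-separated sets or tagged GTIDs. parse_gtid is a hypothetical helper, not part of tabsdata, and whether GtidPosition accepts every form of GTID set is not confirmed here.

```python
import uuid


def parse_gtid(gtid: str) -> tuple[str, list[tuple[int, int]]]:
    """Parse 'source_uuid:first-last[:first-last...]' into (uuid, intervals)."""
    source, _, rest = gtid.partition(":")
    uuid.UUID(source)  # raises ValueError if the source id is not a valid UUID
    if not rest:
        raise ValueError(f"missing transaction interval in {gtid!r}")
    intervals = []
    for chunk in rest.split(":"):
        first, _, last = chunk.partition("-")
        # A single number such as "11" is treated as the interval 11-11.
        start, end = int(first), int(last or first)
        if start < 1 or end < start:
            raise ValueError(f"invalid interval {chunk!r} in {gtid!r}")
        intervals.append((start, end))
    return source.lower(), intervals


parsed = parse_gtid("3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5")
print(parsed)
```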
Buffer and Trigger Thresholds
The CDC connector uses a two-stage pipeline: changes accumulate in memory (buffer), are flushed to the working directory, then staged to the output location.
Buffer thresholds (memory → working directory)
| Parameter | Type | Default | Description |
|---|---|---|---|
| | | | Flush to disk when row count in memory reaches this limit. |
| | | | Flush to disk when byte size in memory reaches this limit. |
| | | | Flush to disk when this many seconds have elapsed since the last flush. |
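The three buffer thresholds can be read as independent conditions, with a flush happening as soon as any one of them is met. The sketch below models that reading. It is an assumption about the behavior, not the connector's implementation, and the names FlushPolicy, max_rows, max_bytes, and max_seconds are hypothetical rather than actual connector parameters.

```python
import time


class FlushPolicy:
    """Decide when an in-memory buffer should be flushed (illustrative sketch)."""

    def __init__(self, max_rows, max_bytes, max_seconds, clock=time.monotonic):
        self.max_rows = max_rows
        self.max_bytes = max_bytes
        self.max_seconds = max_seconds
        self._clock = clock
        self._last_flush = clock()
        self.rows = 0
        self.bytes = 0

    def add(self, row_size: int) -> bool:
        """Record one buffered row; return True if a flush is due."""
        self.rows += 1
        self.bytes += row_size
        return self.should_flush()

    def should_flush(self) -> bool:
        # Any single threshold being reached is enough to trigger a flush.
        elapsed = self._clock() - self._last_flush
        return (
            self.rows >= self.max_rows
            or self.bytes >= self.max_bytes
            or elapsed >= self.max_seconds
        )

    def reset(self) -> None:
        """Call after flushing: clear the counters and restart the timer."""
        self.rows = 0
        self.bytes = 0
        self._last_flush = self._clock()


# A fake clock makes the time-based threshold easy to demonstrate.
now = [0.0]
policy = FlushPolicy(max_rows=3, max_bytes=1000, max_seconds=10, clock=lambda: now[0])
row_results = [policy.add(100) for _ in range(3)]  # third row hits max_rows
policy.reset()
policy.add(100)
now[0] = 11.0
time_due = policy.should_flush()  # elapsed time exceeds max_seconds
print(row_results, time_due)
```

The same shape applies to the second stage, with rows and bytes counted on disk instead of in memory.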
Trigger thresholds (working directory → stage location)
| Parameter | Type | Default | Description |
|---|---|---|---|
| | | | Stage when total rows on disk reach this limit. |
| | | | Stage when total bytes on disk reach this limit. |
| | | | Stage when this many seconds have elapsed since the last stage. |
Other Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| | | | Seconds between polling cycles when no new events are available. |
| | | | Timeout in seconds for blocking reads from the binlog stream. |
| | | | MySQL server ID for binlog replication. Must be unique across all replication clients. |
| | | | Delay trigger execution until this datetime (UTC). |
| | | | Stop the trigger at this datetime (UTC). |
Limitations
TRUNCATE: TRUNCATE TABLE operations are not captured. A truncate on a tracked table will not produce any change events.
Large/Blob types: BLOB, CLOB, LONGBLOB, BYTEA, and TEXT (in some configurations) column types are not currently supported. Tables containing these types should exclude them from capture or use alternative ingestion methods.
Static table list: All tables in the tables parameter must exist before the trigger starts. The connector does not perform runtime table discovery.
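To spot affected columns before registering a publisher, a table's column types can be compared against the list above. The sketch below assumes the input is a list of (column name, data type) pairs, for instance as selected from information_schema.columns. unsupported_columns is a hypothetical helper, and it flags TEXT conservatively even though that type is only unsupported in some configurations.

```python
# Type names taken from the limitation described above.
UNSUPPORTED_TYPES = {"BLOB", "CLOB", "LONGBLOB", "BYTEA", "TEXT"}


def unsupported_columns(columns: list[tuple[str, str]]) -> list[str]:
    """Return the names of columns whose data type is on the unsupported list."""
    return [name for name, data_type in columns if data_type.upper() in UNSUPPORTED_TYPES]


columns = [
    ("id", "int"),
    ("avatar", "longblob"),
    ("bio", "text"),
    ("name", "varchar"),
]
flagged = unsupported_columns(columns)
print(flagged)
```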
MySQL Setup to Enable CDC
The steps below are provided for convenience. Refer to the MySQL documentation for comprehensive and up-to-date configuration instructions.
Server Configuration
Add the following to your MySQL configuration file (my.cnf or my.ini) and restart the server:
[mysqld]
server-id = 1
log_bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
binlog_row_metadata = FULL
gtid_mode = ON
enforce_gtid_consistency = ON
| Parameter | Description |
|---|---|
| server-id, log_bin | Enables binary logging with a unique server identifier. |
| binlog_format | Row-based format captures individual column values rather than SQL statements. |
| binlog_row_image | Logs all columns for every change, not just modified ones. |
| gtid_mode, enforce_gtid_consistency | Required for resuming from a precise position across server restarts. |
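After restarting the server, the effective values can be compared with the configuration above. The sketch below assumes the variables have already been read into a dict, for example from the (Variable_name, Value) rows returned by SHOW GLOBAL VARIABLES. find_misconfigured is a hypothetical helper, and the expected values simply mirror the configuration file shown in this section.

```python
# Expected values, mirroring the [mysqld] settings shown above.
EXPECTED = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
    "binlog_row_metadata": "FULL",
    "gtid_mode": "ON",
    "enforce_gtid_consistency": "ON",
}


def find_misconfigured(variables: dict) -> dict:
    """Return {name: (expected, actual)} for settings that differ from EXPECTED."""
    problems = {}
    for name, expected in EXPECTED.items():
        actual = str(variables.get(name, "")).upper()
        if actual != expected:
            # A missing variable is reported with an actual value of None.
            problems[name] = (expected, actual or None)
    return problems


# Sample values as a server might report them; binlog_row_image is wrong here.
server_variables = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_image": "MINIMAL",
    "binlog_row_metadata": "FULL",
    "gtid_mode": "ON",
    "enforce_gtid_consistency": "ON",
}
problems = find_misconfigured(server_variables)
print(problems)
```

An empty result means every checked setting matches the configuration in this section.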
Create a CDC User
Create a dedicated MySQL user with the privileges required for binlog replication:
CREATE USER 'cdc_user'@'%' IDENTIFIED BY 'cdc_password';
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc_user'@'%';
GRANT SELECT ON my_database.* TO 'cdc_user'@'%';
FLUSH PRIVILEGES;
REPLICATION SLAVE and REPLICATION CLIENT are the minimum privileges needed to connect as a binlog reader. SELECT is required for the initial table schema discovery.