Triggers

class CronTrigger(
mask: str,
start: datetime | None = None,
end: datetime | None = None,
)

Bases: object

Categories:

trigger

Represents a cron-based trigger for scheduling Tabsdata publishers, subscribers and transformers.

This class defines a trigger based on a cron expression, with optional start and end times to constrain its activation period. It validates the cron mask and time boundaries to ensure the trigger is well-formed.

property end: str | None

Gets the end time of the trigger as a timezone-aware ISO 8601 string with the ‘Z’ (UTC) suffix.

Returns:

The end time in ISO format, or None if not set.

property mask: str

Gets the cron expression mask.

Returns:

The cron expression string.

property start: str | None

Gets the start time of the trigger as a timezone-aware ISO 8601 string with the ‘Z’ (UTC) suffix.

Returns:

The start time in ISO format, or None if not set.

class Db2CdcTrigger(
**kwargs,
)

Bases: CdcTrigger

Categories:

trigger

A CdcTrigger that captures changes from a Db2 database using SQL Replication.

This trigger connects to a Db2 server, reads change data capture events (inserts, updates, deletes) on specified tables via SQL Replication, and stages them.

property blocking_timeout_seconds: int

Timeout in seconds for blocking reads from the replication stream.

collect_tables(
session: Any,
cdc_format: CdcFormat = CdcFormat(values_format='columns', flatten_values=True),
) → dict[str, TrackedTable]

harvest_tables(
session: Any,
cdc_format: CdcFormat = CdcFormat(values_format='columns', flatten_values=True),
) → dict[str, TrackedTable]
property start_from: Literal['head', 'tail'] | CommitSeqPosition | TableCommitSeqPosition | TimestampPosition | None

Position to start reading changes from.

class KafkaTrigger(
**kwargs,
)

Bases: StageTrigger

Categories:

trigger

A StageTrigger that consumes messages from a Kafka topic and stages them.

This trigger connects to a Kafka cluster, consumes messages from a specified topic, and uses rollover conditions (time, size, and message count) to buffer and stage the data.

property conn: KafkaConn

Kafka consumer connection configuration.

property data_format: Literal['avro', 'json'] | Protobuf

Data format of the messages (“avro”, “json”, “protobuf”).

property dlq_topic: str | None

Dead-letter queue topic.

property group_id: str | None

Kafka consumer group ID.

property messages_rollover: Annotated[int, Gt(gt=0)] | None

Message count-based rollover configuration.

run(
context: StageTriggerContext,
)

Run the Kafka stage trigger to consume messages, buffer them, and stage them based on rollover conditions.

Parameters:

context – The StageTriggerContext instance.

property schema: str

Schema string for deserializing messages.

property size_rollover_mb: Annotated[int, Gt(gt=0)] | None

Size-based rollover configuration. Size is specified in megabytes.

property time_rollover_secs: int

Time-based rollover configuration. Time is specified in seconds.

property topic: str

Kafka topic to consume from.

class MySQLCdcTrigger(
**kwargs,
)

Bases: CdcTrigger

Categories:

trigger

A CdcTrigger that captures changes from a MySQL database using binlog replication.

This trigger connects to a MySQL server, reads the binary log for change data capture events (inserts, updates, deletes) on specified tables, and stages them.

property blocking_timeout_seconds: int

Timeout in seconds for blocking reads from the binlog stream.

collect_tables(
session: Any,
cdc_format: CdcFormat = CdcFormat(values_format='columns', flatten_values=True),
) → dict[str, TrackedTable]

harvest_tables(
session: Any,
cdc_format: CdcFormat = CdcFormat(values_format='columns', flatten_values=True),
) → dict[str, TrackedTable]
property server_id: int

MySQL server ID for binlog replication.

property start_from: Literal['head', 'tail'] | GtidPosition | BinlogPosition | TimestampPosition | None

Position to start reading changes from.

class PostgresCdcTrigger(
**kwargs,
)

Bases: CdcTrigger

Categories:

trigger

A CdcTrigger that captures changes from a PostgreSQL database using WAL logical replication.

This trigger connects to a PostgreSQL server, reads the write-ahead log via the wal2json output plugin for change data capture events (inserts, updates, deletes) on specified tables, and stages them.
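The wal2json output plugin emits each transaction as a JSON document with a "change" array; extracting insert/update/delete events for tracked tables can be sketched as below. This is an illustrative parser for wal2json's format-version-1 output, not the trigger's actual code:

```python
import json

def parse_wal2json(payload: str, tracked: set[str]) -> list[tuple[str, str, dict]]:
    """Extract (kind, schema.table, columns) tuples for tracked tables."""
    events = []
    for change in json.loads(payload).get("change", []):
        table = f'{change["schema"]}.{change["table"]}'
        if table not in tracked:
            continue  # only stage changes for the specified tables
        # wal2json pairs column names with values in parallel arrays.
        cols = dict(zip(change.get("columnnames", []),
                        change.get("columnvalues", [])))
        events.append((change["kind"], table, cols))
    return events

sample = ('{"change":[{"kind":"insert","schema":"public","table":"users",'
          '"columnnames":["id","name"],"columnvalues":[1,"ada"]}]}')
print(parse_wal2json(sample, {"public.users"}))
# → [('insert', 'public.users', {'id': 1, 'name': 'ada'})]
```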

property blocking_timeout_seconds: int

Timeout in seconds for blocking reads from the replication stream.

collect_tables(
session: Any,
cdc_format: CdcFormat = CdcFormat(values_format='columns', flatten_values=True),
) → dict[str, TrackedTable]

harvest_tables(
session: Any,
cdc_format: CdcFormat = CdcFormat(values_format='columns', flatten_values=True),
) → dict[str, TrackedTable]
property publication_name: str | None

Name of the PostgreSQL publication to subscribe to.

property replication_slot: str | None

Name of the logical replication slot.

property slot_behavior: Literal['create', 'reuse']

Whether to create a new replication slot or reuse an existing one.

property start_from: Literal['head', 'tail'] | LsnPosition | None

Position to start reading changes from.