Working with Kafka
The Kafka Trigger enables Publishers to ingest data directly from a Kafka topic into a Tabsdata table. This allows for seamless streaming of event-based data into Tabsdata.
How It Works
The Kafka Trigger operates in tandem with the Tabsdata Stage source (td.Stage()).
Staging: When the trigger is active, Tabsdata acts as a consumer, pulling messages from the specified Kafka topic and holding them in a private storage area called a Stage.
Buffering: Messages are accumulated in the Stage based on specific thresholds (Time, Size, or Count).
Publishing: Once a threshold is met, the accumulated data is passed to your publisher function (as a list of TableFrame objects), concatenated, and committed as a new incremental table version.
Supported Formats
Data Formats: JSON, Avro, Protobuf.
Schema Management: You can provide the schema string directly or connect to external registries (Confluent Schema Registry or AWS Glue Schema Registry).
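If the schema lives in a separate file, you can load it at import time and pass the resulting string to the trigger. The following is a minimal sketch; the file name flight_events_schema.json is purely illustrative.

from pathlib import Path

# Load the JSON schema string from disk (the file name is illustrative)
JSON_SCHEMA = Path("flight_events_schema.json").read_text()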
Basic Configuration Example
The following example demonstrates how to configure a publisher to consume JSON data from a Kafka topic named my_topic without an external registry.
import tabsdata as td

# Define your schema (or load it from a file)
JSON_SCHEMA = "..."

@td.publisher(
    # The source must be a Stage when using KafkaTrigger
    source=td.Stage(),
    tables="flight_events",
    trigger_by=td.KafkaTrigger(
        conn=td.KafkaConn(
            servers="my_kafka:9092",
            credentials=td.UserPasswordCredentials("user", "password"),
            group_id="tabsdata_consumer_group",
        ),
        topic="my_topic",
        data_format="json",
        schema=JSON_SCHEMA,
        # Commit data every 30 seconds
        time_rollover_secs=30,
    ),
)
def flight_events_publisher(tfs: list[td.TableFrame]) -> td.TableFrame:
    # Concatenate the batch of messages into a single frame
    return td.concat(tfs)
Configuration Reference
The KafkaTrigger object requires two main components: the Trigger Configuration (logic for handling data) and the Connection Configuration (details for connecting to the broker).
1. Kafka Trigger Configuration (td.KafkaTrigger)
These parameters control how data is consumed and when it is committed to the table.
| Parameter | Type | Description |
|---|---|---|
| conn | td.KafkaConn | Required. The connection object containing server and credential details. |
| topic | str | Required. The Kafka topic to consume from. |
| data_format | str | Required. Format of the messages (JSON, Avro, or Protobuf). |
| schema | str | The schema string used to deserialize the messages (if not using a registry). |
| group_id | str | Optional. Overrides the consumer group ID specified in the connection. |
| | str | Optional. A topic to send messages that fail deserialization (Dead Letter Queue). |
Rollover Strategies (Buffering)
You must define at least the time rollover strategy.
| Parameter | Default | Description |
|---|---|---|
| time_rollover_secs | | The maximum time (in seconds) to buffer messages before committing. |
| | | The maximum size (in MB) of the buffer before committing. |
| | | The maximum number of messages to buffer before committing. |
Scheduling
| Parameter | Type | Description |
|---|---|---|
| start | datetime or str | Optional. The specific time to start consuming. |
| end | datetime or str | Optional. The specific time to stop consuming. |
Note on Scheduling:
start and end accept Python datetime objects with timezone info or ISO 8601 strings (e.g., 2024-01-01T12:00:00Z). If omitted, the trigger runs indefinitely from the moment it is registered.
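As a sketch of a time-bounded trigger, the snippet below reuses the connection and schema from the basic example and passes start as an ISO 8601 string and end as a timezone-aware datetime, matching the accepted forms described above. The resulting object is handed to trigger_by in the @td.publisher decorator exactly as in the basic example.

from datetime import datetime, timezone

trigger = td.KafkaTrigger(
    conn=conn,  # a td.KafkaConn configured as in the basic example
    topic="my_topic",
    data_format="json",
    schema=JSON_SCHEMA,
    time_rollover_secs=30,
    start="2024-01-01T12:00:00Z",                          # ISO 8601 string
    end=datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc),  # timezone-aware datetime
)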
2. Kafka Connection Configuration (td.KafkaConn)
These parameters control the connection to the Kafka Broker.
| Parameter | Description |
|---|---|
| servers | Required. The Kafka bootstrap servers (e.g., my_kafka:9092). |
| credentials | Optional. Authentication credentials (e.g., td.UserPasswordCredentials). |
| group_id | Optional. The default consumer group ID for this connection. |
| schema_registry | Optional. Configuration object for Confluent or AWS Glue Schema Registry (see section below). |
| | Boolean. Whether to strictly enforce connection parameters. |
| | Source-specific configuration parameters. |
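For reference, a minimal connection object using only the parameters shown in the basic example (server address, credentials, and consumer group are placeholders):

conn = td.KafkaConn(
    servers="my_kafka:9092",
    credentials=td.UserPasswordCredentials("user", "password"),
    group_id="tabsdata_consumer_group",
)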
Schema Registry Configuration
If your Kafka messages rely on an external schema registry for deserialization (e.g., Avro or Protobuf), you can configure the schema_registry parameter within the td.KafkaConn object.
Tabsdata supports both Confluent Schema Registry and AWS Glue Schema Registry.
Confluent Schema Registry
Use this configuration when connecting to a self-hosted or Confluent Cloud schema registry.
Configuration Object: td.ConfluentSchemaRegistry
| Parameter | Type | Description |
|---|---|---|
| url | str | Required. The full URL of the Confluent Schema Registry. |
| credentials | td.ConfluentSchemaRegistryCredentialsSpec | Optional. Credentials object containing the API Key and Secret. |
| | dict | Optional. A dictionary of additional configuration properties. |
Example Snippet:
conn = td.KafkaConn(
    # ... basic kafka params ...
    schema_registry=td.ConfluentSchemaRegistry(
        url="https://my-registry:8081",
        credentials=td.ConfluentSchemaRegistryCredentialsSpec(key="KEY", secret="SECRET"),
    ),
)
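To consume Avro messages with this registry-backed connection, a trigger along the following lines is a reasonable sketch. The format string "avro" and the omission of the schema parameter when the registry handles deserialization are assumptions, not confirmed API behavior.

# Sketch only: assumes data_format="avro" and that `schema` can be omitted
# when the schema registry handles deserialization.
trigger = td.KafkaTrigger(
    conn=conn,  # the registry-backed connection defined above
    topic="my_topic",
    data_format="avro",
    time_rollover_secs=60,
)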
AWS Glue Schema Registry
Use this configuration when schemas are stored in the AWS Glue Schema Registry.
Configuration Object: td.AwsGlueSchemaRegistry
| Parameter | Type | Description |
|---|---|---|
| region_name | str | Required. The AWS region where the registry resides (e.g., us-west-2). |
| registry_name | str | Required. The name of the registry in AWS Glue. |
| schema_name | str | Required. The specific schema name to retrieve. |
| credentials | td.AwsGlueSchemaRegistryCredentialsSpec | Required. AWS credentials (access key, secret key, session token). |
| | dict | Optional. A dictionary of additional configuration properties. |
Example Snippet:
conn = td.KafkaConn(
    # ... basic kafka params ...
    schema_registry=td.AwsGlueSchemaRegistry(
        region_name="us-west-2",
        registry_name="my-glue-registry",
        schema_name="flight-events-schema",
        credentials=td.AwsGlueSchemaRegistryCredentialsSpec(
            access_key="ACCESS_KEY",
            secret_key="SECRET_KEY",
        ),
    ),
)
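Once defined, this Glue-backed connection is passed to td.KafkaTrigger in the same way as the Confluent example sketched above.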
Lifecycle Management (CLI)
You can manage the execution state of your Kafka Triggers using the Tabsdata CLI.
Starting and Stopping Triggers
To manually pause or resume the ingestion of data without deleting the publisher:
# Stop a specific function trigger
td stage stop --coll <COLLECTION_NAME> --name <FUNCTION_NAME>
# Start a specific function trigger
td stage start --coll <COLLECTION_NAME> --name <FUNCTION_NAME>
Deploying in a Disabled State
If you wish to register a publisher but prevent it from immediately consuming data, use the --disabled flag during registration. You can then manually start it later using the CLI command above.