Working with Kafka

The Kafka Trigger enables Publishers to ingest data directly from a Kafka topic into a Tabsdata table. This allows for seamless streaming of event-based data into Tabsdata.

How It Works

The Kafka Trigger operates in tandem with the Tabsdata Stage source (td.Stage()).

  1. Staging: When the trigger is active, Tabsdata acts as a consumer, pulling messages from the specified Kafka topic and holding them in a private storage area called a Stage.

  2. Buffering: Messages are accumulated in the Stage based on specific thresholds (Time, Size, or Count); the sketch after this list illustrates how these thresholds interact.

  3. Publishing: Once a threshold is met, the buffered data is passed to your publisher function as a list of TableFrame objects; the function typically concatenates them into a single frame, which is committed as a new incremental table version.
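
Conceptually, a commit fires as soon as any configured threshold is met. The sketch below is purely illustrative (none of these names belong to the Tabsdata API); it only shows how the three rollover conditions from step 2 interact:

import time

# Illustrative only: how the three rollover thresholds might be evaluated.
# These function and parameter names are hypothetical, not Tabsdata API.
def should_commit(
    buffer_started_at: float,  # time.monotonic() when the buffer opened
    buffered_bytes: int,
    buffered_count: int,
    time_rollover_secs: int = 60,
    size_rollover_mb: int | None = None,
    messages_rollover: int | None = None,
) -> bool:
    # Time threshold: always active (defaults to 60 seconds).
    if time.monotonic() - buffer_started_at >= time_rollover_secs:
        return True
    # Size threshold: only checked if configured.
    if size_rollover_mb is not None and buffered_bytes >= size_rollover_mb * 1024 * 1024:
        return True
    # Message-count threshold: only checked if configured.
    if messages_rollover is not None and buffered_count >= messages_rollover:
        return True
    return False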

Supported Formats

  • Data Formats: JSON, Avro, Protobuf.

  • Schema Management: You can provide the schema string directly or connect to external registries (Confluent Schema Registry or AWS Glue Schema Registry); an illustrative schema string follows this list.
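
As an illustration, for data_format="avro" the schema parameter would carry a standard Avro schema document as a string. The record and field names below are hypothetical, chosen to match the flight-events example later in this section:

# Hypothetical Avro schema string for the flight-events example;
# the record and field names are illustrative, not part of the Tabsdata API.
AVRO_SCHEMA = """
{
  "type": "record",
  "name": "FlightEvent",
  "fields": [
    {"name": "flight_id", "type": "string"},
    {"name": "event_type", "type": "string"},
    {"name": "event_time", "type": "long"}
  ]
}
"""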


Basic Configuration Example

The following example demonstrates how to configure a publisher to consume JSON data from a Kafka topic named my_topic without an external registry.

import tabsdata as td

# Define your schema (or load it from a file)
JSON_SCHEMA = "..." 

@td.publisher(
    # The source must be a Stage when using KafkaTrigger
    source=td.Stage(),
    tables="flight_events",
    trigger_by=td.KafkaTrigger(
        conn=td.KafkaConn(
            servers="my_kafka:9092",
            credentials=td.UserPasswordCredentials("user", "password"),
            group_id="tabsdata_consumer_group",
        ),
        topic="my_topic",
        data_format="json",
        schema=JSON_SCHEMA,
        # Commit data every 30 seconds
        time_rollover_secs=30,
    ),
)
def flight_events_publisher(tfs: list[td.TableFrame]) -> td.TableFrame:
    # Concatenate the batch of messages into a single frame
    return td.concat(tfs)


Configuration Reference

The KafkaTrigger object requires two main components: the Trigger Configuration (logic for handling data) and the Connection Configuration (details for connecting to the broker).

1. Kafka Trigger Configuration (td.KafkaTrigger)

These parameters control how data is consumed and when it is committed to the table.

| Parameter   | Type      | Description |
|-------------|-----------|-------------|
| conn        | KafkaConn | Required. The connection object containing server and credential details. |
| topic       | str       | Required. The Kafka topic to consume from. |
| data_format | str       | Required. Format of the messages. Options: "avro", "json", "protobuf". |
| schema      | str       | The schema string used to deserialize the messages (if not using a registry). |
| group_id    | str       | Optional. Overrides the consumer group ID specified in the connection. |
| dlq_topic   | str       | Optional. A topic to send messages that fail deserialization (Dead Letter Queue). |

Rollover Strategies (Buffering)

You must define at least the time rollover strategy (it defaults to 60 seconds).

| Parameter          | Default | Description |
|--------------------|---------|-------------|
| time_rollover_secs | 60      | The maximum time (in seconds) to buffer messages before committing. |
| size_rollover_mb   | None    | The maximum size (in MB) of the buffer before committing. |
| messages_rollover  | None    | The maximum number of messages to buffer before committing. |
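
The thresholds can be combined; as described in How It Works, the buffer is committed as soon as any one of them is met. A sketch with illustrative values, reusing a conn and JSON_SCHEMA defined as in the earlier example:

trigger = td.KafkaTrigger(
    conn=conn,                  # a td.KafkaConn, as in the earlier example
    topic="my_topic",
    data_format="json",
    schema=JSON_SCHEMA,
    time_rollover_secs=60,      # commit at least once per minute...
    size_rollover_mb=64,        # ...or once the buffer reaches 64 MB...
    messages_rollover=10000,    # ...or after 10,000 messages, whichever comes first
)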

Scheduling

| Parameter | Type     | Description |
|-----------|----------|-------------|
| start     | datetime | Optional. The specific time to start consuming. |
| end       | datetime | Optional. The specific time to stop consuming. |

Note on Scheduling: start and end accept Python datetime objects with timezone info or ISO 8601 strings (e.g., 2024-01-01T12:00:00Z). If omitted, the trigger runs indefinitely from the moment it is registered.
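
For example, to consume only within a fixed window (the values are illustrative; conn and JSON_SCHEMA are as defined earlier):

from datetime import datetime, timezone

trigger = td.KafkaTrigger(
    conn=conn,
    topic="my_topic",
    data_format="json",
    schema=JSON_SCHEMA,
    # Consume from noon UTC on Jan 1 until noon UTC on Jan 2.
    start=datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
    end="2024-01-02T12:00:00Z",  # ISO 8601 strings are also accepted
)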

2. Kafka Connection Configuration (td.KafkaConn)

These parameters control the connection to the Kafka Broker.

| Parameter                 | Description |
|---------------------------|-------------|
| servers                   | Required. The Kafka bootstrap servers (e.g., localhost:9092). |
| credentials               | Optional. Authentication credentials (e.g., td.UserPasswordCredentials). |
| group_id                  | Optional. The default consumer group ID for this connection. |
| schema_registry           | Optional. Configuration object for Confluent or AWS Glue Schema Registry (see section below). |
| enforce_connection_params | Boolean. Whether to strictly enforce connection parameters. |
| cx_src_configs            | Source-specific configuration parameters (defaults to None). |
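
Putting these together, a connection that also names a registry might look like this (the registry object itself is covered in the next section; all values are illustrative):

conn = td.KafkaConn(
    servers="my_kafka:9092",
    credentials=td.UserPasswordCredentials("user", "password"),
    group_id="tabsdata_consumer_group",
    schema_registry=registry,  # a td.ConfluentSchemaRegistry or td.AwsGlueSchemaRegistry
)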


Schema Registry Configuration

If your Kafka messages rely on an external schema registry for deserialization (e.g., Avro or Protobuf), you can configure the schema_registry parameter within the td.KafkaConn object.

Tabsdata supports both Confluent Schema Registry and AWS Glue Schema Registry.

Confluent Schema Registry

Use this configuration when connecting to a self-hosted or Confluent Cloud schema registry.

Configuration Object: td.ConfluentSchemaRegistry

| Parameter   | Type                                   | Description |
|-------------|----------------------------------------|-------------|
| url         | str                                    | Required. The full URL of the Confluent Schema Registry. |
| credentials | ConfluentSchemaRegistryCredentialsSpec | Optional. Credentials object containing the API Key and Secret. |
| configs     | dict                                   | Optional. A dictionary of additional configuration properties. |

Example Snippet:

conn = td.KafkaConn(
    # ... basic kafka params ...
    schema_registry=td.ConfluentSchemaRegistry(
        url="https://my-registry:8081",
        credentials=td.ConfluentSchemaRegistryCredentialsSpec(key="KEY", secret="SECRET")
    )
)
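
With the registry attached to the connection, the trigger itself can omit the inline schema parameter (a sketch, assuming the registry supplies the schema used for deserialization):

trigger = td.KafkaTrigger(
    conn=conn,            # the KafkaConn above, with schema_registry set
    topic="my_topic",
    data_format="avro",
    # No schema argument: deserialization uses the registry instead.
)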

AWS Glue Schema Registry

Use this configuration when schemas are stored in the AWS Glue Schema Registry.

Configuration Object: td.AwsGlueSchemaRegistry

| Parameter     | Type                                 | Description |
|---------------|--------------------------------------|-------------|
| region_name   | str                                  | Required. The AWS region where the registry resides (e.g., us-east-1). |
| registry_name | str                                  | Required. The name of the registry in AWS Glue. |
| schema_name   | str                                  | Required. The specific schema name to retrieve. |
| credentials   | AwsGlueSchemaRegistryCredentialsSpec | Required. AWS credentials (access key, secret key, session token). |
| configs       | dict                                 | Optional. A dictionary of additional configuration properties. |

Example Snippet:

conn = td.KafkaConn(
    # ... basic kafka params ...
    schema_registry=td.AwsGlueSchemaRegistry(
        region_name="us-west-2",
        registry_name="my-glue-registry",
        schema_name="flight-events-schema",
        credentials=td.AwsGlueSchemaRegistryCredentialsSpec(
            access_key="ACCESS_KEY", 
            secret_key="SECRET_KEY"
        )
    )
)


Lifecycle Management (CLI)

You can manage the execution state of your Kafka Triggers using the Tabsdata CLI.

Starting and Stopping Triggers

To manually pause or resume the ingestion of data without deleting the publisher:

# Stop a specific function trigger
td stage stop --coll <COLLECTION_NAME> --name <FUNCTION_NAME>

# Start a specific function trigger
td stage start --coll <COLLECTION_NAME> --name <FUNCTION_NAME>

Deploying in a Disabled State

If you wish to register a publisher but prevent it from immediately consuming data, use the --disabled flag during registration. You can then start it later with the td stage start command shown above.
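
For example (a sketch: the registration subcommand and its flags are an assumption here, so treat everything except --disabled as illustrative and check td --help for the exact syntax on your version):

# Register the publisher without starting ingestion (subcommand syntax assumed)
td fn register --coll <COLLECTION_NAME> --path <PATH_TO_FILE> --disabled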