Apache Druid

You can use this built-in Tabsdata subscriber to write to Apache Druid.

Note: DruidDest is currently marked as unstable and is subject to change in future releases.

DruidDest works by first uploading the output data as Parquet files to a cloud staging area (Amazon S3, Azure Blob Storage, or Google Cloud Storage), and then loading them into Druid via a native batch index_parallel task submitted to the Overlord REST API.
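As a conceptual sketch of that second step (not the connector's actual internals), the request submitted to the Overlord is a native batch index_parallel task spec, posted as JSON to /druid/indexer/v1/task. The helper below only builds an approximate payload; the field names follow Druid's native batch ingestion spec, while the bucket, URI, and function name are illustrative placeholders:

```python
import json

def build_index_parallel_spec(datasource, parquet_uris, time_column=None,
                              segment_granularity="ALL", max_subtasks=1):
    """Approximate the native batch spec for ingesting Parquet files from S3."""
    if time_column is not None:
        timestamp_spec = {"column": time_column, "format": "auto"}
    else:
        # Per the docs, with no time_column the connector injects a synthetic
        # __td_druid_time column holding a constant 2000-01-01 timestamp.
        timestamp_spec = {"column": "__td_druid_time", "format": "auto"}
    return {
        "type": "index_parallel",
        "spec": {
            "ioConfig": {
                "type": "index_parallel",
                "inputSource": {"type": "s3", "uris": parquet_uris},
                "inputFormat": {"type": "parquet"},
            },
            "dataSchema": {
                "dataSource": datasource,
                "timestampSpec": timestamp_spec,
                "dimensionsSpec": {"useSchemaDiscovery": True},
                "granularitySpec": {"segmentGranularity": segment_granularity},
            },
            "tuningConfig": {
                "type": "index_parallel",
                "maxNumConcurrentSubTasks": max_subtasks,
            },
        },
    }

spec = build_index_parallel_spec(
    "vendors",
    ["s3://acme-staging/staging/druid/vendors/part-0.parquet"],
    time_column="created_at",
    segment_granularity="DAY",
)
# A client would POST this JSON body to <overlord_url>/druid/indexer/v1/task.
payload = json.dumps(spec)
```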

Installing the connector

The Druid connector's dependencies are not installed by default. Run the following command in your terminal to install them:

$ pip install 'tabsdata[druid]'

Example (Subscriber - Druid)

The following example creates a subscriber named write_sales. It writes two Tabsdata tables to Apache Druid. The subscriber is automatically triggered by a new commit to any of its input tables, and writes the data to Druid without any modification. In this example, S3 is used as the cloud staging area.

import tabsdata as td

@td.subscriber(
    tables=["vendors", "items"],
    destination=td.DruidDest(
        td.DruidConn(
            overlord_url="http://druid-overlord:8090",
            staging=td.S3Config(
                bucket="acme-staging",
                region="us-east-1",
                base_path="staging/druid",
                credentials=td.S3AccessKeyCredentials(
                    td.EnvironmentSecret("AWS_ACCESS_KEY_ID"),
                    td.EnvironmentSecret("AWS_SECRET_ACCESS_KEY"),
                ),
            ),
            time_column="created_at",
            segment_granularity="DAY",
            credentials=td.UserPasswordCredentials(
                td.EnvironmentSecret("DRUID_USER"),
                td.EnvironmentSecret("DRUID_PASSWORD"),
            ),
        ),
        destination_tables=["vendors", "items"],
        if_table_exists="replace",
    ),
)
def write_sales(tf1: td.TableFrame, tf2: td.TableFrame):
    return tf1, tf2

After defining the function, register it with a Tabsdata collection and trigger its execution.

Setup (Subscriber - Druid)

To use DruidDest, you need two objects:

  1. A DruidConn that defines how to connect to the Druid Overlord and where to stage files in the cloud.

  2. A DruidDest that references the connection and specifies which datasource(s) to write to.

The following code uses placeholder values to define a subscriber that reads Tabsdata tables and writes to Apache Druid:

import tabsdata as td

conn = td.DruidConn(
    overlord_url="",
    staging="",
    time_column="",
    segment_granularity="",
    max_concurrent_subtasks="",
    max_rows_per_segment="",
    max_rows_in_memory="",
    http_timeout_sec="",
    credentials=td.UserPasswordCredentials("", ""),
)

@td.subscriber(
    tables=["", ""],
    destination=td.DruidDest(
        conn=conn,
        destination_tables=["", ""],
        if_table_exists="",
    ),
    trigger_by=["", ""],
)
def <subscriber_name>(<table_frame1>: td.TableFrame, <table_frame2>: td.TableFrame):
    <function_logic>
    return <output_frame1>, <output_frame2>

After defining the function, register it with a Tabsdata collection and trigger its execution.

Properties

The following properties are defined in the code above:

tables

<input_table1>, <input_table2>… are the names of the Tabsdata tables to be written to the external system.

destination

DruidConn

DruidConn holds the Druid Overlord connection details and the cloud staging configuration.

overlord_url

Base URL of the Druid Overlord REST API (e.g. "http://druid-overlord:8090").

staging

Cloud storage staging configuration. Data is first written as Parquet files to the configured cloud storage staging area and then ingested into Druid via a native batch index_parallel task submitted to the Overlord REST API. Provide one of:

S3 (Amazon S3)

staging = td.S3Config(
    bucket="my-data-bucket",
    region="us-east-1",
    base_path="staging/druid",
    credentials=td.S3AccessKeyCredentials(
        td.EnvironmentSecret("AWS_ACCESS_KEY_ID"),
        td.EnvironmentSecret("AWS_SECRET_ACCESS_KEY"),
    ),
)
  • bucket — The S3 bucket name.

  • region — The AWS region (e.g. "us-east-1").

  • base_path — The key prefix under which staged files are stored (e.g. "staging/druid").

  • credentials (S3AccessKeyCredentials) — AWS access key credentials for uploading files to S3.

Azure Blob Storage

staging = td.AzureConfig(
    container="my-container",
    base_path="staging/druid",
    credentials=td.AzureAccountKeyCredentials(
        td.EnvironmentSecret("AZURE_ACCOUNT_NAME"),
        td.EnvironmentSecret("AZURE_ACCOUNT_KEY"),
    ),
)
  • container — The Azure Blob container name.

  • base_path — The blob path prefix for staged files (e.g. "staging/druid").

  • credentials (AzureAccountKeyCredentials) — Azure account key credentials for uploading files.

Google Cloud Storage (GCS)

staging = td.GCSConfig(
    bucket="my-gcs-bucket",
    base_path="staging/druid",
    upload_credentials=td.GCPServiceAccountKeyCredentials(
        td.EnvironmentSecret("GCP_SERVICE_ACCOUNT_JSON"),
    ),
    hmac_credentials=td.HMACCredentials(
        td.EnvironmentSecret("GCS_HMAC_ACCESS_KEY"),
        td.EnvironmentSecret("GCS_HMAC_SECRET_KEY"),
    ),
)
  • bucket — The GCS bucket name.

  • base_path — The object path prefix for staged files (e.g. "staging/druid").

  • upload_credentials (GCPServiceAccountKeyCredentials, optional) — GCP service account credentials for uploading files via the GCS SDK. When not provided, application default credentials (ADC) are used.

  • hmac_credentials (HMACCredentials, optional) — GCS HMAC credentials for S3-compatible access used during Druid ingestion.

[optional] time_column

Name of the column to use as Druid’s required __time primary timestamp. When None, a constant value is used (2000-01-01T00:00:00.000Z). Defaults to None.

Important: If time_column is None, the connector uses a synthetic timestamp column internally. If your data contains a column named __td_druid_time, the ingestion will fail. Either rename that column in your data or set time_column to use one of your own columns as the Druid __time primary timestamp.
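A hypothetical pre-check along these lines can catch the collision before ingestion. The function name and logic are illustrative only, not part of the connector:

```python
# Synthetic timestamp column name reserved by the connector (per the docs).
RESERVED_TIME_COLUMN = "__td_druid_time"

def validate_time_column(columns, time_column=None):
    """Raise early if the synthetic timestamp column would collide.

    columns: column names of the outgoing TableFrame.
    time_column: the DruidConn time_column setting (None means synthetic).
    """
    if time_column is None and RESERVED_TIME_COLUMN in columns:
        raise ValueError(
            f"Column {RESERVED_TIME_COLUMN!r} clashes with the synthetic "
            "timestamp; rename it or set time_column explicitly."
        )
    if time_column is not None and time_column not in columns:
        raise ValueError(f"time_column {time_column!r} not found in data.")
```

Running such a check inside the subscriber function, before returning the frames, surfaces the problem in your own code instead of in a failed Druid ingestion task.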

[optional] segment_granularity

Time-based partitioning granularity for new segments. One of "ALL", "DAY", "HOUR", "MONTH", or "YEAR". Defaults to "ALL".

[optional] max_concurrent_subtasks

Maximum number of parallel sub-tasks inside the index_parallel ingestion task. When set to 1, the supervisor runs inline without spawning subtasks. Defaults to 1.

[optional] max_rows_per_segment

Dynamic partitioning threshold — maximum number of rows per segment. Defaults to 5000000.

[optional] max_rows_in_memory

In-memory buffer size during ingestion. Defaults to 1000000.

[optional] http_timeout_sec

Timeout in seconds for HTTP requests to the Overlord API. Defaults to 300.

[optional] credentials

UserPasswordCredentials for HTTP Basic authentication against the Overlord API. When None, authentication is skipped. Defaults to None.

For quick testing with plain strings:

conn = td.DruidConn(
    overlord_url="http://druid-overlord:8090",
    staging=staging,
    credentials=td.UserPasswordCredentials("admin", "my_password"),
)

For production, credentials should be wrapped in EnvironmentSecret to avoid hardcoding secrets:

conn = td.DruidConn(
    overlord_url="http://druid-overlord:8090",
    staging=staging,
    credentials=td.UserPasswordCredentials(
        td.EnvironmentSecret("DRUID_USER"),
        td.EnvironmentSecret("DRUID_PASSWORD"),
    ),
)

destination_tables

The datasource(s) to store the data in. In Druid terminology, a datasource is the equivalent of a table. A single datasource name may be provided as a string; multiple names must be provided as a list.

Example with a single datasource:

destination = td.DruidDest(
    conn=conn,
    destination_tables="events",
)

Example with multiple datasources:

destination = td.DruidDest(
    conn=conn,
    destination_tables=["vendors", "items"],
    if_table_exists="replace",
)

[optional] if_table_exists

Optional strategy to follow when the datasource already exists. 'replace' drops existing segments and replaces them with the new data; 'append' adds new segments alongside the existing ones. Defaults to 'append'.

Writing to Multiple Tables

When writing to multiple datasources, the subscriber function must return one result per destination datasource, in the same order as they are listed in destination_tables:

@td.subscriber(
    name="write_to_druid",
    tables="collection/raw_data",
    destination=td.DruidDest(
        conn=conn,
        destination_tables=["vendors", "items"],
    ),
)
def write_to_druid(df: td.TableFrame):
    vendors = df.filter(td.col("type") == "vendor")
    items = df.filter(td.col("type") == "item")
    return vendors, items

Skipping a Table with None

If a subscriber returns None for a particular output, the corresponding destination datasource is skipped entirely (not created, not written to):

@td.subscriber(
    name="conditional_write",
    tables="collection/data",
    destination=td.DruidDest(
        conn=conn,
        destination_tables=["primary", "secondary"],
    ),
)
def conditional_write(df: td.TableFrame):
    primary = df.drop_nulls()
    return primary, None  # "secondary" datasource is skipped
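Conceptually, the pairing of returned results with destination datasources behaves like a positional zip in which None entries are dropped. The sketch below illustrates that behavior; it is not the connector's code:

```python
def plan_writes(destination_tables, results):
    """Pair each returned result with its datasource, skipping None outputs."""
    if len(destination_tables) != len(results):
        raise ValueError("one result is required per destination datasource")
    return [
        (datasource, result)
        for datasource, result in zip(destination_tables, results)
        if result is not None
    ]

# Only "primary" gets a write; "secondary" is skipped entirely.
plan = plan_writes(["primary", "secondary"], ["cleaned-frame", None])
```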

[optional] trigger_by

<trigger_table1>, <trigger_table2> are the names of the Tabsdata tables that trigger the execution of the subscriber function.

subscriber_name

This is the user-defined function that processes the input tables and returns the table frames to be written to the destination.

Data Drift Support

Apache Druid creates the schema on write, which means schema changes are inherently supported. When new columns appear in the incoming data, Druid will automatically incorporate them into the datasource schema during ingestion. Similarly, if columns are removed from the source data, they will simply not appear in the newly ingested segments.

For a Druid subscriber with if_table_exists=replace, the existing segments are dropped and replaced entirely. The resulting schema exactly matches the newly exported data.

For a Druid subscriber with if_table_exists=append, new segments are added alongside existing ones. New columns in the incoming data will be present in the new segments, while older segments will have null values for those columns. Columns that existed in older segments but are absent from the new data will remain queryable from the older segments.
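The append behavior can be pictured as a union of per-segment schemas at query time, with nulls filling the columns a segment lacks. This is a conceptual illustration of the effect described above, not Druid's implementation:

```python
def union_segment_schemas(segments):
    """Combine segments of different vintages, as a query over them would.

    Each segment is modeled as a dict of column -> list of values; the
    result exposes the union of columns, with None where a segment lacks one.
    """
    all_columns = []
    for segment in segments:
        for column in segment:
            if column not in all_columns:
                all_columns.append(column)
    rows = []
    for segment in segments:
        n = len(next(iter(segment.values()), []))
        for i in range(n):
            rows.append({c: (segment[c][i] if c in segment else None)
                         for c in all_columns})
    return all_columns, rows

# An older segment lacks "discount"; a newer appended segment adds it.
cols, rows = union_segment_schemas([
    {"vendor": ["a"], "amount": [10]},
    {"vendor": ["b"], "amount": [12], "discount": [2]},
])
```

Queries over the older rows see a null "discount", while the newer rows carry the value, matching the drift behavior described above.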