Apache Druid
You can use this built-in Tabsdata subscriber to write to Apache Druid.
Note:
DruidDest is currently marked as unstable and is subject to change in future releases.
DruidDest works by first uploading the output data as Parquet files to a cloud staging area (Amazon S3, Azure Blob
Storage, or Google Cloud Storage), and then loading them into Druid via a native batch index_parallel task submitted
to the Overlord REST API.
Installing the connector
The Druid connector's dependencies are not installed by default. Run the following command in your terminal to install them:
$ pip install 'tabsdata[druid]'
Example (Subscriber - Druid)
The following example creates a subscriber named write_sales. It writes two Tabsdata tables to Apache Druid. The
subscriber is automatically triggered by a new commit to any of its input tables, and writes data to the database
without any modification. In this example, S3 is used as the cloud staging area.
import tabsdata as td

@td.subscriber(
    tables=["vendors", "items"],
    destination=td.DruidDest(
        td.DruidConn(
            overlord_url="http://druid-overlord:8090",
            staging=td.S3Config(
                bucket="acme-staging",
                region="us-east-1",
                base_path="staging/druid",
                credentials=td.S3AccessKeyCredentials(
                    td.EnvironmentSecret("AWS_ACCESS_KEY_ID"),
                    td.EnvironmentSecret("AWS_SECRET_ACCESS_KEY"),
                ),
            ),
            time_column="created_at",
            segment_granularity="DAY",
            credentials=td.UserPasswordCredentials(
                td.EnvironmentSecret("DRUID_USER"),
                td.EnvironmentSecret("DRUID_PASSWORD"),
            ),
        ),
        destination_tables=["vendors", "items"],
        if_table_exists="replace",
    ),
)
def write_sales(tf1: td.TableFrame, tf2: td.TableFrame):
    return tf1, tf2
After defining the function, register it with a Tabsdata collection and trigger its execution.
Setup (Subscriber - Druid)
To use DruidDest, you need two objects:
- A DruidConn that defines how to connect to the Druid Overlord and where to stage files in the cloud.
- A DruidDest that references the connection and specifies which datasource(s) to write to.
The following code uses placeholder values for defining a subscriber that reads Tabsdata tables and writes to Apache Druid:
import tabsdata as td
conn = td.DruidConn(
    overlord_url="",
    staging="",
    time_column="",
    segment_granularity="",
    max_concurrent_subtasks="",
    max_rows_per_segment="",
    max_rows_in_memory="",
    http_timeout_sec="",
    credentials=td.UserPasswordCredentials("", ""),
)
@td.subscriber(
    tables=["", ""],
    destination=td.DruidDest(
        conn=conn,
        destination_tables=["", ""],
        if_table_exists="",
    ),
    trigger_by=["", ""],
)
def <subscriber_name>(<tf1>: td.TableFrame, <tf2>: td.TableFrame):
    return <tf1>, <tf2>
After defining the function, register it with a Tabsdata collection and trigger its execution.
Properties
The following properties are defined in the code above:
tables
<input_table1>, <input_table2>… are the names of the Tabsdata tables to be written to the external system.
destination
DruidConn
DruidConn holds the Druid Overlord connection details and the cloud staging configuration.
overlord_url
Base URL of the Druid Overlord REST API (e.g. "http://druid-overlord:8090").
staging
Cloud storage staging configuration. Data is first written as Parquet files to the configured cloud storage staging
area and then ingested into Druid via a native batch index_parallel task submitted to the Overlord REST API. Provide
one of:
S3 (Amazon S3)
staging = td.S3Config(
    bucket="my-data-bucket",
    region="us-east-1",
    base_path="staging/druid",
    credentials=td.S3AccessKeyCredentials(
        td.EnvironmentSecret("AWS_ACCESS_KEY_ID"),
        td.EnvironmentSecret("AWS_SECRET_ACCESS_KEY"),
    ),
)
- bucket — The S3 bucket name.
- region — The AWS region (e.g. "us-east-1").
- base_path — The key prefix under which staged files are stored (e.g. "staging/druid").
- credentials (S3AccessKeyCredentials) — AWS access key credentials for uploading files to S3.
Azure Blob Storage
staging = td.AzureConfig(
    container="my-container",
    base_path="staging/druid",
    credentials=td.AzureAccountKeyCredentials(
        td.EnvironmentSecret("AZURE_ACCOUNT_NAME"),
        td.EnvironmentSecret("AZURE_ACCOUNT_KEY"),
    ),
)
- container — The Azure Blob container name.
- base_path — The blob path prefix for staged files (e.g. "staging/druid").
- credentials (AzureAccountKeyCredentials) — Azure account key credentials for uploading files.
Google Cloud Storage (GCS)
staging = td.GCSConfig(
    bucket="my-gcs-bucket",
    base_path="staging/druid",
    upload_credentials=td.GCPServiceAccountKeyCredentials(
        td.EnvironmentSecret("GCP_SERVICE_ACCOUNT_JSON"),
    ),
    hmac_credentials=td.HMACCredentials(
        td.EnvironmentSecret("GCS_HMAC_ACCESS_KEY"),
        td.EnvironmentSecret("GCS_HMAC_SECRET_KEY"),
    ),
)
- bucket — The GCS bucket name.
- base_path — The object path prefix for staged files (e.g. "staging/druid").
- upload_credentials (GCPServiceAccountKeyCredentials, optional) — GCP service account credentials for uploading files via the GCS SDK. When not provided, application default credentials (ADC) are used.
- hmac_credentials (HMACCredentials, optional) — GCS HMAC credentials for S3-compatible access used during Druid ingestion.
[optional] time_column
Name of the column to use as Druid’s required __time primary timestamp. When None, a constant value is used
(2000-01-01T00:00:00.000Z). Defaults to None.
Important: If time_column is None, the connector uses a synthetic timestamp column internally. If your data contains a column named __td_druid_time, the ingestion will fail. Either rename that column in your data or set time_column to use one of your own columns as the Druid __time primary timestamp.
[optional] segment_granularity
Time-based partitioning granularity for new segments. One of "ALL", "DAY", "HOUR", "MONTH", or "YEAR".
Defaults to "ALL".
[optional] max_concurrent_subtasks
Maximum number of parallel sub-tasks inside the index_parallel ingestion task. When set to 1, the supervisor runs
inline without spawning subtasks. Defaults to 1.
[optional] max_rows_per_segment
Dynamic partitioning threshold — maximum number of rows per segment. Defaults to 5000000.
[optional] max_rows_in_memory
In-memory buffer size during ingestion. Defaults to 1000000.
[optional] http_timeout_sec
Timeout in seconds for HTTP requests to the Overlord API. Defaults to 300.
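The tuning parameters above can be combined on one connection. The following is an illustrative sketch only, not a recommendation: the URL is a placeholder, staging is assumed to be a previously defined staging configuration, and the values should be tuned for your cluster.

```python
import tabsdata as td

# Hypothetical tuning values for illustration only.
conn = td.DruidConn(
    overlord_url="http://druid-overlord:8090",
    staging=staging,                 # a previously defined S3/Azure/GCS staging config
    max_concurrent_subtasks=4,       # up to 4 parallel sub-tasks in the index_parallel task
    max_rows_per_segment=5_000_000,  # dynamic partitioning threshold (the default)
    max_rows_in_memory=500_000,      # smaller in-memory buffer than the 1,000,000 default
    http_timeout_sec=600,            # allow slower Overlord responses than the 300 s default
)
```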
[optional] credentials
UserPasswordCredentials for HTTP Basic authentication against the Overlord API. When None, authentication is
skipped. Defaults to None.
For quick testing with plain strings:
conn = td.DruidConn(
    overlord_url="http://druid-overlord:8090",
    staging=staging,
    credentials=td.UserPasswordCredentials("admin", "my_password"),
)
For production, credentials should be wrapped in EnvironmentSecret to avoid hardcoding secrets:
conn = td.DruidConn(
    overlord_url="http://druid-overlord:8090",
    staging=staging,
    credentials=td.UserPasswordCredentials(
        td.EnvironmentSecret("DRUID_USER"),
        td.EnvironmentSecret("DRUID_PASSWORD"),
    ),
)
destination_tables
The datasource(s) to store the data in. In Druid terminology, a datasource is the equivalent of a table. A single datasource name may be provided as a string; multiple names must be provided as a list.
Example with a single datasource:
destination = td.DruidDest(
    conn=conn,
    destination_tables="events",
)
Example with multiple datasources:
destination = td.DruidDest(
    conn=conn,
    destination_tables=["vendors", "items"],
    if_table_exists="replace",
)
[optional] if_table_exists
This optional property defines the strategy to follow when the datasource already exists: 'replace' drops the existing segments and replaces them with the new data, while 'append' adds new segments alongside the existing ones. Defaults to 'append'.
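For example, to accumulate data across runs instead of replacing it, the append strategy can be stated explicitly (conn is assumed to be a previously defined DruidConn, and the datasource name is illustrative):

```python
import tabsdata as td

destination = td.DruidDest(
    conn=conn,
    destination_tables="events",
    if_table_exists="append",  # add new segments alongside existing ones
)
```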
Writing to Multiple Tables
When writing to multiple datasources, the subscriber function must return one result per destination datasource, in the
same order as they are listed in destination_tables:
@td.subscriber(
    name="write_to_druid",
    tables="collection/raw_data",
    destination=td.DruidDest(
        conn=conn,
        destination_tables=["vendors", "items"],
    ),
)
def write_to_druid(df: td.TableFrame):
    vendors = df.filter(td.col("type") == "vendor")
    items = df.filter(td.col("type") == "item")
    return vendors, items
Skipping a Table with None
If a subscriber returns None for a particular output, the corresponding destination datasource is skipped entirely
(not created, not written to):
@td.subscriber(
    name="conditional_write",
    tables="collection/data",
    destination=td.DruidDest(
        conn=conn,
        destination_tables=["primary", "secondary"],
    ),
)
def conditional_write(df: td.TableFrame):
    primary = df.drop_nulls()
    return primary, None  # "secondary" datasource is skipped
[optional] trigger_by
<trigger_table1>, <trigger_table2> are the names of the Tabsdata tables that trigger the execution of the
subscriber function.
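For instance, to re-run the subscriber only when one specific input table receives a new commit, trigger_by can name a subset of the inputs (destination is assumed to be a previously defined td.DruidDest; the table names are illustrative):

```python
import tabsdata as td

@td.subscriber(
    tables=["vendors", "items"],
    destination=destination,  # a previously defined td.DruidDest
    trigger_by=["vendors"],   # only new commits to "vendors" trigger execution
)
def write_sales(tf1: td.TableFrame, tf2: td.TableFrame):
    return tf1, tf2
```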
subscriber_name
This is the user-defined function that processes the input tables and returns the table frames to be written to the destination.
Data Drift Support
Apache Druid creates the schema on write, which means schema changes are inherently supported. When new columns appear in the incoming data, Druid will automatically incorporate them into the datasource schema during ingestion. Similarly, if columns are removed from the source data, they will simply not appear in the newly ingested segments.
For a Druid subscriber with if_table_exists=replace, the existing segments are dropped and replaced entirely. The
resulting schema exactly matches the newly exported data.
For a Druid subscriber with if_table_exists=append, new segments are added alongside existing ones. New columns in
the incoming data will be present in the new segments, while older segments will have null values for those columns.
Columns that existed in older segments but are absent from the new data will remain queryable from the older segments.
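As a sketch of how drift plays out with append, suppose a subscriber starts emitting an extra discount column. The names below are hypothetical, and this assumes TableFrame supports the polars-style with_columns/alias expressions used elsewhere in these docs:

```python
import tabsdata as td

@td.subscriber(
    tables="collection/sales",
    destination=td.DruidDest(
        conn=conn,                    # a previously defined DruidConn
        destination_tables="sales",
        if_table_exists="append",
    ),
)
def write_sales(df: td.TableFrame):
    # Adding a new column is safe: Druid creates the schema on write,
    # so "discount" appears in new segments, while rows in older
    # segments read as null for it.
    return df.with_columns((td.col("price") * 0.1).alias("discount"))
```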