ClickHouse

You can use this built-in Tabsdata subscriber to write to a ClickHouse database.

Note

ClickHouseDest is currently marked as unstable and is subject to change in future releases.

ClickHouseDest works by first uploading the output data as Parquet files to a cloud staging area (Amazon S3, Azure Blob Storage, or Google Cloud Storage), and then loading them into ClickHouse using its native table functions (s3(), azureBlobStorage()). This approach leverages ClickHouse’s optimized parallel read capabilities from cloud storage.

Installing the connector

To work with the ClickHouse connector, you need to install its dependencies separately; they are kept out of the core package to manage its size. Run the following command in your terminal to install the dependencies.

$ pip install 'tabsdata[clickhouse]'

Example (Subscriber - ClickHouse)

The following example creates a subscriber named write_sales. It writes two Tabsdata tables to the database. The subscriber is automatically triggered by a new commit to any of its input tables, and writes data to the database without any modification.

import tabsdata as td


@td.subscriber(
    tables=["vendors", "items"],
    destination=td.ClickHouseDest(
        td.ClickHouseConn(
            host="clickhouse.acme-inc.com",
            staging=td.S3Config(
                bucket="acme-staging",
                region="us-east-1",
                base_path="staging/clickhouse",
                credentials=td.S3AccessKeyCredentials("<ACCESS_KEY>", "<SECRET_KEY>"),
            ),
            port=8123,
            database="procurement",
            credentials=td.UserPasswordCredentials("<USER>", "<PASSWORD>"),
            secure=False,
        ),
        destination_tables=["vendors", "items"],
        if_table_exists="replace",
    ),
)
def write_sales(tf1: td.TableFrame, tf2: td.TableFrame):
    return tf1, tf2

Note: After defining the function, you need to register it with a Tabsdata collection and execute it. For more information, see Register a Function and Execute a Function.

Setup (Subscriber - ClickHouse)

To use ClickHouseDest, you need two objects:

  1. A ClickHouseConn that defines how to connect to the ClickHouse server and where to stage files in the cloud.

  2. A ClickHouseDest that references the connection and specifies which table(s) to write to.

The following code uses placeholder values for defining a subscriber that reads Tabsdata tables and writes to ClickHouse:

import tabsdata as td

conn = td.ClickHouseConn(
    host="<clickhouse_host>",
    staging="<S3Config | AzureConfig | GCSConfig>",
    port="<port>",
    database="<database_name>",
    credentials=td.UserPasswordCredentials("<db_username>", "<db_password>"),
    secure="<secure_boolean>",
)

@td.subscriber(
    tables=["<input_table1>", "<input_table2>"],
    destination=td.ClickHouseDest(
        conn=conn,
        destination_tables=["<destination_table1>", "<destination_table2>"],
        if_table_exists="<value>",
    ),
    trigger_by=["<trigger_table1>", "<trigger_table2>"],
)
def <subscriber_name>(<table_frame1>: td.TableFrame, <table_frame2>: td.TableFrame):
    <function_logic>
    return <table_frame_output1>, <table_frame_output2>

Note: After defining the function, you need to register it with a Tabsdata collection and execute it. For more information, see Register a Function and Execute a Function.

The following properties are defined in the code above:

tables

<input_table1>, <input_table2>… are the names of the Tabsdata tables to be written to the external system.

destination

ClickHouseConn holds the ClickHouse server details and the cloud staging configuration.

host

The hostname or IP address of the ClickHouse server.

staging

The cloud storage configuration for staging Parquet files before loading them into ClickHouse. Must be one of td.S3Config, td.AzureConfig, or td.GCSConfig.

Amazon S3

staging = td.S3Config(
    bucket="my-data-bucket",
    region="us-east-1",
    base_path="staging/clickhouse",
    credentials=td.S3AccessKeyCredentials(
        td.EnvironmentSecret("AWS_ACCESS_KEY_ID"),
        td.EnvironmentSecret("AWS_SECRET_ACCESS_KEY"),
    ),
)
  • bucket — The S3 bucket name.

  • region — The AWS region (e.g. "us-east-1").

  • base_path — The key prefix under which staged files are stored (e.g. "staging/clickhouse").

  • credentials (S3AccessKeyCredentials) — AWS access key credentials for uploading files to S3.

Azure Blob Storage

staging = td.AzureConfig(
    container="my-container",
    base_path="staging/clickhouse",
    credentials=td.AzureAccountKeyCredentials(
        td.EnvironmentSecret("AZURE_ACCOUNT_NAME"),
        td.EnvironmentSecret("AZURE_ACCOUNT_KEY"),
    ),
)
  • container — The Azure Blob container name.

  • base_path — The blob path prefix for staged files (e.g. "staging/clickhouse").

  • credentials (AzureAccountKeyCredentials) — Azure account key credentials for uploading files.

Google Cloud Storage (GCS)

staging = td.GCSConfig(
    bucket="my-gcs-bucket",
    base_path="staging/clickhouse",
    upload_credentials=td.GCPServiceAccountKeyCredentials(
        td.EnvironmentSecret("GCP_SERVICE_ACCOUNT_JSON"),
    ),
    hmac_credentials=td.HMACCredentials(
        td.EnvironmentSecret("GCS_HMAC_ACCESS_KEY"),
        td.EnvironmentSecret("GCS_HMAC_SECRET_KEY"),
    ),
)
  • bucket — The GCS bucket name.

  • base_path — The object path prefix for staged files (e.g. "staging/clickhouse").

  • upload_credentials (GCPServiceAccountKeyCredentials, optional) — GCP service account credentials for uploading files via the GCS SDK. When not provided, application default credentials (ADC) are used.

  • hmac_credentials (HMACCredentials, optional) — GCS HMAC credentials for S3-compatible access, which ClickHouse uses to read files from GCS via its s3() table function.

[Optional] port

The HTTP(S) port of the ClickHouse server. Use 8123 for plain HTTP and 8443 for HTTPS/TLS. Defaults to 8123.

[Optional] database

The target database name. Defaults to "default".

[Optional] credentials

UserPasswordCredentials for ClickHouse authentication. When not provided, the ClickHouse default user with no password is used. For more information, see Secrets Management.

For quick testing with plain strings:

conn = td.ClickHouseConn(
    host="clickhouse.example.com",
    staging=staging,
    port=8123,
    database="analytics",
    credentials=td.UserPasswordCredentials("admin", "my_password"),
)

For production, credentials should be wrapped in EnvironmentSecret to avoid hardcoding secrets:

conn = td.ClickHouseConn(
    host="clickhouse.example.com",
    staging=staging,
    database="analytics",
    credentials=td.UserPasswordCredentials(
        td.EnvironmentSecret("CLICKHOUSE_USER"),
        td.EnvironmentSecret("CLICKHOUSE_PASSWORD"),
    ),
)

For a secure HTTPS connection:

conn = td.ClickHouseConn(
    host="clickhouse.example.com",
    staging=staging,
    port=8443,
    database="analytics",
    secure=True,
    credentials=td.UserPasswordCredentials(
        td.EnvironmentSecret("CLICKHOUSE_USER"),
        td.EnvironmentSecret("CLICKHOUSE_PASSWORD"),
    ),
)

[Optional] secure

If True, connect to ClickHouse over HTTPS/TLS. Defaults to False.

[Optional] Advanced parameters

There are also some advanced options for more technical use cases:

  • enforce_connection_params — Controls whether the database parameter is enforced when qualifying table names.

  • cx_src_configs_clickhouse — Additional source-specific configuration parameters for the ClickHouse connection.

  • cx_dst_configs_clickhouse — Additional destination-specific configuration parameters for the ClickHouse connection.

destination_tables

The name of the table(s) to load data into. A single table can be provided as a string; multiple tables must be provided as a list.

Example with a single table:

destination = td.ClickHouseDest(
    conn=conn,
    destination_tables="events",
)

Example with multiple tables:

destination = td.ClickHouseDest(
    conn=conn,
    destination_tables=["events", "users"],
    if_table_exists="replace",
)

[Optional] if_table_exists

This is an optional property that defines the strategy to follow when the destination table already exists: 'replace' truncates the table before loading new rows, and 'append' inserts new rows into the existing table. Defaults to 'append'.

If the destination table does not exist, it will be created automatically based on the schema of the data being written.

None as an input and output

A subscriber may receive and return a None value instead of TableFrames.

When a subscriber receives a None value instead of a TableFrame it means that the requested table dependency version does not exist.

When a subscriber returns a None value instead of a TableFrame it means there is no new data to write to the external system. This helps in avoiding the creation of multiple copies of the same data.
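For example, a subscriber can guard against missing inputs and skip writing when there is nothing new. The following is a minimal sketch modeled on the examples above; the table names are illustrative, and conn is assumed to be a ClickHouseConn defined as in the setup section:

```python
import tabsdata as td


@td.subscriber(
    tables=["vendors", "items"],
    destination=td.ClickHouseDest(
        conn=conn,  # a ClickHouseConn defined as shown earlier
        destination_tables=["vendors", "items"],
    ),
)
def write_if_new(tf1: td.TableFrame, tf2: td.TableFrame):
    # An input arrives as None when the requested table
    # dependency version does not exist.
    if tf1 is None and tf2 is None:
        # Nothing new to write; both destination tables are skipped.
        return None, None
    return tf1, tf2
```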

[Optional] trigger_by

<trigger_table1>, <trigger_table2>… are the names of the tables in the Tabsdata server. A new commit to any of these tables triggers the subscriber. All listed trigger tables must exist in the server before registering the subscriber.

Defining trigger tables is optional. If you don’t define the trigger_by property, the subscriber will be triggered by any of its input tables. If you define the trigger_by property, then only those tables listed in the property can automatically trigger the subscriber.

For more information, see Working with Triggers.
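As a sketch, the write_sales subscriber shown earlier could be restricted so that only commits to vendors trigger a run, while items is still read as an input (conn is assumed to be a ClickHouseConn defined as in the setup section):

```python
import tabsdata as td


@td.subscriber(
    tables=["vendors", "items"],
    destination=td.ClickHouseDest(
        conn=conn,  # a ClickHouseConn defined as shown earlier
        destination_tables=["vendors", "items"],
    ),
    trigger_by=["vendors"],  # commits to "items" no longer trigger the run
)
def write_sales(tf1: td.TableFrame, tf2: td.TableFrame):
    return tf1, tf2
```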

<subscriber_name>

<subscriber_name> is the name for the subscriber that you are configuring.

<function_logic>

<function_logic> governs the processing performed by the subscriber. You can specify function logic to be a simple write or to perform additional processing as needed. For more information about the function logic that you can include, see Working with Tables.

<table_frame1>, <table_frame2>… are the names for the variables that temporarily store source data for processing.

<table_frame_output1>, <table_frame_output2>… are the output from the function that are written to the external system.

Writing to Multiple Tables

When writing to multiple tables, the subscriber function must return one result per destination table, in the same order as they are listed in destination_tables:

@td.subscriber(
    name="write_to_clickhouse",
    tables="collection/raw_data",
    destination=td.ClickHouseDest(
        conn=conn,
        destination_tables=["events", "users"],
    ),
)
def write_to_clickhouse(df: td.TableFrame):
    events = df.filter(td.col("type") == "event")
    users = df.filter(td.col("type") == "user")
    return events, users

Skipping a Table with None

If a subscriber returns None for a particular output, the corresponding destination table is skipped entirely (not created, not written to):

@td.subscriber(
    name="conditional_write",
    tables="collection/data",
    destination=td.ClickHouseDest(
        conn=conn,
        destination_tables=["primary", "secondary"],
    ),
)
def conditional_write(df: td.TableFrame):
    primary = df.drop_nulls()
    return primary, None  # "secondary" table is skipped

Data Drift Support

This section describes how Tabsdata handles data drift in the output data for this subscriber connector.

The schema from the first execution (or from a pre-existing table) is preserved, irrespective of the value of any function property such as if_table_exists.

The system responds to the various kinds of changes caused by data drift as follows:

  • New columns introduced by data drift are ignored.

  • Columns removed from the source remain in the table, with missing values populated as NULL.

  • Changes to column data types (type drift) cause execution to fail, as the subscriber does not automatically reconcile type mismatches.

Important Notes

Nullable columns

ClickHouse wraps nullable columns with Nullable() (e.g. Nullable(Int32)). This is handled automatically when creating tables from the data schema.

Table creation

When a destination table does not exist, it is automatically created using the MergeTree engine with ORDER BY tuple(). If you need a different engine or ordering key, create the table manually in ClickHouse before running the function.

HTTPS/TLS

For secure connections, set secure=True and use port 8443 (the ClickHouse HTTPS port). The default port 8123 uses plain HTTP.