ClickHouse

You can use this built-in Tabsdata subscriber to write to a ClickHouse database.

Note

ClickHouseDest is currently marked as unstable and is subject to change in future releases.

ClickHouseDest works by first uploading the output data as Parquet files to a cloud staging area (Amazon S3, Azure Blob Storage, or Google Cloud Storage), and then loading them into ClickHouse using its native table functions: s3() for Amazon S3 and, via S3-compatible HMAC access, for Google Cloud Storage; azureBlobStorage() for Azure Blob Storage. This approach leverages ClickHouse’s optimized parallel reads from cloud storage.

Installing the connector

To work with the ClickHouse connector, you need to install its dependencies separately; they are kept out of the core package to manage its size. Run the following command in your terminal to install them.

$ pip install 'tabsdata[clickhouse]'

Example (Subscriber - ClickHouse)

The following example creates a subscriber named write_sales. It writes two Tabsdata tables to the database. The subscriber is automatically triggered by a new commit to any of its input tables, and writes data to the database without any modification.

import tabsdata as td


@td.subscriber(
    tables=["vendors", "items"],
    destination=td.ClickHouseDest(
        td.ClickHouseConn(
            host="clickhouse.acme-inc.com",
            staging=td.S3Config(
                bucket="acme-staging",
                region="us-east-1",
                base_path="staging/clickhouse",
                credentials=td.S3AccessKeyCredentials("<ACCESS_KEY>", "<SECRET_KEY>"),
            ),
            port=8123,
            database="procurement",
            credentials=td.UserPasswordCredentials("<USER>", "<PASSWORD>"),
            secure=False,
        ),
        destination_tables=["vendors", "items"],
        if_table_exists="replace",
    ),
)
def write_sales(tf1: td.TableFrame, tf2: td.TableFrame):
    return tf1, tf2

Note: After defining the function, you need to register it with a Tabsdata collection and execute it. For more information, see Register a Function and Execute a Function.

Setup (Subscriber - ClickHouse)

To use ClickHouseDest, you need two objects:

  1. A ClickHouseConn that defines how to connect to the ClickHouse server and where to stage files in the cloud.

  2. A ClickHouseDest that references the connection and specifies which table(s) to write to.

The following code uses placeholder values for defining a subscriber that reads Tabsdata tables and writes to ClickHouse:

import tabsdata as td

conn = td.ClickHouseConn(
    host="<clickhouse_host>",
    staging="<S3Config | AzureConfig | GCSConfig>",
    port="<port>",
    database="<database_name>",
    credentials=td.UserPasswordCredentials("<db_username>", "<db_password>"),
    secure="<secure_boolean>",
)

@td.subscriber(
    tables=["<input_table1>", "<input_table2>"],
    destination=td.ClickHouseDest(
        conn=conn,
        destination_tables=["<destination_table1>", "<destination_table2>"],
        if_table_exists="<value>",
        schema_evolution="<none|iceberg>",
    ),
    trigger_by=["<trigger_table1>", "<trigger_table2>"],
)
def <subscriber_name>(<table_frame1>: td.TableFrame, <table_frame2>: td.TableFrame):
    <function_logic>
    return <table_frame_output1>, <table_frame_output2>

Note: After defining the function, you need to register it with a Tabsdata collection and execute it. For more information, see Register a Function and Execute a Function.

The following properties are defined in the code above:

tables

<input_table1>, <input_table2>… are the names of the Tabsdata tables to be written to the external system.

destination

ClickHouseConn holds the ClickHouse server details and the cloud staging configuration.

host

The hostname or IP address of the ClickHouse server.

staging

The cloud storage configuration for staging Parquet files before loading them into ClickHouse. Must be one of td.S3Config, td.AzureConfig, or td.GCSConfig.

S3 (Amazon S3)

staging = td.S3Config(
    bucket="my-data-bucket",
    region="us-east-1",
    base_path="staging/clickhouse",
    credentials=td.S3AccessKeyCredentials(
        td.EnvironmentSecret("AWS_ACCESS_KEY_ID"),
        td.EnvironmentSecret("AWS_SECRET_ACCESS_KEY"),
    ),
)
  • bucket — The S3 bucket name.

  • region — The AWS region (e.g. "us-east-1").

  • base_path — The key prefix under which staged files are stored (e.g. "staging/clickhouse").

  • credentials (S3AccessKeyCredentials) — AWS access key credentials for uploading files to S3.

Azure Blob Storage

staging = td.AzureConfig(
    container="my-container",
    base_path="staging/clickhouse",
    credentials=td.AzureAccountKeyCredentials(
        td.EnvironmentSecret("AZURE_ACCOUNT_NAME"),
        td.EnvironmentSecret("AZURE_ACCOUNT_KEY"),
    ),
)
  • container — The Azure Blob container name.

  • base_path — The blob path prefix for staged files (e.g. "staging/clickhouse").

  • credentials (AzureAccountKeyCredentials) — Azure account key credentials for uploading files.

Google Cloud Storage (GCS)

staging = td.GCSConfig(
    bucket="my-gcs-bucket",
    base_path="staging/clickhouse",
    upload_credentials=td.GCPServiceAccountKeyCredentials(
        td.EnvironmentSecret("GCP_SERVICE_ACCOUNT_JSON"),
    ),
    hmac_credentials=td.HMACCredentials(
        td.EnvironmentSecret("GCS_HMAC_ACCESS_KEY"),
        td.EnvironmentSecret("GCS_HMAC_SECRET_KEY"),
    ),
)
  • bucket — The GCS bucket name.

  • base_path — The object path prefix for staged files (e.g. "staging/clickhouse").

  • upload_credentials (GCPServiceAccountKeyCredentials, optional) — GCP service account credentials for uploading files via the GCS SDK. When not provided, application default credentials (ADC) are used (see the variant after this list).

  • hmac_credentials (HMACCredentials, optional) — GCS HMAC credentials for S3-compatible access, which ClickHouse uses to read files from GCS via its s3() table function.
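
For example, a variant that relies on ADC for the upload while still providing HMAC credentials so that ClickHouse can read the staged files:

staging = td.GCSConfig(
    bucket="my-gcs-bucket",
    base_path="staging/clickhouse",
    # upload_credentials omitted: application default credentials (ADC)
    # are used for uploading the staged Parquet files.
    hmac_credentials=td.HMACCredentials(
        td.EnvironmentSecret("GCS_HMAC_ACCESS_KEY"),
        td.EnvironmentSecret("GCS_HMAC_SECRET_KEY"),
    ),
)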

[Optional] port

The HTTP(S) port of the ClickHouse server: 8123 for plain HTTP, 8443 for HTTPS/TLS. Defaults to 8123.

[Optional] database

The target database name. Defaults to "default".

[Optional] credentials

UserPasswordCredentials for ClickHouse authentication. When not provided, the ClickHouse default user with no password is used. For more information, see Secrets Management.
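
For example, omitting credentials connects as the default user with no password (the host value is illustrative, and staging is assumed to be a staging configuration defined as above):

conn = td.ClickHouseConn(
    host="clickhouse.example.com",
    staging=staging,
    # credentials omitted: the ClickHouse default user with no password is used
)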

For quick testing with plain strings:

conn = td.ClickHouseConn(
    host="clickhouse.example.com",
    staging=staging,
    port=8123,
    database="analytics",
    credentials=td.UserPasswordCredentials("admin", "my_password"),
)

For production, credentials should be wrapped in EnvironmentSecret to avoid hardcoding secrets:

conn = td.ClickHouseConn(
    host="clickhouse.example.com",
    staging=staging,
    database="analytics",
    credentials=td.UserPasswordCredentials(
        td.EnvironmentSecret("CLICKHOUSE_USER"),
        td.EnvironmentSecret("CLICKHOUSE_PASSWORD"),
    ),
)

For a secure HTTPS connection:

conn = td.ClickHouseConn(
    host="clickhouse.example.com",
    staging=staging,
    port=8443,
    database="analytics",
    secure=True,
    credentials=td.UserPasswordCredentials(
        td.EnvironmentSecret("CLICKHOUSE_USER"),
        td.EnvironmentSecret("CLICKHOUSE_PASSWORD"),
    ),
)

[Optional] secure

If True, connect to ClickHouse over HTTPS/TLS. Defaults to False.

[Optional] Advanced parameters

There are also some advanced options for more technical use cases:

  • enforce_connection_params — Controls whether the database parameter is enforced when qualifying table names (see the hypothetical sketch after this list).

  • cx_src_configs_clickhouse — Additional source-specific configuration parameters for the ClickHouse connection.

  • cx_dst_configs_clickhouse — Additional destination-specific configuration parameters for the ClickHouse connection.
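
The value types of these parameters are not documented here; the following sketch is hypothetical and assumes enforce_connection_params accepts a boolean:

conn = td.ClickHouseConn(
    host="clickhouse.example.com",
    staging=staging,
    # Assumed boolean: whether the database parameter is enforced
    # when qualifying table names.
    enforce_connection_params=True,
)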

destination_tables

The name of the table(s) to load data into. A single table can be provided as a string; multiple tables must be provided as a list.

Example with a single table:

destination = td.ClickHouseDest(
    conn=conn,
    destination_tables="events",
)

Example with multiple tables:

destination = td.ClickHouseDest(
    conn=conn,
    destination_tables=["events", "users"],
    if_table_exists="replace",
)

[Optional] if_table_exists

An optional property that defines the strategy to follow when the destination table already exists: 'replace' truncates the table before loading new rows, while 'append' inserts new rows into the existing table. Defaults to 'append'.

If the destination table does not exist, it will be created automatically based on the schema of the data being written.
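
For example, making the default append behavior explicit (reusing the conn object from the setup above):

destination = td.ClickHouseDest(
    conn=conn,
    destination_tables="events",
    if_table_exists="append",  # insert new rows; "replace" would truncate first
)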

[Optional] schema_evolution

Configures automatic schema evolution when the incoming data schema differs from the existing table schema. When set to "iceberg", Tabsdata automatically applies safe schema changes (add/drop columns, widen types, make columns nullable) before loading data. Unsafe changes (narrow types, incompatible type changes) are rejected with an error. Defaults to "none".

destination = td.ClickHouseDest(
    conn=conn,
    destination_tables="events",
    schema_evolution="iceberg",
)

For the full list of safe and unsafe changes, type widening tables, and database-specific notes, see Schema Evolution below.

None as an input and output

A subscriber may receive and return a None value instead of TableFrames.

When a subscriber receives a None value instead of a TableFrame it means that the requested table dependency version does not exist.

When a subscriber returns a None value instead of a TableFrame it means there is no new data to write to the external system. This helps in avoiding the creation of multiple copies of the same data.
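
For example, a minimal sketch of a subscriber that forwards a missing input as a missing output, so that nothing is written when the requested table version does not exist (reusing the conn object from the setup above):

@td.subscriber(
    tables=["vendors"],
    destination=td.ClickHouseDest(
        conn=conn,
        destination_tables="vendors",
    ),
)
def passthrough(tf: td.TableFrame):
    if tf is None:
        return None  # no new data: the write is skipped entirely
    return tf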

[Optional] trigger_by

<trigger_table1>, <trigger_table2>… are the names of the tables in the Tabsdata server. A new commit to any of these tables triggers the subscriber. All listed trigger tables must exist in the server before registering the subscriber.

Defining trigger tables is optional. If you don’t define the trigger_by property, the subscriber will be triggered by any of its input tables. If you define the trigger_by property, then only those tables listed in the property can automatically trigger the subscriber.

For more information, see Working with Triggers.
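
For example, a subscriber that reads both tables but runs only on commits to vendors (a sketch reusing the conn object from the setup above):

@td.subscriber(
    tables=["vendors", "items"],
    destination=td.ClickHouseDest(
        conn=conn,
        destination_tables=["vendors", "items"],
    ),
    trigger_by=["vendors"],  # a commit to "items" alone does not trigger a run
)
def write_on_vendors(tf1: td.TableFrame, tf2: td.TableFrame):
    return tf1, tf2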

<subscriber_name>

<subscriber_name> is the name for the subscriber that you are configuring.

<function_logic>

<function_logic> governs the processing performed by the subscriber. You can specify function logic to be a simple write or to perform additional processing as needed. For more information about the function logic that you can include, see Working with Tables.

<table_frame1>, <table_frame2>… are the names for the variables that temporarily store source data for processing.

<table_frame_output1>, <table_frame_output2>… are the output from the function that are written to the external system.

Writing to Multiple Tables

When writing to multiple tables, the subscriber function must return one result per destination table, in the same order as they are listed in destination_tables:

@td.subscriber(
    name="write_to_clickhouse",
    tables="collection/raw_data",
    destination=td.ClickHouseDest(
        conn=conn,
        destination_tables=["events", "users"],
    ),
)
def write_to_clickhouse(df: td.TableFrame):
    events = df.filter(td.col("type") == "event")
    users = df.filter(td.col("type") == "user")
    return events, users

Skipping a Table with None

If a subscriber returns None for a particular output, the corresponding destination table is skipped entirely (not created, not written to):

@td.subscriber(
    name="conditional_write",
    tables="collection/data",
    destination=td.ClickHouseDest(
        conn=conn,
        destination_tables=["primary", "secondary"],
    ),
)
def conditional_write(df: td.TableFrame):
    primary = df.drop_nulls()
    return primary, None  # "secondary" table is skipped

Schema Evolution

When writing data to external databases, the schema of the data being written may change over time — for example, new columns may be added, old columns may be removed, or column types may need to be widened to accommodate larger values. By default, Tabsdata raises an error when the incoming data schema does not match the existing table schema. Schema evolution allows these changes to be applied automatically, following safety rules inspired by Apache Iceberg’s schema evolution specification.

Schema evolution is configured through the schema_evolution parameter on the destination. When enabled, Tabsdata automatically detects differences between the incoming data schema and the existing table schema, and applies safe changes before loading the data. Unsafe changes are rejected with an error.

[Optional] schema_evolution

  • "none" (default) — Raise an error if the incoming data schema does not exactly match the existing table schema.

  • "iceberg" — Automatically apply safe schema changes (add columns, drop columns, widen types, make columns nullable). Reject unsafe changes with an error.

Schema evolution can be combined with any if_table_exists strategy ("append" or "replace").
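
For example, both properties can be set together (reusing the conn object from the setup above):

destination = td.ClickHouseDest(
    conn=conn,
    destination_tables="events",
    if_table_exists="replace",   # truncate before each load
    schema_evolution="iceberg",  # evolve the schema before loading
)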

Safe Changes (Applied Automatically)

  • Add a column: A new column in the incoming data that does not exist in the table is added. New columns are always created as nullable, since existing rows will have no value for the new column.

  • Drop a column: A column that exists in the table but is absent from the incoming data is dropped from the table.

  • Widen a column type: A column type is changed to a wider type (e.g., INT to BIGINT, or FLOAT to DOUBLE). Only specific widening paths are allowed — see the type widening reference below.

  • Make a column nullable: A NOT NULL column becomes nullable. This happens when the incoming data contains null values in a column that was previously NOT NULL.

  • Add or change a default value: A default expression is added to or changed on a column.

  • Remove a default value: A default expression is removed from a column.
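
As an illustration, consider a run whose output schema has drifted since the destination table was created. With schema_evolution="iceberg", the safe changes above are applied as DDL before the load (a sketch reusing the conn object from the setup above):

@td.subscriber(
    tables=["vendors"],
    destination=td.ClickHouseDest(
        conn=conn,
        destination_tables="vendors",
        schema_evolution="iceberg",
    ),
)
def evolve_vendors(tf: td.TableFrame):
    # If tf now contains a column the table lacks, it is added as Nullable;
    # columns absent from tf are dropped; an Int32 column arriving as Int64
    # is widened in place. Data is loaded only after these changes succeed.
    return tf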

Unsafe Changes (Rejected with Error)

  • Narrow a column type: Changing a column to a narrower type (e.g., BIGINT to INT) is rejected because existing data may not fit in the smaller type.

  • Incompatible type change: Changing a column to an unrelated type (e.g., INT to STRING, or across type families like INT to DOUBLE) is rejected.

  • Make a column NOT NULL: Removing nullability from a column that was previously nullable is rejected, since existing rows may contain null values.

When an unsafe change is detected, the entire operation is aborted — no partial changes are applied to the table.

How Nullability Is Determined

Tabsdata determines column nullability from the actual data being written, not from metadata flags. It reads Parquet file statistics to check whether each column contains any null values:

  • If a column contains null values, it is treated as nullable.

  • If a column contains no null values, it is treated as NOT NULL.

  • If statistics are unavailable for a column, it is conservatively treated as nullable.

This means that a NOT NULL column in the existing table will only become nullable if the incoming data actually contains nulls in that column. Conversely, a column that is already nullable in the table will never be tightened back to NOT NULL, even if the current batch of data has no nulls.
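
The check is conceptually similar to reading per-column null counts from Parquet metadata. The following is a minimal sketch of the idea using pyarrow, not Tabsdata’s actual implementation:

import pyarrow.parquet as pq

def column_nullability(path: str) -> dict[str, bool]:
    # A column counts as nullable if any row group reports nulls,
    # or if statistics are missing (the conservative case).
    md = pq.ParquetFile(path).metadata
    names = [md.schema.column(i).name for i in range(md.num_columns)]
    nullable = {name: False for name in names}
    for rg in range(md.num_row_groups):
        for i, name in enumerate(names):
            stats = md.row_group(rg).column(i).statistics
            if stats is None or stats.null_count is None or stats.null_count > 0:
                nullable[name] = True
    return nullable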

Known Limitations

  • No column rename detection: If a column is renamed in the source data, Tabsdata sees it as a drop of the old column and an addition of the new column. The data in the old column is not migrated to the new column.

  • No type narrowing: It is not possible to change a column to a narrower type (e.g., BIGINT to INT). This requires manual intervention outside of Tabsdata.

  • Nullable columns cannot be made NOT NULL: Once a column becomes nullable, schema evolution will never tighten it back to NOT NULL, even if all subsequent data batches contain no nulls. This is consistent with Apache Iceberg’s safety rules.

  • Limited type widening: Only integer and float type families support widening. String, Date, Boolean, Decimal, and other types cannot be widened. Cross-family widening (e.g., INT to DOUBLE) is not supported.

  • New columns are always nullable: When a new column is added to an existing table, it is always created as nullable — even if the incoming data has no nulls in that column. This is because existing rows in the table have no value for the new column and need NULL.

Enabling Schema Evolution (ClickHouse)

destination = td.ClickHouseDest(
    conn=conn,
    destination_tables="events",
    schema_evolution="iceberg",
)

ClickHouse Type Widening

  • Int16 — can be widened from Int8

  • Int32 — can be widened from Int8, Int16

  • Int64 — can be widened from Int8, Int16, Int32

  • UInt16 — can be widened from UInt8

  • UInt32 — can be widened from UInt8, UInt16

  • UInt64 — can be widened from UInt8, UInt16, UInt32

  • Float64 — can be widened from Float32

Note

String, Date, Boolean, and Decimal types do not support widening. Cross-family widening (e.g., INT to DOUBLE, or unsigned to signed integer families) is not supported.
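
In short, widening is allowed only within the same signed-integer, unsigned-integer, or float family, from a smaller width to a larger one. A hypothetical helper encoding the rules above (not part of the tabsdata API):

# Allowed source types per widened target type, mirroring the list above.
WIDENABLE_FROM = {
    "Int16": {"Int8"},
    "Int32": {"Int8", "Int16"},
    "Int64": {"Int8", "Int16", "Int32"},
    "UInt16": {"UInt8"},
    "UInt32": {"UInt8", "UInt16"},
    "UInt64": {"UInt8", "UInt16", "UInt32"},
    "Float64": {"Float32"},
}

def is_safe_widening(current: str, incoming: str) -> bool:
    # e.g. is_safe_widening("Int32", "Int64") -> True
    #      is_safe_widening("Int64", "Int32") -> False (narrowing)
    return current in WIDENABLE_FROM.get(incoming, set())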

ClickHouse-Specific Notes

  • Table creation uses MergeTree engine with ORDER BY tuple(). If you need a different engine or ordering key, create the table manually before running with schema evolution.

  • ClickHouse does not support making a nullable column NOT NULL. This action is silently skipped.

  • ClickHouse wraps nullable columns with Nullable() (e.g., Nullable(Int64)). This is handled automatically.

  • Dropping a default from a column uses ALTER TABLE ... MODIFY COLUMN ... REMOVE DEFAULT.

Data Drift Support

This section describes how Tabsdata handles data drift in the output data for this subscriber connector when schema_evolution is set to "none" (the default).

In this mode, the schema from the first execution (or of a pre-existing table) is preserved, regardless of the value of any function property such as if_table_exists.

Here is how the system will respond to various kinds of changes due to data drift:

  • New columns introduced by data drift are ignored.

  • Columns removed from the source remain in the table, with missing values populated as NULL.

  • Changes to column data types (type drift) cause execution to fail, as the subscriber does not automatically reconcile type mismatches.

When schema_evolution="iceberg" is enabled, the schema evolution rules described above govern how schema differences are handled instead.

Important Notes

Nullable columns

ClickHouse wraps nullable columns with Nullable() (e.g. Nullable(Int32)). This is handled automatically when creating tables from the data schema.

Table creation

When a destination table does not exist, it is automatically created using the MergeTree engine with ORDER BY tuple(). If you need a different engine or ordering key, create the table manually in ClickHouse before running the function.

HTTPS/TLS

For secure connections, set secure=True and use port 8443 (the ClickHouse HTTPS port). The default port 8123 uses plain HTTP.