ClickHouse
You can use this built-in Tabsdata subscriber to write to a ClickHouse database.
Note
ClickHouseDest is currently marked as unstable and is subject to change in future releases.
ClickHouseDest works by first uploading the output data as Parquet files to a cloud staging area (Amazon S3,
Azure Blob Storage, or Google Cloud Storage), and then loading them into ClickHouse using its native table functions
(s3(), azureBlobStorage()). This approach leverages ClickHouse’s optimized parallel read capabilities from
cloud storage.
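Conceptually, the load step resembles an INSERT ... SELECT over the staged Parquet files using ClickHouse's s3() table function. The sketch below is illustrative only (the exact SQL Tabsdata generates is internal, and the bucket, path, and table names are hypothetical):

```python
# Illustrative sketch of the staged-load pattern; the exact SQL Tabsdata
# issues is internal to the connector.
def build_s3_load_sql(table: str, bucket: str, key: str, region: str) -> str:
    """Build an INSERT ... SELECT that reads staged Parquet via s3()."""
    url = f"https://{bucket}.s3.{region}.amazonaws.com/{key}"
    return f"INSERT INTO {table} SELECT * FROM s3('{url}', 'Parquet')"

sql = build_s3_load_sql(
    "procurement.vendors",
    "acme-staging",
    "staging/clickhouse/vendors.parquet",
    "us-east-1",
)
print(sql)
```

Because ClickHouse reads the staged files directly from object storage, the data never has to pass through the client connection row by row.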
Installing the connector
To work with the ClickHouse connector, you need to install its dependencies separately; they are kept out of the core package to keep it small. Run the following command in your terminal to install them:
$ pip install 'tabsdata[clickhouse]'
Example (Subscriber - ClickHouse)
The following example creates a subscriber named write_sales. It writes two Tabsdata tables to the database. The subscriber is automatically triggered by a new commit to any of its input tables, and writes data to the database without any modification.
import tabsdata as td
@td.subscriber(
tables=["vendors", "items"],
destination=td.ClickHouseDest(
td.ClickHouseConn(
host="clickhouse.acme-inc.com",
staging=td.S3Config(
bucket="acme-staging",
region="us-east-1",
base_path="staging/clickhouse",
credentials=td.S3AccessKeyCredentials("<ACCESS_KEY>", "<SECRET_KEY>"),
),
port=8123,
database="procurement",
credentials=td.UserPasswordCredentials("<USER>", "<PASSWORD>"),
secure=False,
),
destination_tables=["vendors", "items"],
if_table_exists="replace",
),
)
def write_sales(tf1: td.TableFrame, tf2: td.TableFrame):
return tf1, tf2
Note: After defining the function, you need to register it with a Tabsdata collection and execute it. For more information, see Register a Function and Execute a Function.
Setup (Subscriber - ClickHouse)
To use ClickHouseDest, you need two objects:
A ClickHouseConn that defines how to connect to the ClickHouse server and where to stage files in the cloud.
A ClickHouseDest that references the connection and specifies which table(s) to write to.
The following code uses placeholder values for defining a subscriber that reads Tabsdata tables and writes to ClickHouse:
import tabsdata as td
conn = td.ClickHouseConn(
host="<clickhouse_host>",
staging="<S3Config | AzureConfig | GCSConfig>",
port="<port>",
database="<database_name>",
credentials=td.UserPasswordCredentials("<db_username>", "<db_password>"),
secure="<secure_boolean>",
)
@td.subscriber(
tables=["<input_table1>", "<input_table2>"],
destination=td.ClickHouseDest(
conn=conn,
destination_tables=["<destination_table1>", "<destination_table2>"],
if_table_exists="<value>",
),
trigger_by=["<trigger_table1>", "<trigger_table2>"],
)
def <subscriber_name>(<table_frame1>: td.TableFrame, <table_frame2>: td.TableFrame):
<function_logic>
return <table_frame_output1>, <table_frame_output2>
Note: After defining the function, you need to register it with a Tabsdata collection and execute it. For more information, see Register a Function and Execute a Function.
The following properties are defined in the code above:
tables
<input_table1>, <input_table2>… are the names of the Tabsdata tables to be written to the external system.
destination
ClickHouseConn holds the ClickHouse server details and the cloud staging configuration.
host
The hostname or IP address of the ClickHouse server.
staging
The cloud storage configuration for staging Parquet files before loading them into ClickHouse. Must be one of
td.S3Config, td.AzureConfig, or td.GCSConfig.
S3 (Amazon S3)
staging = td.S3Config(
bucket="my-data-bucket",
region="us-east-1",
base_path="staging/clickhouse",
credentials=td.S3AccessKeyCredentials(
td.EnvironmentSecret("AWS_ACCESS_KEY_ID"),
td.EnvironmentSecret("AWS_SECRET_ACCESS_KEY"),
),
)
bucket — The S3 bucket name.
region — The AWS region (e.g. "us-east-1").
base_path — The key prefix under which staged files are stored (e.g. "staging/clickhouse").
credentials (S3AccessKeyCredentials) — AWS access key credentials for uploading files to S3.
Azure Blob Storage
staging = td.AzureConfig(
container="my-container",
base_path="staging/clickhouse",
credentials=td.AzureAccountKeyCredentials(
td.EnvironmentSecret("AZURE_ACCOUNT_NAME"),
td.EnvironmentSecret("AZURE_ACCOUNT_KEY"),
),
)
container — The Azure Blob container name.
base_path — The blob path prefix for staged files (e.g. "staging/clickhouse").
credentials (AzureAccountKeyCredentials) — Azure account key credentials for uploading files.
Google Cloud Storage (GCS)
staging = td.GCSConfig(
bucket="my-gcs-bucket",
base_path="staging/clickhouse",
upload_credentials=td.GCPServiceAccountKeyCredentials(
td.EnvironmentSecret("GCP_SERVICE_ACCOUNT_JSON"),
),
hmac_credentials=td.HMACCredentials(
td.EnvironmentSecret("GCS_HMAC_ACCESS_KEY"),
td.EnvironmentSecret("GCS_HMAC_SECRET_KEY"),
),
)
bucket — The GCS bucket name.
base_path — The object path prefix for staged files (e.g. "staging/clickhouse").
upload_credentials (GCPServiceAccountKeyCredentials, optional) — GCP service account credentials for uploading files via the GCS SDK. When not provided, application default credentials (ADC) are used.
hmac_credentials (HMACCredentials, optional) — GCS HMAC credentials for S3-compatible access, which ClickHouse uses to read files from GCS via its s3() table function.
[Optional] port
The HTTP(S) port of the ClickHouse server. Use 8123 for plain HTTP and 8443 for HTTPS/TLS. Defaults to 8123.
[Optional] database
The target database name. Defaults to "default".
[Optional] credentials
UserPasswordCredentials for ClickHouse authentication. When not provided, the ClickHouse default user with
no password is used. For more information, see Secrets Management.
For quick testing with plain strings:
conn = td.ClickHouseConn(
host="clickhouse.example.com",
staging=staging,
port=8123,
database="analytics",
credentials=td.UserPasswordCredentials("admin", "my_password"),
)
For production, credentials should be wrapped in EnvironmentSecret to avoid hardcoding secrets:
conn = td.ClickHouseConn(
host="clickhouse.example.com",
staging=staging,
database="analytics",
credentials=td.UserPasswordCredentials(
td.EnvironmentSecret("CLICKHOUSE_USER"),
td.EnvironmentSecret("CLICKHOUSE_PASSWORD"),
),
)
For a secure HTTPS connection:
conn = td.ClickHouseConn(
host="clickhouse.example.com",
staging=staging,
port=8443,
database="analytics",
secure=True,
credentials=td.UserPasswordCredentials(
td.EnvironmentSecret("CLICKHOUSE_USER"),
td.EnvironmentSecret("CLICKHOUSE_PASSWORD"),
),
)
[Optional] secure
If True, connect to ClickHouse over HTTPS/TLS. Defaults to False.
[Optional] Advanced parameters
There are also some advanced options for more technical use cases:
enforce_connection_params — Controls whether the database parameter is enforced when qualifying table names.
cx_src_configs_clickhouse — Additional source-specific configuration parameters for the ClickHouse connection.
cx_dst_configs_clickhouse — Additional destination-specific configuration parameters for the ClickHouse connection.
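As a purely hypothetical sketch (the parameter names come from the list above, but the value types and semantics shown are assumptions; consult the tabsdata API reference for the accepted values):

```python
import tabsdata as td

# Hypothetical usage; the value types passed to these advanced parameters
# are assumptions, not documented behavior.
conn = td.ClickHouseConn(
    host="clickhouse.example.com",
    staging=staging,  # an existing S3Config / AzureConfig / GCSConfig
    database="analytics",
    credentials=td.UserPasswordCredentials(
        td.EnvironmentSecret("CLICKHOUSE_USER"),
        td.EnvironmentSecret("CLICKHOUSE_PASSWORD"),
    ),
    enforce_connection_params=True,
    cx_dst_configs_clickhouse={"max_insert_threads": "4"},
)
```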
destination_tables
The name of the table(s) to load data into. A single table can be provided as a string; multiple tables must be provided as a list.
Example with a single table:
destination = td.ClickHouseDest(
conn=conn,
destination_tables="events",
)
Example with multiple tables:
destination = td.ClickHouseDest(
conn=conn,
destination_tables=["events", "users"],
if_table_exists="replace",
)
[Optional] if_table_exists
This optional property defines the strategy to follow when the destination table already exists: 'replace' truncates the table before loading new rows, and 'append' inserts new rows into the existing table. Defaults to 'append'.
If the destination table does not exist, it will be created automatically based on the schema of the data being written.
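The two strategies can be sketched with an in-memory stand-in for a table (illustrative only; in practice ClickHouse performs the truncate and insert):

```python
# Minimal in-memory sketch of the two if_table_exists strategies.
def load(table_rows, new_rows, if_table_exists="append"):
    if if_table_exists == "replace":
        table_rows = []  # truncate the existing rows first
    elif if_table_exists != "append":
        raise ValueError(f"unknown strategy: {if_table_exists!r}")
    return table_rows + new_rows  # insert the new rows

existing = [{"id": 1}]
appended = load(existing, [{"id": 2}])                            # both rows kept
replaced = load(existing, [{"id": 2}], if_table_exists="replace")  # old row dropped
```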
None as an input and output
A subscriber may receive and return a None value instead of TableFrames.
When a subscriber receives a None value instead of a TableFrame it means that the requested table dependency version does not exist.
When a subscriber returns a None value instead of a TableFrame it means there is no new data to write to the external system. This helps in avoiding the creation of multiple copies of the same data.
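A common pattern is to return None when the input has not changed since the last run, so the destination is not rewritten. A minimal sketch, assuming a hypothetical content-fingerprint check (the fingerprinting shown is illustrative, not part of the connector; the tabsdata decorator is omitted so the logic stands alone):

```python
import hashlib

_last_fingerprint = None

def fingerprint(rows) -> str:
    """Illustrative content hash over row data."""
    return hashlib.sha256(repr(sorted(map(repr, rows))).encode()).hexdigest()

def maybe_output(rows):
    """Return rows if they changed since the last run, else None to skip."""
    global _last_fingerprint
    fp = fingerprint(rows)
    if fp == _last_fingerprint:
        return None  # no new data: returning None skips the write
    _last_fingerprint = fp
    return rows

first = maybe_output([{"id": 1}])   # rows returned, write happens
second = maybe_output([{"id": 1}])  # unchanged input: None, write skipped
```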
[Optional] trigger_by
<trigger_table1>, <trigger_table2>… are the names of the tables in the Tabsdata server. A new commit to any of these tables triggers the subscriber. All listed trigger tables must exist in the server before registering the subscriber.
Defining trigger tables is optional. If you don’t define the trigger_by property, the subscriber will be triggered by any of its input tables. If you define the trigger_by property, then only those tables listed in the property can automatically trigger the subscriber.
For more information, see Working with Triggers.
<subscriber_name>
<subscriber_name> is the name for the subscriber that you are configuring.
<function_logic>
<function_logic> governs the processing performed by the subscriber. You can specify function logic to be a simple write or to perform additional processing as needed. For more information about the function logic that you can include, see Working with Tables.
<table_frame1>, <table_frame2>… are the names for the variables that temporarily store source data for processing.
<table_frame_output1>, <table_frame_output2>… are the outputs of the function that are written to the external system.
Writing to Multiple Tables
When writing to multiple tables, the subscriber function must return one result per destination table, in the same
order as they are listed in destination_tables:
@td.subscriber(
name="write_to_clickhouse",
tables="collection/raw_data",
destination=td.ClickHouseDest(
conn=conn,
destination_tables=["events", "users"],
),
)
def write_to_clickhouse(df: td.TableFrame):
events = df.filter(td.col("type") == "event")
users = df.filter(td.col("type") == "user")
return events, users
Skipping a Table with None
If a subscriber returns None for a particular output, the corresponding destination table is skipped entirely
(not created, not written to):
@td.subscriber(
name="conditional_write",
tables="collection/data",
destination=td.ClickHouseDest(
conn=conn,
destination_tables=["primary", "secondary"],
),
)
def conditional_write(df: td.TableFrame):
primary = df.drop_nulls()
return primary, None # "secondary" table is skipped
Data Drift Support
This section describes how Tabsdata handles data drift in the output data for this subscriber connector.
Here, the schema from the first execution (or pre-existing table) is preserved irrespective of the value of any
function property such as if_table_exists.
Here is how the system will respond to various kinds of changes due to data drift:
New columns introduced by data drift are ignored.
Columns removed from the source remain in the table, with missing values populated as NULL.
Changes to column data types (type drift) cause execution to fail, as the subscriber does not automatically reconcile type mismatches.
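The three rules can be sketched as a schema-reconciliation step (illustrative pure Python; the actual reconciliation happens inside the connector):

```python
def reconcile(table_schema: dict, incoming_schema: dict) -> dict:
    """Map incoming columns onto a fixed table schema.

    Illustrative sketch of the drift rules above, not connector internals.
    Returns {column: source} where source is the incoming column name, or
    None when the column should be filled with NULL.
    """
    plan = {}
    for col, col_type in table_schema.items():
        if col not in incoming_schema:
            plan[col] = None  # column removed from source -> NULL
        elif incoming_schema[col] != col_type:
            raise TypeError(f"type drift on {col!r}")  # execution fails
        else:
            plan[col] = col
    # Columns present only in incoming_schema are new columns: ignored.
    return plan

plan = reconcile({"id": "Int32", "name": "String"},
                 {"id": "Int32", "extra": "String"})
print(plan)  # {'id': 'id', 'name': None}
```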
Important Notes
Nullable columns
ClickHouse wraps nullable columns with Nullable() (e.g. Nullable(Int32)). This is handled automatically
when creating tables from the data schema.
Table creation
When a destination table does not exist, it is automatically created using the MergeTree engine with
ORDER BY tuple(). If you need a different engine or ordering key, create the table manually in ClickHouse
before running the function.
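For example, a hypothetical manually created table (column names and types must match the data the subscriber writes; the engine and ordering key are exactly the parts the automatic creation does not let you choose):

```sql
-- Hypothetical DDL; adjust columns, engine, and ordering key as needed.
CREATE TABLE procurement.vendors
(
    id   Int32,
    name Nullable(String)
)
ENGINE = MergeTree
ORDER BY id;
```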
HTTPS/TLS
For secure connections, set secure=True and use port 8443 (the ClickHouse HTTPS port). The default port
8123 uses plain HTTP.