StarRocks
You can use this built-in Tabsdata subscriber to write to a StarRocks database.
Note
StarRocksDest is currently marked as unstable and is subject to change in future releases.
StarRocksDest works by first uploading the output data as Parquet files to a cloud staging area (Amazon S3,
Azure Blob Storage, or Google Cloud Storage), and then loading them into StarRocks using its native FILES()
table function. This approach leverages StarRocks’ optimized parallel read capabilities from cloud storage.
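To make the load step concrete, the sketch below builds the kind of statement StarRocks executes to ingest staged Parquet files with FILES(). This is illustrative only, not the connector's internal code; the function name, the S3 URI, and the exact property set are assumptions for the example.

```python
# Illustrative sketch (not the connector's internal code): the kind of
# statement StarRocks runs to load staged Parquet files via FILES().
def build_files_load_sql(database: str, table: str, s3_uri: str,
                         access_key: str, secret_key: str) -> str:
    return (
        f"INSERT INTO {database}.{table} "
        f"SELECT * FROM FILES("
        f"'path' = '{s3_uri}', "
        f"'format' = 'parquet', "
        f"'aws.s3.access_key' = '{access_key}', "
        f"'aws.s3.secret_key' = '{secret_key}')"
    )

sql = build_files_load_sql(
    "procurement", "vendors",
    "s3://acme-staging/staging/starrocks/vendors/*.parquet",
    "<ACCESS_KEY>", "<SECRET_KEY>",
)
print(sql)
```

Because FILES() reads the staged Parquet files in parallel directly from cloud storage, the load bypasses the row-by-row MySQL protocol entirely.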
Installing the connector
To work with the StarRocks connector, you must install its dependencies separately; they are kept out of the core package to manage its size. Run the following command in your terminal to install them:
$ pip install 'tabsdata[starrocks]'
Example (Subscriber - StarRocks)
The following example creates a subscriber named write_sales. It writes two Tabsdata tables to the database. The subscriber is automatically triggered by a new commit to any of its input tables, and writes data to the database without any modification.
import tabsdata as td
@td.subscriber(
tables=["vendors", "items"],
destination=td.StarRocksDest(
td.StarRocksConn(
host="starrocks.acme-inc.com",
staging=td.S3Config(
bucket="acme-staging",
region="us-east-1",
base_path="staging/starrocks",
credentials=td.S3AccessKeyCredentials("<ACCESS_KEY>", "<SECRET_KEY>"),
),
port=9030,
database="procurement",
credentials=td.UserPasswordCredentials("<USER>", "<PASSWORD>"),
),
destination_tables=["vendors", "items"],
if_table_exists="replace",
),
)
def write_sales(tf1: td.TableFrame, tf2: td.TableFrame):
return tf1, tf2
Note: After defining the function, you need to register it with a Tabsdata collection and execute it. For more information, see Register a Function and Execute a Function.
Setup (Subscriber - StarRocks)
To use StarRocksDest, you need two objects:
- A StarRocksConn that defines how to connect to the StarRocks server and where to stage files in the cloud.
- A StarRocksDest that references the connection and specifies which table(s) to write to.
The following code uses placeholder values for defining a subscriber that reads Tabsdata tables and writes to StarRocks:
import tabsdata as td
conn = td.StarRocksConn(
host="<starrocks_fe_host>",
staging="<S3Config | AzureConfig | GCSConfig>",
port="<port>",
database="<database_name>",
credentials=td.UserPasswordCredentials("<db_username>", "<db_password>"),
)
@td.subscriber(
tables=["<input_table1>", "<input_table2>"],
destination=td.StarRocksDest(
conn=conn,
destination_tables=["<destination_table1>", "<destination_table2>"],
if_table_exists="<value>",
),
trigger_by=["<trigger_table1>", "<trigger_table2>"],
)
def <subscriber_name>(<table_frame1>: td.TableFrame, <table_frame2>: td.TableFrame):
<function_logic>
return <table_frame_output1>, <table_frame_output2>
Note: After defining the function, you need to register it with a Tabsdata collection and execute it. For more information, see Register a Function and Execute a Function.
The following properties are defined in the code above:
tables
<input_table1>, <input_table2>… are the names of the Tabsdata tables to be written to the external system.
destination
StarRocksConn holds the StarRocks server details and the cloud staging configuration.
host
The hostname or IP address of the StarRocks FE (Frontend) server.
staging
The cloud storage configuration for staging Parquet files before loading them into StarRocks. Must be one of
td.S3Config, td.AzureConfig, or td.GCSConfig.
S3 (Amazon S3)
staging = td.S3Config(
bucket="my-data-bucket",
region="us-east-1",
base_path="staging/starrocks",
credentials=td.S3AccessKeyCredentials(
td.EnvironmentSecret("AWS_ACCESS_KEY_ID"),
td.EnvironmentSecret("AWS_SECRET_ACCESS_KEY"),
),
)
- bucket — The S3 bucket name.
- region — The AWS region (e.g. "us-east-1").
- base_path — The key prefix under which staged files are stored (e.g. "staging/starrocks").
- credentials (S3AccessKeyCredentials) — AWS access key credentials for uploading files to S3.
Azure Blob Storage
staging = td.AzureConfig(
container="my-container",
base_path="staging/starrocks",
credentials=td.AzureAccountKeyCredentials(
td.EnvironmentSecret("AZURE_ACCOUNT_NAME"),
td.EnvironmentSecret("AZURE_ACCOUNT_KEY"),
),
)
- container — The Azure Blob container name.
- base_path — The blob path prefix for staged files (e.g. "staging/starrocks").
- credentials (AzureAccountKeyCredentials) — Azure account key credentials for uploading files.
Google Cloud Storage (GCS)
staging = td.GCSConfig(
bucket="my-gcs-bucket",
base_path="staging/starrocks",
upload_credentials=td.GCPServiceAccountKeyCredentials(
td.EnvironmentSecret("GCP_SERVICE_ACCOUNT_JSON"),
),
hmac_credentials=td.HMACCredentials(
td.EnvironmentSecret("GCS_HMAC_ACCESS_KEY"),
td.EnvironmentSecret("GCS_HMAC_SECRET_KEY"),
),
)
- bucket — The GCS bucket name.
- base_path — The object path prefix for staged files (e.g. "staging/starrocks").
- upload_credentials (GCPServiceAccountKeyCredentials, optional) — GCP service account credentials for uploading files via the GCS SDK. When not provided, application default credentials (ADC) are used.
- hmac_credentials (HMACCredentials, optional) — GCS HMAC credentials for S3-compatible access, which StarRocks uses to read files from GCS.
[optional] port
The MySQL protocol port of the StarRocks FE server. Defaults to 9030.
[optional] database
The target database name. Defaults to "default".
[optional] credentials
UserPasswordCredentials for StarRocks authentication. When not provided, the StarRocks root user with no
password is used. For more information, see Secrets Management.
For quick testing with plain strings:
conn = td.StarRocksConn(
host="starrocks-fe.example.com",
staging=staging,
port=9030,
database="analytics",
credentials=td.UserPasswordCredentials("admin", "my_password"),
)
For production, credentials should be wrapped in EnvironmentSecret to avoid hardcoding secrets:
conn = td.StarRocksConn(
host="starrocks-fe.example.com",
staging=staging,
database="analytics",
credentials=td.UserPasswordCredentials(
td.EnvironmentSecret("STARROCKS_USER"),
td.EnvironmentSecret("STARROCKS_PASSWORD"),
),
)
[Optional] Advanced parameters
There are also some advanced options for more technical use cases:
- enforce_connection_params — Controls whether the database parameter is enforced when qualifying table names.
- cx_src_configs_starrocks — Additional source-specific configuration parameters for the StarRocks connection.
- cx_dst_configs_starrocks — Additional destination-specific configuration parameters for the StarRocks connection.
destination_tables
The name of the table(s) to load data into. A single table can be provided as a string; multiple tables must be provided as a list.
Example with a single table:
destination = td.StarRocksDest(
conn=conn,
destination_tables="events",
)
Example with multiple tables:
destination = td.StarRocksDest(
conn=conn,
destination_tables=["events", "users"],
if_table_exists="replace",
)
[Optional] if_table_exists
This optional property defines the strategy to follow when the destination table already exists: 'replace' truncates the table before loading new rows, and 'append' inserts new rows into the existing table. Defaults to 'append'.
If the destination table does not exist, it will be created automatically based on the schema of the data being written.
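The three behaviors (create if missing, truncate on 'replace', extend on 'append') can be sketched in plain Python. This is a conceptual model of the semantics, not the connector's implementation; the dict stands in for the database.

```python
def write_rows(tables: dict, name: str, rows: list,
               if_table_exists: str = "append") -> None:
    """Conceptual sketch of the if_table_exists semantics."""
    if name not in tables:
        tables[name] = []          # table is created automatically if missing
    elif if_table_exists == "replace":
        tables[name] = []          # truncate before loading the new rows
    tables[name].extend(rows)      # load the incoming rows

db = {}
write_rows(db, "events", [1, 2])                          # creates the table
write_rows(db, "events", [3], if_table_exists="append")   # db["events"] == [1, 2, 3]
write_rows(db, "events", [4], if_table_exists="replace")  # db["events"] == [4]
```

Note that 'replace' truncates rather than drops the table, so the table's schema and model survive across runs.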
None as an input and output
A subscriber may receive and return a None value instead of TableFrames.
When a subscriber receives a None value instead of a TableFrame it means that the requested table dependency version does not exist.
When a subscriber returns a None value instead of a TableFrame it means there is no new data to write to the external system. This helps in avoiding the creation of multiple copies of the same data.
[Optional] trigger_by
<trigger_table1>, <trigger_table2>… are the names of the tables in the Tabsdata server. A new commit to any of these tables triggers the subscriber. All listed trigger tables must exist in the server before registering the subscriber.
Defining trigger tables is optional. If you don’t define the trigger_by property, the subscriber will be triggered by any of its input tables. If you define the trigger_by property, then only those tables listed in the property can automatically trigger the subscriber.
For more information, see Working with Triggers.
<subscriber_name>
<subscriber_name> is the name for the subscriber that you are configuring.
<function_logic>
<function_logic> governs the processing performed by the subscriber. You can specify function logic to be a simple write or to perform additional processing as needed. For more information about the function logic that you can include, see Working with Tables.
<table_frame1>, <table_frame2>… are the names for the variables that temporarily store source data for processing.
<table_frame_output1>, <table_frame_output2>… are the output from the function that are written to the external system.
Writing to Multiple Tables
When writing to multiple tables, the subscriber function must return one result per destination table, in the same
order as they are listed in destination_tables:
@td.subscriber(
name="write_to_starrocks",
tables="collection/raw_data",
destination=td.StarRocksDest(
conn=conn,
destination_tables=["events", "users"],
),
)
def write_to_starrocks(df: td.TableFrame):
events = df.filter(td.col("type") == "event")
users = df.filter(td.col("type") == "user")
return events, users
Skipping a Table with None
If a subscriber returns None for a particular output, the corresponding destination table is skipped entirely
(not created, not written to):
@td.subscriber(
name="conditional_write",
tables="collection/data",
destination=td.StarRocksDest(
conn=conn,
destination_tables=["primary", "secondary"],
),
)
def conditional_write(df: td.TableFrame):
primary = df.drop_nulls()
return primary, None # "secondary" table is skipped
Data Drift Support
This section describes how Tabsdata handles data drift in the output data for this subscriber connector.
The schema from the first execution (or from a pre-existing table) is preserved, irrespective of the value of any
function property such as if_table_exists.
Here is how the system responds to various kinds of changes due to data drift:
- New columns introduced by data drift are ignored.
- Columns removed from the source remain in the table, with missing values populated as NULL.
- Changes to column data types (type drift) cause execution to fail, as the subscriber does not automatically reconcile type mismatches.
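Since type drift fails the execution, one defensive option is to coerce incoming values back to the schema of the first execution inside your function logic. The sketch below shows the idea in plain Python; the schema, column names, and helper are hypothetical, and in a real subscriber you would apply the equivalent casts to the TableFrame before returning it.

```python
# Plain-Python sketch of guarding against type drift: coerce incoming values
# to the schema of the first execution before handing them downstream.
EXPECTED_SCHEMA = {"vendor_id": int, "amount": float}  # hypothetical schema

def coerce_row(row: dict) -> dict:
    # Cast each known column to its expected type; drop drifted-in columns
    # (the connector ignores new columns anyway).
    return {col: EXPECTED_SCHEMA[col](value)
            for col, value in row.items() if col in EXPECTED_SCHEMA}

# A source that drifted: amount arrives as a string, plus a new column.
drifted = {"vendor_id": 7, "amount": "19.99", "new_col": "ignored"}
print(coerce_row(drifted))   # {'vendor_id': 7, 'amount': 19.99}
```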
Important Notes
Timestamp precision
StarRocks DATETIME has second-level precision. Sub-second precision from source data will be truncated when
writing to StarRocks. If sub-second precision is important, consider storing timestamps as strings.
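The effect of the truncation, and the string workaround, can be seen with a standard-library datetime (the timestamp value here is arbitrary):

```python
from datetime import datetime

ts = datetime(2024, 5, 1, 12, 30, 45, 123456)  # microsecond precision

# What StarRocks DATETIME effectively stores: second precision.
truncated = ts.replace(microsecond=0)

# Workaround: serialize to a string column to keep sub-second precision.
as_string = ts.isoformat()

print(truncated)   # 2024-05-01 12:30:45
print(as_string)   # 2024-05-01T12:30:45.123456
```

The string column round-trips the full precision at the cost of losing DATETIME comparison and arithmetic in StarRocks.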
Unsigned integers
StarRocks does not have unsigned integer types. Unsigned integer columns are automatically mapped to the next wider
signed type to preserve the full value range (e.g. UInt8 becomes SMALLINT, UInt64 becomes
LARGEINT).
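Spelled out as a lookup, the widening rule looks like the table below. Only the UInt8 and UInt64 mappings are stated above; the intermediate rows are inferred from the "next wider signed type" rule and should be verified against the connector's actual behavior.

```python
# The widening rule as a lookup (illustrative; intermediate rows inferred).
UNSIGNED_TO_STARROCKS = {
    "UInt8":  "SMALLINT",  # max 255        -> fits in 16-bit signed
    "UInt16": "INT",       # max 65535      -> fits in 32-bit signed
    "UInt32": "BIGINT",    # max 2**32 - 1  -> fits in 64-bit signed
    "UInt64": "LARGEINT",  # max 2**64 - 1  -> needs 128-bit signed
}
print(UNSIGNED_TO_STARROCKS["UInt8"])   # SMALLINT
```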
Table creation
When a destination table does not exist, it is automatically created with a DUPLICATE KEY model using the first
column as the key and hash distribution column. If you need a different table structure (PRIMARY KEY, specific
distribution), create the table manually in StarRocks before running the function.
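For the manual-creation case, the sketch below prints the kind of DDL you might run in StarRocks beforehand to get a PRIMARY KEY table instead of the auto-created DUPLICATE KEY one. The table and column names are hypothetical; adjust the key and distribution columns to your data.

```python
# Hypothetical example of DDL to run manually in StarRocks when the
# auto-created DUPLICATE KEY table is not what you want (here, PRIMARY KEY).
ddl = """
CREATE TABLE procurement.vendors (
    vendor_id BIGINT NOT NULL,
    name      VARCHAR(255),
    updated   DATETIME
)
PRIMARY KEY (vendor_id)
DISTRIBUTED BY HASH (vendor_id)
""".strip()
print(ddl)
```

Once the table exists, the subscriber loads into it as-is, so the PRIMARY KEY model and distribution you chose are preserved.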