Databricks

You can use this built-in Tabsdata publisher to read from Databricks using SQL queries.

Installing the connector

To work with the Databricks connector you are required to install the dependencies separately. This is to manage the size of the core package. Please run the following command in your terminal to install the dependencies.

$ pip install 'tabsdata[databricks]'

Example (Publisher - Databricks)

The following example creates a publisher named read_sales. This publisher runs two queries to the database at the specified URL. It writes the results of the queries to two Tabsdata tables. The publisher is triggered by each new commit to the new_customer table, and writes data to the output tables in the Tabsdata server without any modification. Using initial_values and id, the publisher picks up new data for every run.

import tabsdata as td

# HashiCorp is a type of secret store, ideal for production use cases.
# For rapid testing, you can assign the credential values directly. You can also use environment variables as
# detailed here: https://docs.tabsdata.com/latest/guide/secrets_management/env_variables/main.html.
databricks_token = td.HashiCorpSecret("path-to-secret", "DATABRICKS_TOKEN")

@td.publisher(
    source=td.DatabricksSource(
        host_url="https://adb-1234567890123456.7.azuredatabricks.net",
        token=databricks_token,
        queries=[
            "select * from hive_metastore.default.INVOICE_VENDOR where id > :number",
            "select * from hive_metastore.default.INVOICE_ITEM where id > :number",
        ],
        warehouse="my_sql_warehouse",
        initial_values={"number": 2},
    ),
    tables=["vendors", "items"],
    trigger_by="new_customer",
)
def import_sales(tf1: td.TableFrame, tf2: td.TableFrame):
    id = tf1.select(td.col("id").max()).item()
    return tf1, tf2, {"number": str(id)}

Note: After defining the function, you need to register it with a Tabsdata collection and execute it. For more information, see Register a Function and Execute a Function.

Setup (Publisher - Databricks)

The following code uses placeholder values for defining a publisher that reads data from Databricks and publishes it to Tabsdata tables:

import tabsdata as td


@td.publisher(
    source=td.DatabricksSource(
        host_url: str,                  # Databricks URL
        token: str|Secret,              # Databricks Personal Account Token (PAT)
        queries: list[str]|str,         # SQL queries to execute
        initial_values: dict|None,      # Initial values for query parameters
        warehouse: str|None,            # SQL warehouse name (use either this or warehouse_id)
        warehouse_id: str|None,         # Use null if warehouse name is provided
    ),
    tables=["<output_table1>", "<output_table2>"],
    on_tables=[...],
    trigger_by=["<trigger_table1>", "<trigger_table2>"],
)
def <publisher_name>(<table_frame1>:td.TableFrame, <table_frame2>:td.TableFrame):
    <function_logic>
    return <table_frame_output1>, <table_frame_output2>

Note: After defining the function, you need to register it with a Tabsdata collection and execute it. For more information, see Register a Function and Execute a Function.

Following properties are defined in the code above:

source

host_url

host_url is the Databricks URL.

token

token is the Databricks Personal Account Token (PAT). The user owning the PAT requires ‘USE CATALOG’, ‘USE SCHEMA’ & ‘SELECT’ privileges. You can use different ways to store the credentials which are highlighted here in the documentation.

queries

<db_query1>, <db_query2>,… are the SQL queries to execute on Databricks. Each query will produce one output. Add as many as needed. Queries can contain placeholders in the form :param_name that will be replaced with the values from initial_values.

[Optional] initial_values

<variable_name1>, <variable_name2>,… are the variable names included in the queries.

<variable_value1>, <variable_value2>,… are the values to assign to the defined variables.

These values are updated between runs to support incremental ingestion.

If you are using initial_values, the function can return the initial values for the next run. The last parameter in the function output, after the tableframes, can be a dictionary with the initial values for the next run.

[Optional] warehouse

warehouse is the name of the SQL warehouse that will be used for executing queries. Use either this or warehouse_id. The user has to have privileges to use the warehouse.

[Optional] warehouse_id

warehouse_id is the warehouse Id. Use null if warehouse name is provided.

tables

<output_table1>, <output_table2>… are the names of the Tabsdata tables to publish to.

None as an input and output

A publisher may receive and return a None value instead of a TableFrame parameter or return value.

When a publisher receives a None value instead of a TableFrame it means the absence of input data. For example, the system didn’t return any results because of the file being absent or query returning empty results.

When a publisher returns a None value instead of a TableFrame it means that the associated table will preserve the existing data from the previous commit. This helps in avoiding the creation of multiple copies of the same data.

on_tables

Tabsdata Data Quality (DQ) parameter on_tables lets you attach declarative quality checks to the output of publishers and transformers. Every run inspects the data that was just produced, enriches it with quality signals, and optionally quarantines or rejects rows that fail your criteria.

For more details, check Data Quality.

[Optional] trigger_by

<trigger_table1>, <trigger_table2>… are the names of the tables in the Tabsdata server. A new commit to any of these tables triggers the publisher. This can be relevant in cases where you want to import a data if something else in the organization changes. e.g trigger the import of latest manufacturing data in the company if a new supplier get added added. While a new supplier would not be a direct input to the publisher importing manufacturing data, it can still trigger the function.

All listed trigger tables must exist in the server before registering the publisher.

Defining trigger tables is optional. If you don’t define the trigger_by property, the publisher can only be triggered manually.

For more information, see Working with Triggers.

<publisher_name>

<publisher_name> is the name for the publisher that you are configuring.

<function_logic>

<function_logic> governs the processing performed by the publisher. You can specify function logic to be a simple write or to perform additional processing, such as dropping nulls, before writing data to output tables. For more information about the function logic that you can include, see Working with Tables.

<table_frame1>, <table_frame2>… are the names for the variables that temporarily store source data for processing.

<table_frame_output1>, <table_frame_output2>… are the output from the function that are stored as Tabsdata tables with names as defined in the tables property. Consequently, the number of tables returned from the function have to exactly match the number of tables defined in the tables property.

<new_value1>, <new_value2>.. are the revised values of variable_names as defined in function logic. This enables you to have dynamic allocation of variable values for usage in queries. You can return a new dictionary e.g. {"<variable_name1>": <new_value1>, "<variable_name2>": <new_value2>} to change the initial values for processing new data, or return SAME to keep the current initial values.

Data Drift Support

This section talks about how Tabsdata handles data drift in the input data for this Publisher connector.

This connector natively supports data drift as Tabsdata automatically creates a new version of the table with each function execution. Hence, schema changes in the input data won’t cause the function to fail. Any modifications are captured in a newly versioned output table generated by the publisher.

However, this applies only if the publisher function does not contain schema-dependent logic. If such logic exists, changes in the schema may conflict with the function’s expectations, leading to execution errors.