Postgres#
You can use this built-in Tabsdata subscriber to write to a Postgres database.
Installing the connector dependencies#
To use the PostgreSQL connector, you must install its dependencies separately by running the following command:
$ pip install psycopg2-binary==2.9.10
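After installing, you may want to confirm that the driver is importable. The snippet below is a minimal sketch using only the Python standard library; the helper name missing_modules is hypothetical and not part of Tabsdata. It relies on the fact that the psycopg2-binary package provides the importable module psycopg2.

```python
import importlib.util


def missing_modules(module_names):
    """Return the top-level module names that cannot be found (hypothetical helper)."""
    return [name for name in module_names if importlib.util.find_spec(name) is None]


# The psycopg2-binary package installs the importable module "psycopg2".
missing = missing_modules(["psycopg2"])
if missing:
    print("Missing modules:", ", ".join(missing))
else:
    print("All connector dependencies are importable.")
```

Run it with the same Python environment in which you ran the pip command, since the check only inspects the current interpreter.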
Example (Subscriber - Postgres)#
The following example creates a subscriber named write_sales. It writes two Tabsdata tables to the database. The subscriber is automatically triggered by a new commit to any of its input tables, and writes data to the database without any modification.
import tabsdata as td

db_username = td.HashiCorpSecret("path-to-secret", "DB_USERNAME")
db_password = td.HashiCorpSecret("path-to-secret", "DB_PASSWORD")

@td.subscriber(
    tables=["vendors", "items"],
    destination=td.PostgresDestination(
        uri="postgres://127.0.0.1:5432/sales_db",
        destination_table=["vendors", "items"],
        credentials=td.UserPasswordCredentials(db_username, db_password),
        if_table_exists="replace",
    ),
)
def write_sales(tf1: td.TableFrame, tf2: td.TableFrame):
    return tf1, tf2
Note: After defining the subscriber, you need to register it with a Tabsdata collection. For more information, see Register a Function.
Setup (Subscriber - Postgres)#
The following code uses placeholder values to define a subscriber that reads Tabsdata tables and writes them to a Postgres database:
import tabsdata as td

@td.subscriber(
    tables=["<input_table1>", "<input_table2>"],
    destination=td.PostgresDestination(
        uri="postgres://<path_to_db>",
        destination_table=["<destination_table1>", "<destination_table2>"],
        credentials=td.UserPasswordCredentials(
            "<db_username>", "<db_password>"
        ),
        if_table_exists="<value>",
    ),
    trigger_by=["<trigger_table1>", "<trigger_table2>"],
)
def <subscriber_name>(<table_frame1>: td.TableFrame, <table_frame2>: td.TableFrame):
    <function_logic>
    return <table_frame_output1>, <table_frame_output2>
Note: After defining the subscriber, you need to register it with a Tabsdata collection. For more information, see Register a Function.
The following properties are defined in the code above:
tables#
<input_table1>, <input_table2> … are the names of the Tabsdata tables to be written to the external system.
destination#
uri#
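<path_to_db> is the location of the database, that is, the part of the URI that follows postgres://, such as 127.0.0.1:5432/sales_db in the example above.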
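To make the parts of the URI concrete, the following sketch splits the URI from the example subscriber earlier on this page using Python's standard urllib.parse module. It only illustrates the host, port, and database components of that one URI; which other URI forms the connector accepts is not covered here.

```python
from urllib.parse import urlsplit

# URI taken from the write_sales example earlier on this page.
uri = "postgres://127.0.0.1:5432/sales_db"

parts = urlsplit(uri)
scheme = parts.scheme              # "postgres"
host = parts.hostname              # "127.0.0.1"
port = parts.port                  # 5432
database = parts.path.lstrip("/")  # "sales_db"

print(scheme, host, port, database)
```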
destination_table#
<destination_table1>, <destination_table2> are the tables to create or update in the database.
credentials#
<db_username> and <db_password> are the username and password used to log in to the database, respectively. You can store the credentials in different ways, which are highlighted here in the documentation.
[Optional] if_table_exists#
This is an optional property to define the strategy to follow when the table already exists. ‘replace’ will create a new database table, overwriting the existing one, and ‘append’ will append to the existing data in the table. Defaults to ‘append’.
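The difference between the two strategies can be modelled in a few lines of plain Python. This is only a sketch of the semantics described above, with table contents represented as lists of rows; write_table is a hypothetical helper, not a Tabsdata function, and the sample rows are made up for illustration.

```python
def write_table(existing_rows, new_rows, if_table_exists="append"):
    """Hypothetical model of the two strategies; not part of the Tabsdata API."""
    if if_table_exists == "replace":
        # The existing table is discarded; only the new rows remain.
        return list(new_rows)
    if if_table_exists == "append":
        # The new rows are added after the rows already in the table.
        return list(existing_rows) + list(new_rows)
    raise ValueError(f"unsupported if_table_exists value: {if_table_exists!r}")


existing = [("v1", "Acme")]    # made-up rows already in the database table
incoming = [("v2", "Globex")]  # made-up rows produced by the subscriber

replaced = write_table(existing, incoming, if_table_exists="replace")
appended = write_table(existing, incoming)  # uses the default, "append"
print(replaced)
print(appended)
```

With "replace" only the incoming rows remain, while the default "append" keeps the existing rows and adds the incoming ones after them.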
None as an input and output#
A subscriber may receive and return a None value instead of TableFrames.
When a subscriber receives a None value instead of a TableFrame, it means that the requested table dependency version does not exist.
When a subscriber returns a None value instead of a TableFrame, it means there is no new data to write to the external system. This helps in avoiding the creation of multiple copies of the same data.
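A subscriber body that handles None might look like the sketch below. The @td.subscriber decorator and the td.TableFrame type hints are omitted so the snippet runs on its own, and it assumes that None can be returned for each output position. Skipping both writes when either input is missing is a design choice made for this example, not documented Tabsdata behaviour.

```python
from typing import Any, Optional, Tuple


def write_sales(
    vendors: Optional[Any], items: Optional[Any]
) -> Tuple[Optional[Any], Optional[Any]]:
    """Sketch of a subscriber body; the @td.subscriber decorator is omitted."""
    # A None input means the requested version of that table does not exist.
    if vendors is None or items is None:
        # Returning None signals that there is no new data to write.
        return None, None
    # Both inputs are present: pass them through unchanged.
    return vendors, items
```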
[Optional] trigger_by#
<trigger_table1>, <trigger_table2> … are the names of the tables in the Tabsdata server. A new commit to any of these tables triggers the subscriber. All listed trigger tables must exist in the server before registering the subscriber.
Defining trigger tables is optional. If you don’t define the trigger_by property, the subscriber will be triggered by any of its input tables. If you define the trigger_by property, then only those tables listed in the property can automatically trigger the subscriber.
For more information, see Working with Triggers.
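The rule for which tables trigger a subscriber can be summarized in a small sketch. The function effective_triggers is hypothetical and exists only to illustrate the rule stated above; it treats an omitted trigger_by as None and does not model any other special values.

```python
def effective_triggers(tables, trigger_by=None):
    """Hypothetical helper: return the tables whose commits trigger the subscriber."""
    if trigger_by is None:
        # trigger_by is not defined: any input table triggers the subscriber.
        return list(tables)
    # trigger_by is defined: only the listed tables trigger the subscriber.
    return list(trigger_by)


print(effective_triggers(["vendors", "items"]))
print(effective_triggers(["vendors", "items"], trigger_by=["vendors"]))
```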
<subscriber_name>#
<subscriber_name> is the name for the subscriber that you are configuring.
<function_logic>#
<function_logic> governs the processing performed by the subscriber. You can specify function logic to be a simple write or to perform additional processing as needed. For more information about the function logic that you can include, see Working with Tables.
<table_frame1>, <table_frame2> … are the names for the variables that temporarily store source data for processing.
<table_frame_output1>, <table_frame_output2> … are the outputs of the function, which are written to the external system.
Data Drift Support#
This section describes how Tabsdata handles data drift in the input data for this Subscriber connector.
The behaviour depends on how if_table_exists is configured in the subscriber.
When if_table_exists=replace is configured, the existing database table is dropped and recreated during each execution. This ensures that the table schema always matches the latest exported data, including any type changes.
When if_table_exists=append is configured, the subscriber does not modify the existing schema. Any schema changes (added, removed, or modified columns) will cause execution to fail. In this mode, the subscriber function must ensure consistent column definitions across executions, e.g., by explicitly specifying the columns to export.
The behaviour described above applies only if the subscriber function does not contain schema-dependent logic. If such logic exists, changes in the schema may conflict with the function’s expectations, leading to execution errors irrespective of the configuration of if_table_exists.
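In append mode, one way to keep column definitions consistent is to select the exported columns explicitly inside the subscriber function. The sketch below assumes that the frame passed to the function offers a Polars-style select() method that accepts a list of column names; the column names and the helper export_fixed_columns are hypothetical.

```python
EXPORT_COLUMNS = ["id", "name", "price"]  # hypothetical column names


def export_fixed_columns(tf):
    """Keep only a fixed, ordered set of columns before writing (hypothetical helper)."""
    if tf is None:
        # No table version available: there is nothing to write.
        return None
    # Assumption: tf has a Polars-style select() accepting a list of column names.
    return tf.select(EXPORT_COLUMNS)
```

With this approach, a column newly added to the input table is left out of the export. A removed or renamed column is a case of schema-dependent logic, so it can be expected to cause an execution error as described above.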