AWS Glue - Iceberg tables#

You can use a subscriber to write tables from the Tabsdata server to Amazon S3 as parquet files and add those files to AWS Glue Iceberg tables.

Example (Subscriber - AWS Glue - Iceberg tables)#

Here is an example subscriber named write_employees. It reads the departments table and two employees tables from Tabsdata, writes them as parquet files to the hr output folder in Amazon S3, and adds them to the configured AWS Glue Iceberg tables. This subscriber executes automatically as soon as a new commit occurs on any of its input tables.

import tabsdata as td

s3_credentials = td.S3AccessKeyCredentials(
    aws_access_key_id=td.HashiCorpSecret("path-to-secret", "S3_ACCESS_KEY"),
    aws_secret_access_key=td.HashiCorpSecret("path-to-secret", "S3_SECRET_KEY"),
)

catalog_definition = {
    "name": "default",
    "type": "glue",
    "client.region": "us-east-1",
}

@td.subscriber(
    tables=["departments", "employees_1", "employees_2"],
    destination=td.S3Destination(
        [
            "s3://opt/hr/departments.parquet",
            "s3://opt/hr/employees_*.parquet",
        ],
        s3_credentials,
        region="us-east-1",
        catalog=td.AWSGlue(
            definition=catalog_definition,
            tables=[
                "iceberg_namespace.table",
                "iceberg_namespace_2.table",
                "iceberg_namespace_3.table",
            ],
            credentials=s3_credentials,
            # ...
        ),
    ),
)
def write_employees(tf1: td.TableFrame, tf2: td.TableFrame, tf3: td.TableFrame):
    return tf1, tf2, tf3

Where:

S3_ACCESS_KEY is the value of your Amazon S3 access key.

S3_SECRET_KEY is the value of your Amazon S3 secret key.

Note: After defining the function, you need to register it with a Tabsdata collection. For more information, see here.

Setup (Subscriber - AWS Glue - Iceberg tables)#

The following code uses placeholder values for defining a subscriber to write tables from the Tabsdata server to Amazon S3 as parquet files and add those files to AWS Glue Iceberg tables.

import tabsdata as td

s3_credentials = td.S3AccessKeyCredentials(
    aws_access_key_id=td.HashiCorpSecret("path-to-secret", "S3_ACCESS_KEY"),
    aws_secret_access_key=td.HashiCorpSecret("path-to-secret", "S3_SECRET_KEY"),
)

@td.subscriber(
    tables=["<input_table1>", "<input_table2>"],
    destination=td.S3Destination(
        ["s3://<path_to_file1>", "s3://<path_to_file2>"],
        credentials=s3_credentials,
        region="<region_name>",
        catalog=td.AWSGlue(
            definition="<definition>",
            tables=["<table_1>", "<table_2>"],
            auto_create_at=["<auto_create_1>", "<auto_create_2>"],
            if_table_exists="<if_table_exists_append_replace>",
            partitioned_table="<partitioned_table_bool>",
            schema_strategy="<schema_strategy_update_strict>",
        ),
    ),
    trigger_by=["<trigger_table1>", "<trigger_table2>"],
)
def <subscriber_name>(<table_frame1>: td.TableFrame, <table_frame2>: td.TableFrame):
    <function_logic>
    return <table_frame_output1>, <table_frame_output2>

The following properties are defined in the setup code above:

tables#

<input_table1>, <input_table2>… are the names of the Tabsdata tables to be written to the external system.

destination#

<path_to_files>#

<path_to_file1>, <path_to_file2>… are the full paths within the S3 bucket of the parquet files to write. Note that the output files must be parquet files.

You can specify as many file paths as needed.

You can define the destination files in the following ways:

File Path

To write by file path where the file extension is included as part of the file path, define the destination as follows:

destination=td.S3Destination(["s3://<path_to_file1.parquet>","s3://<path_to_file2.parquet>"], s3_credentials),

<path_to_file1.parquet>… include the file extension in the file name as part of the path.

File Format

To write files by file format where format is declared separately and not in the file name, define the destination as follows:

destination=td.S3Destination(
    [
        "s3://<path_to_file1_no_extension>",
        "s3://<path_to_file2_no_extension>",
    ],
    s3_credentials,
    format="parquet",
),

"<path_to_file1_no_extension>", "<path_to_file2_no_extension>"… don’t have the extension in the file name. The extension to all files is mentioned separately in format property.

s3_credentials#

A subscriber needs credentials to write files to Amazon S3. Here the credentials are defined using the variable s3_credentials, an object of class S3AccessKeyCredentials with the following values:

S3_ACCESS_KEY is the value of your Amazon S3 access key.

S3_SECRET_KEY is the value of your Amazon S3 secret key.

path-to-secret is the path to the secret in the HashiCorp Vault key-value store where the credential values are stored.

You can store the credentials in different ways, as highlighted here in the documentation.
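
As an illustration, the same credentials could be read from environment variables instead of HashiCorp Vault. This is a minimal sketch; the td.EnvironmentSecret class and the variable names used here are assumptions based on the credentials documentation, not part of the example above.

s3_credentials = td.S3AccessKeyCredentials(
    aws_access_key_id=td.EnvironmentSecret("S3_ACCESS_KEY"),
    aws_secret_access_key=td.EnvironmentSecret("S3_SECRET_KEY"),
)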

region#

"<region_name>" is where your S3 bucket is located.

catalog#

This property defines the configuration of your catalog.

definition#

This property defines the catalog itself. It is a dictionary that matches the pyiceberg catalog definition; the value is passed through to pyiceberg unchanged.
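
For example, a minimal Glue catalog definition, matching the one used in the example above, could look like this (the name and region values are illustrative):

catalog_definition = {
    "name": "default",
    "type": "glue",
    "client.region": "us-east-1",
}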

tables#

These are the names of the Iceberg tables to add the parquet files to. There must be one table name for each parquet file being created in S3.
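
For example, to register two parquet files with two Iceberg tables, a sketch of the destination could look as follows (the bucket, namespace, and table names are hypothetical):

destination=td.S3Destination(
    [
        "s3://<bucket>/hr/departments.parquet",
        "s3://<bucket>/hr/employees.parquet",
    ],
    s3_credentials,
    region="<region_name>",
    catalog=td.AWSGlue(
        definition=catalog_definition,
        tables=["hr.departments", "hr.employees"],  # one entry per parquet file
        credentials=s3_credentials,
    ),
),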

auto_create_at#

If you don’t specify auto_create_at, the table must already exist in the Iceberg catalog. If you want the subscriber to create the table when it does not exist, provide in auto_create_at an S3 URI where the entry point of the table’s Iceberg metadata will be created. Typically this is the base path where you write the parquet files.
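
For example, to have the subscriber create missing tables, each entry in auto_create_at could point at the base path of the corresponding parquet files. This is a sketch; the bucket, namespace, and table names are hypothetical:

catalog=td.AWSGlue(
    definition=catalog_definition,
    tables=["hr.departments", "hr.employees"],
    auto_create_at=["s3://<bucket>/hr", "s3://<bucket>/hr"],
    credentials=s3_credentials,
),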

if_table_exists#

This is an optional property to define the strategy to follow when the table already exists. replace will create a new table in Iceberg, overwriting the existing one, and append will append to the existing data in the table. Defaults to append.
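
For example, to overwrite the existing Iceberg table on every execution instead of appending (the namespace and table name are hypothetical):

catalog=td.AWSGlue(
    definition=catalog_definition,
    tables=["hr.employees"],
    if_table_exists="replace",  # default is "append"
    credentials=s3_credentials,
),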

partitioned_table#

This option indicates that the Iceberg table definition corresponds to a partitioned table.

If the Iceberg table is a partitioned table, the subscriber function should return an array of TableFrames, each containing data for one partition only. It is the responsibility of the subscriber creator to partition the data accordingly before returning it.

Partitioned tables can only be used with already existing (and partitioned) Iceberg tables: auto_create_at is not supported, and the if_table_exists strategy must be append.
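
The sketch below shows a subscriber writing to one existing partitioned Iceberg table. It assumes a Polars-style filter/col API on TableFrame and a hypothetical region column used as the partition key; adapt the partitioning logic to your own table.

@td.subscriber(
    tables=["sales"],
    destination=td.S3Destination(
        ["s3://<bucket>/sales/sales_*.parquet"],
        s3_credentials,
        region="<region_name>",
        catalog=td.AWSGlue(
            definition=catalog_definition,
            tables=["<iceberg_namespace>.<partitioned_table>"],
            partitioned_table=True,
            if_table_exists="append",  # required for partitioned tables
            credentials=s3_credentials,
        ),
    ),
)
def write_sales(sales: td.TableFrame):
    # Return one TableFrame per partition, each holding data for that partition only.
    return [
        sales.filter(td.col("region") == "EMEA"),
        sales.filter(td.col("region") == "AMER"),
    ]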

schema_strategy#

schema_strategy indicates whether the schema in Iceberg will be modified (augmented) if necessary to be compatible with the parquet files being uploaded. If set to strict, the write fails if the schemas differ.
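
For example, to fail the write rather than modify the Iceberg schema when it differs from the parquet files (the table name is hypothetical; the alternative value in the comment follows the setup placeholder above):

catalog=td.AWSGlue(
    definition=catalog_definition,
    tables=["hr.employees"],
    schema_strategy="strict",  # "update" augments the Iceberg schema instead
    credentials=s3_credentials,
),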

Important Points to note:#

  • Existing tables in AWS Glue can have their schema updated (augmented) to match the written parquet files.

  • Tables can be automatically created in AWS Glue using Tabsdata table schema.

  • The parquet files are registered with AWS Glue as external files for a table.

  • Future versions will add support for replacing partitions in Iceberg partitioned tables.

trigger_by#

[optional] <trigger_table1>, <trigger_table2>… are the names of the tables in the Tabsdata server. A new commit to any of these tables triggers the subscriber. All listed trigger tables must exist in the server before registering the subscriber.

Defining trigger tables is optional. If you don’t define the trigger_by property, the subscriber will be triggered by any of its input tables. If you define the trigger_by property, then only those tables listed in the property can automatically trigger the subscriber.

For more information, see Working with Triggers.
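
For example, the subscriber sketched below reads both departments and employees, but only a new commit on departments triggers it (table names and paths are illustrative):

@td.subscriber(
    tables=["departments", "employees"],
    destination=td.S3Destination(
        ["s3://<bucket>/hr/departments.parquet", "s3://<bucket>/hr/employees.parquet"],
        s3_credentials,
        region="<region_name>",
    ),
    trigger_by=["departments"],
)
def write_hr(departments: td.TableFrame, employees: td.TableFrame):
    return departments, employees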

<subscriber_name>#

<subscriber_name> is the name for the subscriber that you are configuring.

<function_logic>#

<function_logic> governs the processing performed by the subscriber. You can specify the function logic to be a simple write or to perform additional processing as needed; a brief sketch follows at the end of this section. For more information about the function logic that you can include, see Working with Tables.

<table_frame1>, <table_frame2>… are the names for the variables that temporarily store source data for processing.

<table_frame_output1>, <table_frame_output2>… are the output from the function that are written to the external system.
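
As a brief sketch of function logic that goes beyond a plain write, the body below filters one of the input tables before returning it. The status column and the Polars-style filter/col expressions are assumptions; see Working with Tables for the operations available on a TableFrame.

def write_employees(departments: td.TableFrame, employees: td.TableFrame):
    # Keep only active employees before writing to the external system.
    active_employees = employees.filter(td.col("status") == "active")
    return departments, active_employees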