Working with Connector Plugins

To connect to sources and destinations that are not natively supported in Tabsdata, you can use connector plugins.

Working with a Source Plugin

The SourcePlugin class enables reading from sources that do not have built-in support in Tabsdata.

Here is an example of the SourcePlugin class in use.

import os

import polars as pl
import requests

import tabsdata as td


class PyPIPkgStatsSource(td.SourcePlugin):
    def __init__(self, package_name: str):
        self.package_name = package_name

    def trigger_input(self, working_dir: str) -> str:
        # Endpoint with the downloads information of the package
        base_endpoint = f"https://pypistats.org/api/packages/{self.package_name}"

        # Get the downloads by system
        downloads_by_system = requests.get(f"{base_endpoint}/system").json().get("data")

        # Store the information
        destination_file = "data.parquet"
        # Full path to file including the working directory of the plugin
        destination_path = os.path.join(working_dir, destination_file)
        # Using Polars to create the target file 'data.parquet' with the information
        # retrieved from the API
        pl.DataFrame(downloads_by_system).write_parquet(destination_path)
        return destination_file


@td.publisher(
    source=PyPIPkgStatsSource("polars"),
    tables="output",
)
def input_plugin_from_pypi(df: td.TableFrame):
    return df

In the above example, you create a PyPIPkgStatsSource class that inherits from the SourcePlugin class.

The PyPIPkgStatsSource class takes a Python package name as input, retrieves the download statistics for that package, stores the data as a Parquet file in the plugin's working directory, and returns the file name to the publisher. You then use the PyPIPkgStatsSource class when defining the source property of the publisher.
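
For reference, below is a minimal sketch of another source plugin that follows the same trigger_input contract: it downloads a CSV file from an HTTP URL, writes it as a Parquet file in the plugin's working directory, and returns the file name to the publisher. The HTTPCSVSource class name, the example URL, and the function name are illustrative, not part of the Tabsdata API.

import io
import os

import polars as pl
import requests

import tabsdata as td


class HTTPCSVSource(td.SourcePlugin):
    def __init__(self, url: str):
        self.url = url

    def trigger_input(self, working_dir: str) -> str:
        # Download the CSV content from the given URL
        response = requests.get(self.url)
        response.raise_for_status()

        # Parse the CSV with Polars and store it as a Parquet file
        # in the working directory of the plugin
        destination_file = "data.parquet"
        destination_path = os.path.join(working_dir, destination_file)
        pl.read_csv(io.BytesIO(response.content)).write_parquet(destination_path)
        return destination_file


@td.publisher(
    source=HTTPCSVSource("https://example.com/data.csv"),
    tables="output",
)
def input_plugin_from_http(df: td.TableFrame):
    return df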

Working with a Destination Plugin

The DestinationPlugin class enables writing to destinations that do not have built-in support in Tabsdata.

Here is an example of the DestinationPlugin class in use.

import os
import tempfile

import polars as pl
from google.cloud import storage

import tabsdata as td


class GCPFileUpload(td.DestinationPlugin):
    def __init__(self, bucket_name: str, gcp_credentials_path: str):
        self.bucket_name = bucket_name
        self.gcp_credentials_path = gcp_credentials_path

    def trigger_output(self, lf: pl.LazyFrame):
        # Set the GCP credentials path
        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self.gcp_credentials_path

        # Create a temporary file
        with tempfile.NamedTemporaryFile(suffix=".csv") as tmp_file:
            temp_file_path = tmp_file.name
            lf.sink_csv(temp_file_path)
            # Extract filename
            destination_file = os.path.basename(temp_file_path)
            # Upload the file to GCP Storage
            client = storage.Client()
            bucket = client.bucket(self.bucket_name)
            blob = bucket.blob(destination_file)
            blob.upload_from_filename(temp_file_path)


@td.subscriber(
    "data",
    GCPFileUpload("<gcp-bucket-name>", "<path_to_gcp_credentials.json>"),
)
def subscriber(df: td.TableFrame):
    return df

Where:

  • <gcp-bucket-name> is the name of your GCP bucket.

  • <path_to_gcp_credentials.json> is the full system path (typically starting with /Users/user_name) to your GCP credentials file.

In the above example, you create a GCPFileUpload class that inherits from the DestinationPlugin class.

The GCPFileUpload class takes a GCP bucket name and a credentials path as input, converts the table to a CSV file, and uploads the file to the GCP bucket. You then use the GCPFileUpload class when defining the destination of the subscriber.
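
As a further illustration, here is a minimal sketch of a destination plugin that follows the same trigger_output contract: it streams the incoming LazyFrame to a CSV file in a local directory instead of uploading it to cloud storage. The LocalFolderDestination class name, the output_dir and file_name parameters, and the example path are illustrative, not part of the Tabsdata API.

import os

import polars as pl

import tabsdata as td


class LocalFolderDestination(td.DestinationPlugin):
    def __init__(self, output_dir: str, file_name: str = "output.csv"):
        self.output_dir = output_dir
        self.file_name = file_name

    def trigger_output(self, lf: pl.LazyFrame):
        # Make sure the target directory exists
        os.makedirs(self.output_dir, exist_ok=True)

        # Stream the LazyFrame to a CSV file in the target directory
        destination_path = os.path.join(self.output_dir, self.file_name)
        lf.sink_csv(destination_path)


@td.subscriber(
    "data",
    LocalFolderDestination("/tmp/tabsdata_exports"),
)
def local_folder_subscriber(df: td.TableFrame):
    return df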