Working with Connector Plugins

To connect to sources and destinations that Tabsdata does not support natively, use connector plugins: SourcePlugin for reading data and DestinationPlugin for writing it.

Working with the Source Plugin

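The SourcePlugin class lets you read from sources that have no built-in support in Tabsdata. You subclass it and implement trigger_input, which writes the data to a file in the plugin's working directory and returns the name of that file.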

Here is an example of the SourcePlugin class in use.

import os

import polars as pl
import requests

import tabsdata as td

class PyPIPkgStatsSource(td.SourcePlugin):
    def __init__(self, package_name: str):
        self.package_name = package_name

    def trigger_input(self, working_dir: str) -> str:
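        # Endpoint with the downloads information of the package;
        # replace <pypi-stats-api-url> with the base URL of the statistics API
        base_endpoint = f"<pypi-stats-api-url>/{self.package_name}"

        # Get the downloads by system and fail early on HTTP errors
        response = requests.get(f"{base_endpoint}/system", timeout=30)
        response.raise_for_status()
        downloads_by_system = response.json().get("data")

        # Store the information
        destination_file = "data.parquet"
        # Full path to file including the working directory of the plugin
        destination_path = os.path.join(working_dir, destination_file)
        # Using Polars to create the target file 'data.parquet' with the information
        # retrieved from the API
        pl.DataFrame(downloads_by_system).write_parquet(destination_path)
        # Return the file name relative to the working directory
        return destination_file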

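# <package-name>: the PyPI package whose statistics to fetch
# <table-name>: the Tabsdata table to publish the data to
@td.publisher(
    source=PyPIPkgStatsSource("<package-name>"),
    tables="<table-name>",
)
def input_plugin_from_pypi(df: td.TableFrame):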
    return df

In the above example, the PyPIPkgStatsSource class inherits from SourcePlugin.

PyPIPkgStatsSource takes a Python package name as input. Its trigger_input method queries a download-statistics API for the package's downloads by operating system, writes the results to data.parquet in the plugin's working directory, and returns the file name so the publisher can load it. You pass an instance of PyPIPkgStatsSource as the source of the publisher.
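To check the trigger_input contract locally, without network access or a Tabsdata installation, the following minimal sketch mimics it. FakeSourcePlugin, StaticRecordsSource, and run_source are hypothetical names, not part of Tabsdata. The records are passed in directly instead of being fetched from an API, and the standard csv module stands in for Polars and Parquet. As in the example above, trigger_input writes into the working directory it receives and returns the file name relative to that directory; run_source plays the role of the caller.

```python
import csv
import os
import tempfile


class FakeSourcePlugin:
    """Hypothetical stand-in for td.SourcePlugin, for illustration only."""

    def trigger_input(self, working_dir: str) -> str:
        raise NotImplementedError


class StaticRecordsSource(FakeSourcePlugin):
    """Writes a fixed list of records to a CSV file in the working directory."""

    def __init__(self, records: list[dict]):
        self.records = records

    def trigger_input(self, working_dir: str) -> str:
        destination_file = "data.csv"
        # Full path to the file inside the working directory
        destination_path = os.path.join(working_dir, destination_file)
        with open(destination_path, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=list(self.records[0].keys()))
            writer.writeheader()
            writer.writerows(self.records)
        # Return the name relative to working_dir, not the full path
        return destination_file


def run_source(plugin: FakeSourcePlugin) -> list[dict]:
    """Hypothetical caller: supply a working dir, then read the returned file."""
    with tempfile.TemporaryDirectory() as working_dir:
        file_name = plugin.trigger_input(working_dir)
        with open(os.path.join(working_dir, file_name), newline="") as f:
            return list(csv.DictReader(f))


rows = run_source(StaticRecordsSource([
    {"category": "Linux", "downloads": 120},
    {"category": "Windows", "downloads": 45},
]))
print(rows)
```

Because the caller resolves the returned name against the directory it provided, the plugin never needs to know where that directory lives.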

Working with the Destination Plugin

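The DestinationPlugin class lets you write to destinations that have no built-in support in Tabsdata. You subclass it and implement trigger_output, which receives the table data (a Polars LazyFrame in the example below) and writes it to the destination.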

Here is an example of the DestinationPlugin class in use.

import os
import tempfile

import polars as pl
from google.cloud import storage

import tabsdata as td

class GCPFileUpload(td.DestinationPlugin):
    def __init__(self, bucket_name: str, gcp_credentials_path: str):
        self.bucket_name = bucket_name
        self.gcp_credentials_path = gcp_credentials_path

    def trigger_output(self, lf: pl.LazyFrame):
        # Set the GCP credentials path
        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self.gcp_credentials_path

        # Create a temporary file
        with tempfile.NamedTemporaryFile(suffix=".csv") as tmp_file:
            temp_file_path = tmp_file.name
            # Write the table to the temporary CSV file
            lf.collect().write_csv(temp_file_path)
            # Extract filename
            destination_file = os.path.basename(temp_file_path)
            # Upload the file to GCP Storage
            client = storage.Client()
            bucket = client.bucket(self.bucket_name)
            blob = bucket.blob(destination_file)
            blob.upload_from_filename(temp_file_path)

# <table-name>: the Tabsdata table to export
@td.subscriber(
    tables="<table-name>",
    destination=GCPFileUpload("<gcp-bucket-name>", "<path_to_gcp_credentials.json>"),
)
def subscriber(df: td.TableFrame):
    return df


  • <gcp-bucket-name> is the name of your GCP bucket.

  • <path_to_gcp_credentials.json> is the full system path (typically starting with /users/user_name) to your GCP credentials JSON file.

In the above example, the GCPFileUpload class inherits from DestinationPlugin.

GCPFileUpload takes a GCP bucket name and a credentials path as input. Its trigger_output method writes the table to a temporary CSV file and uploads that file to the GCP bucket, using the temporary file's name as the object name. You pass an instance of GCPFileUpload as the destination of a subscriber.
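The write-then-upload flow in trigger_output can be exercised without GCP by passing the upload step in as a function. This minimal sketch assumes the rows arrive as a list of dicts rather than a Polars LazyFrame. write_and_upload and fake_upload are hypothetical helpers, and fake_upload stands in for blob.upload_from_filename. It uses tempfile.mkstemp instead of NamedTemporaryFile so the file can also be reopened by name on Windows, and it deletes the local copy once the upload returns.

```python
import csv
import os
import tempfile
from typing import Callable


def write_and_upload(rows: list[dict], upload: Callable[[str, str], None]) -> str:
    """Write rows to a temporary CSV file, hand it to `upload`, return the object name.

    `upload(local_path, object_name)` is a hypothetical stand-in for an upload
    call such as blob.upload_from_filename in the GCP example.
    """
    fd, temp_file_path = tempfile.mkstemp(suffix=".csv")
    try:
        # Write the table to the temporary CSV file
        with os.fdopen(fd, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
            writer.writeheader()
            writer.writerows(rows)
        # Use the temporary file's base name as the object name, like GCPFileUpload
        destination_file = os.path.basename(temp_file_path)
        upload(temp_file_path, destination_file)
        return destination_file
    finally:
        # Remove the local copy whether or not the upload succeeded
        os.remove(temp_file_path)


# Usage: capture "uploads" in a dict instead of sending them to a bucket
uploaded = {}


def fake_upload(local_path: str, object_name: str) -> None:
    with open(local_path) as f:
        uploaded[object_name] = f.read()


name = write_and_upload([{"category": "Linux", "downloads": 120}], fake_upload)
print(name, uploaded[name].splitlines())
```

Injecting the upload function keeps the file handling testable on its own; a real plugin would pass a function that calls the storage client instead.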