Working with Connector Plugins
To connect to sources and destinations that Tabsdata does not support natively, you can write connector plugins.
Working with a Source Plugin
The SourcePlugin class enables reading from sources that do not have built-in support in Tabsdata. Here is an example of the SourcePlugin class in use:
import os

import polars as pl
import requests

import tabsdata as td


class PyPIPkgStatsSource(td.SourcePlugin):
    def __init__(self, package_name: str):
        self.package_name = package_name

    def trigger_input(self, working_dir: str) -> str:
        # Endpoint with the downloads information of the package
        base_endpoint = f"https://pypistats.org/api/packages/{self.package_name}"

        # Get the downloads by system
        downloads_by_system = requests.get(f"{base_endpoint}/system").json().get("data")

        # Store the information
        destination_file = "data.parquet"
        # Full path to the file, including the working directory of the plugin
        destination_path = os.path.join(working_dir, destination_file)

        # Use Polars to create the target file 'data.parquet' with the
        # information retrieved from the API
        pl.DataFrame(downloads_by_system).write_parquet(destination_path)
        return destination_file


@td.publisher(
    source=PyPIPkgStatsSource("polars"),
    tables="output",
)
def input_plugin_from_pypi(df: td.TableFrame):
    return df
In the above example, you create a PyPIPkgStatsSource class that inherits from the SourcePlugin class. The PyPIPkgStatsSource class takes a Python package name as input, retrieves the download statistics for that package, stores the data in a file in the plugin's working directory on the server, and returns the file name to the publisher. You then use the PyPIPkgStatsSource class to define the source property of the publisher.
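The same contract applies to any custom source: trigger_input receives the plugin's working directory, writes a data file into it, and returns the file name relative to that directory. As a minimal sketch under those assumptions, a plugin that publishes a CSV file from the local filesystem could look like this (the LocalCSVSource name and csv_path parameter are illustrative, not part of the Tabsdata API):

import os

import polars as pl

import tabsdata as td


class LocalCSVSource(td.SourcePlugin):
    # Hypothetical plugin: publishes a CSV file from the local filesystem.
    def __init__(self, csv_path: str):
        self.csv_path = csv_path

    def trigger_input(self, working_dir: str) -> str:
        destination_file = "data.parquet"
        destination_path = os.path.join(working_dir, destination_file)
        # Convert the CSV into the Parquet file that the publisher reads
        pl.read_csv(self.csv_path).write_parquet(destination_path)
        # Return the file name relative to the working directory
        return destination_file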
Working with a Destination Plugin
The DestinationPlugin class enables writing to destinations that do not have built-in support in Tabsdata. Here is an example of the DestinationPlugin class in use:
import os
import tempfile

import polars as pl
from google.cloud import storage

import tabsdata as td


class GCPFileUpload(td.DestinationPlugin):
    def __init__(self, bucket_name: str, gcp_credentials_path: str):
        self.bucket_name = bucket_name
        self.gcp_credentials_path = gcp_credentials_path

    def trigger_output(self, lf: pl.LazyFrame):
        # Set the GCP credentials path
        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self.gcp_credentials_path

        # Create a temporary file to hold the CSV data
        with tempfile.NamedTemporaryFile(suffix=".csv") as tmp_file:
            temp_file_path = tmp_file.name
            lf.sink_csv(temp_file_path)

            # Extract the file name to use as the object name in the bucket
            destination_file = os.path.basename(temp_file_path)

            # Upload the file to GCP Storage before the temporary file is
            # deleted at the end of the 'with' block
            client = storage.Client()
            bucket = client.bucket(self.bucket_name)
            blob = bucket.blob(destination_file)
            blob.upload_from_filename(temp_file_path)


@td.subscriber(
    "data",
    GCPFileUpload("<gcp-bucket-name>", "<path_to_gcp_credentials.json>"),
)
def subscriber(df: td.TableFrame):
    return df
Where:
- <gcp-bucket-name> is the name of your GCP bucket.
- <path_to_gcp_credentials.json> is the full system path (typically starting with /users/user_name) to your GCP credentials file.
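For example, with a bucket named my-data-bucket and a credentials file in your home directory (both values are hypothetical), the decorator call would look like this:

@td.subscriber(
    "data",
    GCPFileUpload("my-data-bucket", "/users/jane/gcp_credentials.json"),  # hypothetical values
)
def subscriber(df: td.TableFrame):
    return df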
In the above example, you create a GCPFileUpload class that inherits from the DestinationPlugin class. The GCPFileUpload class takes a GCP bucket name and a credentials path as input, converts the table into a CSV file, and uploads that file to the GCP bucket. Finally, you use the GCPFileUpload class to define the destination of the subscriber.
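The trigger_output contract is equally general: Tabsdata passes the table to the plugin as a Polars LazyFrame, and the plugin decides how and where to materialize it. As a minimal sketch, a plugin that writes the table to a local directory could look like this (the LocalDirDestination name and its parameters are illustrative, not part of the Tabsdata API):

import os

import polars as pl

import tabsdata as td


class LocalDirDestination(td.DestinationPlugin):
    # Hypothetical plugin: writes the table as a CSV file into a local directory.
    def __init__(self, output_dir: str, file_name: str = "output.csv"):
        self.output_dir = output_dir
        self.file_name = file_name

    def trigger_output(self, lf: pl.LazyFrame):
        os.makedirs(self.output_dir, exist_ok=True)
        # Stream the LazyFrame to disk without collecting it in memory
        lf.sink_csv(os.path.join(self.output_dir, self.file_name))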