tabsdata
- class AWSGlue(
- definition: dict,
- tables: str | list[str],
- auto_create_at: list[str | None] | str | None = None,
- if_table_exists: Literal['append', 'replace'] = 'append',
- partitioned_table: bool = False,
- schema_strategy: Literal['update', 'strict'] = 'update',
- s3_credentials: S3Credentials = None,
- s3_region: str = None,
- **kwargs,
Bases:
Catalog
- property definition: dict
- property if_table_exists: Literal['append', 'replace']
The strategy to follow when the table already exists.
- property s3_credentials: S3Credentials | None
- property schema_strategy: Literal['update', 'strict']
The strategy to follow when appending to a table with an existing schema.
- class AvroFormat(
- chunk_size: int = 50000,
Bases:
FileFormat
- Categories:
file-format
Avro file format.
- class AwsGlueSchemaRegistry(
- **kwargs,
Bases:
KafkaSchemaRegistry
Configuration for AWS Glue Schema Registry.
- property configs: dict
Additional configuration properties for the AWS Glue client.
- property credentials: S3AccessKeyCredentials
Credentials for authenticating with AWS Glue.
- property region_name: str
The AWS region of the Schema Registry.
- property registry_name: str
Registry name for AWS Glue Schema Registry.
- property schema_name: str
Schema name for AWS Glue Schema Registry.
- AwsGlueSchemaRegistryCredentialsSpec
alias of
S3AccessKeyCredentials
- class AzureAccountKeyCredentials( )
Bases:
AzureCredentials
- Categories:
credentials
Credentials class to access Azure using account key credentials (account name and account key).
- Variables:
account_name (Secret) – The Azure account name.
account_key (Secret) – The Azure account key.
to_dict() -> dict – Convert the AzureAccountKeyCredentials object to a dictionary.
- property account_key: Secret
The Azure account key.
- Type:
Secret
- property account_name: Secret
The Azure account name.
- Type:
Secret
- class AzureConfig(
- container: str,
- base_path: str,
- credentials: AzureAccountKeyCredentials,
Bases:
object
Configuration for staging files in Azure Blob Storage.
- Parameters:
container – Blob container name.
base_path – Blob path prefix for staged files (e.g. "staging/clickhouse").
credentials – Azure account-key credentials used for uploading staged files (via azure-storage-blob).
- base_path: str
- container: str
- credentials: AzureAccountKeyCredentials
- class AzureDestination( )
Bases:
DestinationPlugin
- Categories:
destination
Azure-file-based data outputs.
- class SupportedFormats(
- *values,
Bases:
Enum
Enum for the supported formats for the AzureDestination.
- avro = <class 'tabsdata._format.AvroFormat'>
- csv = <class 'tabsdata._format.CSVFormat'>
- ndjson = <class 'tabsdata._format.NDJSONFormat'>
- parquet = <class 'tabsdata._format.ParquetFormat'>
- chunk(
- working_dir: str,
- *results,
Trigger the exporting of the data to local parquet chunks. This method will receive the resulting data from the user function and must store it in the local system as parquet files, using the working_dir. Note: This method should not materialize the data, it should only store it in the local system.
- Parameters:
working_dir (str) – The folder where any files generated must be stored (this refers to temporary files that will be deleted after the execution of the plugin, not the final destination of the data)
results – The data to be exported. It is a list of polars LazyFrames or None.
- Returns:
A list of the intermediate files created
- property credentials: AzureCredentials
The credentials required to access Azure.
- Type:
AzureCredentials
- property format: FileFormat
The format of the file. If not provided, it will be inferred from the file extension of the URI.
- Type:
FileFormat
- stream(
- working_dir: str,
- *results,
Trigger the exporting of the data. This method will receive the resulting data from the user function and must store it in the desired location. Note: this method might materialize the data provided in a single chunk generated by the chunk function if invoked, so chunks should be of an appropriate size.
- Parameters:
working_dir (str) – The folder where any intermediate files generated must be stored (this refers to temporary files that will be deleted after the execution of the plugin, not the final destination of the data)
results – The data to be exported. It is a list of polars LazyFrames or None.
- Returns:
None
- class AzureSource(
- uri: str | list[str],
- credentials: AzureCredentials,
- format: str | FileFormat = None,
- initial_last_modified: str | datetime = None,
- file_info: str | FileInfo | None = None,
Bases:
SourcePlugin
- Categories:
source
Azure-file-based data inputs.
- class SupportedFormats(
- *values,
Bases:
Enum
Enum for the supported formats for the AzureSource.
- avro = <class 'tabsdata._format.AvroFormat'>
- csv = <class 'tabsdata._format.CSVFormat'>
- log = <class 'tabsdata._format.LogFormat'>
- ndjson = <class 'tabsdata._format.NDJSONFormat'>
- parquet = <class 'tabsdata._format.ParquetFormat'>
- chunk(
- working_dir: str,
Trigger the import of the data. This must be implemented in any class that inherits from this class unless directly implementing streaming. The method will receive a folder where it must store the data as parquet files, and return a list of the paths of the files created. These files will then be loaded and mapped to the dataset function in positional order, so if you want file.parquet to be the first argument of the dataset function, you must return it first. If you want a parameter to receive multiple files, return a list of the paths. For example, to provide a first argument with a single file and a second argument with two files: return ["file1.parquet", ["file2.parquet", "file3.parquet"]]
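The positional mapping described above can be sketched with a stand-in class. MyAzureSource is hypothetical: a real connector would subclass tabsdata's SourcePlugin and download Azure blobs as parquet files; empty placeholder files stand in here.

```python
import os
import tempfile

class MyAzureSource:
    """Illustrative only; not part of tabsdata."""

    def chunk(self, working_dir: str):
        paths = []
        for name in ("file1.parquet", "file2.parquet", "file3.parquet"):
            path = os.path.join(working_dir, name)
            open(path, "wb").close()  # placeholder for real parquet output
            paths.append(path)
        # First dataset argument receives one file, second receives two.
        return [paths[0], [paths[1], paths[2]]]

with tempfile.TemporaryDirectory() as d:
    result = MyAzureSource().chunk(d)
    assert len(result) == 2
```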
- property credentials: AzureCredentials
The credentials required to access Azure.
- Type:
AzureCredentials
- property file_info: FileInfo | None
The file information to add as system columns.
- Type:
FileInfo | None
- property format: FileFormat
The format of the file. If not provided, it will be inferred from the file extension of the data.
- Type:
FileFormat
- property initial_last_modified: str
The date and time after which the files were modified.
- property initial_values: dict
Return a dictionary with the initial values to be stored after execution of the plugin. They will be accessible in the next execution of the plugin. The dictionary must have the parameter names as keys and the initial values as values, all the type string.
- Returns:
A dictionary with the initial values of the parameters of the plugin.
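The initial_values contract above requires every key and value to be a string, so richer state is serialized before being returned. build_initial_values is a hypothetical helper, not part of tabsdata.

```python
from datetime import datetime, timezone

def build_initial_values(last_modified: datetime) -> dict:
    # Stored as strings; handed back to the plugin on its next execution.
    return {"initial_last_modified": last_modified.isoformat()}

state = build_initial_values(datetime(2024, 5, 1, tzinfo=timezone.utc))
assert all(isinstance(k, str) and isinstance(v, str) for k, v in state.items())
```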
- class BigQueryConn(
- gcs_folder: str,
- credentials: GCPCredentials,
- project: str = None,
- dataset: str = None,
- enforce_connection_params: bool = True,
- cx_dst_configs_gcs: dict = None,
- cx_dst_configs_bigquery: dict = None,
Bases:
Conn
Connection configuration for BigQueryDest.
- property credentials: GCPCredentials
- property cx_dst_configs_bigquery: dict
- property cx_dst_configs_gcs: dict
- property enforce_connection_params: bool
- property gcs_folder: str
- class BigQueryDest(
- conn: BigQueryConn,
- tables: TableSpec | None = None,
- if_table_exists: IfTableExistStrategySpec = 'append',
- schema_strategy: SchemaStrategySpec = 'update',
Bases:
DestinationPlugin
- Categories:
destination
BigQuery based data outputs. The data is first stored in parquet files in a GCS bucket, and then loaded into the BigQuery tables.
- property conn: BigQueryConn
- property if_table_exists: Literal['append', 'replace']
The strategy to follow when the table already exists.
- property schema_strategy: Literal['update', 'strict']
The strategy to follow when appending to a table with an existing schema.
- stream( )
Trigger the exporting of the data. This method will receive the resulting data from the user function and must store it in the desired location. Note: this method might materialize the data provided in a single chunk generated by the chunk function if invoked, so chunks should be of an appropriate size.
- Parameters:
working_dir (str) – The folder where any intermediate files generated must be stored (this refers to temporary files that will be deleted after the execution of the plugin, not the final destination of the data)
results – The data to be exported. It is a list of polars LazyFrames or None.
- Returns:
None
- class CSVFormat(
- separator: str | int = ',',
- quote_char: str | int = '"',
- eol_char: str | int = '\n',
- input_encoding: str = 'Utf8',
- input_null_values: list | None = None,
- input_missing_is_null: bool = True,
- input_truncate_ragged_lines: bool = False,
- input_comment_prefix: str | int | None = None,
- input_try_parse_dates: bool = False,
- input_decimal_comma: bool = False,
- input_has_header: bool = True,
- input_skip_rows: int = 0,
- input_skip_rows_after_header: int = 0,
- input_raise_if_empty: bool = True,
- input_ignore_errors: bool = False,
- output_include_header: bool = True,
- output_datetime_format: str | None = None,
- output_date_format: str | None = None,
- output_time_format: str | None = None,
- output_float_scientific: bool | None = None,
- output_float_precision: int | None = None,
- output_null_value: str | None = None,
- output_quote_style: str | None = None,
- output_maintain_order: bool = True,
Bases:
FileFormat
- Categories:
file-format
CSV file format.
- Variables:
quote_char (str | int) – The quote character of the CSV file.
eol_char (str | int) – The end of line character of the CSV file.
input_encoding (str) – The encoding of the CSV file. Only used when importing data.
input_null_values (list | None) – The null values of the CSV file. Only used when importing data.
input_missing_is_null (bool) – Whether missing values should be marked as null. Only used when importing data.
input_truncate_ragged_lines (bool) – Whether to truncate ragged lines of the CSV file. Only used when importing data.
input_comment_prefix (str | int | None) – The comment prefix of the CSV file. Only used when importing data.
input_try_parse_dates (bool) – Whether to try parse dates of the CSV file. Only used when importing data.
input_decimal_comma (bool) – Whether the CSV file uses decimal comma. Only used when importing data.
input_has_header (bool) – If the CSV file has header. Only used when importing data.
input_skip_rows (int) – How many rows should be skipped in the CSV file. Only used when importing data.
input_skip_rows_after_header (int) – How many rows should be skipped after the header in the CSV file. Only used when importing data.
input_raise_if_empty (bool) – If an error should be raised for an empty CSV. Only used when importing data.
input_ignore_errors (bool) – If the errors loading the CSV must be ignored. Only used when importing data.
output_include_header (bool) – Whether to include header in the CSV output. Only used when exporting data.
output_datetime_format (str | None) – A format string, with the specifiers defined by the chrono Rust crate. If no format specified, the default fractional-second precision is inferred from the maximum timeunit found in the frame’s Datetime cols (if any). Only used when exporting data.
output_date_format (str | None) – A format string, with the specifiers defined by the chrono Rust crate. Only used when exporting data.
output_time_format (str | None) – A format string, with the specifiers defined by the chrono Rust crate. Only used when exporting data.
output_float_scientific (bool | None) – Whether to use scientific form always (true), never (false), or automatically (None). Only used when exporting data.
output_float_precision (int | None) – Number of decimal places to write. Only used when exporting data.
output_null_value (str | None) – A string representing null values (defaulting to the empty string). Only used when exporting data.
output_quote_style (str | None) – Determines the quoting strategy used. Only used when exporting data. * necessary (default): puts quotes around fields only when necessary, i.e. when a field contains a quote, separator or record terminator, or when writing an empty record (which is indistinguishable from a record with one empty field). * always: puts quotes around every field. * never: never puts quotes around fields, even if that results in invalid CSV data (e.g. by not quoting strings containing the separator). * non_numeric: puts quotes around all fields that are non-numeric; when a field does not parse as a valid float or integer, quotes are used even if they aren't strictly necessary.
output_maintain_order (bool) – Maintain the order in which data is processed. Setting this to False will be slightly faster. Only used when exporting data.
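The four output_quote_style values roughly mirror the quoting modes of Python's stdlib csv module; the analogue below is an approximation for illustration, not tabsdata's own code.

```python
import csv
import io

def render(rows, quoting):
    # Serialize rows with the given csv quoting mode and return the text.
    buf = io.StringIO()
    csv.writer(buf, quoting=quoting).writerows(rows)
    return buf.getvalue()

assert render([["a", "1"]], csv.QUOTE_ALL).strip() == '"a","1"'     # like "always"
assert render([["a", "1"]], csv.QUOTE_MINIMAL).strip() == "a,1"     # like "necessary"
assert render([["a", 1]], csv.QUOTE_NONNUMERIC).strip() == '"a",1'  # like "non_numeric"
```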
- class CdcConn
Base connection configuration for Change Data Capture (CDC) connectors.
- credentials: UserPasswordCredentials | None
- uri: str
- class CdcTrigger(
- **kwargs,
Bases:
StageTrigger
Base StageTrigger for Change Data Capture (CDC) connectors.
This abstract trigger provides the common polling loop, buffering, and staging logic for all CDC connectors. Database-specific subclasses implement the ingestion details for their respective replication protocols.
- property buffer_max_bytes: int | None
Maximum bytes in memory before flushing to the working directory.
- property buffer_max_rows: int
Maximum rows in memory before flushing to the working directory.
- property buffer_max_secs: int
Maximum seconds before flushing memory data to the working directory.
- property cdc_format: CdcFormat
Output format specification for CDC data.
- abstractmethod collect_tables(
- session: Any,
- cdc_format: CdcFormat = CdcFormat(values_format='columns', flatten_values=True),
- property conn: Conn
CDC connection configuration.
- abstractmethod harvest_tables(
- session: Any,
- cdc_format: CdcFormat = CdcFormat(values_format='columns', flatten_values=True),
- property poll_interval_seconds: int
Interval in seconds between polling for new changes.
- run(
- context: StageTriggerContext,
Continuously runs the stage trigger logic, using the received context to trigger and monitor executions. Exits SUCCESS (code 0) when the StageTrigger.end datetime is reached; ERROR (code 1) if any issue stops it from working or recovering; STOPPED (code 0) upon a signal.
- property trigger_max_bytes: int | None
Maximum total bytes before staging from working directory to the stage location.
- property trigger_max_rows: int | None
Maximum total rows before staging from working directory to the stage location.
- property trigger_max_secs: int
Maximum seconds before staging from working directory to the stage location.
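The buffer_max_* properties above imply a flush decision of roughly this shape (illustrative logic and default values, not tabsdata's implementation):

```python
import time

def should_flush(rows, nbytes, started_at,
                 max_rows=10_000, max_bytes=None, max_secs=60):
    # Flush when any configured threshold is reached.
    if rows >= max_rows:
        return True
    # buffer_max_bytes may be None, meaning no byte-based limit.
    if max_bytes is not None and nbytes >= max_bytes:
        return True
    return time.monotonic() - started_at >= max_secs

now = time.monotonic()
assert should_flush(10_000, 0, now)             # row threshold reached
assert not should_flush(1, 10**9, now)          # no byte limit configured
assert should_flush(1, 512, now, max_bytes=256) # byte threshold reached
```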
- class ClickHouseConn(
- **kwargs,
Bases:
Conn
Represents a connection configuration for ClickHouse with cloud-staged bulk loading.
ClickHouse ingests data via cloud storage (S3, Azure Blob, or GCS): files are first uploaded to the configured staging area and then loaded into ClickHouse using the matching table function (s3(), azureBlobStorage(), or s3() pointed at GCS).
- property credentials: UserPasswordCredentials | None
The credentials used to authenticate with ClickHouse. None if no credentials were provided (the ClickHouse default user, no password).
- property cx_dst_configs_clickhouse: dict
Destination-specific configuration parameters for the ClickHouse connection.
- property cx_src_configs_clickhouse: dict
Source-specific configuration parameters for the ClickHouse connection.
- property database: str
The target ClickHouse database name.
- property enforce_connection_params: bool
Whether to enforce connection parameters for the ClickHouse connection.
- property host: str
The ClickHouse server hostname or IP address.
- property port: int
The HTTP(S) port of the ClickHouse server.
- property secure: bool
Whether to connect to ClickHouse over HTTPS/TLS.
- property staging: S3Config | AzureConfig | GCSConfig
The cloud storage staging configuration (S3Config, AzureConfig, or GCSConfig).
- class ClickHouseDest(
- **kwargs,
Bases:
DestinationPlugin
- Categories:
destination
Destination plugin for writing data to ClickHouse using cloud-staged bulk loading.
Data is first written as Parquet files to the configured cloud storage staging area (S3, Azure Blob, or GCS) and then ingested into ClickHouse via its native table functions (s3(), azureBlobStorage()).
- property conn: ClickHouseConn
The ClickHouse connection configuration.
- property if_table_exists: Literal['append', 'replace']
The strategy to follow when the table already exists.
- property schema_evolution: Literal['none', 'iceberg']
The schema evolution strategy.
- stream( )
Write each result as a Parquet file and bulk load it into ClickHouse via the cloud staging area.
- Parameters:
working_dir – Temporary directory for intermediate Parquet files.
results – One polars LazyFrame (or None) per destination table, in the same order as destination_tables.
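The staged-load step described above amounts to a query of this shape: once the parquet files are in the staging area, ClickHouse's s3() table function reads them back. Table and bucket names here are illustrative.

```python
# Build an INSERT ... SELECT over the staged parquet files (names assumed).
table = "events"
stage_url = "https://my-bucket.s3.amazonaws.com/staging/events/*.parquet"
insert_sql = (
    f"INSERT INTO {table} "
    f"SELECT * FROM s3('{stage_url}', 'Parquet')"
)
assert insert_sql.startswith("INSERT INTO events")
```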
- class ConfluentSchemaRegistry(
- **kwargs,
Bases:
KafkaSchemaRegistry
Configuration for Confluent Schema Registry.
- property configs: dict
Additional configuration properties for the Schema Registry client.
- property credentials: UserPasswordCredentials | None
Credentials for authenticating with the Schema Registry.
- property url: str
The URL of the Schema Registry.
- ConfluentSchemaRegistryCredentialsSpec
alias of
UserPasswordCredentials
- class Conn
Bases:
object
Represents a connection configuration. This is a base class.
- class CronTrigger( )
Bases:
object
- Categories:
trigger
Represents a cron-based trigger for scheduling Tabsdata publishers, subscribers and transformers.
This class defines a trigger based on a cron expression, with optional start and end times to constrain its activation period. It validates the cron mask and time boundaries to ensure the trigger is well-formed.
- property end: str | None
Gets the end time of the trigger as a timezone-aware ISO 8601 string with ‘Z’ timezone.
- Returns:
The end time in ISO format, or None if not set.
- property mask: str
Gets the cron expression mask.
- Returns:
The cron expression string.
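The trigger validates its cron mask on construction; a minimal stand-in for such a check is sketched below (tabsdata's real validation is more thorough than field counting).

```python
def looks_like_cron(mask: str) -> bool:
    # A standard cron mask has five fields:
    # minute, hour, day-of-month, month, day-of-week.
    fields = mask.split()
    return len(fields) == 5

assert looks_like_cron("0 2 * * *")
assert not looks_like_cron("0 2 *")
```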
- class CustomException(
- message: str,
- error_code=None,
Bases:
Exception
Custom exception for execution errors.
- class DatabricksDestination(
- host_url: str,
- token: str | Secret,
- tables: list[str] | str,
- volume: str,
- catalog: str | None = None,
- schema: str | None = None,
- warehouse: str | None = None,
- warehouse_id: str | None = None,
- if_table_exists: Literal['append', 'replace'] = 'append',
- schema_strategy: Literal['update', 'strict'] = 'update',
- **kwargs,
Bases:
DestinationPlugin
- Categories:
destination
Databricks based data outputs.
- chunk( ) -> List[None | str]
Trigger the exporting of the data to local parquet chunks. This method will receive the resulting data from the user function and must store it in the local system as parquet files, using the working_dir. Note: This method should not materialize the data, it should only store it in the local system.
- Parameters:
working_dir (str) – The folder where any files generated must be stored (this refers to temporary files that will be deleted after the execution of the plugin, not the final destination of the data)
results – The data to be exported. It is a list of polars LazyFrames or None.
- Returns:
A list of the intermediate files created
- property host_url: str
- property if_table_exists: Literal['append', 'replace']
The strategy to follow when the table already exists.
- property schema_strategy: Literal['update', 'strict']
The strategy to follow when appending to a table with an existing schema.
- stream( )
Trigger the exporting of the data. This method will receive the resulting data from the user function and must store it in the desired location. Note: this method might materialize the data provided in a single chunk generated by the chunk function if invoked, so chunks should be of an appropriate size.
- Parameters:
working_dir (str) – The folder where any intermediate files generated must be stored (this refers to temporary files that will be deleted after the execution of the plugin, not the final destination of the data)
results – The data to be exported. It is a list of polars LazyFrames or None.
- Returns:
None
- property token: Secret
- write(
- files,
This method is used to write the files to Databricks. It is called from the stream method and is not intended to be called directly.
- class DatabricksSource(
- host_url: str,
- token: str | Secret,
- queries: list[str] | str,
- initial_values: dict | None = None,
- warehouse: str | None = None,
- warehouse_id: str | None = None,
- **kwargs,
Bases:
SourcePlugin
- Categories:
source
Databricks based data inputs.
- chunk(
- working_dir: str,
Trigger the import of the data. This must be implemented in any class that inherits from this class unless directly implementing streaming. The method will receive a folder where it must store the data as parquet files, and return a list of the paths of the files created. These files will then be loaded and mapped to the dataset function in positional order, so if you want file.parquet to be the first argument of the dataset function, you must return it first. If you want a parameter to receive multiple files, return a list of the paths. For example, to provide a first argument with a single file and a second argument with two files: return ["file1.parquet", ["file2.parquet", "file3.parquet"]]
- property host_url: str
- property initial_values: dict
The initial values for the parameters in the SQL queries.
- property token: Secret
- class Db2CdcTrigger(
- **kwargs,
Bases:
CdcTrigger
- Categories:
trigger
A CdcTrigger that captures changes from a Db2 database using SQL Replication.
This trigger connects to a Db2 server, reads change data capture events (inserts, updates, deletes) on specified tables via SQL Replication, and stages them.
- property blocking_timeout_seconds: int
Timeout in seconds for blocking reads from the replication stream.
- collect_tables(
- session: Any,
- cdc_format: CdcFormat = CdcFormat(values_format='columns', flatten_values=True),
- harvest_tables(
- session: Any,
- cdc_format: CdcFormat = CdcFormat(values_format='columns', flatten_values=True),
- class Db2Conn(
- **kwargs,
Bases:
Conn
Represents a connection configuration to Db2.
- property credentials: UserPasswordCredentials | None
The credentials required to access the Db2 database. If no credentials were provided, it will return None.
- property cx_dst_configs_db2: dict
Destination-specific configuration parameters for the Db2 connection.
- property cx_src_configs_db2: dict
Source-specific configuration parameters for the Db2 connection.
- property enforce_connection_params: bool
Whether to enforce connection parameters for the Db2 connection.
- property uri: str
The URI of the database where the data is located.
- class Db2Src(
- **kwargs,
Bases:
SourcePlugin
- Categories:
source
Source plugin for reading data from Db2.
- property chunk_size: int
The chunk size in records for reading large queries in batches.
- property conn: Db2Conn
The Db2 connection configuration.
- property initial_values: dict
The initial values for the parameters in the SQL queries.
- property loader: Literal['polars_sqlalchemy', 'pandas']
The data processing loader to use for executing SQL imports.
- property schema_overrides: list[dict[str, Boolean | Categorical | Date | Datetime | Decimal | Duration | Enum | Float16 | Float32 | Float64 | Int8 | Int16 | Int64 | Int32 | Int128 | Null | String | Time | UInt8 | UInt16 | UInt32 | UInt64]] | None
The schema overrides for the tables being read.
- stream(
- working_dir: str,
Execute the query and load the results as a list of TableFrames.
- Parameters:
working_dir – The directory where the Parquet files will be saved.
- Returns:
A list of TableFrames containing the query results.
- property transactional: bool
Whether to use transactions for reading to ensure consistency.
- class DestinationPlugin
Bases:
object
- Categories:
plugin
Parent class for subscriber connectors.
- trigger_output(working_dir, *args, **kwargs): Trigger the exporting of the data. This function will receive the resulting data from the dataset function and must store it in the desired location.
- chunk(
- working_dir: str,
- *results: VALID_PLUGIN_RESULT,
Trigger the exporting of the data to local parquet chunks. This method will receive the resulting data from the user function and must store it in the local system as parquet files, using the working_dir. Note: This method should not materialize the data, it should only store it in the local system.
- Parameters:
working_dir (str) – The folder where any files generated must be stored (this refers to temporary files that will be deleted after the execution of the plugin, not the final destination of the data)
results – The data to be exported. It is a list of polars LazyFrames or None.
- Returns:
A list of the intermediate files created
- stream(
- working_dir: str,
- *results: VALID_PLUGIN_RESULT,
Trigger the exporting of the data. This method will receive the resulting data from the user function and must store it in the desired location. Note: this method might materialize the data provided in a single chunk generated by the chunk function if invoked, so chunks should be of an appropriate size.
- Parameters:
working_dir (str) – The folder where any intermediate files generated must be stored (this refers to temporary files that will be deleted after the execution of the plugin, not the final destination of the data)
results – The data to be exported. It is a list of polars LazyFrames or None.
- Returns:
None
- write( )
Given a file or a list of files, write to the desired destination. Note: this method might materialize the data in the files it receives, so chunks should be of an appropriate size.
- Parameters:
files (str) – The file or files to be stored in the final destination.
- DestinationPlugin.chunk(
- working_dir: str,
- *results: VALID_PLUGIN_RESULT,
Trigger the exporting of the data to local parquet chunks. This method will receive the resulting data from the user function and must store it in the local system as parquet files, using the working_dir. Note: This method should not materialize the data, it should only store it in the local system.
- Parameters:
working_dir (str) – The folder where any files generated must be stored (this refers to temporary files that will be deleted after the execution of the plugin, not the final destination of the data)
results – The data to be exported. It is a list of polars LazyFrames or None.
- Returns:
A list of the intermediate files created
- DestinationPlugin.stream(
- working_dir: str,
- *results: VALID_PLUGIN_RESULT,
Trigger the exporting of the data. This method will receive the resulting data from the user function and must store it in the desired location. Note: this method might materialize the data provided in a single chunk generated by the chunk function if invoked, so chunks should be of an appropriate size.
- Parameters:
working_dir (str) – The folder where any intermediate files generated must be stored (this refers to temporary files that will be deleted after the execution of the plugin, not the final destination of the data)
results – The data to be exported. It is a list of polars LazyFrames or None.
- Returns:
None
- DestinationPlugin.write( )
Given a file or a list of files, write to the desired destination. Note: this method might materialize the data in the files it receives, so chunks should be of an appropriate size.
- Parameters:
files (str) – The file or files to be stored in the final destination.
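The chunk/stream/write contract above can be sketched with a minimal stand-in class (plain Python, no tabsdata import; CSV files stand in for the Parquet chunks a real plugin would write, and the class name is hypothetical):

```python
import csv
import os
import shutil


class ToyDestination:
    """Illustrative stand-in for a DestinationPlugin subclass."""

    def __init__(self, target_dir: str):
        self.target_dir = target_dir

    def chunk(self, working_dir: str, *results) -> list:
        # Store each result in working_dir without materializing it;
        # return the intermediate files created (None for None results).
        files = []
        for i, rows in enumerate(results):
            if rows is None:
                files.append(None)
                continue
            path = os.path.join(working_dir, f"chunk_{i}.csv")
            with open(path, "w", newline="") as f:
                csv.writer(f).writerows(rows)
            files.append(path)
        return files

    def write(self, files):
        # Move the intermediate files to the final destination.
        os.makedirs(self.target_dir, exist_ok=True)
        for path in files:
            if path is not None:
                shutil.copy(path, self.target_dir)

    def stream(self, working_dir: str, *results) -> None:
        # Default wiring: chunk locally, then write to the destination.
        self.write(self.chunk(working_dir, *results))
```

The split keeps chunking (cheap, local) separate from the final write (which may materialize each chunk), which is why chunks should be sized appropriately.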
- class DruidConn(
- **kwargs,
Bases:
Conn
Represents a connection configuration for Apache Druid with cloud-staged bulk loading.
Data is first uploaded to the configured cloud staging area (S3, Azure Blob, or GCS) and then ingested into Druid via the Overlord REST API using a native batch
index_parallel ingestion task.
- property credentials: UserPasswordCredentials | None
The credentials used for HTTP Basic authentication.
None if no credentials were provided.
- property http_timeout_sec: int
Timeout in seconds for HTTP requests to the Overlord API.
- property max_concurrent_subtasks: int
Maximum number of parallel sub-tasks inside the ingestion task.
- property max_rows_in_memory: int
In-memory buffer size during ingestion.
- property max_rows_per_segment: int
Maximum number of rows per segment (dynamic partitioning threshold).
- property overlord_url: str
The base URL of the Druid Overlord REST API.
- property segment_granularity: Literal['ALL', 'DAY', 'HOUR', 'MONTH', 'YEAR']
Time-based partitioning granularity for new segments.
- property staging: S3Config | AzureConfig | GCSConfig
The cloud storage staging configuration (S3Config, AzureConfig, or GCSConfig).
- class DruidDest(
- **kwargs,
Bases:
DestinationPlugin
- Categories:
destination
Destination plugin for writing data to Apache Druid using cloud-staged bulk loading.
Data is first written as Parquet files to the configured cloud storage staging area (S3, Azure Blob, or GCS) and then ingested into Druid via a native batch
index_parallel task submitted to the Overlord REST API.
- property conn: DruidConn
The Druid connection configuration.
- property if_table_exists: Literal['append', 'replace']
The strategy to follow when the datasource already exists.
- stream( )
Write each result as a Parquet file and bulk load it into Druid via the cloud staging area.
- Parameters:
working_dir – Temporary directory for intermediate Parquet files.
results – One polars LazyFrame (or None) per destination datasource, in the same order as destination_tables.
- DruidDest.stream( )
Write each result as a Parquet file and bulk load it into Druid via the cloud staging area.
- Parameters:
working_dir – Temporary directory for intermediate Parquet files.
results – One polars LazyFrame (or None) per destination datasource, in the same order as destination_tables.
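A rough sketch of the kind of native batch task this plugin submits may help. Field names follow Druid's index_parallel ingestion spec; the concrete values, the helper name, and the schema-discovery choices here are illustrative assumptions, not the plugin's actual spec:

```python
import json


def build_index_parallel_task(datasource: str, staging_uris: list,
                              segment_granularity: str = "DAY",
                              max_rows_per_segment: int = 5_000_000,
                              max_concurrent_subtasks: int = 2) -> dict:
    """Illustrative Druid native-batch ingestion spec for staged Parquet files."""
    return {
        "type": "index_parallel",
        "spec": {
            "dataSchema": {
                "dataSource": datasource,
                "timestampSpec": {"column": "__time", "format": "auto"},
                "dimensionsSpec": {"useSchemaDiscovery": True},
                "granularitySpec": {"segmentGranularity": segment_granularity},
            },
            "ioConfig": {
                "type": "index_parallel",
                # inputSource type depends on the staging backend (s3 here).
                "inputSource": {"type": "s3", "uris": staging_uris},
                "inputFormat": {"type": "parquet"},
            },
            "tuningConfig": {
                "type": "index_parallel",
                "maxNumConcurrentSubTasks": max_concurrent_subtasks,
                "partitionsSpec": {"type": "dynamic",
                                   "maxRowsPerSegment": max_rows_per_segment},
            },
        },
    }
```

A spec like this would be POSTed as JSON to the Overlord's task endpoint (`/druid/indexer/v1/task` under overlord_url), which is how max_rows_per_segment, max_concurrent_subtasks, and segment_granularity reach Druid.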
- class EnvironmentSecret(
- environment_variable_name: str,
Bases:
Secret
- Categories:
secret
Secrets class representing a secret obtained from an environment variable in the server.
- Variables:
environment_variable_name (str) – Name of the environment variable from which we will obtain the secret value.
dict (to_dict() ->) – Convert the EnvironmentSecret object to a dictionary.
str (secret_value() ->) – Get the secret value.
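The behavior described above can be sketched with a stand-in class (not the tabsdata implementation): the secret serializes only the variable name, and the value is resolved from the server's environment on demand.

```python
import os


class EnvSecretSketch:
    """Illustrative stand-in for EnvironmentSecret."""

    def __init__(self, environment_variable_name: str):
        self.environment_variable_name = environment_variable_name

    def to_dict(self) -> dict:
        # Only the variable name is serialized; the value stays server-side.
        return {"environment_variable_name": self.environment_variable_name}

    @property
    def secret_value(self) -> str:
        # Resolved lazily from the environment at execution time.
        value = os.environ.get(self.environment_variable_name)
        if value is None:
            raise KeyError(
                f"Environment variable {self.environment_variable_name!r} not set"
            )
        return value
```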
- class FileInfo(
- *values,
Bases:
StrEnum
- class GCPServiceAccountKeyCredentials(
- service_account_key: str | Secret,
Bases:
GCPCredentials
- Categories:
credentials
Credentials class to store the credentials needed to access GCS using account key credentials (service account key).
- Variables:
service_account_key (Secret) – The GCS service account key.
dict (to_dict() ->) – Convert the GCPServiceAccountKeyCredentials object to a dictionary
- property service_account_key: Secret
The GCS service account key.
- Type:
Secret
- class GCSConfig(
- bucket: str,
- base_path: str,
- upload_credentials: GCPServiceAccountKeyCredentials | None = None,
- hmac_credentials: HMACCredentials | None = None,
Bases:
object
Configuration for staging files in Google Cloud Storage.
- Parameters:
bucket – GCS bucket name.
base_path – Object path prefix for staged files (e.g. "staging/clickhouse").
upload_credentials – Optional GCP service-account credentials used for uploading staged files via the GCS SDK. When None, application default credentials (ADC) are used.
hmac_credentials – Optional HMACCredentials carrying the GCS HMAC access key and secret for S3-compatible access. When None, the storage backend operates without explicit credentials (suitable for public buckets or server-side IAM).
- base_path: str
- bucket: str
- hmac_credentials: HMACCredentials | None = None
- upload_credentials: GCPServiceAccountKeyCredentials | None = None
- class GCSDestination( )
Bases:
DestinationPlugin
- Categories:
destination
GCS-file-based data outputs.
- class SupportedFormats(
- *values,
Bases:
Enum
Enum for the supported formats for the GCSDestination.
- avro = <class 'tabsdata._format.AvroFormat'>
- csv = <class 'tabsdata._format.CSVFormat'>
- ndjson = <class 'tabsdata._format.NDJSONFormat'>
- parquet = <class 'tabsdata._format.ParquetFormat'>
- chunk(
- working_dir: str,
- *results,
Trigger the exporting of the data to local parquet chunks. This method will receive the resulting data from the user function and must store it in the local system as parquet files, using the working_dir. Note: This method should not materialize the data, it should only store it in the local system.
- Parameters:
working_dir (str) – The folder where any files generated must be stored (this refers to temporary files that will be deleted after the execution of the plugin, not the final destination of the data)
results – The data to be exported. It is a list of polars LazyFrames or None.
- Returns:
A list of the intermediate files created
- property credentials: GCPCredentials
The credentials required to access GCS.
- Type:
GCPCredentials
- property format: FileFormat
The format of the file. If not provided, it will be inferred from the file extension of the URI.
- Type:
FileFormat
- stream(
- working_dir: str,
- *results,
Trigger the exporting of the data. This method will receive the resulting data from the user function and must store it in the desired location. Note: this method might materialize the data provided in a single chunk generated by the chunk function if invoked, so chunks should be of an appropriate size.
- Parameters:
working_dir (str) – The folder where any intermediate files generated must be stored (this refers to temporary files that will be deleted after the execution of the plugin, not the final destination of the data)
results – The data to be exported. It is a list of polars LazyFrames or None.
- Returns:
None
- GCSDestination.chunk(
- working_dir: str,
- *results,
Trigger the exporting of the data to local parquet chunks. This method will receive the resulting data from the user function and must store it in the local system as parquet files, using the working_dir. Note: This method should not materialize the data, it should only store it in the local system.
- Parameters:
working_dir (str) – The folder where any files generated must be stored (this refers to temporary files that will be deleted after the execution of the plugin, not the final destination of the data)
results – The data to be exported. It is a list of polars LazyFrames or None.
- Returns:
A list of the intermediate files created
- GCSDestination.stream(
- working_dir: str,
- *results,
Trigger the exporting of the data. This method will receive the resulting data from the user function and must store it in the desired location. Note: this method might materialize the data provided in a single chunk generated by the chunk function if invoked, so chunks should be of an appropriate size.
- Parameters:
working_dir (str) – The folder where any intermediate files generated must be stored (this refers to temporary files that will be deleted after the execution of the plugin, not the final destination of the data)
results – The data to be exported. It is a list of polars LazyFrames or None.
- Returns:
None
- class GCSSource(
- uri: str | list[str],
- credentials: GCPCredentials,
- format: str | FileFormat = None,
- initial_last_modified: str | datetime = None,
- file_info: str | FileInfo | None = None,
Bases:
SourcePlugin
- Categories:
source
GCS-file-based data inputs.
- class SupportedFormats(
- *values,
Bases:
Enum
Enum for the supported formats for the GCSSource.
- avro = <class 'tabsdata._format.AvroFormat'>
- csv = <class 'tabsdata._format.CSVFormat'>
- log = <class 'tabsdata._format.LogFormat'>
- ndjson = <class 'tabsdata._format.NDJSONFormat'>
- parquet = <class 'tabsdata._format.ParquetFormat'>
- chunk(
- working_dir: str,
Trigger the import of the data. This must be implemented in any class that inherits from this class unless directly implementing streaming. The method will receive a folder where it must store the data as parquet files, and return a list of the paths of the files created. These files will then be loaded and mapped to the dataset function in positional order: if you want file.parquet to be the first argument of the dataset function, you must return it first. If you want a parameter to receive multiple files, return a list of the paths. For example, to provide a first argument with a single file and a second argument with two files, return: return ["file1.parquet", ["file2.parquet", "file3.parquet"]]
- property credentials: GCPCredentials
The credentials required to access GCS.
- Type:
GCPCredentials
- property file_info: FileInfo | None
The file information to add as system columns.
- Type:
FileInfo | None
- property format: FileFormat
The format of the file. If not provided, it will be inferred from the file extension of the data.
- Type:
FileFormat
- property initial_last_modified: str
The date and time after which the files were modified.
- Type:
- property initial_values: dict
Return a dictionary with the initial values to be stored after execution of the plugin. They will be accessible in the next execution of the plugin. The dictionary must have the parameter names as keys and the initial values as values, all of type string.
- Returns:
A dictionary with the initial values of the parameters of the plugin.
- Return type:
- GCSSource.chunk(
- working_dir: str,
Trigger the import of the data. This must be implemented in any class that inherits from this class unless directly implementing streaming. The method will receive a folder where it must store the data as parquet files, and return a list of the paths of the files created. These files will then be loaded and mapped to the dataset function in positional order: if you want file.parquet to be the first argument of the dataset function, you must return it first. If you want a parameter to receive multiple files, return a list of the paths. For example, to provide a first argument with a single file and a second argument with two files, return: return ["file1.parquet", ["file2.parquet", "file3.parquet"]]
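The positional-mapping convention described above can be shown in isolation (illustrative: the real framework loads the parquet files into TableFrames before calling the function; `dispatch` and `example_fn` are hypothetical names):

```python
def dispatch(dataset_function, chunk_result: list):
    """Map a chunk() return value onto the dataset function.

    Each top-level entry becomes one positional argument; a nested
    list means that argument receives several files.
    """
    return dataset_function(*chunk_result)


def example_fn(first, pair):
    # first gets a single file; pair gets a list of two files.
    return first, list(pair)
```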
- class HMACCredentials( )
Bases:
Credentials
- Categories:
credentials
Generic HMAC credentials consisting of an access key and a secret key.
These credentials follow the same shape as AWS S3 access keys and are compatible with any service that uses HMAC-based authentication over the S3-compatible API, such as GCS HMAC keys or MinIO credentials.
- Variables:
access_key (Secret) – The HMAC access key id.
secret_key (Secret) – The HMAC secret key.
dict (to_dict() ->) – Convert the HMACCredentials object to a dictionary.
- property access_key: Secret
The HMAC access key id.
- Type:
Secret
- property secret_key: Secret
The HMAC secret key.
- Type:
Secret
- class HashiCorpSecret( )
Bases:
Secret
- Categories:
secret
Secrets class representing a secret stored in HashiCorp Vault.
- Variables:
path (str) – The path to the secret in HashiCorp Vault.
name (str) – The name of the secret in HashiCorp Vault.
vault (str) – If multiple vaults exist in the system, the name of the vault to use. When executing in the server, the URL and token associated with that specific vault will be used. The name can only contain uppercase letters, numbers and underscores, and can’t begin with a number. Defaults to “HASHICORP”.
dict (to_dict() ->) – Convert the HashiCorpSecret object to a dictionary.
str (secret_value() ->) – Get the secret value.
- property secret_value: str
Get the secret value pointed at by the secret. To be used only during execution in the backend.
- Returns:
The secret value.
- Return type:
- property vault: str
- class KafkaConn(
- **kwargs,
Bases:
Conn
Kafka Consumer Connection configuration.
- property credentials: UserPasswordCredentials
Kafka credentials.
- property cx_src_configs: dict
Source-specific configuration parameters for the Kafka connection.
- property enforce_connection_params: bool
Whether to enforce connection parameters.
- property schema_registry: ConfluentSchemaRegistry | AwsGlueSchemaRegistry | None
Kafka schema registry configuration.
- KafkaCredentialsSpec
alias of
UserPasswordCredentials
- class KafkaTrigger(
- **kwargs,
Bases:
StageTrigger
- Categories:
trigger
A StageTrigger that consumes messages from a Kafka topic and stages them.
This trigger connects to a Kafka cluster, consumes messages from a specified topic, and uses rollover conditions (time, size, and message count) to buffer and stage the data.
- property conn: KafkaConn
Kafka consumer connection configuration.
- property data_format: Literal['avro', 'json'] | Protobuf
Data format of the messages (“avro”, “json”, “protobuf”).
- property messages_rollover: Annotated[int, Gt(gt=0)] | None
Message count-based rollover configuration.
- run(
- context: StageTriggerContext,
Run the Kafka stage trigger to consume messages, buffer them, and stage them based on rollover conditions.
- Parameters:
context – The StageTriggerContext instance.
- property schema: str
Schema string for deserializing messages.
- property size_rollover_mb: Annotated[int, Gt(gt=0)] | None
Size-based rollover configuration. Size is specified in megabytes.
- property time_rollover_secs: int
Time-based rollover configuration. Time is specified in seconds.
- property topic: str
Kafka topic to consume from.
- KafkaTrigger.run(
- context: StageTriggerContext,
Run the Kafka stage trigger to consume messages, buffer them, and stage them based on rollover conditions.
- Parameters:
context – The StageTriggerContext instance.
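The rollover logic described above (buffer messages, stage when a time, size, or count condition trips) can be sketched as a small stand-in buffer; the class and its thresholds mirror the trigger's properties but are not the tabsdata implementation:

```python
import time


class RolloverBuffer:
    """Illustrative buffer that signals staging when any rollover
    condition (seconds, megabytes, or message count) is reached."""

    def __init__(self, time_rollover_secs=None, size_rollover_mb=None,
                 messages_rollover=None):
        self.time_rollover_secs = time_rollover_secs
        self.size_rollover_mb = size_rollover_mb
        self.messages_rollover = messages_rollover
        self.reset()

    def reset(self):
        self.messages = []
        self.bytes = 0
        self.started = time.monotonic()

    def add(self, message: bytes) -> bool:
        """Buffer a message; return True when the buffer should be staged."""
        self.messages.append(message)
        self.bytes += len(message)
        return self.should_rollover()

    def should_rollover(self) -> bool:
        if self.messages_rollover and len(self.messages) >= self.messages_rollover:
            return True
        if self.size_rollover_mb and self.bytes >= self.size_rollover_mb * 1024 * 1024:
            return True
        if (self.time_rollover_secs
                and time.monotonic() - self.started >= self.time_rollover_secs):
            return True
        return False
```

After staging, a real trigger would call something like `reset()` and continue consuming from the topic.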
- class LocalFileDestination( )
Bases:
DestinationPlugin
- Categories:
destination
LocalFile-based data outputs.
- class SupportedFormats(
- *values,
Bases:
Enum
Enum for the supported formats for the LocalFileDestination.
- avro = <class 'tabsdata._format.AvroFormat'>
- csv = <class 'tabsdata._format.CSVFormat'>
- ndjson = <class 'tabsdata._format.NDJSONFormat'>
- parquet = <class 'tabsdata._format.ParquetFormat'>
- chunk(
- working_dir: str,
- *results,
Trigger the exporting of the data to local parquet chunks. This method will receive the resulting data from the user function and must store it in the local system as parquet files, using the working_dir. Note: This method should not materialize the data, it should only store it in the local system.
- Parameters:
working_dir (str) – The folder where any files generated must be stored (this refers to temporary files that will be deleted after the execution of the plugin, not the final destination of the data)
results – The data to be exported. It is a list of polars LazyFrames or None.
- Returns:
A list of the intermediate files created
- property format: FileFormat
The format of the file or files. If not provided, it will be inferred from the file extension in the path.
- Type:
FileFormat
- stream(
- working_dir: str,
- *results,
Trigger the exporting of the data. This method will receive the resulting data from the user function and must store it in the desired location. Note: this method might materialize the data provided in a single chunk generated by the chunk function if invoked, so chunks should be of an appropriate size.
- Parameters:
working_dir (str) – The folder where any intermediate files generated must be stored (this refers to temporary files that will be deleted after the execution of the plugin, not the final destination of the data)
results – The data to be exported. It is a list of polars LazyFrames or None.
- Returns:
None
- LocalFileDestination.chunk(
- working_dir: str,
- *results,
Trigger the exporting of the data to local parquet chunks. This method will receive the resulting data from the user function and must store it in the local system as parquet files, using the working_dir. Note: This method should not materialize the data, it should only store it in the local system.
- Parameters:
working_dir (str) – The folder where any files generated must be stored (this refers to temporary files that will be deleted after the execution of the plugin, not the final destination of the data)
results – The data to be exported. It is a list of polars LazyFrames or None.
- Returns:
A list of the intermediate files created
- LocalFileDestination.stream(
- working_dir: str,
- *results,
Trigger the exporting of the data. This method will receive the resulting data from the user function and must store it in the desired location. Note: this method might materialize the data provided in a single chunk generated by the chunk function if invoked, so chunks should be of an appropriate size.
- Parameters:
working_dir (str) – The folder where any intermediate files generated must be stored (this refers to temporary files that will be deleted after the execution of the plugin, not the final destination of the data)
results – The data to be exported. It is a list of polars LazyFrames or None.
- Returns:
None
- class LocalFileSource(
- path: str | list[str],
- format: str | FileFormat = None,
- initial_last_modified: str | datetime = None,
- file_info: str | FileInfo | None = None,
Bases:
SourcePlugin
- Categories:
source
Local-file-based data inputs.
- class SupportedFormats(
- *values,
Bases:
Enum
Enum for the supported formats for the LocalFileSource.
- avro = <class 'tabsdata._format.AvroFormat'>
- csv = <class 'tabsdata._format.CSVFormat'>
- log = <class 'tabsdata._format.LogFormat'>
- ndjson = <class 'tabsdata._format.NDJSONFormat'>
- parquet = <class 'tabsdata._format.ParquetFormat'>
- chunk(
- working_dir: str,
Trigger the import of the data. This must be implemented in any class that inherits from this class unless directly implementing streaming. The method will receive a folder where it must store the data as parquet files, and return a list of the paths of the files created. These files will then be loaded and mapped to the dataset function in positional order: if you want file.parquet to be the first argument of the dataset function, you must return it first. If you want a parameter to receive multiple files, return a list of the paths. For example, to provide a first argument with a single file and a second argument with two files, return: return ["file1.parquet", ["file2.parquet", "file3.parquet"]]
- property file_info: FileInfo | None
The file information to add as system columns.
- Type:
FileInfo | None
- property format: FileFormat
The format of the file or files. If not provided, it will be inferred from the file extension in the path.
- Type:
FileFormat
- property initial_last_modified: str
The date and time after which the files were modified.
- Type:
- property initial_values: dict
Return a dictionary with the initial values to be stored after execution of the plugin. They will be accessible in the next execution of the plugin. The dictionary must have the parameter names as keys and the initial values as values, all of type string.
- Returns:
A dictionary with the initial values of the parameters of the plugin.
- Return type:
- LocalFileSource.chunk(
- working_dir: str,
Trigger the import of the data. This must be implemented in any class that inherits from this class unless directly implementing streaming. The method will receive a folder where it must store the data as parquet files, and return a list of the paths of the files created. These files will then be loaded and mapped to the dataset function in positional order: if you want file.parquet to be the first argument of the dataset function, you must return it first. If you want a parameter to receive multiple files, return a list of the paths. For example, to provide a first argument with a single file and a second argument with two files, return: return ["file1.parquet", ["file2.parquet", "file3.parquet"]]
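The initial_last_modified behavior (pick up only files modified after a cutoff, as a file source would between runs) can be sketched with the stdlib; the function name is hypothetical and this is not the plugin's actual selection code:

```python
import os
from datetime import datetime


def files_modified_after(directory: str, initial_last_modified: datetime) -> list:
    """Return files in directory whose mtime is after the cutoff."""
    cutoff = initial_last_modified.timestamp()
    selected = []
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        if os.path.isfile(path) and os.path.getmtime(path) > cutoff:
            selected.append(path)
    return selected
```

In the real plugin the cutoff would then be advanced via initial_values so the next execution only sees newer files.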
- class LogFormat
Bases:
FileFormat
- Categories:
file-format
Log file format.
- class MSSQLConn(
- connection_string_or_uri: str,
- credentials: UserPasswordCredentials | None = None,
- enforce_connection_params: bool = True,
- cx_src_configs_mssql: dict | None = None,
- cx_dst_configs_mssql: dict | None = None,
Bases:
Conn
Represents a connection configuration to Microsoft SQL Server (MSSQL).
- property connection_string_or_uri: str
The connection string or URI for the MSSQL database.
- property credentials: UserPasswordCredentials | None
The credentials used to authenticate with the database.
- property cx_dst_configs_mssql: dict
The destination configuration parameters for the MSSQL connection.
- property cx_src_configs_mssql: dict
The source configuration parameters for the MSSQL connection.
- property enforce_connection_params: bool
Whether to enforce connection parameters for the MSSQL connection. Applicable only for a connection to an MSSQL destination.
- property uri: str
The URI for the MSSQL database.
- class MSSQLDestination(
- connection_string: str,
- destination_table: str | list[str],
- credentials: dict | UserPasswordCredentials | None = None,
- server: str | Secret = None,
- database: str | Secret = None,
- driver: str | Secret = None,
- if_table_exists: Literal['append', 'replace'] = 'append',
- chunk_size: int = 50000,
- **kwargs,
Bases:
DestinationPlugin
- Categories:
destination
MSSQL-based data outputs.
- chunk( ) → list[None | str]
Trigger the exporting of the data to local parquet chunks. This method will receive the resulting data from the user function and must store it in the local system as parquet files, using the working_dir. Note: This method should not materialize the data, it should only store it in the local system.
- Parameters:
working_dir (str) – The folder where any files generated must be stored (this refers to temporary files that will be deleted after the execution of the plugin, not the final destination of the data)
results – The data to be exported. It is a list of polars LazyFrames or None.
- Returns:
A list of the intermediate files created
- property connection_string: str
Get the connection string for the database.
- Returns:
The connection string.
- Return type:
- property credentials: UserPasswordCredentials | None
The credentials required to access Microsoft SQL Server. If no credentials were provided, it will return None.
- Type:
UserPasswordCredentials | None
- property if_table_exists: Literal['append', 'replace']
Returns the value of the if_table_exists property. This property determines what to do if the table already exists.
- stream( )
Trigger the exporting of the data. This method will receive the resulting data from the user function and must store it in the desired location. Note: this method might materialize the data provided in a single chunk generated by the chunk function if invoked, so chunks should be of an appropriate size.
- Parameters:
working_dir (str) – The folder where any intermediate files generated must be stored (this refers to temporary files that will be deleted after the execution of the plugin, not the final destination of the data)
results – The data to be exported. It is a list of polars LazyFrames or None.
- Returns:
None
- write(
- files,
This method is used to write the files to the database. It is called from the stream method, and it is not intended to be called directly.
- MSSQLDestination.chunk( ) → list[None | str]
Trigger the exporting of the data to local parquet chunks. This method will receive the resulting data from the user function and must store it in the local system as parquet files, using the working_dir. Note: This method should not materialize the data, it should only store it in the local system.
- Parameters:
working_dir (str) – The folder where any files generated must be stored (this refers to temporary files that will be deleted after the execution of the plugin, not the final destination of the data)
results – The data to be exported. It is a list of polars LazyFrames or None.
- Returns:
A list of the intermediate files created
- MSSQLDestination.stream( )
Trigger the exporting of the data. This method will receive the resulting data from the user function and must store it in the desired location. Note: this method might materialize the data provided in a single chunk generated by the chunk function if invoked, so chunks should be of an appropriate size.
- Parameters:
working_dir (str) – The folder where any intermediate files generated must be stored (this refers to temporary files that will be deleted after the execution of the plugin, not the final destination of the data)
results – The data to be exported. It is a list of polars LazyFrames or None.
- Returns:
None
- MSSQLDestination.write(
- files,
This method is used to write the files to the database. It is called from the stream method, and it is not intended to be called directly.
- class MSSQLSource(
- **kwargs,
Bases:
SourcePlugin
- Categories:
source
Microsoft SQL Server based data inputs.
- chunk(
- working_dir: str,
Execute the query and yield chunks of data.
- Parameters:
working_dir (str) – The working directory for temporary files.
- Yields:
pd.DataFrame – A chunk of data from the query result.
- property connection_string: str
Get the connection string for the database.
- Returns:
The connection string.
- Return type:
- property credentials: UserPasswordCredentials | None
The credentials required to access Microsoft SQL Server. If no credentials were provided, it will return None.
- Type:
UserPasswordCredentials | None
- MSSQLSource.chunk(
- working_dir: str,
Execute the query and yield chunks of data.
- Parameters:
working_dir (str) – The working directory for temporary files.
- Yields:
pd.DataFrame – A chunk of data from the query result.
- class MSSQLSrc(
- conn: MSSQLConn,
- queries: str | list[str],
- initial_values: dict[str, Any] | None = None,
- transactional: bool = True,
- chunk_size: Annotated[int, Gt(gt=0)] = 100000,
- loader: Literal['polars_sqlalchemy', 'pandas'] = 'polars_sqlalchemy',
- schema_overrides: dict[str, Boolean | Categorical | Date | Datetime | Decimal | Duration | Enum | Float16 | Float32 | Float64 | Int8 | Int16 | Int64 | Int32 | Int128 | Null | String | Time | UInt8 | UInt16 | UInt32 | UInt64] | list[dict[str, Boolean | Categorical | Date | Datetime | Decimal | Duration | Enum | Float16 | Float32 | Float64 | Int8 | Int16 | Int64 | Int32 | Int128 | Null | String | Time | UInt8 | UInt16 | UInt32 | UInt64]] | None = None,
Bases:
SourcePlugin
- Categories:
source
Source Plugin for reading data from a Microsoft SQL Server (MSSQL) database.
- property chunk_size: int
The chunk size in records for reading large queries in batches.
- property conn: MSSQLConn
The MSSQL connection configuration.
- property initial_values: dict
The initial values for the parameters in the SQL queries.
- property loader: Literal['polars_sqlalchemy', 'pandas']
The data processing loader to use for executing SQL imports.
- property schema_overrides: list[dict[str, Boolean | Categorical | Date | Datetime | Decimal | Duration | Enum | Float16 | Float32 | Float64 | Int8 | Int16 | Int64 | Int32 | Int128 | Null | String | Time | UInt8 | UInt16 | UInt32 | UInt64]] | None
The schema overrides for the tables being read.
- stream(
- working_dir: str,
Execute the query and load the results as a list of TableFrames.
- Parameters:
working_dir – The directory where the Parquet files will be saved.
- Returns:
A list of TableFrames containing the query results.
- property transactional: bool
Whether to use transactions for reading to ensure consistency.
- MSSQLSrc.stream(
- working_dir: str,
Execute the query and load the results as a list of TableFrames.
- Parameters:
working_dir – The directory where the Parquet files will be saved.
- Returns:
A list of TableFrames containing the query results.
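The chunk_size batching that MSSQLSrc applies to large queries can be sketched with the stdlib; sqlite3 stands in for the MSSQL connection here, and the point is the fetchmany-sized reads rather than one large fetchall:

```python
import sqlite3


def read_in_chunks(conn: sqlite3.Connection, query: str, chunk_size: int = 100_000):
    """Yield query results in batches of at most chunk_size rows."""
    cursor = conn.execute(query)
    while True:
        rows = cursor.fetchmany(chunk_size)
        if not rows:
            break
        yield rows
```

Each yielded batch would be written out as a Parquet chunk in working_dir before the next batch is fetched, keeping memory bounded.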
- class MariaDBConn(
- uri: str,
- credentials: UserPasswordCredentials | None = None,
- enforce_connection_params: bool = True,
- cx_src_configs_mariadb: dict | None = None,
- cx_dst_configs_mariadb: dict | None = None,
Bases:
Conn
Represents a connection configuration to MariaDB.
- property credentials: UserPasswordCredentials | None
The credentials required to access the MariaDB database. If no credentials were provided, it will return None.
- property cx_dst_configs_mariadb: dict
Destination-specific configuration parameters for the MariaDB connection.
- property cx_src_configs_mariadb: dict
Source-specific configuration parameters for the MariaDB connection.
- property enforce_connection_params: bool
Whether to enforce connection parameters for the MariaDB connection.
- property uri: str
The URI of the database where the data is located.
- class MariaDBDest(
- conn: MariaDBConn,
- destination_tables: str | list[str],
- if_table_exists: Literal['append', 'replace'] = 'append',
- transactional: bool = True,
- chunk_size: Annotated[int, Gt(gt=0)] = 100000,
- loader: Literal['polars_sqlalchemy'] = 'polars_sqlalchemy',
Bases:
DestinationPlugin
- Categories:
destination
Destination plugin for writing data to MariaDB.
- property chunk_size: int
The chunk size for writing large results in batches.
- property conn: MariaDBConn
The MariaDB connection configuration.
- property destination_tables: list[str]
The table(s) to create. If multiple tables are provided, they must be provided as a list.
- property if_table_exists: Literal['append', 'replace']
The strategy to follow when the table already exists.
- property loader: Literal['polars_sqlalchemy']
The data processing loader to use for executing DB write operation.
- stream( )
Store the results into the MariaDB database.
- Parameters:
working_dir – The working directory where the results are stored.
results – The results to store in the SQL destination.
- property transactional: bool
Whether to use transactions for writing data to ensure consistency.
- MariaDBDest.stream( )
Store the results into the MariaDB database.
- Parameters:
working_dir – The working directory where the results are stored.
results – The results to store in the SQL destination.
- class MariaDBDestination(
- **kwargs,
Bases:
DestinationPlugin- Categories:
destination
MariaDB-based data outputs.
- chunk( ) list[str | None]
Store the results in the SQL destination.
- Parameters:
working_dir – The working directory where the results will be stored.
results (list[pl.LazyFrame | None]) – The results to store in the SQL destination.
- property credentials: UserPasswordCredentials
The credentials required to access the MariaDB database.
- Type:
- property destination_table: str | List[str]
The table(s) to create. If multiple tables are provided, they must be provided as a list.
- property if_table_exists: Literal['append', 'replace']
The strategy to follow when the table already exists.
- Type:
- MariaDBDestination.chunk( ) list[str | None]
Store the results in the SQL destination.
- Parameters:
working_dir – The working directory where the results will be stored.
results (list[pl.LazyFrame | None]) – The results to store in the SQL destination.
- MariaDBDestination.write( )
Given a file or a list of files, write to the desired destination. Note: this method might materialize the data in the files it receives, so chunks should be of an appropriate size.
- Parameters:
files (str) – The file or files to be stored in the final destination.
- class MariaDBSource(
- **kwargs,
Bases:
SourcePlugin- Categories:
source
MariaDB-based data inputs.
- chunk(
- working_dir: str,
Trigger the import of the data. This must be implemented in any class that inherits from this class unless streaming is implemented directly. The method receives a folder where it must store the data as parquet files, and returns a list of the paths of the files created. These files are then loaded and mapped to the dataset function in positional order, so if you want file.parquet to be the first argument of the dataset function, you must return it first. If you want a parameter to receive multiple files, return a list of the paths. For example, to provide a first argument with a single file and a second argument with two files: return ["file1.parquet", ["file2.parquet", "file3.parquet"]]
- property credentials: UserPasswordCredentials | None
The credentials required to access MariaDB. If no credentials were provided, it will return None.
- Type:
UserPasswordCredentials | None
- MariaDBSource.chunk(
- working_dir: str,
Trigger the import of the data. This must be implemented in any class that inherits from this class unless streaming is implemented directly. The method receives a folder where it must store the data as parquet files, and returns a list of the paths of the files created. These files are then loaded and mapped to the dataset function in positional order, so if you want file.parquet to be the first argument of the dataset function, you must return it first. If you want a parameter to receive multiple files, return a list of the paths. For example, to provide a first argument with a single file and a second argument with two files: return ["file1.parquet", ["file2.parquet", "file3.parquet"]]
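The positional mapping described above can be sketched with plain files. Everything here is hypothetical: the file names are made up and the empty files stand in for real parquet output; only the shape of the returned list matters.

```python
# Hypothetical sketch of a chunk() return value: one entry per
# positional argument of the dataset function, where a nested list
# delivers several files to a single argument.
import os
import tempfile

def chunk(working_dir: str):
    paths = []
    for name in ("file1.parquet", "file2.parquet", "file3.parquet"):
        path = os.path.join(working_dir, name)
        open(path, "wb").close()  # placeholder for real parquet output
        paths.append(path)
    # First argument gets one file, second argument gets two files.
    return [paths[0], [paths[1], paths[2]]]

with tempfile.TemporaryDirectory() as d:
    result = chunk(d)
    print(len(result))                  # 2
    print(isinstance(result[1], list))  # True
```

With this return value, the dataset function's first parameter receives a single file and its second parameter receives both remaining files.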
- class MariaDBSrc(
- conn: MariaDBConn,
- queries: str | list[str],
- initial_values: dict[str, Any] | None = None,
- transactional: bool = True,
- chunk_size: Annotated[int, Gt(gt=0)] = 100000,
- loader: Literal['polars_sqlalchemy', 'pandas'] = 'polars_sqlalchemy',
- schema_overrides: dict[str, Boolean | Categorical | Date | Datetime | Decimal | Duration | Enum | Float16 | Float32 | Float64 | Int8 | Int16 | Int64 | Int32 | Int128 | Null | String | Time | UInt8 | UInt16 | UInt32 | UInt64] | list[dict[str, Boolean | Categorical | Date | Datetime | Decimal | Duration | Enum | Float16 | Float32 | Float64 | Int8 | Int16 | Int64 | Int32 | Int128 | Null | String | Time | UInt8 | UInt16 | UInt32 | UInt64]] | None = None,
Bases:
SourcePlugin- Categories:
source
Source plugin for reading data from MariaDB.
- property chunk_size: int
The chunk size in records for reading large queries in batches.
- property conn: MariaDBConn
The MariaDB connection configuration.
- property initial_values: dict
The initial values for the parameters in the SQL queries.
- property loader: Literal['polars_sqlalchemy', 'pandas']
The data processing loader to use for executing SQL imports.
- property schema_overrides: list[dict[str, Boolean | Categorical | Date | Datetime | Decimal | Duration | Enum | Float16 | Float32 | Float64 | Int8 | Int16 | Int64 | Int32 | Int128 | Null | String | Time | UInt8 | UInt16 | UInt32 | UInt64]] | None
The schema overrides for the tables being read.
- stream(
- working_dir: str,
Execute the query and load the results as a list of TableFrames.
- Parameters:
working_dir – The directory where the Parquet files will be saved.
- Returns:
A list of TableFrames containing the query results.
- property transactional: bool
Whether to use transactions for reading to ensure consistency.
- MariaDBSrc.stream(
- working_dir: str,
Execute the query and load the results as a list of TableFrames.
- Parameters:
working_dir – The directory where the Parquet files will be saved.
- Returns:
A list of TableFrames containing the query results.
- class MongoDBDestination(
- uri: str,
- collections_with_ids: tuple[str, str | None] | List[tuple[str, str | None]],
- credentials: UserPasswordCredentials = None,
- connection_options: dict = None,
- if_collection_exists: Literal['append', 'replace'] = 'append',
- use_trxs: bool = False,
- docs_per_trx: int = 1000,
- maintain_order: bool = False,
- update_existing: bool = True,
- fail_on_duplicate_key: bool = True,
- log_intermediate_files: bool = False,
- **kwargs,
Bases:
DestinationPlugin- Categories:
destination
MongoDB-based data outputs.
- chunk( ) List[None | List[str]]
Trigger the exporting of the data to local parquet chunks. This method will receive the resulting data from the user function and must store it in the local system as parquet files, using the working_dir. Note: this method should not materialize the data; it should only store it in the local system.
- Parameters:
working_dir (str) – The folder where any files generated must be stored (this refers to temporary files that will be deleted after the execution of the plugin, not the final destination of the data)
results – The data to be exported. It is a list of polars LazyFrames or None.
- Returns:
A list of the intermediate files created
- property connection_options: dict
- property credentials: UserPasswordCredentials | None
- property if_collection_exists: Literal['append', 'replace']
- stream( )
Trigger the exporting of the data. This method will receive the resulting data from the user function and must store it in the desired location. Note: this method might materialize the data provided in a single chunk generated by the chunk function if invoked, so chunks should be of an appropriate size.
- Parameters:
working_dir (str) – The folder where any intermediate files generated must be stored (this refers to temporary files that will be deleted after the execution of the plugin, not the final destination of the data)
results – The data to be exported. It is a list of polars LazyFrames or None.
- Returns:
None
- property uri: str
- write(
- files,
This method is used to write the files to the database. It is called from the stream method, and it is not intended to be called directly.
- MongoDBDestination.chunk( ) List[None | List[str]]
Trigger the exporting of the data to local parquet chunks. This method will receive the resulting data from the user function and must store it in the local system as parquet files, using the working_dir. Note: this method should not materialize the data; it should only store it in the local system.
- Parameters:
working_dir (str) – The folder where any files generated must be stored (this refers to temporary files that will be deleted after the execution of the plugin, not the final destination of the data)
results – The data to be exported. It is a list of polars LazyFrames or None.
- Returns:
A list of the intermediate files created
- MongoDBDestination.stream( )
Trigger the exporting of the data. This method will receive the resulting data from the user function and must store it in the desired location. Note: this method might materialize the data provided in a single chunk generated by the chunk function if invoked, so chunks should be of an appropriate size.
- Parameters:
working_dir (str) – The folder where any intermediate files generated must be stored (this refers to temporary files that will be deleted after the execution of the plugin, not the final destination of the data)
results – The data to be exported. It is a list of polars LazyFrames or None.
- Returns:
None
- MongoDBDestination.write(
- files,
This method is used to write the files to the database. It is called from the stream method, and it is not intended to be called directly.
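The collections_with_ids argument pairs each collection name with an optional string, per the declared type tuple[str, str | None]. Reading the second element as the result column used for the document _id is an assumption; the collection and column names below are made up. A small sketch of the expected shape:

```python
# Hypothetical values: collection names and id columns are invented
# for illustration. None presumably lets MongoDB assign ObjectIds
# (an assumption, not stated in the reference).
from typing import List, Optional, Tuple

collections_with_ids: List[Tuple[str, Optional[str]]] = [
    ("orders", "order_id"),  # pair a collection with an id column
    ("events", None),        # no explicit id column for this one
]

for collection, id_column in collections_with_ids:
    assert isinstance(collection, str)
    assert id_column is None or isinstance(id_column, str)
```

A single `("orders", "order_id")` tuple (not wrapped in a list) is also accepted by the declared signature.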
- class MySQLCdcTrigger(
- **kwargs,
Bases:
CdcTrigger- Categories:
trigger
A CdcTrigger that captures changes from a MySQL database using binlog replication.
This trigger connects to a MySQL server, reads the binary log for change data capture events (inserts, updates, deletes) on specified tables, and stages them.
- property blocking_timeout_seconds: int
Timeout in seconds for blocking reads from the binlog stream.
- collect_tables(
- session: Any,
- cdc_format: CdcFormat = CdcFormat(values_format='columns', flatten_values=True),
- harvest_tables(
- session: Any,
- cdc_format: CdcFormat = CdcFormat(values_format='columns', flatten_values=True),
- property server_id: int
MySQL server ID for binlog replication.
- MySQLCdcTrigger.collect_tables(
- session: Any,
- cdc_format: CdcFormat = CdcFormat(values_format='columns', flatten_values=True),
- MySQLCdcTrigger.harvest_tables(
- session: Any,
- cdc_format: CdcFormat = CdcFormat(values_format='columns', flatten_values=True),
- class MySQLConn(
- uri: str,
- credentials: UserPasswordCredentials | None = None,
- enforce_connection_params: bool = True,
- cx_src_configs_mysql: dict | None = None,
- cx_dst_configs_mysql: dict | None = None,
Bases:
Conn
Represents a connection configuration to MySQL.
- property credentials: UserPasswordCredentials | None
The credentials required to access the MySQL database. If no credentials were provided, it will return None.
- property cx_dst_configs_mysql: dict
The destination configuration parameters for the MySQL connection.
- property cx_src_configs_mysql: dict
The source configuration parameters for the MySQL connection.
- property enforce_connection_params: bool
Whether to enforce connection parameters for the MySQL connection.
- property uri: str
The URI of the database where the data is located.
- class MySQLDest(
- conn: MySQLConn,
- destination_tables: str | list[str],
- if_table_exists: Literal['append', 'replace'] = 'append',
- transactional: bool = True,
- chunk_size: Annotated[int, Gt(gt=0)] = 100000,
- loader: Literal['polars_sqlalchemy'] = 'polars_sqlalchemy',
Bases:
DestinationPlugin- Categories:
destination
Destination plugin for writing data to MySQL.
- property chunk_size: int
The chunk size for writing large results in batches.
- property conn: MySQLConn
The MySQL connection configuration.
- property destination_tables: list[str]
The table(s) to create. If multiple tables are provided, they must be provided as a list.
- property if_table_exists: Literal['append', 'replace']
The strategy to follow when the table already exists.
- property loader: Literal['polars_sqlalchemy']
The data processing loader to use for executing DB write operations.
- stream( )
Store the results into the MySQL database.
- Parameters:
working_dir – The working directory where the results are stored.
results – The results to store in the SQL destination.
- property transactional: bool
Whether to use transactions for writing data to ensure consistency.
- MySQLDest.stream( )
Store the results into the MySQL database.
- Parameters:
working_dir – The working directory where the results are stored.
results – The results to store in the SQL destination.
- class MySQLDestination(
- **kwargs,
Bases:
DestinationPlugin- Categories:
destination
MySQL-based data outputs.
- property credentials: UserPasswordCredentials
The credentials required to access the MySQL database.
- Type:
- property destination_table: str | List[str]
The table(s) to create. If multiple tables are provided, they must be provided as a list.
- property if_table_exists: Literal['append', 'replace']
The strategy to follow when the table already exists.
- Type:
- MySQLDestination.write( )
Given a file or a list of files, write to the desired destination. Note: this method might materialize the data in the files it receives, so chunks should be of an appropriate size.
- Parameters:
files (str) – The file or files to be stored in the final destination.
- class MySQLSource(
- **kwargs,
Bases:
SourcePlugin- Categories:
source
MySQL-based data inputs.
- chunk(
- working_dir: str,
Trigger the import of the data. This must be implemented in any class that inherits from this class unless streaming is implemented directly. The method receives a folder where it must store the data as parquet files, and returns a list of the paths of the files created. These files are then loaded and mapped to the dataset function in positional order, so if you want file.parquet to be the first argument of the dataset function, you must return it first. If you want a parameter to receive multiple files, return a list of the paths. For example, to provide a first argument with a single file and a second argument with two files: return ["file1.parquet", ["file2.parquet", "file3.parquet"]]
- property credentials: UserPasswordCredentials | None
The credentials required to access the MySQL database. If no credentials were provided, it will return None.
- Type:
UserPasswordCredentials | None
- MySQLSource.chunk(
- working_dir: str,
Trigger the import of the data. This must be implemented in any class that inherits from this class unless streaming is implemented directly. The method receives a folder where it must store the data as parquet files, and returns a list of the paths of the files created. These files are then loaded and mapped to the dataset function in positional order, so if you want file.parquet to be the first argument of the dataset function, you must return it first. If you want a parameter to receive multiple files, return a list of the paths. For example, to provide a first argument with a single file and a second argument with two files: return ["file1.parquet", ["file2.parquet", "file3.parquet"]]
- class MySQLSrc(
- conn: MySQLConn,
- queries: str | list[str],
- initial_values: dict[str, Any] | None = None,
- transactional: bool = True,
- chunk_size: Annotated[int, Gt(gt=0)] = 100000,
- loader: Literal['polars_sqlalchemy', 'pandas'] = 'polars_sqlalchemy',
- schema_overrides: dict[str, Boolean | Categorical | Date | Datetime | Decimal | Duration | Enum | Float16 | Float32 | Float64 | Int8 | Int16 | Int64 | Int32 | Int128 | Null | String | Time | UInt8 | UInt16 | UInt32 | UInt64] | list[dict[str, Boolean | Categorical | Date | Datetime | Decimal | Duration | Enum | Float16 | Float32 | Float64 | Int8 | Int16 | Int64 | Int32 | Int128 | Null | String | Time | UInt8 | UInt16 | UInt32 | UInt64]] | None = None,
Bases:
SourcePlugin- Categories:
source
Source plugin for reading data from MySQL.
- property chunk_size: int
The chunk size in records for reading large queries in batches.
- property conn: MySQLConn
The MySQL connection configuration.
- property initial_values: dict
The initial values for the parameters in the SQL queries.
- property loader: Literal['polars_sqlalchemy', 'pandas']
The data processing loader to use for executing SQL imports.
- property schema_overrides: list[dict[str, Boolean | Categorical | Date | Datetime | Decimal | Duration | Enum | Float16 | Float32 | Float64 | Int8 | Int16 | Int64 | Int32 | Int128 | Null | String | Time | UInt8 | UInt16 | UInt32 | UInt64]] | None
The schema overrides for the tables being read.
- stream(
- working_dir: str,
Execute the query and load the results as a list of TableFrames.
- Parameters:
working_dir – The directory where the Parquet files will be saved.
- Returns:
A list of TableFrames containing the query results.
- property transactional: bool
Whether to use transactions for reading to ensure consistency.
- MySQLSrc.stream(
- working_dir: str,
Execute the query and load the results as a list of TableFrames.
- Parameters:
working_dir – The directory where the Parquet files will be saved.
- Returns:
A list of TableFrames containing the query results.
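chunk_size controls how many records each batch holds when reading large queries. The batching itself can be sketched in plain Python; the loader's actual mechanics are internal to tabsdata, so this is only an illustration of the splitting behavior.

```python
# Sketch of chunk_size batching: split a row stream into batches of
# at most chunk_size rows. Not tabsdata code, just the general idea.
from itertools import islice
from typing import Iterable, Iterator, List

def batched(rows: Iterable[int], chunk_size: int) -> Iterator[List[int]]:
    """Yield successive batches of at most chunk_size rows."""
    it = iter(rows)
    while batch := list(islice(it, chunk_size)):
        yield batch

# 10 rows with chunk_size=4 -> batches of 4, 4, and 2 rows.
sizes = [len(b) for b in batched(range(10), 4)]
print(sizes)  # [4, 4, 2]
```

The default chunk_size of 100000 means a 250,000-row query would be read in three batches under this scheme.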
- class NDJSONFormat
Bases:
FileFormat- Categories:
file-format
NDJSON (newline-delimited JSON) file format.
- class OracleConn(
- uri: str,
- credentials: UserPasswordCredentials | None = None,
- enforce_connection_params: bool = True,
- cx_src_configs_oracle: dict | None = None,
- cx_dst_configs_oracle: dict | None = None,
Bases:
Conn
Represents a connection configuration to Oracle.
- property credentials: UserPasswordCredentials | None
The credentials required to access the Oracle database. If no credentials were provided, it will return None.
- property cx_dst_configs_oracle: dict
Destination-specific configuration parameters for the Oracle connection.
- property cx_src_configs_oracle: dict
Source-specific configuration parameters for the Oracle connection.
- property enforce_connection_params: bool
Whether to enforce connection parameters for the Oracle connection.
- property uri: str
The URI of the database where the data is located.
- class OracleDestination(
- uri: str,
- destination_table: List[str] | str,
- credentials: UserPasswordCredentials = None,
- if_table_exists: Literal['append', 'replace'] = 'append',
Bases:
DestinationPlugin- Categories:
destination
Oracle-based data outputs.
- property credentials: UserPasswordCredentials
The credentials required to access the Oracle database.
- Type:
- property destination_table: str | List[str]
The table(s) to create. If multiple tables are provided, they must be provided as a list.
- property if_table_exists: Literal['append', 'replace']
The strategy to follow when the table already exists.
- Type:
- OracleDestination.write( )
Given a file or a list of files, write to the desired destination. Note: this method might materialize the data in the files it receives, so chunks should be of an appropriate size.
- Parameters:
files (str) – The file or files to be stored in the final destination.
- class OracleSource(
- **kwargs,
Bases:
SourcePlugin- Categories:
source
Oracle-based data inputs.
- chunk(
- working_dir: str,
Trigger the import of the data. This must be implemented in any class that inherits from this class unless streaming is implemented directly. The method receives a folder where it must store the data as parquet files, and returns a list of the paths of the files created. These files are then loaded and mapped to the dataset function in positional order, so if you want file.parquet to be the first argument of the dataset function, you must return it first. If you want a parameter to receive multiple files, return a list of the paths. For example, to provide a first argument with a single file and a second argument with two files: return ["file1.parquet", ["file2.parquet", "file3.parquet"]]
- property credentials: UserPasswordCredentials | None
The credentials required to access Oracle. If no credentials were provided, it will return None.
- Type:
UserPasswordCredentials | None
- OracleSource.chunk(
- working_dir: str,
Trigger the import of the data. This must be implemented in any class that inherits from this class unless streaming is implemented directly. The method receives a folder where it must store the data as parquet files, and returns a list of the paths of the files created. These files are then loaded and mapped to the dataset function in positional order, so if you want file.parquet to be the first argument of the dataset function, you must return it first. If you want a parameter to receive multiple files, return a list of the paths. For example, to provide a first argument with a single file and a second argument with two files: return ["file1.parquet", ["file2.parquet", "file3.parquet"]]
- class OracleSrc(
- conn: OracleConn,
- queries: str | list[str],
- initial_values: dict[str, Any] | None = None,
- transactional: bool = True,
- chunk_size: Annotated[int, Gt(gt=0)] = 100000,
- loader: Literal['polars_sqlalchemy', 'pandas'] = 'polars_sqlalchemy',
- schema_overrides: dict[str, Boolean | Categorical | Date | Datetime | Decimal | Duration | Enum | Float16 | Float32 | Float64 | Int8 | Int16 | Int64 | Int32 | Int128 | Null | String | Time | UInt8 | UInt16 | UInt32 | UInt64] | list[dict[str, Boolean | Categorical | Date | Datetime | Decimal | Duration | Enum | Float16 | Float32 | Float64 | Int8 | Int16 | Int64 | Int32 | Int128 | Null | String | Time | UInt8 | UInt16 | UInt32 | UInt64]] | None = None,
Bases:
SourcePlugin- Categories:
source
Source plugin for reading data from Oracle.
- property chunk_size: int
The chunk size in records for reading large queries in batches.
- property conn: OracleConn
The Oracle connection configuration.
- property initial_values: dict
The initial values for the parameters in the SQL queries.
- property loader: Literal['polars_sqlalchemy', 'pandas']
The data processing loader to use for executing SQL imports.
- property schema_overrides: list[dict[str, Boolean | Categorical | Date | Datetime | Decimal | Duration | Enum | Float16 | Float32 | Float64 | Int8 | Int16 | Int64 | Int32 | Int128 | Null | String | Time | UInt8 | UInt16 | UInt32 | UInt64]] | None
The schema overrides for the tables being read.
- stream(
- working_dir: str,
Execute the query and load the results as a list of TableFrames.
- Parameters:
working_dir – The directory where the Parquet files will be saved.
- Returns:
A list of TableFrames containing the query results.
- property transactional: bool
Whether to use transactions for reading to ensure consistency.
- OracleSrc.stream(
- working_dir: str,
Execute the query and load the results as a list of TableFrames.
- Parameters:
working_dir – The directory where the Parquet files will be saved.
- Returns:
A list of TableFrames containing the query results.
- class ParquetFormat
Bases:
FileFormat- Categories:
file-format
Parquet file format.
- class PostgresCdcConn(
- **kwargs,
Bases:
CdcConn, PostgresConn
PostgreSQL CDC connection configuration.
- class PostgresCdcTrigger(
- **kwargs,
Bases:
CdcTrigger- Categories:
trigger
A CdcTrigger that captures changes from a PostgreSQL database using WAL logical replication.
This trigger connects to a PostgreSQL server, reads the write-ahead log via the wal2json output plugin for change data capture events (inserts, updates, deletes) on specified tables, and stages them.
- property blocking_timeout_seconds: int
Timeout in seconds for blocking reads from the replication stream.
- collect_tables(
- session: Any,
- cdc_format: CdcFormat = CdcFormat(values_format='columns', flatten_values=True),
- harvest_tables(
- session: Any,
- cdc_format: CdcFormat = CdcFormat(values_format='columns', flatten_values=True),
- property slot_behavior: Literal['create', 'reuse']
Whether to create a new replication slot or reuse an existing one.
- PostgresCdcTrigger.collect_tables(
- session: Any,
- cdc_format: CdcFormat = CdcFormat(values_format='columns', flatten_values=True),
- PostgresCdcTrigger.harvest_tables(
- session: Any,
- cdc_format: CdcFormat = CdcFormat(values_format='columns', flatten_values=True),
- class PostgresConn(
- uri: str,
- credentials: UserPasswordCredentials | None = None,
- enforce_connection_params: bool = True,
- cx_src_configs_postgres: dict | None = None,
- cx_dst_configs_postgres: dict | None = None,
Bases:
Conn
Represents a connection configuration to Postgres.
- property credentials: UserPasswordCredentials | None
The credentials required to access the Postgres database. If no credentials were provided, it will return None.
- property cx_dst_configs_postgres: dict
Destination-specific configuration parameters for the Postgres connection.
- property cx_src_configs_postgres: dict
Source-specific configuration parameters for the Postgres connection.
- property enforce_connection_params: bool
Whether to enforce connection parameters for the Postgres connection.
- property uri: str
The URI of the database where the data is located.
- class PostgresDest(
- conn: PostgresConn,
- destination_tables: str | list[str],
- if_table_exists: Literal['append', 'replace'] = 'append',
- transactional: bool = True,
- chunk_size: Annotated[int, Gt(gt=0)] = 100000,
- loader: Literal['polars_sqlalchemy'] = 'polars_sqlalchemy',
Bases:
DestinationPlugin- Categories:
destination
Destination plugin for writing data to PostgreSQL.
- property chunk_size: int
The chunk size for writing large results in batches.
- property conn: PostgresConn
The Postgres connection configuration.
- property destination_tables: list[str]
The table(s) to create. If multiple tables are provided, they must be provided as a list.
- property if_table_exists: Literal['append', 'replace']
The strategy to follow when the table already exists.
- property loader: Literal['polars_sqlalchemy']
The data processing loader to use for executing DB write operations.
- stream( )
Store the results into the PostgreSQL database.
- Parameters:
working_dir – The working directory where the results are stored.
results – The results to store in the SQL destination.
- property transactional: bool
Whether to use transactions for writing data to ensure consistency.
- PostgresDest.stream( )
Store the results into the PostgreSQL database.
- Parameters:
working_dir – The working directory where the results are stored.
results – The results to store in the SQL destination.
- class PostgresDestination(
- **kwargs,
Bases:
DestinationPlugin- Categories:
destination
Postgres-based data outputs.
- property credentials: UserPasswordCredentials
The credentials required to access the Postgres database.
- Type:
- property destination_table: str | List[str]
The table(s) to create. If multiple tables are provided, they must be provided as a list.
- property if_table_exists: Literal['append', 'replace']
The strategy to follow when the table already exists.
- Type:
- PostgresDestination.write( )
Given a file or a list of files, write to the desired destination. Note: this method might materialize the data in the files it receives, so chunks should be of an appropriate size.
- Parameters:
files (str) – The file or files to be stored in the final destination.
- class PostgresSource(
- **kwargs,
Bases:
SourcePlugin- Categories:
source
Postgres-based data inputs.
- chunk(
- working_dir: str,
Trigger the import of the data. This must be implemented in any class that inherits from this class unless streaming is implemented directly. The method receives a folder where it must store the data as parquet files, and returns a list of the paths of the files created. These files are then loaded and mapped to the dataset function in positional order, so if you want file.parquet to be the first argument of the dataset function, you must return it first. If you want a parameter to receive multiple files, return a list of the paths. For example, to provide a first argument with a single file and a second argument with two files: return ["file1.parquet", ["file2.parquet", "file3.parquet"]]
- property credentials: UserPasswordCredentials | None
The credentials required to access the Postgres database. If no credentials were provided, it will return None.
- Type:
UserPasswordCredentials | None
- PostgresSource.chunk(
- working_dir: str,
Trigger the import of the data. This must be implemented in any class that inherits from this class unless streaming is implemented directly. The method receives a folder where it must store the data as parquet files, and returns a list of the paths of the files created. These files are then loaded and mapped to the dataset function in positional order, so if you want file.parquet to be the first argument of the dataset function, you must return it first. If you want a parameter to receive multiple files, return a list of the paths. For example, to provide a first argument with a single file and a second argument with two files: return ["file1.parquet", ["file2.parquet", "file3.parquet"]]
- class PostgresSrc(
- conn: PostgresConn,
- queries: str | list[str],
- initial_values: dict[str, Any] | None = None,
- transactional: bool = True,
- chunk_size: Annotated[int, Gt(gt=0)] = 100000,
- loader: Literal['polars_sqlalchemy', 'pandas'] = 'polars_sqlalchemy',
- schema_overrides: dict[str, Boolean | Categorical | Date | Datetime | Decimal | Duration | Enum | Float16 | Float32 | Float64 | Int8 | Int16 | Int64 | Int32 | Int128 | Null | String | Time | UInt8 | UInt16 | UInt32 | UInt64] | list[dict[str, Boolean | Categorical | Date | Datetime | Decimal | Duration | Enum | Float16 | Float32 | Float64 | Int8 | Int16 | Int64 | Int32 | Int128 | Null | String | Time | UInt8 | UInt16 | UInt32 | UInt64]] | None = None,
Bases:
SourcePlugin- Categories:
source
Source plugin for reading data from Postgres.
- property chunk_size: int
The chunk size in records for reading large queries in batches.
- property conn: PostgresConn
The Postgres connection configuration.
- property initial_values: dict
The initial values for the parameters in the SQL queries.
- property loader: Literal['polars_sqlalchemy', 'pandas']
The data processing loader to use for executing SQL imports.
- property schema_overrides: list[dict[str, Boolean | Categorical | Date | Datetime | Decimal | Duration | Enum | Float16 | Float32 | Float64 | Int8 | Int16 | Int64 | Int32 | Int128 | Null | String | Time | UInt8 | UInt16 | UInt32 | UInt64]] | None
The schema overrides for the tables being read.
- stream(
- working_dir: str,
Execute the query and load the results as a list of TableFrames.
- Parameters:
working_dir – The directory where the Parquet files will be saved.
- Returns:
A list of TableFrames containing the query results.
- property transactional: bool
Whether to use transactions for reading to ensure consistency.
- PostgresSrc.stream(
- working_dir: str,
Execute the query and load the results as a list of TableFrames.
- Parameters:
working_dir – The directory where the Parquet files will be saved.
- Returns:
A list of TableFrames containing the query results.
- class S3AccessKeyCredentials( )
Bases:
S3Credentials- Categories:
credentials
Credentials class to access an S3 bucket using access key credentials (access key id and secret access key).
- Variables:
aws_access_key_id (Secret) – The AWS access key id.
aws_secret_access_key (Secret) – The AWS secret access key.
dict (to_dict() ->) – Convert the S3AccessKeyCredentials object to a dictionary
- property aws_access_key_id: Secret
The AWS access key id.
- Type:
Secret
- property aws_secret_access_key: Secret
The AWS secret access key.
- Type:
Secret
- class S3Config(
- bucket: str,
- region: str,
- base_path: str,
- credentials: S3AccessKeyCredentials,
Bases:
object
Configuration for staging files in Amazon S3.
- Parameters:
bucket – S3 bucket name.
region – AWS region (e.g. "us-east-1").
base_path – Key prefix under which staged files are stored (e.g. "staging/clickhouse").
credentials – AWS access-key credentials used for uploading staged files (via boto3).
- base_path: str
- bucket: str
- credentials: S3AccessKeyCredentials
- region: str
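Wiring this configuration together might look like the following sketch. All values are placeholders, and the positional form of the S3AccessKeyCredentials constructor is an assumption; in practice, secrets should come from an environment variable or secret store rather than literals.

```python
# Hypothetical staging configuration; every value here is a placeholder.
staging = S3Config(
    bucket="my-staging-bucket",
    region="us-east-1",
    base_path="staging/clickhouse",
    credentials=S3AccessKeyCredentials(
        "<aws_access_key_id>",
        "<aws_secret_access_key>",
    ),
)
```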
- class S3Destination(
- uri: str | list[str],
- credentials: S3Credentials,
- format: str | FileFormat = None,
- region: str = None,
- catalog: AWSGlue = None,
Bases:
DestinationPlugin- Categories:
destination
S3-file-based data outputs.
- class SupportedFormats(
- *values,
Bases:
Enum
Enum for the supported formats for the S3Destination.
- avro = <class 'tabsdata._format.AvroFormat'>
- csv = <class 'tabsdata._format.CSVFormat'>
- ndjson = <class 'tabsdata._format.NDJSONFormat'>
- parquet = <class 'tabsdata._format.ParquetFormat'>
- property catalog: AWSGlue
The catalog to store the data in.
- Type:
Catalog
- chunk(
- working_dir: str,
- *results,
Trigger the exporting of the data to local parquet chunks. This method will receive the resulting data from the user function and must store it in the local system as parquet files, using the working_dir. Note: this method should not materialize the data; it should only store it in the local system.
- Parameters:
working_dir (str) – The folder where any files generated must be stored (this refers to temporary files that will be deleted after the execution of the plugin, not the final destination of the data)
results – The data to be exported. It is a list of polars LazyFrames or None.
- Returns:
A list of the intermediate files created
- property credentials: S3Credentials | S3AccessKeyCredentials
The credentials required to access the S3 bucket.
- Type:
S3Credentials
- property format: FileFormat
The format of the file. If not provided, it will be inferred from the file.
- Type:
FileFormat
- stream(
- working_dir: str,
- *results,
Trigger the exporting of the data. This method will receive the resulting data from the user function and must store it in the desired location. Note: this method might materialize the data provided in a single chunk generated by the chunk function if invoked, so chunks should be of an appropriate size.
- Parameters:
working_dir (str) – The folder where any intermediate files generated must be stored (this refers to temporary files that will be deleted after the execution of the plugin, not the final destination of the data)
results – The data to be exported. It is a list of polars LazyFrames or None.
- Returns:
None
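Constructing the destination might look like the following sketch; the URI, region, and credential values are placeholders following the constructor signature shown above.

```python
# Hypothetical wiring; all values are placeholders.
destination = S3Destination(
    uri="s3://my-bucket/exports/data.parquet",
    credentials=S3AccessKeyCredentials(
        "<aws_access_key_id>",
        "<aws_secret_access_key>",
    ),
    format="parquet",
    region="us-east-1",
)
```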
- S3Destination.chunk(
- working_dir: str,
- *results,
Trigger the exporting of the data to local parquet chunks. This method will receive the resulting data from the user function and must store it in the local system as parquet files, using the working_dir. Note: this method should not materialize the data; it should only store it in the local system.
- Parameters:
working_dir (str) – The folder where any files generated must be stored (this refers to temporary files that will be deleted after the execution of the plugin, not the final destination of the data)
results – The data to be exported. It is a list of polars LazyFrames or None.
- Returns:
A list of the intermediate files created
- S3Destination.stream(
- working_dir: str,
- *results,
Trigger the exporting of the data. This method will receive the resulting data from the user function and must store it in the desired location. Note: this method might materialize the data provided in a single chunk generated by the chunk function if invoked, so chunks should be of an appropriate size.
- Parameters:
working_dir (str) – The folder where any intermediate files generated must be stored (this refers to temporary files that will be deleted after the execution of the plugin, not the final destination of the data)
results – The data to be exported. It is a list of polars LazyFrames or None.
- Returns:
None
- class S3Source(
- uri: str | list[str],
- credentials: S3Credentials,
- format: str | FileFormat = None,
- initial_last_modified: str | datetime = None,
- region: str = None,
- file_info: str | FileInfo | None = None,
Bases:
SourcePlugin- Categories:
source
S3-file-based data inputs.
- class SupportedFormats(
- *values,
Bases:
Enum
Enum for the supported formats for the S3Source.
- avro = <class 'tabsdata._format.AvroFormat'>
- csv = <class 'tabsdata._format.CSVFormat'>
- log = <class 'tabsdata._format.LogFormat'>
- ndjson = <class 'tabsdata._format.NDJSONFormat'>
- parquet = <class 'tabsdata._format.ParquetFormat'>
- chunk(
- working_dir: str,
Trigger the import of the data. This must be implemented in any class that inherits from this class unless directly implementing streaming. The method will receive a folder where it must store the data as parquet files, and return a list of the paths of the files created. These files will then be loaded and mapped to the dataset function in positional order, so if you want file.parquet to be the first argument of the dataset function, you must return it first. If you want a parameter to receive multiple files, return a list of the paths. For example, to provide a first argument with a single file and a second argument with two files, return: ["file1.parquet", ["file2.parquet", "file3.parquet"]]
- property credentials: S3Credentials
The credentials required to access the S3 bucket.
- Type:
S3Credentials
- property file_info: FileInfo | None
The file information to add as system columns.
- Type:
FileInfo | None
- property format: FileFormat
The format of the file. If not provided, it will be inferred from the file.
- Type:
FileFormat
- property initial_last_modified: str
The date and time after which the files were modified.
- Type:
- property initial_values: dict
Return a dictionary with the initial values to be stored after execution of the plugin. They will be accessible in the next execution of the plugin. The dictionary must have the parameter names as keys and the initial values as values, all of type string.
- Returns:
A dictionary with the initial values of the parameters of the plugin.
- Return type:
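A configured source might look like the following sketch; the URI pattern, timestamp, and credential values are placeholders following the constructor signature shown above.

```python
# Hypothetical wiring; all values are placeholders. initial_last_modified
# restricts the first run to files modified after the given timestamp.
source = S3Source(
    uri="s3://my-bucket/landing/data.csv",
    credentials=S3AccessKeyCredentials(
        "<aws_access_key_id>",
        "<aws_secret_access_key>",
    ),
    format="csv",
    initial_last_modified="2024-01-01T00:00:00",
    region="us-east-1",
)
```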
- S3Source.chunk(
- working_dir: str,
Trigger the import of the data. This must be implemented in any class that inherits from this class unless directly implementing streaming. The method will receive a folder where it must store the data as parquet files, and return a list of the paths of the files created. These files will then be loaded and mapped to the dataset function in positional order, so if you want file.parquet to be the first argument of the dataset function, you must return it first. If you want a parameter to receive multiple files, return a list of the paths. For example, to provide a first argument with a single file and a second argument with two files, return: ["file1.parquet", ["file2.parquet", "file3.parquet"]]
- class SalesforceReportSource(
- credentials: SalesforceCredentials,
- report: str | list[str],
- column_name_strategy: Literal['columnName', 'label'],
- find_report_by: Literal['id', 'name'] = None,
- filter: tuple[str, str, str] | list[tuple[str, str, str]] = None,
- filter_logic: str = None,
- instance_url: str = None,
- last_modified_column: str = None,
- initial_last_modified: str = None,
- **kwargs,
Bases:
SourcePlugin- Categories:
source
Salesforce Reports based data inputs.
- chunk(
- working_dir: str,
Trigger the import of the data. This must be implemented in any class that inherits from this class unless directly implementing streaming. The method will receive a folder where it must store the data as parquet files, and return a list of the paths of the files created. These files will then be loaded and mapped to the dataset function in positional order, so if you want file.parquet to be the first argument of the dataset function, you must return it first. If you want a parameter to receive multiple files, return a list of the paths. For example, to provide a first argument with a single file and a second argument with two files, return: ["file1.parquet", ["file2.parquet", "file3.parquet"]]
- property column_name_strategy: Literal['columnName', 'label']
- property credentials: SalesforceCredentials | SalesforceTokenCredentials
- property find_report_by: Literal['id', 'name']
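A configured report source might look like the following sketch; the report name and credential values are placeholders following the constructor signature shown above.

```python
# Hypothetical wiring; all values are placeholders.
source = SalesforceReportSource(
    credentials=SalesforceTokenCredentials(
        "<username>", "<password>", "<security_token>"
    ),
    report="Quarterly Pipeline",
    column_name_strategy="label",
    find_report_by="name",
)
```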
- SalesforceReportSource.chunk(
- working_dir: str,
Trigger the import of the data. This must be implemented in any class that inherits from this class unless directly implementing streaming. The method will receive a folder where it must store the data as parquet files, and return a list of the paths of the files created. These files will then be loaded and mapped to the dataset function in positional order, so if you want file.parquet to be the first argument of the dataset function, you must return it first. If you want a parameter to receive multiple files, return a list of the paths. For example, to provide a first argument with a single file and a second argument with two files, return: ["file1.parquet", ["file2.parquet", "file3.parquet"]]
- class SalesforceSource(
- credentials: SalesforceCredentials,
- query: str | list[str],
- instance_url: str = None,
- include_deleted: bool = False,
- initial_last_modified: str = None,
- **kwargs,
Bases:
SourcePlugin- Categories:
source
Salesforce (SOQL query) based data inputs (not Salesforce Reports).
- chunk(
- working_dir: str,
Trigger the import of the data. This must be implemented in any class that inherits from this class unless directly implementing streaming. The method will receive a folder where it must store the data as parquet files, and return a list of the paths of the files created. These files will then be loaded and mapped to the dataset function in positional order, so if you want file.parquet to be the first argument of the dataset function, you must return it first. If you want a parameter to receive multiple files, return a list of the paths. For example, to provide a first argument with a single file and a second argument with two files, return: ["file1.parquet", ["file2.parquet", "file3.parquet"]]
- property credentials: SalesforceCredentials | SalesforceTokenCredentials
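A configured SOQL source might look like the following sketch; the query and credential values are placeholders following the constructor signature shown above.

```python
# Hypothetical wiring; all values are placeholders.
source = SalesforceSource(
    credentials=SalesforceTokenCredentials(
        "<username>", "<password>", "<security_token>"
    ),
    query="SELECT Id, Name FROM Account",
)
```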
- SalesforceSource.chunk(
- working_dir: str,
Trigger the import of the data. This must be implemented in any class that inherits from this class unless directly implementing streaming. The method will receive a folder where it must store the data as parquet files, and return a list of the paths of the files created. These files will then be loaded and mapped to the dataset function in positional order, so if you want file.parquet to be the first argument of the dataset function, you must return it first. If you want a parameter to receive multiple files, return a list of the paths. For example, to provide a first argument with a single file and a second argument with two files, return: ["file1.parquet", ["file2.parquet", "file3.parquet"]]
- class SalesforceTokenCredentials( )
Bases:
SalesforceCredentials- Categories:
credentials
Credentials class to store the credentials needed to access Salesforce using a username, a password and a security token.
- Variables:
username (Secret) – The username to access Salesforce.
password (Secret) – The password to access Salesforce.
security_token (Secret) – The security token to access Salesforce.
dict (to_dict() ->) – Convert the SalesforceTokenCredentials object to a dictionary
- property password: Secret
The password to access Salesforce.
- Type:
Secret
- property security_token: Secret
The security token to access Salesforce.
- Type:
Secret
- property username: Secret
The username to access Salesforce.
- Type:
Secret
- class SnowflakeDestination(
- connection_parameters: dict,
- destination_table: List[str] | str,
- if_table_exists: Literal['append', 'replace'] = 'append',
- stage: str | None = None,
- **kwargs,
Bases:
DestinationPlugin- Categories:
destination
Snowflake based data outputs.
- chunk( ) List[None | str]
Trigger the exporting of the data to local parquet chunks. This method will receive the resulting data from the user function and must store it in the local system as parquet files, using the working_dir. Note: this method should not materialize the data; it should only store it in the local system.
- Parameters:
working_dir (str) – The folder where any files generated must be stored (this refers to temporary files that will be deleted after the execution of the plugin, not the final destination of the data)
results – The data to be exported. It is a list of polars LazyFrames or None.
- Returns:
A list of the intermediate files created
- property if_table_exists: Literal['append', 'replace']
Returns the value of the if_table_exists property. This property determines what to do if the table already exists.
- stream( )
Trigger the exporting of the data. This method will receive the resulting data from the user function and must store it in the desired location. Note: this method might materialize the data provided in a single chunk generated by the chunk function if invoked, so chunks should be of an appropriate size.
- Parameters:
working_dir (str) – The folder where any intermediate files generated must be stored (this refers to temporary files that will be deleted after the execution of the plugin, not the final destination of the data)
results – The data to be exported. It is a list of polars LazyFrames or None.
- Returns:
None
- write(
- files,
This method is used to write the files to the database. It is called from the stream method, and it is not intended to be called directly.
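A configured destination might look like the following sketch. The connection_parameters keys follow common Snowflake Python connector conventions and, like every value here, are assumptions rather than a confirmed schema.

```python
# Hypothetical wiring; all values are placeholders.
destination = SnowflakeDestination(
    connection_parameters={
        "account": "<account_identifier>",
        "user": "<user>",
        "password": "<password>",
        "database": "<database>",
        "schema": "<schema>",
        "warehouse": "<warehouse>",
    },
    destination_table="PUBLIC.ORDERS",
    if_table_exists="append",
)
```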
- SnowflakeDestination.chunk( ) List[None | str]
Trigger the exporting of the data to local parquet chunks. This method will receive the resulting data from the user function and must store it in the local system as parquet files, using the working_dir. Note: this method should not materialize the data; it should only store it in the local system.
- Parameters:
working_dir (str) – The folder where any files generated must be stored (this refers to temporary files that will be deleted after the execution of the plugin, not the final destination of the data)
results – The data to be exported. It is a list of polars LazyFrames or None.
- Returns:
A list of the intermediate files created
- SnowflakeDestination.stream( )
Trigger the exporting of the data. This method will receive the resulting data from the user function and must store it in the desired location. Note: this method might materialize the data provided in a single chunk generated by the chunk function if invoked, so chunks should be of an appropriate size.
- Parameters:
working_dir (str) – The folder where any intermediate files generated must be stored (this refers to temporary files that will be deleted after the execution of the plugin, not the final destination of the data)
results – The data to be exported. It is a list of polars LazyFrames or None.
- Returns:
None
- SnowflakeDestination.write(
- files,
This method is used to write the files to the database. It is called from the stream method, and it is not intended to be called directly.
- class SourcePlugin
Bases:
object- Categories:
plugin
Parent class for publisher connectors.
- def chunk(working_dir) -> Union[str, Tuple[str, …], List[str]]
Trigger the import of the data. The method will receive a folder where it must store the data as parquet files, and return a list of the paths of the files created. These files will then be loaded and mapped to the dataset function in positional order, so if you want file.parquet to be the first argument of the dataset function, you must return it first. If you want a parameter to receive multiple files, return a list of the paths. For example, to provide a first argument with a single file and a second argument with two files, return: ["file1.parquet", ["file2.parquet", "file3.parquet"]]
- chunk(
- working_dir: str,
Trigger the import of the data. This must be implemented in any class that inherits from this class unless directly implementing streaming. The method will receive a folder where it must store the data as parquet files, and return a list of the paths of the files created. These files will then be loaded and mapped to the dataset function in positional order, so if you want file.parquet to be the first argument of the dataset function, you must return it first. If you want a parameter to receive multiple files, return a list of the paths. For example, to provide a first argument with a single file and a second argument with two files, return: ["file1.parquet", ["file2.parquet", "file3.parquet"]]
- property initial_values: dict
Return a dictionary with the initial values to be stored after execution of the plugin. They will be accessible in the next execution of the plugin. The dictionary must have the parameter names as keys and the initial values as values, all of type string.
- Returns:
A dictionary with the initial values of the parameters of the plugin.
- Return type:
- stream(
- working_dir: str,
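The chunk contract above can be sketched with a stand-in class. This is an illustration of the return shape only, not the real SourcePlugin base class; a real implementation would write genuine parquet data into working_dir.

```python
import os

class TwoArgumentSource:
    """Stand-in illustrating the chunk contract: the returned list maps
    files to the dataset function's positional parameters."""

    def chunk(self, working_dir: str):
        names = ["first.parquet", "second.parquet", "third.parquet"]
        for name in names:
            # A real plugin would write actual parquet content here.
            open(os.path.join(working_dir, name), "wb").close()
        # The first parameter receives one file, the second receives two.
        return [names[0], [names[1], names[2]]]
```

The nested list in the return value is what makes the second dataset-function argument receive two files.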
- SourcePlugin.chunk(
- working_dir: str,
Trigger the import of the data. This must be implemented in any class that inherits from this class unless directly implementing streaming. The method will receive a folder where it must store the data as parquet files, and return a list of the paths of the files created. These files will then be loaded and mapped to the dataset function in positional order, so if you want file.parquet to be the first argument of the dataset function, you must return it first. If you want a parameter to receive multiple files, return a list of the paths. For example, to provide a first argument with a single file and a second argument with two files, return: ["file1.parquet", ["file2.parquet", "file3.parquet"]]
- SourcePlugin.stream(
- working_dir: str,
- class Stage(
- use_existing_data: bool = True,
Bases:
SourcePlugin- Categories:
source
Inputs that have been placed in a stage, usually from a StageTrigger.
- stream(
- working_dir: str,
- property use_existing_data: bool
- Stage.stream(
- working_dir: str,
- class StageTrigger(
- **kwargs,
Bases:
ABC
- property out: dict[int, str] | None
Returns a mapping of positional index to table name for the outputs produced by this trigger, or None when the StageTrigger implementation only supports generating a single table.
- abstractmethod run(
- ctx: StageTriggerContext,
Continuously runs the stage trigger logic, using the received context to trigger and monitor executions. Exit statuses: SUCCESS (exit code 0) when the StageTrigger.end datetime is reached; ERROR (exit code 1) if any issue stops it from working or recovering; STOPPED (exit code 0) upon a signal.
- abstractmethod StageTrigger.run(
- ctx: StageTriggerContext,
Continuously runs the stage trigger logic, using the received context to trigger and monitor executions. Exit statuses: SUCCESS (exit code 0) when the StageTrigger.end datetime is reached; ERROR (exit code 1) if any issue stops it from working or recovering; STOPPED (exit code 0) upon a signal.
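The exit-status contract can be sketched like this. It is a stand-in, not the real StageTrigger base class, and the ctx.trigger()/ctx.monitor() methods are assumptions standing in for whatever the StageTriggerContext actually exposes.

```python
import datetime as dt

class SketchTrigger:
    """Stand-in illustrating the run contract and its exit statuses."""

    def __init__(self, end: dt.datetime):
        self.end = end

    def run(self, ctx) -> int:
        try:
            # Continuously trigger and monitor executions until `end`.
            while dt.datetime.now() < self.end:
                ctx.trigger()   # assumed method: start an execution
                ctx.monitor()   # assumed method: watch it to completion
            return 0            # SUCCESS: end datetime reached
        except Exception:
            return 1            # ERROR: stopped working and could not recover
```

Signal handling (the STOPPED status) is omitted here; a real implementation would also exit 0 on receiving a stop signal.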
- class StarRocksConn(
- **kwargs,
Bases:
Conn
Represents a connection configuration for StarRocks with cloud-staged bulk loading.
StarRocks ingests data via cloud storage (S3, Azure Blob, or GCS): files are first uploaded to the configured staging area and then loaded into StarRocks using the FILES() table function in an INSERT INTO … SELECT * FROM FILES(...) statement.
- property credentials: UserPasswordCredentials | None
The credentials used to authenticate with StarRocks.
None if no credentials were provided (StarRocks root user, no password).
- property cx_dst_configs_starrocks: dict
Destination-specific configuration parameters for the StarRocks connection.
- property cx_src_configs_starrocks: dict
Source-specific configuration parameters for the StarRocks connection.
- property database: str
The target StarRocks database name.
- property enforce_connection_params: bool
Whether to enforce connection parameters for the StarRocks connection.
- property host: str
The StarRocks FE hostname or IP address.
- property port: int
The MySQL protocol port of the StarRocks FE server.
- property staging: S3Config | AzureConfig | GCSConfig
The cloud storage staging configuration (S3Config, AzureConfig, or GCSConfig).
- class StarRocksDest(
- **kwargs,
Bases:
DestinationPlugin- Categories:
destination
Destination plugin for writing data to StarRocks using cloud-staged bulk loading.
Data is first written as Parquet files to the configured cloud storage staging area (S3, Azure Blob, or GCS) and then ingested into StarRocks via its FILES() table function in an INSERT INTO … SELECT * FROM FILES(...) statement.
- property conn: StarRocksConn
The StarRocks connection configuration.
- property if_table_exists: Literal['append', 'replace']
The strategy to follow when the table already exists.
- property schema_evolution: Literal['none', 'iceberg']
The schema evolution strategy.
- stream( )
Write each result as a Parquet file and bulk load it into StarRocks via the cloud staging area.
- Parameters:
working_dir – Temporary directory for intermediate Parquet files.
results – One polars LazyFrame (or None) per destination table, in the same order as destination_tables.
- StarRocksDest.stream( )
Write each result as a Parquet file and bulk load it into StarRocks via the cloud staging area.
- Parameters:
working_dir – Temporary directory for intermediate Parquet files.
results – One polars LazyFrame (or None) per destination table, in the same order as destination_tables.
- class TableFrame(
- df: Mapping[str, Sequence[object] | Mapping[str, Sequence[object]]] | TableFrame | None = None,
- *,
- origin: TableFrameOrigin | None = TableFrameOrigin.INIT,
- properties: TableFrameProperties | None = None,
Bases:
object
> Private Functions
- assert_has_cols( ) None
- Categories:
tableframe
Ensures that the (non-system) columns in the TableFrame match the expected columns.
Raises an exception if the expectation is not met.
If exact is True, the check verifies that the TableFrame contains exactly the expected columns, with no extra or missing ones.
- Parameters:
cols – The expected column name(s). Can be a string or a list of strings.
exact – If True, checks that the TableFrame contains exactly the specified columns.
- Raises:
ValueError – If expected columns are missing or unexpected columns are present in the TableFrame.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> tf.assert_has_cols("a") >>> tf.assert_has_cols(["a", "b"], exact=True)
- cast(
- dtypes: Mapping[td_typing.ColumnNameOrSelector | td_typing.DataType, td_typing.DataType] | td_typing.DataType,
- *,
- strict: bool = True,
- Categories:
manipulation
Cast columns to a new data type.
- Parameters:
dtypes – Mapping of the column name(s) to the new data type(s).
strict – If True, raises an error if the cast cannot be performed.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌──────┬──────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞══════╪══════╡ │ A ┆ 1 │ │ X ┆ 10 │ │ C ┆ 3 │ │ D ┆ 5 │ │ M ┆ 9 │ │ A ┆ 100 │ │ M ┆ 50 │ │ null ┆ 20 │ │ F ┆ null │ └──────┴──────┘ >>> >>> tf.cast({"b":td.Float32}).collect() >>> ┌──────┬───────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ f32 │ ╞══════╪═══════╡ │ A ┆ 1.0 │ │ X ┆ 10.0 │ │ C ┆ 3.0 │ │ D ┆ 5.0 │ │ M ┆ 9.0 │ │ A ┆ 100.0 │ │ M ┆ 50.0 │ │ null ┆ 20.0 │ │ F ┆ null │ └──────┴───────┘
- clear(
- n: int = 0,
- Categories:
tableframe
Clears all rows of the TableFrame preserving the schema.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌──────┬──────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞══════╪══════╡ │ A ┆ 1 │ │ X ┆ 10 │ │ C ┆ 3 │ │ D ┆ 5 │ │ M ┆ 9 │ │ A ┆ 100 │ │ M ┆ 50 │ │ null ┆ 20 │ │ F ┆ null │ └──────┴──────┘ >>> >>> tf.clear() >>> ┌──────┬──────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞══════╪══════╡ └──────┴──────┘
- drop(
- *columns: td_typing.ColumnNameOrSelector | Iterable[td_typing.ColumnNameOrSelector],
- strict: bool = True,
- Categories:
projection
Discard columns from the TableFrame.
- Parameters:
columns – Columns to drop.
strict – If True, raises an error if a column does not exist.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌──────┬──────┐ │ x ┆ y │ │ --- ┆ --- │ │ f64 ┆ f64 │ ╞══════╪══════╡ │ 1.0 ┆ 2.0 │ │ 2.0 ┆ 2.0 │ │ 10.0 ┆ 10.0 │ │ 4.0 ┆ 10.0 │ │ 5.0 ┆ null │ │ null ┆ null │ └──────┴──────┘ >>> >>> tf.drop("y") >>> ┌──────┐ │ x │ │ --- │ │ f64 │ ╞══════╡ │ 1.0 │ │ 2.0 │ │ 10.0 │ │ 4.0 │ │ 5.0 │ │ null │ └──────┘
- drop_nans(
- subset: td_typing.ColumnNameOrSelector | Collection[td_typing.ColumnNameOrSelector] | None = None,
- Categories:
manipulation
Drop rows with NaN values.
- Parameters:
subset – Columns to check for NaN values. If None, all columns are considered.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌──────┬──────┬──────┐ │ ss ┆ u ┆ ff │ │ --- ┆ --- ┆ --- │ │ str ┆ i64 ┆ f64 │ ╞══════╪══════╪══════╡ │ A ┆ 1 ┆ 1.1 │ │ B ┆ 0 ┆ 0.0 │ │ A ┆ 2 ┆ 2.2 │ │ B ┆ 3 ┆ 3.3 │ │ B ┆ 4 ┆ 4.4 │ │ C ┆ 5 ┆ -1.1 │ │ C ┆ 6 ┆ -2.2 │ │ C ┆ 7 ┆ -3.3 │ │ D ┆ 8 ┆ inf │ │ F ┆ 9 ┆ NaN │ │ null ┆ null ┆ null │ └──────┴──────┴──────┘ >>> >>> tf.drop_nans("ff") ┌─────┬─────┬──────┐ │ ss ┆ u ┆ ff │ │ --- ┆ --- ┆ --- │ │ str ┆ i64 ┆ f64 │ ╞═════╪═════╪══════╡ │ A ┆ 1 ┆ 1.1 │ │ B ┆ 0 ┆ 0.0 │ │ A ┆ 2 ┆ 2.2 │ │ B ┆ 3 ┆ 3.3 │ │ B ┆ 4 ┆ 4.4 │ │ C ┆ 5 ┆ -1.1 │ │ C ┆ 6 ┆ -2.2 │ │ C ┆ 7 ┆ -3.3 │ │ D ┆ 8 ┆ inf │ └─────┴─────┴──────┘
- drop_nulls(
- subset: td_typing.ColumnNameOrSelector | Collection[td_typing.ColumnNameOrSelector] | None = None,
- Categories:
manipulation
Drop rows with null values.
- Parameters:
subset – Columns to evaluate for null values. If None, all columns are considered.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌──────┬──────┬──────┐ │ ss ┆ u ┆ ff │ │ --- ┆ --- ┆ --- │ │ str ┆ i64 ┆ f64 │ ╞══════╪══════╪══════╡ │ A ┆ 1 ┆ 1.1 │ │ B ┆ 0 ┆ 0.0 │ │ A ┆ 2 ┆ 2.2 │ │ B ┆ 3 ┆ 3.3 │ │ B ┆ 4 ┆ 4.4 │ │ C ┆ 5 ┆ -1.1 │ │ C ┆ 6 ┆ -2.2 │ │ C ┆ 7 ┆ -3.3 │ │ D ┆ 8 ┆ inf │ │ F ┆ 9 ┆ NaN │ │ G ┆ null ┆ 2.3 │ └──────┴──────┴──────┘ >>> >>> tf.drop_nulls("u") >>> ┌─────┬─────┬──────┐ │ ss ┆ u ┆ ff │ │ --- ┆ --- ┆ --- │ │ str ┆ i64 ┆ f64 │ ╞═════╪═════╪══════╡ │ A ┆ 1 ┆ 1.1 │ │ B ┆ 0 ┆ 0.0 │ │ A ┆ 2 ┆ 2.2 │ │ B ┆ 3 ┆ 3.3 │ │ B ┆ 4 ┆ 4.4 │ │ C ┆ 5 ┆ -1.1 │ │ C ┆ 6 ┆ -2.2 │ │ C ┆ 7 ┆ -3.3 │ │ D ┆ 8 ┆ inf │ │ F ┆ 9 ┆ NaN │ └─────┴─────┴──────┘
- classmethod empty(
- schema: SimpleSchema = None,
- Categories:
tableframe
Creates an empty (no columns, no rows) TableFrame.
- explain(
- **kwargs,
Provide private accessibility level wrapper.
- extract_as_columns( ) dict[str, list[Any]]
- Categories:
filters
Extract a slice of rows from the table as a column-oriented dictionary.
The result is a mapping of column names to lists of values from the selected rows.
- Parameters:
offset – The row index at which the slice starts.
length – The number of rows in the slice.
- Returns:
A dictionary where each key is a column name, and its value is a list of values from the selected slice.
- Return type:
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ A ┆ 1 │ │ X ┆ 10 │ │ C ┆ 3 │ │ D ┆ 5 │ │ M ┆ 9 │ └─────┴─────┘ >>> >>> tf.extract_as_columns(offset=0, length=2) { "a": ["A", "X"], "b": [1, 10] }
- extract_as_rows( ) list[dict[str, Any]]
- Categories:
filters
Extract a slice of rows from the TableFrame as a list of dictionaries.
Each dictionary represents one row, where keys are column names and values are the corresponding cell values.
- Parameters:
offset – The row index at which the slice starts.
length – The number of rows in the slice.
- Returns:
A list of row dictionaries.
- Return type:
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ A ┆ 1 │ │ X ┆ 10 │ │ C ┆ 3 │ │ D ┆ 5 │ │ M ┆ 9 │ └─────┴─────┘ >>> >>> tf.extract_as_rows(offset=0, length=2) [ {"a": "A", "b": 1}, {"a": "X", "b": 10}, ]
- fill_nan( ) TableFrame
- Categories:
manipulation
Replace all NaN values in the TableFrame with the given value.
- Parameters:
value – The value to replace NaN with.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌──────┬──────┐ │ x ┆ y │ │ --- ┆ --- │ │ f64 ┆ f64 │ ╞══════╪══════╡ │ 1.0 ┆ 2.0 │ │ 2.0 ┆ 2.0 │ │ NaN ┆ NaN │ │ 4.0 ┆ NaN │ │ 5.0 ┆ null │ │ null ┆ null │ └──────┴──────┘ >>> >>> tf.fill_nan(10) >>> ┌──────┬──────┐ │ x ┆ y │ │ --- ┆ --- │ │ f64 ┆ f64 │ ╞══════╪══════╡ │ 1.0 ┆ 2.0 │ │ 2.0 ┆ 2.0 │ │ 10.0 ┆ 10.0 │ │ 4.0 ┆ 10.0 │ │ 5.0 ┆ null │ │ null ┆ null │ └──────┴──────┘
- fill_null( ) TableFrame
- Categories:
manipulation
Replace all null values in the TableFrame with the given value.
- Parameters:
value – The value to replace null with.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌──────┬──────┐ │ x ┆ y │ │ --- ┆ --- │ │ f64 ┆ f64 │ ╞══════╪══════╡ │ 1.0 ┆ 2.0 │ │ 2.0 ┆ 2.0 │ │ NaN ┆ NaN │ │ 4.0 ┆ NaN │ │ 5.0 ┆ null │ │ null ┆ null │ └──────┴──────┘ >>> >>> tf.fill_null(20) >>> ┌──────┬──────┐ │ x ┆ y │ │ --- ┆ --- │ │ f64 ┆ f64 │ ╞══════╪══════╡ │ 1.0 ┆ 2.0 │ │ 2.0 ┆ 2.0 │ │ NaN ┆ NaN │ │ 4.0 ┆ NaN │ │ 5.0 ┆ 20.0 │ │ 20.0 ┆ 20.0 │ └──────┴──────┘
- filter(
- *predicates: td_typing.IntoExprColumn | Iterable[td_typing.IntoExprColumn] | bool | list[bool] | np.ndarray[Any, Any],
- Categories:
filters
Filter the TableFrame based on the given predicates.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ A ┆ 1 │ │ X ┆ 10 │ │ C ┆ 3 │ │ D ┆ 5 │ │ M ┆ 9 │ │ A ┆ 100 │ │ M ┆ 50 │ └─────┴─────┘ >>> >>> tf.filter(td.col("a").is_in(["A", "C"]).or_(td.col("b").eq(10))) >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ A ┆ 1 │ │ X ┆ 10 │ │ C ┆ 3 │ │ A ┆ 100 │ └─────┴─────┘
- first() TableFrame
- Categories:
filters
Return a TableFrame with the first row.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ A ┆ 1 │ │ X ┆ 10 │ │ C ┆ 3 │ │ D ┆ 5 │ │ M ┆ 9 │ └─────┴─────┘ >>> >>> tf.first() >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ A ┆ 1 │ └─────┴─────┘
- first_row(
- named: bool = False,
- Categories:
filters
Return a tuple or dictionary with the first row, or None if no row.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌─────┬─────┐ │ A ┆ B │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ a ┆ 1 │ │ b ┆ 2 │ │ c ┆ 3 │ └─────┴─────┘ >>> >>> tf.first_row() >>> ('a', 1) >>> >>> tf.first_row(named=True) >>> {'A': 'a', 'B': 1}
- classmethod from_dict( ) TableFrame
- Categories:
tableframe
Creates a TableFrame from a dictionary, or None. None produces an empty (no columns, no rows) TableFrame.
- Parameters:
data – Input data.
- classmethod from_pandas(
- data: pd.DataFrame | None = None,
- Categories:
tableframe
Creates a TableFrame from a pandas DataFrame, or None. None produces an empty (no columns, no rows) TableFrame.
- Parameters:
data – Input data.
- classmethod from_polars(
- data: LazyFrame | DataFrame | None = None,
- Categories:
tableframe
Creates a TableFrame from a polars DataFrame or LazyFrame, or None. None produces an empty (no columns, no rows) TableFrame.
- Parameters:
data – Input data.
- grok( ) TableFrame
- Categories:
string
Parse log text into structured fields using a Grok pattern.
Applies a Grok pattern to the given column or expression and directly appends one new column per named capture in the pattern to the output TableFrame. Rows that do not match the pattern will contain null values for the extracted columns.
- Parameters:
expr (IntoExpr) – Column name or expression that resolves to a single string column containing log lines.
pattern (str) – Grok pattern with named captures (e.g., %{WORD:user}).
schema (dict[str, td_col.Column]) – A mapping where each capture name is associated with its corresponding column definition, specifying both the column name and its data type.
- Returns:
A new TableFrame with one column per Grok capture added.
- Return type:
Example
>>> import tabsdata as td >>> tf = td.TableFrame({"logs": [ ... "alice-login-2023", ... "bob-logout-2024", ... ]}) >>> >>> # Capture 3 fields: user, action, year >>> log_pattern = r"%{WORD:user}-%{WORD:action}-%{INT:year}" >>> log_schema = { ... "user": td_col.Column("user", td.String), ... "action": td_col.Column("action", td.String), ... "year": td_col.Column("year", td.Int64), ... } >>> out = tf.grok("logs", log_pattern, log_schema) >>> out.collect() ┌──────────────────┬───────┬────────┬──────┐ │ logs ┆ user ┆ action ┆ year │ │ --- ┆ --- ┆ --- ┆ --- │ │ str ┆ str ┆ str ┆ i64 │ ╞══════════════════╪═══════╪════════╪══════╡ │ alice-login-2023 ┆ alice ┆ login ┆ 2023 │ │ bob-logout-2024 ┆ bob ┆ logout ┆ 2024 │ └──────────────────┴───────┴────────┴──────┘
Notes
The function automatically expands the Grok captures into separate columns.
Non-matching rows will show null for the extracted columns.
If a pattern defines duplicate capture names, numeric suffixes like field, field[1] will be used to disambiguate them.
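For intuition, the capture-to-column expansion can be approximated in plain Python: Grok patterns such as %{WORD:user}-%{WORD:action}-%{INT:year} compile down to regexes with named captures. The helper below is a hypothetical sketch of that mechanism, not the library's implementation.

```python
import re

# Stand-in for the compiled form of %{WORD:user}-%{WORD:action}-%{INT:year}:
# each named group becomes one output column; non-matching rows yield None.
pattern = re.compile(r"(?P<user>\w+)-(?P<action>\w+)-(?P<year>\d+)")

def grok_like(lines):
    names = list(pattern.groupindex)            # capture names become column names
    columns = {name: [] for name in names}
    for line in lines:
        m = pattern.match(line)
        for name in names:
            columns[name].append(m.group(name) if m else None)
    return columns

out = grok_like(["alice-login-2023", "bob-logout-2024", "garbage"])
assert out["user"] == ["alice", "bob", None]
assert out["year"] == ["2023", "2024", None]
```

Note that this sketch leaves all captures as strings; the schema argument of grok is what assigns each capture column its final data type.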
- group_by(
- *by: td_typing.IntoExpr | Iterable[td_typing.IntoExpr],
- Categories:
aggregation
Perform a group by on the TableFrame.
- Parameters:
by – Columns or expressions to group by.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ A ┆ 1 │ │ X ┆ 10 │ │ C ┆ 3 │ │ D ┆ 5 │ │ M ┆ 9 │ │ A ┆ 100 │ │ M ┆ 50 │ └─────┴─────┘ >>> >>> tf.group_by(td.col("a")).agg(td.col("b").sum()) >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ M ┆ 59 │ │ A ┆ 101 │ │ C ┆ 3 │ │ D ┆ 5 │ │ X ┆ 10 │ └─────┴─────┘
- has_cols( ) Tuple[bool, set[str], set[str]]
- Categories:
tableframe
Verifies the presence of (non-system) columns in the TableFrame.
If exact is True, the check ensures that the TableFrame contains exactly the specified columns (excluding system columns), with no extras or omissions.
- Parameters:
cols – The column name(s) to verify. Can be a string or a list of strings.
exact – If True, checks that the TableFrame contains exactly the specified columns.
- Returns:
A boolean indicating whether the check was successful.
A set of columns in cols missing in the TableFrame.
A set of columns in the TableFrame missing in cols.
- Return type:
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌──────┬──────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞══════╪══════╡ │ A ┆ 1 │ └──────┴──────┘ >>> >>> tf.has_cols("a") >>> (True, set(), {"b"}) >>> >>> tf.has_cols(["a", "c", "d"]) >>> (False, {"c", "d"}, {"b"}) >>> >>> tf.has_cols("a", exact=True) >>> (False, set(), {"b"}) >>> >>> tf.has_cols(["a", "b"], exact=True) >>> (True, set(), set())
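The three-part return value follows directly from set algebra. The helper below is a hypothetical sketch of the check's logic over plain Python sets, not the library's implementation:

```python
# Set-algebra sketch of the (ok, missing, extra) contract of has_cols.
def has_cols_like(frame_cols, cols, exact=False):
    wanted = {cols} if isinstance(cols, str) else set(cols)
    present = set(frame_cols)
    missing = wanted - present      # requested but absent from the frame
    extra = present - wanted        # present in the frame but not requested
    ok = not missing and (not extra if exact else True)
    return ok, missing, extra

# Frame with columns {"a", "b"}:
assert has_cols_like(["a", "b"], "a") == (True, set(), {"b"})
assert has_cols_like(["a", "b"], ["a", "c", "d"]) == (False, {"c", "d"}, {"b"})
assert has_cols_like(["a", "b"], "a", exact=True) == (False, set(), {"b"})
```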
- has_same_schema(
- tf: TableFrame,
- Categories:
tableframe
Verifies whether the schema of the current TableFrame is the same as that of the provided TableFrame.
- Parameters:
tf – The TableFrame to compare with.
- Returns:
Whether the condition is met or not.
- Return type:
Example:
>>> import tabsdata as td >>> >>> tf1: td.TableFrame ... >>> ┌──────┬──────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞══════╪══════╡ │ A ┆ 1 │ └──────┴──────┘ >>> >>> tf2: td.TableFrame ... >>> ┌──────┬──────┐ │ a ┆ c │ │ --- ┆ --- │ │ str ┆ i64 │ ╞══════╪══════╡ │ A ┆ 1 │ └──────┴──────┘ >>> tf1.has_same_schema(tf2) >>> False >>> >>> tf1: td.TableFrame ... >>> ┌──────┬──────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞══════╪══════╡ │ A ┆ 1 │ └──────┴──────┘ >>> >>> tf2: td.TableFrame ... >>> ┌──────┬──────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ str │ ╞══════╪══════╡ │ A ┆ 1 │ └──────┴──────┘ >>> tf1.has_same_schema(tf2) >>> False
- head(
- n: int = 5,
- Categories:
filters
Return a TableFrame with the first n rows.
- Parameters:
n – The number of rows to return.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ A ┆ 1 │ │ X ┆ 10 │ │ C ┆ 3 │ │ D ┆ 5 │ │ M ┆ 9 │ └─────┴─────┘ >>> >>> tf.head(2) >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ A ┆ 1 │ │ X ┆ 10 │ └─────┴─────┘
- inspect(
- **kwargs,
Provide private accessibility level wrapper.
- is_empty() bool
- Categories:
tableframe
Checks if a TableFrame has no rows.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌──────┬──────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞══════╪══════╡ │ A ┆ 1 │ └──────┴──────┘ >>> >>> tf.is_empty() >>> False >>> >>> tf: td.TableFrame ... >>> ┌──────┬──────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞══════╪══════╡ └──────┴──────┘ >>> >>> tf.is_empty() >>> True
- item() Any
- Categories:
projection
Returns a scalar value if the TableFrame contains exactly one user column and one row.
Raises an exception if there is more than one user column or more than one row.
Returns None if the TableFrame is empty.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌─────┐ │ a │ │ --- │ │ str │ ╞═════╡ │ A │ └─────┘ >>> >>> tf.item() >>> 'A'
- join(
- other: TableFrame,
- on: str | Expr | Sequence[str | Expr] | None = None,
- how: Literal['inner', 'left', 'right', 'full', 'semi', 'anti', 'cross', 'outer'] = 'inner',
- *,
- left_on: str | Expr | Sequence[str | Expr] | None = None,
- right_on: str | Expr | Sequence[str | Expr] | None = None,
- suffix: str = '_right',
- join_nulls: bool = False,
- coalesce: bool | None = None,
- Categories:
join
Join the TableFrame with another TableFrame.
- Parameters:
other – The TableFrame to join.
on – Name(s) of the columns to join on. The column name(s) must be present in both TableFrames. Do not use this parameter together with left_on and right_on, or when how="cross".
how –
Join strategy:
inner: An inner join.
left: A left join.
right: A right join.
full: A full join.
cross: The cartesian product.
semi: Rows from the left TableFrame that have a match in the right TableFrame, returning only the left columns.
anti: Rows from the left TableFrame that have no match in the right TableFrame.
left_on – Name(s) of the columns to join on from the left TableFrame. It must be used together with the right_on parameter. It cannot be used with the on parameter.
right_on – Name(s) of the columns to join on from the right TableFrame. It must be used together with the left_on parameter. It cannot be used with the on parameter.
suffix – Suffix appended to duplicate column names coming from the right TableFrame.
join_nulls – Whether null values should match and produce join rows.
coalesce – Collapse join columns into a single column.
Example:
>>> import tabsdata as td >>> >>> tf1: td.TableFrame ... >>> ┌──────┬──────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞══════╪══════╡ │ A ┆ 1 │ │ X ┆ 10 │ │ C ┆ 3 │ │ D ┆ 5 │ │ M ┆ 9 │ │ A ┆ 100 │ │ M ┆ 50 │ │ null ┆ 20 │ │ F ┆ null │ └──────┴──────┘ >>> >>> tf2: td.TableFrame ... >>> ┌──────┬──────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞══════╪══════╡ │ A ┆ 3 │ │ Y ┆ 4 │ │ Z ┆ 5 │ │ A ┆ 0 │ │ M ┆ 6 │ │ null ┆ 8 │ │ F ┆ null │ └──────┴──────┘ >>> An inner join: >>> >>> tf1.join(tf2, on="a", how="inner") >>> ┌─────┬──────┬─────────┐ │ a ┆ b ┆ b_right │ │ --- ┆ --- ┆ --- │ │ str ┆ i64 ┆ i64 │ ╞═════╪══════╪═════════╡ │ A ┆ 1 ┆ 3 │ │ A ┆ 1 ┆ 0 │ │ M ┆ 9 ┆ 6 │ │ A ┆ 100 ┆ 3 │ │ A ┆ 100 ┆ 0 │ │ M ┆ 50 ┆ 6 │ │ F ┆ null ┆ null │ └─────┴──────┴─────────┘ >>> A left join: >>> >>> tf1.join(tf2, on="a", how="left") >>> ┌──────┬──────┬─────────┐ │ a ┆ b ┆ b_right │ │ --- ┆ --- ┆ --- │ │ str ┆ i64 ┆ i64 │ ╞══════╪══════╪═════════╡ │ A ┆ 1 ┆ 3 │ │ A ┆ 1 ┆ 0 │ │ X ┆ 10 ┆ null │ │ C ┆ 3 ┆ null │ │ D ┆ 5 ┆ null │ │ … ┆ … ┆ … │ │ A ┆ 100 ┆ 3 │ │ A ┆ 100 ┆ 0 │ │ M ┆ 50 ┆ 6 │ │ null ┆ 20 ┆ null │ │ F ┆ null ┆ null │ └──────┴──────┴─────────┘ >>> Turning off column coalesce: >>> >>> tf1.join(tf2, on="a", coalesce=False) >>> ┌─────┬──────┬─────────┬─────────┐ │ a ┆ b ┆ a_right ┆ b_right │ │ --- ┆ --- ┆ --- ┆ --- │ │ str ┆ i64 ┆ str ┆ i64 │ ╞═════╪══════╪═════════╪═════════╡ │ A ┆ 1 ┆ A ┆ 3 │ │ A ┆ 1 ┆ A ┆ 0 │ │ M ┆ 9 ┆ M ┆ 6 │ │ A ┆ 100 ┆ A ┆ 3 │ │ A ┆ 100 ┆ A ┆ 0 │ │ M ┆ 50 ┆ M ┆ 6 │ │ F ┆ null ┆ F ┆ null │ └─────┴──────┴─────────┴─────────┘
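The join strategies can be illustrated over plain lists of dicts. The helper below is a minimal, hypothetical sketch of the semantics (nested-loop, with null keys never matching, mirroring join_nulls=False); the real operation runs on columnar data:

```python
# Illustrative join over lists of dicts; join_rows is not a library API.
def join_rows(left, right, key, how="inner", suffix="_right"):
    out = []
    for lrow in left:
        matches = [r for r in right
                   if lrow[key] is not None and r[key] == lrow[key]]
        if how == "semi":
            if matches:
                out.append(dict(lrow))
        elif how == "anti":
            if not matches:
                out.append(dict(lrow))
        else:  # "inner" or "left"
            for rrow in matches:
                merged = dict(lrow)
                for k, v in rrow.items():
                    if k != key:
                        # duplicate right-hand columns receive the suffix
                        merged[k + suffix if k in lrow else k] = v
                out.append(merged)
            if not matches and how == "left":
                # a left join pads non-matching rows with None
                out.append({**lrow,
                            **{k + suffix if k in lrow else k: None
                               for k in right[0] if k != key}})
    return out

left = [{"a": "A", "b": 1}, {"a": "X", "b": 10}]
right = [{"a": "A", "b": 3}]
assert join_rows(left, right, "a") == [{"a": "A", "b": 1, "b_right": 3}]
assert join_rows(left, right, "a", how="anti") == [{"a": "X", "b": 10}]
```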
- last() TableFrame
- Categories:
filters
Return a TableFrame with the last row.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ A ┆ 1 │ │ X ┆ 10 │ │ C ┆ 3 │ │ D ┆ 5 │ │ M ┆ 9 │ └─────┴─────┘ >>> >>> tf.last() >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ M ┆ 9 │ └─────┴─────┘
- last_row(
- named: bool = False,
- Categories:
filters
Return a tuple or dictionary with the last row, or None if no row.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌─────┬─────┐ │ A ┆ B │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ a ┆ 1 │ │ b ┆ 2 │ │ c ┆ 3 │ └─────┴─────┘ >>> >>> tf.last_row() >>> ('c', 3) >>> >>> tf.last_row(named=True) >>> {'A': 'c', 'B': 3}
- limit(
- n: int = 5,
- Categories:
filters
Return a TableFrame with the first n rows. This is equivalent to head.
- Parameters:
n – The number of rows to return.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ A ┆ 1 │ │ X ┆ 10 │ │ C ┆ 3 │ │ D ┆ 5 │ │ M ┆ 9 │ └─────┴─────┘ >>> >>> tf.limit(2) >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ A ┆ 1 │ │ X ┆ 10 │ └─────┴─────┘
- rename( ) TableFrame
- Categories:
projection
Rename columns from the TableFrame.
- Parameters:
mapping – A dictionary mapping column names to their new names. The operation will fail if any specified column name does not exist.
Examples:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌──────┬──────┐ │ i ┆ f │ │ --- ┆ --- │ │ i32 ┆ f64 │ ╞══════╪══════╡ │ 1 ┆ 3.1 │ │ 2 ┆ 4.1 │ │ 3 ┆ 5.9 │ │ 4 ┆ 2.6 │ │ 5 ┆ 53.5 │ │ 6 ┆ 8.97 │ └──────┴──────┘ >>> >>> tf.rename({"i": "index", "f": "amount"}) >>> ┌───────┬────────┐ │ index ┆ amount │ │ --- ┆ --- │ │ i32 ┆ f64 │ ╞═══════╪════════╡ │ 1 ┆ 3.1 │ │ 2 ┆ 4.1 │ │ 3 ┆ 5.9 │ │ 4 ┆ 2.6 │ │ 5 ┆ 53.5 │ │ 6 ┆ 8.97 │ └───────┴────────┘
- property schema: Schema
- select(
- *exprs: td_typing.IntoExpr | Iterable[td_typing.IntoExpr],
- **named_exprs: td_typing.IntoExpr,
- Categories:
projection
Select column(s) from the TableFrame.
- Parameters:
exprs – Columns or expressions to select.
named_exprs – Named expressions to select.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ A ┆ 1 │ │ X ┆ 10 │ │ C ┆ 3 │ │ D ┆ 5 │ │ M ┆ 9 │ │ A ┆ 100 │ │ M ┆ 50 │ └─────┴─────┘ >>> >>> tf.select(td.col("a"), td.col("b").mul(2).alias("bx2"),) >>> ┌─────┬─────┐ │ a ┆ bx2 │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ A ┆ 2 │ │ X ┆ 20 │ │ C ┆ 6 │ │ D ┆ 10 │ │ M ┆ 18 │ │ A ┆ 200 │ │ M ┆ 100 │ └─────┴─────┘
- show_graph(
- **kwargs,
Provide private accessibility level wrapper.
- slice( ) TableFrame
- Categories:
filters
Return a TableFrame with a slice of the original TableFrame.
- Parameters:
offset – Slice starting index.
length – The length of the slice. None means all the way to the end.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ A ┆ 1 │ │ X ┆ 10 │ │ C ┆ 3 │ │ D ┆ 5 │ │ M ┆ 9 │ └─────┴─────┘ >>> >>> tf.slice(2,2) >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ C ┆ 3 │ │ D ┆ 5 │ └─────┴─────┘
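The offset/length pair maps directly onto Python's start/stop slicing. The helper below is a one-line hypothetical equivalence, shown on a plain list:

```python
# offset/length as Python slicing: rows[offset : offset + length],
# with length=None meaning "to the end".
def slice_like(rows, offset, length=None):
    return rows[offset:] if length is None else rows[offset:offset + length]

rows = ["A", "X", "C", "D", "M"]
assert slice_like(rows, 2, 2) == ["C", "D"]   # mirrors tf.slice(2, 2)
assert slice_like(rows, 3) == ["D", "M"]      # open-ended slice
```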
- sort(
- by: td_typing.IntoExpr | Iterable[td_typing.IntoExpr],
- *more_by: td_typing.IntoExpr,
- descending: bool | Sequence[bool] = False,
- nulls_last: bool | Sequence[bool] = False,
- maintain_order: bool = False,
- Categories:
tableframe
Sort the TableFrame by the given column(s) or expression(s).
- Parameters:
by – Column(s) or expression(s) to sort by.
more_by – Additional columns to sort by.
descending – Specifies if the sorting should be descending or not.
nulls_last – Specifies if null values should be placed last.
maintain_order – Preserve the order of equal rows.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌──────┬──────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞══════╪══════╡ │ A ┆ 1 │ │ X ┆ 10 │ │ C ┆ 3 │ │ D ┆ 5 │ │ M ┆ 9 │ │ A ┆ 100 │ │ M ┆ 50 │ │ null ┆ 20 │ │ F ┆ null │ └──────┴──────┘ >>> >>> tf.sort(td.col("a"), descending = True) >>> ┌──────┬──────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞══════╪══════╡ │ null ┆ 20 │ │ X ┆ 10 │ │ M ┆ 9 │ │ M ┆ 50 │ │ F ┆ null │ │ D ┆ 5 │ │ C ┆ 3 │ │ A ┆ 1 │ │ A ┆ 100 │ └──────┴──────┘
- tail(
- n: int = 5,
- Categories:
filters
Return a TableFrame with the last n rows.
- Parameters:
n – The number of rows to return.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ A ┆ 1 │ │ X ┆ 10 │ │ C ┆ 3 │ │ D ┆ 5 │ │ M ┆ 9 │ └─────┴─────┘ >>> >>> tf.tail(2) >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ D ┆ 5 │ │ M ┆ 9 │ └─────┴─────┘
- to_pandas() pd.DataFrame
- Categories:
tableframe
Creates a pandas dataframe from this tableframe.
- to_polars_df() DataFrame
- Categories:
tableframe
Creates a polars dataframe from this tableframe.
- to_polars_lf() LazyFrame
- Categories:
tableframe
Creates a polars lazyframe from this tableframe.
- udf(
- on: td_typing.IntoExpr | list[td_typing.IntoExpr],
- function: td_udf.UDF,
- Categories:
projection
Apply a user-defined function (UDF) to the columns resolved by expr.
The selected columns are supplied to function, which can implement either on_batch or on_element. An on_batch implementation receives a list of Polars series representing the selected columns and must return a list of Polars series with matching length. An on_element implementation receives a list of Python scalars for each row and returns a list of scalars; the framework wraps this in an efficient batch executor, so data still flows in batches even when authoring row-wise logic. In both cases the returned series become new columns appended to the original TableFrame.
By default, both methods receive their inputs as a list. Override the signature property in your UDF class to return “unpacked” to have each column passed as a separate argument instead.
- Creating UDFs:
Subclass tabsdata.tableframe.udf.function.UDF.
Implement __init__ to call super().__init__(output_columns), where output_columns is a tuple or a list of tuples (name, data type) specifying the UDF default output schema (column names and data types). Each tuple must contain a column name (string) and a data type (DataType).
Override exactly one of on_batch or on_element to implement the UDF function logic.
Return a list of TabsData Series (for on_batch) or TabsData supported scalars (for on_element) with the same length as specified in the output schema.
If overriding the on_batch method, the return type must be a list of TabsData Series. If overriding the on_element method, the return type must be a list of supported TabsData scalar values. For both cases, the number of elements in the returned lists must match the number of elements in the output_columns list provided to the UDF constructor.
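The batching of on_element described above can be sketched in plain Python. This is an illustration of the contract only; run_element_udf and the ratio lambda are hypothetical stand-ins, not framework APIs:

```python
# Drive a row-wise on_element over columnar input, producing columnar output.
# The row-wise function sees one scalar per input column per call, while data
# still flows through whole columns.
def run_element_udf(on_element, input_columns):
    n_rows = len(input_columns[0])
    outputs = None
    for i in range(n_rows):
        row_values = [col[i] for col in input_columns]  # scalars for this row
        result = on_element(row_values)                 # list of output scalars
        if outputs is None:
            outputs = [[] for _ in result]
        for out_col, value in zip(outputs, result):
            out_col.append(value)
    return outputs or []

# A row-wise ratio, analogous to an on_element implementation:
ratio = lambda values: [values[0] / values[1]]
assert run_element_udf(ratio, [[10, 20, 30], [2, 5, 10]]) == [[5.0, 4.0, 3.0]]
```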
Using UDFs:
Instantiate a function created as above.
Pass it to TableFrame method udf().
Optionally use
UDF.output_columns() to override output column names or data types after instantiation.
- Parameters:
on – Expression selecting the input column(s) of the UDF.
function – Instance of
tabsdata.tableframe.udf.function.UDF defining on_batch or on_element to produce the output series.
Examples
>>> import tabsdata as td >>> import tabsdata.tableframe as tdf >>> >>> class SumUDF(tdf.UDF): ... def __init__(self): ... super().__init__(("total", tdf.Int64)) ... ... def on_batch(self, series): ... return [series[0] + series[1]] >>> >>> tf = td.TableFrame({"a": [1, 2, 3], "b": [10, 20, 30]}) >>> print(tf) ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ i64 ┆ i64 │ ╞═════╪═════╡ │ 1 ┆ 10 │ │ 2 ┆ 20 │ │ 3 ┆ 30 │ └─────┴─────┘ >>> tf.udf(td.col("a", "b"), SumUDF()) >>> print(tf) ┌─────┬─────┬───────┐ │ a ┆ b ┆ total │ │ --- ┆ --- ┆ --- │ │ i64 ┆ i64 ┆ i64 │ ╞═════╪═════╪═══════╡ │ 1 ┆ 10 ┆ 11 │ │ 2 ┆ 20 ┆ 22 │ │ 3 ┆ 30 ┆ 33 │ └─────┴─────┴───────┘
>>> class RatioUDF(tdf.UDF): ... def __init__(self): ... super().__init__(("ratio", tdf.Float64)) ... ... def on_element(self, values): ... return [values[0] / values[1]] >>> >>> tf = td.TableFrame({"numerator": [10, 20, 30], >>> "denominator": [2, 5, 10],}) >>> print(tf) ┌───────────┬──────────────┐ │ numerator ┆ denominator │ │ --- ┆ --- │ │ i64 ┆ i64 │ ╞═══════════╪══════════════╡ │ 10 ┆ 2 │ │ 20 ┆ 5 │ │ 30 ┆ 10 │ └───────────┴──────────────┘ >>> tf.udf(td.col("numerator", "denominator"), RatioUDF()).collect() >>> print(tf) ┌───────────┬──────────────┬──────┐ │ numerator ┆ denominator ┆ ratio│ │ --- ┆ --- ┆ --- │ │ i64 ┆ i64 ┆ f64 │ ╞═══════════╪══════════════╪══════╡ │ 10 ┆ 2 ┆ 5.0 │ │ 20 ┆ 5 ┆ 4.0 │ │ 30 ┆ 10 ┆ 3.0 │ └───────────┴──────────────┴──────┘
Using signature property to receive individual arguments:
>>> class RatioUnpackedUDF(tdf.UDF): ... def __init__(self): ... super().__init__(("ratio", tdf.Float64)) ... ... @property ... def signature(self): ... return "unpacked" ... ... def on_element(self, numerator, denominator): ... return [numerator / denominator] >>> >>> tf = td.TableFrame({"numerator": [10, 20, 30], >>> "denominator": [2, 5, 10],}) >>> tf.udf(td.col("numerator", "denominator"), RatioUnpackedUDF()).collect() >>> print(tf) ┌───────────┬──────────────┬──────┐ │ numerator ┆ denominator ┆ ratio│ │ --- ┆ --- ┆ --- │ │ i64 ┆ i64 ┆ f64 │ ╞═══════════╪══════════════╪══════╡ │ 10 ┆ 2 ┆ 5.0 │ │ 20 ┆ 5 ┆ 4.0 │ │ 30 ┆ 10 ┆ 3.0 │ └───────────┴──────────────┴──────┘
- unique(
- subset: td_typing.ColumnNameOrSelector | Collection[td_typing.ColumnNameOrSelector] | None = None,
- *,
- keep: td_typing.UniqueKeepStrategy = 'any',
- maintain_order: bool = False,
- Categories:
filters
Deduplicate rows from the TableFrame.
- Parameters:
subset – Columns to evaluate for duplicate values. If None, all columns are considered.
keep – Strategy to keep duplicates: first, last, any, none (eliminate all duplicated rows).
maintain_order – Preserve the order of the rows.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌──────┬──────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞══════╪══════╡ │ A ┆ 1 │ │ X ┆ 10 │ │ C ┆ 3 │ │ D ┆ 5 │ │ M ┆ 9 │ │ A ┆ 100 │ │ M ┆ 50 │ │ null ┆ 20 │ │ F ┆ null │ └──────┴──────┘ >>> >>> tf.unique("a", keep="last") >>> ┌──────┬──────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞══════╪══════╡ │ D ┆ 5 │ │ C ┆ 3 │ │ X ┆ 10 │ │ A ┆ 100 │ │ M ┆ 50 │ │ F ┆ null │ │ null ┆ 20 │ └──────┴──────┘
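The keep strategies can be sketched over a list of dicts. The helper below is hypothetical, and treats "any" like "first" (the real "any" makes no guarantee about which duplicate survives):

```python
from collections import Counter

# Illustrative dedup with first/last/none keep strategies.
def unique_like(rows, key, keep="any"):
    counts = Counter(r[key] for r in rows)
    if keep == "none":
        return [r for r in rows if counts[r[key]] == 1]  # drop every duplicated key
    source = rows if keep in ("first", "any") else list(reversed(rows))
    seen, out = set(), []
    for r in source:
        if r[key] not in seen:
            seen.add(r[key])
            out.append(r)
    return out if keep in ("first", "any") else list(reversed(out))

rows = [{"a": "A", "b": 1}, {"a": "M", "b": 9}, {"a": "A", "b": 100}]
assert unique_like(rows, "a", keep="first") == [{"a": "A", "b": 1}, {"a": "M", "b": 9}]
assert unique_like(rows, "a", keep="last") == [{"a": "M", "b": 9}, {"a": "A", "b": 100}]
assert unique_like(rows, "a", keep="none") == [{"a": "M", "b": 9}]
```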
- unnest(
- columns: td_typing.ColumnNameOrSelector | Collection[td_typing.ColumnNameOrSelector],
- *more_columns: td_typing.ColumnNameOrSelector,
- Categories:
projection
Expand one or more struct columns so that each field within the struct becomes a separate column in the TableFrame.
The resulting TableFrame will place these new columns in the same position as the original struct column, effectively replacing it. This makes it easier to work directly with the inner fields as standard columns.
- Parameters:
columns – Name of the struct column(s) to expand.
more_columns – Additional struct columns to expand, provided as positional arguments.
Example:
>>> import tabsdata as td >>> >>> tf = td.TableFrame({ ... "id": [1, 2], ... "info": [ ... {"name": "Alice", "age": 30}, ... {"name": "Bob", "age": None}, ... ], ... "status": ["active", "inactive"], ... }) >>> >>> tf >>> ┌─────┬───────────────┬───────────┐ │ id ┆ info ┆ status │ │ --- ┆ --- ┆ --- │ │ i64 ┆ struct[2] ┆ str │ ╞═════╪═══════════════╪═══════════╡ │ 1 ┆ {"Alice",30} ┆ active │ │ 2 ┆ {"Bob",null} ┆ inactive │ └─────┴───────────────┴───────────┘ >>> >>> tf.unnest("info") >>> ┌─────┬───────┬──────┬───────────┐ │ id ┆ name ┆ age ┆ status │ │ --- ┆ --- ┆ --- ┆ --- │ │ i64 ┆ str ┆ i64 ┆ str │ ╞═════╪═══════╪══════╪═══════════╡ │ 1 ┆ Alice ┆ 30 ┆ active │ │ 2 ┆ Bob ┆ null ┆ inactive │ └─────┴───────┴──────┴───────────┘
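The struct expansion can be pictured over dict-valued rows. The helper below is a hypothetical sketch of the behavior, not the library's implementation:

```python
# Expand a dict-valued ("struct") column into one column per field, keeping
# the new columns in the struct's original position.
def unnest_like(rows, column):
    out = []
    for row in rows:
        new_row = {}
        for k, v in row.items():
            if k == column:
                new_row.update(v)   # fields replace the struct column in place
            else:
                new_row[k] = v
        out.append(new_row)
    return out

rows = [
    {"id": 1, "info": {"name": "Alice", "age": 30}, "status": "active"},
    {"id": 2, "info": {"name": "Bob", "age": None}, "status": "inactive"},
]
assert unnest_like(rows, "info") == [
    {"id": 1, "name": "Alice", "age": 30, "status": "active"},
    {"id": 2, "name": "Bob", "age": None, "status": "inactive"},
]
```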
- property width: int
- with_columns(
- *exprs: td_typing.IntoExpr | Iterable[td_typing.IntoExpr],
- **named_exprs: td_typing.IntoExpr,
- Categories:
projection
Add columns to the TableFrame.
- Parameters:
exprs – Columns or expressions to add.
named_exprs – Named expressions to add.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌──────┬──────┐ │ x ┆ y │ │ --- ┆ --- │ │ f64 ┆ f64 │ ╞══════╪══════╡ │ 1.0 ┆ 2.0 │ │ 2.0 ┆ 2.0 │ │ 10.0 ┆ 10.0 │ │ 4.0 ┆ 10.0 │ │ 5.0 ┆ null │ │ null ┆ null │ └──────┴──────┘ >>> >>> tf.with_columns(td.col("x").mul(td.col("y")).alias("z")) >>> ┌──────┬──────┬──────┐ │ x ┆ y ┆ z │ │ --- ┆ --- ┆ --- │ │ f64 ┆ f64 ┆ f64 │ ╞══════╪══════╪══════╡ │ 1.0 ┆ 2.0 ┆ 2.0 │ │ 2.0 ┆ 2.0 ┆ 4.0 │ │ NaN ┆ NaN ┆ NaN │ │ 4.0 ┆ NaN ┆ NaN │ │ 5.0 ┆ null ┆ null │ │ null ┆ null ┆ null │ └──────┴──────┴──────┘
- TableFrame.assert_has_cols( ) None
- Categories:
tableframe
Ensures that the (non-system) columns in the TableFrame match the expected columns.
Raises an exception if the expectation is not met.
If exact is True, the check verifies that the TableFrame contains exactly the expected columns, with no extra or missing ones.
- Parameters:
cols – The expected column name(s). Can be a string or a list of strings.
exact – If True, checks that the TableFrame contains exactly the specified columns.
- Raises:
ValueError – If expected columns are missing or unexpected columns are present in the TableFrame.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> tf.assert_has_cols("a") >>> tf.assert_has_cols(["a", "b"], exact=True)
- TableFrame.cast(
- dtypes: Mapping[td_typing.ColumnNameOrSelector | td_typing.DataType, td_typing.DataType] | td_typing.DataType,
- *,
- strict: bool = True,
- Categories:
manipulation
Cast columns to a new data type.
- Parameters:
dtypes – Mapping of the column name(s) to the new data type(s).
strict – If True, raises an error if the cast cannot be performed.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌──────┬──────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞══════╪══════╡ │ A ┆ 1 │ │ X ┆ 10 │ │ C ┆ 3 │ │ D ┆ 5 │ │ M ┆ 9 │ │ A ┆ 100 │ │ M ┆ 50 │ │ null ┆ 20 │ │ F ┆ null │ └──────┴──────┘ >>> >>> tf.cast({"b":td.Float32}).collect() >>> ┌──────┬───────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ f32 │ ╞══════╪═══════╡ │ A ┆ 1.0 │ │ X ┆ 10.0 │ │ C ┆ 3.0 │ │ D ┆ 5.0 │ │ M ┆ 9.0 │ │ A ┆ 100.0 │ │ M ┆ 50.0 │ │ null ┆ 20.0 │ │ F ┆ null │ └──────┴───────┘
- TableFrame.clear(
- n: int = 0,
- Categories:
tableframe
Clears all rows of the TableFrame preserving the schema.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌──────┬──────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞══════╪══════╡ │ A ┆ 1 │ │ X ┆ 10 │ │ C ┆ 3 │ │ D ┆ 5 │ │ M ┆ 9 │ │ A ┆ 100 │ │ M ┆ 50 │ │ null ┆ 20 │ │ F ┆ null │ └──────┴──────┘ >>> >>> tf.clear() >>> ┌──────┬──────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞══════╪══════╡ └──────┴──────┘
- TableFrame.drop(
- *columns: td_typing.ColumnNameOrSelector | Iterable[td_typing.ColumnNameOrSelector],
- strict: bool = True,
- Categories:
projection
Discard columns from the TableFrame.
- Parameters:
columns – Columns to drop.
strict – If True, raises an error if a column does not exist.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌──────┬──────┐ │ x ┆ y │ │ --- ┆ --- │ │ f64 ┆ f64 │ ╞══════╪══════╡ │ 1.0 ┆ 2.0 │ │ 2.0 ┆ 2.0 │ │ 10.0 ┆ 10.0 │ │ 4.0 ┆ 10.0 │ │ 5.0 ┆ null │ │ null ┆ null │ └──────┴──────┘ >>> >>> tf.drop("y") >>> ┌──────┐ │ x │ │ --- │ │ f64 │ ╞══════╡ │ 1.0 │ │ 2.0 │ │ 10.0 │ │ 4.0 │ │ 5.0 │ │ null │ └──────┘
- TableFrame.drop_nans(
- subset: td_typing.ColumnNameOrSelector | Collection[td_typing.ColumnNameOrSelector] | None = None,
- Categories:
manipulation
Drop rows with NaN values.
- Parameters:
subset – Columns to check for NaN values. If None, all columns are considered.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌──────┬──────┬──────┐ │ ss ┆ u ┆ ff │ │ --- ┆ --- ┆ --- │ │ str ┆ i64 ┆ f64 │ ╞══════╪══════╪══════╡ │ A ┆ 1 ┆ 1.1 │ │ B ┆ 0 ┆ 0.0 │ │ A ┆ 2 ┆ 2.2 │ │ B ┆ 3 ┆ 3.3 │ │ B ┆ 4 ┆ 4.4 │ │ C ┆ 5 ┆ -1.1 │ │ C ┆ 6 ┆ -2.2 │ │ C ┆ 7 ┆ -3.3 │ │ D ┆ 8 ┆ inf │ │ F ┆ 9 ┆ NaN │ │ null ┆ null ┆ null │ └──────┴──────┴──────┘ >>> >>> tf.drop_nans() >>> ┌─────┬─────┬──────┐ │ ss ┆ u ┆ ff │ │ --- ┆ --- ┆ --- │ │ str ┆ i64 ┆ f64 │ ╞═════╪═════╪══════╡ │ A ┆ 1 ┆ 1.1 │ │ B ┆ 0 ┆ 0.0 │ │ A ┆ 2 ┆ 2.2 │ │ B ┆ 3 ┆ 3.3 │ │ B ┆ 4 ┆ 4.4 │ │ C ┆ 5 ┆ -1.1 │ │ C ┆ 6 ┆ -2.2 │ │ C ┆ 7 ┆ -3.3 │ │ D ┆ 8 ┆ inf │ └─────┴─────┴──────┘
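drop_nans targets floating-point NaN specifically, which is a different value from null and from inf. The helper below is a hypothetical plain-Python sketch of that distinction, not the library's implementation:

```python
import math

# Drop rows containing a float NaN; None (null) and inf are not NaN
# and pass through this filter untouched.
def drop_nans_like(rows):
    def has_nan(row):
        return any(isinstance(v, float) and math.isnan(v) for v in row.values())
    return [row for row in rows if not has_nan(row)]

rows = [
    {"ss": "F", "ff": float("nan")},   # dropped: contains NaN
    {"ss": "G", "ff": None},           # kept: null is not NaN
    {"ss": "D", "ff": float("inf")},   # kept: inf is not NaN
]
assert [r["ss"] for r in drop_nans_like(rows)] == ["G", "D"]
```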
- TableFrame.drop_nulls(
- subset: td_typing.ColumnNameOrSelector | Collection[td_typing.ColumnNameOrSelector] | None = None,
- Categories:
manipulation
Drop rows with null values.
- Parameters:
subset – Columns to evaluate for null values. If None, all columns are considered.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌──────┬──────┬──────┐ │ ss ┆ u ┆ ff │ │ --- ┆ --- ┆ --- │ │ str ┆ i64 ┆ f64 │ ╞══════╪══════╪══════╡ │ A ┆ 1 ┆ 1.1 │ │ B ┆ 0 ┆ 0.0 │ │ A ┆ 2 ┆ 2.2 │ │ B ┆ 3 ┆ 3.3 │ │ B ┆ 4 ┆ 4.4 │ │ C ┆ 5 ┆ -1.1 │ │ C ┆ 6 ┆ -2.2 │ │ C ┆ 7 ┆ -3.3 │ │ D ┆ 8 ┆ inf │ │ F ┆ 9 ┆ NaN │ │ G ┆ null ┆ 2.3 │ └──────┴──────┴──────┘ >>> >>> tf.drop_nulls("u") >>> ┌─────┬─────┬──────┐ │ ss ┆ u ┆ ff │ │ --- ┆ --- ┆ --- │ │ str ┆ i64 ┆ f64 │ ╞═════╪═════╪══════╡ │ A ┆ 1 ┆ 1.1 │ │ B ┆ 0 ┆ 0.0 │ │ A ┆ 2 ┆ 2.2 │ │ B ┆ 3 ┆ 3.3 │ │ B ┆ 4 ┆ 4.4 │ │ C ┆ 5 ┆ -1.1 │ │ C ┆ 6 ┆ -2.2 │ │ C ┆ 7 ┆ -3.3 │ │ D ┆ 8 ┆ inf │ │ F ┆ 9 ┆ NaN │ └─────┴─────┴──────┘
- classmethod TableFrame.empty(
- schema: SimpleSchema = None,
- Categories:
tableframe
Creates an empty (no column - no row) TableFrame.
- TableFrame.extract_as_columns( ) dict[str, list[Any]]
- Categories:
filters
Extract a slice of rows from the table as a column-oriented dictionary.
The result is a mapping of column names to lists of values from the selected rows.
- Parameters:
offset – Slice starting index.
length – The number of rows to extract.
- Returns:
A dictionary where each key is a column name, and its value is a list of values from the selected slice.
- Return type:
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ A ┆ 1 │ │ X ┆ 10 │ │ C ┆ 3 │ │ D ┆ 5 │ │ M ┆ 9 │ └─────┴─────┘ >>> >>> tf.extract_as_columns(offset=0, length=2) { "a": ["A", "X"], "b": [1, 10] }
- TableFrame.extract_as_rows( ) list[dict[str, Any]]
- Categories:
filters
Extract a slice of rows from the TableFrame as a list of dictionaries.
Each dictionary represents one row, where keys are column names and values are the corresponding cell values.
- Parameters:
offset – Slice starting index.
length – The number of rows to extract.
- Returns:
A list of row dictionaries.
- Return type:
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ A ┆ 1 │ │ X ┆ 10 │ │ C ┆ 3 │ │ D ┆ 5 │ │ M ┆ 9 │ └─────┴─────┘ >>> >>> tf.extract_as_rows(offset=0, length=2) [ {"a": "A", "b": 1}, {"a": "X", "b": 10}, ]
- TableFrame.fill_nan( ) TableFrame
- Categories:
manipulation
Replace all NaN values in the TableFrame with the given value.
- Parameters:
value – The value to replace NaN with.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌──────┬──────┐ │ x ┆ y │ │ --- ┆ --- │ │ f64 ┆ f64 │ ╞══════╪══════╡ │ 1.0 ┆ 2.0 │ │ 2.0 ┆ 2.0 │ │ NaN ┆ NaN │ │ 4.0 ┆ NaN │ │ 5.0 ┆ null │ │ null ┆ null │ └──────┴──────┘ >>> >>> tf.fill_nan(10) >>> ┌──────┬──────┐ │ x ┆ y │ │ --- ┆ --- │ │ f64 ┆ f64 │ ╞══════╪══════╡ │ 1.0 ┆ 2.0 │ │ 2.0 ┆ 2.0 │ │ 10.0 ┆ 10.0 │ │ 4.0 ┆ 10.0 │ │ 5.0 ┆ null │ │ null ┆ null │ └──────┴──────┘
- TableFrame.fill_null( ) TableFrame
- Categories:
manipulation
Replace all null values in the TableFrame with the given value.
- Parameters:
value – The value to replace null with.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌──────┬──────┐ │ x ┆ y │ │ --- ┆ --- │ │ f64 ┆ f64 │ ╞══════╪══════╡ │ 1.0 ┆ 2.0 │ │ 2.0 ┆ 2.0 │ │ NaN ┆ NaN │ │ 4.0 ┆ NaN │ │ 5.0 ┆ null │ │ null ┆ null │ └──────┴──────┘ >>> >>> tf.fill_null(20) >>> ┌──────┬──────┐ │ x ┆ y │ │ --- ┆ --- │ │ f64 ┆ f64 │ ╞══════╪══════╡ │ 1.0 ┆ 2.0 │ │ 2.0 ┆ 2.0 │ │ NaN ┆ NaN │ │ 4.0 ┆ NaN │ │ 5.0 ┆ 20.0 │ │ 20.0 ┆ 20.0 │ └──────┴──────┘
- TableFrame.filter(
- *predicates: td_typing.IntoExprColumn | Iterable[td_typing.IntoExprColumn] | bool | list[bool] | np.ndarray[Any, Any],
- Categories:
filters
Filter the TableFrame based on the given predicates.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ A ┆ 1 │ │ X ┆ 10 │ │ C ┆ 3 │ │ D ┆ 5 │ │ M ┆ 9 │ │ A ┆ 100 │ │ M ┆ 50 │ └─────┴─────┘ >>> >>> tf.filter(td.col("a").is_in(["A", "C"]).or_(td.col("b").eq(10))) >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ A ┆ 1 │ │ X ┆ 10 │ │ C ┆ 3 │ │ A ┆ 100 │ └─────┴─────┘
- TableFrame.first() TableFrame
- Categories:
filters
Return a TableFrame with the first row.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ A ┆ 1 │ │ X ┆ 10 │ │ C ┆ 3 │ │ D ┆ 5 │ │ M ┆ 9 │ └─────┴─────┘ >>> >>> tf.first() >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ A ┆ 1 │ └─────┴─────┘
- TableFrame.first_row(
- named: bool = False,
- Categories:
filters
Return a tuple or dictionary with the first row, or None if no row.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌─────┬─────┐ │ A ┆ B │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ a ┆ 1 │ │ b ┆ 2 │ │ c ┆ 3 │ └─────┴─────┘ >>> >>> tf.first_row() >>> ('a', 1) >>> >>> tf.first_row(named=True) >>> {'A': 'a', 'B': 1}
- classmethod TableFrame.from_dict( ) TableFrame
- Categories:
tableframe
Creates a TableFrame from a dictionary, or None. None produces an empty (no column, no row) TableFrame.
- Parameters:
data – Input data.
- classmethod TableFrame.from_pandas(
- data: pd.DataFrame | None = None,
- Categories:
tableframe
Creates a TableFrame from a pandas DataFrame, or None. None produces an empty (no column, no row) TableFrame.
- Parameters:
data – Input data.
- classmethod TableFrame.from_polars(
- data: LazyFrame | DataFrame | None = None,
- Categories:
tableframe
Creates a TableFrame from a polars DataFrame or LazyFrame, or None. None produces an empty (no column, no row) TableFrame.
- Parameters:
data – Input data.
- TableFrame.grok( ) TableFrame
- Categories:
string
Parse log text into structured fields using a Grok pattern.
Applies a Grok pattern to the given column or expression and directly appends one new column per named capture in the pattern to the output TableFrame. Rows that do not match the pattern will contain null values for the extracted columns.
- Parameters:
expr (IntoExpr) – Column name or expression that resolves to a single string column containing log lines.
pattern (str) – Grok pattern with named captures (e.g., %{WORD:user}).
schema (dict[str, td_col.Column]) – A mapping where each capture name is associated with its corresponding column definition, specifying both the column name and its data type.
- Returns:
A new TableFrame with one column per Grok capture added.
- Return type:
Example
>>> import tabsdata as td >>> tf = td.TableFrame({"logs": [ ... "alice-login-2023", ... "bob-logout-2024", ... ]}) >>> >>> # Capture 3 fields: user, action, year >>> log_pattern = r"%{WORD:user}-%{WORD:action}-%{INT:year}" >>> log_schema = { ... "user": td_col.Column("user", td.String), ... "action": td_col.Column("action", td.String), ... "year": td_col.Column("year", td.Int64), ... } >>> out = tf.grok("logs", log_pattern, log_schema) >>> out.collect() ┌──────────────────┬───────┬────────┬──────┐ │ logs ┆ user ┆ action ┆ year │ │ --- ┆ --- ┆ --- ┆ --- │ │ str ┆ str ┆ str ┆ i64 │ ╞══════════════════╪═══════╪════════╪══════╡ │ alice-login-2023 ┆ alice ┆ login ┆ 2023 │ │ bob-logout-2024 ┆ bob ┆ logout ┆ 2024 │ └──────────────────┴───────┴────────┴──────┘
Notes
The function automatically expands the Grok captures into separate columns.
Non-matching rows will show null for the extracted columns.
If a pattern defines duplicate capture names, numeric suffixes like field, field[1] will be used to disambiguate them.
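Under the hood, Grok patterns compile down to regular expressions with named captures. The capture-to-column expansion can be illustrated with plain Python `re` (an illustrative analogue of the behavior described above, not the tabsdata implementation; the regex is a simplified stand-in for the WORD and INT Grok patterns):

```python
import re

# Simplified regex equivalent of the Grok pattern %{WORD:user}-%{WORD:action}-%{INT:year}
pattern = re.compile(r"(?P<user>\w+)-(?P<action>\w+)-(?P<year>\d+)")

logs = ["alice-login-2023", "bob-logout-2024", "not a log line"]

# One output column per named capture; non-matching rows yield None (null)
columns = {name: [] for name in pattern.groupindex}
for line in logs:
    m = pattern.match(line)
    for name in pattern.groupindex:
        columns[name].append(m.group(name) if m else None)

print(columns["user"])  # ['alice', 'bob', None]
print(columns["year"])  # ['2023', '2024', None]
```

As in grok, each named capture becomes its own column, and rows that fail to match contribute nulls.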
- TableFrame.group_by(
- *by: td_typing.IntoExpr | Iterable[td_typing.IntoExpr],
- Categories:
aggregation
Perform a group by on the TableFrame.
- Parameters:
by – Columns or expressions to group by.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ A ┆ 1 │ │ X ┆ 10 │ │ C ┆ 3 │ │ D ┆ 5 │ │ M ┆ 9 │ │ A ┆ 100 │ │ M ┆ 50 │ └─────┴─────┘ >>> >>> tf.group_by(td.col("a")).agg(td.col("b").sum()) >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ M ┆ 59 │ │ A ┆ 101 │ │ C ┆ 3 │ │ D ┆ 5 │ │ X ┆ 10 │ └─────┴─────┘
- TableFrame.has_cols( ) Tuple[bool, set[str], set[str]]
- Categories:
tableframe
Verifies the presence of (non-system) columns in the TableFrame.
If exact is True, the check ensures that the TableFrame contains exactly the specified columns (excluding system columns), with no extras or omissions.
- Parameters:
cols – The column name(s) to verify. Can be a string or a list of strings.
exact – If True, checks that the TableFrame contains exactly the specified columns.
- Returns:
A boolean indicating whether the check was successful.
A set of columns in cols missing in the TableFrame.
A set of columns in the TableFrame missing in cols.
- Return type:
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌──────┬──────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞══════╪══════╡ │ A ┆ 1 │ └──────┴──────┘ >>> >>> tf.has_cols("a") >>> (True, set(), {"b"}) >>> >>> tf.has_cols(["a", "c", "d"]) >>> (False, {"c", "d"}, {"b"}) >>> >>> tf.has_cols("a", exact=True) >>> (False, set(), {"b"}) >>> >>> tf.has_cols(["a", "b"], exact=True) >>> (True, set(), set())
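The three return values follow plain set arithmetic. A minimal pure-Python sketch of the semantics (a hypothetical helper for illustration, not the tabsdata implementation):

```python
def has_cols(frame_cols, cols, exact=False):
    """Return (ok, missing_in_frame, extra_in_frame) using set arithmetic."""
    wanted = {cols} if isinstance(cols, str) else set(cols)
    present = set(frame_cols)
    missing = wanted - present  # requested but absent from the frame
    extra = present - wanted    # in the frame but not requested
    ok = not missing and (not extra if exact else True)
    return ok, missing, extra

# A frame with columns "a" and "b"
print(has_cols(["a", "b"], "a"))                     # (True, set(), {'b'})
print(has_cols(["a", "b"], ["a", "c", "d"]))         # (False, {'c', 'd'}, {'b'})
print(has_cols(["a", "b"], "a", exact=True))         # (False, set(), {'b'})
print(has_cols(["a", "b"], ["a", "b"], exact=True))  # (True, set(), set())
```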
- TableFrame.has_same_schema(
- tf: TableFrame,
- Categories:
tableframe
Verifies if the schema of the current TableFrame is the same as that of the provided TableFrame.
- Parameters:
tf – The TableFrame to compare with.
- Returns:
Whether the condition is met or not.
- Return type:
Example:
>>> import tabsdata as td >>> >>> tf1: td.TableFrame ... >>> ┌──────┬──────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞══════╪══════╡ │ A ┆ 1 │ └──────┴──────┘ >>> >>> tf2: td.TableFrame ... >>> ┌──────┬──────┐ │ a ┆ c │ │ --- ┆ --- │ │ str ┆ i64 │ ╞══════╪══════╡ │ A ┆ 1 │ └──────┴──────┘ >>> tf1.has_same_schema(tf2) >>> False >>> >>> tf1: td.TableFrame ... >>> ┌──────┬──────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞══════╪══════╡ │ A ┆ 1 │ └──────┴──────┘ >>> >>> tf2: td.TableFrame ... >>> ┌──────┬──────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ str │ ╞══════╪══════╡ │ A ┆ 1 │ └──────┴──────┘ >>> tf1.has_same_schema(tf2) >>> False
- TableFrame.head(
- n: int = 5,
- Categories:
filters
Return a TableFrame with the first n rows.
- Parameters:
n – The number of rows to return.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ A ┆ 1 │ │ X ┆ 10 │ │ C ┆ 3 │ │ D ┆ 5 │ │ M ┆ 9 │ └─────┴─────┘ >>> >>> tf.head(2) >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ A ┆ 1 │ │ X ┆ 10 │ └─────┴─────┘
- TableFrame.is_empty() bool
- Categories:
tableframe
Checks if a TableFrame has no rows.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌──────┬──────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞══════╪══════╡ │ A ┆ 1 │ └──────┴──────┘ >>> >>> tf.is_empty() >>> False >>> >>> tf: td.TableFrame ... >>> ┌──────┬──────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞══════╪══════╡ └──────┴──────┘ >>> >>> tf.is_empty() >>> True
- TableFrame.item() Any
- Categories:
projection
Returns a scalar value if the TableFrame contains exactly one user column and one row.
Raises an exception if there is more than one user column or more than one row.
Returns None if the TableFrame is empty.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌─────┐ │ a │ │ --- │ │ str │ ╞═════╡ │ A │ └─────┘ >>> >>> tf.item() >>> A
- TableFrame.join(
- other: TableFrame,
- on: str | Expr | Sequence[str | Expr] | None = None,
- how: Literal['inner', 'left', 'right', 'full', 'semi', 'anti', 'cross', 'outer'] = 'inner',
- *,
- left_on: str | Expr | Sequence[str | Expr] | None = None,
- right_on: str | Expr | Sequence[str | Expr] | None = None,
- suffix: str = '_right',
- join_nulls: bool = False,
- coalesce: bool | None = None,
- Categories:
join
Join the TableFrame with another TableFrame.
- Parameters:
other – The TableFrame to join.
on – Name(s) of the columns to join on. The column name(s) must be in both TableFrames. Do not use this parameter together with the left_on and right_on parameters, or when how="cross".
how –
Join strategy:
inner: An inner join.
left: A left join.
right: A right join.
full: A full join.
cross: The cartesian product.
semi: An inner join but only returning the columns from left TableFrame.
anti: Rows from the left TableFrame that have no match in the right TableFrame.
left_on – Name(s) of the columns to join on from the left TableFrame. It must be used together with the right_on parameter. It cannot be used with the on parameter.
right_on – Name(s) of the columns to join on from the right TableFrame. It must be used together with the left_on parameter. It cannot be used with the on parameter.
suffix – Suffix appended to duplicate columns coming from the right TableFrame.
join_nulls – Whether null values should be considered as matching join keys.
coalesce – Collapse join columns into a single column.
Example:
>>> import tabsdata as td >>> >>> tf1: td.TableFrame ... >>> ┌──────┬──────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞══════╪══════╡ │ A ┆ 1 │ │ X ┆ 10 │ │ C ┆ 3 │ │ D ┆ 5 │ │ M ┆ 9 │ │ A ┆ 100 │ │ M ┆ 50 │ │ null ┆ 20 │ │ F ┆ null │ └──────┴──────┘ >>> >>> tf2: td.TableFrame ... >>> ┌──────┬──────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞══════╪══════╡ │ A ┆ 3 │ │ Y ┆ 4 │ │ Z ┆ 5 │ │ A ┆ 0 │ │ M ┆ 6 │ │ null ┆ 8 │ │ F ┆ null │ └──────┴──────┘ >>> An inner join: >>> >>> tf1.join(tf2, on="a", how="inner") >>> ┌─────┬──────┬─────────┐ │ a ┆ b ┆ b_right │ │ --- ┆ --- ┆ --- │ │ str ┆ i64 ┆ i64 │ ╞═════╪══════╪═════════╡ │ A ┆ 1 ┆ 3 │ │ A ┆ 1 ┆ 0 │ │ M ┆ 9 ┆ 6 │ │ A ┆ 100 ┆ 3 │ │ A ┆ 100 ┆ 0 │ │ M ┆ 50 ┆ 6 │ │ F ┆ null ┆ null │ └─────┴──────┴─────────┘ >>> A left join: >>> >>> tf1.join(tf2, on="a", how="left") >>> ┌──────┬──────┬─────────┐ │ a ┆ b ┆ b_right │ │ --- ┆ --- ┆ --- │ │ str ┆ i64 ┆ i64 │ ╞══════╪══════╪═════════╡ │ A ┆ 1 ┆ 3 │ │ A ┆ 1 ┆ 0 │ │ X ┆ 10 ┆ null │ │ C ┆ 3 ┆ null │ │ D ┆ 5 ┆ null │ │ … ┆ … ┆ … │ │ A ┆ 100 ┆ 3 │ │ A ┆ 100 ┆ 0 │ │ M ┆ 50 ┆ 6 │ │ null ┆ 20 ┆ null │ │ F ┆ null ┆ null │ └──────┴──────┴─────────┘ >>> Turning off column coalesce: >>> >>> tf1.join(tf2, on="a", coalesce=False) >>> ┌─────┬──────┬─────────┬─────────┐ │ a ┆ b ┆ a_right ┆ b_right │ │ --- ┆ --- ┆ --- ┆ --- │ │ str ┆ i64 ┆ str ┆ i64 │ ╞═════╪══════╪═════════╪═════════╡ │ A ┆ 1 ┆ A ┆ 3 │ │ A ┆ 1 ┆ A ┆ 0 │ │ M ┆ 9 ┆ M ┆ 6 │ │ A ┆ 100 ┆ A ┆ 3 │ │ A ┆ 100 ┆ A ┆ 0 │ │ M ┆ 50 ┆ M ┆ 6 │ │ F ┆ null ┆ F ┆ null │ └─────┴──────┴─────────┴─────────┘
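The semi and anti strategies can be pictured as membership filters on the join keys: semi keeps left rows whose key exists on the right, anti keeps the rest, and neither adds right-hand columns. A rough pure-Python analogue (illustrative only, not how tabsdata executes joins):

```python
# Left rows as (key, value) pairs; only the right table's keys matter here
left = [("A", 1), ("X", 10), ("M", 9), ("F", None)]
right_keys = {"A", "M", "F"}

# semi: left rows with a match on the right (left columns only, no duplication)
semi = [row for row in left if row[0] in right_keys]

# anti: left rows with no match on the right
anti = [row for row in left if row[0] not in right_keys]

print(semi)  # [('A', 1), ('M', 9), ('F', None)]
print(anti)  # [('X', 10)]
```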
- TableFrame.last() TableFrame
- Categories:
filters
Return a TableFrame with the last row.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ A ┆ 1 │ │ X ┆ 10 │ │ C ┆ 3 │ │ D ┆ 5 │ │ M ┆ 9 │ └─────┴─────┘ >>> >>> tf.last() >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ M ┆ 9 │ └─────┴─────┘
- TableFrame.last_row(
- named: bool = False,
- Categories:
filters
Return a tuple or dictionary with the last row, or None if no row.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌─────┬─────┐ │ A ┆ B │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ a ┆ 1 │ │ b ┆ 2 │ │ c ┆ 3 │ └─────┴─────┘ >>> >>> tf.last_row() >>> ('c', 3) >>> >>> tf.last_row(named=True) >>> {'A': 'c', 'B': 3}
- TableFrame.limit(
- n: int = 5,
- Categories:
filters
Return a TableFrame with the first n rows. This is equivalent to head.
- Parameters:
n – The number of rows to return.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ A ┆ 1 │ │ X ┆ 10 │ │ C ┆ 3 │ │ D ┆ 5 │ │ M ┆ 9 │ └─────┴─────┘ >>> >>> tf.limit(2) >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ A ┆ 1 │ │ X ┆ 10 │ └─────┴─────┘
- TableFrame.rename( ) TableFrame
- Categories:
projection
Rename columns from the TableFrame.
- Parameters:
mapping – A dictionary mapping column names to their new names. The operation will fail if any specified column name does not exist.
Examples:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌──────┬──────┐ │ i ┆ f │ │ --- ┆ --- │ │ i32 ┆ f64 │ ╞══════╪══════╡ │ 1 ┆ 3.1 │ │ 2 ┆ 4.1 │ │ 3 ┆ 5.9 │ │ 4 ┆ 2.6 │ │ 5 ┆ 53.5 │ │ 6 ┆ 8.97 │ └──────┴──────┘ >>> >>> tf.rename({"i": "index", "f": "amount"}) >>> ┌───────┬────────┐ │ index ┆ amount │ │ --- ┆ --- │ │ i32 ┆ f64 │ ╞═══════╪════════╡ │ 1 ┆ 3.1 │ │ 2 ┆ 4.1 │ │ 3 ┆ 5.9 │ │ 4 ┆ 2.6 │ │ 5 ┆ 53.5 │ │ 6 ┆ 8.97 │ └───────┴────────┘
- TableFrame.select(
- *exprs: td_typing.IntoExpr | Iterable[td_typing.IntoExpr],
- **named_exprs: td_typing.IntoExpr,
- Categories:
projection
Select column(s) from the TableFrame.
- Parameters:
exprs – Columns or expressions to select.
named_exprs – Named expressions to select.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ A ┆ 1 │ │ X ┆ 10 │ │ C ┆ 3 │ │ D ┆ 5 │ │ M ┆ 9 │ │ A ┆ 100 │ │ M ┆ 50 │ └─────┴─────┘ >>> >>> tf.select(td.col("a"), td.col("b").mul(2).alias("bx2"),) >>> ┌─────┬─────┐ │ a ┆ bx2 │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ A ┆ 2 │ │ X ┆ 20 │ │ C ┆ 6 │ │ D ┆ 10 │ │ M ┆ 18 │ │ A ┆ 200 │ │ M ┆ 100 │ └─────┴─────┘
- TableFrame.slice( ) TableFrame
- Categories:
filters
Return a TableFrame with a slice of the original TableFrame.
- Parameters:
offset – Slice starting index.
length – The length of the slice. None means all the way to the end.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ A ┆ 1 │ │ X ┆ 10 │ │ C ┆ 3 │ │ D ┆ 5 │ │ M ┆ 9 │ └─────┴─────┘ >>> >>> tf.slice(2,2) >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ C ┆ 3 │ │ D ┆ 5 │ └─────┴─────┘
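The offset/length semantics mirror Python list slicing, with length=None running to the end. A tiny sketch over a plain list of rows (a hypothetical helper for illustration, covering only non-negative offsets):

```python
def slice_rows(rows, offset, length=None):
    # length=None means "all the way to the end", as in TableFrame.slice
    end = None if length is None else offset + length
    return rows[offset:end]

rows = [("A", 1), ("X", 10), ("C", 3), ("D", 5), ("M", 9)]
print(slice_rows(rows, 2, 2))  # [('C', 3), ('D', 5)]
print(slice_rows(rows, 3))     # [('D', 5), ('M', 9)]
```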
- TableFrame.sort(
- by: td_typing.IntoExpr | Iterable[td_typing.IntoExpr],
- *more_by: td_typing.IntoExpr,
- descending: bool | Sequence[bool] = False,
- nulls_last: bool | Sequence[bool] = False,
- maintain_order: bool = False,
- Categories:
tableframe
Sort the TableFrame by the given column(s) or expression(s).
- Parameters:
by – Column(s) or expression(s) to sort by.
more_by – Additional columns to sort by.
descending – Specifies if the sorting should be descending or not.
nulls_last – Specifies if null values should be placed last.
maintain_order – Preserve the order of equal rows.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌──────┬──────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞══════╪══════╡ │ A ┆ 1 │ │ X ┆ 10 │ │ C ┆ 3 │ │ D ┆ 5 │ │ M ┆ 9 │ │ A ┆ 100 │ │ M ┆ 50 │ │ null ┆ 20 │ │ F ┆ null │ └──────┴──────┘ >>> >>> tf.sort(td.col("a"), descending = True) >>> ┌──────┬──────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞══════╪══════╡ │ null ┆ 20 │ │ X ┆ 10 │ │ M ┆ 9 │ │ M ┆ 50 │ │ F ┆ null │ │ D ┆ 5 │ │ C ┆ 3 │ │ A ┆ 1 │ │ A ┆ 100 │ └──────┴──────┘
- TableFrame.tail(
- n: int = 5,
- Categories:
filters
Return a TableFrame with the last n rows.
- Parameters:
n – The number of rows to return.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ A ┆ 1 │ │ X ┆ 10 │ │ C ┆ 3 │ │ D ┆ 5 │ │ M ┆ 9 │ └─────┴─────┘ >>> >>> tf.tail(2) >>> ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞═════╪═════╡ │ D ┆ 5 │ │ M ┆ 9 │ └─────┴─────┘
- TableFrame.to_dict() dict[str, list[Any]]
- Categories:
tableframe
Creates a dictionary from this tableframe.
- TableFrame.to_pandas() pd.DataFrame
- Categories:
tableframe
Creates a pandas dataframe from this tableframe.
- TableFrame.to_polars_df() DataFrame
- Categories:
tableframe
Creates a polars dataframe from this tableframe.
- TableFrame.to_polars_lf() LazyFrame
- Categories:
tableframe
Creates a polars lazyframe from this tableframe.
- TableFrame.udf(
- on: td_typing.IntoExpr | list[td_typing.IntoExpr],
- function: td_udf.UDF,
- Categories:
projection
Apply a user-defined function (UDF) to the columns resolved by expr.
The selected columns are supplied to function, which can implement either on_batch or on_element. An on_batch implementation receives a list of Polars series representing the selected columns and must return a list of Polars series with matching length. An on_element implementation receives a list of Python scalars for each row and returns a list of scalars; the framework wraps this in an efficient batch executor, so data still flows in batches even when authoring row-wise logic. In both cases the returned series become new columns appended to the original TableFrame.
By default, both methods receive their inputs as a list. Override the signature property in your UDF class to return “unpacked” to have each column passed as a separate argument instead.
- Creating UDFs:
Subclass tabsdata.tableframe.udf.function.UDF.
Implement __init__ to call super().__init__(output_columns), where output_columns is a tuple or list of tuples (name, data type) specifying the UDF default output schema (column names and data types). Each tuple must contain a column name (string) and a data type (DataType).
Override exactly one of on_batch or on_element, to implement the UDF function logic.
Return a list of TabsData Series (for on_batch) or TabsData supported scalars (for on_element) with the same length as specified in the output schema.
If overriding the on_batch method, the return type must be a list of TabsData Series. If overriding the on_element method, the return type must be a list of supported TabsData scalar values. For both cases, the number of elements in the returned lists must match the number of elements in the output_columns list provided to the UDF constructor.
Using UDFs:
Instantiate a function created as above.
Pass it to TableFrame method udf().
Optionally use UDF.output_columns() to override output column names or data types after instantiation.
- Parameters:
on – Expression selecting the input column(s) of the UDF.
function – Instance of tabsdata.tableframe.udf.function.UDF defining on_batch or on_element to produce the output series.
Examples
>>> import tabsdata as td >>> import tabsdata.tableframe as tdf >>> >>> class SumUDF(tdf.UDF): ... def __init__(self): ... super().__init__(("total", tdf.Int64)) ... ... def on_batch(self, series): ... return [series[0] + series[1]] >>> >>> tf = td.TableFrame({"a": [1, 2, 3], "b": [10, 20, 30]}) >>> print(tf) ┌─────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ i64 ┆ i64 │ ╞═════╪═════╡ │ 1 ┆ 10 │ │ 2 ┆ 20 │ │ 3 ┆ 30 │ └─────┴─────┘ >>> tf.udf(td.col("a", "b"), SumUDF()) >>> print(tf) ┌─────┬─────┬───────┐ │ a ┆ b ┆ total │ │ --- ┆ --- ┆ --- │ │ i64 ┆ i64 ┆ i64 │ ╞═════╪═════╪═══════╡ │ 1 ┆ 10 ┆ 11 │ │ 2 ┆ 20 ┆ 22 │ │ 3 ┆ 30 ┆ 33 │ └─────┴─────┴───────┘
>>> class RatioUDF(tdf.UDF): ... def __init__(self): ... super().__init__(("ratio", tdf.Float64)) ... ... def on_element(self, values): ... return [values[0] / values[1]] >>> >>> tf = td.TableFrame({"numerator": [10, 20, 30], >>> "denominator": [2, 5, 10],}) >>> print(tf) ┌───────────┬──────────────┐ │ numerator ┆ denominator │ │ --- ┆ --- │ │ i64 ┆ i64 │ ╞═══════════╪══════════════╡ │ 10 ┆ 2 │ │ 20 ┆ 5 │ │ 30 ┆ 10 │ └───────────┴──────────────┘ >>> tf.udf(td.col("numerator", "denominator"), RatioUDF()).collect() >>> print(tf) ┌───────────┬──────────────┬──────┐ │ numerator ┆ denominator ┆ ratio│ │ --- ┆ --- ┆ --- │ │ i64 ┆ i64 ┆ f64 │ ╞═══════════╪══════════════╪══════╡ │ 10 ┆ 2 ┆ 5.0 │ │ 20 ┆ 5 ┆ 4.0 │ │ 30 ┆ 10 ┆ 3.0 │ └───────────┴──────────────┴──────┘
Using signature property to receive individual arguments:
>>> class RatioUnpackedUDF(tdf.UDF): ... def __init__(self): ... super().__init__(("ratio", tdf.Float64)) ... ... @property ... def signature(self): ... return "unpacked" ... ... def on_element(self, numerator, denominator): ... return [numerator / denominator] >>> >>> tf = td.TableFrame({"numerator": [10, 20, 30], >>> "denominator": [2, 5, 10],}) >>> tf.udf(td.col("numerator", "denominator"), RatioUnpackedUDF()).collect() >>> print(tf) ┌───────────┬──────────────┬──────┐ │ numerator ┆ denominator ┆ ratio│ │ --- ┆ --- ┆ --- │ │ i64 ┆ i64 ┆ f64 │ ╞═══════════╪══════════════╪══════╡ │ 10 ┆ 2 ┆ 5.0 │ │ 20 ┆ 5 ┆ 4.0 │ │ 30 ┆ 10 ┆ 3.0 │ └───────────┴──────────────┴──────┘
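The batch execution model described above, where an on_element function is applied row by row over the selected columns, can be sketched in plain Python. This is a conceptual sketch of the wrapping, with a hypothetical run_on_element helper, not the actual tabsdata executor:

```python
def run_on_element(on_element, columns, n_outputs):
    """Apply a row-wise function over column lists, collecting output columns."""
    outputs = [[] for _ in range(n_outputs)]
    for values in zip(*columns):           # one tuple of scalars per row
        result = on_element(list(values))  # one scalar per declared output column
        for out, value in zip(outputs, result):
            out.append(value)
    return outputs

numerator = [10, 20, 30]
denominator = [2, 5, 10]
# Mimics RatioUDF.on_element from the example above
ratio, = run_on_element(lambda v: [v[0] / v[1]], [numerator, denominator], 1)
print(ratio)  # [5.0, 4.0, 3.0]
```

Even though the authored logic is row-wise, the framework drives it in batches, which is why on_element UDFs remain efficient.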
- TableFrame.unique(
- subset: td_typing.ColumnNameOrSelector | Collection[td_typing.ColumnNameOrSelector] | None = None,
- *,
- keep: td_typing.UniqueKeepStrategy = 'any',
- maintain_order: bool = False,
- Categories:
filters
Deduplicate rows from the TableFrame.
- Parameters:
subset – Columns to evaluate for duplicate values. If None, all columns are considered.
keep – Strategy to keep duplicates: first, last, any, none (eliminates all duplicated rows).
maintain_order – Preserve the order of the rows.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌──────┬──────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞══════╪══════╡ │ A ┆ 1 │ │ X ┆ 10 │ │ C ┆ 3 │ │ D ┆ 5 │ │ M ┆ 9 │ │ A ┆ 100 │ │ M ┆ 50 │ │ null ┆ 20 │ │ F ┆ null │ └──────┴──────┘ >>> >>> tf.unique("a", keep="last") >>> ┌──────┬──────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞══════╪══════╡ │ D ┆ 5 │ │ C ┆ 3 │ │ X ┆ 10 │ │ A ┆ 100 │ │ M ┆ 50 │ │ F ┆ null │ │ null ┆ 20 │ └──────┴──────┘
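The keep strategies can be sketched over a plain list of rows keyed by one column. This is illustrative only: "any" in the real API may pick an arbitrary duplicate, so only the deterministic "first", "last", and "none" strategies are shown here:

```python
from collections import Counter

def unique(rows, keep):
    """Deduplicate (key, value) rows by key, mimicking keep strategies."""
    counts = Counter(key for key, _ in rows)
    if keep == "none":  # drop every row whose key is duplicated
        return [r for r in rows if counts[r[0]] == 1]
    picked = {}
    for row in rows:
        if keep == "last" or row[0] not in picked:
            picked[row[0]] = row  # "first" keeps the first hit, "last" overwrites
    return list(picked.values())

rows = [("A", 1), ("X", 10), ("A", 100), ("M", 9), ("M", 50)]
print(unique(rows, "first"))  # [('A', 1), ('X', 10), ('M', 9)]
print(unique(rows, "last"))   # [('A', 100), ('X', 10), ('M', 50)]
print(unique(rows, "none"))   # [('X', 10)]
```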
- TableFrame.unnest(
- columns: td_typing.ColumnNameOrSelector | Collection[td_typing.ColumnNameOrSelector],
- *more_columns: td_typing.ColumnNameOrSelector,
- Categories:
projection
Expand one or more struct columns so that each field within the struct becomes a separate column in the TableFrame.
The resulting TableFrame will place these new columns in the same position as the original struct column, effectively replacing it. This makes it easier to work directly with the inner fields as standard columns.
- Parameters:
columns – Name of the struct column(s) to expand.
more_columns – Additional struct columns to expand, provided as positional arguments.
Example:
>>> import tabsdata as td >>> >>> tf = td.TableFrame({ ... "id": [1, 2], ... "info": [ ... {"name": "Alice", "age": 30}, ... {"name": "Bob", "age": None}, ... ], ... "status": ["active", "inactive"], ... }) >>> >>> tf >>> ┌─────┬───────────────┬───────────┐ │ id ┆ info ┆ status │ │ --- ┆ --- ┆ --- │ │ i64 ┆ struct[2] ┆ str │ ╞═════╪═══════════════╪═══════════╡ │ 1 ┆ {"Alice",30} ┆ active │ │ 2 ┆ {"Bob",null} ┆ inactive │ └─────┴───────────────┴───────────┘ >>> >>> tf.unnest("info") >>> ┌─────┬───────┬──────┬───────────┐ │ id ┆ name ┆ age ┆ status │ │ --- ┆ --- ┆ --- ┆ --- │ │ i64 ┆ str ┆ i64 ┆ str │ ╞═════╪═══════╪══════╪═══════════╡ │ 1 ┆ Alice ┆ 30 ┆ active │ │ 2 ┆ Bob ┆ null ┆ inactive │ └─────┴───────┴──────┴───────────┘
- TableFrame.with_columns(
- *exprs: td_typing.IntoExpr | Iterable[td_typing.IntoExpr],
- **named_exprs: td_typing.IntoExpr,
- Categories:
projection
Add columns to the TableFrame.
- Parameters:
exprs – Columns or expressions to add.
named_exprs – Named expressions to add.
Example:
>>> import tabsdata as td >>> >>> tf: td.TableFrame ... >>> ┌──────┬──────┐ │ x ┆ y │ │ --- ┆ --- │ │ f64 ┆ f64 │ ╞══════╪══════╡ │ 1.0 ┆ 2.0 │ │ 2.0 ┆ 2.0 │ │ 10.0 ┆ 10.0 │ │ 4.0 ┆ 10.0 │ │ 5.0 ┆ null │ │ null ┆ null │ └──────┴──────┘ >>> >>> tf.with_columns(td.col("x").mul(td.col("y")).alias("z")) >>> ┌──────┬──────┬───────┐ │ x ┆ y ┆ z │ │ --- ┆ --- ┆ --- │ │ f64 ┆ f64 ┆ f64 │ ╞══════╪══════╪═══════╡ │ 1.0 ┆ 2.0 ┆ 2.0 │ │ 2.0 ┆ 2.0 ┆ 4.0 │ │ 10.0 ┆ 10.0 ┆ 100.0 │ │ 4.0 ┆ 10.0 ┆ 40.0 │ │ 5.0 ┆ null ┆ null │ │ null ┆ null ┆ null │ └──────┴──────┴───────┘
- class TableInput( )
Bases:
SourcePlugin
Table-based data inputs.
- TableInput.stream(
- working_dir: str,
- class TableOutput( )
Bases:
DestinationPlugin
- Categories:
destination
Table-based data outputs.
- class TabsdataFunction(
- func: Callable,
- name: str | None,
- input: SourcePlugin = None,
- output: DestinationPlugin = None,
- trigger_by: str | list[str] | CronTrigger | StageTrigger | None = None,
- output_actions: DataQuality | list[DataQuality] | None = None,
Bases:
object
Class to decorate a function with metadata and methods for use in a Tabsdata environment.
Attributes:
- property input: SourcePlugin | None
The data to be used when running the function.
- Type:
SourcePlugin | None
- property original_file
The file where the original function is defined, on the user's computer.
- Type:
- property original_folder: str
The folder where the original function is defined, as a local path in the user’s computer.
- Type:
- property original_function: Callable
The original function that was decorated, without any behaviour modifications.
- Type:
Callable
- property output: DestinationPlugin | None
The location where the function results will be saved when run.
- Type:
- property output_actions: list[OnTablesAction]
Actions to perform on the output tables.
- Type:
list[OnTablesAction]
- property trigger_by: list[str] | CronTrigger | StageTrigger | None
The trigger(s) that will cause the function to execute. It must be another table or tables in the system, a CronTrigger, or a StageTrigger in the case of a publisher.
- Type:
List[str]
- class UserPasswordCredentials( )
Bases:
Credentials
- Categories:
credentials
Represents credentials consisting of a user and a password.
This class is used to manage and securely handle user and password credentials. It includes functionalities to set user and password details, convert them into a dictionary representation, and provide a string representation for debugging purposes.
- property password: Secret
The password.
- Type:
Secret
- property user: Secret
The user.
- Type:
Secret
- concat(
- items: Iterable[TdType] | None,
- how: Literal['vertical', 'vertical_relaxed', 'diagonal', 'diagonal_relaxed'] = 'vertical',
- Categories:
union
Combine multiple TableFrames by stacking their rows.
- Parameters:
items – The TableFrames to concatenate. If None or empty, returns None. None entries within the iterable are ignored. If all entries are None, returns None.
how –
{‘vertical’, ‘vertical_relaxed’, ‘diagonal’, ‘diagonal_relaxed’}
vertical: Appends the rows of each input below the previous one. All inputs must have exactly the same column names and types; otherwise the operation fails.
vertical_relaxed: Same as vertical, but if columns with the same name have different data types across inputs, they are converted to a common type (e.g. Int32 → Int64).
diagonal: Aligns columns by name across all inputs. If a column is missing from a particular input, that input is padded with null values for the missing column. Matching columns keep their original type if consistent.
diagonal_relaxed: Same as diagonal, but when matching columns have different data types, they are converted to a common type (e.g. Int32 → Int64).
Example:
>>> import tabsdata as td >>> >>> tf1: td.TableFrame ... >>> ┌──────┬──────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞══════╪══════╡ │ a ┆ 1 │ │ b ┆ 2 │ └──────┴──────┘ >>> >>> tf2: td.TableFrame ... >>> ┌──────┬──────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞══════╪══════╡ │ x ┆ 10 │ │ y ┆ 20 │ └──────┴──────┘ >>> >>> tf = td.concat([tf1, tf2]) >>> ┌──────┬──────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ i64 │ ╞══════╪══════╡ │ a ┆ 1 │ │ b ┆ 2 │ │ x ┆ 10 │ │ y ┆ 20 │ └──────┴──────┘
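The diagonal strategy's column alignment and null padding can be illustrated with plain dict-of-lists tables (a conceptual sketch with a hypothetical concat_diagonal helper, not the tabsdata implementation):

```python
def concat_diagonal(tables):
    """Align columns by name across inputs, padding missing columns with nulls."""
    # Union of column names, preserving first-seen order
    names = []
    for t in tables:
        for name in t:
            if name not in names:
                names.append(name)
    out = {name: [] for name in names}
    for t in tables:
        height = len(next(iter(t.values())))
        for name in names:
            # An input missing this column contributes nulls for its rows
            out[name].extend(t.get(name, [None] * height))
    return out

tf1 = {"a": ["a", "b"], "b": [1, 2]}
tf2 = {"a": ["x", "y"], "c": [10, 20]}
print(concat_diagonal([tf1, tf2]))
# {'a': ['a', 'b', 'x', 'y'], 'b': [1, 2, None, None], 'c': [None, None, 10, 20]}
```

With strict vertical concatenation these two inputs would fail, since their column sets differ.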
- publisher(
- source: AzureSource | LocalFileSource | MariaDBSource | MySQLSource | OracleSource | PostgresSource | S3Source | SourcePlugin,
- tables: TableOutput | str | List[str],
- name: str = None,
- trigger_by: PublisherTriggerBySpec = None,
- on_tables: OnTablesSpec = None,
- Categories:
publisher
Decorator to define a function to publish data to Tabsdata.
- Parameters:
source – Where to obtain the data that will be provided as an input to the function.
tables – Where to store the output of the function.
name – The name with which the function will be registered. If not provided, the current function name will be used.
trigger_by – The trigger that will cause the function to execute. It can be a table in the system, a list of tables, None (in which case it must be triggered manually), or a CronTrigger.
on_tables (OnTablesSpec, optional) – Actions to perform on output tables after executing the function. Can be a single action or a list of them. Defaults to None.
- Returns:
The function converted to a Tabsdata Function.
- Return type:
callable
- subscriber(
- tables: TableInput | str | List[str],
- destination: AzureDestination | LocalFileDestination | MariaDBDestination | MySQLDestination | OracleDestination | PostgresDestination | S3Destination | DestinationPlugin,
- name: str = None,
- trigger_by: SubscriberTriggerBySpec = '*',
- Categories:
subscriber
Decorator to define a function to subscribe to data present in Tabsdata.
- Parameters:
tables – Where to obtain the data that will be provided as an input to the function.
destination – Where to store the output of the function.
name – The name with which the function will be registered. If not provided, the current function name will be used.
trigger_by – The trigger that will cause the function to execute. It can be a table in the system, a list of tables, None (in which case it must be triggered manually), or a CronTrigger. Defaults to all dependencies.
- Returns:
The function converted to a Tabsdata Function.
- Return type:
callable
- transformer(
- input_tables: TableInput | str | List[str],
- output_tables: TableOutput | str | List[str],
- name: str = None,
- trigger_by: TransformerTriggerBySpec = '*',
- on_output_tables: OnTablesSpec = None,
- Categories:
transformer
Decorator to define a function to transform data present in Tabsdata.
- Parameters:
input_tables (TableInput | str | List[str]) – Where to obtain the data that will be provided as an input to the function.
output_tables (TableOutput | str | List[str]) – Where to store the output of the function.
name (str, optional) – The name with which the function will be registered. If not provided, the current function name will be used.
trigger_by (TransformerTriggerBySpec, optional) – The trigger that will cause the function to execute. It can be a table in the system, a list of tables, None (in which case it must be triggered manually), or a CronTrigger. Defaults to all dependencies.
on_output_tables (OnTablesSpec, optional) – Actions to perform on output tables after executing the function. Can be a single action or a list of them. Defaults to None.
- Returns:
The function converted to a Tabsdata Function.
- Return type:
callable