MongoDB#

You can use a subscriber to write to a MongoDB database.

Installing the connector#

To work with the MongoDB connector, you need to install its dependencies separately; this keeps the size of the core package manageable. Run the following command in your terminal to install the dependencies.

$ pip install tabsdata["mongodb"]

or,

$ pip install 'tabsdata[mongodb]'

Example (Subscriber - MongoDB)#

The following example creates a subscriber named write_sales. The subscriber is triggered automatically by a new commit to either of its input tables, and writes the two Tabsdata tables to the MongoDB database without modification.

import tabsdata as td

db_username = td.HashiCorpSecret("path-to-secret", "DB_USERNAME")
db_password = td.HashiCorpSecret("path-to-secret", "DB_PASSWORD")

@td.subscriber(
    tables=["vendors", "items"],
    destination=td.MongoDBDestination(
        uri="mongodb://127.0.0.1:27017/sales_db",
        collection_with_ids=[
            ("testing_database.testing_collection", None),
            ("testing_database.second_collection", "email"),
        ],
        credentials=td.UserPasswordCredentials(db_username, db_password),
        if_collection_exists="replace",
    ),
)
def write_sales(tf1: td.TableFrame, tf2: td.TableFrame):
    return tf1, tf2

Note: After defining the subscriber, you need to register it with a Tabsdata collection. For more information, see Register a Function.

Setup (Subscriber - MongoDB)#

The following code uses placeholder values to define a subscriber that reads Tabsdata tables and writes to a MongoDB database:

import tabsdata as td


@td.subscriber(
    tables=["<input_table1>", "<input_table2>"],
    destination=td.MongoDBDestination(
        uri="mongodb://<path_to_db>",
        collection_with_ids=[
            ("database.collection_1", None),
            ("database.collection_2", "field_name"),
        ],
        credentials=td.UserPasswordCredentials(
            "<db_username>", "<db_password>"
        ),
        if_collection_exists="<value>",
        connection_options=<dictionary>,
        maintain_order=<True_or_False>,
        update_existing=<True_or_False>,
        fail_on_duplicate_key=<True_or_False>,
        use_trxs=<True_or_False>,
        docs_per_trx=<number>,
        log_intermediate_files=<True_or_False>,
    ),
    trigger_by=["<trigger_table1>", "<trigger_table2>"],
)
def <subscriber_name>(<table_frame1>: td.TableFrame, <table_frame2>: td.TableFrame):
    <function_logic>
    return <table_frame_output1>, <table_frame_output2>

Note: After defining the subscriber, you need to register it with a Tabsdata collection. For more information, see Register a Function.

The following properties are defined in the code above:

tables#

<input_table1>, <input_table2>… are the names of the Tabsdata tables to be written to the external system.

destination#

uri: <path_to_db> is the URI of the database. Connection parameters, such as the replica set to use, can be appended to the URI, just as you would normally when connecting to a MongoDB database.
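For example, a URI that names a replica set and an authentication database might look like the following (the host names, port, and rs0 replica set name are illustrative):

uri="mongodb://host1:27017,host2:27017,host3:27017/sales_db?replicaSet=rs0&authSource=admin"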

credentials: <db_username> and <db_password> are the username and password used to log in to the database. You can store credentials in several different ways, as highlighted here in the documentation.
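For example, the HashiCorp Vault pattern from the example above keeps the username and password out of the function code (the secret path and key names are placeholders):

import tabsdata as td

# Read the username and password from a HashiCorp Vault secret so that
# no credentials are hard-coded in the subscriber.
db_username = td.HashiCorpSecret("<path-to-secret>", "DB_USERNAME")
db_password = td.HashiCorpSecret("<path-to-secret>", "DB_PASSWORD")

# Pass this object as the credentials parameter of MongoDBDestination.
credentials = td.UserPasswordCredentials(db_username, db_password)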

collection_with_ids: In MongoDB, documents (the equivalent of rows in an RDBMS) are stored in collections (the equivalent of tables) inside a database, and each document must have a field that acts as its unique identifier. For each table to be stored, this parameter takes a tuple whose first element is a string naming the database and collection, as "database_name.collection_name", and whose second element is either the name of the table column to use as the identifier (as the string "column_name") or None, in which case MongoDB generates the identifier automatically. In the example above, collection_with_ids=[("database.collection_1", None), ("database.collection_2", "field_name")], the first entry, ("database.collection_1", None), creates a collection in which the identifier is generated automatically for every document, while the second entry, ("database.collection_2", "field_name"), creates a collection that uses field_name from each document as the identifier.
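As a sketch of the two modes, the following configuration (with illustrative database, collection, and column names) stores one table with auto-generated identifiers and another keyed by its email column:

collection_with_ids=[
    # MongoDB generates the identifier (_id) for every document automatically.
    ("sales_db.vendors", None),
    # The value of each row's "email" column becomes the document identifier.
    ("sales_db.customers", "email"),
]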

[optional] if_collection_exists: This parameter behaves like if_table_exists in other destinations. It defines what to do when the destination writes to a collection that already exists. The options are append, in which case the new documents are added to the documents already in the collection, and replace, in which case the entire collection is emptied before the new documents are inserted. This parameter defaults to append.
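Conceptually, and using pymongo directly rather than the connector's actual implementation, the two options behave roughly as follows:

from pymongo import MongoClient

client = MongoClient("mongodb://127.0.0.1:27017")
collection = client["sales_db"]["vendors"]
docs = [{"name": "Acme"}, {"name": "Globex"}]  # illustrative documents

# if_collection_exists="replace": empty the collection, then insert.
collection.delete_many({})
collection.insert_many(docs)

# if_collection_exists="append": insert on top of the existing documents.
collection.insert_many(docs)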

[optional] connection_options: The optional parameter connection_options takes a dictionary as input and passes it directly to the function that creates the connection with the MongoDB database. The connection is created with pymongo.MongoClient, documented here: https://pymongo.readthedocs.io/en/stable/api/pymongo/mongo_client.html#pymongo.mongo_client.MongoClient. For example, to set the server selection timeout to 1000 milliseconds, provide the dictionary {'serverSelectionTimeoutMS': 1000}.
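For example, to set the timeout and name a replica set at the same time (both serverSelectionTimeoutMS and replicaSet are standard MongoClient options; the rs0 name is illustrative):

connection_options={
    "serverSelectionTimeoutMS": 1000,  # give up server selection after 1 second
    "replicaSet": "rs0",               # illustrative replica set name
}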

[optional] maintain_order: maintain_order defines whether the MongoDB bulk write operation should insert documents in the same order in which they appear in the Tabsdata table. Doing so comes at a performance cost, so this parameter defaults to False.

[optional] update_existing: update_existing defines the behavior when inserting a document into a collection in which a document with the same ID already exists. If set to True, the previous document is replaced with the new one. If set to False, the write fails with a duplicate key error. This parameter defaults to True. Note that replacing documents also comes at a performance cost, so if you do not need this behavior, it is recommended to set the parameter to False.

[optional] fail_on_duplicate_key: fail_on_duplicate_key is closely related to update_existing, but distinct. It defines the behavior when a duplicate key error occurs (only possible if update_existing is set to False). If fail_on_duplicate_key is set to True, the error raises an exception that stops the execution of the MongoDB destination, including all tables still to be stored. If fail_on_duplicate_key is set to False, execution of the MongoDB destination continues: the conflicting document is still not written, but the rest of the documents in the table, and any following tables, are stored.
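As a sketch, the following destination (placed inside the @td.subscriber decorator) keeps pre-existing documents and silently skips conflicting ones; the URI, collection, and column names are illustrative, and db_username and db_password are defined as in the earlier example:

destination=td.MongoDBDestination(
    uri="mongodb://127.0.0.1:27017",
    collection_with_ids=[("sales_db.customers", "email")],
    credentials=td.UserPasswordCredentials(db_username, db_password),
    # Keep the existing document when one with the same identifier exists.
    update_existing=False,
    # Skip the conflicting document instead of aborting the whole destination.
    fail_on_duplicate_key=False,
)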

[optional] use_trxs: use_trxs enables transactions during each write, so that if a bulk write fails, the entire bulk write is automatically rolled back. Note that multiple transactions might be needed to store a single result. This parameter is boolean and defaults to False. This feature relies on MongoDB transactions, which are only available on suitably configured deployments (replica sets or sharded clusters), so configure your MongoDB accordingly before enabling it.

[optional] docs_per_trx: docs_per_trx defines the number of documents written in a single bulk write, which is sent as a single transaction. It currently defaults to 1000, the amount recommended by MongoDB's transaction best practices: https://www.mongodb.com/blog/post/performance-best-practices-transactions-and-read-write-concerns. Note that since this parameter controls the number of documents inserted in each bulk write, it also applies when use_trxs is set to False, in which case it still defines the number of documents written per bulk write.
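For example, to write in transactional batches of 500 documents (this assumes the target deployment supports transactions; the URI, replica set, and collection names are illustrative, and db_username and db_password are defined as in the earlier example):

destination=td.MongoDBDestination(
    uri="mongodb://host1:27017/?replicaSet=rs0",
    collection_with_ids=[("sales_db.vendors", None)],
    credentials=td.UserPasswordCredentials(db_username, db_password),
    # Wrap each bulk write of 500 documents in a transaction, rolled
    # back as a unit if any insert in the batch fails.
    use_trxs=True,
    docs_per_trx=500,
)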

[optional] log_intermediate_files: By default, the time taken to store each result in a single collection is logged, as well as the total time taken. The parameter log_intermediate_files enables finer-grained logging, generating a log entry every time a single bulk write is made or a transaction is committed. This produces a large volume of logs but gives more information about the intermediate status of every result storage. If set to True, these logs are generated; if set to False, they are not. Defaults to False.

trigger_by#

[optional] <trigger_table1>, <trigger_table2>… are the names of the tables in the Tabsdata server. A new commit to any of these tables triggers the subscriber. All listed trigger tables must exist in the server before registering the subscriber.

Defining trigger tables is optional. If you don't define the trigger_by property, a new commit to any of the input tables triggers the subscriber. If you define the trigger_by property, only the tables listed in it can automatically trigger the subscriber.

For more information, see Working with Triggers.

<subscriber_name>#

<subscriber_name> is the name for the subscriber that you are configuring.

<function_logic>#

<function_logic> governs the processing performed by the subscriber. You can specify function logic to be a simple write or to perform additional processing as needed. For more information about the function logic that you can include, see Working with Tables.

<table_frame1>, <table_frame2>… are the names for the variables that temporarily store source data for processing.

<table_frame_output1>, <table_frame_output2>… are the output from the function that are written to the external system.