Removing columns
Removing columns before loading data into a database is a reliable way to eliminate sensitive or unnecessary fields. In the scenario below, a source is created with a "country_code" column, which is then excluded from the data before loading.
Let's create a sample pipeline demonstrating the process of removing a column.
Create a source function that generates dummy data as follows:

```py
import dlt

# This function creates a dummy data source.
@dlt.source
def dummy_source():
    @dlt.resource(write_disposition="replace")
    def dummy_data():
        for i in range(3):
            yield {"id": i, "name": f"Jane Washington {i}", "country_code": 40 + i}

    return dummy_data()
```

This function creates three columns: `id`, `name`, and `country_code`.

Next, create a function to filter out columns from the data before loading it into a database:

```py
from typing import Dict, List, Optional
def remove_columns(doc: Dict, remove_columns: Optional[List[str]] = None) -> Dict:
    if remove_columns is None:
        remove_columns = []

    # Iterating over the list of columns to be removed
    for column_name in remove_columns:
        # Removing the column if it exists in the document
        if column_name in doc:
            del doc[column_name]
    return doc
```

The function takes two arguments:

- `doc`: the document (dict) from which columns will be removed.
- `remove_columns`: a list of column names to remove; defaults to `None`.
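Since `remove_columns` is a plain Python function, you can sanity-check it on a single record before attaching it to the source (the sample record below is made up for illustration):

```py
# Standalone check: the listed column is dropped from the record
record = {"id": 0, "name": "Jane Washington 0", "country_code": 40}
print(remove_columns(record, ["country_code"]))
# {'id': 0, 'name': 'Jane Washington 0'}
```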
Next, declare the columns to be removed from the table, and then modify the source as follows:

```py
# Example columns to remove:
remove_columns_list = ["country_code"]
# Create an instance of the source so you can edit it.
source_instance = dummy_source()
# Modify this source instance's resource
source_instance.dummy_data.add_map(
    lambda doc: remove_columns(doc, remove_columns_list)
)
```

You can optionally inspect the result:

```py
for row in source_instance:
    print(row)
# {'id': 0, 'name': 'Jane Washington 0'}
# {'id': 1, 'name': 'Jane Washington 1'}
# {'id': 2, 'name': 'Jane Washington 2'}
```

Finally, create and run the pipeline:

```py
# Integrating with a dlt pipeline
pipeline = dlt.pipeline(
    pipeline_name='example',
    destination='bigquery',
    dataset_name='filtered_data'
)
# Run the pipeline with the transformed source
load_info = pipeline.run(source_instance)
print(load_info)
```
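If you don't have BigQuery credentials configured, here is a minimal sketch that runs the same pipeline locally by swapping in the DuckDB destination (assuming the extra is installed via `pip install "dlt[duckdb]"`); the pipeline name is just an example:

```py
# Same source and transformation, but loading into a local DuckDB file
pipeline = dlt.pipeline(
    pipeline_name='example_local',  # hypothetical name for the local run
    destination='duckdb',
    dataset_name='filtered_data'
)

load_info = pipeline.run(source_instance)
print(load_info)
```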