Skip to main content
Version: devel

Backfill to Filesystem with partial replace

info

The source code for this example can be found in our repository at: https://github.com/dlt-hub/dlt/tree/devel/docs/examples/partial_loading

About this Example

This script interacts with the Chess.com REST API to extract game data for a specific user on a monthly basis. The script retrieves game data for a specified time range, and when additional data is loaded for a different time range, it automatically handles de-duplication by deleting any previously loaded files for overlapping time range.

We'll learn:

  • How to configure a REST API source using the dlt library.
  • How to manage and delete old backfill files for de-duplication.
  • How to use Filesystem as a destination for storing extracted data.

Full source code

import os
import re
from dlt.common import pendulum as p
from typing import Dict, List, Iterator

import dlt
from dlt.sources import DltResource
from dlt.common.pipeline import LoadInfo
from dlt.destinations.impl.filesystem.filesystem import FilesystemClient
from dlt.sources.rest_api import RESTAPIConfig, rest_api_resources


@dlt.source
def chess_com_source(username: str, months: List[Dict[str, str]]) -> Iterator[DltResource]:
"""
Configures and yields resources to fetch chess game data for a given user across specified months.

Args:
username (str): Chess.com username to fetch games for.
months (List[Dict[str, str]]): List of dictionaries containing 'year' and 'month' keys.

Yields:
dlt.Resource: Resource objects containing fetched game data.
"""
for month in months:
year = month["year"]
month_str = month["month"]
# Configure REST API endpoint for the specific month
config: RESTAPIConfig = {
"client": {
"base_url": "https://api.chess.com/pub/", # Base URL for Chess.com API
},
"resources": [
{
"name": f"chess_com_games_{year}_{month_str}", # Unique resource name
"write_disposition": "append",
"endpoint": {
"path": f"player/{username}/games/{year}/{month_str}", # API endpoint path
},
"primary_key": ["url"], # Primary key to prevent duplicates
}
],
}
yield from rest_api_resources(config)


def generate_months(
start_year: int, start_month: int, end_year: int, end_month: int
) -> Iterator[Dict[str, str]]:
"""
Generates a list of months between the start and end dates.

Args:
start_year (int): Starting year.
start_month (int): Starting month.
end_year (int): Ending year.
end_month (int): Ending month.

Yields:
Dict[str, str]: Dictionary containing 'year' and 'month' as strings.
"""
start_date = p.datetime(start_year, start_month, 1)
end_date = p.datetime(end_year, end_month, 1)
current_date = start_date
while current_date <= end_date:
yield {"year": str(current_date.year), "month": f"{current_date.month:02d}"}
# Move to the next month
if current_date.month == 12:
current_date = current_date.replace(year=current_date.year + 1, month=1)
else:
current_date = current_date.replace(month=current_date.month + 1)


def delete_old_backfills(load_info: LoadInfo, p: dlt.Pipeline, table_name: str) -> None:
"""
Deletes old backfill files that do not match the current load ID to maintain data integrity.

Args:
load_info (LoadInfo): Information about the current load.
p (dlt.Pipeline): The dlt pipeline instance.
table_name (str): Name of the table to clean up backfills for.
"""
# Fetch current load id
load_id = load_info.loads_ids[0]
pattern = re.compile(rf"{load_id}") # Compile regex pattern for the current load ID

# Initialize the filesystem client
fs_client: FilesystemClient = p.destination.client( # type: ignore
p.default_schema, initial_config=p._get_destination_client_initial_config(p.destination)
)

# Construct the table directory path
table_dir = os.path.join(fs_client.dataset_path, table_name)

# Check if the table directory exists
if fs_client.fs_client.exists(table_dir):
# Traverse the table directory
for root, _dirs, files in fs_client.fs_client.walk(table_dir, maxdepth=None):
for file in files:
# Construct the full file path
file_path = os.path.join(root, file)
# If the file does not match the current load ID, delete it
if not pattern.search(file_path):
try:
fs_client.fs_client.rm(file_path) # Remove the old backfill file
print(f"Deleted old backfill file: {file_path}")
except Exception as e:
print(f"Error deleting file {file_path}: {e}")
else:
# Inform if the table directory does not exist
print(f"Table directory does not exist: {table_dir}")


def load_chess_data():
"""
Sets up and runs the dlt pipeline to load chess game data, then manages backfills.
"""
# Initialize the dlt pipeline with filesystem destination
pipeline = dlt.pipeline(
pipeline_name="chess_com_data", destination="filesystem", dataset_name="chess_games"
)

# Generate the list of months for the desired date range
months = list(generate_months(2023, 1, 2023, 12))

# Create the source with all specified months
source = chess_com_source("MagnusCarlsen", months)

# Run the pipeline to fetch and load data
info = pipeline.run(source)
print(info)

# After the run, delete old backfills for each table to maintain data consistency
for month in months:
table_name = f"chess_com_games_{month['year']}_{month['month']}"
delete_old_backfills(info, pipeline, table_name)


if __name__ == "__main__":
load_chess_data()

This demo works on codespaces. Codespaces is a development environment available for free to anyone with a Github account. You'll be asked to fork the demo repository and from there the README guides you with further steps.
The demo uses the Continue VSCode extension.

Off to codespaces!

DHelp

Ask a question

Welcome to "Codex Central", your next-gen help center, driven by OpenAI's GPT-4 model. It's more than just a forum or a FAQ hub – it's a dynamic knowledge base where coders can find AI-assisted solutions to their pressing problems. With GPT-4's powerful comprehension and predictive abilities, Codex Central provides instantaneous issue resolution, insightful debugging, and personalized guidance. Get your code running smoothly with the unparalleled support at Codex Central - coding help reimagined with AI prowess.