#30 OETV Tennis Analytics Web App

Introduction

The OETV Tennis Analytics project, developed by GitHub user flnzba, is a data pipeline and analytics solution for tennis statistics built on data from the Austrian Tennis Federation (OETV). The project implements a modern data engineering workflow that extracts, transforms, and visualizes tennis data, providing insights into player performance, rankings, and tournament trends through an interactive dashboard.

This technical article examines the architecture of the project, with a specific focus on the three core components (load.py, transform.py, and app.py) that form the backbone of the analytics solution.

Project Overview

The OETV Tennis Analytics project is structured as a comprehensive data pipeline with three main components:

  1. Data Extraction and Loading (load.py): Responsible for retrieving data from OETV's API and loading it into a SQLite database.

  2. Data Transformation (transform.py): Handles the processing and transformation of raw tennis data into structured, analysis-ready formats.

  3. Web Application Interface (app.py): A Streamlit-powered dashboard that visualizes the processed data, allowing users to interact with and gain insights from tennis statistics.

The project employs Docker for containerization, ensuring consistent deployment across different environments, and includes checkpointing mechanisms to optimize data processing efficiency.

Architecture Deep Dive

Data Flow Architecture

The data flows through the system in the following sequence (a minimal end-to-end sketch follows the list):

  1. Extraction: Raw data is extracted from OETV APIs and external sources
  2. Loading: Data is stored in a SQLite database (data.db)
  3. Transformation: Raw data is processed, cleaned, and transformed
  4. Visualization: Processed data is presented through an interactive Streamlit dashboard
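
Taken together, these stages could be chained as in the sketch below. This is an assumed orchestration rather than code from the repository: process_data_continuous and the streamlit invocation refer to components discussed later in this article, and the actual project may wire the steps together differently (for example via a Docker entrypoint).

# Hedged orchestration sketch (assumed; not the repository's actual entry point).
import subprocess

from transform import process_data_continuous  # internally pulls batches via load.get_data_batches


def run_pipeline():
    # Extraction, loading, and transformation: fetch player batches and write them to data.db
    process_data_continuous()
    # Visualization: serve the Streamlit dashboard on top of the populated database
    subprocess.run(["streamlit", "run", "app.py"], check=True)


if __name__ == "__main__":
    run_pipeline()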

Core Components Analysis

Let's examine each core component in detail with actual code examples:

1. Data Extraction and Loading (load.py)

load.py serves as the data ingestion layer, responsible for extracting tennis data from the OETV API and storing it in a structured format. The script uses the curl_cffi library for HTTP requests, which can impersonate a real browser's fingerprint to avoid being blocked.
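
As a brief illustration of what that impersonation provides, the snippet below shows curl_cffi's requests-compatible API in isolation. The URL is a placeholder, not the actual OETV endpoint:

# Minimal curl_cffi example: the `impersonate` argument makes the request's TLS and
# header fingerprint look like a real Chrome browser, which the plain requests library cannot do.
from curl_cffi import requests

# Placeholder URL for illustration only; the real OETV endpoint is configured in load.py.
r = requests.get("https://example.com/api/rankings", impersonate="chrome", timeout=30)
print(r.status_code)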

Key Functions and Mechanisms

API Integration

The module implements a function to fetch data from OETV's API, handling authentication, rate limiting, and response parsing:

# Note: `requests` here refers to curl_cffi's requests-compatible client
# (from curl_cffi import requests), which supports the `impersonate` keyword used below.
def get_data_batches(client):
    """Generator function that yields batches of player data as they're fetched.

    Parameters:
    - client: Dictionary containing API client configuration

    Yields:
    - Batch of player records (list of dictionaries)
    """
    start = load_checkpoint()
    total_results = 0
    consecutive_errors = 0
    max_consecutive_errors = 3

    print(f"Starting batch fetching from position {start} at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    try:
        # First request to get total results
        retries = 0
        while retries < client['max_retries']:
            try:
                r = requests.get(
                    client['url'],
                    headers=client['headers'],
                    referer=client['referer'],
                    impersonate="chrome",
                    timeout=client['timeout']
                )
                if r.status_code == 200:
                    rankings_data = json.loads(r.text)
                    all_data = list(rankings_data.values())
                    all_data = list(all_data[1].values())
                    total_results = all_data[0]
                    print(f"Total results to fetch: {total_results}")
                    break
                else:
                    print(f"Error status code: {r.status_code}, retrying...")
                    retries += 1
                    time.sleep(5 * retries)  # Increasing backoff
            except Exception as e:
                print(f"Exception during initial request: {e}")
                retries += 1
                time.sleep(5 * retries)

This function acts as a generator that fetches data in batches from the OETV API. It first determines the total number of records available, then systematically retrieves them in chunks of 100.
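
The excerpt above stops after the initial request that discovers total_results. A sketch of how the paging loop that follows might look is shown below; it is an assumed continuation that reuses the variables and helpers from load.py shown elsewhere in this article, and the pagination parameter names (firstResult, maxResults) and the players response key are hypothetical rather than taken from the repository:

    # Assumed continuation of get_data_batches (not the repository's exact code):
    # walk through the remaining records in chunks of 100 and yield each batch.
    batch_size = 100
    while start < total_results:
        r = requests.get(
            client['url'],
            params={"firstResult": start, "maxResults": batch_size},  # hypothetical parameter names
            headers=client['headers'],
            referer=client['referer'],
            impersonate="chrome",
            timeout=client['timeout'],
        )
        if r.status_code != 200:
            handle_error(r.status_code)
            consecutive_errors += 1
            if consecutive_errors >= max_consecutive_errors:
                break
            continue

        rankings_data = json.loads(r.text)
        players = rankings_data.get("players", [])  # hypothetical response key
        if not players:
            break

        yield players                 # hand the batch to the caller (transform.py)
        start += len(players)
        save_checkpoint(start)        # persist progress so an interruption can resume here
        consecutive_errors = 0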

Robust Error Handling

The module includes robust error handling to manage issues with API requests, rate limiting, and response parsing:

def handle_error(status_code):
    print("Error: ", status_code)
    # Get the base directory from environment or use a default
    DATA_DIR = os.environ.get('DATA_DIR', os.path.dirname(os.path.abspath(__file__)))
    log_dir = os.path.join(DATA_DIR, "logs")
    os.makedirs(log_dir, exist_ok=True)
    with open(os.path.join(log_dir, "error-last-call.txt"), "a") as file:
        file.write(f"Error: {status_code} \n")

Error handling is crucial for maintaining the stability of the pipeline, especially when dealing with external APIs. The function appends each failed status code to a log file for later investigation, so the calling code can retry or skip the request and keep the process running despite temporary failures.

Checkpointing System

To optimize performance and prevent redundant data fetching, load.py implements a checkpointing mechanism. The system tracks the last successful data load, enabling incremental updates:

def get_checkpoint_file():
    # Get the base directory from environment or use a default
    # This allows configuration in Docker environments
    DATA_DIR = os.environ.get('DATA_DIR', os.path.dirname(os.path.abspath(__file__)))
    checkpoint_dir = os.path.join(DATA_DIR, "checkpoints")
    os.makedirs(checkpoint_dir, exist_ok=True)
    return os.path.join(checkpoint_dir, "checkpoint.txt")


def save_checkpoint(start, all_rankings=None):
    with open(get_checkpoint_file(), "w") as f:
        f.write(str(start))

    # Optionally save the current rankings as backup
    if all_rankings:
        # Get the base directory from environment or use a default
        DATA_DIR = os.environ.get('DATA_DIR', os.path.dirname(os.path.abspath(__file__)))
        checkpoint_dir = os.path.join(DATA_DIR, "checkpoints")
        os.makedirs(checkpoint_dir, exist_ok=True)
        temp_path = os.path.join(checkpoint_dir, "rankings_checkpoint.json")
        try:
            with open(temp_path, "w") as file:
                json.dump(all_rankings, file)
        except Exception as e:
            print(f"Error saving rankings checkpoint: {e}")


def load_checkpoint():
    if os.path.exists(get_checkpoint_file()):
        with open(get_checkpoint_file(), "r") as f:
            return int(f.read().strip())
    return 0

The checkpointing mechanism allows the system to resume data collection from where it left off, which is especially useful for large datasets or when handling interruptions.
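
As a quick illustration of the resume behavior, here is a hedged usage sketch, assuming these helpers are importable from load.py:

# Hedged usage sketch of the checkpointing helpers shown above.
from load import load_checkpoint, save_checkpoint

start = load_checkpoint()      # 0 on a fresh run, otherwise the last saved offset
print(f"Resuming fetch from record {start}")
# ... fetch and process one batch of 100 records starting at `start` ...
save_checkpoint(start + 100)   # persist progress; a crash now loses at most one batch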

Dynamic Backoff Strategy

The module implements a sophisticated backoff strategy to handle API rate limits and server load:

# Dynamically adjust delay based on success pattern
if consecutive_errors > 0:
    consecutive_errors = 0
    # Increase delay slightly after recovering from errors
    base_delay = min(base_delay * 1.2, 5.0)
else:
    # Gradually decrease delay on consistent success
    base_delay = max(base_delay * 0.9, 0.5)

# Add some randomness to the delay
time.sleep(base_delay + random.uniform(0.1, 1.0))

This adaptive approach adjusts request timing based on the API's response pattern, reducing delays when the server is responsive and increasing them when encountering errors.
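
The fragment above covers only the success path, including recovery after a streak of errors. How the error path might complement it inside the same fetch loop is sketched below; this structure is assumed, since the surrounding loop is not shown in the article:

# Assumed error branch of the same loop (not shown in the article): each failure
# lengthens the wait, and a long error streak aborts the run instead of hammering the API.
if r.status_code != 200:
    handle_error(r.status_code)
    consecutive_errors += 1
    if consecutive_errors >= max_consecutive_errors:
        raise RuntimeError("Aborting after repeated API failures")
    # Exponential-style backoff with jitter while the error streak lasts
    time.sleep(base_delay * (2 ** consecutive_errors) + random.uniform(0.1, 1.0))
else:
    # On success, apply the adaptive adjustment shown above before the next request
    ...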

2. Data Transformation (transform.py)

transform.py forms the processing layer of the pipeline, converting raw data from the API into a structured SQLite database. This module is responsible for creating the database schema and populating it with the fetched data.

Key Functions and Mechanisms

Database Initialization

The module initializes the SQLite database with the appropriate schema for storing player data:

def init_database():
    """Initialize the database with the players table"""
    conn = sqlite3.connect(db_path)
    c = conn.cursor()

    # Drop table if it exists
    c.execute("DROP TABLE IF EXISTS players")

    # Create table
    c.execute(
        """
        CREATE TABLE IF NOT EXISTS players (
            playerId TEXT,
            licenceNr TEXT,
            natRank INTEGER,
            natRankFed INTEGER,
            firstname TEXT,
            lastname TEXT,
            nationality TEXT,
            clubName TEXT,
            clubNr TEXT,
            fedNickname TEXT,
            fedRank REAL,
            birthYear INTEGER,
            atpPoints INTEGER,
            points INTEGER
        )
        """
    )

    conn.commit()

    return conn, c

This function establishes the database structure, defining the schema for player data with fields for identifiers, rankings, personal information, and performance metrics.

Batch Processing

The module implements a batch processing approach to efficiently handle large amounts of data:

def save_batch(batch, conn, cursor):
    """Save a batch of player data to the database"""
    try:
        # Insert data into the table
        for player in batch:
            cursor.execute(
                """
                INSERT INTO players (playerId, licenceNr, natRank, natRankFed, firstname, lastname, nationality, clubName, clubNr, fedNickname, fedRank, birthYear, atpPoints, points) VALUES
                (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    player["playerId"],
                    player["licenceNr"],
                    player["natRank"],
                    player["natRankFed"],
                    player["firstname"],
                    player["lastname"],
                    player["nationality"],
                    player["clubName"],
                    player["clubNr"],
                    player["fedNickname"],
                    player["fedRank"],
                    player["birthYear"],
                    player["atpPoints"],
                    player["points"],
                ),
            )

        # Commit this batch
        conn.commit()
        return True
    except Exception as e:
        print(f"Error saving batch to database: {e}")
        # Rollback on error
        conn.rollback()
        return False

This function handles the insertion of batches of player data into the database, with transaction management to ensure data integrity.
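
A design note: because each row is inserted with an individual execute call inside one transaction, very large batches pay some per-row overhead. A hedged alternative (not what the repository does) is to hand the whole batch to executemany:

# Hedged alternative to the per-row loop above: executemany sends the whole batch
# through one prepared statement, which sqlite3 typically handles faster.
rows = [
    (p["playerId"], p["licenceNr"], p["natRank"], p["natRankFed"],
     p["firstname"], p["lastname"], p["nationality"], p["clubName"],
     p["clubNr"], p["fedNickname"], p["fedRank"], p["birthYear"],
     p["atpPoints"], p["points"])
    for p in batch
]
cursor.executemany(
    "INSERT INTO players VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
    rows,
)
conn.commit()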

Continuous Processing Pipeline

The module orchestrates a continuous processing pipeline that transforms data as it's received:

def process_data_continuous():
    """Process data continuously as it's fetched, saving in batches"""
    try:
        print(f"Starting data processing at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

        # Initialize database
        conn, cursor = init_database()

        # Set up the API client
        client = setup_api_client()

        # Get batches and process them
        total_processed = 0
        for batch_number, batch in enumerate(get_data_batches(client)):
            # Save this batch to the database
            if save_batch(batch, conn, cursor):
                total_processed += len(batch)
                print(f"Batch {batch_number+1} saved. Total records processed: {total_processed}")
            else:
                print(f"Failed to save batch {batch_number+1}")

        # Get total record count
        cursor.execute("SELECT COUNT(*) FROM players")
        total_records = cursor.fetchone()[0]
        print(f"Total records in database: {total_records}")

        # Close connection
        conn.close()

        print(f"Data processing completed at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    except Exception as e:
        print(f"Fatal error in data processing: {e}")
        if 'conn' in locals():
            conn.close()
        sys.exit(1)

This function serves as the main orchestrator of the transformation process, connecting the data extraction from load.py with the database operations, while providing progress updates and summary statistics.

3. Web Application Interface (app.py)

app.py serves as the visualization and interaction layer, implemented as a Streamlit application that presents the processed data in an accessible format through an interactive dashboard.

Key Functions and Mechanisms

Database Connection and Data Loading

The module establishes a connection to the SQLite database and loads the player data:

# Database path - must match the path used in transform.py
# Get the base directory from environment or use a default
# This allows configuration in Docker environments
DATA_DIR = os.environ.get('DATA_DIR', os.path.dirname(os.path.abspath(__file__)))
db_path = os.path.join(DATA_DIR, "data.db")

# Check if the database file exists
if not os.path.exists(db_path):
    st.error(f"Database file not found at {db_path}. Please run transform.py first to create the database.")
    st.stop()

# Load data from SQLite database
conn = sqlite3.connect(db_path)
query = "SELECT * FROM players"
data = pd.read_sql_query(query, conn)
conn.close()

df = pd.DataFrame(data)

# Calculate player age from birth year
current_year = pd.Timestamp.now().year
df['age'] = current_year - df['birthYear']

This section loads the data from the database into a pandas DataFrame, which is then used for visualization and analysis. It also performs some initial data preprocessing, such as calculating the players' ages.
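
Because Streamlit re-executes the script on every widget interaction, the load shown above runs again on each rerun. A hedged refinement (an assumption, not code from app.py) is to wrap the query in Streamlit's st.cache_data so repeated reruns reuse the cached DataFrame:

# Hedged sketch (assumed, not from app.py): cache the database read across reruns.
import sqlite3

import pandas as pd
import streamlit as st


@st.cache_data
def load_players(path: str) -> pd.DataFrame:
    # Read the full players table into a DataFrame, then derive ages as above
    conn = sqlite3.connect(path)
    df = pd.read_sql_query("SELECT * FROM players", conn)
    conn.close()
    df['age'] = pd.Timestamp.now().year - df['birthYear']
    return df


df = load_players(db_path)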

Interactive Filtering

The module implements a comprehensive set of interactive filters in the sidebar to allow users to explore the data based on various dimensions:

# Sidebar Filters
st.sidebar.header("