Building an Automated Bitcoin Price ETL Pipeline with Airflow and PostgreSQL

Introduction

This article details creating an automated ETL (Extract, Transform, Load) pipeline that retrieves daily Bitcoin price data from the Polygon.io API, performs necessary transformations, and loads the data into a PostgreSQL database. The workflow is orchestrated using Apache Airflow, ensuring reliable daily execution.

This project demonstrates several key data engineering concepts:

  • API data extraction
  • Data transformation using pandas
  • Database integration with PostgreSQL
  • Workflow orchestration with Apache Airflow
  • Deployment to a cloud environment

System Architecture

The pipeline consists of the following components:

  1. Data Source: Polygon.io API providing cryptocurrency price data
  2. ETL Script: Python script that handles extraction, transformation, and loading
  3. Database: PostgreSQL for data storage
  4. Orchestration: Apache Airflow for scheduling and monitoring
  5. Infrastructure: Cloud VM for hosting the pipeline

The system flows in a linear fashion: Airflow triggers the ETL script daily, which extracts the latest BTC prices, transforms the data into a suitable format, and loads it into the PostgreSQL database.
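
As a rough schematic (not the actual script, which Step 1 writes as a single flat file), each daily run boils down to three stages:

# Schematic of the daily run; the real btc_prices.py inlines these stages.
def extract():
    """Call the Polygon.io daily open/close endpoint and return its JSON payload."""
    ...

def transform(payload):
    """Shape the payload into a one-row DataFrame (symbol, open, close, date)."""
    ...

def load(df):
    """Append the DataFrame to the crypto_prices table in PostgreSQL."""
    ...

if __name__ == '__main__':
    load(transform(extract()))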

Detailed Implementation

Step 1: Creating the ETL Script

The first component is btc_prices.py, which handles the core ETL functionality:

import requests
import os
from sqlalchemy import create_engine
import pandas as pd
from datetime import datetime, timedelta
from dotenv import load_dotenv

# Define API endpoint: request the previous day's daily open/close for BTC/USD,
# since the current day's close is not yet available when the DAG runs
target_date = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
url = f'https://api.polygon.io/v1/open-close/crypto/BTC/USD/{target_date}?adjusted=true&apiKey=YOUR_API_KEY'
response = requests.get(url)

if response.status_code == 200:
    data = response.json()
    open_price = data.get('open')
    close_price = data.get('close')
    date = data.get('day')
    symbol = data.get('symbol')
else:
    print(f"Failed to retrieve data: {response.status}")
    exit()

# Prepare data for insertion
data_df = {
    'symbol': symbol,
    'open_price': open_price,
    'close_price': close_price,
    'date': date
}
df = pd.DataFrame(data_df, index=[0])
df['date'] = pd.to_datetime(df['date']).dt.strftime('%Y-%m-%d')

# Load environment variables
load_dotenv()
dbname = os.getenv('dbname')
user = os.getenv('user')
password = os.getenv('password')
host = os.getenv('host')
port = os.getenv('port')

# Create database connection
engine = create_engine(f'postgresql://{user}:{password}@{host}:{port}/{dbname}')

df.to_sql("crypto_prices", con=engine, if_exists="append", index=False, schema="dataengineering")
print(f"Successfully loaded crypto data for {df['date'][0]}")

This script:

  • Extracts Bitcoin price data from the Polygon.io API
  • Transforms and structures the data using pandas
  • Loads the data into PostgreSQL
  • Uses environment variables for secure database connection management
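
For reference, the fields read above (open, close, day, symbol) come from a JSON payload shaped roughly like the following; the values are illustrative and the real Polygon.io response contains additional fields:

# Illustrative shape of the daily open/close payload (values made up).
sample_response = {
    'symbol': 'BTC-USD',
    'day': '2025-03-31',
    'open': 82000.0,
    'close': 83500.0,
}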

Step 2: Creating the Airflow DAG

Next, btc_dag.py defines the Airflow DAG (Directed Acyclic Graph) that orchestrates the workflow:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

# DAG default arguments
default_args = {
    "owner": "data_engineer",
    "depends_on_past": False,
    "start_date": datetime(2025, 3, 31),
    "email_on_failure": False,
    "email_on_retry": True,
    "retries": 2,
    "retry_delay": timedelta(minutes=2)
}

with DAG(
    'polygon_btc_data',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,  # run only the latest interval; do not backfill missed dates
) as dag:

    activate_venv = BashOperator(
        task_id='activate_virtual_env',
        # Path assumes the virtual environment from Step 3 was created inside
        # ~/crypto_price on the server; this task only confirms it activates,
        # since the activation does not persist into downstream tasks.
        bash_command='source /home/user/crypto_price/venv/bin/activate',
    )

    execute_file = BashOperator(
        task_id='execute_python_file',
        # Each BashOperator runs in its own shell, so the venv must be activated
        # again here for the script's dependencies to be available.
        bash_command='source /home/user/crypto_price/venv/bin/activate && python /home/user/crypto_price/btc_prices.py',
    )

    activate_venv >> execute_file

This DAG:

  • Defines the execution schedule
  • Activates the virtual environment
  • Executes the ETL script
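
Before deploying, it is worth confirming that the DAG file parses without import errors. A minimal sketch using Airflow's DagBag (the dags folder path is assumed to match the deployment in Step 4):

# Check that btc_dag.py loads cleanly and the DAG is registered.
from airflow.models import DagBag

bag = DagBag(dag_folder='/home/user/airflow/dags', include_examples=False)
assert not bag.import_errors, bag.import_errors
print(list(bag.dags))  # should include 'polygon_btc_data'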

Step 3: Setting Up the Environment

  1. Creating a Virtual Environment:
   python -m venv venv
   source venv/bin/activate
  2. Installing Dependencies:
   pip install requests pandas sqlalchemy python-dotenv psycopg2-binary apache-airflow
  3. Setting Up Environment Variables (a sanity-check sketch follows this list):
   echo "dbname=your_database_name" >> .env
   echo "user=your_database_user" >> .env
   echo "password=your_database_password" >> .env
   echo "host=your_database_host" >> .env
   echo "port=your_database_port" >> .env

Step 4: Server Deployment

  1. SSH into the cloud VM:
   ssh user@your_server_ip
  2. Create necessary directories:
   mkdir -p ~/crypto_price
   mkdir -p ~/airflow/dags
  3. Transfer scripts to the server:
   scp btc_prices.py user@your_server_ip:~/crypto_price/
   scp btc_dag.py user@your_server_ip:~/airflow/dags/

Step 5: PostgreSQL Configuration

  1. Creating Database Schema:
   CREATE SCHEMA IF NOT EXISTS dataengineering;

   CREATE TABLE IF NOT EXISTS dataengineering.crypto_prices (
       id SERIAL PRIMARY KEY,
       symbol VARCHAR(10) NOT NULL,
       open_price NUMERIC(20, 8) NOT NULL,
       close_price NUMERIC(20, 8) NOT NULL,
       date DATE NOT NULL,
       created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
   );
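
After the first successful run, the load can be verified from Python with the same connection settings (a quick check, not part of the pipeline itself):

# Read back the latest rows to confirm the pipeline wrote data.
import os
import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import create_engine

load_dotenv()
engine = create_engine(
    f"postgresql://{os.getenv('user')}:{os.getenv('password')}"
    f"@{os.getenv('host')}:{os.getenv('port')}/{os.getenv('dbname')}"
)
print(pd.read_sql(
    'SELECT symbol, open_price, close_price, date '
    'FROM dataengineering.crypto_prices ORDER BY date DESC LIMIT 5',
    con=engine,
))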

Conclusion

The architecture follows best practices for data engineering:

  • Separation of extraction, transformation, and loading concerns
  • Secure credential management
  • Error handling that fails the Airflow task when the API request fails
  • Automated scheduling
  • Cloud-based deployment

The combination of Python, Airflow, and PostgreSQL provides a powerful foundation for financial data analysis, enabling timely insights into cryptocurrency market trends.

Github