Building Automated Weather Data Pipeline with Apache Kafka and Cassandra


Apr 6, 2025 - 11:24

Introduction

This project implements an ETL (Extract, Transform, Load) pipeline that fetches real-time weather data from the OpenWeatherMap API, passes it through Apache Kafka, and stores it in a Cassandra database. The pipeline monitors weather conditions in several cities around the world.

System Architecture

The pipeline consists of three main components:

  1. Data Producer (weather_df.py): Extracts weather data from the OpenWeatherMap API and publishes it to a Kafka topic.
  2. Data Consumer (weather_consumer.py): Subscribes to the Kafka topic, processes the incoming messages, and loads the data into the Cassandra database.
  3. Cassandra database: The NoSQL database used for data storage.

Implementation

Step 1: Creating the scripts

The first component is weather_df.py, which handles data extraction and publishing:


import requests, os
import json
from dotenv import load_dotenv
from confluent_kafka import Producer
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

load_dotenv()


own_url='https://api.openweathermap.org/data/2.5/weather'
own_api_key=os.getenv('WEATHER_API_KEY')
cities = [
    "Milan",
    "Tokyo",
    "London",
    "Managua",
    "Sydney"
]
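def weather_extract(city):
    """Fetch current weather for one city; return None if the request fails."""
    params = {"q": city, "appid": own_api_key, "units": "metric"}
    try:
        # Time out instead of hanging, and treat HTTP errors (401, 404, ...) as failures
        response = requests.get(own_url, params=params, timeout=10)
        response.raise_for_status()
        data = response.json()
    except (requests.RequestException, ValueError) as exc:
        logger.error(f"Request for {city} failed: {exc}")
        return None
    data['extracted_city'] = city
    return data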

def delivery_report(err, msg):
    """Callback for Kafka message delivery status."""
    if err is not None:
        logger.error(f"Message delivery failed: {err}")
    else:
        logger.info(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")


kafka_config={
    'bootstrap.servers':os.getenv('BOOTSTRAP_SERVER'),
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": os.getenv('CONFLUENT_API_KEY'),
    "sasl.password": os.getenv('CONFLUENT_SECRET_KEY'),
    "broker.address.family": "v4",
    "message.send.max.retries": 5,
    "retry.backoff.ms": 500,
}

producer=Producer(kafka_config)
topic='weather-data'

def produce_weather_data():
    for city in cities:
        data=weather_extract(city)
        if data:
            producer.produce(topic, key=city, value=json.dumps(data), callback=delivery_report)
            producer.poll(0)
        else:
            logger.error(f"Failed to fetch data for {city}")
    producer.flush()

if __name__ == "__main__":
    produce_weather_data()
    logger.info("Data extraction and production complete")

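This script:

  • Fetches current weather data for Milan, Tokyo, London, Managua, and Sydney

  • Tags each API response with the requested city name (extracted_city) so every message identifies its city in the same way

  • Serializes the data to JSON and sends it to a Confluent Kafka topic named weather-data

  • Reads the OpenWeatherMap API key and the Kafka credentials from environment variables, so no secrets are hard-coded in the script.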
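Because both scripts read their credentials with os.getenv, a missing variable silently becomes None and only surfaces later as a connection or authentication error. A minimal sketch of a fail-fast check follows; the variable names come from the scripts in this article, while the helper require_env and its env parameter are hypothetical and not part of the original code.

```python
import os

# Variable names used by weather_df.py and weather_consumer.py in this article.
REQUIRED_VARS = (
    "WEATHER_API_KEY",
    "BOOTSTRAP_SERVER",
    "CONFLUENT_API_KEY",
    "CONFLUENT_SECRET_KEY",
)


def require_env(names=REQUIRED_VARS, env=None):
    """Return {name: value} for every name, or raise listing what is missing.

    `env` defaults to os.environ; passing a plain dict makes the check easy to test.
    (Hypothetical helper, not part of the original scripts.)
    """
    env = os.environ if env is None else env
    # Treat unset and empty values the same way: both are unusable credentials.
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: env[name] for name in names}
```

Calling require_env() right after load_dotenv() would stop the script with a readable message before any request is sent.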

Step 2: Running Consumer

The consumer, weather_consumer.py, subscribes to the weather-data topic, polls the messages published by the producer, and loads them into the database.

import os
from dotenv import load_dotenv
from confluent_kafka import Consumer, KafkaException
from cassandra.cluster import Cluster
from json import loads
from datetime import datetime
import uuid

# --- Load environment variables ---
load_dotenv()

# --- Confluent Kafka Consumer Configuration ---
conf = {
    'bootstrap.servers': os.getenv('BOOTSTRAP_SERVER'),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': os.getenv('CONFLUENT_API_KEY'),
    'sasl.password': os.getenv('CONFLUENT_SECRET_KEY'),
    'group.id': 'weather-group-id',
    'auto.offset.reset': 'earliest'
}

# Initialize Kafka consumer

consumer = Consumer(conf)
topic = 'weather-data'  # Topic name
consumer.subscribe([topic])
print(f"Subscribed to topic: {topic}")

# --- Cassandra Setup ---
try:
    cluster = Cluster(['127.0.0.1'])  # Replace with your Cassandra host (e.g. the Azure server's IP address)
    session = cluster.connect()

    session.execute("""
        CREATE KEYSPACE IF NOT EXISTS city_weather_data
        WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}
    """)
    session.set_keyspace("city_weather_data")

    session.execute("""
        CREATE TABLE IF NOT EXISTS city_weather_data (
            id UUID PRIMARY KEY,
            city_name TEXT,
            weather_main TEXT,
            weather_description TEXT,
            temperature FLOAT,
            timestamp TIMESTAMP
        )
    """)
    print("Cassandra table ready")
except Exception as e:
    print(f"Error setting up Cassandra: {e}")
    session = None

# --- Read from Kafka and Insert into Cassandra ---
if session:
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                raise KafkaException(msg.error())
            else:
                try:
                    data = loads(msg.value().decode('utf-8'))

                    # Extract required fields
                    record = {
                        "id": uuid.uuid4(),
                        "city_name": data.get("extracted_city", "Unknown"),
                        "weather_main": data["weather"][0]["main"],
                        "weather_description": data["weather"][0]["description"],
                        "temperature": data["main"]["temp"],
                        "timestamp": datetime.fromtimestamp(data["dt"])
                    }

                    # Insert into Cassandra
                    session.execute("""
                        INSERT INTO city_weather_data (id, city_name, weather_main, weather_description, temperature, timestamp)
                        VALUES (%(id)s, %(city_name)s, %(weather_main)s, %(weather_description)s, %(temperature)s, %(timestamp)s)
                    """, record)

                    print(f"Inserted weather for {record['city_name']} at {record['timestamp']}")

                except Exception as e:
                    print(f"Error processing message: {e}")

    except KeyboardInterrupt:
        print("Consumer stopped manually")

    finally:
        consumer.close()
        print("Kafka consumer closed")

This script handles:

  • Message Consumption: Subscribes to and polls messages from Kafka
  • Data Transformation: Extracts relevant fields from the weather data
  • Data Loading: Inserts processed records into a Cassandra database
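The transformation step can also be pulled out of the polling loop into a small function, which makes it testable without Kafka or Cassandra. The sketch below mirrors the field mapping used in the consumer; the function name to_record is hypothetical, and the one deliberate difference is that the timestamp is built as an explicit UTC value, on the assumption that OpenWeatherMap's dt field is epoch seconds in UTC.

```python
from datetime import datetime, timezone
import uuid


def to_record(data):
    """Map one decoded weather message to the columns of city_weather_data.

    Returns None when a required field is missing or malformed, so the
    caller can skip the message instead of crashing.
    (Hypothetical helper, not part of the original consumer.)
    """
    try:
        weather = data["weather"][0]
        return {
            "id": uuid.uuid4(),
            "city_name": data.get("extracted_city", "Unknown"),
            "weather_main": weather["main"],
            "weather_description": weather["description"],
            "temperature": float(data["main"]["temp"]),
            # Assumption: "dt" is epoch seconds in UTC, so keep the timezone explicit
            # rather than converting to the local time of the machine.
            "timestamp": datetime.fromtimestamp(data["dt"], tz=timezone.utc),
        }
    except (AttributeError, KeyError, IndexError, TypeError, ValueError):
        return None
```

In the polling loop, a None result would simply be logged and skipped before the INSERT is executed.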

Step 3: Setting Up the Environment

**Creating a Virtual Environment:**

python -m venv venv
source venv/bin/activate

**Install required packages:**

pip install requests python-dotenv confluent-kafka cassandra-driver

Step 4: Running the Pipeline

Ensure your Cassandra instance is running. The consumer automatically creates the keyspace and table if they don't exist.
Run the consumer to begin listening for messages:

python weather_consumer.py

In a separate terminal, run the producer to fetch and publish weather data:

python weather_df.py

The producer will:

  • Fetch weather data for each configured city
  • Publish messages to Kafka
  • Log the status of each operation

Data Flow

  • The producer calls OpenWeatherMap API for each city
  • Weather data is serialized to JSON and published to Kafka
  • The consumer continuously polls the Kafka topic
  • Incoming messages are deserialized and transformed
  • Data is inserted into the Cassandra database for persistence
  • The consumer keeps polling, so data from each new producer run is processed as soon as it arrives
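The flow above can be rehearsed without any external service. The following is only a dry-run sketch: a queue.Queue stands in for the Kafka topic, a Python list stands in for the Cassandra table, and fake_fetch is a hypothetical replacement for the API call. It shows the order of the steps, not how Kafka or Cassandra behave.

```python
import json
import queue


def run_dry(cities, fetch):
    """Push payloads through the same steps as the pipeline, entirely in memory."""
    topic = queue.Queue()  # stand-in for the Kafka topic
    table = []             # stand-in for the Cassandra table

    # Producer side: fetch, serialize to JSON, publish with the city as key.
    for city in cities:
        payload = fetch(city)
        topic.put((city.encode("utf-8"), json.dumps(payload).encode("utf-8")))

    # Consumer side: poll, deserialize, transform, store.
    while not topic.empty():
        _key, value = topic.get()
        data = json.loads(value.decode("utf-8"))
        table.append({
            "city_name": data["extracted_city"],
            "temperature": data["main"]["temp"],
        })
    return table


def fake_fetch(city):
    # Hypothetical payload containing only the fields used above.
    return {"extracted_city": city, "main": {"temp": 20.0}}


rows = run_dry(["Milan", "Tokyo"], fake_fetch)
```

After the run, rows holds one entry per city in the order the messages were published.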

Future Enhancements

  • Scheduling: Implement Apache Airflow to schedule regular data collection
  • Data Validation: Add schema validation to ensure data quality
  • Monitoring: Implement metrics collection for pipeline performance
  • Scaling: Configure multiple consumer instances for parallel processing
  • Analytics: Build data visualization dashboards with the collected weather data
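As an illustration of the data validation item, here is a minimal sketch of a hand-written check that could run before a message is produced or inserted. It only covers the fields the consumer reads; the function name validate_payload and the temperature bounds are assumptions chosen for the example, and a schema library could replace it.

```python
def validate_payload(data):
    """Return a list of problems; an empty list means the payload looks usable.

    (Hypothetical helper; it checks only the fields the consumer reads.)
    """
    if not isinstance(data, dict):
        return ["payload is not a JSON object"]

    problems = []

    weather = data.get("weather")
    if not (isinstance(weather, list) and weather and isinstance(weather[0], dict)):
        problems.append("weather must be a non-empty list of objects")
    else:
        for field in ("main", "description"):
            if not isinstance(weather[0].get(field), str):
                problems.append(f"weather[0].{field} must be a string")

    main = data.get("main")
    temp = main.get("temp") if isinstance(main, dict) else None
    if isinstance(temp, bool) or not isinstance(temp, (int, float)):
        problems.append("main.temp must be a number")
    elif not -100 <= temp <= 70:
        # Assumed plausibility bounds in degrees Celsius (units=metric).
        problems.append("main.temp is outside the plausible range")

    dt = data.get("dt")
    if isinstance(dt, bool) or not isinstance(dt, int) or dt <= 0:
        problems.append("dt must be a positive integer (epoch seconds)")

    return problems
```

Returning a list of problems instead of raising on the first one makes it easier to log everything that is wrong with a rejected message.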

Conclusion

This ETL pipeline demonstrates how to build a real-time data processing system with Kafka and Cassandra. It provides a foundation that can be extended for various use cases, from weather analytics to environmental monitoring systems.