Building an Automated Weather Data Pipeline with Apache Kafka and Cassandra
Introduction
This project implements an ETL (Extract, Transform, Load) pipeline that fetches real-time weather data from the OpenWeatherMap API, processes it through Apache Kafka, and stores it in a Cassandra database. The pipeline monitors weather conditions across multiple cities around the world.
System Architecture
The pipeline consists of three main components:
- Data Producer (weather_df.py): extracts weather data from the OpenWeatherMap API and publishes it to a Kafka topic.
- Data Consumer (weather_consumer.py): subscribes to the Kafka topic, processes the incoming messages, and loads the data into Cassandra.
- Cassandra database: the NoSQL database used for data storage.
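At a high level, the data flows in one direction:
OpenWeatherMap API -> weather_df.py (producer) -> Kafka topic weather-data -> weather_consumer.py (consumer) -> Cassandra table city_weather_data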
Implementation
Step 1: Creating the Producer Script
The first component is weather_df.py, which handles data extraction and publishing:
import requests
import os
import json
import logging
from dotenv import load_dotenv
from confluent_kafka import Producer

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

load_dotenv()

own_url = 'https://api.openweathermap.org/data/2.5/weather'
own_api_key = os.getenv('WEATHER_API_KEY')

cities = ["Milan", "Tokyo", "London", "Managua", "Sydney"]


def weather_extract(city):
    """Fetch the current weather for a city from the OpenWeatherMap API."""
    url = f"{own_url}?q={city}&appid={own_api_key}&units=metric"
    response = requests.get(url)
    if response.status_code != 200:
        logger.error(f"API request for {city} failed with status {response.status_code}")
        return None
    data = response.json()
    data['extracted_city'] = city
    return data


def delivery_report(err, msg):
    """Callback for Kafka message delivery status."""
    if err is not None:
        logger.error(f"Message delivery failed: {err}")
    else:
        logger.info(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")


kafka_config = {
    'bootstrap.servers': os.getenv('BOOTSTRAP_SERVER'),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': os.getenv('CONFLUENT_API_KEY'),
    'sasl.password': os.getenv('CONFLUENT_SECRET_KEY'),
    'broker.address.family': 'v4',
    'message.send.max.retries': 5,
    'retry.backoff.ms': 500,
}

producer = Producer(kafka_config)
topic = 'weather-data'


def produce_weather_data():
    """Fetch weather data for each city and publish it to the Kafka topic."""
    for city in cities:
        data = weather_extract(city)
        if data:
            producer.produce(topic, key=city, value=json.dumps(data), callback=delivery_report)
            producer.poll(0)
        else:
            logger.error(f"Failed to fetch data for {city}")
    # Block until all queued messages are delivered
    producer.flush()


if __name__ == "__main__":
    produce_weather_data()
    logger.info("Data extraction and production complete")
This script:
- Fetches current weather data for Milan, Tokyo, London, Managua, and Sydney
- Tags each API response with the city it was requested for (extracted_city)
- Sends the data as JSON to a Confluent Kafka topic named weather-data
- Uses environment variables for secure credential management
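Each Kafka message value is the raw OpenWeatherMap response plus the extracted_city field. An abbreviated example, with illustrative values and only the fields the consumer later reads (the real response contains many more):
{
    "weather": [{"main": "Clouds", "description": "scattered clouds"}],
    "main": {"temp": 18.4},
    "dt": 1700000000,
    "extracted_city": "Milan"
}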
Step 2: Creating the Consumer Script
The consumer, weather_consumer.py, subscribes to the Kafka topic, polls for the messages published by the producer, and loads them into the database.
import os
import uuid
from json import loads
from datetime import datetime
from dotenv import load_dotenv
from confluent_kafka import Consumer, KafkaException
from cassandra.cluster import Cluster

# --- Load environment variables ---
load_dotenv()

# --- Confluent Kafka Consumer Configuration ---
conf = {
    'bootstrap.servers': os.getenv('BOOTSTRAP_SERVER'),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': os.getenv('CONFLUENT_API_KEY'),
    'sasl.password': os.getenv('CONFLUENT_SECRET_KEY'),
    'group.id': 'weather-group-id',
    'auto.offset.reset': 'earliest'
}

# Initialize Kafka consumer
consumer = Consumer(conf)
topic = 'weather-data'  # Topic name
consumer.subscribe([topic])
print(f"Subscribed to topic: {topic}")

# --- Cassandra Setup (Azure Server) ---
try:
    cluster = Cluster(['127.0.0.1'])  # Replace with the Azure server's IP address
    session = cluster.connect()
    session.execute("""
        CREATE KEYSPACE IF NOT EXISTS city_weather_data
        WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}
    """)
    session.set_keyspace("city_weather_data")
    session.execute("""
        CREATE TABLE IF NOT EXISTS city_weather_data (
            id UUID PRIMARY KEY,
            city_name TEXT,
            weather_main TEXT,
            weather_description TEXT,
            temperature FLOAT,
            timestamp TIMESTAMP
        )
    """)
    print("Cassandra table ready")
except Exception as e:
    print(f"Error setting up Cassandra: {e}")
    session = None

# --- Read from Kafka and Insert into Cassandra ---
if session:
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                raise KafkaException(msg.error())
            else:
                try:
                    data = loads(msg.value().decode('utf-8'))
                    # Extract the fields the table needs
                    record = {
                        "id": uuid.uuid4(),
                        "city_name": data.get("extracted_city", "Unknown"),
                        "weather_main": data["weather"][0]["main"],
                        "weather_description": data["weather"][0]["description"],
                        "temperature": data["main"]["temp"],
                        "timestamp": datetime.fromtimestamp(data["dt"])
                    }
                    # Insert into Cassandra
                    session.execute("""
                        INSERT INTO city_weather_data (id, city_name, weather_main, weather_description, temperature, timestamp)
                        VALUES (%(id)s, %(city_name)s, %(weather_main)s, %(weather_description)s, %(temperature)s, %(timestamp)s)
                    """, record)
                    print(f"Inserted weather for {record['city_name']} at {record['timestamp']}")
                except Exception as e:
                    print(f"Error processing message: {e}")
    except KeyboardInterrupt:
        print("Consumer stopped manually")
    finally:
        consumer.close()
        print("Kafka consumer closed")
This script handles:
- Message Consumption: Subscribes to and polls messages from Kafka
- Data Transformation: Extracts relevant fields from the weather data
- Data Loading: Inserts processed records into a Cassandra database
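To spot-check that rows are actually landing in Cassandra, the same driver can be used for a quick read-back. This is a minimal sketch that assumes the keyspace, table, and contact point used by the consumer above:
from cassandra.cluster import Cluster

# Connect to the consumer's keyspace and print a few stored rows
cluster = Cluster(['127.0.0.1'])
session = cluster.connect('city_weather_data')
rows = session.execute("SELECT city_name, temperature, timestamp FROM city_weather_data LIMIT 10")
for row in rows:
    print(row.city_name, row.temperature, row.timestamp)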
Step 3: Setting Up the Environment
**Create a virtual environment:**
python -m venv venv
source venv/bin/activate
**Install the required packages:**
pip install requests python-dotenv confluent-kafka cassandra-driver
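Both scripts load their credentials from a .env file via python-dotenv. A minimal example with placeholder values (the variable names match those read in the scripts above; substitute your own keys):
WEATHER_API_KEY=your_openweathermap_api_key
BOOTSTRAP_SERVER=your_confluent_bootstrap_server:9092
CONFLUENT_API_KEY=your_confluent_api_key
CONFLUENT_SECRET_KEY=your_confluent_api_secret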
Step 4: Running the Pipeline
Ensure your Cassandra instance is running. The consumer will automatically create the necessary keyspace and table if they don't exist.
Run the consumer to begin listening for messages:
python weather_consumer.py
In a separate terminal, run the producer to fetch and publish weather data:
python weather_df.py
The producer will:
- Fetch weather data for each configured city
- Publish messages to Kafka
- Log the status of each operation
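Given the logging format configured in weather_df.py, a successful run prints output along these lines (timestamps, partition numbers, and offsets are illustrative only):
2025-01-01 12:00:01,123 - __main__ - INFO - Message delivered to weather-data [0] at offset 41
2025-01-01 12:00:01,456 - __main__ - INFO - Message delivered to weather-data [0] at offset 42
2025-01-01 12:00:02,789 - __main__ - INFO - Data extraction and production complete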
Data Flow
- The producer calls the OpenWeatherMap API for each city
- Weather data is serialized to JSON and published to Kafka
- The consumer continuously polls the Kafka topic
- Incoming messages are deserialized and transformed
- Data is inserted into the Cassandra database for persistence
- The process repeats as new data becomes available
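Note that the producer as written runs once and exits, so the repetition has to come from outside the script. A minimal sketch of a periodic runner, assuming weather_df.py sits in the same directory (a real deployment would use a proper scheduler, as noted below):
import time

from weather_df import produce_weather_data

# Naive repetition: fetch and publish every 10 minutes until interrupted
while True:
    produce_weather_data()
    time.sleep(600)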
Future Enhancements
- Scheduling: Implement Apache Airflow to schedule regular data collection
- Data Validation: Add schema validation to ensure data quality (a sketch follows this list)
- Monitoring: Implement metrics collection for pipeline performance
- Scaling: Configure multiple consumer instances for parallel processing
- Analytics: Build data visualization dashboards with the collected weather data
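As an illustration of the data validation idea, the consumer could check each decoded message against the fields it needs before inserting anything. A minimal sketch (the field names match those the consumer already extracts):
REQUIRED_FIELDS = ("extracted_city", "weather", "main", "dt")

def is_valid_weather_message(data: dict) -> bool:
    """Return True only if the message carries every field the consumer needs."""
    if not all(field in data for field in REQUIRED_FIELDS):
        return False
    # The consumer reads weather[0]["main"], weather[0]["description"], and main["temp"]
    if not data["weather"] or "main" not in data["weather"][0] or "description" not in data["weather"][0]:
        return False
    return "temp" in data["main"]
The consumer would call is_valid_weather_message(data) right after decoding a message and skip anything that fails the check.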
Conclusion
This ETL pipeline demonstrates how to build a real-time data processing system with Apache Kafka and Cassandra. It provides a foundation that can be extended for various use cases, from weather analytics to environmental monitoring systems.