Introducing Mongo Broadcaster: A Multi-Channel MongoDB Change Stream Processor

About a month ago, I wrote a blog post on streaming changes on certain collections in Mercury to build a real-time update system. Over the weekend and due to boredom, I built a package that'll allow you and me to stream changes from a collection and broadcast them via a defined channel such as Websocket, Redis, HTTP streaming, or saved in a collection. For now, I have decided not to implement the delete operation, even if it was a few LOCs. The package focuses solely on insert and update operations. In this blog post, I'll discuss the implementation of the package. It's called broadcaster and can be downloaded from PyPi. TL;DR Install the library from PyPi: pip install mongo-broadcaster Abstract classes In Mercury, the current implementation broadcasts only to a WebSocket client. However, my technical partner & I were discussing building activity logs and whatnot, and only then did it occur that I can extend this to Redis or even store the streams in another collection. As a result, I designed a BaseChannel class that allows other channels to inherit its abstract methods to allow them to design their own implementation: from abc import ABC, abstractmethod from typing import Any, Dict class BaseChannel(ABC): @abstractmethod async def connect(self): """Initialize connection""" pass @abstractmethod async def disconnect(self): """Close connection""" pass @abstractmethod async def send(self, recipient: str, message: Dict[str, Any]): """Send message to recipient""" pass The BaseChannel is the progenitor of the Websocket, Redis, HTTP, and Database channel,s which will be discussed later on. The BaseChannel class has three methods: connect(): This initializes the connection to the channel. e.g., connecting redis server instance. disconnect(): This closes the connection to the channel. send(recipient, message): This handles the sending of messages to the registered recipient. Let's take a look at the models defined for this package. Models There are four Pydantic models defined; three for configuration and one for standardizing the response from the MongoDB stream. Let's take a look at the solo standardization model first. ChangeEvent The ChangeEvent model class returns the log from the MongoDB stream into a presentable format for use in other functions/methods. It is defined as: class ChangeEvent(BaseModel): """Standardized change event model""" operation: str # 'insert'|'update' collection: str document_id: str data: dict timestamp: float namespace: Optional[str] = None recipient: Optional[str] = Field( None, description="Target recipient ID if available (for directed messaging)" ) For example, an update operation returns the following JSON: { '_id': { '_data': '82680684FF000000012B042C0100296E5A1004D3EE9A783E1646B481F4358CF62D4A8D463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B657900463C5F6964003C757365725F31323300000004' }, 'operationType': 'update', 'clusterTime': Timestamp(1745257727, 1), 'wallTime': datetime.datetime(2025, 4, 21, 17, 48, 47, 405000), 'fullDocument': { '_id': 'user_123', 'name': 'Updated to test the newly added field to watch.' }, 'ns': { 'db': 'test', 'coll': 'users' }, 'documentKey': { '_id': 'user_123' }, 'updateDescription': { 'updatedFields': { 'name': 'Updated to test the newly added field to watch.' }, 'removedFields': [], 'truncatedArrays': [] }, 'fullDocumentBeforeChange': None } Now, that doesn't look all pretty. It can be formatted to: operation = 'update' collection = 'users' document_id = 'user_123' data = {} timestamp = 1745257958.0 namespace = 'test' recipient = user_123 The code above is from a function format_change_event defined in the utils.py file. We'll get there, but for now, let's take a look at the other models for configuration. ...Config The three configuration models are: ChangeStreamConfig CollectionConfig BroadcasterConfig ChangeStreamConfig This model defines a pipeline and options to be utilized by the .watch() method. That is, the configuration defined here will determine the behaviour of the change stream. The model comes predefined with a default pipeline and options field: class ChangeStreamConfig(BaseModel): """Configuration for MongoDB change stream""" pipeline: List[Dict[str, Any]] = Field( default_factory=lambda: [ {'$match': {'operationType': {'$in': ['insert', 'update']}}} # add support for delete later -> this requires some other changes ] ) options: Dict[str, Any] = Field( default_factory=lambda: { 'full_document': 'updat

Apr 22, 2025 - 09:29
 0
Introducing Mongo Broadcaster: A Multi-Channel MongoDB Change Stream Processor

About a month ago, I wrote a blog post on streaming changes on certain collections in Mercury to build a real-time update system.

Over the weekend and due to boredom, I built a package that'll allow you and me to stream changes from a collection and broadcast them via a defined channel such as Websocket, Redis, HTTP streaming, or saved in a collection.

For now, I have decided not to implement the delete operation, even if it was a few LOCs. The package focuses solely on insert and update operations.

In this blog post, I'll discuss the implementation of the package. It's called broadcaster and can be downloaded from PyPi.

TL;DR

Install the library from PyPi:

pip install mongo-broadcaster

You decide!

Abstract classes

In Mercury, the current implementation broadcasts only to a WebSocket client. However, my technical partner & I were discussing building activity logs and whatnot, and only then did it occur that I can extend this to Redis or even store the streams in another collection.

As a result, I designed a BaseChannel class that allows other channels to inherit its abstract methods to allow them to design their own implementation:

from abc import ABC, abstractmethod
from typing import Any, Dict


class BaseChannel(ABC):
    @abstractmethod
    async def connect(self):
        """Initialize connection"""
        pass

    @abstractmethod
    async def disconnect(self):
        """Close connection"""
        pass

    @abstractmethod
    async def send(self, recipient: str, message: Dict[str, Any]):
        """Send message to recipient"""
        pass

The BaseChannel is the progenitor of the Websocket, Redis, HTTP, and Database channel,s which will be discussed later on.

The BaseChannel class has three methods:

  • connect(): This initializes the connection to the channel. e.g., connecting redis server instance.
  • disconnect(): This closes the connection to the channel.
  • send(recipient, message): This handles the sending of messages to the registered recipient.

Let's take a look at the models defined for this package.

Models

There are four Pydantic models defined; three for configuration and one for standardizing the response from the MongoDB stream. Let's take a look at the solo standardization model first.

ChangeEvent

The ChangeEvent model class returns the log from the MongoDB stream into a presentable format for use in other functions/methods. It is defined as:

class ChangeEvent(BaseModel):
    """Standardized change event model"""
    operation: str  # 'insert'|'update'
    collection: str
    document_id: str
    data: dict
    timestamp: float
    namespace: Optional[str] = None
    recipient: Optional[str] = Field(
        None,
        description="Target recipient ID if available (for directed messaging)"
    )

For example, an update operation returns the following JSON:

{
    '_id': {
        '_data': '82680684FF000000012B042C0100296E5A1004D3EE9A783E1646B481F4358CF62D4A8D463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B657900463C5F6964003C757365725F31323300000004'
    },
    'operationType': 'update',
    'clusterTime': Timestamp(1745257727,
    1),
    'wallTime': datetime.datetime(2025,
    4,
    21,
    17,
    48,
    47,
    405000),
    'fullDocument': {
        '_id': 'user_123',
        'name': 'Updated to test the newly added field to watch.'
    },
    'ns': {
        'db': 'test',
        'coll': 'users'
    },
    'documentKey': {
        '_id': 'user_123'
    },
    'updateDescription': {
        'updatedFields': {
            'name': 'Updated to test the newly added field to watch.'
        },
        'removedFields': [],
        'truncatedArrays': []
    },
    'fullDocumentBeforeChange': None
}

Now, that doesn't look all pretty. It can be formatted to:

operation = 'update'
collection = 'users'
document_id = 'user_123'
data = {}
timestamp = 1745257958.0
namespace = 'test'
recipient = user_123

The code above is from a function format_change_event defined in the utils.py file. We'll get there, but for now, let's take a look at the other models for configuration.

...Config

The three configuration models are:

  • ChangeStreamConfig
  • CollectionConfig
  • BroadcasterConfig

ChangeStreamConfig

This model defines a pipeline and options to be utilized by the .watch() method. That is, the configuration defined here will determine the behaviour of the change stream. The model comes predefined with a default pipeline and options field:

class ChangeStreamConfig(BaseModel):
    """Configuration for MongoDB change stream"""
    pipeline: List[Dict[str, Any]] = Field(
        default_factory=lambda: [
            {'$match': {'operationType': {'$in': ['insert', 'update']}}}
            # add support for delete later -> this requires some other changes
        ]
    )
    options: Dict[str, Any] = Field(
        default_factory=lambda: {
            'full_document': 'updateLookup',
            'full_document_before_change': 'whenAvailable'
        }
    )

The pipeline is your normal aggregation pipeline. Above, the default pipeline is to match the insert and update operations executed.

The options field comes with two options predefined:

  • full_document to return the full document on which the operation (insert|update) is performed on.
  • full_document_before_change to return the previous version of the document before a change when avaialble. e.g., the document before an update was carried out.

The document before change will not be available by default if the capture of pre-images on the collection is not enabled.

CollectionConfig

CollectionConfig is responsible for storing the details for a specific collection to be streamed. Let's take a look at the definition:

class CollectionConfig(BaseModel):
    """Configuration for a specific collection"""
    collection_name: str
    database_name: Optional[str] = None
    change_stream_config: ChangeStreamConfig = Field(default_factory=ChangeStreamConfig)
    fields_to_watch: List[str] = Field(default_factory=list)
    recipient_identifier: Optional[str] = None  # Field path to identify recipient (e.g., "owner.id")

The CollectionConfig model takes:

  • the collection name
  • the database name (optional)
  • the change stream configuration
  • a list of fields to watch for changes in the collection. If the list is empty, it watches for all fields in the collection
  • an identifier for the recipient of any change. If this is left out, every connected recipient or subscribe will be broadcasted to (e.g., a general service announcement).

BroadcasterConfig

This is the main configuration for the tool. Here, the database URI, a list of collections and optional default database is configured. It is defined simply as:

class BroadcasterConfig(BaseModel):
    """Main configuration for the broadcaster"""
    mongo_uri: str
    collections: List[CollectionConfig]
    default_database: Optional[str] = None

Now that the structure for each configuration model has been laid out, let's take a look at utility functions.

Utilities

These are essential functions that help with the smooth running for the package. The utility functions are self-explainable:

Validate mongo connection

This function validates that the MongoDB connection is valid and live.

def validate_mongo_connection(uri: str) -> bool:
    """Verify MongoDB connection is available"""
    try:
        from motor.motor_asyncio import AsyncIOMotorClient
        client = AsyncIOMotorClient(uri)
        client.admin.command('ping')
        return True
    except Exception as e:
        logger.error(f"MongoDB connection failed: {str(e)}")
        return False

Format change event

This function makes use of the ChangeEvent model to purify the chunk of data sent by the stream.

def format_change_event(change: Dict[str, Any], config: CollectionConfig) -> ChangeEvent:
    """Transform raw MongoDB change stream event"""
    return ChangeEvent(
        operation=change['operationType'],
        collection=change['ns']['coll'],
        document_id=str(change['documentKey']['_id']),
        data=extract_fields(change, getattr(config, 'fields_to_watch', [])),
        timestamp=change['clusterTime'].time,
        namespace=change['ns']['db'],
        recipient=None
    )

Extract fields

The function filters the stream of document to pick items from the change dictionary registered in the fields lsit.

def extract_fields(change: Dict[str, Any], fields: list) -> Dict[str, Any]:
    """Extract specific fields from change stream data"""
    result = {}
    for field in fields:
        keys = field.split('.')
        value = change
        try:
            for key in keys:
                value = value.get(key, {})
            if value:  # Only add non-empty values
                result[field] = value
        except AttributeError:
            continue
    return result

Backoff handler

For retries:

def backoff_handler(details):
    """Exponential backoff for connection retries"""
    logger.warning(
        f"Retrying in {details['wait']:.1f} seconds after "
        f"{details['tries']} tries calling {details['target']}"
    )

Channels

Broadcaster currently supports four channels:

  • Websocket
  • Redis
  • HTTP
  • Database logging

Which channel should be used for broadcasting MongoDB changes?

All four channels inherit the base channel and contain an additional broadcast method. Each of these channels has the methods:

  • connect(client_id): registers a new connection
  • disconnect(client_id): disbands a connection
  • send(recipient, message): sends a message to a designated client
  • broadcast(message): sends message to all connected clients

Websocket

For Websocket, here's the implementation:

class WebSocketChannel(BaseChannel):
    def __init__(self):
        self.active_connections: Dict[str, WebSocket] = {}

    async def connect(self, client_id: str, websocket: WebSocket):
        """Register a new websocket connection"""
        await websocket.accept()
        self.active_connections[client_id] = websocket

    async def disconnect(self, client_id: str):
        """Remove a websocket connection"""
        if client_id in self.active_connections:
            del self.active_connections[client_id]

    async def send(self, recipient: str, message: Dict[str, Any]):
        """Send message to specific client"""
        if recipient in self.active_connections:
            try:
                await self.active_connections[recipient].send_json(message)
            except Exception as e:
                await self.disconnect(recipient)
                raise e

    async def broadcast(self, message: Dict[str, Any]):
        """Send message to all connected clients"""
        for connection in list(self.active_connections.values()):
            try:
                await connection.send_json(message)
            except:
                # Remove dead connections
                client_id = next(
                    (k for k, v in self.active_connections.items()
                        if v == connection), None)
                if client_id:
                    await self.disconnect(client_id)

Redis


class RedisPubSubChannel(BaseChannel):
    def __init__(self, redis_uri: str, channel_prefix: str = "mongo_change:"):
        self.redis_uri = redis_uri
        self.channel_prefix = channel_prefix
        self.redis = None

    async def connect(self):
        """Initialize Redis connection"""
        self.redis = await aioredis.from_url(self.redis_uri)

    async def disconnect(self):
        """Close Redis connection"""
        if self.redis:
            await self.redis.close()

    async def send(self, recipient: str, message: Dict[str, Any]):
        """Publish message to recipient-specific channel"""
        channel = f"{self.channel_prefix}{recipient}"
        await self.redis.publish(channel, json.dumps(message))

    async def broadcast(self, message: Dict[str, Any]):
        """Publish message to broadcast channel"""
        channel = f"{self.channel_prefix}broadcast"
        await self.redis.publish(channel, json.dumps(message))

For redis, the channel name can be set and if not set, the mongo_change will be subscribed to for stream changes.

HTTP

This is specially designed for webhooks.

class HTTPCallbackChannel(BaseChannel):
    def __init__(self, endpoint: str, headers: Dict[str, str] = None, timeout: int = 5):
        self.endpoint = endpoint
        self.headers = headers or {}
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.session = None

    async def connect(self):
        """Create aiohttp session"""
        self.session = aiohttp.ClientSession(headers=self.headers, timeout=self.timeout)

    async def disconnect(self):
        """Close aiohttp session"""
        if self.session:
            await self.session.close()

    async def send(self, recipient: str, message: Dict[str, Any]):
        """Send HTTP POST request"""
        payload = {
            "recipient": recipient,
            "event": message
        }
        async with self.session.post(self.endpoint, json=payload) as response:
            if response.status >= 400:
                raise ValueError(f"HTTP request failed with status {response.status}")

    async def broadcast(self, message: Dict[str, Any]):
        """Send broadcast HTTP POST request"""
        async with self.session.post(self.endpoint, json={"event": message}) as response:
            if response.status >= 400:
                raise ValueError(f"HTTP request failed with status {response.status}")

It is important to note that the current implementation is very basic and can be fine tuned for secure webhook use cases. For example:

class WebhookChannel(HTTPCallbackChannel):
    async def send(self, recipient: str, message: dict):
        """Add webhook signing or retry logic"""
        message["signature"] = generate_hmac(message)
        await super().send(recipient, message)

Database

This channel essentially records the change back to the database, either tied to a single recipient or as a broadcast under the configured parameters.

I think anyone can do this without necessarily using this library, but yay to anything that makes life easy

class DatabaseChannel(BaseChannel):
    def __init__(self, mongo_uri: str, database: str, collection: str):
        self.mongo_uri = mongo_uri
        self.database_name = database
        self.collection_name = collection
        self.client = None
        self.collection = None

    async def connect(self):
        """Initialize MongoDB connection"""
        self.client = AsyncIOMotorClient(self.mongo_uri)
        self.collection = self.client[self.database_name][self.collection_name]

    async def disconnect(self):
        """Close MongoDB connection"""
        if self.client:
            self.client.close()

    async def send(self, recipient: str, message: Dict[str, Any]):
        """Save message to database"""
        if not self.collection:
            raise RuntimeError("Database connection not established")

        document = {
            "recipient": recipient,
            "message": message,
            "timestamp": datetime.datetime.utcnow()
        }
        await self.collection.insert_one(document)

    async def broadcast(self, message: Dict[str, Any]):
        """Save broadcast message to database"""
        document = {
            "message": message,
            "timestamp": datetime.datetime.utcnow(),
            "broadcast": True
        }
        await self.collection.insert_one(document)

Exceptions

The ChannelNotConnectedError exception is raised when there's no valid channel configured for broadcast.

class ChannelNotConnectedError(Exception):
    """Raised when no output channels are configured"""
    pass

There's another that hasn't been implemented yet - that's to check for invalid configuration. Pydantic type checking can handle that.

Broadcasting

All these channels need an outlet for subscribers. The MongoChangeBroadcaster class comes to our rescue:

class MongoChangeBroadcaster:
    def __init__(self, config: BroadcasterConfig):
        self.config = config
        self.mongo_client: Optional[AsyncIOMotorClient] = None
        self.channels: List[BaseChannel] = []
        self._running = False
        self._tasks: List[asyncio.Task] = []

The class has nine methods.

Initialize connection

This method validates and initalizes the MongoDB connection. Validation is done with the aid of the validate_mongo_connection utility function.

async def _initialize_connection(self):
    """Validate and establish MongoDB connection"""
    if not validate_mongo_connection(self.config.mongo_uri):
        raise ConnectionError("Invalid MongoDB connection URI")
    self.mongo_client = AsyncIOMotorClient(self.config.mongo_uri)

Add channel

A station owner needs to add a channel for subscribers to stream events. That's what this function does:

def add_channel(self, channel: BaseChannel):
    """Register an output channel"""
    self.channels.append(channel)

Start streaming

After initialization and channel addition, the stream is started. The method raises an exception if no channel is configured.

async def start(self):
    """Start all change stream watchers"""
    if not self.channels:
        raise ChannelNotConnectedError("No output channels configured")

    await self._initialize_connection()
    self._running = True

    for collection_config in self.config.collections:
        task = asyncio.create_task(
            self._watch_collection_with_retry(collection_config)
        )
        self._tasks.append(task)

Watch collection with retry

Hiccups can happen during transmission. Instead of stopping the entire stream, the broadcast station retries for a bit. The method that handles this is defined:

async def _watch_collection_with_retry(self, config: CollectionConfig):
    """Wrapper with exponential backoff for resiliency"""
    from tenacity import retry, stop_after_attempt, wait_exponential

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        before_sleep=backoff_handler
    )
    async def _watch():
        await self._watch_collection(config)

    await _watch()

Watch collection

Since the stream has started, we need to make sure our agents are on site to get us information. Thewatch_collection() method takes the data from the collection config and watches the changes:

async def _watch_collection(self, config: CollectionConfig):
    """Monitor a single collection for changes"""
    db_name = config.database_name or self.config.default_database
    if not db_name:
        raise ValueError("Database name must be specified")

    db = self.mongo_client[db_name]
    collection = db[config.collection_name]

    try:
        async with collection.watch(
            pipeline=config.change_stream_config.pipeline,
            **config.change_stream_config.options
        ) as change_stream:
            async for change in change_stream:
                if not self._running:
                    break
                await self._process_change(change, config)

    except asyncio.CancelledError:
        logger.info(f"Stopped watching {config.collection_name}")
    except Exception as e:
        logger.error(f"Error watching {config.collection_name}: {str(e)}")
        raise

Extract nested field

Some documents have nested fields, and the recipient identifier can be in a field such as order.customer.id. The helper method breaks these fields to get the recipient:

 def _extract_nested_field(self, doc: dict, field_path: str) -> Optional[Any]:
    """Safely extract nested fields like 'owner.id'"""
    value = doc
    for key in field_path.split('.'):
        value = value.get(key, {})
        if not value:
            return None
    return value

This may not work for really complex scenarios now that I think about it. To be safe, make use of simple identifiers like _id.

Process change

When the streams are logged into the broadcast station, they need to be processed to retrieve the important information. Here, the method also converts the change event into a neater ChangeEvent model instance.

 async def _process_change(self, change: dict, config: CollectionConfig):
    """Process and distribute a change event"""
    try:
        # Extract recipient if configured
        recipient = None
        if config.recipient_identifier:
            doc = change.get('fullDocument') or change.get('fullDocumentBeforeChange', {})
            recipient = self._extract_nested_field(doc, config.recipient_identifier)

        event = format_change_event(change, config)
        event.recipient = str(recipient) if recipient else None
        await self._send_to_channels(event)

    except Exception as e:
        logger.error(f"Error processing change: {str(e)}")

Send to channels

The method broadcasts to registered channels to send to recipients or broadcast to the whole network via this method:

 async def _send_to_channels(self, event: ChangeEvent):
    """Broadcast to all registered channels"""
    for channel in self.channels:
        try:
            if event.recipient:  # Targeted delivery
                await channel.send(event.recipient, event.dict())
            else:  # Broadcast
                await channel.broadcast(event.dict())
        except Exception as e:
            logger.error(f"Channel {type(channel).__name__} failed: {str(e)}")

Stop

Every beginning must have an end. The stop() method cancels all the asynchronous tasks deployed and closes the MongoDB client when invoked. This is great for shutdown() events.

 async def stop(self):
    """Gracefully shutdown all watchers"""
    self._running = False
    for task in self._tasks:
        task.cancel()
    await asyncio.gather(*self._tasks, return_exceptions=True)
    if self.mongo_client:
        self.mongo_client.close()

Example

Here's a WebSocket channel example:

from contextlib import asynccontextmanager

from fastapi import FastAPI, WebSocket
from starlette.websockets import WebSocketDisconnect

from broadcaster import MongoChangeBroadcaster, BroadcasterConfig, CollectionConfig, WebSocketChannel


@asynccontextmanager
async def lifespan(app: FastAPI):
    await broadcaster.start()

    yield
    await broadcaster.stop


app = FastAPI(lifespan=lifespan)
websocket_channel = WebSocketChannel()

config = BroadcasterConfig(
    mongo_uri="mongodb://localhost:27017",
    collections=[
        CollectionConfig(
            collection_name="users",
            recipient_identifier="fullDocument._id",  # Send to user who owns the document
            database_name="test",
        ),
    ]
)

broadcaster = MongoChangeBroadcaster(config)
broadcaster.add_channel(websocket_channel)


@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
    await websocket_channel.connect(user_id, websocket)
    try:
        while True:
            await websocket.receive_text()  # Keep connection alive
    except WebSocketDisconnect:
        await websocket_channel.disconnect(user_id)


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)

To test:

  1. Run: python websocket_example.py
  2. Open a websocket connection with a user ID: websocat ws://0.0.0.0:8000/ws/ydev
  3. Make a change into the collection. In the example, I have a users collection in my test database.
  4. Make a change, for example: db.users.updateOne({_id: 'ydev'}, {"$set": {"name": "Updated to test the newly added field to watch changes"}})
  5. Listen for changes in the WebSocket console

Conclusion

This is a lengthy blog post, so I must commend you if you read or skipped to this part. I'm going to try to write a full documentation for the library (amen).

It's open source on GitHub on https://github.com/Youngestdev/broadcaster.