reaktiv: Reactive State Management for Python

The State Management Dilemma in Python
Picture this: You’re designing a FastAPI WebSocket to broadcast real-time sensor data. Or orchestrating an IoT pipeline that reacts to environmental changes. Or building a long-running CLI tool that updates its UI based on user input.
In all these scenarios, a critical question arises: How do you efficiently propagate state changes without drowning in boilerplate?
Traditional approaches (manual subscriptions, observer patterns, or asyncio event loops) often lead to:
- Spaghetti Code: Callbacks entangled like headphone wires.
- Resource Leaks: Forgotten subscriptions haunting your memory.
- Concurrency Bugs: Race conditions that defy logical explanation.
Enter reaktiv, a library that borrows the reactive programming paradigm from frontend frameworks like Angular and SolidJS, adapting it for Python’s async-first world.
Reactive Programming: A 30-Second History Lesson
Reactivity emerged in frontend ecosystems to solve a critical problem: How can UIs automatically update when data changes? Frameworks introduced signals: variables that track dependencies and notify consumers on mutation.
The result? Cleaner code, implicit dependency management, and frontend devs who stopped manually calling render().
Python, however, lagged behind. While libraries like RxPy offered reactive extensions, they demanded complex setups. reaktiv fills this gap with a minimalist, Pythonic API tailored for real-world backend and IoT use cases.
Core Concepts: Signals, Computed, and Effects
reaktiv’s power lies in three primitives:
- Signal: A mutable value container that notifies dependents on change.
from reaktiv import signal
temperature = signal(25.0) # Initial value
temperature.set(30.0) # Update value
- Computed: Derived values that auto-update when dependencies change.
from reaktiv import computed
humidity = signal(0.6)  # Relative humidity (illustrative value)
feels_like = computed(lambda: temperature() * humidity())
- Effect: Side effects triggered by signal/computed changes.
from reaktiv import effect
async def log_climate():
    print(f"Feels like: {feels_like()}°C")

log_effect = effect(log_climate)  # Runs on `temperature` or `humidity` changes
This triad enables declarative state management: define relationships once and let the library handle updates.
Why reaktiv Shines in Real-World Scenarios
1. FastAPI WebSockets: Real-Time Updates Without the Boilerplate
Consider a live dashboard tracking stock prices. With reaktiv:
import asyncio

from fastapi import FastAPI, WebSocket
from reaktiv import signal, effect

app = FastAPI()
stocks = signal({"AAPL": 150.0, "GOOGL": 2700.0})

@app.websocket("/stocks")
async def stock_feed(websocket: WebSocket):
    await websocket.accept()

    async def send_updates():
        await websocket.send_json(stocks())

    # Assign effect to variable to avoid garbage collection
    update_effect = effect(send_updates)
    try:
        while True:
            await asyncio.sleep(1)  # Simulate background updates
    finally:
        update_effect.dispose()  # Cleanup
When stocks.set(new_data) fires, all connected clients receive updates. No manual broadcast() calls. No polling.
2. IoT Systems: Reacting to Sensor Data
For an IoT pipeline monitoring air quality:
pm25 = signal(35.0)
alert_threshold = signal(50.0)
# Derived air quality index
aqi = computed(lambda: pm25() / alert_threshold() * 100)
async def trigger_alerts():
    if aqi() > 100:
        await notify_admins()
        await activate_purifiers()
# Assign effect to persist it
alert_effect = effect(trigger_alerts)
# Simulate sensor input
pm25.set(55.0) # AQI = 110 → Alerts fire
3. Long-Running Processes with User Interaction
Imagine a CLI tool that processes uploads while accepting user commands:
upload_progress = signal(0)
user_command = signal("pause")
async def handle_upload():
    while upload_progress() < 100:
        if user_command() == "pause":
            await asyncio.sleep(1)  # Wait for resume
            continue
        # Process chunk
        upload_progress.update(lambda x: x + 10)

upload_effect = effect(handle_upload)
User inputs (e.g., user_command.set("resume")) dynamically alter behavior without restarting the process.
Under the Hood: How reaktiv Tracks Dependencies
When you access a signal inside a computed or effect, reaktiv automatically records it as a dependency. This dependency graph ensures:
- Efficiency: Only affected computations rerun on changes.
- Glitch-Free: Batched updates prevent inconsistent intermediate states.
For example:
a = signal(1)
b = signal(2)
c = computed(lambda: a() + b()) # Depends on `a` and `b`
eff = effect(lambda: print(c())) # Depends on `c`
a.set(3) # Recomputes `c`, then triggers effect
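To make the tracking idea concrete, here is a minimal, stdlib-only sketch of read-time dependency recording. It is not reaktiv's implementation: ToySignal, ToyComputation, and the _active stack are hypothetical names invented for illustration, and the sketch skips batching, stale-dependency cleanup, and async support.

```python
from typing import Any, Callable, List, Set

# Stack of computations currently being evaluated; the top one is the "reader".
_active: List["ToyComputation"] = []


class ToySignal:
    """Mutable value that remembers which computations read it (hypothetical)."""

    def __init__(self, value: Any) -> None:
        self._value = value
        self._subscribers: Set["ToyComputation"] = set()

    def __call__(self) -> Any:
        # Reading inside a running computation records a dependency.
        if _active:
            self._subscribers.add(_active[-1])
        return self._value

    def set(self, value: Any) -> None:
        if value == self._value:
            return  # Unchanged values notify nobody
        self._value = value
        # Iterate over a copy: rerunning a subscriber re-registers it here.
        for computation in list(self._subscribers):
            computation.run()


class ToyComputation:
    """Runs a function once, then again whenever a signal it read changes."""

    def __init__(self, fn: Callable[[], None]) -> None:
        self._fn = fn
        self.run()

    def run(self) -> None:
        _active.append(self)
        try:
            self._fn()
        finally:
            _active.pop()


a = ToySignal(1)
b = ToySignal(2)
unrelated = ToySignal("x")
seen: List[int] = []

ToyComputation(lambda: seen.append(a() + b()))

a.set(3)            # Read by the computation, so it reruns
unrelated.set("y")  # Never read, so nothing reruns
print(seen)
```

The computation never declares what it depends on. Calling a() and b() while it runs is enough to subscribe it, which is why a write to the unrelated signal costs nothing.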
Advanced Features for Production Use
- Custom Equality Checks: Avoid unnecessary updates with custom comparators.
# Treat changes smaller than 5 as equal, so only larger jumps trigger updates
temp = signal(25.0, equal=lambda old, new: abs(old - new) < 5)
- Untracked Reads: Access a signal without subscribing to it.
from reaktiv import untracked
eff = effect(lambda: print(untracked(temp))) # No dependency on `temp`
- Effect Cleanup: Release resources when an effect reruns or is disposed.
async def fetch_data(on_cleanup):
    timer = start_interval(update_data, seconds=10)
    on_cleanup(timer.cancel)  # Cancel on rerun or disposal
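One subtlety of the custom equality check is worth a worked example. The sketch below uses a hypothetical GatedValue class, not reaktiv itself, and assumes the comparator is applied against the last accepted value, with an "equal" write dropped entirely. Under that assumption, a slow drift still triggers an update eventually.

```python
from typing import Callable, List


class GatedValue:
    """Hypothetical stand-in for a signal with a custom `equal` comparator."""

    def __init__(self, value: float, equal: Callable[[float, float], bool]) -> None:
        self._value = value
        self._equal = equal
        self.notifications: List[float] = []  # Records every accepted write

    def set(self, new: float) -> None:
        if self._equal(self._value, new):
            return  # Treated as unchanged: stored value stays as it was
        self._value = new
        self.notifications.append(new)


temp = GatedValue(25.0, equal=lambda old, new: abs(old - new) < 5)

temp.set(27.0)  # |25 - 27| = 2, ignored
temp.set(29.0)  # |25 - 29| = 4, ignored (still compared against 25.0)
temp.set(31.0)  # |25 - 31| = 6, accepted
print(temp.notifications)
```

Each step here is only 2 degrees, yet the third write goes through because the comparison baseline never moved. If the stored value were updated on ignored writes, the same sequence would never notify anyone.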
Performance Considerations
reaktiv excels in lightweight to moderate workloads (e.g., hundreds of clients, IoT edge devices). However:
- Scalability: For 10k+ WebSocket connections, pair reaktiv with dedicated brokers like Redis Pub/Sub.
- Garbage Collection: Effects must be assigned to variables; otherwise, they’re garbage-collected prematurely.
- Async Safety: Designed for single-threaded async code and not thread-safe. Use with caution in threaded contexts.
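If sensor readings arrive on a worker thread, one common way to respect the single-threaded constraint is to hand each update to the event loop rather than mutating state from the thread. The sketch below uses only the standard library: apply_update is a hypothetical stand-in for a call such as some_signal.set(value), and the readings are made-up values.

```python
import asyncio
import threading
from typing import List

received: List[float] = []


def apply_update(value: float) -> None:
    # Hypothetical stand-in for a state update such as `some_signal.set(value)`.
    # It only ever runs on the event loop thread.
    received.append(value)


def sensor_thread(loop: asyncio.AbstractEventLoop, done: asyncio.Event) -> None:
    # Worker thread: never touches shared state directly, only schedules callbacks.
    for reading in (35.0, 42.5, 55.0):
        loop.call_soon_threadsafe(apply_update, reading)
    loop.call_soon_threadsafe(done.set)


async def main() -> None:
    loop = asyncio.get_running_loop()
    done = asyncio.Event()
    worker = threading.Thread(target=sensor_thread, args=(loop, done))
    worker.start()
    await done.wait()  # Callbacks run in the order they were scheduled
    worker.join()


asyncio.run(main())
print(received)
```

Because every update is funneled through loop.call_soon_threadsafe, all state changes happen on the loop thread, in the order the worker produced them.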
When to Use reaktiv (and When Not To)
Ideal For:
- Real-time dashboards (FastAPI/WebSocket).
- IoT/edge computing pipelines.
- CLI tools with dynamic user interaction.
Not Ideal For:
- High-frequency trading systems (nanosecond latency).
- Distributed systems requiring consensus (use actors or CRDTs).
Embrace Reactivity, Reduce Cognitive Load
reaktiv isn’t a silver bullet, but it is a pragmatic tool for simplifying state management in Python. By adopting patterns proven in frontend ecosystems, it lets you:
- Declare relationships, not manual updates.
- Integrate seamlessly with asyncio.
- Focus on business logic, not boilerplate.
For senior developers, the value lies in reducing incidental complexity, the kind that breeds bugs and burns out teams.
Get Started:
pip install reaktiv
Explore the documentation and examples to see reaktiv in action.