Concurrency in Python
Fundamentals of Concurrency Concurrency: Managing multiple tasks simultaneously to improve responsiveness. Threads and asyncio operate on a single processor, switching tasks. Multiprocessing achieves true parallelism by using multiple CPU cores. Parallelism: Running tasks literally at the same time on different processors. Types of Tasks: I/O-Bound: Limited by input/output operations (e.g., network, file system). CPU-Bound: Limited by the processor's computational power. Concurrency Models Threading: Use for I/O-bound tasks. Managed by the OS (preemptive multitasking). Python module: threading, concurrent.futures.ThreadPoolExecutor. Asyncio: Best for I/O-bound tasks. Uses cooperative multitasking (tasks decide when to yield control). Requires non-blocking libraries (e.g., aiohttp for HTTP requests). Python module: asyncio. Multiprocessing: Use for CPU-bound tasks. Achieves parallelism by using multiple processes. Python module: multiprocessing, concurrent.futures.ProcessPoolExecutor. Advantages and Limitations Threading: ✅ Simplifies concurrency for I/O-bound tasks. ❌ Limited by Python’s Global Interpreter Lock (GIL) for CPU-bound tasks. Asyncio: ✅ Lightweight; excellent for scaling I/O-bound tasks. ❌ Requires non-blocking libraries; prone to errors if blocking calls are used. Multiprocessing: ✅ Best for CPU-bound tasks; achieves true parallelism. ❌ Higher overhead for creating processes. asyncio — Asynchronous I/O As per official document of Python (version 3.13.3 when writing this blog) below are the key concepts to remember when working with asyncio in Python - Summary: asyncio is a library to write concurrent code using the async/await syntax. asyncio is used as a foundation for multiple Python asynchronous frameworks that provide high-performance network and web-servers, database connection libraries, distributed task queues, etc. asyncio is often a perfect fit for IO-bound and high-level structured network code. import asyncio async def main(): print('Guten ...') await asyncio.sleep(1) print('... morgen!') asyncio.run(main()) asyncio provides a set of high-level and low-level APIs to make use of it as a Python developer. High level APIs include Runners, Coroutines and Tasks, Streams, Synchronization Primitives, Subprocesses, Queues, Exceptions. While Low level APIs include Event Loop, Futures, Transports and, Protocols, Policies, Platform Support, Extending . Basic terms to remember - AsyncIO Runner Event Loop Coroutine Task Queue Future Results AsyncIO Runner This is the outer container that manages the entire async execution Creates and manages the event loop Handles setup and cleanup of the async environment Event Loop The engine that drives asynchronous execution Continuously runs in cycles, checking for tasks to execute Manages the scheduling of coroutines Tracks when I/O operations complete Coroutine A function marked with async def Can be paused and resumed using await Task Queue Where the event loop stores tasks that are ready to run Manages the order of execution Future Results Stores the eventual results of async operations Used to track completion status Runner API asyncio.run(coro, *, debug=None, loop_factory=None) Execute the coroutine coro and return the result. Example, async def main(): await asyncio.sleep(1) print('hey! how are are you?') asyncio.run(main()) Runner Context Manager API class asyncio.Runner(*, debug=None, loop_factory=None) run(coro, *, context=None) Run a coroutine coro in the embedded loop close() Close the runner. get_loop() Return the event loop associated with the runner instance. Example, async def main(): await asyncio.sleep(1) print('hey! whats up?') with asyncio.Runner() as runner: runner.run(main()) Coroutine and Task API Coroutine Function and Coroutine Object # Define a coroutine function async def my_function(): print("Starting") await asyncio.sleep(1) print("Finished") return "Result" # Call the function - this creates a coroutine object but doesn't run it yet coro = my_function() print(type(coro)) # # To actually run it, you need to: # Option 1: Await it (from within another coroutine) result = await coro # Option 2: Schedule it on the event loop task = asyncio.create_task(coro) # Option 3: Run it using asyncio.run() result = asyncio.run(coro) Key differences - | Aspect | Coroutine Function (`async def`) | Coroutine Object (`func()`) | | ------------------- | ----------------------------------- | ----------------------------------------- | | Definition | Declared with `async def` | Created by calling a coroutine function | | Nature | A callable | An awaitable object | | Purpose | Defines an asynchronous operation

Fundamentals of Concurrency
Concurrency: Managing multiple tasks simultaneously to improve responsiveness.
Threads and asyncio operate on a single processor, switching tasks.
Multiprocessing achieves true parallelism by using multiple CPU cores.
Parallelism: Running tasks literally at the same time on different processors.
Types of Tasks:
I/O-Bound: Limited by input/output operations (e.g., network, file system).
CPU-Bound: Limited by the processor's computational power.
Concurrency Models
Threading:
Use for I/O-bound tasks.
Managed by the OS (preemptive multitasking).
Python module: threading, concurrent.futures.ThreadPoolExecutor.
Asyncio:
Best for I/O-bound tasks.
Uses cooperative multitasking (tasks decide when to yield control).
Requires non-blocking libraries (e.g., aiohttp for HTTP requests).
Python module: asyncio.
Multiprocessing:
Use for CPU-bound tasks.
Achieves parallelism by using multiple processes.
Python module: multiprocessing, concurrent.futures.ProcessPoolExecutor.
Advantages and Limitations
Threading:
✅ Simplifies concurrency for I/O-bound tasks.
❌ Limited by Python’s Global Interpreter Lock (GIL) for CPU-bound tasks.
Asyncio:
✅ Lightweight; excellent for scaling I/O-bound tasks.
❌ Requires non-blocking libraries; prone to errors if blocking calls are used.
Multiprocessing:
✅ Best for CPU-bound tasks; achieves true parallelism.
❌ Higher overhead for creating processes.
asyncio — Asynchronous I/O
As per official document of Python (version 3.13.3 when writing this blog) below are the key concepts to remember when working with asyncio in Python -
Summary:
asyncio is a library to write concurrent code using the async/await syntax.
asyncio is used as a foundation for multiple Python asynchronous frameworks that provide high-performance network and web-servers, database connection libraries, distributed task queues, etc.
asyncio is often a perfect fit for IO-bound and high-level structured network code.
import asyncio
async def main():
print('Guten ...')
await asyncio.sleep(1)
print('... morgen!')
asyncio.run(main())
asyncio provides a set of high-level and low-level APIs to make use of it as a Python developer.
High level APIs include Runners, Coroutines and Tasks, Streams, Synchronization Primitives, Subprocesses, Queues, Exceptions.
While Low level APIs include Event Loop, Futures, Transports and, Protocols, Policies, Platform Support, Extending .
Basic terms to remember -
- AsyncIO Runner
- Event Loop
- Coroutine
- Task Queue
- Future Results
AsyncIO Runner
This is the outer container that manages the entire async execution
Creates and manages the event loop
Handles setup and cleanup of the async environment
Event Loop
The engine that drives asynchronous execution
Continuously runs in cycles, checking for tasks to execute
Manages the scheduling of coroutines
Tracks when I/O operations complete
Coroutine
A function marked with async def
Can be paused and resumed using await
Task Queue
Where the event loop stores tasks that are ready to run
Manages the order of execution
Future Results
Stores the eventual results of async operations
Used to track completion status
Runner API
asyncio.run(coro, *, debug=None, loop_factory=None)
Execute the coroutine coro and return the result.
Example,
async def main():
await asyncio.sleep(1)
print('hey! how are are you?')
asyncio.run(main())
Runner Context Manager API
class asyncio.Runner(*, debug=None, loop_factory=None)
run(coro, *, context=None)
Run a coroutine coro in the embedded loop
close()
Close the runner.
get_loop()
Return the event loop associated with the runner instance.
Example,
async def main():
await asyncio.sleep(1)
print('hey! whats up?')
with asyncio.Runner() as runner:
runner.run(main())
Coroutine and Task API
Coroutine Function and Coroutine Object
# Define a coroutine function
async def my_function():
print("Starting")
await asyncio.sleep(1)
print("Finished")
return "Result"
# Call the function - this creates a coroutine object but doesn't run it yet
coro = my_function()
print(type(coro)) #
# To actually run it, you need to:
# Option 1: Await it (from within another coroutine)
result = await coro
# Option 2: Schedule it on the event loop
task = asyncio.create_task(coro)
# Option 3: Run it using asyncio.run()
result = asyncio.run(coro)
Key differences -
| Aspect | Coroutine Function (`async def`) | Coroutine Object (`func()`) |
| ------------------- | ----------------------------------- | ----------------------------------------- |
| Definition | Declared with `async def` | Created by calling a coroutine function |
| Nature | A callable | An awaitable object |
| Purpose | Defines an asynchronous operation | Represents the pending execution |
| Execution State | Static (like a function definition) | Dynamic (has internal state) |
| Awaitable? | ❌ Cannot be awaited | ✅ Can be awaited |
| Scheduling | Not scheduled on event loop | Can be scheduled with `await` or a `Task` |
| | | |
Event Loop Life cycle Methods
# Explicit event loop management (when asyncio.run() isn't sufficient)
import asyncio
async def my_coroutine():
await asyncio.sleep(1)
return "Done"
# Get the event loop
loop = asyncio.get_event_loop()
# Schedule coroutines
task = loop.create_task(my_coroutine())
# Run until all tasks complete
loop.run_until_complete(task)
# Get the result
result = task.result()
print(result)
# Close the loop when done
loop.close()
Creating Tasks API
import asyncio
async def fetch_data(id):
print(f"Fetching data for {id}...")
await asyncio.sleep(2) # Simulating network request
return f"Data for {id}"
async def main():
# Create a task that runs in the background
task = asyncio.create_task(fetch_data(123))
# Do other work while the task runs concurrently
print("Doing other work...")
await asyncio.sleep(1)
# Wait for the task to complete and get its result
result = await task
print(f"Task completed with result: {result}")
asyncio.run(main())
Task Cancellation and Handling Cancellation Exceptions
import asyncio
async def long_operation():
try:
print("Starting long operation...")
await asyncio.sleep(10) # Long operation
return "Operation completed"
except asyncio.CancelledError:
print("Operation was cancelled, cleaning up...")
# Perform any necessary cleanup
raise # Re-raise to propagate cancellation
async def main():
# Create the task
task = asyncio.create_task(long_operation())
# Let it run for a bit
await asyncio.sleep(2)
# Cancel the task
print("Cancelling task...")
task.cancel()
try:
# Await the cancelled task
await task
except asyncio.CancelledError:
print("Main: task was cancelled")
asyncio.run(main())
Task Groups API
import asyncio
async def process_item(item):
print(f"Processing {item}...")
await asyncio.sleep(item) # Variable processing time
return f"Processed {item}"
async def process_order(order):
print(f"Processing {order}...")
await asyncio.sleep(order) # Variable processing order
return f"Processed {order}"
async def process_bill(bill):
print(f"Processing {bill}...")
await asyncio.sleep(bill) # Variable processing bill
return f"Processed {bill}"
async def main():
# Using TaskGroup to manage multiple tasks
async with asyncio.TaskGroup() as tg:
# Create multiple tasks that will run concurrently
task1 = tg.create_task(process_item(1))
task2 = tg.create_task(process_order(2))
task3 = tg.create_task(process_bill(3))
print("All tasks have been created and are running...")
# When we exit the TaskGroup context, all tasks are awaited
# If any task raises an exception, it will be propagated
print("All tasks completed!")
print(f"Results: {task1.result()}, {task2.result()}, {task3.result()}")
asyncio.run(main())
How await
suspend execution and yeilds control back to event loop?
import asyncio
import time
async def task1():
print(f"[{time.strftime('%H:%M:%S')}] Task 1 started")
await asyncio.sleep(2) # Suspend here, yield to event loop
print(f"[{time.strftime('%H:%M:%S')}] Task 1 resumed after 2s")
await asyncio.sleep(1) # Suspend again
print(f"[{time.strftime('%H:%M:%S')}] Task 1 completed")
async def task2():
print(f"[{time.strftime('%H:%M:%S')}] Task 2 started")
await asyncio.sleep(1) # Suspend here, yield to event loop
print(f"[{time.strftime('%H:%M:%S')}] Task 2 resumed after 1s")
await asyncio.sleep(2) # Suspend again
print(f"[{time.strftime('%H:%M:%S')}] Task 2 completed")
async def main():
# Create tasks to run concurrently
t1 = asyncio.create_task(task1())
t2 = asyncio.create_task(task2())
# Wait for both tasks to complete
await t1
await t2
asyncio.run(main())
Output :
Program starts
│
├─> Task 1 starts
│ └─> await sleep(2) - Suspends Task 1
│ │
├─> Task 2 starts (Event loop switches to Task 2)
│ └─> await sleep(1) - Suspends Task 2
│ │
│ │ (Event loop waits, no active tasks)
│ │
│ └─> Task 2 resumes after 1s
│ └─> await sleep(2) - Suspends Task 2 again
│ │
│ └─> Task 1 resumes after 2s
│ └─> await sleep(1) - Suspends Task 1 again
│ │
│ │ (Event loop waits)
│ │
│ └─> Task 1 completes
│ │
│ └─> Task 2 completes
│
Program ends
Understanding async with
and async for
Statements
Both async with
and async for
are extensions of their synchronous counterparts, designed to work with asynchronous code.
Both are syntactic sugar that make asynchronous code more readable and maintainable by matching the familiar synchronous patterns.
The async with
statement is used with asynchronous context managers, allowing you to handle setup and teardown operations that involve awaitable objects.
Common for database connections, file operations, network connections
import asyncio
class AsyncResource:
async def __aenter__(self):
# Async setup code
print("Acquiring resource...")
await asyncio.sleep(1) # Simulate async acquisition
print("Resource acquired")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
# Async cleanup code
print("Releasing resource...")
await asyncio.sleep(0.5) # Simulate async release
print("Resource released")
async def use_resource(self):
print("Using the resource")
await asyncio.sleep(0.5)
async def main():
# Using async with to manage the resource
async with AsyncResource() as resource:
# This block is executed after __aenter__ completes
await resource.use_resource()
# When the block exits, __aexit__ is called automatically
print("Done with the resource")
asyncio.run(main())
Acquiring resource...
Resource acquired
Using the resource
Releasing resource...
Resource released
Done with the resource
Another Example,
import asyncio
import aiofiles
async def read_file():
async with aiofiles.open('data.txt', 'r') as file:
# File is automatically closed when the block exits
content = await file.read()
return content
# Usage
asyncio.run(read_file())
The async for
statement is used with asynchronous iterators, allowing you to iterate over items that are produced asynchronously.
Common for streaming APIs, paginated results, large dataset processing
import asyncio
class AsyncCounter:
def __init__(self, limit):
self.limit = limit
self.counter = 0
def __aiter__(self):
return self
async def __anext__(self):
if self.counter < self.limit:
self.counter += 1
# Simulate async work to produce next value
await asyncio.sleep(0.5)
return self.counter
else:
# Signal the end of iteration
raise StopAsyncIteration
async def main():
# Using async for to iterate over async iterator
print("Starting iteration...")
async for number in AsyncCounter(5):
print(f"Got number: {number}")
print("Iteration complete")
asyncio.run(main())
Starting iteration...
Got number: 1
Got number: 2
Got number: 3
Got number: 4
Got number: 5
Iteration complete
Another Example,
import asyncio
import aiohttp
async def fetch_data_stream():
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com/data', timeout=30) as response:
# Process the response as a stream
async for chunk in response.content.iter_chunked(1024):
# Process each chunk as it arrives
process_chunk(chunk)
def process_chunk(chunk):
# Do something with the data chunk
print(f"Received {len(chunk)} bytes")
# Usage
asyncio.run(fetch_data_stream())
Understanding Futures in AsyncIO
Futures represent the eventual result of an asynchronous operation
They act as a placeholder until a result is available.
States:
Pending
: Initial state, no result yet
Finished
: Has a result or exception
Cancelled
: Operation was cancelled
Key Methods:
set_result(result)
: Set the result of the future
set_exception(exception)
: Set an exception
result()
: Get the result (raises exception if not done)
done()
: Check if completed
add_done_callback(fn)
: Add a callback for when done
Comparison with Tasks:
Tasks are higher-level and manage coroutines
Tasks are automatically scheduled to run
Futures are lower-level, must be set manually
Every Task is a Future, but not every Future is a Task
Awaitable Objects
├── Coroutines (created from async def functions)
├── Tasks (manages coroutines in the event loop)
└── Futures (low-level awaitable objects representing eventual results)
import asyncio
async def set_future_after_delay(future, delay, value):
# Simulate work
await asyncio.sleep(delay)
# Set the result on the future
future.set_result(value)
print(f"Future set with value: {value}")
async def main():
# Create a Future object
future = asyncio.Future()
# Schedule a task that will eventually set the future's result
asyncio.create_task(set_future_after_delay(future, 2, "Future is complete!"))
print("Waiting for future to complete...")
# Await the future - this will pause here until the future has a result
result = await future
print(f"Received result: {result}")
print(f"Future state: {future}")
asyncio.run(main())
Waiting for future to complete...
Future set with value: Future is complete!
Received result: Future is complete!
Future state:
Cheat Sheet to save developers valuable time
✅ 1. Fundamental AsyncIO Concepts
| Concept | Purpose | Syntax |
| ---------------------------- | ------------------------------- | ------------------------------------------- |
| Coroutine Function | Declares async function | `async def foo():` |
| Coroutine Object | Returned when calling coroutine | `coro = foo()` |
| await | Suspend and yield control | `await some_async_func()` |
| async with | Async context manager | `async with aiofile.open(...) as f:` |
| async for | Async iteration (e.g., streams) | `async for line in stream:` |
| Event Loop | Orchestrates async tasks | `asyncio.run(main())` |
| Manual Loop | Custom control (rare) | `loop = asyncio.get_event_loop()` |
| asyncio.run() | Start main coroutine | `asyncio.run(main())` |
| asyncio.Runner() (3.11+) | Context-managed loop runner | `with asyncio.Runner() as r: r.run(main())` |
| Event loop policy | Platform-specific loop rules | `asyncio.get_event_loop_policy()` |