A Coding Guide to Build a Production-Ready Asynchronous Python SDK with Rate Limiting, In-Memory Caching, and Authentication

Originally published on MarkTechPost.

Jun 24, 2025 - 07:50

In this tutorial, we build a robust, production-ready asynchronous Python SDK. We start by installing and importing the required asynchronous HTTP libraries (aiohttp, nest-asyncio), then implement the core components: a structured response object, a rate limiter, an in-memory cache with TTL, and a clean, dataclass-driven design. We then wrap these pieces in an AdvancedSDK class that supports async context management, automatic wait-on-rate-limit behavior, JSON and auth header injection, and convenient HTTP-verb methods. Along the way, a demo harness against JSONPlaceholder illustrates caching efficiency, batch fetching with rate limits, and error handling, and shows how to extend the SDK via a fluent “builder” pattern for custom configuration.

!pip install aiohttp nest-asyncio

import asyncio
import aiohttp
import time
import json
from typing import Dict, List, Optional, Any, Union
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
import hashlib
import logging

We install aiohttp and nest-asyncio with the !pip install line and import asyncio and aiohttp alongside utilities for timing, JSON handling, dataclass modeling, caching (hashlib and datetime), and logging. nest-asyncio matters in notebooks such as Colab, where an event loop is already running: once nest_asyncio.apply() has been called, asyncio code can be run from inside that loop, which is what lets the async HTTP requests and rate-limited workflows in this guide run in a notebook cell.

@dataclass
class APIResponse:
    """Structured response object"""
    data: Any
    status_code: int
    headers: Dict[str, str]
    timestamp: datetime
   
    def to_dict(self) -> Dict:
        return asdict(self)

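The APIResponse dataclass bundles the details of an HTTP response into a single typed object: the payload (data), status code, headers, and retrieval timestamp. The to_dict() helper converts the instance into a plain dictionary for logging or downstream processing. Note that timestamp stays a datetime object in that dictionary, so it must be converted (for example with isoformat()) before the dictionary can be passed to json.dumps.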
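As a minimal sketch of that serialization step, the snippet below repeats the APIResponse definition so it runs on its own and adds a to_json function. The to_json helper and the sample values are assumptions made for illustration; they are not part of the original SDK.

```python
import json
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Any, Dict


@dataclass
class APIResponse:
    """Same fields as the APIResponse above, repeated so the sketch is self-contained."""
    data: Any
    status_code: int
    headers: Dict[str, str]
    timestamp: datetime

    def to_dict(self) -> Dict:
        return asdict(self)


def to_json(response: APIResponse) -> str:
    """Hypothetical helper: serialize a response, rendering the timestamp as ISO 8601."""
    payload = response.to_dict()
    # asdict() leaves the datetime untouched, so convert it before json.dumps
    payload["timestamp"] = response.timestamp.isoformat()
    return json.dumps(payload)


# Sample values chosen for illustration only
resp = APIResponse(
    data={"id": 1},
    status_code=200,
    headers={"Content-Type": "application/json"},
    timestamp=datetime(2025, 6, 24, 7, 50),
)
encoded = to_json(resp)
print(encoded)
```

Passing default=str to json.dumps would also work, at the cost of a less predictable timestamp format.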

class RateLimiter:
    """Sliding-window rate limiter: at most max_calls per time_window seconds"""
    def __init__(self, max_calls: int = 100, time_window: int = 60):
        self.max_calls = max_calls
        self.time_window = time_window
        self.calls = []  # timestamps of calls admitted within the current window
   
    def can_proceed(self) -> bool:
        now = time.time()
        # Drop timestamps that have aged out of the window
        self.calls = [call_time for call_time in self.calls if now - call_time < self.time_window]
       
        if len(self.calls) < self.max_calls:
            self.calls.append(now)  # record the admitted call
            return True
        return False
   
    def wait_time(self) -> float:
        """Seconds until the oldest recorded call leaves the window"""
        if not self.calls:
            return 0.0
        return max(0.0, self.time_window - (time.time() - self.calls[0]))

The RateLimiter class tracks the timestamps of recent calls and allows up to max_calls within a rolling time_window. Strictly speaking, this is a sliding-window log rather than a token bucket, since it stores one timestamp per call instead of a refilling token count. When the limit is reached, can_proceed() returns False, and wait_time() returns how long to pause until the oldest recorded call leaves the window.
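For comparison, here is a minimal sketch of what a token-bucket limiter could look like. The TokenBucket class, its capacity and refill_rate parameters, and the injectable clock are all hypothetical names introduced for this illustration. The sketch keeps the same can_proceed()/wait_time() interface so that it could stand in for RateLimiter, but it is not the implementation used in this guide.

```python
import time
from typing import Callable


class TokenBucket:
    """Hypothetical token-bucket limiter, shown for comparison only."""

    def __init__(self, capacity: int, refill_rate: float,
                 clock: Callable[[], float] = time.monotonic):
        self.capacity = capacity          # maximum burst size
        self.refill_rate = refill_rate    # tokens added per second
        self.clock = clock                # injectable so the demo is deterministic
        self.tokens = float(capacity)     # start with a full bucket
        self.last_refill = clock()

    def _refill(self) -> None:
        # Add tokens for the time elapsed since the last refill, capped at capacity
        now = self.clock()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now

    def can_proceed(self) -> bool:
        self._refill()
        if self.tokens >= 1:
            self.tokens -= 1              # spend one token per admitted call
            return True
        return False

    def wait_time(self) -> float:
        """Seconds until at least one whole token is available."""
        self._refill()
        if self.tokens >= 1:
            return 0.0
        return (1 - self.tokens) / self.refill_rate


# Deterministic demo driven by a fake clock instead of real time
fake_now = [0.0]
bucket = TokenBucket(capacity=2, refill_rate=1.0, clock=lambda: fake_now[0])
first = bucket.can_proceed()    # bucket starts full, so this is admitted
second = bucket.can_proceed()   # second token spent
third = bucket.can_proceed()    # bucket empty, call rejected
wait = bucket.wait_time()       # time until one token has been refilled
fake_now[0] = 1.0               # advance the fake clock by one second
fourth = bucket.can_proceed()   # one token refilled, call admitted
```

The practical difference is memory and burst behavior: the token bucket stores two numbers regardless of traffic, while the sliding-window log stores one timestamp per admitted call.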

class Cache:
    """Simple in-memory cache with TTL"""
    def __init__(self, default_ttl: int = 300):
        self.cache = {}  # key -> (response, expiry datetime)
        self.default_ttl = default_ttl
   
    def _generate_key(self, method: str, url: str, params: Optional[Dict] = None) -> str:
        # sort_keys makes the key independent of the order of the params
        key_data = f"{method}:{url}:{json.dumps(params or {}, sort_keys=True)}"
        return hashlib.md5(key_data.encode()).hexdigest()
   
    def get(self, method: str, url: str, params: Optional[Dict] = None) -> Optional[APIResponse]:
        key = self._generate_key(method, url, params)
        if key in self.cache:
            response, expiry = self.cache[key]
            if datetime.now() < expiry:
                return response
            # Entry has expired: evict it lazily on lookup
            del self.cache[key]
        return None
   
    def set(self, method: str, url: str, response: APIResponse,
            params: Optional[Dict] = None, ttl: Optional[int] = None):
        key = self._generate_key(method, url, params)
        # Fall back to the default only when no ttl is given, so that ttl=0 is respected
        seconds = ttl if ttl is not None else self.default_ttl
        expiry = datetime.now() + timedelta(seconds=seconds)
        self.cache[key] = (response, expiry)

The Cache class provides a lightweight in-memory TTL cache for API responses by hashing the request signature (method, URL, params) into a key. It returns cached APIResponse objects until they expire. Eviction is lazy: a stale entry is deleted only when it is next looked up, so entries that are never requested again stay in memory.
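To make the key scheme concrete, the sketch below lifts the key-generation logic into a standalone function (generate_key is a name chosen for this illustration) and shows which requests share a cache entry. The endpoint and parameter values are made up for the example.

```python
import hashlib
import json
from typing import Dict, Optional


def generate_key(method: str, url: str, params: Optional[Dict] = None) -> str:
    """Standalone copy of the cache's key logic, for illustration."""
    # sort_keys=True serializes the params in a canonical order
    key_data = f"{method}:{url}:{json.dumps(params or {}, sort_keys=True)}"
    return hashlib.md5(key_data.encode()).hexdigest()


# Same params in a different order map to the same key
key_a = generate_key("GET", "/posts", {"userId": 1, "page": 2})
key_b = generate_key("GET", "/posts", {"page": 2, "userId": 1})

# A different param value produces a different key
key_c = generate_key("GET", "/posts", {"userId": 2, "page": 2})

# Missing params and an empty dict are treated identically
key_none = generate_key("GET", "/posts")
key_empty = generate_key("GET", "/posts", {})

print(key_a == key_b, key_a == key_c, key_none == key_empty)
```

MD5 is used here only to produce a compact dictionary key, not for security. Because the key does not include headers or the API key, two clients with different credentials should not share one Cache instance.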

class AdvancedSDK:
    """Advanced SDK with modern Python patterns"""
   
    def __init__(self, base_url: str, api_key: str = None, rate_limit: int = 100):
        self.base_url = base_url.rstrip('/')
        self.api_key = api_key
        self.session = None
        self.rate_limiter = RateLimiter(max_calls=rate_limit)
        self.cache = Cache()
        self.logger = self._setup_logger()
       
    def _setup_logger(self) -> logging.Logger:
        logger = logging.getLogger(f"SDK-{id(self)}")
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            logger.addHandler(handler)
            logger.setLevel(logging.INFO)
        return logger
   
    async def __aenter__(self):
        """Async context manager entry"""
        self.session = aiohttp.ClientSession()
        return self
   
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit"""
        if self.session:
            await self.session.close()
   
    def _get_headers(self) -> Dict[str, str]:
        headers = {'Content-Type': 'application/json'}
        if self.api_key:
            headers['Authorization'] = f'Bearer {self.api_key}'
        return headers
   
    async def _make_request(self, method: str, endpoint: str, params: Dict = None,
                          data: Dict = None, use_cache: bool = True) -> APIResponse:
        """Core request method with rate limiting and caching"""
       
        if use_cache and method.upper() == 'GET':
            cached = self.cache.get(method, endpoint, params)
            if cached:
                self.logger.info(f"Cache hit for {method} {endpoint}")
                return cached
       
        # Re-check after sleeping so that the call is recorded once it is admitted
        while not self.rate_limiter.can_proceed():
            wait_time = self.rate_limiter.wait_time()
            self.logger.warning(f"Rate limit hit, waiting {wait_time:.2f}s")
            await asyncio.sleep(wait_time)
       
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
       
        try:
            async with self.session.request(
                method=method.upper(),
                url=url,
                params=params,
                json=data,
                headers=self._get_headers()
            ) as resp:
                response_data = await resp.json() if resp.content_type == 'application/json' else await resp.text()
               
                api_response = APIResponse(
                    data=response_data,
                    status_code=resp.status,
                    headers=dict(resp.headers),
                    timestamp=datetime.now()
                )
               
                if use_cache and method.upper() == 'GET' and 200 <= resp.status < 300:
                    self.cache.set(method, endpoint, api_response, params)
               
                self.logger.info(f"{method.upper()} {endpoint} - Status: {resp.status}")
                return api_response
               
        except Exception as e:
            self.logger.error(f"Request failed: {str(e)}")
            raise
   
    async def get(self, endpoint: str, params: Dict = None, use_cache: bool = True) -> APIResponse:
        return await self._make_request('GET', endpoint, params=params, use_cache=use_cache)
   
    async def post(self, endpoint: str, data: Dict = None) -> APIResponse:
        return await self._make_request('POST', endpoint, data=data, use_cache=False)
   
    async def put(self, endpoint: str, data: Dict = None) -> APIResponse:
        return await self._make_request('PUT', endpoint, data=data, use_cache=False)
   
    async def delete(self, endpoint: str) -> APIResponse:
        return await self._make_request('DELETE', endpoint, use_cache=False)

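The AdvancedSDK class ties everything together into a clean, async-first client: it opens and closes an aiohttp session through the async context manager protocol (__aenter__/__aexit__), injects JSON and auth headers, and coordinates our RateLimiter and Cache under the hood. Because the session is created in __aenter__, the client must be used inside an async with block. Its _make_request method centralizes the GET/POST/PUT/DELETE logic: it checks the cache for GET requests, waits when the rate limit is reached, logs and re-raises errors, and packs each response into an APIResponse object. Only successful (2xx) GET responses are cached. The get/post/put/delete helpers give us ergonomic, high-level calls on top of it.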
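The rate-limit wait is the subtle step in _make_request. With a sliding-window limiter, a call is only recorded when can_proceed() returns True, so one way to keep the count accurate is to re-check the limiter after each sleep. The sketch below isolates that pattern without any network access. SlidingWindowLimiter repeats the RateLimiter logic so the snippet is self-contained, and acquire is a hypothetical helper name. It is written to run as a plain script; inside a notebook you would await demo() instead of calling asyncio.run.

```python
import asyncio
import time
from typing import List


class SlidingWindowLimiter:
    """Same logic as the RateLimiter above, repeated so the sketch is self-contained."""

    def __init__(self, max_calls: int, time_window: float):
        self.max_calls = max_calls
        self.time_window = time_window
        self.calls: List[float] = []

    def can_proceed(self) -> bool:
        now = time.time()
        self.calls = [t for t in self.calls if now - t < self.time_window]
        if len(self.calls) < self.max_calls:
            self.calls.append(now)
            return True
        return False

    def wait_time(self) -> float:
        if not self.calls:
            return 0.0
        return max(0.0, self.time_window - (time.time() - self.calls[0]))


async def acquire(limiter: SlidingWindowLimiter) -> None:
    """Hypothetical helper: sleep until the limiter admits (and records) the call."""
    while not limiter.can_proceed():
        await asyncio.sleep(limiter.wait_time())


async def demo() -> float:
    # Two calls are allowed per 0.2 s window, so the third call has to wait
    limiter = SlidingWindowLimiter(max_calls=2, time_window=0.2)
    start = time.monotonic()
    for _ in range(3):
        await acquire(limiter)
    return time.monotonic() - start


elapsed = asyncio.run(demo())
print(f"three calls took {elapsed:.2f}s")
```

A single if followed by one sleep would let the delayed call through without recording it, so later calls could exceed the configured limit.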