Skip to content

Middleware

Middleware wraps a handler with cross-cutting concerns — logging, auth, body parsing, response shaping — without changing the handler's signature. The shape is the same as Starlette / Quart / ASGI 3.0 generally, with one BlackBull convenience (@as_middleware) layered on top.

Writing a middleware

import time

async def logging_mw(scope, receive, send, call_next):
    t0 = time.monotonic()
    await call_next(scope, receive, send)
    elapsed = (time.monotonic() - t0) * 1000
    print(f"{scope['method']} {scope['path']}  {elapsed:.1f} ms")

Signature: async def mw(scope, receive, send, call_next).

  • Call await call_next(scope, receive, send) to pass control to the next layer.
  • The legacy parameter name inner is accepted as an alias for call_next.
  • Sending a response without calling call_next short-circuits all inner layers.

Middleware functions keep the full (scope, receive, send, call_next) shape; the simplified handler form does not apply to them.

The @as_middleware decorator

Route handlers can return Response / JSONResponse objects instead of calling send(...) with raw ASGI events. A middleware that wraps send would otherwise have to handle both forms. Decorate the middleware with @as_middleware and the inner wrapper sees only plain dict events — Response objects are expanded into http.response.start + http.response.body for you:

from blackbull import as_middleware

@as_middleware
async def add_header_mw(scope, receive, send, call_next):
    async def wrapped(event):
        if event['type'] == 'http.response.start':
            event = {**event,
                     'headers': list(event.get('headers', [])) + [(b'x-custom', b'1')]}
        await send(event)
    await call_next(scope, receive, wrapped)

@as_middleware also works on classes (it wraps __call__). All of BlackBull's built-in middleware uses the class form:

@as_middleware
class TimingMiddleware:
    def __init__(self, threshold_ms: float = 100.0):
        self._threshold = threshold_ms
    async def __call__(self, scope, receive, send, call_next):
        ...

Omit the decorator when you need raw send arguments — e.g. middleware used in a deployment that never registers simplified handlers.

Observation vs. interception

Middleware in BlackBull is sugar over the @app.intercept('before_handler') hook. For purely observational concerns (logging, metrics, tracing) prefer @app.on(...) instead — middleware forces every observer to run on the request critical path, and one slow observer can degrade every response. See Events.

Attaching middleware to a route

@app.route(path='/protected', middlewares=[auth_mw, logging_mw])
async def handler(scope, receive, send):
    ...

The list is outer-to-inner: the first entry runs first on the way in, last on the way out.

            ┌─ auth_mw ──────────────────────────────────┐
  request → │  ┌─ logging_mw ─────────────────────────┐  │ → response
            │  │  ┌─ handler ─┐                       │  │
            │  │  │  (runs)   │                       │  │
            │  │  └───────────┘                       │  │
            │  └──────────────────────────────────────┘  │
            └────────────────────────────────────────────┘

auth_mw runs first; it either short-circuits or delegates to logging_mw, which then delegates to handler. Post-handler code (after await call_next(...)) runs in reverse order: logging_mw post → auth_mw post.

For routes that share a middleware prefix, use Route Groups.

Global middleware

app.use(mw) registers a middleware that wraps every route:

from blackbull import BlackBull
from blackbull.middleware.compression import Compression

app = BlackBull()
app.use(Compression())            # applies to all routes

Global middleware run outside per-route middleware. The effective order at request time is:

global mw → route-group mw → per-route mw → handler

Each layer can short-circuit (return without calling call_next) to skip everything below it.

Built-in middleware

websocket

Consumes the initial websocket.connect event and sends websocket.accept so the inner handler can skip the boilerplate:

from blackbull.middleware import websocket
from blackbull.utils import Scheme

@app.route(path='/chat', scheme=Scheme.websocket, middlewares=[websocket])
async def chat(scope, receive, send):
    while True:
        event = await receive()
        if event['type'] == 'websocket.disconnect':
            break
        await send({'type': 'websocket.send', 'text': event.get('text', '')})

Compression / compress

Compresses HTTP response bodies using the codec the client prefers (brotli > zstd > gzip, based on Accept-Encoding):

from blackbull.middleware import compress
from blackbull.middleware.compression import Compression

@app.route(path='/data', middlewares=[compress])
async def data_handler(scope, receive, send):
    await send(Response(large_payload))

# Or with a higher size threshold:
@app.route(path='/large', middlewares=[Compression(min_size=4096)])
async def large(scope, receive, send):
    ...

Brotli and zstandard are optional extras:

pip install 'blackbull[compression]'

The default min_size is 100 bytes — responses smaller than that pass through uncompressed.

Session

Signed-cookie sessions — session data lives entirely in a cookie HMAC-signed by the server. No server-side store, no database. Trade-off: cookies are capped at ~4 KiB by browsers and you can't revoke a session early without rotating the secret.

from blackbull.middleware.session import Session

# Operator sets BB_SESSION_SECRET=<32-byte random> in the deployment env
app.use(Session())

# Or pass the secret explicitly (handy for tests):
app.use(Session(secret=b'<long-random-bytes>'))

@app.route(path='/')
async def index(scope, receive, send):
    scope['session']['user'] = 'alice'           # any JSON-serializable value
    await send(Response('signed in'))

@app.route(path='/whoami')
async def whoami(scope, receive, send):
    await send(Response(scope['session'].get('user', 'anonymous')))

scope['session'] is a dict subclass that tracks whether you mutated it. The middleware only emits Set-Cookie when a request handler changed the session — read-only handlers leave the response cache-friendly.

Secret resolution:

  • The constructor accepts secret= directly.
  • When secret is None, the middleware reads BB_SESSION_SECRET from the environment.
  • If neither is set, construction raises — there is no insecure default. Generate one with:
    python -c "import secrets; print(secrets.token_urlsafe(32))"
    

Cookie attributes (all keyword arguments, with sensible defaults):

Argument Default Notes
cookie_name 'session' Name of the cookie carrying the payload.
max_age None Seconds the cookie is valid. None ⇒ session cookie (until browser closes).
secure True Send the Secure attribute (cookie only over HTTPS).
httponly True Send the HttpOnly attribute (JS can't read the cookie).
samesite 'Lax' 'Strict', 'Lax', 'None', or None (omit).
path '/' Cookie Path.

Clearing the session emits a tombstone cookie with Max-Age=0:

@app.route(path='/logout')
async def logout(scope, receive, send):
    scope['session'].clear()
    await send(Response('signed out'))

A cookie whose signature fails to verify (tampering, wrong secret) is silently dropped — the handler sees an empty session.

Cache

Per-worker in-memory response cache for GET and HEAD. Captures the handler's response on the first hit, stores it under (method, path, query_string), and replays it directly on subsequent matching requests until the entry expires.

from blackbull.middleware.cache import Cache

app.use(Cache(max_age=600))   # 10-minute TTL

@app.route(path='/feed')
async def feed(scope, receive, send):
    items = await fetch_news()           # expensive
    body = render(items).encode()
    await send({'type': 'http.response.start', 'status': 200,
                'headers': [(b'content-type', b'text/html')]})
    await send({'type': 'http.response.body', 'body': body})

A weak ETag (W/"<sha256-prefix>") is generated automatically when the handler doesn't supply one. Subsequent requests with a matching If-None-Match header receive 304 Not Modified with no body.

Standard Cache-Control directives are honoured. Responses carrying no-store, private, or no-cache pass through unstored. Requests with Cache-Control: no-store bypass the cache too.

Constructor arguments:

Argument Default Notes
max_age 300 TTL in seconds when the response does not specify its own.
max_entries 1024 LRU cap on cached responses.
cacheable_methods {'GET', 'HEAD'} Methods eligible for caching.
cacheable_statuses {200, 203, 300, 301, 308, 404, 410, 414, 451} Status codes eligible for caching.
cache_authenticated False When False, requests with Authorization bypass the cache (RFC 9111 §3.5).
generate_etag True Auto-generate ETag when the handler omits it.

Limitations:

  • Per-worker. Multi-worker deployments hold a separate cache in each process.
  • No cross-restart persistence. In-memory only.
  • No Vary matching. Requests differing only by Accept-Encoding or Accept-Language share the same cache slot.
  • No explicit invalidation API. Wait for TTL or restart the worker.
  • Streaming responses (any more_body=True chunk) are forwarded straight through without caching.

CORS

Handles preflight OPTIONS requests and adds the required Access-Control-* headers to actual cross-origin responses.

from blackbull import BlackBull, CORS

app = BlackBull()
app.use(CORS(
    allow_origins=['https://myapp.example.com'],
    allow_methods=['GET', 'POST', 'OPTIONS'],
    allow_headers=['Authorization', 'Content-Type'],
    allow_credentials=True,
    max_age=3600,
))
Parameter Type Default Notes
allow_origins list[str] \| str '*' Explicit origin strings, or '*' for wildcard
allow_methods list[str] ['GET','POST','HEAD','OPTIONS'] Methods allowed in preflight
allow_headers list[str] \| str '*' Request headers allowed
allow_credentials bool False Emit Access-Control-Allow-Credentials: true
expose_headers list[str] [] Response headers the browser JS may read
max_age int \| None 600 Preflight cache seconds; None omits the header

allow_credentials=True cannot be combined with allow_origins=['*'] — the CORS spec forbids it. List explicit origins instead.

Apply to specific route groups rather than globally when only some routes need CORS:

api = app.group(middlewares=[CORS(allow_origins=['https://myapp.example.com'])])

@api.route(path='/items')
async def list_items(): ...

StaticFiles

Serves files from a directory under a URL prefix. See Static files for the configuration surface (URL prefix, range requests, in-memory cache, PROD-mode passthrough).

Recipes

Request ID

Attach a unique ID to every request for distributed tracing:

import uuid

async def request_id_mw(scope, receive, send, call_next):
    req_id = (scope['headers'].get(b'x-request-id', b'')
              or uuid.uuid4().hex.encode())
    scope['request_id'] = (req_id.decode()
                           if isinstance(req_id, bytes) else req_id)

    _send = send
    async def tagged_send(body, status=200, headers=[]):
        await _send(body, status,
                    list(headers) + [(b'x-request-id', scope['request_id'].encode())])

    await call_next(scope, receive, tagged_send)

Rate limiting (token bucket, in-process)

import time
from collections import defaultdict
from http import HTTPStatus
from blackbull import JSONResponse

_buckets: dict[str, tuple[float, int]] = defaultdict(lambda: (time.monotonic(), 0))
RATE_LIMIT = 60   # requests per minute per IP

async def rate_limit_mw(scope, receive, send, call_next):
    ip = (scope.get('client') or ['unknown'])[0]
    now = time.monotonic()
    window_start, count = _buckets[ip]

    if now - window_start > 60:
        _buckets[ip] = (now, 1)
    elif count >= RATE_LIMIT:
        await send(JSONResponse({'error': 'rate limit exceeded'},
                                status=HTTPStatus.TOO_MANY_REQUESTS))
        return
    else:
        _buckets[ip] = (window_start, count + 1)

    await call_next(scope, receive, send)

Per-IP in-process limiting is fine for a single-worker deployment. For multi-worker or multi-host setups, use a shared store (Redis, Memcached) so the bucket survives across workers.

Injecting values into scope

Middleware may add any key to scope for inner layers to consume:

async def auth_mw(scope, receive, send, call_next):
    auth = scope['headers'].get(b'authorization', b'')
    token = auth[7:].decode() if auth.startswith(b'Bearer ') else ''
    user = SESSIONS.get(token)
    if not user:
        await send(JSONResponse({'error': 'Unauthorized'},
                                status=HTTPStatus.UNAUTHORIZED))
        return                  # short-circuit: inner layers never run
    scope['user'] = user        # available to all inner layers
    scope['token'] = token
    await call_next(scope, receive, send)

The path-parameter dict scope['path_params'] and the framework error keys (scope['state']['error_status'], etc.) are populated by the framework — see Error handling.

Next

  • Error handling — custom error handlers, the DEV-mode traceback page.
  • Events@app.on / @app.intercept for observational and interceptive hooks.
  • Static filesStaticFiles middleware configuration.