Requests and responses¶
How to read what the client sent, build the response, stream a body back, and detect when the client has gone away.
Reading the request body¶
from blackbull import read_body
@app.route(path='/echo', methods=[HTTPMethod.POST])
async def echo(scope, receive, send):
raw: bytes = await read_body(receive)
await send(Response(raw))
read_body reads all body chunks until more_body=False and
returns a single bytes object. The stream is consumed — call
at most once per request (typically inside a middleware, not the
handler itself).
For streaming uploads, call receive() directly — each call
returns one chunk with more_body=True until the final chunk
arrives with more_body=False. read_body is a convenience
wrapper that buffers all chunks before returning.
Recommended JSON-body middleware¶
import json
from http import HTTPStatus
from blackbull import read_body, JSONResponse
async def json_body_mw(scope, receive, send, call_next):
raw = await read_body(receive)
try:
scope['json'] = json.loads(raw)
except (json.JSONDecodeError, ValueError):
await send(JSONResponse({'error': 'Invalid JSON'},
status=HTTPStatus.BAD_REQUEST))
return
await call_next(scope, receive, send)
The handler then reads scope['json'] without touching receive.
Reading request headers¶
scope['headers'] is a Headers object — case-insensitive, ordered,
multi-valued.
# First value for a header; returns b'' when absent
ct = scope['headers'].get(b'content-type')
auth = scope['headers'].get(b'authorization', b'')
# All (name, value) pairs for a header (multi-value support)
pairs = scope['headers'].getlist(b'accept') # list[tuple[bytes, bytes]]
# ASGI-compliant iteration
for name, value in scope['headers']:
...
# Membership test
if b'content-length' in scope['headers']:
...
Header names are stored lowercase.
get vs getlist¶
Use .get(name) for headers that appear at most once
(content-type, authorization, host). Use .getlist(name)
for headers that may repeat:
| Header | Why it can repeat |
|---|---|
accept, accept-encoding |
Clients may send multiple preference lines |
set-cookie |
Servers send one Set-Cookie field per cookie |
cookie (HTTP/2) |
RFC 7540 §8.1.2.5 requires one field per cookie pair |
.getlist returns list[tuple[bytes, bytes]] — the full
(name, value) pairs in insertion order, or [] if the header
is absent.
Why cookie needs getlist on HTTP/2
HTTP/1.1 combines all cookies into a single
Cookie: a=1; b=2 field. HTTP/2 sends each cookie as a
separate header field to enable HPACK compression of
individual values. Calling .get(b'cookie') on an HTTP/2
scope silently discards all but the first cookie.
parse_cookies (below) handles this correctly for all
protocols.
Reading cookies¶
from blackbull import parse_cookies
cookies: dict[str, str] = parse_cookies(scope)
session = cookies.get('session', '')
parse_cookies(scope) returns a dict[str, str] of cookie name
→ value from the current request. The result is identical
across HTTP/1.1, HTTP/2, and WebSocket scopes — you don't need
to know how the client delivered the cookies.
Query parameters¶
scope['query_string'] contains the raw query string as bytes.
Parse it with the standard library:
from urllib.parse import parse_qs, parse_qsl
# parse_qs: each key maps to a list of values (handles ?tag=a&tag=b correctly)
params = parse_qs(scope['query_string'].decode())
page = int(params.get('page', ['1'])[0])
tags = params.get('tag', []) # ['a', 'b'] for ?tag=a&tag=b
# parse_qsl: flat list of (key, value) pairs preserving order
pairs = parse_qsl(scope['query_string'].decode())
For convenience, wrap this in a helper:
def qp(scope) -> dict[str, str]:
"""Return first value for each query parameter key."""
return {k: v[0]
for k, v in parse_qs(scope['query_string'].decode()).items()}
@app.route(path='/tasks')
async def list_tasks(scope, receive, send):
p = qp(scope)
done = p.get('done', 'false').lower() == 'true'
await send(JSONResponse({'done_filter': done}))
Form data¶
HTML forms with enctype="application/x-www-form-urlencoded" (the
default) send key=value pairs in the body. Read and parse with
read_body + parse_qs:
from urllib.parse import parse_qs
from blackbull import read_body
async def form_body_mw(scope, receive, send, call_next):
"""Parse application/x-www-form-urlencoded body; inject scope['form']."""
raw = await read_body(receive)
scope['form'] = {k: v[0] for k, v in parse_qs(raw.decode()).items()}
await call_next(scope, receive, send)
@app.route(methods=[HTTPMethod.POST], path='/submit', middlewares=[form_body_mw])
async def submit(scope, receive, send):
name = scope['form'].get('name', '')
await send(JSONResponse({'received': name}))
Multipart file uploads (multipart/form-data) are not yet
supported by a built-in helper. Use the python-multipart
package to parse the body manually.
Responses¶
Response¶
from blackbull import Response
from http import HTTPStatus
await send(Response(b'<h1>Hello</h1>'))
await send(Response('Hello', status=HTTPStatus.OK)) # str also accepted
await send(Response(b'Not found', status=HTTPStatus.NOT_FOUND))
# Redirect
await send(Response(b'', status=HTTPStatus.FOUND,
headers=[(b'location', b'/')]))
Default content_type is 'text/html; charset=utf-8'. Override
via the content_type parameter:
Response(b'plain text', content_type='text/plain; charset=utf-8')
JSONResponse¶
from blackbull import JSONResponse
await send(JSONResponse({'ok': True}))
await send(JSONResponse({'error': 'Bad request'}, status=HTTPStatus.BAD_REQUEST))
await send(JSONResponse({'id': 1, 'title': 'Buy milk'}, status=HTTPStatus.CREATED))
Content-Type is set to application/json automatically.
Custom response headers¶
Both Response and JSONResponse accept headers=[(bytes, bytes), ...]:
await send(JSONResponse({'ok': True}, headers=[
(b'x-request-id', b'abc123'),
(b'cache-control', b'no-store'),
]))
Set-Cookie helper¶
from blackbull import cookie_header
hdr = cookie_header('session', token, http_only=True)
# → (b'set-cookie', b'session=TOKEN; Path=/; HttpOnly; SameSite=Lax')
await send(JSONResponse({'ok': True}, headers=[hdr]))
Signature: cookie_header(name, value, path='/', http_only=True).
Cookies vs. tokens for SPA clients
Browsers may not reliably forward HttpOnly cookies set by a
fetch() response on the next page navigation. For
single-page apps, store the session token in sessionStorage
and send it as Authorization: Bearer <token> instead.
HTTP trailers¶
HTTP/1.1 chunked responses can carry trailing headers after the
body. Use the http.response.trailers event after the last
http.response.body chunk:
@app.route(path='/chunked')
async def chunked(scope, receive, send):
await send({
'type': 'http.response.start',
'status': 200,
'headers': [
(b'content-type', b'text/plain'),
(b'transfer-encoding', b'chunked'),
(b'trailer', b'x-checksum'),
],
})
await send({
'type': 'http.response.body',
'body': b'chunk data here',
'more_body': True,
})
await send({
'type': 'http.response.trailers',
'headers': [(b'x-checksum', b'abc123')],
})
WebSocket frames¶
For WebSocket routes (scheme=Scheme.websocket), use
WebSocketResponse to dispatch on payload type automatically:
from blackbull import WebSocketResponse
await send(WebSocketResponse('hello')) # str → text frame
await send(WebSocketResponse(b'\x00\x01')) # bytes → binary frame
await send(WebSocketResponse({'type': 'msg'})) # other → JSON-serialised text frame
See WebSockets for the full WebSocket surface.
Streaming responses¶
Use StreamingResponse to push an async generator to the client
without buffering the whole body in memory. The class calls
send directly, so it works as a nested ASGI app:
import asyncio
from blackbull import BlackBull, StreamingResponse
app = BlackBull()
async def countdown():
for i in range(5, 0, -1):
yield f'{i}\n'.encode()
await asyncio.sleep(1)
yield b'done\n'
@app.route(path='/stream')
async def handler(scope, receive, send):
await StreamingResponse(countdown())(scope, receive, send)
StreamingResponse.__init__ accepts:
| parameter | default | description |
|---|---|---|
content |
— | AsyncIterator of bytes or str chunks |
status |
200 |
HTTP status code |
headers |
[] |
extra header tuples (bytes, bytes) |
media_type |
'text/plain' |
Content-Type value (injected if absent from headers) |
str chunks are encoded to UTF-8 automatically.
How HTTP/1.1 delivers streaming responses¶
When more_body=True appears on the first http.response.body
event, the HTTP/1.1 sender adds Transfer-Encoding: chunked to
the response headers and formats each body event as a hex-length
chunk:
5\r\n
hello\r\n
5\r\n
world\r\n
0\r\n
\r\n
The terminal 0\r\n\r\n is written automatically when
more_body=False arrives (unless trailers=True was set in
http.response.start, in which case the
http.response.trailers handler writes it).
HTTP/2 is unaffected — DATA frames carry explicit length and
END_STREAM maps to more_body=False.
Writing streaming-safe middleware¶
Any middleware that wraps the send callable and collects body
parts will silently buffer a streaming response, defeating
more_body=True.
For function-based middleware, the safest approach is to pass body events through immediately rather than accumulating them:
async def prefix_mw(scope, receive, send, call_next):
captured_start = None
async def capturing_send(event):
nonlocal captured_start
if event.get('type') == 'http.response.start':
captured_start = event
elif event.get('type') == 'http.response.body':
if event.get('more_body'):
# Streaming response — pass through without buffering
await send(captured_start)
captured_start = None
await send(event)
else:
# Non-streaming — transform the body
body = b'[prefix] ' + event.get('body', b'')
await send(captured_start)
await send({**event, 'body': body})
else:
await send(event)
await call_next(scope, receive, capturing_send)
For most use cases (header injection, logging) the middleware does not touch the body at all and streaming safety is not a concern.
Detecting client disconnection¶
When the remote side closes the connection, receive() returns
{'type': 'http.disconnect'}. This is useful for long-polling
and server-sent events (SSE) — your handler can check whether
the client is still there before writing more.
@app.route(path='/events')
async def sse(scope, receive, send):
await send({
'type': 'http.response.start',
'status': 200,
'headers': [
(b'content-type', b'text/event-stream'),
(b'cache-control', b'no-cache'),
],
})
while True:
event = await receive()
if event['type'] == 'http.disconnect':
break
await send({
'type': 'http.response.body',
'body': b'data: ping\n\n',
'more_body': True,
})
await send({'type': 'http.response.body', 'body': b'', 'more_body': False})
For most streaming use cases, prefer StreamingResponse (above)
— it wraps an async generator and emits more_body=True on every
chunk automatically. The raw event pattern is convenient when
each chunk needs custom shaping (e.g. SSE event framing).
Passing Response(b'') to send ends the connection immediately
with Content-Length: 0 and cannot be followed by additional
chunks.
Next¶
- Routing —
@app.route, path parameters, route groups. - Middleware — wrapping handlers with cross-cutting concerns.
- Static files — serving file-system content under a URL prefix.