blackbull.mqtt¶
blackbull.mqtt
¶
MQTT 5 broker — a non-core "bridge" protocol shipped with BlackBull.
BlackBull's core protocols are the HTTP family (HTTP/1.1, HTTP/2, and — as
they land — gRPC and HTTP/3), which share the from-scratch HTTP stack the
framework exists to implement. MQTT is a bridge protocol: an independent
protocol family that rides the Non-ASGI bridge but shares none of the HTTP
protocol logic. It lives in its own subpackage so the boundary is explicit and
so it can be extracted to a standalone blackbull-mqtt distribution later
without touching the core (see docs/guide/mqtt.md).
Wire it in through the generic extension seam::
from blackbull import BlackBull
from blackbull.mqtt import MQTTExtension
app = BlackBull()
mqtt = app.add_extension(MQTTExtension(port=1883))
@mqtt.on_message(topic='sensors/+/temperature')
async def on_temp(msg: Message):
print(msg.topic, msg.payload)
AsyncAPIExtension
¶
Bases: Extension
Mount an AsyncAPI 3.0 spec endpoint (+ HTML viewer) describing the MQTT taps.
Requires an MQTTExtension on the same app; it is read lazily (per
request) via app.extensions['mqtt'], so there is no ordering
constraint at registration — and taps added after this extension are
still documented. The MQTT extension must, however, be present when the
spec route is hit, or the request raises RuntimeError.
Two GET routes are registered (mirroring OpenAPIExtension):
spec_path(default/asyncapi.json) → the document as JSON;docs_path(default/asyncapi,Noneto skip) → an HTML page hosting the CDN AsyncAPI renderer.
init_app(app)
¶
Wire the spec + docs routes onto app through the public API.
BrokerActor
¶
Bases: Actor
Single owner of all MQTT routing state (one per app/worker).
Connection actors send it client events; it sends Send/Close
back to them. Serial inbox processing means no locks and no shared mutable
state — the design goal of the Sprint 53 refactor.
MQTTExtension
¶
Bases: Extension
Wires the MQTT 5 broker into a BlackBull app as a non-ASGI protocol.
Register it once and tap the broker's routing with :meth:on_message::
from blackbull.mqtt import MQTTExtension, Message
mqtt = app.add_extension(MQTTExtension(port=1883))
@mqtt.on_message(topic='sensors/{room}/temperature')
async def on_temp(msg: Message, room: str):
print(room, msg.payload)
The broker itself (CONNECT/SUBSCRIBE/PUBLISH/QoS flows, retained messages, Will delivery) runs whether or not any handler is registered; handlers are an application-level tap on top of normal broker routing.
This instance owns a single :class:BrokerActor and a single
:class:TapActor (both started on app startup, stopped on shutdown) — one
broker and one tap consumer per worker, no module globals. The class is
self-contained and importable from a future blackbull-mqtt package
without touching the core.
tap_mode selects how taps are dispatched: 'actor' (default) runs
them on the decoupled :class:TapActor so a slow tap never back-pressures
delivery; 'inline' reproduces the Sprint 53 connection-actor dispatch
and exists mainly so bench/mqtt/tap_throughput.py can compare the two on
one build. tap_queue_size bounds the actor-mode inbox (drop-newest on
overflow).
iter_subscriptions()
¶
Yield a :class:Subscription for each registered on_message tap.
A stable, public, read-only accessor over the compiled handlers — the
seam AsyncAPIExtension reads instead of touching _handlers.
Reflects the handlers registered at call time, so a documentation
endpoint that calls this per request picks up taps added after the
documenting extension was wired in.
on_message(topic='#')
¶
Decorator: register an async (message, **captures) -> None tap for
every PUBLISH whose topic matches topic (an MQTT topic filter, so +
and # wildcards apply, plus {name} capture segments). The
callback receives a :class:~blackbull.mqtt.Message.
shutdown(app)
async
¶
Stop the broker and tap actors on lifespan shutdown.
startup(app)
async
¶
Start the broker (and, in actor mode, the tap) inbox loops.
MQTTProtocolDetector
¶
Bases: ProtocolDetector
Recognise an MQTT connection from its first byte.
Every MQTT session opens with a CONNECT packet whose fixed-header first
byte is 0x10 (type 1, flags 0). That is unambiguous against the HTTP
request line and the HTTP/2 preface, both of which start with ASCII text.
Message
dataclass
¶
A published message handed to an on_message tap.
The user-facing read-model — a plain, immutable view of one PUBLISH,
deliberately distinct from the wire codec MQTTPublish (which carries the
__iter__/__getitem__ tuple-magic) and from the actor inbox
Message base. Named Message for ecosystem consistency (aiomqtt,
paho): users write from blackbull.mqtt import Message.
Subscription
¶
Bases: NamedTuple
A read-only view of one registered on_message tap.
Yielded by :meth:MQTTExtension.iter_subscriptions so documentation tools
(AsyncAPIExtension) can describe an app's taps without reaching into the
private _handlers list. topic is the filter as the application
wrote it ({name} captures restored); callback is the registered
coroutine function (its __name__/docstring name the AsyncAPI operation).
TapActor
¶
Bases: Actor
Decoupled, lifespan-owned consumer of on_message taps.
Producers call :meth:offer (non-blocking); a single consumer task drains
the bounded inbox and runs the matching taps, so FIFO order of accepted
messages is preserved and tap latency never reaches the connection or broker.
dropped
property
¶
Number of messages dropped on overflow so far (best-effort metric).
offer(message)
¶
Enqueue message without blocking; drop-newest on overflow.
Taps are best-effort observability, so a full queue drops the incoming message rather than back-pressuring the publisher — but the running dropped count is always logged, since silent loss is the one unacceptable outcome.
serve_connection(reader, writer, ctx, broker, *, app_handlers=None, tap=None)
async
¶
Raw-protocol handler body for one MQTT connection.
Spawns the connection actor's inbox-drain (run) alongside the reader loop,
and guarantees the broker sees a Detach when the connection ends — so a
Will fires on an abnormal (cancelled) close. Pass tap for decoupled tap
dispatch or app_handlers for inline dispatch (see :mod:blackbull.mqtt.tap).