blackbull.mqtt.extension¶
blackbull.mqtt.extension
¶
User-facing wiring for the MQTT 5 broker — detector + :class:MQTTExtension.
MQTTExtension
¶
Bases: Extension
Wires the MQTT 5 broker into a BlackBull app as a non-ASGI protocol.
Register it once and tap the broker's routing with :meth:on_message::
from blackbull.mqtt import MQTTExtension, Message
mqtt = app.add_extension(MQTTExtension(port=1883))
@mqtt.on_message(topic='sensors/{room}/temperature')
async def on_temp(msg: Message, room: str):
print(room, msg.payload)
The broker itself (CONNECT/SUBSCRIBE/PUBLISH/QoS flows, retained messages, Will delivery) runs whether or not any handler is registered; handlers are an application-level tap on top of normal broker routing.
This instance owns a single :class:BrokerActor and a single
:class:TapActor (both started on app startup, stopped on shutdown) — one
broker and one tap consumer per worker, no module globals. The class is
self-contained and importable from a future blackbull-mqtt package
without touching the core.
tap_mode selects how taps are dispatched: 'actor' (default) runs
them on the decoupled :class:TapActor so a slow tap never back-pressures
delivery; 'inline' reproduces the Sprint 53 connection-actor dispatch
and exists mainly so bench/mqtt/tap_throughput.py can compare the two on
one build. tap_queue_size bounds the actor-mode inbox (drop-newest on
overflow).
iter_subscriptions()
¶
Yield a :class:Subscription for each registered on_message tap.
A stable, public, read-only accessor over the compiled handlers — the
seam AsyncAPIExtension reads instead of touching _handlers.
Reflects the handlers registered at call time, so a documentation
endpoint that calls this per request picks up taps added after the
documenting extension was wired in.
on_message(topic='#')
¶
Decorator: register an async (message, **captures) -> None tap for
every PUBLISH whose topic matches topic (an MQTT topic filter, so +
and # wildcards apply, plus {name} capture segments). The
callback receives a :class:~blackbull.mqtt.Message.
shutdown(app)
async
¶
Stop the broker and tap actors on lifespan shutdown.
startup(app)
async
¶
Start the broker (and, in actor mode, the tap) inbox loops.
MQTTProtocolDetector
¶
Bases: ProtocolDetector
Recognise an MQTT connection from its first byte.
Every MQTT session opens with a CONNECT packet whose fixed-header first
byte is 0x10 (type 1, flags 0). That is unambiguous against the HTTP
request line and the HTTP/2 preface, both of which start with ASCII text.
Subscription
¶
Bases: NamedTuple
A read-only view of one registered on_message tap.
Yielded by :meth:MQTTExtension.iter_subscriptions so documentation tools
(AsyncAPIExtension) can describe an app's taps without reaching into the
private _handlers list. topic is the filter as the application
wrote it ({name} captures restored); callback is the registered
coroutine function (its __name__/docstring name the AsyncAPI operation).