blackbull.mqtt.extension

User-facing wiring for the MQTT 5 broker: the protocol detector and the MQTTExtension class.

MQTTExtension

Bases: Extension

Wires the MQTT 5 broker into a BlackBull app as a non-ASGI protocol.

Register it once and tap the broker's routing with on_message:

from blackbull.mqtt import MQTTExtension, Message

# `app` is the existing BlackBull application instance.
mqtt = app.add_extension(MQTTExtension(port=1883))

@mqtt.on_message(topic='sensors/{room}/temperature')
async def on_temp(msg: Message, room: str):
    print(room, msg.payload)

The broker itself (CONNECT/SUBSCRIBE/PUBLISH/QoS flows, retained messages, Will delivery) runs whether or not any handler is registered; handlers are an application-level tap on top of normal broker routing.

This instance owns a single BrokerActor and a single TapActor, both started on app startup and stopped on shutdown: one broker and one tap consumer per worker, with no module globals. The class is self-contained and importable from a future blackbull-mqtt package without touching the core.

tap_mode selects how taps are dispatched. 'actor' (the default) runs them on the decoupled TapActor, so a slow tap never back-pressures delivery. 'inline' reproduces the Sprint 53 connection-actor dispatch and exists mainly so bench/mqtt/tap_throughput.py can compare the two on one build. tap_queue_size bounds the actor-mode inbox; when the inbox is full, the newest item is dropped.
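The drop-newest overflow policy can be illustrated with a plain asyncio.Queue. This is only a sketch of the policy, not the TapActor implementation; the helper name offer and the queue size are hypothetical.

```python
import asyncio


def offer(inbox: asyncio.Queue, item) -> bool:
    """Enqueue without blocking; drop the new item if the inbox is full.

    Hypothetical helper: it shows the drop-newest policy only, so the
    producer never waits on a slow consumer.
    """
    try:
        inbox.put_nowait(item)
        return True
    except asyncio.QueueFull:
        return False  # the newest item is discarded, older ones are kept


async def demo():
    inbox = asyncio.Queue(maxsize=2)  # stands in for tap_queue_size=2
    accepted = [offer(inbox, n) for n in range(4)]
    kept = [inbox.get_nowait() for _ in range(inbox.qsize())]
    return accepted, kept


accepted, kept = asyncio.run(demo())
print(accepted, kept)
```

With a bound of two, the first two items are kept and the later ones are rejected, which is the trade-off actor mode makes to keep delivery from stalling.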

iter_subscriptions()

Yield a Subscription for each registered on_message tap.

A stable, public, read-only accessor over the compiled handlers. AsyncAPIExtension reads taps through this method instead of touching the private _handlers list. The result reflects the handlers registered at call time, so a documentation endpoint that calls this per request picks up taps added after the documenting extension was wired in.

on_message(topic='#')

Decorator: register an async (message, **captures) -> None tap for every PUBLISH whose topic matches topic. topic is an MQTT topic filter, so the + and # wildcards apply, and it may also contain {name} capture segments whose matched values are passed to the callback as keyword arguments. The callback receives a blackbull.mqtt.Message.
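To make the matching rules concrete, here is a minimal sketch of how a filter with +, # and {name} segments could be matched against a topic. The function match_topic is hypothetical and is not BlackBull's matcher; it also assumes a {name} segment captures exactly one level, and it ignores the MQTT rule about topics that begin with $.

```python
from typing import Dict, Optional


def match_topic(pattern: str, topic: str) -> Optional[Dict[str, str]]:
    """Return the captured segments if topic matches pattern, else None."""
    captures: Dict[str, str] = {}
    p_levels = pattern.split('/')
    t_levels = topic.split('/')
    for i, p in enumerate(p_levels):
        if p == '#':
            # Multi-level wildcard: matches every remaining level,
            # including none at all ('sensors/#' matches 'sensors').
            return captures
        if i >= len(t_levels):
            return None  # pattern is longer than the topic
        level = t_levels[i]
        if p == '+':
            continue  # single-level wildcard, nothing captured
        if p.startswith('{') and p.endswith('}'):
            captures[p[1:-1]] = level  # named single-level capture
            continue
        if p != level:
            return None  # literal level mismatch
    # Without a trailing '#', both sides must have the same depth.
    return captures if len(p_levels) == len(t_levels) else None


print(match_topic('sensors/{room}/temperature', 'sensors/kitchen/temperature'))
print(match_topic('sensors/+/temperature', 'sensors/kitchen/humidity'))
```

An empty dict means a match with no captures, so callers should compare the result against None rather than test its truthiness.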

shutdown(app) async

Stop the broker and tap actors on lifespan shutdown.

startup(app) async

Start the broker (and, in actor mode, the tap) inbox loops.

MQTTProtocolDetector

Bases: ProtocolDetector

Recognise an MQTT connection from its first byte.

Every MQTT session opens with a CONNECT packet whose fixed-header first byte is 0x10 (packet type 1 in the high nibble, flags 0 in the low nibble). That is unambiguous against the HTTP request line and the HTTP/2 preface, both of which start with ASCII text.
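The first-byte check can be sketched in a few lines. The function name looks_like_mqtt is hypothetical and does not reflect the ProtocolDetector interface, which this page does not show.

```python
CONNECT_FIRST_BYTE = 0x10  # packet type 1 (CONNECT) << 4, flags 0


def looks_like_mqtt(first_bytes: bytes) -> bool:
    """Return True if the buffered bytes start like an MQTT CONNECT packet."""
    return len(first_bytes) >= 1 and first_bytes[0] == CONNECT_FIRST_BYTE


# The byte splits into a 4-bit packet type and 4 bits of flags.
packet_type = CONNECT_FIRST_BYTE >> 4
flags = CONNECT_FIRST_BYTE & 0x0F

print(looks_like_mqtt(b'\x10\x0c\x00\x04MQTT'))   # start of a CONNECT packet
print(looks_like_mqtt(b'GET / HTTP/1.1\r\n'))     # HTTP request line
print(looks_like_mqtt(b'PRI * HTTP/2.0\r\n'))     # start of the HTTP/2 preface
```

HTTP methods and the HTTP/2 preface begin with uppercase ASCII letters (0x41 to 0x5A), so neither can collide with 0x10.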

Subscription

Bases: NamedTuple

A read-only view of one registered on_message tap.

Yielded by MQTTExtension.iter_subscriptions so documentation tools (AsyncAPIExtension) can describe an app's taps without reaching into the private _handlers list. topic is the filter as the application wrote it, with {name} captures restored. callback is the registered coroutine function; its __name__ and docstring name the AsyncAPI operation.
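As an illustration of how a documentation tool might consume these tuples, the sketch below uses a stand-in Subscription with the two documented fields. The stand-in class, the describe helper and the shape of the resulting dict are all assumptions made for this example; they are not the AsyncAPIExtension output format.

```python
import inspect
from typing import Awaitable, Callable, NamedTuple


class Subscription(NamedTuple):
    """Stand-in mirroring the documented fields (not the real class)."""
    topic: str
    callback: Callable[..., Awaitable[None]]


async def on_temp(msg, room):
    """Record a temperature reading for one room."""


def describe(sub: Subscription) -> dict:
    # Hypothetical summary shape: the operation is named after the
    # callback and summarised by its docstring.
    return {
        'operation': sub.callback.__name__,
        'topic': sub.topic,
        'summary': inspect.getdoc(sub.callback) or '',
    }


entry = describe(Subscription('sensors/{room}/temperature', on_temp))
print(entry)
```

In a real app the tuples would come from mqtt.iter_subscriptions() rather than being constructed by hand.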