Skip to content

blackbull.mqtt

blackbull.mqtt

MQTT 5 broker — a non-core "bridge" protocol shipped with BlackBull.

BlackBull's core protocols are the HTTP family (HTTP/1.1, HTTP/2, and — as they land — gRPC and HTTP/3), which share the from-scratch HTTP stack the framework exists to implement. MQTT is a bridge protocol: an independent protocol family that rides the Non-ASGI bridge but shares none of the HTTP protocol logic. It lives in its own subpackage so the boundary is explicit and so it can be extracted to a standalone blackbull-mqtt distribution later without touching the core (see docs/guide/mqtt.md).

Wire it in through the generic extension seam::

from blackbull import BlackBull
from blackbull.mqtt import MQTTExtension

app = BlackBull()
mqtt = app.add_extension(MQTTExtension(port=1883))

@mqtt.on_message(topic='sensors/+/temperature')
async def on_temp(msg: Message):
    print(msg.topic, msg.payload)

AsyncAPIExtension

Bases: Extension

Mount an AsyncAPI 3.0 spec endpoint (+ HTML viewer) describing the MQTT taps.

Requires an MQTTExtension on the same app; it is read lazily (per request) via app.extensions['mqtt'], so there is no ordering constraint at registration — and taps added after this extension are still documented. The MQTT extension must, however, be present when the spec route is hit, or the request raises RuntimeError.

Two GET routes are registered (mirroring OpenAPIExtension):

  • spec_path (default /asyncapi.json) → the document as JSON;
  • docs_path (default /asyncapi, None to skip) → an HTML page hosting the CDN AsyncAPI renderer.

init_app(app)

Wire the spec + docs routes onto app through the public API.

BrokerActor

Bases: Actor

Single owner of all MQTT routing state (one per app/worker).

Connection actors send it client events; it sends Send/Close back to them. Serial inbox processing means no locks and no shared mutable state — the design goal of the Sprint 53 refactor.

MQTTExtension

Bases: Extension

Wires the MQTT 5 broker into a BlackBull app as a non-ASGI protocol.

Register it once and tap the broker's routing with :meth:on_message::

from blackbull.mqtt import MQTTExtension, Message

mqtt = app.add_extension(MQTTExtension(port=1883))

@mqtt.on_message(topic='sensors/{room}/temperature')
async def on_temp(msg: Message, room: str):
    print(room, msg.payload)

The broker itself (CONNECT/SUBSCRIBE/PUBLISH/QoS flows, retained messages, Will delivery) runs whether or not any handler is registered; handlers are an application-level tap on top of normal broker routing.

This instance owns a single :class:BrokerActor and a single :class:TapActor (both started on app startup, stopped on shutdown) — one broker and one tap consumer per worker, no module globals. The class is self-contained and importable from a future blackbull-mqtt package without touching the core.

tap_mode selects how taps are dispatched: 'actor' (default) runs them on the decoupled :class:TapActor so a slow tap never back-pressures delivery; 'inline' reproduces the Sprint 53 connection-actor dispatch and exists mainly so bench/mqtt/tap_throughput.py can compare the two on one build. tap_queue_size bounds the actor-mode inbox (drop-newest on overflow).

iter_subscriptions()

Yield a :class:Subscription for each registered on_message tap.

A stable, public, read-only accessor over the compiled handlers — the seam AsyncAPIExtension reads instead of touching _handlers. Reflects the handlers registered at call time, so a documentation endpoint that calls this per request picks up taps added after the documenting extension was wired in.

on_message(topic='#')

Decorator: register an async (message, **captures) -> None tap for every PUBLISH whose topic matches topic (an MQTT topic filter, so + and # wildcards apply, plus {name} capture segments). The callback receives a :class:~blackbull.mqtt.Message.

shutdown(app) async

Stop the broker and tap actors on lifespan shutdown.

startup(app) async

Start the broker (and, in actor mode, the tap) inbox loops.

MQTTProtocolDetector

Bases: ProtocolDetector

Recognise an MQTT connection from its first byte.

Every MQTT session opens with a CONNECT packet whose fixed-header first byte is 0x10 (type 1, flags 0). That is unambiguous against the HTTP request line and the HTTP/2 preface, both of which start with ASCII text.

Message dataclass

A published message handed to an on_message tap.

The user-facing read-model — a plain, immutable view of one PUBLISH, deliberately distinct from the wire codec MQTTPublish (which carries the __iter__/__getitem__ tuple-magic) and from the actor inbox Message base. Named Message for ecosystem consistency (aiomqtt, paho): users write from blackbull.mqtt import Message.

Subscription

Bases: NamedTuple

A read-only view of one registered on_message tap.

Yielded by :meth:MQTTExtension.iter_subscriptions so documentation tools (AsyncAPIExtension) can describe an app's taps without reaching into the private _handlers list. topic is the filter as the application wrote it ({name} captures restored); callback is the registered coroutine function (its __name__/docstring name the AsyncAPI operation).

TapActor

Bases: Actor

Decoupled, lifespan-owned consumer of on_message taps.

Producers call :meth:offer (non-blocking); a single consumer task drains the bounded inbox and runs the matching taps, so FIFO order of accepted messages is preserved and tap latency never reaches the connection or broker.

dropped property

Number of messages dropped on overflow so far (best-effort metric).

offer(message)

Enqueue message without blocking; drop-newest on overflow.

Taps are best-effort observability, so a full queue drops the incoming message rather than back-pressuring the publisher — but the running dropped count is always logged, since silent loss is the one unacceptable outcome.

serve_connection(reader, writer, ctx, broker, *, app_handlers=None, tap=None) async

Raw-protocol handler body for one MQTT connection.

Spawns the connection actor's inbox-drain (run) alongside the reader loop, and guarantees the broker sees a Detach when the connection ends — so a Will fires on an abnormal (cancelled) close. Pass tap for decoupled tap dispatch or app_handlers for inline dispatch (see :mod:blackbull.mqtt.tap).