MQTT broker: actor-model design
This page explains how the MQTT broker is built, for readers who want to
understand the internals or extend them. For the user-facing API (on_message,
{name} captures, tap modes) see the MQTT broker guide.
The MQTT broker is the first part of BlackBull that uses the framework's Actor inbox for real — see Relationship to the framework actor model below.
Why an actor model at all
A broker is shared mutable state by nature: one routing table, one set of sessions, one retained-message store, all touched by every connection concurrently. The obvious implementation reaches for locks. The actor model removes the need for them: a single actor owns the state and processes one message at a time, so concurrent connections can never interleave inside it.
BlackBull's broker is three kinds of actor:
| Actor | Count | Owns | Inbox carries |
|---|---|---|---|
| BrokerActor | one per app/worker | all routing/session/retained state | client control events (Attach, ClientPublish, …) |
| MQTTConnectionActor | one per connection | one socket's write side | outbound packets (Send, Close) |
| TapActor | one per app/worker | nothing (stateless dispatch) | published messages for on_message taps |
Each lives in its own module: BrokerActor in blackbull.mqtt.broker,
MQTTConnectionActor in blackbull.mqtt.connection, TapActor (with the
Message read-model) in blackbull.mqtt.tap, and the user-facing
MQTTExtension wiring in blackbull.mqtt.extension. The wire codec is separate
again, in blackbull.mqtt.messages.
The two-actor data plane
        ┌──────────────────────── one per connection ────────────────────────┐
socket → reader loop ──decode──► MQTTConnectionActor.run() ──write──► socket
(bytes)       │ (control packets)           ▲ (sole writer, drains its inbox)
              │                             │
              ▼ send(ClientPublish, …)      │ send(Send(packet=…)), send(Close)
        ┌─────────────────────────────────────────────┐
        │              BrokerActor.run()              │  one per app/worker
        │  owns subscriptions / sessions / retained   │  (serial inbox → no locks)
        └─────────────────────────────────────────────┘
BrokerActor: the state owner
BrokerActor is the single locus of routing state: the live-connection
registry, per-client sessions (subscriptions and pending QoS state), retained
messages, and Will templates. It never touches a socket. When it needs to send
something to a client it sends a Send (or Close) message to that client's
connection actor and moves on.
Because it processes its inbox serially, two PUBLISHes from two different connections are handled one after another, never concurrently. The routing table and session dicts are therefore plain Python objects that only the broker's own task touches, with no locks around them. This is the property the actor model buys.
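The "never touches a socket" rule can be illustrated with a minimal sketch. It is not BlackBull's implementation: ToyBrokerState and its methods are hypothetical, topics are matched exactly (no wildcards), and a bare asyncio.Queue stands in for each connection actor's inbox.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class Send:                       # outbound packet for a connection actor
    packet: bytes


@dataclass
class ToyBrokerState:             # hypothetical stand-in for BrokerActor's state
    # Plain containers: only the broker's own task ever touches them.
    connections: dict[str, asyncio.Queue] = field(default_factory=dict)
    subscriptions: dict[str, set[str]] = field(default_factory=dict)

    def attach(self, client_id: str, inbox: asyncio.Queue) -> None:
        self.connections[client_id] = inbox

    def subscribe(self, client_id: str, topic: str) -> None:
        self.subscriptions.setdefault(topic, set()).add(client_id)

    def publish(self, topic: str, payload: bytes) -> int:
        """Queue a Send for every live subscriber; never write to a socket."""
        delivered = 0
        for client_id in self.subscriptions.get(topic, ()):
            inbox = self.connections.get(client_id)
            if inbox is not None:          # the subscriber may be offline
                inbox.put_nowait(Send(payload))
                delivered += 1
        return delivered
```

Because publish() only enqueues, a slow client socket delays that client's own writer loop and nothing else.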
MQTTConnectionActor: the sole socket writer
Each connection has one MQTTConnectionActor. Its inbox carries only outbound
packets, and its run() loop — draining that inbox — is the only code that
writes to the socket. That single-writer invariant means there are no
cross-task write races, even though the broker, the keep-alive path, and the
reader can all originate outbound traffic.
A sibling reader task (read_loop) does the opposite direction: it decodes
the wire and sends control messages (ClientPublish, ClientSubscribe, …) to
the broker. Stateless replies the connection can answer itself — PINGRESP, AUTH —
it routes back through its own inbox so that run() stays the only writer.
serve_connection is the raw-protocol handler body that wires the reader task
and the writer loop together and guarantees the broker sees a Detach when the
connection ends — including an abnormal (cancelled) close, which is what makes a
Will fire.
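The single-writer loop and the Detach guarantee can be sketched as follows. This is an illustration under stated assumptions, not the real serve_connection: the function signatures, the Detach field, the list standing in for the socket, and the queue standing in for the broker's inbox are all hypothetical.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Send:               # outbound packet for the writer loop
    packet: bytes


class Close:              # tells the writer loop to stop
    pass


@dataclass
class Detach:             # hypothetical shape of the broker-side event
    client_id: str


async def writer_loop(inbox: asyncio.Queue, written: list) -> None:
    """The only place that 'writes to the socket' (here: appends to a list)."""
    while True:
        message = await inbox.get()
        if isinstance(message, Close):
            return
        written.append(message.packet)


async def read_loop(incoming: asyncio.Queue, inbox: asyncio.Queue) -> None:
    """Answers a ping by queueing the reply instead of writing directly."""
    while True:
        packet = await incoming.get()
        if packet is None:                 # end of stream
            await inbox.put(Close())
            return
        if packet == b"PINGREQ":
            await inbox.put(Send(b"PINGRESP"))


async def serve_connection(client_id, incoming, broker_inbox, written) -> None:
    inbox: asyncio.Queue = asyncio.Queue()
    reader = asyncio.create_task(read_loop(incoming, inbox))
    try:
        await writer_loop(inbox, written)
    finally:
        reader.cancel()
        # Runs on normal exit and on cancellation alike.
        broker_inbox.put_nowait(Detach(client_id))


async def demo(cancel: bool):
    broker_inbox, incoming, written = asyncio.Queue(), asyncio.Queue(), []
    task = asyncio.create_task(
        serve_connection("c1", incoming, broker_inbox, written))
    await incoming.put(b"PINGREQ")
    if cancel:
        await asyncio.sleep(0.01)          # let the ping round-trip first
        task.cancel()                      # abnormal close
    else:
        await incoming.put(None)           # clean end of stream
    try:
        await task
    except asyncio.CancelledError:
        pass
    return written, broker_inbox.get_nowait()
```

The try/finally is the whole trick: the Detach is queued whether the writer loop returns or is cancelled, so the broker learns about both kinds of close.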
The Will-on-teardown payoff
Because BrokerActor outlives every connection actor by construction, a peer's
Last-Will-and-Testament routes to live subscribers during that peer's
teardown with no special-casing. An earlier (pre-actor) broker had to keep
global state alive forever to make this work; the long-lived broker actor makes
the crutch unnecessary.
The tap dispatch plane
on_message handlers are application-level taps on top of routing — the
broker delivers to subscribers whether or not any tap is registered. Taps have
their own dispatch plane so a slow tap can never stall the data plane.
- actor mode (default). The connection offers each published Message to the shared TapActor with a non-blocking call and returns immediately. The TapActor has a bounded inbox; on overflow it drops the newest message and logs a running dropped-count. A slow tap therefore back-pressures nothing: the cost surfaces as bounded coverage, not latency.
- inline mode. The connection awaits the matching callbacks itself, so a slow tap back-pressures only its own connection. Selected with MQTTExtension(tap_mode='inline'); retained mainly so the bench/mqtt/tap_throughput.py comparison is a controlled one-variable test.
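The drop-newest behaviour of actor mode can be sketched with a bounded asyncio.Queue. ToyTapInbox, offer, and take are hypothetical names for illustration; the real TapActor's interface and logging are not shown here.

```python
import asyncio


class ToyTapInbox:
    """Bounded inbox that drops the newest message on overflow."""

    def __init__(self, maxsize: int) -> None:
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
        self.dropped = 0                   # running count of dropped messages

    def offer(self, message) -> bool:
        """Non-blocking: returns False (and counts a drop) when the inbox is full."""
        try:
            self._queue.put_nowait(message)
            return True
        except asyncio.QueueFull:
            self.dropped += 1
            return False

    async def take(self):
        """Consumer side: wait for the next queued message."""
        return await self._queue.get()
```

Since offer() never awaits, the caller's latency does not depend on how fast the tap consumes; the price is that messages arriving while the inbox is full are never seen by the tap.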
Both modes share one matching/invocation path (run_taps), so a tap behaves
identically either way apart from the back-pressure characteristic. {name}
topic captures are compiled once (rewriting {name} to a + level and
recording the capture position) and bound to keyword arguments at dispatch.
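The capture compilation described above might look roughly like this. compile_pattern and bind_captures are hypothetical helpers, not BlackBull functions, and the sketch assumes the topic has already matched the rewritten filter.

```python
def compile_pattern(pattern: str) -> tuple[str, dict[int, str]]:
    """Rewrite each {name} level to '+' and record its position."""
    levels = pattern.split("/")
    captures: dict[int, str] = {}
    for index, level in enumerate(levels):
        if level.startswith("{") and level.endswith("}") and len(level) > 2:
            captures[index] = level[1:-1]
            levels[index] = "+"
    return "/".join(levels), captures


def bind_captures(topic: str, captures: dict[int, str]) -> dict[str, str]:
    """Turn recorded positions into keyword arguments for a matched topic."""
    levels = topic.split("/")
    return {name: levels[index] for index, name in captures.items()}


topic_filter, captures = compile_pattern("sensors/{room}/temp")
kwargs = bind_captures("sensors/kitchen/temp", captures)
```

Here topic_filter is "sensors/+/temp" and kwargs is {"room": "kitchen"}, so the per-message work is one split and a dict build rather than a re-parse of the pattern.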
Reading the wire: PacketFramer
TCP is a byte stream, so a single read may contain several MQTT packets, a
partial packet, or junk from a desynchronised peer. PacketFramer isolates that
framing/resync state machine from the read loop: fed raw bytes, it yields each
fully decoded packet and keeps any trailing partial packet for the next feed. An
incomplete packet ends the current iteration (wait for more bytes); a hard
decode error drops one byte and resyncs.
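A simplified framer shows the three outcomes (whole packet, incomplete, resync). It is a sketch under stated assumptions: ToyFramer, packet_length, and the two exception classes are hypothetical, and it only frames packets by the MQTT fixed header (type byte plus Remaining Length) instead of fully decoding them as the real PacketFramer does.

```python
class Incomplete(Exception):
    """Not enough bytes yet for a whole packet."""


class Malformed(Exception):
    """The bytes at the head of the buffer cannot start a packet."""


def packet_length(buf) -> int:
    """Total length of the packet at the head of buf (MQTT fixed header)."""
    if not buf:
        raise Incomplete
    if buf[0] >> 4 == 0:                       # packet type 0 is reserved
        raise Malformed
    remaining, shift = 0, 0
    for offset in range(1, 5):                 # Remaining Length: 1 to 4 bytes
        if offset >= len(buf):
            raise Incomplete
        byte = buf[offset]
        remaining |= (byte & 0x7F) << shift
        if not byte & 0x80:                    # no continuation bit: length done
            return offset + 1 + remaining
        shift += 7
    raise Malformed                            # a fifth length byte is not allowed


class ToyFramer:
    def __init__(self) -> None:
        self._buffer = bytearray()

    def feed(self, data: bytes):
        """Generator: yields each complete packet, keeps any partial tail."""
        self._buffer += data
        while self._buffer:
            try:
                total = packet_length(self._buffer)
            except Incomplete:
                return                         # wait for more bytes
            except Malformed:
                del self._buffer[:1]           # drop one byte and resync
                continue
            if total > len(self._buffer):
                return                         # body not fully arrived yet
            packet = bytes(self._buffer[:total])   # one-packet copy for the decoder
            del self._buffer[:total]
            yield packet
```

Note that feed() is a generator, so the bytes are only appended once the caller starts iterating, for example with `for packet in framer.feed(chunk): ...`.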
Why the framer still copies at the decode boundary
PacketFramer snapshots its buffer with bytes(...) before each decode
because the codec's decode_packet input contract is deliberately bytes
(enforced by beartype). A zero-copy framer would mean widening that contract
to the buffer protocol; that is deferred, and the copy is bounded by one
packet's length.
Relationship to the framework actor model
The framework's actor-model design invariants describe
an Actor base with an asyncio.Queue inbox and a send / run / _handle
contract. Historically the HTTP actors (ConnectionActor, HTTP1Actor,
HTTP2Actor, StreamActor, …) override run() and call each other through
direct method calls — the inbox is defined but latent on that path.
The MQTT broker is the first production code that uses the inbox for real:
BrokerActor, MQTTConnectionActor, and TapActor all keep the base run()
loop, override _handle(), and communicate exclusively by await
other.send(message). That is why the broker needs no locks — the "one message
at a time" guarantee is the base-class run() draining the queue, not a
convention the broker re-implements.
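The send / run / _handle contract can be illustrated with a toy base class. This is an assumption-laden sketch rather than the framework's Actor: ToyActor, its stop() sentinel, and the Recorder subclass are hypothetical, and only the asyncio.Queue inbox and the three method names come from the description above.

```python
import asyncio


class ToyActor:
    """Hypothetical base: an inbox plus a run() loop that calls _handle()."""

    _STOP = object()                        # sentinel used by stop()

    def __init__(self) -> None:
        self._inbox: asyncio.Queue = asyncio.Queue()

    async def send(self, message) -> None:
        await self._inbox.put(message)

    async def stop(self) -> None:
        await self._inbox.put(self._STOP)

    async def run(self) -> None:
        while True:
            message = await self._inbox.get()
            if message is self._STOP:
                return
            await self._handle(message)     # one message at a time

    async def _handle(self, message) -> None:
        raise NotImplementedError


class Recorder(ToyActor):
    """Subclass keeps the base run() and overrides only _handle()."""

    def __init__(self) -> None:
        super().__init__()
        self.seen: list[str] = []

    async def _handle(self, message) -> None:
        self.seen.append(f"start {message}")
        await asyncio.sleep(0)              # yield to other tasks mid-handler
        self.seen.append(f"end {message}")


async def demo() -> list[str]:
    actor = Recorder()
    runner = asyncio.create_task(actor.run())
    # Two senders race, yet the handlers never interleave.
    await asyncio.gather(actor.send("a"), actor.send("b"))
    await actor.stop()
    await runner
    return actor.seen
```

Even though _handle() yields control in the middle, the next message is not picked up until the current handler returns, because run() awaits each _handle() call before calling get() again.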
So the broker is not a parallel mechanism bolted onto the framework; it is the actor model the framework already described, finally exercised end-to-end. If you are looking for a worked example of BlackBull's actor inbox, this is it.