blackbull.mqtt.connection¶
blackbull.mqtt.connection
¶
MQTT 5.0 per-connection actor and its raw-protocol entry point.
:class:MQTTConnectionActor is one per connection. Its inbox carries only
outbound packets (:class:~blackbull.mqtt.broker.Send /
:class:~blackbull.mqtt.broker.Close from the broker, plus the two stateless
replies it generates itself), and its run() — draining that inbox — is the
sole writer to the socket, so there are no cross-task write races. A sibling
reader loop decodes the wire (via :class:PacketFramer) and sends control
messages to the broker. :func:serve_connection is the
:data:~blackbull.server.protocol_registry.RawProtocolHandler body that wires
the two together.
MQTTConnectionActor
¶
Bases: Actor
One per MQTT connection; the sole writer to its socket.
Tap dispatch is selected at construction: pass a running :class:TapActor
as tap for decoupled (actor-mode) dispatch, or app_handlers for inline
dispatch on this connection (the Sprint 53 contract).
read_loop(reader)
async
¶
Decode the wire and forward control packets to the broker.
Reads block on a real socket; a fake reader may return b'' while
merely idle, in which case we poll. Either way the loop ends on EOF
(at_eof()), a DISCONNECT (sets _done), or task cancellation.
PacketFramer
¶
Incremental MQTT packet de-framer.
Fed raw bytes with :meth:feed, it yields each fully decoded packet on
iteration and retains any trailing partial packet for the next feed. The
framing/resync state machine lives here rather than inline in the read loop:
- an incomplete packet simply ends the current iteration — the partial
bytes stay buffered for the next :meth:
feed(TCP will deliver the rest), - a hard decode error (reserved flag bits, unknown type — the junk a desynchronised stream produces) drops one byte and resyncs.
This replaces Sprint 53's stalled_len bookkeeping. The bytes(...)
snapshot at the decode boundary stays because the codec's input contract is
deliberately bytes; a zero-copy framer would mean widening that contract
to the buffer protocol (deferred).
serve_connection(reader, writer, ctx, broker, *, app_handlers=None, tap=None)
async
¶
Raw-protocol handler body for one MQTT connection.
Spawns the connection actor's inbox-drain (run) alongside the reader loop,
and guarantees the broker sees a Detach when the connection ends — so a
Will fires on an abnormal (cancelled) close. Pass tap for decoupled tap
dispatch or app_handlers for inline dispatch (see :mod:blackbull.mqtt.tap).