blackbull.mqtt.connection

MQTT 5.0 per-connection actor and its raw-protocol entry point.

There is one :class:MQTTConnectionActor per connection. Its inbox carries only outbound packets: :class:~blackbull.mqtt.broker.Send and :class:~blackbull.mqtt.broker.Close from the broker, plus the two stateless replies it generates itself. Its run() drains that inbox and is the sole writer to the socket, so there are no cross-task write races. A sibling reader loop decodes the wire (via :class:PacketFramer) and sends control messages to the broker. :func:serve_connection is the :data:~blackbull.server.protocol_registry.RawProtocolHandler body that wires the two together.
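The single-writer arrangement described above can be sketched with a plain asyncio.Queue. This is a minimal illustration, not the module's implementation: Send, Close, FakeWriter and drain_inbox are hypothetical stand-ins defined here, and the real message classes may carry different fields.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Send:
    """Hypothetical stand-in for blackbull.mqtt.broker.Send."""
    data: bytes


class Close:
    """Hypothetical stand-in for blackbull.mqtt.broker.Close."""


class FakeWriter:
    """Records writes so the example runs without a socket."""

    def __init__(self):
        self.chunks = []
        self.closed = False

    def write(self, data):
        self.chunks.append(data)

    def close(self):
        self.closed = True


async def drain_inbox(inbox, writer):
    """Drain the inbox; this coroutine is the only code that touches writer."""
    while True:
        msg = await inbox.get()
        if isinstance(msg, Close):
            writer.close()
            return
        writer.write(msg.data)


async def demo():
    inbox = asyncio.Queue()
    writer = FakeWriter()
    task = asyncio.create_task(drain_inbox(inbox, writer))
    # Producers only enqueue; they never write to the socket themselves.
    await inbox.put(Send(b"\x20\x03\x00\x00\x00"))  # a minimal MQTT 5.0 CONNACK
    await inbox.put(Send(b"\xd0\x00"))              # a PINGRESP
    await inbox.put(Close())
    await task
    return writer


writer = asyncio.run(demo())
```

Because every outbound packet passes through one queue and one consumer, packets reach the socket in enqueue order and no two tasks can interleave partial writes.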

MQTTConnectionActor

Bases: Actor

One per MQTT connection; the sole writer to its socket.

Tap dispatch is selected at construction: pass a running :class:TapActor as tap for decoupled (actor-mode) dispatch, or app_handlers for inline dispatch on this connection (the Sprint 53 contract).

read_loop(reader) async

Decode the wire and forward control packets to the broker.

Reads block on a real socket; a fake reader may return b'' while merely idle, in which case the loop polls. Either way the loop ends on EOF (at_eof()), a DISCONNECT (sets _done), or task cancellation.
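The termination rules above can be sketched as follows. This is an assumption-laden illustration rather than the actual read_loop: FakeReader, the on_bytes callback, the done event and poll_interval are all hypothetical names introduced for the example.

```python
import asyncio


class FakeReader:
    """Hypothetical test double: read() may return b'' while idle;
    at_eof() reports the real end of the stream."""

    def __init__(self, chunks):
        self._chunks = list(chunks)

    async def read(self, n):
        return self._chunks.pop(0) if self._chunks else b""

    def at_eof(self):
        return not self._chunks


async def read_loop(reader, on_bytes, done, poll_interval=0.001):
    """Forward bytes until EOF or until done is set (e.g. on DISCONNECT)."""
    while not done.is_set() and not reader.at_eof():
        data = await reader.read(4096)
        if not data:
            # Idle fake reader: wait briefly and try again.
            await asyncio.sleep(poll_interval)
            continue
        on_bytes(data)


async def demo():
    # Case 1: an idle b'' in the middle does not end the loop; EOF does.
    seen = []
    await read_loop(FakeReader([b"a", b"", b"b"]), seen.append, asyncio.Event())

    # Case 2: setting done (as a DISCONNECT would) ends the loop early.
    stopped = []
    done = asyncio.Event()

    def stop_after_first(data):
        stopped.append(data)
        done.set()

    await read_loop(FakeReader([b"x", b"y"]), stop_after_first, done)
    return seen, stopped


seen, stopped = asyncio.run(demo())
```

Task cancellation needs no explicit handling in this sketch: cancelling the task raises CancelledError at the pending await and unwinds the loop.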

PacketFramer

Incremental MQTT packet de-framer.

Fed raw bytes with :meth:feed, it yields each fully decoded packet on iteration and retains any trailing partial packet for the next feed. The framing/resync state machine lives here rather than inline in the read loop:

  • an incomplete packet simply ends the current iteration — the partial bytes stay buffered for the next :meth:feed (TCP will deliver the rest),
  • a hard decode error (reserved flag bits, unknown type — the junk a desynchronised stream produces) drops one byte and resyncs.

This replaces Sprint 53's stalled_len bookkeeping. The bytes(...) snapshot at the decode boundary stays because the codec's input contract is deliberately bytes; a zero-copy framer would mean widening that contract to the buffer protocol (deferred).
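The two framing rules above (stop on an incomplete packet, drop one byte on a hard decode error) can be sketched like this. It is a toy under stated assumptions, not the real class: decode_one, Incomplete and DecodeError are hypothetical, and the toy codec only parses the MQTT fixed header, returning the packet type and raw body.

```python
class Incomplete(Exception):
    """More bytes are needed to finish the current packet."""


class DecodeError(Exception):
    """The bytes cannot be the start of a valid packet."""


# Packet types whose fixed-header flags must be 0b0010; others require 0.
_REQUIRED_FLAGS = {6: 0x02, 8: 0x02, 10: 0x02}  # PUBREL, SUBSCRIBE, UNSUBSCRIBE


def decode_one(data: bytes):
    """Toy codec: return ((packet_type, body), bytes_consumed)."""
    if not data:
        raise Incomplete
    ptype, flags = data[0] >> 4, data[0] & 0x0F
    if ptype == 0:
        raise DecodeError("reserved packet type 0")
    if ptype != 3 and flags != _REQUIRED_FLAGS.get(ptype, 0):  # 3 = PUBLISH
        raise DecodeError("reserved flag bits set")
    # Remaining Length is a variable byte integer of at most 4 bytes.
    length, shift, pos = 0, 0, 1
    while True:
        if pos > 4:
            raise DecodeError("remaining length longer than 4 bytes")
        if pos >= len(data):
            raise Incomplete
        byte = data[pos]
        length |= (byte & 0x7F) << shift
        shift += 7
        pos += 1
        if not byte & 0x80:
            break
    end = pos + length
    if end > len(data):
        raise Incomplete
    return (ptype, data[pos:end]), end


class PacketFramer:
    """Sketch of an incremental de-framer with one-byte resync."""

    def __init__(self):
        self._buf = bytearray()

    def feed(self, data: bytes) -> None:
        self._buf += data

    def __iter__(self):
        while self._buf:
            try:
                # bytes(...) snapshot: the codec's input contract is bytes.
                packet, used = decode_one(bytes(self._buf))
            except Incomplete:
                return  # keep the partial packet for the next feed()
            except DecodeError:
                del self._buf[:1]  # drop one byte and try to resync
                continue
            del self._buf[:used]
            yield packet


framer = PacketFramer()
# One junk byte, a complete PINGREQ, then a PUBLISH missing its last byte.
framer.feed(b"\x00" + b"\xc0\x00" + b"\x30\x03ab")
first = list(framer)
framer.feed(b"c")  # the rest of the PUBLISH arrives
second = list(framer)
```

Here the junk byte is discarded, the PINGREQ is yielded on the first pass, and the PUBLISH is yielded only after the second feed completes it.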

serve_connection(reader, writer, ctx, broker, *, app_handlers=None, tap=None) async

Raw-protocol handler body for one MQTT connection.

Spawns the connection actor's inbox-drain (run) alongside the reader loop, and guarantees the broker sees a Detach when the connection ends — so a Will fires on an abnormal (cancelled) close. Pass tap for decoupled tap dispatch or app_handlers for inline dispatch (see :mod:blackbull.mqtt.tap).
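The Detach guarantee described above amounts to a try/finally around the reader loop. The sketch below illustrates that shape only; FakeBroker, its detach method, serve and the client id are hypothetical, and the real handler sends a Detach message to the broker rather than calling a method.

```python
import asyncio


class FakeBroker:
    """Hypothetical broker double that records Detach notifications."""

    def __init__(self):
        self.events = []

    async def detach(self, client_id):
        self.events.append(("detach", client_id))


async def serve(run, read_loop, broker, client_id):
    """Run the inbox drain beside the reader loop; always notify the broker."""
    run_task = asyncio.create_task(run())
    try:
        await read_loop()
    finally:
        # Reached on normal return, on error, and on cancellation alike.
        run_task.cancel()
        await asyncio.gather(run_task, return_exceptions=True)
        await broker.detach(client_id)


async def demo():
    broker = FakeBroker()

    async def idle_forever():
        await asyncio.Event().wait()

    task = asyncio.create_task(serve(idle_forever, idle_forever, broker, "c1"))
    await asyncio.sleep(0)  # let serve() start both loops
    task.cancel()           # simulate an abnormal close
    try:
        await task
    except asyncio.CancelledError:
        pass
    return broker.events


events = asyncio.run(demo())
```

Placing the notification in finally is what lets the broker learn of a cancelled connection and decide whether to publish the Will.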