blackbull.mqtt.tap

Application taps on broker routing — the on_message observability layer.

A tap is an async (message, **captures) -> None callback registered via blackbull.mqtt.MQTTExtension.on_message for a topic filter. Taps are best-effort observers on top of normal broker routing; the broker runs whether or not any tap is registered.

Two dispatch engines share one code path (run_taps):

  • actor (default): TapActor is a single, lifespan-owned consumer. A connection actor hands it a Message with TapActor.offer (non-blocking) and returns immediately, so a slow tap can never back-pressure the connection or the broker. Its inbox is bounded; on overflow the newest message is dropped and a running dropped-count is logged (taps are best-effort, but silent loss is unacceptable).
  • inline: the connection actor awaits the callbacks itself. This is the Sprint 53 contract, retained as an internal option so the perf comparison (bench/mqtt/tap_throughput.py) stays reproducible.

A topic filter may carry {name} capture segments ('sensors/{room}/temperature'); {name} matches one level like + and binds it as a keyword argument to the callback, mirroring HTTP path params.

Message dataclass

A published message handed to an on_message tap.

The user-facing read model: a plain, immutable view of one PUBLISH. It is deliberately distinct from the wire codec MQTTPublish (which carries the __iter__/__getitem__ tuple-magic) and from the actor inbox Message base. It is named Message for ecosystem consistency (aiomqtt, paho), so users write from blackbull.mqtt import Message.

Tap dataclass

A compiled on_message registration.

match_filter is the topic filter with each {name} segment rewritten to + (so the validated topic_matches_filter does the matching); captures records the (level_index, name) of each {name} segment for binding once a topic matches.
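
The rewrite described above can be illustrated with a minimal sketch. The names CompiledFilter and compile_filter are hypothetical stand-ins, not part of blackbull, and the sketch assumes a capture always occupies a whole topic level; the real Tap may validate more than this.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CompiledFilter:
    """Hypothetical stand-in for the compiled form (not the real Tap)."""
    match_filter: str
    captures: tuple[tuple[int, str], ...]


def compile_filter(topic_filter: str) -> CompiledFilter:
    """Rewrite each whole-level {name} segment to '+' and record where it was."""
    levels = topic_filter.split("/")
    captures = []
    for index, level in enumerate(levels):
        # Assumption: a capture is a complete level of the form "{name}".
        if len(level) > 2 and level.startswith("{") and level.endswith("}"):
            captures.append((index, level[1:-1]))
            levels[index] = "+"
    return CompiledFilter("/".join(levels), tuple(captures))


compiled = compile_filter("sensors/{room}/temperature")
print(compiled.match_filter)  # sensors/+/temperature
print(compiled.captures)      # ((1, 'room'),)
```

Keeping the (level_index, name) pairs next to the rewritten filter is what lets an ordinary MQTT wildcard matcher do the matching while the names survive for binding.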

display_filter property

The topic filter as originally written, with {name} captures restored (match_filter rewrites each to + for matching).

'sensors/{room}/temperature' round-trips back to itself; a plain 'sensors/+/temperature' stays as 'sensors/+/temperature'. Used by documentation tooling (AsyncAPIExtension) that wants to show the filter the application declared rather than the internal match form.

bind(topic)

Return captured {name: value} if topic matches, else None.
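
As a rough sketch of this contract, the standalone functions below match a topic and then read the captured levels by index. Both matches and bind_topic are hypothetical helpers written for illustration; the matcher is deliberately simplified (only + and a trailing #) and is not the library's topic_matches_filter. Returning an empty dict for a match without captures is an assumption.

```python
from typing import Optional


def matches(match_filter: str, topic: str) -> bool:
    """Simplified matcher: supports '+' and a trailing '#', nothing else."""
    filter_levels = match_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            return True  # multi-level wildcard swallows the rest
        if i >= len(topic_levels):
            return False  # filter is longer than the topic
        if level != "+" and level != topic_levels[i]:
            return False
    return len(filter_levels) == len(topic_levels)


def bind_topic(match_filter: str, captures, topic: str) -> Optional[dict[str, str]]:
    """Return {name: value} for each capture if topic matches, else None."""
    if not matches(match_filter, topic):
        return None
    levels = topic.split("/")
    return {name: levels[index] for index, name in captures}


print(bind_topic("sensors/+/temperature", ((1, "room"),), "sensors/kitchen/temperature"))
# {'room': 'kitchen'}
print(bind_topic("sensors/+/temperature", ((1, "room"),), "sensors/kitchen/humidity"))
# None
```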

TapActor

Bases: Actor

Decoupled, lifespan-owned consumer of on_message taps.

Producers call offer (non-blocking); a single consumer task drains the bounded inbox and runs the matching taps, so FIFO order of accepted messages is preserved and tap latency never reaches the connection or broker.

dropped property

Number of messages dropped on overflow so far (best-effort metric).

offer(message)

Enqueue message without blocking; drop-newest on overflow.

Taps are best-effort observability, so a full queue drops the incoming message rather than back-pressuring the publisher — but the running dropped count is always logged, since silent loss is the one unacceptable outcome.
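
The drop-newest policy can be sketched with a bounded asyncio.Queue. BoundedInbox is a hypothetical class for illustration only; the boolean return value and the drain helper are assumptions, not the real TapActor interface.

```python
import asyncio
import logging

logger = logging.getLogger("tap_sketch")


class BoundedInbox:
    """Hypothetical bounded inbox that drops the newest message when full."""

    def __init__(self, maxsize: int) -> None:
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
        self.dropped = 0

    def offer(self, message) -> bool:
        """Enqueue without blocking; count and log the drop on overflow."""
        try:
            self._queue.put_nowait(message)
            return True
        except asyncio.QueueFull:
            self.dropped += 1
            logger.warning("tap inbox full; %d message(s) dropped so far", self.dropped)
            return False

    async def drain(self) -> list:
        """Pull everything currently queued, in FIFO order."""
        items = []
        while not self._queue.empty():
            items.append(await self._queue.get())
        return items


async def demo():
    inbox = BoundedInbox(maxsize=2)
    accepted = [inbox.offer(n) for n in (1, 2, 3)]  # third offer overflows
    return accepted, await inbox.drain(), inbox.dropped


print(asyncio.run(demo()))  # ([True, True, False], [1, 2], 1)
```

Because offer never awaits, the producer's cost stays constant regardless of how slow the consumer is; the price is that the incoming message is lost under overload, which is why the counter and the log line matter.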

TapDeliver dataclass

Bases: Message

Hand a published Message to the TapActor.

compile_tap(topic, callback)

Compile a topic filter (possibly with {name} captures) into a Tap.

compile_taps(handlers)

Normalise a handler list to Tap objects.

Accepts already-compiled Tap objects or (topic, callback) pairs, so direct callers (tests, benchmarks) can keep the lightweight tuple form.
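
A minimal sketch of this kind of normalisation, assuming a simplified stand-in type. SketchTap and normalise are hypothetical names; the real compile_taps presumably compiles the topic filter as well, which this sketch omits.

```python
from dataclasses import dataclass
from typing import Any, Callable, Iterable, Union


@dataclass(frozen=True)
class SketchTap:
    """Hypothetical stand-in for a compiled registration."""
    topic: str
    callback: Callable[..., Any]


Handler = Union[SketchTap, tuple]


def normalise(handlers: Iterable[Handler]) -> list[SketchTap]:
    """Pass compiled objects through; wrap (topic, callback) pairs."""
    taps = []
    for handler in handlers:
        if isinstance(handler, SketchTap):
            taps.append(handler)
        else:
            topic, callback = handler
            taps.append(SketchTap(topic, callback))
    return taps


async def on_temperature(message, room):
    print(room, message)


taps = normalise([
    SketchTap("alerts/#", on_temperature),
    ("sensors/{room}/temperature", on_temperature),
])
print([tap.topic for tap in taps])  # ['alerts/#', 'sensors/{room}/temperature']
```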

run_taps(taps, message) async

Invoke every tap whose filter matches message (sequential, isolated).

Captured {name} segments are passed as keyword arguments; a handler without captures is simply called callback(message).
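
One way to read "sequential, isolated" is that callbacks are awaited one after another and an exception in one does not prevent the next from running. The sketch below follows that reading; it is an assumption, not the documented behaviour. run_all is a hypothetical helper that takes (callback, captures) pairs that have already matched.

```python
import asyncio
import logging

logger = logging.getLogger("tap_sketch")


async def run_all(bound_taps, message) -> None:
    """Await each matched callback in order; log and continue on failure."""
    for callback, captures in bound_taps:
        try:
            # With no captures this is simply callback(message).
            await callback(message, **captures)
        except Exception:
            logger.exception("tap %r failed; continuing with the next tap", callback)


async def demo():
    seen = []

    async def broken(message):
        raise RuntimeError("boom")

    async def by_room(message, room):
        seen.append((room, message))

    await run_all([(broken, {}), (by_room, {"room": "kitchen"})], "21.5")
    return seen


print(asyncio.run(demo()))  # [('kitchen', '21.5')]
```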