blackbull.mqtt.tap¶
blackbull.mqtt.tap
¶
Application taps on broker routing — the on_message observability layer.
A tap is an async (message, **captures) -> None callback registered via
:meth:blackbull.mqtt.MQTTExtension.on_message for a topic filter. Taps are
best-effort observers on top of normal broker routing; the broker runs whether
or not any tap is registered.
Two dispatch engines share one code path (:func:run_taps):
- actor (default) — :class:
TapActoris a single, lifespan-owned consumer. A connection actor hands it a :class:Messagewith :meth:TapActor.offer(non-blocking) and returns immediately, so a slow tap can never back-pressure the connection or the broker. Its inbox is bounded; on overflow the newest message is dropped and a running dropped-count is logged (taps are best-effort, but silent loss is unacceptable). - inline — the connection actor awaits the callbacks itself. This is the
Sprint 53 contract, retained as an internal option so the perf comparison
(
bench/mqtt/tap_throughput.py) stays reproducible.
A topic filter may carry {name} capture segments
('sensors/{room}/temperature'); {name} matches one level like + and
binds it as a keyword argument to the callback, mirroring HTTP path params.
Message
dataclass
¶
A published message handed to an on_message tap.
The user-facing read-model — a plain, immutable view of one PUBLISH,
deliberately distinct from the wire codec MQTTPublish (which carries the
__iter__/__getitem__ tuple-magic) and from the actor inbox
Message base. Named Message for ecosystem consistency (aiomqtt,
paho): users write from blackbull.mqtt import Message.
Tap
dataclass
¶
A compiled on_message registration.
match_filter is the topic filter with each {name} segment rewritten
to + (so the validated :func:topic_matches_filter does the matching);
captures records the (level_index, name) of each {name} segment
for binding once a topic matches.
display_filter
property
¶
The topic filter as originally written, with {name} captures
restored (match_filter rewrites each to + for matching).
'sensors/{room}/temperature' round-trips back to itself; a plain
'sensors/+/temperature' stays as 'sensors/+/temperature'. Used
by documentation tooling (AsyncAPIExtension) that wants to show the
filter the application declared rather than the internal match form.
bind(topic)
¶
Return captured {name: value} if topic matches, else None.
TapActor
¶
Bases: Actor
Decoupled, lifespan-owned consumer of on_message taps.
Producers call :meth:offer (non-blocking); a single consumer task drains
the bounded inbox and runs the matching taps, so FIFO order of accepted
messages is preserved and tap latency never reaches the connection or broker.
dropped
property
¶
Number of messages dropped on overflow so far (best-effort metric).
offer(message)
¶
Enqueue message without blocking; drop-newest on overflow.
Taps are best-effort observability, so a full queue drops the incoming message rather than back-pressuring the publisher — but the running dropped count is always logged, since silent loss is the one unacceptable outcome.
compile_tap(topic, callback)
¶
Compile a topic filter (possibly with {name} captures) into a :class:Tap.
compile_taps(handlers)
¶
Normalise a handler list to :class:Tap objects.
Accepts already-compiled Tap objects or (topic, callback) pairs, so
direct callers (tests, benchmarks) can keep the lightweight tuple form.
run_taps(taps, message)
async
¶
Invoke every tap whose filter matches message (sequential, isolated).
Captured {name} segments are passed as keyword arguments; a handler
without captures is simply called callback(message).