blackbull.app¶
blackbull.app
¶
BlackBull application object — the user-facing ASGI 3.0 entry point.
Exposes the BlackBull class which wraps a Router, an ErrorRouter,
lifespan hooks, per-route and global middleware chains, and (via
app.static) static-file serving. BlackBull.__call__ is the ASGI
callable: it dispatches lifespan events to _handle_lifespan and routes
HTTP / WebSocket scopes through the global-middleware chain ending in
_dispatch.
Companion definitions live in this module to avoid circular imports:
RouteGroup— returned byapp.group(middlewares=[...])to share a middleware prefix across routes._default_error_handler— registered on everyHTTPStatuserror and onExceptionso unhandled errors produce a sensible plain-text reply._wrap_send— adapts the ASGIsendcallable so handlers may passResponseobjects directly.
BlackBull
¶
enable_openapi(*, title='BlackBull API', version='0.1.0', description=None, spec_path='/openapi.json', docs_path='/docs')
¶
Auto-publish an OpenAPI 3.1 spec and Swagger UI for the app.
Registers two routes:
spec_path(default/openapi.json) returns the spec as JSON. The spec is regenerated on every request so new routes added after this call are reflected.docs_path(default/docs) returns an HTML page hosting Swagger UI pointed atspec_path. Passdocs_path=Noneto skip the UI route and serve only the JSON spec.
Call once, after the rest of the app's routes have been registered.
group(middlewares=[])
¶
Return a RouteGroup that prepends middlewares to every route.
intercept(event_name)
¶
Decorate a handler to intercept event_name synchronously.
The handler is awaited in registration order when the event fires. Exceptions propagate to the emitter and abort subsequent interceptors registered for the same event.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
event_name
|
str
|
Name of the event to intercept (e.g. |
required |
Returns:
| Type | Description |
|---|---|
Callable[[EventHandler], EventHandler]
|
A decorator that registers the wrapped coroutine and returns it |
Callable[[EventHandler], EventHandler]
|
unchanged. |
Example
@app.intercept('app_startup')
async def handler(event: Event):
await setup()
on(event_name)
¶
Decorate a handler to observe event_name (fire-and-forget).
The handler is scheduled as an independent asyncio.Task each time
the event fires. Exceptions raised by the handler are caught and
logged; they do not propagate to the emitter or affect other handlers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
event_name
|
str
|
Name of the event to observe (e.g. |
required |
Returns:
| Type | Description |
|---|---|
Callable[[EventHandler], EventHandler]
|
A decorator that registers the wrapped coroutine and returns it |
Callable[[EventHandler], EventHandler]
|
unchanged. |
Example
@app.on('app_startup')
async def handler(event: Event):
print(event.detail)
on_error(key)
¶
Register a custom error handler for an HTTPStatus or exception class.
Usage::
@app.on_error(HTTPStatus.FORBIDDEN)
async def handle_403(scope, receive, send):
...
@app.on_error(ValueError)
async def handle_value_error(scope, receive, send):
...
The handler receives (scope, receive, send). scope['state'] contains: - 'error_status' : HTTPStatus - 'error_exception' : exception instance (when triggered by an exception) - 'allowed_methods' : allowed method names (for 405)
on_shutdown(fn)
¶
Register a zero-argument coroutine to run at lifespan shutdown.
The handler is wrapped in an adapter and registered as an
'app_shutdown' interception handler so it runs before the ASGI
server receives the lifespan.shutdown.complete acknowledgement.
Shutdown handlers run in registration order; an exception aborts the
remaining handlers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
fn
|
Callable[[], Awaitable[None]]
|
Async callable that takes no arguments. |
required |
Returns:
| Type | Description |
|---|---|
Callable[[], Awaitable[None]]
|
fn unchanged, so the decorator can be stacked or the function |
Callable[[], Awaitable[None]]
|
used normally after registration. |
Example
@app.on_shutdown
async def close_db():
await db.disconnect()
on_startup(fn)
¶
Register a zero-argument coroutine to run at lifespan startup.
The handler is wrapped in an adapter and registered as an
'app_startup' interception handler so it runs before the ASGI
server receives the lifespan.startup.complete acknowledgement.
Startup handlers run in registration order; an exception aborts the
remaining handlers and prevents the completion event from being sent.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
fn
|
Callable[[], Awaitable[None]]
|
Async callable that takes no arguments. |
required |
Returns:
| Type | Description |
|---|---|
Callable[[], Awaitable[None]]
|
fn unchanged, so the decorator can be stacked or the function |
Callable[[], Awaitable[None]]
|
used normally after registration. |
Example
@app.on_startup
async def open_db():
await db.connect()
route(methods=[HTTPMethod.GET], path='/', scheme=Scheme.http, functions=[], middlewares=[], name=None)
¶
Register a route handler, optionally wrapping it in middlewares.
run(certfile=None, keyfile=None, port=0, unix_path=None, inherited_fd=None, workers=None, max_connections=None, stream_queue_depth=None, ws_queue_depth=None, reload=False, reload_paths=None)
¶
Run the app under BlackBull's own server (single- or multi-worker).
This is the synchronous, fire-and-forget entry point — callers
write app.run(port=8000), not asyncio.run(app.run(...)).
For workers > 1 or reload=True the master pre-binds
sockets, forks workers, and blocks until SIGTERM / SIGINT.
For embedded use under an existing event loop, or for pre-binding
a socket before forking a test subprocess, instantiate
:class:blackbull.server.ASGIServer directly. Any external
ASGI server (uvicorn / hypercorn / granian / …) can drive the
:class:BlackBull instance via its ASGI 3.0 __call__.
Example::
app.run(port=8000)
app.run(port=8443, certfile='cert.pem', keyfile='key.pem', workers=4)
app.run(port=8443, certfile='cert.pem', keyfile='key.pem', reload=True)
app.run(unix_path='/run/blackbull.sock')
static(url_prefix, root_dir)
¶
Serve static files from root_dir under url_prefix via global middleware.
url_path_for(name, /, **params)
¶
Return the path for the named route with params substituted.
use(mw)
¶
Register a global middleware applied to every non-lifespan request.
RouteGroup
¶
A subset of routes that share a common middleware prefix.
Obtain via app.group(middlewares=[...]). Every route registered
through the group automatically prepends the group middlewares before
any per-route middlewares.
serve(app, *, certfile=None, keyfile=None, port=0, unix_path=None, inherited_fd=None, workers=None, max_connections=None, stream_queue_depth=None, ws_queue_depth=None, reload=False, reload_paths=None)
¶
Synchronous entry point for any ASGI 3.0 callable.
Works for a :class:BlackBull instance and for any plain ASGI
callable (uvicorn/hypercorn-style async def app(scope, receive,
send): …). This is what the blackbull console script calls
after resolving module:attr; :meth:BlackBull.serve is a thin
shim around it.
For workers=1 without reload the server runs in the current
process via asyncio.run. For workers > 1 or
reload=True the master pre-binds sockets, forks workers, and
blocks until SIGTERM / SIGINT — or in reload mode, until a
watched file changes (master then re-execs itself, see
:mod:blackbull.server.reload).
All integer parameters default to their corresponding BB_*
environment variables (see :mod:blackbull.env).