Capsule decorators fall into two families:
  • functional app decorators called on app
  • class-based decorators imported from cpsl

Worked flow

In a real app, decorators usually work together rather than in isolation:
@app.boot()
async def on_boot():
    print("[support-ops] runtime ready")


@app.message()
async def handle(session: cpsl.Session, msg: cpsl.Message):
    if "sync" in msg.text.lower():
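        # Hand the long-running work to a background task and surface its progress.
        # The result is named task_handle so it does not shadow the handler function.
        task_handle = await sync_threads.submit(session=session)
        await session.show_task(task_handle, message="Thread sync started")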
        return

    await session.reply("Ask for a sync or ask for analysis.")


@app.task(retries=2, lock="mailbox:sync")
async def sync_threads(session: cpsl.Session | None = None):
    if session:
        await session.notify("Syncing Gmail threads...")
    ...


@app.schedule("0 * * * *")
async def hourly_sync():
    await sync_threads.submit()
This one example already shows the usual Capsule pattern:
  • lifecycle hooks prepare the runtime
  • message handlers decide what the user wants
  • tasks own side effects and longer work
  • schedules keep the app moving without user input

Functional app decorators

These require an app instance created with app = cpsl.App(..., image=cpsl.Image(), ...).
Decorator           | Purpose
@app.boot()         | Run once when the runtime starts
@app.shutdown()     | Run when the runtime is shutting down
@app.enter()        | Run when a new session is created
@app.exit()         | Run when a session closes
@app.message()      | Handle inbound messages
@app.task(...)      | Register a background task
@app.schedule(cron) | Register a cron job
@app.endpoint(...)  | Register an HTTP endpoint
@app.asgi(path=...) | Mount an ASGI app

Class-based decorators

When you use @app.cls(...), decorate the handler methods with the module-level versions imported from cpsl instead of the app-bound ones:
  • @cpsl.boot()
  • @cpsl.shutdown()
  • @cpsl.enter()
  • @cpsl.exit()
  • @cpsl.message()
  • @cpsl.task(...)
  • @cpsl.schedule(cron)
  • @cpsl.endpoint(...)
  • @cpsl.asgi(path=...)
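
To make the split between the two families concrete, here is a toy model of how marker decorators on methods can cooperate with a class decorator. It is not Capsule's implementation: boot, message, collect_handlers, and dispatch below are hypothetical stand-ins written only for illustration, and the real cpsl decorators may work differently.

```python
import asyncio

# Hypothetical stand-ins for illustration only; not the real cpsl API.
def _marker(kind):
    """Build a decorator factory that tags a function with a handler kind."""
    def factory():
        def decorate(fn):
            fn._handler_kind = kind  # tag only; the function itself is unchanged
            return fn
        return decorate
    return factory

boot = _marker("boot")
message = _marker("message")

def collect_handlers(cls):
    """Class decorator: index tagged methods by their handler kind."""
    cls.handlers = {
        attr._handler_kind: name
        for name, attr in vars(cls).items()
        if hasattr(attr, "_handler_kind")
    }
    return cls

@collect_handlers
class SupportOps:
    @boot()
    async def on_boot(self):
        return "ready"

    @message()
    async def handle(self, text):
        return f"echo: {text}"

async def dispatch(instance, kind, *args):
    """Look up the method registered for `kind` and await it."""
    method = getattr(instance, instance.handlers[kind])
    return await method(*args)

result = asyncio.run(dispatch(SupportOps(), "message", "hi"))
```

The method decorators only label functions. The class decorator does the registration, which is one plausible reason a class-based API would need decorators that are not bound to a specific app object.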

Lifecycle hooks

boot()

Runs once when the runtime starts.
@app.boot()
async def on_boot():
    print("ready")

shutdown()

Runs during runtime shutdown. Good for cleanup and flush logic.
@app.shutdown()
async def on_shutdown():
    print("cleaning up")

enter()

Runs when a new session is created. The handler receives the session object.
@app.enter()
async def on_enter(session: cpsl.Session):
    session.data["started"] = True

exit()

Runs when a session closes.
@app.exit()
async def on_exit(session: cpsl.Session):
    print(f"closing {session.id}")

message()

Registers the main inbound handler.
from openai import AsyncOpenAI

client = AsyncOpenAI()

@app.message()
async def handle(session: cpsl.Session, msg: cpsl.Message):
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "You are a concise Capsule assistant."},
            *session.chat_messages(msg),
        ],
    )
    await session.reply(response.choices[0].message.content or "No response.")
For most apps, this is the main entry point. To call the OpenAI SDK directly, as this example does, pass image=cpsl.Image(python_packages=["openai"]) and secrets=["OPENAI_API_KEY"] to App(...).

task(...)

Registers a background task and turns the function into a TaskDescriptor.

Arguments

Argument     | Meaning
retries      | Max retry attempts
timeout      | Task timeout in seconds
lock         | Lock template like "user:{user_id}"
retry_for    | Exception types that should retry
callback_url | URL to POST completion/failure payloads to
process      | Run in a separate OS process
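
The lock argument is easiest to picture as a named mutex. The sketch below is a toy model, not Capsule's implementation. It assumes the template is filled from the task's keyword arguments with str.format, and that tasks sharing a rendered key run one at a time. The names render_lock_key, run_locked, and _locks are hypothetical.

```python
import asyncio
from collections import defaultdict

# Hypothetical registry: one asyncio.Lock per rendered lock key.
_locks = defaultdict(asyncio.Lock)

def render_lock_key(template, **kwargs):
    """Fill a template such as "user:{user_id}" from keyword arguments."""
    return template.format(**kwargs)

async def run_locked(template, work, **kwargs):
    """Run `work(**kwargs)` while holding the lock for the rendered key."""
    key = render_lock_key(template, **kwargs)
    async with _locks[key]:
        return await work(**kwargs)

async def demo():
    events = []

    async def sync(user_id):
        events.append(("start", user_id))
        await asyncio.sleep(0.01)  # simulate work while holding the lock
        events.append(("end", user_id))

    # Two calls for the same user share a key, so they cannot overlap.
    await asyncio.gather(
        run_locked("user:{user_id}", sync, user_id=1),
        run_locked("user:{user_id}", sync, user_id=1),
    )
    return events

events = asyncio.run(demo())
```

Under this model, a fixed key with no placeholders, such as "mailbox:sync", serializes every run of the task, while a templated key serializes runs only per distinct value.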

Example

@app.task(retries=2, timeout=60)
async def sync_threads(session: cpsl.Session | None = None):
    if session:
        await session.notify("Syncing Gmail threads...")
    await threads.insert_one(
        {
            "subject": "Renewal review",
            "status": "needs-review",
            "priority": "high",
            "thread_id": "thread_123",
        }
    )
Submitting it looks like this:
handle = await sync_threads.submit(session=session)
await session.show_task(handle, message="Syncing mailbox...")
After decoration, the task object supports:
  • .submit(...)
  • .schedule(...)
  • .find(...)
  • .count(...)
  • .cancel(...)

schedule(cron)

Registers a cron handler.
@app.schedule("*/5 * * * *")
async def every_five_minutes():
    ...
The cron string is standard 5-field cron syntax in UTC. A useful pattern is to keep the schedule thin and hand work off to a task:
@app.schedule("0 * * * *")
async def hourly_sync():
    await sync_threads.submit()
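
To read the two expressions in this section field by field, here is a small matcher for a subset of cron syntax: "*", "*/n", and plain integers. It is only illustrative. cron_matches is a hypothetical helper, not part of cpsl. It ignores ranges, lists, and names, and it simply requires every field to match, which glosses over how real cron combines day-of-month with day-of-week.

```python
from datetime import datetime, timezone

def _field_matches(spec, value, low):
    """Match one cron field: supports "*", "*/n", and a single integer."""
    if spec == "*":
        return True
    if spec.startswith("*/"):
        # Steps count from the field's lowest legal value (0 or 1).
        return (value - low) % int(spec[2:]) == 0
    return int(spec) == value

def cron_matches(expr, when):
    """Return True if `when` (a UTC datetime) matches a 5-field cron expression."""
    minute, hour, day, month, weekday = expr.split()
    # Cron counts Sunday as 0; datetime.isoweekday() counts Sunday as 7.
    cron_weekday = when.isoweekday() % 7
    return (
        _field_matches(minute, when.minute, 0)
        and _field_matches(hour, when.hour, 0)
        and _field_matches(day, when.day, 1)
        and _field_matches(month, when.month, 1)
        and _field_matches(weekday, cron_weekday, 0)
    )

top_of_hour = datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc)
ten_past = datetime(2024, 1, 1, 9, 10, tzinfo=timezone.utc)
seven_past = datetime(2024, 1, 1, 9, 7, tzinfo=timezone.utc)
```

With these sample times, "0 * * * *" matches only top_of_hour, while "*/5 * * * *" matches top_of_hour and ten_past but not seven_past.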

endpoint(method="GET", path="/", authorized=True)

Registers a plain HTTP handler.
@app.endpoint(method="POST", path="/webhook", authorized=False)
async def webhook(ctx: cpsl.RequestContext):
    ...
Use authorized=False for public webhooks. A real handler would typically inspect ctx.request, or simply return structured data:
@app.endpoint(method="GET", path="/health", authorized=False)
async def health(ctx: cpsl.RequestContext):
    return {"ok": True}
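
Because authorized=False leaves a route open to any caller, public webhooks commonly verify a signature supplied by the sender. The sketch below uses only the standard library. How to read the raw body and headers from ctx.request is not covered here, so the function takes them as plain arguments. The hex-encoded HMAC-SHA256 scheme and the name verify_signature are assumptions; check which scheme your webhook provider actually uses.

```python
import hashlib
import hmac

def verify_signature(secret: bytes, body: bytes, signature_hex: str) -> bool:
    """Check a hex-encoded HMAC-SHA256 signature over the raw request body."""
    expected = hmac.new(secret, body, hashlib.sha256).hexdigest()
    # compare_digest compares in constant time, which avoids timing leaks.
    return hmac.compare_digest(expected, signature_hex)

secret = b"example-secret"  # placeholder; load the real value from a secret store
body = b'{"event": "thread.updated"}'
good_signature = hmac.new(secret, body, hashlib.sha256).hexdigest()
```

A handler would call verify_signature before doing any work and reject the request when it returns False.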

asgi(path="/app")

Mounts a full ASGI application such as FastAPI or Starlette.
from fastapi import FastAPI

api = FastAPI()


@app.asgi(path="/api")
def api_app():
    return api
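
For reference, an ASGI application is any async callable that takes scope, receive, and send, which is the interface FastAPI and Starlette implement. The framework-free sketch below shows that contract and drives it by hand. hello_app and call_app are illustrative names, and whether @app.asgi accepts a bare callable like this is an assumption that this page does not confirm.

```python
import asyncio

async def hello_app(scope, receive, send):
    """Minimal ASGI app: answer every HTTP request with a plain-text body."""
    if scope["type"] != "http":
        return  # ignore lifespan and websocket scopes in this sketch
    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [(b"content-type", b"text/plain")],
    })
    await send({"type": "http.response.body", "body": b"ok"})

async def call_app(app, path):
    """Drive an ASGI app by hand and collect the messages it sends."""
    sent = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    scope = {"type": "http", "method": "GET", "path": path, "headers": []}
    await app(scope, receive, send)
    return sent

messages = asyncio.run(call_app(hello_app, "/api/health"))
```

The app sends two messages for each request: a response start carrying the status and headers, followed by the body.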

See also