Skip to main content
Capsule includes a built-in task execution model, so you do not need to introduce a second worker stack just to keep the app responsive. Tasks let you run work outside the main chat turn. Schedules let you trigger work on a cron cadence. Together they turn background execution into part of the product surface instead of something bolted on later. Use them when:
  • an operation will take longer than a normal reply should
  • work needs retries or cancellation
  • users should be able to check status later
  • you need periodic jobs such as syncs, digests, or cleanup

1. Add a task board page

Capsule includes a built-in task board widget:
import cpsl

# Shared application object; every `@app.…` decorator in this guide hangs off it.
app = cpsl.App(name="jobs-demo", image=cpsl.Image())


@app.page("Tasks", icon="layout-grid")
def tasks_page():
    """Build the "Tasks" page with two task boards.

    Returns:
        A ``cpsl.ui.Page`` containing an unfiltered "All tasks" board, a
        divider, and a second board filtered to the ``build_report`` task
        with four custom columns.
    """
    return cpsl.ui.Page(
        [
            cpsl.ui.TaskBoard(title="All tasks", refresh_ms=3000),
            cpsl.ui.Divider(),
            cpsl.ui.TaskBoard(
                # Human-readable title that names the task selected by the
                # filter below (matches the title used in the full example).
                title="build_report only",
                filter={"task_name": "build_report"},
                # Each column is given a label and a list of task statuses;
                # presumably tasks in those statuses appear under that label.
                columns=[
                    cpsl.ui.TaskBoardColumn("Queued", ["pending", "scheduled", "retry"]),
                    cpsl.ui.TaskBoardColumn("Running", ["running"]),
                    cpsl.ui.TaskBoardColumn("Done", ["completed"]),
                    cpsl.ui.TaskBoardColumn("Failed", ["failed", "cancelled", "timeout"]),
                ],
                refresh_ms=3000,
            ),
        ]
    )

2. Register a background task

import asyncio


@app.task(retries=1, timeout=120, lock="report:{report_id}")
async def build_report(
    session: cpsl.Session | None = None,
    report_id: str = "daily",
    duration: int = 5,
):
    """Simulate building a report by sleeping for ``duration`` seconds.

    Args:
        session: Optional session; when given, it is notified before the
            work starts and receives a reply once the work is done.
        report_id: Identifier echoed in the messages and in the result.
        duration: Number of seconds to sleep in place of real work.

    Returns:
        ``{"report_id": report_id, "ok": True}``.
    """
    has_session = session is not None

    if has_session:
        start_note = f"[{report_id}] starting"
        await session.notify(start_note)

    # Stand-in for the actual report-building work.
    await asyncio.sleep(duration)

    if has_session:
        done_note = f"[{report_id}] finished after {duration}s"
        await session.reply(done_note)

    outcome = {"report_id": report_id, "ok": True}
    return outcome
Key options:
  • retries= retries on failure
  • timeout= kills slow tasks after N seconds
  • lock= prevents duplicate work for the same logical key
  • process=True runs the task in a separate OS process

3. Submit and schedule the task

Call the task from a handler:
@app.message()
async def handle(session: cpsl.Session, msg: cpsl.Message):
    """Dispatch the chat commands ``run`` and ``schedule [delay]``.

    ``run`` submits ``build_report`` immediately. ``schedule <delay>``
    schedules it after the given delay; a bare ``schedule`` uses ``"10m"``.
    Any other text is ignored by this snippet.

    Args:
        session: Session used to show the task and to reply.
        msg: Incoming message; a missing ``text`` is treated as empty.
    """
    text = (msg.text or "").strip()
    # `text` is already stripped, so testing startswith("schedule ") could only
    # match when an argument was present and the "10m" default never applied.
    # partition() splits off the command word and yields "" for a missing
    # argument, which makes the default reachable.
    command, _, argument = text.partition(" ")

    if text == "run":
        handle = await build_report.submit(session=session, report_id="daily", duration=5)
        await session.show_task(handle, message="Building daily report…")
        await session.reply(f"Submitted task {handle.task_id}")
        return

    if command == "schedule":
        delay = argument.strip() or "10m"
        handle = await build_report.schedule(
            session=session,
            delay=delay,
            report_id="scheduled",
            duration=5,
        )
        await session.show_task(handle, message=f"Scheduled for +{delay}")
        await session.reply(f"Scheduled task {handle.task_id}")
        return
The returned TaskHandle lets you:
  • inspect status with await handle.status()
  • cancel with await handle.cancel()
  • keep a task id for later lookup
The task descriptor itself also supports:
  • await build_report.find(...)
  • await build_report.count(...)
  • await build_report.cancel(...)

4. Add a schedule

Schedules run on cron. They are good for syncing external systems, sending digests, or precomputing data.
@app.schedule("0 9 * * 1-5")
async def weekday_digest():
    """Print which owner the weekday digest is being prepared for.

    Registered with the cron expression ``0 9 * * 1-5``. The owner id is read
    from the current session when one is available; otherwise ``"unknown"``
    is printed.
    """
    active = cpsl.current_session()
    if active:
        owner = active.user.owner_id
    else:
        owner = "unknown"
    print(f"[weekday_digest] preparing digest for {owner}")
Important details:
  • scheduled handlers do not have a live chat reply target
  • cpsl.current_session() still gives you owner-scoped identity and integrations
  • use tasks if the scheduled work itself needs retries, locking, or a long runtime

5. Use process tasks for heavy work

If the task is CPU-heavy or you want crash isolation, set process=True:
@app.task(process=True, timeout=300)
async def crunch_embeddings(session: cpsl.Session | None = None, batch_id: str = ""):
    """Placeholder for a heavy task registered with ``process=True``.

    The body is intentionally elided in this guide; only the decorator
    options (``process=True``, ``timeout=300``) and the signature are shown.
    """
    ...
Process tasks get a rehydrated Session, so they can still call reply(), stream_reply(), show_task(), or read session.db.

Full example

import asyncio
import cpsl

# Shared application object; the page, task, schedule, and message
# decorators below are all accessed through it.
app = cpsl.App(name="jobs-demo", image=cpsl.Image())


@app.page("Tasks", icon="layout-grid")
def tasks_page():
    """Assemble the "Tasks" page from two task boards and a divider.

    Returns:
        A ``cpsl.ui.Page`` whose children are, in order: an unfiltered
        "All tasks" board, a divider, and a board filtered to the
        ``build_report`` task with four custom columns.
    """
    ui = cpsl.ui

    # Widgets are built in the same order they appear on the page.
    everything_board = ui.TaskBoard(title="All tasks", refresh_ms=3000)
    separator = ui.Divider()

    report_columns = [
        ui.TaskBoardColumn("Queued", ["pending", "scheduled", "retry"]),
        ui.TaskBoardColumn("Running", ["running"]),
        ui.TaskBoardColumn("Done", ["completed"]),
        ui.TaskBoardColumn("Failed", ["failed", "cancelled", "timeout"]),
    ]
    report_board = ui.TaskBoard(
        title="build_report only",
        filter={"task_name": "build_report"},
        columns=report_columns,
        refresh_ms=3000,
    )

    return ui.Page([everything_board, separator, report_board])


@app.task(retries=1, timeout=120, lock="report:{report_id}")
async def build_report(
    session: cpsl.Session | None = None,
    report_id: str = "daily",
    duration: int = 5,
):
    """Simulate building a report by sleeping for ``duration`` seconds.

    Args:
        session: Optional session; when given, it is notified before the
            work starts and receives a reply once the work is done.
        report_id: Identifier echoed in the messages and in the result.
        duration: Number of seconds to sleep in place of real work.

    Returns:
        ``{"report_id": report_id, "ok": True}``.
    """
    has_session = session is not None

    if has_session:
        start_note = f"[{report_id}] starting"
        await session.notify(start_note)

    # Stand-in for the actual report-building work.
    await asyncio.sleep(duration)

    if has_session:
        done_note = f"[{report_id}] finished after {duration}s"
        await session.reply(done_note)

    outcome = {"report_id": report_id, "ok": True}
    return outcome


@app.schedule("0 9 * * 1-5")
async def weekday_digest():
    """Print which owner the weekday digest is being prepared for.

    Registered with the cron expression ``0 9 * * 1-5``. The owner id is read
    from the current session when one is available; otherwise ``"unknown"``
    is printed.
    """
    active = cpsl.current_session()
    if active:
        owner = active.user.owner_id
    else:
        owner = "unknown"
    print(f"[weekday_digest] preparing digest for {owner}")


@app.message()
async def handle(session: cpsl.Session, msg: cpsl.Message):
    """Dispatch the chat commands ``run`` and ``schedule [delay]``.

    ``run`` submits ``build_report`` immediately. ``schedule <delay>``
    schedules it after the given delay; a bare ``schedule`` uses ``"10m"``.
    Anything else gets a short usage hint.

    Args:
        session: Session used to show the task and to reply.
        msg: Incoming message; a missing ``text`` is treated as empty.
    """
    text = (msg.text or "").strip()
    # `text` is already stripped, so testing startswith("schedule ") could only
    # match when an argument was present and the "10m" default never applied.
    # partition() splits off the command word and yields "" for a missing
    # argument, which makes the default reachable.
    command, _, argument = text.partition(" ")

    if text == "run":
        handle = await build_report.submit(session=session, report_id="daily", duration=5)
        await session.show_task(handle, message="Building daily report…")
        await session.reply(f"Submitted task {handle.task_id}")
        return

    if command == "schedule":
        delay = argument.strip() or "10m"
        handle = await build_report.schedule(
            session=session,
            delay=delay,
            report_id="scheduled",
            duration=5,
        )
        await session.show_task(handle, message=f"Scheduled for +{delay}")
        await session.reply(f"Scheduled task {handle.task_id}")
        return

    await session.reply("Try: run or schedule 10m")

Next steps