import asyncio
import cpsl
app = cpsl.App(name="jobs-demo", image=cpsl.Image())
@app.page("Tasks", icon="layout-grid")
def tasks_page():
return cpsl.ui.Page(
[
cpsl.ui.TaskBoard(title="All tasks", refresh_ms=3000),
cpsl.ui.Divider(),
cpsl.ui.TaskBoard(
title="build_report only",
filter={"task_name": "build_report"},
columns=[
cpsl.ui.TaskBoardColumn("Queued", ["pending", "scheduled", "retry"]),
cpsl.ui.TaskBoardColumn("Running", ["running"]),
cpsl.ui.TaskBoardColumn("Done", ["completed"]),
cpsl.ui.TaskBoardColumn("Failed", ["failed", "cancelled", "timeout"]),
],
refresh_ms=3000,
),
]
)
@app.task(retries=1, timeout=120, lock="report:{report_id}")
async def build_report(
session: cpsl.Session | None = None,
report_id: str = "daily",
duration: int = 5,
):
if session is not None:
await session.notify(f"[{report_id}] starting")
await asyncio.sleep(duration)
if session is not None:
await session.reply(f"[{report_id}] finished after {duration}s")
return {"report_id": report_id, "ok": True}
@app.schedule("0 9 * * 1-5")
async def weekday_digest():
runtime_session = cpsl.current_session()
owner = runtime_session.user.owner_id if runtime_session else "unknown"
print(f"[weekday_digest] preparing digest for {owner}")
@app.message()
async def handle(session: cpsl.Session, msg: cpsl.Message):
text = (msg.text or "").strip()
if text == "run":
handle = await build_report.submit(session=session, report_id="daily", duration=5)
await session.show_task(handle, message="Building daily report…")
await session.reply(f"Submitted task {handle.task_id}")
return
if text.startswith("schedule "):
delay = text.split(" ", 1)[1].strip() or "10m"
handle = await build_report.schedule(
session=session,
delay=delay,
report_id="scheduled",
duration=5,
)
await session.show_task(handle, message=f"Scheduled for +{delay}")
await session.reply(f"Scheduled task {handle.task_id}")
return
await session.reply("Try: run or schedule 10m")