import cpsl
import cpsl.ui as ui
from openai import AsyncOpenAI
# Module-level async OpenAI client, created once at import time and used by
# the `handle` message handler for chat completions.
# NOTE(review): constructed with no arguments — presumably the API key is
# picked up from the OPENAI_API_KEY environment variable; confirm that the
# secret is available when this module is imported.
client = AsyncOpenAI()
# Application definition for "support-ops". Everything here is declarative
# configuration handed to cpsl.App; the exact semantics of each value are
# defined by the cpsl framework, not by this file.
app = cpsl.App(
    name="support-ops",
    # Image request listing "openai" as the only Python package.
    image=cpsl.Image(python_packages=["openai"]),
    # A single channel, named "support-telegram".
    channels=[cpsl.Channel("support-telegram")],
    # NOTE(review): presumably how long an idle instance stays up — confirm.
    keep_warm_seconds=30,
    # NOTE(review): units are not visible here — presumably CPU cores and
    # MiB of memory; confirm against the cpsl documentation.
    cpu=0.5,
    memory=1024,
    # Secret names requested for the app.
    secrets=["OPENAI_API_KEY"],
    # Maps the path "/data" to the FileSystem named "support-ops".
    filesystems={"/data": cpsl.FileSystem("support-ops")},
    # NOTE(review): the unit/currency of `price` is not visible here (1500
    # may be a minor-unit amount such as cents) — confirm before changing.
    price=1500,
    pricing_type="monthly",
)
# Register a Gmail integration on the app. The OAuth client credentials are
# looked up by name through cpsl.Secret rather than written inline.
app.add_integration(
    cpsl.Gmail(
        client_id=cpsl.Secret.from_name("GOOGLE_OAUTH_CLIENT_ID"),
        client_secret=cpsl.Secret.from_name("GOOGLE_OAUTH_CLIENT_SECRET"),
        # Only the gmail.readonly scope is requested.
        scopes=["https://www.googleapis.com/auth/gmail.readonly"],
    )
)
# "threads" collection handle. It is read by `mailbox_metrics` (via `find`)
# and written by `sync_threads` (via `insert_one`).
threads = app.collection(
    "threads",
    # Column names match the keys of the record inserted by `sync_threads`.
    columns=["subject", "status", "priority", "thread_id"],
    # NOTE(review): "owner" scope presumably restricts records to the app
    # owner — confirm against the cpsl documentation.
    scope="owner",
    sortable=True,
    filterable=True,
)
# Declare a boolean "auto_sync" setting with scope "owner", defaulting to
# True. NOTE(review): nothing in the visible code reads this setting — confirm
# whether `sync_threads` (or the framework) is expected to honour it.
app.setting("auto_sync", scope="owner", type=bool, default=True)
@app.data("mailbox_metrics", access="authenticated")
async def mailbox_metrics():
    """Return the thread count exposed as the "mailbox_metrics" data source.

    Queries the ``threads`` collection with ``limit=500`` and reports the
    number of rows that came back under the ``total_threads`` key.

    NOTE(review): because of the limit, the reported total presumably never
    exceeds 500 — confirm whether that cap is intended.
    """
    fetched = await threads.find(limit=500)
    total = len(fetched)
    return {"total_threads": total}
@app.page("Overview", icon="layout-dashboard")
def overview():
    """Build the "Overview" page.

    The page holds a single "Threads" metric bound to the ``total_threads``
    field of the "mailbox_metrics" data source.
    """
    threads_metric = ui.Metric(
        "Threads",
        data="mailbox_metrics",
        field="total_threads",
    )
    return ui.Page([threads_metric])
@app.task()
async def sync_threads(session: cpsl.Session | None = None):
    """Task that records a thread in the ``threads`` collection.

    When a truthy ``session`` is supplied, a progress notification is sent
    through it first. The function then inserts one fixed record; nothing is
    read from the mailbox in this function.

    Args:
        session: Optional session used only for the progress notification.
    """
    if session:
        await session.notify("Syncing mailbox state...")

    # Fixed record; its keys match the columns declared for the collection.
    record = {
        "subject": "Renewal review",
        "status": "needs-review",
        "priority": "high",
        "thread_id": "thread_123",
    }
    await threads.insert_one(record)
@app.message()
async def handle(session: cpsl.Session, msg: cpsl.Message):
    """Answer an incoming message with a chat completion.

    Sends a fixed system prompt followed by the messages produced by
    ``session.chat_messages(msg)`` to the "gpt-4o-mini" model, then replies
    on the session with the first choice's content, or "No response." when
    that content is empty or missing.

    Args:
        session: Session the message arrived on; also used to send the reply.
        msg: The incoming message.
    """
    system_prompt = {
        "role": "system",
        "content": "You are a support operations copilot.",
    }
    conversation = [system_prompt, *session.chat_messages(msg)]

    completion = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=conversation,
    )

    answer = completion.choices[0].message.content
    await session.reply(answer or "No response.")