# personadesk-api / app/cli.py
# Legal-i's picture
# Initial PersonaDesk API push to Space
# 45fcd6e
"""Operational CLI — small, focused entries for solo-dev ops.

Run with: python -m app.cli <command>

Commands:
  retention            — run the retention sweep handler synchronously.
                         In prod this is wired to RQ-scheduler in Phase 6.
  enqueue-retention    — enqueue the retention sweep on RQ instead of running
                         it inline (useful when the worker has the right env
                         and the API host doesn't). Not yet implemented in
                         this module.
  rescue-jobs          — v2_01 §7.8 — re-enqueue or dead-letter Job rows that
                         RQ appears to have lost. Run every ~5 min from cron.
  backfill-embeddings  — v2_01 §7.5 — compute transcript_embedding for every
                         non-deleted clip that has a transcript but no
                         embedding yet. Idempotent. Run once after the 0013
                         migration to populate existing clips; new clips are
                         embedded by the tagging job on ingest.
  readiness-report     — v2_06 §12 — print actuals against the five
                         production-ready criteria from the pilot playbook.
                         Read-only; safe to run anytime.
"""
from __future__ import annotations
import logging
import sys
import uuid
# Configure the root logger once for every CLI command: INFO and above, each
# line prefixed with a timestamp and the level name.
_LOG_FORMAT = "%(asctime)s %(levelname)s %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
# Named logger shared by all command handlers in this module.
log = logging.getLogger("personadesk.cli")
def _run_retention_inline() -> int:
    """Run the retention_sweep job handler in-process; no Redis required.

    Inserts a one-off ``retention_sweep`` Job row, executes it synchronously
    via ``jobs.run_job`` and reports the final status read back from the DB.
    Useful for one-shots.

    Returns:
        0 if the job ended with status "completed", or if there are no
        organizations at all (nothing to sweep).
        2 if the job ended in any other status, or its row could not be
        re-read after the run.
    """
    from app.db import SessionLocal
    from app.models import Job, Organization
    from app.services import jobs as jobs_mod

    db = SessionLocal()
    try:
        # We need an org for the org-scoped Job row. The retention sweep itself
        # is *global* — it touches every org's rows — but the framework wants a
        # row to anchor status/duration on. Pick the first org (system actions
        # are intentionally not tenant-scoped at this point).
        org = db.query(Organization).order_by(Organization.created_at.asc()).first()
        if org is None:
            log.error("retention: no organization rows; nothing to sweep")
            return 0
        # A random suffix makes every CLI invocation a distinct job.
        idem = f"retention_sweep:{uuid.uuid4()}"
        job = Job(
            organization_id=org.id,
            type="retention_sweep",
            entity_type="system",
            entity_id=org.id,  # arbitrary; the handler ignores it
            status="queued",
            attempts=0,
            max_attempts=1,
            idempotency_key=idem,
            payload={"triggered_by": "cli"},
        )
        db.add(job)
        db.commit()
        # Capture the id while the session is still open; it is all we carry
        # across to run_job and the follow-up read.
        job_id = job.id
    finally:
        db.close()

    jobs_mod.run_job(job_id)

    # Re-read in a fresh session so we see whatever run_job persisted.
    db = SessionLocal()
    try:
        j = db.get(Job, job_id)
        if j is None:
            log.error("retention sweep: job %s not found after run", job_id)
            return 2
        # Guard against a NULL payload instead of crashing on .get().
        result = (j.payload or {}).get("result")
        log.info("retention sweep finished: status=%s result=%s", j.status, result)
        return 0 if j.status == "completed" else 2
    finally:
        db.close()
def _run_rescue_jobs() -> int:
    """Sweep orphaned Job rows a single time (v2_01 §7.8).

    Opens a DB session, hands it to ``rescue_lost_jobs``, logs the summary
    that function returns, and closes the session no matter what happens.
    Returns 0 on success; any exception raised by the sweep propagates.
    """
    from app.db import SessionLocal
    from app.services.jobs import rescue_lost_jobs

    session = SessionLocal()
    try:
        log.info("rescue-jobs: %s", rescue_lost_jobs(session))
    finally:
        session.close()
    return 0
def _run_readiness_report() -> int:
    """v2_06 §12 — print actuals against the five production-ready criteria.

    Reads from the DB; runs under RLS bypass (it's a cross-tenant operational
    summary, not a customer-facing query). Output is human-readable text
    emitted through ``log`` at INFO level, so it can be piped into the weekly
    review thread.

    Returns:
        Always 0; database errors propagate to the caller.
    """
    from datetime import UTC, datetime, timedelta

    from app.db import SessionLocal, rls_context
    from app.models import (
        Organization,
        Persona,
        SafetyIncident,
        SessionMessage,
    )
    from app.models import Session as SessionModel

    db = SessionLocal()
    with rls_context(bypass=True):
        try:
            paying = (
                db.query(Organization)
                .filter(Organization.plan != "free")
                .filter(Organization.billing_status.in_(("active", "trialing")))
                .count()
            )
            log.info("─" * 60)
            log.info("Production-readiness report (v2_06 §12)")
            log.info("─" * 60)
            log.info("1. Paying customers (plan != free, status active/trialing):")
            log.info(" %d [target: ≥5]", paying)

            # 2. Cost-per-session is operator-side (S3/CloudFront invoices);
            # we can only show the session count and let the operator divide.
            ninety = datetime.now(UTC) - timedelta(days=90)
            sessions_90d = (
                db.query(SessionModel.id)
                .filter(SessionModel.started_at >= ninety)
                .count()
            )
            log.info("2. Sessions in last 90d: %d", sessions_90d)
            # No args are passed, so logging does NOT %-format this message:
            # write a literal "%" (a "%%" would be printed verbatim).
            log.info(
                " Cost reconciliation: divide S3+CloudFront invoice by "
                "this number, compare to $0.05 target "
                "(v2_results/cost_v2_final.md). Target: actual within 25% of model.",
            )

            # 3. Activation time — first paying session timestamp minus
            # signup. We don't have a dedicated event for "live widget"; the
            # first session.started under a persona's public_key is the
            # earliest legible proxy.
            # NOTE(review): one query per persona; fine at pilot scale. The
            # subtraction assumes started_at and created_at are both tz-aware
            # (or both naive) — confirm against the model definitions.
            activation_hours: list[float] = []
            for p in db.query(Persona).filter(Persona.deleted_at.is_(None)).all():
                first = (
                    db.query(SessionModel.started_at)
                    .filter(SessionModel.persona_id == p.id)
                    .order_by(SessionModel.started_at.asc())
                    .first()
                )
                if first and first[0]:
                    delta = first[0] - p.created_at
                    activation_hours.append(delta.total_seconds() / 3600)
            if activation_hours:
                avg_hours = sum(activation_hours) / len(activation_hours)
                log.info(
                    "3. Average creator activation time: %.1f hours [target: < 4]",
                    avg_hours,
                )
            else:
                log.info("3. Average creator activation time: no data yet [target: < 4]")

            # 4. Critical safety incidents in 90d.
            critical = (
                db.query(SafetyIncident)
                .filter(SafetyIncident.created_at >= ninety)
                .filter(SafetyIncident.severity.in_(("high", "critical")))
                .count()
            )
            log.info(
                "4. Safety incidents (high/critical, last 90d): %d [target: 0 for 3 months]",
                critical,
            )

            # 5. Referrals is operator-tracked (Notion / CRM / human). Surface
            # as a marker line — there's no in-DB signal.
            log.info(
                "5. Referrals: track manually in CRM. The honest metric — "
                "no DB field substitutes.",
            )

            # Quality cadence appetizer: p95 latency across all personas.
            durations: list[int] = []
            for (trace,) in (
                db.query(SessionMessage.selection_trace)
                .filter(SessionMessage.role == "system")
                .filter(SessionMessage.created_at >= ninety)
                .all()
            ):
                d = (trace or {}).get("duration_ms")
                # bool is a subclass of int; a stray True/False must not be
                # counted as a 1 ms / 0 ms selection.
                if isinstance(d, int) and not isinstance(d, bool):
                    durations.append(d)
            if durations:
                durations.sort()
                # Nearest-rank p95: rank = ceil(0.95 * n), computed in integer
                # arithmetic. round() rounds half to even and can pick one rank
                # too low (n=30 -> 28th value instead of the 29th).
                rank = -(-95 * len(durations) // 100)
                p95 = durations[max(rank, 1) - 1]
                log.info(
                    "—— Selection p95 latency (last 90d): %d ms [hard ceiling 800]",
                    p95,
                )
            else:
                log.info(
                    "—— Selection p95 latency (last 90d): no data yet [hard ceiling 800]",
                )
            log.info("─" * 60)
            return 0
        finally:
            db.close()
def _run_backfill_embeddings() -> int:
    """v2_01 §7.5 — populate transcript_embedding for clips that lack one.

    Loads every non-deleted clip with ``transcript_embedding IS NULL AND
    transcript IS NOT NULL`` (a single query, not paged) and embeds each via
    the configured provider. Commits after every 50 processed clips, so a
    long run can be killed and resumed: committed clips no longer match the
    query on the next run.

    Clips whose transcript is empty or whitespace-only are skipped (there is
    nothing to embed) and stay NULL. Clips whose embedding call raises are
    logged and stay NULL, so a re-run retries exactly those; when nothing
    fails, a second run embeds nothing.

    Returns:
        0 if no embedding call failed, 2 otherwise.
    """
    from app.db import SessionLocal
    from app.models import Clip
    from app.services.embeddings import get_embedder

    commit_every = 50
    db = SessionLocal()
    try:
        # Targeting query: approved+usable isn't required (we embed everything
        # that has a transcript, so the engine's semantic component is ready
        # whenever a clip becomes serveable).
        clips = (
            db.query(Clip)
            .filter(Clip.transcript_embedding.is_(None))
            .filter(Clip.transcript.isnot(None))
            .filter(Clip.deleted_at.is_(None))
            .all()
        )
        total = len(clips)
        if total == 0:
            log.info("backfill: nothing to do (0 clips need embedding)")
            return 0
        log.info("backfill: %d clips need embedding", total)
        emb = get_embedder()
        done = failed = skipped = 0
        for processed, clip in enumerate(clips, start=1):
            text = clip.transcript  # non-NULL, guaranteed by the query filter
            if not text.strip():
                skipped += 1
            else:
                try:
                    clip.transcript_embedding = emb.embed(text)
                except Exception as exc:  # noqa: BLE001 — one bad clip must not abort the run
                    log.warning("clip %s failed: %s", clip.id, exc)
                    failed += 1
                else:
                    done += 1
            if processed % commit_every == 0:
                db.commit()
                log.info("backfill progress: %d/%d (failed=%d)", processed, total, failed)
        db.commit()
        log.info(
            "backfill complete: embedded=%d failed=%d skipped=%d total=%d",
            done, failed, skipped, total,
        )
        return 0 if failed == 0 else 2
    finally:
        db.close()
def main(argv: list[str] | None = None) -> int:
argv = list(sys.argv[1:] if argv is None else argv)
if not argv:
print(__doc__, file=sys.stderr)
return 1
cmd, *_ = argv
if cmd == "retention":
return _run_retention_inline()
if cmd == "rescue-jobs":
return _run_rescue_jobs()
if cmd == "readiness-report":
return _run_readiness_report()
if cmd == "backfill-embeddings":
return _run_backfill_embeddings()
print(f"unknown command: {cmd}", file=sys.stderr)
print(__doc__, file=sys.stderr)
return 1
if __name__ == "__main__":
    # sys.exit raises SystemExit with main()'s return value as the exit status.
    sys.exit(main())