"""Operational CLI — small, focused entries for solo-dev ops. Run with: python -m app.cli [args...] Commands: retention — run the retention sweep handler synchronously. In prod this is wired to RQ-scheduler in Phase 6. enqueue-retention — enqueue the retention sweep on RQ instead of running it inline (useful when the worker has the right env and the API host doesn't). rescue-jobs — v2_01 §7.8 — re-enqueue or dead-letter Job rows that RQ appears to have lost. Run every ~5 min from cron. backfill-embeddings — v2_01 §7.5 — compute transcript_embedding for every approved clip whose embedding is NULL. Idempotent. Run once after the 0013 migration to populate existing clips; new clips are embedded by the tagging job on ingest. readiness-report — v2_06 §12 — print actuals against the five production-ready criteria from the pilot playbook. Read-only; safe to run anytime. """ from __future__ import annotations import logging import sys import uuid logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") log = logging.getLogger("personadesk.cli") def _run_retention_inline() -> int: """Run retention_sweep here, no Redis required. Useful for one-shots.""" from app.db import SessionLocal from app.models import Job, Organization from app.services import jobs as jobs_mod db = SessionLocal() try: # We need an org for the org-scoped Job row. The retention sweep itself # is *global* — it touches every org's rows — but the framework wants a # row to anchor status/duration on. Pick the first org (system actions # are intentionally not tenant-scoped at this point). org = db.query(Organization).order_by(Organization.created_at.asc()).first() if org is None: log.error("retention: no organization rows; nothing to sweep") return 0 idem = f"retention_sweep:{uuid.uuid4()}" job = Job( organization_id=org.id, type="retention_sweep", entity_type="system", entity_id=org.id, # arbitrary; the handler ignores it status="queued", attempts=0, max_attempts=1, idempotency_key=idem, payload={"triggered_by": "cli"}, ) db.add(job) db.commit() job_id = job.id finally: db.close() jobs_mod.run_job(job_id) db = SessionLocal() try: j = db.get(Job, job_id) if j: log.info("retention sweep finished: status=%s result=%s", j.status, j.payload.get("result")) return 0 if j.status == "completed" else 2 return 2 finally: db.close() def _run_rescue_jobs() -> int: """v2_01 §7.8 — sweep orphaned Job rows once.""" from app.db import SessionLocal from app.services.jobs import rescue_lost_jobs db = SessionLocal() try: counts = rescue_lost_jobs(db) log.info("rescue-jobs: %s", counts) return 0 finally: db.close() def _run_readiness_report() -> int: """v2_06 §12 — print actuals against the five production-ready criteria. Reads from the DB; runs under bypass (it's a cross-tenant operational summary, not a customer-facing query). Output is human-readable text so it can be piped into the weekly review thread. """ from datetime import UTC, datetime, timedelta from app.db import SessionLocal, rls_context from app.models import ( Organization, Persona, SafetyIncident, SessionMessage, ) from app.models import Session as SessionModel db = SessionLocal() with rls_context(bypass=True): try: paying = ( db.query(Organization) .filter(Organization.plan != "free") .filter(Organization.billing_status.in_(("active", "trialing"))) .count() ) log.info("─" * 60) log.info("Production-readiness report (v2_06 §12)") log.info("─" * 60) log.info("1. Paying customers (plan != free, status active/trialing):") log.info(" %d [target: ≥5]", paying) # 2. Cost-per-session is operator-side (S3/CloudFront invoices); # we can only show the session count and let the operator divide. ninety = datetime.now(UTC) - timedelta(days=90) sessions_90d = ( db.query(SessionModel.id) .filter(SessionModel.started_at >= ninety) .count() ) log.info("2. Sessions in last 90d: %d", sessions_90d) log.info( " Cost reconciliation: divide S3+CloudFront invoice by " "this number, compare to $0.05 target " "(v2_results/cost_v2_final.md). Target: actual within 25%% of model.", ) # 3. Activation time — first paying session timestamp minus # signup. We don't have a dedicated event for "live widget"; the # first session.started under a persona's public_key is the # earliest legible proxy. activation_hours: list[float] = [] for p in db.query(Persona).filter(Persona.deleted_at.is_(None)).all(): first = ( db.query(SessionModel.started_at) .filter(SessionModel.persona_id == p.id) .order_by(SessionModel.started_at.asc()) .first() ) if first and first[0]: delta = first[0] - p.created_at activation_hours.append(delta.total_seconds() / 3600) if activation_hours: avg_hours = sum(activation_hours) / len(activation_hours) log.info( "3. Average creator activation time: %.1f hours [target: < 4]", avg_hours, ) else: log.info("3. Average creator activation time: no data yet [target: < 4]") # 4. Critical safety incidents in 90d. critical = ( db.query(SafetyIncident) .filter(SafetyIncident.created_at >= ninety) .filter(SafetyIncident.severity.in_(("high", "critical"))) .count() ) log.info( "4. Safety incidents (high/critical, last 90d): %d [target: 0 for 3 months]", critical, ) # 5. Referrals is operator-tracked (Notion / CRM / human). Surface # as a marker line — there's no in-DB signal. log.info( "5. Referrals: track manually in CRM. The honest metric — " "no DB field substitutes.", ) # Quality cadence appetizer: p95 latency across all personas. durations: list[int] = [] for (trace,) in ( db.query(SessionMessage.selection_trace) .filter(SessionMessage.role == "system") .filter(SessionMessage.created_at >= ninety) .all() ): d = (trace or {}).get("duration_ms") if isinstance(d, int): durations.append(d) if durations: durations.sort() p95_idx = max(0, int(round(len(durations) * 0.95)) - 1) log.info( "—— Selection p95 latency (last 90d): %d ms [hard ceiling 800]", durations[p95_idx], ) log.info("─" * 60) return 0 finally: db.close() def _run_backfill_embeddings() -> int: """v2_01 §7.5 — populate transcript_embedding for clips that lack one. Reads clips with `transcript_embedding IS NULL AND transcript IS NOT NULL` in batches, embeds them via the configured provider, commits every 50 rows so a long run can be killed and resumed safely. Idempotent — running twice does nothing on the second pass. """ from app.db import SessionLocal from app.models import Clip from app.services.embeddings import get_embedder db = SessionLocal() try: # Targeting query: approved+usable isn't required (we embed everything # that has a transcript, so the engine's semantic component is ready # whenever a clip becomes serveable). clips = ( db.query(Clip) .filter(Clip.transcript_embedding.is_(None)) .filter(Clip.transcript.isnot(None)) .filter(Clip.deleted_at.is_(None)) .all() ) total = len(clips) if total == 0: log.info("backfill: nothing to do (0 clips need embedding)") return 0 log.info("backfill: %d clips need embedding", total) emb = get_embedder() done = 0 failed = 0 for clip in clips: try: clip.transcript_embedding = emb.embed(clip.transcript) done += 1 except Exception as exc: # noqa: BLE001 log.warning("clip %s failed: %s", clip.id, exc) failed += 1 if (done + failed) % 50 == 0: db.commit() log.info("backfill progress: %d/%d (failed=%d)", done + failed, total, failed) db.commit() log.info( "backfill complete: embedded=%d failed=%d total=%d", done, failed, total, ) return 0 if failed == 0 else 2 finally: db.close() def main(argv: list[str] | None = None) -> int: argv = list(sys.argv[1:] if argv is None else argv) if not argv: print(__doc__, file=sys.stderr) return 1 cmd, *_ = argv if cmd == "retention": return _run_retention_inline() if cmd == "rescue-jobs": return _run_rescue_jobs() if cmd == "readiness-report": return _run_readiness_report() if cmd == "backfill-embeddings": return _run_backfill_embeddings() print(f"unknown command: {cmd}", file=sys.stderr) print(__doc__, file=sys.stderr) return 1 if __name__ == "__main__": raise SystemExit(main())