Spaces:
Running
Running
"""Operational CLI — small, focused entries for solo-dev ops.
Run with: python -m app.cli <command> [args...]
Commands:
    retention           — run the retention sweep handler synchronously.
                          In prod this is wired to RQ-scheduler in Phase 6.
    enqueue-retention   — enqueue the retention sweep on RQ instead of running
                          it inline (useful when the worker has the right env
                          and the API host doesn't).
    rescue-jobs         — v2_01 §7.8 — re-enqueue or dead-letter Job rows that
                          RQ appears to have lost. Run every ~5 min from cron.
    backfill-embeddings — v2_01 §7.5 — compute transcript_embedding for every
                          approved clip whose embedding is NULL. Idempotent.
                          Run once after the 0013 migration to populate
                          existing clips; new clips are embedded by the
                          tagging job on ingest.
    readiness-report    — v2_06 §12 — print actuals against the five
                          production-ready criteria from the pilot playbook.
                          Read-only; safe to run anytime.
"""
| from __future__ import annotations | |
| import logging | |
| import sys | |
| import uuid | |
# Configure logging once at import time so every command shares the same
# timestamped line format, then expose the CLI's module-level logger.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    level=logging.INFO,
)
log = logging.getLogger("personadesk.cli")
def _run_retention_inline() -> int:
    """Create a one-off ``retention_sweep`` Job row and execute it in-process.

    The row is committed, handed directly to ``jobs.run_job`` (no queue
    round-trip), and then re-read so the final status can be logged.

    Returns:
        0 when the job ends with status ``"completed"``, or when there is no
        organization row to anchor a Job on; 2 otherwise (including when the
        Job row can no longer be found).
    """
    from app.db import SessionLocal
    from app.models import Job, Organization
    from app.services import jobs as jobs_mod

    session = SessionLocal()
    try:
        # A Job row is org-scoped even though the sweep itself is global, so
        # the oldest organization is borrowed purely as an anchor for
        # status/duration bookkeeping.
        oldest_first = session.query(Organization).order_by(Organization.created_at.asc())
        anchor_org = oldest_first.first()
        if anchor_org is None:
            log.error("retention: no organization rows; nothing to sweep")
            return 0

        # A fresh UUID per invocation keeps every CLI run a distinct job.
        idempotency_key = f"retention_sweep:{uuid.uuid4()}"
        sweep_job = Job(
            organization_id=anchor_org.id,
            type="retention_sweep",
            entity_type="system",
            # Arbitrary value; per the original author's note the handler
            # does not read it.
            entity_id=anchor_org.id,
            status="queued",
            attempts=0,
            max_attempts=1,
            idempotency_key=idempotency_key,
            payload={"triggered_by": "cli"},
        )
        session.add(sweep_job)
        session.commit()
        job_id = sweep_job.id
    finally:
        session.close()

    # Execute synchronously in this process.
    jobs_mod.run_job(job_id)

    # Re-read the row in a fresh session to see what the handler recorded.
    session = SessionLocal()
    try:
        finished = session.get(Job, job_id)
        if not finished:
            return 2
        log.info(
            "retention sweep finished: status=%s result=%s",
            finished.status,
            finished.payload.get("result"),
        )
        if finished.status == "completed":
            return 0
        return 2
    finally:
        session.close()
def _run_rescue_jobs() -> int:
    """Run a single ``rescue_lost_jobs`` pass (v2_01 §7.8) and log its result.

    Returns:
        Always 0. Any exception raised by the sweep propagates to the caller
        after the session has been closed.
    """
    from app.db import SessionLocal
    from app.services.jobs import rescue_lost_jobs

    session = SessionLocal()
    try:
        rescued = rescue_lost_jobs(session)
        log.info("rescue-jobs: %s", rescued)
    finally:
        session.close()
    return 0
def _nearest_rank_index(count: int, percent: int) -> int:
    """Return the 0-based index of the nearest-rank ``percent``-th percentile.

    The nearest-rank method picks the value at rank ``ceil(count * percent /
    100)`` (1-based) in the sorted sample. The ceiling is computed in integer
    arithmetic, which avoids both float error and the ties-to-even behavior
    of ``round()`` that can select a rank *below* the requested percentile.

    Args:
        count: Number of samples (expected to be >= 1).
        percent: Percentile in whole percent, e.g. ``95``.

    Returns:
        An index in ``[0, count - 1]`` for ``count >= 1``; ``0`` for ``count <= 0``.
    """
    rank = (count * percent + 99) // 100
    return max(0, rank - 1)


def _run_readiness_report() -> int:
    """v2_06 §12 — log actuals against the five production-ready criteria.

    Reads from the DB inside ``rls_context(bypass=True)`` (it is a
    cross-tenant operational summary, not a customer-facing query). The
    report is emitted line by line through the module logger as
    human-readable text so it can be pasted into the weekly review thread.
    Nothing is written to the database.

    Returns:
        Always 0. Exceptions from the queries propagate after the session is
        closed.
    """
    from datetime import UTC, datetime, timedelta

    from app.db import SessionLocal, rls_context
    from app.models import (
        Organization,
        Persona,
        SafetyIncident,
        SessionMessage,
    )
    from app.models import Session as SessionModel

    db = SessionLocal()
    with rls_context(bypass=True):
        try:
            # 1. Paying customers: any non-free plan in good billing standing.
            paying = (
                db.query(Organization)
                .filter(Organization.plan != "free")
                .filter(Organization.billing_status.in_(("active", "trialing")))
                .count()
            )
            log.info("β" * 60)
            log.info("Production-readiness report (v2_06 Β§12)")
            log.info("β" * 60)
            log.info("1. Paying customers (plan != free, status active/trialing):")
            log.info(" %d [target: β₯5]", paying)

            # 2. Cost-per-session is operator-side (S3/CloudFront invoices);
            # we can only show the session count and let the operator divide.
            ninety = datetime.now(UTC) - timedelta(days=90)
            sessions_90d = (
                db.query(SessionModel.id)
                .filter(SessionModel.started_at >= ninety)
                .count()
            )
            log.info("2. Sessions in last 90d: %d", sessions_90d)
            # No %-args are passed to this call, so logging emits the message
            # verbatim; the percent sign must therefore be written once
            # (a doubled "%%" would be printed literally).
            log.info(
                " Cost reconciliation: divide S3+CloudFront invoice by "
                "this number, compare to $0.05 target "
                "(v2_results/cost_v2_final.md). Target: actual within 25% of model.",
            )

            # 3. Activation time: first session timestamp minus persona
            # creation. There is no dedicated "live widget" event, so the
            # earliest session under a persona is the closest available proxy.
            # NOTE(review): this issues one query per persona; acceptable for
            # an occasional report, but a grouped MIN() would scale better.
            activation_hours: list[float] = []
            for p in db.query(Persona).filter(Persona.deleted_at.is_(None)).all():
                first = (
                    db.query(SessionModel.started_at)
                    .filter(SessionModel.persona_id == p.id)
                    .order_by(SessionModel.started_at.asc())
                    .first()
                )
                if first and first[0]:
                    delta = first[0] - p.created_at
                    activation_hours.append(delta.total_seconds() / 3600)
            if activation_hours:
                avg_hours = sum(activation_hours) / len(activation_hours)
                log.info(
                    "3. Average creator activation time: %.1f hours [target: < 4]",
                    avg_hours,
                )
            else:
                log.info("3. Average creator activation time: no data yet [target: < 4]")

            # 4. High/critical safety incidents in the same 90-day window.
            critical = (
                db.query(SafetyIncident)
                .filter(SafetyIncident.created_at >= ninety)
                .filter(SafetyIncident.severity.in_(("high", "critical")))
                .count()
            )
            log.info(
                "4. Safety incidents (high/critical, last 90d): %d [target: 0 for 3 months]",
                critical,
            )

            # 5. Referrals are operator-tracked (Notion / CRM / human), so
            # only a marker line is emitted; there is no in-DB signal.
            log.info(
                "5. Referrals: track manually in CRM. The honest metric β "
                "no DB field substitutes.",
            )

            # Quality cadence appetizer: p95 selection latency across all
            # personas, taken from the ``duration_ms`` key of each system
            # message's selection trace. Non-integer values are skipped.
            durations: list[int] = []
            for (trace,) in (
                db.query(SessionMessage.selection_trace)
                .filter(SessionMessage.role == "system")
                .filter(SessionMessage.created_at >= ninety)
                .all()
            ):
                d = (trace or {}).get("duration_ms")
                if isinstance(d, int):
                    durations.append(d)
            if durations:
                durations.sort()
                # Nearest-rank p95: never reports a value below the true
                # 95th percentile, which matters when comparing to a ceiling.
                p95_idx = _nearest_rank_index(len(durations), 95)
                log.info(
                    "ββ Selection p95 latency (last 90d): %d ms [hard ceiling 800]",
                    durations[p95_idx],
                )
            log.info("β" * 60)
            return 0
        finally:
            db.close()
def _run_backfill_embeddings() -> int:
    """v2_01 §7.5 — fill in ``transcript_embedding`` for clips that lack one.

    Selects every non-deleted clip that has a transcript but no embedding
    (loaded in a single query), embeds each transcript with the embedder
    returned by ``get_embedder()``, and commits after every 50 processed
    clips plus once at the end, so an interrupted run keeps the work already
    committed. A clip whose embedding call raises is logged, counted as
    failed, and left without an embedding.

    Returns:
        0 when there was nothing to do or every clip was embedded; 2 when at
        least one clip failed.
    """
    from app.db import SessionLocal
    from app.models import Clip
    from app.services.embeddings import get_embedder

    session = SessionLocal()
    try:
        # Approval status is deliberately not part of the filter: anything
        # with a transcript is embedded so the semantic component is ready
        # as soon as a clip becomes serveable.
        pending = (
            session.query(Clip)
            .filter(Clip.transcript_embedding.is_(None))
            .filter(Clip.transcript.isnot(None))
            .filter(Clip.deleted_at.is_(None))
            .all()
        )
        total = len(pending)
        if not total:
            log.info("backfill: nothing to do (0 clips need embedding)")
            return 0
        log.info("backfill: %d clips need embedding", total)

        embedder = get_embedder()
        embedded = 0
        failed = 0
        # ``processed`` counts successes and failures together.
        for processed, clip in enumerate(pending, start=1):
            try:
                clip.transcript_embedding = embedder.embed(clip.transcript)
            except Exception as exc:  # noqa: BLE001
                log.warning("clip %s failed: %s", clip.id, exc)
                failed += 1
            else:
                embedded += 1
            if processed % 50 == 0:
                session.commit()
                log.info("backfill progress: %d/%d (failed=%d)", processed, total, failed)

        # Flush whatever remains after the last full group of 50.
        session.commit()
        log.info(
            "backfill complete: embedded=%d failed=%d total=%d", embedded, failed, total,
        )
        return 2 if failed else 0
    finally:
        session.close()
| def main(argv: list[str] | None = None) -> int: | |
| argv = list(sys.argv[1:] if argv is None else argv) | |
| if not argv: | |
| print(__doc__, file=sys.stderr) | |
| return 1 | |
| cmd, *_ = argv | |
| if cmd == "retention": | |
| return _run_retention_inline() | |
| if cmd == "rescue-jobs": | |
| return _run_rescue_jobs() | |
| if cmd == "readiness-report": | |
| return _run_readiness_report() | |
| if cmd == "backfill-embeddings": | |
| return _run_backfill_embeddings() | |
| print(f"unknown command: {cmd}", file=sys.stderr) | |
| print(__doc__, file=sys.stderr) | |
| return 1 | |
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())