"""HTTP API for Session Amplifier.

Exposes health/spool endpoints, review and intelligence reports, per-session
activity/snapshot/cost views, a server-sent-events activity stream, and a
small job-lock API backed by ``api.scheduler``.
"""

import asyncio
import json
import math
import sqlite3
from contextlib import closing
from datetime import datetime, timezone

from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

from spooler.processor import run_spool
from spooler.store import get_recent_sessions, get_session_activity, get_conn, get_session_summary
from reviewer.report import generate_report, generate_skills_report
from reviewer.skill_analyzer import find_skill_candidates
from reviewer.intelligence import (
    generate_context_pressure_report,
    generate_failure_mode_report,
    generate_intelligence_bundle,
    generate_session_sprawl_report,
)
from reviewer.session_snapshot import build_recent_session_snapshots, build_session_snapshot
from api.scheduler import claim_lock, release_lock, get_job_status
from config import settings

app = FastAPI(title="Session Amplifier", version="0.1.0")


def _utc_now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string (used for ``generated_at``)."""
    return datetime.now(timezone.utc).isoformat()


@app.get("/health")
def health():
    """Report service status, configured paths, and spool entry/session counts."""
    # closing() releases the connection even if the query raises.
    with closing(get_conn()) as conn:
        stats = conn.execute(
            "SELECT COUNT(*) AS entries, COUNT(DISTINCT session_id) AS sessions FROM spooled_entries"
        ).fetchone()
    entries = int(stats["entries"]) if stats else 0
    sessions = int(stats["sessions"]) if stats else 0
    return {
        "status": "ok",
        "version": "0.1.0",
        "db_path": str(settings.db_path),
        "agents_root": str(settings.openclaw_agents_root),
        "spool_ready": entries > 0,
        "entries": entries,
        "sessions": sessions,
    }


@app.post("/spool")
def spool():
    """Run the spooler once and report how many entries/sessions it touched."""
    count, sessions = run_spool()
    return {"entries_spooled": count, "sessions_updated": sessions}


@app.get("/review/report")
def review_report(since: str | None = Query(None)):
    """Return the review report, optionally limited by ``since``.

    Raises:
        HTTPException: 500 with the error text if report generation fails.
    """
    try:
        return generate_report(since=since)
    except Exception as exc:
        # Chain the cause so server-side tracebacks keep the original error.
        raise HTTPException(status_code=500, detail=str(exc)) from exc


@app.get("/review/skills")
def review_skills():
    """Return the skills report."""
    return generate_skills_report()


@app.get("/review/skills/candidates")
def review_skill_candidates(
    query: str = Query(..., min_length=3, max_length=500),
    agent_id: str | None = Query(None, max_length=100),
    limit: int = Query(8, ge=1, le=25),
):
    """Return up to ``limit`` skill candidates matching ``query``."""
    return find_skill_candidates(query=query, agent_id=agent_id, limit=limit)


@app.get("/sessions/recent")
def sessions_recent(limit: int = Query(25, ge=1, le=200)):
    """Return the ``limit`` most recent sessions."""
    return {
        "generated_at": _utc_now_iso(),
        "sessions": get_recent_sessions(limit),
    }


@app.get("/sessions/active-bulk")
def sessions_active_bulk(
    limit: int = Query(40, ge=1, le=200),
    activity_limit: int = Query(200, ge=1, le=1000),
):
    """Return recent sessions plus normalized activity for each, keyed by session id."""
    sessions = get_recent_sessions(limit)
    return {
        "generated_at": _utc_now_iso(),
        "sessions": sessions,
        "activity": {
            row["session_id"]: _normalize_activity_rows(
                get_session_activity(row["session_id"], activity_limit)
            )
            for row in sessions
        },
    }


@app.get("/sessions/snapshots")
def sessions_snapshots(
    limit: int = Query(40, ge=1, le=200),
    activity_limit: int = Query(80, ge=1, le=500),
):
    """Return canonical OpenClaw session snapshots for recent sessions."""
    return build_recent_session_snapshots(limit=limit, activity_limit=activity_limit)


@app.get("/session/{session_id}/snapshot")
def session_snapshot(session_id: str, activity_limit: int = Query(80, ge=1, le=500)):
    """Return a canonical OpenClaw session snapshot for one session.

    Raises:
        HTTPException: 404 if no summary exists for ``session_id``.
    """
    summary = get_session_summary(session_id)
    if not summary:
        raise HTTPException(status_code=404, detail="session not found")
    return build_session_snapshot(summary, activity_limit=activity_limit)


@app.get("/reports/session-sprawl")
def report_session_sprawl(
    limit: int = Query(500, ge=1, le=5000),
    stale_days: int = Query(30, ge=1, le=3650),
):
    """Return the session-sprawl report."""
    return generate_session_sprawl_report(limit=limit, stale_days=stale_days)


@app.get("/reports/context-pressure")
def report_context_pressure(limit: int = Query(200, ge=1, le=5000)):
    """Return the context-pressure report."""
    return generate_context_pressure_report(limit=limit)


@app.get("/reports/failure-modes")
def report_failure_modes(limit: int = Query(200, ge=1, le=1000)):
    """Return the failure-mode report."""
    return generate_failure_mode_report(limit=limit)


@app.post("/review/run")
def review_run(kind: str = Query("light", pattern="^(light|deep)$")):
    """Generate an intelligence bundle of the requested ``kind`` ("light" or "deep")."""
    return generate_intelligence_bundle(kind=kind)


# Assistant-text prefixes that mark "planning"/tool-invocation chatter rather than content.
_ASSISTANT_META_PREFIXES = ("using", "i'll use", "i will use", "tool call", "calling")
# Keywords (checked in the first 100 chars) that mark assistant "thinking" output.
_ASSISTANT_THINKING_KEYWORDS = ("thinking", "reasoning", "analyzing")


def _normalize_activity_rows(rows: list[dict]) -> list[dict]:
    """Convert raw activity rows into UI-friendly event dicts.

    Each output dict carries a classified ``event_type`` (tool_error,
    tool_result, tool_call, assistant_meta, assistant_thinking,
    assistant_text, user_message, system, or event), a short ``summary``,
    and ``details`` truncated to 500 characters.
    """
    normalized = []
    for row in rows:
        role = row.get("role") or ""
        tool_name = row.get("tool_name") or ""
        clean_text = row.get("clean_text") or ""
        is_error = bool(row.get("is_error"))
        preview = row.get("preview") or ""

        # Classify event type
        if role == "toolResult":
            if is_error:
                event_type = "tool_error"
                summary = f"✗ {tool_name or 'tool'}"
            else:
                event_type = "tool_result"
                summary = f"✓ {tool_name}" if tool_name else (preview[:80] or "tool result")
        elif tool_name and role in ("assistant", "user"):
            event_type = "tool_call"
            summary = f"→ {tool_name}"
        elif role == "assistant":
            lower = clean_text.strip().lower()
            if lower.startswith(_ASSISTANT_META_PREFIXES):
                event_type = "assistant_meta"
                summary = preview[:120] or "assistant planning"
            elif any(kw in lower[:100] for kw in _ASSISTANT_THINKING_KEYWORDS):
                event_type = "assistant_thinking"
                summary = preview[:120] or "thinking"
            else:
                event_type = "assistant_text"
                summary = preview[:120] or "assistant"
        elif role == "user":
            event_type = "user_message"
            summary = preview[:120] or "user"
        elif role == "system":
            event_type = "system"
            summary = preview[:80] or "system"
        else:
            event_type = "event"
            summary = preview[:80] or str(role) or "event"

        normalized.append(
            {
                "timestamp": row.get("timestamp") or row.get("indexed_at"),
                "session_id": row.get("session_id"),
                "agent_id": row.get("agent_id"),
                "event_type": event_type,
                "role": role,
                "tool_name": tool_name,
                "summary": summary,
                "details": clean_text[:500] if clean_text else "",
                "is_error": is_error,
                "entry_idx": row.get("entry_idx"),
            }
        )
    return normalized


@app.get("/session/{session_id}/activity")
def session_activity(session_id: str, limit: int = Query(200, ge=1, le=1000)):
    """Return up to ``limit`` normalized activity events for one session."""
    rows = get_session_activity(session_id, limit)
    return {
        "generated_at": _utc_now_iso(),
        "session_id": session_id,
        "activity": _normalize_activity_rows(rows),
    }


# Pricing per 1M tokens (input, output). Mirror of session_context_report.py MODEL_PRICING.
_API_MODEL_PRICING = {
    "gpt-5": (2.5, 10.0),
    "gpt-5-4o": (2.5, 10.0),
    "gpt-4o": (2.5, 10.0),
    "gpt-4o-mini": (0.15, 0.6),
    "gpt-4.1": (2.0, 8.0),
    "gpt-4-turbo": (10.0, 30.0),
    "gpt-4": (30.0, 60.0),
    "claude-opus-4-6": (3.0, 15.0),
    "claude-sonnet-4-6": (3.0, 15.0),
    "claude-haiku-4-6": (0.8, 4.0),
    "claude-3-5-sonnet": (3.0, 15.0),
    "claude-3-opus": (15.0, 75.0),
    "claude-3-sonnet": (3.0, 15.0),
    "deepseek-chat": (0.14, 0.28),
    "deepseek-reasoner": (0.55, 2.19),
    "gemini-2.5-pro": (1.25, 5.0),
    "gemini-2.5-flash": (0.075, 0.30),
    "gemini-2.5-flash-lite": (0.075, 0.15),
    "mistral-large": (2.0, 6.0),
    "mistral-small": (0.15, 0.6),
    "minimax-m2.7": (0.099, 0.396),
    "minimax-m2": (0.099, 0.396),
    "qwen": (0.5, 2.0),
    "moonshotai/kimi-k2": (0.5, 1.5),
    "default": (0.1, 0.4),
}

# Real model keys, most specific (longest) first, so that e.g. "gpt-4o-mini"
# is not matched by the shorter "gpt-4o" key before its own entry is tried.
_PRICING_KEYS_BY_SPECIFICITY = sorted(
    (key for key in _API_MODEL_PRICING if key != "default"), key=len, reverse=True
)


def _infer_pricing(model: str | None) -> tuple[float, float]:
    """Return ``(input, output)`` USD-per-1M-token pricing for a model id.

    Matching order: the longest table key contained in the model id wins;
    failing that, a table key that contains the (non-empty) model id is
    accepted; otherwise the ``"default"`` pricing is returned. Empty or
    ``None`` model ids get the default pricing.
    """
    lowered = (model or "").strip().lower()
    if not lowered:
        # An empty string is "contained in" every key; don't let it match one.
        return _API_MODEL_PRICING["default"]
    for key in _PRICING_KEYS_BY_SPECIFICITY:
        if key.lower() in lowered:
            return _API_MODEL_PRICING[key]
    # Partial ids such as "claude-opus" still resolve to a full table entry.
    for key in _PRICING_KEYS_BY_SPECIFICITY:
        if lowered in key.lower():
            return _API_MODEL_PRICING[key]
    return _API_MODEL_PRICING["default"]


# Rough characters-per-token ratio used for estimates (no tokenizer is used here).
_CHARS_PER_TOKEN = 4


def _escape_like(value: str) -> str:
    """Escape SQL LIKE wildcards in *value* so it matches literally (ESCAPE '\\')."""
    return value.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")


def _fetch_cost_rows(session_id: str) -> list[sqlite3.Row]:
    """Load all spooled entries whose session id equals or starts with *session_id*, oldest first."""
    with closing(get_conn()) as conn:
        conn.row_factory = sqlite3.Row  # return Row objects for dict-like access
        return conn.execute(
            """
            SELECT role, tool_name, clean_text, is_error, entry_type
            FROM spooled_entries
            WHERE session_id = ? OR session_id LIKE ? ESCAPE '\\'
            ORDER BY entry_idx ASC
            """,
            (session_id, _escape_like(session_id) + "%"),
        ).fetchall()


def _detect_session_model(rows: list[sqlite3.Row]) -> str:
    """Return the model id from the most recent model_change entry, or ``"default"``.

    Only the last model_change row is inspected. Its ``clean_text`` is parsed
    as JSON and ``modelId`` (falling back to ``provider``) is used; malformed
    or non-object payloads yield ``"default"``.
    """
    for row in reversed(rows):
        if row["entry_type"] != "model_change" and row["role"] != "model_change":
            continue
        try:
            payload = json.loads(row["clean_text"] or "{}")
        except ValueError:
            return "default"
        if isinstance(payload, dict):
            return str(payload.get("modelId") or payload.get("provider") or "default")
        return "default"
    return "default"


@app.get("/session/{session_id}/cost_summary")
def session_cost_summary(session_id: str):
    """Return a token count and cost estimate for a session.

    Tokens are approximated as ``ceil(len(text) / 4)`` per entry (minimum 1).
    User/system text and tool results are priced as model input; assistant
    text is priced as model output.

    Raises:
        HTTPException: 404 if no spooled entries match ``session_id``.
    """
    rows = _fetch_cost_rows(session_id)
    if not rows:
        raise HTTPException(status_code=404, detail="session not found")

    model = _detect_session_model(rows)
    inp_price, out_price = _infer_pricing(model)

    user_tokens = assistant_tokens = tool_result_tokens = error_count = 0
    tool_usage: dict[str, dict[str, int]] = {}

    for row in rows:
        role = row["role"] or ""
        text = row["clean_text"] or ""
        tool_name = row["tool_name"] or ""
        is_error = bool(row["is_error"])
        tokens = max(1, math.ceil(len(text) / _CHARS_PER_TOKEN))

        if role in ("user", "system"):
            user_tokens += tokens
        elif role == "assistant":
            assistant_tokens += tokens
        elif role == "toolResult":
            tool_result_tokens += tokens
            if is_error:
                error_count += 1
            if tool_name:
                usage = tool_usage.setdefault(
                    tool_name, {"calls": 0, "result_tokens": 0, "errors": 0}
                )
                # Each tool result corresponds to one invocation of the tool.
                usage["calls"] += 1
                usage["result_tokens"] += tokens
                if is_error:
                    usage["errors"] += 1

    # Tool results are fed back into the next prompt, so the model reads them
    # as input tokens; only assistant-generated text is billed as output.
    input_cost = ((user_tokens + tool_result_tokens) / 1_000_000) * inp_price
    output_cost = (assistant_tokens / 1_000_000) * out_price

    return {
        "generated_at": _utc_now_iso(),
        "session_id": session_id,
        "model": model,
        "pricing_per_1m": {"input": inp_price, "output": out_price},
        "tokens": {
            "user_input": user_tokens,
            "assistant_output": assistant_tokens,
            "tool_results": tool_result_tokens,
            "total": user_tokens + assistant_tokens + tool_result_tokens,
        },
        "cost_usd": {
            "input": round(input_cost, 4),
            "output": round(output_cost, 4),
            "total": round(input_cost + output_cost, 4),
        },
        "error_count": error_count,
        "tool_usage": tool_usage,
    }


@app.get("/session/{session_id}/stream")
async def session_stream(session_id: str):
    """Stream a session's activity as server-sent events.

    Every second, fetches the latest 200 normalized events and emits, in
    ``entry_idx`` order, those whose ``entry_idx`` is greater than the last
    one already sent.
    """

    async def event_generator():
        last_idx = -1
        while True:
            # session_activity performs blocking SQLite I/O; run it in a worker
            # thread so a stream cannot stall the event loop for other requests.
            data = await asyncio.to_thread(session_activity, session_id, 200)
            fresh = sorted(
                (
                    evt
                    for evt in data.get("activity", [])
                    if evt.get("entry_idx") is not None and evt["entry_idx"] > last_idx
                ),
                key=lambda evt: evt["entry_idx"],
            )
            for evt in fresh:
                yield f"data: {json.dumps(evt)}\n\n"
                last_idx = evt["entry_idx"]
            await asyncio.sleep(1.0)

    return StreamingResponse(event_generator(), media_type="text/event-stream")


class LockRequest(BaseModel):
    """Body for acquiring a job lock."""

    owner: str
    ttl_minutes: int = 60


class ReleaseRequest(BaseModel):
    """Body for releasing a job lock."""

    owner: str
    status_msg: str = "completed"


@app.post("/jobs/{job_name}/lock")
def api_lock_job(job_name: str, req: LockRequest):
    """Acquire the lock for ``job_name``.

    Raises:
        HTTPException: 409 if ``claim_lock`` reports the lock is not acquired.
    """
    if claim_lock(job_name, req.owner, req.ttl_minutes):
        return {"status": "ok", "message": "lock acquired"}
    raise HTTPException(status_code=409, detail="lock held by another owner")


@app.post("/jobs/{job_name}/release")
def api_release_job(job_name: str, req: ReleaseRequest):
    """Release the lock for ``job_name``.

    Raises:
        HTTPException: 403 if ``release_lock`` reports the lock was not released.
    """
    if release_lock(job_name, req.owner, req.status_msg):
        return {"status": "ok", "message": "lock released"}
    raise HTTPException(status_code=403, detail="lock not held by owner")


@app.get("/jobs")
def api_list_jobs():
    """Return the status of all scheduled jobs."""
    return {"jobs": get_job_status()}