| from fastapi import FastAPI, Query, HTTPException |
| from fastapi.responses import StreamingResponse |
| from datetime import datetime, timezone |
| from pydantic import BaseModel |
| import asyncio |
| import json |
| import sqlite3 |
|
|
| from spooler.processor import run_spool |
| from spooler.store import get_recent_sessions, get_session_activity, get_conn, get_session_summary |
| from reviewer.report import generate_report, generate_skills_report |
| from reviewer.skill_analyzer import find_skill_candidates |
| from reviewer.intelligence import ( |
| generate_context_pressure_report, |
| generate_failure_mode_report, |
| generate_intelligence_bundle, |
| generate_session_sprawl_report, |
| ) |
| from reviewer.session_snapshot import build_recent_session_snapshots, build_session_snapshot |
| from api.scheduler import claim_lock, release_lock, get_job_status |
| from config import settings |
|
|
# Application object that the route decorators in this module register on.
app = FastAPI(
    title="Session Amplifier",
    version="0.1.0",
)
|
|
|
|
@app.get("/health")
def health():
    """Report service status together with basic spool statistics.

    Returns:
        A dict with a static status and version, the configured database
        path and agents root, the number of rows in ``spooled_entries``,
        the number of distinct session ids among them, and ``spool_ready``
        (true once at least one entry exists).
    """
    conn = get_conn()
    try:
        stats = conn.execute(
            "SELECT COUNT(*) AS entries, COUNT(DISTINCT session_id) AS sessions FROM spooled_entries"
        ).fetchone()
    finally:
        # Close the connection even when the query raises, so a failing
        # health probe does not leak a handle on every call.
        conn.close()

    entries = int(stats["entries"]) if stats else 0
    sessions = int(stats["sessions"]) if stats else 0
    return {
        "status": "ok",
        "version": "0.1.0",
        "db_path": str(settings.db_path),
        "agents_root": str(settings.openclaw_agents_root),
        "spool_ready": entries > 0,
        "entries": entries,
        "sessions": sessions,
    }
|
|
|
|
@app.post("/spool")
def spool():
    """Invoke ``run_spool()`` and return the two counts it reports."""
    outcome = run_spool()
    entries_spooled, sessions_updated = outcome
    return {
        "entries_spooled": entries_spooled,
        "sessions_updated": sessions_updated,
    }
|
|
|
|
@app.get("/review/report")
def review_report(since: str | None = Query(None)):
    """Return the report produced by ``generate_report``.

    Args:
        since: Optional filter value forwarded unchanged to
            ``generate_report``.

    Raises:
        HTTPException: Re-raised as-is when the generator raised one;
            otherwise a 500 whose detail is the text of the underlying error.
    """
    try:
        return generate_report(since=since)
    except HTTPException:
        # Keep deliberate HTTP errors (status code and detail) intact instead
        # of flattening them into a generic 500.
        raise
    except Exception as exc:
        # Chain the cause so the original traceback is kept for logging.
        raise HTTPException(status_code=500, detail=str(exc)) from exc
|
|
|
|
@app.get("/review/skills")
def review_skills():
    """Return the output of ``generate_skills_report()`` unchanged."""
    skills_report = generate_skills_report()
    return skills_report
|
|
|
|
@app.get("/review/skills/candidates")
def review_skill_candidates(
    query: str = Query(..., min_length=3, max_length=500),
    agent_id: str | None = Query(None, max_length=100),
    limit: int = Query(8, ge=1, le=25),
):
    """Forward the validated query parameters to ``find_skill_candidates``.

    ``query`` is required (3-500 characters); ``agent_id`` is optional (at
    most 100 characters); ``limit`` must be between 1 and 25.
    """
    candidates = find_skill_candidates(
        query=query,
        agent_id=agent_id,
        limit=limit,
    )
    return candidates
|
|
|
|
@app.get("/sessions/recent")
def sessions_recent(limit: int = Query(25, ge=1, le=200)):
    """Return up to ``limit`` recent sessions with a UTC generation timestamp."""
    generated_at = datetime.now(timezone.utc).isoformat()
    recent = get_recent_sessions(limit)
    return {"generated_at": generated_at, "sessions": recent}
|
|
|
|
@app.get("/sessions/active-bulk")
def sessions_active_bulk(
    limit: int = Query(40, ge=1, le=200),
    activity_limit: int = Query(200, ge=1, le=1000),
):
    """Return recent sessions plus normalized activity for each of them.

    The ``activity`` mapping is keyed by session id; each value is the
    result of ``_normalize_activity_rows`` applied to that session's rows
    (fetched with ``activity_limit``).
    """
    recent = get_recent_sessions(limit)
    generated_at = datetime.now(timezone.utc).isoformat()

    activity_by_session = {}
    for session in recent:
        raw_rows = get_session_activity(session["session_id"], activity_limit)
        activity_by_session[session["session_id"]] = _normalize_activity_rows(raw_rows)

    return {
        "generated_at": generated_at,
        "sessions": recent,
        "activity": activity_by_session,
    }
|
|
|
|
@app.get("/sessions/snapshots")
def sessions_snapshots(
    limit: int = Query(40, ge=1, le=200),
    activity_limit: int = Query(80, ge=1, le=500),
):
    """Return canonical OpenClaw session snapshots for recent sessions.

    Both limits are passed straight to ``build_recent_session_snapshots``.
    """
    snapshots = build_recent_session_snapshots(
        limit=limit,
        activity_limit=activity_limit,
    )
    return snapshots
|
|
|
|
@app.get("/session/{session_id}/snapshot")
def session_snapshot(session_id: str, activity_limit: int = Query(80, ge=1, le=500)):
    """Return a canonical OpenClaw session snapshot for one session.

    Raises:
        HTTPException: 404 when ``get_session_summary`` yields nothing for
            ``session_id``.
    """
    summary = get_session_summary(session_id)
    if summary:
        return build_session_snapshot(summary, activity_limit=activity_limit)
    raise HTTPException(status_code=404, detail="session not found")
|
|
|
|
@app.get("/reports/session-sprawl")
def report_session_sprawl(
    limit: int = Query(500, ge=1, le=5000),
    stale_days: int = Query(30, ge=1, le=3650),
):
    """Return ``generate_session_sprawl_report`` for the given bounds."""
    sprawl_report = generate_session_sprawl_report(
        limit=limit,
        stale_days=stale_days,
    )
    return sprawl_report
|
|
|
|
@app.get("/reports/context-pressure")
def report_context_pressure(limit: int = Query(200, ge=1, le=5000)):
    """Return ``generate_context_pressure_report`` for at most ``limit`` items."""
    pressure_report = generate_context_pressure_report(limit=limit)
    return pressure_report
|
|
|
|
@app.get("/reports/failure-modes")
def report_failure_modes(limit: int = Query(200, ge=1, le=1000)):
    """Return ``generate_failure_mode_report`` for at most ``limit`` items."""
    failure_report = generate_failure_mode_report(limit=limit)
    return failure_report
|
|
|
|
@app.post("/review/run")
def review_run(kind: str = Query("light", pattern="^(light|deep)$")):
    """Build an intelligence bundle; ``kind`` must be ``light`` or ``deep``."""
    bundle = generate_intelligence_bundle(kind=kind)
    return bundle
|
|
|
|
| def _normalize_activity_rows(rows: list[dict]) -> list[dict]: |
| normalized = [] |
| for row in rows: |
| role = row.get("role") or "" |
| tool_name = row.get("tool_name") or "" |
| clean_text = row.get("clean_text") or "" |
| is_error = bool(row.get("is_error")) |
| preview = row.get("preview") or "" |
|
|
| |
| if role == "toolResult": |
| if is_error: |
| event_type = "tool_error" |
| summary = f"✗ {tool_name or 'tool'}" |
| else: |
| event_type = "tool_result" |
| summary = f"✓ {tool_name}" if tool_name else (preview[:80] or "tool result") |
| elif tool_name and role in ("assistant", "user"): |
| event_type = "tool_call" |
| summary = f"→ {tool_name}" |
| elif role == "assistant": |
| lower = clean_text.strip().lower() |
| if lower.startswith(("using", "i'll use", "i will use")) or lower.startswith(("tool call", "calling")): |
| event_type = "assistant_meta" |
| summary = preview[:120] or "assistant planning" |
| elif any(kw in lower[:100] for kw in ("thinking", "reasoning", "analyzing")): |
| event_type = "assistant_thinking" |
| summary = preview[:120] or "thinking" |
| else: |
| event_type = "assistant_text" |
| summary = preview[:120] or "assistant" |
| elif role == "user": |
| event_type = "user_message" |
| summary = preview[:120] or "user" |
| elif role == "system": |
| event_type = "system" |
| summary = preview[:80] or "system" |
| else: |
| event_type = "event" |
| summary = preview[:80] or str(role) or "event" |
|
|
| normalized.append( |
| { |
| "timestamp": row.get("timestamp") or row.get("indexed_at"), |
| "session_id": row.get("session_id"), |
| "agent_id": row.get("agent_id"), |
| "event_type": event_type, |
| "role": role, |
| "tool_name": tool_name, |
| "summary": summary, |
| "details": clean_text[:500] if clean_text else "", |
| "is_error": is_error, |
| "entry_idx": row.get("entry_idx"), |
| } |
| ) |
| return normalized |
|
|
|
|
@app.get("/session/{session_id}/activity")
def session_activity(session_id: str, limit: int = Query(200, ge=1, le=1000)):
    """Return normalized activity events for one session.

    Fetches up to ``limit`` rows via ``get_session_activity`` and runs them
    through ``_normalize_activity_rows``.
    """
    raw_rows = get_session_activity(session_id, limit)
    generated_at = datetime.now(timezone.utc).isoformat()
    return {
        "generated_at": generated_at,
        "session_id": session_id,
        "activity": _normalize_activity_rows(raw_rows),
    }
|
|
|
|
| |
| _API_MODEL_PRICING = { |
| "gpt-5": (2.5, 10.0), "gpt-5-4o": (2.5, 10.0), "gpt-4o": (2.5, 10.0), |
| "gpt-4o-mini": (0.15, 0.6), "gpt-4.1": (2.0, 8.0), |
| "gpt-4-turbo": (10.0, 30.0), "gpt-4": (30.0, 60.0), |
| "claude-opus-4-6": (3.0, 15.0), "claude-sonnet-4-6": (3.0, 15.0), |
| "claude-haiku-4-6": (0.8, 4.0), "claude-3-5-sonnet": (3.0, 15.0), |
| "claude-3-opus": (15.0, 75.0), "claude-3-sonnet": (3.0, 15.0), |
| "deepseek-chat": (0.14, 0.28), "deepseek-reasoner": (0.55, 2.19), |
| "gemini-2.5-pro": (1.25, 5.0), "gemini-2.5-flash": (0.075, 0.30), |
| "gemini-2.5-flash-lite": (0.075, 0.15), |
| "mistral-large": (2.0, 6.0), "mistral-small": (0.15, 0.6), |
| "minimax-m2.7": (0.099, 0.396), "minimax-m2": (0.099, 0.396), |
| "qwen": (0.5, 2.0), "moonshotai/kimi-k2": (0.5, 1.5), |
| "default": (0.1, 0.4), |
| } |
|
|
|
|
| def _infer_pricing(model: str) -> tuple[float, float]: |
| lowered = model.lower() |
| for key, price in _API_MODEL_PRICING.items(): |
| if key.lower() in lowered or lowered in key.lower(): |
| return price |
| return (0.1, 0.4) |
|
|
|
|
@app.get("/session/{session_id}/cost_summary")
def session_cost_summary(session_id: str):
    """Return token count and cost estimate for a session.

    Rows are selected where the stored session id equals ``session_id`` or
    starts with it. Tokens are estimated as ``ceil(len(clean_text) / 4)``
    with a minimum of one per row. ``user`` and ``system`` rows count as
    input; ``assistant`` and ``toolResult`` rows are billed at the output
    price. The model is taken from the most recent ``model_change`` row and
    falls back to ``"default"`` when that row cannot be parsed.

    In ``tool_usage``, ``calls`` counts the ``toolResult`` rows seen for each
    tool (one result is treated as one call).

    Raises:
        HTTPException: 404 when no rows match ``session_id``.
    """
    import math

    # Escape LIKE wildcards so a "%" or "_" in the path parameter is matched
    # literally and cannot widen the prefix match to unrelated sessions.
    like_prefix = (
        session_id.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_") + "%"
    )

    conn = get_conn()
    try:
        conn.row_factory = sqlite3.Row
        rows = conn.execute(
            """
            SELECT role, tool_name, clean_text, is_error, entry_type
            FROM spooled_entries
            WHERE session_id = ? OR session_id LIKE ? ESCAPE '\\'
            ORDER BY entry_idx ASC
            """,
            (session_id, like_prefix),
        ).fetchall()
    finally:
        # Always release the connection, including on the 404 path below.
        conn.close()

    if not rows:
        raise HTTPException(status_code=404, detail="session not found")

    # Only the latest model_change row is consulted, parseable or not.
    model = "default"
    for row in reversed(rows):
        if row["entry_type"] == "model_change" or row["role"] == "model_change":
            try:
                payload = json.loads(row["clean_text"] or "{}")
            except (ValueError, TypeError):
                payload = None
            if isinstance(payload, dict):
                candidate = payload.get("modelId") or payload.get("provider")
                # Null or non-string ids would break the pricing lookup.
                if isinstance(candidate, str) and candidate:
                    model = candidate
            break

    inp_price, out_price = _infer_pricing(model)

    user_tokens = assistant_tokens = tool_result_tokens = error_count = 0
    tool_usage = {}

    for row in rows:
        role = row["role"] or ""
        text = row["clean_text"] or ""
        tool_name = row["tool_name"] or ""
        is_error = bool(row["is_error"])
        tokens = max(1, math.ceil(len(text) / 4))

        if role in ("user", "system"):
            user_tokens += tokens
        elif role == "assistant":
            assistant_tokens += tokens
        elif role == "toolResult":
            tool_result_tokens += tokens
            if is_error:
                error_count += 1
            if tool_name:
                usage = tool_usage.setdefault(
                    tool_name, {"calls": 0, "result_tokens": 0, "errors": 0}
                )
                # Previously "calls" was initialised but never incremented.
                usage["calls"] += 1
                usage["result_tokens"] += tokens
                if is_error:
                    usage["errors"] += 1

    input_cost = (user_tokens / 1_000_000) * inp_price
    output_cost = ((assistant_tokens + tool_result_tokens) / 1_000_000) * out_price

    return {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "session_id": session_id,
        "model": model,
        "pricing_per_1m": {"input": inp_price, "output": out_price},
        "tokens": {
            "user_input": user_tokens,
            "assistant_output": assistant_tokens,
            "tool_results": tool_result_tokens,
            "total": user_tokens + assistant_tokens + tool_result_tokens,
        },
        "cost_usd": {
            "input": round(input_cost, 4),
            "output": round(output_cost, 4),
            "total": round(input_cost + output_cost, 4),
        },
        "error_count": error_count,
        "tool_usage": tool_usage,
    }
|
|
|
|
@app.get("/session/{session_id}/stream")
async def session_stream(session_id: str):
    """Stream a session's activity as server-sent events.

    Once per second the latest activity (up to 200 events) is fetched via
    ``session_activity``. Every event whose ``entry_idx`` is greater than the
    last one sent is emitted as a ``data: <json>`` frame, in ascending
    ``entry_idx`` order. Events without an ``entry_idx`` are skipped.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``. The
        generator loops indefinitely.
    """

    async def event_generator():
        last_idx = -1
        while True:
            # session_activity is synchronous; run it in a worker thread so
            # this coroutine does not stall the event loop while it waits.
            data = await asyncio.to_thread(session_activity, session_id, limit=200)
            fresh = sorted(
                (
                    evt
                    for evt in data.get("activity", [])
                    if evt.get("entry_idx") is not None and evt["entry_idx"] > last_idx
                ),
                key=lambda evt: evt["entry_idx"],
            )
            for evt in fresh:
                yield f"data: {json.dumps(evt)}\n\n"
                last_idx = evt["entry_idx"]
            await asyncio.sleep(1.0)

    return StreamingResponse(event_generator(), media_type="text/event-stream")
|
|
|
|
class LockRequest(BaseModel):
    """Request body for the job lock endpoint."""

    # Identifier of the caller asking for the lock.
    owner: str
    # Passed to claim_lock as its third argument; 60 when omitted.
    ttl_minutes: int = 60
|
|
class ReleaseRequest(BaseModel):
    """Request body for the job release endpoint."""

    # Identifier of the caller releasing the lock.
    owner: str
    # Passed to release_lock as its third argument; "completed" when omitted.
    status_msg: str = "completed"
|
|
@app.post("/jobs/{job_name}/lock")
def api_lock_job(job_name: str, req: LockRequest):
    """Try to claim the lock for ``job_name`` on behalf of ``req.owner``.

    Raises:
        HTTPException: 409 when ``claim_lock`` reports failure.
    """
    acquired = claim_lock(job_name, req.owner, req.ttl_minutes)
    if not acquired:
        raise HTTPException(status_code=409, detail="lock held by another owner")
    return {"status": "ok", "message": "lock acquired"}
|
|
@app.post("/jobs/{job_name}/release")
def api_release_job(job_name: str, req: ReleaseRequest):
    """Release the lock for ``job_name`` held by ``req.owner``.

    Raises:
        HTTPException: 403 when ``release_lock`` reports failure.
    """
    released = release_lock(job_name, req.owner, req.status_msg)
    if not released:
        raise HTTPException(status_code=403, detail="lock not held by owner")
    return {"status": "ok", "message": "lock released"}
|
|
@app.get("/jobs")
def api_list_jobs():
    """Return the result of ``get_job_status()`` under the ``jobs`` key."""
    job_status = get_job_status()
    return {"jobs": job_status}
|
|