""" agent-trace v2 — FORGE Telemetry Backbone Every agent sends events here. Owns: ingest, retention, aggregation, real-time dashboard. agent-learn reads rewards from here. """ import asyncio, json, os, sqlite3, time, uuid from contextlib import asynccontextmanager from pathlib import Path from typing import Optional import uvicorn from fastapi import FastAPI, HTTPException, Query, Request from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse # --------------------------------------------------------------------------- # Config # --------------------------------------------------------------------------- DB_PATH = Path(os.getenv("TRACE_DB", "/tmp/trace.db")) PORT = int(os.getenv("PORT", "7860")) INGEST_KEY = os.getenv("TRACE_KEY", "") RETAIN_DAYS = int(os.getenv("RETAIN_DAYS", "7")) VALID_TYPES = {"llm_call","tool_use","react_step","skill_load","kanban_move", "slot_event","self_reflect","reward_signal","error","custom"} # --------------------------------------------------------------------------- # Database # --------------------------------------------------------------------------- def get_db(): conn = sqlite3.connect(str(DB_PATH), check_same_thread=False) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA synchronous=NORMAL") return conn def init_db(): conn = get_db() conn.executescript(""" CREATE TABLE IF NOT EXISTS traces ( id TEXT PRIMARY KEY, agent TEXT NOT NULL DEFAULT 'unknown', session_id TEXT NOT NULL DEFAULT '', task_id TEXT NOT NULL DEFAULT '', event_type TEXT NOT NULL DEFAULT 'custom', status TEXT NOT NULL DEFAULT 'ok', latency_ms REAL, tokens_in INTEGER, tokens_out INTEGER, model TEXT, tool_name TEXT, skill_id TEXT, reward REAL, error_msg TEXT, payload TEXT NOT NULL DEFAULT '{}', tags TEXT NOT NULL DEFAULT '[]', ts REAL NOT NULL ); CREATE INDEX IF NOT EXISTS idx_tr_ts ON traces(ts DESC); CREATE INDEX IF NOT EXISTS idx_tr_agent ON traces(agent); CREATE INDEX IF NOT EXISTS idx_tr_type ON traces(event_type); CREATE INDEX IF NOT EXISTS idx_tr_sess ON traces(session_id); CREATE INDEX IF NOT EXISTS idx_tr_task ON traces(task_id); CREATE INDEX IF NOT EXISTS idx_tr_reward ON traces(reward); CREATE TABLE IF NOT EXISTS hourly_stats ( hour TEXT NOT NULL, agent TEXT NOT NULL, event_type TEXT NOT NULL, count INTEGER NOT NULL DEFAULT 0, errors INTEGER NOT NULL DEFAULT 0, total_lat REAL NOT NULL DEFAULT 0, total_tok INTEGER NOT NULL DEFAULT 0, PRIMARY KEY (hour, agent, event_type) ); """) conn.commit(); conn.close() def purge_old(): cutoff = time.time() - RETAIN_DAYS * 86400 conn = get_db() n = conn.execute("DELETE FROM traces WHERE ts < ?", (cutoff,)).rowcount conn.execute("DELETE FROM hourly_stats WHERE hour < ?", (time.strftime("%Y-%m-%dT%H", time.gmtime(cutoff)),)) conn.commit(); conn.close() return n # --------------------------------------------------------------------------- # Write # --------------------------------------------------------------------------- def ingest_trace(data: dict) -> str: event_type = str(data.get("event_type","custom")) if event_type not in VALID_TYPES: event_type = "custom" status = "error" if str(data.get("status","ok")).lower()=="error" else "ok" latency_ms = float(data["latency_ms"]) if data.get("latency_ms") is not None else None tokens_in = int(data["tokens_in"]) if data.get("tokens_in") is not None else None tokens_out = int(data["tokens_out"]) if data.get("tokens_out") is not None else None reward = float(data["reward"]) if data.get("reward") is not None else None ts = float(data.get("ts") or time.time()) trace_id = str(data.get("id") or uuid.uuid4()) payload = json.dumps(data.get("payload") or {}) tags = json.dumps(data.get("tags") or []) agent = str(data.get("agent","unknown"))[:64] conn = get_db() conn.execute(""" INSERT OR IGNORE INTO traces (id,agent,session_id,task_id,event_type,status, latency_ms,tokens_in,tokens_out,model,tool_name,skill_id, reward,error_msg,payload,tags,ts) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""", (trace_id, agent, str(data.get("session_id",""))[:128], str(data.get("task_id",""))[:128], event_type, status, latency_ms, tokens_in, tokens_out, str(data.get("model","") or "")[:128], str(data.get("tool_name","") or "")[:128], str(data.get("skill_id","") or "")[:128], reward, str(data.get("error_msg","") or "")[:1024], payload, tags, ts)) hour = time.strftime("%Y-%m-%dT%H", time.gmtime(ts)) total_tok = (tokens_in or 0) + (tokens_out or 0) conn.execute(""" INSERT INTO hourly_stats (hour,agent,event_type,count,errors,total_lat,total_tok) VALUES (?,?,?,1,?,?,?) ON CONFLICT(hour,agent,event_type) DO UPDATE SET count=count+1, errors=errors+excluded.errors, total_lat=total_lat+excluded.total_lat, total_tok=total_tok+excluded.total_tok""", (hour, agent, event_type, 1 if status=="error" else 0, latency_ms or 0, total_tok)) conn.commit(); conn.close() return trace_id def ingest_batch(events: list) -> dict: ok, errs = 0, [] for i, ev in enumerate(events): try: ingest_trace(ev); ok += 1 except Exception as e: errs.append({"index":i,"error":str(e)}) return {"ingested":ok,"errors":errs} def update_reward(trace_id: str, reward: float) -> bool: conn = get_db() n = conn.execute("UPDATE traces SET reward=? WHERE id=?", (reward, trace_id)).rowcount conn.commit(); conn.close() return n > 0 # --------------------------------------------------------------------------- # Read # --------------------------------------------------------------------------- def query_traces(agent="", event_type="", session_id="", task_id="", status="", has_reward=False, since_ts=0.0, limit=100, offset=0) -> list: conn = get_db() where, params = [], [] if agent: where.append("agent=?"); params.append(agent) if event_type: where.append("event_type=?"); params.append(event_type) if session_id: where.append("session_id=?"); params.append(session_id) if task_id: where.append("task_id=?"); params.append(task_id) if status: where.append("status=?"); params.append(status) if has_reward: where.append("reward IS NOT NULL") if since_ts: where.append("ts >= ?"); params.append(since_ts) sql = ("SELECT * FROM traces" + (f" WHERE {' AND '.join(where)}" if where else "") + " ORDER BY ts DESC LIMIT ? OFFSET ?") rows = conn.execute(sql, params+[limit,offset]).fetchall() conn.close() result = [] for r in rows: d = dict(r) for f in ("payload","tags"): try: d[f] = json.loads(d[f]) except Exception: pass result.append(d) return result def get_agents() -> list: conn = get_db() rows = conn.execute("SELECT DISTINCT agent FROM traces ORDER BY agent").fetchall() conn.close() return [r[0] for r in rows] def get_stats(window_hours=24) -> dict: conn = get_db() since = time.time() - window_hours * 3600 total = conn.execute("SELECT COUNT(*) FROM traces WHERE ts>=?",(since,)).fetchone()[0] errors = conn.execute("SELECT COUNT(*) FROM traces WHERE ts>=? AND status='error'",(since,)).fetchone()[0] by_agent = conn.execute( "SELECT agent, COUNT(*) as cnt, SUM(CASE WHEN status='error' THEN 1 ELSE 0 END) as errs " "FROM traces WHERE ts>=? GROUP BY agent ORDER BY cnt DESC",(since,)).fetchall() by_type = conn.execute( "SELECT event_type, COUNT(*) as cnt FROM traces WHERE ts>=? GROUP BY event_type ORDER BY cnt DESC",(since,)).fetchall() avg_lat = conn.execute("SELECT AVG(latency_ms) FROM traces WHERE ts>=? AND latency_ms IS NOT NULL",(since,)).fetchone()[0] total_tok = conn.execute( "SELECT SUM(COALESCE(tokens_in,0)+COALESCE(tokens_out,0)) FROM traces WHERE ts>=?",(since,)).fetchone()[0] rw = conn.execute( "SELECT COUNT(*),AVG(reward),MIN(reward),MAX(reward) FROM traces WHERE ts>=? AND reward IS NOT NULL",(since,)).fetchone() since_hour = time.strftime("%Y-%m-%dT%H", time.gmtime(since)) hourly = conn.execute( "SELECT hour, SUM(count) as total, SUM(errors) as errs " "FROM hourly_stats WHERE hour>=? GROUP BY hour ORDER BY hour",(since_hour,)).fetchall() conn.close() return { "window_hours": window_hours, "total_events": total, "error_count": errors, "error_rate": round(errors/max(total,1)*100,2), "avg_latency_ms": round(avg_lat or 0,1), "total_tokens": total_tok or 0, "by_agent": [dict(r) for r in by_agent], "by_event_type": [dict(r) for r in by_type], "reward_stats": {"count":rw[0]or 0,"avg":round(rw[1]or 0,4),"min":rw[2],"max":rw[3]}, "hourly_trend": [dict(r) for r in hourly], } # --------------------------------------------------------------------------- # Seed demo data # --------------------------------------------------------------------------- def seed_demo(): conn = get_db() n = conn.execute("SELECT COUNT(*) FROM traces").fetchone()[0] conn.close() if n > 0: return import random; random.seed(42) now = time.time() agents = ["nexus","pulse","kanban","memory","relay"] types = list(VALID_TYPES) for _ in range(80): a = random.choice(agents) et = random.choice(types) ingest_trace({ "agent": a, "event_type": et, "session_id": f"demo-{random.randint(1,8)}", "task_id": f"task-{random.randint(1,15)}", "status": "error" if random.random()<0.08 else "ok", "latency_ms": round(random.uniform(80,3500),1) if et=="llm_call" else round(random.uniform(5,400),1), "tokens_in": random.randint(100,2000) if et=="llm_call" else None, "tokens_out": random.randint(50,800) if et=="llm_call" else None, "model": "qwen/qwen3.5-35b-a3b" if a=="nexus" else "", "tool_name": random.choice(["web_search","calculator","kanban_create","slot_reserve"]) if et=="tool_use" else "", "reward": round(random.uniform(-0.3,1.0),3) if random.random()<0.3 else None, "ts": now - random.uniform(0, 23*3600), }) # --------------------------------------------------------------------------- # MCP # --------------------------------------------------------------------------- MCP_TOOLS = [ {"name":"trace_ingest","description":"Ingest a trace event into FORGE telemetry.", "inputSchema":{"type":"object","required":["agent","event_type"], "properties":{"agent":{"type":"string"},"event_type":{"type":"string"}, "session_id":{"type":"string"},"task_id":{"type":"string"}, "status":{"type":"string"},"latency_ms":{"type":"number"}, "tokens_in":{"type":"integer"},"tokens_out":{"type":"integer"}, "model":{"type":"string"},"tool_name":{"type":"string"}, "skill_id":{"type":"string"},"error_msg":{"type":"string"}, "payload":{"type":"object"},"tags":{"type":"array"}}}}, {"name":"trace_query","description":"Query trace events with filters.", "inputSchema":{"type":"object","properties":{"agent":{"type":"string"},"event_type":{"type":"string"}, "session_id":{"type":"string"},"task_id":{"type":"string"}, "status":{"type":"string"},"since_hours":{"type":"number"}, "limit":{"type":"integer"}}}}, {"name":"trace_stats","description":"Get aggregated telemetry statistics.", "inputSchema":{"type":"object","properties":{"window_hours":{"type":"integer","default":24}}}}, {"name":"trace_agents","description":"List all agents that have sent telemetry.", "inputSchema":{"type":"object","properties":{}}}, {"name":"trace_session","description":"Get complete event timeline for a session.", "inputSchema":{"type":"object","required":["session_id"],"properties":{"session_id":{"type":"string"}}}}, {"name":"trace_reward","description":"Assign reward score to a trace event (called by agent-learn).", "inputSchema":{"type":"object","required":["trace_id","reward"], "properties":{"trace_id":{"type":"string"},"reward":{"type":"number"}}}}, ] def handle_mcp(method, params, req_id): def ok(r): return {"jsonrpc":"2.0","id":req_id,"result":r} def txt(d): return ok({"content":[{"type":"text","text":json.dumps(d)}]}) if method=="initialize": return ok({"protocolVersion":"2024-11-05","serverInfo":{"name":"agent-trace","version":"1.0.0"},"capabilities":{"tools":{}}}) if method=="tools/list": return ok({"tools":MCP_TOOLS}) if method=="tools/call": n, a = params.get("name",""), params.get("arguments",{}) if n=="trace_ingest": tid = ingest_trace(a); return txt({"ok":True,"trace_id":tid}) if n=="trace_query": ev = query_traces(agent=a.get("agent",""),event_type=a.get("event_type",""), session_id=a.get("session_id",""),task_id=a.get("task_id",""), status=a.get("status",""),since_ts=time.time()-a.get("since_hours",1)*3600, limit=min(a.get("limit",50),200)) return txt({"events":ev,"count":len(ev)}) if n=="trace_stats": return txt(get_stats(a.get("window_hours",24))) if n=="trace_agents": return txt({"agents":get_agents()}) if n=="trace_session": return txt({"session_id":a["session_id"],"events":query_traces(session_id=a["session_id"],limit=500)}) if n=="trace_reward": return txt({"ok":update_reward(a["trace_id"],float(a["reward"])),"trace_id":a["trace_id"]}) return {"jsonrpc":"2.0","id":req_id,"error":{"code":-32601,"message":f"Unknown tool: {n}"}} if method in ("notifications/initialized","notifications/cancelled"): return None return {"jsonrpc":"2.0","id":req_id,"error":{"code":-32601,"message":f"Method not found: {method}"}} # --------------------------------------------------------------------------- # App # --------------------------------------------------------------------------- @asynccontextmanager async def lifespan(app): init_db(); seed_demo() asyncio.create_task(_purge_loop()) yield async def _purge_loop(): while True: await asyncio.sleep(3600); purge_old() app = FastAPI(title="agent-trace", version="1.0.0", lifespan=lifespan) def _auth(r): return not INGEST_KEY or r.headers.get("x-trace-key","") == INGEST_KEY @app.post("/api/trace", status_code=201) async def api_ingest(request: Request): if not _auth(request): raise HTTPException(403,"Invalid X-Trace-Key") body = await request.json() if isinstance(body, list): return JSONResponse(ingest_batch(body)) tid = ingest_trace(body) return JSONResponse({"ok":True,"trace_id":tid}) @app.post("/api/traces/batch", status_code=201) async def api_batch(request: Request): if not _auth(request): raise HTTPException(403,"Invalid X-Trace-Key") body = await request.json() return JSONResponse(ingest_batch(body if isinstance(body,list) else [body])) @app.get("/api/traces") async def api_query( agent:str=Query(""), event_type:str=Query(""), session_id:str=Query(""), task_id:str=Query(""), status:str=Query(""), has_reward:bool=Query(False), since_hours:float=Query(24.0), limit:int=Query(100,le=1000), offset:int=Query(0)): ev = query_traces(agent=agent, event_type=event_type, session_id=session_id, task_id=task_id, status=status, has_reward=has_reward, since_ts=time.time()-since_hours*3600, limit=limit, offset=offset) return JSONResponse({"events":ev,"count":len(ev)}) @app.get("/api/stats") async def api_stats(window_hours:int=Query(24)): return JSONResponse(get_stats(window_hours)) @app.get("/api/agents") async def api_agents(): return JSONResponse({"agents":get_agents()}) @app.get("/api/session/{sid}") async def api_session(sid:str): return JSONResponse({"session_id":sid,"events":query_traces(session_id=sid,limit=500)}) @app.patch("/api/trace/{trace_id}/reward") async def api_reward(trace_id:str, request:Request): if not _auth(request): raise HTTPException(403,"Invalid X-Trace-Key") body = await request.json() updated = update_reward(trace_id, float(body.get("reward",0))) return JSONResponse({"ok":updated,"trace_id":trace_id}) @app.delete("/api/purge") async def api_purge(request:Request): if not _auth(request): raise HTTPException(403,"Invalid X-Trace-Key") return JSONResponse({"ok":True,"deleted":purge_old(),"retain_days":RETAIN_DAYS}) @app.get("/api/health") async def api_health(): conn=get_db(); n=conn.execute("SELECT COUNT(*) FROM traces").fetchone()[0]; conn.close() return JSONResponse({"ok":True,"total_traces":n,"retain_days":RETAIN_DAYS}) @app.get("/mcp/sse") async def mcp_sse(request:Request): async def gen(): yield f"data: {json.dumps({'jsonrpc':'2.0','method':'connected','params':{}})}\n\n" yield f"data: {json.dumps({'jsonrpc':'2.0','method':'notifications/tools','params':{'tools':MCP_TOOLS}})}\n\n" while True: if await request.is_disconnected(): break yield ": ping\n\n"; await asyncio.sleep(15) return StreamingResponse(gen(), media_type="text/event-stream", headers={"Cache-Control":"no-cache","Connection":"keep-alive","X-Accel-Buffering":"no"}) @app.post("/mcp") async def mcp_rpc(request:Request): try: body = await request.json() except Exception: return JSONResponse({"jsonrpc":"2.0","id":None,"error":{"code":-32700,"message":"Parse error"}}) if isinstance(body,list): return JSONResponse([r for r in [handle_mcp(x.get("method",""),x.get("params",{}),x.get("id")) for x in body] if r]) r = handle_mcp(body.get("method",""), body.get("params",{}), body.get("id")) return JSONResponse(r or {"jsonrpc":"2.0","id":body.get("id"),"result":{}}) # --------------------------------------------------------------------------- # SPA # --------------------------------------------------------------------------- SPA = r"""