import json import uuid from datetime import datetime, timezone from pathlib import Path from config import settings from reviewer.scorer import score_session, get_all_spooled_entries from reviewer.pattern_detector import detect_failure_patterns from reviewer.skill_analyzer import analyze_skill_coverage def _group_by_session(rows: list[dict]) -> dict[str, list[dict]]: grouped = {} for row in rows: grouped.setdefault(row["session_id"], []).append(row) return grouped def generate_report(since: str | None = None) -> dict: from spooler.store import get_conn conn = get_conn() rows = get_all_spooled_entries(conn, since) conn.close() # Group and score grouped = _group_by_session(rows) session_scores = [] for session_id, sess_rows in grouped.items(): agent_id = sess_rows[0].get("agent_id", "unknown") scored = score_session(sess_rows) scored["session_id"] = session_id scored["agent_id"] = agent_id session_scores.append(scored) # Detect patterns patterns = detect_failure_patterns(session_scores, rows) # Skill coverage skill_coverage = analyze_skill_coverage() # Optimization recommendations (stub — deterministic only) recommendations = [] for mcp in skill_coverage.get("mcps_missing_skill_surface", []): recommendations.append({ "type": "skill_surface_missing", "description": f"MCP '{mcp}' is registered but has no skill surface", "target": mcp, "confidence": 0.8, }) # Unused skills (skills dir exists, no tool_name match in spool) skill_names = set(skill_coverage.get("mcps_with_skill_surface", [])) used_tools = {r.get("tool_name") for r in rows if r.get("tool_name")} unused_skills = [s for s in skill_names if s not in used_tools and s not in skill_coverage.get("mcps_missing_skill_surface", [])] for skill in unused_skills: recommendations.append({ "type": "skill_never_used", "description": f"Skill '{skill}' exists but was not used in this period", "target": skill, "confidence": 0.7, }) # Filter by confidence threshold recommendations = [r for r in recommendations if r["confidence"] >= settings.review_confidence_threshold] # Drift signals (stub — compare to previous report if available) drift_signals = [] report = { "review_id": str(uuid.uuid4()), "generated_at": datetime.now(timezone.utc).isoformat(), "period": { "from": since or "all", "to": datetime.now(timezone.utc).isoformat(), }, "sessions_reviewed": len(session_scores), "session_quality_scores": session_scores, "failure_patterns": patterns, "unused_tools": [ {"tool": mcp, "registered": True, "used_this_week": False} for mcp in skill_coverage.get("mcps_missing_skill_surface", []) ], "optimization_recommendations": recommendations, "drift_signals": drift_signals, "skill_coverage": skill_coverage, } # Persist to DB _persist_report(report) return report def generate_skills_report() -> dict: coverage = analyze_skill_coverage() report = { "generated_at": datetime.now(timezone.utc).isoformat(), "period": "weekly", **coverage, } _write_json_artifact("skills-latest.json", report) return report def _persist_report(report: dict): from spooler.store import get_conn conn = get_conn() conn.execute( """ INSERT OR REPLACE INTO review_reports (review_id, generated_at, period_from, period_to, report_json) VALUES (?, ?, ?, ?, ?) """, ( report["review_id"], report["generated_at"], report["period"]["from"], report["period"]["to"], json.dumps(report), ), ) conn.commit() conn.close() _write_json_artifact("review-latest.json", report) _write_json_artifact(f"history/review-{report['generated_at'].replace(':', '').replace('+00:00', 'Z')}.json", report) def _write_json_artifact(relative_path: str, payload: dict): base = settings.openclaw_state_dir / "session_amplifier" target = base / relative_path target.parent.mkdir(parents=True, exist_ok=True) target.write_text(json.dumps(payload, indent=2, sort_keys=True))