| import json |
| import uuid |
| from datetime import datetime, timezone |
| from pathlib import Path |
| from config import settings |
| from reviewer.scorer import score_session, get_all_spooled_entries |
| from reviewer.pattern_detector import detect_failure_patterns |
| from reviewer.skill_analyzer import analyze_skill_coverage |
|
|
|
|
| def _group_by_session(rows: list[dict]) -> dict[str, list[dict]]: |
| grouped = {} |
| for row in rows: |
| grouped.setdefault(row["session_id"], []).append(row) |
| return grouped |
|
|
|
|
| def generate_report(since: str | None = None) -> dict: |
| from spooler.store import get_conn |
|
|
| conn = get_conn() |
| rows = get_all_spooled_entries(conn, since) |
| conn.close() |
|
|
| |
| grouped = _group_by_session(rows) |
| session_scores = [] |
| for session_id, sess_rows in grouped.items(): |
| agent_id = sess_rows[0].get("agent_id", "unknown") |
| scored = score_session(sess_rows) |
| scored["session_id"] = session_id |
| scored["agent_id"] = agent_id |
| session_scores.append(scored) |
|
|
| |
| patterns = detect_failure_patterns(session_scores, rows) |
|
|
| |
| skill_coverage = analyze_skill_coverage() |
|
|
| |
| recommendations = [] |
| for mcp in skill_coverage.get("mcps_missing_skill_surface", []): |
| recommendations.append({ |
| "type": "skill_surface_missing", |
| "description": f"MCP '{mcp}' is registered but has no skill surface", |
| "target": mcp, |
| "confidence": 0.8, |
| }) |
|
|
| |
| skill_names = set(skill_coverage.get("mcps_with_skill_surface", [])) |
| used_tools = {r.get("tool_name") for r in rows if r.get("tool_name")} |
| unused_skills = [s for s in skill_names if s not in used_tools and s not in skill_coverage.get("mcps_missing_skill_surface", [])] |
| for skill in unused_skills: |
| recommendations.append({ |
| "type": "skill_never_used", |
| "description": f"Skill '{skill}' exists but was not used in this period", |
| "target": skill, |
| "confidence": 0.7, |
| }) |
|
|
| |
| recommendations = [r for r in recommendations if r["confidence"] >= settings.review_confidence_threshold] |
|
|
| |
| drift_signals = [] |
|
|
| report = { |
| "review_id": str(uuid.uuid4()), |
| "generated_at": datetime.now(timezone.utc).isoformat(), |
| "period": { |
| "from": since or "all", |
| "to": datetime.now(timezone.utc).isoformat(), |
| }, |
| "sessions_reviewed": len(session_scores), |
| "session_quality_scores": session_scores, |
| "failure_patterns": patterns, |
| "unused_tools": [ |
| {"tool": mcp, "registered": True, "used_this_week": False} |
| for mcp in skill_coverage.get("mcps_missing_skill_surface", []) |
| ], |
| "optimization_recommendations": recommendations, |
| "drift_signals": drift_signals, |
| "skill_coverage": skill_coverage, |
| } |
|
|
| |
| _persist_report(report) |
|
|
| return report |
|
|
|
|
| def generate_skills_report() -> dict: |
| coverage = analyze_skill_coverage() |
| report = { |
| "generated_at": datetime.now(timezone.utc).isoformat(), |
| "period": "weekly", |
| **coverage, |
| } |
| _write_json_artifact("skills-latest.json", report) |
| return report |
|
|
|
|
| def _persist_report(report: dict): |
| from spooler.store import get_conn |
|
|
| conn = get_conn() |
| conn.execute( |
| """ |
| INSERT OR REPLACE INTO review_reports |
| (review_id, generated_at, period_from, period_to, report_json) |
| VALUES (?, ?, ?, ?, ?) |
| """, |
| ( |
| report["review_id"], |
| report["generated_at"], |
| report["period"]["from"], |
| report["period"]["to"], |
| json.dumps(report), |
| ), |
| ) |
| conn.commit() |
| conn.close() |
| _write_json_artifact("review-latest.json", report) |
| _write_json_artifact(f"history/review-{report['generated_at'].replace(':', '').replace('+00:00', 'Z')}.json", report) |
|
|
|
|
| def _write_json_artifact(relative_path: str, payload: dict): |
| base = settings.openclaw_state_dir / "session_amplifier" |
| target = base / relative_path |
| target.parent.mkdir(parents=True, exist_ok=True) |
| target.write_text(json.dumps(payload, indent=2, sort_keys=True)) |
|
|