File size: 4,477 Bytes
63c75d5 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 | import json
import uuid
from datetime import datetime, timezone
from pathlib import Path
from config import settings
from reviewer.scorer import score_session, get_all_spooled_entries
from reviewer.pattern_detector import detect_failure_patterns
from reviewer.skill_analyzer import analyze_skill_coverage
def _group_by_session(rows: list[dict]) -> dict[str, list[dict]]:
grouped = {}
for row in rows:
grouped.setdefault(row["session_id"], []).append(row)
return grouped
def generate_report(since: str | None = None) -> dict:
from spooler.store import get_conn
conn = get_conn()
rows = get_all_spooled_entries(conn, since)
conn.close()
# Group and score
grouped = _group_by_session(rows)
session_scores = []
for session_id, sess_rows in grouped.items():
agent_id = sess_rows[0].get("agent_id", "unknown")
scored = score_session(sess_rows)
scored["session_id"] = session_id
scored["agent_id"] = agent_id
session_scores.append(scored)
# Detect patterns
patterns = detect_failure_patterns(session_scores, rows)
# Skill coverage
skill_coverage = analyze_skill_coverage()
# Optimization recommendations (stub — deterministic only)
recommendations = []
for mcp in skill_coverage.get("mcps_missing_skill_surface", []):
recommendations.append({
"type": "skill_surface_missing",
"description": f"MCP '{mcp}' is registered but has no skill surface",
"target": mcp,
"confidence": 0.8,
})
# Unused skills (skills dir exists, no tool_name match in spool)
skill_names = set(skill_coverage.get("mcps_with_skill_surface", []))
used_tools = {r.get("tool_name") for r in rows if r.get("tool_name")}
unused_skills = [s for s in skill_names if s not in used_tools and s not in skill_coverage.get("mcps_missing_skill_surface", [])]
for skill in unused_skills:
recommendations.append({
"type": "skill_never_used",
"description": f"Skill '{skill}' exists but was not used in this period",
"target": skill,
"confidence": 0.7,
})
# Filter by confidence threshold
recommendations = [r for r in recommendations if r["confidence"] >= settings.review_confidence_threshold]
# Drift signals (stub — compare to previous report if available)
drift_signals = []
report = {
"review_id": str(uuid.uuid4()),
"generated_at": datetime.now(timezone.utc).isoformat(),
"period": {
"from": since or "all",
"to": datetime.now(timezone.utc).isoformat(),
},
"sessions_reviewed": len(session_scores),
"session_quality_scores": session_scores,
"failure_patterns": patterns,
"unused_tools": [
{"tool": mcp, "registered": True, "used_this_week": False}
for mcp in skill_coverage.get("mcps_missing_skill_surface", [])
],
"optimization_recommendations": recommendations,
"drift_signals": drift_signals,
"skill_coverage": skill_coverage,
}
# Persist to DB
_persist_report(report)
return report
def generate_skills_report() -> dict:
coverage = analyze_skill_coverage()
report = {
"generated_at": datetime.now(timezone.utc).isoformat(),
"period": "weekly",
**coverage,
}
_write_json_artifact("skills-latest.json", report)
return report
def _persist_report(report: dict):
from spooler.store import get_conn
conn = get_conn()
conn.execute(
"""
INSERT OR REPLACE INTO review_reports
(review_id, generated_at, period_from, period_to, report_json)
VALUES (?, ?, ?, ?, ?)
""",
(
report["review_id"],
report["generated_at"],
report["period"]["from"],
report["period"]["to"],
json.dumps(report),
),
)
conn.commit()
conn.close()
_write_json_artifact("review-latest.json", report)
_write_json_artifact(f"history/review-{report['generated_at'].replace(':', '').replace('+00:00', 'Z')}.json", report)
def _write_json_artifact(relative_path: str, payload: dict):
base = settings.openclaw_state_dir / "session_amplifier"
target = base / relative_path
target.parent.mkdir(parents=True, exist_ok=True)
target.write_text(json.dumps(payload, indent=2, sort_keys=True))
|