File size: 4,477 Bytes
63c75d5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import json
import uuid
from datetime import datetime, timezone
from pathlib import Path
from config import settings
from reviewer.scorer import score_session, get_all_spooled_entries
from reviewer.pattern_detector import detect_failure_patterns
from reviewer.skill_analyzer import analyze_skill_coverage


def _group_by_session(rows: list[dict]) -> dict[str, list[dict]]:
    """Bucket spool rows by their ``session_id``.

    Sessions appear in the order their first row is encountered, and rows
    keep their input order within each session. A row without a
    ``session_id`` key raises ``KeyError``.
    """
    sessions: dict[str, list[dict]] = {}
    for entry in rows:
        key = entry["session_id"]
        if key in sessions:
            sessions[key].append(entry)
        else:
            # First row seen for this session starts a new bucket.
            sessions[key] = [entry]
    return sessions


def generate_report(since: str | None = None) -> dict:
    """Build, persist and return a review report over the spooled entries.

    Steps: load spool rows, score each session, detect failure patterns,
    analyse skill coverage, derive recommendations, then hand the finished
    report to ``_persist_report``.

    Args:
        since: Forwarded unchanged to ``get_all_spooled_entries``. When it is
            falsy the report's ``period.from`` is recorded as ``"all"``.

    Returns:
        The report dict that was persisted.
    """
    from spooler.store import get_conn

    conn = get_conn()
    try:
        rows = get_all_spooled_entries(conn, since)
    finally:
        # Release the connection even if the query raises.
        conn.close()

    # Group and score
    session_scores = []
    for session_id, sess_rows in _group_by_session(rows).items():
        scored = score_session(sess_rows)
        scored["session_id"] = session_id
        # Every group holds at least one row, so index 0 is always present.
        scored["agent_id"] = sess_rows[0].get("agent_id", "unknown")
        session_scores.append(scored)

    # Detect patterns
    patterns = detect_failure_patterns(session_scores, rows)

    # Skill coverage
    skill_coverage = analyze_skill_coverage()
    # Looked up once; it feeds recommendations, the unused-skill filter and
    # the "unused_tools" section below.
    missing_surface = skill_coverage.get("mcps_missing_skill_surface", [])

    # Optimization recommendations (stub — deterministic only)
    recommendations = [
        {
            "type": "skill_surface_missing",
            "description": f"MCP '{mcp}' is registered but has no skill surface",
            "target": mcp,
            "confidence": 0.8,
        }
        for mcp in missing_surface
    ]

    # Unused skills (skills dir exists, no tool_name match in spool)
    skill_names = set(skill_coverage.get("mcps_with_skill_surface", []))
    used_tools = {r.get("tool_name") for r in rows if r.get("tool_name")}
    # Sorted so the recommendation order is stable between runs; iterating
    # the set directly gives an arbitrary order.
    unused_skills = sorted(
        s for s in skill_names
        if s not in used_tools and s not in missing_surface
    )
    recommendations.extend(
        {
            "type": "skill_never_used",
            "description": f"Skill '{skill}' exists but was not used in this period",
            "target": skill,
            "confidence": 0.7,
        }
        for skill in unused_skills
    )

    # Filter by confidence threshold
    threshold = settings.review_confidence_threshold
    recommendations = [r for r in recommendations if r["confidence"] >= threshold]

    # Drift signals (stub — compare to previous report if available)
    drift_signals = []

    # One timestamp for the whole report so "generated_at" and "period.to"
    # cannot disagree by a few microseconds.
    generated_at = datetime.now(timezone.utc).isoformat()

    report = {
        "review_id": str(uuid.uuid4()),
        "generated_at": generated_at,
        "period": {
            "from": since or "all",
            "to": generated_at,
        },
        "sessions_reviewed": len(session_scores),
        "session_quality_scores": session_scores,
        "failure_patterns": patterns,
        # NOTE(review): this lists MCPs that lack a skill surface rather than
        # tools absent from the spool, and "used_this_week" is hard-coded to
        # False — confirm that is the intended meaning of "unused_tools".
        "unused_tools": [
            {"tool": mcp, "registered": True, "used_this_week": False}
            for mcp in missing_surface
        ],
        "optimization_recommendations": recommendations,
        "drift_signals": drift_signals,
        "skill_coverage": skill_coverage,
    }

    # Persist to DB
    _persist_report(report)

    return report


def generate_skills_report() -> dict:
    """Produce the skills coverage report and write it to ``skills-latest.json``.

    The report starts with a UTC ``generated_at`` timestamp and a fixed
    ``"weekly"`` period; every key returned by ``analyze_skill_coverage`` is
    then merged on top, so a coverage key of the same name wins.

    Returns:
        The report dict that was written.
    """
    skill_data = analyze_skill_coverage()

    skills_report: dict = {}
    skills_report["generated_at"] = datetime.now(timezone.utc).isoformat()
    skills_report["period"] = "weekly"
    # Coverage keys are applied last and override the two defaults above.
    skills_report.update(skill_data)

    _write_json_artifact("skills-latest.json", skills_report)
    return skills_report


def _persist_report(report: dict):
    """Store a review report in the database and as JSON artifacts.

    The report is upserted into ``review_reports`` keyed by ``review_id``,
    then written twice through ``_write_json_artifact``: once as
    ``review-latest.json`` and once under ``history/`` with a name derived
    from ``generated_at``.

    Args:
        report: Report dict; must provide ``review_id``, ``generated_at`` and
            ``period`` with ``from``/``to`` keys, and be JSON-serialisable.
    """
    from spooler.store import get_conn

    conn = get_conn()
    try:
        conn.execute(
            """
            INSERT OR REPLACE INTO review_reports
            (review_id, generated_at, period_from, period_to, report_json)
            VALUES (?, ?, ?, ?, ?)
            """,
            (
                report["review_id"],
                report["generated_at"],
                report["period"]["from"],
                report["period"]["to"],
                json.dumps(report),
            ),
        )
        conn.commit()
    finally:
        # Close the connection even when the insert or commit fails.
        conn.close()

    _write_json_artifact("review-latest.json", report)

    # Swap the UTC offset for "Z" *before* stripping colons: once the colons
    # are gone the offset reads "+0000" and the "+00:00" replacement can no
    # longer match, which left "+0000" in the file name.
    stamp = report["generated_at"].replace("+00:00", "Z").replace(":", "")
    _write_json_artifact(f"history/review-{stamp}.json", report)


def _write_json_artifact(relative_path: str, payload: dict):
    """Write ``payload`` as pretty-printed JSON under the artifact directory.

    The destination is ``settings.openclaw_state_dir / "session_amplifier" /
    relative_path``. Missing parent directories are created, and the text is
    written with 2-space indentation and sorted keys.

    Args:
        relative_path: Path of the artifact relative to the artifact root;
            may contain sub-directories.
        payload: JSON-serialisable dict to write.
    """
    destination = settings.openclaw_state_dir / "session_amplifier" / relative_path
    destination.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(payload, indent=2, sort_keys=True)
    destination.write_text(serialized)