Ordo
Initial public release
63c75d5
import json
import uuid
from datetime import datetime, timezone
from pathlib import Path
from config import settings
from reviewer.scorer import score_session, get_all_spooled_entries
from reviewer.pattern_detector import detect_failure_patterns
from reviewer.skill_analyzer import analyze_skill_coverage
def _group_by_session(rows: list[dict]) -> dict[str, list[dict]]:
grouped = {}
for row in rows:
grouped.setdefault(row["session_id"], []).append(row)
return grouped
def generate_report(since: str | None = None) -> dict:
from spooler.store import get_conn
conn = get_conn()
rows = get_all_spooled_entries(conn, since)
conn.close()
# Group and score
grouped = _group_by_session(rows)
session_scores = []
for session_id, sess_rows in grouped.items():
agent_id = sess_rows[0].get("agent_id", "unknown")
scored = score_session(sess_rows)
scored["session_id"] = session_id
scored["agent_id"] = agent_id
session_scores.append(scored)
# Detect patterns
patterns = detect_failure_patterns(session_scores, rows)
# Skill coverage
skill_coverage = analyze_skill_coverage()
# Optimization recommendations (stub — deterministic only)
recommendations = []
for mcp in skill_coverage.get("mcps_missing_skill_surface", []):
recommendations.append({
"type": "skill_surface_missing",
"description": f"MCP '{mcp}' is registered but has no skill surface",
"target": mcp,
"confidence": 0.8,
})
# Unused skills (skills dir exists, no tool_name match in spool)
skill_names = set(skill_coverage.get("mcps_with_skill_surface", []))
used_tools = {r.get("tool_name") for r in rows if r.get("tool_name")}
unused_skills = [s for s in skill_names if s not in used_tools and s not in skill_coverage.get("mcps_missing_skill_surface", [])]
for skill in unused_skills:
recommendations.append({
"type": "skill_never_used",
"description": f"Skill '{skill}' exists but was not used in this period",
"target": skill,
"confidence": 0.7,
})
# Filter by confidence threshold
recommendations = [r for r in recommendations if r["confidence"] >= settings.review_confidence_threshold]
# Drift signals (stub — compare to previous report if available)
drift_signals = []
report = {
"review_id": str(uuid.uuid4()),
"generated_at": datetime.now(timezone.utc).isoformat(),
"period": {
"from": since or "all",
"to": datetime.now(timezone.utc).isoformat(),
},
"sessions_reviewed": len(session_scores),
"session_quality_scores": session_scores,
"failure_patterns": patterns,
"unused_tools": [
{"tool": mcp, "registered": True, "used_this_week": False}
for mcp in skill_coverage.get("mcps_missing_skill_surface", [])
],
"optimization_recommendations": recommendations,
"drift_signals": drift_signals,
"skill_coverage": skill_coverage,
}
# Persist to DB
_persist_report(report)
return report
def generate_skills_report() -> dict:
coverage = analyze_skill_coverage()
report = {
"generated_at": datetime.now(timezone.utc).isoformat(),
"period": "weekly",
**coverage,
}
_write_json_artifact("skills-latest.json", report)
return report
def _persist_report(report: dict):
from spooler.store import get_conn
conn = get_conn()
conn.execute(
"""
INSERT OR REPLACE INTO review_reports
(review_id, generated_at, period_from, period_to, report_json)
VALUES (?, ?, ?, ?, ?)
""",
(
report["review_id"],
report["generated_at"],
report["period"]["from"],
report["period"]["to"],
json.dumps(report),
),
)
conn.commit()
conn.close()
_write_json_artifact("review-latest.json", report)
_write_json_artifact(f"history/review-{report['generated_at'].replace(':', '').replace('+00:00', 'Z')}.json", report)
def _write_json_artifact(relative_path: str, payload: dict):
base = settings.openclaw_state_dir / "session_amplifier"
target = base / relative_path
target.parent.mkdir(parents=True, exist_ok=True)
target.write_text(json.dumps(payload, indent=2, sort_keys=True))