# NOTE: repository-page residue captured with this file (not part of the module):
#   darkfire514's picture
#   Upload 160 files
#   399b80c verified
import json
import os
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from openspace.utils.logging import Logger
logger = Logger.get_logger(__name__)
def load_trajectory_from_jsonl(jsonl_path: str) -> List[Dict[str, Any]]:
    """Read a JSONL trajectory file into a list of parsed steps.

    Every non-blank line is stripped and parsed as one JSON document. A
    missing file is logged at debug level. Any exception raised while reading
    or parsing is logged as an error and the whole load is abandoned, so a
    single malformed line yields an empty result.

    Args:
        jsonl_path: Path to the JSONL file.

    Returns:
        The parsed steps in file order, or ``[]`` when the file does not
        exist or could not be read/parsed.
    """
    # Guard clause: a missing file is not treated as an error.
    if not os.path.exists(jsonl_path):
        logger.debug(f"No trajectory file found at {jsonl_path} (this is normal for knowledge-only tasks)")
        return []

    try:
        with open(jsonl_path, "r", encoding="utf-8") as handle:
            stripped = (raw.strip() for raw in handle)
            # Blank lines are skipped; everything else must be valid JSON.
            steps = [json.loads(text) for text in stripped if text]
        logger.info(f"Loaded {len(steps)} steps from {jsonl_path}")
        return steps
    except Exception as e:
        logger.error(f"Failed to load trajectory from {jsonl_path}: {e}")
        return []
def load_metadata(trajectory_dir: str) -> Optional[Dict[str, Any]]:
    """Load ``metadata.json`` from a trajectory directory.

    Args:
        trajectory_dir: Directory expected to contain ``metadata.json``.

    Returns:
        The parsed JSON content, or ``None`` if the file could not be opened
        or parsed (a warning is logged in that case).
    """
    metadata_path = os.path.join(trajectory_dir, "metadata.json")
    try:
        with open(metadata_path, "r", encoding="utf-8") as handle:
            return json.load(handle)
    except Exception as e:
        # Best effort: callers receive None instead of an exception.
        logger.warning(f"Failed to load metadata from {metadata_path}: {e}")
    return None
def format_trajectory_for_export(
    trajectory: List[Dict[str, Any]],
    format_type: str = "compact"
) -> str:
    """Render a trajectory as text in one of the supported formats.

    Args:
        trajectory: Step records to render.
        format_type: ``"compact"``, ``"detailed"`` or ``"markdown"``.

    Returns:
        The text produced by the matching formatter.

    Raises:
        ValueError: If ``format_type`` is not one of the supported names.
    """
    # Reject unknown formats up front so the dispatch below is exhaustive.
    if format_type not in ("compact", "detailed", "markdown"):
        raise ValueError(f"Unknown format type: {format_type}")

    if format_type == "compact":
        return _format_compact(trajectory)
    if format_type == "detailed":
        return _format_detailed(trajectory)
    return _format_markdown(trajectory)
def _format_compact(trajectory: List[Dict[str, Any]]) -> str:
"""Compact format: one line per step."""
lines = []
for step in trajectory:
step_num = step.get("step", "?")
backend = step.get("backend", "?")
server = step.get("server")
tool = step.get("tool", "?")
result_status = "success" if step.get("result", {}).get("status") == "success" else "error"
# Include server name for MCP backend
backend_str = f"{backend}@{server}" if server else backend
lines.append(f"Step {step_num}: [{backend_str}] {tool} -> {result_status}")
return "\n".join(lines)
def _format_detailed(trajectory: List[Dict[str, Any]]) -> str:
"""Detailed format: multiple lines per step with parameters."""
lines = []
for step in trajectory:
step_num = step.get("step", "?")
timestamp = step.get("timestamp", "?")
backend = step.get("backend", "?")
server = step.get("server")
tool = step.get("tool", "?")
command = step.get("command", "?")
parameters = step.get("parameters", {})
result = step.get("result", {})
from openspace.utils.display import Box, BoxStyle
box = Box(width=66, style=BoxStyle.ROUNDED, color='bl')
lines.append("")
lines.append(box.top_line(0))
lines.append(box.text_line(f"Step {step_num} ({timestamp})", align='center', indent=0, text_color='c'))
lines.append(box.separator_line(0))
lines.append(box.text_line(f"Backend: {backend}", indent=0))
if server:
lines.append(box.text_line(f"Server: {server}", indent=0))
lines.append(box.text_line(f"Tool: {tool}", indent=0))
lines.append(box.text_line(f"Command: {command}", indent=0))
lines.append(box.separator_line(0))
# Parameters and result can be multi-line
param_str = json.dumps(parameters, indent=2)
for param_line in param_str.split('\n'):
lines.append(box.text_line(param_line, indent=0))
lines.append(box.separator_line(0))
result_str = json.dumps(result, indent=2)
for result_line in result_str.split('\n'):
lines.append(box.text_line(result_line, indent=0))
lines.append(box.bottom_line(0))
return "\n".join(lines)
def _format_markdown(trajectory: List[Dict[str, Any]]) -> str:
"""Markdown format: table format."""
lines = [
"# Trajectory",
"",
"| Step | Backend | Server | Tool | Status | Screenshot |",
"|------|---------|--------|------|--------|------------|"
]
for step in trajectory:
step_num = step.get("step", "?")
backend = step.get("backend", "?")
server = step.get("server", "-")
tool = step.get("tool", "?")
result_status = "✓" if step.get("result", {}).get("status") == "success" else "✗"
screenshot = "📷" if step.get("screenshot") else ""
lines.append(f"| {step_num} | {backend} | {server} | {tool} | {result_status} | {screenshot} |")
return "\n".join(lines)
def analyze_trajectory(trajectory: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Analyze trajectory and return statistics.

    Args:
        trajectory: Step records to analyze.

    Returns:
        A dictionary with:
            "total_steps": number of steps,
            "success_count": steps whose ``result["status"]`` is "success",
            "success_rate": success_count / total_steps (0.0 when empty),
            "backends": {backend name: step count} ("unknown" if missing),
            "tools": {tool name: step count} ("unknown" if missing)
        For an empty trajectory the same keys are returned with zero/empty
        values, plus the legacy "action_types" key that the empty result
        used to carry in place of "tools".
    """
    if not trajectory:
        return {
            "total_steps": 0,
            "success_count": 0,
            "success_rate": 0.0,
            "backends": {},
            "tools": {},
            # Legacy name kept so existing readers of the empty result
            # keep working; "tools" is the key used for non-empty input.
            "action_types": {},
        }

    total_steps = len(trajectory)
    success_count = 0
    backends: Dict[str, int] = {}
    tools: Dict[str, int] = {}
    for step in trajectory:
        # Count successes; a null / non-dict result is simply not a success
        # (instead of raising AttributeError on ``.get``).
        result = step.get("result")
        if isinstance(result, dict) and result.get("status") == "success":
            success_count += 1

        # Count backends
        backend = step.get("backend", "unknown")
        backends[backend] = backends.get(backend, 0) + 1

        # Count tool types
        tool = step.get("tool", "unknown")
        tools[tool] = tools.get(tool, 0) + 1

    return {
        "total_steps": total_steps,
        "success_count": success_count,
        # total_steps is at least 1 here: the empty case returned above.
        "success_rate": success_count / total_steps,
        "backends": backends,
        "tools": tools,
    }
def load_recording_session(recording_dir: str) -> Dict[str, Any]:
    """
    Load a recording session: trajectory, statistics, metadata, plans and decisions.

    Each part is optional and is read only if its file/directory exists under
    ``recording_dir``: ``traj.jsonl``, ``metadata.json``, ``plans/plan_*.json``
    (in sorted filename order) and ``decisions.log``. Plan files or a decisions
    log that fail to load are skipped with a warning.

    Args:
        recording_dir: Path to recording directory

    Returns:
        ``{}`` if ``recording_dir`` does not exist (an error is logged).
        Otherwise a dictionary containing all session data:
        {
            "trajectory": List[Dict],
            "metadata": Dict or None,
            "plans": List[Dict],
            "decisions": List[str] (raw lines, line endings included),
            "statistics": Dict (empty when there is no traj.jsonl)
        }
    """
    root = Path(recording_dir)
    if not root.exists():
        logger.error(f"Recording directory not found: {recording_dir}")
        return {}

    # Trajectory and the statistics derived from it.
    trajectory = []
    statistics = {}
    traj_file = root / "traj.jsonl"
    if traj_file.exists():
        trajectory = load_trajectory_from_jsonl(str(traj_file))
        statistics = analyze_trajectory(trajectory)

    # Metadata (load_metadata takes the directory, not the file path).
    metadata = None
    if (root / "metadata.json").exists():
        metadata = load_metadata(str(root))

    # Plans, in sorted filename order; unreadable files are skipped.
    plans = []
    plans_dir = root / "plans"
    if plans_dir.exists():
        for plan_file in sorted(plans_dir.glob("plan_*.json")):
            try:
                with open(plan_file, 'r', encoding='utf-8') as handle:
                    plans.append(json.load(handle))
            except Exception as e:
                logger.warning(f"Failed to load plan {plan_file}: {e}")

    # Decisions log, kept as raw lines.
    decisions = []
    decisions_file = root / "decisions.log"
    if decisions_file.exists():
        try:
            with open(decisions_file, 'r', encoding='utf-8') as handle:
                decisions = handle.readlines()
        except Exception as e:
            logger.warning(f"Failed to load decisions: {e}")

    return {
        "trajectory": trajectory,
        "metadata": metadata,
        "plans": plans,
        "decisions": decisions,
        "statistics": statistics,
    }
def filter_trajectory(
    trajectory: List[Dict[str, Any]],
    backend: Optional[str] = None,
    tool: Optional[str] = None,
    status: Optional[str] = None,
    time_range: Optional[Tuple[str, str]] = None
) -> List[Dict[str, Any]]:
    """Return the steps that match every supplied filter.

    Filters that are ``None`` (or otherwise falsy) are not applied. The input
    list is never modified and the result is always a new list, even when no
    filter is given.

    Args:
        trajectory: Step records to filter.
        backend: Keep steps whose ``backend`` equals this value.
        tool: Keep steps whose ``tool`` equals this value.
        status: Keep steps whose ``result["status"]`` equals this value.
            Steps whose ``result`` is missing, ``None`` or not a dict never
            match.
        time_range: ``(start, end)`` inclusive bounds compared directly
            against each step's ``timestamp`` (a missing timestamp is
            treated as ``""``). Steps whose timestamp cannot be compared
            with the bounds (e.g. ``None``) are excluded.

    Returns:
        A new list with the matching steps, in their original order.
    """
    # Copy so callers can mutate the result without touching their input.
    filtered = list(trajectory)

    if backend:
        filtered = [s for s in filtered if s.get("backend") == backend]

    if tool:
        filtered = [s for s in filtered if s.get("tool") == tool]

    if status:
        def _status_of(step: Dict[str, Any]) -> Any:
            # Tolerate null / non-dict results instead of raising on .get.
            result = step.get("result")
            return result.get("status") if isinstance(result, dict) else None

        filtered = [s for s in filtered if _status_of(s) == status]

    if time_range:
        start_time, end_time = time_range

        def _in_range(step: Dict[str, Any]) -> bool:
            try:
                return start_time <= step.get("timestamp", "") <= end_time
            except TypeError:
                # Timestamp of an incomparable type (e.g. None): no match.
                return False

        filtered = [s for s in filtered if _in_range(s)]

    return filtered
def extract_errors(trajectory: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return the steps whose ``result["status"]`` is ``"error"``.

    Steps whose ``result`` is missing, ``None`` or not a dict are not
    counted as errors (and no longer raise ``AttributeError``).

    Args:
        trajectory: Step records to scan.

    Returns:
        A new list with the matching steps, in their original order.
    """
    def _is_error(step: Dict[str, Any]) -> bool:
        result = step.get("result")
        return isinstance(result, dict) and result.get("status") == "error"

    return [step for step in trajectory if _is_error(step)]
def generate_summary_report(recording_dir: str, output_file: Optional[str] = None) -> str:
    """Build a Markdown summary of a recording session.

    The report contains, when the corresponding data is present: a Metadata
    section, a Statistics section (with backend and tool distributions, tools
    sorted by descending count), a Plans section, an Errors section (first 5
    errors, each message truncated to 200 characters) and a Decisions section
    (first 10 entries).

    Args:
        recording_dir: Path to the recording directory.
        output_file: If given, the report is also written to this path.
            A failure to write is logged and does not affect the return
            value.

    Returns:
        The report text, or ``"Error: Could not load recording session"``
        when the session could not be loaded.
    """
    session = load_recording_session(recording_dir)
    if not session:
        return "Error: Could not load recording session"

    lines = []
    lines.append("# Recording Session Summary\n")

    # Metadata section
    if session["metadata"]:
        lines.append("## Metadata")
        metadata = session["metadata"]
        lines.append(f"- **Task ID**: {metadata.get('task_id', 'N/A')}")
        lines.append(f"- **Start Time**: {metadata.get('start_time', 'N/A')}")
        lines.append(f"- **End Time**: {metadata.get('end_time', 'N/A')}")
        lines.append(f"- **Total Steps**: {metadata.get('total_steps', 0)}")
        # str() each entry and tolerate a null list so join cannot raise.
        backend_names = ', '.join(str(name) for name in (metadata.get('backends') or []))
        lines.append(f"- **Backends**: {backend_names}")
        lines.append("")

    # Statistics section
    if session["statistics"]:
        lines.append("## Statistics")
        stats = session["statistics"]
        lines.append(f"- **Total Steps**: {stats.get('total_steps', 0)}")
        lines.append(f"- **Success Count**: {stats.get('success_count', 0)}")
        lines.append(f"- **Success Rate**: {stats.get('success_rate', 0):.2%}")
        lines.append("")
        lines.append("### Backend Distribution")
        for backend, count in stats.get('backends', {}).items():
            lines.append(f"- {backend}: {count}")
        lines.append("")
        lines.append("### Tool Distribution")
        for tool, count in sorted(stats.get('tools', {}).items(), key=lambda x: x[1], reverse=True):
            lines.append(f"- {tool}: {count}")
        lines.append("")

    # Plans section
    if session["plans"]:
        lines.append(f"## Plans ({len(session['plans'])} total)")
        for i, plan in enumerate(session["plans"], 1):
            lines.append(f"### Plan {i}")
            lines.append(f"- Created: {plan.get('created_at', 'N/A')}")
            lines.append(f"- Created by: {plan.get('created_by', 'N/A')}")
            plan_data = plan.get('plan', {})
            # Only a dict can carry 'task_updates'; a null or otherwise
            # shaped 'plan' value is skipped instead of raising.
            if isinstance(plan_data, dict) and 'task_updates' in plan_data:
                lines.append(f"- Tasks: {len(plan_data['task_updates'])}")
            lines.append("")

    # Errors section
    if session["trajectory"]:
        errors = extract_errors(session["trajectory"])
        if errors:
            lines.append(f"## Errors ({len(errors)} total)")
            for error in errors[:5]:  # Show first 5 errors
                lines.append(f"- Step {error.get('step')}: {error.get('backend')} - {error.get('tool')}")
                result = error.get('result')
                output = result.get('output') if isinstance(result, dict) else None
                if output is None:
                    error_msg = 'No error message'
                elif isinstance(output, str):
                    error_msg = output
                else:
                    # Non-string output (dict, list, number, ...) cannot be
                    # sliced as text; render it via str() first.
                    error_msg = str(output)
                lines.append(f" ```\n {error_msg[:200]}\n ```")
            if len(errors) > 5:
                lines.append(f" ... and {len(errors) - 5} more errors")
            lines.append("")

    # Decisions section
    if session["decisions"]:
        lines.append(f"## Decisions ({len(session['decisions'])} total)")
        for decision in session["decisions"][:10]:  # Show first 10 decisions
            lines.append(f" {decision.strip()}")
        if len(session["decisions"]) > 10:
            lines.append(f" ... and {len(session['decisions']) - 10} more decisions")
        lines.append("")

    report = "\n".join(lines)

    # Save to file if requested
    if output_file:
        try:
            with open(output_file, 'w', encoding='utf-8') as f:
                f.write(report)
            logger.info(f"Report saved to {output_file}")
        except Exception as e:
            logger.error(f"Failed to save report: {e}")
    return report
def compare_recordings(recording_dir1: str, recording_dir2: str) -> Dict[str, Any]:
    """Compare the trajectory statistics of two recording sessions.

    Both sessions are loaded with ``load_recording_session``; statistics that
    are absent are treated as zero steps, zero success rate and no backends.

    Args:
        recording_dir1: Path to the first recording directory.
        recording_dir2: Path to the second recording directory.

    Returns:
        A dictionary with "session1" and "session2" summaries (path,
        total_steps, success_rate, backends) and a "differences" entry
        holding ``step_diff`` and ``success_rate_diff``, each computed as
        session2 minus session1.
    """
    def _statistics_of(path: str) -> Dict[str, Any]:
        return load_recording_session(path).get("statistics", {})

    # Load in argument order, as the original did.
    stats1 = _statistics_of(recording_dir1)
    stats2 = _statistics_of(recording_dir2)

    steps1 = stats1.get("total_steps", 0)
    steps2 = stats2.get("total_steps", 0)
    rate1 = stats1.get("success_rate", 0)
    rate2 = stats2.get("success_rate", 0)

    first = {
        "path": recording_dir1,
        "total_steps": steps1,
        "success_rate": rate1,
        "backends": stats1.get("backends", {}),
    }
    second = {
        "path": recording_dir2,
        "total_steps": steps2,
        "success_rate": rate2,
        "backends": stats2.get("backends", {}),
    }
    delta = {
        "step_diff": steps2 - steps1,
        "success_rate_diff": rate2 - rate1,
    }
    return {"session1": first, "session2": second, "differences": delta}