Spaces:
Running
Running
| import json | |
| import os | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, Tuple | |
| from openspace.utils.logging import Logger | |
| logger = Logger.get_logger(__name__) | |
def load_trajectory_from_jsonl(jsonl_path: str) -> List[Dict[str, Any]]:
    """Load trajectory steps from a JSON Lines file.

    Every non-blank line is parsed as one JSON document and appended to the
    result in file order. A line that is not valid JSON is skipped (and
    reported in the log) instead of discarding the steps that did parse, so
    a single damaged record -- for example a truncated final line -- no
    longer empties the whole trajectory.

    Args:
        jsonl_path: Path of the JSON Lines file to read.

    Returns:
        The parsed steps. An empty list is returned when the file does not
        exist or cannot be read.
    """
    # Check if file exists first
    if not os.path.exists(jsonl_path):
        logger.debug(f"No trajectory file found at {jsonl_path} (this is normal for knowledge-only tasks)")
        return []

    trajectory = []
    skipped = 0
    try:
        with open(jsonl_path, "r", encoding="utf-8") as f:
            for line_number, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue
                try:
                    trajectory.append(json.loads(line))
                except json.JSONDecodeError as e:
                    # Keep going: one bad record must not cost us the rest.
                    skipped += 1
                    logger.warning(
                        f"Skipping malformed line {line_number} in {jsonl_path}: {e}"
                    )
    except Exception as e:
        # Same contract as before: a read failure yields an empty list
        # rather than an exception.
        logger.error(f"Failed to load trajectory from {jsonl_path}: {e}")
        return []

    logger.info(f"Loaded {len(trajectory)} steps from {jsonl_path}")
    if skipped:
        logger.warning(f"Skipped {skipped} malformed line(s) in {jsonl_path}")
    return trajectory
def load_metadata(trajectory_dir: str) -> Optional[Dict[str, Any]]:
    """Read ``metadata.json`` from a trajectory directory.

    Args:
        trajectory_dir: Directory expected to contain ``metadata.json``.

    Returns:
        The decoded JSON content, or ``None`` when the file cannot be
        opened or parsed (the failure is logged as a warning).
    """
    target = os.path.join(trajectory_dir, "metadata.json")
    try:
        with open(target, "r", encoding="utf-8") as handle:
            return json.load(handle)
    except Exception as exc:
        logger.warning(f"Failed to load metadata from {target}: {exc}")
    return None
def format_trajectory_for_export(
    trajectory: List[Dict[str, Any]],
    format_type: str = "compact"
) -> str:
    """Render a trajectory as text in one of the supported formats.

    Args:
        trajectory: Steps to render.
        format_type: One of ``"compact"``, ``"detailed"`` or ``"markdown"``.

    Returns:
        The rendered text.

    Raises:
        ValueError: If ``format_type`` is not one of the supported names.
    """
    # Reject unsupported names up front so the dispatch below is exhaustive.
    if format_type not in ("compact", "detailed", "markdown"):
        raise ValueError(f"Unknown format type: {format_type}")

    if format_type == "compact":
        renderer = _format_compact
    elif format_type == "detailed":
        renderer = _format_detailed
    else:
        renderer = _format_markdown
    return renderer(trajectory)
def _format_compact(trajectory: List[Dict[str, Any]]) -> str:
    """Compact format: one line per step.

    Each line reads ``Step N: [backend] tool -> status``; when the step
    carries a truthy ``server`` value the backend is shown as
    ``backend@server``. Any status other than ``"success"`` is reported as
    ``error``. Missing fields are shown as ``?``.
    """

    def describe(entry: Dict[str, Any]) -> str:
        number = entry.get("step", "?")
        backend = entry.get("backend", "?")
        server = entry.get("server")
        tool = entry.get("tool", "?")

        outcome = "error"
        if entry.get("result", {}).get("status") == "success":
            outcome = "success"

        # Include server name for MCP backend
        origin = backend
        if server:
            origin = f"{backend}@{server}"
        return f"Step {number}: [{origin}] {tool} -> {outcome}"

    return "\n".join([describe(entry) for entry in trajectory])
def _format_detailed(trajectory: List[Dict[str, Any]]) -> str:
    """Detailed format: multiple lines per step with parameters.

    Every step is rendered as a blank line followed by a box built with the
    project's ``Box`` helper (width 66, rounded style): a centered
    ``Step N (timestamp)`` title, the backend / optional server / tool /
    command fields, then the parameters and the result as indented JSON,
    one box line per JSON line. Missing fields are shown as ``?``.
    """
    rendered = []
    for entry in trajectory:
        number = entry.get("step", "?")
        stamp = entry.get("timestamp", "?")
        backend = entry.get("backend", "?")
        server = entry.get("server")
        tool = entry.get("tool", "?")
        command = entry.get("command", "?")
        params = entry.get("parameters", {})
        outcome = entry.get("result", {})

        # Imported lazily, only once there is a step to draw.
        from openspace.utils.display import Box, BoxStyle
        frame = Box(width=66, style=BoxStyle.ROUNDED, color='bl')

        # Title and identifying fields.
        rendered.extend([
            "",
            frame.top_line(0),
            frame.text_line(f"Step {number} ({stamp})", align='center', indent=0, text_color='c'),
            frame.separator_line(0),
            frame.text_line(f"Backend: {backend}", indent=0),
        ])
        if server:
            rendered.append(frame.text_line(f"Server: {server}", indent=0))
        rendered.extend([
            frame.text_line(f"Tool: {tool}", indent=0),
            frame.text_line(f"Command: {command}", indent=0),
            frame.separator_line(0),
        ])

        # Parameters and result can be multi-line
        for row in json.dumps(params, indent=2).split('\n'):
            rendered.append(frame.text_line(row, indent=0))
        rendered.append(frame.separator_line(0))
        for row in json.dumps(outcome, indent=2).split('\n'):
            rendered.append(frame.text_line(row, indent=0))
        rendered.append(frame.bottom_line(0))
    return "\n".join(rendered)
def _format_markdown(trajectory: List[Dict[str, Any]]) -> str:
    """Markdown format: table format.

    Produces a ``# Trajectory`` heading followed by a table with one row per
    step (step number, backend, server, tool, status mark, screenshot
    mark). Missing step/backend/tool values are shown as ``?``. A missing,
    null or empty ``server`` is shown as ``-``. The status mark is a check
    only when the result status is ``"success"``; the screenshot mark
    appears only when the step has a truthy ``screenshot`` value.
    """
    lines = [
        "# Trajectory",
        "",
        "| Step | Backend | Server | Tool | Status | Screenshot |",
        "|------|---------|--------|------|--------|------------|"
    ]
    for step in trajectory:
        step_num = step.get("step", "?")
        backend = step.get("backend", "?")
        # `or` (not a .get default) so an explicit null server does not
        # render as the literal text "None".
        server = step.get("server") or "-"
        tool = step.get("tool", "?")
        result_status = "✓" if step.get("result", {}).get("status") == "success" else "✗"
        screenshot = "📷" if step.get("screenshot") else ""
        lines.append(f"| {step_num} | {backend} | {server} | {tool} | {result_status} | {screenshot} |")
    return "\n".join(lines)
def analyze_trajectory(trajectory: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Analyze trajectory and return statistics.

    Args:
        trajectory: Steps to analyze; an empty (or otherwise falsy) value
            yields all-zero statistics.

    Returns:
        A dictionary with the keys:
            "total_steps":   number of steps,
            "success_count": steps whose result status is "success",
            "success_rate":  success_count / total_steps (0.0 when empty),
            "backends":      step count per backend ("unknown" if missing),
            "tools":         step count per tool ("unknown" if missing).
        The empty result additionally keeps the older "action_types" key
        (always an empty dict) so existing readers of that key keep working.
    """
    if not trajectory:
        # Same schema as the populated result, so callers can read
        # "success_count" and "tools" without special-casing empty input.
        return {
            "total_steps": 0,
            "success_count": 0,
            "success_rate": 0.0,
            "backends": {},
            "tools": {},
            "action_types": {}
        }

    total_steps = len(trajectory)
    success_count = 0
    backends = {}
    tools = {}
    for step in trajectory:
        # Count successes
        if step.get("result", {}).get("status") == "success":
            success_count += 1
        # Count backends
        backend = step.get("backend", "unknown")
        backends[backend] = backends.get(backend, 0) + 1
        # Count tool types
        tool = step.get("tool", "unknown")
        tools[tool] = tools.get(tool, 0) + 1

    return {
        "total_steps": total_steps,
        "success_count": success_count,
        "success_rate": success_count / total_steps,
        "backends": backends,
        "tools": tools
    }
def load_recording_session(recording_dir: str) -> Dict[str, Any]:
    """
    Load a recording session from its directory.

    Reads, when present: ``traj.jsonl`` (steps, plus statistics computed
    from them), ``metadata.json``, every ``plans/plan_*.json`` file in
    sorted order, and the lines of ``decisions.log``. A plan or decisions
    file that fails to load is logged as a warning and skipped.

    Args:
        recording_dir: Path to recording directory

    Returns:
        An empty dict when the directory does not exist; otherwise
        {
            "trajectory": List[Dict],
            "metadata": Dict or None,
            "plans": List[Dict],
            "decisions": List[str],
            "statistics": Dict
        }
    """
    root = Path(recording_dir)
    if not root.exists():
        logger.error(f"Recording directory not found: {recording_dir}")
        return {}

    # Trajectory and the statistics derived from it
    steps = []
    stats = {}
    steps_file = root / "traj.jsonl"
    if steps_file.exists():
        steps = load_trajectory_from_jsonl(str(steps_file))
        stats = analyze_trajectory(steps)

    # Metadata
    meta = None
    if (root / "metadata.json").exists():
        meta = load_metadata(str(root))

    # Plans, in sorted file-name order
    plans = []
    plans_root = root / "plans"
    if plans_root.exists():
        for plan_path in sorted(plans_root.glob("plan_*.json")):
            try:
                with open(plan_path, 'r', encoding='utf-8') as handle:
                    plans.append(json.load(handle))
            except Exception as exc:
                logger.warning(f"Failed to load plan {plan_path}: {exc}")

    # Decisions log, one entry per line (line endings kept)
    decisions = []
    decisions_path = root / "decisions.log"
    if decisions_path.exists():
        try:
            with open(decisions_path, 'r', encoding='utf-8') as handle:
                decisions = handle.readlines()
        except Exception as exc:
            logger.warning(f"Failed to load decisions: {exc}")

    return {
        "trajectory": steps,
        "metadata": meta,
        "plans": plans,
        "decisions": decisions,
        "statistics": stats
    }
def filter_trajectory(
    trajectory: List[Dict[str, Any]],
    backend: Optional[str] = None,
    tool: Optional[str] = None,
    status: Optional[str] = None,
    time_range: Optional[Tuple[str, str]] = None
) -> List[Dict[str, Any]]:
    """Return the steps that satisfy every filter that was supplied.

    Args:
        trajectory: Steps to filter.
        backend: Keep only steps whose ``backend`` equals this value.
        tool: Keep only steps whose ``tool`` equals this value.
        status: Keep only steps whose result ``status`` equals this value.
        time_range: ``(start, end)`` pair; keeps steps whose ``timestamp``
            compares within the inclusive range. A step without a timestamp
            is compared as the empty string.

    Returns:
        A new list of matching steps, in the original order. When no filter
        is supplied (all are falsy) the input list itself is returned.
    """
    checks = []
    if backend:
        checks.append(lambda entry: entry.get("backend") == backend)
    if tool:
        checks.append(lambda entry: entry.get("tool") == tool)
    if status:
        checks.append(lambda entry: entry.get("result", {}).get("status") == status)
    if time_range:
        start_time, end_time = time_range
        checks.append(lambda entry: start_time <= entry.get("timestamp", "") <= end_time)

    # No filter requested: hand back the caller's list unchanged.
    if not checks:
        return trajectory
    return [entry for entry in trajectory if all(check(entry) for check in checks)]
def extract_errors(trajectory: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return, in order, the steps whose result status is ``"error"``.

    Steps without a ``result`` entry, or with any other status, are left out.
    """
    failed = []
    for entry in trajectory:
        outcome = entry.get("result", {})
        if outcome.get("status") == "error":
            failed.append(entry)
    return failed
def generate_summary_report(recording_dir: str, output_file: Optional[str] = None) -> str:
    """Build a Markdown summary of a recording session.

    The report contains, for whichever data the session provides: a
    metadata section, statistics with backend and tool distributions (tools
    sorted by descending count), one entry per plan, the first 5 error
    steps with up to 200 characters of their output, and the first 10
    decision lines.

    Args:
        recording_dir: Recording directory to summarize.
        output_file: Optional path; when given, the report is also written
            there as UTF-8. A write failure is logged and does not prevent
            the report from being returned.

    Returns:
        The report text, or ``"Error: Could not load recording session"``
        when the session could not be loaded.
    """
    session = load_recording_session(recording_dir)
    if not session:
        return "Error: Could not load recording session"

    lines = []
    lines.append("# Recording Session Summary\n")

    # Metadata section
    if session["metadata"]:
        lines.append("## Metadata")
        metadata = session["metadata"]
        lines.append(f"- **Task ID**: {metadata.get('task_id', 'N/A')}")
        lines.append(f"- **Start Time**: {metadata.get('start_time', 'N/A')}")
        lines.append(f"- **End Time**: {metadata.get('end_time', 'N/A')}")
        lines.append(f"- **Total Steps**: {metadata.get('total_steps', 0)}")
        # Tolerate a null "backends" value and non-string entries, either of
        # which would make a bare str.join raise TypeError.
        backend_names = [str(name) for name in (metadata.get('backends') or [])]
        lines.append(f"- **Backends**: {', '.join(backend_names)}")
        lines.append("")

    # Statistics section
    if session["statistics"]:
        lines.append("## Statistics")
        stats = session["statistics"]
        lines.append(f"- **Total Steps**: {stats.get('total_steps', 0)}")
        lines.append(f"- **Success Count**: {stats.get('success_count', 0)}")
        lines.append(f"- **Success Rate**: {stats.get('success_rate', 0):.2%}")
        lines.append("")
        lines.append("### Backend Distribution")
        for backend, count in stats.get('backends', {}).items():
            lines.append(f"- {backend}: {count}")
        lines.append("")
        lines.append("### Tool Distribution")
        for tool, count in sorted(stats.get('tools', {}).items(), key=lambda x: x[1], reverse=True):
            lines.append(f"- {tool}: {count}")
        lines.append("")

    # Plans section
    if session["plans"]:
        lines.append(f"## Plans ({len(session['plans'])} total)")
        for i, plan in enumerate(session["plans"], 1):
            lines.append(f"### Plan {i}")
            lines.append(f"- Created: {plan.get('created_at', 'N/A')}")
            lines.append(f"- Created by: {plan.get('created_by', 'N/A')}")
            # A null "plan" value is treated like a missing one.
            plan_data = plan.get('plan') or {}
            if 'task_updates' in plan_data:
                lines.append(f"- Tasks: {len(plan_data['task_updates'])}")
            lines.append("")

    # Errors section
    if session["trajectory"]:
        errors = extract_errors(session["trajectory"])
        if errors:
            lines.append(f"## Errors ({len(errors)} total)")
            for error in errors[:5]:  # Show first 5 errors
                lines.append(f"- Step {error.get('step')}: {error.get('backend')} - {error.get('tool')}")
                error_msg = error.get('result', {}).get('output')
                if error_msg is None:
                    error_msg = 'No error message'
                # The output is not guaranteed to be a string (it may be a
                # dict, list or number); convert before truncating.
                excerpt = str(error_msg)[:200]
                lines.append(f" ```\n {excerpt}\n ```")
            if len(errors) > 5:
                lines.append(f" ... and {len(errors) - 5} more errors")
            lines.append("")

    # Decisions section
    if session["decisions"]:
        lines.append(f"## Decisions ({len(session['decisions'])} total)")
        for decision in session["decisions"][:10]:  # Show first 10 decisions
            lines.append(f" {decision.strip()}")
        if len(session["decisions"]) > 10:
            lines.append(f" ... and {len(session['decisions']) - 10} more decisions")
        lines.append("")

    report = "\n".join(lines)

    # Save to file if requested
    if output_file:
        try:
            with open(output_file, 'w', encoding='utf-8') as f:
                f.write(report)
            logger.info(f"Report saved to {output_file}")
        except Exception as e:
            logger.error(f"Failed to save report: {e}")
    return report
def compare_recordings(recording_dir1: str, recording_dir2: str) -> Dict[str, Any]:
    """Compare the statistics of two recording sessions.

    Args:
        recording_dir1: Directory of the first recording.
        recording_dir2: Directory of the second recording.

    Returns:
        A dictionary with ``"session1"`` and ``"session2"`` overviews (path,
        total steps, success rate, backend counts) and a ``"differences"``
        entry holding second-minus-first deltas for total steps and success
        rate. Statistics missing from a session count as 0 / empty.
    """

    def overview(path: str, stats: Dict[str, Any]) -> Dict[str, Any]:
        return {
            "path": path,
            "total_steps": stats.get("total_steps", 0),
            "success_rate": stats.get("success_rate", 0),
            "backends": stats.get("backends", {})
        }

    first = overview(recording_dir1, load_recording_session(recording_dir1).get("statistics", {}))
    second = overview(recording_dir2, load_recording_session(recording_dir2).get("statistics", {}))

    return {
        "session1": first,
        "session2": second,
        "differences": {
            "step_diff": second["total_steps"] - first["total_steps"],
            "success_rate_diff": second["success_rate"] - first["success_rate"]
        }
    }