""" Recording Viewer Convenient tools for viewing and analyzing recording sessions. """ import json from pathlib import Path from typing import Optional, Dict, Any, List from openspace.utils.logging import Logger from .utils import load_recording_session, generate_summary_report from .action_recorder import load_agent_actions, analyze_agent_actions, format_agent_actions logger = Logger.get_logger(__name__) class RecordingViewer: """ Viewer for analyzing recording sessions. Provides convenient methods to: - Load and display recordings - Analyze agent behaviors - Generate reports """ def __init__(self, recording_dir: str): """ Initialize viewer with a recording directory. Args: recording_dir: Path to recording directory """ self.recording_dir = Path(recording_dir) if not self.recording_dir.exists(): raise ValueError(f"Recording directory not found: {recording_dir}") # Load session data self.session = load_recording_session(str(self.recording_dir)) logger.info(f"Loaded recording from {recording_dir}") def show_summary(self) -> str: """ Display a summary of the recording. Returns: Formatted summary string """ if not self.session.get("metadata"): return "No metadata available" metadata = self.session["metadata"] stats = self.session.get("statistics", {}) lines = [] lines.append("=" * 70) lines.append("RECORDING SUMMARY") lines.append("=" * 70) lines.append(f"Task ID: {metadata.get('task_id', 'N/A')}") lines.append(f"Start: {metadata.get('start_time', 'N/A')}") lines.append(f"End: {metadata.get('end_time', 'N/A')}") lines.append(f"Total Steps: {metadata.get('total_steps', 0)}") lines.append("") lines.append("Statistics:") lines.append(f" - Success Rate: {stats.get('success_rate', 0):.2%}") lines.append(f" - Success Count: {stats.get('success_count', 0)}/{stats.get('total_steps', 0)}") lines.append("") if stats.get("backends"): lines.append("Backend Usage:") for backend, count in sorted(stats["backends"].items(), key=lambda x: x[1], reverse=True): lines.append(f" - {backend}: {count}") lines.append("=" * 70) return "\n".join(lines) def show_agent_actions(self, format_type: str = "compact", agent_name: Optional[str] = None) -> str: actions = load_agent_actions(str(self.recording_dir)) if agent_name: actions = [a for a in actions if a.get("agent_name") == agent_name] if not actions: return f"No agent actions found{' for ' + agent_name if agent_name else ''}" # Add header header = f"\nAGENT ACTIONS ({len(actions)} total)" if agent_name: header += f" - {agent_name}" header += "\n" + "=" * 70 # Format actions formatted = format_agent_actions(actions, format_type) return header + "\n" + formatted def analyze_agents(self) -> str: actions = load_agent_actions(str(self.recording_dir)) stats = analyze_agent_actions(actions) lines = [] lines.append("\nAGENT ANALYSIS") lines.append("=" * 70) lines.append(f"Total Actions: {stats.get('total_actions', 0)}") lines.append("") lines.append("By Agent:") for agent, count in sorted(stats.get('by_agent', {}).items(), key=lambda x: x[1], reverse=True): percentage = (count / stats['total_actions'] * 100) if stats['total_actions'] > 0 else 0 lines.append(f" - {agent}: {count} ({percentage:.1f}%)") lines.append("") lines.append("By Action Type:") for action_type, count in sorted(stats.get('by_type', {}).items(), key=lambda x: x[1], reverse=True): percentage = (count / stats['total_actions'] * 100) if stats['total_actions'] > 0 else 0 lines.append(f" - {action_type}: {count} ({percentage:.1f}%)") return "\n".join(lines) def generate_full_report(self, output_file: Optional[str] = None) -> str: return generate_summary_report(str(self.recording_dir), output_file) def export_to_json(self, output_file: str): with open(output_file, 'w', encoding='utf-8') as f: json.dump(self.session, f, indent=2, ensure_ascii=False) logger.info(f"Exported session to {output_file}") def show_timeline(self, max_events: int = 50) -> str: # Load all events actions = load_agent_actions(str(self.recording_dir)) trajectory = self.session.get("trajectory", []) # Combine all events with unified format timeline = [] # Add agent actions for action in actions: timeline.append({ "timestamp": action.get("timestamp", ""), "type": "agent_action", "agent_name": action.get("agent_name", ""), "agent_type": action.get("agent_type", "unknown"), "action_type": action.get("action_type", ""), "step": action.get("step"), "correlation_id": action.get("correlation_id", ""), "description": f"[{action.get('agent_type', '?').upper()}] {action.get('action_type', '?')}", "related_tool_steps": action.get("related_tool_steps", []), }) # Add tool executions for traj_step in trajectory: timeline.append({ "timestamp": traj_step.get("timestamp", ""), "type": "tool_execution", "backend": traj_step.get("backend", ""), "tool": traj_step.get("tool", ""), "step": traj_step.get("step"), "agent_name": traj_step.get("agent_name", ""), "description": f"[TOOL:{traj_step.get('backend', '?').upper()}] {traj_step.get('tool', '?')}", "status": traj_step.get("result", {}).get("status", ""), }) # Sort by timestamp timeline.sort(key=lambda x: x.get("timestamp", "")) # Format output lines = [] lines.append("\nUNIFIED TIMELINE") lines.append("=" * 100) lines.append(f"Total events: {len(timeline)} (showing first {max_events})") lines.append("") for i, item in enumerate(timeline[:max_events]): timestamp = item.get("timestamp", "N/A") time_str = timestamp.split("T")[1][:8] if "T" in timestamp else timestamp[-8:] # Format line with type indicator type_marker = { "agent_action": "🤖", "tool_execution": "🔧" }.get(item.get("type"), "•") desc = item.get("description", "") agent = item.get("agent_name", "") agent_type = item.get("agent_type", "") line = f"{time_str} {type_marker} {desc}" # Add agent info if available if agent and agent_type: line += f" (by {agent}/{agent_type})" elif agent: line += f" (by {agent})" lines.append(line) # Show correlations correlations = [] if item.get("related_tool_steps"): correlations.append(f"→ tool steps: {item['related_tool_steps']}") if item.get("related_action_step"): correlations.append(f"→ action step: {item['related_action_step']}") if correlations: for corr in correlations: lines.append(f" {corr}") if len(timeline) > max_events: lines.append(f"\n... and {len(timeline) - max_events} more events") return "\n".join(lines) def show_agent_flow(self, agent_name: Optional[str] = None) -> str: """ Show the flow of a specific agent's actions and related events. """ actions = load_agent_actions(str(self.recording_dir)) if agent_name: actions = [a for a in actions if a.get("agent_name") == agent_name] lines = [] lines.append(f"\nAGENT FLOW{' - ' + agent_name if agent_name else ''}") lines.append("=" * 100) # Sort by timestamp actions.sort(key=lambda x: x.get("timestamp", "")) for action in actions: timestamp = action.get("timestamp", "N/A").split("T")[1][:8] if "T" in action.get("timestamp", "") else "N/A" agent_type = action.get("agent_type", "?").upper() action_type = action.get("action_type", "?") step = action.get("step", "?") lines.append(f"{timestamp} [{agent_type}] Action #{step}: {action_type}") # Show reasoning if available if action.get("reasoning"): thought = action["reasoning"].get("thought", "") if thought: lines.append(f" 💭 {thought[:80]}...") # Show output if action.get("output"): output = action["output"] if isinstance(output, dict): for key in ["message", "status", "evaluation"]: if key in output: lines.append(f" 📤 {key}: {str(output[key])[:60]}") lines.append("") return "\n".join(lines) def view_recording(recording_dir: str): """ Quick interactive viewer for a recording. """ try: viewer = RecordingViewer(recording_dir) print(viewer.show_summary()) print("\n") print(viewer.analyze_agents()) print("\n") print("Agent Actions (compact):") print(viewer.show_agent_actions(format_type="compact")) except Exception as e: logger.error(f"Failed to view recording: {e}") print(f"Error: {e}") def compare_recordings(recording_dir1: str, recording_dir2: str) -> str: """ Compare two recordings side by side. """ try: viewer1 = RecordingViewer(recording_dir1) viewer2 = RecordingViewer(recording_dir2) lines = [] lines.append("=" * 70) lines.append("RECORDING COMPARISON") lines.append("=" * 70) lines.append("") # Compare metadata meta1 = viewer1.session.get("metadata", {}) meta2 = viewer2.session.get("metadata", {}) lines.append("Recording 1:") lines.append(f" Task: {meta1.get('task_id', 'N/A')}") lines.append(f" Steps: {meta1.get('total_steps', 0)}") lines.append("") lines.append("Recording 2:") lines.append(f" Task: {meta2.get('task_id', 'N/A')}") lines.append(f" Steps: {meta2.get('total_steps', 0)}") lines.append("") # Compare statistics stats1 = viewer1.session.get("statistics", {}) stats2 = viewer2.session.get("statistics", {}) lines.append("Differences:") lines.append(f" Steps: {meta2.get('total_steps', 0) - meta1.get('total_steps', 0):+d}") lines.append(f" Success Rate: {stats2.get('success_rate', 0) - stats1.get('success_rate', 0):+.2%}") return "\n".join(lines) except Exception as e: logger.error(f"Failed to compare recordings: {e}") return f"Error: {e}" # CLI interface if __name__ == "__main__": import sys if len(sys.argv) < 2: print("Usage: python -m openspace.recording.viewer ") sys.exit(1) recording_dir = sys.argv[1] view_recording(recording_dir)