OpenSpace / openspace /recording /manager.py
darkfire514's picture
Upload 160 files
399b80c verified
import datetime
import json
import ast
import types
from typing import Any, Dict, List, Optional
from pathlib import Path
from openspace.utils.logging import Logger
from .recorder import TrajectoryRecorder
from .action_recorder import ActionRecorder
logger = Logger.get_logger(__name__)
class RecordingManager:
# Global instance management (singleton pattern)
_global_instance: Optional['RecordingManager'] = None
def __init__(
self,
enabled: bool = True,
task_id: str = "",
log_dir: str = "./logs/recordings",
backends: Optional[List[str]] = None,
enable_screenshot: bool = True,
enable_video: bool = False,
enable_conversation_log: bool = True,
auto_save_interval: int = 10,
server_url: Optional[str] = None,
agent_name: str = "GroundingAgent",
):
"""
Initialize automatic recording manager
Args:
enabled: whether to enable recording
task_id: task ID (for naming recording directory)
log_dir: log directory path
backends: list of backends to record (None = all)
(optional: "mcp", "gui", "shell", "system", "web")
enable_screenshot: whether to enable screenshot (through platform.ScreenshotClient)
enable_video: whether to enable video recording (through platform.RecordingClient)
enable_conversation_log: whether to save LLM conversations to conversations.jsonl (default: True)
auto_save_interval: automatic save interval (steps)
server_url: local server address (None = read from config/environment variables)
agent_name: name of the agent performing the recording (default: "GroundingAgent")
"""
self.enabled = enabled
self.task_id = task_id
self.log_dir = log_dir
self.backends = set(backends) if backends else {"mcp", "gui", "shell", "system", "web"}
self.enable_screenshot = enable_screenshot
self.enable_video = enable_video
self.enable_conversation_log = enable_conversation_log
self.auto_save_interval = auto_save_interval
self.server_url = server_url
self.agent_name = agent_name
# internal state
self._recorder: Optional[TrajectoryRecorder] = None
self._action_recorder: Optional[ActionRecorder] = None
self._is_started = False
self._step_counter = 0
# registered LLM clients
self._registered_llm_clients = []
self._original_methods = {}
# video/screenshot clients (internal management)
self._recording_client = None
self._screenshot_client = None
# Register as global instance
RecordingManager._global_instance = self
@classmethod
def is_recording(cls) -> bool:
"""
Check if there is an active recording session
Returns:
bool: True if recording is active
"""
return cls._global_instance is not None and cls._global_instance._is_started
@classmethod
async def record_retrieved_tools(
cls,
task_instruction: str,
tools: List[Any],
search_debug_info: Optional[Dict[str, Any]] = None,
):
"""
Record the tools retrieved for a task
Args:
task_instruction: The task instruction used for retrieval
tools: List of retrieved tools
search_debug_info: Debug info from search (similarity scores, LLM selections)
"""
instance = cls._global_instance
if not instance or not instance._is_started or not instance._recorder:
return
# Extract tool info
tool_info = []
for tool in tools:
info = {
"name": getattr(tool, "name", str(tool)),
}
# Prefer runtime_info.backend
# over backend_type (may be NOT_SET for cached RemoteTools)
runtime_info = getattr(tool, "_runtime_info", None)
if runtime_info and hasattr(runtime_info, "backend"):
info["backend"] = runtime_info.backend.value if hasattr(runtime_info.backend, "value") else str(runtime_info.backend)
info["server_name"] = runtime_info.server_name
elif hasattr(tool, "backend_type"):
info["backend"] = tool.backend_type.value if hasattr(tool.backend_type, "value") else str(tool.backend_type)
tool_info.append(info)
# Build metadata
metadata = {
"instruction": task_instruction[:500], # Truncate long instructions
"count": len(tools),
"tools": tool_info,
}
# Add search debug info if available
if search_debug_info:
metadata["search_debug"] = {
"search_mode": search_debug_info.get("search_mode", ""),
"total_candidates": search_debug_info.get("total_candidates", 0),
"mcp_count": search_debug_info.get("mcp_count", 0),
"non_mcp_count": search_debug_info.get("non_mcp_count", 0),
"llm_filter": search_debug_info.get("llm_filter", {}),
"tool_scores": search_debug_info.get("tool_scores", []),
}
# Save to metadata
await instance._recorder.add_metadata("retrieved_tools", metadata)
logger.info(f"Recorded {len(tools)} retrieved tools (with search debug info: {search_debug_info is not None})")
@classmethod
async def record_skill_selection(
cls,
selection_record: Dict[str, Any],
):
"""
Record skill selection decision to metadata.json.
This captures the pre-execution skill matching conversation:
- Which skills were available
- The LLM prompt and response (or keyword fallback)
- Which skills were selected
Args:
selection_record: Structured record from SkillRegistry.select_skills_with_llm()
Keys: method, task, available_skills, prompt, llm_response, selected, error
"""
instance = cls._global_instance
if not instance or not instance._is_started or not instance._recorder:
return
# Save to metadata alongside retrieved_tools
await instance._recorder.add_metadata("skill_selection", selection_record)
selected = selection_record.get("selected", [])
method = selection_record.get("method", "unknown")
logger.info(
f"Recorded skill selection: {len(selected)} selected via {method} "
f"(from {len(selection_record.get('available_skills', []))} available)"
)
@staticmethod
def _truncate_messages(
messages: List[Dict[str, Any]],
max_content_length: int = 5000,
) -> List[Dict[str, Any]]:
"""Truncate message content to avoid huge log files."""
result = []
for msg in messages:
new_msg = {"role": msg.get("role", "unknown")}
content = msg.get("content", "")
if isinstance(content, str):
if len(content) > max_content_length:
new_msg["content"] = content[:max_content_length] + f"... [truncated, total {len(content)} chars]"
else:
new_msg["content"] = content
elif isinstance(content, list):
# Handle multi-part content (e.g., with images)
new_content = []
for item in content:
if isinstance(item, dict):
if item.get("type") == "image":
new_content.append({"type": "image", "note": "[image data omitted]"})
elif item.get("type") == "text":
text = item.get("text", "")
if len(text) > max_content_length:
new_content.append({
"type": "text",
"text": text[:max_content_length] + f"... [truncated, total {len(text)} chars]"
})
else:
new_content.append(item)
else:
new_content.append(item)
else:
new_content.append(item)
new_msg["content"] = new_content
else:
new_msg["content"] = str(content)[:max_content_length]
if "tool_calls" in msg:
new_msg["tool_calls"] = msg["tool_calls"]
result.append(new_msg)
return result
@classmethod
async def record_conversation_setup(
cls,
setup_messages: List[Dict[str, Any]],
tools: Optional[List] = None,
max_content_length: int = 5000,
agent_name: str = "GroundingAgent",
extra: Optional[Dict[str, Any]] = None,
):
"""
Record initial conversation context to conversations.jsonl (called once before iterations).
Writes a ``type: "setup"`` line containing all system messages, the user
instruction, **and** the tool schemas exposed to the LLM so the log
gives a complete picture of what the model sees.
Args:
setup_messages: The initial messages list (system prompts + user instruction).
tools: BaseTool list passed to the LLM (optional). Each tool's
name, backend, and description are recorded.
max_content_length: Max length for message content truncation.
agent_name: Agent/phase identifier. Used to distinguish conversations
from different pipeline stages during replay.
Common values: "GroundingAgent", "ExecutionAnalyzer",
"SkillEvolver", "SkillEvolver.confirm", "SkillEvolver.retry".
extra: Optional dict of additional context (e.g. evolution_type,
trigger, target_skills) merged into the record.
"""
instance = cls._global_instance
if not instance or not instance._is_started or not instance._recorder:
return
if not getattr(instance, 'enable_conversation_log', True):
return
record: Dict[str, Any] = {
"type": "setup",
"agent_name": agent_name,
"timestamp": datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
"messages": cls._truncate_messages(setup_messages, max_content_length),
}
if extra:
record["extra"] = extra
# Record tool definitions so the log shows what the LLM can call.
# Description includes the [Backend] tag that the LLM actually sees.
if tools:
_BACKEND_LABELS = {
"mcp": "MCP", "shell": "Shell", "gui": "GUI",
"web": "Web", "system": "System",
}
tool_defs = []
for t in tools:
schema = getattr(t, "schema", None)
if schema:
backend_val = getattr(schema, "backend_type", None)
backend_str = (
backend_val.value
if hasattr(backend_val, "value")
else str(backend_val) if backend_val else None
)
entry: Dict[str, Any] = {
"name": schema.name,
"backend": backend_str,
}
if schema.description:
desc = schema.description
# Mirror the [Backend] tag that _prepare_tools_for_llmclient
# adds so the recording matches what the LLM sees.
if backend_str and backend_str not in ("not_set",):
label = _BACKEND_LABELS.get(backend_str, backend_str)
desc = f"[{label}] {desc}"
if len(desc) > 200:
desc = desc[:200] + "..."
entry["description"] = desc
else:
entry = {"name": getattr(t, "name", str(t))}
tool_defs.append(entry)
record["tools"] = tool_defs
conv_file = instance._recorder.trajectory_dir / "conversations.jsonl"
try:
with open(conv_file, "a", encoding="utf-8") as f:
f.write(json.dumps(record, ensure_ascii=False))
f.write("\n")
except Exception as e:
logger.debug(f"Failed to write conversation setup: {e}")
@classmethod
async def record_iteration_context(
cls,
iteration: int,
delta_messages: List[Dict[str, Any]],
response_metadata: Dict[str, Any],
max_content_length: int = 5000,
agent_name: str = "GroundingAgent",
extra: Optional[Dict[str, Any]] = None,
):
"""
Record a single iteration's delta messages to conversations.jsonl.
Only the messages produced during this iteration are stored (assistant
response, tool results, inter-iteration guidance), avoiding repetition
of system prompts and initial user instruction. The initial context is
stored once via ``record_conversation_setup``. The full conversation
can be reconstructed by concatenating the setup with all deltas in order.
Args:
iteration: Iteration number (1-based).
delta_messages: Messages added during this iteration (assistant + tool results).
response_metadata: Lightweight metadata about the LLM response
(has_tool_calls, tool_calls_count).
max_content_length: Max length for message content truncation.
agent_name: Agent/phase identifier (must match the corresponding
``record_conversation_setup`` call).
extra: Optional dict of additional context merged into the record.
"""
instance = cls._global_instance
if not instance or not instance._is_started or not instance._recorder:
return
if not getattr(instance, 'enable_conversation_log', True):
return
record = {
"type": "iteration",
"agent_name": agent_name,
"iteration": iteration,
"timestamp": datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
"response_metadata": response_metadata,
"delta_messages": cls._truncate_messages(delta_messages, max_content_length),
}
if extra:
record["extra"] = extra
# Append to conversations.jsonl (real-time)
conv_file = instance._recorder.trajectory_dir / "conversations.jsonl"
try:
with open(conv_file, "a", encoding="utf-8") as f:
f.write(json.dumps(record, ensure_ascii=False))
f.write("\n")
except Exception as e:
logger.debug(f"Failed to write conversation log: {e}")
@classmethod
async def record_tool_execution(
cls,
tool_name: str,
backend: str,
parameters: Dict[str, Any],
result: Any,
server_name: Optional[str] = None,
is_success: bool = True,
metadata: Optional[Dict[str, Any]] = None,
):
"""
Record tool execution (internal method, called by BaseTool automatically)
Args:
tool_name: Name of the tool
backend: Backend type (gui, shell, mcp, etc.)
parameters: Tool parameters
result: Tool execution result (content or error message)
server_name: Server name for MCP backend
is_success: Whether the tool execution was successful (default: True for backward compatibility)
metadata: Tool result metadata (e.g. intermediate_steps for GUI)
"""
if not cls._global_instance or not cls._global_instance._is_started:
return
instance = cls._global_instance
# Infer backend if not_set or not in allowed backends
if backend == "not_set" or backend not in instance.backends:
inferred = cls._infer_backend_from_tool_name(tool_name)
if inferred and inferred in instance.backends:
backend = inferred
elif backend not in instance.backends:
logger.debug(
f"Backend '{backend}' not in recording backends {instance.backends}, "
f"skipping recording for tool '{tool_name}'"
)
return
# Create mock tool_call and result objects for compatibility with existing _record_* methods
class MockFunctionCall:
def __init__(self, name, arguments):
self.name = name
self.arguments = arguments
class MockToolCall:
def __init__(self, name, arguments):
self.function = MockFunctionCall(name, arguments)
class MockResult:
def __init__(self, content, is_success=True, metadata=None):
self.content = content
self.is_success = is_success
self.is_error = not is_success
self.error = content if not is_success else None
self.metadata = metadata or {}
tool_call = MockToolCall(tool_name, parameters)
mock_result = MockResult(result, is_success=is_success, metadata=metadata)
try:
if backend == "mcp":
server = server_name or "unknown"
await instance._record_mcp(tool_call, mock_result, server)
elif backend == "gui":
await instance._record_gui(tool_call, mock_result)
elif backend == "shell":
await instance._record_shell(tool_call, mock_result)
elif backend == "system":
await instance._record_system(tool_call, mock_result)
elif backend == "web":
await instance._record_web(tool_call, mock_result)
else:
logger.warning(f"No recording handler for backend '{backend}', tool '{tool_name}'")
return
instance._step_counter += 1
except Exception as e:
logger.warning(f"Failed to record tool execution for {tool_name}: {e}")
@staticmethod
def _parse_arguments(arg_data):
"""Safely parse tool_call.function.arguments which may be JSON string.
Handles:
1. Proper JSON strings with true/false/null
2. Python literal strings (produced by OpenAI) using ast.literal_eval
3. Already-dict objects (returned by SDK)
"""
if not isinstance(arg_data, str):
return arg_data or {}
# First, try JSON
try:
return json.loads(arg_data)
except json.JSONDecodeError:
pass
# Fallback to Python literal
try:
return ast.literal_eval(arg_data)
except Exception:
logger.debug("Failed to parse arguments, returning raw string")
return {"raw": arg_data}
async def start(self, task_id: Optional[str] = None):
"""Start automatic recording
Args:
task_id: If provided, override the current task_id for this recording session. This allows
external callers (e.g. Coordinator) to specify a meaningful task identifier without
having to recreate the RecordingManager instance.
"""
# Allow dynamic update of task_id before recording actually starts
if task_id:
self.task_id = task_id
if not self.enabled or self._is_started:
return
try:
# check server availability (only when video or screenshot is enabled)
if self.enable_video or self.enable_screenshot:
await self._check_server_availability()
self._recorder = TrajectoryRecorder(
task_name=self.task_id,
log_dir=self.log_dir,
enable_screenshot=self.enable_screenshot,
enable_video=self.enable_video,
server_url=self.server_url,
)
# create action recorder for agent decision tracking
self._action_recorder = ActionRecorder(
trajectory_dir=Path(self._recorder.get_trajectory_dir())
)
# create video client (internal management)
if self.enable_video:
from openspace.platforms import RecordingClient
self._recording_client = RecordingClient(base_url=self.server_url)
success = await self._recording_client.start_recording()
if success:
logger.info("Video recording started")
else:
logger.warning("Video recording failed to start")
# create screenshot client (internal management)
if self.enable_screenshot:
from openspace.platforms import ScreenshotClient
self._screenshot_client = ScreenshotClient(base_url=self.server_url)
logger.debug("Screenshot client ready")
# save initial metadata
await self._recorder.add_metadata("task_id", self.task_id)
await self._recorder.add_metadata("backends", list(self.backends))
await self._recorder.add_metadata("start_time", datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S"))
# Capture and save initial screenshot if enabled
if self.enable_screenshot and self._screenshot_client:
try:
init_shot = await self._screenshot_client.capture()
if init_shot:
await self._recorder.save_init_screenshot(init_shot)
logger.debug("Initial screenshot saved")
except Exception as e:
logger.debug(f"Failed to capture initial screenshot: {e}")
self._is_started = True
logger.info(f"Recording started: {self._recorder.get_trajectory_dir()}")
except Exception as e:
logger.error(f"Recording failed to start: {e}")
raise
async def _check_server_availability(self):
"""Check if local server is available"""
try:
from openspace.platforms import SystemInfoClient
# Use context manager to ensure aiohttp session is closed, avoiding warning of unclosed session
async with SystemInfoClient(base_url=self.server_url) as client:
info = await client.get_system_info()
if info:
logger.info(f"Server connected ({info.get('platform', 'unknown')})")
else:
logger.warning("Server not responding, video/screenshot functionality unavailable")
except Exception:
logger.warning("Cannot connect to server, video/screenshot functionality unavailable")
async def save_execution_outcome(
self,
status: str,
iterations: int,
execution_time: float = 0,
) -> None:
"""Persist task-level execution outcome into metadata.json.
Should be called **before** ``stop()`` so the data is included in the
finalized recording. The saved dict has the structure::
{"status": "success"|"incomplete"|"error",
"iterations": int,
"execution_time": float}
"""
if self._recorder:
await self._recorder.add_metadata("execution_outcome", {
"status": status,
"iterations": iterations,
"execution_time": round(execution_time, 2),
})
async def stop(self):
"""Stop automatic recording"""
if not self.enabled or not self._is_started:
return
try:
# stop video recording and save
if self._recording_client:
try:
video_path = None
if self._recorder:
video_path = str(Path(self._recorder.get_trajectory_dir()) / "screen_recording.mp4")
video_bytes = await self._recording_client.end_recording(dest=video_path)
if video_bytes and video_path:
video_size_mb = len(video_bytes) / (1024 * 1024)
logger.info(f"Video recording saved: {video_path} ({video_size_mb:.2f} MB)")
except Exception as e:
logger.warning(f"Video recording failed to save: {e}")
# close RecordingClient session, avoid unclosed session warning
try:
if self._recording_client:
await self._recording_client.close()
except Exception as e:
logger.debug(f"Failed to close RecordingClient session: {e}")
# close screenshot client
if self._screenshot_client:
try:
await self._screenshot_client.close()
except Exception as e:
logger.debug(f"Screenshot client failed to close: {e}")
finally:
self._screenshot_client = None
# finalize trajectory recording
if self._recorder:
# save final metadata
await self._recorder.add_metadata("end_time", datetime.datetime.now().isoformat())
await self._recorder.add_metadata("total_steps", self._step_counter)
# generate summary
await self.generate_summary()
# finalize recording
await self._recorder.finalize()
logger.info(f"Recording completed: {self._recorder.get_trajectory_dir()}")
# Restore original methods for registered LLM clients
for client in self._registered_llm_clients:
client_id = id(client)
if client_id in self._original_methods:
try:
original_method = self._original_methods[client_id]
client.complete = original_method
except Exception as e:
logger.debug(f"Failed to restore original method for LLM client: {e}")
self._registered_llm_clients.clear()
self._original_methods.clear()
self._is_started = False
self._recorder = None
self._action_recorder = None
except Exception as e:
logger.error(f"Recording failed to stop: {e}")
def register_to_llm(self, llm_client):
"""Register LLM client: wrap complete() to record tool results (Path B, aligned with AnyTool)."""
if not self.enabled:
return
if id(llm_client) in self._original_methods:
return
original_complete = llm_client.complete
self._original_methods[id(llm_client)] = original_complete
async def wrapped_complete(self_client, *args, **kwargs):
response = await original_complete(*args, **kwargs)
if response.get("tool_results"):
await self._auto_record_tool_results(response["tool_results"])
return response
llm_client.complete = types.MethodType(wrapped_complete, llm_client)
self._registered_llm_clients.append(llm_client)
@staticmethod
def _infer_backend_from_tool_name(tool_name: str) -> Optional[str]:
"""Infer backend from tool name when tool_results lack backend."""
if not tool_name or not isinstance(tool_name, str):
return None
name = tool_name.strip()
if "__" in name:
name = name.split("__", 1)[-1]
shell_tools = {"shell_agent", "read_file", "write_file", "list_dir", "run_shell"}
if name in shell_tools:
return "shell"
if name in ("gui_agent",) or "gui" in name.lower():
return "gui"
if "mcp" in name.lower() or ("." in name and "__" not in name):
return "mcp"
if name in ("deep_research_agent", "deep_research"):
return "web"
return None
async def _auto_record_tool_results(self, tool_results: List[Dict]):
"""Record tool execution results from LLM complete() (Path B, aligned with AnyTool)."""
if not self._recorder or not self._is_started:
return
for tool_result in tool_results:
tool_call = tool_result.get("tool_call")
result = tool_result.get("result")
backend = tool_result.get("backend")
server_name = tool_result.get("server_name")
if not tool_call or not result:
continue
if not backend:
_name = getattr(getattr(tool_call, "function", None), "name", None) or str(tool_result.get("tool_call", ""))
backend = self._infer_backend_from_tool_name(_name)
if not backend:
logger.warning(f"Tool result missing 'backend', cannot infer for '{_name}', skipping")
continue
result_metadata = result.metadata if hasattr(result, 'metadata') else None
await RecordingManager.record_tool_execution(
tool_name=tool_call.function.name,
backend=backend,
parameters=self._parse_arguments(tool_call.function.arguments),
result=result.content if hasattr(result, 'content') else str(result),
server_name=server_name,
is_success=result.is_success if hasattr(result, 'is_success') else True,
metadata=result_metadata,
)
async def _record_mcp(self, tool_call, result, server: str):
tool_name = tool_call.function.name
parameters = self._parse_arguments(tool_call.function.arguments)
command = f"{server}.{tool_name}"
result_str = str(result.content) if result.is_success else str(result.error)
result_brief = result_str[:200] + "..." if len(result_str) > 200 else result_str
is_actual_success = result.is_success and not result_str.startswith("ERROR:")
step_info = await self._recorder.record_step(
backend="mcp",
tool=tool_name,
command=command,
result={
"status": "success" if is_actual_success else "error",
"output": result_brief,
},
parameters=parameters,
extra={
"server": server,
},
auto_screenshot=self.enable_screenshot
)
# Add agent_name to step_info
step_info["agent_name"] = self.agent_name
async def _record_gui(self, tool_call, result):
tool_name = tool_call.function.name
parameters = self._parse_arguments(tool_call.function.arguments)
# Extract actual pyautogui command (from action_history)
command = "gui_agent"
if result.is_success and hasattr(result, 'metadata') and result.metadata:
action_history = result.metadata.get("action_history", [])
if action_history:
# Get last successful execution action
for action in reversed(action_history):
planned_action = action.get("planned_action", {})
execution_result = action.get("execution_result", {})
if planned_action.get("action_type") == "PYAUTOGUI_COMMAND":
cmd = planned_action.get("command", "")
if cmd and execution_result.get("status") == "success":
command = cmd
break
elif execution_result.get("status") == "success":
action_type = planned_action.get("action_type", "")
if action_type and action_type not in ["WAIT", "DONE", "FAIL"]:
params = planned_action.get("parameters", {})
if params:
param_str = ", ".join([f"{k}={v}" for k, v in list(params.items())[:2]])
command = f"{action_type}({param_str})"
else:
command = action_type
break
result_str = str(result.content) if result.is_success else str(result.error)
is_actual_success = result.is_success
if result.is_success:
first_200_chars = result_str[:200] if result_str else ""
critical_failure_patterns = ["Task failed", "CRITICAL ERROR:", "FATAL:"]
has_critical_failure = any(pattern in first_200_chars for pattern in critical_failure_patterns)
is_actual_success = not has_critical_failure
# Extract intermediate_steps from metadata for embedding in traj.jsonl
extra = {}
if hasattr(result, 'metadata') and result.metadata:
intermediate_steps = result.metadata.get("intermediate_steps")
if intermediate_steps:
extra["intermediate_steps"] = intermediate_steps
step_info = await self._recorder.record_step(
backend="gui",
tool="gui_agent",
command=command,
result={
"status": "success" if is_actual_success else "error",
"output": result_str,
},
parameters=parameters,
auto_screenshot=self.enable_screenshot,
extra=extra if extra else None,
)
step_info["agent_name"] = self.agent_name
async def _record_shell(self, tool_call, result):
tool_name = tool_call.function.name
parameters = self._parse_arguments(tool_call.function.arguments)
task = parameters.get("task", tool_name)
exit_code = 0 if result.is_success else 1
stdout = str(result.content) if result.is_success else ""
stderr = str(result.error) if result.is_error else ""
command = task
if hasattr(result, 'metadata') and result.metadata:
code_history = result.metadata.get("code_history", [])
if code_history:
# Try to find the last successful execution
found_success = False
for code_info in reversed(code_history):
if code_info.get("status") == "success":
lang = code_info.get("lang", "bash")
code = code_info.get("code", "")
# String format code block: ```lang\ncode\n```
command = f"```{lang}\n{code}\n```"
found_success = True
break
# If no successful execution found, use last code block
if not found_success and code_history:
last_code = code_history[-1]
lang = last_code.get("lang", "bash")
code = last_code.get("code", "")
command = f"```{lang}\n{code}\n```"
stdout_brief = stdout[:200] + "..." if len(stdout) > 200 else stdout
stderr_brief = stderr[:200] + "..." if len(stderr) > 200 else stderr
is_actual_success = result.is_success
if result.is_success:
first_500_chars = stdout[:500] if stdout else ""
critical_failure_patterns = [
"Task failed after",
"[TASK_FAILED:",
"EXECUTION ERROR",
"timed out",
]
has_critical_failure = any(pattern in first_500_chars for pattern in critical_failure_patterns)
is_actual_success = not has_critical_failure
step_info = await self._recorder.record_step(
backend="shell",
tool="shell_agent",
command=command,
result={
"status": "success" if is_actual_success else "error",
"exit_code": exit_code,
"stdout": stdout_brief,
"stderr": stderr_brief,
},
auto_screenshot=self.enable_screenshot
)
step_info["agent_name"] = self.agent_name
async def _record_system(self, tool_call, result):
tool_name = tool_call.function.name
parameters = self._parse_arguments(tool_call.function.arguments)
command = tool_name
if parameters:
key_params = []
for key in ['path', 'file', 'directory', 'name', 'provider', 'backend']:
if key in parameters and parameters[key]:
key_params.append(f"{parameters[key]}")
if key_params:
command = f"{tool_name}({', '.join(key_params[:2])})"
result_str = str(result.content) if result.is_success else str(result.error)
result_brief = result_str[:200] + "..." if len(result_str) > 200 else result_str
is_actual_success = result.is_success
if result.is_success and result_str:
is_actual_success = not result_str.startswith("ERROR:")
step_info = await self._recorder.record_step(
backend="system",
tool=tool_name,
command=command,
result={
"status": "success" if is_actual_success else "error",
"output": result_brief,
},
auto_screenshot=self.enable_screenshot
)
step_info["agent_name"] = self.agent_name
async def _record_web(self, tool_call, result):
tool_name = tool_call.function.name
parameters = self._parse_arguments(tool_call.function.arguments)
query = parameters.get("query", "")
command = query if query else "deep_research"
result_str = str(result.content) if result.is_success else str(result.error)
is_actual_success = result.is_success
if result.is_success and result_str:
is_actual_success = not result_str.startswith("ERROR:")
step_info = await self._recorder.record_step(
backend="web",
tool="deep_research_agent",
command=command,
result={
"status": "success" if is_actual_success else "error",
"output": result_str, # Full output preserved for training/replay
},
auto_screenshot=self.enable_screenshot
)
# Add agent_name to step_info
step_info["agent_name"] = self.agent_name
async def add_metadata(self, key: str, value: Any):
if self._recorder:
await self._recorder.add_metadata(key, value)
async def save_plan(self, plan: Dict[str, Any], agent_name: str = "GroundingAgent"):
"""
Save agent plan to recording directory.
This integrates planning information with execution trajectory.
Args:
plan: The plan data (usually containing task_updates or plan steps)
agent_name: Name of the agent creating the plan
"""
if not self._recorder or not self._is_started:
logger.warning("Cannot save plan: recording not started")
return
try:
plan_dir = Path(self._recorder.get_trajectory_dir()) / "plans"
plan_dir.mkdir(exist_ok=True)
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
plan_data = {
"version": timestamp,
"created_at": datetime.datetime.now().isoformat(),
"created_by": agent_name,
"plan": plan
}
# Save versioned plan
plan_file = plan_dir / f"plan_{timestamp}.json"
with open(plan_file, 'w', encoding='utf-8') as f:
json.dump(plan_data, f, indent=2, ensure_ascii=False)
# Save current plan (latest)
current_plan_file = plan_dir / "current_plan.json"
with open(current_plan_file, 'w', encoding='utf-8') as f:
json.dump(plan_data, f, indent=2, ensure_ascii=False)
logger.debug(f"Saved plan to recording: {plan_file.name}")
except Exception as e:
logger.error(f"Failed to save plan: {e}")
async def log_decision(
self,
agent_name: str,
decision: str,
context: Optional[Dict[str, Any]] = None
):
"""
Log agent decision with optional context.
This provides insight into agent reasoning process.
Args:
agent_name: Name of the agent making the decision
decision: Description of the decision
context: Additional context information
"""
if not self._recorder or not self._is_started:
logger.warning("Cannot log decision: recording not started")
return
try:
traj_dir = Path(self._recorder.get_trajectory_dir())
log_file = traj_dir / "decisions.log"
timestamp = datetime.datetime.now().isoformat()
log_entry = f"[{timestamp}] {agent_name}: {decision}"
if context:
log_entry += f"\n Context: {json.dumps(context, ensure_ascii=False)}"
log_entry += "\n"
with open(log_file, 'a', encoding='utf-8') as f:
f.write(log_entry)
logger.debug(f"Logged decision from {agent_name}")
except Exception as e:
logger.error(f"Failed to log decision: {e}")
async def record_agent_action(
self,
agent_name: str,
action_type: str,
input_data: Optional[Dict[str, Any]] = None,
reasoning: Optional[Dict[str, Any]] = None,
output_data: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, Any]] = None,
related_tool_steps: Optional[list] = None,
correlation_id: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
"""
Record an agent's action and decision-making process.
Args:
agent_name: Name of the agent performing the action
action_type: Type of action (plan | execute | evaluate | monitor)
input_data: Input data the agent received (simplified)
reasoning: Agent's reasoning process (structured)
output_data: Agent's output/decision (structured)
metadata: Additional metadata (LLM model, tokens, duration, etc.)
related_tool_steps: List of tool execution step numbers related to this action
correlation_id: Optional correlation ID to link related events
Returns:
The recorded action info, or None if recording not started
"""
if not self._action_recorder or not self._is_started:
logger.debug("Cannot record agent action: recording not started")
return None
try:
action_info = await self._action_recorder.record_action(
agent_name=agent_name,
action_type=action_type,
input_data=input_data,
reasoning=reasoning,
output_data=output_data,
metadata=metadata,
related_tool_steps=related_tool_steps,
correlation_id=correlation_id,
)
logger.debug(f"Recorded agent action: {agent_name} - {action_type}")
return action_info
except Exception as e:
logger.error(f"Failed to record agent action: {e}")
return None
async def generate_summary(self) -> Dict[str, Any]:
"""
Generate a comprehensive summary of the recording session.
"""
if not self._recorder or not self._is_started:
logger.warning("Cannot generate summary: recording not started")
return {}
try:
from .action_recorder import load_agent_actions, analyze_agent_actions
from .utils import load_trajectory_from_jsonl, analyze_trajectory
traj_dir = self._recorder.get_trajectory_dir()
# Load all recorded data
trajectory = load_trajectory_from_jsonl(f"{traj_dir}/traj.jsonl")
agent_actions = load_agent_actions(traj_dir)
# Analyze data
traj_stats = analyze_trajectory(trajectory)
action_stats = analyze_agent_actions(agent_actions)
# Build summary
summary = {
"task_id": self.task_id,
"start_time": self._recorder.metadata.get("start_time", ""),
"end_time": self._recorder.metadata.get("end_time", ""),
"trajectory": {
"total_steps": traj_stats.get("total_steps", 0),
"success_count": traj_stats.get("success_count", 0),
"success_rate": traj_stats.get("success_rate", 0),
"by_backend": traj_stats.get("backends", {}),
"by_tool": traj_stats.get("tools", {}),
},
"agent_actions": {
"total_actions": action_stats.get("total_actions", 0),
"by_agent": action_stats.get("by_agent", {}),
"by_type": action_stats.get("by_type", {}),
}
}
# Save summary to file
summary_file = Path(traj_dir) / "summary.json"
with open(summary_file, 'w', encoding='utf-8') as f:
json.dump(summary, f, indent=2, ensure_ascii=False)
logger.info(f"Generated summary: {summary_file}")
return summary
except Exception as e:
logger.error(f"Failed to generate summary: {e}")
return {}
async def __aenter__(self):
await self.start()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.stop()
return False
@property
def recording_status(self) -> bool:
return self._is_started
@property
def trajectory_dir(self) -> Optional[str]:
if self._recorder:
return str(self._recorder.get_trajectory_dir())
return None
@property
def recording_client(self):
return self._recording_client
@property
def screenshot_client(self):
return self._screenshot_client
@property
def step_count(self) -> int:
"""Get current step count"""
return self._step_counter
__all__ = [
'RecordingManager',
]