Spaces:
Running
Running
| import asyncio | |
| import json | |
| import subprocess | |
| import sys | |
| import uuid | |
| from dataclasses import dataclass | |
| from datetime import datetime | |
| from enum import Enum | |
| from pathlib import Path | |
| from typing import Any, Optional | |
| from litellm import get_max_tokens | |
| from agent.config import Config | |
| from agent.context_manager.manager import ContextManager | |
class OpType(Enum):
    """Closed set of operation kinds, each backed by a lowercase string value.

    NOTE(review): presumably these tag operations submitted to the agent loop;
    the dispatcher is not visible here, so confirm against its caller.
    """

    USER_INPUT = "user_input"        # a new user message
    EXEC_APPROVAL = "exec_approval"  # an approval decision
    INTERRUPT = "interrupt"          # request to stop the running task
    UNDO = "undo"
    COMPACT = "compact"
    SHUTDOWN = "shutdown"
@dataclass
class Event:
    """A message pushed from the session to the client via the event queue.

    ``Session.send_event`` puts the event on the queue and also records its
    ``event_type`` and ``data`` in the session's trajectory log.

    ``@dataclass`` is required: without it the class has only annotations and
    no ``__init__``, so ``Event(event_type=..., data=...)`` would fail.
    """

    event_type: str  # identifies the kind of event
    data: Optional[dict[str, Any]] = None  # optional payload, logged verbatim
class Session:
    """
    Maintains agent session state.

    Similar to Session in codex-rs/core/src/codex.rs.

    A session owns the outbound event queue, the conversation context, the
    currently running task (if any), and a trajectory log of every event sent.
    The trajectory can be saved as local JSON and handed to a detached
    ``session_uploader.py`` subprocess for upload to a dataset repo.
    """

    def __init__(
        self,
        event_queue: asyncio.Queue,
        config: Config | None = None,
        tool_router=None,
        context_manager: ContextManager | None = None,
        anthropic_key: Optional[str] = None,
        hf_token: Optional[str] = None,
    ):
        """
        Args:
            event_queue: Queue that ``send_event`` puts outgoing events on.
            config: Agent configuration. Defaults to a ``Config`` using
                ``huggingface/novita/deepseek-ai/DeepSeek-V3.2``.
            tool_router: Optional tool router. Supplies the tool specs for a
                default context manager and a fallback ``hf_token``.
            context_manager: Context manager to use. When omitted, one is built
                with ``max_context`` from litellm's ``get_max_tokens`` for the
                configured model.
            anthropic_key: User's Anthropic key, stored as-is.
            hf_token: User's HF token; falls back to ``tool_router.hf_token``.
        """
        # Resolve the config first: the default ContextManager below reads its
        # model name, so a ``None`` config must already have been replaced.
        if config is None:
            config = Config(
                model_name="huggingface/novita/deepseek-ai/DeepSeek-V3.2",
            )
        self.config = config

        self.tool_router = tool_router
        tool_specs = tool_router.get_tool_specs_for_llm() if tool_router else []

        # Use provided hf_token or fall back to the tool router's
        effective_hf_token = hf_token or (tool_router.hf_token if tool_router else None)

        # ``is None`` rather than ``or`` so a caller-supplied context manager is
        # never swapped out merely because it happens to evaluate as falsy.
        if context_manager is None:
            context_manager = ContextManager(
                max_context=get_max_tokens(self.config.model_name),
                compact_size=0.1,
                untouched_messages=5,
                tool_specs=tool_specs,
                hf_token=effective_hf_token,
            )
        self.context_manager = context_manager

        self.event_queue = event_queue
        self.session_id = str(uuid.uuid4())
        self.is_running = True
        self.current_task: asyncio.Task | None = None
        self.pending_approval: Optional[dict[str, Any]] = None

        # User's keys
        self.anthropic_key = anthropic_key
        self.hf_token = effective_hf_token

        # Session trajectory logging
        self.logged_events: list[dict] = []
        self.session_start_time = datetime.now().isoformat()
        self.turn_count: int = 0
        self.last_auto_save_turn: int = 0

    async def send_event(self, event: Event) -> None:
        """Put ``event`` on the client queue, then record it in the trajectory log."""
        await self.event_queue.put(event)
        # ``data`` is stored by reference, not copied.
        self.logged_events.append(
            {
                "timestamp": datetime.now().isoformat(),
                "event_type": event.event_type,
                "data": event.data,
            }
        )

    def interrupt(self) -> None:
        """Cancel the current task if one exists and has not finished yet."""
        if self.current_task and not self.current_task.done():
            self.current_task.cancel()

    def increment_turn(self) -> None:
        """Increment turn counter (called after each user interaction)"""
        self.turn_count += 1

    async def auto_save_if_needed(self) -> None:
        """
        Save the session if at least ``config.auto_save_interval`` turns have
        passed since the last auto-save.

        Does nothing when ``config.save_sessions`` is false or the interval is
        <= 0. The local JSON write runs synchronously on the event loop; only
        the upload is handed off to a detached subprocess.
        """
        if not self.config.save_sessions:
            return

        interval = self.config.auto_save_interval
        if interval <= 0:
            return

        turns_since_last_save = self.turn_count - self.last_auto_save_turn
        if turns_since_last_save >= interval:
            print(f"\n💾 Auto-saving session (turn {self.turn_count})...")
            # Fire-and-forget upload; the local save has completed on return
            self.save_and_upload_detached(self.config.session_dataset_repo)
            # Advance the marker even if saving failed, so a persistent failure
            # is not retried on every subsequent turn.
            self.last_auto_save_turn = self.turn_count

    def get_trajectory(self) -> dict:
        """Serialize complete session trajectory for logging.

        Messages come from ``model_dump()`` on each context-manager item;
        ``session_end_time`` is the time of this call.
        """
        return {
            "session_id": self.session_id,
            "session_start_time": self.session_start_time,
            "session_end_time": datetime.now().isoformat(),
            "model_name": self.config.model_name,
            "messages": [msg.model_dump() for msg in self.context_manager.items],
            "events": self.logged_events,
        }

    def save_trajectory_local(
        self,
        directory: str = "session_logs",
        upload_status: str = "pending",
        dataset_url: Optional[str] = None,
    ) -> Optional[str]:
        """
        Save trajectory to local JSON file as backup with upload status

        Each call writes a new file named
        ``session_<session_id>_<YYYYmmdd_HHMMSS>.json`` inside ``directory``.

        Args:
            directory: Directory to save logs (default: "session_logs")
            upload_status: Status of upload attempt ("pending", "success", "failed")
            dataset_url: URL of dataset if upload succeeded

        Returns:
            Path to saved file if successful, None otherwise (errors are printed)
        """
        try:
            log_dir = Path(directory)
            log_dir.mkdir(parents=True, exist_ok=True)

            trajectory = self.get_trajectory()
            # Add upload metadata
            trajectory["upload_status"] = upload_status
            trajectory["upload_url"] = dataset_url
            trajectory["last_save_time"] = datetime.now().isoformat()

            # Serialize before touching the file so a non-JSON-serializable event
            # payload cannot leave a truncated, unparseable file on disk.
            payload = json.dumps(trajectory, indent=2)

            filename = f"session_{self.session_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
            filepath = log_dir / filename
            filepath.write_text(payload, encoding="utf-8")
            return str(filepath)
        except Exception as e:
            print(f"Failed to save session locally: {e}")
            return None

    def update_local_save_status(
        self, filepath: str, upload_status: str, dataset_url: Optional[str] = None
    ) -> bool:
        """Update the upload status of an existing local save file.

        Returns True on success; on any error, prints it and returns False,
        leaving the existing file content intact.
        """
        try:
            path = Path(filepath)
            data = json.loads(path.read_text(encoding="utf-8"))

            data["upload_status"] = upload_status
            data["upload_url"] = dataset_url
            data["last_save_time"] = datetime.now().isoformat()

            # Serialize first: opening in "w" mode truncates immediately, so a
            # failing dump would otherwise wipe out the existing save.
            path.write_text(json.dumps(data, indent=2), encoding="utf-8")
            return True
        except Exception as e:
            print(f"Failed to update local save status: {e}")
            return False

    def save_and_upload_detached(self, repo_id: str) -> Optional[str]:
        """
        Save session locally and spawn detached subprocess for upload (fire-and-forget)

        Args:
            repo_id: HuggingFace dataset repo ID. When empty/None, only the
                local save is performed (it stays marked "pending").

        Returns:
            Path to local save file, or None if the local save failed
        """
        # Save locally first (synchronous)
        local_path = self.save_trajectory_local(upload_status="pending")
        if not local_path:
            return None

        # Matches retry_failed_uploads_detached: no target repo, no upload.
        if not repo_id:
            return local_path

        Session._spawn_uploader("upload", local_path, repo_id)
        return local_path

    @staticmethod
    def _spawn_uploader(mode: str, target: str, repo_id: str) -> None:
        """Run ``session_uploader.py <mode> <target> <repo_id>`` as a detached process.

        Spawn errors are printed, not raised; the child's stdio is discarded.
        """
        try:
            uploader_script = Path(__file__).parent / "session_uploader.py"
            subprocess.Popen(
                [sys.executable, str(uploader_script), mode, target, repo_id],
                stdin=subprocess.DEVNULL,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                start_new_session=True,  # Detach from parent (setsid on POSIX)
            )
        except Exception as e:
            print(f"⚠️ Failed to spawn {mode} subprocess: {e}")

    @staticmethod
    def retry_failed_uploads_detached(
        directory: str = "session_logs", repo_id: Optional[str] = None
    ) -> None:
        """
        Spawn detached subprocess to retry failed/pending uploads (fire-and-forget)

        Args:
            directory: Directory containing session logs
            repo_id: Target dataset repo ID; does nothing when empty/None
        """
        if not repo_id:
            return
        Session._spawn_uploader("retry", directory, repo_id)


# Module-level alias so callers can use either
# ``Session.retry_failed_uploads_detached(...)`` or the bare function name.
retry_failed_uploads_detached = Session.retry_failed_uploads_detached