Spaces:
Running
Running
| """Shell manager for running interactive subprocesses with streaming output.""" | |
| from __future__ import annotations | |
| import shutil | |
| import subprocess | |
| import tempfile | |
| import threading | |
| import time | |
| from dataclasses import dataclass, field | |
| from pathlib import Path | |
| from typing import IO | |
| IDLE_TIMEOUT_SECONDS = 15 * 60 # 15 minutes | |
| class ShellSession: | |
| """A running shell session.""" | |
| session_id: str | |
| process: subprocess.Popen[str] | |
| temp_dir: Path | |
| output_buffer: list[str] = field(default_factory=list) | |
| last_read_pos: int = 0 | |
| started_at: float = field(default_factory=time.time) | |
| last_used_at: float = field(default_factory=time.time) | |
| closed: bool = False | |
| def pid(self) -> int | None: | |
| return self.process.pid | |
| def returncode(self) -> int | None: | |
| return self.process.returncode | |
| def touch(self) -> None: | |
| """Update last_used_at to current time.""" | |
| self.last_used_at = time.time() | |
| def read_new_output(self) -> str: | |
| """Read new output since last read.""" | |
| self.touch() | |
| new_lines = self.output_buffer[self.last_read_pos :] | |
| self.last_read_pos = len(self.output_buffer) | |
| return "".join(new_lines) | |
| def get_full_output(self) -> str: | |
| """Get all output.""" | |
| self.touch() | |
| return "".join(self.output_buffer) | |
| def is_running(self) -> bool: | |
| """Check if process is still running.""" | |
| return self.process.poll() is None | |
| def is_idle_expired(self) -> bool: | |
| """Check if session has been idle longer than IDLE_TIMEOUT_SECONDS.""" | |
| return (time.time() - self.last_used_at) > IDLE_TIMEOUT_SECONDS | |
| def close(self) -> None: | |
| """Close the session and clean up temp directory.""" | |
| if not self.closed: | |
| self.closed = True | |
| try: | |
| self.process.terminate() | |
| self.process.wait(timeout=5) | |
| except Exception: | |
| try: | |
| self.process.kill() | |
| except Exception: | |
| pass | |
| # Clean up temp directory (retry a few times for Windows file locks) | |
| if self.temp_dir.exists(): | |
| for _ in range(3): | |
| try: | |
| shutil.rmtree(self.temp_dir, ignore_errors=False) | |
| break | |
| except Exception: | |
| time.sleep(0.1) | |
| class ShellManager: | |
| """Manages multiple shell sessions with streaming output.""" | |
| def __init__(self, default_timeout: float = 15.0) -> None: | |
| self.sessions: dict[str, ShellSession] = {} | |
| self._lock = threading.Lock() | |
| self.default_timeout = default_timeout | |
| self._cleanup_thread = threading.Thread( | |
| target=self._cleanup_loop, daemon=True | |
| ) | |
| self._cleanup_thread.start() | |
| def _cleanup_loop(self) -> None: | |
| """Background thread that cleans up idle sessions every 60 seconds.""" | |
| while True: | |
| time.sleep(60) | |
| self._cleanup_idle_sessions() | |
| def _cleanup_idle_sessions(self) -> None: | |
| """Close sessions that have been idle for more than IDLE_TIMEOUT_SECONDS.""" | |
| with self._lock: | |
| idle_sessions = [ | |
| sid | |
| for sid, session in self.sessions.items() | |
| if session.is_idle_expired() | |
| ] | |
| for sid in idle_sessions: | |
| session = self.sessions.pop(sid, None) | |
| if session: | |
| session.close() | |
| def start( | |
| self, | |
| session_id: str, | |
| command: str, | |
| cwd: str | None = None, | |
| env: dict[str, str] | None = None, | |
| ) -> ShellSession: | |
| """Start a new shell session in a fresh temp directory.""" | |
| with self._lock: | |
| # Close existing session with same ID | |
| if session_id in self.sessions: | |
| self.sessions[session_id].close() | |
| # Create a unique temp directory for this session | |
| temp_dir = Path(tempfile.mkdtemp(prefix=f"shell_{session_id}_")) | |
| # Merge env with inherited environment | |
| import os | |
| process_env = os.environ.copy() | |
| if env: | |
| process_env.update(env) | |
| # Use shell=True for interactive commands | |
| process = subprocess.Popen( | |
| command, | |
| shell=True, | |
| stdin=subprocess.PIPE, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.STDOUT, | |
| text=True, | |
| bufsize=1, | |
| cwd=str(temp_dir), | |
| env=process_env, | |
| ) | |
| session = ShellSession( | |
| session_id=session_id, | |
| process=process, | |
| temp_dir=temp_dir, | |
| ) | |
| self.sessions[session_id] = session | |
| # Start output reader thread | |
| thread = threading.Thread( | |
| target=self._read_output, args=(session,), daemon=True | |
| ) | |
| thread.start() | |
| return session | |
| def _read_output(self, session: ShellSession) -> None: | |
| """Read output from process in background thread.""" | |
| assert session.process.stdout is not None | |
| try: | |
| for line in session.process.stdout: | |
| with self._lock: | |
| session.output_buffer.append(line) | |
| except Exception: | |
| pass | |
| finally: | |
| with self._lock: | |
| session.closed = True | |
| def send_input(self, session_id: str, input_text: str) -> bool: | |
| """Send input to a running session.""" | |
| with self._lock: | |
| session = self.sessions.get(session_id) | |
| if session is None or not session.is_running(): | |
| return False | |
| if session.process.stdin is None: | |
| return False | |
| try: | |
| session.process.stdin.write(input_text + "\n") | |
| session.process.stdin.flush() | |
| session.touch() | |
| return True | |
| except Exception: | |
| return False | |
| def get_output(self, session_id: str) -> str | None: | |
| """Get full output from a session.""" | |
| with self._lock: | |
| session = self.sessions.get(session_id) | |
| if session is None: | |
| return None | |
| return session.get_full_output() | |
| def poll_output(self, session_id: str) -> str | None: | |
| """Get new output since last poll.""" | |
| with self._lock: | |
| session = self.sessions.get(session_id) | |
| if session is None: | |
| return None | |
| return session.read_new_output() | |
| def is_running(self, session_id: str) -> bool: | |
| """Check if a session is still running.""" | |
| with self._lock: | |
| session = self.sessions.get(session_id) | |
| if session is None: | |
| return False | |
| return session.is_running() | |
| def close(self, session_id: str) -> None: | |
| """Close a session.""" | |
| with self._lock: | |
| session = self.sessions.get(session_id) | |
| if session: | |
| session.close() | |
| def close_all(self) -> None: | |
| """Close all sessions.""" | |
| with self._lock: | |
| for session in self.sessions.values(): | |
| session.close() | |
| self.sessions.clear() | |
| # Global shell manager instance | |
| _shell_manager: ShellManager | None = None | |
| def get_shell_manager() -> ShellManager: | |
| """Get or create the global shell manager.""" | |
| global _shell_manager | |
| if _shell_manager is None: | |
| _shell_manager = ShellManager() | |
| return _shell_manager | |