Spaces:
Running
Running
| from __future__ import annotations | |
| import asyncio | |
| import base64 | |
| import datetime as _dt | |
| import json | |
| import os | |
| import re | |
| import shutil | |
| import tempfile | |
| import threading | |
| import time | |
| import traceback | |
| import zipfile | |
| from pathlib import Path | |
| from typing import Any | |
| from urllib.parse import unquote | |
| from uuid import uuid4 | |
| from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect | |
| from fastapi.responses import FileResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from starlette.background import BackgroundTask | |
| from agent_base.react_agent import MultiTurnReactAgent, default_llm_config | |
| from agent_base.utils import ( | |
| MissingRequiredEnvError, | |
| PROJECT_ROOT, | |
| append_saved_image_paths_to_prompt, | |
| image_input_content_parts, | |
| load_default_dotenvs, | |
| require_required_env, | |
| safe_jsonable, | |
| stage_image_bytes_for_input, | |
| ) | |
# --- Static assets -----------------------------------------------------------
# Directory next to this module that holds the bundled front-end files.
STATIC_DIR = Path(__file__).resolve().parent / "static"

# --- Upload / download limits ------------------------------------------------
MAX_UPLOAD_IMAGES = 12
MAX_IMAGE_BYTES = 12 * 1024 * 1024
MAX_WORKSPACE_DOWNLOAD_BYTES = 100 * 1024 * 1024
MAX_WORKSPACE_DOWNLOAD_FILES = 5000

# --- Managed-run storage and periodic cleanup (set via configure_frontend) ---
FRONTEND_MANAGED_RUNS_DIR: str | None = None
FRONTEND_CLEANUP_RETENTION_SECONDS = 6 * 60 * 60
FRONTEND_CLEANUP_MAX_RUNS = 40
FRONTEND_CLEANUP_INTERVAL_SECONDS = 15 * 60

# --- Trace collection (bundles uploaded to a dataset repository) -------------
FRONTEND_COLLECTION_ENABLED = True
FRONTEND_COLLECTION_DATASET_REPO = "InternScience/ResearchHarness-Data"
FRONTEND_COLLECTION_BATCH_SIZE = 5
FRONTEND_COLLECTION_MAX_BUNDLE_BYTES = 20 * 1024 * 1024

# --- Process-wide mutable state; each registry is guarded by its own lock ----
_CLEANUP_THREAD_STARTED = False
_ACTIVE_MANAGED_RUNS: set[str] = set()  # resolved run-root paths in use
_ACTIVE_MANAGED_RUNS_LOCK = threading.Lock()
_DOWNLOAD_WORKSPACES: dict[str, str] = {}  # download token -> workspace path
_DOWNLOAD_WORKSPACES_LOCK = threading.Lock()
_COLLECTION_LOCK = threading.Lock()
_COLLECTION_CONFIG_WARNED: set[str] = set()  # warning keys already printed

# --- ASGI application --------------------------------------------------------
app = FastAPI(title="ResearchHarness Space UI")
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="frontend-static")
def configure_frontend(
    *,
    managed_runs_dir: str | None = None,
    cleanup_retention_seconds: int | None = None,
    cleanup_max_runs: int | None = None,
    cleanup_interval_seconds: int | None = None,
    collection_enabled: bool | None = None,
    collection_dataset_repo: str | None = None,
    collection_batch_size: int | None = None,
    collection_max_bundle_bytes: int | None = None,
) -> None:
    """Apply runtime configuration for the Space frontend.

    Every argument except ``managed_runs_dir`` is optional; ``None`` keeps the
    current module-level value. After the settings are stored the collection
    directory is created, one cleanup pass runs, and the background cleanup
    thread is started.

    Raises:
        ValueError: if ``managed_runs_dir`` is empty or names an existing
            non-directory. All inputs are validated and converted before any
            module-level setting is changed, so a failure leaves the previous
            configuration intact.
    """
    global FRONTEND_MANAGED_RUNS_DIR
    global FRONTEND_CLEANUP_RETENTION_SECONDS, FRONTEND_CLEANUP_MAX_RUNS, FRONTEND_CLEANUP_INTERVAL_SECONDS
    global FRONTEND_COLLECTION_ENABLED, FRONTEND_COLLECTION_DATASET_REPO
    global FRONTEND_COLLECTION_BATCH_SIZE, FRONTEND_COLLECTION_MAX_BUNDLE_BYTES

    if not managed_runs_dir:
        raise ValueError("managed_runs_dir is required for the Space frontend")
    path = Path(managed_runs_dir).expanduser()
    if path.exists() and not path.is_dir():
        raise ValueError(f"managed-runs-dir is not a directory: {path}")

    def _clamped(value: int | None, floor: int) -> int | None:
        # int() may raise on bad input; doing it here keeps globals untouched.
        return None if value is None else max(floor, int(value))

    batch_size = _clamped(collection_batch_size, 1)
    max_bundle_bytes = _clamped(collection_max_bundle_bytes, 1)
    retention_seconds = _clamped(cleanup_retention_seconds, 60)
    max_runs = _clamped(cleanup_max_runs, 1)
    interval_seconds = _clamped(cleanup_interval_seconds, 60)

    path.mkdir(parents=True, exist_ok=True)

    # Everything validated: commit the new settings.
    if collection_enabled is not None:
        FRONTEND_COLLECTION_ENABLED = bool(collection_enabled)
    if collection_dataset_repo is not None:
        FRONTEND_COLLECTION_DATASET_REPO = str(collection_dataset_repo or "").strip()
    if batch_size is not None:
        FRONTEND_COLLECTION_BATCH_SIZE = batch_size
    if max_bundle_bytes is not None:
        FRONTEND_COLLECTION_MAX_BUNDLE_BYTES = max_bundle_bytes
    FRONTEND_MANAGED_RUNS_DIR = str(path)
    if retention_seconds is not None:
        FRONTEND_CLEANUP_RETENTION_SECONDS = retention_seconds
    if max_runs is not None:
        FRONTEND_CLEANUP_MAX_RUNS = max_runs
    if interval_seconds is not None:
        FRONTEND_CLEANUP_INTERVAL_SECONDS = interval_seconds

    _collection_root()
    cleanup_managed_runs_once()
    _start_managed_cleanup_thread()
class FrontendRunBridge:
    """Per-connection bridge between the agent worker thread and the event loop.

    The worker thread pushes outbound payloads through :meth:`send`, which
    schedules them onto ``outbound`` on the owning event loop. AskUser
    round-trips are coordinated with one ``threading.Event`` per request.
    """

    def __init__(self, *, loop: asyncio.AbstractEventLoop):
        self.loop = loop
        # Payloads waiting to be written to the websocket.
        self.outbound: asyncio.Queue[dict[str, Any]] = asyncio.Queue()
        # Set to ask the running agent to stop.
        self.cancelled = threading.Event()
        # Conversation state carried between runs on the same connection.
        self.conversation_messages: list[dict[str, Any]] | None = None
        self.conversation_workspace_root: str = ""
        # Paths and token of the managed run owned by this connection.
        self.managed_run_root: str = ""
        self.managed_workspace_root: str = ""
        self.managed_trace_dir: str = ""
        self.download_token: str = ""
        # AskUser bookkeeping, guarded by ``_lock``.
        self._pending_answers: dict[str, str] = {}
        self._pending_events: dict[str, threading.Event] = {}
        self._lock = threading.Lock()

    def send(self, payload: dict[str, Any]) -> None:
        """Queue ``payload`` for delivery; callable from any thread."""
        self.loop.call_soon_threadsafe(self.outbound.put_nowait, safe_jsonable(payload))

    def trace_event(self, row: dict[str, Any]) -> None:
        """Forward one trace row to the client as a ``trace`` message."""
        self.send({"type": "trace", "row": row})

    def submit_answer(self, request_id: str, answer: str) -> bool:
        """Record the answer for a pending AskUser request.

        Returns ``False`` when no request with ``request_id`` is waiting.
        """
        with self._lock:
            event = self._pending_events.get(request_id)
            if event is None:
                return False
            self._pending_answers[request_id] = str(answer)
            event.set()
            return True

    def ask_user(self, *, question: str, context: str = "") -> str:
        """Send a question to the client and block until answered or cancelled.

        Returns a ``[AskUser] ...`` string describing the answer, an empty
        answer, or the cancellation.
        """
        request_id = uuid4().hex
        event = threading.Event()
        with self._lock:
            self._pending_events[request_id] = event
        try:
            self.send(
                {
                    "type": "ask_user",
                    "request_id": request_id,
                    "question": question,
                    "context": context,
                }
            )
            # Poll so a cancellation is noticed within ~0.2s.
            while not event.wait(0.2):
                if self.cancelled.is_set():
                    return "[AskUser] Cancelled before user answer was received."
        finally:
            # Always drop the bookkeeping, including on cancellation; otherwise
            # the entry stays registered and a late answer would be stored
            # with nobody left to consume it.
            with self._lock:
                self._pending_events.pop(request_id, None)
                answer = self._pending_answers.pop(request_id, "")
        answer = str(answer).strip()
        if not answer:
            return "[AskUser] User answer was empty."
        return f"[AskUser] User answer:\n{answer}"
def _managed_runs_root() -> Path | None:
    """Return the resolved managed-runs directory, or ``None`` if unset."""
    configured = FRONTEND_MANAGED_RUNS_DIR
    return Path(configured).expanduser().resolve() if configured else None
def _new_managed_run_root() -> Path:
    """Build a fresh ``run_<timestamp>_<hex>`` path under the managed root.

    The directory itself is not created here.

    Raises:
        ValueError: if no managed-runs directory has been configured.
    """
    runs_root = _managed_runs_root()
    if runs_root is None:
        raise ValueError("managed workspace mode is not configured")
    stamp = _dt.datetime.now().strftime("%Y%m%d_%H%M%S")
    unique = uuid4().hex[:8]
    return runs_root / f"run_{stamp}_{unique}"
def _mark_managed_run_active(run_root: Path) -> None:
    """Register ``run_root`` (resolved) in the set of active managed runs."""
    run_key = str(run_root.resolve())
    with _ACTIVE_MANAGED_RUNS_LOCK:
        _ACTIVE_MANAGED_RUNS.add(run_key)
def _register_download_workspace(workspace_root: Path) -> str:
    """Map a new random token to ``workspace_root`` and return the token."""
    workspace_key = str(workspace_root.resolve())
    download_token = uuid4().hex
    with _DOWNLOAD_WORKSPACES_LOCK:
        _DOWNLOAD_WORKSPACES[download_token] = workspace_key
    return download_token
def _unregister_download_workspace(token: str) -> None:
    """Forget the workspace registered under ``token``; empty tokens are ignored."""
    if token:
        with _DOWNLOAD_WORKSPACES_LOCK:
            _DOWNLOAD_WORKSPACES.pop(token, None)
def _download_workspace_for_token(token: str) -> Path:
    """Look up the workspace directory registered for ``token``.

    Raises:
        HTTPException: 404 if the token is unknown or the directory is gone.
    """
    lookup_key = str(token or "")
    with _DOWNLOAD_WORKSPACES_LOCK:
        registered = _DOWNLOAD_WORKSPACES.get(lookup_key)
    if not registered:
        raise HTTPException(status_code=404, detail="No downloadable workspace is available for this chat.")
    workspace_root = Path(registered).resolve()
    if workspace_root.is_dir():
        return workspace_root
    raise HTTPException(status_code=404, detail="The workspace is no longer available.")
def _resolve_workspace_file_path(workspace_root: Path, raw_path: str) -> Path:
    """Resolve a client-supplied path to an image file inside the workspace.

    An optional ``file://`` prefix is dropped and percent-escapes are decoded.
    Relative paths are taken relative to ``workspace_root``.

    Raises:
        HTTPException: 400 for an empty path, 403 when the resolved path lies
            outside the workspace, 404 when it is not a file, and 415 when the
            suffix is not one of the allowed image extensions.
    """
    requested = str(raw_path or "").strip()
    if requested.startswith("file://"):
        requested = requested[len("file://"):]
    requested = unquote(requested)
    if not requested:
        raise HTTPException(status_code=400, detail="workspace file path is required")

    target = Path(requested)
    if not target.is_absolute():
        target = workspace_root / requested
    resolved = target.resolve()

    # Containment check on the fully resolved path, so ``..`` segments and
    # symlinks cannot point outside the workspace.
    try:
        resolved.relative_to(workspace_root.resolve())
    except ValueError as exc:
        raise HTTPException(status_code=403, detail="workspace file path is outside the workspace") from exc

    if not resolved.is_file():
        raise HTTPException(status_code=404, detail="workspace file does not exist")
    inline_suffixes = {".png", ".jpg", ".jpeg", ".gif", ".webp", ".bmp", ".svg"}
    if resolved.suffix.lower() not in inline_suffixes:
        raise HTTPException(status_code=415, detail="only workspace image files can be displayed inline")
    return resolved
def _release_managed_run(bridge: FrontendRunBridge) -> None:
    """Detach ``bridge`` from its managed run.

    Unregisters the download token, removes the run from the active set and
    blanks the managed-run fields on the bridge. Files on disk are untouched.
    """
    _unregister_download_workspace(bridge.download_token)
    current_root = bridge.managed_run_root
    if current_root:
        run_key = str(Path(current_root).resolve())
        with _ACTIVE_MANAGED_RUNS_LOCK:
            _ACTIVE_MANAGED_RUNS.discard(run_key)
    for field in ("managed_run_root", "managed_workspace_root", "managed_trace_dir", "download_token"):
        setattr(bridge, field, "")
def _create_managed_run(bridge: FrontendRunBridge) -> tuple[Path, str]:
    """Create a new managed run on disk and attach it to ``bridge``.

    Creates ``agent_workspace`` and ``agent_trace`` under a fresh run root,
    stores their paths plus a new download token on the bridge, and marks the
    run active.

    Returns:
        ``(workspace_root, trace_dir)``, the latter as a string.
    """
    run_root = _new_managed_run_root()
    workspace_root = run_root / "agent_workspace"
    trace_dir = run_root / "agent_trace"
    for directory in (workspace_root, trace_dir):
        directory.mkdir(parents=True, exist_ok=True)

    trace_dir_text = str(trace_dir)
    bridge.managed_run_root = str(run_root)
    bridge.managed_workspace_root = str(workspace_root)
    bridge.managed_trace_dir = trace_dir_text
    bridge.download_token = _register_download_workspace(workspace_root)
    _mark_managed_run_active(run_root)
    return workspace_root, trace_dir_text
def cleanup_managed_runs_once() -> None:
    """Run one cleanup pass over the managed-runs directory.

    First removes inactive ``run_*`` directories whose mtime is older than the
    retention window, then removes inactive directories beyond the newest
    ``FRONTEND_CLEANUP_MAX_RUNS``. Directories in the active set are never
    removed. Deletion errors are ignored.
    """
    runs_root = _managed_runs_root()
    if runs_root is None or not runs_root.exists():
        return

    def _active_snapshot() -> set[str]:
        with _ACTIVE_MANAGED_RUNS_LOCK:
            return set(_ACTIVE_MANAGED_RUNS)

    def _scan_run_dirs() -> list[tuple[float, Path, str]]:
        # (mtime, directory, resolved path) for each readable ``run_*`` dir.
        found: list[tuple[float, Path, str]] = []
        for entry in runs_root.iterdir():
            if not (entry.is_dir() and entry.name.startswith("run_")):
                continue
            try:
                record = (entry.stat().st_mtime, entry, str(entry.resolve()))
            except OSError:
                continue
            found.append(record)
        return found

    started_at = time.time()

    # Pass 1: age-based removal.
    in_use = _active_snapshot()
    for modified_at, run_dir, run_key in _scan_run_dirs():
        if run_key in in_use:
            continue
        if FRONTEND_CLEANUP_RETENTION_SECONDS and started_at - modified_at > FRONTEND_CLEANUP_RETENTION_SECONDS:
            shutil.rmtree(run_dir, ignore_errors=True)

    # Pass 2: keep only the newest runs; the active set is re-read because it
    # may have changed while pass 1 was deleting.
    in_use = _active_snapshot()
    newest_first = sorted(_scan_run_dirs(), key=lambda item: item[0], reverse=True)
    for _, run_dir, run_key in newest_first[FRONTEND_CLEANUP_MAX_RUNS:]:
        if run_key not in in_use:
            shutil.rmtree(run_dir, ignore_errors=True)
def _managed_cleanup_loop() -> None:
    """Run ``cleanup_managed_runs_once`` forever at the configured interval.

    Intended as the target of a daemon thread. A failing pass is logged and
    the loop keeps going; without this, a single exception (for example the
    root directory disappearing mid-scan) would end the thread and silently
    stop all future cleanup.
    """
    while True:
        time.sleep(FRONTEND_CLEANUP_INTERVAL_SECONDS)
        try:
            cleanup_managed_runs_once()
        except Exception:
            print("[ResearchHarness Space cleanup] periodic cleanup failed", flush=True)
            traceback.print_exc()
def _start_managed_cleanup_thread() -> None:
    """Start the daemon cleanup thread unless one was already started."""
    global _CLEANUP_THREAD_STARTED
    if not _CLEANUP_THREAD_STARTED:
        worker = threading.Thread(target=_managed_cleanup_loop, daemon=True)
        worker.start()
        _CLEANUP_THREAD_STARTED = True
def _collection_root() -> Path | None:
    """Return ``<managed root>/_collection``, creating its ``pending`` subdir.

    Returns ``None`` when no managed-runs directory is configured.
    """
    runs_root = _managed_runs_root()
    if runs_root is None:
        return None
    target = runs_root / "_collection"
    pending_dir = target / "pending"
    pending_dir.mkdir(parents=True, exist_ok=True)
    return target
def _collection_token() -> str:
    """Return the first non-blank token among the supported env variables.

    Checked in order: ``HF_TOKEN``, ``HUGGINGFACE_HUB_TOKEN``,
    ``HUGGING_FACE_HUB_TOKEN``. Returns ``""`` when none is set.
    """
    candidates = (
        os.getenv(variable, "").strip()
        for variable in ("HF_TOKEN", "HUGGINGFACE_HUB_TOKEN", "HUGGING_FACE_HUB_TOKEN")
    )
    return next((token for token in candidates if token), "")
def _warn_collection_once(key: str, message: str) -> None:
    """Print ``message`` the first time ``key`` is seen; later calls are silent."""
    if key not in _COLLECTION_CONFIG_WARNED:
        _COLLECTION_CONFIG_WARNED.add(key)
        print(f"[ResearchHarness Space collection] {message}", flush=True)
def _collection_ready() -> bool:
    """Tell whether trace collection is enabled and fully configured.

    Requires the enable flag, a dataset repo name, a token, and a configured
    collection directory. A missing repo or token is reported once.
    """
    if not FRONTEND_COLLECTION_ENABLED:
        return False

    problem: tuple[str, str] | None = None
    if not FRONTEND_COLLECTION_DATASET_REPO:
        problem = ("missing_repo", "disabled because RH_COLLECTION_DATASET_REPO is empty.")
    elif not _collection_token():
        problem = ("missing_token", "disabled because HF_TOKEN is not configured.")
    if problem is not None:
        _warn_collection_once(*problem)
        return False

    return _collection_root() is not None
class _CollectionBundleTooLarge(RuntimeError):
    """Raised while bundling when the archive exceeds the configured size limit."""
def _iter_collection_files(run_root: Path) -> list[tuple[Path, str]]:
    """List regular files under a run's trace and workspace directories.

    Returns ``(path, archive_name)`` pairs where the archive name is the path
    relative to ``run_root``. Symlinks and non-files are skipped; a missing
    subdirectory contributes nothing.
    """
    collected: list[tuple[Path, str]] = []
    for section in ("agent_trace", "agent_workspace"):
        section_dir = run_root / section
        if not section_dir.is_dir():
            continue
        collected.extend(
            (entry, str(Path(section) / entry.relative_to(section_dir)))
            for entry in sorted(section_dir.rglob("*"))
            if entry.is_file() and not entry.is_symlink()
        )
    return collected
def _write_collection_bundle(run_root: Path, result: dict[str, Any]) -> Path | None:
    """Zip a finished run into the pending-collection directory.

    The archive holds the files from ``_iter_collection_files`` plus a
    ``manifest.json``; a sidecar ``<bundle_id>.json`` repeats the manifest with
    the final archive size.

    The archive is built under a ``.zip.part`` name and renamed to ``.zip``
    only after the sidecar is written. The uploader selects ``*.zip`` from the
    same directory, so it can never pick up a half-written archive or one
    without its sidecar.

    Returns:
        Path of the queued zip, or ``None`` when collection is not configured,
        the bundle exceeds ``FRONTEND_COLLECTION_MAX_BUNDLE_BYTES``, or
        bundling failed (failures are logged, never raised).
    """
    collection_root = _collection_root()
    if collection_root is None:
        return None
    pending_dir = collection_root / "pending"
    # Timezone-aware replacement for the deprecated datetime.utcnow().
    now_utc = _dt.datetime.now(_dt.timezone.utc)
    bundle_id = f"{run_root.name}_{now_utc.strftime('%Y%m%dT%H%M%SZ')}_{uuid4().hex[:8]}"
    zip_path = pending_dir / f"{bundle_id}.zip"
    meta_path = pending_dir / f"{bundle_id}.json"
    partial_path = pending_dir / f"{bundle_id}.zip.part"
    size_limit = FRONTEND_COLLECTION_MAX_BUNDLE_BYTES

    def _discard_outputs() -> None:
        for leftover in (partial_path, zip_path, meta_path):
            leftover.unlink(missing_ok=True)

    try:
        files = _iter_collection_files(run_root)
        skipped: list[dict[str, str]] = []
        manifest = {
            "bundle_id": bundle_id,
            "run_id": run_root.name,
            "created_at_utc": now_utc.strftime("%Y-%m-%dT%H:%M:%SZ"),
            "source": "ResearchHarness HuggingFace Space",
            "max_bundle_bytes": size_limit,
            "file_count": len(files),
            "result_text": str(result.get("result_text", "")),
            "termination": str(result.get("termination", "")),
        }
        with zipfile.ZipFile(partial_path, "w", compression=zipfile.ZIP_DEFLATED) as archive:
            for path, arcname in files:
                try:
                    archive.write(path, arcname)
                except OSError as exc:
                    # Unreadable files are recorded in the manifest, not fatal.
                    skipped.append({"path": str(path), "error": str(exc)})
                    continue
                # Abort early instead of compressing an oversized workspace.
                if partial_path.stat().st_size > size_limit:
                    raise _CollectionBundleTooLarge
            manifest["skipped_files"] = skipped
            archive.writestr("manifest.json", json.dumps(safe_jsonable(manifest), ensure_ascii=False, indent=2))
        bundle_bytes = partial_path.stat().st_size
        if bundle_bytes > size_limit:
            raise _CollectionBundleTooLarge
        meta = dict(manifest)
        meta["bundle_bytes"] = bundle_bytes
        meta_path.write_text(json.dumps(safe_jsonable(meta), ensure_ascii=False, indent=2), encoding="utf-8")
        # Publish last: the rename makes the finished bundle visible.
        partial_path.replace(zip_path)
    except _CollectionBundleTooLarge:
        _discard_outputs()
        print(
            f"[ResearchHarness Space collection] dropped oversized bundle for {run_root.name}; "
            f"limit={FRONTEND_COLLECTION_MAX_BUNDLE_BYTES} bytes",
            flush=True,
        )
        return None
    except Exception:
        _discard_outputs()
        print("[ResearchHarness Space collection] failed to create bundle", flush=True)
        traceback.print_exc()
        return None
    print(f"[ResearchHarness Space collection] queued bundle {zip_path.name}", flush=True)
    return zip_path
def _record_collection_upload_error(collection_root: Path, error: str) -> None:
    """Write ``last_upload_error.json`` with the error text and a UTC timestamp.

    The timestamp has the form ``YYYY-MM-DDTHH:MM:SSZ``. It is taken from a
    timezone-aware clock because ``datetime.utcnow()`` is deprecated.
    """
    created_at = _dt.datetime.now(_dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    payload = {
        "created_at_utc": created_at,
        "error": error,
    }
    (collection_root / "last_upload_error.json").write_text(
        json.dumps(payload, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
def _create_dataset_pr_for_bundles(bundle_paths: list[Path]) -> str:
    """Upload bundles (and any sidecars) to the dataset repo as one pull request.

    Each bundle, plus its ``.json`` sidecar when present, is added under
    ``batches/<batch_id>/``.

    Returns:
        The PR URL if the commit info exposes one, else the commit URL, else
        the string form of the commit info.
    """
    # Imported lazily so the module loads without huggingface_hub installed.
    from huggingface_hub import CommitOperationAdd, HfApi

    # Timezone-aware replacement for the deprecated datetime.utcnow().
    stamp = _dt.datetime.now(_dt.timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    batch_id = f"batch_{stamp}_{uuid4().hex[:8]}"

    operations = []
    for bundle_path in bundle_paths:
        uploads = [bundle_path]
        sidecar = bundle_path.with_suffix(".json")
        if sidecar.exists():
            uploads.append(sidecar)
        for upload in uploads:
            operations.append(
                CommitOperationAdd(
                    path_in_repo=f"batches/{batch_id}/{upload.name}",
                    path_or_fileobj=str(upload),
                )
            )

    info = HfApi(token=_collection_token()).create_commit(
        repo_id=FRONTEND_COLLECTION_DATASET_REPO,
        repo_type="dataset",
        operations=operations,
        commit_message=f"Add ResearchHarness traces {batch_id}",
        commit_description="Automatically collected ResearchHarness Space trajectories.",
        create_pr=True,
    )
    return str(getattr(info, "pr_url", "") or getattr(info, "commit_url", "") or info)
def _flush_collection_batches() -> None:
    """Upload pending bundles in full batches until fewer than one batch is left.

    Runs under ``_COLLECTION_LOCK``. Bundles are taken oldest-first by mtime.
    A failed upload is recorded in ``last_upload_error.json`` and stops the
    flush, leaving the bundles in place. After a successful upload the
    uploaded files and any earlier error record are removed.
    """
    if not _collection_ready():
        return
    collection_root = _collection_root()
    if collection_root is None:
        return

    with _COLLECTION_LOCK:
        pending_dir = collection_root / "pending"
        error_record = collection_root / "last_upload_error.json"
        while True:
            queued = sorted(pending_dir.glob("*.zip"), key=lambda path: path.stat().st_mtime)
            if len(queued) < FRONTEND_COLLECTION_BATCH_SIZE:
                return
            batch = queued[:FRONTEND_COLLECTION_BATCH_SIZE]

            try:
                pr_url = _create_dataset_pr_for_bundles(batch)
            except Exception as exc:
                _record_collection_upload_error(collection_root, str(exc))
                print("[ResearchHarness Space collection] failed to create dataset PR", flush=True)
                traceback.print_exc()
                return

            for uploaded in batch:
                uploaded.unlink(missing_ok=True)
                uploaded.with_suffix(".json").unlink(missing_ok=True)
            error_record.unlink(missing_ok=True)
            print(
                f"[ResearchHarness Space collection] created dataset PR for {len(batch)} bundles: {pr_url}",
                flush=True,
            )
def _collect_finished_managed_run(run_root_text: str, result: dict[str, Any]) -> None:
    """Queue a finished run for collection and start a background flush.

    Does nothing when collection is not ready, the path is empty or not an
    existing directory, or no bundle could be written.
    """
    if not (_collection_ready() and run_root_text):
        return
    run_root = Path(run_root_text)
    if not run_root.exists() or not run_root.is_dir():
        return
    if _write_collection_bundle(run_root, result) is None:
        return
    flusher = threading.Thread(target=_flush_collection_batches, daemon=True)
    flusher.start()
def _workspace_download_files(workspace_root: Path) -> list[Path]:
    """Collect the regular files in a workspace that may be zipped for download.

    Symlinks, non-files, unreadable entries and entries resolving outside
    ``workspace_root`` are skipped.

    Raises:
        HTTPException: 413 when the file count or total size passes the
            download limits, 404 when nothing downloadable is found.
    """
    selected: list[Path] = []
    running_bytes = 0
    for entry in sorted(workspace_root.rglob("*")):
        if entry.is_symlink() or not entry.is_file():
            continue
        try:
            real_path = entry.resolve()
            real_path.relative_to(workspace_root)
            entry_bytes = real_path.stat().st_size
        except (OSError, ValueError):
            continue
        selected.append(real_path)
        running_bytes += entry_bytes
        # Checked per file so an oversized workspace is rejected early.
        if len(selected) > MAX_WORKSPACE_DOWNLOAD_FILES:
            raise HTTPException(status_code=413, detail="Workspace has too many files to download as one zip.")
        if running_bytes > MAX_WORKSPACE_DOWNLOAD_BYTES:
            raise HTTPException(status_code=413, detail="Workspace is too large to download as one zip.")
    if selected:
        return selected
    raise HTTPException(status_code=404, detail="The agent workspace has no downloadable files yet.")
def _create_workspace_zip(workspace_root: Path) -> Path:
    """Zip the downloadable workspace files into a temporary file.

    The caller owns the returned file and must delete it. On any failure the
    temporary file is removed and the exception is re-raised.

    Raises:
        HTTPException: from ``_workspace_download_files``, or 413 when the
            finished zip exceeds ``MAX_WORKSPACE_DOWNLOAD_BYTES``.
    """
    members = _workspace_download_files(workspace_root)
    # Reserve a unique name only; ZipFile reopens the path itself.
    with tempfile.NamedTemporaryFile(prefix="rh_workspace_", suffix=".zip", delete=False) as handle:
        zip_path = Path(handle.name)
    try:
        with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as archive:
            for member in members:
                archive.write(member, member.relative_to(workspace_root).as_posix())
        if zip_path.stat().st_size > MAX_WORKSPACE_DOWNLOAD_BYTES:
            raise HTTPException(status_code=413, detail="Workspace zip is too large to download.")
    except Exception:
        zip_path.unlink(missing_ok=True)
        raise
    return zip_path
class FrontendInteractiveAgent(MultiTurnReactAgent):
    """Agent variant that answers ``AskUser`` tool calls through the bridge."""

    def __init__(self, *, bridge: FrontendRunBridge, **kwargs: Any):
        super().__init__(**kwargs)
        self.bridge = bridge

    def custom_call_tool(self, tool_name: str, tool_args: Any, **kwargs: Any):
        """Handle ``AskUser`` locally; delegate every other tool to the base class.

        Argument problems are returned as ``[AskUser] ...`` strings rather
        than raised.
        """
        if tool_name == "AskUser":
            ask_tool = self.tool_map.get("AskUser")
            if ask_tool is None:
                return "[AskUser] Tool is not available in this run."
            try:
                arguments = ask_tool.parse_json_args(tool_args)
            except ValueError as exc:
                return f"[AskUser] {exc}"
            question = str(arguments.get("question", "")).strip()
            context = str(arguments.get("context", "") or "").strip()
            if question:
                return self.bridge.ask_user(question=question, context=context)
            return "[AskUser] question must be a non-empty string."
        return super().custom_call_tool(tool_name, tool_args, **kwargs)
def _safe_image_suffix(mime: str, filename: str = "") -> str:
    """Choose a file suffix for an uploaded image.

    A recognised extension on ``filename`` wins (lower-cased); otherwise the
    MIME type is mapped, with ``.png`` as the fallback.
    """
    from_name = Path(filename).suffix.lower()
    if from_name in {".png", ".jpg", ".jpeg", ".gif", ".webp", ".bmp"}:
        return from_name
    suffix_by_mime = dict(
        [
            ("image/png", ".png"),
            ("image/jpeg", ".jpg"),
            ("image/gif", ".gif"),
            ("image/webp", ".webp"),
            ("image/bmp", ".bmp"),
        ]
    )
    return suffix_by_mime.get(mime.lower(), ".png")
def decode_image_data_url(data_url: str, *, filename: str = "") -> tuple[str, bytes]:
    """Decode a ``data:image/...;base64,...`` URL into ``(suffix, bytes)``.

    Raises:
        ValueError: if the URL is malformed, the base64 payload is invalid,
            the decoded image is empty, or it exceeds ``MAX_IMAGE_BYTES``.
    """
    parsed = re.fullmatch(r"data:(image/[A-Za-z0-9.+-]+);base64,(.*)", str(data_url), flags=re.DOTALL)
    if parsed is None:
        raise ValueError("image must be a data:image/...;base64,... URL")
    mime, encoded = parsed.group(1), parsed.group(2)
    try:
        # validate=True rejects characters outside the base64 alphabet.
        payload = base64.b64decode(encoded, validate=True)
    except ValueError as exc:
        raise ValueError(f"invalid base64 image data: {exc}") from exc
    payload_size = len(payload)
    if payload_size == 0:
        raise ValueError("image upload is empty")
    if payload_size > MAX_IMAGE_BYTES:
        raise ValueError(f"image upload exceeds {MAX_IMAGE_BYTES} bytes")
    return _safe_image_suffix(mime, filename), payload
def save_uploaded_images(workspace_root: Path, images: list[dict[str, Any]]) -> tuple[list[dict[str, Any]], list[str]]:
    """Decode uploaded images, stage them in the workspace and build model input.

    Each item is expected to be a dict with a ``data_url`` and an optional
    ``name``.

    Returns:
        ``(content_parts, saved_paths)``: the content parts produced by
        ``image_input_content_parts`` and the staged file paths, in upload
        order. Both are empty when ``images`` is empty.

    Raises:
        ValueError: for too many images, a non-dict item, or an invalid
            data URL.
    """
    if len(images) > MAX_UPLOAD_IMAGES:
        raise ValueError(f"at most {MAX_UPLOAD_IMAGES} images are supported per run")
    if not images:
        return [], []
    timestamp = _dt.datetime.now().strftime("%Y%m%d_%H%M%S")
    content_parts: list[dict[str, Any]] = []
    saved_paths: list[str] = []
    for idx, item in enumerate(images, start=1):
        if not isinstance(item, dict):
            raise ValueError("each image item must be an object")
        data_url = str(item.get("data_url", "")).strip()
        fallback_name = f"image_{idx}"
        raw_name = str(item.get("name", "") or fallback_name)
        # The name is supplied by the client and ends up in a file name, so
        # keep only the last path component ("/" or "\\" separated) to stop a
        # crafted name from introducing directory parts.
        # NOTE(review): stage_image_bytes_for_input may already sanitise its
        # filename argument - confirm; this is defence in depth.
        filename = re.split(r"[\\/]", raw_name)[-1] or fallback_name
        suffix, raw = decode_image_data_url(data_url, filename=filename)
        saved_path = stage_image_bytes_for_input(
            raw,
            workspace_root=workspace_root,
            filename=f"{timestamp}_{filename}",
            image_index=idx - 1,
            suffix=suffix,
        )
        saved_paths.append(saved_path)
        content_parts.extend(image_input_content_parts(data_url, saved_path))
    return content_parts, saved_paths
def _prompt_with_uploaded_image_paths(prompt: str, saved_paths: list[str]) -> str:
    """Return ``prompt`` passed through ``append_saved_image_paths_to_prompt``."""
    augmented_prompt = append_saved_image_paths_to_prompt(prompt, saved_paths)
    return augmented_prompt
def _run_agent_thread(
    *,
    bridge: FrontendRunBridge,
    prompt: str,
    workspace_root: Path,
    initial_content_parts: list[dict[str, Any]],
    trace_dir: str,
    prior_messages: list[dict[str, Any]] | None = None,
    managed_run_root: str = "",
    model_name: str = "",
) -> None:
    """Run one agent session on a worker thread and report through ``bridge``.

    Sends ``run_started`` once the agent is built, then ``run_finished`` with
    the result, or ``run_error`` on failure. The resulting messages and
    workspace path are stored on the bridge for a follow-up turn. Nothing is
    raised to the caller.
    """
    try:
        load_default_dotenvs()
        require_required_env("ResearchHarness frontend")
        agent = FrontendInteractiveAgent(
            bridge=bridge,
            llm=default_llm_config(model_name=model_name or None),
            trace_dir=trace_dir,
        )
        bridge.send(
            {
                "type": "run_started",
                "model": agent.model,
                "workspace_root": str(workspace_root),
                "trace_dir": trace_dir,
                "download_token": bridge.download_token,
            }
        )
        result = agent._run_session(
            prompt,
            workspace_root=str(workspace_root),
            event_callback=bridge.trace_event,
            initial_content_parts=initial_content_parts or None,
            prior_messages=prior_messages,
            interrupt_event=bridge.cancelled,
        )
        bridge.conversation_messages = result.get("messages", [])
        bridge.conversation_workspace_root = str(workspace_root)
        if managed_run_root:
            # Collection is best-effort: a failure here must not turn a
            # finished run into a ``run_error`` for the user.
            try:
                _collect_finished_managed_run(managed_run_root, result)
            except Exception:
                print("[ResearchHarness Space collection] failed to collect finished run", flush=True)
                traceback.print_exc()
        bridge.send(
            {
                "type": "run_finished",
                "result_text": result.get("result_text", ""),
                "termination": result.get("termination", ""),
            }
        )
    except (MissingRequiredEnvError, ValueError) as exc:
        # Configuration/input problems: report the message without a traceback.
        bridge.send({"type": "run_error", "error": str(exc)})
    except Exception as exc:
        bridge.send({"type": "run_error", "error": str(exc), "traceback": traceback.format_exc()})
# NOTE(review): no route decorator is visible for this handler in this view - confirm registration.
def index() -> FileResponse:
    """Serve the front-end entry page from the static directory."""
    page_path = STATIC_DIR / "index.html"
    return FileResponse(page_path)
# NOTE(review): no route decorator is visible for this handler in this view - confirm registration.
def favicon() -> FileResponse:
    """Serve the SVG favicon from the static directory."""
    icon_path = STATIC_DIR / "favicon.svg"
    return FileResponse(icon_path, media_type="image/svg+xml")
# NOTE(review): no route decorator is visible for this handler in this view - confirm registration.
def download_workspace_zip(token: str) -> FileResponse:
    """Return the workspace registered under ``token`` as a zip download.

    The temporary archive is deleted by a background task after the response
    has been sent. Lookup and zipping errors surface as ``HTTPException``.
    """
    workspace_root = _download_workspace_for_token(token)
    zip_path = _create_workspace_zip(workspace_root)

    def _remove_temp_zip(path: str) -> None:
        Path(path).unlink(missing_ok=True)

    return FileResponse(
        zip_path,
        media_type="application/zip",
        filename=f"{workspace_root.parent.name}_agent_workspace.zip",
        background=BackgroundTask(_remove_temp_zip, str(zip_path)),
    )
# NOTE(review): no route decorator is visible for this handler in this view - confirm registration.
def workspace_file(token: str, path: str) -> FileResponse:
    """Serve one image file from the workspace registered under ``token``.

    Token and path validation errors surface as ``HTTPException``.
    """
    workspace_root = _download_workspace_for_token(token)
    image_path = _resolve_workspace_file_path(workspace_root, path)
    return FileResponse(image_path)
# NOTE(review): no route decorator is visible for this handler in this view - confirm registration.
async def websocket_endpoint(websocket: WebSocket) -> None:
    """Serve one chat session over a websocket.

    Client message types handled: ``start`` (launch an agent run on a worker
    thread), ``ask_user_answer``, ``interrupt`` and ``new``. Anything the
    worker sends through the bridge is relayed to the client by a background
    sender task. When the connection ends the run is cancelled and the managed
    run is released.
    """
    await websocket.accept()
    bridge = FrontendRunBridge(loop=asyncio.get_running_loop())
    run_thread: threading.Thread | None = None

    async def sender() -> None:
        # Relay payloads queued by the worker thread to the client.
        while True:
            payload = await bridge.outbound.get()
            await websocket.send_json(payload)

    sender_task = asyncio.create_task(sender())
    try:
        await websocket.send_json({"type": "ready", "managed_workspace": True})
        while True:
            message = await websocket.receive_json()
            if not isinstance(message, dict):
                # Valid JSON that is not an object (list, string, number)
                # would otherwise fail on ``.get`` and drop the connection.
                bridge.send({"type": "run_error", "error": "Websocket messages must be JSON objects."})
                continue
            message_type = str(message.get("type", "")).strip()
            if message_type == "start":
                if run_thread is not None and run_thread.is_alive():
                    bridge.send({"type": "run_error", "error": "A run is already active. Wait for it to finish before starting a new conversation."})
                    continue
                prompt = str(message.get("prompt", "")).strip()
                if not prompt:
                    bridge.send({"type": "run_error", "error": "Prompt is required."})
                    continue
                try:
                    continue_conversation = bool(message.get("continue_conversation"))
                    model_name = str(message.get("model_name", "") or "").strip()
                    prior_messages = None
                    if continue_conversation:
                        if not bridge.conversation_messages or not bridge.managed_workspace_root:
                            bridge.send({"type": "run_error", "error": "No active conversation is available on the server. Click New chat and start again."})
                            continue
                        # Follow-up turn: reuse the existing workspace and trace dir.
                        workspace_root = Path(bridge.managed_workspace_root)
                        effective_trace_dir = bridge.managed_trace_dir
                        prior_messages = bridge.conversation_messages
                    else:
                        # New conversation: swap the old managed run for a fresh one.
                        _release_managed_run(bridge)
                        workspace_root, effective_trace_dir = _create_managed_run(bridge)
                    image_parts, saved_paths = save_uploaded_images(
                        workspace_root,
                        message.get("images", []) if isinstance(message.get("images", []), list) else [],
                    )
                    run_prompt = _prompt_with_uploaded_image_paths(prompt, saved_paths)
                except ValueError as exc:
                    bridge.send({"type": "run_error", "error": str(exc)})
                    continue
                bridge.cancelled.clear()
                if not continue_conversation:
                    bridge.conversation_messages = None
                    bridge.conversation_workspace_root = str(workspace_root)
                    bridge.send({"type": "conversation_reset"})
                if saved_paths:
                    bridge.send({"type": "uploaded_images", "paths": saved_paths})
                run_thread = threading.Thread(
                    target=_run_agent_thread,
                    kwargs={
                        "bridge": bridge,
                        "prompt": run_prompt,
                        "workspace_root": workspace_root,
                        "initial_content_parts": image_parts,
                        "trace_dir": effective_trace_dir,
                        "prior_messages": prior_messages,
                        "managed_run_root": bridge.managed_run_root,
                        "model_name": model_name,
                    },
                    daemon=True,
                )
                run_thread.start()
            elif message_type == "ask_user_answer":
                ok = bridge.submit_answer(str(message.get("request_id", "")), str(message.get("answer", "")))
                if not ok:
                    bridge.send({"type": "run_error", "error": "No pending AskUser request matched that answer."})
            elif message_type == "interrupt":
                if run_thread is not None and run_thread.is_alive():
                    bridge.cancelled.set()
                    bridge.send({"type": "interrupt_requested"})
                else:
                    bridge.send({"type": "run_error", "error": "No active run is available to interrupt."})
            elif message_type == "new":
                if run_thread is not None and run_thread.is_alive():
                    bridge.send({"type": "run_error", "error": "The current run is still active. Start a new conversation after it finishes."})
                else:
                    _release_managed_run(bridge)
                    bridge.conversation_messages = None
                    bridge.conversation_workspace_root = ""
                    bridge.send({"type": "conversation_reset"})
            else:
                bridge.send({"type": "run_error", "error": f"Unknown websocket message type: {message_type}"})
    except WebSocketDisconnect:
        bridge.cancelled.set()
    finally:
        # Runs on every exit path: stop the worker, release the run, stop relaying.
        bridge.cancelled.set()
        _release_managed_run(bridge)
        sender_task.cancel()