"""Shared helper utilities.

Covers dotenv loading, required-environment checks, role prompt loading,
staging of user-supplied images inside a workspace, and small JSON/text
file helpers. Running the module as a script prints a short JSON summary.
"""

import argparse
import base64
import json
import os
import re
import shutil
import shlex
import sys
from pathlib import Path
from typing import Any, Iterable, Optional, Union

PROJECT_ROOT = Path(__file__).resolve().parent.parent

# Maps (resolved dotenv path, variable name) -> the value this module last
# wrote into os.environ for it. Lets a reload refresh values we set earlier
# without clobbering values that came from the real process environment.
_DOTENV_LAST_LOADED: dict[tuple[str, str], str] = {}

REQUIRED_ENV_VARS = (
    "API_KEY",
    "API_BASE",
    "MODEL_NAME",
    "SERPER_KEY",
    "JINA_KEY",
    "MINERU_TOKEN",
)

IMAGE_INPUT_REL_DIR = Path("inputs") / "images"
MAX_INPUT_IMAGE_BYTES = 25 * 1024 * 1024
IMAGE_MIME_BY_EXTENSION = {
    ".png": "image/png",
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".webp": "image/webp",
    ".gif": "image/gif",
    ".bmp": "image/bmp",
}


class MissingRequiredEnvError(RuntimeError):
    """Raised when one or more required environment variables are unset."""


def _parse_dotenv_value(value: str) -> str:
    """Return the shell-style interpretation of one dotenv value.

    Quotes and backslash escapes are processed by ``shlex`` in POSIX mode and
    anything after an unquoted ``#`` is dropped as a comment. If the value is
    not well-formed shell syntax (unbalanced quote, dangling backslash) the
    raw text is returned unchanged instead of aborting the whole file load.
    """
    if not value:
        return ""
    lexer = shlex.shlex(value, posix=True)
    # No whitespace splitting: blanks come back as their own tokens, so
    # joining the tokens reproduces interior spacing.
    lexer.whitespace = ""
    lexer.commenters = "#"
    try:
        return "".join(lexer).strip()
    except ValueError:
        # shlex raises ValueError for "No closing quotation" and
        # "No escaped character"; keep the literal text in that case.
        return value


def load_dotenv(path: Union[str, Path]) -> None:
    """Load ``KEY=VALUE`` pairs from a dotenv file into ``os.environ``.

    Blank lines, ``#`` comment lines and lines without ``=`` are skipped; an
    optional leading ``export `` is removed. A variable is written only when
    it is currently unset or still holds the value this function previously
    loaded for the same file and key, so values supplied by the real process
    environment are never overridden. A missing file is silently ignored.

    Args:
        path: Location of the dotenv file (``~`` is expanded).
    """
    env_path = Path(path).expanduser()
    if not env_path.exists():
        return
    env_id = str(env_path.resolve())
    for raw_line in env_path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        if line.startswith("export "):
            line = line[len("export "):].strip()
        if "=" not in line:
            continue
        key, value = line.split("=", 1)
        key = key.strip()
        if not key:
            continue
        parsed_value = _parse_dotenv_value(value.strip())
        marker = (env_id, key)
        existing = os.environ.get(key)
        previous_loaded = _DOTENV_LAST_LOADED.get(marker)
        if existing is None or existing == previous_loaded:
            os.environ[key] = parsed_value
            _DOTENV_LAST_LOADED[marker] = parsed_value


def load_default_dotenvs() -> None:
    """Load dotenv files for both installed and source-tree usage."""
    cwd_env = Path.cwd() / ".env"
    project_env = PROJECT_ROOT / ".env"
    load_dotenv(cwd_env)
    # Avoid loading the same file twice when running from the project root.
    if cwd_env.resolve() != project_env.resolve():
        load_dotenv(project_env)


def env_flag(name: str) -> bool:
    """Return True when environment variable *name* holds a truthy flag.

    Recognised values (case-insensitive, surrounding whitespace ignored) are
    ``1``, ``true``, ``yes`` and ``on``. Anything else, including an unset
    variable, is False.
    """
    return os.getenv(name, "").strip().lower() in {"1", "true", "yes", "on"}


def missing_required_env(required: tuple[str, ...] = REQUIRED_ENV_VARS) -> list[str]:
    """Return the names in *required* that are unset or blank in the environment."""
    return [key for key in required if not os.getenv(key, "").strip()]


def require_required_env(context: str = "ResearchHarness") -> None:
    """Raise if any variable in ``REQUIRED_ENV_VARS`` is unset or blank.

    Args:
        context: Label placed at the start of the error message.

    Raises:
        MissingRequiredEnvError: Listing every missing variable.
    """
    missing = missing_required_env()
    if not missing:
        return
    raise MissingRequiredEnvError(
        f"{context} missing required environment variables: {', '.join(missing)}. "
        "Set them in .env or the process environment before running."
    )


def read_role_prompt_files(paths: Iterable[str]) -> str:
    """Read the given prompt files and join their contents with blank lines.

    Blank path entries are skipped, each file's text is stripped, and files
    that are empty after stripping are left out of the result.

    Raises:
        ValueError: If a path does not exist or is not a regular file.
    """
    blocks: list[str] = []
    for raw_path in paths:
        path_text = str(raw_path).strip()
        if not path_text:
            continue
        path = Path(path_text).expanduser()
        if not path.exists():
            raise ValueError(f"Role prompt file does not exist: {path}")
        if not path.is_file():
            raise ValueError(f"Role prompt path is not a file: {path}")
        blocks.append(path.read_text(encoding="utf-8").strip())
    return "\n\n".join(block for block in blocks if block.strip())


def _safe_image_stem(name: str, fallback: str) -> str:
    """Reduce *name*'s stem to ``[A-Za-z0-9_.-]`` characters, or use *fallback*."""
    stem = re.sub(r"[^A-Za-z0-9_.-]+", "_", Path(name).stem).strip("._")
    return stem or fallback


def _unique_image_path(image_dir: Path, *, image_index: int, stem: str, suffix: str) -> Path:
    """Return a path inside *image_dir* that does not exist yet.

    The first candidate is ``image_<index>_<stem><suffix>``; on collision a
    numeric ``_<counter>`` is appended before the suffix, counting from 1.
    """
    candidate = image_dir / f"image_{image_index:03d}_{stem}{suffix}"
    counter = 1
    while candidate.exists():
        candidate = image_dir / f"image_{image_index:03d}_{stem}_{counter}{suffix}"
        counter += 1
    return candidate


def _allocate_image_destination(
    workspace_root: Union[str, Path],
    *,
    filename: str,
    image_index: int,
    suffix: str,
) -> tuple[Path, Path]:
    """Create the workspace image directory and pick a free destination path.

    Returns:
        ``(root, dest)`` where *root* is the resolved workspace root and
        *dest* is an unused file path under ``root / IMAGE_INPUT_REL_DIR``.
    """
    root = Path(workspace_root).expanduser().resolve()
    image_dir = root / IMAGE_INPUT_REL_DIR
    image_dir.mkdir(parents=True, exist_ok=True)
    stem = _safe_image_stem(filename, f"image_{image_index:03d}")
    dest = _unique_image_path(image_dir, image_index=image_index, stem=stem, suffix=suffix)
    return root, dest


def image_input_content_parts(data_url: str, saved_path: str, *, detail: str = "auto") -> list[dict[str, Any]]:
    """Build standard initial content parts for a saved user image."""
    return [
        {"type": "text", "text": f"[User-provided image saved at {saved_path}]"},
        {"type": "image_url", "image_url": {"url": data_url, "detail": detail or "auto"}},
    ]


def stage_image_bytes_for_input(
    raw: bytes,
    *,
    workspace_root: Union[str, Path],
    filename: str,
    image_index: int,
    suffix: str,
    max_bytes: int = MAX_INPUT_IMAGE_BYTES,
) -> str:
    """Write in-memory image bytes into the workspace image directory.

    Args:
        raw: Image content.
        workspace_root: Workspace directory; the image is stored below
            ``IMAGE_INPUT_REL_DIR`` inside it.
        filename: Original file name, used only to derive a safe stem.
        image_index: Ordinal embedded in the stored file name.
        suffix: File extension including the leading dot (case-insensitive).
        max_bytes: Upper bound on ``len(raw)``.

    Returns:
        The stored file's path relative to the workspace root, POSIX style.

    Raises:
        ValueError: If *raw* is empty, too large, or *suffix* is unsupported.
    """
    if not raw:
        raise ValueError("image input is empty")
    if len(raw) > max_bytes:
        raise ValueError(f"image input exceeds {max_bytes} bytes")
    normalized_suffix = suffix.lower()
    if normalized_suffix not in IMAGE_MIME_BY_EXTENSION:
        raise ValueError(f"unsupported image extension: {suffix}")
    root, dest = _allocate_image_destination(
        workspace_root,
        filename=filename,
        image_index=image_index,
        suffix=normalized_suffix,
    )
    dest.write_bytes(raw)
    return dest.relative_to(root).as_posix()


def stage_image_file_for_input(
    source_path: Union[str, Path],
    *,
    workspace_root: Union[str, Path],
    image_index: int,
    max_bytes: int = MAX_INPUT_IMAGE_BYTES,
) -> tuple[str, str]:
    """Copy an image file into the workspace and build its base64 data URL.

    Relative *source_path* values are resolved against the current working
    directory.

    Returns:
        ``(rel_path, data_url)``: the stored copy's path relative to the
        workspace root (POSIX style) and a ``data:<mime>;base64,...`` URL of
        the stored bytes.

    Raises:
        ValueError: If the source is missing, not a file, has an unsupported
            extension, is empty, or exceeds *max_bytes*.
    """
    source = Path(source_path).expanduser()
    if not source.is_absolute():
        source = (Path.cwd() / source).resolve()
    else:
        source = source.resolve()
    if not source.exists():
        raise ValueError(f"image path does not exist: {source}")
    if not source.is_file():
        raise ValueError(f"image path is not a file: {source}")
    suffix = source.suffix.lower()
    mime_type = IMAGE_MIME_BY_EXTENSION.get(suffix)
    if mime_type is None:
        raise ValueError(
            f"unsupported image extension for {source}; expected one of {', '.join(sorted(IMAGE_MIME_BY_EXTENSION))}"
        )
    size = source.stat().st_size
    if size <= 0:
        raise ValueError(f"image file is empty: {source}")
    if size > max_bytes:
        raise ValueError(f"image file exceeds {max_bytes} bytes: {source}")
    root, dest = _allocate_image_destination(
        workspace_root,
        filename=source.name,
        image_index=image_index,
        suffix=suffix,
    )
    shutil.copyfile(source, dest)
    rel_path = dest.relative_to(root).as_posix()
    # Encode the staged copy so the data URL matches what is on disk.
    data_url = f"data:{mime_type};base64," + base64.b64encode(dest.read_bytes()).decode("ascii")
    return rel_path, data_url


def append_saved_image_paths_to_prompt(prompt: str, saved_paths: Iterable[str]) -> str:
    """Append a note listing the saved image paths to *prompt*.

    Blank entries in *saved_paths* are ignored; when nothing remains the
    prompt is returned unchanged (not stripped).
    """
    paths = [str(path).strip() for path in saved_paths if str(path).strip()]
    if not paths:
        return prompt
    lines = "\n".join(f"- {path}" for path in paths)
    return (
        f"{prompt.strip()}\n\n"
        "The user attached image input. The images are saved locally inside the workspace:\n"
        f"{lines}\n"
        "Use the direct image input when the model supports vision. If tool-based inspection is needed, use ReadImage on the saved local paths."
    )


def safe_jsonable(value: Any) -> Any:
    """Recursively convert *value* into JSON-friendly builtin types.

    Scalars and None pass through, dict keys are stringified, lists and
    tuples become lists, and every other object is replaced by ``str(value)``.
    """
    if isinstance(value, (str, int, float, bool)) or value is None:
        return value
    if isinstance(value, dict):
        return {str(key): safe_jsonable(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [safe_jsonable(item) for item in value]
    return str(value)


def append_jsonl(path: Union[str, Path], record: dict[str, Any]) -> None:
    """Append *record* as one JSON line to *path*, creating parent directories."""
    output_path = Path(path)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with output_path.open("a", encoding="utf-8") as fp:
        fp.write(json.dumps(record, ensure_ascii=False) + "\n")


def read_text_lossy(path: Union[str, Path]) -> str:
    """Read *path* as UTF-8, replacing undecodable bytes instead of failing."""
    file_path = Path(path)
    try:
        return file_path.read_text(encoding="utf-8")
    except UnicodeDecodeError:
        return file_path.read_text(encoding="utf-8", errors="replace")


def main(argv: Optional[list[str]] = None) -> int:
    """Print a JSON summary of the module state and return exit status 0.

    Args:
        argv: Command-line arguments without the program name; ``None`` lets
            argparse read ``sys.argv``.
    """
    parser = argparse.ArgumentParser(description="Inspect shared agent_base utilities.")
    parser.add_argument("--dotenv", help="Optional dotenv path to load before printing the summary.")
    args = parser.parse_args(argv)
    dotenv_loaded = False
    if args.dotenv:
        # load_dotenv silently ignores a missing file, so report whether the
        # file was actually there rather than whether the flag was passed.
        dotenv_loaded = Path(args.dotenv).expanduser().exists()
        load_dotenv(args.dotenv)
    payload = {
        "project_root": str(PROJECT_ROOT),
        "dotenv_loaded": dotenv_loaded,
    }
    print(json.dumps(payload, ensure_ascii=False, indent=2))
    return 0


if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))