"""Validate MiniCPM-V object understanding on the hosted Hugging Face Space.""" from __future__ import annotations import argparse import json import signal import sys import time import urllib.request from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path from typing import Any from urllib.parse import urlparse PROJECT_ROOT = Path(__file__).resolve().parents[1] if str(PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(PROJECT_ROOT)) from src.models.schema import TraceRecord DEFAULT_SPACE_URL = "https://huggingface.co/spaces/build-small-hackathon/ObjectverseDiary" DEFAULT_OUTPUT_PATH = Path("docs/SPACE_VLM_REPORT.md") DEFAULT_JSON_OUTPUT_PATH = Path("docs/SPACE_VLM_REPORT.json") DEFAULT_ASSET_DIR = Path(".tmp/space-vlm-assets") DEFAULT_FAILURE_NOTES_PATH = Path("docs/FAILURES.md") DEFAULT_HARDWARE = "zero-a10g" MOCK_SAFE_HARDWARE = "cpu-basic" GENERATE_API_NAME = "/generate_object_file" PROBE_API_NAME = "/vision_runtime_probe" REQUEST_TIMEOUT_SECONDS = 45 PREDICTION_TIMEOUT_SECONDS = 360 LATEST_FAILURE_HEADING = "## Latest Space VLM Validation Failure" SPACE_VARIABLES = { "OBJECTVERSE_VISION_BACKEND": "minicpm-v", "VISION_MODEL_ID": "openbmb/MiniCPM-V-2_6", "OBJECTVERSE_TEXT_BACKEND": "mock", } MOCK_SAFE_VARIABLES = { "OBJECTVERSE_VISION_BACKEND": "mock", "OBJECTVERSE_TEXT_BACKEND": "mock", } SENSITIVE_TRACE_MARKERS = ("HUGGINGFACE_TOKEN", "HF_TOKEN", "hf_") @dataclass(frozen=True) class ValidationAsset: key: str label: str source_page: str download_url: str expected_terms: tuple[str, ...] description: str mode: str = "Cynical" @dataclass(frozen=True) class ValidationResult: key: str label: str source_page: str image_path: str passed: bool object_name: str visible_features: list[str] likely_context: str confidence: float runtime_vision: str runtime_text: str fallbacks: list[str] error: str = "" TEST_ASSETS = [ ValidationAsset( key="mug", label="Coffee mug", source_page="https://commons.wikimedia.org/wiki/File:Striped_coffee_mug.jpg", download_url="https://commons.wikimedia.org/wiki/Special:Redirect/file/Striped_coffee_mug.jpg", expected_terms=("mug", "cup", "coffee", "ceramic", "handle"), description="A public Wikimedia Commons photo of a striped coffee mug.", ), ValidationAsset( key="keyboard", label="Computer keyboard", source_page="https://commons.wikimedia.org/wiki/File:Computer_keyboard.jpg", download_url="https://commons.wikimedia.org/wiki/Special:Redirect/file/Computer_keyboard.jpg", expected_terms=("keyboard", "key", "computer", "keys"), description="A public Wikimedia Commons photo of a computer keyboard.", mode="Philosopher", ), ValidationAsset( key="shoe", label="Running shoe", source_page="https://commons.wikimedia.org/wiki/File:Running_shoes.jpg", download_url="https://commons.wikimedia.org/wiki/Special:Redirect/file/Running_shoes.jpg", expected_terms=("shoe", "sneaker", "running", "footwear", "trainer"), description="A public Wikimedia Commons photo of running shoes.", mode="Dramatic", ), ] def parse_space_repo_id(space_url: str) -> str: parsed = urlparse(space_url) parts = [part for part in parsed.path.split("/") if part] if len(parts) >= 3 and parts[0] == "spaces": return f"{parts[1]}/{parts[2]}" if len(parts) == 2: return f"{parts[0]}/{parts[1]}" raise ValueError(f"Could not parse Hugging Face Space repo id from {space_url!r}") def download_validation_assets( asset_dir: Path = DEFAULT_ASSET_DIR, assets: list[ValidationAsset] | None = None, ) -> dict[str, Path]: selected_assets = assets or TEST_ASSETS asset_dir.mkdir(parents=True, exist_ok=True) paths: dict[str, Path] = {} for asset in selected_assets: output_path = asset_dir / f"{asset.key}.jpg" if not output_path.exists(): _download_url(asset.download_url, output_path) paths[asset.key] = output_path return paths def configure_space_for_vlm( repo_id: str, *, hardware: str = DEFAULT_HARDWARE, wait: bool = True, timeout_seconds: int = 900, ) -> dict[str, str]: from huggingface_hub import HfApi, SpaceHardware api = HfApi() _assert_hf_auth(api) for key, value in SPACE_VARIABLES.items(): api.add_space_variable(repo_id=repo_id, key=key, value=value) api.request_space_hardware(repo_id=repo_id, hardware=SpaceHardware(hardware)) if wait: wait_for_space_running(repo_id, timeout_seconds=timeout_seconds) return {"repo_id": repo_id, "hardware": hardware, **SPACE_VARIABLES} def rollback_space_to_mock(repo_id: str, *, hardware: str = MOCK_SAFE_HARDWARE) -> dict[str, str]: from huggingface_hub import HfApi, SpaceHardware api = HfApi() _assert_hf_auth(api) for key, value in MOCK_SAFE_VARIABLES.items(): api.add_space_variable(repo_id=repo_id, key=key, value=value) api.request_space_hardware(repo_id=repo_id, hardware=SpaceHardware(hardware)) return {"repo_id": repo_id, "hardware": hardware, **MOCK_SAFE_VARIABLES} def wait_for_space_running( repo_id: str, *, timeout_seconds: int = 900, poll_seconds: int = 20, ) -> str: from huggingface_hub import HfApi api = HfApi() deadline = time.monotonic() + timeout_seconds last_stage = "unknown" while time.monotonic() < deadline: runtime = api.get_space_runtime(repo_id=repo_id) last_stage = _runtime_stage_name(runtime) if last_stage.upper() == "RUNNING": return last_stage time.sleep(poll_seconds) raise TimeoutError(f"Space {repo_id} did not reach RUNNING within {timeout_seconds}s; last stage: {last_stage}") def run_space_validation( *, space_url: str = DEFAULT_SPACE_URL, asset_dir: Path = DEFAULT_ASSET_DIR, timeout_seconds: int = 900, assets: list[ValidationAsset] | None = None, trace_output_dir: Path | None = None, ) -> list[ValidationResult]: from gradio_client import handle_file selected_assets = assets or TEST_ASSETS paths = download_validation_assets(asset_dir, selected_assets) client_url = space_client_url(space_url) client = _build_gradio_client(client_url, timeout_seconds=timeout_seconds) results: list[ValidationResult] = [] started = time.monotonic() for asset in selected_assets: remaining = timeout_seconds - int(time.monotonic() - started) if remaining <= 0: raise TimeoutError(f"Validation exceeded timeout of {timeout_seconds}s") try: response = _predict_with_timeout( client, handle_file(str(paths[asset.key])), asset.description, asset.mode, timeout_seconds=min(PREDICTION_TIMEOUT_SECONDS, remaining), ) if trace_output_dir is not None: trace = extract_trace_record(response) write_trace_record(trace, trace_output_dir / f"{asset.key}.json") results.append(validate_prediction(asset, paths[asset.key], response)) except Exception as exc: results.append( ValidationResult( key=asset.key, label=asset.label, source_page=asset.source_page, image_path=str(paths[asset.key]), passed=False, object_name="", visible_features=[], likely_context="", confidence=0.0, runtime_vision="", runtime_text="", fallbacks=[], error=f"{type(exc).__name__}: {exc}", ) ) return results def run_vision_runtime_probe( *, space_url: str = DEFAULT_SPACE_URL, timeout_seconds: int = 900, ) -> dict[str, Any]: client_url = space_client_url(space_url) client = _build_gradio_client(client_url, timeout_seconds=timeout_seconds) response = _predict_api_with_timeout( client, api_name=PROBE_API_NAME, timeout_seconds=min(PREDICTION_TIMEOUT_SECONDS, timeout_seconds), ) payload = _extract_probe_payload(response) _assert_public_safe_serialized(json.dumps(payload, ensure_ascii=False, sort_keys=True), "Probe output") return payload def _predict_with_timeout( client: Any, image: Any, description: str, mode: str, *, timeout_seconds: int, ) -> Any: return _predict_api_with_timeout( client, image, description, mode, api_name=GENERATE_API_NAME, timeout_seconds=timeout_seconds, ) def _predict_api_with_timeout( client: Any, *inputs: Any, api_name: str, timeout_seconds: int, ) -> Any: def _raise_timeout(_signum: int, _frame: Any) -> None: raise TimeoutError(f"Gradio prediction did not finish within {timeout_seconds}s") previous_handler = signal.signal(signal.SIGALRM, _raise_timeout) signal.alarm(max(1, timeout_seconds)) try: return client.predict(*inputs, api_name=api_name) finally: signal.alarm(0) signal.signal(signal.SIGALRM, previous_handler) def _build_gradio_client(space_url: str, *, timeout_seconds: int) -> Any: from gradio_client import Client deadline = time.monotonic() + timeout_seconds last_error: Exception | None = None while time.monotonic() < deadline: try: return Client(space_url, verbose=False) except Exception as exc: last_error = exc time.sleep(10) if last_error is None: raise TimeoutError(f"Could not create Gradio client for {space_url}") raise TimeoutError(f"Could not fetch Gradio config for {space_url}: {type(last_error).__name__}: {last_error}") def space_client_url(space_url: str) -> str: parsed = urlparse(space_url) if parsed.netloc.endswith(".hf.space"): return space_url.rstrip("/") repo_id = parse_space_repo_id(space_url) owner, space_name = repo_id.split("/", 1) return f"https://{owner}-{space_name}.hf.space".lower() def validate_prediction( asset: ValidationAsset, image_path: Path, response: Any, ) -> ValidationResult: trace_payload = _extract_trace_payload(response) trace = TraceRecord.model_validate(trace_payload) object_info = trace.object_understanding.object search_text = " ".join( [ object_info.name, object_info.likely_context, " ".join(object_info.visible_features), ] ).lower() expected_match = any(term in search_text for term in asset.expected_terms) vision_runtime_ok = trace.model_runtime.get("vision") == "minicpm-v object understanding" text_runtime_ok = trace.model_runtime.get("text") == "mock persona and diary generation" no_vision_fallback = "vision-fallback-to-mock" not in trace.fallbacks passed = expected_match and vision_runtime_ok and text_runtime_ok and no_vision_fallback return ValidationResult( key=asset.key, label=asset.label, source_page=asset.source_page, image_path=str(image_path), passed=passed, object_name=object_info.name, visible_features=object_info.visible_features, likely_context=object_info.likely_context, confidence=object_info.confidence, runtime_vision=trace.model_runtime.get("vision", ""), runtime_text=trace.model_runtime.get("text", ""), fallbacks=trace.fallbacks, error="" if passed else _failure_reason(expected_match, vision_runtime_ok, text_runtime_ok, no_vision_fallback), ) def render_report( *, space_url: str, repo_id: str, results: list[ValidationResult], probe_result: dict[str, Any] | None = None, configured: dict[str, str] | None = None, rollback: dict[str, str] | None = None, configuration_error: str = "", ) -> str: now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC") status = "NOT RUN" if configuration_error: status = "FAIL" elif results: status = "PASS" if all(result.passed for result in results) else "FAIL" lines = [ "# Space VLM Validation Report", "", f"- Generated at: {now}", f"- Space URL: {space_url}", f"- Space repo: `{repo_id}`", f"- Overall status: {status}", "- Vision backend expected: `minicpm-v`", "- Text backend expected: `mock`", "", "## Space Configuration", "", ] if configured: lines.extend(_config_lines("Applied configuration", configured)) else: lines.append("- Applied configuration: not changed by this run.") if rollback: lines.extend(["", *_config_lines("Rollback configuration", rollback)]) else: lines.append("- Rollback configuration: not applied by this run; live MiniCPM-V configuration remains active.") if configuration_error: lines.extend(["", "## Configuration Error", "", f"- Error: `{configuration_error}`"]) lines.extend(["", "## Vision Runtime Probe", ""]) if probe_result: lines.extend(_probe_lines(probe_result)) else: lines.append("- Probe was not run.") lines.extend(["", "## Results", ""]) for result in results: lines.extend( [ f"### {result.label}", "", f"- Status: {'PASS' if result.passed else 'FAIL'}", f"- Source: {result.source_page}", f"- Local temporary image: `{result.image_path}`", f"- Object name: `{result.object_name}`", f"- Visible features: {', '.join(result.visible_features) or 'n/a'}", f"- Likely context: `{result.likely_context}`", f"- Confidence: {result.confidence:.2f}", f"- Runtime vision: `{result.runtime_vision}`", f"- Runtime text: `{result.runtime_text}`", f"- Fallbacks: {', '.join(result.fallbacks) or 'none'}", ] ) if result.error: lines.append(f"- Error: `{result.error}`") lines.append("") lines.extend( [ "## Notes", "", "- Test images are temporary public Wikimedia Commons assets and are not committed.", "- No tokens, secrets, or private file paths should be recorded in this report.", "- If live validation fails, run the documented rollback command to switch `OBJECTVERSE_VISION_BACKEND` back to `mock`.", ] ) return "\n".join(lines) + "\n" def write_report(markdown: str, output_path: Path = DEFAULT_OUTPUT_PATH) -> Path: output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_text(markdown, encoding="utf-8") return output_path def write_json_results( results: list[ValidationResult], output_path: Path, *, probe_result: dict[str, Any] | None = None, ) -> Path: output_path.parent.mkdir(parents=True, exist_ok=True) result_payload = [result.__dict__ for result in results] payload: Any = result_payload if probe_result is not None: payload = {"probe": probe_result, "results": result_payload} serialized = json.dumps(payload, ensure_ascii=False, indent=2) _assert_public_safe_serialized(serialized, "JSON report") output_path.write_text(serialized, encoding="utf-8") return output_path def write_trace_record(trace: TraceRecord, output_path: Path) -> Path: output_path.parent.mkdir(parents=True, exist_ok=True) serialized = json.dumps(trace.model_dump(mode="json"), ensure_ascii=False, indent=2, sort_keys=True) _assert_public_safe_serialized(serialized, "Trace output") output_path.write_text(serialized + "\n", encoding="utf-8") return output_path def update_failure_notes( *, results: list[ValidationResult], probe_result: dict[str, Any] | None, output_path: Path = DEFAULT_FAILURE_NOTES_PATH, configuration_error: str = "", ) -> Path | None: failed_results = [result for result in results if not result.passed] if not configuration_error and not failed_results: return None output_path.parent.mkdir(parents=True, exist_ok=True) existing = output_path.read_text(encoding="utf-8") if output_path.exists() else "# Failure Notes\n" section = _latest_failure_section( results=failed_results, probe_result=probe_result, configuration_error=configuration_error, ) updated = _replace_or_append_section(existing, LATEST_FAILURE_HEADING, section) _assert_public_safe_serialized(updated, "Failure notes") output_path.write_text(updated, encoding="utf-8") return output_path def _download_url(url: str, output_path: Path) -> None: request = urllib.request.Request( url, headers={"User-Agent": "Objectverse-Diary-Space-VLM-Check/0.1"}, ) with urllib.request.urlopen(request, timeout=REQUEST_TIMEOUT_SECONDS) as response: output_path.write_bytes(response.read()) def _extract_trace_payload(response: Any) -> dict[str, Any]: if isinstance(response, tuple | list): if len(response) < 7: raise ValueError("Gradio response did not include trace JSON output.") trace_payload = response[6] elif isinstance(response, dict) and "trace" in response: trace_payload = response["trace"] else: raise ValueError("Unsupported Gradio response shape.") if not isinstance(trace_payload, dict): raise ValueError("Trace output was not a JSON object.") return trace_payload def _extract_probe_payload(response: Any) -> dict[str, Any]: if isinstance(response, dict): return response if isinstance(response, tuple | list) and len(response) == 1 and isinstance(response[0], dict): return response[0] raise ValueError("Probe output was not a JSON object.") def extract_trace_record(response: Any) -> TraceRecord: return TraceRecord.model_validate(_extract_trace_payload(response)) def _assert_public_safe_serialized(serialized_payload: str, label: str) -> None: for marker in SENSITIVE_TRACE_MARKERS: if marker in serialized_payload: raise ValueError(f"{label} may contain a sensitive token marker.") def _failure_reason( expected_match: bool, vision_runtime_ok: bool, text_runtime_ok: bool, no_vision_fallback: bool, ) -> str: reasons: list[str] = [] if not expected_match: reasons.append("object output did not match expected terms") if not vision_runtime_ok: reasons.append("vision runtime was not minicpm-v") if not text_runtime_ok: reasons.append("text runtime was not mock") if not no_vision_fallback: reasons.append("vision fallback marker was present") return "; ".join(reasons) def _runtime_stage_name(runtime: Any) -> str: stage = getattr(runtime, "stage", None) if stage is None and isinstance(runtime, dict): stage = runtime.get("stage") if hasattr(stage, "value"): return str(stage.value) return str(stage or "unknown") def _safe_error_payload(exc: Exception, *, stage: str) -> dict[str, str]: return { "backend": "unknown", "probe_ok": "false", "stage": stage, "error_type": type(exc).__name__, "error_summary": _sanitize_error_summary(str(exc) or type(exc).__name__), } def _sanitize_error_summary(value: str, *, max_length: int = 240) -> str: clean = value.replace(str(Path.home()), "[home]") clean = clean.replace("HUGGINGFACE_TOKEN", "[redacted]") clean = clean.replace("HF_TOKEN", "[redacted]") clean = clean.replace("hf_", "[redacted]") if len(clean) > max_length: return clean[: max_length - 3] + "..." return clean def _probe_lines(probe_result: dict[str, Any]) -> list[str]: summary_keys = ( "backend", "vision_model_id", "torch_import", "transformers_import", "cuda_available", "device_count", "device_name", "mps_available", "minicpm_load_attempted", "minicpm_load_ok", ) lines: list[str] = [] for key in summary_keys: if key in probe_result: lines.append(f"- `{key}`: `{probe_result[key]}`") errors = probe_result.get("errors") if isinstance(errors, list) and errors: lines.append("- Errors:") for error in errors: if isinstance(error, dict): stage = error.get("stage", "unknown") error_type = error.get("type", "unknown") summary = error.get("summary", "") lines.append(f" - `{stage}`: `{error_type}` - {summary}") elif "error_type" in probe_result: lines.append(f"- Error: `{probe_result['error_type']}` - {probe_result.get('error_summary', '')}") else: lines.append("- Errors: none") return lines def _latest_failure_section( *, results: list[ValidationResult], probe_result: dict[str, Any] | None, configuration_error: str, ) -> str: now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC") lines = [ LATEST_FAILURE_HEADING, "", f"- Updated: {now}", "- Area: Hugging Face Space vision runtime.", ] if configuration_error: lines.append(f"- Configuration error: `{_sanitize_error_summary(configuration_error)}`") if probe_result: lines.append(f"- Probe backend: `{probe_result.get('backend', 'unknown')}`") lines.append(f"- MiniCPM load attempted: `{probe_result.get('minicpm_load_attempted', 'unknown')}`") lines.append(f"- MiniCPM load ok: `{probe_result.get('minicpm_load_ok', 'unknown')}`") errors = probe_result.get("errors") if isinstance(errors, list) and errors: probe_errors = [] for error in errors: if isinstance(error, dict): probe_errors.append(f"{error.get('stage', 'unknown')}={error.get('type', 'unknown')}") if probe_errors: lines.append(f"- Probe errors: {', '.join(probe_errors)}") if results: failures = [f"{result.key}: {result.error or 'failed'}" for result in results] lines.append(f"- Failed checks: {'; '.join(failures)}") lines.extend( [ "- Fallback used: mock object understanding plus mock text runtime if validation reaches generation.", "- Resolution: unresolved; keep the public Space mock-safe until this section reports a passing VLM validation.", "", ] ) return "\n".join(lines) def _replace_or_append_section(markdown: str, heading: str, section: str) -> str: start = markdown.find(heading) if start == -1: return markdown.rstrip() + "\n\n" + section next_start = markdown.find("\n## ", start + len(heading)) if next_start == -1: return markdown[:start].rstrip() + "\n\n" + section return markdown[:start].rstrip() + "\n\n" + section.rstrip() + "\n" + markdown[next_start:] def _assert_hf_auth(api: Any) -> None: try: user = api.whoami() except Exception as exc: raise RuntimeError( f"Hugging Face authentication check failed: {type(exc).__name__}: {exc}" ) from exc if not isinstance(user, dict) or not user.get("name"): raise RuntimeError("Hugging Face authentication did not return a user name.") def _config_lines(title: str, config: dict[str, str]) -> list[str]: lines = [f"- {title}:"] for key, value in config.items(): lines.append(f" - `{key}`: `{value}`") return lines def _parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--space-url", default=DEFAULT_SPACE_URL) parser.add_argument("--asset-dir", type=Path, default=DEFAULT_ASSET_DIR) parser.add_argument("--output", type=Path, default=DEFAULT_OUTPUT_PATH) parser.add_argument("--json-output", type=Path) parser.add_argument("--timeout-seconds", type=int, default=900) parser.add_argument("--configure-space", action="store_true") parser.add_argument("--rollback-to-mock", action="store_true") parser.add_argument("--hardware", default=DEFAULT_HARDWARE) parser.add_argument("--skip-validation", action="store_true") parser.add_argument("--trace-output-dir", type=Path) parser.add_argument("--failure-notes-output", type=Path, default=DEFAULT_FAILURE_NOTES_PATH) return parser.parse_args() def main() -> None: args = _parse_args() repo_id = parse_space_repo_id(args.space_url) configured = None rollback = None probe_result = None configuration_error = "" if args.configure_space: try: configured = configure_space_for_vlm( repo_id, hardware=args.hardware, wait=True, timeout_seconds=args.timeout_seconds, ) except Exception as exc: configuration_error = f"{type(exc).__name__}: {exc}" if args.rollback_to_mock: try: rollback = rollback_space_to_mock(repo_id) except Exception as rollback_exc: configuration_error = ( f"{configuration_error}; rollback failed with " f"{type(rollback_exc).__name__}: {rollback_exc}" ) results: list[ValidationResult] = [] if not args.skip_validation and not configuration_error: try: probe_result = run_vision_runtime_probe( space_url=args.space_url, timeout_seconds=args.timeout_seconds, ) except Exception as exc: probe_result = _safe_error_payload(exc, stage="vision_runtime_probe") try: results = run_space_validation( space_url=args.space_url, asset_dir=args.asset_dir, timeout_seconds=args.timeout_seconds, trace_output_dir=args.trace_output_dir, ) except Exception as exc: configuration_error = f"{type(exc).__name__}: {exc}" if args.rollback_to_mock and rollback is None: try: rollback = rollback_space_to_mock(repo_id) except Exception as rollback_exc: configuration_error = ( f"{configuration_error}; rollback failed with " f"{type(rollback_exc).__name__}: {rollback_exc}" ) if args.rollback_to_mock and rollback is None: rollback = rollback_space_to_mock(repo_id) report = render_report( space_url=args.space_url, repo_id=repo_id, results=results, probe_result=probe_result, configured=configured, rollback=rollback, configuration_error=configuration_error, ) write_report(report, args.output) if args.json_output: write_json_results(results, args.json_output, probe_result=probe_result) update_failure_notes( results=results, probe_result=probe_result, output_path=args.failure_notes_output, configuration_error=configuration_error, ) if configuration_error or (results and not all(result.passed for result in results)): raise SystemExit(1) print(f"wrote Space VLM report to {args.output}") if __name__ == "__main__": main()