| """Optional sponsor/provider judging adapters for the command center.""" |
|
|
| from __future__ import annotations |
|
|
| import base64 |
| import json |
| import os |
| import time |
| import urllib.error |
| import urllib.parse |
| import urllib.request |
| from dataclasses import asdict, dataclass |
| from pathlib import Path |
| from typing import Any |
|
|
|
|
# Repository identifiers reported in the ``repo_id`` field of judge results.
OPENBMB_REPO_ID = "openbmb/MiniCPM-V-4.6"
NEMOTRON_PARSE_REPO_ID = "nvidia/NVIDIA-Nemotron-Parse-v1.2"
NEMOTRON_NANO_REPO_ID = "nvidia/NVIDIA-Nemotron-3-Nano-4B-GGUF"

# Files larger than this (10 MiB) are never inlined as a base64 data URL.
MAX_PROVIDER_IMAGE_BYTES = 10 * 2**20

# Lower-cased file suffix -> MIME type; suffixes not listed here are rejected.
IMAGE_MIME_BY_SUFFIX = {
    ".png": "image/png",
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".webp": "image/webp",
}

# Substrings that mark a dictionary key as sensitive. Matching is done against
# the lower-cased key, so any key *containing* one of these is redacted.
SENSITIVE_PROVIDER_KEYS = (
    "token",
    "secret",
    "api_key",
    "authorization",
    "payload",
    "raw",
    "base64",
    "bytes",
)
|
|
|
|
@dataclass(frozen=True)
class ProviderJudgeResult:
    """Immutable record describing the outcome of one provider judging attempt.

    The judge functions in this module set ``status`` to one of "success",
    "failed", "missing_secret" or "no_artifact".
    """

    # Machine-readable outcome of the attempt.
    status: str
    # Provider condition label, e.g. "configured", "missing secret", "blocked".
    provider_state: str
    # Vendor label, e.g. "OpenBMB" or "NVIDIA".
    provider: str
    # Repository identifier of the model family that was (or would be) used.
    repo_id: str
    # Model name sent in the request payload.
    model: str
    # Human-readable summary of what happened.
    message: str
    # Structured details: parsed provider output, or configuration/error info.
    evidence: dict[str, Any]
    # Wall-clock duration of the provider call; None when no call was made.
    latency_seconds: float | None = None

    def to_dict(self) -> dict[str, Any]:
        """Return the result as a plain dictionary keyed by field name.

        Returns:
            dict[str, Any]: Output of ``dataclasses.asdict`` for this instance.
        """
        as_mapping = asdict(self)
        return as_mapping
|
|
|
|
| def _short_error(exc: BaseException) -> str: |
| """ |
| Format an exception as a compact single-line error string. |
| |
| Returns: |
| A string with the exception class name and message, truncated to approximately 360 characters. |
| """ |
| text = str(exc).replace("\n", " ").strip() |
| if len(text) > 360: |
| text = text[:357] + "..." |
| return f"{exc.__class__.__name__}: {text}" |
|
|
|
|
| def _image_data_url(path: str | None) -> str | None: |
| """ |
| Generates a data URL from a local image file. |
| |
| Returns: |
| A data URL string if the file exists and is readable, None otherwise. |
| """ |
| if not path: |
| return None |
| target = Path(path) |
| try: |
| if not target.exists() or not target.is_file(): |
| return None |
| suffix = target.suffix.lower() |
| mime = IMAGE_MIME_BY_SUFFIX.get(suffix) |
| if mime is None or target.stat().st_size > MAX_PROVIDER_IMAGE_BYTES: |
| return None |
| data = base64.b64encode(target.read_bytes()).decode("ascii") |
| return f"data:{mime};base64,{data}" |
| except OSError: |
| return None |
|
|
|
|
| def _is_loopback_host(hostname: str | None) -> bool: |
| return hostname in {"localhost", "127.0.0.1", "::1"} |
|
|
|
|
| def _safe_provider_payload(value: Any) -> Any: |
| if isinstance(value, dict): |
| redacted: dict[str, Any] = {} |
| for key, item in value.items(): |
| normalized = str(key).lower() |
| if any(marker in normalized for marker in SENSITIVE_PROVIDER_KEYS): |
| redacted[str(key)] = "[redacted]" |
| else: |
| redacted[str(key)] = _safe_provider_payload(item) |
| return redacted |
| if isinstance(value, list): |
| return [_safe_provider_payload(item) for item in value] |
| if isinstance(value, tuple): |
| return [_safe_provider_payload(item) for item in value] |
| return value |
|
|
|
|
| def _post_json(url: str, token: str, payload: dict[str, Any], timeout: float) -> dict[str, Any]: |
| """ |
| Send an authenticated JSON POST request and return the parsed response. |
| |
| Raises ValueError if the URL is not an HTTP or HTTPS URL with a host. |
| |
| Parameters: |
| token (str): Authorization credential passed through the request header |
| payload (dict[str, Any]): Data to send as JSON in the request body |
| timeout (float): Request timeout in seconds |
| |
| Returns: |
| dict[str, Any]: Parsed JSON response body |
| """ |
| parsed = urllib.parse.urlparse(url) |
| if parsed.scheme not in {"http", "https"} or not parsed.netloc: |
| raise ValueError(f"Invalid URL: expected http(s) URL with host, got {url!r}.") |
| if parsed.scheme == "http" and not _is_loopback_host(parsed.hostname): |
| raise ValueError("Provider URLs must use HTTPS unless targeting loopback development hosts.") |
| body = json.dumps(payload).encode("utf-8") |
| request = urllib.request.Request( |
| url, |
| data=body, |
| headers={ |
| "Authorization": f"Bearer {token}", |
| "Content-Type": "application/json", |
| }, |
| method="POST", |
| ) |
| with urllib.request.urlopen(request, timeout=timeout) as response: |
| return json.loads(response.read().decode("utf-8", errors="replace")) |
|
|
|
|
| def _extract_content(response: dict[str, Any]) -> str: |
| """ |
| Extract the message content from a chat-completions-style response. |
| |
| If the extracted content is a string, returns it as-is. If it is another type, serializes it to JSON. Returns an empty string if no choices are present in the response. |
| |
| Parameters: |
| response: A dictionary representing a chat-completions API response. |
| |
| Returns: |
| The extracted content as a string, or an empty string if unavailable. |
| """ |
| choices = response.get("choices") or [] |
| if not choices: |
| return "" |
| message = choices[0].get("message") or {} |
| content = message.get("content", "") |
| if isinstance(content, str): |
| return content |
| return json.dumps(content, ensure_ascii=True) |
|
|
|
|
| def _safe_json_from_text(text: str) -> dict[str, Any]: |
| """ |
| Extract and parse a JSON object from text, falling back to raw text summary. |
| |
| Attempts to parse JSON from the input, extracting the outermost braces-delimited substring if necessary. If parsing fails or the result is not a dictionary, returns a dictionary with the input text (first 1200 characters) under the "raw_summary" key. Returns an empty dictionary if the input is empty after whitespace stripping. |
| |
| Returns: |
| dict[str, Any]: Parsed JSON object if valid, or a dictionary with "raw_summary" key and text if parsing fails or result is not a dict. |
| """ |
| stripped = text.strip() |
| if not stripped: |
| return {} |
| start = stripped.find("{") |
| end = stripped.rfind("}") |
| if start >= 0 and end > start: |
| stripped = stripped[start : end + 1] |
| try: |
| parsed = json.loads(stripped) |
| except json.JSONDecodeError: |
| return {"raw_summary": text[:1200]} |
| return parsed if isinstance(parsed, dict) else {"raw_summary": parsed} |
|
|
|
|
def judge_with_minicpm(
    *,
    prompt: str,
    image_path: str | None,
    scan: dict[str, Any],
    wardrobe_summary: str,
    timeout: float = 45.0,
) -> ProviderJudgeResult:
    """
    Ask a MiniCPM-V endpoint to judge a generated image against the brief.

    Configuration comes from the environment: MINICPM_BASE_URL, a token from
    MINICPM_API_KEY (falling back to OPENBMB_API_KEY), and optionally
    MINICPM_MODEL (default "MiniCPM-V-4.6").

    Parameters:
        prompt (str): Brief text included in the request.
        image_path (str | None): Path of the generated image to attach.
        scan (dict[str, Any]): Scan metadata; only 'export_gate' is read, and
            only when the call is skipped.
        wardrobe_summary (str): Wardrobe text included in the request.
        timeout (float): Request timeout in seconds.

    Returns:
        ProviderJudgeResult: status is "missing_secret" when the base URL or
        token is absent, "no_artifact" when no data URL could be produced for
        ``image_path``, "success" with parsed evidence and latency when the
        call completes, or "failed" with the shortened error and latency when
        any exception is raised during the call.
    """
    env = os.environ
    endpoint_root = env.get("MINICPM_BASE_URL", "").rstrip("/")
    api_token = env.get("MINICPM_API_KEY") or env.get("OPENBMB_API_KEY")
    model = env.get("MINICPM_MODEL", "MiniCPM-V-4.6")

    def build_result(
        status: str,
        provider_state: str,
        message: str,
        evidence: dict[str, Any],
        latency_seconds: float | None = None,
    ) -> ProviderJudgeResult:
        # Fills in the fields that are identical for every outcome.
        return ProviderJudgeResult(
            status=status,
            provider_state=provider_state,
            provider="OpenBMB",
            repo_id=OPENBMB_REPO_ID,
            model=model,
            message=message,
            evidence=evidence,
            latency_seconds=latency_seconds,
        )

    if not (endpoint_root and api_token):
        return build_result(
            "missing_secret",
            "missing secret",
            "MiniCPM-V judge is not configured. Add MINICPM_BASE_URL and MINICPM_API_KEY as Space secrets.",
            {"configured": False, "scan_gate": scan.get("export_gate", "pending")},
        )

    data_url = _image_data_url(image_path)
    if not data_url:
        return build_result(
            "no_artifact",
            "blocked",
            "MiniCPM-V judge skipped because no generated image artifact is available.",
            {"configured": True, "scan_gate": scan.get("export_gate", "pending")},
        )

    instruction = (
        "Return strict JSON only with keys: wardrobe_compliance, footwear_check, "
        "material_drift, gothic_couture_match, lore_continuity, export_safety_notes, "
        "overall_status. Be concise and judge the visible generated image against the brief."
    )
    text_part = {"type": "text", "text": f"{instruction}\nBrief: {prompt}\nWardrobe: {wardrobe_summary}"}
    image_part = {"type": "image_url", "image_url": {"url": data_url}}
    request_body = {
        "model": model,
        "messages": [{"role": "user", "content": [text_part, image_part]}],
        "temperature": 0.1,
    }

    started = time.perf_counter()
    try:
        reply = _post_json(f"{endpoint_root}/v1/chat/completions", api_token, request_body, timeout)
        reply_text = _extract_content(reply)
        parsed_evidence = _safe_json_from_text(reply_text)
        return build_result(
            "success",
            "configured",
            "MiniCPM-V returned visual judge evidence.",
            # An empty parse result falls back to the truncated raw reply.
            parsed_evidence or {"raw_summary": reply_text[:1200]},
            round(time.perf_counter() - started, 2),
        )
    except Exception as exc:
        # Boundary handler: every failure is reported as a result, not raised.
        error_text = _short_error(exc)
        return build_result(
            "failed",
            "failed",
            f"MiniCPM-V judge call failed. {error_text}",
            {"configured": True, "error": error_text},
            round(time.perf_counter() - started, 2),
        )
|
|
|
|
def judge_with_nemotron(
    *,
    prompt: str,
    run_packet: dict[str, Any],
    minicpm_result: dict[str, Any] | None = None,
    timeout: float = 45.0,
) -> ProviderJudgeResult:
    """
    Call Nemotron to generate structured evidence for a visual creation run.

    Configuration comes from the environment: NEMOTRON_BASE_URL, a token from
    NEMOTRON_API_KEY (falling back to NVIDIA_API_KEY), and optionally
    NEMOTRON_MODEL (default "nvidia/NVIDIA-Nemotron-Parse-v1.2"). The reported
    repo_id is the Parse repository when the model name contains "parse"
    (case-insensitive), otherwise the Nano repository.

    ``run_packet`` and ``minicpm_result`` are passed through
    ``_safe_provider_payload`` before being embedded in the prompt, so entries
    under sensitive keys are redacted.

    Parameters:
        prompt (str): Prompt text included in the request.
        run_packet (dict[str, Any]): Run description to be parsed by the model.
        minicpm_result (dict[str, Any] | None): Optional prior judgment output
            from MiniCPM; treated as an empty dict when omitted.
        timeout (float): Request timeout in seconds (default: 45.0).

    Returns:
        ProviderJudgeResult: status is "missing_secret" when the base URL or
        token is absent, "success" with parsed evidence and latency when the
        call completes, or "failed" with the shortened error and latency when
        building the request or calling the API raises.
    """
    base_url = os.environ.get("NEMOTRON_BASE_URL", "").rstrip("/")
    token = os.environ.get("NEMOTRON_API_KEY") or os.environ.get("NVIDIA_API_KEY")
    model = os.environ.get("NEMOTRON_MODEL", "nvidia/NVIDIA-Nemotron-Parse-v1.2")
    # Case-insensitive so spellings such as "PARSE" also map to the Parse repo.
    repo_id = NEMOTRON_PARSE_REPO_ID if "parse" in model.lower() else NEMOTRON_NANO_REPO_ID
    if not base_url or not token:
        return ProviderJudgeResult(
            status="missing_secret",
            provider_state="missing secret",
            provider="NVIDIA",
            repo_id=repo_id,
            model=model,
            message="Nemotron evidence lane is not configured. Add NEMOTRON_BASE_URL and NEMOTRON_API_KEY/NVIDIA_API_KEY.",
            evidence={"configured": False, "repo_id": repo_id},
        )

    instruction = (
        "Return strict JSON only with keys: sponsor_model_used, structured_parse, "
        "risk_notes, parameter_budget_notes, final_claim_status. Parse this visual creation run."
    )
    started = time.perf_counter()
    try:
        # Sanitizing and serializing happen inside the try block: a packet
        # holding values json cannot encode must produce a "failed" result
        # like every other error, not an exception for the caller.
        # Truncation is by character count, so the embedded JSON text may be
        # cut mid-value; it is prompt context, not a payload to be parsed.
        run_json = json.dumps(_safe_provider_payload(run_packet), ensure_ascii=True)[:6000]
        minicpm_json = json.dumps(_safe_provider_payload(minicpm_result or {}), ensure_ascii=True)[:2500]
        payload = {
            "model": model,
            "messages": [
                {
                    "role": "user",
                    "content": f"{instruction}\nPrompt: {prompt}\nRun: {run_json}\nMiniCPM: {minicpm_json}",
                }
            ],
            "temperature": 0.1,
        }
        response = _post_json(f"{base_url}/v1/chat/completions", token, payload, timeout)
        content = _extract_content(response)
        evidence = _safe_json_from_text(content)
        return ProviderJudgeResult(
            status="success",
            provider_state="configured",
            provider="NVIDIA",
            repo_id=repo_id,
            model=model,
            message="Nemotron returned structured sponsor evidence.",
            # An empty parse result falls back to the truncated raw reply.
            evidence=evidence or {"raw_summary": content[:1200]},
            latency_seconds=round(time.perf_counter() - started, 2),
        )
    except Exception as exc:
        # Boundary handler: every failure is reported as a result, not raised.
        error_text = _short_error(exc)
        return ProviderJudgeResult(
            status="failed",
            provider_state="failed",
            provider="NVIDIA",
            repo_id=repo_id,
            model=model,
            message=f"Nemotron evidence call failed. {error_text}",
            evidence={"configured": True, "error": error_text},
            latency_seconds=round(time.perf_counter() - started, 2),
        )
|
|