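# NOTE(review): the six lines below appear to be repository web-viewer page chrome
# captured together with the source; they are not part of the module and are
# preserved here only as comments so the file remains importable.
# specimba's picture
# Deploy v4 review hardening
# 621cf5d verified
# Raw
# History Blame Contribute Delete
# 13.6 kB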
"""Optional sponsor/provider judging adapters for the command center."""
from __future__ import annotations
import base64
import json
import os
import time
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any
# Repo identifier reported as ``repo_id`` by judge_with_minicpm.
OPENBMB_REPO_ID = "openbmb/MiniCPM-V-4.6"
# Repo identifier reported by judge_with_nemotron when the configured model name
# contains "parse" (see that function for the exact match rule).
NEMOTRON_PARSE_REPO_ID = "nvidia/NVIDIA-Nemotron-Parse-v1.2"
# Repo identifier reported by judge_with_nemotron for every other model name.
NEMOTRON_NANO_REPO_ID = "nvidia/NVIDIA-Nemotron-3-Nano-4B-GGUF"
# Largest image file (10 MiB) that _image_data_url will inline as a data URL.
MAX_PROVIDER_IMAGE_BYTES = 10 * 1024 * 1024
# Lowercased file suffix -> MIME type used when building the data URL.
# _image_data_url rejects any suffix that is not listed here.
IMAGE_MIME_BY_SUFFIX = {
".png": "image/png",
".jpg": "image/jpeg",
".jpeg": "image/jpeg",
".webp": "image/webp",
}
# Substrings that mark a dict key as sensitive. _safe_provider_payload lowercases
# each key and redacts the value when any of these appears anywhere in the key.
SENSITIVE_PROVIDER_KEYS = ("token", "secret", "api_key", "authorization", "payload", "raw", "base64", "bytes")
@dataclass(frozen=True)
class ProviderJudgeResult:
status: str
provider_state: str
provider: str
repo_id: str
model: str
message: str
evidence: dict[str, Any]
latency_seconds: float | None = None
def to_dict(self) -> dict[str, Any]:
"""
Convert the dataclass instance to a dictionary.
Returns:
dict[str, Any]: A dictionary representation of the dataclass fields.
"""
return asdict(self)
def _short_error(exc: BaseException) -> str:
"""
Format an exception as a compact single-line error string.
Returns:
A string with the exception class name and message, truncated to approximately 360 characters.
"""
text = str(exc).replace("\n", " ").strip()
if len(text) > 360:
text = text[:357] + "..."
return f"{exc.__class__.__name__}: {text}"
def _image_data_url(path: str | None) -> str | None:
"""
Generates a data URL from a local image file.
Returns:
A data URL string if the file exists and is readable, None otherwise.
"""
if not path:
return None
target = Path(path)
try:
if not target.exists() or not target.is_file():
return None
suffix = target.suffix.lower()
mime = IMAGE_MIME_BY_SUFFIX.get(suffix)
if mime is None or target.stat().st_size > MAX_PROVIDER_IMAGE_BYTES:
return None
data = base64.b64encode(target.read_bytes()).decode("ascii")
return f"data:{mime};base64,{data}"
except OSError:
return None
def _is_loopback_host(hostname: str | None) -> bool:
return hostname in {"localhost", "127.0.0.1", "::1"}
def _safe_provider_payload(value: Any) -> Any:
if isinstance(value, dict):
redacted: dict[str, Any] = {}
for key, item in value.items():
normalized = str(key).lower()
if any(marker in normalized for marker in SENSITIVE_PROVIDER_KEYS):
redacted[str(key)] = "[redacted]"
else:
redacted[str(key)] = _safe_provider_payload(item)
return redacted
if isinstance(value, list):
return [_safe_provider_payload(item) for item in value]
if isinstance(value, tuple):
return [_safe_provider_payload(item) for item in value]
return value
def _post_json(url: str, token: str, payload: dict[str, Any], timeout: float) -> dict[str, Any]:
"""
Send an authenticated JSON POST request and return the parsed response.
Raises ValueError if the URL is not an HTTP or HTTPS URL with a host.
Parameters:
token (str): Authorization credential passed through the request header
payload (dict[str, Any]): Data to send as JSON in the request body
timeout (float): Request timeout in seconds
Returns:
dict[str, Any]: Parsed JSON response body
"""
parsed = urllib.parse.urlparse(url)
if parsed.scheme not in {"http", "https"} or not parsed.netloc:
raise ValueError(f"Invalid URL: expected http(s) URL with host, got {url!r}.")
if parsed.scheme == "http" and not _is_loopback_host(parsed.hostname):
raise ValueError("Provider URLs must use HTTPS unless targeting loopback development hosts.")
body = json.dumps(payload).encode("utf-8")
request = urllib.request.Request(
url,
data=body,
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
},
method="POST",
)
with urllib.request.urlopen(request, timeout=timeout) as response: # noqa: S310 - URL comes from Space secret/config.
return json.loads(response.read().decode("utf-8", errors="replace"))
def _extract_content(response: dict[str, Any]) -> str:
"""
Extract the message content from a chat-completions-style response.
If the extracted content is a string, returns it as-is. If it is another type, serializes it to JSON. Returns an empty string if no choices are present in the response.
Parameters:
response: A dictionary representing a chat-completions API response.
Returns:
The extracted content as a string, or an empty string if unavailable.
"""
choices = response.get("choices") or []
if not choices:
return ""
message = choices[0].get("message") or {}
content = message.get("content", "")
if isinstance(content, str):
return content
return json.dumps(content, ensure_ascii=True)
def _safe_json_from_text(text: str) -> dict[str, Any]:
"""
Extract and parse a JSON object from text, falling back to raw text summary.
Attempts to parse JSON from the input, extracting the outermost braces-delimited substring if necessary. If parsing fails or the result is not a dictionary, returns a dictionary with the input text (first 1200 characters) under the "raw_summary" key. Returns an empty dictionary if the input is empty after whitespace stripping.
Returns:
dict[str, Any]: Parsed JSON object if valid, or a dictionary with "raw_summary" key and text if parsing fails or result is not a dict.
"""
stripped = text.strip()
if not stripped:
return {}
start = stripped.find("{")
end = stripped.rfind("}")
if start >= 0 and end > start:
stripped = stripped[start : end + 1]
try:
parsed = json.loads(stripped)
except json.JSONDecodeError:
return {"raw_summary": text[:1200]}
return parsed if isinstance(parsed, dict) else {"raw_summary": parsed}
def judge_with_minicpm(
    *,
    prompt: str,
    image_path: str | None,
    scan: dict[str, Any],
    wardrobe_summary: str,
    timeout: float = 45.0,
) -> ProviderJudgeResult:
    """
    Ask a MiniCPM-V endpoint to judge a generated image against the brief.

    Settings come from the environment: MINICPM_BASE_URL, MINICPM_API_KEY
    (falling back to OPENBMB_API_KEY) and the optional MINICPM_MODEL.

    Parameters:
        prompt (str): Brief text included in the judging request.
        image_path (str | None): Path of the generated image to attach.
        scan (dict[str, Any]): Scan metadata; only 'export_gate' is read, and
            only for the results returned before any request is made.
        wardrobe_summary (str): Wardrobe text included in the judging request.
        timeout (float): Request timeout in seconds. Default is 45.0.
    Returns:
        ProviderJudgeResult: status is "missing_secret" when the base URL or
        key is absent, "no_artifact" when the image cannot be turned into a
        data URL, "success" with the parsed evidence, or "failed" when any
        exception occurs during the call. The last two carry the latency.
    """
    env = os.environ
    endpoint_root = env.get("MINICPM_BASE_URL", "").rstrip("/")
    api_key = env.get("MINICPM_API_KEY") or env.get("OPENBMB_API_KEY")
    model = env.get("MINICPM_MODEL", "MiniCPM-V-4.6")
    # Fields that are identical on every result this function can return.
    identity = {"provider": "OpenBMB", "repo_id": OPENBMB_REPO_ID, "model": model}

    if not (endpoint_root and api_key):
        return ProviderJudgeResult(
            status="missing_secret",
            provider_state="missing secret",
            message="MiniCPM-V judge is not configured. Add MINICPM_BASE_URL and MINICPM_API_KEY as Space secrets.",
            evidence={"configured": False, "scan_gate": scan.get("export_gate", "pending")},
            **identity,
        )

    data_url = _image_data_url(image_path)
    if not data_url:
        return ProviderJudgeResult(
            status="no_artifact",
            provider_state="blocked",
            message="MiniCPM-V judge skipped because no generated image artifact is available.",
            evidence={"configured": True, "scan_gate": scan.get("export_gate", "pending")},
            **identity,
        )

    instruction = (
        "Return strict JSON only with keys: wardrobe_compliance, footwear_check, "
        "material_drift, gothic_couture_match, lore_continuity, export_safety_notes, "
        "overall_status. Be concise and judge the visible generated image against the brief."
    )
    text_part = {"type": "text", "text": f"{instruction}\nBrief: {prompt}\nWardrobe: {wardrobe_summary}"}
    image_part = {"type": "image_url", "image_url": {"url": data_url}}
    request_body = {
        "model": model,
        "messages": [{"role": "user", "content": [text_part, image_part]}],
        "temperature": 0.1,
    }

    started = time.perf_counter()
    try:
        reply = _post_json(f"{endpoint_root}/v1/chat/completions", api_key, request_body, timeout)
        reply_text = _extract_content(reply)
        parsed_evidence = _safe_json_from_text(reply_text)
        return ProviderJudgeResult(
            status="success",
            provider_state="configured",
            message="MiniCPM-V returned visual judge evidence.",
            evidence=parsed_evidence or {"raw_summary": reply_text[:1200]},
            latency_seconds=round(time.perf_counter() - started, 2),
            **identity,
        )
    except Exception as exc:
        # Boundary handler: every failure is reported as a result, never raised.
        error_text = _short_error(exc)
        return ProviderJudgeResult(
            status="failed",
            provider_state="failed",
            message=f"MiniCPM-V judge call failed. {error_text}",
            evidence={"configured": True, "error": error_text},
            latency_seconds=round(time.perf_counter() - started, 2),
            **identity,
        )
def judge_with_nemotron(
    *,
    prompt: str,
    run_packet: dict[str, Any],
    minicpm_result: dict[str, Any] | None = None,
    timeout: float = 45.0,
) -> ProviderJudgeResult:
    """
    Call Nemotron to generate structured evidence for a visual creation run.

    Settings come from the environment: NEMOTRON_BASE_URL, NEMOTRON_API_KEY
    (falling back to NVIDIA_API_KEY) and the optional NEMOTRON_MODEL. The
    reported repo id is the Parse repo when the model name contains "parse"
    in any letter case, otherwise the Nano repo.

    Parameters:
        prompt (str): Prompt text included in the request.
        run_packet (dict[str, Any]): Run data; redacted with
            _safe_provider_payload, serialized to JSON and cut to 6000 characters.
        minicpm_result (dict[str, Any] | None): Optional prior judgment output
            from MiniCPM; redacted, serialized and cut to 2500 characters.
        timeout (float): Request timeout in seconds (default: 45.0).
    Returns:
        ProviderJudgeResult: status is "missing_secret" when the base URL or
        key is absent, "success" with the parsed evidence, or "failed" when
        building the request or calling the API raises. The last two carry the
        measured latency.
    """
    base_url = os.environ.get("NEMOTRON_BASE_URL", "").rstrip("/")
    token = os.environ.get("NEMOTRON_API_KEY") or os.environ.get("NVIDIA_API_KEY")
    model = os.environ.get("NEMOTRON_MODEL", "nvidia/NVIDIA-Nemotron-Parse-v1.2")
    # Case-insensitive so names such as "...-PARSE-..." are classified too.
    repo_id = NEMOTRON_PARSE_REPO_ID if "parse" in model.lower() else NEMOTRON_NANO_REPO_ID
    if not base_url or not token:
        return ProviderJudgeResult(
            status="missing_secret",
            provider_state="missing secret",
            provider="NVIDIA",
            repo_id=repo_id,
            model=model,
            message="Nemotron evidence lane is not configured. Add NEMOTRON_BASE_URL and NEMOTRON_API_KEY/NVIDIA_API_KEY.",
            evidence={"configured": False, "repo_id": repo_id},
        )
    instruction = (
        "Return strict JSON only with keys: sponsor_model_used, structured_parse, "
        "risk_notes, parameter_budget_notes, final_claim_status. Parse this visual creation run."
    )
    started = time.perf_counter()
    try:
        # Serialization lives inside the try: json.dumps raises TypeError for
        # values it cannot encode, and that must yield a "failed" result
        # rather than escape to the caller.
        run_json = json.dumps(_safe_provider_payload(run_packet), ensure_ascii=True)[:6000]
        minicpm_json = json.dumps(_safe_provider_payload(minicpm_result or {}), ensure_ascii=True)[:2500]
        payload = {
            "model": model,
            "messages": [
                {
                    "role": "user",
                    "content": f"{instruction}\nPrompt: {prompt}\nRun: {run_json}\nMiniCPM: {minicpm_json}",
                }
            ],
            "temperature": 0.1,
        }
        response = _post_json(f"{base_url}/v1/chat/completions", token, payload, timeout)
        content = _extract_content(response)
        evidence = _safe_json_from_text(content)
        return ProviderJudgeResult(
            status="success",
            provider_state="configured",
            provider="NVIDIA",
            repo_id=repo_id,
            model=model,
            message="Nemotron returned structured sponsor evidence.",
            evidence=evidence or {"raw_summary": content[:1200]},
            latency_seconds=round(time.perf_counter() - started, 2),
        )
    except Exception as exc:
        # Boundary handler: every failure is reported as a result, never raised.
        error_text = _short_error(exc)
        return ProviderJudgeResult(
            status="failed",
            provider_state="failed",
            provider="NVIDIA",
            repo_id=repo_id,
            model=model,
            message=f"Nemotron evidence call failed. {error_text}",
            evidence={"configured": True, "error": error_text},
            latency_seconds=round(time.perf_counter() - started, 2),
        )