from __future__ import annotations import base64 import io import ipaddress import json import socket from urllib.parse import urlparse import httpx from PIL import Image from app.config import Settings from app.lenses.base import canonical_hash from app.lenses.registry import build_lenses from app.providers.llm.openai_compatible import FakeLlmProvider from app.providers.llm.zai_glm52 import ZaiGlm52Provider from app.schemas.openai import ResponsesRequest from app.storage.models import ConversationRecord, FileRecord, VisualAssetRecord, VisualViewRecord from app.storage.object_store import ObjectStore from app.storage.repositories import JsonLedger from .evidence_compiler import compile_evidence from .intent_router import route_intent from .reference_resolver import property_view_for_phrase, resolve_asset from .visual_compiler import compile_visual_plan class Runtime: def __init__(self, settings: Settings): self.settings = settings settings.data_dir.mkdir(parents=True, exist_ok=True) self.store = ObjectStore(settings.data_dir / "objects") self.ledger = JsonLedger(settings.data_dir / "ledger.json") self.lenses = build_lenses(settings) self.llm = FakeLlmProvider() if settings.visual_runtime_mode == "test" else ZaiGlm52Provider(settings) async def create_file(self, filename: str, data: bytes, mime_type: str) -> FileRecord: self._validate_image(data, mime_type) uri, sha256 = self.store.put_bytes(data, suffix=self._suffix_for_mime(mime_type)) return self.ledger.create_file(filename, mime_type, len(data), uri, sha256) def get_file(self, file_id: str) -> FileRecord | None: return self.ledger.get_file(file_id) def get_file_bytes(self, file_id: str) -> tuple[FileRecord, bytes]: record = self.ledger.get_file(file_id) if not record: raise KeyError(file_id) return record, self.store.get_bytes(record.uri) async def handle_responses(self, request: ResponsesRequest) -> dict: conversation = self._conversation_for_request(request) text, image_refs = self._extract_text_and_images(request.input) current_assets: list[VisualAssetRecord] = [] for image_ref in image_refs: asset = await self._ingest_image_ref(conversation, image_ref) current_assets.append(asset) await self._get_view(asset, "fingerprint", {}) all_assets = self.ledger.list_assets(conversation.id) intent, routed_views = route_intent(text) selected_asset = resolve_asset(text, conversation, all_assets, current_assets) preferred_view = property_view_for_phrase(text, routed_views) if preferred_view and preferred_view not in routed_views: routed_views.insert(0, preferred_view) plan = compile_visual_plan(intent, text, selected_asset, routed_views) executed_views: list[VisualViewRecord] = [] for op in plan.operations: if op.op == "get_view" and op.asset_id and op.view: asset = self.ledger.get_asset(op.asset_id) if asset: executed_views.append(await self._get_view(asset, op.view, op.parameters)) evidence = compile_evidence(text, executed_views) output_text, artifacts = await self._answer(intent, text, evidence, executed_views) if selected_asset: conversation.active_asset_id = selected_asset.id conversation.last_referenced_asset_id = selected_asset.id if preferred_view: conversation.active_view_type = preferred_view conversation.last_referenced_property = preferred_view self.ledger.update_conversation(conversation) response_json = { "model": self.settings.public_model_id, "output_text": output_text, "plan": plan.model_dump(), "evidence": evidence, "artifacts": artifacts, } response_record = self.ledger.create_response( conversation.id, request.previous_response_id, request.model_dump(mode="json"), response_json, ) return { "id": response_record.id, "object": "response", "created_at": response_record.created_at, "status": "completed", "model": self.settings.public_model_id, "conversation_id": conversation.id, "output": [ { "id": f"msg_{response_record.id}", "type": "message", "role": "assistant", "content": [{"type": "output_text", "text": output_text}], } ], "output_text": output_text, "artifacts": artifacts, } def _conversation_for_request(self, request: ResponsesRequest) -> ConversationRecord: if request.previous_response_id: previous = self.ledger.get_response(request.previous_response_id) if previous: conversation = self.ledger.get_conversation(previous.conversation_id) if conversation: return conversation return self.ledger.create_conversation() def _extract_text_and_images(self, input_value) -> tuple[str, list[dict]]: if isinstance(input_value, str): return input_value, [] texts: list[str] = [] images: list[dict] = [] for message in input_value: content = message.get("content", []) if isinstance(message, dict) else message.content if isinstance(content, str): texts.append(content) continue for part in content: if part.get("type") in {"input_text", "text"}: texts.append(part.get("text", "")) if part.get("type") == "input_image": images.append(part) if part.get("type") == "image_url": image_url = part.get("image_url", {}) images.append({"type": "input_image", "image_url": image_url.get("url") if isinstance(image_url, dict) else image_url}) return "\n".join(t for t in texts if t), images async def _ingest_image_ref(self, conversation: ConversationRecord, image_ref: dict) -> VisualAssetRecord: source_file_id = image_ref.get("file_id") if source_file_id: file_record, data = self.get_file_bytes(source_file_id) mime_type = file_record.mime_type uri = file_record.uri sha256 = file_record.sha256 else: image_url = image_ref.get("image_url") data, mime_type = await self._load_image_url(image_url) uri, sha256 = self.store.put_bytes(data, suffix=self._suffix_for_mime(mime_type)) source_file_id = self.ledger.create_file("snapshotted-url-image", mime_type, len(data), uri, sha256).id width, height = self._image_dimensions(data) return self.ledger.create_asset(conversation.id, source_file_id, sha256, uri, mime_type, width, height) async def _get_view(self, asset: VisualAssetRecord, view_type: str, parameters: dict) -> VisualViewRecord: lens = self.lenses[view_type] parameters_hash = canonical_hash(parameters) cached = self.ledger.find_view(asset.sha256, lens.name, lens.version, parameters_hash) if cached: return cached image_bytes = self.store.get_bytes(asset.original_uri) view = await lens.run(asset, image_bytes, parameters) return self.ledger.save_view(view) async def _answer(self, intent: str, text: str, evidence: dict, views: list[VisualViewRecord]) -> tuple[str, list[dict]]: artifacts: list[dict] = [] if intent == "answer_ocr": for view in views: if view.view_type == "ocr" and view.payload_json.get("lines"): line = view.payload_json["lines"][0] return f"{line['text']} (source bbox: {line['bbox']}, confidence: {line['confidence']})", artifacts if intent == "query_chart": for view in views: if view.view_type == "chart_ir": points = view.payload_json.get("series", [{}])[0].get("points", []) match = next((p for p in points if str(p[0]).lower() == "q3"), None) if match: return f"Q3 value is {match[1]} (from ChartIR, confidence: {view.confidence}).", artifacts if intent == "create_presentation": artifacts.append({"type": "pptx", "status": "planned", "note": "PptxGenJS renderer service contract is available at /render"}) return "I prepared a presentation plan using the resolved visual theme. The PPTX renderer service can turn this into an editable deck.", artifacts content = await self.llm.complete( [ {"role": "developer", "content": "You are GLM-5.2 reasoning over compact visual evidence from the Visual Variable Runtime."}, {"role": "user", "content": json.dumps(evidence, ensure_ascii=False)}, ] ) return content, artifacts async def _load_image_url(self, image_url: str | None) -> tuple[bytes, str]: if not image_url: raise ValueError("input_image requires image_url or file_id") if image_url.startswith("data:"): header, encoded = image_url.split(",", 1) mime_type = header.split(";")[0].removeprefix("data:") data = base64.b64decode(encoded) self._validate_image(data, mime_type) return data, mime_type self._validate_public_url(image_url) async with httpx.AsyncClient(follow_redirects=False, timeout=20) as client: response = await client.get(image_url) for _ in range(3): if response.status_code not in {301, 302, 303, 307, 308}: break location = response.headers.get("location") self._validate_public_url(location) response = await client.get(location) response.raise_for_status() mime_type = response.headers.get("content-type", "application/octet-stream").split(";")[0] data = response.content self._validate_image(data, mime_type) return data, mime_type def _validate_public_url(self, url: str | None) -> None: if not url: raise ValueError("empty URL") parsed = urlparse(url) if parsed.scheme not in {"http", "https"} or not parsed.hostname: raise ValueError("unsupported image URL") host = parsed.hostname.lower() if host in {"localhost", "127.0.0.1", "::1"} or host.endswith(".local"): raise ValueError("private image URL rejected") for result in socket.getaddrinfo(host, None): ip = ipaddress.ip_address(result[4][0]) if ip.is_private or ip.is_loopback or ip.is_link_local or ip.is_reserved: raise ValueError("private image URL rejected") def _validate_image(self, data: bytes, mime_type: str) -> None: if len(data) > self.settings.max_image_bytes: raise ValueError("image exceeds max size") if mime_type not in {"image/png", "image/jpeg", "image/webp"}: raise ValueError("unsupported image MIME type") width, height = self._image_dimensions(data) if width * height > self.settings.max_image_pixels: raise ValueError("image dimensions exceed limit") def _image_dimensions(self, data: bytes) -> tuple[int, int]: image = Image.open(io.BytesIO(data)) image.verify() image = Image.open(io.BytesIO(data)) return image.size def _suffix_for_mime(self, mime_type: str) -> str: return {"image/png": ".png", "image/jpeg": ".jpg", "image/webp": ".webp"}.get(mime_type, "")