| from __future__ import annotations |
|
|
| import base64 |
| import io |
| import ipaddress |
| import json |
| import socket |
| from urllib.parse import urlparse |
|
|
| import httpx |
| from PIL import Image |
|
|
| from app.config import Settings |
| from app.lenses.base import canonical_hash |
| from app.lenses.registry import build_lenses |
| from app.providers.llm.openai_compatible import FakeLlmProvider |
| from app.providers.llm.zai_glm52 import ZaiGlm52Provider |
| from app.schemas.openai import ResponsesRequest |
| from app.storage.models import ConversationRecord, FileRecord, VisualAssetRecord, VisualViewRecord |
| from app.storage.object_store import ObjectStore |
| from app.storage.repositories import JsonLedger |
|
|
| from .evidence_compiler import compile_evidence |
| from .intent_router import route_intent |
| from .reference_resolver import property_view_for_phrase, resolve_asset |
| from .visual_compiler import compile_visual_plan |
|
|
|
|
class Runtime:
    """Orchestrate file storage, lens execution and LLM answering for visual requests.

    The runtime owns an object store and a JSON ledger located under
    ``settings.data_dir``, the lenses built from the settings, and an LLM
    provider (a fake provider when ``settings.visual_runtime_mode`` is
    ``"test"``).
    """

    # Accepted image MIME types mapped to the suffix used when storing the bytes.
    # Single source of truth for both validation and suffix selection.
    _MIME_SUFFIXES = {"image/png": ".png", "image/jpeg": ".jpg", "image/webp": ".webp"}
    # HTTP status codes treated as redirects when fetching remote images.
    _REDIRECT_STATUSES = frozenset({301, 302, 303, 307, 308})
    # Maximum number of redirect hops followed manually.
    _MAX_REDIRECTS = 3

    def __init__(self, settings: Settings):
        """Create the data directory and wire up store, ledger, lenses and LLM."""
        self.settings = settings
        settings.data_dir.mkdir(parents=True, exist_ok=True)
        self.store = ObjectStore(settings.data_dir / "objects")
        self.ledger = JsonLedger(settings.data_dir / "ledger.json")
        self.lenses = build_lenses(settings)
        self.llm = FakeLlmProvider() if settings.visual_runtime_mode == "test" else ZaiGlm52Provider(settings)

    async def create_file(self, filename: str, data: bytes, mime_type: str) -> FileRecord:
        """Validate an uploaded image, store its bytes and record it in the ledger.

        Raises:
            ValueError: if the image fails ``_validate_image``.
        """
        self._validate_image(data, mime_type)
        uri, sha256 = self.store.put_bytes(data, suffix=self._suffix_for_mime(mime_type))
        return self.ledger.create_file(filename, mime_type, len(data), uri, sha256)

    def get_file(self, file_id: str) -> FileRecord | None:
        """Return the ledger record for ``file_id`` (whatever the ledger yields for unknown ids)."""
        return self.ledger.get_file(file_id)

    def get_file_bytes(self, file_id: str) -> tuple[FileRecord, bytes]:
        """Return the file record together with its stored bytes.

        Raises:
            KeyError: if the ledger has no record for ``file_id``.
        """
        record = self.ledger.get_file(file_id)
        if not record:
            raise KeyError(file_id)
        return record, self.store.get_bytes(record.uri)

    async def handle_responses(self, request: ResponsesRequest) -> dict:
        """Process one request end to end and return the response payload.

        Steps: resolve the conversation, ingest any images in the input,
        route the intent, compile and execute the visual plan, compile the
        evidence, produce the answer, persist conversation state and the
        response record, then build the returned dict.
        """
        conversation = self._conversation_for_request(request)
        text, image_refs = self._extract_text_and_images(request.input)

        # Ingest every image of this turn and run the "fingerprint" lens on it.
        current_assets: list[VisualAssetRecord] = []
        for image_ref in image_refs:
            asset = await self._ingest_image_ref(conversation, image_ref)
            current_assets.append(asset)
            await self._get_view(asset, "fingerprint", {})

        all_assets = self.ledger.list_assets(conversation.id)
        intent, routed_views = route_intent(text)
        selected_asset = resolve_asset(text, conversation, all_assets, current_assets)
        preferred_view = property_view_for_phrase(text, routed_views)
        if preferred_view and preferred_view not in routed_views:
            # Put the explicitly requested view ahead of the routed ones.
            routed_views.insert(0, preferred_view)
        plan = compile_visual_plan(intent, text, selected_asset, routed_views)

        # Only "get_view" operations with both an asset id and a view are run;
        # operations whose asset is not in the ledger are skipped.
        executed_views: list[VisualViewRecord] = []
        for op in plan.operations:
            if op.op == "get_view" and op.asset_id and op.view:
                asset = self.ledger.get_asset(op.asset_id)
                if asset:
                    executed_views.append(await self._get_view(asset, op.view, op.parameters))

        evidence = compile_evidence(text, executed_views)
        output_text, artifacts = await self._answer(intent, text, evidence, executed_views)

        # Remember what this turn referred to so later turns can resolve against it.
        if selected_asset:
            conversation.active_asset_id = selected_asset.id
            conversation.last_referenced_asset_id = selected_asset.id
        if preferred_view:
            conversation.active_view_type = preferred_view
            conversation.last_referenced_property = preferred_view
        self.ledger.update_conversation(conversation)

        response_json = {
            "model": self.settings.public_model_id,
            "output_text": output_text,
            "plan": plan.model_dump(),
            "evidence": evidence,
            "artifacts": artifacts,
        }
        response_record = self.ledger.create_response(
            conversation.id,
            request.previous_response_id,
            request.model_dump(mode="json"),
            response_json,
        )
        return {
            "id": response_record.id,
            "object": "response",
            "created_at": response_record.created_at,
            "status": "completed",
            "model": self.settings.public_model_id,
            "conversation_id": conversation.id,
            "output": [
                {
                    "id": f"msg_{response_record.id}",
                    "type": "message",
                    "role": "assistant",
                    "content": [{"type": "output_text", "text": output_text}],
                }
            ],
            "output_text": output_text,
            "artifacts": artifacts,
        }

    def _conversation_for_request(self, request: ResponsesRequest) -> ConversationRecord:
        """Return the conversation of ``previous_response_id`` or create a new one.

        A new conversation is created when no previous response id is given,
        or when the previous response or its conversation cannot be found.
        """
        if request.previous_response_id:
            previous = self.ledger.get_response(request.previous_response_id)
            if previous:
                conversation = self.ledger.get_conversation(previous.conversation_id)
                if conversation:
                    return conversation
        return self.ledger.create_conversation()

    def _extract_text_and_images(self, input_value) -> tuple[str, list[dict]]:
        """Split request input into joined text and a list of image references.

        A plain string is returned as the text with no images. Otherwise each
        message's content is scanned: string content and ``input_text``/``text``
        parts contribute text, ``input_image`` parts are kept as-is, and
        ``image_url`` parts are converted to ``input_image`` dicts. Non-empty
        texts are joined with newlines.
        """
        if isinstance(input_value, str):
            return input_value, []
        texts: list[str] = []
        images: list[dict] = []
        for message in input_value:
            content = message.get("content", []) if isinstance(message, dict) else message.content
            if isinstance(content, str):
                texts.append(content)
                continue
            # ``content`` may be None (e.g. an explicit null); treat it as no parts.
            for part in content or []:
                part_type = part.get("type")
                if part_type in {"input_text", "text"}:
                    texts.append(part.get("text", ""))
                elif part_type == "input_image":
                    images.append(part)
                elif part_type == "image_url":
                    image_url = part.get("image_url", {})
                    # ``image_url`` is either {"url": ...} or the URL string itself.
                    url = image_url.get("url") if isinstance(image_url, dict) else image_url
                    images.append({"type": "input_image", "image_url": url})
        return "\n".join(t for t in texts if t), images

    async def _ingest_image_ref(self, conversation: ConversationRecord, image_ref: dict) -> VisualAssetRecord:
        """Turn an image reference into a visual asset attached to the conversation.

        References with a ``file_id`` reuse the stored file; otherwise the
        ``image_url`` is loaded, stored, and recorded as a new file first.
        """
        source_file_id = image_ref.get("file_id")
        if source_file_id:
            file_record, data = self.get_file_bytes(source_file_id)
            mime_type = file_record.mime_type
            uri = file_record.uri
            sha256 = file_record.sha256
        else:
            image_url = image_ref.get("image_url")
            data, mime_type = await self._load_image_url(image_url)
            uri, sha256 = self.store.put_bytes(data, suffix=self._suffix_for_mime(mime_type))
            source_file_id = self.ledger.create_file("snapshotted-url-image", mime_type, len(data), uri, sha256).id
        width, height = self._image_dimensions(data)
        return self.ledger.create_asset(conversation.id, source_file_id, sha256, uri, mime_type, width, height)

    async def _get_view(self, asset: VisualAssetRecord, view_type: str, parameters: dict) -> VisualViewRecord:
        """Return the view of ``asset`` produced by the ``view_type`` lens.

        A view already in the ledger for the same asset hash, lens name, lens
        version and parameter hash is returned directly; otherwise the lens is
        run on the stored image bytes and the result is saved.

        Raises:
            KeyError: if no lens is registered under ``view_type``.
        """
        lens = self.lenses[view_type]
        parameters_hash = canonical_hash(parameters)
        cached = self.ledger.find_view(asset.sha256, lens.name, lens.version, parameters_hash)
        if cached:
            return cached
        image_bytes = self.store.get_bytes(asset.original_uri)
        view = await lens.run(asset, image_bytes, parameters)
        return self.ledger.save_view(view)

    async def _answer(self, intent: str, text: str, evidence: dict, views: list[VisualViewRecord]) -> tuple[str, list[dict]]:
        """Produce the answer text and artifacts for the routed intent.

        OCR and chart intents are answered directly from the matching view
        when it carries usable data; the presentation intent returns a planned
        PPTX artifact. Everything else falls through to the LLM, which receives
        the evidence serialised as JSON.
        """
        artifacts: list[dict] = []
        if intent == "answer_ocr":
            for view in views:
                if view.view_type == "ocr" and view.payload_json.get("lines"):
                    line = view.payload_json["lines"][0]
                    return f"{line['text']} (source bbox: {line['bbox']}, confidence: {line['confidence']})", artifacts
        if intent == "query_chart":
            for view in views:
                if view.view_type == "chart_ir":
                    # An empty or null "series" must not raise IndexError; fall
                    # back to a single empty series so the lookup finds no points.
                    series = view.payload_json.get("series") or [{}]
                    points = series[0].get("points", [])
                    match = next((p for p in points if str(p[0]).lower() == "q3"), None)
                    if match:
                        return f"Q3 value is {match[1]} (from ChartIR, confidence: {view.confidence}).", artifacts
        if intent == "create_presentation":
            artifacts.append({"type": "pptx", "status": "planned", "note": "PptxGenJS renderer service contract is available at /render"})
            return "I prepared a presentation plan using the resolved visual theme. The PPTX renderer service can turn this into an editable deck.", artifacts
        content = await self.llm.complete(
            [
                {"role": "developer", "content": "You are GLM-5.2 reasoning over compact visual evidence from the Visual Variable Runtime."},
                {"role": "user", "content": json.dumps(evidence, ensure_ascii=False)},
            ]
        )
        return content, artifacts

    async def _load_image_url(self, image_url: str | None) -> tuple[bytes, str]:
        """Load image bytes and MIME type from a ``data:`` URL or an http(s) URL.

        Remote URLs are validated with ``_validate_public_url`` before every
        request, including each redirect hop, and at most ``_MAX_REDIRECTS``
        redirects are followed manually.

        Raises:
            ValueError: for a missing URL, a rejected URL, a redirect without
                a ``Location`` header, or an image that fails validation.
        """
        # Local import: ``urljoin`` is only needed here and is not imported at file level.
        from urllib.parse import urljoin

        if not image_url:
            raise ValueError("input_image requires image_url or file_id")
        if image_url.startswith("data:"):
            header, encoded = image_url.split(",", 1)
            # Normalise case/whitespace so e.g. "IMAGE/PNG" matches the allow-list.
            mime_type = header.split(";")[0].removeprefix("data:").strip().lower()
            data = base64.b64decode(encoded)
            self._validate_image(data, mime_type)
            return data, mime_type

        current_url = image_url
        self._validate_public_url(current_url)
        # Redirects are followed by hand so every hop can be validated.
        async with httpx.AsyncClient(follow_redirects=False, timeout=20) as client:
            response = await client.get(current_url)
            for _ in range(self._MAX_REDIRECTS):
                if response.status_code not in self._REDIRECT_STATUSES:
                    break
                location = response.headers.get("location")
                if not location:
                    raise ValueError("empty URL")
                # ``Location`` may be relative; resolve it against the URL just fetched.
                current_url = urljoin(current_url, location)
                self._validate_public_url(current_url)
                response = await client.get(current_url)
            response.raise_for_status()
        mime_type = response.headers.get("content-type", "application/octet-stream").split(";")[0].strip().lower()
        # NOTE(review): the body is fully buffered before the size limit in
        # ``_validate_image`` is applied — consider a streamed, capped download.
        data = response.content
        self._validate_image(data, mime_type)
        return data, mime_type

    def _validate_public_url(self, url: str | None) -> None:
        """Reject URLs that are not http(s) or that resolve to non-public addresses.

        Raises:
            ValueError: for an empty URL, an unsupported scheme, a missing
                host, a local host name, or any resolved address that is
                private, loopback, link-local, reserved, multicast or
                unspecified.
        """
        if not url:
            raise ValueError("empty URL")
        parsed = urlparse(url)
        if parsed.scheme not in {"http", "https"} or not parsed.hostname:
            raise ValueError("unsupported image URL")
        host = parsed.hostname.lower()
        if host in {"localhost", "127.0.0.1", "::1"} or host.endswith(".local"):
            raise ValueError("private image URL rejected")
        # NOTE(review): the HTTP client resolves the host again when fetching,
        # so this check alone does not pin the connection to these addresses.
        for *_, sockaddr in socket.getaddrinfo(host, None):
            ip = ipaddress.ip_address(sockaddr[0])
            if (
                ip.is_private
                or ip.is_loopback
                or ip.is_link_local
                or ip.is_reserved
                or ip.is_multicast
                or ip.is_unspecified
            ):
                raise ValueError("private image URL rejected")

    def _validate_image(self, data: bytes, mime_type: str) -> None:
        """Check byte size, MIME type and pixel count against the configured limits.

        Raises:
            ValueError: if the data exceeds ``settings.max_image_bytes``, the
                MIME type is not accepted, or width * height exceeds
                ``settings.max_image_pixels``.
        """
        if len(data) > self.settings.max_image_bytes:
            raise ValueError("image exceeds max size")
        if mime_type not in self._MIME_SUFFIXES:
            raise ValueError("unsupported image MIME type")
        width, height = self._image_dimensions(data)
        if width * height > self.settings.max_image_pixels:
            raise ValueError("image dimensions exceed limit")

    def _image_dimensions(self, data: bytes) -> tuple[int, int]:
        """Verify the image data and return its ``(width, height)``."""
        with Image.open(io.BytesIO(data)) as probe:
            probe.verify()
        # The image is reopened after ``verify()`` before its size is read,
        # mirroring the original two-step open.
        with Image.open(io.BytesIO(data)) as image:
            return image.size

    def _suffix_for_mime(self, mime_type: str) -> str:
        """Return the file suffix for an accepted MIME type, or ``""`` otherwise."""
        return self._MIME_SUFFIXES.get(mime_type, "")
|
|