"""Structured-data extraction: document text -> JSON against a field schema.

This is the *extraction mode* (Tier X) — the second face of the platform next to
RAG Q&A. Instead of "ask a question, get a cited answer," the caller supplies a
small **field schema** (name + type + description per field) and gets a single
validated JSON object back. No retrieval, no vector DB — just parse → one
``json_mode`` LLM call → validate.

It reuses the same inference router as the RAG pipeline, so the visitor's BYOK
key powers the call and the same sensitivity-routing applies (HIGH-sensitivity
docs stay local on a self-hosted deploy).

Kept framework-free so it is unit testable without FastAPI.
"""

from __future__ import annotations

import json
import re
from dataclasses import dataclass

from utils.logging import get_logger

logger = get_logger(__name__)

# Bound the document text fed to the model so a long PDF cannot blow the
# token budget / rate limit. Extraction targets a handful of fields, so the
# salient content is almost always near the top of the document.
MAX_EXTRACTION_CHARS = 12_000

# Hard cap on the number of fields accepted per request — keeps the prompt and
# the model's output bounded.
MAX_EXTRACTION_FIELDS = 25

# Field types we coerce to. Anything else is treated as a string.
_ALLOWED_TYPES = frozenset({"string", "number", "integer", "boolean", "date"})

# A reasoning preamble some models emit before the JSON answer.
_THINK_BLOCK_RE = re.compile(
    r"<(think(?:ing)?)>.*?</\1>", flags=re.DOTALL | re.IGNORECASE
)
# An opening markdown fence with an optional language tag (```json, ```).
_OPENING_FENCE_RE = re.compile(r"^```[A-Za-z0-9_+-]*[ \t]*\n?")
# Outermost {...} span, used when prose still surrounds the JSON object.
_JSON_OBJECT_RE = re.compile(r"\{.*\}", flags=re.DOTALL)


@dataclass(frozen=True)
class ExtractionField:
    """One field to pull out of a document.

    Attributes:
        name: JSON key to emit (e.g. ``"total_amount"``).
        type: One of ``string`` / ``number`` / ``integer`` / ``boolean`` /
            ``date``. Unknown types fall back to ``string``.
        description: Plain-language hint that tells the model what to look for
            (e.g. "the grand total including VAT, as a number").
    """

    name: str
    type: str = "string"
    description: str = ""

    def safe_type(self) -> str:
        """Return the normalised type, or ``"string"`` when it is not allowed."""
        t = (self.type or "string").lower().strip()
        return t if t in _ALLOWED_TYPES else "string"


def _text_or_default(value: object, default: str = "") -> str:
    """Stringify *value*, mapping ``None`` to *default* instead of ``"None"``."""
    return default if value is None else str(value)


def normalise_fields(raw_fields: list[dict]) -> list[ExtractionField]:
    """Coerce a list of raw field dicts into validated ``ExtractionField`` objects.

    Drops entries that are not dicts or have no usable ``name``; caps the count
    at ``MAX_EXTRACTION_FIELDS`` so a caller cannot request hundreds of fields
    in one prompt. An explicit ``None`` (JSON ``null``) for ``name``, ``type``
    or ``description`` is treated as "not provided" rather than being turned
    into the literal text ``"None"``.

    Args:
        raw_fields: Raw schema entries; each is expected to be a dict with a
            ``name`` and optional ``type`` / ``description`` keys.

    Returns:
        The usable fields, in input order, at most ``MAX_EXTRACTION_FIELDS``.

    Raises:
        ValueError: When nothing usable remains.
    """
    out: list[ExtractionField] = []
    for entry in raw_fields or []:
        if not isinstance(entry, dict):
            continue
        name = _text_or_default(entry.get("name")).strip()
        if not name:
            continue
        out.append(
            ExtractionField(
                name=name,
                type=_text_or_default(entry.get("type"), "string"),
                description=_text_or_default(entry.get("description")).strip(),
            )
        )
        if len(out) >= MAX_EXTRACTION_FIELDS:
            break
    if not out:
        raise ValueError("no usable fields in the extraction schema")
    return out


def build_extraction_prompt(text: str, fields: list[ExtractionField]) -> str:
    """Build a strict JSON-only extraction prompt.

    Args:
        text: Document text; only the first ``MAX_EXTRACTION_CHARS`` characters
            are embedded in the prompt. ``None`` is treated as empty text.
        fields: Fields to request; each contributes one line to the FIELDS
            section and one key to the list of required keys.

    Returns:
        The full prompt string.
    """
    field_lines = "\n".join(
        f'- "{f.name}" ({f.safe_type()}): {f.description or "extract this field"}'
        for f in fields
    )
    keys = ", ".join(f'"{f.name}"' for f in fields)
    document = (text or "")[:MAX_EXTRACTION_CHARS]
    return (
        "You are a precise document data-extraction engine. Extract the fields "
        "below from the DOCUMENT and return a SINGLE valid JSON object — nothing "
        "else, no markdown fences, no commentary.\n\n"
        "RULES:\n"
        "1. Output exactly these keys and no others: " + keys + ".\n"
        "2. Use the field type as a hint. Numbers as JSON numbers, booleans as "
        "true/false, dates as ISO-8601 strings (YYYY-MM-DD) when possible.\n"
        "3. If a field is not present in the document, set its value to null. "
        "Do NOT invent values.\n"
        "4. Answer in the document's own language for free-text values "
        "(Arabic documents -> Arabic values).\n\n"
        f"FIELDS:\n{field_lines}\n\n"
        f"DOCUMENT:\n{document}\n\n"
        "Return ONLY the JSON object:"
    )


def parse_extraction_response(raw: str, fields: list[ExtractionField]) -> dict:
    """Parse the model's JSON, keep only the requested keys, coerce types.

    Robust to a model that wraps the JSON in ``` fences or adds a
    ``<think>`` preamble. Always returns a dict with **every** requested key
    present (missing -> ``None``), so the caller gets a stable shape.

    Args:
        raw: Raw model output; ``None`` or empty is treated as no output.
        fields: The requested fields; their names become the result keys.

    Returns:
        ``{field.name: coerced value or None}`` for every requested field.
    """
    # Drop any reasoning preamble first: it may itself contain braces that
    # would confuse the {...} fallback below.
    cleaned = _THINK_BLOCK_RE.sub("", raw or "").strip()

    # Strip a leading ```json / ``` fence if present. Only the fence marker and
    # its language tag are removed, so a single-line fenced reply keeps its
    # payload.
    if cleaned.startswith("```"):
        cleaned = _OPENING_FENCE_RE.sub("", cleaned, count=1)
        if cleaned.rstrip().endswith("```"):
            cleaned = cleaned.rsplit("```", 1)[0]
        cleaned = cleaned.strip()

    # Fall back to the first {...} block if there is still surrounding prose.
    if not cleaned.startswith("{"):
        m = _JSON_OBJECT_RE.search(cleaned)
        cleaned = m.group(0) if m else "{}"

    try:
        data = json.loads(cleaned)
    except json.JSONDecodeError:
        logger.warning("extraction_json_parse_failed", preview=cleaned[:120])
        data = {}
    if not isinstance(data, dict):
        data = {}

    return {f.name: _coerce(data.get(f.name), f.safe_type()) for f in fields}


def _coerce(value: object, typ: str) -> object:
    """Best-effort coerce a raw JSON value to the requested field type.

    Args:
        value: Value taken from the parsed model output (``None`` passes
            through unchanged).
        typ: Normalised field type as returned by ``ExtractionField.safe_type``.

    Returns:
        The coerced value, or the raw *value* when coercion fails — data is
        kept rather than dropped. Never raises for the conversion errors
        handled below.
    """
    if value is None:
        return None
    try:
        if typ == "integer":
            # Already an int: return as-is so large values do not lose
            # precision through the float round-trip. bool is excluded on
            # purpose so True/False are not reported as 1/0.
            if isinstance(value, int) and not isinstance(value, bool):
                return value
            return int(float(str(value).replace(",", "").strip()))
        if typ == "number":
            return float(str(value).replace(",", "").strip())
        if typ == "boolean":
            if isinstance(value, bool):
                return value
            return str(value).strip().lower() in ("true", "yes", "1", "نعم")
        # string / date — return as-is string
        return value if isinstance(value, (str, int, float, bool)) else str(value)
    except (ValueError, TypeError, OverflowError):
        # OverflowError: int(float("inf")) — json.loads accepts ``Infinity``.
        return value  # keep the raw value rather than dropping data


async def extract_fields(
    text: str,
    fields: list[ExtractionField],
    *,
    prefer_cloud: bool = True,
    sensitivity_level: str = "low",
) -> dict:
    """Run one ``json_mode`` extraction call and return the validated result.

    Args:
        text: Document text to extract from.
        fields: Fields to extract.
        prefer_cloud: Forwarded unchanged to the inference router.
        sensitivity_level: Forwarded unchanged to the inference router.

    Returns:
        A dict: ``{"fields": {...}, "model": str, "provider": str,
        "latency_ms": float, "raw": str}``. Never raises on a bad LLM
        response — returns all-null fields so the caller always gets a stable
        shape. Exceptions raised by the router call itself are not caught here.
    """
    # Imported at call time rather than at module import time.
    # NOTE(review): presumably so this module stays importable without the
    # router stack — confirm.
    from core.agents.router import call_llm_with_decision

    prompt = build_extraction_prompt(text, fields)
    raw, decision, response = await call_llm_with_decision(
        prompt,
        system_prompt="You output only valid JSON. No prose, no markdown fences.",
        sensitivity_level=sensitivity_level,
        prefer_cloud=prefer_cloud,
        json_mode=True,
    )
    parsed = parse_extraction_response(raw or "", fields)
    return {
        "fields": parsed,
        "model": decision.model if decision else "unknown",
        "provider": decision.provider if decision else "unknown",
        "latency_ms": response.latency_ms if response else 0.0,
        "raw": raw or "",
    }