LeomordKaly committed on
Commit
09fee34
·
verified ·
1 Parent(s): b6574e8

deploy: phase 3 BYOK backend (Dockerfile.hf, FastAPI on 7860)

Browse files
Files changed (2) hide show
  1. core/extraction.py +190 -0
  2. interfaces/api.py +153 -2
core/extraction.py ADDED
@@ -0,0 +1,190 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Structured-data extraction: document text -> JSON against a field schema.
2
+
3
+ This is the *extraction mode* (Tier X) — the second face of the platform next to
4
+ RAG Q&A. Instead of "ask a question, get a cited answer," the caller supplies a
5
+ small **field schema** (name + type + description per field) and gets a single
6
+ validated JSON object back. No retrieval, no vector DB — just parse → one
7
+ ``json_mode`` LLM call → validate.
8
+
9
+ It reuses the same inference router as the RAG pipeline, so the visitor's BYOK
10
+ key powers the call and the same sensitivity-routing applies (HIGH-sensitivity
11
+ docs stay local on a self-hosted deploy). Kept framework-free so it is unit
12
+ testable without FastAPI.
13
+ """
14
+
15
+ from __future__ import annotations
16
+
17
+ import json
18
+ import re
19
+ from dataclasses import dataclass
20
+
21
+ from utils.logging import get_logger
22
+
23
+ logger = get_logger(__name__)
24
+
25
+ # Bound the document text fed to the model so a long PDF cannot blow the
26
+ # token budget / rate limit. Extraction targets a handful of fields, so the
27
+ # salient content is almost always near the top of the document.
28
+ MAX_EXTRACTION_CHARS = 12_000
29
+
30
+ # Field types we coerce to. Anything else is treated as a string.
31
+ _ALLOWED_TYPES = frozenset({"string", "number", "integer", "boolean", "date"})
32
+
33
+
34
@dataclass(frozen=True)
class ExtractionField:
    """Describe a single value the extractor should pull from a document.

    Attributes:
        name: Key under which the value appears in the output JSON
            (e.g. ``"total_amount"``).
        type: Declared value type -- ``string``, ``number``, ``integer``,
            ``boolean`` or ``date``. Anything else is treated as ``string``.
        description: Free-text hint telling the model what to look for
            (e.g. "the grand total including VAT, as a number").
    """

    name: str
    type: str = "string"
    description: str = ""

    def safe_type(self) -> str:
        """Return the declared type normalised, or ``"string"`` if unsupported.

        The declared type is lower-cased and stripped before it is checked
        against ``_ALLOWED_TYPES``; an empty declaration counts as ``string``.
        """
        declared = self.type if self.type else "string"
        normalised = declared.lower().strip()
        if normalised in _ALLOWED_TYPES:
            return normalised
        return "string"
53
+
54
+
55
def normalise_fields(raw_fields: list[dict], *, max_fields: int = 25) -> list[ExtractionField]:
    """Coerce a list of raw field dicts into ``ExtractionField`` objects.

    Entries that are not dicts, have no usable ``name`` (missing, ``None`` or
    blank), or repeat a name already accepted are dropped. At most
    ``max_fields`` entries are kept so a caller cannot request hundreds of
    fields in one prompt.

    Args:
        raw_fields: Parsed schema, normally ``[{"name", "type", "description"}, ...]``.
            ``None`` or an empty list is treated as "no fields".
        max_fields: Upper bound on the number of fields returned (expected >= 1).

    Returns:
        The accepted fields, in input order.

    Raises:
        ValueError: When nothing usable remains.
    """
    out: list[ExtractionField] = []
    seen: set[str] = set()
    for entry in raw_fields or []:
        if len(out) >= max_fields:  # hard cap -- keep the prompt + output bounded
            break
        if not isinstance(entry, dict):
            continue
        # A JSON ``null`` arrives as None; str(None) would otherwise become the
        # literal field name "None", so treat None the same as a missing key.
        raw_name = entry.get("name")
        name = "" if raw_name is None else str(raw_name).strip()
        if not name or name in seen:
            # Duplicate names would collapse into one output key anyway; keep
            # the first definition rather than repeating it in the prompt.
            continue
        seen.add(name)
        raw_type = entry.get("type")
        raw_description = entry.get("description")
        out.append(
            ExtractionField(
                name=name,
                type="string" if raw_type is None else str(raw_type),
                description="" if raw_description is None else str(raw_description).strip(),
            )
        )
    if not out:
        raise ValueError("no usable fields in the extraction schema")
    return out
81
+
82
+
83
def build_extraction_prompt(text: str, fields: list[ExtractionField]) -> str:
    """Build a strict JSON-only extraction prompt.

    Args:
        text: Document text. Only the first ``MAX_EXTRACTION_CHARS`` characters
            are embedded in the prompt; ``None`` is treated as empty.
        fields: Fields to request. Each contributes one line to the FIELDS
            section and one entry to the list of allowed keys.

    Returns:
        The full prompt string, ending with the "Return ONLY the JSON object:" cue.
    """

    def _key(name: str) -> str:
        # JSON-encode the name so quotes, backslashes or newlines in a
        # caller-supplied field name cannot break out of the quoted key.
        # For ordinary names this yields exactly '"name"'.
        return json.dumps(name, ensure_ascii=False)

    field_lines = "\n".join(
        f"- {_key(f.name)} ({f.safe_type()}): "
        f'{f.description or "extract this field"}'
        for f in fields
    )
    keys = ", ".join(_key(f.name) for f in fields)
    document = (text or "")[:MAX_EXTRACTION_CHARS]
    return (
        "You are a precise document data-extraction engine. Extract the fields "
        "below from the DOCUMENT and return a SINGLE valid JSON object — nothing "
        "else, no markdown fences, no commentary.\n\n"
        "RULES:\n"
        "1. Output exactly these keys and no others: " + keys + ".\n"
        "2. Use the field type as a hint. Numbers as JSON numbers, booleans as "
        "true/false, dates as ISO-8601 strings (YYYY-MM-DD) when possible.\n"
        "3. If a field is not present in the document, set its value to null. "
        "Do NOT invent values.\n"
        "4. Answer in the document's own language for free-text values "
        "(Arabic documents -> Arabic values).\n\n"
        f"FIELDS:\n{field_lines}\n\n"
        f"DOCUMENT:\n{document}\n\n"
        "Return ONLY the JSON object:"
    )
105
+
106
+
107
def parse_extraction_response(raw: str, fields: list[ExtractionField]) -> dict:
    """Parse the model's JSON, keep only the requested keys, coerce types.

    ``<think>...</think>`` blocks are removed, then the first decodable JSON
    object in the remaining text is used. Scanning for the object (instead of
    parsing the whole string) tolerates markdown fences, a leading sentence,
    and trailing commentary after the closing brace.

    Args:
        raw: Raw model output; ``None`` or empty is treated as "no data".
        fields: The requested fields; they define the keys of the result.

    Returns:
        A dict with **every** requested key present. Keys the model omitted,
        or all keys when no JSON object could be decoded, map to ``None``.
    """
    cleaned = re.sub(r"<think>.*?</think>", "", raw or "", flags=re.DOTALL | re.IGNORECASE)
    cleaned = cleaned.strip()

    data: dict = {}
    decoder = json.JSONDecoder()
    saw_brace = False
    start = cleaned.find("{")
    while start != -1:
        saw_brace = True
        try:
            # raw_decode stops at the end of the first complete value, so any
            # text after the object (closing fence, commentary) is ignored.
            candidate, _end = decoder.raw_decode(cleaned, start)
        except json.JSONDecodeError:
            start = cleaned.find("{", start + 1)
            continue
        if isinstance(candidate, dict):
            data = candidate
            break
        start = cleaned.find("{", start + 1)
    else:
        # Only warn when the reply looked like it contained JSON but none of
        # it decoded; a reply with no braces at all is simply "no data".
        if saw_brace:
            logger.warning("extraction_json_parse_failed", preview=cleaned[:120])

    return {f.name: _coerce(data.get(f.name), f.safe_type()) for f in fields}
139
+
140
+
141
+ def _coerce(value: object, typ: str):
142
+ """Best-effort coerce a raw JSON value to the requested field type."""
143
+ if value is None:
144
+ return None
145
+ try:
146
+ if typ == "integer":
147
+ return int(float(str(value).replace(",", "").strip()))
148
+ if typ == "number":
149
+ return float(str(value).replace(",", "").strip())
150
+ if typ == "boolean":
151
+ if isinstance(value, bool):
152
+ return value
153
+ return str(value).strip().lower() in ("true", "yes", "1", "Ω†ΨΉΩ…")
154
+ # string / date β€” return as-is string
155
+ return value if isinstance(value, (str, int, float, bool)) else str(value)
156
+ except (ValueError, TypeError):
157
+ return value # keep the raw value rather than dropping data
158
+
159
+
160
async def extract_fields(
    text: str,
    fields: list[ExtractionField],
    *,
    prefer_cloud: bool = True,
    sensitivity_level: str = "low",
) -> dict:
    """Run one ``json_mode`` extraction call and return the parsed result.

    The prompt is built with ``build_extraction_prompt`` and sent through
    ``call_llm_with_decision``; the reply is parsed with
    ``parse_extraction_response``, which yields ``None`` for every field it
    cannot recover, so a malformed reply still produces a stable shape.

    Args:
        text: Document text to extract from.
        fields: Fields to request from the model.
        prefer_cloud: Forwarded unchanged to the router.
        sensitivity_level: Forwarded unchanged to the router.

    Returns:
        ``{"fields": {...}, "model": str, "provider": str,
        "latency_ms": float, "raw": str}``. ``model`` / ``provider`` are
        ``"unknown"`` and ``latency_ms`` is ``0.0`` when the router returns
        no decision / response object.
    """
    # Imported lazily, as in the rest of this module's call path.
    from core.agents.router import call_llm_with_decision

    llm_output, decision, response = await call_llm_with_decision(
        build_extraction_prompt(text, fields),
        system_prompt="You output only valid JSON. No prose, no markdown fences.",
        sensitivity_level=sensitivity_level,
        prefer_cloud=prefer_cloud,
        json_mode=True,
    )
    raw_text = llm_output or ""
    extracted = parse_extraction_response(raw_text, fields)

    if decision:
        model_name = decision.model
        provider_name = decision.provider
    else:
        model_name = "unknown"
        provider_name = "unknown"

    if response:
        latency = response.latency_ms
    else:
        latency = 0.0

    return {
        "fields": extracted,
        "model": model_name,
        "provider": provider_name,
        "latency_ms": latency,
        "raw": raw_text,
    }
interfaces/api.py CHANGED
@@ -417,8 +417,9 @@ if _FASTAPI_AVAILABLE:
417
  async def byok_corpus() -> dict:
418
  """Summarise the base demo corpus -- source files + metadata.
419
 
420
- Scrolls the root tenant Qdrant collection (the 10 hand-curated
421
- demo docs) and groups points by ``source_file``. Returns one
 
422
  row per file with the chunk count, sensitivity label, and
423
  roles -- never the chunk text. Visitor uploads under
424
  ``documents_sess_<sid>`` are NOT included (those live in the
@@ -1190,6 +1191,156 @@ if _FASTAPI_AVAILABLE:
1190
  "deleted_chunks": deleted,
1191
  }
1192
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1193
  @app.post("/query", response_model=QueryResponse, tags=["rag"])
1194
  async def query_endpoint(
1195
  body: QueryRequest,
 
417
  async def byok_corpus() -> dict:
418
  """Summarise the base demo corpus -- source files + metadata.
419
 
420
+ Scrolls the root tenant Qdrant collection (the hand-curated demo
421
+ docs — English RBAC + Arabic Egypt) and groups points by
422
+ ``source_file``. Returns one
423
  row per file with the chunk count, sensitivity label, and
424
  roles -- never the chunk text. Visitor uploads under
425
  ``documents_sess_<sid>`` are NOT included (those live in the
 
1191
  "deleted_chunks": deleted,
1192
  }
1193
 
1194
+ # ── BYOK extraction mode (doc -> structured JSON) ────────────────
1195
+ # The platform's second face next to RAG Q&A: upload a document + a
1196
+ # field schema, get back one validated JSON object. No retrieval, no
1197
+ # vector DB — parse -> one json_mode LLM call -> validate. Reuses the
1198
+ # inference router (visitor's BYOK key powers it; sensitivity routing
1199
+ # applies) and lands on the same audit chain. See ADR-041 / Tier X.
1200
+ from fastapi import Form
1201
+
1202
@app.post("/byok/extract", tags=["byok"])
async def byok_extract(
    request: _FastApiRequest,
    file: Annotated[UploadFile, File(...)],
    fields: Annotated[str, Form(...)],
    creds: Annotated[ByokCreds, Depends(extract_byok)],
) -> dict:
    """Extract a caller-defined field schema from an uploaded document.

    ``fields`` is a JSON string: ``[{"name","type","description"}, ...]``.

    Returns:
        ``{session_id, filename, byok_used, fields: {...}, provider, model,
        latency_ms}`` where ``latency_ms`` covers spooling, parsing and the
        LLM call.

    Raises:
        HTTPException: 400 ``bad_schema`` / ``unsupported_extension`` /
            ``empty_file``; 413 ``file_too_large``; 422 ``parse_failed`` /
            ``no_text``; 429 when the owner-key throttle rejects the caller.
    """
    import os as _os
    import shutil as _shutil
    import tempfile as _tempfile
    import time as _time

    from core.extraction import extract_fields, normalise_fields

    # Parse + validate the schema first (cheap, fail fast).
    try:
        raw_fields = json.loads(fields)
        if not isinstance(raw_fields, list):
            raise ValueError("fields must be a JSON array")
        schema = normalise_fields(raw_fields)
    except (json.JSONDecodeError, ValueError) as exc:
        raise HTTPException(
            status.HTTP_400_BAD_REQUEST,
            detail={"reason": "bad_schema", "error": str(exc)},
        ) from exc

    # Throttle owner-key fallback exactly like chat.
    if not creds.byok_active():
        throttle = get_owner_key_throttle()
        ok, meta = throttle.allow(client_ip_from_request(request))
        if not ok:
            raise HTTPException(
                status.HTTP_429_TOO_MANY_REQUESTS,
                detail={
                    "reason": meta["reason"],
                    "retry_after_seconds": meta["retry_after"],
                    "hint": (
                        "Owner-key fallback exhausted for this IP. Paste "
                        "your own LLM key to continue — never stored."
                    ),
                },
            )

    # Validate ext + size (mirror the upload caps).
    filename = file.filename or "upload"
    ext = ("." + filename.rsplit(".", 1)[-1].lower()) if "." in filename else ""
    allowed = {e.lower() for e in settings.byok_upload_allowed_extensions}
    if ext not in allowed:
        raise HTTPException(
            status.HTTP_400_BAD_REQUEST,
            detail={"reason": "unsupported_extension", "allowed": sorted(allowed)},
        )
    max_bytes = int(settings.byok_upload_max_bytes)
    buf = bytearray()
    # Read in 64 KiB chunks so an oversized upload is rejected as soon as it
    # crosses the cap instead of being buffered in full first.
    while True:
        chunk = await file.read(64 * 1024)
        if not chunk:
            break
        buf.extend(chunk)
        if len(buf) > max_bytes:
            raise HTTPException(
                status.HTTP_413_CONTENT_TOO_LARGE,
                detail={"reason": "file_too_large", "limit_bytes": max_bytes},
            )
    if not buf:
        raise HTTPException(status.HTTP_400_BAD_REQUEST, detail={"reason": "empty_file"})

    # Spool + parse to text via the existing loaders.
    from ingestion.loaders import load_document

    # Keep only [alnum._-] so the client-supplied name cannot carry path
    # separators into the temp path.
    safe_name = (
        "".join(c if (c.isalnum() or c in "._-") else "_" for c in filename) or "upload"
    )
    # NOTE(review): session_id is embedded in the directory prefix unsanitised;
    # presumably extract_byok validates it -- confirm.
    tmp_dir = _tempfile.mkdtemp(prefix=f"byok_extract_{creds.session_id}_")
    tmp_path = _os.path.join(tmp_dir, safe_name)
    _t0 = _time.perf_counter()
    try:
        with open(tmp_path, "wb") as fh:
            fh.write(buf)  # bytearray is written directly; no extra copy needed
        try:
            docs = await asyncio.to_thread(load_document, tmp_path)
        except Exception as exc:
            raise HTTPException(
                status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail={"reason": "parse_failed", "error": str(exc)},
            ) from exc
        text = "\n\n".join(d.text for d in docs if d.text).strip()
        if not text:
            raise HTTPException(
                status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail={
                    "reason": "no_text",
                    "hint": "No extractable text (scanned image PDFs need OCR).",
                },
            )

        _byok_tok = set_byok_runtime(_byok_runtime_for(creds))
        try:
            result = await extract_fields(
                text, schema, prefer_cloud=True, sensitivity_level="low"
            )
        finally:
            reset_byok_runtime(_byok_tok)
    finally:
        # Remove the whole directory in one best-effort step. Removing the
        # file and the directory separately leaks the directory whenever the
        # file was never created (e.g. the write itself failed).
        _shutil.rmtree(tmp_dir, ignore_errors=True)

    elapsed_ms = (_time.perf_counter() - _t0) * 1000
    # Auditing is best-effort: a logging failure must not fail the request.
    try:
        audit_logger.log_query(
            user_id=f"demo-{creds.session_id}",
            org_id=_DEMO_ORG_ID,
            query=f"[extract] {safe_name} ({len(schema)} fields)",
            response_summary=f"extracted {len(result['fields'])} fields",
            sensitivity="low",
            status="success",
            latency_ms=elapsed_ms,
            action_hint="extract",
            byok_used=creds.has_user_key(),
            synth_provider=result["provider"],
            synth_model=result["model"],
        )
    except Exception as exc:  # pragma: no cover - defensive
        logger.warning("byok_extract_audit_failed", error=str(exc))

    return {
        "session_id": creds.session_id,
        "filename": safe_name,
        "byok_used": creds.has_user_key(),
        "fields": result["fields"],
        "provider": result["provider"],
        "model": result["model"],
        "latency_ms": elapsed_ms,
    }
1343
+
1344
  @app.post("/query", response_model=QueryResponse, tags=["rag"])
1345
  async def query_endpoint(
1346
  body: QueryRequest,