import gradio as gr
import json
import tempfile
import os
import base64
import re
from io import BytesIO
from PIL import Image
from typing import Optional
from pydantic import BaseModel, Field, create_model
from datetime import date
from openai import OpenAI
from dotenv import load_dotenv
load_dotenv()
# Canonical (English) format labels; also offered as the "Format" dropdown choices.
FIELD_FORMATS = [
    "text",
    "date",
    "number",
    "true/false",
    "empty",
    "multiple choice",
    "unit",
]
# Upper bound on the length of a field name.
NAME_MAX_CHARS = 100
# Upper bound on the length of a field description / prompt.
PROMPT_MAX_CHARS = 300
def normalize_format_label(fmt_raw: str) -> str:
    """Map a French or English format label to its canonical English form.

    Matching ignores case and surrounding whitespace. Anything that is not
    recognised (including ``None`` or an empty string) falls back to
    ``"text"``.
    """
    english_labels = (
        "text",
        "date",
        "number",
        "true/false",
        "empty",
        "multiple choice",
        "unit",
    )
    # "date" is spelled the same in both languages, so it only needs the
    # English entry above.
    french_to_english = {
        "texte": "text",
        "nombre": "number",
        "vrai/faux": "true/false",
        "vide": "empty",
        "choix multiple": "multiple choice",
        "unité": "unit",
    }
    key = str(fmt_raw or "").strip().lower()
    if key in english_labels:
        return key
    return french_to_english.get(key, "text")
# One ASCII letter followed by up to 99 ASCII letters, digits, '_' or '-'
# (so at most 100 characters in total).
IDENTIFIER_REGEX = re.compile(r"^[A-Za-z][A-Za-z0-9_-]{0,99}$")
def is_image_url(url: str) -> bool:
    """Tell whether ``url`` is an http(s) link ending in a raster image extension.

    The comparison is case-insensitive and ignores surrounding whitespace as
    well as any query string or fragment.
    """
    if not url:
        return False
    lowered = url.strip().lower()
    if not lowered.startswith(("http://", "https://")):
        return False
    # Cut the query string first, then the fragment, before testing the suffix.
    without_query = lowered.partition("?")[0]
    path_part = without_query.partition("#")[0]
    raster_suffixes = (
        ".jpg",
        ".jpeg",
        ".png",
        ".gif",
        ".webp",
        ".bmp",
        ".tiff",
        ".tif",
    )
    return path_part.endswith(raster_suffixes)
def is_valid_ascii_identifier(value: str) -> bool:
    """Return True when ``value``, once trimmed, matches ``IDENTIFIER_REGEX``.

    Falsy values are treated as the empty string and rejected. The text must
    also be encodable as ASCII.
    """
    candidate = str(value or "").strip()
    if not candidate or IDENTIFIER_REGEX.match(candidate) is None:
        return False
    # Second line of defence: the text must be representable in ASCII.
    try:
        candidate.encode("ascii")
    except Exception:
        return False
    return True
def live_validate_field_name(name: str):
    """Validate the field name as the user types.

    Returns two Gradio updates: one for the inline error message (shown only
    when the name is invalid) and one that makes the target button
    interactive only when the name is valid.
    """
    if is_valid_ascii_identifier(name):
        msg = ""
    else:
        msg = "Only ASCII letters, digits, '_' or '-' allowed; start with a letter; no spaces or accents."
    has_error = bool(msg)
    html = f"{msg}" if has_error else ""
    message_update = gr.update(value=html, visible=has_error)
    button_update = gr.update(interactive=not has_error)
    return message_update, button_update
def live_validate_choice(choice: str):
    """Validate a multiple-choice option as the user types.

    Returns two Gradio updates: one for the inline error message (shown only
    when the choice is empty or invalid) and one that makes the target button
    interactive only when the choice is acceptable.
    """
    trimmed = (choice or "").strip()
    if not trimmed:
        msg = "Enter a non-empty choice."
    elif not is_valid_ascii_identifier(trimmed):
        msg = "Only ASCII letters, digits, '_' or '-' allowed; start with a letter; no spaces or accents."
    else:
        msg = ""
    has_error = bool(msg)
    html = f"{msg}" if has_error else ""
    message_update = gr.update(value=html, visible=has_error)
    button_update = gr.update(interactive=not has_error)
    return message_update, button_update
def error_update(msg: str):
    """Build a Gradio update that makes a component visible with ``msg`` as its value."""
    text = f"{msg}"
    return gr.update(visible=True, value=text)
def fields_to_rows(fields):
    """Convert field dicts into table rows: name, format, description, details.

    ``name`` and ``format`` are required keys (``KeyError`` otherwise);
    ``description`` and ``details`` default to an empty string.
    """
    rows = []
    for field in fields:
        row = [field["name"], field["format"]]
        row.append(field.get("description", ""))
        row.append(field.get("details", ""))
        rows.append(row)
    return rows
def names_from_fields(fields):
    """Return the ``name`` of every field as a string ('' when missing).

    ``None`` or an empty collection yields an empty list.
    """
    names = []
    for field in fields or []:
        names.append(str(field.get("name", "")))
    return names
def add_field(name, field_format, description, choices_list, unit, fields):
    """Validate a new field definition and append it to the model.

    Args:
        name: Field name; must be a unique ASCII identifier.
        field_format: Format label (French or English); normalised to English.
        description: Extraction hint, at most ``PROMPT_MAX_CHARS`` characters.
        choices_list: Candidate choices, used only for "multiple choice".
        unit: Unit text, used only for the "unit" format.
        fields: Current list of field dicts (may be ``None``).

    Returns:
        A 6-tuple of outputs: message update, field list, table rows,
        delete-dropdown update, download-button update, ready-message update.
        On a validation error the field list is returned unchanged and the
        message update carries the error text.
    """
    name = (name or "").strip()
    field_format = normalize_format_label(field_format)
    description = (description or "").strip()
    unit = (unit or "").strip()
    current = fields or []

    def respond(message_update, field_list):
        # Single place that assembles the six outputs for a given field list.
        has_fields = len(field_list) > 0
        return (
            message_update,
            field_list,
            fields_to_rows(field_list),
            gr.update(choices=names_from_fields(field_list), value=None, visible=has_fields),
            gr.update(visible=has_fields),
            ready_update_from_fields(field_list),
        )

    def reject(message):
        # Report an error while leaving the existing fields untouched.
        return respond(error_update(message), current)

    if not name:
        return reject("⚠️ Field name is required.")
    if not is_valid_ascii_identifier(name):
        return reject("⚠️ Invalid field name: use ASCII letters, digits, '_' or '-'; start with a letter; no spaces or accents.")
    # Uniqueness is case-insensitive and ignores surrounding whitespace.
    existing = {str(f.get("name", "")).strip().lower() for f in current}
    if name.lower() in existing:
        return reject("⚠️ This field name already exists.")
    if len(name) > NAME_MAX_CHARS:
        return reject(f"⚠️ Name too long (max {NAME_MAX_CHARS} characters).")
    if len(description) > PROMPT_MAX_CHARS:
        return reject(f"⚠️ Description too long (max {PROMPT_MAX_CHARS} characters).")

    options = []
    details = ""
    if field_format == "multiple choice":
        # Store trimmed strings, exactly as import_model does, so a model
        # round-trips unchanged through export and import.
        options = [str(c).strip() for c in (choices_list or []) if str(c).strip()]
        if len(options) < 2:
            return reject("⚠️ For ‘multiple choice’, add at least 2 choices.")
        if len({c.lower() for c in options}) != len(options):
            return reject("⚠️ For ‘multiple choice’, choices must be unique.")
        details = "choices: " + " | ".join(options)
    elif field_format == "unit" and unit:
        details = f"unit: {unit}"

    new_fields = list(current)
    new_fields.append({
        "name": name,
        "format": field_format,
        "description": description,
        "details": details,
        "options": options,
        "unit": unit if field_format == "unit" else "",
    })
    return respond(gr.update(value="", visible=False), new_fields)
def delete_field(delete_name, fields):
    """Remove the field named ``delete_name`` (case-insensitive, trimmed).

    Returns the same 6-tuple of outputs as ``add_field``: message update,
    field list, table rows, delete-dropdown update, download-button update
    and ready-message update. If nothing is selected or no field matches,
    the list is returned unchanged together with an error message.
    """
    current_fields = list(fields or [])

    def respond(message_update, field_list):
        # Assemble the six outputs for the given field list.
        has_fields = len(field_list) > 0
        return (
            message_update,
            field_list,
            fields_to_rows(field_list),
            gr.update(choices=names_from_fields(field_list), value=None, visible=has_fields),
            gr.update(visible=has_fields),
            ready_update_from_fields(field_list),
        )

    if not delete_name:
        return respond(error_update("⚠️ Select a field to delete."), current_fields)

    target = str(delete_name).strip().lower()
    remaining = []
    for field in current_fields:
        if str(field.get("name", "")).strip().lower() != target:
            remaining.append(field)

    # Same length means no field matched the requested name.
    if len(remaining) == len(current_fields):
        return respond(error_update("⚠️ Field not found."), current_fields)
    return respond(gr.update(value="", visible=False), remaining)
def serialize_model(fields):
    """Wrap the fields in the versioned dict used for the exported JSON file."""
    payload = {"version": 1}
    payload["fields"] = list(fields or [])
    return payload
def count_message(fields):
    """Return the field-count label, e.g. ``"3 fields in model"``."""
    total = len(fields or [])
    if total > 1:
        return f"{total} fields in model"
    return "1 field in model" if total == 1 else "0 field in model"
def visibility_updates_from_fields(fields):
    """Build visibility updates for the controls that need at least one field.

    Returns four Gradio updates, in order: delete dropdown (with refreshed
    choices and cleared selection), download button, delete button and
    filename box. All are visible only when the model has fields.
    """
    field_list = fields or []
    show = len(field_list) > 0
    delete_dropdown = gr.update(
        choices=names_from_fields(field_list),
        value=None,
        visible=show,
    )
    download_btn = gr.update(visible=show)
    delete_btn = gr.update(visible=show)
    model_filename = gr.update(visible=show)
    return delete_dropdown, download_btn, delete_btn, model_filename
def sanitize_filename(name):
    """Turn a user-supplied name into a safe ``.json`` file name.

    Directory components are dropped, every character other than
    alphanumerics, ``-``, ``_``, ``.`` and space is replaced by ``-``, and a
    ``.json`` extension is appended when missing. The result is at most 100
    characters long; when shortening is needed the stem is cut, never the
    extension. An empty name (or one with no usable stem) gives
    ``"model.json"``.
    """
    max_len = 100
    extension = ".json"
    default = "model.json"

    candidate = (name or "").strip()
    # Drop directories (both separator styles): keep the last component only.
    candidate = candidate.replace("\\", "/").split("/")[-1]
    candidate = "".join(
        ch if ch.isalnum() or ch in ("-", "_", ".", " ") else "-"
        for ch in candidate
    )

    # Split off an existing extension (keeping its original casing) so that
    # truncation below can only ever shorten the stem.
    if candidate.lower().endswith(extension):
        stem = candidate[:-len(extension)]
        suffix = candidate[-len(extension):]
    else:
        stem = candidate
        suffix = extension

    stem = stem[:max_len - len(suffix)]
    if not stem:
        return default
    return stem + suffix
def export_model(fields, filename):
    """Write the model as JSON into a fresh temporary directory.

    Returns a Gradio update pointing the file component at the written path,
    or an update hiding it when there are no fields to export.
    """
    model = serialize_model(fields)
    if not fields:
        return gr.update(visible=False)
    file_name = sanitize_filename(filename)
    # Each export gets its own directory so that file names never collide.
    target_dir = tempfile.mkdtemp(prefix="model-")
    path = os.path.join(target_dir, file_name)
    with open(path, "w", encoding="utf-8") as handle:
        handle.write(json.dumps(model, ensure_ascii=False, indent=2))
    return gr.update(value=path, visible=True)
def to_python_identifier(name: str) -> str:
    """Derive a lowercase Python identifier from a free-form field name.

    Runs of non-alphanumeric characters collapse into a single underscore,
    leading and trailing underscores are removed, a ``field_`` prefix is
    added when the result would start with a digit, and a trailing underscore
    is appended when the result is a reserved Python keyword (``class`` ->
    ``class_``), since a keyword cannot be used as an attribute name in
    generated source. An empty result becomes ``"field"``.
    """
    import keyword

    lowered = str(name or "").strip().lower()
    pieces = []
    for ch in lowered:
        if ch.isalnum():
            pieces.append(ch)
        elif not pieces or pieces[-1] != "_":
            # First separator of a run; later ones in the same run are skipped.
            pieces.append("_")
    ident = "".join(pieces).strip("_")
    if not ident:
        return "field"
    if ident[0].isdigit():
        ident = f"field_{ident}"
    if keyword.iskeyword(ident):
        ident += "_"
    return ident
def generate_pydantic_code(fields, class_name: str = "DocumentModel") -> str:
    """Render Python source for a Pydantic model describing ``fields``.

    Only the imports the model actually needs are emitted. Each field becomes
    a commented attribute whose ``Field`` description joins the field's
    description and details. "empty" fields are ``Optional[str]`` defaulting
    to ``None``; every other field is required.
    """
    fields = list(fields or [])
    formats = [normalize_format_label(f.get("format")) for f in fields]

    # Formats whose annotation does not depend on the field's options.
    plain_types = {
        "text": "str",
        "date": "date",
        "number": "float",
        "true/false": "bool",
        "unit": "float",
    }

    def annotation_for(fmt, options):
        # Returns (annotation source text, whether the field is optional).
        if fmt == "empty":
            return "Optional[str]", True
        if fmt == "multiple choice" and options:
            literals = ", ".join(repr(str(o)) for o in options)
            return f"Literal[{literals}]", False
        return plain_types.get(fmt, "str"), False

    needs_literal = any(
        fmt == "multiple choice" and f.get("options")
        for fmt, f in zip(formats, fields)
    )

    out = ["from pydantic import BaseModel, Field"]
    if "empty" in formats:
        out.append("from typing import Optional")
    if needs_literal:
        out.append("from typing import Literal")
    if "date" in formats:
        out.append("from datetime import date")
    out.append("")
    out.append(f"class {class_name}(BaseModel):")

    if not fields:
        out.append(" pass")
        return "\n".join(out)

    for fmt, f in zip(formats, fields):
        raw_name = f.get("name", "")
        ident = to_python_identifier(raw_name)
        typ, is_optional = annotation_for(fmt, f.get("options", []))
        desc = f.get("description", "")
        details = f.get("details", "")
        desc_full = desc if details == "" else (desc + " | " + details)
        default = "None" if is_optional else "..."
        out.append(f" # {raw_name} ({f.get('format')})")
        out.append(f" {ident}: {typ} = Field({default}, description={desc_full!r})")
    return "\n".join(out)
def pydantic_code_update_from_fields(fields):
    """Return a hidden Gradio update carrying the generated Pydantic source."""
    # Kept for possible backward compatibility; the output is never shown.
    if len(fields or []) == 0:
        return gr.update(value="", visible=False)
    return gr.update(value=generate_pydantic_code(fields), visible=False)
def export_pydantic_py(fields):
    """Write the generated Pydantic source to ``document_model.py`` in a new temp dir.

    Returns a Gradio update pointing the file component at the written path,
    or an update hiding it when there are no fields.
    """
    if not fields:
        return gr.update(visible=False)
    source = generate_pydantic_code(fields)
    target_dir = tempfile.mkdtemp(prefix="pydantic-")
    out_path = os.path.join(target_dir, "document_model.py")
    with open(out_path, "w", encoding="utf-8") as handle:
        handle.write(source)
    return gr.update(value=out_path, visible=True)
def build_pydantic_model_class(fields, class_name: str = "DocumentModel"):
    """Create a Pydantic model class at runtime from the field definitions.

    Type mapping: text -> str, date -> date, number/unit -> float,
    true/false -> bool, empty -> Optional[str] defaulting to None, and
    multiple choice -> ``Literal`` of the declared options (plain ``str``
    when no options are declared). Unknown formats are treated as text.
    Every field except "empty" is required.

    Multiple-choice fields use ``Literal`` so that validation really rejects
    values outside the declared choices; an ``enum`` entry added through
    ``json_schema_extra`` only decorates the schema and enforces nothing.

    Args:
        fields: Iterable of field dicts (may be ``None``).
        class_name: Name given to the generated class.

    Returns:
        The dynamically created model class.
    """
    from typing import Literal

    required_types = {
        "text": str,
        "date": date,
        "number": float,
        "true/false": bool,
        "unit": float,
    }
    field_definitions = {}
    for f in (fields or []):
        ident = to_python_identifier(f.get("name", ""))
        fmt = normalize_format_label(f.get("format"))
        desc = f.get("description", "")
        details = f.get("details", "")
        desc_full = desc if details == "" else (desc + " | " + details)

        default = ...
        if fmt == "empty":
            typ = Optional[str]
            default = None
        elif fmt == "multiple choice":
            options = [str(o) for o in (f.get("options", []) or [])]
            # Subscripting Literal with a tuple equals listing the values.
            typ = Literal[tuple(options)] if options else str
        else:
            typ = required_types.get(fmt, str)

        field_definitions[ident] = (typ, Field(default, description=desc_full))
    return create_model(class_name, **field_definitions)
def json_schema_from_fields(fields):
    """Return the JSON Schema of the model built from ``fields`` as indented text."""
    schema = build_pydantic_model_class(fields).model_json_schema()
    return json.dumps(schema, indent=2, ensure_ascii=False)
def instruction_from_fields(fields):
    """Build the extraction prompt: fixed instructions followed by the JSON Schema.

    Returns an empty string when there are no fields.
    """
    if not fields:
        return ""
    preamble = (
        "Extract the following information from the provided image. "
        "Respond only with a strictly valid JSON that conforms to this JSON Schema (no text outside JSON):\n"
    )
    return preamble + json_schema_from_fields(fields)
def document_file_to_data_url_with_error(path: str):
    """Convert an image or the first page of a PDF into a PNG data URL.

    PDFs (detected by a ``.pdf`` suffix, case-insensitive) are rendered with
    PyMuPDF at 300 DPI; any other file is opened with Pillow, converted to
    RGB and re-encoded as PNG.

    Args:
        path: Path of the document on disk.

    Returns:
        ``(data_url, None)`` on success, or ``("", error_message)`` on
        failure. This function reports problems through the second element
        instead of raising.
    """
    if not path or not os.path.exists(path):
        return "", "File not found."

    if str(path).lower().endswith(".pdf"):
        try:
            import fitz  # PyMuPDF
        except Exception:
            return "", "PDF support requires PyMuPDF. Install with: pip install pymupdf"
        try:
            doc = fitz.open(path)
            try:
                if doc.page_count == 0:
                    return "", "PDF has no pages."
                page = doc.load_page(0)
                # PDF user space is 72 points per inch; scale up to 300 DPI.
                zoom = 300.0 / 72.0
                pix = page.get_pixmap(matrix=fitz.Matrix(zoom, zoom), alpha=False)
                png_bytes = pix.tobytes("png")
            finally:
                # Always release the document handle, on success and on error.
                doc.close()
        except Exception as e:
            return "", f"Failed to render PDF: {e}"
        b64 = base64.b64encode(png_bytes).decode("utf-8")
        return f"data:image/png;base64,{b64}", None

    # Image path
    try:
        with Image.open(path) as im:
            rgb = im.convert("RGB")
            buf = BytesIO()
            rgb.save(buf, format="PNG", optimize=True)
        b64 = base64.b64encode(buf.getvalue()).decode("utf-8")
        return f"data:image/png;base64,{b64}", None
    except Exception:
        return "", "Invalid image file."
def parse_json_from_text(text: str):
    """Extract and parse the JSON object embedded in ``text``.

    If the text contains a complete fenced block, only the first
    block's content is searched. The candidate runs from the first ``{`` to
    the last ``}``.

    Returns:
        ``(parsed_value, None)`` on success, otherwise ``(None, message)``.
    """
    if text is None:
        return None, "Empty text"
    body = str(text)
    fence_parts = body.split("```")
    # Three or more parts means at least one complete fenced block exists;
    # keep what lies between the first pair of fences.
    if len(fence_parts) >= 3:
        body = fence_parts[1]
    first_brace = body.find("{")
    last_brace = body.rfind("}")
    if first_brace == -1 or last_brace <= first_brace:
        return None, "JSON not detected"
    try:
        parsed = json.loads(body[first_brace:last_brace + 1])
    except Exception as exc:
        return None, f"Invalid JSON: {exc}"
    return parsed, None
def validate_output_against_model(fields, text):
    """Validate the model's raw answer against the schema built from ``fields``.

    Returns:
        ``(True, "OK", normalized_json)`` when the answer parses and
        validates, otherwise ``(False, message, None)``. For validation
        failures the message lists up to five ``- location: problem`` lines
        when the exception exposes ``errors()``, else the exception text.
    """
    model = build_pydantic_model_class(fields)
    data, err = parse_json_from_text(text)
    if err:
        return False, err, None

    def describe(exc):
        # Prefer structured error details when the exception provides them.
        try:
            issues = getattr(exc, 'errors', lambda: [])()
            bullets = []
            for issue in issues[:5]:
                location = ".".join(map(str, issue.get('loc', [])))
                problem = issue.get('msg', 'error')
                bullets.append(f"- {location}: {problem}")
            return "\n".join(bullets) if bullets else str(exc)
        except Exception:
            return str(exc)

    try:
        instance = model.model_validate(data)
        normalized = json.dumps(instance.model_dump(mode="json"), ensure_ascii=False, indent=2)
    except Exception as exc:
        return False, describe(exc), None
    return True, "OK", normalized
def run_extraction(model_file_extraction, model_file_modeltab, fields_state, image_path, image_url, hf_token):
    # Generator driving one extraction run. Every yield is a pair
    # (text for the result box, Gradio update for the status message).
    #
    # Steps: pick the model definition, build the prompt, pick the document
    # (URL or uploaded file), stream the completion, then validate the final
    # text against the model. Despite its name, `hf_token` carries the
    # OpenRouter API key typed in the UI; when blank, the OPENROUTER_API_KEY
    # environment variable is used instead.
    #
    # Choose model source: Extraction > Model (upload) > Model (built)
    try:
        selected_fields = None
        # 1) File uploaded in Extraction tab
        if model_file_extraction:
            # The upload may arrive as a plain path or as a dict with a "path" key.
            path = model_file_extraction if isinstance(model_file_extraction, str) else model_file_extraction.get("path")
            if path and os.path.exists(path):
                with open(path, "r", encoding="utf-8") as f:
                    data = json.load(f)
                fields_raw = data.get("fields", []) if isinstance(data, dict) else []
                cleaned = []
                seen = set()
                # Validate each field; the first problem aborts the run with a message.
                for item in fields_raw:
                    name = str(item.get("name", "")).strip()
                    fmt = normalize_format_label(str(item.get("format", "")).strip())
                    description = str(item.get("description", ""))
                    options = item.get("options", []) if isinstance(item, dict) else []
                    unit = str(item.get("unit", ""))
                    if not name or len(name) > NAME_MAX_CHARS or not is_valid_ascii_identifier(name):
                        yield ("", gr.update(value="⚠️ Invalid model: field name must be ASCII [A-Za-z][A-Za-z0-9_-]* and <= length limit.", visible=True))
                        return
                    # Field names must be unique, ignoring case.
                    key = name.lower()
                    if key in seen:
                        yield ("", gr.update(value="⚠️ Invalid model: duplicate field names.", visible=True))
                        return
                    seen.add(key)
                    if fmt not in FIELD_FORMATS:
                        yield ("", gr.update(value="⚠️ Invalid model: unknown format.", visible=True))
                        return
                    if len(description) > PROMPT_MAX_CHARS:
                        yield ("", gr.update(value="⚠️ Invalid model: description too long.", visible=True))
                        return
                    details = ""
                    if fmt == "multiple choice":
                        # Keep non-blank choices only, trimmed.
                        options = [str(c).strip() for c in (options or []) if str(c).strip()]
                        if len(options) < 2:
                            yield ("", gr.update(value="⚠️ Invalid model: ‘multiple choice’ requires at least 2 choices.", visible=True))
                            return
                        for c in options:
                            if not is_valid_ascii_identifier(c):
                                yield ("", gr.update(value="⚠️ Invalid model: choices must match [A-Za-z][A-Za-z0-9_-]* with no spaces or accents.", visible=True))
                                return
                        # Choices must be unique, ignoring case.
                        normalized = [c.lower() for c in options]
                        if len(set(normalized)) != len(options):
                            yield ("", gr.update(value="⚠️ Invalid model: choices must be unique.", visible=True))
                            return
                        details = "choices: " + " | ".join(options)
                    elif fmt == "unit":
                        unit = unit.strip()
                        if unit:
                            details = f"unit: {unit}"
                    cleaned.append({
                        "name": name,
                        "format": fmt,
                        "description": description,
                        "details": details,
                        "options": options if fmt == "multiple choice" else [],
                        "unit": unit if fmt == "unit" else "",
                    })
                selected_fields = cleaned
            else:
                yield ("", error_update("⚠️ Model file not found."))
                return
        # 2) File uploaded in Model tab
        elif model_file_modeltab:
            path = model_file_modeltab if isinstance(model_file_modeltab, str) else model_file_modeltab.get("path")
            if path and os.path.exists(path):
                with open(path, "r", encoding="utf-8") as f:
                    data = json.load(f)
                raw_fields = data.get("fields", []) if isinstance(data, dict) else []
                # normalize formats to English for internal use
                # NOTE(review): unlike branch 1, only the format label is
                # normalised here; no other field validation is performed.
                selected_fields = []
                for item in raw_fields:
                    item = dict(item)
                    item["format"] = normalize_format_label(item.get("format"))
                    selected_fields.append(item)
            else:
                yield ("", error_update("⚠️ Model file not found."))
                return
        # 3) Model built manually (state)
        else:
            # normalize possible legacy French formats in state
            selected_fields = []
            for item in (fields_state or []):
                obj = dict(item)
                obj["format"] = normalize_format_label(obj.get("format"))
                selected_fields.append(obj)
        if not selected_fields:
            yield ("", error_update("⚠️ Model not ready."))
            return
    except Exception:
        # Any failure while reading or interpreting the model file ends up here.
        yield ("", gr.update(value="⚠️ Invalid model file.", visible=True))
        return
    # Build the instruction and start the streaming call; yields (accumulated text, status)
    instruction_text = instruction_from_fields(selected_fields)
    if not instruction_text:
        yield ("", error_update("⚠️ Model not ready."))
        return
    # Choose image source: URL has priority over uploaded file
    image_url = (image_url or "").strip()
    if image_url:
        # Accept image URLs only (no PDF)
        if not is_image_url(image_url):
            yield ("", error_update("⚠️ Only direct image URLs are allowed (jpg, jpeg, png, gif, webp, bmp, tiff)."))
            return
        final_image_ref = image_url
    else:
        if not image_path:
            yield ("", error_update("⚠️ Provide an image/PDF file or a URL."))
            return
        # Uploaded files are inlined into the request as a PNG data URL.
        data_url, err = document_file_to_data_url_with_error(image_path)
        if not data_url:
            msg = err or "Invalid document (image/PDF)."
            yield ("", error_update("⚠️ " + msg))
            return
        final_image_ref = data_url
    try:
        # The key typed in the UI takes precedence over the environment variable.
        api_key = (hf_token or "").strip() or os.getenv("OPENROUTER_API_KEY", "")
        client = OpenAI(base_url="https://openrouter.ai/api/v1", api_key=api_key)
        if not client.api_key:
            yield ("", gr.update(value="⚠️ Missing OPENROUTER_API_KEY environment variable.", visible=True))
            return
        # Optional headers, sent only when the matching environment variables are set.
        extra_headers = {}
        ref = os.getenv("OPENROUTER_HTTP_REFERER", "").strip()
        ttl = os.getenv("OPENROUTER_X_TITLE", "").strip()
        if ref:
            extra_headers["HTTP-Referer"] = ref
        if ttl:
            extra_headers["X-Title"] = ttl
        model_name = os.getenv("OPENROUTER_MODEL", "openai/gpt-4o")
        stream = client.chat.completions.create(
            extra_headers=extra_headers or None,
            model=model_name,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": instruction_text},
                        {"type": "image_url", "image_url": {"url": final_image_ref}},
                    ],
                }
            ],
            stream=True,
        )
        collected = ""
        for chunk in stream:
            # Skip chunks that carry no choices or no text delta.
            choices = getattr(chunk, "choices", None)
            if not choices:
                continue
            first = choices[0]
            delta = getattr(first, "delta", None)
            piece = getattr(delta, "content", None) if delta is not None else None
            if piece:
                collected += piece
                # Re-emit the whole text accumulated so far.
                yield (collected, gr.update(value="Validating…", visible=True))
        if not collected:
            yield ("", gr.update(value="⚠️ Empty model response.", visible=True))
        else:
            # Final step: check the complete answer against the model.
            ok, info, normalized = validate_output_against_model(selected_fields, collected)
            if ok:
                msg = "✅ Output matches the model."
                if normalized:
                    msg += "\n\nNormalized preview:\n" + normalized
                yield (collected, gr.update(value=msg, visible=True))
            else:
                yield (collected, gr.update(value=f"❌ Output not compliant:\n{info}", visible=True))
        return
    except Exception as e:
        yield ("", gr.update(value=f"⚠️ API call error: {e}", visible=True))
        return
def import_model(uploaded_file):
    """Load and validate a model from an uploaded JSON file.

    ``uploaded_file`` is either a path string or a dict with a ``"path"``
    key. Every field is checked (name, uniqueness, format, description
    length, choices); the first problem aborts the import.

    Returns:
        The same 6-tuple of outputs as ``add_field``. On any failure the
        field list and table are emptied, the dependent controls are hidden
        and the message update carries the error text.
    """

    def failure(message):
        # Outputs for a rejected import: error shown, everything else reset.
        return (
            error_update(message),
            [],
            [],
            gr.update(choices=[], value=None, visible=False),
            gr.update(visible=False),
            ready_update_from_fields([]),
        )

    try:
        if not uploaded_file:
            return failure("⚠️ No file provided.")
        path = uploaded_file if isinstance(uploaded_file, str) else uploaded_file.get("path")
        if not path or not os.path.exists(path):
            return failure("⚠️ File not found.")
        with open(path, "r", encoding="utf-8") as handle:
            data = json.load(handle)
        raw_fields = data.get("fields", []) if isinstance(data, dict) else []

        # basic validation
        cleaned = []
        seen_names = set()
        for item in raw_fields:
            name = str(item.get("name", "")).strip()
            fmt = str(item.get("format", "")).strip()
            description = str(item.get("description", ""))
            options = item.get("options", []) if isinstance(item, dict) else []
            unit = str(item.get("unit", ""))

            if not name or len(name) > NAME_MAX_CHARS or not is_valid_ascii_identifier(name):
                return failure("⚠️ Invalid model: field name must match [A-Za-z][A-Za-z0-9_-]* and length limit.")
            # Names must be unique, ignoring case.
            lowered_name = name.lower()
            if lowered_name in seen_names:
                return failure("⚠️ Invalid model: duplicate field names.")
            seen_names.add(lowered_name)

            fmt = normalize_format_label(fmt)
            if fmt not in FIELD_FORMATS:
                return failure("⚠️ Invalid model: unknown format.")
            if len(description) > PROMPT_MAX_CHARS:
                return failure("⚠️ Invalid model: description too long.")

            details = ""
            if fmt == "multiple choice":
                # Keep non-blank choices only, trimmed.
                options = [str(c).strip() for c in (options or []) if str(c).strip()]
                if len(options) < 2:
                    return failure("⚠️ Invalid model: ‘multiple choice’ requires at least 2 choices.")
                for choice in options:
                    if not is_valid_ascii_identifier(choice):
                        return failure("⚠️ Invalid model: choices must match [A-Za-z][A-Za-z0-9_-]* with no spaces or accents.")
                if len({choice.lower() for choice in options}) != len(options):
                    return failure("⚠️ Invalid model: choices must be unique.")
                details = "choices: " + " | ".join(options)
            elif fmt == "unit":
                unit = unit.strip()
                if unit:
                    details = f"unit: {unit}"

            cleaned.append({
                "name": name,
                "format": fmt,
                "description": description,
                "details": details,
                "options": options if fmt == "multiple choice" else [],
                "unit": unit if fmt == "unit" else "",
            })

        has_fields = len(cleaned) > 0
        return (
            gr.update(value="", visible=False),
            cleaned,
            fields_to_rows(cleaned),
            gr.update(choices=names_from_fields(cleaned), value=None, visible=has_fields),
            gr.update(visible=has_fields),
            ready_update_from_fields(cleaned),
        )
    except Exception:
        return failure("⚠️ Invalid model file.")
def ready_update_from_fields(fields):
    """Return the update for the "model ready" banner: shown only when fields exist."""
    if len(fields or []) == 0:
        return gr.update(visible=False)
    return gr.update(value="✅ Model ready. You can proceed to the ‘Extraction’ tab.", visible=True)
def toggle_conditionals(field_format):
    """Show or hide the format-specific inputs for the selected format.

    Returns five Gradio updates, in order: choice input, add-choice button,
    choices list, unit input, choices error. The choice widgets are visible
    only for "multiple choice" and the unit input only for "unit"; the
    choices error text is cleared in every case.
    """
    fmt = normalize_format_label(field_format)
    is_multi = fmt == "multiple choice"
    is_unit = fmt == "unit"
    choice_input = gr.update(visible=is_multi)
    add_choice_btn = gr.update(visible=is_multi)
    choices_list = gr.update(visible=is_multi)
    unit_input = gr.update(visible=is_unit)
    choices_error = gr.update(visible=is_multi, value="")
    return choice_input, add_choice_btn, choices_list, unit_input, choices_error
def update_char_counter(text):
    """Return the ``used/max`` character counter for the description box."""
    used = len(text) if text else 0
    return f"{used}/{PROMPT_MAX_CHARS}"
def add_choice(choice, current_choices):
    """Append a choice to the temporary choice list after validating it.

    Returns four outputs: message update, choice list, one-column table rows
    and the new content of the input box. On success the input box is
    cleared; on error the list is unchanged and the raw input is kept so the
    user can correct it.
    """
    raw = choice or ""
    candidate = raw.strip()
    choices = list(current_choices or [])
    # Duplicate detection ignores case and surrounding whitespace.
    taken = {str(c).strip().lower() for c in choices}

    if not candidate:
        problem = "⚠️ Enter a non-empty choice."
    elif not is_valid_ascii_identifier(candidate):
        problem = "⚠️ Invalid choice: use ASCII letters, digits, '_' or '-'; start with a letter; no spaces or accents."
    elif candidate.lower() in taken:
        problem = "⚠️ This choice already exists."
    else:
        problem = None

    if problem is not None:
        return error_update(problem), choices, [[c] for c in choices], raw

    choices.append(candidate)
    return gr.update(value="", visible=False), choices, [[c] for c in choices], ""
def clear_choices_after_add(error_text, current_choices, current_rows, current_input):
    """Reset the temporary choice widgets unless an error message is displayed.

    Returns four outputs: choice list, table rows (or an update), input text
    and an update for the choices error. With a non-blank ``error_text`` the
    current values are passed through untouched.
    """
    has_error = bool(str(error_text or "").strip())
    if not has_error:
        return [], gr.update(value=[]), "", gr.update(value="", visible=False)
    return current_choices, current_rows, current_input, gr.update()
def build_ui():
    """Assemble the Gradio interface and return the ``gr.Blocks`` app.

    The app has two tabs: "Model" (build, import or export the field model)
    and "Extract" (send a document to the LLM and validate its answer).

    Listeners that depend on what a primary handler wrote (the field state
    or the error box) are chained with ``.then()`` so they run after that
    handler. Registering them as separate listeners on the same trigger
    would start them alongside it, and they would see the previous state.
    """
    with gr.Blocks(title="Document model builder", analytics_enabled=False) as demo:
        with gr.Tabs():
            with gr.TabItem("Model"):
                gr.Markdown("## Step 1 — Create or load a model")
                gr.Markdown(
                    "Use this step to define the fields to extract. "
                    "You can either build the model manually or import a .json file. "
                    "This model will be used to validate and normalize the response.")
                gr.Markdown("### 1.1 Add a field")
                gr.Markdown(
                    "- Name: must be unique and short.\n"
                    "- Format: text, date, number, true/false, empty, multiple choice, unit.\n"
                    "- Description: short extraction hint (useful examples).")
                with gr.Row():
                    name_input = gr.Textbox(
                        label="Field name",
                        placeholder="e.g., Accident date",
                        info=f"Allowed: [A-Za-z][A-Za-z0-9_-]*, no spaces/accents, max {NAME_MAX_CHARS} chars",
                    )
                    fmt_input = gr.Dropdown(
                        choices=FIELD_FORMATS,
                        value="text",
                        label="Format",
                    )
                desc_input = gr.Textbox(
                    label="Description / Prompt",
                    placeholder=(
                        "E.g., Date when the accident happened. Example: 2021-06-27"
                    ),
                    lines=3,
                    info=f"Max {PROMPT_MAX_CHARS} characters",
                )
                name_live_error = gr.Markdown(visible=False)
                with gr.Row():
                    char_counter = gr.Markdown(f"0/{PROMPT_MAX_CHARS}")
                    add_btn = gr.Button("Add +")
                error_box = gr.Markdown(visible=False)
                with gr.Row():
                    live_count = gr.Markdown(count_message([]))
                gr.Markdown("### 1.2 Format options (shown if needed)")
                with gr.Row():
                    choice_input = gr.Textbox(
                        label="Add a choice",
                        placeholder="e.g., yes",
                        visible=False,
                        info="Same rule as field name: [A-Za-z][A-Za-z0-9_-]*",
                    )
                    add_choice_btn = gr.Button("Add a choice", visible=False)
                unit_input = gr.Textbox(
                    label="Unit(s)",
                    placeholder="e.g., €, km, %",
                    visible=False,
                )
                choices_live_error = gr.Markdown(visible=False)
                choices_error = gr.Markdown(visible=False)
                choices_state = gr.State([])
                choices_list = gr.Dataframe(
                    headers=["Choices"],
                    value=[],
                    interactive=False,
                    visible=False,
                    label="Available choices",
                )
                gr.Markdown("### 1.3 Model fields (preview)")
                fields_state = gr.State([])
                table = gr.Dataframe(
                    headers=["Field name", "Format", "Description", "Details"],
                    value=[],
                    interactive=False,
                    label="Model fields",
                )
                gr.Markdown("### 1.4 Manage fields")
                with gr.Row():
                    delete_dropdown = gr.Dropdown(
                        label="Delete a field",
                        choices=[],
                        value=None,
                        visible=False,
                    )
                    delete_btn = gr.Button("Delete", variant="stop", visible=False)
                gr.Markdown("### 1.5 Export / Import a model")
                gr.Markdown(
                    "- Export: generates a reusable .json file.\n"
                    "- Import: loads an existing .json and fills the table above.")
                with gr.Row():
                    download_btn = gr.Button("Download model", visible=False)
                    model_filename = gr.Textbox(label="Filename", placeholder="e.g., claim_form.json", scale=2, visible=False)
                file_out = gr.File(label="Model file", visible=False)
                upload_in = gr.File(label="Upload a model (.json)")
                ready_msg = gr.Markdown(visible=False)

                # --- Live feedback while typing / selecting ---
                fmt_input.change(
                    fn=toggle_conditionals,
                    inputs=[fmt_input],
                    outputs=[choice_input, add_choice_btn, choices_list, unit_input, choices_error],
                )
                desc_input.input(
                    fn=update_char_counter,
                    inputs=[desc_input],
                    outputs=[char_counter],
                )
                name_input.input(
                    fn=live_validate_field_name,
                    inputs=[name_input],
                    outputs=[name_live_error, add_btn],
                )
                add_choice_btn.click(
                    fn=add_choice,
                    inputs=[choice_input, choices_state],
                    outputs=[choices_error, choices_state, choices_list, choice_input],
                )
                choice_input.input(
                    fn=live_validate_choice,
                    inputs=[choice_input],
                    outputs=[choices_live_error, add_choice_btn],
                )

                # --- Add a field ---
                add_evt = add_btn.click(
                    fn=add_field,
                    inputs=[name_input, fmt_input, desc_input, choices_state, unit_input, fields_state],
                    outputs=[error_box, fields_state, table, delete_dropdown, download_btn, ready_msg],
                )
                # Everything below reads what add_field just wrote, so it is
                # chained after it instead of being a separate click listener.
                add_evt.then(
                    fn=lambda f: visibility_updates_from_fields(f),
                    inputs=[fields_state],
                    outputs=[delete_dropdown, download_btn, delete_btn, model_filename],
                )
                # After an add attempt, reset the temporary choices only if
                # no error is displayed (error_box is empty).
                add_evt.then(
                    fn=clear_choices_after_add,
                    inputs=[error_box, choices_state, choices_list, choice_input],
                    outputs=[choices_state, choices_list, choice_input, choices_error],
                )
                # Live field counter
                add_evt.then(lambda f: count_message(f), inputs=[fields_state], outputs=[live_count])

                # --- Delete a field ---
                delete_evt = delete_btn.click(
                    fn=delete_field,
                    inputs=[delete_dropdown, fields_state],
                    outputs=[error_box, fields_state, table, delete_dropdown, download_btn, ready_msg],
                )
                delete_evt.then(
                    lambda f: visibility_updates_from_fields(f),
                    inputs=[fields_state],
                    outputs=[delete_dropdown, download_btn, delete_btn, model_filename],
                )
                delete_evt.then(lambda f: count_message(f), inputs=[fields_state], outputs=[live_count])

                # --- Export ---
                download_btn.click(
                    fn=export_model,
                    inputs=[fields_state, model_filename],
                    outputs=[file_out],
                )

                # --- Import ---
                import_evt = upload_in.change(
                    fn=import_model,
                    inputs=[upload_in],
                    outputs=[error_box, fields_state, table, delete_dropdown, download_btn, ready_msg],
                )
                # The counter is refreshed only after the import has finished;
                # a separate upload_in.change listener would read the old state.
                import_evt.then(lambda f: count_message(f), inputs=[fields_state], outputs=[live_count])
                import_evt.then(lambda f: visibility_updates_from_fields(f), inputs=[fields_state], outputs=[delete_dropdown, download_btn, delete_btn, model_filename])
                import_evt.then(lambda f: gr.update(visible=len(f or []) > 0), inputs=[fields_state], outputs=[delete_btn])

            with gr.TabItem("Extract"):
                gr.Markdown("## Step 2 — Extract fields from the document")
                gr.Markdown(
                    "Follow the order: 2.1 Auth, 2.2 Model, 2.3 Image, 2.4 Extract.\n"
                    "Model priority: (A) .json uploaded in Extract, (B) .json uploaded in ‘Model’, (C) model built manually.")
                gr.Markdown("### 2.1 Authentication (OPENROUTER_API_KEY)")
                with gr.Row():
                    hf_token_input = gr.Textbox(label="OPENROUTER_API_KEY", type="password", placeholder="OpenRouter API key")
                gr.Markdown("### 2.2 Choose the model to use")
                gr.Markdown(
                    "- Option A: upload a .json here (priority).\n"
                    "- Option B: use the file imported in the ‘Model’ tab.\n"
                    "- Option C: use the model you built manually (table).")
                with gr.Row():
                    model_file_input = gr.File(label="Model file (.json) — Extract (optional)")
                gr.Markdown("### 2.3 Provide the document and run extraction")
                with gr.Row():
                    img_input = gr.File(label="Document (image/PDF upload)", file_count="single", file_types=["image", ".pdf"], type="filepath")
                    image_url_input = gr.Textbox(label="Or image URL (images only)", placeholder="https://example.com/file.png")
                extract_btn = gr.Button("Extract", variant="primary")
                gr.Markdown("### 2.4 Result")
                with gr.Row():
                    extraction_output = gr.Code(label="Result (stream)", language="json")
                validation_msg = gr.Markdown(visible=False)

                # Run the extraction; the input order supports three cases:
                # 1) model uploaded in the Extract tab (takes priority)
                # 2) model uploaded in the Model tab
                # 3) model built manually (fields_state)
                extract_btn.click(
                    fn=run_extraction,
                    inputs=[model_file_input, upload_in, fields_state, img_input, image_url_input, hf_token_input],
                    outputs=[extraction_output, validation_msg],
                    concurrency_limit=2,
                    api_name="extract",
                )
                # Keep the model file in sync between the two tabs.
                # A file loaded in Extract is mirrored to the Model tab...
                model_file_input.change(lambda f: f, inputs=[model_file_input], outputs=[upload_in])
                # ...and a file loaded in Model is mirrored to the Extract tab.
                import_evt.then(lambda f: f, inputs=[upload_in], outputs=[model_file_input])
    # Enable the queue (without the deprecated parameter)
    demo.queue()
    return demo
def main():
    """Build the interface and start the Gradio server with the MCP server enabled."""
    app = build_ui()
    app.launch(mcp_server=True)


if __name__ == "__main__":
    main()