"""Gradio app to build a document field model and extract those fields.

The "Model" tab lets the user define (or import/export) a list of fields.
The "Extract" tab sends an image or the first page of a PDF to a vision model
through the OpenRouter API and validates the JSON answer against a Pydantic
model generated from the field list.
"""

import base64
import json
import keyword
import os
import re
import tempfile
from datetime import date
from io import BytesIO
from typing import Literal, Optional

import gradio as gr
from dotenv import load_dotenv
from openai import OpenAI
from PIL import Image
from pydantic import BaseModel, Field, ValidationError, create_model

load_dotenv()

FIELD_FORMATS = [
    "text",
    "date",
    "number",
    "true/false",
    "empty",
    "multiple choice",
    "unit",
]

NAME_MAX_CHARS = 100
PROMPT_MAX_CHARS = 300
FILENAME_MAX_CHARS = 100

IDENTIFIER_REGEX = re.compile(r"^[A-Za-z][A-Za-z0-9_-]{0,99}$")

_FORMAT_ALIASES = {
    # French -> English
    "texte": "text",
    "date": "date",
    "nombre": "number",
    "vrai/faux": "true/false",
    "vide": "empty",
    "choix multiple": "multiple choice",
    "unité": "unit",
    # English (idempotent)
    "text": "text",
    "number": "number",
    "true/false": "true/false",
    "empty": "empty",
    "multiple choice": "multiple choice",
    "unit": "unit",
}

# Formats whose Python type does not depend on the field's options.
_SIMPLE_TYPES = {
    "text": str,
    "date": date,
    "number": float,
    "true/false": bool,
    "unit": float,
}

_IDENTIFIER_RULE_MSG = (
    "Only ASCII letters, digits, '_' or '-' allowed; start with a letter; "
    "no spaces or accents."
)
_INVALID_MODEL_FILE_MSG = "⚠️ Invalid model file."
_JSON_EXT = ".json"


# ---------------------------------------------------------------------------
# Basic validation helpers
# ---------------------------------------------------------------------------

def normalize_format_label(fmt_raw: str) -> str:
    """Map a French or English format label to its English form.

    Unknown, empty or ``None`` labels fall back to ``"text"``.
    """
    return _FORMAT_ALIASES.get(str(fmt_raw or "").strip().lower(), "text")


def is_image_url(url: str) -> bool:
    """Return True for an http(s) URL whose path ends with a raster image extension."""
    if not url:
        return False
    u = url.strip().lower()
    if not u.startswith(("http://", "https://")):
        return False
    allowed_exts = (".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp", ".tiff", ".tif")
    # Drop the query string / fragment before checking the suffix.
    base = u.split("?")[0].split("#")[0]
    return base.endswith(allowed_exts)


def is_valid_ascii_identifier(value: str) -> bool:
    """Return True when *value* (stripped) matches ``IDENTIFIER_REGEX``."""
    s = str(value or "").strip()
    # The pattern only contains ASCII classes, so a match implies ASCII-only.
    return bool(s) and IDENTIFIER_REGEX.fullmatch(s) is not None


def _live_feedback(msg: str):
    """Return (message update, button update) for the live validators."""
    return (
        gr.update(value=msg, visible=bool(msg)),
        gr.update(interactive=not msg),
    )


def live_validate_field_name(name: str):
    """Validate the field name as the user types; disables the add button on error."""
    msg = "" if is_valid_ascii_identifier(name) else _IDENTIFIER_RULE_MSG
    return _live_feedback(msg)


def live_validate_choice(choice: str):
    """Validate a choice as the user types; disables the add-choice button on error."""
    c = (choice or "").strip()
    if not c:
        msg = "Enter a non-empty choice."
    elif not is_valid_ascii_identifier(c):
        msg = _IDENTIFIER_RULE_MSG
    else:
        msg = ""
    return _live_feedback(msg)


def error_update(msg: str):
    """Return an update that shows *msg* in a message component."""
    return gr.update(value=f"{msg}", visible=True)


# ---------------------------------------------------------------------------
# Field list management (Model tab)
# ---------------------------------------------------------------------------

def fields_to_rows(fields):
    """Convert field dicts to rows for the preview table."""
    return [
        [f["name"], f["format"], f.get("description", ""), f.get("details", "")]
        for f in fields
    ]


def names_from_fields(fields):
    """Return the list of field names (as strings)."""
    return [str(f.get("name", "")) for f in (fields or [])]


def ready_update_from_fields(fields):
    """Show the "model ready" message when at least one field exists."""
    if len(fields or []) > 0:
        return gr.update(
            value="✅ Model ready. You can proceed to the ‘Extraction’ tab.",
            visible=True,
        )
    return gr.update(visible=False)


def _model_outputs(message_update, fields):
    """Build the 6-tuple expected by the Model-tab callbacks.

    Order: message box, fields state, table rows, delete dropdown,
    download button, ready message.
    """
    fields = fields or []
    has_fields = len(fields) > 0
    return (
        message_update,
        fields,
        fields_to_rows(fields),
        gr.update(choices=names_from_fields(fields), value=None, visible=has_fields),
        gr.update(visible=has_fields),
        ready_update_from_fields(fields),
    )


def add_field(name, field_format, description, choices_list, unit, fields):
    """Validate a new field and append it to the model.

    ``choices_list`` is a list of strings and is only used when the format is
    "multiple choice"; ``unit`` is only used for the "unit" format.
    Returns the 6-tuple described in ``_model_outputs``; on validation failure
    the field list is returned unchanged together with an error message.
    """
    name = (name or "").strip()
    field_format = normalize_format_label(field_format)
    description = (description or "").strip()
    unit = (unit or "").strip()
    current = fields or []

    def reject(message):
        return _model_outputs(error_update(message), current)

    if not name:
        return reject("⚠️ Field name is required.")
    if not is_valid_ascii_identifier(name):
        return reject(
            "⚠️ Invalid field name: use ASCII letters, digits, '_' or '-'; "
            "start with a letter; no spaces or accents."
        )
    # Uniqueness is case-insensitive on the trimmed name.
    existing = {str(f.get("name", "")).strip().lower() for f in current}
    if name.lower() in existing:
        return reject("⚠️ This field name already exists.")
    if len(name) > NAME_MAX_CHARS:
        return reject(f"⚠️ Name too long (max {NAME_MAX_CHARS} characters).")
    if len(description) > PROMPT_MAX_CHARS:
        return reject(f"⚠️ Description too long (max {PROMPT_MAX_CHARS} characters).")

    options = []
    details = ""
    if field_format == "multiple choice":
        options = [str(c).strip() for c in (choices_list or []) if str(c).strip()]
        if len(options) < 2:
            return reject("⚠️ For ‘multiple choice’, add at least 2 choices.")
        if len({c.lower() for c in options}) != len(options):
            return reject("⚠️ For ‘multiple choice’, choices must be unique.")
        details = "choices: " + " | ".join(options)
    elif field_format == "unit" and unit:
        details = f"unit: {unit}"

    new_fields = list(current)
    new_fields.append({
        "name": name,
        "format": field_format,
        "description": description,
        "details": details,
        "options": options,
        "unit": unit if field_format == "unit" else "",
    })
    return _model_outputs(gr.update(value="", visible=False), new_fields)


def delete_field(delete_name, fields):
    """Remove the field named *delete_name* (case-insensitive) from the model."""
    current_fields = list(fields or [])
    if not delete_name:
        return _model_outputs(error_update("⚠️ Select a field to delete."), current_fields)

    target = str(delete_name).strip().lower()
    new_fields = [
        f for f in current_fields
        if str(f.get("name", "")).strip().lower() != target
    ]
    if len(new_fields) == len(current_fields):
        return _model_outputs(error_update("⚠️ Field not found."), current_fields)
    return _model_outputs(gr.update(value="", visible=False), new_fields)


def serialize_model(fields):
    """Return the JSON-serializable representation of the model."""
    return {"version": 1, "fields": list(fields or [])}


def count_message(fields):
    """Return the "N field(s) in model" counter text."""
    n = len(fields or [])
    if n == 0:
        return "0 field in model"
    if n == 1:
        return "1 field in model"
    return f"{n} fields in model"


def visibility_updates_from_fields(fields):
    """Return updates showing the management widgets only when fields exist.

    Order: delete dropdown, download button, delete button, filename box.
    """
    has = len(fields or []) > 0
    return (
        gr.update(choices=names_from_fields(fields or []), value=None, visible=has),
        gr.update(visible=has),
        gr.update(visible=has),
        gr.update(visible=has),
    )


# ---------------------------------------------------------------------------
# Export helpers
# ---------------------------------------------------------------------------

def sanitize_filename(name):
    """Return a safe ``.json`` file name derived from user input.

    Directory components are dropped, unsafe characters become ``-`` and the
    result is capped at ``FILENAME_MAX_CHARS`` characters. The stem (not the
    extension) is truncated so the file always keeps its ``.json`` suffix.
    """
    candidate = (name or "").strip()
    if not candidate:
        return "model.json"
    # Remove directories and unsafe characters.
    candidate = candidate.replace("\\", "/").split("/")[-1]
    candidate = "".join(
        ch if (ch.isalnum() or ch in ("-", "_", ".", " ")) else "-"
        for ch in candidate
    )
    if candidate.lower().endswith(_JSON_EXT):
        stem, ext = candidate[:-len(_JSON_EXT)], candidate[-len(_JSON_EXT):]
    else:
        stem, ext = candidate, _JSON_EXT
    stem = stem[:FILENAME_MAX_CHARS - len(ext)]
    if not stem:
        stem = "model"
    return stem + ext


def export_model(fields, filename):
    """Write the model to a temporary ``.json`` file and expose it for download."""
    if not fields:
        return gr.update(visible=False)
    # The directory is intentionally left in place: the UI serves the file
    # after this function returns.
    temp_dir = tempfile.mkdtemp(prefix="model-")
    path = os.path.join(temp_dir, sanitize_filename(filename))
    with open(path, "w", encoding="utf-8") as f:
        json.dump(serialize_model(fields), f, ensure_ascii=False, indent=2)
    return gr.update(value=path, visible=True)


def to_python_identifier(name: str) -> str:
    """Convert a field name to a lowercase Python identifier.

    Runs of non-alphanumeric characters collapse to a single ``_``; leading and
    trailing underscores are removed; an empty result becomes ``"field"``; a
    leading digit is prefixed with ``field_``; Python keywords get a trailing
    underscore so they stay usable in generated source code.
    """
    s = str(name or "").strip().lower()
    pieces = []
    for ch in s:
        if ch.isalnum():
            pieces.append(ch)
        elif not pieces or pieces[-1] != "_":
            pieces.append("_")
    ident = "".join(pieces).strip("_") or "field"
    if ident[0].isdigit():
        ident = f"field_{ident}"
    if keyword.iskeyword(ident):
        ident += "_"
    return ident


def _unique_identifiers(fields):
    """Return one identifier per field, suffixing ``_2``, ``_3``… on collision.

    Distinct names such as ``a-b`` and ``a_b`` map to the same identifier;
    without this, one of the two fields would silently overwrite the other.
    """
    used = set()
    result = []
    for f in fields:
        base = to_python_identifier(f.get("name", ""))
        ident = base
        suffix = 2
        while ident in used:
            ident = f"{base}_{suffix}"
            suffix += 1
        used.add(ident)
        result.append(ident)
    return result


def _full_description(field) -> str:
    """Join the field's description and details, skipping empty parts."""
    parts = (field.get("description", "") or "", field.get("details", "") or "")
    return " | ".join(str(p) for p in parts if p)


def _annotation_source(field):
    """Return (annotation source text, is_optional) for generated code."""
    fmt = normalize_format_label(field.get("format"))
    if fmt == "empty":
        return "Optional[str]", True
    if fmt == "multiple choice":
        options = field.get("options", [])
        if options:
            lits = ", ".join(repr(str(o)) for o in options)
            return f"Literal[{lits}]", False
        return "str", False
    return _SIMPLE_TYPES.get(fmt, str).__name__, False


def generate_pydantic_code(fields, class_name: str = "DocumentModel") -> str:
    """Return Python source defining a Pydantic model for *fields*."""
    fields = list(fields or [])
    formats = [normalize_format_label(f.get("format")) for f in fields]
    uses_literal = any(
        fmt == "multiple choice" and f.get("options")
        for fmt, f in zip(formats, fields)
    )

    lines = ["from pydantic import BaseModel, Field"]
    if "empty" in formats:
        lines.append("from typing import Optional")
    if uses_literal:
        lines.append("from typing import Literal")
    if "date" in formats:
        lines.append("from datetime import date")
    lines.append("")
    lines.append(f"class {class_name}(BaseModel):")
    if not fields:
        lines.append("    pass")
        return "\n".join(lines)

    for f, ident in zip(fields, _unique_identifiers(fields)):
        typ, is_optional = _annotation_source(f)
        default = "None" if is_optional else "..."
        lines.append(f"    # {f.get('name', '')} ({f.get('format')})")
        lines.append(
            f"    {ident}: {typ} = Field({default}, description={_full_description(f)!r})"
        )
    return "\n".join(lines)


def pydantic_code_update_from_fields(fields):
    """Return a (hidden) code update; kept for compatibility, currently unused."""
    if not fields:
        return gr.update(value="", visible=False)
    return gr.update(value=generate_pydantic_code(fields), visible=False)


def export_pydantic_py(fields):
    """Write the generated Pydantic source to a temporary ``.py`` file."""
    if not fields:
        return gr.update(visible=False)
    temp_dir = tempfile.mkdtemp(prefix="pydantic-")
    path = os.path.join(temp_dir, "document_model.py")
    with open(path, "w", encoding="utf-8") as f:
        f.write(generate_pydantic_code(fields))
    return gr.update(value=path, visible=True)


# ---------------------------------------------------------------------------
# Runtime model, prompt and validation
# ---------------------------------------------------------------------------

def build_pydantic_model_class(fields, class_name: str = "DocumentModel"):
    """Create a Pydantic model class for *fields*.

    "multiple choice" fields with options are typed as ``Literal[...]`` so the
    allowed values appear as an ``enum`` in the JSON schema AND are enforced
    when validating the model output. "empty" fields are optional strings.
    """
    fields = list(fields or [])
    definitions = {}
    for f, ident in zip(fields, _unique_identifiers(fields)):
        fmt = normalize_format_label(f.get("format"))
        options = [str(o) for o in (f.get("options", []) or [])]
        default = ...
        if fmt == "empty":
            typ, default = Optional[str], None
        elif fmt == "multiple choice" and options:
            typ = Literal[tuple(options)]
        else:
            typ = _SIMPLE_TYPES.get(fmt, str)
        definitions[ident] = (typ, Field(default, description=_full_description(f)))
    return create_model(class_name, **definitions)


def json_schema_from_fields(fields):
    """Return the model's JSON Schema as an indented JSON string."""
    schema = build_pydantic_model_class(fields).model_json_schema()
    return json.dumps(schema, ensure_ascii=False, indent=2)


def instruction_from_fields(fields):
    """Return the extraction prompt, or ``""`` when there are no fields."""
    if not fields:
        return ""
    return (
        "Extract the following information from the provided image. "
        "Respond only with a strictly valid JSON that conforms to this JSON Schema (no text outside JSON):\n"
        + json_schema_from_fields(fields)
    )


def _png_data_url(png_bytes: bytes) -> str:
    """Encode PNG bytes as a ``data:`` URL."""
    return "data:image/png;base64," + base64.b64encode(png_bytes).decode("utf-8")


def document_file_to_data_url_with_error(path: str):
    """Convert an image, or the first page of a PDF, to a PNG data URL.

    Returns ``(data_url, None)`` on success and ``("", error_message)`` on
    failure. PDF pages are rendered at 300 dpi using PyMuPDF, which is imported
    lazily so the app still works for images when it is not installed.
    """
    if not path or not os.path.exists(path):
        return "", "File not found."

    if str(path).lower().endswith(".pdf"):
        try:
            import fitz  # PyMuPDF
        except Exception:
            return "", "PDF support requires PyMuPDF. Install with: pip install pymupdf"
        try:
            doc = fitz.open(path)
            try:
                if doc.page_count == 0:
                    return "", "PDF has no pages."
                zoom = 300.0 / 72.0  # PDF user space is 72 units per inch
                page = doc.load_page(0)
                pix = page.get_pixmap(matrix=fitz.Matrix(zoom, zoom), alpha=False)
                png_bytes = pix.tobytes("png")
            finally:
                doc.close()  # release the file handle even when rendering fails
        except Exception as e:
            return "", f"Failed to render PDF: {e}"
        return _png_data_url(png_bytes), None

    try:
        with Image.open(path) as im:
            buf = BytesIO()
            im.convert("RGB").save(buf, format="PNG", optimize=True)
    except Exception:
        return "", "Invalid image file."
    return _png_data_url(buf.getvalue()), None


def parse_json_from_text(text: str):
    """Extract and parse the first JSON object found in *text*.

    If the text contains a fenced block (three backticks), the content of the
    first fence is used. Returns ``(data, None)`` or ``(None, error_message)``.
    """
    if text is None:
        return None, "Empty text"
    s = str(text)
    if "```" in s:
        parts = s.split("```")
        if len(parts) >= 3:
            # Fenced block present: keep the content between the first fences.
            s = parts[1]
    start = s.find("{")
    end = s.rfind("}")
    if start == -1 or end == -1 or end <= start:
        return None, "JSON not detected"
    try:
        return json.loads(s[start:end + 1]), None
    except Exception as e:
        return None, f"Invalid JSON: {e}"


def validate_output_against_model(fields, text):
    """Validate the model's answer against the Pydantic model built from *fields*.

    Returns ``(True, "OK", normalized_json)`` on success, otherwise
    ``(False, details, None)`` where *details* lists up to five validation
    errors.
    """
    model = build_pydantic_model_class(fields)
    data, err = parse_json_from_text(text)
    if err:
        return False, err, None
    try:
        instance = model.model_validate(data)
        normalized = json.dumps(
            instance.model_dump(mode="json"), ensure_ascii=False, indent=2
        )
    except ValidationError as e:
        msgs = [
            f"- {'.'.join(map(str, d.get('loc', [])))}: {d.get('msg', 'error')}"
            for d in e.errors()[:5]
        ]
        return False, ("\n".join(msgs) if msgs else str(e)), None
    except Exception as e:
        return False, str(e), None
    return True, "OK", normalized


# ---------------------------------------------------------------------------
# Model file loading (shared by import and extraction)
# ---------------------------------------------------------------------------

def _uploaded_path(uploaded):
    """Return the filesystem path of an uploaded-file value (str, dict or object)."""
    if isinstance(uploaded, str):
        return uploaded
    if isinstance(uploaded, dict):
        return uploaded.get("path")
    return getattr(uploaded, "name", None)


def _read_model_fields(path):
    """Load a model file and return its raw ``fields`` entry (``[]`` if absent)."""
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)
    return data.get("fields", []) if isinstance(data, dict) else []


def _clean_model_fields(fields_raw, name_error):
    """Validate raw field entries read from a model file.

    Returns ``(cleaned_fields, None)`` on success or ``(None, message)`` on the
    first problem found. *name_error* is the message used for an invalid name.
    """
    if not isinstance(fields_raw, list):
        return None, _INVALID_MODEL_FILE_MSG

    cleaned = []
    seen = set()
    for item in fields_raw:
        if not isinstance(item, dict):
            return None, _INVALID_MODEL_FILE_MSG
        name = str(item.get("name", "")).strip()
        fmt = normalize_format_label(str(item.get("format", "")).strip())
        description = str(item.get("description", ""))
        unit = str(item.get("unit", "")).strip()

        if not name or len(name) > NAME_MAX_CHARS or not is_valid_ascii_identifier(name):
            return None, name_error
        key = name.lower()
        if key in seen:
            return None, "⚠️ Invalid model: duplicate field names."
        seen.add(key)
        # normalize_format_label falls back to "text", so this is defensive.
        if fmt not in FIELD_FORMATS:
            return None, "⚠️ Invalid model: unknown format."
        if len(description) > PROMPT_MAX_CHARS:
            return None, "⚠️ Invalid model: description too long."

        options = []
        details = ""
        if fmt == "multiple choice":
            options = [
                str(c).strip()
                for c in (item.get("options", []) or [])
                if str(c).strip()
            ]
            if len(options) < 2:
                return None, "⚠️ Invalid model: ‘multiple choice’ requires at least 2 choices."
            if not all(is_valid_ascii_identifier(c) for c in options):
                return None, (
                    "⚠️ Invalid model: choices must match [A-Za-z][A-Za-z0-9_-]* "
                    "with no spaces or accents."
                )
            if len({c.lower() for c in options}) != len(options):
                return None, "⚠️ Invalid model: choices must be unique."
            details = "choices: " + " | ".join(options)
        elif fmt == "unit" and unit:
            details = f"unit: {unit}"

        cleaned.append({
            "name": name,
            "format": fmt,
            "description": description,
            "details": details,
            "options": options,
            "unit": unit if fmt == "unit" else "",
        })
    return cleaned, None


def _select_model_fields(model_file_extraction, model_file_modeltab, fields_state):
    """Pick the model to use for extraction.

    Priority: file uploaded in the Extract tab, then file uploaded in the Model
    tab, then the manually built model. Returns ``(fields, error_message)``.
    """
    name_error = (
        "⚠️ Invalid model: field name must be ASCII [A-Za-z][A-Za-z0-9_-]* "
        "and <= length limit."
    )
    for uploaded in (model_file_extraction, model_file_modeltab):
        if not uploaded:
            continue
        path = _uploaded_path(uploaded)
        if not path or not os.path.exists(path):
            return None, "⚠️ Model file not found."
        return _clean_model_fields(_read_model_fields(path), name_error)

    # Manually built model: normalize possible legacy French formats.
    selected = []
    for item in (fields_state or []):
        obj = dict(item)
        obj["format"] = normalize_format_label(obj.get("format"))
        selected.append(obj)
    return selected, None


# ---------------------------------------------------------------------------
# Extraction (Extract tab)
# ---------------------------------------------------------------------------

def run_extraction(model_file_extraction, model_file_modeltab, fields_state,
                   image_path, image_url, hf_token):
    """Stream the extraction result for a document.

    Generator yielding ``(accumulated_text, status_update)`` tuples. The model
    source is chosen by ``_select_model_fields``; an image URL takes priority
    over an uploaded file. ``hf_token`` is the OpenRouter API key typed in the
    UI; when empty, the ``OPENROUTER_API_KEY`` environment variable is used.
    """
    try:
        selected_fields, model_err = _select_model_fields(
            model_file_extraction, model_file_modeltab, fields_state
        )
    except Exception:
        # Unreadable file, invalid JSON or unexpected structure.
        yield ("", gr.update(value=_INVALID_MODEL_FILE_MSG, visible=True))
        return
    if model_err:
        yield ("", error_update(model_err))
        return
    if not selected_fields:
        yield ("", error_update("⚠️ Model not ready."))
        return

    instruction_text = instruction_from_fields(selected_fields)
    if not instruction_text:
        yield ("", error_update("⚠️ Model not ready."))
        return

    # Choose the image source: URL has priority over the uploaded file.
    image_url = (image_url or "").strip()
    if image_url:
        # Only image URLs are accepted (no PDF).
        if not is_image_url(image_url):
            yield ("", error_update(
                "⚠️ Only direct image URLs are allowed (jpg, jpeg, png, gif, webp, bmp, tiff)."
            ))
            return
        final_image_ref = image_url
    else:
        if not image_path:
            yield ("", error_update("⚠️ Provide an image/PDF file or a URL."))
            return
        data_url, err = document_file_to_data_url_with_error(image_path)
        if not data_url:
            yield ("", error_update("⚠️ " + (err or "Invalid document (image/PDF).")))
            return
        final_image_ref = data_url

    # Check the key before building the client so the message is explicit.
    api_key = (hf_token or "").strip() or os.getenv("OPENROUTER_API_KEY", "")
    if not api_key:
        yield ("", gr.update(
            value="⚠️ Missing OPENROUTER_API_KEY environment variable.", visible=True
        ))
        return

    try:
        client = OpenAI(base_url="https://openrouter.ai/api/v1", api_key=api_key)

        extra_headers = {}
        ref = os.getenv("OPENROUTER_HTTP_REFERER", "").strip()
        ttl = os.getenv("OPENROUTER_X_TITLE", "").strip()
        if ref:
            extra_headers["HTTP-Referer"] = ref
        if ttl:
            extra_headers["X-Title"] = ttl

        stream = client.chat.completions.create(
            extra_headers=extra_headers or None,
            model=os.getenv("OPENROUTER_MODEL", "openai/gpt-4o"),
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": instruction_text},
                        {"type": "image_url", "image_url": {"url": final_image_ref}},
                    ],
                }
            ],
            stream=True,
        )

        collected = ""
        for chunk in stream:
            choices = getattr(chunk, "choices", None)
            if not choices:
                continue
            delta = getattr(choices[0], "delta", None)
            piece = getattr(delta, "content", None) if delta is not None else None
            if piece:
                collected += piece
                yield (collected, gr.update(value="Validating…", visible=True))

        if not collected:
            yield ("", gr.update(value="⚠️ Empty model response.", visible=True))
            return

        ok, info, normalized = validate_output_against_model(selected_fields, collected)
        if ok:
            msg = "✅ Output matches the model."
            if normalized:
                msg += "\n\nNormalized preview:\n" + normalized
            yield (collected, gr.update(value=msg, visible=True))
        else:
            yield (collected, gr.update(
                value=f"❌ Output not compliant:\n{info}", visible=True
            ))
    except Exception as e:
        yield ("", gr.update(value=f"⚠️ API call error: {e}", visible=True))


def import_model(uploaded_file):
    """Load and validate a model ``.json`` file uploaded in the Model tab.

    Returns the 6-tuple described in ``_model_outputs``. Any failure resets the
    model to an empty list and shows an error message.
    """
    def failure(message):
        return _model_outputs(error_update(message), [])

    try:
        if not uploaded_file:
            return failure("⚠️ No file provided.")
        path = _uploaded_path(uploaded_file)
        if not path or not os.path.exists(path):
            return failure("⚠️ File not found.")
        cleaned, err = _clean_model_fields(
            _read_model_fields(path),
            "⚠️ Invalid model: field name must match [A-Za-z][A-Za-z0-9_-]* and length limit.",
        )
        if err:
            return failure(err)
        return _model_outputs(gr.update(value="", visible=False), cleaned)
    except Exception:
        return failure(_INVALID_MODEL_FILE_MSG)


# ---------------------------------------------------------------------------
# Small UI callbacks
# ---------------------------------------------------------------------------

def toggle_conditionals(field_format):
    """Show or hide the format-specific inputs.

    Order: choice input, add-choice button, choices list, unit input,
    choices error (also cleared).
    """
    fmt = normalize_format_label(field_format)
    visible_multi = fmt == "multiple choice"
    visible_unit = fmt == "unit"
    return (
        gr.update(visible=visible_multi),
        gr.update(visible=visible_multi),
        gr.update(visible=visible_multi),
        gr.update(visible=visible_unit),
        gr.update(visible=visible_multi, value=""),
    )


def update_char_counter(text):
    """Return the "used/max" counter text for the description box."""
    return f"{len(text or '')}/{PROMPT_MAX_CHARS}"


def add_choice(choice, current_choices):
    """Append a validated choice to the temporary choices list.

    Returns ``(error update, choices, table rows, input value)``. On error the
    input keeps the raw text so the user can correct it.
    """
    raw = choice or ""
    normalized = raw.strip()
    choices = list(current_choices or [])

    if not normalized:
        error = "⚠️ Enter a non-empty choice."
    elif not is_valid_ascii_identifier(normalized):
        error = (
            "⚠️ Invalid choice: use ASCII letters, digits, '_' or '-'; "
            "start with a letter; no spaces or accents."
        )
    elif normalized.lower() in {str(c).strip().lower() for c in choices}:
        error = "⚠️ This choice already exists."
    else:
        error = ""

    if error:
        return error_update(error), choices, [[c] for c in choices], raw
    choices.append(normalized)
    return gr.update(value="", visible=False), choices, [[c] for c in choices], ""


def clear_choices_after_add(error_text, current_choices, current_rows, current_input):
    """Reset the temporary choices, but only when no error is displayed."""
    if str(error_text or "").strip():
        return current_choices, current_rows, current_input, gr.update()
    return [], gr.update(value=[]), "", gr.update(value="", visible=False)


# ---------------------------------------------------------------------------
# UI
# ---------------------------------------------------------------------------

def build_ui():
    """Build and return the Gradio Blocks application."""
    with gr.Blocks(title="Document model builder", analytics_enabled=False) as demo:
        with gr.Tabs():
            with gr.TabItem("Model"):
                gr.Markdown("## Step 1 — Create or load a model")
                gr.Markdown(
                    "Use this step to define the fields to extract. "
                    "You can either build the model manually or import a .json file. "
                    "This model will be used to validate and normalize the response.")

                gr.Markdown("### 1.1 Add a field")
                gr.Markdown(
                    "- Name: must be unique and short.\n"
                    "- Format: text, date, number, true/false, empty, multiple choice, unit.\n"
                    "- Description: short extraction hint (useful examples).")
                with gr.Row():
                    name_input = gr.Textbox(
                        label="Field name",
                        # The example must itself satisfy the naming rule.
                        placeholder="e.g., accident_date",
                        info=f"Allowed: [A-Za-z][A-Za-z0-9_-]*, no spaces/accents, max {NAME_MAX_CHARS} chars",
                    )
                    fmt_input = gr.Dropdown(
                        choices=FIELD_FORMATS,
                        value="text",
                        label="Format",
                    )
                    desc_input = gr.Textbox(
                        label="Description / Prompt",
                        placeholder=(
                            "E.g., Date when the accident happened. Example: 2021-06-27"
                        ),
                        lines=3,
                        info=f"Max {PROMPT_MAX_CHARS} characters",
                    )
                name_live_error = gr.Markdown(visible=False)
                with gr.Row():
                    char_counter = gr.Markdown(f"0/{PROMPT_MAX_CHARS}")
                    add_btn = gr.Button("Add +")
                error_box = gr.Markdown(visible=False)
                with gr.Row():
                    live_count = gr.Markdown(count_message([]))

                gr.Markdown("### 1.2 Format options (shown if needed)")
                with gr.Row():
                    choice_input = gr.Textbox(
                        label="Add a choice",
                        placeholder="e.g., yes",
                        visible=False,
                        info="Same rule as field name: [A-Za-z][A-Za-z0-9_-]*",
                    )
                    add_choice_btn = gr.Button("Add a choice", visible=False)
                    unit_input = gr.Textbox(
                        label="Unit(s)",
                        placeholder="e.g., €, km, %",
                        visible=False,
                    )
                choices_live_error = gr.Markdown(visible=False)
                choices_error = gr.Markdown(visible=False)
                choices_state = gr.State([])
                choices_list = gr.Dataframe(
                    headers=["Choices"],
                    value=[],
                    interactive=False,
                    visible=False,
                    label="Available choices",
                )

                gr.Markdown("### 1.3 Model fields (preview)")
                fields_state = gr.State([])
                table = gr.Dataframe(
                    headers=["Field name", "Format", "Description", "Details"],
                    value=[],
                    interactive=False,
                    label="Model fields",
                )

                gr.Markdown("### 1.4 Manage fields")
                with gr.Row():
                    delete_dropdown = gr.Dropdown(
                        label="Delete a field",
                        choices=[],
                        value=None,
                        visible=False,
                    )
                    delete_btn = gr.Button("Delete", variant="stop", visible=False)

                gr.Markdown("### 1.5 Export / Import a model")
                gr.Markdown(
                    "- Export: generates a reusable .json file.\n"
                    "- Import: loads an existing .json and fills the table above.")
                with gr.Row():
                    download_btn = gr.Button("Download model", visible=False)
                    model_filename = gr.Textbox(
                        label="Filename",
                        placeholder="e.g., claim_form.json",
                        scale=2,
                        visible=False,
                    )
                file_out = gr.File(label="Model file", visible=False)
                upload_in = gr.File(label="Upload a model (.json)")
                ready_msg = gr.Markdown(visible=False)

                model_outputs = [
                    error_box, fields_state, table,
                    delete_dropdown, download_btn, ready_msg,
                ]
                manage_outputs = [
                    delete_dropdown, download_btn, delete_btn, model_filename,
                ]

                fmt_input.change(
                    fn=toggle_conditionals,
                    inputs=[fmt_input],
                    outputs=[choice_input, add_choice_btn, choices_list, unit_input, choices_error],
                )
                desc_input.input(
                    fn=update_char_counter,
                    inputs=[desc_input],
                    outputs=[char_counter],
                )
                name_input.input(
                    fn=live_validate_field_name,
                    inputs=[name_input],
                    outputs=[name_live_error, add_btn],
                )
                add_choice_btn.click(
                    fn=add_choice,
                    inputs=[choice_input, choices_state],
                    outputs=[choices_error, choices_state, choices_list, choice_input],
                )
                choice_input.input(
                    fn=live_validate_choice,
                    inputs=[choice_input],
                    outputs=[choices_live_error, add_choice_btn],
                )

                # Follow-up handlers are chained with .then() so they read the
                # state and error box AFTER add_field has updated them; separate
                # click listeners would run against the previous values.
                add_evt = add_btn.click(
                    fn=add_field,
                    inputs=[name_input, fmt_input, desc_input, choices_state, unit_input, fields_state],
                    outputs=model_outputs,
                )
                add_evt.then(
                    fn=visibility_updates_from_fields,
                    inputs=[fields_state],
                    outputs=manage_outputs,
                )
                # After an add attempt, reset the temporary choices only when
                # no error is shown (error_box empty).
                add_evt.then(
                    fn=clear_choices_after_add,
                    inputs=[error_box, choices_state, choices_list, choice_input],
                    outputs=[choices_state, choices_list, choice_input, choices_error],
                )
                add_evt.then(fn=count_message, inputs=[fields_state], outputs=[live_count])

                delete_evt = delete_btn.click(
                    fn=delete_field,
                    inputs=[delete_dropdown, fields_state],
                    outputs=model_outputs,
                )
                delete_evt.then(
                    fn=visibility_updates_from_fields,
                    inputs=[fields_state],
                    outputs=manage_outputs,
                )
                delete_evt.then(fn=count_message, inputs=[fields_state], outputs=[live_count])

                download_btn.click(
                    fn=export_model,
                    inputs=[fields_state, model_filename],
                    outputs=[file_out],
                )

                import_evt = upload_in.change(
                    fn=import_model,
                    inputs=[upload_in],
                    outputs=model_outputs,
                )
                import_evt.then(fn=count_message, inputs=[fields_state], outputs=[live_count])
                import_evt.then(
                    fn=visibility_updates_from_fields,
                    inputs=[fields_state],
                    outputs=manage_outputs,
                )

            with gr.TabItem("Extract"):
                gr.Markdown("## Step 2 — Extract fields from the document")
                gr.Markdown(
                    "Follow the order: 2.1 Auth, 2.2 Model, 2.3 Image, 2.4 Extract.\n"
                    "Model priority: (A) .json uploaded in Extract, (B) .json uploaded in ‘Model’, (C) model built manually.")

                gr.Markdown("### 2.1 Authentication (OPENROUTER_API_KEY)")
                with gr.Row():
                    hf_token_input = gr.Textbox(
                        label="OPENROUTER_API_KEY",
                        type="password",
                        placeholder="OpenRouter API key",
                    )

                gr.Markdown("### 2.2 Choose the model to use")
                gr.Markdown(
                    "- Option A: upload a .json here (priority).\n"
                    "- Option B: use the file imported in the ‘Model’ tab.\n"
                    "- Option C: use the model you built manually (table).")
                with gr.Row():
                    model_file_input = gr.File(label="Model file (.json) — Extract (optional)")

                gr.Markdown("### 2.3 Provide the document and run extraction")
                with gr.Row():
                    img_input = gr.File(
                        label="Document (image/PDF upload)",
                        file_count="single",
                        file_types=["image", ".pdf"],
                        type="filepath",
                    )
                    image_url_input = gr.Textbox(
                        label="Or image URL (images only)",
                        placeholder="https://example.com/file.png",
                    )
                extract_btn = gr.Button("Extract", variant="primary")

                gr.Markdown("### 2.4 Result")
                with gr.Row():
                    extraction_output = gr.Code(label="Result (stream)", language="json")
                    validation_msg = gr.Markdown(visible=False)

                # The input order supports three cases:
                # 1) model uploaded in the Extract tab (priority)
                # 2) model uploaded in the Model tab
                # 3) model built manually (fields_state)
                extract_btn.click(
                    fn=run_extraction,
                    inputs=[model_file_input, upload_in, fields_state, img_input, image_url_input, hf_token_input],
                    outputs=[extraction_output, validation_msg],
                    concurrency_limit=2,
                    api_name="extract",
                )

                # Keep the model file in sync between the two tabs.
                # Loading in Extract replicates to the Model tab...
                model_file_input.change(lambda f: f, inputs=[model_file_input], outputs=[upload_in])
                # ...and loading in Model replicates to the Extract tab.
                import_evt.then(lambda f: f, inputs=[upload_in], outputs=[model_file_input])

    # Enable the request queue (no deprecated parameters).
    demo.queue()
    return demo


def main():
    """Build the UI and launch the app with the MCP server enabled."""
    demo = build_ui()
    demo.launch(mcp_server=True)


if __name__ == "__main__":
    main()