Spaces:
Running
Running
| import gradio as gr | |
| import json | |
| import tempfile | |
| import os | |
| import base64 | |
| import re | |
| from io import BytesIO | |
| from PIL import Image | |
| from typing import Optional | |
| from pydantic import BaseModel, Field, create_model | |
| from datetime import date | |
| from openai import OpenAI | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
# Canonical (English) format labels a field may use; the same list feeds the
# "Format" dropdown in the UI.
FIELD_FORMATS = [
    "text",
    "date",
    "number",
    "true/false",
    "empty",
    "multiple choice",
    "unit",
]
# Maximum number of characters accepted for a field name.
NAME_MAX_CHARS = 100
# Maximum number of characters accepted for a field description / prompt.
PROMPT_MAX_CHARS = 300
def normalize_format_label(fmt_raw: str) -> str:
    """Map a French or English format label to its canonical English form.

    The lookup ignores surrounding whitespace and letter case.  ``None``,
    empty and unrecognised labels all fall back to ``"text"``.
    """
    french_to_english = {
        "texte": "text",
        "date": "date",
        "nombre": "number",
        "vrai/faux": "true/false",
        "vide": "empty",
        "choix multiple": "multiple choice",
        "unité": "unit",
    }
    key = str(fmt_raw or "").strip().lower()
    if key in french_to_english:
        return french_to_english[key]
    # English labels are already canonical, so they map to themselves.
    if key in french_to_english.values():
        return key
    return "text"
# A valid name starts with an ASCII letter and continues with at most 99 ASCII
# letters, digits, underscores or hyphens (100 characters in total).
IDENTIFIER_REGEX = re.compile(r"^[A-Za-z][A-Za-z0-9_-]{0,99}$")
def is_image_url(url: str) -> bool:
    """Return True when *url* is an http(s) URL whose path names a raster image.

    Only the path component is inspected, so a query string, a fragment, or a
    host name that merely ends in an image extension (``https://site.png``)
    does not qualify.

    Args:
        url: Candidate URL; surrounding whitespace and letter case are ignored.

    Returns:
        True if the scheme is http/https, a host is present and the path ends
        with one of the accepted raster extensions; False otherwise.
    """
    from urllib.parse import urlsplit

    if not url:
        return False
    # Accept common raster image extensions only.
    allowed_exts = (".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp", ".tiff", ".tif")
    try:
        parts = urlsplit(url.strip().lower())
    except ValueError:
        # urlsplit rejects malformed authorities (e.g. unbalanced IPv6 brackets).
        return False
    if parts.scheme not in ("http", "https") or not parts.netloc:
        return False
    return parts.path.endswith(allowed_exts)
def is_valid_ascii_identifier(value: str) -> bool:
    """Tell whether *value* (after trimming) is an acceptable field/choice name.

    The trimmed text must be non-empty, match ``IDENTIFIER_REGEX`` and contain
    ASCII characters only.  ``None`` and other falsy values are rejected.
    """
    candidate = str(value or "").strip()
    if candidate == "" or IDENTIFIER_REGEX.match(candidate) is None:
        return False
    # Equivalent to checking that the text can be encoded as ASCII.
    return candidate.isascii()
def live_validate_field_name(name: str):
    """Produce live feedback for a field name as it is being typed.

    Returns two ``gr.update`` objects: the first carries the red HTML error
    message (empty and hidden when the name is valid); the second sets
    ``interactive`` to True only when the name is valid.
    """
    if is_valid_ascii_identifier(name):
        return gr.update(value="", visible=False), gr.update(interactive=True)
    problem = "Only ASCII letters, digits, '_' or '-' allowed; start with a letter; no spaces or accents."
    markup = f"<span style='color:#dc2626;font-weight:600'>{problem}</span>"
    return gr.update(value=markup, visible=True), gr.update(interactive=False)
def live_validate_choice(choice: str):
    """Produce live feedback for a multiple-choice option as it is being typed.

    Returns two ``gr.update`` objects: the first carries the red HTML error
    message (empty and hidden when the choice is valid); the second sets
    ``interactive`` to True only when the choice is valid.
    """
    trimmed = (choice or "").strip()
    if not trimmed:
        problem = "Enter a non-empty choice."
    elif is_valid_ascii_identifier(trimmed):
        problem = ""
    else:
        problem = "Only ASCII letters, digits, '_' or '-' allowed; start with a letter; no spaces or accents."

    if problem == "":
        return gr.update(value="", visible=False), gr.update(interactive=True)
    markup = f"<span style='color:#dc2626;font-weight:600'>{problem}</span>"
    return gr.update(value=markup, visible=True), gr.update(interactive=False)
def error_update(msg: str):
    """Build a visible ``gr.update`` whose value is *msg* wrapped in a red, bold span."""
    markup = "<span style='color:#dc2626;font-weight:600'>" + f"{msg}" + "</span>"
    return gr.update(value=markup, visible=True)
def fields_to_rows(fields):
    """Convert field dicts into table rows.

    Each row is ``[name, format, description, details]``.  ``name`` and
    ``format`` are required keys; the other two default to ``""``.
    """
    rows = []
    for field in fields:
        row = [field["name"], field["format"]]
        row.append(field.get("description", ""))
        row.append(field.get("details", ""))
        rows.append(row)
    return rows
def names_from_fields(fields):
    """Return the ``name`` of every field as a string (``""`` when missing).

    ``None`` or an empty collection yields an empty list.
    """
    if not fields:
        return []
    names = []
    for entry in fields:
        names.append(str(entry.get("name", "")))
    return names
def add_field(name, field_format, description, choices_list, unit, fields):
    """Validate a new field definition and append it to the model.

    Args:
        name: Field name; must be a unique (case-insensitive) ASCII identifier.
        field_format: Format label, normalized with ``normalize_format_label``.
        description: Extraction hint, at most ``PROMPT_MAX_CHARS`` characters.
        choices_list: Options, only used for the "multiple choice" format.
        unit: Unit text, only used for the "unit" format.
        fields: Current list of field dicts (``None`` is treated as empty).

    Returns:
        A 6-tuple: status update (error message, or cleared on success), the
        resulting field list, its table rows, an update listing the field names
        as choices, a visibility update, and the "model ready" update.  On any
        validation failure the field list is returned unchanged.
    """
    current = fields or []

    def outputs(status, model):
        # All six outputs are derived from the status and the model to expose.
        has_fields = len(model) > 0
        return (
            status,
            model,
            fields_to_rows(model),
            gr.update(choices=names_from_fields(model), value=None, visible=has_fields),
            gr.update(visible=has_fields),
            ready_update_from_fields(model),
        )

    def reject(message):
        return outputs(error_update(message), current)

    name = (name or "").strip()
    field_format = normalize_format_label(field_format)
    description = (description or "").strip()
    unit = (unit or "").strip()

    # Validations, in the order their messages take precedence.
    if not name:
        return reject("⚠️ Field name is required.")
    if not is_valid_ascii_identifier(name):
        return reject("⚠️ Invalid field name: use ASCII letters, digits, '_' or '-'; start with a letter; no spaces or accents.")
    # Uniqueness is case-insensitive and ignores surrounding whitespace.
    taken = {str(f.get("name", "")).strip().lower() for f in current}
    if name.lower() in taken:
        return reject("⚠️ This field name already exists.")
    if len(name) > NAME_MAX_CHARS:
        return reject(f"⚠️ Name too long (max {NAME_MAX_CHARS} characters).")
    if len(description) > PROMPT_MAX_CHARS:
        return reject(f"⚠️ Description too long (max {PROMPT_MAX_CHARS} characters).")

    options = []
    details = ""
    if field_format == "multiple choice":
        # choices_list is a list of strings; blank entries are ignored.
        options = [c for c in (choices_list or []) if str(c).strip()]
        if len(options) < 2:
            return reject("⚠️ For ‘multiple choice’, add at least 2 choices.")
        distinct = {str(c).strip().lower() for c in options}
        if len(distinct) != len(options):
            return reject("⚠️ For ‘multiple choice’, choices must be unique.")
        details = "choices: " + " | ".join(options)
    elif field_format == "unit" and unit:
        details = f"unit: {unit}"

    entry = {
        "name": name,
        "format": field_format,
        "description": description,
        "details": details,
        "options": options,
        "unit": unit if field_format == "unit" else "",
    }
    new_fields = [*current, entry]
    return outputs(gr.update(value="", visible=False), new_fields)
def delete_field(delete_name, fields):
    """Remove the field called *delete_name* from the model.

    Names are compared case-insensitively after trimming.

    Returns:
        A 6-tuple: status update (error message, or cleared on success), the
        resulting field list, its table rows, an update listing the field names
        as choices, a visibility update, and the "model ready" update.  When
        no name is given or no field matches, the list is returned unchanged
        together with an error message.
    """
    current_fields = list(fields or [])

    def outputs(status, model):
        has_fields = len(model) > 0
        return (
            status,
            model,
            fields_to_rows(model),
            gr.update(choices=names_from_fields(model), value=None, visible=has_fields),
            gr.update(visible=has_fields),
            ready_update_from_fields(model),
        )

    if not delete_name:
        return outputs(error_update("⚠️ Select a field to delete."), current_fields)

    target = str(delete_name).strip().lower()
    remaining = []
    for field in current_fields:
        if str(field.get("name", "")).strip().lower() != target:
            remaining.append(field)

    # Nothing was filtered out: the requested name is not in the model.
    if len(remaining) == len(current_fields):
        return outputs(error_update("⚠️ Field not found."), current_fields)
    return outputs(gr.update(value="", visible=False), remaining)
def serialize_model(fields):
    """Wrap the field list in the export envelope ``{"version": 1, "fields": [...]}``.

    The list is shallow-copied; ``None`` or an empty value becomes ``[]``.
    """
    payload = {"version": 1}
    payload["fields"] = list(fields) if fields else []
    return payload
def count_message(fields):
    """Return a human-readable count of the fields in the model.

    Args:
        fields: Collection of fields; ``None`` counts as empty.

    Returns:
        ``"<n> field in model"`` when there is exactly one field, otherwise
        ``"<n> fields in model"`` (zero takes the plural in English).
    """
    n = len(fields or [])
    noun = "field" if n == 1 else "fields"
    return f"{n} {noun} in model"
def visibility_updates_from_fields(fields):
    """Build the four visibility updates that depend on whether the model has fields.

    Returns, in order, updates for: the delete dropdown (choices refreshed and
    selection cleared), the download button, the delete button and the model
    filename box.  All four are visible only when at least one field exists.
    """
    current = fields or []
    show = len(current) > 0
    delete_dropdown = gr.update(choices=names_from_fields(current), value=None, visible=show)
    download_btn = gr.update(visible=show)
    delete_btn = gr.update(visible=show)
    model_filename = gr.update(visible=show)
    return (delete_dropdown, download_btn, delete_btn, model_filename)
def sanitize_filename(name):
    """Turn a user-supplied name into a safe ``.json`` file name.

    Directory components are dropped, every character other than letters,
    digits, ``-``, ``_``, ``.`` and space is replaced by ``-``, and a ``.json``
    extension is ensured.  The result never exceeds 100 characters; when it
    has to be shortened, the stem is cut so the extension is always kept.

    Args:
        name: Requested file name (may be ``None`` or blank).

    Returns:
        The sanitized file name, or ``"model.json"`` when nothing usable
        remains (blank input, or a stem made only of dots and spaces).
    """
    max_len = 100
    extension = ".json"

    candidate = (name or "").strip()
    if not candidate:
        return "model.json"

    # Drop directories (both separator styles) and unsafe characters.
    candidate = candidate.replace("\\", "/").split("/")[-1]
    candidate = "".join(
        ch if (ch.isalnum() or ch in ("-", "_", ".", " ")) else "-"
        for ch in candidate
    )

    # Separate stem and extension so truncation can never remove the extension.
    if candidate.lower().endswith(extension):
        stem = candidate[: -len(extension)]
        suffix = candidate[-len(extension):]
    else:
        stem, suffix = candidate, extension
    stem = stem[: max_len - len(suffix)]

    if not stem.strip(" ."):
        return "model.json"
    return stem + suffix
def export_model(fields, filename):
    """Write the model to a JSON file in a fresh temporary directory.

    Returns a ``gr.update`` pointing at the written file (visible), or a
    hidden update when there are no fields to export.
    """
    payload = serialize_model(fields)
    if not fields:
        return gr.update(visible=False)

    safe_name = sanitize_filename(filename)
    target = os.path.join(tempfile.mkdtemp(prefix="model-"), safe_name)
    with open(target, "w", encoding="utf-8") as handle:
        json.dump(payload, handle, ensure_ascii=False, indent=2)
    return gr.update(value=target, visible=True)
def to_python_identifier(name: str) -> str:
    """Derive a lowercase Python identifier from a free-form field name.

    Runs of non-alphanumeric characters collapse into a single underscore and
    leading/trailing underscores are removed.  A result starting with a digit
    is prefixed with ``field_``; a result that is a reserved Python keyword
    (``class``, ``from``, ...) gets a trailing underscore so it can be used as
    an attribute name in generated code.

    Args:
        name: Original field name (may be ``None`` or blank).

    Returns:
        The identifier, or ``"field"`` when nothing usable remains.
    """
    import keyword

    lowered = str(name or "").strip().lower()
    pieces = []
    for ch in lowered:
        if ch.isalnum():
            pieces.append(ch)
        elif not pieces or pieces[-1] != "_":
            # Collapse consecutive separators into one underscore.
            pieces.append("_")
    ident = "".join(pieces).strip("_")

    if not ident:
        return "field"
    if ident[0].isdigit():
        ident = f"field_{ident}"
    if keyword.iskeyword(ident):
        ident += "_"
    return ident
def generate_pydantic_code(fields, class_name: str = "DocumentModel") -> str:
    """Render the field list as the source code of a Pydantic model class.

    Args:
        fields: Iterable of field dicts (``name``, ``format``, ``description``,
            ``details``, ``options``); ``None`` is treated as empty.
        class_name: Name given to the generated ``BaseModel`` subclass.

    Returns:
        Python source text: the imports that are actually needed followed by
        the class.  Each field becomes a one-line comment (original name and
        format) plus an annotated attribute.  Distinct field names that map to
        the same identifier (for example ``a-b`` and ``a_b``) receive ``_2``,
        ``_3``... suffixes so no attribute is silently overwritten.
    """
    fields = list(fields or [])
    indent = " " * 4

    def annotation_and_default(field):
        # Return (annotation source, default source) for one field.
        fmt = normalize_format_label(field.get("format"))
        if fmt == "date":
            return "date", "..."
        if fmt in ("number", "unit"):
            return "float", "..."
        if fmt == "true/false":
            return "bool", "..."
        if fmt == "empty":
            return "Optional[str]", "None"
        options = field.get("options") or []
        if fmt == "multiple choice" and options:
            literals = ", ".join(repr(str(o)) for o in options)
            return f"Literal[{literals}]", "..."
        return "str", "..."

    rendered = [annotation_and_default(f) for f in fields]
    annotations = [annotation for annotation, _ in rendered]

    lines = ["from pydantic import BaseModel, Field"]
    typing_names = [
        name
        for name in ("Literal", "Optional")
        if any(a.startswith(name + "[") for a in annotations)
    ]
    if typing_names:
        lines.append("from typing import " + ", ".join(typing_names))
    if "date" in annotations:
        lines.append("from datetime import date")
    lines.append("")
    lines.append(f"class {class_name}(BaseModel):")
    if not fields:
        lines.append(f"{indent}pass")
        return "\n".join(lines)

    used = set()
    for field, (annotation, default) in zip(fields, rendered):
        raw_name = field.get("name", "")
        base = to_python_identifier(raw_name)
        ident, suffix = base, 2
        while ident in used:
            ident = f"{base}_{suffix}"
            suffix += 1
        used.add(ident)

        desc = str(field.get("description") or "")
        details = str(field.get("details") or "")
        desc_full = desc if details == "" else (desc + " | " + details)

        # Collapse whitespace so the comment can never spill onto another line.
        label = " ".join(f"{raw_name} ({field.get('format')})".split())
        lines.append(f"{indent}# {label}")
        lines.append(f"{indent}{ident}: {annotation} = Field({default}, description={desc_full!r})")
    return "\n".join(lines)
def pydantic_code_update_from_fields(fields):
    """Return a hidden ``gr.update`` holding the generated Pydantic source.

    Kept for possible backward compatibility; the update is hidden
    (``visible=False``) whether or not there are fields.
    """
    if len(fields or []) == 0:
        return gr.update(value="", visible=False)
    return gr.update(value=generate_pydantic_code(fields), visible=False)
def export_pydantic_py(fields):
    """Write the generated Pydantic source to ``document_model.py`` in a new temp dir.

    Returns a ``gr.update`` pointing at the written file (visible), or a
    hidden update when there are no fields.
    """
    if not fields:
        return gr.update(visible=False)
    source = generate_pydantic_code(fields)
    target = os.path.join(tempfile.mkdtemp(prefix="pydantic-"), "document_model.py")
    with open(target, "w", encoding="utf-8") as handle:
        handle.write(source)
    return gr.update(value=target, visible=True)
def build_pydantic_model_class(fields, class_name: str = "DocumentModel"):
    """Create a Pydantic model class at runtime from the field list.

    Format to type mapping: text -> ``str``, date -> ``date``, number and
    unit -> ``float``, true/false -> ``bool``, empty -> ``Optional[str]``
    (default ``None``).  A "multiple choice" field with options is typed as
    ``Literal[...]`` so that validation rejects values outside the list (the
    JSON Schema still exposes them as an ``enum``); without options it falls
    back to ``str``.  Unknown formats are treated as ``str``.

    Distinct field names that map to the same identifier receive ``_2``,
    ``_3``... suffixes so that no field is silently dropped.

    Args:
        fields: Iterable of field dicts; ``None`` is treated as empty.
        class_name: Name of the created model class.

    Returns:
        The model class produced by ``pydantic.create_model``.
    """
    from typing import Literal

    simple_types = {
        "text": str,
        "date": date,
        "number": float,
        "true/false": bool,
        "unit": float,
    }

    field_definitions = {}
    for f in (fields or []):
        fmt = normalize_format_label(f.get("format"))
        options = [str(o) for o in (f.get("options") or [])]

        default = ...
        if fmt == "empty":
            typ, default = Optional[str], None
        elif fmt == "multiple choice" and options:
            # Subscripting Literal with a tuple is the same as listing the values.
            typ = Literal[tuple(options)]
        else:
            typ = simple_types.get(fmt, str)

        desc = str(f.get("description") or "")
        details = str(f.get("details") or "")
        desc_full = desc if details == "" else (desc + " | " + details)

        base = to_python_identifier(f.get("name", ""))
        ident, suffix = base, 2
        while ident in field_definitions:
            ident = f"{base}_{suffix}"
            suffix += 1

        field_definitions[ident] = (typ, Field(default, description=desc_full))

    return create_model(class_name, **field_definitions)
def json_schema_from_fields(fields):
    """Return the JSON Schema of the model built from *fields*, pretty-printed."""
    schema = build_pydantic_model_class(fields).model_json_schema()
    return json.dumps(schema, indent=2, ensure_ascii=False)
def instruction_from_fields(fields):
    """Build the extraction prompt: fixed instructions followed by the JSON Schema.

    Returns ``""`` when there are no fields.
    """
    if not fields:
        return ""
    preamble = (
        "Extract the following information from the provided image. "
        "Respond only with a strictly valid JSON that conforms to this JSON Schema (no text outside JSON):\n"
    )
    return preamble + json_schema_from_fields(fields)
def document_file_to_data_url_with_error(path: str):
    """Convert an image or the first page of a PDF into a PNG data URL.

    Args:
        path: Path of the document on disk.  Files whose name ends with
            ``.pdf`` (case-insensitive) are rendered with PyMuPDF; anything
            else is opened with Pillow.

    Returns:
        ``(data_url, None)`` on success, or ``("", message)`` describing the
        failure (missing file, PyMuPDF not installed, empty or unreadable PDF,
        invalid image).
    """
    if not path or not os.path.exists(path):
        return "", "File not found."

    if str(path).lower().endswith(".pdf"):
        try:
            import fitz  # PyMuPDF
        except Exception:
            return "", "PDF support requires PyMuPDF. Install with: pip install pymupdf"
        try:
            doc = fitz.open(path)
            try:
                if doc.page_count == 0:
                    return "", "PDF has no pages."
                page = doc.load_page(0)
                # Scale factor 300/72 (presumably 300 DPI over PDF's 72 pt/inch).
                zoom = 300.0 / 72.0
                pix = page.get_pixmap(matrix=fitz.Matrix(zoom, zoom), alpha=False)
                png_bytes = pix.tobytes("png")
            finally:
                # Always release the document, including on early return or error.
                doc.close()
            b64 = base64.b64encode(png_bytes).decode("utf-8")
            return f"data:image/png;base64,{b64}", None
        except Exception as e:
            return "", f"Failed to render PDF: {e}"

    # Image path
    try:
        with Image.open(path) as im:
            rgb = im.convert("RGB")
            buf = BytesIO()
            rgb.save(buf, format="PNG", optimize=True)
        b64 = base64.b64encode(buf.getvalue()).decode("utf-8")
        return f"data:image/png;base64,{b64}", None
    except Exception:
        return "", "Invalid image file."
def parse_json_from_text(text: str):
    """Extract the first JSON object found in a model reply.

    If the reply contains a fenced block (text between the first two triple
    backtick markers), that block is searched first; if it holds no usable
    JSON, the whole reply is searched.  Parsing starts at the first ``{`` and
    stops at the end of that object, so explanatory text after the JSON (even
    text containing braces) is ignored.

    Args:
        text: Raw reply; ``None`` is reported as empty.

    Returns:
        ``(data, None)`` on success, otherwise ``(None, message)`` where the
        message is ``"Empty text"``, ``"JSON not detected"`` or
        ``"Invalid JSON: <reason>"``.
    """
    if text is None:
        return None, "Empty text"

    raw = str(text)
    candidates = []
    segments = raw.split("```")
    if len(segments) >= 3:
        # A fenced block exists: prefer its content.
        candidates.append(segments[1])
    candidates.append(raw)

    decoder = json.JSONDecoder()
    error = "JSON not detected"
    for segment in candidates:
        start = segment.find("{")
        if start == -1 or segment.rfind("}") <= start:
            continue
        try:
            data, _ = decoder.raw_decode(segment, start)
            return data, None
        except (ValueError, RecursionError) as exc:
            error = f"Invalid JSON: {exc}"
    return None, error
def validate_output_against_model(fields, text):
    """Check a model reply against the Pydantic model built from *fields*.

    Returns:
        ``(True, "OK", normalized_json)`` when the reply parses and validates,
        otherwise ``(False, message, None)``.  For validation failures the
        message lists up to five ``- location: reason`` lines when the
        exception exposes ``errors()``, and falls back to ``str(exception)``.
    """
    model = build_pydantic_model_class(fields)
    data, parse_error = parse_json_from_text(text)
    if parse_error:
        return False, parse_error, None

    try:
        instance = model.model_validate(data)
        dumped = instance.model_dump(mode="json")
        return True, "OK", json.dumps(dumped, ensure_ascii=False, indent=2)
    except Exception as exc:
        try:
            issues = getattr(exc, "errors", lambda: [])()
            bullets = [
                "- {}: {}".format(".".join(map(str, issue.get("loc", []))), issue.get("msg", "error"))
                for issue in issues[:5]
            ]
            summary = "\n".join(bullets) if bullets else str(exc)
        except Exception:
            # The exception did not expose usable structured details.
            summary = str(exc)
        return False, summary, None
def run_extraction(model_file_extraction, model_file_modeltab, fields_state, image_path, image_url, hf_token):
    """Stream a model-driven extraction for one document and validate the result.

    This is a generator.  Every item it yields is a 2-tuple
    ``(text, status_update)``: ``text`` is the response accumulated so far
    (``""`` when the run stops on an error) and ``status_update`` is a
    ``gr.update`` carrying a status or error message.

    The field model comes from the first available source, in this order:
    ``model_file_extraction`` (fully validated here), ``model_file_modeltab``
    (formats normalized only), then ``fields_state``.  The document comes from
    ``image_url`` when it is non-empty, otherwise from ``image_path``.

    ``hf_token``, when non-blank, is used as the API key; otherwise the
    ``OPENROUTER_API_KEY`` environment variable is read.
    """
    # Choose model source: Extraction > Model (upload) > Model (built)
    try:
        selected_fields = None
        # 1) File uploaded in Extraction tab
        if model_file_extraction:
            # The upload is either a plain path string or a mapping with a "path" key.
            path = model_file_extraction if isinstance(model_file_extraction, str) else model_file_extraction.get("path")
            if path and os.path.exists(path):
                with open(path, "r", encoding="utf-8") as f:
                    data = json.load(f)
                fields_raw = data.get("fields", []) if isinstance(data, dict) else []
                cleaned = []
                seen = set()  # lower-cased names already accepted
                for item in fields_raw:
                    name = str(item.get("name", "")).strip()
                    fmt = normalize_format_label(str(item.get("format", "")).strip())
                    description = str(item.get("description", ""))
                    options = item.get("options", []) if isinstance(item, dict) else []
                    unit = str(item.get("unit", ""))
                    if not name or len(name) > NAME_MAX_CHARS or not is_valid_ascii_identifier(name):
                        yield ("", gr.update(value="⚠️ Invalid model: field name must be ASCII [A-Za-z][A-Za-z0-9_-]* and <= length limit.", visible=True))
                        return
                    key = name.lower()
                    if key in seen:
                        yield ("", gr.update(value="⚠️ Invalid model: duplicate field names.", visible=True))
                        return
                    seen.add(key)
                    if fmt not in FIELD_FORMATS:
                        yield ("", gr.update(value="⚠️ Invalid model: unknown format.", visible=True))
                        return
                    if len(description) > PROMPT_MAX_CHARS:
                        yield ("", gr.update(value="⚠️ Invalid model: description too long.", visible=True))
                        return
                    details = ""
                    if fmt == "multiple choice":
                        # Trim the options and drop blank ones before validating them.
                        options = [str(c).strip() for c in (options or []) if str(c).strip()]
                        if len(options) < 2:
                            yield ("", gr.update(value="⚠️ Invalid model: ‘multiple choice’ requires at least 2 choices.", visible=True))
                            return
                        for c in options:
                            if not is_valid_ascii_identifier(c):
                                yield ("", gr.update(value="⚠️ Invalid model: choices must match [A-Za-z][A-Za-z0-9_-]* with no spaces or accents.", visible=True))
                                return
                        normalized = [c.lower() for c in options]
                        if len(set(normalized)) != len(options):
                            yield ("", gr.update(value="⚠️ Invalid model: choices must be unique.", visible=True))
                            return
                        details = "choices: " + " | ".join(options)
                    elif fmt == "unit":
                        unit = unit.strip()
                        if unit:
                            details = f"unit: {unit}"
                    cleaned.append({
                        "name": name,
                        "format": fmt,
                        "description": description,
                        "details": details,
                        "options": options if fmt == "multiple choice" else [],
                        "unit": unit if fmt == "unit" else "",
                    })
                selected_fields = cleaned
            else:
                yield ("", error_update("⚠️ Model file not found."))
                return
        # 2) File uploaded in Model tab
        elif model_file_modeltab:
            path = model_file_modeltab if isinstance(model_file_modeltab, str) else model_file_modeltab.get("path")
            if path and os.path.exists(path):
                with open(path, "r", encoding="utf-8") as f:
                    data = json.load(f)
                raw_fields = data.get("fields", []) if isinstance(data, dict) else []
                # normalize formats to English for internal use
                selected_fields = []
                for item in raw_fields:
                    item = dict(item)
                    item["format"] = normalize_format_label(item.get("format"))
                    selected_fields.append(item)
            else:
                yield ("", error_update("⚠️ Model file not found."))
                return
        # 3) Model built manually (state)
        else:
            # normalize possible legacy French formats in state
            selected_fields = []
            for item in (fields_state or []):
                obj = dict(item)
                obj["format"] = normalize_format_label(obj.get("format"))
                selected_fields.append(obj)
        if not selected_fields:
            yield ("", error_update("⚠️ Model not ready."))
            return
    except Exception:
        # Any failure while reading or interpreting the model is reported the same way.
        yield ("", gr.update(value="⚠️ Invalid model file.", visible=True))
        return
    # Build the instruction and start the streaming call; yields (accumulated text, status)
    instruction_text = instruction_from_fields(selected_fields)
    if not instruction_text:
        yield ("", error_update("⚠️ Model not ready."))
        return
    # Choose image source: URL has priority over uploaded file
    image_url = (image_url or "").strip()
    if image_url:
        # Accept only image URLs (no PDFs)
        if not is_image_url(image_url):
            yield ("", error_update("⚠️ Only direct image URLs are allowed (jpg, jpeg, png, gif, webp, bmp, tiff)."))
            return
        final_image_ref = image_url
    else:
        if not image_path:
            yield ("", error_update("⚠️ Provide an image/PDF file or a URL."))
            return
        # Local files (image or PDF) are sent inline as a PNG data URL.
        data_url, err = document_file_to_data_url_with_error(image_path)
        if not data_url:
            msg = err or "Invalid document (image/PDF)."
            yield ("", error_update("⚠️ " + msg))
            return
        final_image_ref = data_url
    try:
        # An explicit token takes precedence over the environment variable.
        api_key = (hf_token or "").strip() or os.getenv("OPENROUTER_API_KEY", "")
        client = OpenAI(base_url="https://openrouter.ai/api/v1", api_key=api_key)
        if not client.api_key:
            yield ("", gr.update(value="⚠️ Missing OPENROUTER_API_KEY environment variable.", visible=True))
            return
        # Optional headers, sent only when the corresponding env vars are set.
        extra_headers = {}
        ref = os.getenv("OPENROUTER_HTTP_REFERER", "").strip()
        ttl = os.getenv("OPENROUTER_X_TITLE", "").strip()
        if ref:
            extra_headers["HTTP-Referer"] = ref
        if ttl:
            extra_headers["X-Title"] = ttl
        model_name = os.getenv("OPENROUTER_MODEL", "openai/gpt-4o")
        stream = client.chat.completions.create(
            extra_headers=extra_headers or None,
            model=model_name,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": instruction_text},
                        {"type": "image_url", "image_url": {"url": final_image_ref}},
                    ],
                }
            ],
            stream=True,
        )
        collected = ""
        for chunk in stream:
            # Chunks without choices or without text content are skipped.
            choices = getattr(chunk, "choices", None)
            if not choices:
                continue
            first = choices[0]
            delta = getattr(first, "delta", None)
            piece = getattr(delta, "content", None) if delta is not None else None
            if piece:
                collected += piece
                yield (collected, gr.update(value="Validating…", visible=True))
        if not collected:
            yield ("", gr.update(value="⚠️ Empty model response.", visible=True))
        else:
            # Validate the complete reply against the selected model.
            ok, info, normalized = validate_output_against_model(selected_fields, collected)
            if ok:
                msg = "✅ Output matches the model."
                if normalized:
                    msg += "\n\nNormalized preview:\n" + normalized
                yield (collected, gr.update(value=msg, visible=True))
            else:
                yield (collected, gr.update(value=f"❌ Output not compliant:\n{info}", visible=True))
        return
    except Exception as e:
        yield ("", gr.update(value=f"⚠️ API call error: {e}", visible=True))
        return
def import_model(uploaded_file):
    """Load and validate a model from an uploaded JSON file.

    Args:
        uploaded_file: Either a path string or a mapping with a ``"path"`` key.

    Returns:
        A 6-tuple: status update, field list, table rows, an update listing
        the field names as choices, a visibility update, and the "model ready"
        update.  On any problem (missing file, unreadable JSON, invalid field)
        the first element carries the error message and the model is empty.
    """

    def fail(message):
        # Every failure resets the model to empty and hides dependent widgets.
        return (
            error_update(message),
            [],
            [],
            gr.update(choices=[], value=None, visible=False),
            gr.update(visible=False),
            ready_update_from_fields([]),
        )

    try:
        if not uploaded_file:
            return fail("⚠️ No file provided.")
        path = uploaded_file if isinstance(uploaded_file, str) else uploaded_file.get("path")
        if not path or not os.path.exists(path):
            return fail("⚠️ File not found.")

        with open(path, "r", encoding="utf-8") as handle:
            data = json.load(handle)
        raw_fields = data.get("fields", []) if isinstance(data, dict) else []

        # basic validation
        cleaned = []
        seen_names = set()
        for item in raw_fields:
            name = str(item.get("name", "")).strip()
            fmt = str(item.get("format", "")).strip()
            description = str(item.get("description", ""))
            options = item.get("options", []) if isinstance(item, dict) else []
            unit = str(item.get("unit", ""))

            if not name or len(name) > NAME_MAX_CHARS or not is_valid_ascii_identifier(name):
                return fail("⚠️ Invalid model: field name must match [A-Za-z][A-Za-z0-9_-]* and length limit.")
            lowered = name.lower()
            if lowered in seen_names:
                return fail("⚠️ Invalid model: duplicate field names.")
            seen_names.add(lowered)

            fmt = normalize_format_label(fmt)
            if fmt not in FIELD_FORMATS:
                return fail("⚠️ Invalid model: unknown format.")
            if len(description) > PROMPT_MAX_CHARS:
                return fail("⚠️ Invalid model: description too long.")

            is_multi = fmt == "multiple choice"
            is_unit = fmt == "unit"
            details = ""
            if is_multi:
                # Trim the options and drop blank ones before validating them.
                options = [str(c).strip() for c in (options or []) if str(c).strip()]
                if len(options) < 2:
                    return fail("⚠️ Invalid model: ‘multiple choice’ requires at least 2 choices.")
                if not all(is_valid_ascii_identifier(c) for c in options):
                    return fail("⚠️ Invalid model: choices must match [A-Za-z][A-Za-z0-9_-]* with no spaces or accents.")
                if len({c.lower() for c in options}) != len(options):
                    return fail("⚠️ Invalid model: choices must be unique.")
                details = "choices: " + " | ".join(options)
            elif is_unit:
                unit = unit.strip()
                if unit:
                    details = f"unit: {unit}"

            cleaned.append({
                "name": name,
                "format": fmt,
                "description": description,
                "details": details,
                "options": options if is_multi else [],
                "unit": unit if is_unit else "",
            })

        has_fields = len(cleaned) > 0
        return (
            gr.update(value="", visible=False),
            cleaned,
            fields_to_rows(cleaned),
            gr.update(choices=names_from_fields(cleaned), value=None, visible=has_fields),
            gr.update(visible=has_fields),
            ready_update_from_fields(cleaned),
        )
    except Exception:
        return fail("⚠️ Invalid model file.")
def ready_update_from_fields(fields):
    """Return the "model ready" banner update: shown with its message when the
    model has at least one field, hidden otherwise."""
    if len(fields or []) == 0:
        return gr.update(visible=False)
    return gr.update(value="✅ Model ready. You can proceed to the ‘Extraction’ tab.", visible=True)
def toggle_conditionals(field_format):
    """Show or hide the format-specific inputs for the selected format.

    Returns five updates, in order: choice input, add-choice button, choices
    list, unit input, choices error.  The choice widgets are visible only for
    "multiple choice", the unit input only for "unit"; the choices error is
    also cleared.
    """
    fmt = normalize_format_label(field_format)
    show_choices = fmt == "multiple choice"
    show_unit = fmt == "unit"

    choice_input = gr.update(visible=show_choices)
    add_button = gr.update(visible=show_choices)
    choices_table = gr.update(visible=show_choices)
    unit_input = gr.update(visible=show_unit)
    choices_error = gr.update(visible=show_choices, value="")
    return (choice_input, add_button, choices_table, unit_input, choices_error)
def update_char_counter(text):
    """Format the description length as ``"<used>/<PROMPT_MAX_CHARS>"``."""
    used = len(text) if text else 0
    return "{}/{}".format(used, PROMPT_MAX_CHARS)
def add_choice(choice, current_choices):
    """Validate one option and append it to the pending multiple-choice list.

    Returns:
        A 4-tuple: status update, the choices list, the same list as
        single-column rows, and the text to put back in the input box (the
        untouched input on error, ``""`` once the choice is accepted).
    """
    raw = choice or ""
    candidate = raw.strip()
    choices = list(current_choices or [])

    def reject(message):
        # On error nothing is added and the user's input is left as typed.
        return error_update(message), choices, [[c] for c in choices], raw

    if not candidate:
        return reject("⚠️ Enter a non-empty choice.")
    if not is_valid_ascii_identifier(candidate):
        return reject("⚠️ Invalid choice: use ASCII letters, digits, '_' or '-'; start with a letter; no spaces or accents.")
    # Duplicates are detected case-insensitively.
    already_there = {str(c).strip().lower() for c in choices}
    if candidate.lower() in already_there:
        return reject("⚠️ This choice already exists.")

    choices.append(candidate)
    return gr.update(value="", visible=False), choices, [[c] for c in choices], ""
def clear_choices_after_add(error_text, current_choices, current_rows, current_input):
    """Reset the pending choices unless an error message is present.

    Returns a 4-tuple (choices, rows, input text, error update).  With a
    non-blank *error_text* the current values are passed through unchanged;
    otherwise everything is cleared and the error update is hidden.
    """
    has_error = str(error_text or "").strip() != ""
    if not has_error:
        return [], gr.update(value=[]), "", gr.update(value="", visible=False)
    return current_choices, current_rows, current_input, gr.update()
def build_ui():
    """Build the two-tab Gradio app ("Model" and "Extract") and wire its events.

    The "Model" tab lets the user define fields, manage per-format options,
    and export/import the model as JSON. The "Extract" tab collects an API
    key, an optional model file and a document, then runs ``run_extraction``.

    Event ordering: every handler that reads a value written by a primary
    handler (``add_field``, ``delete_field``, ``import_model``) is chained
    with ``.then()`` so it runs after that handler has finished. Registering
    them as separate listeners on the same trigger would start them at the
    same time as the primary handler and let them read stale values.

    Returns:
        The configured ``gr.Blocks`` instance with queuing enabled. The
        caller is responsible for launching it.
    """
    with gr.Blocks(title="Document model builder", analytics_enabled=False) as demo:
        with gr.Tabs():
            with gr.TabItem("Model"):
                gr.Markdown("## Step 1 — Create or load a model")
                gr.Markdown(
                    "Use this step to define the fields to extract. "
                    "You can either build the model manually or import a .json file. "
                    "This model will be used to validate and normalize the response.")

                gr.Markdown("### 1.1 Add a field")
                gr.Markdown(
                    "- Name: must be unique and short.\n"
                    "- Format: text, date, number, true/false, empty, multiple choice, unit.\n"
                    "- Description: short extraction hint (useful examples).")
                with gr.Row():
                    name_input = gr.Textbox(
                        label="Field name",
                        # The example must itself satisfy the rule stated in
                        # `info` (no spaces), otherwise it misleads the user.
                        placeholder="e.g., accident_date",
                        info=f"Allowed: [A-Za-z][A-Za-z0-9_-]*, no spaces/accents, max {NAME_MAX_CHARS} chars",
                    )
                    fmt_input = gr.Dropdown(
                        choices=FIELD_FORMATS,
                        value="text",
                        label="Format",
                    )
                desc_input = gr.Textbox(
                    label="Description / Prompt",
                    placeholder=(
                        "E.g., Date when the accident happened. Example: 2021-06-27"
                    ),
                    lines=3,
                    info=f"Max {PROMPT_MAX_CHARS} characters",
                )
                name_live_error = gr.Markdown(visible=False)
                with gr.Row():
                    char_counter = gr.Markdown(f"0/{PROMPT_MAX_CHARS}")
                    add_btn = gr.Button("Add +")
                error_box = gr.Markdown(visible=False)
                with gr.Row():
                    live_count = gr.Markdown(count_message([]))

                gr.Markdown("### 1.2 Format options (shown if needed)")
                with gr.Row():
                    choice_input = gr.Textbox(
                        label="Add a choice",
                        placeholder="e.g., yes",
                        visible=False,
                        info="Same rule as field name: [A-Za-z][A-Za-z0-9_-]*",
                    )
                    add_choice_btn = gr.Button("Add a choice", visible=False)
                    unit_input = gr.Textbox(
                        label="Unit(s)",
                        placeholder="e.g., €, km, %",
                        visible=False,
                    )
                choices_live_error = gr.Markdown(visible=False)
                choices_error = gr.Markdown(visible=False)
                # Choices typed for the field currently being composed.
                choices_state = gr.State([])
                choices_list = gr.Dataframe(
                    headers=["Choices"],
                    value=[],
                    interactive=False,
                    visible=False,
                    label="Available choices",
                )

                gr.Markdown("### 1.3 Model fields (preview)")
                # Fields of the model being built; shared with the Extract tab.
                fields_state = gr.State([])
                table = gr.Dataframe(
                    headers=["Field name", "Format", "Description", "Details"],
                    value=[],
                    interactive=False,
                    label="Model fields",
                )

                gr.Markdown("### 1.4 Manage fields")
                with gr.Row():
                    delete_dropdown = gr.Dropdown(
                        label="Delete a field",
                        choices=[],
                        value=None,
                        visible=False,
                    )
                    delete_btn = gr.Button("Delete", variant="stop", visible=False)

                gr.Markdown("### 1.5 Export / Import a model")
                gr.Markdown(
                    "- Export: generates a reusable .json file.\n"
                    "- Import: loads an existing .json and fills the table above.")
                with gr.Row():
                    download_btn = gr.Button("Download model", visible=False)
                    model_filename = gr.Textbox(label="Filename", placeholder="e.g., claim_form.json", scale=2, visible=False)
                file_out = gr.File(label="Model file", visible=False)
                upload_in = gr.File(label="Upload a model (.json)")
                ready_msg = gr.Markdown(visible=False)

                # --- Live feedback while the user edits the form ---
                fmt_input.change(
                    fn=toggle_conditionals,
                    inputs=[fmt_input],
                    outputs=[choice_input, add_choice_btn, choices_list, unit_input, choices_error],
                )
                desc_input.input(
                    fn=update_char_counter,
                    inputs=[desc_input],
                    outputs=[char_counter],
                )
                name_input.input(
                    fn=live_validate_field_name,
                    inputs=[name_input],
                    outputs=[name_live_error, add_btn],
                )
                add_choice_btn.click(
                    fn=add_choice,
                    inputs=[choice_input, choices_state],
                    outputs=[choices_error, choices_state, choices_list, choice_input],
                )
                choice_input.input(
                    fn=live_validate_choice,
                    inputs=[choice_input],
                    outputs=[choices_live_error, add_choice_btn],
                )

                # --- Add a field ---
                # The follow-up handlers read `fields_state` / `error_box`,
                # which `add_field` writes, so they are chained with `.then()`
                # to run only after it has completed.
                add_evt = add_btn.click(
                    fn=add_field,
                    inputs=[name_input, fmt_input, desc_input, choices_state, unit_input, fields_state],
                    outputs=[error_box, fields_state, table, delete_dropdown, download_btn, ready_msg],
                )
                add_evt.then(
                    fn=lambda f: visibility_updates_from_fields(f),
                    inputs=[fields_state],
                    outputs=[delete_dropdown, download_btn, delete_btn, model_filename],
                )
                # After the add attempt, reset the temporary choices only if
                # it produced no error (error_box is empty).
                add_evt.then(
                    fn=clear_choices_after_add,
                    inputs=[error_box, choices_state, choices_list, choice_input],
                    outputs=[choices_state, choices_list, choice_input, choices_error],
                )
                # Dynamic field counter.
                add_evt.then(lambda f: count_message(f), inputs=[fields_state], outputs=[live_count])

                # Pydantic callbacks branch added after components are created below
                # --- Delete a field ---
                delete_evt = delete_btn.click(
                    fn=delete_field,
                    inputs=[delete_dropdown, fields_state],
                    outputs=[error_box, fields_state, table, delete_dropdown, download_btn, ready_msg],
                )
                delete_evt.then(
                    lambda f: visibility_updates_from_fields(f),
                    inputs=[fields_state],
                    outputs=[delete_dropdown, download_btn, delete_btn, model_filename],
                )
                delete_evt.then(lambda f: count_message(f), inputs=[fields_state], outputs=[live_count])

                # --- Export / import ---
                download_btn.click(
                    fn=export_model,
                    inputs=[fields_state, model_filename],
                    outputs=[file_out],
                )
                import_evt = upload_in.change(
                    fn=import_model,
                    inputs=[upload_in],
                    outputs=[error_box, fields_state, table, delete_dropdown, download_btn, ready_msg],
                )
                import_evt.then(lambda f: count_message(f), inputs=[fields_state], outputs=[live_count])
                import_evt.then(lambda f: visibility_updates_from_fields(f), inputs=[fields_state], outputs=[delete_dropdown, download_btn, delete_btn, model_filename])
                import_evt.then(lambda f: gr.update(visible=len(f or []) > 0), inputs=[fields_state], outputs=[delete_btn])

            with gr.TabItem("Extract"):
                gr.Markdown("## Step 2 — Extract fields from the document")
                gr.Markdown(
                    "Follow the order: 2.1 Auth, 2.2 Model, 2.3 Image, 2.4 Extract.\n"
                    "Model priority: (A) .json uploaded in Extract, (B) .json uploaded in ‘Model’, (C) model built manually.")

                gr.Markdown("### 2.1 Authentication (OPENROUTER_API_KEY)")
                with gr.Row():
                    api_key_input = gr.Textbox(label="OPENROUTER_API_KEY", type="password", placeholder="OpenRouter API key")

                gr.Markdown("### 2.2 Choose the model to use")
                gr.Markdown(
                    "- Option A: upload a .json here (priority).\n"
                    "- Option B: use the file imported in the ‘Model’ tab.\n"
                    "- Option C: use the model you built manually (table).")
                with gr.Row():
                    model_file_input = gr.File(label="Model file (.json) — Extract (optional)")

                gr.Markdown("### 2.3 Provide the document and run extraction")
                with gr.Row():
                    img_input = gr.File(label="Document (image/PDF upload)", file_count="single", file_types=["image", ".pdf"], type="filepath")
                    image_url_input = gr.Textbox(label="Or image URL (images only)", placeholder="https://example.com/file.png")
                extract_btn = gr.Button("Extract", variant="primary")

                gr.Markdown("### 2.4 Result")
                with gr.Row():
                    extraction_output = gr.Code(label="Result (stream)", language="json")
                validation_msg = gr.Markdown(visible=False)

                # Run the extraction; the order of the inputs covers 3 cases:
                # 1) model uploaded in the Extract tab (priority)
                # 2) model uploaded in the Model tab
                # 3) model built manually (fields_state)
                extract_btn.click(
                    fn=run_extraction,
                    inputs=[model_file_input, upload_in, fields_state, img_input, image_url_input, api_key_input],
                    outputs=[extraction_output, validation_msg],
                    concurrency_limit=2,
                    api_name="extract",
                )

                # Keep the model file synchronized between the two tabs.
                # When a file is loaded in Extract, mirror it to the Model tab.
                model_file_input.change(lambda f: f, inputs=[model_file_input], outputs=[upload_in])
                # When a file is loaded in Model, mirror it to the Extract tab.
                import_evt.then(lambda f: f, inputs=[upload_in], outputs=[model_file_input])

    # Enable the queue (without the deprecated parameter).
    demo.queue()
    return demo
def main():
    """Build the UI and launch it, passing ``mcp_server=True`` to Gradio."""
    build_ui().launch(mcp_server=True)


# Start the app only when this file is run as a script, not when imported.
if __name__ == "__main__":
    main()