| |
| """ |
| Final Gradio app — robust document tagging + automated taxonomy via GPT-5 (OpenAI new client). |
| Features: |
| - Upload PDF or Image |
| - Extract text (PyMuPDF + Tesseract fallback) |
| - Chunk text, call GPT-5 to produce JSON metadata wrapped between the markers <<BEGIN_JSON>> and <<END_JSON>> |
| - Validate JSON with jsonschema |
| - Automatic repair attempts + manual-repair (paste raw output) |
| - Detailed step-by-step logs displayed on the UI and full GPT response shown |
| - Download metadata JSON on success |
| |
| Requirements (requirements.txt): |
| gradio>=3.0 |
| PyMuPDF |
| pytesseract |
| Pillow |
| openai>=1.0.0 |
| jsonschema |
| |
| System packages (apt-packages for HF Spaces): |
| tesseract-ocr |
| poppler-utils |
| |
| Put OPENAI_API_KEY into HF Space Secrets or environment. |
| """ |
|
|
| import os |
| import json |
| import tempfile |
| import datetime |
| import re |
| from typing import List, Dict, Any |
|
|
| import gradio as gr |
| from PIL import Image |
| import fitz |
| import pytesseract |
| from jsonschema import validate as json_validate, ValidationError |
| from openai import OpenAI |
|
|
| |
| |
| |
# --- OpenAI configuration ---------------------------------------------------
# The API key is read once at import time; a missing or empty key aborts
# start-up with a RuntimeError instead of failing on the first request.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    raise RuntimeError("OPENAI_API_KEY not found in environment. Add it to HF Space Secrets or env var.")

# Single shared client used by every model call in this module.
client = OpenAI(api_key=OPENAI_API_KEY)

# Model name and completion budget, both overridable through the environment.
LLM_MODEL = os.environ.get("OPENAI_MODEL", "gpt-5")
MAX_COMPLETION_TOKENS = int(os.environ.get("MAX_COMPLETION_TOKENS", "1500"))
|
|
| |
| |
| |
# JSON Schema applied to every metadata object produced by the model.
# All eleven keys are required; keys beyond these are accepted.
METADATA_SCHEMA = {
    "type": "object",
    "required": [
        "doc_id",
        "title",
        "summary",
        "doc_type",
        "source",
        "tags",
        "tag_confidences",
        "taxonomy_path",
        "extracted_entities",
        "raw_url",
        "ingest_timestamp",
    ],
    "properties": dict(
        doc_id={"type": "string"},
        title={"type": "string"},
        summary={"type": "string"},
        doc_type={"type": "string"},
        source={"type": "string"},
        tags={"type": "array", "items": {"type": "string"}},
        tag_confidences={"type": "object"},
        taxonomy_path={"type": "array", "items": {"type": "string"}},
        extracted_entities={"type": "object"},
        raw_url={"type": "string"},
        ingest_timestamp={"type": "string"},
    ),
    "additionalProperties": True,
}
|
|
| |
| |
| |
def extract_text_from_pdf(path: str, log: List[str]) -> str:
    """Return the text of every page of the PDF at *path*, joined by blank lines.

    Pages that have an embedded text layer are read directly; pages without
    one are rendered at 200 dpi and passed through Tesseract OCR.

    Args:
        path: Filesystem path of the PDF.
        log: Step log; one or two entries are appended per page.

    Returns:
        The page texts joined with blank lines, stripped of outer whitespace.

    Raises:
        RuntimeError: If the PDF cannot be opened.
    """
    log.append(f"Opening PDF: {path}")
    try:
        doc = fitz.open(path)
    except Exception as e:
        raise RuntimeError(f"Failed to open PDF: {e}") from e

    texts: List[str] = []
    try:
        for i in range(len(doc)):
            page = doc.load_page(i)
            txt = page.get_text("text").strip()
            if txt:
                log.append(f"Page {i+1}: text extracted ({len(txt)} chars)")
                texts.append(txt)
                continue

            log.append(f"Page {i+1}: no text found, performing OCR fallback")
            pix = page.get_pixmap(dpi=200)
            # Only the file *name* is needed: close our handle right away so
            # the pixmap writer and PIL can reopen the path themselves.
            tmp = tempfile.NamedTemporaryFile(suffix=".png", delete=False)
            tmp.close()
            try:
                pix.save(tmp.name)
                with Image.open(tmp.name) as img:
                    ocr_text = pytesseract.image_to_string(img)
            finally:
                # delete=False means nobody else removes the rendered page.
                os.remove(tmp.name)
            log.append(f"Page {i+1}: OCR extracted ({len(ocr_text)} chars)")
            texts.append(ocr_text)
    finally:
        doc.close()
    return "\n\n".join(texts).strip()
|
|
|
|
def extract_text_from_image(path: str, log: List[str]) -> str:
    """Run Tesseract OCR on the image at *path* and return the stripped text.

    Args:
        path: Filesystem path of the image.
        log: Step log; two entries are appended.

    Returns:
        The recognised text with outer whitespace removed (may be empty).
    """
    log.append(f"OCR on image: {path}")
    # The context manager releases the underlying file handle once OCR is done.
    with Image.open(path) as img:
        txt = pytesseract.image_to_string(img.convert("RGB")).strip()
    log.append(f"OCR extracted ({len(txt)} chars)")
    return txt
|
|
|
|
def chunk_text(text: str, max_chars: int = 3000) -> List[str]:
    """Split *text* into chunks of at most *max_chars* characters.

    The text is cut on blank lines into paragraphs, and consecutive
    paragraphs are packed greedily into a chunk (re-joined with a blank
    line) while they fit. A single paragraph longer than *max_chars* is
    hard-split into *max_chars*-sized pieces so no chunk exceeds the limit.

    Args:
        text: The text to split.
        max_chars: Upper bound on chunk length. When it is not positive,
            oversized paragraphs are left whole (one paragraph per chunk).

    Returns:
        The chunks in document order; an empty list for blank input.
    """
    paragraphs = [p.strip() for p in re.split(r"\n\s*\n", text) if p.strip()]

    # Break up paragraphs that could never fit in a chunk on their own.
    pieces: List[str] = []
    for p in paragraphs:
        if max_chars > 0 and len(p) > max_chars:
            pieces.extend(p[i:i + max_chars] for i in range(0, len(p), max_chars))
        else:
            pieces.append(p)

    chunks: List[str] = []
    current = ""
    for p in pieces:
        # +2 accounts for the "\n\n" separator inserted between paragraphs.
        if len(current) + len(p) + 2 <= max_chars:
            current = (current + "\n\n" + p) if current else p
        else:
            if current:
                chunks.append(current)
            current = p
    if current:
        chunks.append(current)
    return chunks
|
|
| |
| |
| |
def _persist_upload_bytes(data, name: str) -> str:
    """Write *data* to a new temp file that keeps *name*'s extension; return its path."""
    if isinstance(data, str):
        data = data.encode("utf-8")
    suffix = os.path.splitext(name)[1]
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
        tmp.write(data)
    return tmp.name


def save_uploaded_to_tmp(file_obj, log: List[str]):
    """Materialise an uploaded object on disk and return ``(path, original_basename)``.

    The upload may arrive in several shapes; they are tried in this order:

    1. a file-like object exposing ``read()``;
    2. a dict carrying ``"data"`` and ``"name"``;
    3. a string path that exists on disk (returned as-is, not copied);
    4. any object whose ``.name`` attribute is a readable path.

    Each strategy is best-effort: a failure is logged and the next one is tried.

    Args:
        file_obj: The uploaded object.
        log: Step log; progress and failures are appended.

    Returns:
        A tuple of the on-disk path and the base name of the original file.

    Raises:
        ValueError: If no strategy could handle *file_obj*.
    """
    log.append(f"Saving uploaded object of type {type(file_obj)}")

    if hasattr(file_obj, "read") and callable(getattr(file_obj, "read")):
        try:
            content = file_obj.read()
            name = getattr(file_obj, "name", "uploaded_file")
            saved = _persist_upload_bytes(content, name)
            log.append(f"Saved uploaded file-like as {saved}")
            return saved, os.path.basename(name)
        except Exception as e:  # best-effort: fall through to the next strategy
            log.append(f"file-like save failed: {e}")

    if isinstance(file_obj, dict) and "data" in file_obj and "name" in file_obj:
        try:
            name = file_obj["name"]
            saved = _persist_upload_bytes(file_obj["data"], name)
            log.append(f"Saved dict-like upload as {saved}")
            return saved, os.path.basename(name)
        except Exception as e:  # best-effort: fall through to the next strategy
            log.append(f"dict-like save failed: {e}")

    if isinstance(file_obj, str):
        if os.path.exists(file_obj):
            log.append(f"Upload was path string existing on disk: {file_obj}")
            return file_obj, os.path.basename(file_obj)
        # A path that does not exist cannot be copied either; record it and
        # let the final ValueError report the unsupported input.
        log.append(f"path-string handling failed: path does not exist: {file_obj}")

    name = getattr(file_obj, "name", None)
    if name and isinstance(name, str):
        try:
            with open(name, "rb") as f:
                data = f.read()
            saved = _persist_upload_bytes(data, name)
            log.append(f"Saved file from .name attr to {saved}")
            return saved, os.path.basename(name)
        except Exception as e:  # best-effort: report below
            log.append(f".name-based save failed: {e}")

    raise ValueError(f"Unsupported uploaded file object type: {type(file_obj)}. repr: {repr(file_obj)[:400]}")
|
|
| |
| |
| |
def extract_json_from_text(text: str) -> str:
    """Pull the JSON object text out of a model reply.

    Resolution order:

    1. the content between ``<<BEGIN_JSON>>`` and ``<<END_JSON>>``;
    2. the first complete JSON object starting at the first ``{``, found
       with a real JSON decoder so nested braces and trailing prose or
       whitespace do not truncate it;
    3. the span from the first ``{`` to the last ``}``, returned even though
       it is not valid JSON so that callers can report or repair it.

    Args:
        text: Raw model output (``None`` or empty is tolerated).

    Returns:
        The candidate JSON text, or ``""`` when nothing brace-delimited exists.
    """
    if not text:
        return ""

    marked = re.search(r"<<BEGIN_JSON>>(.*?)<<END_JSON>>", text, re.DOTALL)
    if marked:
        return marked.group(1).strip()

    first = text.find("{")
    if first == -1:
        return ""

    try:
        _, end = json.JSONDecoder().raw_decode(text, first)
    except ValueError:
        # Malformed object: hand back the widest brace span for diagnosis.
        last = text.rfind("}")
        return text[first:last + 1] if last > first else ""
    return text[first:end]
|
|
|
|
def try_parse_and_validate(json_text: str) -> (bool, Dict[str, Any], str):
    """Decode *json_text* and check it against ``METADATA_SCHEMA``.

    Returns a triple ``(ok, parsed, error)``:

    - decoding failed: ``(False, None, "json.loads error: ...")``
    - decoded but schema check failed: ``(False, parsed, "schema validation ...")``
    - decoded and valid: ``(True, parsed, "")``
    """
    try:
        candidate = json.loads(json_text)
    except Exception as exc:
        return False, None, f"json.loads error: {exc}"

    problem = ""
    try:
        json_validate(candidate, METADATA_SCHEMA)
    except ValidationError as exc:
        problem = f"schema validation error: {exc}"
    except Exception as exc:
        problem = f"schema validation unexpected error: {exc}"

    # An empty problem string means validation passed.
    return (not problem), candidate, problem
|
|
| |
| |
| |
def call_gpt5_for_metadata(title: str, short_text: str, top_chunks: List[str], log: List[str], max_attempts: int = 3):
    """
    Ask the chat model for document metadata and validate the reply.

    The prompt contains an example JSON object, the document title, a short
    excerpt and up to six content chunks (800 chars each, newlines flattened).
    Each reply is logged in full, the JSON is extracted and validated, and on
    failure the request is retried up to *max_attempts* times:

    - empty reply, or a tool-like ``status: error`` / MISSING_INPUT object:
      the request is restated with an explicit ``document_text:`` section;
    - no JSON, or JSON rejected by parsing/schema validation: the original
      request is kept in the conversation and the model's previous reply plus
      a corrective instruction are appended, so the retry still sees the
      document and knows what was wrong.

    Args:
        title: Document title (the original file name in this app).
        short_text: Leading excerpt of the extracted text.
        top_chunks: Content chunks; only the first six are used.
        log: Step log, appended to in place. The same list is also returned
            under the ``"log"`` key.
        max_attempts: Maximum number of model calls.

    Returns:
        One of:
        - ``{"metadata", "log", "raw_response"}`` on success;
        - ``{"_api_error": True, "error", "log", "raw_response": None}`` when
          the API call raises (no further attempts are made);
        - ``{"_parsing_error": True, "raw_output", "log", "raw_response"}``,
          plus ``"parsed_partial"`` and ``"parse_error"`` when a JSON
          candidate was extracted but rejected.
    """
    system_msg = (
        "You are an assistant that must PRODUCE a JSON metadata object for the uploaded document. "
        "Do NOT attempt to call any external APIs or tools. Do NOT return status/error objects from other services. "
        "Return ONLY a JSON object wrapped between <<BEGIN_JSON>> and <<END_JSON>> and nothing else."
    )

    example_json = {
        "doc_id": "example_001",
        "title": "Example Title",
        "summary": "Short summary of the document in 1-2 sentences.",
        "doc_type": "architecture_comparison",
        "source": "user_upload",
        "tags": ["arch:docai", "topic:ocr-parsing"],
        "tag_confidences": {"arch:docai": 0.95, "topic:ocr-parsing": 0.9},
        "taxonomy_path": ["Technology", "Document Processing", "OCR & Parsing"],
        "extracted_entities": {"platforms": ["GCP", "BigQuery"], "tools": ["DocAI"]},
        "raw_url": "",
        "ingest_timestamp": "2025-09-19T09:13:00+05:30",
    }
    example_block = (
        "Example JSON (use this schema, but fill with values from the document):\n<<BEGIN_JSON>>\n"
        + json.dumps(example_json, ensure_ascii=False, indent=2)
        + "\n<<END_JSON>>\n\n"
    )

    prompt_intro = f"Document title: {title}\n\nShort document text (first ~1000 chars): {short_text}\n\nTop content chunks:\n"
    chunk_lines = []
    for i, c in enumerate(top_chunks[:6], start=1):
        flattened = c[:800].replace("\n", " ")
        chunk_lines.append(f"CHUNK_{i}: {flattened}\n\n")
    prompt_chunks = "".join(chunk_lines)

    prompt_end = (
        "Task: Produce a JSON object with EXACT keys: doc_id, title, summary, doc_type, source, tags (array of strings), "
        "tag_confidences (map tag->float), taxonomy_path (array of strings), extracted_entities (map), raw_url, ingest_timestamp.\n"
        "Return ONLY the JSON between <<BEGIN_JSON>> and <<END_JSON>>. Do not add any commentary."
    )

    system_turn = {"role": "system", "content": system_msg}
    base_messages = [
        system_turn,
        {"role": "user", "content": example_block + prompt_intro + prompt_chunks + prompt_end},
    ]
    explicit_tail = "document_text: " + short_text + "\n\n" + prompt_chunks + prompt_end

    def restated_request(preamble: str):
        # Fresh conversation that spells the document out as `document_text:`.
        return [system_turn, {"role": "user", "content": example_block + preamble + explicit_tail}]

    def follow_up(previous: str, feedback: str):
        # Keep the original request (and therefore the document) in context,
        # then show the model its own reply and what to fix.
        return base_messages + [
            {"role": "assistant", "content": previous},
            {"role": "user", "content": feedback},
        ]

    messages = base_messages
    last_raw = None

    for attempt in range(1, max_attempts + 1):
        log.append(f"OpenAI call attempt {attempt}")
        try:
            resp = client.chat.completions.create(
                model=LLM_MODEL,
                messages=messages,
                max_completion_tokens=MAX_COMPLETION_TOKENS,
            )
        except Exception as e:
            log.append(f"OpenAI API call failed on attempt {attempt}: {e}")
            return {"_api_error": True, "error": f"OpenAI API call failed: {e}", "log": log, "raw_response": None}

        # The content can be missing or None; treat that as an empty reply
        # rather than stringifying the whole response object.
        try:
            message = resp.choices[0].message
            content = message["content"] if isinstance(message, dict) else message.content
        except (AttributeError, IndexError, KeyError, TypeError):
            content = None
        full_text = content.strip() if isinstance(content, str) else ""

        last_raw = full_text
        log.append(f"OpenAI response received (len={len(full_text)})")
        log.append("---- FULL MODEL RESPONSE START ----")
        log.append(full_text)
        log.append("---- FULL MODEL RESPONSE END ----")

        failure = {"_parsing_error": True, "raw_output": last_raw, "log": log, "raw_response": full_text}

        if not full_text:
            log.append("Model returned empty response — will retry with explicit document_text provided.")
            retry_messages = restated_request("Providing document_text to avoid missing-input errors.\n\n")
        else:
            json_text = extract_json_from_text(full_text)
            if not json_text:
                log.append("No JSON found in response")
                retry_messages = follow_up(
                    full_text,
                    "Previous response lacked a JSON block. Return ONLY the JSON between <<BEGIN_JSON>> and <<END_JSON>>. Use the example format.",
                )
            else:
                ok, parsed_or_partial, parse_err = try_parse_and_validate(json_text)
                if ok:
                    log.append("JSON parsed and validated successfully")
                    return {"metadata": parsed_or_partial, "log": log, "raw_response": full_text}

                log.append(f"JSON candidate rejected: {parse_err}")
                failure["parsed_partial"] = parsed_or_partial
                failure["parse_error"] = parse_err

                tool_like_error = (
                    isinstance(parsed_or_partial, dict)
                    and parsed_or_partial.get("status") == "error"
                    and (
                        "MISSING_INPUT" in str(parsed_or_partial.get("error_code", "")).upper()
                        or "document" in str(parsed_or_partial.get("message", "")).lower()
                    )
                )
                if tool_like_error:
                    log.append("Detected tool-like MISSING_INPUT response inside JSON. Retrying with explicit document_text.")
                    retry_messages = restated_request(
                        "The previous response contained an error object asking for document_text. "
                        "Please produce the metadata JSON now. "
                    )
                else:
                    # Validation errors can be very long; a prefix is enough
                    # to tell the model what to fix.
                    retry_messages = follow_up(
                        full_text,
                        "Your JSON is invalid vs schema (" + parse_err[:500] + "). "
                        "Return corrected JSON only between markers, using the example format.",
                    )

        if attempt >= max_attempts:
            return failure
        messages = retry_messages

    # Only reachable when max_attempts < 1 (the loop body never ran).
    return {"_parsing_error": True, "raw_output": last_raw, "log": log, "raw_response": last_raw}
|
|
| |
| |
| |
def repair_raw_output(raw_output: str, manual_pasted_json: str, log: List[str], max_attempts: int = 2):
    """Turn a malformed model reply into schema-valid metadata.

    Two modes:

    - *manual_pasted_json* given: the pasted text is parsed and validated
      locally and the outcome is returned; the model is not called.
    - otherwise: the model is asked to extract and correct the JSON found in
      *raw_output*, with up to *max_attempts* calls. Retries keep the
      original request (including the raw output) in the conversation and
      append the previous reply plus a corrective instruction.

    Args:
        raw_output: The raw text to repair (``None`` is treated as empty).
        manual_pasted_json: User-pasted text, or a falsy value to use the model.
        log: Step log, appended to in place and returned under ``"log"``.
        max_attempts: Maximum number of model calls.

    Returns:
        ``{"metadata", "log", "raw_response"}`` on success; otherwise a dict
        flagged with ``"_api_error"`` or ``"_parsing_error"``.
    """
    log.append("Starting repair flow")

    if manual_pasted_json:
        log.append("User provided manual pasted JSON — trying to parse and validate")
        jtxt = extract_json_from_text(manual_pasted_json) or manual_pasted_json
        ok, parsed, err = try_parse_and_validate(jtxt)
        if ok:
            log.append("Manual pasted JSON validated successfully")
            return {"metadata": parsed, "log": log, "raw_response": manual_pasted_json}
        log.append(f"Manual pasted JSON validation failed: {err}")
        return {"_parsing_error": True, "raw_output": manual_pasted_json, "parsed_partial": parsed, "parse_error": err, "log": log}

    system_msg = (
        "You are an assistant that must extract and/or correct a malformed JSON from the user's raw_output. "
        "Return ONLY a corrected JSON object wrapped between <<BEGIN_JSON>> and <<END_JSON>> and nothing else."
    )
    repair_prompt = (
        "Here is the raw output (possibly containing a malformed JSON). Extract and return a corrected JSON object "
        "containing keys: doc_id,title,summary,doc_type,source,tags,tag_confidences,taxonomy_path,extracted_entities,raw_url,ingest_timestamp. "
        "If fields are missing, use reasonable defaults (empty string, empty list or empty map)."
    )
    base_messages = [
        {"role": "system", "content": system_msg},
        {"role": "user", "content": repair_prompt + "\n\nRaw output:\n\n" + (raw_output or "")},
    ]

    def follow_up(previous: str, feedback: str):
        # Keep the raw output in context; without it there is nothing to repair.
        turns = list(base_messages)
        if previous:
            turns.append({"role": "assistant", "content": previous})
        turns.append({"role": "user", "content": feedback})
        return turns

    messages = base_messages
    last_raw = None
    for attempt in range(1, max_attempts + 1):
        log.append(f"Repair attempt {attempt}")
        try:
            resp = client.chat.completions.create(
                model=LLM_MODEL,
                messages=messages,
                max_completion_tokens=MAX_COMPLETION_TOKENS,
            )
        except Exception as e:
            log.append(f"Repair API call failed: {e}")
            return {"_api_error": True, "error": f"OpenAI API call failed: {e}", "log": log, "raw_response": None}

        # Missing or None content counts as an empty reply.
        try:
            message = resp.choices[0].message
            content = message["content"] if isinstance(message, dict) else message.content
        except (AttributeError, IndexError, KeyError, TypeError):
            content = None
        full_text = content.strip() if isinstance(content, str) else ""

        last_raw = full_text
        log.append("Repair model response received (raw length: " + str(len(full_text)) + ")")

        failure = {"_parsing_error": True, "raw_output": last_raw, "log": log, "raw_response": full_text}

        json_text = extract_json_from_text(full_text)
        if not json_text:
            log.append("Repair response contained no JSON")
            feedback = "Your previous reply did not include the JSON. Return ONLY the corrected JSON between markers."
        else:
            ok, parsed_or_partial, parse_err = try_parse_and_validate(json_text)
            if ok:
                log.append("Repair produced valid JSON")
                return {"metadata": parsed_or_partial, "log": log, "raw_response": full_text}
            log.append(f"Repair produced JSON but validation failed: {parse_err}")
            failure["parsed_partial"] = parsed_or_partial
            failure["parse_error"] = parse_err
            feedback = (
                "Your JSON is invalid (" + parse_err[:500] + "). "
                "Please correct and return ONLY the corrected JSON between markers."
            )

        if attempt >= max_attempts:
            return failure
        messages = follow_up(full_text, feedback)

    # Only reachable when max_attempts < 1.
    return {"_parsing_error": True, "raw_output": last_raw or "", "log": log, "raw_response": last_raw or ""}
|
|
def auto_complete_partial(parsed_partial: Dict[str, Any], orig_name: str, extracted_text: str, top_chunks: List[str], log: List[str], max_attempts: int = 2):
    """Ask the model to fill in a partially valid metadata object.

    The prompt carries the original file name, the partial JSON, the first
    1200 characters of the document and up to six chunks (900 chars each,
    newlines flattened). Retries keep that request in the conversation and
    append the previous reply plus a corrective instruction.

    Args:
        parsed_partial: JSON value that parsed but failed schema validation.
        orig_name: Original file name of the upload.
        extracted_text: Full extracted document text.
        top_chunks: Content chunks; only the first six are used.
        log: Step log, appended to in place and returned under ``"log"``.
        max_attempts: Maximum number of model calls.

    Returns:
        ``{"metadata", "log", "raw_response"}`` on success; otherwise a dict
        flagged with ``"_api_error"`` or ``"_parsing_error"``.
    """
    log.append("Starting auto-complete for parsed partial")
    system_msg = (
        "You are an assistant that must fill missing metadata fields for a document. "
        "Return ONLY a single JSON object wrapped in <<BEGIN_JSON>> and <<END_JSON>> with the exact keys: "
        "doc_id, title, summary, doc_type, source, tags, tag_confidences, taxonomy_path, extracted_entities, raw_url, ingest_timestamp. "
        "If you cannot infer a value, use reasonable defaults."
    )
    partial_str = json.dumps(parsed_partial, ensure_ascii=False)
    short_text = (extracted_text[:1200] + "...") if len(extracted_text) > 1200 else extracted_text

    prompt_parts = [
        f"Original filename: {orig_name}\n\nPreviously parsed partial JSON:\n{partial_str}\n\nDocument short text:\n{short_text}\n\nTop chunks:\n"
    ]
    for i, c in enumerate(top_chunks[:6], start=1):
        flattened = c[:900].replace("\n", " ")
        prompt_parts.append(f"CHUNK_{i}: {flattened}\n\n")
    prompt_parts.append(
        "Task: Fill any missing or empty fields in the JSON above using the document context. "
        "Return ONLY the completed JSON wrapped between <<BEGIN_JSON>> and <<END_JSON>>."
    )
    prompt = "".join(prompt_parts)

    base_messages = [{"role": "system", "content": system_msg}, {"role": "user", "content": prompt}]

    def follow_up(previous: str, feedback: str):
        # Keep the partial JSON and document excerpt in context on retries.
        turns = list(base_messages)
        if previous:
            turns.append({"role": "assistant", "content": previous})
        turns.append({"role": "user", "content": feedback})
        return turns

    messages = base_messages
    last_raw = None
    for attempt in range(1, max_attempts + 1):
        log.append(f"Auto-complete attempt {attempt}")
        try:
            resp = client.chat.completions.create(
                model=LLM_MODEL,
                messages=messages,
                max_completion_tokens=MAX_COMPLETION_TOKENS,
            )
        except Exception as e:
            log.append(f"Auto-complete API call failed: {e}")
            return {"_api_error": True, "error": f"OpenAI API call failed: {e}", "log": log}

        # Missing or None content counts as an empty reply.
        try:
            message = resp.choices[0].message
            content = message["content"] if isinstance(message, dict) else message.content
        except (AttributeError, IndexError, KeyError, TypeError):
            content = None
        full_text = content.strip() if isinstance(content, str) else ""

        last_raw = full_text
        log.append("Auto-complete model response received")

        failure = {"_parsing_error": True, "raw_output": last_raw, "log": log, "raw_response": full_text}

        json_text = extract_json_from_text(full_text)
        if not json_text:
            log.append("Auto-complete response had no JSON")
            feedback = "Return ONLY the JSON wrapped in <<BEGIN_JSON>> and <<END_JSON>>."
        else:
            ok, parsed_or_partial2, parse_err = try_parse_and_validate(json_text)
            if ok:
                log.append("Auto-complete succeeded and validated")
                return {"metadata": parsed_or_partial2, "log": log, "raw_response": full_text}
            log.append(f"Auto-complete produced JSON but validation failed: {parse_err}")
            failure["parsed_partial"] = parsed_or_partial2
            failure["parse_error"] = parse_err
            feedback = (
                "The JSON you returned is invalid (" + parse_err[:500] + "). "
                "Please correct and return ONLY the JSON wrapped in <<BEGIN_JSON>> and <<END_JSON>>."
            )

        if attempt >= max_attempts:
            return failure
        messages = follow_up(full_text, feedback)

    # Only reachable when max_attempts < 1.
    return {"_parsing_error": True, "raw_output": last_raw or "", "log": log, "raw_response": last_raw or ""}
|
|
| |
| |
| |
def process_file(file_obj):
    """Run the whole pipeline for one upload: save, extract text, chunk, tag.

    Steps: persist the upload, extract text (PDF path for ``.pdf`` names,
    image OCR otherwise), chunk it, ask the model for metadata, and, if the
    reply parsed but failed validation, try one auto-complete pass.

    Args:
        file_obj: The uploaded object as delivered by the UI.

    Returns:
        On success ``{"metadata", "log", "raw_response"}``. On failure a dict
        with ``"error"``, ``"log"`` and ``"raw_response"``, plus
        ``"raw_output"``, ``"parsed_partial"`` and ``"parse_error"`` when the
        model output could not be parsed or validated.
    """
    ui_log: List[str] = []

    def absorb(extra):
        # The model helpers append to ui_log directly and hand the very same
        # list back under "log"; merging it again would duplicate every line.
        if extra is not None and extra is not ui_log:
            ui_log.extend(extra)

    def apply_defaults(metadata):
        # Fill in only keys that are absent; existing values are left alone.
        now = datetime.datetime.now(datetime.timezone.utc).astimezone().isoformat()
        metadata.setdefault("doc_id", os.path.splitext(orig_name)[0])
        metadata.setdefault("title", orig_name)
        metadata.setdefault("source", "user_upload")
        metadata.setdefault("raw_url", "")
        metadata.setdefault("ingest_timestamp", now)
        return metadata

    try:
        tmp_path, orig_name = save_uploaded_to_tmp(file_obj, ui_log)
    except Exception as e:
        ui_log.append(f"Failed to save upload: {e}")
        return {"error": f"Failed to save uploaded file: {e}", "log": ui_log, "raw_response": ""}

    try:
        if orig_name.lower().endswith(".pdf"):
            extracted_text = extract_text_from_pdf(tmp_path, ui_log)
        else:
            extracted_text = extract_text_from_image(tmp_path, ui_log)
    except Exception as e:
        ui_log.append(f"Text extraction failed: {e}")
        return {"error": f"Text extraction failed: {e}", "log": ui_log, "raw_response": ""}

    if not extracted_text:
        ui_log.append("No text found after extraction.")
        return {"error": "No text found in document after extraction.", "log": ui_log, "raw_response": ""}

    chunks = chunk_text(extracted_text)
    ui_log.append(f"Document split into {len(chunks)} chunks")
    # The longest chunks are used as the most informative context.
    sorted_chunks = sorted(chunks, key=len, reverse=True)
    top_chunks = sorted_chunks[:6] if sorted_chunks else [extracted_text[:2000]]
    short_text = (extracted_text[:1000] + "...") if len(extracted_text) > 1000 else extracted_text

    result = call_gpt5_for_metadata(orig_name, short_text, top_chunks, ui_log, max_attempts=3)
    absorb(result.get("log"))

    if result.get("_api_error"):
        return {"error": result.get("error"), "log": ui_log, "raw_response": result.get("raw_response")}

    if result.get("_parsing_error"):
        raw_out = result.get("raw_output", result.get("raw_response", ""))
        parsed_partial = result.get("parsed_partial", {})
        ui_log.append("Initial parse failed; attempting auto-complete if partial available")

        if not parsed_partial:
            ui_log.append("No parsed_partial to auto-complete; returning raw output for manual repair")
            return {
                "error": "LLM output parsing failed. See raw_output.",
                "raw_output": raw_out,
                "parsed_partial": parsed_partial,
                "parse_error": result.get("parse_error"),
                "log": ui_log,
                "raw_response": result.get("raw_response", raw_out),
            }

        ac = auto_complete_partial(parsed_partial, orig_name, extracted_text, top_chunks, ui_log, max_attempts=2)
        absorb(ac.get("log"))
        if ac.get("_api_error"):
            return {"error": "Auto-complete API error", "log": ui_log, "raw_response": ac.get("raw_response", raw_out)}
        if ac.get("_parsing_error"):
            return {
                "error": "LLM output parsing failed. See raw_output.",
                "raw_output": ac.get("raw_output", raw_out),
                "parsed_partial": ac.get("parsed_partial"),
                "parse_error": ac.get("parse_error"),
                "log": ui_log,
                "raw_response": ac.get("raw_response", raw_out),
            }

        ui_log.append("Auto-complete produced metadata")
        return {
            "metadata": apply_defaults(ac.get("metadata")),
            "log": ui_log,
            "raw_response": ac.get("raw_response", raw_out),
        }

    metadata = apply_defaults(result.get("metadata"))
    ui_log.append("Metadata generation successful")
    return {"metadata": metadata, "log": ui_log, "raw_response": result.get("raw_response")}
|
|
| |
| |
| |
# Gradio UI: upload + manual-repair controls on the left; parsed metadata,
# raw model response and step log on the right.
with gr.Blocks(title="DocClassify — Final Robust") as demo:
    gr.Markdown("## 📂 Upload PDF / Image → automated taxonomy & tagging (GPT-5). Logs & GPT response shown below.")
    with gr.Row():
        with gr.Column(scale=1):
            uploader = gr.File(label="Upload PDF / Image", file_types=[".pdf", ".png", ".jpg", ".jpeg", ".tiff"])
            run_button = gr.Button("Process document")
            status = gr.Textbox(label="Status", value="", interactive=False)
            # Hidden until a metadata file exists; the handlers toggle it.
            download_button = gr.File(label="Download metadata JSON", visible=False)
            gr.Markdown("### Manual repair (paste raw LLM output if needed)")
            manual_raw_input = gr.Textbox(label="Paste raw LLM output here (optional)", lines=8, placeholder="Paste the malformed raw response here if you need manual repair")
            repair_from_paste_btn = gr.Button("Repair from pasted raw output")
            repair_auto_btn = gr.Button("Attempt automatic repair of last raw output")
        with gr.Column(scale=1):
            output_json = gr.JSON(label="Metadata JSON (parsed)")
            raw_output_box = gr.Textbox(label="Full GPT model raw response", lines=12, interactive=False)
            logs_box = gr.Textbox(label="Step-by-step logs", lines=18, interactive=False)

    last_raw_state = gr.State(value=None)
    last_metadata_file = gr.State(value=None)

    # Every handler returns five values, in the order of the `outputs` lists
    # below: (json display, status text, download file, raw response, logs).

    def _hide_download():
        """Clear and hide the download component."""
        return gr.update(value=None, visible=False)

    def _offer_download(metadata):
        """Write *metadata* to a temp .json file and reveal it for download."""
        with tempfile.NamedTemporaryFile("w", delete=False, suffix=".json", encoding="utf8") as f:
            json.dump(metadata, f, indent=2, ensure_ascii=False)
        return gr.update(value=f.name, visible=True)

    def _finish_repair(repaired, ui_log, fallback_raw, label):
        """Map a repair_raw_output() result onto the five UI outputs."""
        logs_text = "\n".join(repaired.get("log", ui_log))
        # Keep the text being repaired on screen when the result carries no
        # raw response (e.g. after an API error) so the user can retry.
        raw_shown = repaired.get("raw_response") or fallback_raw
        if repaired.get("_api_error"):
            return {}, f"Repair API error: {repaired.get('error')}", _hide_download(), raw_shown, logs_text
        if repaired.get("_parsing_error"):
            display = {
                "error": f"{label} failed to produce valid JSON",
                "parsed_partial": repaired.get("parsed_partial"),
                "parse_error": repaired.get("parse_error"),
            }
            return display, f"{label} failed", _hide_download(), raw_shown, logs_text
        metadata = repaired.get("metadata")
        return metadata, f"{label} succeeded", _offer_download(metadata), raw_shown, logs_text

    def on_process(file_obj):
        """Handle the "Process document" button."""
        if not file_obj:
            return {}, "No file uploaded", _hide_download(), "", ""
        try:
            result = process_file(file_obj)
        except Exception as e:
            return {}, f"Failed: {e}", _hide_download(), "", f"Exception: {e}"

        logs = result.get("log", [])
        raw_response = result.get("raw_response", "")
        if result.get("error"):
            raw_out = result.get("raw_output", raw_response) or ""
            parsed_partial = result.get("parsed_partial")
            display = {"error": result.get("error")}
            if parsed_partial is not None:
                display["parsed_partial"] = parsed_partial
            logs_text = "\n".join(logs + [f"Error: {result.get('error')}"])
            return display, f"Error: {result.get('error')}", _hide_download(), raw_out, logs_text

        metadata = result.get("metadata")
        return metadata, "Done", _offer_download(metadata), raw_response or "", "\n".join(logs)

    def on_repair_from_paste(manual_text):
        """Handle "Repair from pasted raw output": validate the pasted text."""
        if not manual_text:
            return {}, "No pasted raw output provided.", _hide_download(), "", "No pasted raw output provided."
        ui_log = ["Repair-from-paste initiated"]
        repaired = repair_raw_output(raw_output=None, manual_pasted_json=manual_text, log=ui_log, max_attempts=2)
        return _finish_repair(repaired, ui_log, manual_text, "Repair")

    def on_repair_auto(raw_response_text):
        """Handle "Attempt automatic repair": send the shown raw response to the model."""
        if not raw_response_text:
            return {}, "No raw_response available for auto repair. Run process or paste raw output.", _hide_download(), "", "No raw_response available."
        ui_log = ["Auto repair initiated"]
        repaired = repair_raw_output(raw_output=raw_response_text, manual_pasted_json=None, log=ui_log, max_attempts=2)
        return _finish_repair(repaired, ui_log, raw_response_text, "Auto-repair")

    run_button.click(on_process, inputs=[uploader], outputs=[output_json, status, download_button, raw_output_box, logs_box])
    repair_from_paste_btn.click(on_repair_from_paste, inputs=[manual_raw_input], outputs=[output_json, status, download_button, raw_output_box, logs_box])
    repair_auto_btn.click(on_repair_auto, inputs=[raw_output_box], outputs=[output_json, status, download_button, raw_output_box, logs_box])


if __name__ == "__main__":
    demo.launch()
|
|