from __future__ import annotations import os, io, re, json, time, mimetypes, tempfile, string from typing import List, Union, Tuple, Any, Iterable from PIL import Image import pandas as pd import gradio as gr import google.generativeai as genai import requests # ================== CONFIG ================== # KHÔNG hardcode key. YÊU CẦU đặt biến môi trường GOOGLE_API_KEY. DEFAULT_API_KEY = "AIzaSyCwyYCNqWWA7jqcc5WAG5jQhnGdWKslD4o" # để trống. Nếu cần, bạn có thể set tạm thời ở ENV. INTERNAL_MODEL_MAP = { "Gemini 2.5 Flash": "gemini-2.5-flash", "Gemini 2.5 Pro": "gemini-2.5-pro", } EXTERNAL_MODEL_NAME = "prithivMLmods/Camel-Doc-OCR-062825 (External)" try: RESAMPLE = Image.Resampling.LANCZOS # Pillow >= 10 except AttributeError: RESAMPLE = Image.LANCZOS # Pillow < 10 PROMPT_FREIGHT_JSON = """ Please analyze the freight rate table in the file I provide and convert it into JSON in the following structure: { "shipping_line": "...", "shipping_line_code": "...", "shipping_line_reason": "Why this carrier is chosen?", "fee_type": "Air Freight", "valid_from": ..., "valid_to": ..., "charges": [ { "frequency": "...", "package_type": "...", "aircraft_type": "...", "direction": "Export or Import or null", "origin": "...", "destination": "...", "charge_name": "...", "charge_code": "...", "charge_code_reason": "...", "cargo_type": "...", "currency": "...", "transit": "...", "transit_time": "...", "weight_breaks": { "m": ..., "n": ..., "45kg": ..., "100kg": ..., "300kg": ..., "500kg": ..., "1000kg": ..., "other": { key: value } }, "remark": "..." } ], "local_charges": [ { "charge_name": "...", "charge_code": "...", "unit": "...", "amount": ..., "remark": "..." } ] } ### Date rules - valid_from format: - `DD/MM/YYYY` (if full date) - `01/MM/YYYY` (if month+year only) - `01/01/YYYY` (if year only) - `UFN` if missing - valid_to: - exact `DD/MM/YYYY` if present - else `UFN` STRICT RULES: - ONLY return a single JSON object as specified above. - All rates must exactly match the corresponding weight break columns (45kg, 100kg, 300kg, 500kg, 1000kg, etc.). set null if N/A. No assumptions or interpolations. - If the table shows "RQ" or similar, set value as "RQST". - Group same-price destinations into one record separated by "/". - Always use IATA code for origin and destination. - Flight number (e.g. ZH118) is not charge code. - Frequency: D[1-7]; 'Daily' = D1234567. Join multiple (e.g. D3,D4→D34). - If local charges exist, list them. - If validity missing, set null. - Direction: Export if origin is Vietnam (SGN, HAN, DAD...), else Import. - Provide short plain English reasons for "shipping_line_reason" & "charge_code_reason". - Replace commas in remarks with semicolons. - Only return JSON. """ # ================== HELPERS ================== import fitz # PyMuPDF def pdf_to_images(pdf_bytes: bytes) -> list[Image.Image]: doc = fitz.open(stream=pdf_bytes, filetype="pdf") pages = [] for p in doc: pix = p.get_pixmap(dpi=200) img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) pages.append(img) return pages def ensure_rgb(im: Image.Image) -> Image.Image: return im.convert("RGB") if im.mode != "RGB" else im def _read_file_bytes(upload: Union[str, os.PathLike, dict, object] | None) -> bytes: if upload is None: raise ValueError("No file uploaded.") if isinstance(upload, (str, os.PathLike)): with open(upload, "rb") as f: return f.read() if isinstance(upload, dict) and "path" in upload: with open(upload["path"], "rb") as f: return f.read() if hasattr(upload, "read"): return upload.read() raise TypeError(f"Unsupported file object: {type(upload)}") def _make_previews(file_bytes: bytes, max_side: int = 2000) -> List[Image.Image]: """Trả list PIL.Image đã RGB + resize theo max_side.""" if len(file_bytes) >= 4 and file_bytes[:4] == b"%PDF": pages = pdf_to_images(file_bytes) else: pages = [Image.open(io.BytesIO(file_bytes))] out = [] for im in pages: im = ensure_rgb(im) if max_side: w, h = im.size scale = min(max_side / float(w), max_side / float(h), 1.0) if scale < 1.0: im = im.resize((max(1, int(w*scale)), max(1, int(h*scale))), RESAMPLE) out.append(im) return out def _guess_name_and_mime(file, file_bytes: bytes) -> Tuple[str, str]: if isinstance(file, (str, os.PathLike)): filename = os.path.basename(str(file)) elif isinstance(file, dict) and "name" in file: filename = os.path.basename(file["name"]) elif isinstance(file, dict) and "path" in file: filename = os.path.basename(file["path"]) else: filename = "upload.bin" mime, _ = mimetypes.guess_type(filename) if not mime: if len(file_bytes) >= 4 and file_bytes[:4] == b"%PDF": mime = "application/pdf" if not filename.lower().endswith(".pdf"): filename += ".pdf" else: mime = "image/png" return filename, mime def _extract_json_from_message(msg: str): """Bóc JSON trong ```json ...``` nếu có. Trả về (obj, cleaned_string).""" s = (msg or "").strip() s = re.sub(r"^\s*```(?:json)?\s*", "", s, flags=re.IGNORECASE) s = re.sub(r"\s*```\s*$", "", s) try: return json.loads(s), s except Exception: return None, s def _pretty_message(msg: str) -> str: obj, s = _extract_json_from_message(msg) return json.dumps(obj, ensure_ascii=False, indent=2) if obj is not None else s def _safe_text_from_gemini(resp): try: return resp.text except Exception: pass texts = [] for c in getattr(resp, "candidates", []) or []: content = getattr(c, "content", None) parts = getattr(content, "parts", None) if content else None if not parts: continue for p in parts: t = getattr(p, "text", None) if t: texts.append(t) return "\n".join(texts).strip() def _wait_file_active(file_obj, timeout_s: int = 60) -> object: """Chờ file upload sang Gemini ở trạng thái ACTIVE, có timeout + backoff.""" start = time.time() delay = 0.5 while hasattr(file_obj, "state") and getattr(file_obj.state, "name", "") == "PROCESSING": if time.time() - start > timeout_s: raise TimeoutError("Upload processing timeout.") time.sleep(delay) delay = min(delay * 1.5, 2.0) file_obj = genai.get_file(file_obj.name) if not hasattr(file_obj, "state") or file_obj.state.name != "ACTIVE": st = getattr(file_obj, "state", None) raise RuntimeError(f"Upload failed or not active. State={getattr(st, 'name', 'UNKNOWN')}") return file_obj # ---------- JSON → Excel (schema-agnostic) ---------- def _flatten_dict(d: dict, parent_key: str = "", sep: str = ".") -> dict: """Flatten dict lồng nhau thành 1 level: {'a':{'b':1}} -> {'a.b':1}""" items = [] for k, v in (d or {}).items(): new_key = f"{parent_key}{sep}{k}" if parent_key else str(k) if isinstance(v, dict): items.extend(_flatten_dict(v, new_key, sep=sep).items()) else: items.append((new_key, v)) return dict(items) def _sanitize_sheet_name(name: str, used: set[str]) -> str: # Excel sheet name ≤ 31 chars, không chứa []:*?/\ invalid = set(r'[]:*?/\'' + '"') clean = "".join(ch for ch in name if ch not in invalid) clean = clean.strip() if not clean: clean = "sheet" clean = clean[:31] # đảm bảo unique base, idx = clean, 1 while clean in used: suffix = f"_{idx}" clean = (base[: (31 - len(suffix))] + suffix) idx += 1 used.add(clean) return clean def _to_excel_generic(data: Any, path: str) -> str: """ Quy tắc: - Nếu là list[dict] -> 1 sheet "data" (json_normalize) - Nếu là dict: + Tạo 1 sheet "summary" từ các field dạng scalar/dict (flatten) + Với mỗi field là list: · list[dict] -> 1 sheet theo tên key (normalize) · list[scalar]-> 1 sheet 1 cột 'value' · list[mixed] -> chuyển thành cột 'value' dạng chuỗi """ with pd.ExcelWriter(path) as writer: used_names = set() def add_df(df: pd.DataFrame, sheet: str): sheetname = _sanitize_sheet_name(sheet, used_names) df.to_excel(writer, index=False, sheet_name=sheetname) if isinstance(data, list): # list tổng quát try: df = pd.json_normalize(data, sep=".") except Exception: df = pd.DataFrame({"value": [json.dumps(x, ensure_ascii=False) for x in data]}) add_df(df, "data") return path if isinstance(data, dict): scalars = {} list_sheets: list[tuple[str, pd.DataFrame]] = [] for k, v in data.items(): if isinstance(v, list): if len(v) == 0: list_sheets.append((k, pd.DataFrame())) elif isinstance(v[0], dict): try: df = pd.json_normalize(v, sep=".") except Exception: df = pd.DataFrame({"value": [json.dumps(x, ensure_ascii=False) for x in v]}) list_sheets.append((k, df)) elif not isinstance(v[0], (list, dict)): df = pd.DataFrame({"value": v}) list_sheets.append((k, df)) else: df = pd.DataFrame({"value": [json.dumps(x, ensure_ascii=False) for x in v]}) list_sheets.append((k, df)) elif isinstance(v, dict): scalars.update(_flatten_dict({k: v})) else: scalars[k] = v # summary sheet if len(scalars) > 0: add_df(pd.DataFrame([scalars]), "summary") # each list -> one sheet for k, df in list_sheets: add_df(df, k if k else "list") # nếu dict chỉ có list, không có summary => vẫn OK (chỉ có các sheet list) return path # kiểu khác: ghi thành 1 cột value add_df(pd.DataFrame({"value": [json.dumps(data, ensure_ascii=False)]}), "data") return path # ================== HANDLERS ================== def preview_process(file): """Trả list đường dẫn ảnh PNG tạm cho Gallery (ổn định hơn list PIL).""" if file is None: return [] try: file_bytes = _read_file_bytes(file) images = _make_previews(file_bytes, max_side=2000) paths = [] for i, im in enumerate(images): fd, path = tempfile.mkstemp(suffix=f"_preview_{i}.png") os.close(fd) im.save(path, format="PNG") paths.append(path) return paths except Exception as e: print(f"Preview error: {e}") return [] def _merge_freight_objects(objs: list[dict]) -> dict | None: if not objs: return None base = {} for k in ["shipping_line","shipping_line_code","shipping_line_reason","fee_type","valid_from","valid_to"]: for o in objs: if isinstance(o, dict) and o.get(k): base[k] = o[k] break base.setdefault(k, None) seen = set() merged_charges, merged_local = [], [] def norm(v): return v.replace(",", ";") if isinstance(v, str) else v for o in objs: for c in (o.get("charges") or []): wb = json.dumps(c.get("weight_breaks", {}), sort_keys=True, ensure_ascii=False) key = (c.get("origin"), c.get("destination"), c.get("charge_name"), c.get("charge_code"), c.get("currency"), wb) if key in seen: continue c["remark"] = norm(c.get("remark")) merged_charges.append(c) seen.add(key) for lc in (o.get("local_charges") or []): lc["remark"] = norm(lc.get("remark")) merged_local.append(lc) base["charges"] = merged_charges base["local_charges"] = merged_local return base def _coerce_only_json(text: str) -> str: obj, s = _extract_json_from_message(text) if obj is not None: return json.dumps(obj, ensure_ascii=False) m = re.search(r"\{.*\}\s*$", text, flags=re.DOTALL) return m.group(0) if m else text.strip() # -------- Internal (Gemini) - Base (1 lượt, không thinking) -------- def run_process_internal_base(file_bytes, filename, mime, question, model_choice, temperature, top_p): api_key = os.environ.get("GOOGLE_API_KEY", DEFAULT_API_KEY) if not api_key: return "ERROR: Missing GOOGLE_API_KEY.", None genai.configure(api_key=api_key) model_name = INTERNAL_MODEL_MAP.get(model_choice, INTERNAL_MODEL_MAP["Gemini 2.5 Flash"]) gen_config = {"temperature": float(temperature), "top_p": float(top_p)} model = genai.GenerativeModel(model_name=model_name, generation_config=gen_config) uploaded = None tmp_path = None try: if file_bytes: suffix = os.path.splitext(filename)[1] or ".bin" with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp: tmp.write(file_bytes) tmp_path = tmp.name uploaded = genai.upload_file(path=tmp_path, mime_type=mime) uploaded = _wait_file_active(uploaded, timeout_s=60) user_prompt = (question or "").strip() if not user_prompt: user_prompt = ( "Perform high-quality OCR on the provided file. If PDF: read all pages in order. " "Return clean plain text. If structure is obvious (tables, key:value), preserve it. " "If you can, output JSON that captures the structure." ) # Gọi model if uploaded: resp = model.generate_content([user_prompt, uploaded]) else: resp = model.generate_content(user_prompt) # Lấy đúng message LLM (pretty nếu là JSON) answer_raw = _safe_text_from_gemini(resp) message = _pretty_message(answer_raw) # Parse JSON (nếu có) để export. Không validate schema. parsed_obj, _ = _extract_json_from_message(answer_raw) return message, parsed_obj finally: if tmp_path and os.path.exists(tmp_path): try: os.remove(tmp_path) except Exception: pass try: if uploaded and hasattr(uploaded, "name"): genai.delete_file(uploaded.name) except Exception: pass # ================== MAIN OCR FUNCTION ================== def run_process_internal_base_v2(file_bytes, filename, mime, question, model_choice, temperature, top_p, batch_size=3): api_key = os.environ.get("GOOGLE_API_KEY", DEFAULT_API_KEY) if not api_key: return "ERROR: Missing GOOGLE_API_KEY.", None genai.configure(api_key=api_key) model_name = INTERNAL_MODEL_MAP.get(model_choice, "gemini-2.5-flash") model = genai.GenerativeModel(model_name=model_name, generation_config={"temperature": float(temperature), "top_p": float(top_p)}) if file_bytes[:4] == b"%PDF": pages = pdf_to_images(file_bytes) else: pages = [Image.open(io.BytesIO(file_bytes))] user_prompt = (question or "").strip() or PROMPT_FREIGHT_JSON all_json_results, all_text_results = [], [] previous_header_json = None for i in range(0, len(pages), batch_size): batch = pages[i:i+batch_size] uploaded = [] for im in batch: with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as tmp: im.save(tmp.name) up = genai.upload_file(path=tmp.name, mime_type="image/png") up = _wait_file_active(up) uploaded.append(up) # build dynamic prompt if previous_header_json: context_prompt = ( f"{user_prompt}\n" "The previous page had this table structure:\n" f"{json.dumps(previous_header_json, ensure_ascii=False, indent=2)}\n" "If this page has no header, assume it continues with the same structure." ) else: context_prompt = user_prompt resp = model.generate_content([f"{context_prompt}\n(This is batch {i//batch_size+1})"] + uploaded) text = _safe_text_from_gemini(resp) json_text = _coerce_only_json(text) try: parsed = json.loads(json_text) all_json_results.append(parsed) # ✅ update header context (for next page) if i == 0: # chỉ cần giữ phần "charges[0].weight_breaks" làm cấu trúc header first_charge = (parsed.get("charges") or [{}])[0] if "weight_breaks" in first_charge: previous_header_json = first_charge["weight_breaks"] except Exception: all_text_results.append(text) finally: for up in uploaded: try: genai.delete_file(up.name) except: pass if all_json_results: merged_json = _merge_freight_objects(all_json_results) message = json.dumps(merged_json, ensure_ascii=False, indent=2) return message, merged_json combined_text = "\n\n".join(all_text_results) message = _pretty_message(combined_text) parsed_obj, _ = _extract_json_from_message(combined_text) return message, parsed_obj # -------- External API -------- def run_process_external(file_bytes, filename, mime, question, api_url, temperature, top_p): if not api_url or not str(api_url).strip(): return "ERROR: Missing external API endpoint (hãy dán URL).", None try: user_prompt = (question or "").strip() if not user_prompt: user_prompt = ( "Perform high-quality OCR on the provided file. If PDF: read all pages in order. " "Return clean plain text. If structure is obvious (tables, key:value), preserve it. " "If you can, output JSON that captures the structure." ) data = {"prompt": user_prompt, "temperature": str(temperature), "top_p": str(top_p)} if file_bytes: files = {"file": (filename, file_bytes, mime)} r = requests.post(api_url, files=files, data=data, timeout=60) else: r = requests.post(api_url, json=data, timeout=60) if r.status_code >= 400: return f"ERROR: External API HTTP {r.status_code}: {r.text[:300]}", None answer = None try: j = r.json() answer = j.get("message") or j.get("text") or j.get("data") if isinstance(answer, (dict, list)): answer = json.dumps(answer, ensure_ascii=False) except Exception: answer = r.text answer = (answer or "").strip() message = _pretty_message(answer) parsed_obj, _ = _extract_json_from_message(answer) return message, parsed_obj except Exception as e: return f"ERROR: {type(e).__name__}: {str(e) or repr(e)}", None # -------- Router -------- def run_process(file, question, model_choice, temperature, top_p, external_api_url): """ Router (không Agent, không thinking): - Nếu chọn External model -> run_process_external - Ngược lại -> Gemini nội bộ (Base 1 lượt) """ try: has_file = file is not None file_bytes = filename = mime = None if has_file: file_bytes = _read_file_bytes(file) filename, mime = _guess_name_and_mime(file, file_bytes) if model_choice == EXTERNAL_MODEL_NAME: return run_process_external( file_bytes=file_bytes, filename=filename, mime=mime, question=question, api_url=external_api_url, temperature=temperature, top_p=top_p ) return run_process_internal_base_v2( file_bytes=file_bytes, filename=filename, mime=mime, question=question, model_choice=model_choice, temperature=temperature, top_p=top_p ) except Exception as e: return f"ERROR: {type(e).__name__}: {str(e) or repr(e)}", None def on_export_excel(parsed_obj): try: if not parsed_obj: # không có JSON để export → giữ nguyên, không hiện nút tải return gr.update(value=None, visible=False) # tạo file an toàn, giữ lại sau khi request kết thúc fd, tmp_path = tempfile.mkstemp(suffix=".xlsx") os.close(fd) _to_excel_generic(parsed_obj, tmp_path) # trả về path và bật visible để hiện link download return gr.update(value=tmp_path, visible=True) except Exception as e: print(f"Export error: {e}") return gr.update(value=None, visible=False) def clear_all(): # file, preview, output_text, question, model, parsed_state, download, # temperature, top_p, external_api_url return ( None, [], "", "", "Gemini 2.5 Flash", None, None, 0.2, 0.95, "" ) # ================== UI ================== def _toggle_external_visibility(selected: str): return gr.update(visible=(selected == EXTERNAL_MODEL_NAME)) def main(): custom_css = """ .gradio-container { max-width: 1400px !important; margin: 0 auto; } #main-row { display: flex; gap: 20px; align-items: flex-start; } #left-column { flex: 1; min-width: 400px; max-width: 600px; } #right-column { flex: 1; min-width: 400px; } #file-upload { border: 2px dashed #d1d5db; border-radius: 12px; padding: 20px; text-align: center; transition: border-color 0.3s ease; } #file-upload:hover { border-color: #3b82f6; } #preview-gallery { max-height: 600px; overflow-y: auto; border: 1px solid #e5e7eb; border-radius: 12px; background: #f9fafb; padding: 10px; } #preview-gallery .grid { grid-template-columns: 1fr !important; gap: 10px !important; } #preview-gallery img { width: 100% !important; height: auto !important; object-fit: contain !important; background: white; } #controls-section { background: #f8fafc; padding: 20px; border-radius: 12px; margin-bottom: 20px; } #results-section { background: #ffffff; border: 1px solid #e5e7eb; border-radius: 12px; padding: 20px; } #llm-output { max-height: 500px; overflow-y: auto; font-family: monospace; font-size: 13px; } .primary-button { background: linear-gradient(90deg, #3b82f6, #1d4ed8) !important; color: white !important; border: none !important; border-radius: 8px !important; padding: 10px 20px !important; font-weight: 500 !important; } .primary-button:hover { transform: translateY(-1px) !important; box-shadow: 0 4px 12px rgba(59, 130, 246, 0.3) !important; } .secondary-button { background: #f3f4f6 !important; color: #374151 !important; border: 1px solid #d1d5db !important; border-radius: 8px !important; padding: 8px 16px !important; } @media (max-width: 1024px) { #main-row { flex-direction: column; } #left-column, #right-column { min-width: 100%; max-width: 100%; } } """ with gr.Blocks(title="OCR Multi-Agent System", css=custom_css, theme=gr.themes.Soft()) as demo: gr.HTML("""

📄 OCR Extraction (LLM-first)

Upload PDF/images → LLM produces raw text/JSON → Export Excel (schema-agnostic)

""") last_parsed_state = gr.State(value=None) with gr.Row(elem_id="main-row"): # Left with gr.Column(elem_id="left-column"): gr.Markdown("### 📁 Upload Document") file = gr.File( label="Choose PDF or Image file", file_types=[".pdf", ".png", ".jpg", ".jpeg", ".tiff", ".bmp"], type="filepath", elem_id="file-upload" ) gr.Markdown("### 👁️ Document Preview") preview = gr.Gallery(columns=1, height=None, show_label=False, elem_id="preview-gallery", allow_preview=True) # Right with gr.Column(elem_id="right-column"): with gr.Group(elem_id="controls-section"): gr.Markdown("### ⚙️ Processing Options") with gr.Row(): model_choice = gr.Dropdown( choices=[*INTERNAL_MODEL_MAP.keys(), EXTERNAL_MODEL_NAME], value="Gemini 2.5 Flash", label="Model" ) with gr.Row(): temperature = gr.Slider(0.0, 2.0, value=0.2, step=0.05, label="temperature") top_p = gr.Slider(0.0, 1.0, value=0.95, step=0.01, label="top_p") external_api_url = gr.Textbox( label="External API endpoint (URL)", placeholder="https://your-host/path/to/ocr", visible=False ) question = gr.Textbox( label="Custom Prompt (optional)", placeholder="Leave blank for default OCR; or ask model to output JSON by your own schema...", lines=3 ) with gr.Row(): run_btn = gr.Button("🚀 Process Document", elem_classes=["primary-button"]) clear_btn = gr.Button("🗑️ Clear All", elem_classes=["secondary-button"]) with gr.Group(elem_id="results-section"): gr.Markdown("### 📊 LLM Message (raw/pretty)") output_text = gr.Code(label="LLM Message", language="json", elem_id="llm-output") with gr.Row(): export_btn = gr.Button("⬇️ Export to Excel", elem_classes=["secondary-button"]) download_file = gr.File(label="Download Excel", interactive=False, visible=False) # Events file.change(preview_process, inputs=[file], outputs=[preview]) model_choice.change(_toggle_external_visibility, inputs=[model_choice], outputs=[external_api_url]) run_btn.click( run_process, inputs=[file, question, model_choice, temperature, top_p, external_api_url], outputs=[output_text, last_parsed_state] ) export_btn.click(on_export_excel, inputs=[last_parsed_state], outputs=[download_file]) clear_btn.click( clear_all, inputs=[], outputs=[file, preview, output_text, question, model_choice, last_parsed_state, download_file, temperature, top_p, external_api_url] ) return demo demo = main() if __name__ == "__main__": demo.launch()