import os, io, tempfile, mimetypes, camelot, pdfplumber, pandas as pd, google.generativeai as genai import re DEFAULT_API_KEY = "AIzaSyBbK-1P3JD6HPyE3QLhkOps6_-Xo3wUFbs" INTERNAL_MODEL_MAP = { "Gemini 2.5 Flash": "gemini-2.5-flash", "Gemini 2.5 Pro": "gemini-2.5-pro", } PROMPT_FREIGHT_JSON = """ Please analyze the freight rate table in the file I provide and convert it into JSON in the following structure: { "shipping_line": "...", "shipping_line_code": "...", "shipping_line_reason": "Why this carrier is chosen?", "fee_type": "Air Freight", "valid_from": ..., "valid_to": ..., "charges": [ { "frequency": "...", "package_type": "...", "aircraft_type": "...", "direction": "Export or Import or null", "origin": "...", "destination": "...", "charge_name": "...", "charge_code": "...", "charge_code_reason": "...", "cargo_type": "...", "currency": "...", "transit": "...", "transit_time": "...", "weight_breaks": { "M": ..., "N": ..., "+45kg": ..., "+100kg": ..., "+300kg": ..., "+500kg": ..., "+1000kg": ..., "other": { key: value }, "weight_breaks_reason":"Why chosen weight_breaks?" }, "remark": "..." } ], "local_charges": [ { "charge_name": "...", "charge_code": "...", "unit": "...", "amount": ..., "remark": "..." } ] } ### Date rules - valid_from format: - `DD/MM/YYYY` (if full date) - `01/MM/YYYY` (if month+year only) - `01/01/YYYY` (if year only) - `UFN` if missing - valid_to: - exact `DD/MM/YYYY` if present - else `UFN` STRICT RULES: - ONLY return a single JSON object as specified above. - All rates must exactly match the corresponding weight break columns (M,N,45kg, 100kg, 300kg, 500kg, 1000kg, etc.). set null if N/A. No assumptions or interpolations. - If the table shows "RQ" or similar, set value as "RQST". - Group same-price destinations into one record separated by "/". - Always use IATA code for origin and destination. - Flight number (e.g. ZH118) is not charge code. - Frequency: D[1-7]; 'Daily' = D1234567. Join multiple (e.g. D3,D4→D34). - If local charges exist, list them. - If validity missing, set null. - Direction: Export if origin is Vietnam (SGN, HAN, DAD...), else Import. - Provide short plain English reasons for "shipping_line_reason" & "charge_code_reason". - Replace commas in remarks with semicolons. - Only return JSON. """ # ========== Helpers ========== def _read_file_bytes(upload): if isinstance(upload, str): with open(upload, "rb") as f: return f.read() elif hasattr(upload, "read"): return upload.read() raise TypeError("Unsupported file input") def _guess_name_and_mime(file, file_bytes): filename = os.path.basename(file.name if hasattr(file, "name") else str(file)) mime, _ = mimetypes.guess_type(filename) if not mime and file_bytes[:4] == b"%PDF": mime = "application/pdf" return filename, mime or "application/octet-stream" def check_pdf_structure(file_bytes: bytes) -> bool: try: with pdfplumber.open(io.BytesIO(file_bytes)) as pdf: if len(pdf.pages) <= 2: return False for page in pdf.pages[:3]: if page.find_tables(): return True return False except Exception as e: print("PDF check error:", e); return False # ========== 1️⃣ Extract bảng bằng Camelot ========== def extract_pdf_tables(file_path: str) -> pd.DataFrame: all_dfs = [] try: print("🔍 Try lattice mode...") tables = camelot.read_pdf(file_path, flavor="lattice", pages="all") if tables.n > 0: for t in tables: all_dfs.append(t.df) print(f"✅ Lattice: {tables.n} tables.") except Exception as e: print(f"⚠️ Lattice failed: {e}") if not all_dfs: try: print("🔁 Try stream mode...") tables = camelot.read_pdf(file_path, flavor="stream", pages="all") if tables.n > 0: for t in tables: all_dfs.append(t.df) print(f"✅ Stream: {tables.n} tables.") except Exception as e: print(f"❌ Stream failed: {e}") if not all_dfs: print("🚫 No table detected.") return pd.DataFrame() df_final = pd.concat(all_dfs, ignore_index=True) if all(str(c).isdigit() for c in df_final.columns): print("🧠 Detected numeric headers (0,1,2..), using first row as real header.") df_final.columns = df_final.iloc[0] df_final = df_final[1:] df_final = df_final.dropna(how="all").reset_index(drop=True) print(f"✅ Total: {len(df_final)} rows × {len(df_final.columns)} columns.") return df_final # ========== 2️⃣ Extract phần Note / Header ========== def extract_pdf_note(file_bytes: bytes) -> str: """ Lấy phần text ở đầu PDF (ví dụ: Start Date, Expiry Date, Origin, các note nhỏ) Bỏ qua vùng bảng phía dưới. """ try: with pdfplumber.open(io.BytesIO(file_bytes)) as pdf: first_page = pdf.pages[0] text = first_page.extract_text() or "" # cắt phần note: chỉ lấy 15 dòng đầu để tránh trích luôn bảng lines = text.splitlines()[:15] note_lines = [] for line in lines: if re.search(r"(Start Date|Origin|Expiry|Product|MY|SC|All rates|Currency)", line, re.I): note_lines.append(line.strip()) note_text = " ".join(note_lines) return note_text.strip() except Exception as e: print(f"⚠️ Note extraction failed: {e}") return "" # ========== 3️⃣ Gọi Gemini ========== def call_gemini_with_prompt(csv_text: str, note_text: str, model_choice: str, temperature: float, top_p: float): api_key = os.environ.get("GOOGLE_API_KEY", DEFAULT_API_KEY) genai.configure(api_key=api_key) model = genai.GenerativeModel( model_name=INTERNAL_MODEL_MAP.get(model_choice, "gemini-2.5-flash"), generation_config={"temperature": temperature, "top_p": top_p} ) prompt = f"""{PROMPT_FREIGHT_JSON} Below is the extracted freight rate table (CSV) and additional notes: Notes: {note_text or '[No notes detected]'} CSV: {csv_text} → Convert to valid JSON as per schema above. """ resp = model.generate_content(prompt) return getattr(resp, "text", str(resp)) # ========== 4️⃣ Main process ========== def run_process(file, question, model_choice, temperature, top_p, external_api_url): try: if file is None: return "❌ No file uploaded.", None file_bytes = _read_file_bytes(file) filename, mime = _guess_name_and_mime(file, file_bytes) print(f"[UPLOAD] {filename} ({mime})") if mime == "application/pdf" and check_pdf_structure(file_bytes): print("➡️ PDF has multi-page table → extract before Gemini.") with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp: tmp.write(file_bytes) tmp_path = tmp.name df = extract_pdf_tables(tmp_path) if not df.empty: note_text = extract_pdf_note(file_bytes) csv_text = df.to_csv(index=False) print("✅ Send table + note to Gemini...") message = call_gemini_with_prompt(csv_text, note_text, model_choice, temperature, top_p) return message, None else: print("⚠️ No valid table found → fallback to OCR Gemini.") # fallback OCR api_key = os.environ.get("GOOGLE_API_KEY", DEFAULT_API_KEY) genai.configure(api_key=api_key) model = genai.GenerativeModel( model_name=INTERNAL_MODEL_MAP.get(model_choice, "gemini-2.5-flash"), generation_config={"temperature": temperature, "top_p": top_p} ) uploaded = genai.upload_file(path=file.name) resp = model.generate_content([PROMPT_FREIGHT_JSON, uploaded]) genai.delete_file(uploaded.name) return getattr(resp, "text", str(resp)), None except Exception as e: return f"ERROR: {type(e).__name__}: {e}", None