# air_flow / app.py
# Last commit by vithacocf: bf0f7cb "Update app.py" (verified) · 8.36 kB
import os, io, tempfile, mimetypes, camelot, pdfplumber, pandas as pd, google.generativeai as genai
import re
# Fallback API key, used only when the GOOGLE_API_KEY environment variable is
# unset. Intentionally empty: credentials must be supplied via the
# environment and never committed to source control.
# NOTE(review): a real key was previously hard-coded here; it remains in the
# repository history and should be revoked/rotated.
DEFAULT_API_KEY = ""
# Model label (the value callers pass as `model_choice`) -> Gemini model id.
# Lookups elsewhere fall back to "gemini-2.5-flash" for unknown labels.
INTERNAL_MODEL_MAP = dict(
    (
        ("Gemini 2.5 Flash", "gemini-2.5-flash"),
        ("Gemini 2.5 Pro", "gemini-2.5-pro"),
    )
)
# Instruction prompt for Gemini: the target JSON schema for an air-freight
# rate sheet, followed by date-normalisation and extraction rules. It is
# sent together with the uploaded file in the OCR fallback of run_process,
# and is used as the prefix of the table + notes prompt built in
# call_gemini_with_prompt. The schema uses "..." placeholders, so it is a
# template for the model rather than literal JSON.
# NOTE(review): the rules conflict on missing validity dates. "Date rules"
# says `UFN` for a missing valid_from/valid_to, while STRICT RULES says
# "If validity missing, set null". Confirm which one downstream consumers
# expect before editing this text.
# NOTE(review): "weight_breaks_reason" sits inside the "weight_breaks"
# object rather than beside it; confirm that nesting is intended.
PROMPT_FREIGHT_JSON = """
Please analyze the freight rate table in the file I provide and convert it into JSON in the following structure:
{
"shipping_line": "...",
"shipping_line_code": "...",
"shipping_line_reason": "Why this carrier is chosen?",
"fee_type": "Air Freight",
"valid_from": ...,
"valid_to": ...,
"charges": [
{
"frequency": "...",
"package_type": "...",
"aircraft_type": "...",
"direction": "Export or Import or null",
"origin": "...",
"destination": "...",
"charge_name": "...",
"charge_code": "...",
"charge_code_reason": "...",
"cargo_type": "...",
"currency": "...",
"transit": "...",
"transit_time": "...",
"weight_breaks": {
"M": ...,
"N": ...,
"+45kg": ...,
"+100kg": ...,
"+300kg": ...,
"+500kg": ...,
"+1000kg": ...,
"other": {
key: value
},
"weight_breaks_reason":"Why chosen weight_breaks?"
},
"remark": "..."
}
],
"local_charges": [
{
"charge_name": "...",
"charge_code": "...",
"unit": "...",
"amount": ...,
"remark": "..."
}
]
}
### Date rules
- valid_from format:
- `DD/MM/YYYY` (if full date)
- `01/MM/YYYY` (if month+year only)
- `01/01/YYYY` (if year only)
- `UFN` if missing
- valid_to:
- exact `DD/MM/YYYY` if present
- else `UFN`
STRICT RULES:
- ONLY return a single JSON object as specified above.
- All rates must exactly match the corresponding weight break columns (M,N,45kg, 100kg, 300kg, 500kg, 1000kg, etc.). set null if N/A. No assumptions or interpolations.
- If the table shows "RQ" or similar, set value as "RQST".
- Group same-price destinations into one record separated by "/".
- Always use IATA code for origin and destination.
- Flight number (e.g. ZH118) is not charge code.
- Frequency: D[1-7]; 'Daily' = D1234567. Join multiple (e.g. D3,D4→D34).
- If local charges exist, list them.
- If validity missing, set null.
- Direction: Export if origin is Vietnam (SGN, HAN, DAD...), else Import.
- Provide short plain English reasons for "shipping_line_reason" & "charge_code_reason".
- Replace commas in remarks with semicolons.
- Only return JSON.
"""
# ========== Helpers ==========
def _read_file_bytes(upload):
if isinstance(upload, str):
with open(upload, "rb") as f: return f.read()
elif hasattr(upload, "read"):
return upload.read()
raise TypeError("Unsupported file input")
def _guess_name_and_mime(file, file_bytes):
filename = os.path.basename(file.name if hasattr(file, "name") else str(file))
mime, _ = mimetypes.guess_type(filename)
if not mime and file_bytes[:4] == b"%PDF": mime = "application/pdf"
return filename, mime or "application/octet-stream"
def check_pdf_structure(file_bytes: bytes) -> bool:
    """Decide whether a PDF should go through local table extraction.

    Returns True only when the document has more than two pages and
    pdfplumber finds at least one table on one of the first three pages.
    Any error while opening or scanning the PDF is printed and treated as
    False.
    """
    try:
        with pdfplumber.open(io.BytesIO(file_bytes)) as pdf:
            pages = pdf.pages
            if len(pages) > 2:
                # Stops at the first page that has a table.
                return any(page.find_tables() for page in pages[:3])
            return False
    except Exception as err:
        print("PDF check error:", err)
        return False
# ========== 1️⃣ Extract tables with Camelot ==========
# Camelot flavors tried in order, with their progress/log messages:
# (flavor, start message, success label, failure label).
_CAMELOT_FLAVOR_LOGS = (
    ("lattice", "🔍 Try lattice mode...", "✅ Lattice", "⚠️ Lattice failed"),
    ("stream", "🔁 Try stream mode...", "✅ Stream", "❌ Stream failed"),
)


def _read_camelot_frames(file_path, pages, flavor, start_msg, ok_label, fail_label):
    """Return the DataFrames Camelot extracts with one flavor ([] on none/failure)."""
    try:
        print(start_msg)
        tables = camelot.read_pdf(file_path, flavor=flavor, pages=pages)
        if tables.n == 0:
            return []
        frames = [t.df for t in tables]
        print(f"{ok_label}: {tables.n} tables.")
        return frames
    except Exception as e:
        print(f"{fail_label}: {e}")
        return []


def _drop_blank_rows(df):
    """Remove rows in which every cell is NaN or whitespace-only text."""
    if df.empty:
        return df
    # Camelot fills empty cells with "" rather than NaN, so dropna() alone
    # never removed blank rows.
    blank = df.isna() | df.astype(str).apply(lambda col: col.str.strip().eq(""))
    return df.loc[~blank.all(axis=1)]


def extract_pdf_tables(file_path: str, pages: str = "all") -> pd.DataFrame:
    """Extract every table in a PDF with Camelot into one DataFrame.

    Lattice mode (ruled tables) is tried first. Stream mode
    (whitespace-separated tables) is used only if lattice yields nothing or
    fails. All tables found are stacked vertically, and rows that are
    entirely blank are dropped. When the columns still carry Camelot's
    positional labels (0, 1, 2, ...), the first remaining row is promoted
    to be the header.

    Args:
        file_path: path to the PDF on disk.
        pages: Camelot page selection, e.g. ``"all"`` or ``"1,3-4"``.

    Returns:
        The combined table, or an empty DataFrame when no table is detected.
    """
    all_dfs = []
    for flavor, start_msg, ok_label, fail_label in _CAMELOT_FLAVOR_LOGS:
        all_dfs = _read_camelot_frames(file_path, pages, flavor, start_msg, ok_label, fail_label)
        if all_dfs:
            break
    if not all_dfs:
        print("🚫 No table detected.")
        return pd.DataFrame()

    df_final = _drop_blank_rows(pd.concat(all_dfs, ignore_index=True))
    # len() guard: promoting a header from a frame with no rows would raise.
    if len(df_final) and all(str(c).isdigit() for c in df_final.columns):
        print("🧠 Detected numeric headers (0,1,2..), using first row as real header.")
        df_final = df_final.set_axis(list(df_final.iloc[0]), axis=1).iloc[1:]
    df_final = df_final.reset_index(drop=True)
    print(f"✅ Total: {len(df_final)} rows × {len(df_final.columns)} columns.")
    return df_final
# ========== 2️⃣ Extract the note / header section ==========
def extract_pdf_note(file_bytes: bytes) -> str:
"""
Lấy phần text ở đầu PDF (ví dụ: Start Date, Expiry Date, Origin, các note nhỏ)
Bỏ qua vùng bảng phía dưới.
"""
try:
with pdfplumber.open(io.BytesIO(file_bytes)) as pdf:
first_page = pdf.pages[0]
text = first_page.extract_text() or ""
# cắt phần note: chỉ lấy 15 dòng đầu để tránh trích luôn bảng
lines = text.splitlines()[:15]
note_lines = []
for line in lines:
if re.search(r"(Start Date|Origin|Expiry|Product|MY|SC|All rates|Currency)", line, re.I):
note_lines.append(line.strip())
note_text = " ".join(note_lines)
return note_text.strip()
except Exception as e:
print(f"⚠️ Note extraction failed: {e}")
return ""
# ========== 3️⃣ Call Gemini ==========
def call_gemini_with_prompt(csv_text: str, note_text: str, model_choice: str, temperature: float, top_p: float):
    """Ask Gemini to convert an extracted rate table into the freight JSON schema.

    Args:
        csv_text: the extracted table serialised as CSV.
        note_text: header/note text from the document; "[No notes detected]"
            is sent instead when it is empty.
        model_choice: a key of INTERNAL_MODEL_MAP; unknown labels fall back
            to "gemini-2.5-flash".
        temperature: sampling temperature forwarded to the model.
        top_p: nucleus-sampling value forwarded to the model.

    Returns:
        The response's ``text``, or ``str(response)`` when the response object
        has no ``text`` attribute.
    """
    resolved_model = INTERNAL_MODEL_MAP.get(model_choice, "gemini-2.5-flash")
    sampling = {"temperature": temperature, "top_p": top_p}

    # The environment variable takes precedence over the module-level fallback.
    genai.configure(api_key=os.environ.get("GOOGLE_API_KEY", DEFAULT_API_KEY))
    model = genai.GenerativeModel(model_name=resolved_model, generation_config=sampling)

    prompt_parts = [
        f"{PROMPT_FREIGHT_JSON}",
        "Below is the extracted freight rate table (CSV) and additional notes:",
        "Notes:",
        f"{note_text or '[No notes detected]'}",
        "CSV:",
        f"{csv_text}",
        "→ Convert to valid JSON as per schema above.",
        "",  # keeps the trailing newline of the original prompt
    ]
    response = model.generate_content("\n".join(prompt_parts))
    return getattr(response, "text", str(response))
# ========== 4️⃣ Main process ==========
def _remove_quietly(path):
    """Delete a local file, ignoring errors (best-effort temp cleanup)."""
    try:
        os.remove(path)
    except OSError:
        pass


def _materialize_upload(file, file_bytes, filename):
    """Return ``(path, is_temp)`` for a local file that holds the upload.

    An existing on-disk path is reused: a str/PathLike upload, or an object
    whose ``.name`` names an existing file. Otherwise *file_bytes* are written
    to a new temporary file that keeps *filename*'s extension, and the caller
    must delete it.
    """
    if isinstance(file, (str, os.PathLike)):
        candidate = os.fspath(file)
    else:
        candidate = getattr(file, "name", None)
    if isinstance(candidate, str) and os.path.isfile(candidate):
        return candidate, False
    suffix = os.path.splitext(filename)[1]
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
        tmp.write(file_bytes)
    return tmp.name, True


def run_process(file, question, model_choice, temperature, top_p, external_api_url):
    """Convert an uploaded freight rate sheet into JSON text via Gemini.

    PDFs that ``check_pdf_structure`` accepts are parsed locally with
    Camelot. The resulting table, plus header notes, is sent to Gemini as
    CSV. Everything else, or a PDF where no table is extracted, falls back
    to uploading the original file to Gemini together with
    ``PROMPT_FREIGHT_JSON``.

    Args:
        file: the upload, as a path, a readable file object, or None.
        question: accepted for interface compatibility; not used here.
        model_choice: a key of INTERNAL_MODEL_MAP.
        temperature: sampling temperature for the model.
        top_p: nucleus-sampling value for the model.
        external_api_url: accepted for interface compatibility; not used here.

    Returns:
        A ``(message, None)`` tuple. *message* is the model output, or an
        error string; exceptions are never raised to the caller.
    """
    try:
        if file is None:
            return "❌ No file uploaded.", None
        file_bytes = _read_file_bytes(file)
        filename, mime = _guess_name_and_mime(file, file_bytes)
        print(f"[UPLOAD] {filename} ({mime})")

        if mime == "application/pdf" and check_pdf_structure(file_bytes):
            print("➡️ PDF has multi-page table → extract before Gemini.")
            with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp:
                tmp.write(file_bytes)
                tmp_path = tmp.name
            try:
                df = extract_pdf_tables(tmp_path)
            finally:
                # delete=False was needed so Camelot can reopen the file by
                # path; remove it ourselves instead of leaking one per call.
                _remove_quietly(tmp_path)
            if not df.empty:
                note_text = extract_pdf_note(file_bytes)
                csv_text = df.to_csv(index=False)
                print("✅ Send table + note to Gemini...")
                message = call_gemini_with_prompt(csv_text, note_text, model_choice, temperature, top_p)
                return message, None
            print("⚠️ No valid table found → fallback to OCR Gemini.")

        # Fallback: let Gemini read the original document directly.
        api_key = os.environ.get("GOOGLE_API_KEY", DEFAULT_API_KEY)
        genai.configure(api_key=api_key)
        model = genai.GenerativeModel(
            model_name=INTERNAL_MODEL_MAP.get(model_choice, "gemini-2.5-flash"),
            generation_config={"temperature": temperature, "top_p": top_p},
        )
        # `file.name` does not exist for plain str/PathLike paths or in-memory
        # streams, so resolve (or create) a real local path first.
        upload_path, is_temp = _materialize_upload(file, file_bytes, filename)
        try:
            upload_kwargs = {"path": upload_path}
            if mime != "application/octet-stream":
                # Covers PDFs detected only by their %PDF signature.
                upload_kwargs["mime_type"] = mime
            uploaded = genai.upload_file(**upload_kwargs)
            try:
                resp = model.generate_content([PROMPT_FREIGHT_JSON, uploaded])
            finally:
                # Always release the remote copy, even if generation failed.
                try:
                    genai.delete_file(uploaded.name)
                except Exception as cleanup_err:
                    print(f"⚠️ Could not delete uploaded file: {cleanup_err}")
        finally:
            if is_temp:
                _remove_quietly(upload_path)
        return getattr(resp, "text", str(resp)), None
    except Exception as e:
        return f"ERROR: {type(e).__name__}: {e}", None