import streamlit as st import io import requests import json import re import os from main import read_pdf, extract_key_phrases, score_sentences, summarize_text st.set_page_config(page_title="PDF Tools", layout="wide") MODELS = { "DeepSeek v3": { "api_url": "https://api.deepseek.com/v1/chat/completions", "model": "deepseek-chat", "key_env": "DEEPSEEK_API_KEY", "response_format": {"type": "json_object"}, }, "DeepSeek R1": { "api_url": "https://api.deepseek.com/v1/chat/completions", "model": "deepseek-reasoner", "key_env": "DEEPSEEK_API_KEY", "response_format": None, }, "OpenAI GPT-4.1": { "api_url": "https://api.openai.com/v1/chat/completions", "model": "gpt-4-1106-preview", "key_env": "OPENAI_API_KEY", "response_format": None, "extra_headers": {}, }, "Mistral Small": { "api_url": "https://openrouter.ai/api/v1/chat/completions", "model": "mistralai/mistral-small-3.1-24b-instruct:free", "key_env": "OPENROUTER_API_KEY", "response_format": {"type": "json_object"}, "extra_headers": { "HTTP-Referer": "https://huggingface.co", "X-Title": "Invoice Extractor", }, }, } def get_api_key(model_choice): key = os.getenv(MODELS[model_choice]["key_env"]) if not key: st.error(f"❌ {MODELS[model_choice]['key_env']} not set") st.stop() return key def query_llm(model_choice, prompt): cfg = MODELS[model_choice] headers = { "Authorization": f"Bearer {get_api_key(model_choice)}", "Content-Type": "application/json", } if cfg.get("extra_headers"): headers.update(cfg["extra_headers"]) payload = { "model": cfg["model"], "messages": [{"role": "user", "content": prompt}], "temperature": 0.1, "max_tokens": 2000, } if cfg.get("response_format"): payload["response_format"] = cfg["response_format"] try: with st.spinner(f"🔍 Querying {model_choice}..."): r = requests.post(cfg["api_url"], headers=headers, json=payload, timeout=90) if r.status_code != 200: if "No instances available" in r.text or r.status_code == 503: st.error(f"{model_choice} is currently unavailable. Please try again later or select another model.") else: st.error(f"🚨 API Error {r.status_code}: {r.text}") return None content = r.json()["choices"][0]["message"]["content"] st.session_state.last_api = content st.session_state.last_raw = r.text return content except Exception as e: st.error(f"Connection error: {e}") return None def clean_json_response(text): if not text: return None orig = text # strip ``` fences text = re.sub(r'```(?:json)?', '', text).strip() # find outer braces start, end = text.find('{'), text.rfind('}') + 1 if start < 0 or end < 1: st.error("Couldn't locate JSON in response.") st.code(orig) return None frag = text[start:end] # remove stray trailing commas frag = re.sub(r',\s*([}\]])', r'\1', frag) try: return json.loads(frag) except json.JSONDecodeError as e: # attempt to insert missing commas between adjacent fields repaired = re.sub(r'"\s*"\s*(?="[^"]+"\s*:)', '","', frag) try: return json.loads(repaired) except json.JSONDecodeError: st.error(f"JSON parse error: {e}") st.code(frag) return None def fallback_supplier(text): for line in text.splitlines(): line = line.strip() if line: return line return None def get_extraction_prompt(model_choice, txt): return ( "You are an expert invoice parser. " "Extract data according to the visible table structure and column headers in the invoice. " "For every line item, only extract fields that correspond to the table columns for that row (do not include header/shipment fields in line items). " "Merge all multi-line content within a single cell into that field (especially for the 'description' and 'notes'). " "Shipment/invoice-level fields such as CAR NUMBER, SHIPPING POINT, SHIPMENT NUMBER, CURRENCY, etc., must go ONLY into the 'invoice_header', not as line item fields.\n" "Use this schema:\n" '{\n' ' "invoice_header": {\n' ' "car_number": "string or null",\n' ' "shipment_number": "string or null",\n' ' "shipping_point": "string or null",\n' ' "currency": "string or null",\n' ' "invoice_number": "string or null",\n' ' "invoice_date": "string or null",\n' ' "order_number": "string or null",\n' ' "customer_order_number": "string or null",\n' ' "our_order_number": "string or null",\n' ' "sales_order_number": "string or null",\n' ' "purchase_order_number": "string or null",\n' ' "order_date": "string or null",\n' ' "supplier_name": "string or null",\n' ' "supplier_address": "string or null",\n' ' "supplier_phone": "string or null",\n' ' "supplier_email": "string or null",\n' ' "supplier_tax_id": "string or null",\n' ' "customer_name": "string or null",\n' ' "customer_address": "string or null",\n' ' "customer_phone": "string or null",\n' ' "customer_email": "string or null",\n' ' "customer_tax_id": "string or null",\n' ' "ship_to_name": "string or null",\n' ' "ship_to_address": "string or null",\n' ' "bill_to_name": "string or null",\n' ' "bill_to_address": "string or null",\n' ' "remit_to_name": "string or null",\n' ' "remit_to_address": "string or null",\n' ' "tax_id": "string or null",\n' ' "tax_registration_number": "string or null",\n' ' "vat_number": "string or null",\n' ' "payment_terms": "string or null",\n' ' "payment_method": "string or null",\n' ' "payment_reference": "string or null",\n' ' "bank_account_number": "string or null",\n' ' "iban": "string or null",\n' ' "swift_code": "string or null",\n' ' "total_before_tax": "string or null",\n' ' "tax_amount": "string or null",\n' ' "tax_rate": "string or null",\n' ' "shipping_charges": "string or null",\n' ' "discount": "string or null",\n' ' "total_due": "string or null",\n' ' "amount_paid": "string or null",\n' ' "balance_due": "string or null",\n' ' "due_date": "string or null",\n' ' "invoice_status": "string or null",\n' ' "reference_number": "string or null",\n' ' "project_code": "string or null",\n' ' "department": "string or null",\n' ' "contact_person": "string or null",\n' ' "notes": "string or null",\n' ' "additional_info": "string or null"\n' ' },\n' ' "line_items": [\n' ' {\n' ' "quantity": "string or null",\n' ' "units": "string or null",\n' ' "description": "string or null",\n' ' "footage": "string or null",\n' ' "price": "string or null",\n' ' "amount": "string or null",\n' ' "notes": "string or null"\n' ' }\n' ' ]\n' '}' "\nIf a field is missing for a line item or header, use null. " "Do not invent fields. Do not add any header or shipment data to any line item. Return ONLY the JSON object, no explanation.\n" "\nInvoice Text:\n" f"{txt}" ) def extract_invoice_info(model_choice, text): prompt = get_extraction_prompt(model_choice, text) raw = query_llm(model_choice, prompt) if not raw: return None data = clean_json_response(raw) if not data: return None # DeepSeek models: flat format, but we standardize to always return "invoice_header" and "line_items" if model_choice.startswith("DeepSeek"): # Put all keys except "line_items" into invoice_header header = {k: v for k, v in data.items() if k != "line_items"} items = data.get("line_items", []) if not isinstance(items, list): items = [] for itm in items: if not isinstance(itm, dict): continue for k in ("description","quantity","unit_price","total_price"): itm.setdefault(k, None) return {"invoice_header": header, "line_items": items} # Other models (OpenAI GPT-4.1, Mistral): expect proper structure hdr = data.get("invoice_header", {}) if not hdr and any(k in data for k in ("invoice_number","supplier_name","customer_name")): # If model returned flat, treat top-level keys as header hdr = data for k in ("invoice_number","invoice_date","po_number","invoice_value","supplier_name","customer_name"): hdr.setdefault(k, None) if not hdr.get("supplier_name"): hdr["supplier_name"] = fallback_supplier(text) items = data.get("line_items", []) if not isinstance(items, list): items = [] for itm in items: if not isinstance(itm, dict): continue for k in ("item_number","description","quantity","unit_price","total_price"): itm.setdefault(k, None) return {"invoice_header": hdr, "line_items": items} # ---- UI ---- tab1, tab2 = st.tabs(["PDF Summarizer","Invoice Extractor"]) with tab1: st.title("PDF → Bullet Points") pdf = st.file_uploader("Upload PDF", type="pdf") pct = st.slider("Summarization %", 1, 100, 20) if st.button("Summarize") and pdf: txt = read_pdf(io.BytesIO(pdf.getvalue())) keys = extract_key_phrases(txt) scores = score_sentences(txt, keys) n = max(1, len(scores)*pct//100) st.markdown(summarize_text(scores, num_points=n)) with tab2: st.title("Invoice Extractor") mdl = st.selectbox("Model", list(MODELS.keys()), key="extract_model") inv_pdf = st.file_uploader("Invoice PDF", type="pdf") extracted_info = None if st.button("Extract") and inv_pdf: txt = read_pdf(io.BytesIO(inv_pdf.getvalue())) extracted_info = extract_invoice_info(mdl, txt) if extracted_info: st.success("Extraction Complete") st.subheader("Invoice Metadata") st.table([{k.replace("_", " ").title(): v for k, v in extracted_info["invoice_header"].items()}]) st.subheader("Line Items") st.table(extracted_info["line_items"]) st.session_state["last_extracted_info"] = extracted_info # store in session # If we've already extracted info, or in this session, show further controls extracted_info = extracted_info or st.session_state.get("last_extracted_info", None) if extracted_info: st.markdown("---") st.subheader("📝 Fine-tune Extracted Data with Your Own Prompt") user_prompt = st.text_area( "Enter your prompt for further processing or transformation (the extracted JSON will be available as context).", height=120, key="custom_prompt" ) model_2 = st.selectbox("Model for Fine-Tuning Prompt", list(MODELS.keys()), key="refine_model") if st.button("Run Custom Prompt"): # Compose the prompt for the LLM, including the JSON and user's instruction refine_input = ( "Here is an extracted invoice in JSON format:\n" f"{json.dumps(extracted_info, indent=2)}\n" "Follow this instruction and return the result as a JSON object only (no explanation):\n" f"{user_prompt}" ) result = query_llm(model_2, refine_input) refined_json = clean_json_response(result) st.subheader("Fine-Tuned Output") if refined_json: st.json(refined_json) else: st.error("Could not parse a valid JSON output from the model.") st.caption("The prompt is run on the above-extracted fields as JSON. Try instructions like: 'Add a new field for net_amount (amount minus tax) to each line item', or 'Summarize the total quantity ordered', etc.") if "last_api" in st.session_state: with st.expander("Debug"): st.code(st.session_state.last_api) st.code(st.session_state.last_raw)