# NOTE: hosting-page residue (Space status, file size, commit hashes, line-number gutter) was stripped from the top of this file.
import streamlit as st
import requests
import json
import re
import os
import time
import mimetypes
# Page-level settings (browser-tab title, wide layout). This is the first
# Streamlit call the script makes.
st.set_page_config(page_title="PDF Tools", layout="wide")
# -------- LLM provider registry --------
# Keyed by the label offered in the model pickers. Each entry carries:
#   api_url         - chat-completions endpoint the request is POSTed to
#   model           - model identifier placed in the request payload
#   key_env         - name of the environment variable holding the API key
#   response_format - optional "response_format" payload value (None = omit)
#   extra_headers   - optional extra HTTP headers merged into the request
MODELS = {
    "DeepSeek v3": dict(
        api_url="https://api.deepseek.com/v1/chat/completions",
        model="deepseek-chat",
        key_env="DEEPSEEK_API_KEY",
        response_format=dict(type="json_object"),
    ),
    "DeepSeek R1": dict(
        api_url="https://api.deepseek.com/v1/chat/completions",
        model="deepseek-reasoner",
        key_env="DEEPSEEK_API_KEY",
        response_format=None,
    ),
    # NOTE(review): the label says GPT-4.1 but the id below is
    # "gpt-4-1106-preview" - confirm which model is intended.
    "OpenAI GPT-4.1": dict(
        api_url="https://api.openai.com/v1/chat/completions",
        model="gpt-4-1106-preview",
        key_env="OPENAI_API_KEY",
        response_format=None,
        extra_headers={},
    ),
    # NOTE(review): the label says "Mistral Small" but the id below is
    # "mistralai/ministral-8b" - confirm which model is intended.
    "Mistral Small": dict(
        api_url="https://openrouter.ai/api/v1/chat/completions",
        model="mistralai/ministral-8b",
        key_env="OPENROUTER_API_KEY",
        response_format=dict(type="json_object"),
        extra_headers={
            "HTTP-Referer": "https://huggingface.co",
            "X-Title": "Invoice Extractor",
        },
    ),
}
def get_api_key(model_choice):
    """Look up the API key for ``model_choice`` in the environment.

    The variable name is the model's ``key_env`` entry in ``MODELS``. When the
    variable is unset or empty, an error is displayed and ``st.stop()`` is
    called.
    """
    env_name = MODELS[model_choice]["key_env"]
    api_key = os.getenv(env_name)
    if api_key:
        return api_key
    st.error(f"❌ {env_name} not set")
    st.stop()
    return api_key
def query_llm(model_choice, prompt):
    """Send ``prompt`` as a single user message to the selected model.

    Args:
        model_choice: Key into ``MODELS`` selecting endpoint, model id, API-key
            variable, and optional ``response_format`` / ``extra_headers``.
        prompt: Text placed in the one user message of the request.

    Returns:
        The ``choices[0].message.content`` value of the response, or ``None``
        when the request fails, the endpoint answers with a non-200 status, or
        the response body does not have the expected shape. Every failure is
        reported through ``st.error``.

    Side effects:
        On success stores the content in ``st.session_state.last_api`` and the
        raw response text in ``st.session_state.last_raw``.
    """
    cfg = MODELS[model_choice]
    headers = {
        "Authorization": f"Bearer {get_api_key(model_choice)}",
        "Content-Type": "application/json",
    }
    if cfg.get("extra_headers"):
        headers.update(cfg["extra_headers"])
    payload = {
        "model": cfg["model"],
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.1,
        "max_tokens": 2000,
    }
    if cfg.get("response_format"):
        payload["response_format"] = cfg["response_format"]

    # Transport problems only: DNS, refused connection, timeout, TLS, ...
    try:
        with st.spinner(f"🔍 Querying {model_choice}..."):
            r = requests.post(cfg["api_url"], headers=headers, json=payload, timeout=90)
    except requests.RequestException as e:
        st.error(f"Connection error: {e}")
        return None

    if r.status_code != 200:
        if "No instances available" in r.text or r.status_code == 503:
            st.error(f"{model_choice} is currently unavailable. Please try again later or select another model.")
        else:
            st.error(f"🚨 API Error {r.status_code}: {r.text}")
        return None

    # A 200 reply can still be non-JSON or lack the expected keys; report that
    # as a response problem instead of mislabelling it a connection error.
    try:
        content = r.json()["choices"][0]["message"]["content"]
    except (ValueError, KeyError, IndexError, TypeError) as e:
        st.error(f"Unexpected response format from {model_choice}: {e!r}")
        return None

    st.session_state.last_api = content
    st.session_state.last_raw = r.text
    return content
def clean_json_response(text):
    """Extract and parse the JSON object contained in an LLM reply.

    Markdown code fences are removed, then the span from the first ``{`` to the
    last ``}`` is parsed. Parsing is attempted on progressively repaired
    versions of that span:

    1. the span exactly as received;
    2. the span with commas directly before ``}`` / ``]`` removed;
    3. version 2 with a pair of adjacent double quotes that directly precedes
       a quoted key replaced by ``","``.

    The untouched span is tried first because the repairs are regex heuristics
    that cannot tell structural commas from commas inside string values, so
    applying them to already-valid JSON could alter its data.

    Args:
        text: Raw model reply; ``None`` or empty yields ``None`` silently.

    Returns:
        The parsed object, or ``None`` if no JSON object could be located or
        parsed (in which case the problem is shown via ``st.error``/``st.code``).
    """
    if not text:
        return None
    original = text
    text = re.sub(r'```(?:json)?', '', text).strip()
    start, end = text.find('{'), text.rfind('}') + 1
    # end <= start also covers a '}' that only occurs before the first '{'.
    if start < 0 or end <= start:
        st.error("Couldn't locate JSON in response.")
        st.code(original)
        return None

    as_received = text[start:end]
    no_trailing_commas = re.sub(r',\s*([}\]])', r'\1', as_received)
    repaired = re.sub(r'"\s*"\s*(?="[^"]+"\s*:)', '","', no_trailing_commas)

    errors = []
    for candidate in (as_received, no_trailing_commas, repaired):
        try:
            return json.loads(candidate)
        except json.JSONDecodeError as exc:
            errors.append(exc)

    # Report the failure of the comma-cleaned fragment, alongside that fragment.
    st.error(f"JSON parse error: {errors[1]}")
    st.code(no_trailing_commas)
    return None
def fallback_supplier(text):
    """Return the first non-blank line of ``text``, trimmed, or ``None`` if every line is blank."""
    trimmed_lines = (raw_line.strip() for raw_line in text.splitlines())
    return next((candidate for candidate in trimmed_lines if candidate), None)
def get_extraction_prompt(model_choice, txt):
    """Build the invoice-extraction prompt for the document text ``txt``.

    The prompt consists of parsing instructions, a JSON schema listing the
    ``invoice_header`` fields and the per-row ``line_items`` fields (each typed
    ``"string or null"``), closing rules, and finally the invoice text itself.

    Args:
        model_choice: Accepted for signature compatibility; the prompt does not
            depend on it.
        txt: Invoice text appended after the ``Invoice Text:`` marker.

    Returns:
        The complete prompt string.
    """
    header_fields = (
        "car_number", "shipment_number", "shipping_point", "currency",
        "invoice_number", "invoice_date", "order_number",
        "customer_order_number", "our_order_number", "sales_order_number",
        "purchase_order_number", "order_date",
        "supplier_name", "supplier_address", "supplier_phone",
        "supplier_email", "supplier_tax_id",
        "customer_name", "customer_address", "customer_phone",
        "customer_email", "customer_tax_id",
        "ship_to_name", "ship_to_address",
        "bill_to_name", "bill_to_address",
        "remit_to_name", "remit_to_address",
        "tax_id", "tax_registration_number", "vat_number",
        "payment_terms", "payment_method", "payment_reference",
        "bank_account_number", "iban", "swift_code",
        "total_before_tax", "tax_amount", "tax_rate",
        "shipping_charges", "discount", "total_due", "amount_paid",
        "balance_due", "due_date", "invoice_status", "reference_number",
        "project_code", "department", "contact_person", "notes",
        "additional_info",
    )
    item_fields = (
        "quantity", "units", "description", "footage", "price", "amount",
        "notes",
    )

    def schema_rows(names):
        # One row per field, comma-separated, with a newline after the last row.
        return ",\n".join(f' "{name}": "string or null"' for name in names) + "\n"

    instructions = (
        "You are an expert invoice parser. "
        "Extract data according to the visible table structure and column headers in the invoice. "
        "For every line item, only extract fields that correspond to the table columns for that row (do not include header/shipment fields in line items). "
        "Merge all multi-line content within a single cell into that field (especially for the 'description' and 'notes'). "
        "Shipment/invoice-level fields such as CAR NUMBER, SHIPPING POINT, SHIPMENT NUMBER, CURRENCY, etc., must go ONLY into the 'invoice_header', not as line item fields.\n"
        "Use this schema:\n"
    )
    schema = "".join([
        '{\n',
        ' "invoice_header": {\n',
        schema_rows(header_fields),
        ' },\n',
        ' "line_items": [\n',
        ' {\n',
        schema_rows(item_fields),
        ' }\n',
        ' ]\n',
        '}',
    ])
    closing_rules = (
        "\nIf a field is missing for a line item or header, use null. "
        "Do not invent fields. Do not add any header or shipment data to any line item. Return ONLY the JSON object, no explanation.\n"
        "\nInvoice Text:\n"
    )
    return instructions + schema + closing_rules + f"{txt}"
def extract_invoice_info(model_choice, text):
    """Run LLM extraction on ``text`` and normalise the parsed result.

    Args:
        model_choice: Key into ``MODELS``; also selects which set of default
            keys is applied (names starting with ``"DeepSeek"`` vs. the rest).
        text: Document text to extract from. For non-DeepSeek models its first
            non-blank line is used as ``supplier_name`` when the model did not
            supply one.

    Returns:
        ``{"invoice_header": dict, "line_items": list}``, or ``None`` when the
        model call or JSON parsing fails.
    """
    prompt = get_extraction_prompt(model_choice, text)
    raw = query_llm(model_choice, prompt)
    if not raw:
        return None
    data = clean_json_response(raw)
    if not data:
        return None

    nested = data.get("invoice_header")

    items = data.get("line_items", [])
    if not isinstance(items, list):
        items = []

    # NOTE(review): several default keys below (unit_price, total_price,
    # item_number, po_number, invoice_value) are not part of the schema the
    # prompt requests; they are kept so the output shape stays unchanged.
    if model_choice.startswith("DeepSeek"):
        if isinstance(nested, dict):
            # The prompt asks for a nested "invoice_header"; use it directly
            # rather than wrapping it inside another header dict.
            header = dict(nested)
        else:
            # Flat reply: everything except the line items is header data.
            header = {k: v for k, v in data.items() if k != "line_items"}
        item_defaults = ("description", "quantity", "unit_price", "total_price")
    else:
        if isinstance(nested, dict) and nested:
            header = dict(nested)
        elif any(k in data for k in ("invoice_number", "supplier_name", "customer_name")):
            # Flat reply: copy the header-level keys instead of aliasing the
            # payload, so line items do not leak into the header.
            header = {
                k: v for k, v in data.items()
                if k not in ("line_items", "invoice_header")
            }
        else:
            # Missing, null, or non-dict header: start from an empty one.
            header = {}
        for k in ("invoice_number", "invoice_date", "po_number", "invoice_value", "supplier_name", "customer_name"):
            header.setdefault(k, None)
        if not header.get("supplier_name"):
            header["supplier_name"] = fallback_supplier(text)
        item_defaults = ("item_number", "description", "quantity", "unit_price", "total_price")

    for itm in items:
        if not isinstance(itm, dict):
            continue
        for k in item_defaults:
            itm.setdefault(k, None)

    return {"invoice_header": header, "line_items": items}
# --------- File type/content-type detection ---------
def get_content_type(filename):
    """Return the ``Content-Type`` value to send when uploading ``filename``.

    Files whose name ends in ``.pdf`` (case-insensitive) are sent as
    ``text/plain``; all others use the type guessed by ``mimetypes``, falling
    back to ``application/octet-stream`` when no type can be guessed.
    """
    # Special case for PDF (Unstract quirk). Matching on the ".pdf" suffix
    # avoids treating an extensionless file literally named "pdf" as a PDF.
    if filename.lower().endswith(".pdf"):
        return "text/plain"
    mime, _ = mimetypes.guess_type(filename)
    if mime is None:
        return "application/octet-stream"
    return mime
# --------- UNSTRACT API Multi-file PDF/Doc/Image-to-Text ---------
# Base URL that the /whisper, /whisper-status and /whisper-retrieve endpoint
# paths are appended to.
UNSTRACT_BASE = "https://llmwhisperer-api.us-central.unstract.com/api/v2"
# Sent as the "unstract-key" request header; None when the variable is unset.
UNSTRACT_API_KEY = os.getenv("UNSTRACT_API_KEY") # Set this in your environment!
def extract_text_from_unstract(uploaded_file, *, request_timeout=60, poll_interval=2, max_polls=30):
    """Upload a document to Unstract and return the text it extracts.

    The file is POSTed to ``/whisper``; the returned ``whisper_hash`` is then
    polled on ``/whisper-status`` until the status is ``"processed"``, after
    which the text is fetched from ``/whisper-retrieve``.

    Args:
        uploaded_file: File-like object with ``read()`` and, optionally, a
            ``name`` attribute used to choose the upload ``Content-Type``.
        request_timeout: Seconds allowed for each individual HTTP request.
        poll_interval: Seconds to sleep between status checks.
        max_polls: Maximum number of status checks before giving up
            (defaults give roughly 60 s of waiting: 2 s x 30).

    Returns:
        The extracted text, or ``None`` on any failure (missing API key,
        network error, unexpected status code, missing hash, or polling
        timeout). Failures are reported through ``st.error``.
    """
    if not UNSTRACT_API_KEY:
        st.error("❌ UNSTRACT_API_KEY not set")
        return None

    filename = getattr(uploaded_file, "name", "uploaded_file")
    # Rewind in case the stream was already consumed by an earlier read.
    if hasattr(uploaded_file, "seek"):
        uploaded_file.seek(0)
    file_bytes = uploaded_file.read()

    auth_headers = {"unstract-key": UNSTRACT_API_KEY}
    upload_headers = {
        "unstract-key": UNSTRACT_API_KEY,
        "Content-Type": get_content_type(filename),
    }

    try:
        url = f"{UNSTRACT_BASE}/whisper"
        with st.spinner("Uploading and processing document with Unstract..."):
            r = requests.post(url, headers=upload_headers, data=file_bytes, timeout=request_timeout)
        if r.status_code != 202:
            st.error(f"Unstract: Error uploading file: {r.status_code} - {r.text}")
            return None
        whisper_hash = r.json().get("whisper_hash")
        if not whisper_hash:
            st.error("Unstract: No whisper_hash received.")
            return None

        status_url = f"{UNSTRACT_BASE}/whisper-status?whisper_hash={whisper_hash}"
        for i in range(max_polls):
            status_r = requests.get(status_url, headers=auth_headers, timeout=request_timeout)
            if status_r.status_code != 200:
                st.error(f"Unstract: Error checking status: {status_r.status_code} - {status_r.text}")
                return None
            status = status_r.json().get("status")
            if status == "processed":
                break
            st.info(f"Unstract status: {status or 'waiting'}... ({i+1})")
            time.sleep(poll_interval)
        else:
            # Loop ran out without ever seeing "processed".
            st.error("Unstract: Timeout waiting for OCR to finish.")
            return None

        retrieve_url = f"{UNSTRACT_BASE}/whisper-retrieve?whisper_hash={whisper_hash}&text_only=true"
        r = requests.get(retrieve_url, headers=auth_headers, timeout=request_timeout)
        if r.status_code != 200:
            st.error(f"Unstract: Error retrieving extracted text: {r.status_code} - {r.text}")
            return None
    except (requests.RequestException, ValueError) as exc:
        # Network failure/timeout, or a reply that should be JSON but is not.
        st.error(f"Unstract: Request failed: {exc}")
        return None

    # The retrieve reply may be a JSON object carrying "result_text" or plain
    # text; fall back to the raw body whenever it is not such an object.
    try:
        data = r.json()
        return data.get("result_text") or r.text
    except (ValueError, AttributeError):
        return r.text
# --------- INVOICE EXTRACTOR UI ---------
# Page body: model picker, file upload, extraction, optional refinement prompt,
# and a debug expander. Streamlit re-runs this script top to bottom on every
# interaction, so widget order here is the on-screen order.
st.title("Invoice/Document Extractor")
mdl = st.selectbox("Model", list(MODELS.keys()), key="extract_model")
inv_file = st.file_uploader(
    "Invoice or Document File",
    type=["pdf", "docx", "xlsx", "xls", "png", "jpg", "jpeg", "tiff"]
)
extracted_info = None
if st.button("Extract"):
    if not inv_file:
        # Tell the user why nothing happened instead of silently ignoring the click.
        st.warning("Please upload a file before extracting.")
    else:
        with st.spinner("Extracting text from document using Unstract..."):
            text = extract_text_from_unstract(inv_file)
        if text:
            extracted_info = extract_invoice_info(mdl, text)
        if extracted_info:
            st.success("Extraction Complete")
            st.subheader("Invoice Metadata")
            st.table([{k.replace("_", " ").title(): v for k, v in extracted_info["invoice_header"].items()}])
            st.subheader("Line Items")
            st.table(extracted_info["line_items"])
            st.session_state["last_extracted_info"] = extracted_info  # store in session

# If we've already extracted info, or in this session, show further controls
extracted_info = extracted_info or st.session_state.get("last_extracted_info", None)
if extracted_info:
    st.markdown("---")
    st.subheader("📝 Fine-tune Extracted Data with Your Own Prompt")
    user_prompt = st.text_area(
        "Enter your prompt for further processing or transformation (the extracted JSON will be available as context).",
        height=120,
        key="custom_prompt"
    )
    model_2 = st.selectbox("Model for Fine-Tuning Prompt", list(MODELS.keys()), key="refine_model")
    if st.button("Run Custom Prompt"):
        if not (user_prompt or "").strip():
            # Avoid an LLM call that carries no instruction.
            st.warning("Please enter a prompt before running.")
        else:
            refine_input = (
                "Here is an extracted invoice in JSON format:\n"
                f"{json.dumps(extracted_info, indent=2)}\n"
                "Follow this instruction and return the result as a JSON object only (no explanation):\n"
                f"{user_prompt}"
            )
            result = query_llm(model_2, refine_input)
            refined_json = clean_json_response(result)
            st.subheader("Fine-Tuned Output")
            if refined_json:
                st.json(refined_json)
            else:
                st.error("Could not parse a valid JSON output from the model.")
    st.caption("The prompt is run on the above-extracted fields as JSON. Try instructions like: 'Add a new field for net_amount (amount minus tax) to each line item', or 'Summarize the total quantity ordered', etc.")

# Shown once any LLM call has succeeded in this session.
if "last_api" in st.session_state:
    with st.expander("Debug"):
        st.code(st.session_state.last_api)
        st.code(st.session_state.last_raw)
|