import streamlit as st
import io
import base64
import pandas as pd
from PIL import Image
from datetime import datetime
import csv
import json
import os
import requests

# Optional PDF support via PyMuPDF
try:
    import fitz  # PyMuPDF
    PDF_SUPPORT = True
except ImportError:
    PDF_SUPPORT = False

# Optional HF Inference API client (for LLaVA serverless)
try:
    from huggingface_hub import InferenceClient
    HF_CLIENT_AVAILABLE = True
except ImportError:
    HF_CLIENT_AVAILABLE = False

# ---------------------------
# Page config (must be first Streamlit call)
# ---------------------------
st.set_page_config(
    page_title="EZOFIS AI OCR",
    page_icon="🔍",
    layout="wide",
    initial_sidebar_state="expanded"
)

# ---------------------------
# Global UI / Render constants (NOT args to set_page_config)
# ---------------------------
IMAGE_PREVIEW_WIDTH = 1000
PDF_RENDER_SCALE = 3.0

# ---------------------------
# Secrets / Tokens
# ---------------------------
# OpenRouter + HF API
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")  # For OpenRouter models
HF_TOKEN = os.getenv("HF_TOKEN")                      # For HF Inference API (LLaVA)

# RunPod (secured, OpenAI-compatible)
RUNPOD_SECURE_BASE_URL = os.getenv("RUNPOD_SECURE_BASE_URL", "").rstrip("/")  # e.g. http://194.68.245.201:22156/v1
RUNPOD_SECURE_API_KEY = os.getenv("RUNPOD_SECURE_API_KEY")                    # optional
RUNPOD_SECURE_MODEL = os.getenv("RUNPOD_SECURE_MODEL", "qwen2.5:32b-instruct")  # set to your model id

# ---------------------------
# Helpers
# ---------------------------
def resize_image(image, max_size=1920):
    """Downscale so the longest side is at most max_size, preserving aspect ratio."""
    w, h = image.size
    if w > max_size or h > max_size:
        if w > h:
            nw = max_size
            nh = int(h * (max_size / w))
        else:
            nh = max_size
            nw = int(w * (max_size / h))
        return image.resize((nw, nh), Image.LANCZOS)
    return image


def image_to_base64(image):
    buf = io.BytesIO()
    image.save(buf, format='JPEG')
    return base64.b64encode(buf.getvalue()).decode('utf-8')


def extract_structured_data(content, fields):
    """Attempt to parse a JSON object from model text.

    `fields` is currently unused; it is kept so callers can pass the
    requested field list alongside the raw model output.
    """
    structured_data = {}
    try:
        if "```json" in content and "```" in content.split("```json")[1]:
            json_str = content.split("```json")[1].split("```")[0].strip()
            structured_data.update(json.loads(json_str))
        else:
            try:
                maybe = json.loads(content)
                if isinstance(maybe, dict):
                    structured_data.update(maybe)
            except Exception:
                pass
    except Exception:
        pass
    return structured_data
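
# Illustrative alternative (a sketch, not used by the app): the fence-splitting
# above misses JSON that models wrap in bare ``` fences or embed mid-sentence.
# One common fallback is to grab the first {...} span with a regex and try to
# parse it. `extract_json_loose` is a hypothetical helper shown only for
# comparison with extract_structured_data.
import re

def extract_json_loose(content: str) -> dict:
    match = re.search(r"\{.*\}", content, re.DOTALL)
    if not match:
        return {}
    try:
        parsed = json.loads(match.group(0))
        return parsed if isinstance(parsed, dict) else {}
    except json.JSONDecodeError:
        return {}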

def is_vision_model_name(name: str) -> bool:
    """Heuristic: treat models containing 'vl', 'vision', 'mm', or 'multimodal' as vision-capable.

    Substring matching is loose (e.g. 'gemma' contains 'mm'), so treat the
    result as a hint, not a guarantee.
    """
    n = (name or "").lower()
    return any(k in n for k in ["vl", "vision", "mm", "multimodal"])


# ---------------------------
# OpenRouter client (multimodal chat)
# ---------------------------
def query_openrouter(prompt: str, image_base64: str, model_id: str) -> str:
    if not OPENROUTER_API_KEY:
        raise RuntimeError(
            "Missing OPENROUTER_API_KEY. Add it in your Space → Settings → Variables & secrets."
        )
    data_url = f"data:image/jpeg;base64,{image_base64}"
    payload = {
        "model": model_id,
        "messages": [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": prompt},
                    {"type": "image_url", "image_url": {"url": data_url}}
                ]
            }
        ],
        "max_tokens": 800
    }
    headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
        "HTTP-Referer": st.secrets.get("SPACE_URL", "https://hf.space"),
        "X-Title": "EZOFIS AI OCR"
    }
    r = requests.post(
        "https://openrouter.ai/api/v1/chat/completions",
        headers=headers, json=payload, timeout=120
    )
    r.raise_for_status()
    data = r.json()
    return data["choices"][0]["message"]["content"]
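
# Illustrative sketch (not wired into the router): OpenRouter can return
# transient 429/5xx responses under load. A thin wrapper with exponential
# backoff is one way to smooth that over. `query_openrouter_with_retry` is a
# hypothetical helper shown only as an example of that pattern.
import time

def query_openrouter_with_retry(prompt, image_base64, model_id, attempts=3):
    for attempt in range(attempts):
        try:
            return query_openrouter(prompt, image_base64, model_id)
        except requests.HTTPError as e:
            status = e.response.status_code if e.response is not None else None
            # Retry only on rate limits and server errors; re-raise everything else.
            if status in (429, 500, 502, 503) and attempt < attempts - 1:
                time.sleep(2 ** attempt)  # 1s, 2s, 4s, ...
                continue
            raise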

# ---------------------------
# HF Inference API client for LLaVA (serverless VQA-style)
# ---------------------------
@st.cache_resource
def _hf_client(model_id: str):
    if not HF_CLIENT_AVAILABLE:
        raise RuntimeError("huggingface_hub not installed. Add it to requirements.txt.")
    if not HF_TOKEN:
        raise RuntimeError("Missing HF_TOKEN. Add it in your Space → Settings → Variables & secrets.")
    return InferenceClient(model=model_id, token=HF_TOKEN)


def query_hf_llava_vqa(prompt: str, image_base64: str, model_id: str) -> str:
    client = _hf_client(model_id)
    image_bytes = base64.b64decode(image_base64)
    try:
        result = client.visual_question_answering(image=image_bytes, question=prompt)
    except TypeError:
        # Fallback for older huggingface_hub versions: POST the task payload
        # directly. The VQA task accepts a base64-encoded image inline.
        raw = client.post(
            json={"inputs": {"question": prompt, "image": image_base64}},
            model=model_id,
            task="visual-question-answering",
        )
        result = json.loads(raw)
    # Normalize the various response shapes to a plain string.
    if isinstance(result, str):
        return result
    if isinstance(result, dict):
        return result.get("answer") or result.get("generated_text") or json.dumps(result, ensure_ascii=False)
    if isinstance(result, list) and result:
        first = result[0]
        if isinstance(first, dict):
            return first.get("answer") or first.get("generated_text") or json.dumps(first, ensure_ascii=False)
        return str(first)
    return str(result)


# ---------------------------
# RunPod (secured, OpenAI-compatible)
# ---------------------------
def _secured_openai_compatible(prompt: str, image_base64: str) -> str:
    """
    Call your OpenAI-compatible server on RunPod/OpenWebUI/Ollama.
    Works whether or not the base URL already includes /v1.
    The API key header is added only if provided.
    """
    if not RUNPOD_SECURE_BASE_URL:
        raise RuntimeError("RUNPOD_SECURE_BASE_URL is missing.")
    base = RUNPOD_SECURE_BASE_URL.rstrip("/")
    if base.endswith("/v1"):
        url = f"{base}/chat/completions"
    else:
        url = f"{base}/v1/chat/completions"
    headers = {"Content-Type": "application/json"}
    if RUNPOD_SECURE_API_KEY:
        headers["Authorization"] = f"Bearer {RUNPOD_SECURE_API_KEY}"

    # If the configured model isn't vision-capable, send text-only content.
    model_name = RUNPOD_SECURE_MODEL
    vision_ok = is_vision_model_name(model_name)
    if vision_ok:
        data_url = f"data:image/jpeg;base64,{image_base64}"
        content = [
            {"type": "text", "text": prompt},
            {"type": "image_url", "image_url": {"url": data_url}}
        ]
    else:
        # Text-only fallback: no image is sent.
        content = [
            {"type": "text", "text": f"{prompt}\n\n(Note: model configured as text-only; image not sent.)"}
        ]

    payload = {
        "model": model_name,
        "messages": [{"role": "user", "content": content}],
        "max_tokens": 800
    }
    r = requests.post(url, headers=headers, json=payload, timeout=600)
    r.raise_for_status()
    js = r.json()
    return js["choices"][0]["message"]["content"]


def query_runpod_secured(prompt: str, image_base64: str) -> str:
    return _secured_openai_compatible(prompt, image_base64)
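
# Illustrative sketch (assumption: the server also exposes the standard OpenAI
# `GET /v1/models` listing, which Ollama/OpenWebUI-style gateways usually do).
# Handy for smoke-testing RUNPOD_SECURE_BASE_URL before running a batch; not
# called anywhere in the app.
def list_runpod_models():
    base = RUNPOD_SECURE_BASE_URL.rstrip("/")
    url = f"{base}/models" if base.endswith("/v1") else f"{base}/v1/models"
    headers = {}
    if RUNPOD_SECURE_API_KEY:
        headers["Authorization"] = f"Bearer {RUNPOD_SECURE_API_KEY}"
    r = requests.get(url, headers=headers, timeout=30)
    r.raise_for_status()
    return [m.get("id") for m in r.json().get("data", [])]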

# ---------------------------
# Router to pick the right backend by model selection
# ---------------------------
HF_LLaVA_LABEL = "llava-hf/llava-v1.6-mistral-7b-hf (HF API)"
HF_LLaVA_ID = "llava-hf/llava-v1.6-mistral-7b-hf"
RUNPOD_SECURE_LABEL = "RunPod (secured)"


def run_vision_inference(prompt: str, img_b64: str, model_id: str) -> str:
    if model_id == HF_LLaVA_LABEL:
        return query_hf_llava_vqa(prompt, img_b64, HF_LLaVA_ID)
    if model_id == RUNPOD_SECURE_LABEL:
        return query_runpod_secured(prompt, img_b64)
    # All others go via OpenRouter
    return query_openrouter(prompt, img_b64, model_id)


# ---------------------------
# Core processing
# ---------------------------
def process_image(image, filename, fields=None, model=None):
    img_base64 = image_to_base64(resize_image(image))
    if fields is None:
        prompt = "Describe this image in detail."
        content = run_vision_inference(prompt, img_base64, model)
        return {'filename': filename, 'description': content}, content, None
    else:
        fields_str = ", ".join(fields)
        prompt = (
            "Extract the following fields from this image and return JSON only "
            f"with these exact keys: {fields_str}. If a field is missing, use an empty string."
        )
        content = run_vision_inference(prompt, img_base64, model)
        structured_data = {'filename': filename}
        parsed = extract_structured_data(content, fields)
        if parsed:
            structured_data.update(parsed)
        return {'filename': filename, 'extraction': content}, content, structured_data


def process_pdf(file_bytes, filename, fields=None, process_pages_separately=True, model=None):
    """Yield (page_num, page_count, image, page_filename, content, structured_data) per page.

    On failure, page_num is None and the fifth element carries the error message.
    """
    if not PDF_SUPPORT:
        yield None, None, None, filename, "PDF support requires PyMuPDF. Install pymupdf.", None
        return
    try:
        pdf_document = fitz.open(stream=file_bytes, filetype="pdf")
        page_count = len(pdf_document)

        def _render_page(page):
            # Higher-res, no alpha to keep RGB consistent
            pix = page.get_pixmap(matrix=fitz.Matrix(PDF_RENDER_SCALE, PDF_RENDER_SCALE), alpha=False)
            img = Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
            return img

        if process_pages_separately:
            for page_num in range(page_count):
                page = pdf_document[page_num]
                img = _render_page(page)
                page_filename = f"{filename} (Page {page_num+1})"
                result, content, structured_data = process_image(img, page_filename, fields, model)
                yield page_num, page_count, img, page_filename, content, structured_data
        else:
            page = pdf_document[0]
            img = _render_page(page)
            result, content, structured_data = process_image(img, filename, fields, model)
            yield 0, page_count, img, filename, content, structured_data
    except Exception as e:
        yield None, None, None, filename, f"Error processing PDF: {str(e)}", None
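
# Illustrative usage: process_pdf is a generator, so callers stream results
# page by page instead of waiting for the whole document. A minimal consumer
# (hypothetical file name and model; the real UI loop below does the same
# thing with progress updates):
#
#   with open("invoice.pdf", "rb") as fh:
#       for page_num, page_count, img, name, content, data in process_pdf(
#           fh.read(), "invoice.pdf", fields=["Total amount"],
#           model="google/gemma-3-4b-it"
#       ):
#           if page_num is None:
#               print("error:", content)   # error sentinel: page_num is None
#           else:
#               print(f"{name}: {content[:80]}")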

def create_download_buttons(results, structured_results, extraction_mode):
    st.header("Download Results")
    base_csv = io.StringIO()
    base_writer = csv.writer(base_csv)
    base_writer.writerow(['Filename', 'Description/Extraction'])
    for r in results:
        base_writer.writerow([r['filename'], r.get('description', r.get('extraction', ''))])
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    base_name = f"image_analysis_{ts}.csv"
    st.success("All files processed.")
    st.download_button(
        label="Download Results (CSV)",
        data=base_csv.getvalue(),
        file_name=base_name,
        mime="text/csv",
        use_container_width=True
    )
    if extraction_mode == "Custom field extraction" and structured_results:
        all_fields = set(['filename'])
        for row in structured_results:
            all_fields.update(row.keys())
        headers = sorted(list(all_fields))
        buff = io.StringIO()
        w = csv.writer(buff)
        w.writerow(headers)
        for row in structured_results:
            w.writerow([row.get(h, '') for h in headers])
        st.download_button(
            label="Download Structured Data (CSV)",
            data=buff.getvalue(),
            file_name=f"structured_data_{ts}.csv",
            mime="text/csv",
            use_container_width=True
        )


# ---------------------------
# UI
# ---------------------------
st.title("EZOFIS AI OCR")

if 'results' not in st.session_state:
    st.session_state.results = []
if 'structured_results' not in st.session_state:
    st.session_state.structured_results = []

with st.sidebar:
    st.header("Upload Files")
    uploaded_files = st.file_uploader(
        "Choose images or PDFs",
        accept_multiple_files=True,
        type=['png', 'jpg', 'jpeg', 'pdf']
    )

    st.header("Model Settings")
    selected_model = st.selectbox(
        "Choose vision model:",
        [
            "google/gemma-3-4b-it",
            "google/gemma-3-12b-it",
            "openai/gpt-4.1",
            "openai/gpt-4.1-mini",
            "qwen/qwen2.5-vl-32b-instruct",  # OpenRouter vision option
            HF_LLaVA_LABEL,                  # LLaVA via HF API
            RUNPOD_SECURE_LABEL              # Your RunPod OpenAI-compatible server
        ],
        help=("OpenRouter uses OPENROUTER_API_KEY. "
              "LLaVA (HF API) uses HF_TOKEN. "
              "RunPod (secured) uses RUNPOD_SECURE_* env vars. "
              f"Current RunPod model: {RUNPOD_SECURE_MODEL}")
    )

    # If the RunPod model looks text-only, warn the user
    if selected_model == RUNPOD_SECURE_LABEL and not is_vision_model_name(RUNPOD_SECURE_MODEL):
        st.warning(
            f"RunPod model '{RUNPOD_SECURE_MODEL}' appears text-only. "
            "Requests to this endpoint will NOT include images. "
            "Use a VL model (e.g. 'qwen2.5-vl:32b-instruct') for vision."
        )

    extraction_mode = "General description"
    pdf_process_mode = "Process each page separately"
    fields = None

    if uploaded_files:
        st.write(f"Uploaded {len(uploaded_files)} file(s)")
        st.header("Data Extraction Options")
        extraction_mode = st.radio(
            "Choose extraction mode:",
            ["General description", "Custom field extraction"]
        )
        if extraction_mode == "Custom field extraction":
            custom_fields = st.text_area(
                "Enter fields to extract (comma separated or your prompt here):",
                value="Invoice number, Date, Company name, Total amount"
            )
            fields = [f.strip() for f in custom_fields.split(",") if f.strip()]
        if any(file.name.lower().endswith('.pdf') for file in uploaded_files):
            pdf_process_mode = st.radio(
                "How to process PDF files:",
                ["Process each page separately", "Process entire PDF as one document"]
            )
        process_button = st.button("Process Files", use_container_width=True)
    else:
        process_button = False
        st.info("Upload images or PDFs to begin.")
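
# Illustrative sketch (the inline checks below are what actually runs): the
# per-backend credential checks could be consolidated into one helper that
# returns an error string, or None when the backend is ready. `backend_error`
# is hypothetical, shown only to make the routing rules visible in one place.
def backend_error(model: str):
    if model == HF_LLaVA_LABEL:
        if not HF_CLIENT_AVAILABLE:
            return "huggingface_hub not installed. Add 'huggingface_hub' to requirements.txt."
        if not HF_TOKEN:
            return "HF_TOKEN is not set."
    elif model == RUNPOD_SECURE_LABEL:
        if not RUNPOD_SECURE_BASE_URL:
            return "RUNPOD_SECURE_BASE_URL is not set."
    elif not OPENROUTER_API_KEY:
        return "OPENROUTER_API_KEY is not set."
    return None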

# Processing loop
if uploaded_files and process_button:
    # Token checks by route
    can_run = False
    if selected_model == HF_LLaVA_LABEL:
        if not HF_CLIENT_AVAILABLE:
            st.error("huggingface_hub not installed. Add 'huggingface_hub' to requirements.txt.")
        elif not HF_TOKEN:
            st.error("HF_TOKEN is not set.")
        else:
            can_run = True
    elif selected_model == RUNPOD_SECURE_LABEL:
        if not RUNPOD_SECURE_BASE_URL:
            st.error("RUNPOD_SECURE_BASE_URL is not set.")
        else:
            can_run = True
    else:
        if not OPENROUTER_API_KEY:
            st.error("OPENROUTER_API_KEY is not set.")
        else:
            can_run = True

    if can_run:
        st.header("Processing Results")
        progress_bar = st.progress(0)
        status_text = st.empty()
        st.session_state.results = []
        st.session_state.structured_results = []

        # Count total work items up front so the progress bar is accurate.
        total_items = 0
        for f in uploaded_files:
            file_bytes = f.read()
            f.seek(0)
            if f.name.lower().endswith('.pdf') and PDF_SUPPORT:
                if pdf_process_mode == "Process each page separately":
                    try:
                        pdf_document = fitz.open(stream=file_bytes, filetype="pdf")
                        total_items += len(pdf_document)
                    except Exception:
                        total_items += 1
                else:
                    total_items += 1
            else:
                total_items += 1

        processed_count = 0
        for f in uploaded_files:
            file_bytes = f.read()
            f.seek(0)
            if f.name.lower().endswith('.pdf'):
                if not PDF_SUPPORT:
                    st.error("PDF support requires PyMuPDF. Add 'pymupdf' to requirements.txt.")
                    processed_count += 1
                    progress_bar.progress(processed_count / max(total_items, 1))
                    continue
                try:
                    process_separately = pdf_process_mode == "Process each page separately"
                    for page_info in process_pdf(file_bytes, f.name, fields, process_separately, selected_model):
                        page_num, page_count, image, page_filename, content, structured_data = page_info
                        if page_num is None:
                            st.error(content)
                            continue
                        status_text.text(f"Processing {page_filename} ({page_num+1}/{page_count})")
                        result = {'filename': page_filename, 'description': content}
                        st.session_state.results.append(result)
                        if structured_data and len(structured_data) > 1:
                            st.session_state.structured_results.append(structured_data)

                        st.subheader(page_filename)
                        c1, c2 = st.columns([3, 2])  # give image more room
                        with c1:
                            st.image(image, width=IMAGE_PREVIEW_WIDTH)
                            if page_count > 1 and not process_separately:
                                st.info(f"PDF has {page_count} pages. Showing first page only.")
                        with c2:
                            st.write(content)
                            if structured_data and len(structured_data) > 1:
                                st.success("Extracted structured data")
                                st.json(structured_data)
                        st.divider()
                        processed_count += 1
                        progress_bar.progress(min(processed_count / max(total_items, 1), 1.0))
                except Exception as e:
                    st.error(f"Error processing PDF {f.name}: {e}")
                    processed_count += 1
                    progress_bar.progress(min(processed_count / max(total_items, 1), 1.0))
            else:
                try:
                    status_text.text(f"Processing image {f.name}")
                    image = Image.open(f).convert("RGB")
                    result, content, structured_data = process_image(image, f.name, fields, selected_model)
                    st.session_state.results.append(result)
                    if structured_data and len(structured_data) > 1:
                        st.session_state.structured_results.append(structured_data)

                    st.subheader(f"Image: {f.name}")
                    c1, c2 = st.columns([3, 2])
                    with c1:
                        st.image(image, width=IMAGE_PREVIEW_WIDTH)
                    with c2:
                        st.write(content)
                        if structured_data and len(structured_data) > 1:
                            st.success("Extracted structured data")
                            st.json(structured_data)
                    st.divider()
                except Exception as e:
                    st.error(f"Error processing image {f.name}: {e}")
                processed_count += 1
                progress_bar.progress(min(processed_count / max(total_items, 1), 1.0))

        status_text.text("Processing complete.")
        if st.session_state.results:
            create_download_buttons(
                st.session_state.results,
                st.session_state.structured_results,
                extraction_mode
            )

if not uploaded_files:
    st.info("Upload files using the sidebar to get started.")
    st.write("""
How to use:
1) Upload one or more images or PDFs
2) Choose a model:
   - OpenRouter: Gemma-3 4B/12B, GPT-4.1/4.1-mini, Qwen2.5-VL-32B
   - HF API: LLaVA v1.6 Mistral-7B
   - RunPod (secured): OpenAI-compatible base URL (supports images only if the model is VL)
3) Pick description or custom field extraction
4) For PDFs, choose page-by-page or first page
5) Click Process Files
6) Review outputs and download CSVs
""")

st.markdown("---")
st.markdown(
    """