# NOTE: "Spaces: / Sleeping / Sleeping" was Hugging Face Spaces page chrome
# captured with the source, not part of the program; kept as a comment so
# the file parses as Python.
import streamlit as st
import io
import base64
import pandas as pd
from PIL import Image
from datetime import datetime
import csv
import json
import os
import requests

# Optional PDF support via PyMuPDF.
# PDF_SUPPORT gates all PDF rendering paths below; when False the app
# shows an error instead of processing PDF uploads.
try:
    import fitz  # PyMuPDF
    PDF_SUPPORT = True
except ImportError:
    PDF_SUPPORT = False

# Optional HF Inference API client (for LLaVA serverless).
# HF_CLIENT_AVAILABLE gates the "LLaVA via HF API" model route.
try:
    from huggingface_hub import InferenceClient
    HF_CLIENT_AVAILABLE = True
except ImportError:
    HF_CLIENT_AVAILABLE = False
# ---------------------------
# Page config (must be first Streamlit call)
# ---------------------------
st.set_page_config(
    page_title="EZOFIS AI OCR",
    page_icon="π",  # NOTE(review): likely a garbled emoji from a bad encoding — confirm intended icon
    layout="wide",
    initial_sidebar_state="expanded"
)

# ---------------------------
# Global UI / Render constants (NOT args to set_page_config)
# ---------------------------
IMAGE_PREVIEW_WIDTH = 1000  # pixel width used for st.image previews
PDF_RENDER_SCALE = 3.0      # zoom matrix factor for PyMuPDF page rasterization

# ---------------------------
# Secrets / Tokens
# ---------------------------
# OpenRouter + HF API
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")  # For OpenRouter models
HF_TOKEN = os.getenv("HF_TOKEN")  # For HF Inference API (LLaVA)

# RunPod (secured, OpenAI-compatible)
RUNPOD_SECURE_BASE_URL = os.getenv("RUNPOD_SECURE_BASE_URL", "").rstrip("/")  # e.g. http://194.68.245.201:22156/v1
RUNPOD_SECURE_API_KEY = os.getenv("RUNPOD_SECURE_API_KEY")  # optional
RUNPOD_SECURE_MODEL = os.getenv("RUNPOD_SECURE_MODEL", "qwen2.5:32b-instruct")  # set to your model id
# ---------------------------
# Helpers
# ---------------------------
def resize_image(image, max_size=1920):
    """Shrink *image* so neither dimension exceeds ``max_size``, keeping aspect ratio.

    Images already within bounds are returned unchanged (same object).
    """
    width, height = image.size
    if max(width, height) <= max_size:
        return image
    if width > height:
        new_w, new_h = max_size, int(height * (max_size / width))
    else:
        new_w, new_h = int(width * (max_size / height)), max_size
    return image.resize((new_w, new_h), Image.LANCZOS)
def image_to_base64(image):
    """Serialize *image* to JPEG bytes and return them as a base64 string."""
    buffer = io.BytesIO()
    image.save(buffer, format='JPEG')
    raw_bytes = buffer.getvalue()
    return base64.b64encode(raw_bytes).decode('utf-8')
def extract_structured_data(content, fields):
    """Best-effort extraction of a JSON object from model output text.

    Tries candidates in order: a ```json fenced block, a generic ```
    fenced block, the raw text itself, and finally the outermost {...}
    span (for JSON embedded in prose). The first candidate that parses
    to a dict wins.

    FIX over original: a failed fenced-block parse no longer short-circuits
    to {}; the remaining fallbacks are still attempted.

    Args:
        content: raw model response text (may be empty/None).
        fields: requested field names (unused here; kept for interface compat).

    Returns:
        Parsed dict, or {} when nothing parseable is found.
    """
    structured_data = {}
    if not content:
        return structured_data
    candidates = []
    if "```json" in content:
        tail = content.split("```json", 1)[1]
        if "```" in tail:
            candidates.append(tail.split("```", 1)[0].strip())
    elif "```" in content:
        # Generic fenced block without a language tag.
        parts = content.split("```")
        if len(parts) >= 3:
            candidates.append(parts[1].strip())
    candidates.append(content.strip())
    # Outermost-brace fallback for JSON surrounded by prose.
    start, end = content.find("{"), content.rfind("}")
    if start != -1 and end > start:
        candidates.append(content[start:end + 1])
    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except Exception:
            continue
        if isinstance(parsed, dict):
            structured_data.update(parsed)
            break
    return structured_data
def is_vision_model_name(name: str) -> bool:
    """Heuristic vision check: True if the name mentions vl/vision/mm/multimodal.

    None/empty names are treated as not vision-capable. Matching is
    case-insensitive plain substring search.
    """
    lowered = (name or "").lower()
    for marker in ("vl", "vision", "mm", "multimodal"):
        if marker in lowered:
            return True
    return False
# ---------------------------
# OpenRouter client (multimodal chat)
# ---------------------------
def query_openrouter(prompt: str, image_base64: str, model_id: str) -> str:
    """Send a one-turn multimodal chat completion to OpenRouter.

    Args:
        prompt: user text instruction.
        image_base64: base64-encoded JPEG payload (without the data-URL prefix).
        model_id: OpenRouter model identifier.

    Returns:
        The assistant message content string.

    Raises:
        RuntimeError: if OPENROUTER_API_KEY is not configured.
        requests.HTTPError: on a non-2xx API response.
    """
    if not OPENROUTER_API_KEY:
        # FIX: the arrow in this message was mojibake ("β") in the original.
        raise RuntimeError("Missing OPENROUTER_API_KEY. Add it in your Space → Settings → Variables & secrets.")
    data_url = f"data:image/jpeg;base64,{image_base64}"
    payload = {
        "model": model_id,
        "messages": [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": prompt},
                    {"type": "image_url", "image_url": {"url": data_url}}
                ]
            }
        ],
        "max_tokens": 800
    }
    headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
        # OpenRouter attribution headers (optional but recommended).
        "HTTP-Referer": st.secrets.get("SPACE_URL", "https://hf.space"),
        "X-Title": "EZOFIS AI OCR"
    }
    r = requests.post("https://openrouter.ai/api/v1/chat/completions",
                      headers=headers, json=payload, timeout=120)
    r.raise_for_status()
    data = r.json()
    return data["choices"][0]["message"]["content"]
# ---------------------------
# HF Inference API client for LLaVA (serverless VQA-style)
# ---------------------------
def _hf_client(model_id: str):
    """Build an InferenceClient for *model_id*, validating prerequisites.

    Raises:
        RuntimeError: if huggingface_hub is not installed or HF_TOKEN is unset.
    """
    if not HF_CLIENT_AVAILABLE:
        raise RuntimeError("huggingface_hub not installed. Add it to requirements.txt.")
    if not HF_TOKEN:
        # FIX: the arrow in this message was mojibake ("β") in the original.
        raise RuntimeError("Missing HF_TOKEN. Add it in your Space → Settings → Variables & secrets.")
    return InferenceClient(model=model_id, token=HF_TOKEN)
def query_hf_llava_vqa(prompt: str, image_base64: str, model_id: str) -> str:
    """Ask a visual-question-answering model on the HF Inference API.

    Decodes the base64 image, submits it with *prompt* as the question,
    and normalizes the several response shapes the API may return
    (string, dict, or list) down to a single string.
    """
    client = _hf_client(model_id)
    image_bytes = base64.b64decode(image_base64)
    try:
        result = client.visual_question_answering(image=image_bytes, question=prompt)
    except TypeError:
        # Older huggingface_hub versions: fall back to the raw request API.
        result = client.request(
            task="visual_question_answering",
            data={"inputs": {"question": prompt}},
            files={"image": image_bytes}
        )
    # Normalize: unwrap a non-empty list to its first element first.
    if isinstance(result, list) and result:
        first = result[0]
        if isinstance(first, dict):
            answer = first.get("answer") or first.get("generated_text")
            return answer if answer else json.dumps(first, ensure_ascii=False)
        return str(first)
    if isinstance(result, str):
        return result
    if isinstance(result, dict):
        answer = result.get("answer") or result.get("generated_text")
        return answer if answer else json.dumps(result, ensure_ascii=False)
    return str(result)
# ---------------------------
# RunPod (secured, OpenAI-compatible)
# ---------------------------
def _secured_openai_compatible(prompt: str, image_base64: str) -> str:
    """
    Call your OpenAI-compatible server on RunPod/OpenWebUI/Ollama.
    Works with base URLs that already include /v1 or not.
    API key header is added only if provided. If the configured model
    does not look vision-capable, the image is omitted and a note is
    appended to the prompt instead.
    """
    if not RUNPOD_SECURE_BASE_URL:
        raise RuntimeError("RUNPOD_SECURE_BASE_URL is missing.")
    base = RUNPOD_SECURE_BASE_URL.rstrip("/")
    url = f"{base}/chat/completions" if base.endswith("/v1") else f"{base}/v1/chat/completions"

    headers = {"Content-Type": "application/json"}
    if RUNPOD_SECURE_API_KEY:
        headers["Authorization"] = f"Bearer {RUNPOD_SECURE_API_KEY}"

    model_name = RUNPOD_SECURE_MODEL
    if is_vision_model_name(model_name):
        message_parts = [
            {"type": "text", "text": prompt},
            {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_base64}"}}
        ]
    else:
        # Text-only fallback: no image is sent.
        message_parts = [
            {"type": "text", "text": f"{prompt}\n\n(Note: model configured as text-only; image not sent.)"}
        ]

    request_body = {
        "model": model_name,
        "messages": [{"role": "user", "content": message_parts}],
        "max_tokens": 800
    }
    response = requests.post(url, headers=headers, json=request_body, timeout=600)
    response.raise_for_status()
    body = response.json()
    return body["choices"][0]["message"]["content"]
def query_runpod_secured(prompt: str, image_base64: str) -> str:
    """Public wrapper for the secured OpenAI-compatible RunPod endpoint."""
    return _secured_openai_compatible(prompt, image_base64)
# ---------------------------
# Router to pick the right backend by model selection
# ---------------------------
# Sidebar label and the actual HF model id behind the LLaVA route.
HF_LLaVA_LABEL = "llava-hf/llava-v1.6-mistral-7b-hf (HF API)"
HF_LLaVA_ID = "llava-hf/llava-v1.6-mistral-7b-hf"
# Sidebar label for the RunPod route (model id comes from RUNPOD_SECURE_MODEL).
RUNPOD_SECURE_LABEL = "RunPod (secured)"
def run_vision_inference(prompt: str, img_b64: str, model_id: str) -> str:
    """Dispatch a vision request to the backend matching the selected label.

    HF LLaVA and RunPod have dedicated routes; any other value is treated
    as an OpenRouter model id.
    """
    dispatch = {
        HF_LLaVA_LABEL: lambda: query_hf_llava_vqa(prompt, img_b64, HF_LLaVA_ID),
        RUNPOD_SECURE_LABEL: lambda: query_runpod_secured(prompt, img_b64),
    }
    handler = dispatch.get(model_id)
    if handler is not None:
        return handler()
    return query_openrouter(prompt, img_b64, model_id)
# ---------------------------
# Core processing
# ---------------------------
def process_image(image, filename, fields=None, model=None):
    """Run one image through the selected vision model.

    When *fields* is None a free-form description is requested; otherwise
    the model is asked to return JSON with exactly those keys.

    Returns:
        (result_row, raw_model_text, structured_data) — structured_data is
        None in description mode.
    """
    encoded = image_to_base64(resize_image(image))

    if fields is None:
        description = run_vision_inference("Describe this image in detail.", encoded, model)
        return {'filename': filename, 'description': description}, description, None

    field_list = ", ".join(fields)
    extraction_prompt = (
        "Extract the following fields from this image and return JSON only "
        f"with these exact keys: {field_list}. If a field is missing, use an empty string."
    )
    raw_text = run_vision_inference(extraction_prompt, encoded, model)
    structured = {'filename': filename}
    parsed = extract_structured_data(raw_text, fields)
    if parsed:
        structured.update(parsed)
    return {'filename': filename, 'extraction': raw_text}, raw_text, structured
def process_pdf(file_bytes, filename, fields=None, process_pages_separately=True, model=None):
    """Render a PDF with PyMuPDF and run each page (or just the first) through process_image.

    Generator yielding tuples:
        (page_num, page_count, image, page_filename, content, structured_data)
    Errors are signaled by yielding page_num=None with the error text in the
    content slot.

    Args:
        file_bytes: raw PDF bytes.
        filename: original upload name, used to label pages.
        fields: optional field list forwarded to process_image.
        process_pages_separately: when True, every page is processed; when
            False, only the first page is.
        model: model selection forwarded to process_image.
    """
    if not PDF_SUPPORT:
        yield None, None, None, filename, "PDF support requires PyMuPDF. Install pymupdf.", None
        return
    try:
        pdf_document = fitz.open(stream=file_bytes, filetype="pdf")
        page_count = len(pdf_document)

        def _render_page(page):
            # Higher-res, no alpha to keep RGB consistent
            pix = page.get_pixmap(matrix=fitz.Matrix(PDF_RENDER_SCALE, PDF_RENDER_SCALE), alpha=False)
            return Image.frombytes("RGB", (pix.width, pix.height), pix.samples)

        if process_pages_separately:
            for page_num in range(page_count):
                img = _render_page(pdf_document[page_num])
                # FIX: original used the literal placeholder "(unknown)" here
                # instead of the actual filename.
                page_filename = f"{filename} (Page {page_num+1})"
                result, content, structured_data = process_image(img, page_filename, fields, model)
                yield page_num, page_count, img, page_filename, content, structured_data
        else:
            img = _render_page(pdf_document[0])
            result, content, structured_data = process_image(img, filename, fields, model)
            yield 0, page_count, img, filename, content, structured_data
    except Exception as e:
        yield None, None, None, filename, f"Error processing PDF: {str(e)}", None
def create_download_buttons(results, structured_results, extraction_mode):
    """Render CSV download buttons: raw results always, structured data when
    custom-field extraction produced any rows."""
    st.header("Download Results")
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")

    # Raw results: one row per processed file/page.
    raw_buffer = io.StringIO()
    raw_writer = csv.writer(raw_buffer)
    raw_writer.writerow(['Filename', 'Description/Extraction'])
    for row in results:
        raw_writer.writerow([row['filename'], row.get('description', row.get('extraction', ''))])

    st.success("All files processed.")
    st.download_button(
        label="Download Results (CSV)",
        data=raw_buffer.getvalue(),
        file_name=f"image_analysis_{ts}.csv",
        mime="text/csv",
        use_container_width=True
    )

    if extraction_mode != "Custom field extraction" or not structured_results:
        return

    # Structured CSV: union of all keys so ragged rows still line up.
    column_names = {'filename'}
    for row in structured_results:
        column_names.update(row.keys())
    headers = sorted(column_names)

    structured_buffer = io.StringIO()
    structured_writer = csv.writer(structured_buffer)
    structured_writer.writerow(headers)
    for row in structured_results:
        structured_writer.writerow([row.get(h, '') for h in headers])
    st.download_button(
        label="Download Structured Data (CSV)",
        data=structured_buffer.getvalue(),
        file_name=f"structured_data_{ts}.csv",
        mime="text/csv",
        use_container_width=True
    )
# ---------------------------
# UI
# ---------------------------
st.title("EZOFIS AI OCR")

# Persist processed results across Streamlit reruns.
if 'results' not in st.session_state:
    st.session_state.results = []
if 'structured_results' not in st.session_state:
    st.session_state.structured_results = []
# Sidebar: file upload, model choice, and extraction options.
with st.sidebar:
    st.header("Upload Files")
    uploaded_files = st.file_uploader(
        "Choose images or PDFs",
        accept_multiple_files=True,
        type=['png', 'jpg', 'jpeg', 'pdf']
    )
    st.header("Model Settings")
    selected_model = st.selectbox(
        "Choose vision model:",
        [
            "google/gemma-3-4b-it",
            "google/gemma-3-12b-it",
            "openai/gpt-4.1",
            "openai/gpt-4.1-mini",
            "qwen/qwen2.5-vl-32b-instruct",  # OpenRouter vision option
            HF_LLaVA_LABEL,  # LLaVA via HF API
            RUNPOD_SECURE_LABEL  # Your RunPod OpenAI-compatible server
        ],
        help=("OpenRouter uses OPENROUTER_API_KEY. "
              "LLaVA (HF API) uses HF_TOKEN. "
              "RunPod (secured) uses RUNPOD_SECURE_* env vars. "
              f"Current RunPod model: {RUNPOD_SECURE_MODEL}")
    )
    # If RunPod model looks text-only, warn user
    if selected_model == RUNPOD_SECURE_LABEL and not is_vision_model_name(RUNPOD_SECURE_MODEL):
        st.warning(
            f"RunPod model '{RUNPOD_SECURE_MODEL}' appears text-only. "
            "Requests to this endpoint will NOT include images. "
            "Use a VL model (e.g. 'qwen2.5-vl:32b-instruct') for vision."
        )
    # Defaults used until files are uploaded and options are shown.
    extraction_mode = "General description"
    pdf_process_mode = "Process each page separately"
    fields = None
    if uploaded_files:
        st.write(f"Uploaded {len(uploaded_files)} file(s)")
        st.header("Data Extraction Options")
        extraction_mode = st.radio(
            "Choose extraction mode:",
            ["General description", "Custom field extraction"]
        )
        if extraction_mode == "Custom field extraction":
            custom_fields = st.text_area(
                "Enter fields to extract (comma separated or your prompt here):",
                value="Invoice number, Date, Company name, Total amount"
            )
            # Split the comma-separated field names, dropping blanks.
            fields = [f.strip() for f in custom_fields.split(",") if f.strip()]
        # Page-handling choice only matters when at least one PDF is present.
        if any(file.name.lower().endswith('.pdf') for file in uploaded_files):
            pdf_process_mode = st.radio(
                "How to process PDF files:",
                ["Process each page separately", "Process entire PDF as one document"]
            )
        process_button = st.button("Process Files", use_container_width=True)
    else:
        process_button = False
        st.info("Upload images or PDFs to begin.")
# Processing loop
if uploaded_files and process_button:
    # Token checks by route: verify the selected backend has its credentials
    # before doing any work.
    can_run = False
    if selected_model == HF_LLaVA_LABEL:
        if not HF_CLIENT_AVAILABLE:
            st.error("huggingface_hub not installed. Add 'huggingface_hub' to requirements.txt.")
        elif not HF_TOKEN:
            st.error("HF_TOKEN is not set.")
        else:
            can_run = True
    elif selected_model == RUNPOD_SECURE_LABEL:
        if not RUNPOD_SECURE_BASE_URL:
            st.error("RUNPOD_SECURE_BASE_URL is not set.")
        else:
            can_run = True
    else:
        if not OPENROUTER_API_KEY:
            st.error("OPENROUTER_API_KEY is not set.")
        else:
            can_run = True
    if can_run:
        st.header("Processing Results")
        progress_bar = st.progress(0)
        status_text = st.empty()
        st.session_state.results = []
        st.session_state.structured_results = []
        # First pass: count work items so the progress bar has a denominator
        # (each PDF page counts individually when pages are split).
        total_items = 0
        for f in uploaded_files:
            file_bytes = f.read()
            f.seek(0)  # rewind so the processing pass can re-read the upload
            if f.name.lower().endswith('.pdf') and PDF_SUPPORT:
                if pdf_process_mode == "Process each page separately":
                    try:
                        pdf_document = fitz.open(stream=file_bytes, filetype="pdf")
                        total_items += len(pdf_document)
                    except Exception:
                        total_items += 1
                else:
                    total_items += 1
            else:
                total_items += 1
        # Second pass: process every file (and every PDF page, if split).
        processed_count = 0
        for f in uploaded_files:
            file_bytes = f.read()
            f.seek(0)
            if f.name.lower().endswith('.pdf'):
                if not PDF_SUPPORT:
                    st.error("PDF support requires PyMuPDF. Add 'pymupdf' to requirements.txt.")
                    processed_count += 1
                    progress_bar.progress(processed_count / max(total_items, 1))
                    continue
                try:
                    process_separately = pdf_process_mode == "Process each page separately"
                    for page_info in process_pdf(file_bytes, f.name, fields, process_separately, selected_model):
                        page_num, page_count, image, page_filename, content, structured_data = page_info
                        if page_num is None:
                            # process_pdf signals an error by yielding page_num=None
                            # with the message in the content slot.
                            st.error(content)
                            continue
                        status_text.text(f"Processing {page_filename} ({page_num+1}/{page_count})")
                        # NOTE(review): PDF results always use the 'description' key
                        # even in extraction mode; create_download_buttons reads either key.
                        result = {'filename': page_filename, 'description': content}
                        st.session_state.results.append(result)
                        # structured_data always contains 'filename'; >1 keys means
                        # at least one extracted field was parsed.
                        if structured_data and len(structured_data) > 1:
                            st.session_state.structured_results.append(structured_data)
                        st.subheader(page_filename)
                        c1, c2 = st.columns([3, 2])  # give image more room
                        with c1:
                            st.image(image, width=IMAGE_PREVIEW_WIDTH)
                            if page_count > 1 and not process_separately:
                                st.info(f"PDF has {page_count} pages. Showing first page only.")
                        with c2:
                            st.write(content)
                            if structured_data and len(structured_data) > 1:
                                st.success("Extracted structured data")
                                st.json(structured_data)
                        st.divider()
                        processed_count += 1
                        progress_bar.progress(min(processed_count / max(total_items, 1), 1.0))
                except Exception as e:
                    st.error(f"Error processing PDF {f.name}: {e}")
                    processed_count += 1
                    progress_bar.progress(min(processed_count / max(total_items, 1), 1.0))
            else:
                # Plain image file.
                try:
                    status_text.text(f"Processing image {f.name}")
                    image = Image.open(f).convert("RGB")
                    result, content, structured_data = process_image(image, f.name, fields, selected_model)
                    st.session_state.results.append(result)
                    if structured_data and len(structured_data) > 1:
                        st.session_state.structured_results.append(structured_data)
                    st.subheader(f"Image: {f.name}")
                    c1, c2 = st.columns([3, 2])
                    with c1:
                        st.image(image, width=IMAGE_PREVIEW_WIDTH)
                    with c2:
                        st.write(content)
                        if structured_data and len(structured_data) > 1:
                            st.success("Extracted structured data")
                            st.json(structured_data)
                    st.divider()
                except Exception as e:
                    st.error(f"Error processing image {f.name}: {e}")
                processed_count += 1
                progress_bar.progress(min(processed_count / max(total_items, 1), 1.0))
        status_text.text("Processing complete.")
        if st.session_state.results:
            create_download_buttons(
                st.session_state.results,
                st.session_state.structured_results,
                extraction_mode
            )
# Landing instructions shown only before any upload.
if not uploaded_files:
    st.info("Upload files using the sidebar to get started.")
    st.write("""
How to use:
1) Upload one or more images or PDFs
2) Choose a model:
- OpenRouter: Gemma-3 4B/12B, GPT-4.1/4.1-mini, Qwen2.5-VL-32B
- HF API: LLaVA v1.6 Mistral-7B
- RunPod (secured): OpenAI-compatible base URL (supports images only if the model is VL)
3) Pick description or custom field extraction
4) For PDFs, choose page-by-page or first page
5) Click Process Files
6) Review outputs and download CSVs
""")

# Footer (always rendered).
st.markdown("---")
st.markdown(
    """
<div style="text-align: center; margin-top: 12px; opacity: 0.7;">
EZOFIS AI OCR
</div>
""",
    unsafe_allow_html=True
)