# app.py """ ChatGPT-Premium-like open-source Gradio app with: - multi-image upload (practical "unlimited" via disk+queue) - OCR (PaddleOCR preferred, fallback to pytesseract) - Visual reasoning (LLaVA/MiniGPT-style if model available) - Math/aptitude pipeline (OCR -> math-specialized LLM) - Caching of processed images & embeddings - Simple in-process queue & streaming text output - Rate-limiting per-client (token-bucket) h NOTES: - Replace model IDs with ones that match your hardware/quotas. - For production, swap the in-process queue with Redis/Celery and use S3/MinIO for storage. - Achieving strictly "better than ChatGPT" across the board is unrealistic; this app aims to be the best open-source approximation. """ import os import time import uuid import threading import queue import json import math from pathlib import Path from typing import List, Dict, Tuple, Optional from collections import defaultdict, deque import gradio as gr from PIL import Image import torch from transformers import ( AutoProcessor, AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer ) # Optional OCR libs try: from paddleocr import PaddleOCR # pip install paddleocr PADDLE_AVAILABLE = True except Exception: PADDLE_AVAILABLE = False try: import pytesseract # pip install pytesseract TESSERACT_AVAILABLE = True except Exception: TESSERACT_AVAILABLE = False # --------------------------- # CONFIG: change these values # --------------------------- # Paths DATA_DIR = Path("data") IMAGES_DIR = DATA_DIR / "images" CACHE_DIR = DATA_DIR / "cache" IMAGES_DIR.mkdir(parents=True, exist_ok=True) CACHE_DIR.mkdir(parents=True, exist_ok=True) # Models - pick models appropriate to your hardware. # Visual reasoning model (LLaVA-style). If not available locally, this pipeline will skip visual-model step. VISUAL_MODEL_ID = "liuhaotian/llava-v1.5-7b" # heavy; change to smaller if needed VISUAL_USE = True # set False to skip LLaVA step # Math/Reasoning LLM MATH_LLM_ID = "mistralai/Mistral-7B-Instruct-v0.2" # good balance; change if you prefer LLaMA etc. # Device DEVICE = "cuda" if torch.cuda.is_available() else "cpu" # Limits & performance tuning MAX_IMAGES_PER_REQUEST = 64 # reasonable UI limit BATCH_SIZE = 4 # how many images we process at once for visual models MAX_HISTORY_TOKENS = 2048 STREAM_CHUNK_SECONDS = 0.12 # how often we yield tokens to user during streaming # Rate limit settings (simple token bucket) RATE_TOKENS = 40 # tokens added per interval RATE_INTERVAL = 60 # seconds for refill TOKENS_PER_REQUEST = 1 # cost per chat request (tune) # --------------------------- # Utilities: storage, caching # --------------------------- def save_uploaded_image(tempfile) -> Path: # tempfile is from Gradio; it has .name attribute uid = uuid.uuid4().hex ext = Path(tempfile.name).suffix or ".png" dest = IMAGES_DIR / f"{int(time.time())}_{uid}{ext}" # Copy content with open(tempfile.name, "rb") as src, open(dest, "wb") as dst: dst.write(src.read()) return dest # simple file-based cache for captions & ocr text def cache_get(key: str) -> Optional[str]: p = CACHE_DIR / f"{key}.json" if p.exists(): try: return json.loads(p.read_text())["value"] except Exception: return None return None def cache_set(key: str, value: str): p = CACHE_DIR / f"{key}.json" p.write_text(json.dumps({"value": value})) def path_hash(p: Path) -> str: # simple hash: file size + mtime st = p.stat() return f"{p.name}-{st.st_size}-{int(st.st_mtime)}" # --------------------------- # Rate limiter (per ip) # --------------------------- class TokenBucket: def __init__(self, rate=RATE_TOKENS, per=RATE_INTERVAL): self.rate = rate self.per = per self.allowance = rate self.last_check = time.time() def consume(self, tokens=1) -> bool: now = time.time() elapsed = now - self.last_check self.last_check = now self.allowance += elapsed * (self.rate / self.per) if self.allowance > self.rate: self.allowance = self.rate if self.allowance >= tokens: self.allowance -= tokens return True return False rate_buckets = defaultdict(lambda: TokenBucket()) def rate_ok(client_id: str) -> bool: return rate_buckets[client_id].consume(TOKENS_PER_REQUEST) # --------------------------- # OCR utilities # --------------------------- paddle_ocr = None if PADDLE_AVAILABLE: paddle_ocr = PaddleOCR(use_angle_cls=True, lang="en") # slow to init first time def run_ocr(path: Path) -> str: """ High-quality OCR pipeline: PaddleOCR -> pytesseract fallback """ key = f"ocr-{path_hash(path)}" cached = cache_get(key) if cached: return cached text = "" try: if paddle_ocr: result = paddle_ocr.ocr(str(path), cls=True) lines = [] for rec in result: for box, rec_res in rec: txt = rec_res[0] lines.append(txt) text = "\n".join(lines).strip() except Exception as e: # paddle may fail on some setups text = "" if not text and TESSERACT_AVAILABLE: try: pil = Image.open(path).convert("RGB") text = pytesseract.image_to_string(pil) text = text.strip() except Exception: text = "" if not text: text = "" cache_set(key, text or "") return text # --------------------------- # Visual reasoning (LLaVA) wrapper # --------------------------- visual_processor = None visual_model = None visual_tokenizer = None def init_visual_model(): global visual_processor, visual_model, visual_tokenizer if not VISUAL_USE: return try: visual_processor = AutoProcessor.from_pretrained(VISUAL_MODEL_ID) visual_model = AutoModelForCausalLM.from_pretrained( VISUAL_MODEL_ID, torch_dtype=torch.float16 if DEVICE == "cuda" else torch.float32, device_map="auto" ) # Some LLaVA models need tokenizer from model repo visual_tokenizer = AutoTokenizer.from_pretrained(VISUAL_MODEL_ID, use_fast=False) print("Visual model loaded.") except Exception as e: print("Could not load visual model:", e) # disable visual if fails visual_processor = visual_model = visual_tokenizer = None # Combine visual and text pipelines: pass image + question -> string answer def run_visual_reasoning(image_path: Path, question: str, max_new_tokens=256) -> str: if visual_processor is None or visual_model is None: return "" key = f"visual-{path_hash(image_path)}-{question[:96]}" cached = cache_get(key) if cached: return cached try: image = Image.open(image_path).convert("RGB") inputs = visual_processor(images=image, text=question, return_tensors="pt").to(DEVICE) with torch.no_grad(): outs = visual_model.generate(**inputs, max_new_tokens=max_new_tokens) ans = visual_tokenizer.decode(outs[0], skip_special_tokens=True) cache_set(key, ans) return ans except Exception as e: print("Visual reasoning error:", e) return "" # --------------------------- # Math/Reasoning LLM init # --------------------------- math_tokenizer = None math_model = None def init_math_model(): global math_tokenizer, math_model try: math_tokenizer = AutoTokenizer.from_pretrained(MATH_LLM_ID, use_fast=False) math_model = AutoModelForCausalLM.from_pretrained( MATH_LLM_ID, torch_dtype=torch.float16 if DEVICE == "cuda" else torch.float32, device_map="auto" ) print("Math LLM loaded.") except Exception as e: print("Could not load math model:", e) math_model = None def ask_math_llm(prompt: str, stream=False): """ If stream=True, return a generator which yields partial text as generated. Otherwise, return final string. """ if math_model is None: return "Math model not available." inputs = math_tokenizer(prompt, return_tensors="pt", truncation=True, max_length=MAX_HISTORY_TOKENS).to(DEVICE) if not stream: with torch.no_grad(): out_ids = math_model.generate(**inputs, max_new_tokens=512) return math_tokenizer.decode(out_ids[0], skip_special_tokens=True) # streaming mode using TextIteratorStreamer streamer = TextIteratorStreamer(math_tokenizer, skip_prompt=True, skip_special_tokens=True) generation_kwargs = dict( **inputs, streamer=streamer, max_new_tokens=512, do_sample=True, temperature=0.7, top_p=0.9 ) thread = threading.Thread(target=math_model.generate, kwargs=generation_kwargs) thread.start() # yield chunks from streamer buffer = "" for new_text in streamer: buffer += new_text yield buffer # --------------------------- # Simple in-process queue for heavy tasks (visual + OCR) # --------------------------- work_q = queue.Queue(maxsize=256) results_cache = {} # job_id -> result def worker_loop(): while True: job = work_q.get() if job is None: break job_id, image_paths, question = job try: ocr_texts = [run_ocr(p) for p in image_paths] visual_texts = [] if visual_processor and visual_model: for p in image_paths: v = run_visual_reasoning(p, question) visual_texts.append(v) # combine combined = { "ocr": ocr_texts, "visual": visual_texts } results_cache[job_id] = combined except Exception as e: results_cache[job_id] = {"error": str(e)} finally: work_q.task_done() # start a few worker threads NUM_WORKERS = max(1, min(4, (os.cpu_count() or 2)//2)) for _ in range(NUM_WORKERS): t = threading.Thread(target=worker_loop, daemon=True) t.start() # --------------------------- # Main chat pipeline: orchestrates OCR/visual + math llm + chat memory # --------------------------- def build_prompt(system_prompt: str, chat_history: List[Tuple[str,str]], extracted_texts: List[str], user_question: str) -> str: # Keep a compact, relevant prompt history_text = "" for role, text in chat_history[-8:]: # keep last N turns history_text += f"{role}: {text}\n" img_ctx = "" if extracted_texts: img_ctx = "\n\nEXTRACTED_FROM_IMAGES:\n" + "\n---\n".join(extracted_texts) prompt = f"""{system_prompt} Conversation: {history_text} User question: {user_question} {img_ctx} Assistant (explain step-by-step, show calculations if any):""" return prompt SYSTEM_PROMPT = "You are a helpful assistant that solves aptitude, math, and image-based questions. Be precise, show steps, and if images contain diagrams refer to them." # simple memory per-session (in-memory). For production, persist in DB. SESSION_MEMORY = defaultdict(lambda: {"history": [], "embeddings": []}) def process_request(client_id: str, uploaded_files, user_question: str, stream=True): # Rate limiting if not rate_ok(client_id): return ["Rate limit exceeded. Try again later."] # Save uploaded files image_paths = [] for f in (uploaded_files or []): p = save_uploaded_image(f) image_paths.append(p) if len(image_paths) > MAX_IMAGES_PER_REQUEST: return [f"Too many images - max {MAX_IMAGES_PER_REQUEST}"] # Create job to process OCR+visual job_id = uuid.uuid4().hex work_q.put((job_id, image_paths, user_question)) # Wait for job to complete (small timeout) — for more scalable UI this should be async and notify user later. wait_seconds = 0 while job_id not in results_cache and wait_seconds < 12: time.sleep(0.25) wait_seconds += 0.25 if job_id not in results_cache: # fallback: run basic OCR inline (slower but reliable) ocr_texts = [run_ocr(p) for p in image_paths] visual_texts = [] if visual_processor and visual_model: for p in image_paths: visual_texts.append(run_visual_reasoning(p, user_question)) results = {"ocr": ocr_texts, "visual": visual_texts} else: results = results_cache.pop(job_id, {"ocr": [], "visual": []}) # Build final extracted_texts list combining OCR + visual captions intelligently extracted_texts = [] for o, v in zip(results.get("ocr", []), results.get("visual", [])): parts = [] if o: parts.append("OCR: " + o) if v: parts.append("Visual: " + v) combined = "\n".join(parts).strip() if combined: extracted_texts.append(combined) # add to session memory sess = SESSION_MEMORY[client_id] sess["history"].append(("User", user_question)) # Build LLM prompt prompt = build_prompt(SYSTEM_PROMPT, sess["history"], extracted_texts, user_question) # stream or non-stream generation if stream: # streaming generator using ask_math_llm(stream=True) yield from _stream_llm_response_generator(prompt, client_id) else: answer = ask_math_llm(prompt, stream=False) sess["history"].append(("Assistant", answer)) return [answer] def _stream_llm_response_generator(prompt: str, client_id: str): # yield progressive updates to Gradio UI (the generator returns strings) # Gradio chat with streaming expects generator that yields partial strings session = SESSION_MEMORY[client_id] # Start streaming gen = ask_math_llm(prompt, stream=True) partial = "" for chunk in gen: # chunk is the current buffer; yield once per small delay partial = chunk # also update session memory at end (approximate) yield partial # final append session["history"].append(("Assistant", partial)) # --------------------------- # GRADIO UI # --------------------------- # --------------------------- # GRADIO UI # --------------------------- with gr.Blocks(css=""" /* small CSS to make chat look nicer */ .chat-column { max-width: 900px; margin-left: auto; margin-right: auto; } """) as demo: gr.Markdown("# 🚀 Open-Source ChatGPT-like (Multimodal)") with gr.Row(): with gr.Column(scale=8, elem_classes="chat-column"): chatbot = gr.Chatbot( label="Assistant", elem_id="chatbot", show_label=False, type="messages", height=600 ) with gr.Row(): txt = gr.Textbox( label="Type a message...", placeholder="Ask a question or upload images", show_label=False ) submit = gr.Button("Send") with gr.Row(): img_in = gr.File( label="Upload images (multiple)", file_count="multiple", file_types=["image"] ) clear_btn = gr.Button("New Chat") client_id_state = gr.State(str(uuid.uuid4())) # simple per-window client id # --------------------------- # handle send # --------------------------- def handle_send(message, client_state, files): client_id = client_state or str(uuid.uuid4()) gen = process_request(client_id, files, message, stream=True) collected = "" for part in gen: collected = part # Return in new type="messages" format yield "", [ {"role": "user", "content": message}, {"role": "assistant", "content": collected} ] # Final guarantee yield "", [ {"role": "user", "content": message}, {"role": "assistant", "content": collected} ] # Connect send button and textbox submit.click(handle_send, inputs=[txt, client_id_state, img_in], outputs=[txt, chatbot]) txt.submit(handle_send, inputs=[txt, client_id_state, img_in], outputs=[txt, chatbot]) # Clear chat button def clear_chat(): client_id_state.value = str(uuid.uuid4()) return [], "" clear_btn.click(clear_chat, None, [chatbot, txt])