Ryan2219's picture
Upload app.py
f551837 verified
# -*- coding: utf-8 -*-
"""
Created on Mon Nov 24 14:58:03 2025
@author: rmd2219
"""
from PIL import Image
import numpy as np
import gradio as gr
import fitz # PyMuPDF
import tempfile
import os, json
import base64
from io import BytesIO
from openai import OpenAI
from paddleocr import PaddleOCR
# Build the OCR engine a single time at import; construction is slow, so every
# request reuses this one instance.
_OCR_SETTINGS = {
    "lang": "en",
    "use_angle_cls": False,  # skip angle classification; documents are assumed straight
    "det_db_thresh": 0.3,
    "det_db_box_thresh": 0.5,
    "rec_batch_num": 6,  # recognise several text regions per batch
    "det_limit_side_len": 4096,  # detection image side limit (library default is 960)
    "use_gpu": False,
    "enable_mkldnn": True,
    "cpu_threads": 8,  # use more CPU threads
}
ocr_engine = PaddleOCR(**_OCR_SETTINGS)
# The deployment stores the key under OPEN_AI_API_KEY, while OpenAI() is
# constructed without arguments and so picks the key up from OPENAI_API_KEY.
# Copy the value across only when it is actually set: assigning None into
# os.environ raises TypeError, which used to abort start-up with an unrelated
# looking error whenever the variable was missing.
_deployed_key = os.environ.get("OPEN_AI_API_KEY")
if _deployed_key is not None:
    os.environ["OPENAI_API_KEY"] = _deployed_key
client = OpenAI()
#%%
def ocr_with_confidence_power(pil_img):
    """
    Run the shared OCR engine on one image and return its text and confidence.

    Two result layouts are understood:

    * dict per page, with the texts under ``rec_texts`` and the scores under
      ``rec_scores`` (or ``scores``);
    * list per page, whose entries look like ``[box, (text, confidence)]``.

    Args:
        pil_img: Image to recognise. It is converted with ``np.array`` before
            being handed to ``ocr_engine.ocr``.

    Returns:
        ``(full_text, avg_conf)`` where ``full_text`` is the recognised lines
        joined with newlines and ``avg_conf`` is the mean confidence expressed
        as a percentage. When nothing is recognised the confidence is 0 in
        every code path (the dict layout previously reported 95 for a page
        without any text).
    """
    result = ocr_engine.ocr(np.array(pil_img))

    # An empty result, or a first page of None, means there is nothing to parse.
    # The None page used to reach the list loop and fail with a TypeError.
    if not result or result[0] is None:
        print("⚠️ OCR returned None or empty")
        return "", 0

    page = result[0]
    if isinstance(page, dict):
        lines, confs = _parse_dict_page(page)
    else:
        print("Processing list format")
        lines, confs = _parse_list_page(page)

    full_text = "\n".join(lines)
    avg_conf = sum(confs) / len(confs) if confs else 0

    print("=== OCR COMPLETE ===")
    print(f"Total lines: {len(lines)}")
    print(f"Avg confidence: {avg_conf:.2f}%")
    print(f"Full text preview: {full_text[:200]}...")
    return full_text, avg_conf


def _as_list(value):
    """Return *value* as a plain list, mapping None to an empty list."""
    # Going through list() avoids truth-testing array-like values directly.
    return [] if value is None else list(value)


def _parse_dict_page(page):
    """Extract (texts, confidences in percent) from a dict-style page result."""
    texts = _as_list(page.get("rec_texts"))
    scores = _as_list(page.get("rec_scores"))
    if not scores:
        scores = _as_list(page.get("scores"))
    if not scores:
        # No scores under either key: assume a nominal 0.95 for each line.
        scores = [0.95] * len(texts)
    return texts, [float(score) * 100 for score in scores]


def _parse_list_page(page):
    """Extract (texts, confidences in percent) from a list-style page result."""
    lines = []
    confs = []
    for item in page:
        # Expected entry shape: [box, (text, confidence)]; skip anything else.
        if not isinstance(item, (list, tuple)) or len(item) < 2:
            continue
        text_conf = item[1]
        if not isinstance(text_conf, (list, tuple)) or len(text_conf) < 2:
            continue
        try:
            conf = float(text_conf[1]) * 100
        except (TypeError, ValueError) as e:
            print(f"Error parsing item: {e}")
            continue
        lines.append(text_conf[0])
        confs.append(conf)
    return lines, confs
def pil_to_base64(pil_img):
    """Save *pil_img* as PNG in memory and return it as a base64 ``data:`` URL."""
    png_buffer = BytesIO()
    pil_img.save(png_buffer, format="PNG")
    payload = base64.b64encode(png_buffer.getvalue())
    return "data:image/png;base64," + payload.decode("utf-8")
def flatten_ocr_result(ocr_pages):
    """
    Collapse per-page OCR records into one string.

    A list is treated as page dicts whose ``"text"`` values are joined with a
    blank line between pages; any other value is returned as ``str(value)``.
    """
    if not isinstance(ocr_pages, list):
        return str(ocr_pages)
    return "\n\n".join(page["text"] for page in ocr_pages)
#%%
# System prompt sent as the first message of every chat completion request.
# It tells the model what to extract, how to lay the answer out, and when to
# call the get_pdf_page_image tool.
# NOTE(review): the prompt asks for "Respond in **Markdown**" and, a few lines
# later, for "plain text only, with no markdown formatting"; these two
# instructions contradict each other - confirm which one is intended.
# NOTE(review): the prompt text contains the typos "ot" (for "to") and "feild"
# (for "field"). They are left as-is here because this text is sent to the
# model verbatim and editing it changes runtime behaviour.
SYSTEM_PROMPT = """
You extract structured information from building related documents.
Your task:
1. Identify the information requested by the user in the PDF.
2. Identify how you should organize that information to clearly return it to the user for viewing in table format.
3. Only return fields strictly requested by the user.
4. Output your response in as structured a manner as possible. Do not use paragraphs but use bullets organized by category.
5. Make your output very concise and precise. Try to summarize in a way that would be easily input ot a table if the user wanted to.
Rules:
- Only include data actually present in the OCR text, however if you can reasonably infer a use value include it. And if it appears a single use is describing multiple floors, extrapolate but note it.
- Do NOT invent additional data.
- Provide a "notes" section to contain model concerns such as: inconsistent numbers, ambiguous use text, missing columns, or anything suspicious in the OCR. make notes feild concise. Include whether or not an image was needed for assistance.
- If the OCR text passed to you is unclear you have access to the image directly through your tool "get_pdf_page_image"
If you decide that looking at the image would improve accuracy,
you MUST call the function `get_pdf_page_image`.
NEVER describe fetching or retrieving the image in plain text.
NEVER state that you will call the tool. ONLY call the tool directly.
Respond in **Markdown**
Avoid using characters that trigger markdown formatting in responses.
Specifically:
- Do NOT use underscores (_)
- Do NOT use asterisks (*)
- Do NOT use tildes (~)
- Do NOT use backticks (`)
- Do NOT use double characters like ** __ ~~ **
- Do NOT attempt bold, italics, strikethrough, or inline code formatting
- Respond in plain text only, with no markdown formatting
"""
# Tool schema passed to the chat completion calls: a single zero-argument
# function, get_pdf_page_image, through which the model asks for page images.
TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "get_pdf_page_image",
            "description": (
                "This function MUST be called whenever visual inspection of the page is needed,\n"
                "even slightly. You MUST NOT describe the image in words unless you have\n"
                "called this tool and received the images from the user."
            ),
            # The tool takes no arguments.
            "parameters": {"type": "object", "properties": {}, "required": []},
        },
    },
]
def get_pdf_page_image(images):
    """
    Turn page images into chat ``image_url`` content blocks.

    Args:
        images: Iterable of objects exposing ``save(buffer, format="PNG")``.

    Returns:
        ``{"images": [...]}`` with one block per input image, in input order.
        Each block carries the PNG bytes as a base64 data URL and requests
        ``"high"`` detail.
    """
    print("Encoding images as base64 image_url blocks...")

    def as_block(page_img):
        # Serialise to PNG in memory, then wrap the bytes in a data URL.
        png = BytesIO()
        page_img.save(png, format="PNG")
        encoded = base64.b64encode(png.getvalue()).decode("utf-8")
        return {
            "type": "image_url",
            "image_url": {
                "url": "data:image/png;base64," + encoded,
                "detail": "high",
            },
        }

    return {"images": [as_block(page_img) for page_img in images]}
def llm_extract_stream(raw_text: str, images, user_input):
    """
    Ask the chat model to extract the requested information, streaming the answer.

    A first, non-streamed request lets the model decide whether it wants to
    look at the page images (by calling the ``get_pdf_page_image`` tool). If it
    does, the images are attached in a follow-up user message and the final
    answer is streamed with further tool calls disabled. Otherwise the answer
    is streamed from the original two messages.

    Args:
        raw_text: OCR text of the whole document.
        images: Page images, forwarded to ``get_pdf_page_image`` on request.
        user_input: The user's description of what to extract.

    Yields:
        Status strings and then the answer text accumulated so far (each
        yielded value is the full text up to that point, not just the new
        token).
    """
    user_prompt = (
        "\n"
        f"The user has requested the following information from this document: {user_input}\n"
        "If any part of the OCR appears unreliable, noisy, or uncertain,\n"
        "you MUST call the image tool instead of guessing.\n"
        f"The OCR for this document is {raw_text}\n"
    )
    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": user_prompt},
    ]

    # First call: the model decides whether it needs the images.
    # NOTE(review): when no tool is called, any text in this first response is
    # discarded and the answer is generated again by the streaming call below.
    response = client.chat.completions.create(
        model="gpt-5",
        messages=messages,
        tools=TOOLS,
        tool_choice="auto",
    )
    msg = response.choices[0].message
    tool_calls = msg.tool_calls or []
    wants_images = any(
        call.function.name == "get_pdf_page_image" for call in tool_calls
    )

    final_call_options = {}
    if wants_images:
        yield "📸 Model requested image help."
        tool_result = get_pdf_page_image(images)
        messages.append(msg)
        # Every tool call in the assistant message needs its own tool reply;
        # answering only the first one leaves the conversation invalid when
        # the model emits several calls at once.
        for call in tool_calls:
            messages.append({
                "role": "tool",
                "tool_call_id": call.id,
                "content": json.dumps({"status": "images will follow"}),
            })
        # The image blocks themselves travel in a user message.
        messages.append({
            "role": "user",
            "content": [
                {"type": "text", "text": "Here are the images you requested."}
            ] + tool_result["images"],
        })
        # Final answer only: no further tool calls allowed.
        final_call_options = {"tools": TOOLS, "tool_choice": "none"}

    stream_text = ""
    for chunk in client.chat.completions.create(
        model="gpt-5",
        messages=messages,
        stream=True,
        **final_call_options,
    ):
        # Skip chunks with no choices instead of failing on choices[0].
        if not chunk.choices:
            continue
        delta = chunk.choices[0].delta
        stream_text += getattr(delta, "content", "") or ""
        yield stream_text
def ensure_max_resolution(img, max_dim=2000):
    """
    Downscale *img* so that its longer side is at most *max_dim* pixels.

    Args:
        img: Image exposing ``size`` as ``(width, height)`` and ``resize``.
        max_dim: Largest allowed side length in pixels.

    Returns:
        *img* itself when it already fits, otherwise a resized copy with the
        aspect ratio preserved (LANCZOS resampling).
    """
    w, h = img.size
    longest = max(w, h)
    if longest <= max_dim:
        return img
    scale = max_dim / longest
    # int() truncates, so the short side of a very elongated image (e.g.
    # 1 x 5000) would become 0; keep every side at least one pixel.
    new_size = (max(1, int(w * scale)), max(1, int(h * scale)))
    return img.resize(new_size, Image.LANCZOS)
def file_to_images(file_path, dpi=300):
    """
    Load a PDF or an image file as a list of RGB PIL images.

    Args:
        file_path: Path of the file. A name ending in ``.pdf`` (any case) is
            rendered page by page; anything else is opened as a single image.
        dpi: Rendering resolution used for PDF pages.

    Returns:
        One image per PDF page (each passed through
        ``ensure_max_resolution``), or a one-element list for an image file.
    """
    # Case 1 - PDF
    if file_path.lower().endswith(".pdf"):
        images = []
        # The context manager closes the document once every page has been
        # rendered; it was previously left open.
        with fitz.open(file_path) as doc:
            for page in doc:
                pix = page.get_pixmap(dpi=dpi)
                img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
                images.append(ensure_max_resolution(img))
        return images

    # Case 2 - image
    # NOTE(review): unlike PDF pages, image files are not passed through
    # ensure_max_resolution - confirm whether that is intentional.
    # convert() returns a separate image, so the file handle opened by
    # Image.open can be released as soon as the conversion is done.
    with Image.open(file_path) as source:
        return [source.convert("RGB")]
def extract_pages_from_file(file_path, dpi=300):
    """
    OCR every page of *file_path* and return one record per page.

    Each record is a dict with the zero-based ``page`` index, the recognised
    ``text``, the page ``image`` and the OCR ``confidence``.
    """
    records = []
    for page_no, page_img in enumerate(file_to_images(file_path, dpi=dpi)):
        page_text, page_conf = ocr_with_confidence_power(page_img)
        records.append(
            dict(page=page_no, text=page_text, image=page_img, confidence=page_conf)
        )
    return records
def process_single_pdf_stream(pdf_path, user_input):
    """
    Run OCR and then LLM extraction on one file, yielding progress as it goes.

    Yields a status line before the OCR step, another before the LLM step, and
    then every update produced by ``llm_extract_stream``.
    """
    yield "⏳ Running OCR..."

    # OCR every page, keeping both the text and the page images.
    table_pages = extract_pages_from_file(pdf_path)
    raw_text = flatten_ocr_result(table_pages)
    print(raw_text)
    page_images = [entry["image"] for entry in table_pages]

    yield "⏳ Running LLM with tool-calling..."

    # Forward the streamed text unchanged.
    yield from llm_extract_stream(raw_text, page_images, user_input)
#%%
# Password compared against the unlock box; read from the PASSWORD
# environment variable (None when the variable is not set).
PASSWORD = os.getenv("PASSWORD")
def gradio_process(pdf_file, user_input):
    """
    Gradio handler for the extraction button.

    Args:
        pdf_file: The uploaded file, either an object with a ``name``
            attribute or a plain path string. When empty, the bundled
            ``example_image.jpg`` is processed instead.
        user_input: The user's extraction instructions.

    Yields:
        ``(status_text, gallery_value)`` pairs, matching the two outputs this
        handler is wired to. In upload mode the gallery value is a no-op
        ``gr.update()`` so the existing preview stays in place.
    """
    if pdf_file:
        # Accept both an object carrying .name and a bare path, the same way
        # preview_pdf does.
        pdf_path = getattr(pdf_file, "name", pdf_file)
        baseline_preview = None  # the upload handler already filled the gallery
        # Exactly two values: the handler has two outputs. The previous
        # three-value yield did not match them, and its None would have
        # cleared the preview.
        yield "⏳ Running OCR...", gr.update()
    else:
        pdf_path = 'example_image.jpg'
        # No upload happened, so build the preview here.
        baseline_preview = preview_pdf(pdf_path)
        # Update the gallery before the OCR starts streaming.
        yield "⏳ Running OCR...", baseline_preview

    # Stream every step, keeping whichever preview is on screen.
    for output in process_single_pdf_stream(pdf_path, user_input):
        if baseline_preview:
            # baseline mode -> keep showing the generated preview
            yield output, baseline_preview
        else:
            # upload mode -> leave the user's preview untouched
            yield output, gr.update()
def preview_pdf(pdf_file):
    """
    Render every page of a document to PNG files for the preview gallery.

    Args:
        pdf_file: Either an object with a ``name`` attribute holding the path,
            or the path itself.

    Returns:
        List of PNG file paths, one per page, rendered at 150 dpi into a newly
        created temporary directory.
    """
    pdf_path = pdf_file.name if hasattr(pdf_file, "name") else pdf_file

    image_paths = []
    # The context manager closes the document after rendering; it was
    # previously left open.
    with fitz.open(pdf_path) as doc:
        # NOTE(review): the directory is never removed here; the returned
        # paths are handed to the caller, so the files have to outlive this
        # call - confirm where cleanup should happen.
        temp_dir = tempfile.mkdtemp()
        for i, page in enumerate(doc):
            pix = page.get_pixmap(dpi=150)
            out_path = os.path.join(temp_dir, f"page_{i+1}.png")
            pix.save(out_path)
            image_paths.append(out_path)
    return image_paths
def check_password(pw):
    """
    Validate the password typed into the unlock box.

    Args:
        pw: Text entered by the user.

    Returns:
        A 3-tuple of ``(password section update, main app update, message)``.
        On success the password section is hidden, the main app is shown and
        the message is empty; otherwise the visibility is the other way round
        and the message reports the failure.
    """
    import hmac  # local import keeps this block self-contained

    # Fail closed when either side is missing: with a plain == comparison an
    # unset PASSWORD (None) would match a None input and unlock the app.
    unlocked = (
        isinstance(pw, str)
        and isinstance(PASSWORD, str)
        # Constant-time comparison; bytes are used because compare_digest
        # only accepts ASCII-only str values.
        and hmac.compare_digest(pw.encode("utf-8"), PASSWORD.encode("utf-8"))
    )
    if unlocked:
        return (
            gr.update(visible=False),  # hide password section
            gr.update(visible=True),  # show main app
            "",
        )
    return (
        gr.update(visible=True),
        gr.update(visible=False),
        "❌ Incorrect password, try again.",
    )
# User interface: a password gate that, once unlocked, reveals the document
# upload / extraction screen.
with gr.Blocks() as demo:
    # Password gate, visible at start-up.
    with gr.Group(visible=True) as password_block:
        gr.Markdown("### 🔒 Enter password to access the chatbot")
        pw_box = gr.Textbox(type="password", placeholder="Enter password...", show_label=False)
        pw_btn = gr.Button("Unlock")
        pw_msg = gr.Markdown("")

    # Main application, hidden until check_password reveals it.
    with gr.Group(visible=False) as main_app:
        gr.Markdown("\n## 📄 Universal Document Interpreter\n")
        pdf_input = gr.File(
            label="Upload PDF or Image",
            file_types=[".pdf", ".png", ".jpg", ".jpeg"],
        )
        # Free-text instructions describing what to extract.
        user_input = gr.Textbox(
            label="Instructions",
            placeholder="Type desired characteristics to extract...",
        )
        run_btn_slow = gr.Button("Run Extraction with Powerful OCR - Will be Slow")
        # Page preview
        gallery = gr.Gallery(
            label="Preview",
            columns=1,
            height="auto",
            object_fit="contain",
        )
        status_box = gr.Markdown()

    # Uploading a file only refreshes the preview gallery.
    pdf_input.upload(preview_pdf, pdf_input, gallery)

    # Extraction runs when the button is clicked.
    run_btn_slow.click(
        fn=gradio_process,
        inputs=[pdf_input, user_input],
        outputs=[status_box, gallery],
    )

    # The password can be submitted with ENTER or with the Unlock button.
    pw_box.submit(
        fn=check_password,
        inputs=[pw_box],
        outputs=[password_block, main_app, pw_msg],
    )
    pw_btn.click(
        fn=check_password,
        inputs=[pw_box],
        outputs=[password_block, main_app, pw_msg],
    )

demo.launch(inbrowser=True)