| |
|
| | """
|
| | Created on Mon Nov 24 14:58:03 2025
|
| |
|
| | @author: rmd2219
|
| | """
|
| |
|
| | from PIL import Image
|
| | import numpy as np
|
| | import gradio as gr
|
| | import fitz
|
| | import tempfile
|
| | import os, json
|
| | import base64
|
| | from io import BytesIO
|
| | from openai import OpenAI
|
| | from paddleocr import PaddleOCR
|
| |
|
| |
|
# OCR engine shared by every request; it is constructed once, at import time.
ocr_engine = PaddleOCR(
    lang="en",
    use_angle_cls=False,
    det_db_thresh=0.3,
    det_db_box_thresh=0.5,
    rec_batch_num=6,
    det_limit_side_len=4096,
    use_gpu=False,
    enable_mkldnn=True,
    cpu_threads=8,
)

# The key may be supplied as OPEN_AI_API_KEY; mirror it into OPENAI_API_KEY
# before the client is created. Copy it only when it is actually set:
# assigning None into os.environ raises TypeError, which used to crash the
# import even when OPENAI_API_KEY itself was already configured.
_alt_openai_key = os.environ.get("OPEN_AI_API_KEY")
if _alt_openai_key:
    os.environ["OPENAI_API_KEY"] = _alt_openai_key

client = OpenAI()
| |
|
| |
|
def _as_list(value):
    """Return ``value`` as a plain list (``None`` becomes ``[]``).

    Converting first lets callers use ordinary truthiness checks even when the
    OCR backend hands back an array-like container.
    """
    return [] if value is None else list(value)


def ocr_with_confidence_power(pil_img):
    """Run the shared PaddleOCR engine on an image and summarise the result.

    Two result layouts are handled for the first page of the OCR output:

    * a dict with ``rec_texts`` and ``rec_scores`` (or ``scores``) entries;
    * a list of ``[box, (text, confidence)]`` items.

    Args:
        pil_img: Image to recognise; it is converted with ``np.array`` before
            being passed to the engine. No preprocessing is applied.

    Returns:
        A ``(full_text, avg_conf)`` tuple. ``full_text`` is the recognised
        lines joined with newlines; ``avg_conf`` is the mean confidence as a
        percentage, or 0 when nothing was recognised.
    """
    result = ocr_engine.ocr(np.array(pil_img))

    # result[0] is checked as well: a None page would otherwise raise
    # TypeError when it is iterated further down.
    if result is None or len(result) == 0 or result[0] is None:
        print("⚠️ OCR returned None or empty")
        return "", 0

    page = result[0]

    if isinstance(page, dict):
        texts = _as_list(page.get("rec_texts"))
        scores = _as_list(page.get("rec_scores")) or _as_list(page.get("scores"))
        if not scores:
            # No scores were reported: fall back to the placeholder of 0.95
            # per recognised line.
            scores = [0.95] * len(texts)

        full_text = "\n".join(texts)
        # With no lines at all there is nothing to average; report 0 like the
        # other empty-result paths instead of a made-up 95%.
        if scores:
            avg_conf = sum(float(s) for s in scores) / len(scores) * 100
        else:
            avg_conf = 0.0

        print("=== OCR COMPLETE ===")
        print(f"Total lines: {len(texts)}")
        print(f"Avg confidence: {avg_conf:.2f}%")
        print(f"Full text preview: {full_text[:200]}...")

        return full_text, avg_conf

    print("Processing list format")
    lines = []
    confs = []

    for item in page:
        if not (isinstance(item, (list, tuple)) and len(item) >= 2):
            continue
        text_conf = item[1]
        if not (isinstance(text_conf, (list, tuple)) and len(text_conf) >= 2):
            continue
        text, conf = text_conf[0], text_conf[1]
        try:
            conf = float(conf) * 100
        except (TypeError, ValueError) as e:
            # Skip entries whose confidence is not numeric rather than
            # failing the whole page.
            print(f"Error parsing item: {e}")
            continue
        lines.append(text)
        confs.append(conf)

    full_text = "\n".join(lines)
    avg_conf = sum(confs) / len(confs) if confs else 0

    print("=== OCR COMPLETE ===")
    print(f"Total lines: {len(lines)}")
    print(f"Avg confidence: {avg_conf:.2f}%")

    return full_text, avg_conf
| |
|
def pil_to_base64(pil_img):
    """Save ``pil_img`` as PNG in memory and return it as a base64 data URL."""
    png_buffer = BytesIO()
    pil_img.save(png_buffer, format="PNG")
    payload = base64.b64encode(png_buffer.getvalue())
    return "data:image/png;base64," + payload.decode("utf-8")
| |
|
def flatten_ocr_result(ocr_pages):
    """Merge per-page OCR records into one string.

    A list is treated as page records and their ``"text"`` values are joined
    with a blank line between pages; any other value is returned via ``str``.
    """
    if not isinstance(ocr_pages, list):
        return str(ocr_pages)
    return "\n\n".join(page["text"] for page in ocr_pages)
| |
|
| |
|
| |
|
# System message sent with every extraction request.
# NOTE(review): the final section asks for Markdown and then forbids Markdown
# formatting characters; the wording is left as written - confirm the intent.
SYSTEM_PROMPT = """
You extract structured information from building related documents.

Your task:
1. Identify the information requested by the user in the PDF.
2. Identify how you should organize that information to clearly return it to the user for viewing in table format.
3. Only return fields strictly requested by the user.
4. Output your response in as structured a manner as possible. Do not use paragraphs but use bullets organized by category.
5. Make your output very concise and precise. Try to summarize in a way that would be easily input to a table if the user wanted to.

Rules:
- Only include data actually present in the OCR text, however if you can reasonably infer a use value include it. And if it appears a single use is describing multiple floors, extrapolate but note it.
- Do NOT invent additional data.
- Provide a "notes" section to contain model concerns such as: inconsistent numbers, ambiguous use text, missing columns, or anything suspicious in the OCR. Make the notes field concise. Include whether or not an image was needed for assistance.
- If the OCR text passed to you is unclear you have access to the image directly through your tool "get_pdf_page_image"
If you decide that looking at the image would improve accuracy,
you MUST call the function `get_pdf_page_image`.

NEVER describe fetching or retrieving the image in plain text.
NEVER state that you will call the tool. ONLY call the tool directly.

Respond in **Markdown**
Avoid using characters that trigger markdown formatting in responses.
Specifically:
- Do NOT use underscores (_)
- Do NOT use asterisks (*)
- Do NOT use tildes (~)
- Do NOT use backticks (`)
- Do NOT use double characters like ** __ ~~ **
- Do NOT attempt bold, italics, strikethrough, or inline code formatting
- Respond in plain text only, with no markdown formatting
"""
# Description shown to the model for the image tool.
_IMAGE_TOOL_DESCRIPTION = (
    "This function MUST be called whenever visual inspection of the page is needed,\n"
    "even slightly. You MUST NOT describe the image in words unless you have\n"
    "called this tool and received the images from the user."
)

# Tool schema passed to the chat completion calls: a single function that
# takes no arguments.
TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "get_pdf_page_image",
            "description": _IMAGE_TOOL_DESCRIPTION,
            "parameters": {
                "type": "object",
                "properties": {},
                "required": [],
            },
        },
    },
]
def get_pdf_page_image(images):
    """Encode each image as a PNG ``image_url`` content block.

    Args:
        images: Iterable of objects with a PIL-style ``save(fp, format=...)``.

    Returns:
        ``{"images": [...]}`` with one block per input image, each holding a
        base64 ``data:`` URL and ``"detail": "high"``.
    """
    print("Encoding images as base64 image_url blocks...")

    def to_block(image):
        png = BytesIO()
        image.save(png, format="PNG")
        encoded = base64.b64encode(png.getvalue()).decode("utf-8")
        return {
            "type": "image_url",
            "image_url": {
                "url": f"data:image/png;base64,{encoded}",
                "detail": "high",
            },
        }

    return {"images": [to_block(image) for image in images]}
| |
|
def _stream_completion(**request):
    """Stream one chat completion, yielding the text accumulated so far."""
    accumulated = ""
    for chunk in client.chat.completions.create(stream=True, **request):
        if not chunk.choices:
            # Defensive: skip a chunk that carries no choices instead of
            # raising IndexError on chunk.choices[0].
            continue
        accumulated += getattr(chunk.choices[0].delta, "content", "") or ""
        yield accumulated


def llm_extract_stream(raw_text: str, images, user_input):
    """Ask the model to extract ``user_input`` from the OCR text.

    A first, non-streaming request lets the model decide whether it wants the
    page images (tool ``get_pdf_page_image``). If it does, the images are
    attached and the final answer is streamed; otherwise the answer from the
    first request is used directly.

    Args:
        raw_text: OCR text of the whole document.
        images: Page images handed to ``get_pdf_page_image`` on request.
        user_input: The user's description of what to extract.

    Yields:
        Status strings and then the answer text; while streaming, each yield
        is the cumulative text so far, so the last yield is the full answer.
    """
    user_prompt = (
        "\n"
        f"The user has requested the following information from this document: {user_input}\n"
        "\n"
        "If any part of the OCR appears unreliable, noisy, or uncertain,\n"
        "you MUST call the image tool instead of guessing.\n"
        "\n"
        f"The OCR for this document is {raw_text}\n"
    )
    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": user_prompt},
    ]

    response = client.chat.completions.create(
        model="gpt-5",
        messages=messages,
        tools=TOOLS,
        tool_choice="auto",
    )
    msg = response.choices[0].message
    tool_calls = msg.tool_calls or []

    if any(call.function.name == "get_pdf_page_image" for call in tool_calls):
        yield "📸 Model requested image help."

        tool_result = get_pdf_page_image(images)

        messages.append(msg)
        # Every tool call in the assistant message needs its own tool reply;
        # answering only the first one makes the follow-up request invalid
        # when the model issued several calls.
        for call in tool_calls:
            messages.append({
                "role": "tool",
                "tool_call_id": call.id,
                "content": json.dumps({"status": "images will follow"}),
            })

        # The images themselves travel in a user message as image_url blocks.
        messages.append({
            "role": "user",
            "content": [
                {"type": "text", "text": "Here are the images you requested."}
            ] + tool_result["images"],
        })

        yield from _stream_completion(
            model="gpt-5",
            messages=messages,
            tools=TOOLS,
            tool_choice="none",
        )
        return

    if not tool_calls and msg.content:
        # The first request already produced the complete answer; reuse it
        # rather than paying for a second generation of the same prompt.
        yield msg.content
        return

    # No usable answer in the first response: fall back to a streamed request.
    yield from _stream_completion(model="gpt-5", messages=messages)
| |
|
def ensure_max_resolution(img, max_dim=2000):
    """Downscale ``img`` so that its longer side is at most ``max_dim`` pixels.

    Args:
        img: Image exposing ``size`` as ``(width, height)`` and ``resize``.
        max_dim: Maximum allowed length of the longer side, in pixels.

    Returns:
        ``img`` itself when it already fits, otherwise a resized copy with the
        aspect ratio preserved (LANCZOS resampling).
    """
    w, h = img.size
    longest = max(w, h)
    if longest <= max_dim:
        return img

    scale = max_dim / longest
    # Clamp to 1 pixel: for very elongated images the short side would
    # otherwise truncate to 0, which resize() rejects.
    new_size = (max(1, int(w * scale)), max(1, int(h * scale)))
    return img.resize(new_size, Image.LANCZOS)
| |
|
| |
|
| |
|
def file_to_images(file_path, dpi=300):
    """Load a PDF or an image file as a list of RGB PIL images.

    Args:
        file_path: Path to the file; a name ending in ``.pdf`` (any case) is
            rendered page by page, anything else is opened as a single image.
        dpi: Rendering resolution used for PDF pages.

    Returns:
        One PIL image per PDF page (each passed through
        ``ensure_max_resolution``), or a one-element list for an image file.
    """
    if file_path.lower().endswith(".pdf"):
        images = []
        # The context manager closes the document even if a page fails to
        # render; previously it was never closed.
        with fitz.open(file_path) as doc:
            for page in doc:
                pix = page.get_pixmap(dpi=dpi)
                img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
                images.append(ensure_max_resolution(img))
        return images

    # convert() returns a new, fully loaded image, so the source file can be
    # closed as soon as the conversion is done.
    with Image.open(file_path) as source:
        return [source.convert("RGB")]
| |
|
def extract_pages_from_file(file_path, dpi=300):
    """OCR every page of ``file_path`` and return one record per page.

    Each record is a dict with the zero-based ``"page"`` index, the OCR
    ``"text"``, the page ``"image"`` and the OCR ``"confidence"``.
    """
    page_images = file_to_images(file_path, dpi=dpi)

    records = []
    for page_index, page_image in enumerate(page_images):
        page_text, page_conf = ocr_with_confidence_power(page_image)
        record = {
            "page": page_index,
            "text": page_text,
            "image": page_image,
            "confidence": page_conf,
        }
        records.append(record)
    return records
| |
|
def process_single_pdf_stream(pdf_path, user_input):
    """Run OCR and then the LLM extraction on one file, streaming progress.

    Args:
        pdf_path: Path of the PDF or image file to process.
        user_input: The user's description of what to extract.

    Yields:
        Two status strings, followed by every update produced by
        ``llm_extract_stream``.
    """
    yield "⏳ Running OCR..."

    table_pages = extract_pages_from_file(pdf_path)
    raw_text = flatten_ocr_result(table_pages)
    print(raw_text)
    images = [p["image"] for p in table_pages]

    yield "⏳ Running LLM with tool-calling..."

    # Forward the model's updates unchanged; nothing is kept locally.
    yield from llm_extract_stream(raw_text, images, user_input)
| |
|
| |
|
| |
|
# Password for the unlock screen, read from the environment at import time;
# None when the variable is not set.
PASSWORD = os.getenv("PASSWORD")
| |
|
def gradio_process(pdf_file, user_input):
    """Gradio handler: run the extraction and stream status plus preview.

    Args:
        pdf_file: The uploaded file (an object exposing ``.name`` or a plain
            path), or a falsy value to fall back to ``example_image.jpg``.
        user_input: The user's description of what to extract.

    Yields:
        ``(status, gallery_value)`` pairs, matching the two output components
        this handler is wired to.
    """
    if pdf_file:
        # Accept both an upload object with .name and a plain path string,
        # the same way preview_pdf does.
        pdf_path = getattr(pdf_file, "name", pdf_file)
        # The upload handler has already filled the gallery; gr.update() with
        # no arguments leaves it as it is. (The previous first yield had three
        # values for two outputs and passed None for the gallery.)
        gallery_value = gr.update()
    else:
        pdf_path = 'example_image.jpg'
        gallery_value = preview_pdf(pdf_path)

    yield "⏳ Running OCR...", gallery_value

    for output in process_single_pdf_stream(pdf_path, user_input):
        yield output, gallery_value
| |
|
| |
|
| |
|
def preview_pdf(pdf_file):
    """Render every page of a document to a PNG file for the preview gallery.

    Args:
        pdf_file: Either an object exposing the path as ``.name`` or the path
            itself.

    Returns:
        List of PNG file paths, one per page, rendered at 150 dpi into a newly
        created temporary directory. The directory is not removed here; the
        returned paths point into it.
    """
    pdf_path = pdf_file.name if hasattr(pdf_file, "name") else pdf_file

    image_paths = []
    # The context manager closes the document once all pages are written;
    # previously it was never closed.
    with fitz.open(pdf_path) as doc:
        temp_dir = tempfile.mkdtemp()
        for page_number, page in enumerate(doc, start=1):
            pix = page.get_pixmap(dpi=150)
            out_path = os.path.join(temp_dir, f"page_{page_number}.png")
            pix.save(out_path)
            image_paths.append(out_path)

    return image_paths
| |
|
def check_password(pw):
    """Validate the submitted password and toggle the two UI groups.

    Args:
        pw: The text submitted from the password box.

    Returns:
        A 3-tuple ``(password_block_update, main_app_update, message)``: on
        success the password group is hidden and the main app shown with an
        empty message; otherwise the reverse, with an error message.
    """
    import hmac  # function-scope import keeps this block self-contained

    # Fail closed when no password is configured or the submission is not a
    # string: with a plain ``==`` a None submission matched an unset PASSWORD.
    # compare_digest avoids leaking the match position through timing.
    unlocked = (
        PASSWORD is not None
        and isinstance(pw, str)
        and hmac.compare_digest(pw.encode("utf-8"), PASSWORD.encode("utf-8"))
    )

    if unlocked:
        return (
            gr.update(visible=False),
            gr.update(visible=True),
            "",
        )
    return (
        gr.update(visible=True),
        gr.update(visible=False),
        "❌ Incorrect password, try again.",
    )
| |
|
# UI layout: a password gate shown first, and the main application, which
# stays hidden until check_password succeeds.
with gr.Blocks() as demo:
    with gr.Group(visible=True) as password_block:
        gr.Markdown("### 🔒 Enter password to access the chatbot")
        pw_box = gr.Textbox(type="password", placeholder="Enter password...", show_label=False)
        pw_btn = gr.Button("Unlock")
        pw_msg = gr.Markdown("")

    with gr.Group(visible=False) as main_app:
        gr.Markdown("\n## 📄 Universal Document Interpreter\n")

        pdf_input = gr.File(
            label="Upload PDF or Image",
            file_types=[".pdf", ".png", ".jpg", ".jpeg"],
        )

        user_input = gr.Textbox(
            label="Instructions",
            placeholder="Type desired characteristics to extract...",
        )

        run_btn_slow = gr.Button("Run Extraction with Powerful OCR - Will be Slow")

        gallery = gr.Gallery(
            label="Preview",
            columns=1,
            height="auto",
            object_fit="contain",
        )

        status_box = gr.Markdown()

        # Show a page preview as soon as a file is uploaded.
        pdf_input.upload(preview_pdf, pdf_input, gallery)

        # gradio_process streams (status, gallery) pairs into these outputs.
        run_btn_slow.click(
            fn=gradio_process,
            inputs=[pdf_input, user_input],
            outputs=[status_box, gallery],
        )

    # Pressing Enter in the password box and clicking "Unlock" do the same.
    for register in (pw_box.submit, pw_btn.click):
        register(
            fn=check_password,
            inputs=[pw_box],
            outputs=[password_block, main_app, pw_msg],
        )


demo.launch(inbrowser=True)