PromptTesting / app.py
Marthee's picture
Update app.py
11ce43f verified
raw
history blame
8.58 kB
import gradio as gr
import os
import json
import requests
from io import BytesIO
import fitz # PyMuPDF
from urllib.parse import urlparse, unquote
import os
from io import BytesIO
import re
import requests
import pandas as pd
import fitz # PyMuPDF
import re
import urllib.parse
import difflib
from fuzzywuzzy import fuzz
import copy
# import tsadropboxretrieval
import urllib.parse
def get_toc_page_numbers(doc, max_pages_to_check=15):
    """Identify the front-matter pages (cover through table of contents) of a PDF.

    Scans the first ``max_pages_to_check`` pages for two TOC signals:
      * dot leaders ("Chapter 1 ..... 5"), and
      * a line consisting solely of a TOC-style title
        ("Contents", "Table of Contents", "Index" — case-insensitive).

    Args:
        doc: An open document exposing ``len(doc)`` and ``load_page(n)``
            whose pages support ``get_text("dict")`` (PyMuPDF-compatible).
        max_pages_to_check: Number of leading pages to inspect.

    Returns:
        list[int]: Zero-based page numbers from page 0 through the LAST
        detected TOC page inclusive (covering the cover page, inside
        cover, and the TOC itself), or ``[]`` if no TOC page is found.
    """
    # Dot leaders: two or more consecutive periods anywhere in a line.
    dot_pattern = re.compile(r"\.{2,}")
    # Title line that is JUST the heading (ignores e.g. "The contents of the bag...").
    title_pattern = re.compile(r"^\s*(table of contents|contents|index)\s*$",
                               re.IGNORECASE)

    toc_pages = []
    for page_num in range(min(len(doc), max_pages_to_check)):
        page = doc.load_page(page_num)
        blocks = page.get_text("dict")["blocks"]

        dot_line_count = 0
        has_toc_title = False
        for block in blocks:
            for line in block.get("lines", []):
                # Join span texts to reconstruct the visible line.
                line_text = " ".join(span["text"] for span in line["spans"]).strip()
                if dot_pattern.search(line_text):
                    dot_line_count += 1
                if title_pattern.match(line_text):
                    has_toc_title = True

        # A page counts as TOC if it has a title OR any dot-leader line;
        # the >= 1 threshold stays sensitive to single-item lists.
        if has_toc_title or dot_line_count >= 1:
            toc_pages.append(page_num)

    if toc_pages:
        # BUG FIX: use the LAST detected TOC page. The original used
        # toc_pages[0], which truncated multi-page tables of contents
        # (e.g. TOC on pages [2, 3] must yield [0, 1, 2, 3]).
        last_toc_page = toc_pages[-1]
        return list(range(0, last_toc_page + 1))
    return []  # nothing that looks like a TOC was found
def openPDF(pdf_path):
    """Download a PDF from a (Dropbox-style) URL and open it with PyMuPDF.

    Args:
        pdf_path: HTTP(S) URL of the PDF. A Dropbox preview link
            (``dl=0``) is rewritten to ``dl=1`` to fetch the raw file.

    Returns:
        fitz.Document: the opened PDF document.

    Raises:
        requests.HTTPError: if the download returns a non-2xx status.
        ValueError: if the response body is empty.
    """
    pdf_path = pdf_path.replace('dl=0', 'dl=1')
    # Timeout added so a stalled download cannot hang the app forever.
    response = requests.get(pdf_path, timeout=60)
    # BUG FIX: the original checked `if not pdf_content` on a BytesIO
    # object, which is ALWAYS truthy — the guard could never fire.
    # Validate the HTTP status and the raw payload instead.
    response.raise_for_status()
    if not response.content:
        raise ValueError("No valid PDF content found.")
    pdf_content = BytesIO(response.content)
    doc = fitz.open(stream=pdf_content, filetype="pdf")
    return doc
def _extract_llm_text(rj):
    """Return the assistant's text from an OpenRouter chat-completion payload.

    Handles string content, the list-of-parts content format, a dict
    content form, and a legacy ``choices[i]['text']`` fallback.
    Returns None when no text can be found.
    """
    if not isinstance(rj, dict):
        return None
    choices = rj.get('choices') or []
    if choices:
        c0 = choices[0]
        msg = c0.get('message') or c0.get('delta') or {}
        content = msg.get('content') if isinstance(msg, dict) else None
        if isinstance(content, list):
            for part in content:
                if part.get('type') == 'text' and part.get('text'):
                    return part.get('text')
        elif isinstance(content, str):
            return content
        elif isinstance(content, dict):
            return content.get('text')
    # Legacy completion shape: top-level 'text' on each choice.
    for c in choices:
        if isinstance(c.get('text'), str):
            return c.get('text')
    return None


def _parse_header_entries(text_reply):
    """Parse the JSON array of header objects out of the raw LLM reply.

    Tolerates surrounding prose by slicing from the first '[' to the
    last ']'. Returns a list of normalized dicts:
    {text, page (0-based), suggested_level, confidence}.
    """
    s = text_reply.strip()
    start = s.find('[')
    end = s.rfind(']')
    js = s[start:end + 1] if start != -1 and end != -1 else s
    try:
        parsed = json.loads(js)
    except Exception:
        return []
    if not isinstance(parsed, list):
        return []

    out = []
    for obj in parsed:
        if not isinstance(obj, dict):
            continue
        text = obj.get('text')
        # ROBUSTNESS: the original crashed on non-numeric 'page' or
        # 'confidence' values in malformed LLM output; skip them instead.
        try:
            page = int(obj.get('page')) if obj.get('page') else None
        except (TypeError, ValueError):
            page = None
        try:
            conf = float(obj.get('confidence') or 0)
        except (TypeError, ValueError):
            conf = 0.0
        if text and page is not None:
            out.append({
                'text': text,
                'page': page - 1,  # convert back to 0-based page index
                'suggested_level': obj.get('suggested_level'),
                'confidence': conf,
            })
    return out


def identify_headers_with_openrouter(pdf_path, model, LLM_prompt, pages_to_check=None, top_margin=70, bottom_margin=85):
    """Ask an LLM (OpenRouter) to identify headers in the document.

    Sends plain page-line strings (with 1-based page numbers) to the LLM
    and expects a JSON array of header objects back.

    Args:
        pdf_path: URL of the PDF to analyze (passed to ``openPDF``).
        model: OpenRouter model identifier.
        LLM_prompt: Instruction text prepended to the extracted lines.
        pages_to_check: Optional collection of 0-based pages to restrict to.
        top_margin: Lines starting above this y-offset are skipped
            (running headers).
        bottom_margin: Lines ending within this distance of the page
            bottom are skipped (footers / page numbers).

    Returns:
        list[dict]: {text, page (0-based), suggested_level, confidence},
        or ``[]`` on any failure (missing key, network error, bad JSON).
    """
    doc = openPDF(pdf_path)
    # SECURITY FIX: the original hard-coded a live OpenRouter API key in
    # source (and the env-var fallback below it was dead code). Secrets
    # must come from the environment — never commit them.
    api_key = os.getenv("OPENROUTER_API_KEY")
    model = str(model)

    # Skip front matter (cover + TOC pages) when collecting text.
    toc_pages = get_toc_page_numbers(doc)
    lines_for_prompt = []
    for pno in range(len(doc)):
        if pages_to_check and pno not in pages_to_check:
            continue
        if pno in toc_pages:
            continue
        page = doc.load_page(pno)
        page_height = page.rect.height
        for block in page.get_text("dict").get('blocks', []):
            if block.get('type') != 0:  # 0 = text block; skip images etc.
                continue
            for line in block.get('lines', []):
                spans = line.get('spans', [])
                if not spans:
                    continue
                y0 = spans[0]['bbox'][1]
                y1 = spans[0]['bbox'][3]
                # Drop lines inside the top/bottom page margins.
                if y0 < top_margin or y1 > (page_height - bottom_margin):
                    continue
                text = " ".join(s.get('text', '') for s in spans).strip()
                if text:
                    # Prefix with 1-based page for easier mapping back.
                    lines_for_prompt.append(f"PAGE {pno + 1}: {text}")
    # RESOURCE FIX: close the document once text extraction is done.
    doc.close()

    if not lines_for_prompt:
        return []
    if not api_key:
        # No API key: return empty so caller can fall back to heuristics.
        return []

    prompt = (
        LLM_prompt + "\n\nLines:\n" + "\n".join(lines_for_prompt)
    )
    url = "https://openrouter.ai/api/v1/chat/completions"
    # Headers follow the OpenRouter example; referer/title are optional.
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
        "HTTP-Referer": os.getenv("OPENROUTER_REFERER", ""),
        "X-Title": os.getenv("OPENROUTER_X_TITLE", ""),
    }
    # Wrap the prompt as the 'content' parts array expected by OpenRouter.
    body = {
        "model": model,
        "messages": [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": prompt}
                ]
            }
        ]
    }

    try:
        print("LLM request (truncated):", prompt[:1000])
        # BUG FIX: added a timeout so a hung API call cannot block forever.
        resp = requests.post(
            url=url,
            headers=headers,
            data=json.dumps(body),
            timeout=120,
        )
        resp.raise_for_status()
        resp_text = resp.text
        print("LLM raw response length:", len(resp_text))
        # Save the raw response for offline inspection (best-effort).
        try:
            with open("llm_debug.json", "w", encoding="utf-8") as fh:
                fh.write(resp_text)
        except Exception as e:
            print("Warning: could not write llm_debug.json:", e)
        rj = resp.json()
        print("LLM parsed response keys:", list(rj.keys()) if isinstance(rj, dict) else type(rj))
    except Exception as e:
        print("LLM call failed:", repr(e))
        return []

    text_reply = _extract_llm_text(rj)
    if not text_reply:
        return []
    return _parse_header_entries(text_reply)
# Wrapper that reshapes the LLM header dicts into rows for a Gradio Dataframe.
def identify_headers_with_table(pdf_path, model, LLM_prompt):
    """Run header detection and return rows [text, 1-based page, level, confidence]."""
    detected = identify_headers_with_openrouter(pdf_path, model, LLM_prompt)
    rows = []
    for entry in detected:
        rows.append([
            entry['text'],
            entry['page'] + 1,  # back to 1-based for display
            entry['suggested_level'],
            entry['confidence'],
        ])
    return rows  # empty list -> empty table in the UI
# Labels for the three free-text inputs the UI exposes.
_INPUT_LABELS = ("Document Link", "Model Type", "LLM Prompt")

# Column names for the results table shown in the UI.
columns = ["Text", "Page", "Suggested Level", "Confidence"]

# Wire the detection wrapper to a simple Gradio form:
# three textboxes in, one dataframe of detected headers out.
iface = gr.Interface(
    fn=identify_headers_with_table,
    inputs=[gr.Textbox(label=lbl) for lbl in _INPUT_LABELS],
    outputs=gr.Dataframe(headers=columns),
)

iface.launch()