# PromptTesting / appOLD.py
import gradio as gr
import os
import json
import re
import copy
import difflib
import logging
import urllib.parse
from io import BytesIO
from datetime import datetime
from difflib import SequenceMatcher
from collections import defaultdict, Counter
from urllib.parse import urlparse, unquote
import requests
import pandas as pd
import fitz  # PyMuPDF
# import tsadropboxretrieval
# Set up logging to see everything
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(), # Print to console
logging.FileHandler('debug.log', mode='w') # Save to file
]
)
logger = logging.getLogger(__name__)
top_margin = 70
bottom_margin = 85
def getLocation_of_header(doc, headerText, expected_page=None):
locations = []
# pages = (
# [(expected_page, doc.load_page(expected_page))]
# if expected_page is not None
# else enumerate(doc)
# )
expectedpageNorm=expected_page
page=doc[expectedpageNorm]
# for page_number, page in pages:
page_height = page.rect.height
rects = page.search_for(headerText)
for r in rects:
y = r.y0
# Skip headers in top or bottom margin
if y <= top_margin:
continue
if y >= page_height - bottom_margin:
continue
locations.append({
"headerText":headerText,
"page": expectedpageNorm,
"x": r.x0,
"y": y
})
return locations
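# Usage sketch (illustrative header text; `doc` is an open fitz.Document):
# getLocation_of_header(doc, "GENERAL REQUIREMENTS", expected_page=4)
# -> [{"headerText": "GENERAL REQUIREMENTS", "page": 4, "x": 72.0, "y": 180.5}]
# Hits whose y coordinate lands inside the top or bottom margin bands are skipped.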
def filter_headers_outside_toc(headers, toc_pages):
toc_pages_set = set(toc_pages)
filtered = []
for h in headers:
page = h[2]
y = h[3]
# Skip invalid / fallback headers
if page is None or y is None:
continue
# Skip headers inside TOC pages
if page in toc_pages_set:
continue
filtered.append(h)
return filtered
def headers_with_location(doc, llm_headers):
"""
Converts LLM headers into:
[text, font_size, page, y, suggested_level, confidence]
Always include all headers, even if location not found.
"""
headersJson = []
for h in llm_headers:
text = h["text"]
llm_page = h["page"]
# Attempt to locate the header on the page
locations = getLocation_of_header(doc, text,llm_page)
if locations:
for loc in locations:
page = doc.load_page(loc["page"])
fontsize = None
for block in page.get_text("dict")["blocks"]:
if block.get("type") != 0:
continue
for line in block.get("lines", []):
line_text = "".join(span["text"] for span in line["spans"]).strip()
if normalize(line_text) == normalize(text):
fontsize = line["spans"][0]["size"]
break
if fontsize:
break
entry = [
text,
fontsize,
loc["page"],
loc["y"],
h["suggested_level"],
]
if entry not in headersJson:
headersJson.append(entry)
return headersJson
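# Example of the row shape produced here (a sketch; values are illustrative):
# llm_headers = [{"text": "1.1 Scope", "page": 3, "suggested_level": 2}]
# headers_with_location(doc, llm_headers)
# -> [["1.1 Scope", 11.0, 3, 182.4, 2]]   # [text, font_size, page, y, suggested_level]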
def build_hierarchy_from_llm(headers):
nodes = []
# -------------------------
# 1. Build nodes safely
# -------------------------
for h in headers:
# print("headerrrrrrrrrrrrrrr", h)
if len(h) < 5:
continue
text, size, page, y, level = h
if level is None:
continue
try:
level = int(level)
except Exception:
continue
node = {
"text": text,
"page": page if page is not None else -1,
"y": y if y is not None else -1,
"size": size,
"bold": False,
"color": None,
"font": None,
"children": [],
"is_numbered": is_numbered(text),
"original_size": size,
"norm_text": normalize(text),
"level": level,
}
nodes.append(node)
if not nodes:
return []
# -------------------------
# 2. Sort top-to-bottom
# -------------------------
nodes.sort(key=lambda x: (x["page"], x["y"]))
# -------------------------
# 3. NORMALIZE LEVELS
# (smallest level → 0)
# -------------------------
min_level = min(n["level"] for n in nodes)
for n in nodes:
n["level"] -= min_level
# -------------------------
# 4. Build hierarchy
# -------------------------
root = []
stack = []
added_level0 = set()
for header in nodes:
lvl = header["level"]
if lvl < 0:
continue
# De-duplicate true top-level headers
if lvl == 0:
key = (header["norm_text"], header["page"])
if key in added_level0:
continue
added_level0.add(key)
while stack and stack[-1]["level"] >= lvl:
stack.pop()
parent = stack[-1] if stack else None
if parent:
header["path"] = parent["path"] + [header["norm_text"]]
parent["children"].append(header)
else:
header["path"] = [header["norm_text"]]
root.append(header)
stack.append(header)
# -------------------------
# 5. Enforce nesting sanity
# -------------------------
def enforce_nesting(node_list, parent_level=-1):
for node in node_list:
if node["level"] <= parent_level:
node["level"] = parent_level + 1
enforce_nesting(node["children"], node["level"])
enforce_nesting(root)
# -------------------------
# 6. OPTIONAL cleanup
# (only if real level-0s exist)
# -------------------------
if any(h["level"] == 0 for h in root):
root = [
h for h in root
if not (h["level"] == 0 and not h["children"])
]
# -------------------------
# 7. Final pass
# -------------------------
header_tree = enforce_level_hierarchy(root)
return header_tree
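# Sketch of the transformation (illustrative values): flat rows in, a nested tree out.
# rows = [["1 GENERAL", 14.0, 2, 100.0, 1], ["1.1 Scope", 12.0, 2, 140.0, 2]]
# tree = build_hierarchy_from_llm(rows)
# tree[0]["text"]                 -> "1 GENERAL"   (its level 1 is normalised to 0)
# tree[0]["children"][0]["text"]  -> "1.1 Scope"   (level 1, path ["1 general", "1.1 scope"])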
def get_regular_font_size_and_color(doc):
font_sizes = []
colors = []
fonts = []
# Loop through all pages
for page_num in range(len(doc)):
page = doc.load_page(page_num)
for span in page.get_text("dict")["blocks"]:
if "lines" in span:
for line in span["lines"]:
for span in line["spans"]:
font_sizes.append(span['size'])
colors.append(span['color'])
fonts.append(span['font'])
# Get the most common font size, color, and font
most_common_font_size = Counter(font_sizes).most_common(1)[0][0] if font_sizes else None
most_common_color = Counter(colors).most_common(1)[0][0] if colors else None
most_common_font = Counter(fonts).most_common(1)[0][0] if fonts else None
return most_common_font_size, most_common_color, most_common_font
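# e.g. get_regular_font_size_and_color(doc) -> (10.0, 0, "Helvetica")   (illustrative values):
# the modal span size, colour value and font name across every page, used elsewhere as the
# "body text" baseline that candidate headers are compared against.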
def normalize_text(text):
if text is None:
return ""
return re.sub(r'\s+', ' ', text.strip().lower())
def get_spaced_text_from_spans(spans):
return normalize_text(" ".join(span["text"].strip() for span in spans))
def is_numbered(text):
return bool(re.match(r'^\d', text.strip()))
def is_similar(a, b, threshold=0.85):
return difflib.SequenceMatcher(None, a, b).ratio() > threshold
def normalize(text):
text = text.lower()
text = re.sub(r'\.{2,}', '', text) # remove long dots
text = re.sub(r'\s+', ' ', text) # replace multiple spaces with one
return text.strip()
def clean_toc_entry(toc_text):
    """Remove trailing dot leaders and page numbers from TOC entries"""
    # Strip the trailing run of dots/whitespace followed by the page number
    return re.sub(r'[\.\s]{2,}\d+.*$', '', toc_text).strip('. ')
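# e.g. clean_toc_entry("2.3 Execution ........ 17") -> "2.3 Execution"
#      clean_toc_entry("Appendix A   42")           -> "Appendix A"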
def enforce_level_hierarchy(headers):
"""
Ensure level 2 headers only exist under level 1 headers
and clean up any orphaned headers
"""
def process_node_list(node_list, parent_level=-1):
i = 0
while i < len(node_list):
node = node_list[i]
# Remove level 2 headers that don't have a level 1 parent
if node['level'] == 2 and parent_level != 1:
node_list.pop(i)
continue
# Recursively process children
process_node_list(node['children'], node['level'])
i += 1
process_node_list(headers)
return headers
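# Pruning sketch (illustrative): a level-2 node is only legal directly under a level-1
# parent, so a stray [{'level': 2, 'children': []}] at the root is removed in place,
# while {'level': 1, 'children': [{'level': 2, ...}]} passes through unchanged.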
def highlight_boxes(doc, highlights, stringtowrite, fixed_width=500): # Set your desired width here
for page_num, bbox in highlights.items():
page = doc.load_page(page_num)
page_width = page.rect.width
# Get original rect for vertical coordinates
orig_rect = fitz.Rect(bbox)
rect_height = orig_rect.height
if rect_height > 30:
if orig_rect.width > 10:
# Center horizontally using fixed width
center_x = page_width / 2
new_x0 = center_x - fixed_width / 2
new_x1 = center_x + fixed_width / 2
new_rect = fitz.Rect(new_x0, orig_rect.y0, new_x1, orig_rect.y1)
# Add highlight rectangle
annot = page.add_rect_annot(new_rect)
if stringtowrite.startswith('Not'):
annot.set_colors(stroke=(0.5, 0.5, 0.5), fill=(0.5, 0.5, 0.5))
else:
annot.set_colors(stroke=(1, 1, 0), fill=(1, 1, 0))
annot.set_opacity(0.3)
annot.update()
# Add right-aligned freetext annotation inside the fixed-width box
text = '['+stringtowrite +']'
annot1 = page.add_freetext_annot(
new_rect,
text,
fontsize=15,
fontname='helv',
text_color=(1, 0, 0),
rotate=page.rotation,
align=2 # right alignment
)
annot1.update()
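# Sketch: `highlights` maps page_number -> [x0, y0, x1, y1]. Boxes taller than 30 pt and
# wider than 10 pt are re-centred to the fixed 500 pt width, filled grey when the label
# starts with 'Not' ("Not to be billed") and yellow otherwise, and a red, right-aligned
# "[<stringtowrite>]" free-text annotation is drawn inside the same rectangle.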
def get_leaf_headers_with_paths(listtoloop, path=None, output=None):
if path is None:
path = []
if output is None:
output = []
for header in listtoloop:
current_path = path + [header['text']]
if not header['children']:
if header['level'] != 0 and header['level'] != 1:
output.append((header, current_path))
else:
get_leaf_headers_with_paths(header['children'], current_path, output)
return output
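# Sketch of the output (illustrative): (leaf_node, path_from_root) pairs, skipping leaves
# at levels 0 and 1, e.g.
# ({'text': '1.1.2 Pipework', 'level': 2, ...},
#  ['1 GENERAL', '1.1 Scope', '1.1.2 Pipework'])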
# Helper: fraction of text1's words that also appear in text2
def words_match_ratio(text1, text2):
words1 = set(text1.split())
words2 = set(text2.split())
if not words1 or not words2:
return 0.0
common_words = words1 & words2
return len(common_words) / len(words1)
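# e.g. words_match_ratio("fire stopping details", "details of fire stopping") -> 1.0,
# since all three words of text1 appear in text2; the ratio is taken against text1's word count.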
def same_start_word(s1, s2):
# Split both strings into words
words1 = s1.strip().split()
words2 = s2.strip().split()
# Check if both have at least one word and compare the first ones
if words1 and words2:
return words1[0].lower() == words2[0].lower()
return False
def get_toc_page_numbers(doc, max_pages_to_check=15):
toc_pages = []
logger.debug(f"Starting TOC detection, checking first {max_pages_to_check} pages")
# 1. Existing Dot Pattern (looking for ".....")
dot_pattern = re.compile(r"\.{2,}")
# 2. NEW: Title Pattern (looking for specific headers)
# ^ and $ ensure the line is JUST that word (ignoring "The contents of the bag...")
# re.IGNORECASE makes it match "CONTENTS", "Contents", "Index", etc.
title_pattern = re.compile(r"^\s*(table of contents|contents|index)\s*$", re.IGNORECASE)
for page_num in range(min(len(doc), max_pages_to_check)):
page = doc.load_page(page_num)
blocks = page.get_text("dict")["blocks"]
dot_line_count = 0
has_toc_title = False
logger.debug(f"Checking page {page_num} for TOC")
for block in blocks:
for line in block.get("lines", []):
# Extract text from spans (mimicking get_spaced_text_from_spans)
line_text = " ".join([span["text"] for span in line["spans"]]).strip()
# CHECK A: Does the line have dots?
if dot_pattern.search(line_text):
dot_line_count += 1
logger.debug(f" Found dot pattern on page {page_num}: '{line_text[:50]}...'")
# CHECK B: Is this line a Title?
# We check this early in the loop. If a page has a title "Contents",
# we mark it immediately.
if title_pattern.match(line_text):
has_toc_title = True
logger.debug(f" Found TOC title on page {page_num}: '{line_text}'")
# CONDITION:
# It is a TOC page if it has a Title OR if it has dot leaders.
# We use 'dot_line_count >= 1' to be sensitive to single-item lists.
if has_toc_title or dot_line_count >= 1:
toc_pages.append(page_num)
logger.info(f"Page {page_num} identified as TOC page")
# RETURN:
# If we found TOC pages (e.g., [2, 3]), we return [0, 1, 2, 3]
# This covers the cover page, inside cover, and the TOC itself.
    if toc_pages:
        last_toc_page = toc_pages[-1]
        result = list(range(0, last_toc_page + 1))
        logger.info(f"TOC pages found: {result}")
        return result
logger.info("No TOC pages found")
return [] # Return empty list if nothing found
def is_header(span, most_common_font_size, most_common_color, most_common_font,allheadersLLM):
fontname = span.get("font", "").lower()
# is_italic = "italic" in fontname or "oblique" in fontname
isheader=False
is_bold = "bold" in fontname or span.get("bold", False)
if span['text'] in allheadersLLM:
isheader=True
return (
(
span["size"] > most_common_font_size or
span["font"].lower() != most_common_font.lower() or
(isheader and span["size"] > most_common_font_size )
)
)
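# Rough intuition (illustrative numbers): with a 10 pt body-text baseline, a 12 pt span
# or a span set in a different font family than the body text is treated as a header;
# matching one of the LLM-identified header strings only counts when the span is also
# larger than the baseline size.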
def openPDF(pdf_path):
logger.info(f"Opening PDF from URL: {pdf_path}")
pdf_path = pdf_path.replace('dl=0', 'dl=1')
response = requests.get(pdf_path)
logger.debug(f"PDF download response status: {response.status_code}")
pdf_content = BytesIO(response.content)
if not pdf_content:
logger.error("No valid PDF content found.")
raise ValueError("No valid PDF content found.")
doc = fitz.open(stream=pdf_content, filetype="pdf")
logger.info(f"PDF opened successfully, {len(doc)} pages")
return doc
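# Usage sketch (the share link below is illustrative, not a real document):
# doc = openPDF("https://www.dropbox.com/s/abc123/spec.pdf?dl=0")
# The 'dl=0' flag is rewritten to 'dl=1' so Dropbox serves the raw PDF bytes
# instead of its HTML preview page; the bytes are then opened in-memory via fitz.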
def process_document_in_chunks(
lengthofDoc,
pdf_path,
LLM_prompt,
model,
chunk_size=15,
):
total_pages = lengthofDoc
all_results = []
print(f"DEBUG: process_document_in_chunks - Total pages: {total_pages}")
for start in range(0, total_pages, chunk_size):
end = start + chunk_size
print(f"DEBUG: Processing pages {start + 1}{min(end, total_pages)}")
result = identify_headers_with_openrouterNEWW(
pdf_path=pdf_path,
model=model,
LLM_prompt=LLM_prompt,
pages_to_check=(start, end)
)
print(f"DEBUG: Chunk returned {len(result) if result else 0} headers")
if result:
print(f"DEBUG: Sample header from chunk: {result[0]}")
all_results.extend(result)
print(f"DEBUG: Total headers collected: {len(all_results)}")
return all_results
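# Chunking sketch: for a 40-page document with the default chunk_size=15 this makes
# three LLM calls with pages_to_check = (0, 15), (15, 30) and (30, 45);
# identify_headers_with_openrouterNEWW clamps the upper bound to the real page count,
# and the per-chunk header lists are simply concatenated into all_results.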
def identify_headers_with_openrouterNEWW(pdf_path, model,LLM_prompt, pages_to_check=None, top_margin=0, bottom_margin=0):
"""Ask an LLM (OpenRouter) to identify headers in the document.
Returns a list of dicts: {text, page, suggested_level, confidence}.
The function sends plain page-line strings to the LLM (including page numbers)
and asks for a JSON array containing only header lines with suggested levels.
"""
logger.info("=" * 80)
logger.info("STARTING IDENTIFY_HEADERS_WITH_OPENROUTER")
# logger.info(f"PDF Path: {pdf_path}")
logger.info(f"Model: {model}")
# logger.info(f"LLM Prompt: {LLM_prompt[:200]}..." if len(LLM_prompt) > 200 else f"LLM Prompt: {LLM_prompt}")
doc = openPDF(pdf_path)
api_key = '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'
if api_key is None:
api_key = os.getenv("OPENROUTER_API_KEY") or None
model = str(model)
# toc_pages = get_toc_page_numbers(doc)
lines_for_prompt = []
# pgestoRun=20
# logger.info(f"TOC pages to skip: {toc_pages}")
# logger.info(f"Total pages in document: {len(doc)}")
logger.info(f"Total pages in document: {len(doc)}")
# Collect text lines from pages (skip TOC pages)
total_lines = 0
ArrayofTextWithFormat = []
total_pages = len(doc)
if pages_to_check is None:
start_page = 0
end_page = min(15, total_pages)
else:
start_page, end_page = pages_to_check
end_page = min(end_page, total_pages) # 🔑 CRITICAL LINE
for pno in range(start_page, end_page):
page = doc.load_page(pno)
page_height = page.rect.height
lines_on_page = 0
text_dict = page.get_text("dict")
lines = []
y_tolerance = 0.5 # tweak if needed (1–3 usually works)
for block in text_dict["blocks"]:
if block["type"] != 0:
continue
for line in block["lines"]:
for span in line["spans"]:
text = span["text"].strip()
if not text: # Skip empty text
continue
# Extract all formatting attributes
font = span.get('font')
size = span.get('size')
color = span.get('color')
flags = span.get('flags', 0)
bbox = span.get("bbox", (0, 0, 0, 0))
x0, y0, x1, y1 = bbox
# Create text format dictionary
text_format = {
'Font': font,
'Size': size,
'Flags': flags,
'Color': color,
'Text': text,
'BBox': bbox,
'Page': pno + 1
}
# Add to ArrayofTextWithFormat
ArrayofTextWithFormat.append(text_format)
# For line grouping (keeping your existing logic)
matched = False
for l in lines:
if abs(l["y"] - y0) <= y_tolerance:
l["spans"].append((x0, text, font, size, color, flags))
matched = True
break
if not matched:
lines.append({
"y": y0,
"spans": [(x0, text, font, size, color, flags)]
})
lines.sort(key=lambda l: l["y"])
# Join text inside each line with formatting info
final_lines = []
for l in lines:
l["spans"].sort(key=lambda s: s[0]) # left → right
# Collect all text and formatting for this line
line_text = " ".join(text for _, text, _, _, _, _ in l["spans"])
# Get dominant formatting for the line (based on first span)
if l["spans"]:
_, _, font, size, color, flags = l["spans"][0]
# Store line with its formatting
line_with_format = {
'text': line_text,
'font': font,
'size': size,
'color': color,
'flags': flags,
'page': pno + 1,
'y_position': l["y"]
}
final_lines.append(line_with_format)
# Result
for line_data in final_lines:
line_text = line_data['text']
print(line_text)
if line_text:
# Create a formatted string with text properties
format_info = f"Font: {line_data['font']}, Size: {line_data['size']}, Color: {line_data['color']}"
lines_for_prompt.append(f"PAGE {pno+1}: {line_text} [{format_info}]")
lines_on_page += 1
if lines_on_page > 0:
logger.debug(f"Page {pno}: collected {lines_on_page} lines")
total_lines += lines_on_page
logger.info(f"Total lines collected for LLM: {total_lines}")
if not lines_for_prompt:
logger.warning("No lines collected for prompt")
return []
# Log sample of lines
logger.info("Sample lines (first 10):")
for i, line in enumerate(lines_for_prompt[:10]):
logger.info(f" {i}: {line}")
prompt =LLM_prompt + "\n\nLines:\n" + "\n".join(lines_for_prompt)
logger.debug(f"Full prompt length: {len(prompt)} characters")
# Changed: Print entire prompt, not truncated
print("=" * 80)
print("FULL LLM PROMPT:")
print(prompt)
print("=" * 80)
# Also log to file
try:
with open("full_prompt.txt", "w", encoding="utf-8") as f:
f.write(prompt)
logger.info("Full prompt saved to full_prompt.txt")
except Exception as e:
logger.error(f"Could not save prompt to file: {e}")
if not api_key:
# No API key: return empty so caller can fallback to heuristics
logger.error("No API key provided")
return []
url = "https://openrouter.ai/api/v1/chat/completions"
# Build headers following the OpenRouter example
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
"HTTP-Referer": os.getenv("OPENROUTER_REFERER", ""),
"X-Title": os.getenv("OPENROUTER_X_TITLE", ""),
# "X-Request-Timestamp": str(unix_timestamp),
# "X-Request-Datetime": current_time,
}
# Log request details (without exposing full API key)
logger.info(f"Making request to OpenRouter with model: {model}")
logger.debug(f"Headers (API key masked): { {k: '***' if k == 'Authorization' else v for k, v in headers.items()} }")
# Wrap the prompt as the example 'content' array expected by OpenRouter
body = {
"model": model,
"messages": [
{
"role": "user",
"content": [
{"type": "text", "text": prompt}
]
}
]
}
# print(f"Request sent at: {current_time}")
# print(f"Unix timestamp: {unix_timestamp}")
# Debug: log request body (truncated) and write raw response for inspection
try:
# Changed: Log full body (excluding prompt text which is already logged)
logger.debug(f"Request body (without prompt text): { {k: v if k != 'messages' else '[...prompt...]' for k, v in body.items()} }")
# Removed timeout parameter
resp = requests.post(
url=url,
headers=headers,
data=json.dumps(body)
)
logger.info(f"HTTP Response Status: {resp.status_code}")
resp.raise_for_status()
resp_text = resp.text
# Changed: Print entire response
print("=" * 80)
print("FULL LLM RESPONSE:")
print(resp_text)
print("=" * 80)
logger.info(f"LLM raw response length: {len(resp_text)}")
# Save raw response for offline inspection
try:
with open("llm_debug.json", "w", encoding="utf-8") as fh:
fh.write(resp_text)
logger.info("Raw response saved to llm_debug.json")
except Exception as e:
logger.error(f"Warning: could not write llm_debug.json: {e}")
rj = resp.json()
logger.info(f"LLM parsed response type: {type(rj)}")
if isinstance(rj, dict):
logger.debug(f"Response keys: {list(rj.keys())}")
except requests.exceptions.RequestException as e:
logger.error(f"HTTP request failed: {repr(e)}")
return []
except Exception as e:
logger.error(f"LLM call failed: {repr(e)}")
return []
# Extract textual reply robustly
text_reply = None
if isinstance(rj, dict):
choices = rj.get('choices') or []
logger.debug(f"Number of choices in response: {len(choices)}")
if choices:
for i, c in enumerate(choices):
logger.debug(f"Choice {i}: {c}")
c0 = choices[0]
msg = c0.get('message') or c0.get('delta') or {}
content = msg.get('content')
if isinstance(content, list):
logger.debug(f"Content is a list with {len(content)} items")
for idx, c in enumerate(content):
if c.get('type') == 'text' and c.get('text'):
text_reply = c.get('text')
logger.debug(f"Found text reply in content[{idx}], length: {len(text_reply)}")
break
elif isinstance(content, str):
text_reply = content
logger.debug(f"Content is string, length: {len(text_reply)}")
elif isinstance(msg, dict) and msg.get('content') and isinstance(msg.get('content'), dict):
text_reply = msg.get('content').get('text')
logger.debug(f"Found text in nested content dict")
# Fallback extraction
if not text_reply:
logger.debug("Trying fallback extraction from choices")
for c in rj.get('choices', []):
if isinstance(c.get('text'), str):
text_reply = c.get('text')
logger.debug(f"Found text reply in choice.text, length: {len(text_reply)}")
break
if not text_reply:
logger.error("Could not extract text reply from response")
# Changed: Print the entire response structure for debugging
print("=" * 80)
print("FAILED TO EXTRACT TEXT REPLY. FULL RESPONSE STRUCTURE:")
print(json.dumps(rj, indent=2))
print("=" * 80)
return []
# Changed: Print the extracted text reply
print("=" * 80)
print("EXTRACTED TEXT REPLY:")
print(text_reply)
print("=" * 80)
logger.info(f"Extracted text reply length: {len(text_reply)}")
logger.debug(f"First 500 chars of reply: {text_reply[:500]}...")
s = text_reply.strip()
start = s.find('[')
end = s.rfind(']')
js = s[start:end+1] if start != -1 and end != -1 else s
logger.debug(f"Looking for JSON array: start={start}, end={end}")
logger.debug(f"Extracted JSON string (first 500 chars): {js[:500]}...")
try:
parsed = json.loads(js)
logger.info(f"Successfully parsed JSON, got {len(parsed)} items")
except json.JSONDecodeError as e:
logger.error(f"Failed to parse JSON: {e}")
logger.error(f"JSON string that failed to parse: {js[:1000]}")
# Try to find any JSON-like structure
try:
# Try to extract any JSON array
import re
json_pattern = r'\[\s*\{.*?\}\s*\]'
matches = re.findall(json_pattern, text_reply, re.DOTALL)
if matches:
logger.info(f"Found {len(matches)} potential JSON arrays via regex")
for i, match in enumerate(matches):
try:
parsed = json.loads(match)
logger.info(f"Successfully parsed regex match {i} with {len(parsed)} items")
break
except json.JSONDecodeError as e2:
logger.debug(f"Regex match {i} also failed: {e2}")
continue
else:
logger.error("All regex matches failed to parse")
return []
else:
logger.error("No JSON-like pattern found via regex")
return []
except Exception as e2:
logger.error(f"Regex extraction also failed: {e2}")
return []
# Log parsed results
logger.info(f"Parsed {len(parsed)} header items:")
for i, obj in enumerate(parsed[:10]): # Log first 10 items
logger.info(f" Item {i}: {obj}")
# Normalize parsed entries and return
out = []
for obj in parsed:
t = obj.get('text')
page = int(obj.get('page')) if obj.get('page') else None
level = obj.get('suggested_level')
conf = float(obj.get('confidence') or 0)
if t and page is not None:
out.append({'text': t, 'page': page-1, 'suggested_level': level, 'confidence': conf})
logger.info(f"Returning {len(out)} valid header entries")
return out
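# Shape of the list returned above (illustrative values):
# [{'text': '1.1 Scope', 'page': 2, 'suggested_level': 2, 'confidence': 0.9}, ...]
# Note that 'page' is zero-based on the way out: the prompt uses 1-based "PAGE n"
# prefixes and the parsed page number is shifted back by one before being returned.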
# def identify_headers_and_save_excel(pdf_path, model, llm_prompt):
# try:
# # 1. Get the result from your LLM function
# result = identify_headers_with_openrouter(pdf_path, model, llm_prompt)
# # 2. Safety Check: If LLM failed or returned nothing
# if not result:
# logger.warning("No headers found or LLM failed. Creating an empty report.")
# df = pd.DataFrame([{"System Message": "No headers were identified by the LLM."}])
# else:
# df = pd.DataFrame(result)
# # 3. Use an Absolute Path for the output
# # This ensures Gradio knows exactly where the file is
# output_path = os.path.abspath("header_analysis_output.xlsx")
# # 4. Save using the engine explicitly
# df.to_excel(output_path, index=False, engine='openpyxl')
# logger.info(f"File successfully saved to {output_path}")
# return output_path
# except Exception as e:
# logger.error(f"Critical error in processing: {str(e)}")
# # Return None or a custom error message to Gradio
# return None
def extract_section_under_header_tobebilledMultiplePDFS(multiplePDF_Paths,model,identified_headers):
logger.debug(f"Starting function")
# keywordstoSkip=["installation", "execution", "miscellaneous items", "workmanship", "testing", "labeling"]
filenames=[]
keywords = {'installation', 'execution', 'miscellaneous items', 'workmanship', 'testing', 'labeling'}
arrayofPDFS=multiplePDF_Paths.split(',')
print(multiplePDF_Paths)
print(arrayofPDFS)
docarray=[]
jsons=[]
df = pd.DataFrame(columns=["PDF Name","NBSLink","Subject","Page","Author","Creation Date","Layer",'Code', 'head above 1', "head above 2","BodyText"])
for pdf_path in arrayofPDFS:
headertoContinue1 = False
headertoContinue2=False
Alltexttobebilled=''
parsed_url = urlparse(pdf_path)
filename = os.path.basename(parsed_url.path)
filename = unquote(filename) # decode URL-encoded characters
filenames.append(filename)
logger.debug(f"Starting with pdf: {filename}")
# Optimized URL handling
if pdf_path and ('http' in pdf_path or 'dropbox' in pdf_path):
pdf_path = pdf_path.replace('dl=0', 'dl=1')
# Cache frequently used values
response = requests.get(pdf_path)
pdf_content = BytesIO(response.content)
if not pdf_content:
raise ValueError("No valid PDF content found.")
doc = fitz.open(stream=pdf_content, filetype="pdf")
logger.info(f"Total pages in document: {len(doc)}")
docHighlights = fitz.open(stream=pdf_content, filetype="pdf")
most_common_font_size, most_common_color, most_common_font = get_regular_font_size_and_color(doc)
# Precompute regex patterns
dot_pattern = re.compile(r'\.{3,}')
url_pattern = re.compile(r'https?://\S+|www\.\S+')
toc_pages = get_toc_page_numbers(doc)
logger.info(f"Skipping TOC pages: Range {toc_pages}")
# headers, top_3_font_sizes, smallest_font_size, headersSpans = extract_headers(
# doc, toc_pages, most_common_font_size, most_common_color, most_common_font, top_margin, bottom_margin
# )
logger.info(f"Starting model run.")
# identified_headers = identify_headers_with_openrouterNEWW(doc, model)
allheaders_LLM=[]
for h in identified_headers:
if int(h["page"]) in toc_pages:
continue
if h['text']:
allheaders_LLM.append(h['text'])
logger.info(f"Done with model.")
print('identified_headers',identified_headers)
headers_json=headers_with_location(doc,identified_headers)
headers=filter_headers_outside_toc(headers_json,toc_pages)
hierarchy=build_hierarchy_from_llm(headers)
listofHeaderstoMarkup = get_leaf_headers_with_paths(hierarchy)
logger.info(f"Hierarchy built as {hierarchy}")
# Precompute all children headers once
allchildrenheaders = [normalize_text(item['text']) for item, p in listofHeaderstoMarkup]
allchildrenheaders_set = set(allchildrenheaders) # For faster lookups
# df = pd.DataFrame(columns=["NBSLink","Subject","Page","Author","Creation Date","Layer",'Code', 'head above 1', "head above 2","BodyText"])
dictionaryNBS={}
data_list_JSON = []
json_output=[]
currentgroupname=''
# if len(top_3_font_sizes)==3:
# mainHeaderFontSize, subHeaderFontSize, subsubheaderFontSize = top_3_font_sizes
# elif len(top_3_font_sizes)==2:
# mainHeaderFontSize= top_3_font_sizes[0]
# subHeaderFontSize= top_3_font_sizes[1]
# subsubheaderFontSize= top_3_font_sizes[1]
# Preload all pages to avoid repeated loading
# pages = [doc.load_page(page_num) for page_num in range(len(doc)) if page_num not in toc_pages]
for heading_to_searchDict,pathss in listofHeaderstoMarkup:
heading_to_search = heading_to_searchDict['text']
heading_to_searchPageNum = heading_to_searchDict['page']
paths=heading_to_searchDict['path']
# Initialize variables
headertoContinue1 = False
headertoContinue2 = False
matched_header_line = None
done = False
collecting = False
collected_lines = []
page_highlights = {}
current_bbox = {}
last_y1s = {}
mainHeader = ''
subHeader = ''
matched_header_line_norm = heading_to_search
break_collecting = False
heading_norm = normalize_text(heading_to_search)
paths_norm = [normalize_text(p) for p in paths[0]] if paths and paths[0] else []
for page_num in range(heading_to_searchPageNum,len(doc)):
# print(heading_to_search)
if paths[0].strip().lower() != currentgroupname.strip().lower():
Alltexttobebilled+= paths[0] +'\n'
currentgroupname=paths[0]
# print(paths[0])
if page_num in toc_pages:
continue
if break_collecting:
break
page=doc[page_num]
page_height = page.rect.height
blocks = page.get_text("dict")["blocks"]
for block in blocks:
if break_collecting:
break
lines = block.get("lines", [])
i = 0
while i < len(lines):
if break_collecting:
break
spans = lines[i].get("spans", [])
if not spans:
i += 1
continue
y0 = spans[0]["bbox"][1]
y1 = spans[0]["bbox"][3]
if y0 < top_margin or y1 > (page_height - bottom_margin):
i += 1
continue
line_text = get_spaced_text_from_spans(spans).lower()
line_text_norm = normalize_text(line_text)
# Combine with next line if available
if i + 1 < len(lines):
next_spans = lines[i + 1].get("spans", [])
next_line_text = get_spaced_text_from_spans(next_spans).lower()
combined_line_norm = normalize_text(line_text + " " + next_line_text)
else:
combined_line_norm = line_text_norm
# Check if we should continue processing
if combined_line_norm and combined_line_norm in paths[0]:
headertoContinue1 = combined_line_norm
if combined_line_norm and combined_line_norm in paths[-2]:
headertoContinue2 = combined_line_norm
# if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() :
last_path = paths[-2].lower()
# if any(word in paths[-2].lower() for word in keywordstoSkip):
# if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() or 'workmanship' in paths[-2].lower() or 'testing' in paths[-2].lower() or 'labeling' in paths[-2].lower():
if any(keyword in last_path for keyword in keywords):
stringtowrite='Not to be billed'
logger.info(f"Keyword found. Not to be billed activated. keywords: {keywords}")
else:
stringtowrite='To be billed'
if stringtowrite=='To be billed':
# Alltexttobebilled+= combined_line_norm #################################################
if matched_header_line_norm in combined_line_norm:
Alltexttobebilled+='\n'
Alltexttobebilled+= ' '+combined_line_norm
# Optimized header matching
existsfull = (
( combined_line_norm in allchildrenheaders_set or
combined_line_norm in allchildrenheaders ) and heading_to_search in combined_line_norm
)
# New word-based matching
current_line_words = set(combined_line_norm.split())
heading_words = set(heading_norm.split())
all_words_match = current_line_words.issubset(heading_words) and len(current_line_words) > 0
substring_match = (
heading_norm in combined_line_norm or
combined_line_norm in heading_norm or
all_words_match # Include the new word-based matching
)
# substring_match = (
# heading_norm in combined_line_norm or
# combined_line_norm in heading_norm
# )
if (substring_match and existsfull and not collecting and
len(combined_line_norm) > 0 ):#and (headertoContinue1 or headertoContinue2) ):
# Check header conditions more efficiently
# header_spans = [
# span for span in spans
# if (is_header(span, most_common_font_size, most_common_color, most_common_font)
# # and span['size'] >= subsubheaderFontSize
# and span['size'] < mainHeaderFontSize)
# ]
if stringtowrite.startswith('To') :
collecting = True
# if stringtowrite=='To be billed':
# Alltexttobebilled+='\n'
# matched_header_font_size = max(span["size"] for span in header_spans)
# collected_lines.append(line_text)
valid_spans = [span for span in spans if span.get("bbox")]
if valid_spans:
x0s = [span["bbox"][0] for span in valid_spans]
x1s = [span["bbox"][2] for span in valid_spans]
y0s = [span["bbox"][1] for span in valid_spans]
y1s = [span["bbox"][3] for span in valid_spans]
header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)]
if page_num in current_bbox:
cb = current_bbox[page_num]
current_bbox[page_num] = [
min(cb[0], header_bbox[0]),
min(cb[1], header_bbox[1]),
max(cb[2], header_bbox[2]),
max(cb[3], header_bbox[3])
]
else:
current_bbox[page_num] = header_bbox
last_y1s[page_num] = header_bbox[3]
x0, y0, x1, y1 = header_bbox
zoom = 200
left = int(x0)
top = int(y0)
zoom_str = f"{zoom},{left},{top}"
pageNumberFound = page_num + 1
# Build the query parameters
params = {
'pdfLink': pdf_path, # Your PDF link
'keyword': heading_to_search, # Your keyword (could be a string or list)
}
# URL encode each parameter
encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()}
# Construct the final encoded link
encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()])
# Correctly construct the final URL with page and zoom
# final_url = f"{baselink}{encoded_link}#page={str(pageNumberFound)}&zoom={zoom_str}"
# Get current date and time
now = datetime.now()
# Format the output
formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p")
# Optionally, add the URL to a DataFrame
data_entry = {
"PDF Name":filename,
"NBSLink": zoom_str,
"Subject": heading_to_search,
"Page": str(pageNumberFound),
"Author": "ADR",
"Creation Date": formatted_time,
"Layer": "Initial",
"Code": stringtowrite,
# "head above 1": paths[-2],
# "head above 2": paths[0],
"BodyText":collected_lines,
"MC Connnection": 'Go to ' + paths[0].strip().split()[0] +'/'+ heading_to_search.strip().split()[0] + ' in '+ filename
}
# Dynamically add "head above 1", "head above 2", ... depending on the number of levels
for i, path_text in enumerate(paths[:-1]): # skip the last one because that's the current heading
data_entry[f"head above {i+1}"] = path_text
data_list_JSON.append(data_entry)
# Convert list to JSON
# json_output = [data_list_JSON]
# json_output = json.dumps(data_list_JSON, indent=4)
i += 2
continue
else:
if (substring_match and not collecting and
len(combined_line_norm) > 0): # and (headertoContinue1 or headertoContinue2) ):
# Calculate word match percentage
word_match_percent = words_match_ratio(heading_norm, combined_line_norm) * 100
# Require every word of the header to appear in this line (100% word match)
meets_word_threshold = word_match_percent >= 100
# Check header conditions (including word threshold)
# header_spans = [
# span for span in spans
# if (is_header(span, most_common_font_size, most_common_color, most_common_font)
# # and span['size'] >= subsubheaderFontSize
# and span['size'] < mainHeaderFontSize)
# ]
if (meets_word_threshold or same_start_word(heading_to_search, combined_line_norm) ) and stringtowrite.startswith('To'):
collecting = True
if stringtowrite=='To be billed':
Alltexttobebilled+='\n'
# if stringtowrite=='To be billed':
# Alltexttobebilled+= ' '+ combined_line_norm
# matched_header_font_size = max(span["size"] for span in header_spans)
collected_lines.append(line_text)
valid_spans = [span for span in spans if span.get("bbox")]
if valid_spans:
x0s = [span["bbox"][0] for span in valid_spans]
x1s = [span["bbox"][2] for span in valid_spans]
y0s = [span["bbox"][1] for span in valid_spans]
y1s = [span["bbox"][3] for span in valid_spans]
header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)]
if page_num in current_bbox:
cb = current_bbox[page_num]
current_bbox[page_num] = [
min(cb[0], header_bbox[0]),
min(cb[1], header_bbox[1]),
max(cb[2], header_bbox[2]),
max(cb[3], header_bbox[3])
]
else:
current_bbox[page_num] = header_bbox
last_y1s[page_num] = header_bbox[3]
x0, y0, x1, y1 = header_bbox
zoom = 200
left = int(x0)
top = int(y0)
zoom_str = f"{zoom},{left},{top}"
pageNumberFound = page_num + 1
# Build the query parameters
params = {
'pdfLink': pdf_path, # Your PDF link
'keyword': heading_to_search, # Your keyword (could be a string or list)
}
# URL encode each parameter
encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()}
# Construct the final encoded link
encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()])
# Correctly construct the final URL with page and zoom
# final_url = f"{baselink}{encoded_link}#page={str(pageNumberFound)}&zoom={zoom_str}"
# Get current date and time
now = datetime.now()
# Format the output
formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p")
# Optionally, add the URL to a DataFrame
logger.info(f"Logging into table")
data_entry = {
"PDF Name":filename,
"NBSLink": zoom_str,
"Subject": heading_to_search,
"Page": str(pageNumberFound),
"Author": "ADR",
"Creation Date": formatted_time,
"Layer": "Initial",
"Code": stringtowrite,
# "head above 1": paths[-2],
# "head above 2": paths[0],
"BodyText":collected_lines,
"MC Connnection": 'Go to ' + paths[0].strip().split()[0] +'/'+ heading_to_search.strip().split()[0] + ' in '+ filename
}
# Dynamically add "head above 1", "head above 2", ... depending on the number of levels
for i, path_text in enumerate(paths[:-1]): # skip the last one because that's the current heading
data_entry[f"head above {i+1}"] = path_text
data_list_JSON.append(data_entry)
# Convert list to JSON
# json_output = [data_list_JSON]
# json_output = json.dumps(data_list_JSON, indent=4)
i += 2
continue
if collecting:
norm_line = normalize_text(line_text)
# Optimized URL check
if url_pattern.match(norm_line):
line_is_header = False
else:
# line_is_header = any(is_header(span, most_common_font_size, most_common_color, most_common_font) for span in spans)
def normalize(text):
return " ".join(text.lower().split())
line_text = " ".join(span["text"] for span in spans).strip()
line_is_header = any(
normalize(line_text) == normalize(header)
for header in allheaders_LLM
)
if line_is_header:
header_font_size = max(span["size"] for span in spans)
is_probably_real_header = (
# header_font_size >= matched_header_font_size and
# is_header(spans[0], most_common_font_size, most_common_color, most_common_font) and
len(line_text.strip()) > 2
)
if (norm_line != matched_header_line_norm and
norm_line != heading_norm and
is_probably_real_header):
if line_text not in heading_norm:
collecting = False
done = True
headertoContinue1 = False
headertoContinue2=False
for page_num, bbox in current_bbox.items():
bbox[3] = last_y1s.get(page_num, bbox[3])
page_highlights[page_num] = bbox
highlight_boxes(docHighlights, page_highlights,stringtowrite)
break_collecting = True
break
if break_collecting:
break
collected_lines.append(line_text)
valid_spans = [span for span in spans if span.get("bbox")]
if valid_spans:
x0s = [span["bbox"][0] for span in valid_spans]
x1s = [span["bbox"][2] for span in valid_spans]
y0s = [span["bbox"][1] for span in valid_spans]
y1s = [span["bbox"][3] for span in valid_spans]
line_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)]
if page_num in current_bbox:
cb = current_bbox[page_num]
current_bbox[page_num] = [
min(cb[0], line_bbox[0]),
min(cb[1], line_bbox[1]),
max(cb[2], line_bbox[2]),
max(cb[3], line_bbox[3])
]
else:
current_bbox[page_num] = line_bbox
last_y1s[page_num] = line_bbox[3]
i += 1
if not done:
for page_num, bbox in current_bbox.items():
bbox[3] = last_y1s.get(page_num, bbox[3])
page_highlights[page_num] = bbox
if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() :
stringtowrite='Not to be billed'
else:
stringtowrite='To be billed'
highlight_boxes(docHighlights, page_highlights,stringtowrite)
docarray.append(docHighlights)
if data_list_JSON and not data_list_JSON[-1]["BodyText"] and collected_lines:
data_list_JSON[-1]["BodyText"] = collected_lines[1:] if len(collected_lines) > 0 else []
# Final cleanup of the JSON data before returning
for entry in data_list_JSON:
# Check if BodyText exists and has content
if isinstance(entry.get("BodyText"), list) and len(entry["BodyText"]) > 0:
# Check if the first line of the body is essentially the same as the Subject
first_line = normalize_text(entry["BodyText"][0])
subject = normalize_text(entry["Subject"])
# If they match or the subject is inside the first line, remove it
if subject in first_line or first_line in subject:
entry["BodyText"] = entry["BodyText"][1:]
jsons.append(data_list_JSON)
logger.info(f"Markups done! Uploading to dropbox")
logger.info(f"Uploaded and Readyy!")
return jsons,identified_headers
def testFunction(pdf_path, model,LLM_prompt):
Alltexttobebilled=''
alltextWithoutNotbilled=''
# keywordstoSkip=["installation", "execution", "miscellaneous items", "workmanship", "testing", "labeling"]
headertoContinue1 = False
headertoContinue2=False
parsed_url = urlparse(pdf_path)
filename = os.path.basename(parsed_url.path)
filename = unquote(filename) # decode URL-encoded characters
# Optimized URL handling
if pdf_path and ('http' in pdf_path or 'dropbox' in pdf_path):
pdf_path = pdf_path.replace('dl=0', 'dl=1')
# Cache frequently used values
response = requests.get(pdf_path)
pdf_content = BytesIO(response.content)
if not pdf_content:
raise ValueError("No valid PDF content found.")
doc = fitz.open(stream=pdf_content, filetype="pdf")
docHighlights = fitz.open(stream=pdf_content, filetype="pdf")
parsed_url = urlparse(pdf_path)
filename = os.path.basename(parsed_url.path)
filename = unquote(filename) # decode URL-encoded characters
#### Get regular tex font size, style , color
most_common_font_size, most_common_color, most_common_font = get_regular_font_size_and_color(doc)
# Precompute regex patterns
dot_pattern = re.compile(r'\.{3,}')
url_pattern = re.compile(r'https?://\S+|www\.\S+')
highlighted=[]
processed_subjects = set() # Initialize at the top of testFunction
toc_pages = get_toc_page_numbers(doc)
identified_headers=process_document_in_chunks(len(doc), pdf_path, LLM_prompt, model)
# identified_headers = identify_headers_with_openrouterNEWW(doc, api_key='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')# ['text', fontsize, page number,y]
# with open("identified_headers.txt", "w", encoding="utf-8") as f:
# json.dump(identified_headers, f, indent=4)
# with open("identified_headers.txt", "r", encoding="utf-8") as f:
# identified_headers = json.load(f)
print(identified_headers)
allheaders_LLM=[]
for h in identified_headers:
if int(h["page"]) in toc_pages:
continue
if h['text']:
allheaders_LLM.append(h['text'])
headers_json=headers_with_location(doc,identified_headers)
headers=filter_headers_outside_toc(headers_json,toc_pages)
hierarchy=build_hierarchy_from_llm(headers)
# identify_headers_and_save_excel(hierarchy)
listofHeaderstoMarkup = get_leaf_headers_with_paths(hierarchy)
allchildrenheaders = [normalize_text(item['text']) for item, p in listofHeaderstoMarkup]
allchildrenheaders_set = set(allchildrenheaders) # For faster lookups
# print('allchildrenheaders_set',allchildrenheaders_set)
df = pd.DataFrame(columns=["NBSLink","Subject","Page","Author","Creation Date","Layer",'Code', 'head above 1', "head above 2",'BodyText'])
dictionaryNBS={}
data_list_JSON = []
for heading_to_searchDict,pathss in listofHeaderstoMarkup:
heading_to_search = heading_to_searchDict['text']
heading_to_searchPageNum = heading_to_searchDict['page']
paths=heading_to_searchDict['path']
# xloc=heading_to_searchDict['x']
yloc=heading_to_searchDict['y']
# Initialize variables
headertoContinue1 = False
headertoContinue2 = False
matched_header_line = None
done = False
collecting = False
collected_lines = []
page_highlights = {}
current_bbox = {}
last_y1s = {}
mainHeader = ''
subHeader = ''
matched_header_line_norm = heading_to_search
break_collecting = False
heading_norm = normalize_text(heading_to_search)
paths_norm = [normalize_text(p) for p in paths[0]] if paths and paths[0] else []
for page_num in range(heading_to_searchPageNum,len(doc)):
if page_num in toc_pages:
continue
if break_collecting:
break
page=doc[page_num]
page_height = page.rect.height
blocks = page.get_text("dict")["blocks"]
for block in blocks:
if break_collecting:
break
lines = block.get("lines", [])
i = 0
while i < len(lines):
if break_collecting:
break
spans = lines[i].get("spans", [])
if not spans:
i += 1
continue
# y0 = spans[0]["bbox"][1]
# y1 = spans[0]["bbox"][3]
x0 = spans[0]["bbox"][0] # left
x1 = spans[0]["bbox"][2] # right
y0 = spans[0]["bbox"][1] # top
y1 = spans[0]["bbox"][3] # bottom
if y0 < top_margin or y1 > (page_height - bottom_margin):
i += 1
continue
line_text = get_spaced_text_from_spans(spans).lower()
line_text_norm = normalize_text(line_text)
# Combine with next line if available
if i + 1 < len(lines):
next_spans = lines[i + 1].get("spans", [])
next_line_text = get_spaced_text_from_spans(next_spans).lower()
combined_line_norm = normalize_text(line_text + " " + next_line_text)
else:
combined_line_norm = line_text_norm
# Check if we should continue processing
if combined_line_norm and combined_line_norm in paths[0]:
headertoContinue1 = combined_line_norm
if combined_line_norm and combined_line_norm in paths[-2]:
headertoContinue2 = combined_line_norm
# print('paths',paths)
# if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() :
# if any(word in paths[-2].lower() for word in keywordstoSkip):
# stringtowrite='Not to be billed'
# else:
stringtowrite='To be billed'
if stringtowrite!='To be billed':
alltextWithoutNotbilled+= combined_line_norm #################################################
# Optimized header matching
existsfull = (
( combined_line_norm in allchildrenheaders_set or
combined_line_norm in allchildrenheaders ) and heading_to_search in combined_line_norm
)
# existsfull=False
# if xloc==x0 and yloc ==y0:
# existsfull=True
# New word-based matching
current_line_words = set(combined_line_norm.split())
heading_words = set(heading_norm.split())
all_words_match = current_line_words.issubset(heading_words) and len(current_line_words) > 0
substring_match = (
heading_norm in combined_line_norm or
combined_line_norm in heading_norm or
all_words_match # Include the new word-based matching
)
# substring_match = (
# heading_norm in combined_line_norm or
# combined_line_norm in heading_norm
# )
if ( substring_match and existsfull and not collecting and
len(combined_line_norm) > 0 ):#and (headertoContinue1 or headertoContinue2) ):
# Check header conditions more efficiently
# header_spans = [
# span for span in spans
# if (is_header(span, most_common_font_size, most_common_color, most_common_font) )
# # and span['size'] >= subsubheaderFontSize
# # and span['size'] < mainHeaderFontSize)
# ]
if stringtowrite.startswith('To'):
collecting = True
# matched_header_font_size = max(span["size"] for span in header_spans)
Alltexttobebilled+= ' '+ combined_line_norm
# collected_lines.append(line_text)
valid_spans = [span for span in spans if span.get("bbox")]
if valid_spans:
x0s = [span["bbox"][0] for span in valid_spans]
x1s = [span["bbox"][2] for span in valid_spans]
y0s = [span["bbox"][1] for span in valid_spans]
y1s = [span["bbox"][3] for span in valid_spans]
header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)]
if page_num in current_bbox:
cb = current_bbox[page_num]
current_bbox[page_num] = [
min(cb[0], header_bbox[0]),
min(cb[1], header_bbox[1]),
max(cb[2], header_bbox[2]),
max(cb[3], header_bbox[3])
]
else:
current_bbox[page_num] = header_bbox
last_y1s[page_num] = header_bbox[3]
x0, y0, x1, y1 = header_bbox
zoom = 200
left = int(x0)
top = int(y0)
zoom_str = f"{zoom},{left},{top}"
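# zoom_str ("scale,left,top") is stored as the NBSLink value; it appears to follow the
# PDF open-parameter convention used in viewer links like "#page=N&zoom=scale,left,top".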
pageNumberFound = page_num + 1
# Build the query parameters
params = {
'pdfLink': pdf_path, # Your PDF link
'keyword': heading_to_search, # Your keyword (could be a string or list)
}
# URL encode each parameter
encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()}
# Construct the final encoded link
encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()])
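# NOTE: encoded_link is currently unused because the final_url construction below is
# commented out; only zoom_str and the page number end up in the data entry.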
# Correctly construct the final URL with page and zoom
# final_url = f"{baselink}{encoded_link}#page={str(pageNumberFound)}&zoom={zoom_str}"
# Get current date and time
now = datetime.now()
# Format the output
formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p")
# Optionally, add the URL to a DataFrame
# Create the data entry only if the subject is unique
if heading_to_search not in processed_subjects:
data_entry = {
"NBSLink": zoom_str,
"Subject": heading_to_search,
"Page": str(pageNumberFound),
"Author": "ADR",
"Creation Date": formatted_time,
"Layer": "Initial",
"Code": stringtowrite,
"BodyText": collected_lines,
"MC Connnection": 'Go to ' + paths[0].strip().split()[0] + '/' + heading_to_search.strip().split()[0] + ' in ' + filename
}
# Dynamically add hierarchy paths
# Use a separate index so the outer line counter `i` is not clobbered by this loop.
for path_idx, path_text in enumerate(paths[:-1]):
data_entry[f"head above {path_idx + 1}"] = path_text
# Append to the list and mark this subject as processed
data_list_JSON.append(data_entry)
processed_subjects.add(heading_to_search)
else:
print(f"Skipping duplicate data entry for Subject: {heading_to_search}")
# Convert list to JSON
json_output = json.dumps(data_list_JSON, indent=4)
i += 1
continue
else:
if (substring_match and not collecting and
len(combined_line_norm) > 0): # and (headertoContinue1 or headertoContinue2) ):
# Calculate word match percentage
word_match_percent = words_match_ratio(heading_norm, combined_line_norm) * 100
# Require every header word to appear in this line (100% word overlap)
meets_word_threshold = word_match_percent >= 100
# Check header conditions (including word threshold)
# header_spans = [
# span for span in spans
# if (is_header(span, most_common_font_size, most_common_color, most_common_font))
# # and span['size'] >= subsubheaderFontSize
# # and span['size'] < mainHeaderFontSize)
# ]
if (meets_word_threshold or same_start_word(heading_to_search, combined_line_norm) ) and stringtowrite.startswith('To'):
collecting = True
# matched_header_font_size = max(span["size"] for span in header_spans)
Alltexttobebilled+= ' '+ combined_line_norm
collected_lines.append(line_text)
valid_spans = [span for span in spans if span.get("bbox")]
if valid_spans:
x0s = [span["bbox"][0] for span in valid_spans]
x1s = [span["bbox"][2] for span in valid_spans]
y0s = [span["bbox"][1] for span in valid_spans]
y1s = [span["bbox"][3] for span in valid_spans]
header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)]
if page_num in current_bbox:
cb = current_bbox[page_num]
current_bbox[page_num] = [
min(cb[0], header_bbox[0]),
min(cb[1], header_bbox[1]),
max(cb[2], header_bbox[2]),
max(cb[3], header_bbox[3])
]
else:
current_bbox[page_num] = header_bbox
last_y1s[page_num] = header_bbox[3]
x0, y0, x1, y1 = header_bbox
zoom = 200
left = int(x0)
top = int(y0)
zoom_str = f"{zoom},{left},{top}"
pageNumberFound = page_num + 1
# Build the query parameters
params = {
'pdfLink': pdf_path, # Your PDF link
'keyword': heading_to_search, # Your keyword (could be a string or list)
}
# URL encode each parameter
encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()}
# Construct the final encoded link
encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()])
# Correctly construct the final URL with page and zoom
# final_url = f"{baselink}{encoded_link}#page={str(pageNumberFound)}&zoom={zoom_str}"
# Get current date and time
now = datetime.now()
# Format the output
formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p")
# Optionally, add the URL to a DataFrame
# Create the data entry only if the subject is unique
if heading_to_search not in processed_subjects:
data_entry = {
"NBSLink": zoom_str,
"Subject": heading_to_search,
"Page": str(pageNumberFound),
"Author": "ADR",
"Creation Date": formatted_time,
"Layer": "Initial",
"Code": stringtowrite,
"BodyText": collected_lines,
"MC Connnection": 'Go to ' + paths[0].strip().split()[0] + '/' + heading_to_search.strip().split()[0] + ' in ' + filename
}
# Dynamically add hierarchy paths
# Use a separate index so the outer line counter `i` is not clobbered by this loop.
for path_idx, path_text in enumerate(paths[:-1]):
data_entry[f"head above {path_idx + 1}"] = path_text
# Append to the list and mark this subject as processed
data_list_JSON.append(data_entry)
processed_subjects.add(heading_to_search)
else:
print(f"Skipping duplicate data entry for Subject: {heading_to_search}")
# Convert list to JSON
json_output = json.dumps(data_list_JSON, indent=4)
i += 2
continue
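# ---- Body-collection phase: once a heading has matched, every following content line is
# appended to collected_lines and merged into the per-page bounding box, until a line that
# looks like another header ends the collection. ----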
if collecting:
norm_line = normalize_text(line_text)
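# The two helpers defined below appear to be referenced only by the commented-out
# alternative header check further down; the active path calls is_header() on the spans.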
def normalize(text):
if isinstance(text, list):
text = " ".join(text)
return " ".join(text.lower().split())
def is_similar(a, b, threshold=0.75):
return SequenceMatcher(None, a, b).ratio() >= threshold
# Optimized URL check
if url_pattern.match(norm_line):
line_is_header = False
else:
line_is_header = any(is_header(span, most_common_font_size, most_common_color, most_common_font,allheaders_LLM) for span in spans)
# def normalize(text):
# return " ".join(text.lower().split())
# line_text = " ".join(span["text"] for span in spans).strip()
# line_is_header = any( normalize(line_text) == normalize(header) for header in allheaders_LLM )
# for line_text in lines:
# if collecting:
# # Join all spans into one line
# line_text = " ".join(span["text"] for span in spans).strip()
# norm_line = normalize(line_text)
# # Get max font size in this line
# max_font_size = max(span.get("size", 0) for span in spans)
# # Skip URLs
# if url_pattern.match(norm_line):
# line_is_header = False
# else:
# text_matches_header = any(
# is_similar(norm_line, normalize(header))
# if not isinstance(header, list)
# else is_similar(norm_line, normalize(" ".join(header)))
# for header in allheaders_LLM
# )
# # ✅ FINAL header condition
# line_is_header = text_matches_header and max_font_size > 11
if line_is_header:
header_font_size = max(span["size"] for span in spans)
is_probably_real_header = (
# header_font_size >= matched_header_font_size and
# is_header(spans[0], most_common_font_size, most_common_color, most_common_font) and
len(line_text.strip()) > 2
)
if (norm_line != matched_header_line_norm and
norm_line != heading_norm and
is_probably_real_header):
if line_text not in heading_norm:
collecting = False
done = True
headertoContinue1 = False
headertoContinue2=False
for page_num, bbox in current_bbox.items():
bbox[3] = last_y1s.get(page_num, bbox[3])
page_highlights[page_num] = bbox
can_highlight=False
if [page_num,bbox] not in highlighted:
highlighted.append([page_num,bbox])
can_highlight=True
if can_highlight:
highlight_boxes(docHighlights, page_highlights,stringtowrite)
break_collecting = True
break
if break_collecting:
break
collected_lines.append(line_text)
valid_spans = [span for span in spans if span.get("bbox")]
if valid_spans:
x0s = [span["bbox"][0] for span in valid_spans]
x1s = [span["bbox"][2] for span in valid_spans]
y0s = [span["bbox"][1] for span in valid_spans]
y1s = [span["bbox"][3] for span in valid_spans]
line_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)]
if page_num in current_bbox:
cb = current_bbox[page_num]
current_bbox[page_num] = [
min(cb[0], line_bbox[0]),
min(cb[1], line_bbox[1]),
max(cb[2], line_bbox[2]),
max(cb[3], line_bbox[3])
]
else:
current_bbox[page_num] = line_bbox
last_y1s[page_num] = line_bbox[3]
i += 1
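# If the scan ends without hitting another header, flush whatever was collected:
# last_y1s holds the bottom edge of the last collected line on each page, so each
# page's highlight box is closed at that point before being drawn.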
if not done:
for page_num, bbox in current_bbox.items():
bbox[3] = last_y1s.get(page_num, bbox[3])
page_highlights[page_num] = bbox
# if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() :
# stringtowrite='Not to be billed'
# else:
stringtowrite='To be billed'
highlight_boxes(docHighlights, page_highlights,stringtowrite)
print("Current working directory:", os.getcwd())
docHighlights.save("highlighted_output.pdf")
# dbxTeam = tsadropboxretrieval.ADR_Access_DropboxTeam('user')
# metadata = dbxTeam.sharing_get_shared_link_metadata(pdf_path)
# dbPath = '/TSA JOBS/ADR Test/FIND/'
# pdf_bytes = BytesIO()
# docHighlights.save(pdf_bytes)
# pdflink = tsadropboxretrieval.uploadanyFile(doc=docHighlights, path=dbPath, pdfname=filename)
# json_output=changepdflinks(json_output,pdflink)
# return pdf_bytes.getvalue(), docHighlights , json_output , Alltexttobebilled , alltextWithoutNotbilled , filename
# Final safety check: if the very last entry in our list has an empty BodyText,
# but we have collected_lines, sync them.
if data_list_JSON and not data_list_JSON[-1]["BodyText"] and collected_lines:
data_list_JSON[-1]["BodyText"] = collected_lines[1:]  # collected_lines is guaranteed non-empty here
# Final cleanup of the JSON data before returning
for entry in data_list_JSON:
# Check if BodyText exists and has content
if isinstance(entry.get("BodyText"), list) and len(entry["BodyText"]) > 0:
# Check if the first line of the body is essentially the same as the Subject
first_line = normalize_text(entry["BodyText"][0])
subject = normalize_text(entry["Subject"])
# If they match or the subject is inside the first line, remove it
if subject in first_line or first_line in subject:
entry["BodyText"] = entry["BodyText"][1:]
print('data_list_JSON',data_list_JSON)
# json_output.append(data_list_JSON)
json_output = json.dumps(data_list_JSON, indent=4)
logger.info(f"Markups done! Uploading to dropbox")
logger.info(f"Uploaded and Readyy!")
return json_output,identified_headers
def build_subject_body_map(jsons):
subject_body = {}
for obj in jsons:
subject = obj.get("Subject")
body = obj.get("BodyText", [])
if subject:
# join body text into a readable paragraph
subject_body[subject.strip()] = " ".join(body)
return subject_body
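# NOTE: build_subject_body_map does not appear to be called by the active code path in this
# section (identify_headers_and_save_excel builds its own map inline); kept as a helper.
# Illustrative use: build_subject_body_map(json.loads(json_output)) -> {"Some Subject": "body text ...", ...}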
# def identify_headers_and_save_excel(pdf_path, model,LLM_prompt):
# try:
# # result = identify_headers_with_openrouterNEWW(pdf_path, model,LLM_prompt)
# print('beginnging identify')
# jsons,result = testFunction(pdf_path, model,LLM_prompt)
# print('done , will start dataframe',jsons,result)
# if not result:
# df = pd.DataFrame([{
# "text": None,
# "page": None,
# "suggested_level": None,
# "confidence": None,
# "body": None,
# "System Message": "No headers were identified by the LLM."
# }])
# else:
# df = pd.DataFrame(result)
# subject_body_map = {}
# # Safely navigate the nested structure: [ [ [ {dict}, {dict} ] ] ]
# for pdf_level in jsons:
# if not isinstance(pdf_level, list):
# continue
# for section_level in pdf_level:
# # If the LLM returns a list of dictionaries here
# if isinstance(section_level, list):
# for obj in section_level:
# if isinstance(obj, dict):
# subject = obj.get("Subject")
# body = obj.get("BodyText", [])
# if subject:
# # Ensure body is a list before joining
# body_str = " ".join(body) if isinstance(body, list) else str(body)
# subject_body_map[subject.strip()] = body_str
# # If the LLM returns a single dictionary here
# elif isinstance(section_level, dict):
# subject = section_level.get("Subject")
# body = section_level.get("BodyText", [])
# if subject:
# body_str = " ".join(body) if isinstance(body, list) else str(body)
# subject_body_map[subject.strip()] = body_str
# # Map the extracted body text to the "text" column in your main DataFrame
# if "text" in df.columns:
# df["body"] = df["text"].map(lambda x: subject_body_map.get(str(x).strip()) if x else None)
# else:
# df["body"] = None
# # Save to Excel
# output_path = os.path.abspath("header_analysis_output.xlsx")
# df.to_excel(output_path, index=False, engine="openpyxl")
# print("--- Processed DataFrame ---")
# print(df)
# return output_path
# except Exception as e:
# print(f"ERROR - Critical error in processing: {e}")
# # Re-raise or handle as needed
# return None
def identify_headers_and_save_excel(pdf_path, model,LLM_prompt):
try:
jsons, result = testFunction(pdf_path, model,LLM_prompt)
if not result:
df = pd.DataFrame([{
"text": None,
"page": None,
"suggested_level": None,
"confidence": None,
"body": None,
"System Message": "No headers were identified by the LLM."
}])
else:
print("Headers identified; building DataFrame")
df = pd.DataFrame(result)
# Convert JSON string to list if needed
if isinstance(jsons, str):
jsons = json.loads(jsons)
subject_body_map = {}
# ✅ jsons is a flat list of dicts
for obj in jsons:
if not isinstance(obj, dict):
continue
subject = obj.get("Subject")
body = obj.get("BodyText", [])
if subject:
subject_body_map[subject.strip()] = " ".join(body)
# ✅ Map body text onto the dataframe; strip whitespace so keys match the stripped subjects
df["body"] = df["text"].astype(str).str.strip().map(subject_body_map).fillna("")
# ✅ Save once at end
output_path = os.path.abspath("header_analysis_output.xlsx")
df.to_excel(output_path, index=False, engine="openpyxl")
print("--- Processed DataFrame ---")
print(df)
return output_path
except Exception as e:
logger.error(f"Critical error in processing: {str(e)}")
return None
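# Illustrative call (the URL, model id, and prompt below are placeholders, not defaults of this app):
#   identify_headers_and_save_excel("https://example.com/spec.pdf", "some-model-id",
#                                   "Identify every section header")
# Returns the path to header_analysis_output.xlsx, or None if processing failed.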
# Improved launch with debug mode enabled
iface = gr.Interface(
fn=identify_headers_and_save_excel,
inputs=[
gr.Textbox(label="PDF URL"),
gr.Textbox(label="Model Type"), # Default example
gr.Textbox(label="LLM Prompt")
],
outputs=gr.File(label="Download Excel Results"),
title="PDF Header Extractor"
)
# Launch with debug=True to see errors in the console
iface.launch(debug=True)