diff --git "a/app.py" "b/app.py" --- "a/app.py" +++ "b/app.py" @@ -5,55 +5,36 @@ import requests from io import BytesIO from datetime import datetime import pandas as pd -from io import BytesIO import fitz # PyMuPDF from collections import defaultdict, Counter from urllib.parse import urlparse, unquote -import os -from io import BytesIO -import re -import requests -import pandas as pd -import fitz # PyMuPDF import re -import urllib.parse import difflib - import copy -# import tsadropboxretrieval - import urllib.parse import logging +from difflib import SequenceMatcher - -# Set up logging to see everything +# Set up logging logging.basicConfig( - level=logging.DEBUG, + level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ - logging.StreamHandler(), # Print to console - logging.FileHandler('debug.log', mode='w') # Save to file + logging.StreamHandler(), ] ) logger = logging.getLogger(__name__) - +# Constants top_margin = 70 bottom_margin = 85 def getLocation_of_header(doc, headerText, expected_page=None): locations = [] - - # pages = ( - # [(expected_page, doc.load_page(expected_page))] - # if expected_page is not None - # else enumerate(doc) - # ) - expectedpageNorm=expected_page - page=doc[expectedpageNorm] - # for page_number, page in pages: + expectedpageNorm = expected_page + page = doc[expectedpageNorm] page_height = page.rect.height rects = page.search_for(headerText) @@ -67,7 +48,7 @@ def getLocation_of_header(doc, headerText, expected_page=None): continue locations.append({ - "headerText":headerText, + "headerText": headerText, "page": expectedpageNorm, "x": r.x0, "y": y @@ -76,39 +57,26 @@ def getLocation_of_header(doc, headerText, expected_page=None): def filter_headers_outside_toc(headers, toc_pages): toc_pages_set = set(toc_pages) - filtered = [] + for h in headers: page = h[2] - y = h[3] - - # Skip invalid / fallback headers - if page is None or y is None: + if page is None: continue - - # Skip headers inside TOC pages if page in toc_pages_set: continue - filtered.append(h) - + return filtered - def headers_with_location(doc, llm_headers): - """ - Converts LLM headers into: - [text, font_size, page, y, suggested_level, x] - Always include all headers, even if location not found. - """ headersJson = [] for h in llm_headers: text = h["text"] llm_page = h["page"] - - # Attempt to locate the header on the page - locations = getLocation_of_header(doc, text,llm_page) + + locations = getLocation_of_header(doc, text, llm_page) if locations: for loc in locations: @@ -121,41 +89,38 @@ def headers_with_location(doc, llm_headers): for line in block.get("lines", []): line_text = "".join(span["text"] for span in line["spans"]).strip() if normalize(line_text) == normalize(text): - fontsize = line["spans"][0]["size"] - break + if line["spans"]: + fontsize = line["spans"][0]["size"] + break if fontsize: break + entry = [ text, fontsize, loc["page"], loc["y"], h["suggested_level"], - loc.get("x", 0), # Add x coordinate + loc.get("x", 0), ] if entry not in headersJson: headersJson.append(entry) + return headersJson - - def build_hierarchy_from_llm(headers): nodes = [] - - # ------------------------- - # 1. Build nodes safely - # ------------------------- + + # Build nodes for h in headers: - # print("headerrrrrrrrrrrrrrr", h) - - if len(h) < 6: # Changed from 5 to 6 since we added 'x' + if len(h) < 6: continue - text, size, page, y, level, x = h # Unpack 'x' here - + text, size, page, y, level, x = h + if level is None: continue - + try: level = int(level) except Exception: @@ -164,7 +129,7 @@ def build_hierarchy_from_llm(headers): node = { "text": text, "page": page if page is not None else -1, - "x": x if x is not None else -1, # Add this + "x": x if x is not None else -1, "y": y if y is not None else -1, "size": size, "bold": False, @@ -176,29 +141,20 @@ def build_hierarchy_from_llm(headers): "norm_text": normalize(text), "level": level, } - nodes.append(node) if not nodes: return [] - # ------------------------- - # 2. Sort top-to-bottom - # ------------------------- + # Sort top-to-bottom nodes.sort(key=lambda x: (x["page"], x["y"])) - # ------------------------- - # 3. NORMALIZE LEVELS - # (smallest level → 0) - # ------------------------- + # Normalize levels min_level = min(n["level"] for n in nodes) - for n in nodes: n["level"] -= min_level - # ------------------------- - # 4. Build hierarchy - # ------------------------- + # Build hierarchy root = [] stack = [] added_level0 = set() @@ -209,7 +165,6 @@ def build_hierarchy_from_llm(headers): if lvl < 0: continue - # De-duplicate true top-level headers if lvl == 0: key = (header["norm_text"], header["page"]) if key in added_level0: @@ -230,9 +185,7 @@ def build_hierarchy_from_llm(headers): stack.append(header) - # ------------------------- - # 5. Enforce nesting sanity - # ------------------------- + # Enforce nesting def enforce_nesting(node_list, parent_level=-1): for node in node_list: if node["level"] <= parent_level: @@ -241,32 +194,22 @@ def build_hierarchy_from_llm(headers): enforce_nesting(root) - # ------------------------- - # 6. OPTIONAL cleanup - # (only if real level-0s exist) - # ------------------------- + # Cleanup if any(h["level"] == 0 for h in root): root = [ h for h in root if not (h["level"] == 0 and not h["children"]) ] - # ------------------------- - # 7. Final pass - # ------------------------- - header_tree = enforce_level_hierarchy(root) - - return header_tree - - + return enforce_level_hierarchy(root) def get_regular_font_size_and_color(doc): font_sizes = [] colors = [] fonts = [] - # Loop through all pages - for page_num in range(len(doc)): + # Check only first few pages for efficiency + for page_num in range(min(len(doc), 10)): page = doc.load_page(page_num) for span in page.get_text("dict")["blocks"]: if "lines" in span: @@ -276,10 +219,9 @@ def get_regular_font_size_and_color(doc): colors.append(span['color']) fonts.append(span['font']) - # Get the most common font size, color, and font - most_common_font_size = Counter(font_sizes).most_common(1)[0][0] if font_sizes else None - most_common_color = Counter(colors).most_common(1)[0][0] if colors else None - most_common_font = Counter(fonts).most_common(1)[0][0] if fonts else None + most_common_font_size = Counter(font_sizes).most_common(1)[0][0] if font_sizes else 12 + most_common_color = Counter(colors).most_common(1)[0][0] if colors else 0 + most_common_font = Counter(fonts).most_common(1)[0][0] if fonts else "Helvetica" return most_common_font_size, most_common_color, most_common_font @@ -291,99 +233,75 @@ def normalize_text(text): def get_spaced_text_from_spans(spans): return normalize_text(" ".join(span["text"].strip() for span in spans)) - - - def is_numbered(text): return bool(re.match(r'^\d', text.strip())) def is_similar(a, b, threshold=0.85): - return difflib.SequenceMatcher(None, a, b).ratio() > threshold + return SequenceMatcher(None, a, b).ratio() > threshold def normalize(text): text = text.lower() - text = re.sub(r'\.{2,}', '', text) # remove long dots - text = re.sub(r'\s+', ' ', text) # replace multiple spaces with one + text = re.sub(r'\.{2,}', '', text) + text = re.sub(r'\s+', ' ', text) return text.strip() def clean_toc_entry(toc_text): - """Remove page numbers and formatting from TOC entries""" - # Remove everything after last sequence of dots/whitespace followed by digits return re.sub(r'[\.\s]+\d+.*$', '', toc_text).strip('. ') - - - - def enforce_level_hierarchy(headers): - """ - Ensure level 2 headers only exist under level 1 headers - and clean up any orphaned headers - """ def process_node_list(node_list, parent_level=-1): i = 0 while i < len(node_list): node = node_list[i] - - # Remove level 2 headers that don't have a level 1 parent if node['level'] == 2 and parent_level != 1: node_list.pop(i) continue - - # Recursively process children process_node_list(node['children'], node['level']) i += 1 process_node_list(headers) return headers - - - -def highlight_boxes(doc, highlights, stringtowrite, fixed_width=500): # Set your desired width here +def highlight_boxes(doc, highlights, stringtowrite, fixed_width=500): for page_num, bbox in highlights.items(): page = doc.load_page(page_num) page_width = page.rect.width - # Get original rect for vertical coordinates orig_rect = fitz.Rect(bbox) rect_height = orig_rect.height + if rect_height > 30: - if orig_rect.width > 10: - # Center horizontally using fixed width - center_x = page_width / 2 - new_x0 = center_x - fixed_width / 2 - new_x1 = center_x + fixed_width / 2 - new_rect = fitz.Rect(new_x0, orig_rect.y0, new_x1, orig_rect.y1) - - # Add highlight rectangle - annot = page.add_rect_annot(new_rect) - if stringtowrite.startswith('Not'): - annot.set_colors(stroke=(0.5, 0.5, 0.5), fill=(0.5, 0.5, 0.5)) - else: - annot.set_colors(stroke=(1, 1, 0), fill=(1, 1, 0)) - - annot.set_opacity(0.3) - annot.update() - - # Add right-aligned freetext annotation inside the fixed-width box - text = '['+stringtowrite +']' - annot1 = page.add_freetext_annot( - new_rect, - text, - fontsize=15, - fontname='helv', - text_color=(1, 0, 0), - rotate=page.rotation, - align=2 # right alignment - ) - annot1.update() + center_x = page_width / 2 + new_x0 = center_x - fixed_width / 2 + new_x1 = center_x + fixed_width / 2 + new_rect = fitz.Rect(new_x0, orig_rect.y0, new_x1, orig_rect.y1) + + annot = page.add_rect_annot(new_rect) + if stringtowrite.startswith('Not'): + annot.set_colors(stroke=(0.5, 0.5, 0.5), fill=(0.5, 0.5, 0.5)) + else: + annot.set_colors(stroke=(1, 1, 0), fill=(1, 1, 0)) + annot.set_opacity(0.3) + annot.update() + + text = '[' + stringtowrite + ']' + annot1 = page.add_freetext_annot( + new_rect, + text, + fontsize=15, + fontname='helv', + text_color=(1, 0, 0), + rotate=page.rotation, + align=2 + ) + annot1.update() def get_leaf_headers_with_paths(listtoloop, path=None, output=None): if path is None: path = [] if output is None: output = [] + for header in listtoloop: current_path = path + [header['text']] if not header['children']: @@ -391,8 +309,9 @@ def get_leaf_headers_with_paths(listtoloop, path=None, output=None): output.append((header, current_path)) else: get_leaf_headers_with_paths(header['children'], current_path, output) + return output -# Add this helper function at the top of your code + def words_match_ratio(text1, text2): words1 = set(text1.split()) words2 = set(text2.split()) @@ -402,26 +321,17 @@ def words_match_ratio(text1, text2): return len(common_words) / len(words1) def same_start_word(s1, s2): - # Split both strings into words words1 = s1.strip().split() words2 = s2.strip().split() - - # Check if both have at least one word and compare the first ones if words1 and words2: return words1[0].lower() == words2[0].lower() return False - def get_toc_page_numbers(doc, max_pages_to_check=15): toc_pages = [] - logger.debug(f"Starting TOC detection, checking first {max_pages_to_check} pages") - # 1. Existing Dot Pattern (looking for ".....") - dot_pattern = re.compile(r"\.{2,}") - # 2. NEW: Title Pattern (looking for specific headers) - # ^ and $ ensure the line is JUST that word (ignoring "The contents of the bag...") - # re.IGNORECASE makes it match "CONTENTS", "Contents", "Index", etc. + dot_pattern = re.compile(r"\.{2,}") title_pattern = re.compile(r"^\s*(table of contents|contents|index)\s*$", re.IGNORECASE) for page_num in range(min(len(doc), max_pages_to_check)): @@ -431,35 +341,19 @@ def get_toc_page_numbers(doc, max_pages_to_check=15): dot_line_count = 0 has_toc_title = False - logger.debug(f"Checking page {page_num} for TOC") - for block in blocks: for line in block.get("lines", []): - # Extract text from spans (mimicking get_spaced_text_from_spans) line_text = " ".join([span["text"] for span in line["spans"]]).strip() - # CHECK A: Does the line have dots? if dot_pattern.search(line_text): dot_line_count += 1 - logger.debug(f" Found dot pattern on page {page_num}: '{line_text[:50]}...'") - # CHECK B: Is this line a Title? - # We check this early in the loop. If a page has a title "Contents", - # we mark it immediately. if title_pattern.match(line_text): has_toc_title = True - logger.debug(f" Found TOC title on page {page_num}: '{line_text}'") - # CONDITION: - # It is a TOC page if it has a Title OR if it has dot leaders. - # We use 'dot_line_count >= 1' to be sensitive to single-item lists. if has_toc_title or dot_line_count >= 1: toc_pages.append(page_num) - logger.info(f"Page {page_num} identified as TOC page") - # RETURN: - # If we found TOC pages (e.g., [2, 3]), we return [0, 1, 2, 3] - # This covers the cover page, inside cover, and the TOC itself. if toc_pages: last_toc_page = toc_pages[0] result = list(range(0, last_toc_page + 1)) @@ -467,1506 +361,328 @@ def get_toc_page_numbers(doc, max_pages_to_check=15): return result logger.info("No TOC pages found") - return [] # Return empty list if nothing found - + return [] -def openPDF(pdf_path): +def openPDF(pdf_path): logger.info(f"Opening PDF from URL: {pdf_path}") pdf_path = pdf_path.replace('dl=0', 'dl=1') response = requests.get(pdf_path) - logger.debug(f"PDF download response status: {response.status_code}") - pdf_content = BytesIO(response.content) - if not pdf_content: - logger.error("No valid PDF content found.") - raise ValueError("No valid PDF content found.") + if response.status_code != 200: + logger.error(f"Failed to download PDF. Status code: {response.status_code}") + return None + + pdf_content = BytesIO(response.content) doc = fitz.open(stream=pdf_content, filetype="pdf") logger.info(f"PDF opened successfully, {len(doc)} pages") return doc - -def process_document_in_chunks( - lengthofDoc, - pdf_path, - LLM_prompt, - model, - chunk_size=15, +def is_header(span, regular_font_size, regular_color, regular_font, allheaders_LLM=None): + """ + Determine if a text span is a header based on font characteristics. + """ + # Check font size (headers are typically larger than regular text) + size_ok = span.get('size', 0) > regular_font_size * 1.1 -): - total_pages = lengthofDoc - all_results = [] - - for start in range(0, total_pages, chunk_size): - end = start + chunk_size - - logger.info(f"Processing pages {start + 1} → {min(end, total_pages)}") - - result = identify_headers_with_openrouterNEWW( - pdf_path=pdf_path, - model=model, - LLM_prompt=LLM_prompt, - pages_to_check=(start, end) - ) - - if result: - all_results.extend(result) - - return all_results - - -def identify_headers_with_openrouterNEWW(pdf_path, model,LLM_prompt, pages_to_check=None, top_margin=0, bottom_margin=0): + # Check if it's bold (common for headers) + flags = span.get('flags', 0) + is_bold = bool(flags & 2) - """Ask an LLM (OpenRouter) to identify headers in the document. - Returns a list of dicts: {text, page, suggested_level, confidence}. - The function sends plain page-line strings to the LLM (including page numbers) - and asks for a JSON array containing only header lines with suggested levels. - """ - logger.info("=" * 80) - logger.info("STARTING IDENTIFY_HEADERS_WITH_OPENROUTER") - # logger.info(f"PDF Path: {pdf_path}") - logger.info(f"Model: {model}") - # logger.info(f"LLM Prompt: {LLM_prompt[:200]}..." if len(LLM_prompt) > 200 else f"LLM Prompt: {LLM_prompt}") + # Check font family + font_ok = span.get('font') != regular_font - doc = openPDF(pdf_path) - api_key = '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' - if api_key is None: - api_key = os.getenv("OPENROUTER_API_KEY") or None + # Check color + color_ok = span.get('color') != regular_color - model = str(model) - # toc_pages = get_toc_page_numbers(doc) - lines_for_prompt = [] - # pgestoRun=20 - # logger.info(f"TOC pages to skip: {toc_pages}") - # logger.info(f"Total pages in document: {len(doc)}") - logger.info(f"Total pages in document: {len(doc)}") + # Check if text matches LLM-identified headers + text_match = False + if allheaders_LLM and 'text' in span: + span_text = span['text'].strip() + if span_text: + norm_text = normalize_text(span_text) + text_match = any( + normalize_text(header) == norm_text + for header in allheaders_LLM + ) - # Collect text lines from pages (skip TOC pages) - total_lines = 0 - - ArrayofTextWithFormat = [] - total_pages = len(doc) - - if pages_to_check is None: - start_page = 0 - end_page = min(15, total_pages) - else: - start_page, end_page = pages_to_check - end_page = min(end_page, total_pages) # 🔑 CRITICAL LINE - - for pno in range(start_page, end_page): - page = doc.load_page(pno) - page_height = page.rect.height - lines_on_page = 0 - text_dict = page.get_text("dict") - lines = [] - y_tolerance = 0.5 # tweak if needed (1–3 usually works) - - for block in text_dict["blocks"]: - if block["type"] != 0: - continue - for line in block["lines"]: - for span in line["spans"]: - text = span["text"].strip() - if not text: # Skip empty text - continue - - # Extract all formatting attributes - font = span.get('font') - size = span.get('size') - color = span.get('color') - flags = span.get('flags', 0) - bbox = span.get("bbox", (0, 0, 0, 0)) - x0, y0, x1, y1 = bbox - - # Create text format dictionary - text_format = { - 'Font': font, - 'Size': size, - 'Flags': flags, - 'Color': color, - 'Text': text, - 'BBox': bbox, - 'Page': pno + 1 - } - - # Add to ArrayofTextWithFormat - ArrayofTextWithFormat.append(text_format) - - # For line grouping (keeping your existing logic) - matched = False - for l in lines: - if abs(l["y"] - y0) <= y_tolerance: - l["spans"].append((x0, text, font, size, color, flags)) - matched = True - break - if not matched: - lines.append({ - "y": y0, - "spans": [(x0, text, font, size, color, flags)] - }) - - lines.sort(key=lambda l: l["y"]) - - # Join text inside each line with formatting info - final_lines = [] - for l in lines: - l["spans"].sort(key=lambda s: s[0]) # left → right - - # Collect all text and formatting for this line - line_text = " ".join(text for _, text, _, _, _, _ in l["spans"]) - - # Get dominant formatting for the line (based on first span) - if l["spans"]: - _, _, font, size, color, flags = l["spans"][0] - - # Store line with its formatting - line_with_format = { - 'text': line_text, - 'font': font, - 'size': size, - 'color': color, - 'flags': flags, - 'page': pno + 1, - 'y_position': l["y"] - } - final_lines.append(line_with_format) - - # Result - for line_data in final_lines: - line_text = line_data['text'] - print(line_text) - - if line_text: - # Create a formatted string with text properties - format_info = f"Font: {line_data['font']}, Size: {line_data['size']}, Color: {line_data['color']}" - lines_for_prompt.append(f"PAGE {pno+1}: {line_text} [{format_info}]") - lines_on_page += 1 - - if lines_on_page > 0: - logger.debug(f"Page {pno}: collected {lines_on_page} lines") - total_lines += lines_on_page - - logger.info(f"Total lines collected for LLM: {total_lines}") + # A span is considered a header if it meets multiple criteria + return (size_ok and (is_bold or font_ok or color_ok)) or text_match +def identify_headers_with_openrouter(pdf_path, model, LLM_prompt, pages_to_check=None): + """Simplified version for HuggingFace Spaces""" + logger.info("Starting header identification") - if not lines_for_prompt: - logger.warning("No lines collected for prompt") + doc = openPDF(pdf_path) + if doc is None: return [] - # Log sample of lines - logger.info("Sample lines (first 10):") - for i, line in enumerate(lines_for_prompt[:10]): - logger.info(f" {i}: {line}") + # Use environment variable for API key + api_key = os.getenv("OPENROUTER_API_KEY") + if not api_key: + logger.warning("No OpenRouter API key found. Using fallback heuristics.") + return fallback_header_detection(doc) - prompt =LLM_prompt + "\n\nLines:\n" + "\n".join(lines_for_prompt) - - logger.debug(f"Full prompt length: {len(prompt)} characters") - # Changed: Print entire prompt, not truncated - print("=" * 80) - print("FULL LLM PROMPT:") - print(prompt) - print("=" * 80) + # Simplified prompt for faster processing + simplified_prompt = """ + Analyze the following text lines from a PDF document. + Identify which lines are headers/titles and suggest a hierarchy level (1 for main headers, 2 for subheaders, etc.). + Return only a JSON array of objects with keys: text, page, suggested_level. - # Also log to file - try: - with open("full_prompt.txt", "w", encoding="utf-8") as f: - f.write(prompt) - logger.info("Full prompt saved to full_prompt.txt") - except Exception as e: - logger.error(f"Could not save prompt to file: {e}") + Example: [{"text": "Introduction", "page": 3, "suggested_level": 1}, ...] + """ - if not api_key: - # No API key: return empty so caller can fallback to heuristics - logger.error("No API key provided") - return [] + # Collect text from first 20 pages max for HuggingFace + total_pages = len(doc) + start_page = 0 + end_page = min(20, total_pages) # Limit pages for HuggingFace + + lines_for_prompt = [] + for pno in range(start_page, end_page): + page = doc.load_page(pno) + text = page.get_text() + if text.strip(): + lines = text.split('\n') + for line in lines: + if line.strip(): + lines_for_prompt.append(f"PAGE {pno+1}: {line.strip()}") + if not lines_for_prompt: + return fallback_header_detection(doc) + + prompt = simplified_prompt + "\n\nLines:\n" + "\n".join(lines_for_prompt[:100]) # Limit lines + + # Make API call url = "https://openrouter.ai/api/v1/chat/completions" - # Build headers following the OpenRouter example headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", - "HTTP-Referer": os.getenv("OPENROUTER_REFERER", ""), - "X-Title": os.getenv("OPENROUTER_X_TITLE", ""), - # "X-Request-Timestamp": str(unix_timestamp), - # "X-Request-Datetime": current_time, } - - # Log request details (without exposing full API key) - logger.info(f"Making request to OpenRouter with model: {model}") - logger.debug(f"Headers (API key masked): { {k: '***' if k == 'Authorization' else v for k, v in headers.items()} }") - - # Wrap the prompt as the example 'content' array expected by OpenRouter body = { "model": model, "messages": [ { "role": "user", - "content": [ - {"type": "text", "text": prompt} - ] + "content": prompt } - ] + ], + "max_tokens": 2000 } - # print(f"Request sent at: {current_time}") - - # print(f"Unix timestamp: {unix_timestamp}") - # Debug: log request body (truncated) and write raw response for inspection + try: - # Changed: Log full body (excluding prompt text which is already logged) - logger.debug(f"Request body (without prompt text): { {k: v if k != 'messages' else '[...prompt...]' for k, v in body.items()} }") - - # Removed timeout parameter - resp = requests.post( - url=url, - headers=headers, - data=json.dumps(body) - ) - - logger.info(f"HTTP Response Status: {resp.status_code}") + resp = requests.post(url, headers=headers, json=body, timeout=30) resp.raise_for_status() + rj = resp.json() - resp_text = resp.text - # Changed: Print entire response - print("=" * 80) - print("FULL LLM RESPONSE:") - print(resp_text) - print("=" * 80) - - logger.info(f"LLM raw response length: {len(resp_text)}") + # Extract response + text_reply = rj.get('choices', [{}])[0].get('message', {}).get('content', '') - # Save raw response for offline inspection + # Parse JSON from response + import json as json_module try: - with open("llm_debug.json", "w", encoding="utf-8") as fh: - fh.write(resp_text) - logger.info("Raw response saved to llm_debug.json") - except Exception as e: - logger.error(f"Warning: could not write llm_debug.json: {e}") + # Find JSON array in response + start = text_reply.find('[') + end = text_reply.rfind(']') + 1 + if start != -1 and end != -1: + json_str = text_reply[start:end] + parsed = json_module.loads(json_str) + else: + parsed = [] + except: + parsed = [] - rj = resp.json() - logger.info(f"LLM parsed response type: {type(rj)}") - if isinstance(rj, dict): - logger.debug(f"Response keys: {list(rj.keys())}") + # Format output + out = [] + for obj in parsed: + if isinstance(obj, dict): + t = obj.get('text') + page = obj.get('page') + level = obj.get('suggested_level') + if t and page: + out.append({ + 'text': t, + 'page': page - 1, # Convert to 0-indexed + 'suggested_level': level, + 'confidence': 1.0 + }) + + logger.info(f"Identified {len(out)} headers") + return out - except requests.exceptions.RequestException as e: - logger.error(f"HTTP request failed: {repr(e)}") - return [] except Exception as e: - logger.error(f"LLM call failed: {repr(e)}") - return [] + logger.error(f"OpenRouter API error: {e}") + return fallback_header_detection(doc) + +def fallback_header_detection(doc): + """Fallback header detection using font heuristics""" + headers = [] - # Extract textual reply robustly - text_reply = None - if isinstance(rj, dict): - choices = rj.get('choices') or [] - logger.debug(f"Number of choices in response: {len(choices)}") + # Check only first 30 pages for efficiency + for page_num in range(min(len(doc), 30)): + page = doc.load_page(page_num) + blocks = page.get_text("dict")["blocks"] - if choices: - for i, c in enumerate(choices): - logger.debug(f"Choice {i}: {c}") - - c0 = choices[0] - msg = c0.get('message') or c0.get('delta') or {} - content = msg.get('content') - - if isinstance(content, list): - logger.debug(f"Content is a list with {len(content)} items") - for idx, c in enumerate(content): - if c.get('type') == 'text' and c.get('text'): - text_reply = c.get('text') - logger.debug(f"Found text reply in content[{idx}], length: {len(text_reply)}") - break - elif isinstance(content, str): - text_reply = content - logger.debug(f"Content is string, length: {len(text_reply)}") - elif isinstance(msg, dict) and msg.get('content') and isinstance(msg.get('content'), dict): - text_reply = msg.get('content').get('text') - logger.debug(f"Found text in nested content dict") - - # Fallback extraction - if not text_reply: - logger.debug("Trying fallback extraction from choices") - for c in rj.get('choices', []): - if isinstance(c.get('text'), str): - text_reply = c.get('text') - logger.debug(f"Found text reply in choice.text, length: {len(text_reply)}") - break - - if not text_reply: - logger.error("Could not extract text reply from response") - # Changed: Print the entire response structure for debugging - print("=" * 80) - print("FAILED TO EXTRACT TEXT REPLY. FULL RESPONSE STRUCTURE:") - print(json.dumps(rj, indent=2)) - print("=" * 80) - return [] - - # Changed: Print the extracted text reply - print("=" * 80) - print("EXTRACTED TEXT REPLY:") - print(text_reply) - print("=" * 80) - - logger.info(f"Extracted text reply length: {len(text_reply)}") - logger.debug(f"First 500 chars of reply: {text_reply[:500]}...") - - s = text_reply.strip() - start = s.find('[') - end = s.rfind(']') - js = s[start:end+1] if start != -1 and end != -1 else s - - logger.debug(f"Looking for JSON array: start={start}, end={end}") - logger.debug(f"Extracted JSON string (first 500 chars): {js[:500]}...") - - try: - parsed = json.loads(js) - logger.info(f"Successfully parsed JSON, got {len(parsed)} items") - except json.JSONDecodeError as e: - logger.error(f"Failed to parse JSON: {e}") - logger.error(f"JSON string that failed to parse: {js[:1000]}") - # Try to find any JSON-like structure - try: - # Try to extract any JSON array - import re - json_pattern = r'\[\s*\{.*?\}\s*\]' - matches = re.findall(json_pattern, text_reply, re.DOTALL) - if matches: - logger.info(f"Found {len(matches)} potential JSON arrays via regex") - for i, match in enumerate(matches): - try: - parsed = json.loads(match) - logger.info(f"Successfully parsed regex match {i} with {len(parsed)} items") - break - except json.JSONDecodeError as e2: - logger.debug(f"Regex match {i} also failed: {e2}") - continue - else: - logger.error("All regex matches failed to parse") - return [] - else: - logger.error("No JSON-like pattern found via regex") - return [] - except Exception as e2: - logger.error(f"Regex extraction also failed: {e2}") - return [] - - # Log parsed results - logger.info(f"Parsed {len(parsed)} header items:") - for i, obj in enumerate(parsed[:10]): # Log first 10 items - logger.info(f" Item {i}: {obj}") - - # Normalize parsed entries and return - out = [] - for obj in parsed: - t = obj.get('text') - page = int(obj.get('page')) if obj.get('page') else None - level = obj.get('suggested_level') - conf = float(obj.get('confidence') or 0) - if t and page is not None: - out.append({'text': t, 'page': page-1, 'suggested_level': level, 'confidence': conf}) - - logger.info(f"Returning {len(out)} valid header entries") - return out + for block in blocks: + if block.get("type") == 0: # Text block + for line in block.get("lines", []): + if line.get("spans"): + span = line["spans"][0] + text = span.get("text", "").strip() + + # Simple heuristics for headers + if (text and + len(text) < 100 and # Headers are usually short + not text.endswith('.') and # Not regular sentences + text[0].isupper() and # Starts with capital + any(c.isalpha() for c in text)): # Contains letters + + headers.append({ + 'text': text, + 'page': page_num, + 'suggested_level': 2 if len(text.split()) < 5 else 3, + 'confidence': 0.7 + }) + + # Deduplicate + unique_headers = [] + seen = set() + for h in headers: + key = (h['text'].lower(), h['page']) + if key not in seen: + seen.add(key) + unique_headers.append(h) + return unique_headers -def extract_section_under_header_tobebilledMultiplePDFS(multiplePDF_Paths,model,identified_headers): - logger.debug(f"Starting function") - # keywordstoSkip=["installation", "execution", "miscellaneous items", "workmanship", "testing", "labeling"] - filenames=[] - keywords = {'installation', 'execution', 'miscellaneous items', 'workmanship', 'testing', 'labeling'} - - arrayofPDFS=multiplePDF_Paths.split(',') - print(multiplePDF_Paths) - print(arrayofPDFS) - docarray=[] - jsons=[] - df = pd.DataFrame(columns=["PDF Name","NBSLink","Subject","Page","Author","Creation Date","Layer",'Code', 'head above 1', "head above 2","BodyText"]) - for pdf_path in arrayofPDFS: - headertoContinue1 = False - headertoContinue2=False - Alltexttobebilled='' - parsed_url = urlparse(pdf_path) - filename = os.path.basename(parsed_url.path) - filename = unquote(filename) # decode URL-encoded characters - filenames.append(filename) - logger.debug(f"Starting with pdf: {filename}") - # Optimized URL handling - if pdf_path and ('http' in pdf_path or 'dropbox' in pdf_path): - pdf_path = pdf_path.replace('dl=0', 'dl=1') - - # Cache frequently used values - response = requests.get(pdf_path) - pdf_content = BytesIO(response.content) - if not pdf_content: - raise ValueError("No valid PDF content found.") - - doc = fitz.open(stream=pdf_content, filetype="pdf") - logger.info(f"Total pages in document: {len(doc)}") - docHighlights = fitz.open(stream=pdf_content, filetype="pdf") - most_common_font_size, most_common_color, most_common_font = get_regular_font_size_and_color(doc) +def process_single_pdf(pdf_path, model="openai/gpt-3.5-turbo", LLM_prompt=None): + """Process a single PDF for HuggingFace Spaces""" + logger.info(f"Processing PDF: {pdf_path}") - # Precompute regex patterns - dot_pattern = re.compile(r'\.{3,}') - url_pattern = re.compile(r'https?://\S+|www\.\S+') - - + try: + # Open PDF + doc = openPDF(pdf_path) + if doc is None: + return None, None + + # Get basic document info toc_pages = get_toc_page_numbers(doc) - logger.info(f"Skipping TOC pages: Range {toc_pages}") - # headers, top_3_font_sizes, smallest_font_size, headersSpans = extract_headers( - # doc, toc_pages, most_common_font_size, most_common_color, most_common_font, top_margin, bottom_margin - # ) - logger.info(f"Starting model run.") - # identified_headers = identify_headers_with_openrouterNEWW(doc, model) - allheaders_LLM=[] - for h in identified_headers: - if int(h["page"]) in toc_pages: - continue - if h['text']: - allheaders_LLM.append(h['text']) - - logger.info(f"Done with model.") - print('identified_headers',identified_headers) - headers_json=headers_with_location(doc,identified_headers) - headers=filter_headers_outside_toc(headers_json,toc_pages) - - hierarchy=build_hierarchy_from_llm(headers) - listofHeaderstoMarkup = get_leaf_headers_with_paths(hierarchy) - logger.info(f"Hierarchy built as {hierarchy}") - - # Precompute all children headers once - allchildrenheaders = [normalize_text(item['text']) for item, p in listofHeaderstoMarkup] - allchildrenheaders_set = set(allchildrenheaders) # For faster lookups - - # df = pd.DataFrame(columns=["NBSLink","Subject","Page","Author","Creation Date","Layer",'Code', 'head above 1', "head above 2","BodyText"]) - dictionaryNBS={} - data_list_JSON = [] - json_output=[] - currentgroupname='' - # if len(top_3_font_sizes)==3: - # mainHeaderFontSize, subHeaderFontSize, subsubheaderFontSize = top_3_font_sizes - # elif len(top_3_font_sizes)==2: - # mainHeaderFontSize= top_3_font_sizes[0] - # subHeaderFontSize= top_3_font_sizes[1] - # subsubheaderFontSize= top_3_font_sizes[1] - - - # Preload all pages to avoid repeated loading - # pages = [doc.load_page(page_num) for page_num in range(len(doc)) if page_num not in toc_pages] - - for heading_to_searchDict,pathss in listofHeaderstoMarkup: - - heading_to_search = heading_to_searchDict['text'] - heading_to_searchPageNum = heading_to_searchDict['page'] - paths=heading_to_searchDict['path'] - xloc=heading_to_searchDict.get('x', 0) # Use get() with default - yloc=heading_to_searchDict.get('y', 0) # Use get() with default + # Identify headers (with fallback) + if LLM_prompt and os.getenv("OPENROUTER_API_KEY"): + identified_headers = identify_headers_with_openrouter(pdf_path, model, LLM_prompt) + else: + identified_headers = fallback_header_detection(doc) - # Initialize variables - headertoContinue1 = False - headertoContinue2 = False - matched_header_line = None - done = False - collecting = False - collected_lines = [] - page_highlights = {} - current_bbox = {} - last_y1s = {} - mainHeader = '' - subHeader = '' - matched_header_line_norm = heading_to_search - break_collecting = False - heading_norm = normalize_text(heading_to_search) - paths_norm = [normalize_text(p) for p in paths[0]] if paths and paths[0] else [] - for page_num in range(heading_to_searchPageNum,len(doc)): - # print(heading_to_search) - if paths[0].strip().lower() != currentgroupname.strip().lower(): - Alltexttobebilled+= paths[0] +'\n' - currentgroupname=paths[0] - # print(paths[0]) - - - if page_num in toc_pages: - continue - if break_collecting: - break - page=doc[page_num] - page_height = page.rect.height - blocks = page.get_text("dict")["blocks"] - - for block in blocks: - if break_collecting: - break - - lines = block.get("lines", []) - i = 0 - while i < len(lines): - if break_collecting: - break - - spans = lines[i].get("spans", []) - if not spans: - i += 1 - continue - - y0 = spans[0]["bbox"][1] - y1 = spans[0]["bbox"][3] - if y0 < top_margin or y1 > (page_height - bottom_margin): - i += 1 - continue - - line_text = get_spaced_text_from_spans(spans).lower() - line_text_norm = normalize_text(line_text) - - # Combine with next line if available - if i + 1 < len(lines): - next_spans = lines[i + 1].get("spans", []) - next_line_text = get_spaced_text_from_spans(next_spans).lower() - combined_line_norm = normalize_text(line_text + " " + next_line_text) - else: - combined_line_norm = line_text_norm - - # Check if we should continue processing - if combined_line_norm and combined_line_norm in paths[0]: - - headertoContinue1 = combined_line_norm - if combined_line_norm and combined_line_norm in paths[-2]: - - headertoContinue2 = combined_line_norm - # if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() : - last_path = paths[-2].lower() - # if any(word in paths[-2].lower() for word in keywordstoSkip): - # if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() or 'workmanship' in paths[-2].lower() or 'testing' in paths[-2].lower() or 'labeling' in paths[-2].lower(): - if any(keyword in last_path for keyword in keywords): - stringtowrite='Not to be billed' - logger.info(f"Keyword found. Not to be billed activated. keywords: {keywords}") - else: - stringtowrite='To be billed' - if stringtowrite=='To be billed': - # Alltexttobebilled+= combined_line_norm ################################################# - if matched_header_line_norm in combined_line_norm: - Alltexttobebilled+='\n' - Alltexttobebilled+= ' '+combined_line_norm - # Optimized header matching - existsfull = ( - ( combined_line_norm in allchildrenheaders_set or - combined_line_norm in allchildrenheaders ) and heading_to_search in combined_line_norm - ) - - # New word-based matching - current_line_words = set(combined_line_norm.split()) - heading_words = set(heading_norm.split()) - all_words_match = current_line_words.issubset(heading_words) and len(current_line_words) > 0 - - substring_match = ( - heading_norm in combined_line_norm or - combined_line_norm in heading_norm or - all_words_match # Include the new word-based matching - ) - # substring_match = ( - # heading_norm in combined_line_norm or - # combined_line_norm in heading_norm - # ) - - if (substring_match and existsfull and not collecting and - len(combined_line_norm) > 0 ):#and (headertoContinue1 or headertoContinue2) ): - - # Check header conditions more efficiently - # header_spans = [ - # span for span in spans - # if (is_header(span, most_common_font_size, most_common_color, most_common_font) - # # and span['size'] >= subsubheaderFontSize - # and span['size'] < mainHeaderFontSize) - # ] - if stringtowrite.startswith('To') : - collecting = True - # if stringtowrite=='To be billed': - # Alltexttobebilled+='\n' - # matched_header_font_size = max(span["size"] for span in header_spans) - - # collected_lines.append(line_text) - valid_spans = [span for span in spans if span.get("bbox")] - - if valid_spans: - x0s = [span["bbox"][0] for span in valid_spans] - x1s = [span["bbox"][2] for span in valid_spans] - y0s = [span["bbox"][1] for span in valid_spans] - y1s = [span["bbox"][3] for span in valid_spans] - - header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] - - if page_num in current_bbox: - cb = current_bbox[page_num] - current_bbox[page_num] = [ - min(cb[0], header_bbox[0]), - min(cb[1], header_bbox[1]), - max(cb[2], header_bbox[2]), - max(cb[3], header_bbox[3]) - ] - else: - current_bbox[page_num] = header_bbox - last_y1s[page_num] = header_bbox[3] - x0, y0, x1, y1 = header_bbox - - zoom = 200 - left = int(x0) - top = int(y0) - zoom_str = f"{zoom},{left},{top}" - pageNumberFound = page_num + 1 - - # Build the query parameters - params = { - 'pdfLink': pdf_path, # Your PDF link - 'keyword': heading_to_search, # Your keyword (could be a string or list) - } - - # URL encode each parameter - encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()} - - # Construct the final encoded link - encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()]) - - # Get current date and time - now = datetime.now() - - # Format the output - formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p") - # Optionally, add the URL to a DataFrame - - - data_entry = { - "PDF Name":filename, - "NBSLink": zoom_str, - "Subject": heading_to_search, - "Page": str(pageNumberFound), - "Author": "ADR", - "Creation Date": formatted_time, - "Layer": "Initial", - "Code": stringtowrite, - # "head above 1": paths[-2], - # "head above 2": paths[0], - "BodyText":collected_lines, - "MC Connnection": 'Go to ' + paths[0].strip().split()[0] +'/'+ heading_to_search.strip().split()[0] + ' in '+ filename - } - # Dynamically add "head above 1", "head above 2", ... depending on the number of levels - for i, path_text in enumerate(paths[:-1]): # skip the last one because that's the current heading - data_entry[f"head above {i+1}"] = path_text - data_list_JSON.append(data_entry) - - # Convert list to JSON - # json_output = [data_list_JSON] - # json_output = json.dumps(data_list_JSON, indent=4) - - i += 2 - continue - else: - if (substring_match and not collecting and - len(combined_line_norm) > 0): # and (headertoContinue1 or headertoContinue2) ): - - # Calculate word match percentage - word_match_percent = words_match_ratio(heading_norm, combined_line_norm) * 100 - - # Check if at least 70% of header words exist in this line - meets_word_threshold = word_match_percent >= 100 - - # Check header conditions (including word threshold) - # header_spans = [ - # span for span in spans - # if (is_header(span, most_common_font_size, most_common_color, most_common_font) - # # and span['size'] >= subsubheaderFontSize - # and span['size'] < mainHeaderFontSize) - # ] - - if (meets_word_threshold or same_start_word(heading_to_search, combined_line_norm) ) and stringtowrite.startswith('To'): - collecting = True - if stringtowrite=='To be billed': - Alltexttobebilled+='\n' - # if stringtowrite=='To be billed': - # Alltexttobebilled+= ' '+ combined_line_norm - # matched_header_font_size = max(span["size"] for span in header_spans) - - collected_lines.append(line_text) - valid_spans = [span for span in spans if span.get("bbox")] - - if valid_spans: - x0s = [span["bbox"][0] for span in valid_spans] - x1s = [span["bbox"][2] for span in valid_spans] - y0s = [span["bbox"][1] for span in valid_spans] - y1s = [span["bbox"][3] for span in valid_spans] - - header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] - - if page_num in current_bbox: - cb = current_bbox[page_num] - current_bbox[page_num] = [ - min(cb[0], header_bbox[0]), - min(cb[1], header_bbox[1]), - max(cb[2], header_bbox[2]), - max(cb[3], header_bbox[3]) - ] - else: - current_bbox[page_num] = header_bbox - - last_y1s[page_num] = header_bbox[3] - x0, y0, x1, y1 = header_bbox - zoom = 200 - left = int(x0) - top = int(y0) - zoom_str = f"{zoom},{left},{top}" - pageNumberFound = page_num + 1 - - # Build the query parameters - params = { - 'pdfLink': pdf_path, # Your PDF link - 'keyword': heading_to_search, # Your keyword (could be a string or list) - } - - # URL encode each parameter - encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()} - - # Construct the final encoded link - encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()]) - - # Get current date and time - now = datetime.now() - - # Format the output - formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p") - # Optionally, add the URL to a DataFrame - - logger.info(f"Logging into table") - data_entry = { - "PDF Name":filename, - "NBSLink": zoom_str, - "Subject": heading_to_search, - "Page": str(pageNumberFound), - "Author": "ADR", - "Creation Date": formatted_time, - "Layer": "Initial", - "Code": stringtowrite, - # "head above 1": paths[-2], - # "head above 2": paths[0], - "BodyText":collected_lines, - "MC Connnection": 'Go to ' + paths[0].strip().split()[0] +'/'+ heading_to_search.strip().split()[0] + ' in '+ filename - } - # Dynamically add "head above 1", "head above 2", ... depending on the number of levels - for i, path_text in enumerate(paths[:-1]): # skip the last one because that's the current heading - data_entry[f"head above {i+1}"] = path_text - data_list_JSON.append(data_entry) - - # Convert list to JSON - # json_output = [data_list_JSON] - # json_output = json.dumps(data_list_JSON, indent=4) - - - i += 2 - continue - if collecting: - norm_line = normalize_text(line_text) - - # Optimized URL check - if url_pattern.match(norm_line): - line_is_header = False - else: - # line_is_header = any(is_header(span, most_common_font_size, most_common_color, most_common_font) for span in spans) - def normalize(text): - return " ".join(text.lower().split()) - - line_text = " ".join(span["text"] for span in spans).strip() - - line_is_header = any( - normalize(line_text) == normalize(header) - for header in allheaders_LLM - ) - if line_is_header: - header_font_size = max(span["size"] for span in spans) - is_probably_real_header = ( - # header_font_size >= matched_header_font_size and - # is_header(spans[0], most_common_font_size, most_common_color, most_common_font) and - len(line_text.strip()) > 2 - ) - - if (norm_line != matched_header_line_norm and - norm_line != heading_norm and - is_probably_real_header): - if line_text not in heading_norm: - collecting = False - done = True - headertoContinue1 = False - headertoContinue2=False - for page_num, bbox in current_bbox.items(): - bbox[3] = last_y1s.get(page_num, bbox[3]) - page_highlights[page_num] = bbox - highlight_boxes(docHighlights, page_highlights,stringtowrite) - - break_collecting = True - break - - if break_collecting: - break - - collected_lines.append(line_text) - valid_spans = [span for span in spans if span.get("bbox")] - if valid_spans: - x0s = [span["bbox"][0] for span in valid_spans] - x1s = [span["bbox"][2] for span in valid_spans] - y0s = [span["bbox"][1] for span in valid_spans] - y1s = [span["bbox"][3] for span in valid_spans] - - line_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] - - if page_num in current_bbox: - cb = current_bbox[page_num] - current_bbox[page_num] = [ - min(cb[0], line_bbox[0]), - min(cb[1], line_bbox[1]), - max(cb[2], line_bbox[2]), - max(cb[3], line_bbox[3]) - ] - else: - current_bbox[page_num] = line_bbox - - last_y1s[page_num] = line_bbox[3] - i += 1 - - if not done: - for page_num, bbox in current_bbox.items(): - bbox[3] = last_y1s.get(page_num, bbox[3]) - page_highlights[page_num] = bbox - if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() : - stringtowrite='Not to be billed' - else: - stringtowrite='To be billed' - highlight_boxes(docHighlights, page_highlights,stringtowrite) - docarray.append(docHighlights) - if data_list_JSON and not data_list_JSON[-1]["BodyText"] and collected_lines: - data_list_JSON[-1]["BodyText"] = collected_lines[1:] if len(collected_lines) > 0 else [] - # Final cleanup of the JSON data before returning - for entry in data_list_JSON: - # Check if BodyText exists and has content - if isinstance(entry.get("BodyText"), list) and len(entry["BodyText"]) > 0: - # Check if the first line of the body is essentially the same as the Subject - first_line = normalize_text(entry["BodyText"][0]) - subject = normalize_text(entry["Subject"]) - - # If they match or the subject is inside the first line, remove it - if subject in first_line or first_line in subject: - entry["BodyText"] = entry["BodyText"][1:] - jsons.append(data_list_JSON) - logger.info(f"Markups done! Uploading to dropbox") - logger.info(f"Uploaded and Readyy!") - - - return jsons,identified_headers - - - + # Process headers + headers_json = headers_with_location(doc, identified_headers) + headers = filter_headers_outside_toc(headers_json, toc_pages) + hierarchy = build_hierarchy_from_llm(headers) + + # Create simple output + results = [] + for header in hierarchy: + results.append({ + "text": header.get("text", ""), + "page": header.get("page", 0) + 1, + "level": header.get("level", 0), + "font_size": header.get("size", 0) + }) + + # Create DataFrame + df = pd.DataFrame(results) + + # Save to Excel + output_path = "header_analysis.xlsx" + df.to_excel(output_path, index=False) + + logger.info(f"Processed {len(results)} headers") + return output_path, df.head(10).to_dict('records') + + except Exception as e: + logger.error(f"Error processing PDF: {e}") + return None, None -def testFunction(pdf_path, model,LLM_prompt): - Alltexttobebilled='' - alltextWithoutNotbilled='' - # keywordstoSkip=["installation", "execution", "miscellaneous items", "workmanship", "testing", "labeling"] - - headertoContinue1 = False - headertoContinue2=False +def simple_interface(pdf_path, use_llm=True, model="openai/gpt-3.5-turbo"): + """ + Simplified interface for HuggingFace Spaces + """ + logger.info("Starting PDF header extraction") - parsed_url = urlparse(pdf_path) - filename = os.path.basename(parsed_url.path) - filename = unquote(filename) # decode URL-encoded characters - - # Optimized URL handling - if pdf_path and ('http' in pdf_path or 'dropbox' in pdf_path): - pdf_path = pdf_path.replace('dl=0', 'dl=1') - - # Cache frequently used values - response = requests.get(pdf_path) - pdf_content = BytesIO(response.content) - if not pdf_content: - raise ValueError("No valid PDF content found.") - - doc = fitz.open(stream=pdf_content, filetype="pdf") - docHighlights = fitz.open(stream=pdf_content, filetype="pdf") - parsed_url = urlparse(pdf_path) - filename = os.path.basename(parsed_url.path) - filename = unquote(filename) # decode URL-encoded characters - -#### Get regular tex font size, style , color - most_common_font_size, most_common_color, most_common_font = get_regular_font_size_and_color(doc) - - # Precompute regex patterns - dot_pattern = re.compile(r'\.{3,}') - url_pattern = re.compile(r'https?://\S+|www\.\S+') - highlighted=[] - processed_subjects = set() # Initialize at the top of testFunction - toc_pages = get_toc_page_numbers(doc) - identified_headers=process_document_in_chunks(len(doc), pdf_path, LLM_prompt, model) - # identified_headers = identify_headers_with_openrouterNEWW(doc, api_key='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')# ['text', fontsize, page number,y] - - # with open("identified_headers.txt", "w", encoding="utf-8") as f: - # json.dump(identified_headers, f, indent=4) - # with open("identified_headers.txt", "r", encoding="utf-8") as f: - # identified_headers = json.load(f) - print(identified_headers) - allheaders_LLM=[] - for h in identified_headers: - if int(h["page"]) in toc_pages: - continue - if h['text']: - allheaders_LLM.append(h['text']) + if not pdf_path: + return "Please provide a PDF URL", None, None - headers_json=headers_with_location(doc,identified_headers) - headers=filter_headers_outside_toc(headers_json,toc_pages) - hierarchy=build_hierarchy_from_llm(headers) - # identify_headers_and_save_excel(hierarchy) - listofHeaderstoMarkup = get_leaf_headers_with_paths(hierarchy) - allchildrenheaders = [normalize_text(item['text']) for item, p in listofHeaderstoMarkup] - allchildrenheaders_set = set(allchildrenheaders) # For faster lookups - # print('allchildrenheaders_set',allchildrenheaders_set) - df = pd.DataFrame(columns=["NBSLink","Subject","Page","Author","Creation Date","Layer",'Code', 'head above 1', "head above 2",'BodyText']) - dictionaryNBS={} - data_list_JSON = [] - for heading_to_searchDict,pathss in listofHeaderstoMarkup: - heading_to_search = heading_to_searchDict['text'] - heading_to_searchPageNum = heading_to_searchDict['page'] - paths=heading_to_searchDict['path'] - xloc=heading_to_searchDict.get('x', 0) # Use get() with default - yloc=heading_to_searchDict.get('y', 0) # Use get() with default + try: + # Default prompt + LLM_prompt = """Analyze the text lines and identify headers with hierarchy levels.""" - # Initialize variables - headertoContinue1 = False - headertoContinue2 = False - matched_header_line = None - done = False - collecting = False - collected_lines = [] - page_highlights = {} - current_bbox = {} - last_y1s = {} - mainHeader = '' - subHeader = '' - matched_header_line_norm = heading_to_search - break_collecting = False - heading_norm = normalize_text(heading_to_search) - paths_norm = [normalize_text(p) for p in paths[0]] if paths and paths[0] else [] + # Process the PDF + excel_path, sample_data = process_single_pdf(pdf_path, model, LLM_prompt if use_llm else None) - for page_num in range(heading_to_searchPageNum,len(doc)): - if page_num in toc_pages: - continue - if break_collecting: - break - page=doc[page_num] - page_height = page.rect.height - blocks = page.get_text("dict")["blocks"] - - for block in blocks: - if break_collecting: - break - - lines = block.get("lines", []) - i = 0 - while i < len(lines): - if break_collecting: - break - - spans = lines[i].get("spans", []) - if not spans: - i += 1 - continue - - # y0 = spans[0]["bbox"][1] - # y1 = spans[0]["bbox"][3] - x0 = spans[0]["bbox"][0] # left - x1 = spans[0]["bbox"][2] # right - y0 = spans[0]["bbox"][1] # top - y1 = spans[0]["bbox"][3] # bottom - - if y0 < top_margin or y1 > (page_height - bottom_margin): - i += 1 - continue - - line_text = get_spaced_text_from_spans(spans).lower() - line_text_norm = normalize_text(line_text) - - # Combine with next line if available - if i + 1 < len(lines): - next_spans = lines[i + 1].get("spans", []) - next_line_text = get_spaced_text_from_spans(next_spans).lower() - combined_line_norm = normalize_text(line_text + " " + next_line_text) - else: - combined_line_norm = line_text_norm - - # Check if we should continue processing - if combined_line_norm and combined_line_norm in paths[0]: - - headertoContinue1 = combined_line_norm - if combined_line_norm and combined_line_norm in paths[-2]: - - headertoContinue2 = combined_line_norm - # print('paths',paths) - - # if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() : - # if any(word in paths[-2].lower() for word in keywordstoSkip): - # stringtowrite='Not to be billed' - # else: - stringtowrite='To be billed' - if stringtowrite!='To be billed': - alltextWithoutNotbilled+= combined_line_norm ################################################# - # Optimized header matching - existsfull = ( - ( combined_line_norm in allchildrenheaders_set or - combined_line_norm in allchildrenheaders ) and heading_to_search in combined_line_norm - ) - # existsfull=False - # if xloc==x0 and yloc ==y0: - # existsfull=True - # New word-based matching - current_line_words = set(combined_line_norm.split()) - heading_words = set(heading_norm.split()) - all_words_match = current_line_words.issubset(heading_words) and len(current_line_words) > 0 - - substring_match = ( - heading_norm in combined_line_norm or - combined_line_norm in heading_norm or - all_words_match # Include the new word-based matching - ) - # substring_match = ( - # heading_norm in combined_line_norm or - # combined_line_norm in heading_norm - # ) - - if ( substring_match and existsfull and not collecting and - len(combined_line_norm) > 0 ):#and (headertoContinue1 or headertoContinue2) ): - - # Check header conditions more efficiently - # header_spans = [ - # span for span in spans - # if (is_header(span, most_common_font_size, most_common_color, most_common_font) ) - # # and span['size'] >= subsubheaderFontSize - # # and span['size'] < mainHeaderFontSize) - # ] - if stringtowrite.startswith('To'): - collecting = True - # matched_header_font_size = max(span["size"] for span in header_spans) - Alltexttobebilled+= ' '+ combined_line_norm - - # collected_lines.append(line_text) - valid_spans = [span for span in spans if span.get("bbox")] - - if valid_spans: - x0s = [span["bbox"][0] for span in valid_spans] - x1s = [span["bbox"][2] for span in valid_spans] - y0s = [span["bbox"][1] for span in valid_spans] - y1s = [span["bbox"][3] for span in valid_spans] - - header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] - - if page_num in current_bbox: - cb = current_bbox[page_num] - current_bbox[page_num] = [ - min(cb[0], header_bbox[0]), - min(cb[1], header_bbox[1]), - max(cb[2], header_bbox[2]), - max(cb[3], header_bbox[3]) - ] - else: - current_bbox[page_num] = header_bbox - last_y1s[page_num] = header_bbox[3] - x0, y0, x1, y1 = header_bbox - - zoom = 200 - left = int(x0) - top = int(y0) - zoom_str = f"{zoom},{left},{top}" - pageNumberFound = page_num + 1 - - # Build the query parameters - params = { - 'pdfLink': pdf_path, # Your PDF link - 'keyword': heading_to_search, # Your keyword (could be a string or list) - } - - # URL encode each parameter - encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()} - - # Construct the final encoded link - encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()]) - - # Get current date and time - now = datetime.now() - - # Format the output - formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p") - # Optionally, add the URL to a DataFrame - - - # Create the data entry only if the subject is unique - if heading_to_search not in processed_subjects: - data_entry = { - "NBSLink": zoom_str, - "Subject": heading_to_search, - "Page": str(pageNumberFound), - "Author": "ADR", - "Creation Date": formatted_time, - "Layer": "Initial", - "Code": stringtowrite, - "BodyText": collected_lines, - "MC Connnection": 'Go to ' + paths[0].strip().split()[0] + '/' + heading_to_search.strip().split()[0] + ' in ' + filename - } - - # Dynamically add hierarchy paths - for i, path_text in enumerate(paths[:-1]): - data_entry[f"head above {i+1}"] = path_text - - # Append to the list and mark this subject as processed - data_list_JSON.append(data_entry) - processed_subjects.add(heading_to_search) - else: - print(f"Skipping duplicate data entry for Subject: {heading_to_search}") - - # Convert list to JSON - json_output = json.dumps(data_list_JSON, indent=4) - - i += 1 - continue - else: - if (substring_match and not collecting and - len(combined_line_norm) > 0): # and (headertoContinue1 or headertoContinue2) ): - - # Calculate word match percentage - word_match_percent = words_match_ratio(heading_norm, combined_line_norm) * 100 - - # Check if at least 70% of header words exist in this line - meets_word_threshold = word_match_percent >= 100 - - # Check header conditions (including word threshold) - # header_spans = [ - # span for span in spans - # if (is_header(span, most_common_font_size, most_common_color, most_common_font)) - # # and span['size'] >= subsubheaderFontSize - # # and span['size'] < mainHeaderFontSize) - # ] - - if (meets_word_threshold or same_start_word(heading_to_search, combined_line_norm) ) and stringtowrite.startswith('To'): - collecting = True - # matched_header_font_size = max(span["size"] for span in header_spans) - Alltexttobebilled+= ' '+ combined_line_norm - - collected_lines.append(line_text) - valid_spans = [span for span in spans if span.get("bbox")] - - if valid_spans: - x0s = [span["bbox"][0] for span in valid_spans] - x1s = [span["bbox"][2] for span in valid_spans] - y0s = [span["bbox"][1] for span in valid_spans] - y1s = [span["bbox"][3] for span in valid_spans] - - header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] - - if page_num in current_bbox: - cb = current_bbox[page_num] - current_bbox[page_num] = [ - min(cb[0], header_bbox[0]), - min(cb[1], header_bbox[1]), - max(cb[2], header_bbox[2]), - max(cb[3], header_bbox[3]) - ] - else: - current_bbox[page_num] = header_bbox - - last_y1s[page_num] = header_bbox[3] - x0, y0, x1, y1 = header_bbox - zoom = 200 - left = int(x0) - top = int(y0) - zoom_str = f"{zoom},{left},{top}" - pageNumberFound = page_num + 1 - - # Build the query parameters - params = { - 'pdfLink': pdf_path, # Your PDF link - 'keyword': heading_to_search, # Your keyword (could be a string or list) - } - - # URL encode each parameter - encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()} - - # Construct the final encoded link - encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()]) - - # Get current date and time - now = datetime.now() - - # Format the output - formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p") - # Optionally, add the URL to a DataFrame - - - # Create the data entry only if the subject is unique - if heading_to_search not in processed_subjects: - data_entry = { - "NBSLink": zoom_str, - "Subject": heading_to_search, - "Page": str(pageNumberFound), - "Author": "ADR", - "Creation Date": formatted_time, - "Layer": "Initial", - "Code": stringtowrite, - "BodyText": collected_lines, - "MC Connnection": 'Go to ' + paths[0].strip().split()[0] + '/' + heading_to_search.strip().split()[0] + ' in ' + filename - } - - # Dynamically add hierarchy paths - for i, path_text in enumerate(paths[:-1]): - data_entry[f"head above {i+1}"] = path_text - - # Append to the list and mark this subject as processed - data_list_JSON.append(data_entry) - processed_subjects.add(heading_to_search) - else: - print(f"Skipping duplicate data entry for Subject: {heading_to_search}") - # Convert list to JSON - json_output = json.dumps(data_list_JSON, indent=4) - - - i += 2 - continue - if collecting: - norm_line = normalize_text(line_text) - def normalize(text): - if isinstance(text, list): - text = " ".join(text) - return " ".join(text.lower().split()) - - def is_similar(a, b, threshold=0.75): - return SequenceMatcher(None, a, b).ratio() >= threshold - # Optimized URL check - if url_pattern.match(norm_line): - line_is_header = False - else: - line_is_header = any(is_header(span, most_common_font_size, most_common_color, most_common_font,allheaders_LLM) for span in spans) - # def normalize(text): - # return " ".join(text.lower().split()) - # line_text = " ".join(span["text"] for span in spans).strip() - # line_is_header = any( normalize(line_text) == normalize(header) for header in allheaders_LLM ) - - - # for line_text in lines: - # if collecting: - # # Join all spans into one line - # line_text = " ".join(span["text"] for span in spans).strip() - # norm_line = normalize(line_text) - - # # Get max font size in this line - # max_font_size = max(span.get("size", 0) for span in spans) - - # # Skip URLs - # if url_pattern.match(norm_line): - # line_is_header = False - # else: - # text_matches_header = any( - # is_similar(norm_line, normalize(header)) - # if not isinstance(header, list) - # else is_similar(norm_line, normalize(" ".join(header))) - # for header in allheaders_LLM - # ) - - # # ✅ FINAL header - if line_is_header: - header_font_size = max(span["size"] for span in spans) - is_probably_real_header = ( - # header_font_size >= matched_header_font_size and - # is_header(spans[0], most_common_font_size, most_common_color, most_common_font) and - len(line_text.strip()) > 2 - ) - - if (norm_line != matched_header_line_norm and - norm_line != heading_norm and - is_probably_real_header): - if line_text not in heading_norm: - collecting = False - done = True - headertoContinue1 = False - headertoContinue2=False - for page_num, bbox in current_bbox.items(): - bbox[3] = last_y1s.get(page_num, bbox[3]) - page_highlights[page_num] = bbox - can_highlight=False - if [page_num,bbox] not in highlighted: - highlighted.append([page_num,bbox]) - can_highlight=True - if can_highlight: - highlight_boxes(docHighlights, page_highlights,stringtowrite) - - break_collecting = True - - break - - if break_collecting: - break - - - collected_lines.append(line_text) - - valid_spans = [span for span in spans if span.get("bbox")] - if valid_spans: - x0s = [span["bbox"][0] for span in valid_spans] - x1s = [span["bbox"][2] for span in valid_spans] - y0s = [span["bbox"][1] for span in valid_spans] - y1s = [span["bbox"][3] for span in valid_spans] - - line_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] - - if page_num in current_bbox: - cb = current_bbox[page_num] - current_bbox[page_num] = [ - min(cb[0], line_bbox[0]), - min(cb[1], line_bbox[1]), - max(cb[2], line_bbox[2]), - max(cb[3], line_bbox[3]) - ] - else: - current_bbox[page_num] = line_bbox - - last_y1s[page_num] = line_bbox[3] - i += 1 - - if not done: - for page_num, bbox in current_bbox.items(): - bbox[3] = last_y1s.get(page_num, bbox[3]) - page_highlights[page_num] = bbox - # if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() : - # stringtowrite='Not to be billed' - # else: - stringtowrite='To be billed' + if excel_path and os.path.exists(excel_path): + # Read the file content for download + with open(excel_path, 'rb') as f: + file_content = f.read() - highlight_boxes(docHighlights, page_highlights,stringtowrite) - - print("Current working directory:", os.getcwd()) - - docHighlights.save("highlighted_output.pdf") - - # dbxTeam = tsadropboxretrieval.ADR_Access_DropboxTeam('user') - # metadata = dbxTeam.sharing_get_shared_link_metadata(pdf_path) - # dbPath = '/TSA JOBS/ADR Test/FIND/' - # pdf_bytes = BytesIO() - # docHighlights.save(pdf_bytes) - # pdflink = tsadropboxretrieval.uploadanyFile(doc=docHighlights, path=dbPath, pdfname=filename) - # json_output=changepdflinks(json_output,pdflink) - # return pdf_bytes.getvalue(), docHighlights , json_output , Alltexttobebilled , alltextWithoutNotbilled , filename - # Final safety check: if the very last entry in our list has an empty BodyText, - # but we have collected_lines, sync them. - if data_list_JSON and not data_list_JSON[-1]["BodyText"] and collected_lines: - data_list_JSON[-1]["BodyText"] = collected_lines[1:] if len(collected_lines) > 0 else [] -# Final cleanup of the JSON data before returning - for entry in data_list_JSON: - # Check if BodyText exists and has content - if isinstance(entry.get("BodyText"), list) and len(entry["BodyText"]) > 0: - # Check if the first line of the body is essentially the same as the Subject - first_line = normalize_text(entry["BodyText"][0]) - subject = normalize_text(entry["Subject"]) + # Create sample preview + if sample_data: + preview_html = "
| Text | Page | Level |
|---|---|---|
| {item['text'][:50]}... | {item['page']} | {item['level']} |
No headers found or could not process.
" - # If they match or the subject is inside the first line, remove it - if subject in first_line or first_line in subject: - entry["BodyText"] = entry["BodyText"][1:] - - # jsons.append(data_list_JSON) - json_output = json.dumps(data_list_JSON, indent=4) - logger.info(f"Markups done!") - logger.info(f"Uploaded and Readyy!") - - - return json_output,identified_headers - - - -def build_subject_body_map(jsons): - subject_body = {} - - for obj in jsons: - subject = obj.get("Subject") - body = obj.get("BodyText", []) - - if subject: - # join body text into a readable paragraph - subject_body[subject.strip()] = " ".join(body) - - return subject_body - -def identify_headers_and_save_excel(pdf_path, model,LLM_prompt): - try: - # result = identify_headers_with_openrouterNEWW(pdf_path, model,LLM_prompt) - print('beginnging identify') - jsons,result = testFunction(pdf_path, model,LLM_prompt) - print('done , will start dataframe',jsons,result) - if not result: - df = pd.DataFrame([{ - "text": None, - "page": None, - "suggested_level": None, - "confidence": None, - "body": None, - "System Message": "No headers were identified by the LLM." - }]) + return preview_html, (excel_path, file_content), "Processing completed successfully!" else: - df = pd.DataFrame(result) - - subject_body_map = {} - - # Safely navigate the nested structure: [ [ [ {dict}, {dict} ] ] ] - for pdf_level in jsons: - if not isinstance(pdf_level, list): - continue - - for section_level in pdf_level: - # If the LLM returns a list of dictionaries here - if isinstance(section_level, list): - for obj in section_level: - if isinstance(obj, dict): - subject = obj.get("Subject") - body = obj.get("BodyText", []) - if subject: - # Ensure body is a list before joining - body_str = " ".join(body) if isinstance(body, list) else str(body) - subject_body_map[subject.strip()] = body_str - - # If the LLM returns a single dictionary here - elif isinstance(section_level, dict): - subject = section_level.get("Subject") - body = section_level.get("BodyText", []) - if subject: - body_str = " ".join(body) if isinstance(body, list) else str(body) - subject_body_map[subject.strip()] = body_str - - # Map the extracted body text to the "text" column in your main DataFrame - if "text" in df.columns: - df["body"] = df["text"].map(lambda x: subject_body_map.get(str(x).strip()) if x else None) - else: - df["body"] = None - - # Save to Excel - output_path = os.path.abspath("header_analysis_output.xlsx") - df.to_excel(output_path, index=False, engine="openpyxl") - - print("--- Processed DataFrame ---") - print(df) - - return output_path - + return "Failed to process the PDF. Please check the URL and try again.
", None, "Processing failed." + except Exception as e: - print(f"ERROR - Critical error in processing: {e}") - # Re-raise or handle as needed - return None -# Improved launch with debug mode enabled + logger.error(f"Error in interface: {e}") + return f"Error: {str(e)}
", None, "Error occurred during processing." + +# Create Gradio interface for HuggingFace iface = gr.Interface( - fn=identify_headers_and_save_excel, + fn=simple_interface, inputs=[ - gr.Textbox(label="PDF URL"), - gr.Textbox(label="Model Type"), # Default example - gr.Textbox(label="LLM Prompt") + gr.Textbox( + label="PDF URL", + placeholder="Enter the URL of a PDF file...", + info="Make sure the PDF is publicly accessible" + ), + gr.Checkbox( + label="Use AI Analysis (OpenRouter)", + value=False, + info="Requires OPENROUTER_API_KEY environment variable" + ), + gr.Dropdown( + label="AI Model", + choices=["openai/gpt-3.5-turbo", "anthropic/claude-3-haiku", "google/gemini-pro"], + value="openai/gpt-3.5-turbo", + visible=False # Hidden for simplicity + ) + ], + outputs=[ + gr.HTML(label="Results Preview"), + gr.File(label="Download Excel Results"), + gr.Textbox(label="Status") + ], + title="PDF Header Extractor", + description="Extract headers from PDF documents and analyze their hierarchy. Upload a publicly accessible PDF URL to begin.", + examples=[ + ["https://arxiv.org/pdf/2305.15334.pdf", False], + ["https://www.w3.org/WAI/ER/tests/xhtml/testfiles/resources/pdf/dummy.pdf", False] ], - outputs=gr.File(label="Download Excel Results"), - title="PDF Header Extractor" + cache_examples=False, + allow_flagging="never" ) -# Launch with debug=True to see errors in the console -iface.launch(debug=True) \ No newline at end of file +# Launch with HuggingFace-friendly settings +if __name__ == "__main__": + # For HuggingFace Spaces, use launch with specific settings + iface.launch( + debug=False, # Disable debug for production + show_api=False, + server_name="0.0.0.0", + server_port=7860 + ) \ No newline at end of file