import gradio as gr
import os
import json
import requests
from io import BytesIO
from datetime import datetime
import pandas as pd
import fitz  # PyMuPDF
from collections import defaultdict, Counter
from urllib.parse import urlparse, unquote
import re
import difflib
import copy
import urllib.parse
import logging
from difflib import SequenceMatcher

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
    ]
)
logger = logging.getLogger(__name__)

# Constants (points from the page edges): text inside these bands is treated
# as a running header/footer and never accepted as a section header.
top_margin = 70
bottom_margin = 85


def getLocation_of_header(doc, headerText, expected_page=None):
    """Find every on-page occurrence of ``headerText`` on ``expected_page``.

    Args:
        doc: An open ``fitz.Document``.
        headerText: Exact text to search for with ``Page.search_for``.
        expected_page: 0-based page index (typically suggested by the LLM).

    Returns:
        A list of dicts with keys ``headerText``, ``page``, ``x``, ``y`` for
        each hit that lies outside the top/bottom margin bands. Empty list
        when the page index is missing or out of range.
    """
    locations = []
    # Bug fix: the original indexed doc[expected_page] unconditionally, so the
    # documented default (None) raised TypeError and an LLM-suggested page
    # beyond the document raised IndexError.
    if expected_page is None:
        return locations
    expectedpageNorm = expected_page
    if not 0 <= expectedpageNorm < len(doc):
        return locations
    page = doc[expectedpageNorm]
    page_height = page.rect.height
    rects = page.search_for(headerText)
    for r in rects:
        y = r.y0
        # Skip headers in top or bottom margin
        if y <= top_margin:
            continue
        if y >= page_height - bottom_margin:
            continue
        locations.append({
            "headerText": headerText,
            "page": expectedpageNorm,
            "x": r.x0,
            "y": y
        })
    return locations


def filter_headers_outside_toc(headers, toc_pages):
    """Drop header entries that fall on table-of-contents pages.

    Args:
        headers: List of ``[text, size, page, y, level, x]`` entries.
        toc_pages: Iterable of 0-based page indices considered TOC pages.

    Returns:
        The filtered list (entries with a ``None`` page are also dropped).
    """
    toc_pages_set = set(toc_pages)
    filtered = []
    for h in headers:
        page = h[2]
        if page is None:
            continue
        if page in toc_pages_set:
            continue
        filtered.append(h)
    return filtered


def headers_with_location(doc, llm_headers):
    """Resolve LLM-suggested headers to concrete page coordinates.

    For each ``{"text", "page", "suggested_level"}`` candidate, search the
    suggested page for the text, then scan that page's text spans to recover
    the font size of the matching line.

    Returns:
        A de-duplicated list of ``[text, fontsize, page, y, level, x]``
        entries (``fontsize`` may be None when no span matched exactly).
    """
    headersJson = []
    for h in llm_headers:
        text = h["text"]
        llm_page = h["page"]
        locations = getLocation_of_header(doc, text, llm_page)
        if locations:
            for loc in locations:
                page = doc.load_page(loc["page"])
                fontsize = None
                for block in page.get_text("dict")["blocks"]:
                    if block.get("type") != 0:
                        continue
                    for line in block.get("lines", []):
                        line_text = "".join(span["text"] for span in line["spans"]).strip()
                        if normalize(line_text) == normalize(text):
                            # Use the first span's size as the line's size.
                            if line["spans"]:
                                fontsize = line["spans"][0]["size"]
                            break
                    if fontsize:
                        break
                entry = [
                    text,
                    fontsize,
                    loc["page"],
                    loc["y"],
                    h["suggested_level"],
                    loc.get("x", 0),
                ]
                if entry not in headersJson:
                    headersJson.append(entry)
    return headersJson
def build_hierarchy_from_llm(headers):
    """Build a nested header hierarchy from flat located-header entries.

    Args:
        headers: List of ``[text, size, page, y, level, x]`` entries as
            produced by ``headers_with_location``.

    Returns:
        A list of root nodes; each node is a dict with a ``children`` list
        and a ``path`` of normalized ancestor texts.
    """
    nodes = []
    # Build nodes
    for h in headers:
        if len(h) < 6:
            continue
        text, size, page, y, level, x = h
        if level is None:
            continue
        try:
            level = int(level)
        except Exception:
            continue
        node = {
            "text": text,
            "page": page if page is not None else -1,
            "x": x if x is not None else -1,
            "y": y if y is not None else -1,
            "size": size,
            "bold": False,
            "color": None,
            "font": None,
            "children": [],
            "is_numbered": is_numbered(text),
            "original_size": size,
            "norm_text": normalize(text),
            "level": level,
        }
        nodes.append(node)
    if not nodes:
        return []
    # Sort top-to-bottom in reading order
    nodes.sort(key=lambda n: (n["page"], n["y"]))
    # Normalize levels so the shallowest level becomes 0
    min_level = min(n["level"] for n in nodes)
    for n in nodes:
        n["level"] -= min_level
    # Build hierarchy using a stack of currently-open ancestors
    root = []
    stack = []
    added_level0 = set()
    for header in nodes:
        lvl = header["level"]
        if lvl < 0:
            continue
        if lvl == 0:
            # De-duplicate repeated top-level headers on the same page
            key = (header["norm_text"], header["page"])
            if key in added_level0:
                continue
            added_level0.add(key)
        while stack and stack[-1]["level"] >= lvl:
            stack.pop()
        parent = stack[-1] if stack else None
        if parent:
            header["path"] = parent["path"] + [header["norm_text"]]
            parent["children"].append(header)
        else:
            header["path"] = [header["norm_text"]]
            root.append(header)
        stack.append(header)

    # Enforce strictly increasing levels down every branch
    def enforce_nesting(node_list, parent_level=-1):
        for node in node_list:
            if node["level"] <= parent_level:
                node["level"] = parent_level + 1
            enforce_nesting(node["children"], node["level"])

    enforce_nesting(root)
    # Cleanup: drop childless level-0 roots when any level-0 root exists
    if any(h["level"] == 0 for h in root):
        root = [
            h for h in root
            if not (h["level"] == 0 and not h["children"])
        ]
    return enforce_level_hierarchy(root)


def get_regular_font_size_and_color(doc):
    """Estimate the body text's (font size, color, font name).

    Samples at most the first 10 pages and takes the most common value of
    each attribute; falls back to 12 / 0 (black) / "Helvetica" when the
    document yields no text spans.
    """
    font_sizes = []
    colors = []
    fonts = []
    # Check only first few pages for efficiency
    for page_num in range(min(len(doc), 10)):
        page = doc.load_page(page_num)
        # Renamed outer loop variable: the original reused `span` for both the
        # block and the span, shadowing the outer variable.
        for block in page.get_text("dict")["blocks"]:
            if "lines" in block:
                for line in block["lines"]:
                    for span in line["spans"]:
                        font_sizes.append(span['size'])
                        colors.append(span['color'])
                        fonts.append(span['font'])
    most_common_font_size = Counter(font_sizes).most_common(1)[0][0] if font_sizes else 12
    most_common_color = Counter(colors).most_common(1)[0][0] if colors else 0
    most_common_font = Counter(fonts).most_common(1)[0][0] if fonts else "Helvetica"
    return most_common_font_size, most_common_color, most_common_font


def normalize_text(text):
    """Lower-case and collapse all whitespace; return "" for None."""
    if text is None:
        return ""
    return re.sub(r'\s+', ' ', text.strip().lower())


def get_spaced_text_from_spans(spans):
    """Join span texts with single spaces and normalize the result."""
    return normalize_text(" ".join(span["text"].strip() for span in spans))


def is_numbered(text):
    """True when the text starts with a digit (e.g. "1.2 Methods")."""
    return bool(re.match(r'^\d', text.strip()))


def is_similar(a, b, threshold=0.85):
    """True when the SequenceMatcher ratio of a and b exceeds ``threshold``."""
    return SequenceMatcher(None, a, b).ratio() > threshold


def normalize(text):
    """Lower-case, strip TOC dot leaders (2+ dots) and collapse whitespace."""
    text = text.lower()
    text = re.sub(r'\.{2,}', '', text)
    text = re.sub(r'\s+', ' ', text)
    return text.strip()


def clean_toc_entry(toc_text):
    """Strip trailing dot leaders and page numbers from a TOC line."""
    return re.sub(r'[\.\s]+\d+.*$', '', toc_text).strip('. ')


def enforce_level_hierarchy(headers):
    """Remove level-2 nodes whose parent is not level 1 (in place).

    Returns the same ``headers`` list for chaining.
    """
    def process_node_list(node_list, parent_level=-1):
        i = 0
        while i < len(node_list):
            node = node_list[i]
            # A level-2 header directly under anything but a level-1 parent is
            # considered misplaced and dropped.
            if node['level'] == 2 and parent_level != 1:
                node_list.pop(i)
                continue
            process_node_list(node['children'], node['level'])
            i += 1

    process_node_list(headers)
    return headers


def highlight_boxes(doc, highlights, stringtowrite, fixed_width=500):
    """Annotate pages with centered highlight rectangles plus a label.

    Args:
        doc: Open ``fitz.Document`` (annotated in place).
        highlights: Mapping of 0-based page number -> bbox tuple/rect.
        stringtowrite: Label text; labels starting with "Not" are drawn grey,
            everything else yellow.
        fixed_width: Width of the re-centered rectangle in points.
    """
    for page_num, bbox in highlights.items():
        page = doc.load_page(page_num)
        page_width = page.rect.width
        orig_rect = fitz.Rect(bbox)
        rect_height = orig_rect.height
        # Only annotate boxes tall enough to be meaningful content
        if rect_height > 30:
            center_x = page_width / 2
            new_x0 = center_x - fixed_width / 2
            new_x1 = center_x + fixed_width / 2
            new_rect = fitz.Rect(new_x0, orig_rect.y0, new_x1, orig_rect.y1)
            annot = page.add_rect_annot(new_rect)
            if stringtowrite.startswith('Not'):
                annot.set_colors(stroke=(0.5, 0.5, 0.5), fill=(0.5, 0.5, 0.5))
            else:
                annot.set_colors(stroke=(1, 1, 0), fill=(1, 1, 0))
            annot.set_opacity(0.3)
            annot.update()
            text = '[' + stringtowrite + ']'
            annot1 = page.add_freetext_annot(
                new_rect,
                text,
                fontsize=15,
                fontname='helv',
                text_color=(1, 0, 0),
                rotate=page.rotation,
                align=2
            )
            annot1.update()


def get_leaf_headers_with_paths(listtoloop, path=None, output=None):
    """Collect (leaf-node, ancestor-path) pairs for levels deeper than 1.

    Returns a list of ``(header_dict, [ancestor texts + own text])`` tuples.
    """
    if path is None:
        path = []
    if output is None:
        output = []
    for header in listtoloop:
        current_path = path + [header['text']]
        if not header['children']:
            # Only deep leaves matter; levels 0 and 1 are section containers
            if header['level'] != 0 and header['level'] != 1:
                output.append((header, current_path))
        else:
            get_leaf_headers_with_paths(header['children'], current_path, output)
    return output


def words_match_ratio(text1, text2):
    """Fraction of ``text1``'s unique words that also appear in ``text2``."""
    words1 = set(text1.split())
    words2 = set(text2.split())
    if not words1 or not words2:
        return 0.0
    common_words = words1 & words2
    return len(common_words) / len(words1)


def same_start_word(s1, s2):
    """True when both strings begin with the same word (case-insensitive)."""
    words1 = s1.strip().split()
    words2 = s2.strip().split()
    if words1 and words2:
        return words1[0].lower() == words2[0].lower()
    return False


def get_toc_page_numbers(doc, max_pages_to_check=15):
    """Detect table-of-contents pages near the start of the document.

    A page counts as TOC when it carries a "contents"-style title or at
    least one dot-leader line (two or more consecutive dots).

    Returns:
        List of 0-based page indices from page 0 through the last detected
        TOC page, or [] when none is found.
    """
    toc_pages = []
    logger.debug(f"Starting TOC detection, checking first {max_pages_to_check} pages")
    dot_pattern = re.compile(r"\.{2,}")
    title_pattern = re.compile(r"^\s*(table of contents|contents|index)\s*$", re.IGNORECASE)
    for page_num in range(min(len(doc), max_pages_to_check)):
        page = doc.load_page(page_num)
        blocks = page.get_text("dict")["blocks"]
        dot_line_count = 0
        has_toc_title = False
        for block in blocks:
            for line in block.get("lines", []):
                line_text = " ".join([span["text"] for span in line["spans"]]).strip()
                if dot_pattern.search(line_text):
                    dot_line_count += 1
                if title_pattern.match(line_text):
                    has_toc_title = True
        if has_toc_title or dot_line_count >= 1:
            toc_pages.append(page_num)
    if toc_pages:
        # Bug fix: the original took toc_pages[0] (the FIRST detected TOC
        # page) despite naming it last_toc_page, so later TOC pages escaped
        # the filter. Use the last detected page as the variable intends.
        last_toc_page = toc_pages[-1]
        result = list(range(0, last_toc_page + 1))
        logger.info(f"TOC pages found: {result}")
        return result
    logger.info("No TOC pages found")
    return []


def openPDF(pdf_path):
    """Download a PDF from a URL and open it as a ``fitz.Document``.

    Dropbox share links are rewritten from ``dl=0`` to ``dl=1`` so the raw
    file is served. Returns None on any download failure.
    """
    logger.info(f"Opening PDF from URL: {pdf_path}")
    pdf_path = pdf_path.replace('dl=0', 'dl=1')
    try:
        # Bug fix: requests.get without a timeout can hang the app forever;
        # network errors also used to propagate uncaught.
        response = requests.get(pdf_path, timeout=60)
    except requests.RequestException as e:
        logger.error(f"Failed to download PDF: {e}")
        return None
    if response.status_code != 200:
        logger.error(f"Failed to download PDF. Status code: {response.status_code}")
        return None
    pdf_content = BytesIO(response.content)
    doc = fitz.open(stream=pdf_content, filetype="pdf")
    logger.info(f"PDF opened successfully, {len(doc)} pages")
    return doc


def is_header(span, regular_font_size, regular_color, regular_font, allheaders_LLM=None):
    """
    Determine if a text span is a header based on font characteristics.

    A span counts as a header when it is noticeably larger than body text
    AND visually distinct (bold, different font, or different color), or
    when its text matches an LLM-identified header.
    """
    # Check font size (headers are typically larger than regular text)
    size_ok = span.get('size', 0) > regular_font_size * 1.1

    # Check if it's bold (common for headers).
    # Bug fix: in PyMuPDF span flags, bit 1 (value 2) is ITALIC; bold is
    # bit 4 (value 16). The original tested the italic bit.
    flags = span.get('flags', 0)
    is_bold = bool(flags & 16)

    # Check font family
    font_ok = span.get('font') != regular_font

    # Check color
    color_ok = span.get('color') != regular_color

    # Check if text matches LLM-identified headers
    text_match = False
    if allheaders_LLM and 'text' in span:
        span_text = span['text'].strip()
        if span_text:
            norm_text = normalize_text(span_text)
            text_match = any(
                normalize_text(header) == norm_text
                for header in allheaders_LLM
            )

    # A span is considered a header if it meets multiple criteria
    return (size_ok and (is_bold or font_ok or color_ok)) or text_match


def identify_headers_with_openrouter(pdf_path, model, LLM_prompt, pages_to_check=None):
    """Simplified version for HuggingFace Spaces.

    Sends the first pages' text lines to OpenRouter and parses back a JSON
    array of ``{"text", "page", "suggested_level"}`` objects. Falls back to
    font heuristics when no API key is set or the call fails.

    Returns:
        List of dicts with keys text, page (0-indexed), suggested_level,
        confidence.
    """
    logger.info("Starting header identification")
    doc = openPDF(pdf_path)
    if doc is None:
        return []

    # Use environment variable for API key
    api_key = os.getenv("OPENROUTER_API_KEY")
    if not api_key:
        logger.warning("No OpenRouter API key found. Using fallback heuristics.")
        return fallback_header_detection(doc)

    # Simplified prompt for faster processing
    simplified_prompt = """
    Analyze the following text lines from a PDF document.
    Identify which lines are headers/titles and suggest a hierarchy level
    (1 for main headers, 2 for subheaders, etc.).
    Return only a JSON array of objects with keys: text, page, suggested_level.
    Example: [{"text": "Introduction", "page": 3, "suggested_level": 1}, ...]
    """

    # Collect text from first 20 pages max for HuggingFace
    total_pages = len(doc)
    start_page = 0
    end_page = min(20, total_pages)  # Limit pages for HuggingFace
    lines_for_prompt = []
    for pno in range(start_page, end_page):
        page = doc.load_page(pno)
        text = page.get_text()
        if text.strip():
            lines = text.split('\n')
            for line in lines:
                if line.strip():
                    lines_for_prompt.append(f"PAGE {pno+1}: {line.strip()}")

    if not lines_for_prompt:
        return fallback_header_detection(doc)

    prompt = simplified_prompt + "\n\nLines:\n" + "\n".join(lines_for_prompt[:100])  # Limit lines

    # Make API call
    url = "https://openrouter.ai/api/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    body = {
        "model": model,
        "messages": [
            {
                "role": "user",
                "content": prompt
            }
        ],
        "max_tokens": 2000
    }
    try:
        resp = requests.post(url, headers=headers, json=body, timeout=30)
        resp.raise_for_status()
        rj = resp.json()
        # Extract response
        text_reply = rj.get('choices', [{}])[0].get('message', {}).get('content', '')

        # Parse the first JSON array found in the response.
        start = text_reply.find('[')
        end = text_reply.rfind(']') + 1
        # Bug fix: rfind returns -1 when ']' is absent, making end == 0, so
        # the old `end != -1` test was always true. Require end > start.
        if start != -1 and end > start:
            try:
                parsed = json.loads(text_reply[start:end])
            except (ValueError, TypeError):
                # Model returned malformed JSON
                parsed = []
        else:
            parsed = []

        # Format output
        out = []
        for obj in parsed:
            if isinstance(obj, dict):
                t = obj.get('text')
                page = obj.get('page')
                level = obj.get('suggested_level')
                if t and page:
                    # Robustness: the model may emit the page as a string
                    try:
                        page_idx = int(page) - 1  # Convert to 0-indexed
                    except (TypeError, ValueError):
                        continue
                    out.append({
                        'text': t,
                        'page': page_idx,
                        'suggested_level': level,
                        'confidence': 1.0
                    })
        logger.info(f"Identified {len(out)} headers")
        return out
    except Exception as e:
        logger.error(f"OpenRouter API error: {e}")
        return fallback_header_detection(doc)


def fallback_header_detection(doc):
    """Fallback header detection using font heuristics.

    Scans up to the first 30 pages and treats short, capitalized,
    non-sentence first spans of lines as headers.

    Returns:
        De-duplicated list of dicts with keys text, page, suggested_level,
        confidence.
    """
    headers = []
    # Check only first 30 pages for efficiency
    for page_num in range(min(len(doc), 30)):
        page = doc.load_page(page_num)
        blocks = page.get_text("dict")["blocks"]
        for block in blocks:
            if block.get("type") == 0:  # Text block
                for line in block.get("lines", []):
                    if line.get("spans"):
                        span = line["spans"][0]
                        text = span.get("text", "").strip()
                        # Simple heuristics for headers
                        if (text and
                                len(text) < 100 and          # Headers are usually short
                                not text.endswith('.') and   # Not regular sentences
                                text[0].isupper() and        # Starts with capital
                                any(c.isalpha() for c in text)):  # Contains letters
                            headers.append({
                                'text': text,
                                'page': page_num,
                                'suggested_level': 2 if len(text.split()) < 5 else 3,
                                'confidence': 0.7
                            })
    # Deduplicate
    unique_headers = []
    seen = set()
    for h in headers:
        key = (h['text'].lower(), h['page'])
        if key not in seen:
            seen.add(key)
            unique_headers.append(h)
    return unique_headers


def process_single_pdf(pdf_path, model="openai/gpt-3.5-turbo", LLM_prompt=None):
    """Process a single PDF for HuggingFace Spaces.

    Downloads the PDF, detects TOC pages, identifies headers (LLM or
    heuristic), builds the hierarchy and writes the top-level results to
    ``header_analysis.xlsx``.

    Returns:
        (excel_path, first-10-rows-as-records) on success, (None, None) on
        any failure.
    """
    logger.info(f"Processing PDF: {pdf_path}")
    try:
        # Open PDF
        doc = openPDF(pdf_path)
        if doc is None:
            return None, None

        # Get basic document info
        toc_pages = get_toc_page_numbers(doc)

        # Identify headers (with fallback)
        if LLM_prompt and os.getenv("OPENROUTER_API_KEY"):
            identified_headers = identify_headers_with_openrouter(pdf_path, model, LLM_prompt)
        else:
            identified_headers = fallback_header_detection(doc)

        # Process headers
        headers_json = headers_with_location(doc, identified_headers)
        headers = filter_headers_outside_toc(headers_json, toc_pages)
        hierarchy = build_hierarchy_from_llm(headers)

        # Create simple output (top-level nodes only)
        results = []
        for header in hierarchy:
            results.append({
                "text": header.get("text", ""),
                "page": header.get("page", 0) + 1,  # 1-based for display
                "level": header.get("level", 0),
                "font_size": header.get("size", 0)
            })

        # Create DataFrame
        df = pd.DataFrame(results)

        # Save to Excel
        output_path = "header_analysis.xlsx"
        df.to_excel(output_path, index=False)

        logger.info(f"Processed {len(results)} headers")
        return output_path, df.head(10).to_dict('records')
    except Exception as e:
        logger.error(f"Error processing PDF: {e}")
        return None, None


def simple_interface(pdf_path, use_llm=True, model="openai/gpt-3.5-turbo"):
    """
    Simplified interface for HuggingFace Spaces.

    Gradio callback. Returns (preview_html, excel_file_path_or_None,
    status_message).

    NOTE: the HTML fragments below were reconstructed — the original markup
    was corrupted in the source (tags stripped); the visible text is kept.
    """
    logger.info("Starting PDF header extraction")
    if not pdf_path:
        return "Please provide a PDF URL", None, None
    try:
        # Default prompt
        LLM_prompt = """Analyze the text lines and identify headers with hierarchy levels."""

        # Process the PDF
        excel_path, sample_data = process_single_pdf(pdf_path, model, LLM_prompt if use_llm else None)

        if excel_path and os.path.exists(excel_path):
            # Create sample preview
            if sample_data:
                preview_html = "<div><h3>Sample Headers Found:</h3>"
                preview_html += "<table border='1'><tr><th>Text</th><th>Page</th><th>Level</th></tr>"
                for item in sample_data:
                    preview_html += (
                        f"<tr><td>{item['text'][:50]}...</td>"
                        f"<td>{item['page']}</td><td>{item['level']}</td></tr>"
                    )
                preview_html += "</table></div>"
            else:
                preview_html = "<div><p>No headers found or could not process.</p></div>"
            # Bug fix: gr.File expects a filepath; the original returned a
            # (path, bytes) tuple which Gradio cannot serve.
            return preview_html, excel_path, "Processing completed successfully!"
        else:
            return (
                "<div><p>Failed to process the PDF. Please check the URL and try again.</p></div>",
                None,
                "Processing failed.",
            )
    except Exception as e:
        logger.error(f"Error in interface: {e}")
        return f"<div><p>Error: {str(e)}</p></div>", None, "Error occurred during processing."


# Create Gradio interface for HuggingFace
iface = gr.Interface(
    fn=simple_interface,
    inputs=[
        gr.Textbox(
            label="PDF URL",
            placeholder="Enter the URL of a PDF file...",
            info="Make sure the PDF is publicly accessible"
        ),
        gr.Checkbox(
            label="Use AI Analysis (OpenRouter)",
            value=False,
            info="Requires OPENROUTER_API_KEY environment variable"
        ),
        gr.Dropdown(
            label="AI Model",
            choices=["openai/gpt-3.5-turbo", "anthropic/claude-3-haiku", "google/gemini-pro"],
            value="openai/gpt-3.5-turbo",
            visible=False  # Hidden for simplicity
        )
    ],
    outputs=[
        gr.HTML(label="Results Preview"),
        gr.File(label="Download Excel Results"),
        gr.Textbox(label="Status")
    ],
    title="PDF Header Extractor",
    description="Extract headers from PDF documents and analyze their hierarchy. Upload a publicly accessible PDF URL to begin.",
    examples=[
        ["https://arxiv.org/pdf/2305.15334.pdf", False],
        ["https://www.w3.org/WAI/ER/tests/xhtml/testfiles/resources/pdf/dummy.pdf", False]
    ],
    cache_examples=False,
    allow_flagging="never"
)

# Launch with HuggingFace-friendly settings
if __name__ == "__main__":
    # For HuggingFace Spaces, use launch with specific settings
    iface.launch(
        debug=False,       # Disable debug for production
        show_api=False,
        server_name="0.0.0.0",
        server_port=7860
    )