Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import os | |
| import json | |
| import requests | |
| from io import BytesIO | |
| from datetime import datetime | |
| import pandas as pd | |
| import fitz # PyMuPDF | |
| from collections import defaultdict, Counter | |
| from urllib.parse import urlparse, unquote | |
| import re | |
| import difflib | |
| import copy | |
| import urllib.parse | |
| import logging | |
| from difflib import SequenceMatcher | |
# Set up logging
# Configure the root logger once at import time: INFO and above, with a
# timestamp / level / message layout, emitted through one stream handler.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
    ]
)
# Module-level logger shared by every function in this file.
logger = logging.getLogger(__name__)
# Constants
# Vertical bands, in the same units as page.rect.height, that
# getLocation_of_header() ignores: a hit whose y0 is <= top_margin or
# >= page_height - bottom_margin is skipped.
# NOTE(review): presumably these bands cover running headers/footers - confirm.
top_margin = 70
bottom_margin = 85
def getLocation_of_header(doc, headerText, expected_page=None):
    """Locate occurrences of ``headerText`` in the body area of a page.

    Args:
        doc: Document supporting ``len(doc)`` and ``doc[page_index]``; each
            page must offer ``rect.height`` and ``search_for(text)``.
        headerText: Text to search for.
        expected_page: Page index to search. ``None`` (the default) searches
            every page of the document instead of failing on ``doc[None]``.

    Returns:
        A list of dicts with keys ``headerText``, ``page``, ``x`` and ``y``,
        one per hit that lies outside the top and bottom margin bands. An
        empty list is returned for empty text or a page index that is not
        in the document.
    """
    if not headerText:
        return []

    page_count = len(doc)
    if expected_page is None:
        page_numbers = range(page_count)
    elif -page_count <= expected_page < page_count:
        # Negative indices stay accepted, exactly as plain doc[...] indexing did.
        page_numbers = [expected_page]
    else:
        # A page that does not exist cannot contain the header.
        return []

    locations = []
    for page_number in page_numbers:
        page = doc[page_number]
        lowest_allowed_y = page.rect.height - bottom_margin
        for rect in page.search_for(headerText):
            y = rect.y0
            # Skip headers in top or bottom margin
            if y <= top_margin or y >= lowest_allowed_y:
                continue
            locations.append({
                "headerText": headerText,
                "page": page_number,
                "x": rect.x0,
                "y": y,
            })
    return locations
def filter_headers_outside_toc(headers, toc_pages):
    """Keep only the header entries that are not on a table-of-contents page.

    Each header is a sequence whose element at index 2 is its page number.
    Entries whose page is ``None`` or listed in ``toc_pages`` are dropped;
    the relative order of the remaining entries is preserved.
    """
    excluded_pages = set(toc_pages)
    return [
        entry
        for entry in headers
        if entry[2] is not None and entry[2] not in excluded_pages
    ]
def _page_line_index(page):
    """Return, per text block of ``page``, its (normalized line text, first-span size) pairs."""
    index = []
    for block in page.get_text("dict")["blocks"]:
        if block.get("type") != 0:
            continue
        block_lines = []
        for line in block.get("lines", []):
            spans = line["spans"]
            line_text = "".join(span["text"] for span in spans).strip()
            first_size = spans[0]["size"] if spans else None
            block_lines.append((normalize(line_text), first_size))
        index.append(block_lines)
    return index


def _first_matching_font_size(line_index, wanted):
    """Return the size of the first line equal to ``wanted``, scanning block by block."""
    fontsize = None
    for block_lines in line_index:
        for norm_text, first_size in block_lines:
            if norm_text == wanted:
                if first_size is not None:
                    fontsize = first_size
                # Only the first matching line of a block is considered.
                break
        if fontsize:
            break
    return fontsize


def headers_with_location(doc, llm_headers):
    """Attach position and font size to each identified header.

    Args:
        doc: Document offering ``load_page(page_index)``; it is also passed
            to ``getLocation_of_header``.
        llm_headers: Iterable of dicts with keys ``text`` and ``page`` and,
            optionally, ``suggested_level``.

    Returns:
        A list of ``[text, fontsize, page, y, suggested_level, x]`` entries
        without duplicates. ``fontsize`` is ``None`` when no line of the
        page matches the header text after normalization.

    The text dictionary of a page is extracted and normalized once per page
    instead of once per hit, and the font size is resolved once per header
    and page.
    """
    headersJson = []
    line_index_by_page = {}
    for h in llm_headers:
        text = h["text"]
        wanted = normalize(text)
        font_size_by_page = {}
        for loc in getLocation_of_header(doc, text, h["page"]):
            page_number = loc["page"]
            if page_number not in font_size_by_page:
                if page_number not in line_index_by_page:
                    line_index_by_page[page_number] = _page_line_index(
                        doc.load_page(page_number)
                    )
                font_size_by_page[page_number] = _first_matching_font_size(
                    line_index_by_page[page_number], wanted
                )
            entry = [
                text,
                font_size_by_page[page_number],
                page_number,
                loc["y"],
                # A missing level is tolerated here; entries without a level
                # are skipped later by build_hierarchy_from_llm.
                h.get("suggested_level"),
                loc.get("x", 0),
            ]
            if entry not in headersJson:
                headersJson.append(entry)
    return headersJson
def build_hierarchy_from_llm(headers):
    """Turn flat header entries into a nested tree of header nodes.

    Args:
        headers: Iterable of sequences ``(text, size, page, y, level, x)``.
            Entries with fewer than six items, a ``None`` level, or a level
            that cannot be converted to ``int`` are ignored.

    Returns:
        The list of root nodes after ``enforce_level_hierarchy`` has been
        applied, or an empty list when no usable entry exists. Each node is
        a dict that carries, among other keys, ``text``, ``page``, ``x``,
        ``y``, ``size``, ``level``, ``path`` and ``children``.
    """
    nodes = []
    # Build nodes
    for h in headers:
        if len(h) < 6:
            continue
        text, size, page, y, level, x = h
        if level is None:
            continue
        try:
            level = int(level)
        except Exception:
            continue
        node = {
            "text": text,
            # Missing coordinates are replaced by -1 so sorting below never
            # compares None values.
            "page": page if page is not None else -1,
            "x": x if x is not None else -1,
            "y": y if y is not None else -1,
            "size": size,
            # Style attributes are not known here; fixed placeholder values.
            "bold": False,
            "color": None,
            "font": None,
            "children": [],
            "is_numbered": is_numbered(text),
            "original_size": size,
            "norm_text": normalize(text),
            "level": level,
        }
        nodes.append(node)
    if not nodes:
        return []
    # Sort top-to-bottom
    nodes.sort(key=lambda x: (x["page"], x["y"]))
    # Normalize levels
    # Shift all levels so the smallest one present becomes 0.
    min_level = min(n["level"] for n in nodes)
    for n in nodes:
        n["level"] -= min_level
    # Build hierarchy
    root = []
    # Chain of currently open ancestors, shallowest first.
    stack = []
    # (normalized text, page) pairs of level-0 headers already placed.
    added_level0 = set()
    for header in nodes:
        lvl = header["level"]
        if lvl < 0:
            continue
        if lvl == 0:
            # A level-0 header repeated on the same page is added only once.
            key = (header["norm_text"], header["page"])
            if key in added_level0:
                continue
            added_level0.add(key)
        # Close every open header that is at the same depth or deeper; what
        # remains on top of the stack is the parent of this header.
        while stack and stack[-1]["level"] >= lvl:
            stack.pop()
        parent = stack[-1] if stack else None
        if parent:
            header["path"] = parent["path"] + [header["norm_text"]]
            parent["children"].append(header)
        else:
            header["path"] = [header["norm_text"]]
            root.append(header)
        stack.append(header)
    # Enforce nesting
    def enforce_nesting(node_list, parent_level=-1):
        # A node that is not strictly deeper than its parent is moved to
        # exactly one level below the parent.
        for node in node_list:
            if node["level"] <= parent_level:
                node["level"] = parent_level + 1
            enforce_nesting(node["children"], node["level"])
    enforce_nesting(root)
    # Cleanup
    # When level-0 roots exist, drop those of them that have no children.
    if any(h["level"] == 0 for h in root):
        root = [
            h for h in root
            if not (h["level"] == 0 and not h["children"])
        ]
    return enforce_level_hierarchy(root)
def get_regular_font_size_and_color(doc, max_pages=10):
    """Return the most frequent span font size, color and font name.

    Args:
        doc: Document supporting ``len(doc)`` and ``load_page(index)``; pages
            must offer ``get_text("dict")`` returning a dict with ``blocks``.
        max_pages: Number of leading pages to sample. Defaults to 10, the
            limit that used to be hard-coded.

    Returns:
        A tuple ``(font_size, color, font)``. When no span is found the
        defaults ``12``, ``0`` and ``"Helvetica"`` are returned.
    """
    size_counts = Counter()
    color_counts = Counter()
    font_counts = Counter()
    # Check only the first few pages for efficiency
    for page_num in range(min(len(doc), max_pages)):
        page = doc.load_page(page_num)
        for block in page.get_text("dict")["blocks"]:
            # Blocks without a "lines" entry contribute nothing.
            for line in block.get("lines", ()):
                for span in line["spans"]:
                    size_counts[span['size']] += 1
                    color_counts[span['color']] += 1
                    font_counts[span['font']] += 1
    most_common_font_size = size_counts.most_common(1)[0][0] if size_counts else 12
    most_common_color = color_counts.most_common(1)[0][0] if color_counts else 0
    most_common_font = font_counts.most_common(1)[0][0] if font_counts else "Helvetica"
    return most_common_font_size, most_common_color, most_common_font
def normalize_text(text):
    """Lower-case ``text``, trim it and collapse whitespace runs to one space.

    ``None`` is treated as the empty string.
    """
    if text is None:
        return ""
    trimmed_lower = text.strip().lower()
    return re.sub(r'\s+', ' ', trimmed_lower)
def get_spaced_text_from_spans(spans):
    """Join the stripped ``text`` of every span with spaces and normalize the result."""
    pieces = [span["text"].strip() for span in spans]
    joined = " ".join(pieces)
    return normalize_text(joined)
def is_numbered(text):
    """Return True when ``text``, ignoring surrounding whitespace, starts with a digit."""
    leading_digit = re.match(r'^\d', text.strip())
    return leading_digit is not None
def is_similar(a, b, threshold=0.85):
    """Return True when the SequenceMatcher ratio of ``a`` and ``b`` exceeds ``threshold``."""
    score = SequenceMatcher(None, a, b).ratio()
    return score > threshold
def normalize(text):
    """Lower-case ``text``, remove runs of two or more dots and collapse whitespace.

    The result is stripped of leading and trailing whitespace.
    """
    lowered = text.lower()
    without_dot_runs = re.sub(r'\.{2,}', '', lowered)
    single_spaced = re.sub(r'\s+', ' ', without_dot_runs)
    return single_spaced.strip()
def clean_toc_entry(toc_text):
    """Cut a TOC line at its first dots/whitespace-then-digits run and trim dots and spaces."""
    title_part = re.sub(r'[\.\s]+\d+.*$', '', toc_text)
    return title_part.strip('. ')
def enforce_level_hierarchy(headers):
    """Remove, in place, every level-2 node whose parent is not at level 1.

    The top-level list is treated as having a parent at level -1. Children
    of a removed node are dropped together with it. The same ``headers``
    list object is returned.
    """
    def prune(siblings, parent_level=-1):
        # Slice assignment keeps the original list object, so callers that
        # hold a reference to it observe the removal.
        siblings[:] = [
            node for node in siblings
            if node['level'] != 2 or parent_level == 1
        ]
        for node in siblings:
            prune(node['children'], node['level'])

    prune(headers)
    return headers
def highlight_boxes(doc, highlights, stringtowrite, fixed_width=500):
    """Draw a translucent box and a bracketed label on each page in ``highlights``.

    Args:
        doc: Document offering ``load_page(page_num)``.
        highlights: Mapping of page number to a bounding box accepted by
            ``fitz.Rect``.
        stringtowrite: Label text; it is written as ``[label]``. Labels that
            start with ``'Not'`` get a grey box, all others a yellow one.
        fixed_width: Width of the drawn box, centred horizontally on the page.

    Only boxes taller than 30 units are annotated; shorter ones are skipped.
    NOTE(review): the source lost its indentation; this assumes that all
    annotation calls belong to the ``rect_height > 30`` branch - confirm.
    """
    for page_num, bbox in highlights.items():
        page = doc.load_page(page_num)
        source_rect = fitz.Rect(bbox)
        if not source_rect.height > 30:
            continue

        # Keep the vertical extent, replace the horizontal one by a centred
        # band of fixed width.
        middle_x = page.rect.width / 2
        half_width = fixed_width / 2
        new_rect = fitz.Rect(
            middle_x - half_width, source_rect.y0,
            middle_x + half_width, source_rect.y1,
        )

        box_color = (0.5, 0.5, 0.5) if stringtowrite.startswith('Not') else (1, 1, 0)
        annot = page.add_rect_annot(new_rect)
        annot.set_colors(stroke=box_color, fill=box_color)
        annot.set_opacity(0.3)
        annot.update()

        label = '[' + stringtowrite + ']'
        annot1 = page.add_freetext_annot(
            new_rect,
            label,
            fontsize=15,
            fontname='helv',
            text_color=(1, 0, 0),
            rotate=page.rotation,
            align=2
        )
        annot1.update()
def get_leaf_headers_with_paths(listtoloop, path=None, output=None):
    """Collect childless headers below level 1 together with their text path.

    Args:
        listtoloop: List of header dicts with keys ``text``, ``level`` and
            ``children``.
        path: Texts of the ancestors visited so far (used by the recursion).
        output: List that receives the results (used by the recursion).

    Returns:
        ``output`` (or a new list) holding ``(header, path)`` tuples, where
        ``path`` lists the texts from the root down to the header itself.
        Childless headers at level 0 or 1 are not reported.
    """
    trail = [] if path is None else path
    collected = [] if output is None else output
    for node in listtoloop:
        node_path = trail + [node['text']]
        kids = node['children']
        if kids:
            get_leaf_headers_with_paths(kids, node_path, collected)
        elif node['level'] not in (0, 1):
            collected.append((node, node_path))
    return collected
def words_match_ratio(text1, text2):
    """Return the share of distinct words of ``text1`` that also occur in ``text2``.

    The result is ``0.0`` when either text contains no words.
    """
    first_words = set(text1.split())
    second_words = set(text2.split())
    if first_words and second_words:
        shared = first_words.intersection(second_words)
        return len(shared) / len(first_words)
    return 0.0
def same_start_word(s1, s2):
    """Return True when both strings start with the same word, ignoring case.

    False is returned when either string contains no word.
    """
    first_tokens = s1.strip().split()
    second_tokens = s2.strip().split()
    if not (first_tokens and second_tokens):
        return False
    return first_tokens[0].lower() == second_tokens[0].lower()
def get_toc_page_numbers(doc, max_pages_to_check=15):
    """Return the page indices that make up the front matter and table of contents.

    A page counts as a TOC page when one of its lines contains a run of two
    or more dots, or consists solely of "table of contents", "contents" or
    "index" (case-insensitive).

    Args:
        doc: Document supporting ``len(doc)`` and ``load_page(index)``.
        max_pages_to_check: Number of leading pages that are inspected.

    Returns:
        ``[0, 1, ..., last]`` where ``last`` is the final page of the first
        consecutive run of TOC pages, or an empty list when no page
        qualifies. Using the whole run instead of only its first page keeps
        the later pages of a multi-page table of contents excluded as well,
        while an isolated dotted line far behind the TOC is still ignored.
    """
    logger.debug(f"Starting TOC detection, checking first {max_pages_to_check} pages")
    dot_pattern = re.compile(r"\.{2,}")
    title_pattern = re.compile(r"^\s*(table of contents|contents|index)\s*$", re.IGNORECASE)

    toc_pages = []
    for page_num in range(min(len(doc), max_pages_to_check)):
        blocks = doc.load_page(page_num).get_text("dict")["blocks"]
        line_texts = (
            " ".join(span["text"] for span in line["spans"]).strip()
            for block in blocks
            for line in block.get("lines", [])
        )
        if any(
            dot_pattern.search(line_text) or title_pattern.match(line_text)
            for line_text in line_texts
        ):
            toc_pages.append(page_num)

    if not toc_pages:
        logger.info("No TOC pages found")
        return []

    # Extend from the first detected page over directly following detected
    # pages; stop at the first gap.
    last_toc_page = toc_pages[0]
    for page_num in toc_pages[1:]:
        if page_num != last_toc_page + 1:
            break
        last_toc_page = page_num

    result = list(range(0, last_toc_page + 1))
    logger.info(f"TOC pages found: {result}")
    return result
def openPDF(pdf_path):
    """Download a PDF from a URL and open it from memory.

    Args:
        pdf_path: URL of the PDF. A ``dl=0`` fragment in the URL is replaced
            by ``dl=1`` before the request is sent.

    Returns:
        The opened ``fitz`` document, or ``None`` when the download fails,
        times out or does not answer with HTTP status 200.
    """
    logger.info(f"Opening PDF from URL: {pdf_path}")
    pdf_path = pdf_path.replace('dl=0', 'dl=1')
    try:
        # Without a timeout an unresponsive host would block the request forever.
        response = requests.get(pdf_path, timeout=30)
    except requests.RequestException as exc:
        # Connection problems are reported the same way as a bad status code,
        # so callers only have to handle the None result.
        logger.error(f"Failed to download PDF: {exc}")
        return None
    if response.status_code != 200:
        logger.error(f"Failed to download PDF. Status code: {response.status_code}")
        return None
    pdf_content = BytesIO(response.content)
    doc = fitz.open(stream=pdf_content, filetype="pdf")
    logger.info(f"PDF opened successfully, {len(doc)} pages")
    return doc
def is_header(span, regular_font_size, regular_color, regular_font, allheaders_LLM=None):
    """Decide whether a text span looks like a header.

    Args:
        span: Span dict; the keys ``size``, ``flags``, ``font``, ``color``
            and ``text`` are read when present.
        regular_font_size: Font size of the regular body text.
        regular_color: Color of the regular body text.
        regular_font: Font name of the regular body text.
        allheaders_LLM: Optional iterable of header strings. A span whose
            normalized text equals one of them is always a header.

    Returns:
        True when the span is more than 10 % larger than the body text and
        is also bold, or uses a different font or color, or when its text
        matches one of ``allheaders_LLM``.
    """
    # In PyMuPDF span flags, bit 4 (value 16) marks bold text; bit 1
    # (value 2), which was tested before, marks italic text.
    bold_flag = 16

    # Check font size (headers are typically larger than regular text)
    size_ok = span.get('size', 0) > regular_font_size * 1.1
    # Check if it's bold (common for headers)
    is_bold = bool(span.get('flags', 0) & bold_flag)
    # Check font family
    font_ok = span.get('font') != regular_font
    # Check color
    color_ok = span.get('color') != regular_color

    # Check if text matches LLM-identified headers
    text_match = False
    if allheaders_LLM and 'text' in span:
        span_text = span['text'].strip()
        if span_text:
            norm_text = normalize_text(span_text)
            text_match = any(
                normalize_text(header) == norm_text
                for header in allheaders_LLM
            )

    # A span is considered a header if it meets multiple criteria
    return (size_ok and (is_bold or font_ok or color_ok)) or text_match
def identify_headers_with_openrouter(pdf_path, model, LLM_prompt, pages_to_check=None):
    """Ask an OpenRouter chat model which lines of a PDF are headers.

    Args:
        pdf_path: URL of the PDF; it is downloaded with ``openPDF``.
        model: OpenRouter model identifier sent in the request body.
        LLM_prompt: Accepted for interface compatibility; the built-in
            simplified prompt is sent instead.
        pages_to_check: Accepted for interface compatibility; not used.

    Returns:
        A list of dicts with keys ``text``, ``page`` (0-indexed),
        ``suggested_level`` and ``confidence``. An empty list is returned
        when the PDF cannot be opened. ``fallback_header_detection`` is used
        when no API key is configured, when the sampled pages contain no
        text, or when the API call fails.
    """
    logger.info("Starting header identification")
    doc = openPDF(pdf_path)
    if doc is None:
        return []

    # Use environment variable for API key
    api_key = os.getenv("OPENROUTER_API_KEY")
    if not api_key:
        logger.warning("No OpenRouter API key found. Using fallback heuristics.")
        return fallback_header_detection(doc)

    # Simplified prompt for faster processing
    simplified_prompt = """
    Analyze the following text lines from a PDF document.
    Identify which lines are headers/titles and suggest a hierarchy level (1 for main headers, 2 for subheaders, etc.).
    Return only a JSON array of objects with keys: text, page, suggested_level.
    Example: [{"text": "Introduction", "page": 3, "suggested_level": 1}, ...]
    """

    # Collect text from first 20 pages max for HuggingFace
    total_pages = len(doc)
    end_page = min(20, total_pages)  # Limit pages for HuggingFace
    lines_for_prompt = []
    for pno in range(0, end_page):
        page_text = doc.load_page(pno).get_text()
        for line in page_text.split('\n'):
            if line.strip():
                lines_for_prompt.append(f"PAGE {pno+1}: {line.strip()}")
    if not lines_for_prompt:
        return fallback_header_detection(doc)

    prompt = simplified_prompt + "\n\nLines:\n" + "\n".join(lines_for_prompt[:100])  # Limit lines

    # Make API call
    url = "https://openrouter.ai/api/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    body = {
        "model": model,
        "messages": [
            {
                "role": "user",
                "content": prompt
            }
        ],
        "max_tokens": 2000
    }
    try:
        resp = requests.post(url, headers=headers, json=body, timeout=30)
        resp.raise_for_status()
        rj = resp.json()

        # Extract response; tolerate an empty choice list or a null content.
        choices = rj.get('choices') or [{}]
        text_reply = (choices[0].get('message') or {}).get('content') or ''

        # Find the JSON array in the response. rfind() yields -1 when there
        # is no ']', so the closing bracket must lie behind the opening one.
        parsed = []
        start = text_reply.find('[')
        end = text_reply.rfind(']')
        if start != -1 and end > start:
            try:
                parsed = json.loads(text_reply[start:end + 1])
            except ValueError:
                logger.warning("Model reply did not contain a valid JSON array")
        if not isinstance(parsed, list):
            parsed = []

        # Format output
        out = []
        for obj in parsed:
            if not isinstance(obj, dict):
                continue
            t = obj.get('text')
            page = obj.get('page')
            if not (t and page):
                continue
            try:
                page_index = int(page) - 1  # Convert to 0-indexed
            except (TypeError, ValueError):
                continue
            # The model may invent page numbers; keep only pages that exist.
            if not 0 <= page_index < total_pages:
                continue
            out.append({
                'text': t,
                'page': page_index,
                'suggested_level': obj.get('suggested_level'),
                'confidence': 1.0
            })
        logger.info(f"Identified {len(out)} headers")
        return out
    except Exception as e:
        # Deliberate best-effort boundary: any failure of the remote call or
        # of reading its answer falls back to the local heuristics.
        logger.error(f"OpenRouter API error: {e}")
        return fallback_header_detection(doc)
def fallback_header_detection(doc):
    """Guess headers from the text of the first span of every line.

    Only the first 30 pages are inspected. A line is reported when the text
    of its first span is non-empty, shorter than 100 characters, does not
    end with a period, starts with an upper-case character and contains a
    letter.

    Returns:
        A list of dicts with keys ``text``, ``page``, ``suggested_level``
        (2 for fewer than five words, otherwise 3) and ``confidence`` (0.7).
        A text is reported once per page, compared case-insensitively.
    """
    unique_headers = []
    seen_keys = set()
    page_limit = min(len(doc), 30)  # Check only first 30 pages for efficiency
    for page_num in range(page_limit):
        page_blocks = doc.load_page(page_num).get_text("dict")["blocks"]
        for block in page_blocks:
            if block.get("type") != 0:  # Only text blocks carry candidates
                continue
            for line in block.get("lines", []):
                spans = line.get("spans")
                if not spans:
                    continue
                text = spans[0].get("text", "").strip()
                # Simple heuristics for headers
                looks_like_header = bool(
                    text
                    and len(text) < 100
                    and not text.endswith('.')
                    and text[0].isupper()
                    and any(char.isalpha() for char in text)
                )
                if not looks_like_header:
                    continue
                # Deduplicate while keeping the first occurrence
                key = (text.lower(), page_num)
                if key in seen_keys:
                    continue
                seen_keys.add(key)
                unique_headers.append({
                    'text': text,
                    'page': page_num,
                    'suggested_level': 2 if len(text.split()) < 5 else 3,
                    'confidence': 0.7
                })
    return unique_headers
def process_single_pdf(pdf_path, model="openai/gpt-3.5-turbo", LLM_prompt=None):
    """Extract the header hierarchy of one PDF and write it to an Excel file.

    Args:
        pdf_path: URL of the PDF.
        model: OpenRouter model identifier used when the LLM path is taken.
        LLM_prompt: When set and ``OPENROUTER_API_KEY`` is defined, headers
            are identified through OpenRouter; otherwise the local
            heuristics are used.

    Returns:
        ``(output_path, preview)`` where ``output_path`` is the written
        ``header_analysis.xlsx`` and ``preview`` holds up to ten rows as
        dicts with keys ``text``, ``page`` (1-indexed), ``level`` and
        ``font_size``. ``(None, None)`` is returned when the PDF cannot be
        opened or any processing step fails.
    """
    logger.info(f"Processing PDF: {pdf_path}")
    doc = None
    try:
        # Open PDF
        doc = openPDF(pdf_path)
        if doc is None:
            return None, None

        # Get basic document info
        toc_pages = get_toc_page_numbers(doc)

        # Identify headers (with fallback)
        if LLM_prompt and os.getenv("OPENROUTER_API_KEY"):
            identified_headers = identify_headers_with_openrouter(pdf_path, model, LLM_prompt)
        else:
            identified_headers = fallback_header_detection(doc)

        # Process headers
        headers_json = headers_with_location(doc, identified_headers)
        headers = filter_headers_outside_toc(headers_json, toc_pages)
        hierarchy = build_hierarchy_from_llm(headers)

        # Create simple output. The hierarchy is a tree, so it is walked
        # depth-first (parents before their children); listing only the
        # root nodes would silently drop every nested header.
        results = []
        pending = list(reversed(hierarchy))
        while pending:
            header = pending.pop()
            results.append({
                "text": header.get("text", ""),
                "page": header.get("page", 0) + 1,
                "level": header.get("level", 0),
                "font_size": header.get("size", 0)
            })
            pending.extend(reversed(header.get("children", [])))

        # Create DataFrame
        df = pd.DataFrame(results)

        # Save to Excel
        output_path = "header_analysis.xlsx"
        df.to_excel(output_path, index=False)
        logger.info(f"Processed {len(results)} headers")
        return output_path, df.head(10).to_dict('records')
    except Exception as e:
        # Boundary of the processing pipeline: report the failure with its
        # traceback and signal it to the caller through the None result.
        logger.exception(f"Error processing PDF: {e}")
        return None, None
    finally:
        # Release the document on every path, including errors.
        if doc is not None:
            doc.close()
def simple_interface(pdf_path, use_llm=True, model="openai/gpt-3.5-turbo"):
    """Gradio callback: process a PDF URL and build the three output values.

    Args:
        pdf_path: URL of the PDF entered by the user.
        use_llm: Whether a prompt is passed on so the LLM path may be used.
        model: OpenRouter model identifier.

    Returns:
        A tuple ``(preview_html, excel_path, status)``. ``excel_path`` is
        the path of the generated Excel file, or ``None`` when nothing was
        produced.
    """
    # Local import: escaping is only needed by this callback.
    import html

    logger.info("Starting PDF header extraction")
    if not pdf_path:
        return "Please provide a PDF URL", None, None
    try:
        # Default prompt
        LLM_prompt = """Analyze the text lines and identify headers with hierarchy levels."""

        # Process the PDF
        excel_path, sample_data = process_single_pdf(pdf_path, model, LLM_prompt if use_llm else None)
        if not (excel_path and os.path.exists(excel_path)):
            return "<p>Failed to process the PDF. Please check the URL and try again.</p>", None, "Processing failed."

        # Create sample preview
        if sample_data:
            parts = [
                "<h3>Sample Headers Found:</h3><table border='1' style='width:100%'>",
                "<tr><th>Text</th><th>Page</th><th>Level</th></tr>",
            ]
            for item in sample_data:
                full_text = str(item['text'])
                shown = full_text[:50]
                if len(full_text) > 50:
                    # Mark the text as shortened only when it really was.
                    shown += "..."
                # The text originates from the PDF and must not be able to
                # inject markup into the page.
                parts.append(
                    f"<tr><td>{html.escape(shown)}</td>"
                    f"<td>{html.escape(str(item['page']))}</td>"
                    f"<td>{html.escape(str(item['level']))}</td></tr>"
                )
            parts.append("</table>")
            preview_html = "".join(parts)
        else:
            preview_html = "<p>No headers found or could not process.</p>"

        # The file output component receives the path of the file to offer
        # for download; the file content does not have to be read here.
        return preview_html, excel_path, "Processing completed successfully!"
    except Exception as e:
        logger.error(f"Error in interface: {e}")
        return f"<p>Error: {html.escape(str(e))}</p>", None, "Error occurred during processing."
# Create Gradio interface for HuggingFace
# The three inputs map positionally onto simple_interface(pdf_path, use_llm,
# model); the three outputs receive its returned tuple in the same order.
iface = gr.Interface(
    fn=simple_interface,
    inputs=[
        gr.Textbox(
            label="PDF URL",
            placeholder="Enter the URL of a PDF file...",
            info="Make sure the PDF is publicly accessible"
        ),
        # Unchecked by default, so the heuristic detection is used unless
        # the user opts in.
        gr.Checkbox(
            label="Use AI Analysis (OpenRouter)",
            value=False,
            info="Requires OPENROUTER_API_KEY environment variable"
        ),
        gr.Dropdown(
            label="AI Model",
            choices=["openai/gpt-3.5-turbo", "anthropic/claude-3-haiku", "google/gemini-pro"],
            value="openai/gpt-3.5-turbo",
            visible=False  # Hidden for simplicity
        )
    ],
    outputs=[
        gr.HTML(label="Results Preview"),
        gr.File(label="Download Excel Results"),
        gr.Textbox(label="Status")
    ],
    title="PDF Header Extractor",
    description="Extract headers from PDF documents and analyze their hierarchy. Upload a publicly accessible PDF URL to begin.",
    # NOTE(review): each example row supplies two values while three inputs
    # are declared - confirm the installed Gradio version accepts this.
    examples=[
        ["https://arxiv.org/pdf/2305.15334.pdf", False],
        ["https://www.w3.org/WAI/ER/tests/xhtml/testfiles/resources/pdf/dummy.pdf", False]
    ],
    cache_examples=False,
    allow_flagging="never"
)
# Launch with HuggingFace-friendly settings
# Only started when the file is run as a script, not when it is imported.
if __name__ == "__main__":
    # For HuggingFace Spaces, use launch with specific settings
    iface.launch(
        debug=False,  # Disable debug for production
        show_api=False,
        server_name="0.0.0.0",
        server_port=7860
    )