Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import os | |
| import json | |
| import requests | |
| from io import BytesIO | |
| import fitz # PyMuPDF | |
| from urllib.parse import urlparse, unquote | |
| import os | |
| from io import BytesIO | |
| import re | |
| import requests | |
| import pandas as pd | |
| import fitz # PyMuPDF | |
| import re | |
| import urllib.parse | |
| import difflib | |
| from fuzzywuzzy import fuzz | |
| import copy | |
| # import tsadropboxretrieval | |
| import urllib.parse | |
def get_toc_page_numbers(doc, max_pages_to_check=15):
    """Return the leading page indices that should be skipped as front matter.

    The first ``max_pages_to_check`` pages are scanned. A page is treated as a
    table-of-contents page when any of its text lines either

    * contains a run of two or more dots (dot leaders such as
      ``"Introduction ........ 5"``), or
    * consists solely of ``"Table of Contents"``, ``"Contents"`` or ``"Index"``
      (case-insensitive, surrounding whitespace ignored).

    Args:
        doc: Document object supporting ``len(doc)`` and ``doc.load_page(i)``,
            whose pages support ``get_text("dict")``.
        max_pages_to_check: Maximum number of leading pages to inspect.

    Returns:
        ``[0, 1, ..., last]`` where ``last`` is the index of the last detected
        TOC page, so the cover pages and every TOC page are included; an empty
        list when no TOC page is detected.
    """
    # Dot leaders: two or more consecutive periods anywhere in the line.
    dot_pattern = re.compile(r"\.{2,}")
    # Anchored so that a sentence merely containing "contents" does not match.
    title_pattern = re.compile(r"^\s*(table of contents|contents|index)\s*$", re.IGNORECASE)

    toc_pages = []
    for page_num in range(min(len(doc), max_pages_to_check)):
        page = doc.load_page(page_num)
        blocks = page.get_text("dict")["blocks"]
        # Blocks without a "lines" entry contribute no text lines.
        line_texts = (
            " ".join(span["text"] for span in line["spans"]).strip()
            for block in blocks
            for line in block.get("lines", [])
        )
        # A single matching line is enough to flag the page.
        if any(dot_pattern.search(text) or title_pattern.match(text) for text in line_texts):
            toc_pages.append(page_num)

    if not toc_pages:
        return []

    # Pages are appended in ascending order, so the final entry is the last
    # TOC page; everything up to and including it is skipped.
    last_toc_page = toc_pages[-1]
    return list(range(last_toc_page + 1))
def openPDF(pdf_path, timeout=60):
    """Download a PDF from a URL and open it with PyMuPDF.

    Args:
        pdf_path: URL of the PDF. Any ``dl=0`` substring is rewritten to
            ``dl=1`` before the request (presumably to turn a Dropbox share
            link into a direct download -- confirm against callers).
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The document returned by ``fitz.open`` for the downloaded bytes.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        ValueError: If the response body is empty.
    """
    pdf_path = pdf_path.replace('dl=0', 'dl=1')
    response = requests.get(pdf_path, timeout=timeout)
    # Fail on 4xx/5xx here instead of handing an HTML error page to the parser.
    response.raise_for_status()
    # Check the raw bytes: a BytesIO object is always truthy, even when empty.
    if not response.content:
        raise ValueError("No valid PDF content found.")
    pdf_content = BytesIO(response.content)
    return fitz.open(stream=pdf_content, filetype="pdf")
def _collect_prompt_lines(doc, pages_to_check, top_margin, bottom_margin):
    """Return ``"PAGE n: text"`` strings for text lines outside TOC pages and margins."""
    skipped_pages = set(get_toc_page_numbers(doc))
    collected = []
    for pno in range(len(doc)):
        if pages_to_check and pno not in pages_to_check:
            continue
        if pno in skipped_pages:
            continue
        page = doc.load_page(pno)
        page_height = page.rect.height
        for block in page.get_text("dict").get('blocks', []):
            # Only blocks of type 0 are processed (text blocks in PyMuPDF's dict output).
            if block.get('type') != 0:
                continue
            for line in block.get('lines', []):
                spans = line.get('spans', [])
                if not spans:
                    continue
                # The first span's bbox decides whether the line sits in a margin.
                y0 = spans[0]['bbox'][1]
                y1 = spans[0]['bbox'][3]
                if y0 < top_margin or y1 > (page_height - bottom_margin):
                    continue
                text = " ".join(s.get('text', '') for s in spans).strip()
                if text:
                    # Prefix with the 1-based page number for mapping back.
                    collected.append(f"PAGE {pno + 1}: {text}")
    return collected


def _extract_reply_text(rj):
    """Return the assistant's text from a chat-completions response, or None."""
    if not isinstance(rj, dict):
        return None
    choices = rj.get('choices') or []
    text_reply = None
    if choices and isinstance(choices[0], dict):
        first = choices[0]
        msg = first.get('message') or first.get('delta') or {}
        content = msg.get('content') if isinstance(msg, dict) else None
        if isinstance(content, list):
            # Content may be a list of typed parts; take the first non-empty text part.
            for part in content:
                if isinstance(part, dict) and part.get('type') == 'text' and part.get('text'):
                    text_reply = part.get('text')
                    break
        elif isinstance(content, str):
            text_reply = content
        elif isinstance(content, dict):
            text_reply = content.get('text')
    if not text_reply:
        # Fall back to a plain "text" field on any choice.
        for choice in choices:
            if isinstance(choice, dict) and isinstance(choice.get('text'), str):
                text_reply = choice.get('text')
                break
    return text_reply or None


def _parse_header_entries(reply_text):
    """Parse the model reply into normalized header dicts, skipping malformed entries."""
    stripped = reply_text.strip()
    # The model may wrap the array in prose or code fences; keep the outermost [...].
    start = stripped.find('[')
    end = stripped.rfind(']')
    candidate = stripped[start:end + 1] if start != -1 and end != -1 else stripped
    try:
        parsed = json.loads(candidate)
    except ValueError:
        return []
    if not isinstance(parsed, list):
        return []

    out = []
    for obj in parsed:
        if not isinstance(obj, dict):
            continue
        try:
            page = int(obj.get('page')) if obj.get('page') else None
            conf = float(obj.get('confidence') or 0)
        except (TypeError, ValueError):
            # A non-numeric page or confidence invalidates only this entry.
            continue
        text = obj.get('text')
        if text and page is not None:
            out.append({
                'text': text,
                'page': page - 1,  # stored 0-based
                'suggested_level': obj.get('suggested_level'),
                'confidence': conf,
            })
    return out


def identify_headers_with_openrouter(pdf_path, model, LLM_prompt, pages_to_check=None,
                                     top_margin=70, bottom_margin=85, timeout=120):
    """Ask an LLM (via OpenRouter) to identify headers in a PDF.

    Text lines are collected from the document (skipping detected TOC pages
    and lines whose first span falls in the top/bottom margins), each prefixed
    with ``PAGE <n>:``. They are sent after ``LLM_prompt`` in a single user
    message, and the reply is expected to contain a JSON array of header
    objects.

    The API key is read from the ``OPENROUTER_API_KEY`` environment variable.
    NOTE: a key used to be embedded in this function; it must be considered
    leaked and should be revoked.

    Side effects: prints debug information and writes the raw HTTP response
    body to ``llm_debug.json`` in the working directory (best effort).

    Args:
        pdf_path: URL of the PDF, passed to ``openPDF``.
        model: OpenRouter model identifier; converted with ``str()``.
        LLM_prompt: Instruction text placed before the page lines.
        pages_to_check: Optional collection of 0-based page indices to
            restrict the scan to; all pages when falsy.
        top_margin: Lines whose first span starts above this y-coordinate are
            ignored.
        bottom_margin: Lines whose first span ends within this distance of the
            page bottom are ignored.
        timeout: Seconds to wait for the OpenRouter response.

    Returns:
        A list of dicts with keys ``text``, ``page`` (0-based),
        ``suggested_level`` and ``confidence``. Empty when there is no text,
        no API key, the request fails, or the reply cannot be parsed.
    """
    doc = openPDF(pdf_path)
    try:
        lines_for_prompt = _collect_prompt_lines(doc, pages_to_check, top_margin, bottom_margin)
    finally:
        # The document is only needed for text extraction.
        doc.close()
    if not lines_for_prompt:
        return []

    api_key = os.getenv("OPENROUTER_API_KEY")
    if not api_key:
        # No API key: return empty so caller can fallback to heuristics
        return []

    # Instructions first, then one page line per row. Using the prompt as the
    # join separator would repeat it between every line and drop it entirely
    # when there is only one line.
    instructions = str(LLM_prompt or "").strip()
    page_text = "\n".join(lines_for_prompt)
    prompt = f"{instructions}\n\n{page_text}" if instructions else page_text

    url = "https://openrouter.ai/api/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
        "HTTP-Referer": os.getenv("OPENROUTER_REFERER", ""),
        "X-Title": os.getenv("OPENROUTER_X_TITLE", ""),
    }
    body = {
        "model": str(model),
        "messages": [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": prompt}
                ]
            }
        ]
    }

    try:
        print("LLM request (truncated):", prompt[:1000])
        resp = requests.post(
            url=url,
            headers=headers,
            data=json.dumps(body),
            timeout=timeout,
        )
        resp.raise_for_status()
        resp_text = resp.text
        print("LLM raw response length:", len(resp_text))
        # Save raw response for offline inspection; failure here is not fatal.
        try:
            with open("llm_debug.json", "w", encoding="utf-8") as fh:
                fh.write(resp_text)
        except OSError as e:
            print("Warning: could not write llm_debug.json:", e)
        rj = resp.json()
        print("LLM parsed response keys:", list(rj.keys()) if isinstance(rj, dict) else type(rj))
    except (requests.RequestException, ValueError) as e:
        # Network/HTTP errors and undecodable JSON bodies both degrade to "no headers".
        print("LLM call failed:", repr(e))
        return []

    text_reply = _extract_reply_text(rj)
    if not text_reply:
        return []
    return _parse_header_entries(text_reply)
# Adapter that reshapes the header dicts into rows for a Gradio Dataframe
def identify_headers_with_table(pdf_path, model, LLM_prompt):
    """Run header identification and return the result as table rows.

    Each row is ``[text, page, suggested_level, confidence]``, with ``page``
    shifted by +1 relative to the value stored in the header dict. An empty
    list is returned when no headers were identified.
    """
    detected_headers = identify_headers_with_openrouter(pdf_path, model, LLM_prompt)

    rows = []
    # A falsy result (nothing found) yields an empty table.
    for entry in detected_headers or []:
        row = [
            entry['text'],
            entry['page'] + 1,
            entry['suggested_level'],
            entry['confidence'],
        ]
        rows.append(row)
    return rows
# Column names for the results table, in the same order as the rows produced
# by identify_headers_with_table.
columns = ["Text", "Page", "Suggested Level", "Confidence"]

# Gradio interface: three free-text inputs (document URL, model identifier,
# prompt) mapped positionally onto identify_headers_with_table's parameters.
iface = gr.Interface(
    fn=identify_headers_with_table,
    inputs=[
        gr.Textbox(label="Document Link"),
        gr.Textbox(label="Model Type"),
        gr.Textbox(label="LLM Prompt"),
    ],
    outputs=gr.Dataframe(headers=columns),
)

# Start the server only when run as a script, so importing this module
# (e.g. to reuse the functions above) does not launch a web app.
if __name__ == "__main__":
    iface.launch()