# Hugging Face Space (Gradio app): identify PDF headers via an OpenRouter LLM
# and export the results to an Excel file.
# --- Standard library (duplicates removed: os, re, requests, pandas, fitz,
# BytesIO and urllib.parse were each imported two or three times) ---
import copy
import difflib
import json
import logging
import os
import re
import urllib.parse
from io import BytesIO
from urllib.parse import urlparse, unquote

# --- Third party ---
import fitz  # PyMuPDF
import gradio as gr
import pandas as pd
import requests
from fuzzywuzzy import fuzz

# import tsadropboxretrieval

# Set up logging to see everything: DEBUG and above go to the console and to
# debug.log (mode='w' truncates the file on every start).
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),                      # Print to console
        logging.FileHandler('debug.log', mode='w')    # Save to file
    ]
)
logger = logging.getLogger(__name__)
def get_toc_page_numbers(doc, max_pages_to_check=15):
    """Detect Table-of-Contents pages near the front of the document.

    A page counts as a TOC page when it contains a TOC title line
    ("Table of Contents", "Contents" or "Index" on its own line) OR at
    least one dot-leader line ("Chapter 1 ..... 5").

    Args:
        doc: An open PyMuPDF document.
        max_pages_to_check: Number of leading pages to inspect.

    Returns:
        list[int]: Zero-based page numbers from the cover page through the
        LAST detected TOC page (e.g. TOC on pages [2, 3] -> [0, 1, 2, 3]),
        so the cover page, inside cover and the whole TOC are covered.
        Empty list when no TOC page is found.
    """
    toc_pages = []
    logger.debug(f"Starting TOC detection, checking first {max_pages_to_check} pages")
    # 1. Dot-leader pattern (looking for ".....")
    dot_pattern = re.compile(r"\.{2,}")
    # 2. Title pattern: ^ and $ ensure the line is JUST that word;
    # re.IGNORECASE matches "CONTENTS", "Contents", "Index", etc.
    title_pattern = re.compile(r"^\s*(table of contents|contents|index)\s*$", re.IGNORECASE)
    for page_num in range(min(len(doc), max_pages_to_check)):
        page = doc.load_page(page_num)
        blocks = page.get_text("dict")["blocks"]
        dot_line_count = 0
        has_toc_title = False
        logger.debug(f"Checking page {page_num} for TOC")
        for block in blocks:
            for line in block.get("lines", []):
                # Join span texts to reconstruct the visible line.
                line_text = " ".join([span["text"] for span in line["spans"]]).strip()
                # CHECK A: does the line have dot leaders?
                if dot_pattern.search(line_text):
                    dot_line_count += 1
                    logger.debug(f"  Found dot pattern on page {page_num}: '{line_text[:50]}...'")
                # CHECK B: is this line a TOC title?
                if title_pattern.match(line_text):
                    has_toc_title = True
                    logger.debug(f"  Found TOC title on page {page_num}: '{line_text}'")
        # TOC page if it has a title OR dot leaders; 'dot_line_count >= 1'
        # keeps it sensitive to single-item lists.
        if has_toc_title or dot_line_count >= 1:
            toc_pages.append(page_num)
            logger.info(f"Page {page_num} identified as TOC page")
    if toc_pages:
        # BUG FIX: use the LAST TOC page, not the first — the old
        # 'toc_pages[0]' truncated multi-page TOCs (e.g. [2, 3] produced
        # [0, 1, 2] and page 3 of the TOC leaked into the body scan).
        last_toc_page = toc_pages[-1]
        result = list(range(0, last_toc_page + 1))
        logger.info(f"TOC pages found: {result}")
        return result
    logger.info("No TOC pages found")
    return []  # Return empty list if nothing found
def openPDF(pdf_path):
    """Download a PDF from a URL and open it with PyMuPDF.

    A Dropbox-style 'dl=0' query flag is rewritten to 'dl=1' to force a
    direct download instead of the preview page.

    Args:
        pdf_path: URL of the PDF document.

    Returns:
        fitz.Document: the opened document.

    Raises:
        requests.HTTPError: if the download returns an error status.
        ValueError: if the response body is empty.
    """
    logger.info(f"Opening PDF from URL: {pdf_path}")
    pdf_path = pdf_path.replace('dl=0', 'dl=1')
    # ROBUSTNESS FIX: bound the request so a hung server cannot block forever.
    response = requests.get(pdf_path, timeout=60)
    logger.debug(f"PDF download response status: {response.status_code}")
    # BUG FIX: surface HTTP errors explicitly. The old check
    # 'if not BytesIO(...)' was always False (a BytesIO object is always
    # truthy), so failed downloads slipped through to fitz and crashed there.
    response.raise_for_status()
    if not response.content:
        logger.error("No valid PDF content found.")
        raise ValueError("No valid PDF content found.")
    doc = fitz.open(stream=BytesIO(response.content), filetype="pdf")
    logger.info(f"PDF opened successfully, {len(doc)} pages")
    return doc
def identify_headers_with_openrouter(pdf_path, model, LLM_prompt, pages_to_check=None, top_margin=0, bottom_margin=0):
    """Ask an LLM (OpenRouter) to identify headers in the document.

    Sends one prompt line per text span (text plus font/size/flags/color
    metadata and its page number) to the model and expects back a JSON array
    containing only header lines with suggested levels.

    Args:
        pdf_path: URL of the PDF to analyse (passed to openPDF).
        model: OpenRouter model identifier.
        LLM_prompt: Instruction text prepended to the page lines.
        pages_to_check: Optional collection of zero-based page numbers; when
            given, only those pages are scanned.
        top_margin: Reserved for header filtering (check currently disabled).
        bottom_margin: Reserved for footer filtering (check currently disabled).

    Returns:
        list[dict]: {text, page (zero-based), suggested_level, confidence}
        entries; empty list on any failure so the caller can fall back to
        heuristics.
    """
    logger.info("=" * 80)
    logger.info("STARTING IDENTIFY_HEADERS_WITH_OPENROUTER")
    logger.info(f"PDF Path: {pdf_path}")
    logger.info(f"Model: {model}")
    logger.info(f"LLM Prompt: {LLM_prompt[:200]}..." if len(LLM_prompt) > 200 else f"LLM Prompt: {LLM_prompt}")
    doc = openPDF(pdf_path)
    # SECURITY FIX: the API key used to be hard-coded here (a leaked secret
    # committed to source). It is now read exclusively from the environment;
    # the old 'if api_key is None' fallback branch was dead code.
    api_key = os.getenv("OPENROUTER_API_KEY")
    model = str(model)
    toc_pages = get_toc_page_numbers(doc)
    lines_for_prompt = []
    logger.info(f"TOC pages to skip: {toc_pages}")
    logger.info(f"Total pages in document: {len(doc)}")
    # Collect text lines from pages (skip TOC pages)
    total_lines = 0
    for pno in range(len(doc)):
        if pages_to_check and pno not in pages_to_check:
            continue
        if pno in toc_pages:
            logger.debug(f"Skipping TOC page {pno}")
            continue
        page = doc.load_page(pno)
        page_height = page.rect.height
        lines_on_page = 0
        for block in page.get_text("dict").get('blocks', []):
            if block.get('type') != 0:  # type 0 = text block; skip images etc.
                continue
            for line in block.get('lines', []):
                spans = line.get('spans', [])
                if not spans:
                    continue
                y0 = spans[0]['bbox'][1]
                y1 = spans[0]['bbox'][3]
                # Margin filtering intentionally disabled for now; y0/y1 and
                # the *_margin parameters are kept for when it is re-enabled:
                # if y0 < top_margin or y1 > (page_height - bottom_margin):
                #     continue
                for s in spans:
                    # One prompt line per span carrying text, font, size,
                    # flags and color so the LLM can judge header-likeness.
                    ArrayofTextWithFormat = {'Font': s.get('font')}, {'Size': s.get('size')}, {'Flags': s.get('flags')}, {'Color': s.get('color')}, {'Text': s.get('text')}
                    # prefix with page for easier mapping back
                    lines_for_prompt.append(f"PAGE {pno+1}: {ArrayofTextWithFormat}")
                lines_on_page += 1
        if lines_on_page > 0:
            logger.debug(f"Page {pno}: collected {lines_on_page} lines")
            total_lines += lines_on_page
    logger.info(f"Total lines collected for LLM: {total_lines}")
    if not lines_for_prompt:
        logger.warning("No lines collected for prompt")
        return []
    # Log sample of lines
    logger.info("Sample lines (first 10):")
    for i, line in enumerate(lines_for_prompt[:10]):
        logger.info(f"  {i}: {line}")
    prompt = LLM_prompt + "\n\nLines:\n" + "\n".join(lines_for_prompt)
    logger.debug(f"Full prompt length: {len(prompt)} characters")
    # Print entire prompt, not truncated (deliberate debugging aid).
    print("=" * 80)
    print("FULL LLM PROMPT:")
    print(prompt)
    print("=" * 80)
    # Also save the prompt to a file for offline inspection.
    try:
        with open("full_prompt.txt", "w", encoding="utf-8") as f:
            f.write(prompt)
        logger.info("Full prompt saved to full_prompt.txt")
    except Exception as e:
        logger.error(f"Could not save prompt to file: {e}")
    if not api_key:
        # No API key: return empty so caller can fallback to heuristics
        logger.error("No API key provided")
        return []
    url = "https://openrouter.ai/api/v1/chat/completions"
    # Build headers following the OpenRouter example
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
        "HTTP-Referer": os.getenv("OPENROUTER_REFERER", ""),
        "X-Title": os.getenv("OPENROUTER_X_TITLE", "")
    }
    # Log request details (without exposing the API key)
    logger.info(f"Making request to OpenRouter with model: {model}")
    logger.debug(f"Headers (API key masked): { {k: '***' if k == 'Authorization' else v for k, v in headers.items()} }")
    # Wrap the prompt as the 'content' array expected by OpenRouter
    body = {
        "model": model,
        "messages": [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": prompt}
                ]
            }
        ]
    }
    try:
        logger.debug(f"Request body (without prompt text): { {k: v if k != 'messages' else '[...prompt...]' for k, v in body.items()} }")
        # ROBUSTNESS FIX: the timeout had been removed entirely, so a hung
        # request could block the app forever; restore a generous bound.
        resp = requests.post(
            url=url,
            headers=headers,
            data=json.dumps(body),
            timeout=300
        )
        logger.info(f"HTTP Response Status: {resp.status_code}")
        resp.raise_for_status()
        resp_text = resp.text
        # Print entire response (deliberate debugging aid).
        print("=" * 80)
        print("FULL LLM RESPONSE:")
        print(resp_text)
        print("=" * 80)
        logger.info(f"LLM raw response length: {len(resp_text)}")
        # Save raw response for offline inspection
        try:
            with open("llm_debug.json", "w", encoding="utf-8") as fh:
                fh.write(resp_text)
            logger.info("Raw response saved to llm_debug.json")
        except Exception as e:
            logger.error(f"Warning: could not write llm_debug.json: {e}")
        rj = resp.json()
        logger.info(f"LLM parsed response type: {type(rj)}")
        if isinstance(rj, dict):
            logger.debug(f"Response keys: {list(rj.keys())}")
    except requests.exceptions.RequestException as e:
        logger.error(f"HTTP request failed: {repr(e)}")
        return []
    except Exception as e:
        logger.error(f"LLM call failed: {repr(e)}")
        return []
    # Extract textual reply robustly: OpenRouter replies may carry the text
    # as a content list, a plain string, or a nested dict.
    text_reply = None
    if isinstance(rj, dict):
        choices = rj.get('choices') or []
        logger.debug(f"Number of choices in response: {len(choices)}")
        if choices:
            for i, c in enumerate(choices):
                logger.debug(f"Choice {i}: {c}")
            c0 = choices[0]
            msg = c0.get('message') or c0.get('delta') or {}
            content = msg.get('content')
            if isinstance(content, list):
                logger.debug(f"Content is a list with {len(content)} items")
                for idx, c in enumerate(content):
                    if c.get('type') == 'text' and c.get('text'):
                        text_reply = c.get('text')
                        logger.debug(f"Found text reply in content[{idx}], length: {len(text_reply)}")
                        break
            elif isinstance(content, str):
                text_reply = content
                logger.debug(f"Content is string, length: {len(text_reply)}")
            elif isinstance(msg, dict) and msg.get('content') and isinstance(msg.get('content'), dict):
                text_reply = msg.get('content').get('text')
                logger.debug(f"Found text in nested content dict")
    # Fallback extraction: some completion-style responses use choice['text'].
    if not text_reply:
        logger.debug("Trying fallback extraction from choices")
        for c in rj.get('choices', []):
            if isinstance(c.get('text'), str):
                text_reply = c.get('text')
                logger.debug(f"Found text reply in choice.text, length: {len(text_reply)}")
                break
    if not text_reply:
        logger.error("Could not extract text reply from response")
        # Print the entire response structure for debugging.
        print("=" * 80)
        print("FAILED TO EXTRACT TEXT REPLY. FULL RESPONSE STRUCTURE:")
        print(json.dumps(rj, indent=2))
        print("=" * 80)
        return []
    # Print the extracted text reply (deliberate debugging aid).
    print("=" * 80)
    print("EXTRACTED TEXT REPLY:")
    print(text_reply)
    print("=" * 80)
    logger.info(f"Extracted text reply length: {len(text_reply)}")
    logger.debug(f"First 500 chars of reply: {text_reply[:500]}...")
    # Carve the outermost [...] out of the reply before parsing.
    s = text_reply.strip()
    start = s.find('[')
    end = s.rfind(']')
    js = s[start:end+1] if start != -1 and end != -1 else s
    logger.debug(f"Looking for JSON array: start={start}, end={end}")
    logger.debug(f"Extracted JSON string (first 500 chars): {js[:500]}...")
    try:
        parsed = json.loads(js)
        logger.info(f"Successfully parsed JSON, got {len(parsed)} items")
    except json.JSONDecodeError as e:
        logger.error(f"Failed to parse JSON: {e}")
        logger.error(f"JSON string that failed to parse: {js[:1000]}")
        # Fall back to regex extraction of any JSON-array-looking fragment.
        # (The redundant inner 'import re' was removed; re is imported at
        # module level.)
        try:
            json_pattern = r'\[\s*\{.*?\}\s*\]'
            matches = re.findall(json_pattern, text_reply, re.DOTALL)
            if matches:
                logger.info(f"Found {len(matches)} potential JSON arrays via regex")
                for i, match in enumerate(matches):
                    try:
                        parsed = json.loads(match)
                        logger.info(f"Successfully parsed regex match {i} with {len(parsed)} items")
                        break
                    except json.JSONDecodeError as e2:
                        logger.debug(f"Regex match {i} also failed: {e2}")
                        continue
                else:
                    # for/else: no match parsed successfully.
                    logger.error("All regex matches failed to parse")
                    return []
            else:
                logger.error("No JSON-like pattern found via regex")
                return []
        except Exception as e2:
            logger.error(f"Regex extraction also failed: {e2}")
            return []
    # Log parsed results
    logger.info(f"Parsed {len(parsed)} header items:")
    for i, obj in enumerate(parsed[:10]):  # Log first 10 items
        logger.info(f"  Item {i}: {obj}")
    # Normalize parsed entries and return (page converted to zero-based).
    out = []
    for obj in parsed:
        t = obj.get('text')
        page = int(obj.get('page')) if obj.get('page') else None
        level = obj.get('suggested_level')
        conf = float(obj.get('confidence') or 0)
        if t and page is not None:
            out.append({'text': t, 'page': page - 1, 'suggested_level': level, 'confidence': conf})
    logger.info(f"Returning {len(out)} valid header entries")
    return out
def identify_headers_and_save_excel(pdf_path, model, llm_prompt):
    """Identify document headers via the LLM and export them to Excel.

    Args:
        pdf_path: URL of the PDF document.
        model: OpenRouter model identifier.
        llm_prompt: Instruction text for the LLM.

    Returns:
        str | None: Path of the written .xlsx file ("output.xlsx"), or None
        when no headers were returned or the file could not be written.
        (A file path is returned — not a BytesIO — for the Gradio File output.)
    """
    logger.info("=" * 80)
    logger.info("STARTING IDENTIFY_HEADERS_AND_SAVE_EXCEL")
    logger.info(f"Inputs - PDF: {pdf_path}, Model: {model}")
    # Delegate the actual extraction to the OpenRouter-based helper.
    headers = identify_headers_with_openrouter(pdf_path, model, llm_prompt)
    if not headers:
        logger.warning("No results returned from identify_headers_with_openrouter")
        return None
    logger.info(f"Got {len(headers)} results, creating DataFrame")
    frame = pd.DataFrame(headers)
    # Trace the DataFrame layout for debugging.
    logger.info(f"DataFrame shape: {frame.shape}")
    logger.info(f"DataFrame columns: {frame.columns.tolist()}")
    logger.info("DataFrame head:")
    logger.info(frame.head().to_string())
    output_path = "output.xlsx"
    try:
        frame.to_excel(output_path, index=False, engine='openpyxl')
        logger.info(f"Excel file saved successfully to: {output_path}")
        # Sanity-check that the file actually landed on disk.
        if not os.path.exists(output_path):
            logger.error(f"Output file was not created at: {output_path}")
        else:
            size_bytes = os.path.getsize(output_path)
            logger.info(f"Output file exists, size: {size_bytes} bytes")
    except Exception as e:
        logger.error(f"Failed to save Excel file: {e}")
        return None
    return output_path
# Gradio UI: three free-text inputs (document URL, model name, LLM prompt)
# wired directly to identify_headers_and_save_excel; the function returns a
# file path (see its final comment) which Gradio serves as a download.
iface = gr.Interface(
    fn=identify_headers_and_save_excel,
    inputs=[
        gr.Textbox(label="Document Link"),
        gr.Textbox(label="Model Type"),
        gr.Textbox(label="LLM Prompt")
    ],
    outputs = gr.File(file_count="single", label="Download Excel")
)
# Launch the web UI only when executed as a script, not on import.
if __name__ == "__main__":
    print("Starting Gradio interface...")
    logger.info("Launching Gradio interface")
    iface.launch()