"""Gradio app: detect section headers in a PDF via an OpenRouter LLM and export them to Excel.

Pipeline: download the PDF (Dropbox-style link), skip table-of-contents pages,
send every text span (with font metadata) to the LLM, parse the JSON reply,
and save the identified headers as an .xlsx file for download.
"""

import json
import logging
import os
import re
from io import BytesIO

import fitz  # PyMuPDF
import gradio as gr
import pandas as pd
import requests

# Set up logging to see everything
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),                      # Print to console
        logging.FileHandler('debug.log', mode='w')    # Save to file
    ]
)
logger = logging.getLogger(__name__)


def get_toc_page_numbers(doc, max_pages_to_check=15):
    """Return the leading page indices (0-based) that cover the table of contents.

    A page counts as a TOC page if it contains a "Contents"/"Index" title line
    or at least one dot-leader line ("....."). If any TOC pages are found, the
    result is every page from 0 through the LAST TOC page (covering the cover
    page, inside cover, and the TOC itself). Returns [] when nothing is found.

    Args:
        doc: an open fitz.Document.
        max_pages_to_check: how many leading pages to scan.
    """
    toc_pages = []
    logger.debug(f"Starting TOC detection, checking first {max_pages_to_check} pages")

    # 1. Dot-leader pattern (runs of "..") typical of TOC entry lines.
    dot_pattern = re.compile(r"\.{2,}")

    # 2. Title pattern: the line must be JUST the header word (any case),
    #    so body text mentioning "contents" does not match.
    title_pattern = re.compile(r"^\s*(table of contents|contents|index)\s*$", re.IGNORECASE)

    for page_num in range(min(len(doc), max_pages_to_check)):
        page = doc.load_page(page_num)
        blocks = page.get_text("dict")["blocks"]
        dot_line_count = 0
        has_toc_title = False
        logger.debug(f"Checking page {page_num} for TOC")

        for block in blocks:
            for line in block.get("lines", []):
                # Join span texts to reconstruct the visual line.
                line_text = " ".join([span["text"] for span in line["spans"]]).strip()

                # CHECK A: dot leaders?
                if dot_pattern.search(line_text):
                    dot_line_count += 1
                    logger.debug(f" Found dot pattern on page {page_num}: '{line_text[:50]}...'")

                # CHECK B: is this line a TOC title?
                if title_pattern.match(line_text):
                    has_toc_title = True
                    logger.debug(f" Found TOC title on page {page_num}: '{line_text}'")

        # A page is a TOC page if it has a title OR any dot-leader line
        # (>= 1 keeps us sensitive to single-item lists).
        if has_toc_title or dot_line_count >= 1:
            toc_pages.append(page_num)
            logger.info(f"Page {page_num} identified as TOC page")

    if toc_pages:
        # BUG FIX: use the LAST detected TOC page (was toc_pages[0]). With
        # toc_pages == [2, 3] the intended result is [0, 1, 2, 3]; the old
        # code returned [0, 1, 2] and left page 3 unskipped.
        last_toc_page = toc_pages[-1]
        result = list(range(0, last_toc_page + 1))
        logger.info(f"TOC pages found: {result}")
        return result

    logger.info("No TOC pages found")
    return []  # Return empty list if nothing found


def openPDF(pdf_path):
    """Download a PDF from a URL and open it with PyMuPDF.

    Rewrites Dropbox share links ('dl=0' -> 'dl=1') to force a direct
    download. Raises ValueError when the response carries no usable content.
    """
    logger.info(f"Opening PDF from URL: {pdf_path}")
    pdf_path = pdf_path.replace('dl=0', 'dl=1')
    response = requests.get(pdf_path)
    logger.debug(f"PDF download response status: {response.status_code}")
    # BUG FIX: the old check `if not BytesIO(response.content)` was dead code —
    # a BytesIO object is always truthy, even when empty. Check the HTTP
    # response itself instead.
    if response.status_code != 200 or not response.content:
        logger.error("No valid PDF content found.")
        raise ValueError("No valid PDF content found.")
    pdf_content = BytesIO(response.content)
    doc = fitz.open(stream=pdf_content, filetype="pdf")
    logger.info(f"PDF opened successfully, {len(doc)} pages")
    return doc


def identify_headers_with_openrouter(pdf_path, model, LLM_prompt,
                                     pages_to_check=None, top_margin=0, bottom_margin=0):
    """Ask an LLM (OpenRouter) to identify headers in the document.

    Returns a list of dicts: {text, page, suggested_level, confidence}
    with `page` converted to a 0-based index.

    The function sends plain page-line strings to the LLM (including page
    numbers and span formatting metadata) and asks for a JSON array
    containing only header lines with suggested levels. Returns [] on any
    failure (no API key, HTTP error, unparseable reply).
    """
    logger.info("=" * 80)
    logger.info("STARTING IDENTIFY_HEADERS_WITH_OPENROUTER")
    logger.info(f"PDF Path: {pdf_path}")
    logger.info(f"Model: {model}")
    logger.info(f"LLM Prompt: {LLM_prompt[:200]}..."
                if len(LLM_prompt) > 200 else f"LLM Prompt: {LLM_prompt}")

    doc = openPDF(pdf_path)
    # SECURITY FIX: the API key was previously hardcoded in source (and the
    # env-var fallback was dead code). Never commit credentials; read from
    # the environment only.
    api_key = os.getenv("OPENROUTER_API_KEY") or None
    model = str(model)
    toc_pages = get_toc_page_numbers(doc)
    lines_for_prompt = []
    logger.info(f"TOC pages to skip: {toc_pages}")
    logger.info(f"Total pages in document: {len(doc)}")

    # Collect text lines from pages (skip TOC pages)
    total_lines = 0
    for pno in range(len(doc)):
        if pages_to_check and pno not in pages_to_check:
            continue
        if pno in toc_pages:
            logger.debug(f"Skipping TOC page {pno}")
            continue
        page = doc.load_page(pno)
        page_height = page.rect.height
        lines_on_page = 0
        for block in page.get_text("dict").get('blocks', []):
            if block.get('type') != 0:  # 0 = text block in PyMuPDF
                continue
            for line in block.get('lines', []):
                spans = line.get('spans', [])
                if not spans:
                    continue
                y0 = spans[0]['bbox'][1]
                y1 = spans[0]['bbox'][3]
                # if y0 < top_margin or y1 > (page_height - bottom_margin):
                #     continue
                for s in spans:
                    # BUG FIX: this was accidentally a tuple of five one-key
                    # dicts ({'Font':..}, {'Size':..}, ...) due to missing
                    # braces; the intent (per the original comment
                    # "text,font,size,flags,color") is a single record.
                    span_record = {
                        'Font': s.get('font'),
                        'Size': s.get('size'),
                        'Flags': s.get('flags'),
                        'Color': s.get('color'),
                        'Text': s.get('text'),
                    }
                    # prefix with page for easier mapping back
                    lines_for_prompt.append(f"PAGE {pno+1}: {span_record}")
                lines_on_page += 1
        if lines_on_page > 0:
            logger.debug(f"Page {pno}: collected {lines_on_page} lines")
            total_lines += lines_on_page

    logger.info(f"Total lines collected for LLM: {total_lines}")
    if not lines_for_prompt:
        logger.warning("No lines collected for prompt")
        return []

    # Log sample of lines
    logger.info("Sample lines (first 10):")
    for i, line in enumerate(lines_for_prompt[:10]):
        logger.info(f" {i}: {line}")

    prompt = LLM_prompt + "\n\nLines:\n" + "\n".join(lines_for_prompt)
    logger.debug(f"Full prompt length: {len(prompt)} characters")

    # Print entire prompt for debugging, and also save it to disk.
    print("=" * 80)
    print("FULL LLM PROMPT:")
    print(prompt)
    print("=" * 80)
    try:
        with open("full_prompt.txt", "w", encoding="utf-8") as f:
            f.write(prompt)
        logger.info("Full prompt saved to full_prompt.txt")
    except Exception as e:
        logger.error(f"Could not save prompt to file: {e}")

    if not api_key:
        # No API key: return empty so caller can fallback to heuristics
        logger.error("No API key provided")
        return []

    url = "https://openrouter.ai/api/v1/chat/completions"
    # Build headers following the OpenRouter example
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
        "HTTP-Referer": os.getenv("OPENROUTER_REFERER", ""),
        "X-Title": os.getenv("OPENROUTER_X_TITLE", "")
    }
    # Log request details (without exposing the API key)
    logger.info(f"Making request to OpenRouter with model: {model}")
    logger.debug(
        f"Headers (API key masked): "
        f"{ {k: '***' if k == 'Authorization' else v for k, v in headers.items()} }"
    )

    # Wrap the prompt as the 'content' array expected by OpenRouter
    body = {
        "model": model,
        "messages": [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": prompt}
                ]
            }
        ]
    }

    try:
        logger.debug(
            f"Request body (without prompt text): "
            f"{ {k: v if k != 'messages' else '[...prompt...]' for k, v in body.items()} }"
        )
        # ROBUSTNESS FIX: a timeout prevents the app from hanging forever on
        # a stalled connection (the original deliberately removed it).
        resp = requests.post(
            url=url,
            headers=headers,
            data=json.dumps(body),
            timeout=600,
        )
        logger.info(f"HTTP Response Status: {resp.status_code}")
        resp.raise_for_status()
        resp_text = resp.text
        print("=" * 80)
        print("FULL LLM RESPONSE:")
        print(resp_text)
        print("=" * 80)
        logger.info(f"LLM raw response length: {len(resp_text)}")
        # Save raw response for offline inspection
        try:
            with open("llm_debug.json", "w", encoding="utf-8") as fh:
                fh.write(resp_text)
            logger.info("Raw response saved to llm_debug.json")
        except Exception as e:
            logger.error(f"Warning: could not write llm_debug.json: {e}")
        rj = resp.json()
        logger.info(f"LLM parsed response type: {type(rj)}")
        if isinstance(rj, dict):
            logger.debug(f"Response keys: {list(rj.keys())}")
    except requests.exceptions.RequestException as e:
        logger.error(f"HTTP request failed: {repr(e)}")
        return []
    except Exception as e:
        logger.error(f"LLM call failed: {repr(e)}")
        return []

    # Extract textual reply robustly: OpenRouter responses may put the text
    # in choices[0].message.content as a string, a list of content parts,
    # or (fallback) in choices[*].text.
    text_reply = None
    if isinstance(rj, dict):
        choices = rj.get('choices') or []
        logger.debug(f"Number of choices in response: {len(choices)}")
        if choices:
            for i, c in enumerate(choices):
                logger.debug(f"Choice {i}: {c}")
            c0 = choices[0]
            msg = c0.get('message') or c0.get('delta') or {}
            content = msg.get('content')
            if isinstance(content, list):
                logger.debug(f"Content is a list with {len(content)} items")
                for idx, c in enumerate(content):
                    if c.get('type') == 'text' and c.get('text'):
                        text_reply = c.get('text')
                        logger.debug(f"Found text reply in content[{idx}], length: {len(text_reply)}")
                        break
            elif isinstance(content, str):
                text_reply = content
                logger.debug(f"Content is string, length: {len(text_reply)}")
            elif isinstance(msg, dict) and msg.get('content') and isinstance(msg.get('content'), dict):
                text_reply = msg.get('content').get('text')
                logger.debug(f"Found text in nested content dict")

    # Fallback extraction
    if not text_reply:
        logger.debug("Trying fallback extraction from choices")
        for c in rj.get('choices', []):
            if isinstance(c.get('text'), str):
                text_reply = c.get('text')
                logger.debug(f"Found text reply in choice.text, length: {len(text_reply)}")
                break

    if not text_reply:
        logger.error("Could not extract text reply from response")
        print("=" * 80)
        print("FAILED TO EXTRACT TEXT REPLY. FULL RESPONSE STRUCTURE:")
        print(json.dumps(rj, indent=2))
        print("=" * 80)
        return []

    print("=" * 80)
    print("EXTRACTED TEXT REPLY:")
    print(text_reply)
    print("=" * 80)
    logger.info(f"Extracted text reply length: {len(text_reply)}")
    logger.debug(f"First 500 chars of reply: {text_reply[:500]}...")

    # Carve the JSON array out of the reply (LLMs often wrap it in prose).
    s = text_reply.strip()
    start = s.find('[')
    end = s.rfind(']')
    js = s[start:end + 1] if start != -1 and end != -1 else s
    logger.debug(f"Looking for JSON array: start={start}, end={end}")
    logger.debug(f"Extracted JSON string (first 500 chars): {js[:500]}...")

    try:
        parsed = json.loads(js)
        logger.info(f"Successfully parsed JSON, got {len(parsed)} items")
    except json.JSONDecodeError as e:
        logger.error(f"Failed to parse JSON: {e}")
        logger.error(f"JSON string that failed to parse: {js[:1000]}")
        # Last resort: regex-hunt for any JSON-array-of-objects substring.
        try:
            json_pattern = r'\[\s*\{.*?\}\s*\]'
            matches = re.findall(json_pattern, text_reply, re.DOTALL)
            if matches:
                logger.info(f"Found {len(matches)} potential JSON arrays via regex")
                for i, match in enumerate(matches):
                    try:
                        parsed = json.loads(match)
                        logger.info(f"Successfully parsed regex match {i} with {len(parsed)} items")
                        break
                    except json.JSONDecodeError as e2:
                        logger.debug(f"Regex match {i} also failed: {e2}")
                        continue
                else:
                    logger.error("All regex matches failed to parse")
                    return []
            else:
                logger.error("No JSON-like pattern found via regex")
                return []
        except Exception as e2:
            logger.error(f"Regex extraction also failed: {e2}")
            return []

    # Log parsed results (first 10 items)
    logger.info(f"Parsed {len(parsed)} header items:")
    for i, obj in enumerate(parsed[:10]):
        logger.info(f" Item {i}: {obj}")

    # Normalize parsed entries and return (convert page to 0-based index)
    out = []
    for obj in parsed:
        t = obj.get('text')
        page = int(obj.get('page')) if obj.get('page') else None
        level = obj.get('suggested_level')
        conf = float(obj.get('confidence') or 0)
        if t and page is not None:
            out.append({'text': t, 'page': page - 1,
                        'suggested_level': level, 'confidence': conf})
    logger.info(f"Returning {len(out)} valid header entries")
    return out


def identify_headers_and_save_excel(pdf_path, model, llm_prompt):
    """Gradio handler: run header detection and save results to output.xlsx.

    Returns the path to the Excel file, or None on failure (Gradio renders
    None as an empty download slot).
    """
    logger.info("=" * 80)
    logger.info("STARTING IDENTIFY_HEADERS_AND_SAVE_EXCEL")
    logger.info(f"Inputs - PDF: {pdf_path}, Model: {model}")

    result = identify_headers_with_openrouter(pdf_path, model, llm_prompt)
    if not result:
        logger.warning("No results returned from identify_headers_with_openrouter")
        return None

    logger.info(f"Got {len(result)} results, creating DataFrame")
    df = pd.DataFrame(result)
    logger.info(f"DataFrame shape: {df.shape}")
    logger.info(f"DataFrame columns: {df.columns.tolist()}")
    logger.info("DataFrame head:")
    logger.info(df.head().to_string())

    # Save Excel to a file on disk
    output_path = "output.xlsx"
    try:
        df.to_excel(output_path, index=False, engine='openpyxl')
        logger.info(f"Excel file saved successfully to: {output_path}")
        # Verify file was created
        if os.path.exists(output_path):
            file_size = os.path.getsize(output_path)
            logger.info(f"Output file exists, size: {file_size} bytes")
        else:
            logger.error(f"Output file was not created at: {output_path}")
    except Exception as e:
        logger.error(f"Failed to save Excel file: {e}")
        return None

    return output_path  # return file path, not BytesIO


iface = gr.Interface(
    fn=identify_headers_and_save_excel,
    inputs=[
        gr.Textbox(label="Document Link"),
        gr.Textbox(label="Model Type"),
        gr.Textbox(label="LLM Prompt")
    ],
    outputs=gr.File(file_count="single", label="Download Excel")
)

if __name__ == "__main__":
    print("Starting Gradio interface...")
    logger.info("Launching Gradio interface")
    iface.launch()