import copy
import difflib
import json
import logging
import os
import re
import urllib.parse
from collections import Counter
from io import BytesIO
from urllib.parse import urlparse, unquote

import fitz  # PyMuPDF
import gradio as gr
import pandas as pd
import requests

# import tsadropboxretrieval

# Set up logging to see everything
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),  # Print to console
        logging.FileHandler('debug.log', mode='w'),  # Save to file
    ],
)
logger = logging.getLogger(__name__)

# Vertical bands (in page units) at the top and bottom of each page whose
# text is ignored when locating headers.
top_margin = 70
bottom_margin = 85


def getLocation_of_header(doc, headerText, expected_page=None):
    """Locate ``headerText`` in the body area of the document.

    Args:
        doc: Opened PyMuPDF document.
        headerText: Text passed to ``page.search_for``.
        expected_page: 0-based page index to search. When ``None`` every
            page is searched (previously this crashed on ``doc[None]``).

    Returns:
        A list of dicts with keys ``headerText``, ``page``, ``x`` and ``y``,
        one per hit whose top edge lies strictly between ``top_margin`` and
        ``page height - bottom_margin``. Empty when nothing is found or when
        ``expected_page`` is outside the document.
    """
    if expected_page is None:
        page_numbers = range(len(doc))
    elif 0 <= expected_page < len(doc):
        page_numbers = (expected_page,)
    else:
        # LLM-provided page numbers can point outside the document.
        logger.debug(f"Page {expected_page} is outside the document; header not located: '{headerText}'")
        return []

    locations = []
    for page_number in page_numbers:
        page = doc[page_number]
        lowest_allowed_y = page.rect.height - bottom_margin
        for rect in page.search_for(headerText):
            y = rect.y0
            # Skip headers in top or bottom margin
            if y <= top_margin or y >= lowest_allowed_y:
                continue
            locations.append({
                "headerText": headerText,
                "page": page_number,
                "x": rect.x0,
                "y": y,
            })
    return locations


def filter_headers_outside_toc(headers, toc_pages):
    """Drop header rows that have no location or that sit on a TOC page.

    Args:
        headers: Rows shaped like the output of ``headers_with_location``
            (index 2 is the page, index 3 is the y coordinate).
        toc_pages: Iterable of page indices to exclude.

    Returns:
        A new list with the surviving rows, in their original order.
    """
    toc_page_set = set(toc_pages)
    return [
        row for row in headers
        if row[2] is not None and row[3] is not None and row[2] not in toc_page_set
    ]


def _line_font_size(page, text):
    """Return the first span size of the first text line on ``page`` equal to ``text``.

    Lines are compared after ``normalize``. Returns ``None`` when no line
    matches.
    """
    target = normalize(text)
    for block in page.get_text("dict")["blocks"]:
        if block.get("type") != 0:  # text blocks only
            continue
        for line in block.get("lines", []):
            spans = line["spans"]
            line_text = "".join(span["text"] for span in spans).strip()
            if spans and normalize(line_text) == target:
                return spans[0]["size"]
    return None


def headers_with_location(doc, llm_headers):
    """Attach page positions and font sizes to LLM-identified headers.

    Args:
        doc: Opened PyMuPDF document.
        llm_headers: Iterable of dicts with at least ``text`` and ``page``
            (0-based) and optionally ``suggested_level``.

    Returns:
        A list of ``[text, font_size, page, y, suggested_level]`` rows, one
        per distinct location found by ``getLocation_of_header``. Headers
        that cannot be located on the page produce no row. ``font_size`` is
        ``None`` when no line on the page matches the header text exactly
        (after ``normalize``).
    """
    headersJson = []
    for h in llm_headers:
        text = h["text"]
        # Attempt to locate the header on the page
        locations = getLocation_of_header(doc, text, h["page"])
        # The font size only depends on (page, text): look it up once per page
        # instead of re-extracting the page text for every hit.
        font_size_by_page = {}
        for loc in locations:
            page_number = loc["page"]
            if page_number not in font_size_by_page:
                font_size_by_page[page_number] = _line_font_size(doc.load_page(page_number), text)
            entry = [
                text,
                font_size_by_page[page_number],
                page_number,
                loc["y"],
                h.get("suggested_level"),
            ]
            if entry not in headersJson:
                headersJson.append(entry)
    return headersJson


def build_hierarchy_from_llm(headers):
    """Build a nested header tree from located LLM headers.

    Args:
        headers: Rows of ``[text, size, page, y, level, ...]``. Rows with
            fewer than five items, a ``None`` level, or a level that cannot
            be converted to ``int`` are ignored. Extra trailing items are
            ignored as well (they used to raise ``ValueError``).

    Returns:
        The list of root nodes after ``enforce_level_hierarchy``. Each node
        is a dict with ``text``, ``page``, ``y``, ``size``, ``level``,
        ``children``, ``path`` (normalized texts from the root down) and a
        few formatting placeholders. Returns ``[]`` when no row is usable.
    """
    # -------------------------
    # 1. Build nodes safely
    # -------------------------
    nodes = []
    for h in headers:
        if len(h) < 5:
            continue
        text, size, page, y, level = h[:5]
        if level is None:
            continue
        try:
            level = int(level)
        except (TypeError, ValueError, OverflowError):
            continue
        nodes.append({
            "text": text,
            "page": page if page is not None else -1,
            "y": y if y is not None else -1,
            "size": size,
            "bold": False,
            "color": None,
            "font": None,
            "children": [],
            "is_numbered": is_numbered(text),
            "original_size": size,
            "norm_text": normalize(text),
            "level": level,
        })

    if not nodes:
        return []

    # -------------------------
    # 2. Sort top-to-bottom
    # -------------------------
    nodes.sort(key=lambda node: (node["page"], node["y"]))

    # -------------------------
    # 3. NORMALIZE LEVELS (smallest level -> 0)
    # -------------------------
    min_level = min(node["level"] for node in nodes)
    for node in nodes:
        node["level"] -= min_level

    # -------------------------
    # 4. Build hierarchy
    # -------------------------
    root = []
    stack = []  # chain of open ancestors, shallowest first
    added_level0 = set()
    for header in nodes:
        lvl = header["level"]
        if lvl < 0:
            continue
        # De-duplicate true top-level headers
        if lvl == 0:
            key = (header["norm_text"], header["page"])
            if key in added_level0:
                continue
            added_level0.add(key)
        # Close every open header that is not strictly shallower than this one.
        while stack and stack[-1]["level"] >= lvl:
            stack.pop()
        parent = stack[-1] if stack else None
        if parent:
            header["path"] = parent["path"] + [header["norm_text"]]
            parent["children"].append(header)
        else:
            header["path"] = [header["norm_text"]]
            root.append(header)
        stack.append(header)

    # -------------------------
    # 5. Enforce nesting sanity: a child is always deeper than its parent
    # -------------------------
    def enforce_nesting(node_list, parent_level=-1):
        for node in node_list:
            if node["level"] <= parent_level:
                node["level"] = parent_level + 1
            enforce_nesting(node["children"], node["level"])

    enforce_nesting(root)

    # -------------------------
    # 6. OPTIONAL cleanup: drop childless level-0 roots
    #    (a no-op when no root is at level 0)
    # -------------------------
    root = [h for h in root if not (h["level"] == 0 and not h["children"])]

    # -------------------------
    # 7. Final pass
    # -------------------------
    return enforce_level_hierarchy(root)


def get_regular_font_size_and_color(doc):
    """Return the most frequent span font size, color and font name in ``doc``.

    Every span of every page is counted once. Each element of the returned
    ``(size, color, font)`` tuple is ``None`` when the document has no spans.
    """
    size_counts = Counter()
    color_counts = Counter()
    font_counts = Counter()

    # Loop through all pages
    for page_num in range(len(doc)):
        page = doc.load_page(page_num)
        for block in page.get_text("dict")["blocks"]:
            if "lines" not in block:
                continue
            for line in block["lines"]:
                for span in line["spans"]:
                    size_counts[span['size']] += 1
                    color_counts[span['color']] += 1
                    font_counts[span['font']] += 1

    def most_common(counts):
        return counts.most_common(1)[0][0] if counts else None

    return most_common(size_counts), most_common(color_counts), most_common(font_counts)


def normalize_text(text):
    """Lower-case ``text``, trim it and collapse whitespace runs; ``None`` -> ``""``."""
    if text is None:
        return ""
    return re.sub(r'\s+', ' ', text.strip().lower())


def get_spaced_text_from_spans(spans):
    """Join the stripped span texts with single spaces and normalize the result."""
    return normalize_text(" ".join(span["text"].strip() for span in spans))


def is_numbered(text):
    """Return True when ``text`` starts with a digit after leading whitespace."""
    return bool(re.match(r'^\d', text.strip()))


def is_similar(a, b, threshold=0.85):
    """Return True when the ``difflib`` similarity ratio of ``a`` and ``b`` exceeds ``threshold``."""
    return difflib.SequenceMatcher(None, a, b).ratio() > threshold


def normalize(text):
    """Lower-case ``text``, remove runs of two or more dots and collapse whitespace.

    ``None`` is treated as an empty string, consistent with ``normalize_text``.
    """
    if text is None:
        return ""
    text = text.lower()
    text = re.sub(r'\.{2,}', '', text)  # remove long dots
    text = re.sub(r'\s+', ' ', text)    # replace multiple spaces with one
    return text.strip()


def clean_toc_entry(toc_text):
    """Remove page numbers and dot leaders from a TOC entry.

    Everything from the first run of dots/whitespace that is followed by
    digits is removed, then leading/trailing dots and spaces are stripped.

    NOTE(review): because the match is leftmost, an entry such as
    "1.2 Scope .... 15" is reduced to "1". Confirm whether numbered entries
    are expected here before relying on this for them.
    """
    return re.sub(r'[\.\s]+\d+.*$', '', toc_text).strip('. ')


def enforce_level_hierarchy(headers):
    """Remove level-2 headers that are not direct children of a level-1 header.

    The tree is edited in place (removed nodes are dropped together with
    their children) and the same ``headers`` list is returned.
    """
    def process_node_list(node_list, parent_level=-1):
        # Slice assignment keeps the caller's list object, like the original pop().
        node_list[:] = [
            node for node in node_list
            if not (node['level'] == 2 and parent_level != 1)
        ]
        # Recursively process children
        for node in node_list:
            process_node_list(node['children'], node['level'])

    process_node_list(headers)
    return headers


def highlight_boxes(doc, highlights, stringtowrite, fixed_width=500):
    """Add a highlight rectangle and a label to each given page.

    Args:
        doc: Opened PyMuPDF document, annotated in place.
        highlights: Mapping of page index -> bounding box (anything accepted
            by ``fitz.Rect``).
        stringtowrite: Label text, written as ``[label]``. Labels starting
            with ``'Not'`` get a grey box; all others get a yellow one.
        fixed_width: Width of the drawn box, centered horizontally on the page.

    Boxes whose original height is at most 30 or whose width is at most 10
    are skipped.
    """
    for page_num, bbox in highlights.items():
        page = doc.load_page(page_num)
        # Only the vertical extent of the original rect is kept.
        orig_rect = fitz.Rect(bbox)
        if orig_rect.height <= 30 or orig_rect.width <= 10:
            continue

        # Center horizontally using fixed width
        center_x = page.rect.width / 2
        new_rect = fitz.Rect(
            center_x - fixed_width / 2,
            orig_rect.y0,
            center_x + fixed_width / 2,
            orig_rect.y1,
        )

        # Add highlight rectangle
        annot = page.add_rect_annot(new_rect)
        if stringtowrite.startswith('Not'):
            annot.set_colors(stroke=(0.5, 0.5, 0.5), fill=(0.5, 0.5, 0.5))
        else:
            annot.set_colors(stroke=(1, 1, 0), fill=(1, 1, 0))
        annot.set_opacity(0.3)
        annot.update()

        # Add right-aligned freetext annotation inside the fixed-width box
        text = '[' + stringtowrite + ']'
        annot1 = page.add_freetext_annot(
            new_rect,
            text,
            fontsize=15,
            fontname='helv',
            text_color=(1, 0, 0),
            rotate=page.rotation,
            align=2,  # right alignment
        )
        annot1.update()


def get_leaf_headers_with_paths(listtoloop, path=None, output=None):
    """Collect childless headers deeper than level 1 together with their text path.

    Args:
        listtoloop: List of header nodes (dicts with ``text``, ``level`` and
            ``children``).
        path: Texts of the ancestors of ``listtoloop``; defaults to ``[]``.
        output: List to append to; a new list is created when ``None``.

    Returns:
        ``output``, holding ``(header, path_including_header_text)`` tuples.
        Leaf headers at level 0 or 1 are not reported.
    """
    if path is None:
        path = []
    if output is None:
        output = []
    for header in listtoloop:
        current_path = path + [header['text']]
        if header['children']:
            get_leaf_headers_with_paths(header['children'], current_path, output)
        elif header['level'] not in (0, 1):
            output.append((header, current_path))
    return output


def words_match_ratio(text1, text2):
    """Return the fraction of distinct words of ``text1`` that also occur in ``text2``.

    Returns 0.0 when either text has no words.
    """
    words1 = set(text1.split())
    words2 = set(text2.split())
    if not words1 or not words2:
        return 0.0
    return len(words1 & words2) / len(words1)


def same_start_word(s1, s2):
    """Return True when both strings have a first word and they match case-insensitively."""
    words1 = s1.strip().split()
    words2 = s2.strip().split()
    if words1 and words2:
        return words1[0].lower() == words2[0].lower()
    return False


def get_toc_page_numbers(doc, max_pages_to_check=15):
    """Detect the front-matter pages (cover up to the table of contents).

    A page among the first ``max_pages_to_check`` is flagged when any of its
    lines contains two or more consecutive dots, or consists solely of
    "table of contents", "contents" or "index" (case-insensitive).

    Returns:
        ``[0, 1, ..., p]`` where ``p`` is the FIRST flagged page, or ``[]``
        when no page is flagged.

    NOTE(review): the variable name ``last_toc_page`` and the earlier comment
    ("[2, 3] -> [0, 1, 2, 3]") suggest the last flagged page was intended,
    but the code uses ``toc_pages[0]``. Behavior is left unchanged; confirm
    the intent before switching to ``toc_pages[-1]``.
    """
    toc_pages = []
    logger.debug(f"Starting TOC detection, checking first {max_pages_to_check} pages")

    # 1. Dot leaders (looking for ".....")
    dot_pattern = re.compile(r"\.{2,}")
    # 2. Title lines: ^ and $ ensure the line is JUST that word
    #    (ignoring "The contents of the bag...").
    title_pattern = re.compile(r"^\s*(table of contents|contents|index)\s*$", re.IGNORECASE)

    for page_num in range(min(len(doc), max_pages_to_check)):
        page = doc.load_page(page_num)
        blocks = page.get_text("dict")["blocks"]
        dot_line_count = 0
        has_toc_title = False
        logger.debug(f"Checking page {page_num} for TOC")

        for block in blocks:
            for line in block.get("lines", []):
                line_text = " ".join([span["text"] for span in line["spans"]]).strip()

                # CHECK A: Does the line have dots?
                if dot_pattern.search(line_text):
                    dot_line_count += 1
                    logger.debug(f" Found dot pattern on page {page_num}: '{line_text[:50]}...'")

                # CHECK B: Is this line a Title?
                if title_pattern.match(line_text):
                    has_toc_title = True
                    logger.debug(f" Found TOC title on page {page_num}: '{line_text}'")

        # A single dot-leader line is enough, to stay sensitive to
        # single-item lists.
        if has_toc_title or dot_line_count >= 1:
            toc_pages.append(page_num)
            logger.info(f"Page {page_num} identified as TOC page")

    if toc_pages:
        last_toc_page = toc_pages[0]
        result = list(range(0, last_toc_page + 1))
        logger.info(f"TOC pages found: {result}")
        return result

    logger.info("No TOC pages found")
    return []  # Return empty list if nothing found


def openPDF(pdf_path):
    """Download a PDF from a URL and open it with PyMuPDF.

    ``dl=0`` in the URL is rewritten to ``dl=1`` before the request.

    Args:
        pdf_path: URL of the PDF.

    Returns:
        The opened ``fitz`` document.

    Raises:
        ValueError: If the server answers with an error status or an empty
            body. (The previous check tested the ``BytesIO`` wrapper, which
            is always truthy, so it could never fire.)
    """
    logger.info(f"Opening PDF from URL: {pdf_path}")
    pdf_path = pdf_path.replace('dl=0', 'dl=1')
    response = requests.get(pdf_path)
    logger.debug(f"PDF download response status: {response.status_code}")
    if not response.ok or not response.content:
        logger.error("No valid PDF content found.")
        raise ValueError("No valid PDF content found.")
    pdf_content = BytesIO(response.content)
    doc = fitz.open(stream=pdf_content, filetype="pdf")
    logger.info(f"PDF opened successfully, {len(doc)} pages")
    return doc


# def identify_headers_with_openrouter(pdf_path, model, LLM_prompt, pages_to_check=None, top_margin=0, bottom_margin=0):
#     """Ask an LLM (OpenRouter) to identify headers in the document.
#     Returns a list of dicts: {text, page, suggested_level, confidence}.
#     The function sends plain page-line strings to the LLM (including page numbers)
#     and asks for a JSON array containing only header lines with suggested levels.
#     """
#     logger.info("=" * 80)
#     logger.info("STARTING IDENTIFY_HEADERS_WITH_OPENROUTER")
#     logger.info(f"PDF Path: {pdf_path}")
#     logger.info(f"Model: {model}")
#     logger.info(f"LLM Prompt: {LLM_prompt[:200]}..."
if len(LLM_prompt) > 200 else f"LLM Prompt: {LLM_prompt}") # doc = openPDF(pdf_path) # api_key = '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' # if api_key is None: # api_key = os.getenv("OPENROUTER_API_KEY") or None # model = str(model) # # toc_pages = get_toc_page_numbers(doc) # lines_for_prompt = [] # pgestoRun=20 # # logger.info(f"TOC pages to skip: {toc_pages}") # logger.info(f"Total pages in document: {pgestoRun}") # # Collect text lines from pages (skip TOC pages) # total_lines = 0 # for pno in range(len(doc)): # # if pages_to_check and pno not in pages_to_check: # # continue # # if pno in toc_pages: # # logger.debug(f"Skipping TOC page {pno}") # # continue # page = doc.load_page(pno) # page_height = page.rect.height # text_dict = page.get_text("dict") # lines_for_prompt = [] # lines_on_page = 0 # for block in text_dict.get("blocks", []): # if block.get("type") != 0: # text blocks only # continue # for line in block.get("lines", []): # spans = line.get("spans", []) # if not spans: # continue # # Use first span to check vertical position # y0 = spans[0]["bbox"][1] # y1 = spans[0]['bbox'][3] # # if y0 < top_margin or y1 > (page_height - bottom_margin): # # continue # text = " ".join(s.get('text','') for s in spans).strip() # if text: # # prefix with page for easier mapping back # lines_for_prompt.append(f"PAGE {pno+1}: {text}") # lines_on_page += 1 # # if lines_on_page > 0: # # page = doc.load_page(pno) # # page_height = page.rect.height # # lines_on_page = 0 # # text_dict = page.get_text("dict") # # lines = [] # # y_tolerance = 0.2 # tweak if needed (1–3 usually works) # # for block in page.get_text("dict").get('blocks', []): # # if block.get('type') != 0: # # continue # # for line in block.get('lines', []): # # spans = line.get('spans', []) # # if not spans: # # continue # # y0 = spans[0]['bbox'][1] # # y1 = spans[0]['bbox'][3] # # if y0 < top_margin or y1 > (page_height - bottom_margin): # # continue # # for s in spans: # # # 
text,font,size,flags,color # # # ArrayofTextWithFormat={'Font':s.get('font')},{'Size':s.get('size')},{'Flags':s.get('flags')},{'Color':s.get('color')},{'Text':s.get('text')} # # # prefix with page for easier mapping back # # text = s["text"].strip() # # lines_for_prompt.append(f"PAGE {pno+1}: {text}") # # # if not lines_for_prompt: # # # return [] # # if text: # # # prefix with page for easier mapping back # # # lines_for_prompt.append(f"PAGE {pno+1}: {line}") # # lines_on_page += 1 # if lines_on_page > 0: # logger.debug(f"Page {pno}: collected {lines_on_page} lines") # total_lines += lines_on_page # logger.info(f"Total lines collected for LLM: {total_lines}") # if not lines_for_prompt: # logger.warning("No lines collected for prompt") # return [] # # Log sample of lines # logger.info("Sample lines (first 10):") # for i, line in enumerate(lines_for_prompt[:10]): # logger.info(f" {i}: {line}") # prompt = LLM_prompt+"\n\nLines:\n" + "\n".join(lines_for_prompt) # logger.debug(f"Full prompt length: {len(prompt)} characters") # # Changed: Print entire prompt, not truncated # print("=" * 80) # print("FULL LLM PROMPT:") # print(prompt) # print("=" * 80) # # Also log to file # # try: # # with open("full_prompt.txt", "w", encoding="utf-8") as f: # # f.write(prompt) # # logger.info("Full prompt saved to full_prompt.txt") # # except Exception as e: # # logger.error(f"Could not save prompt to file: {e}") # if not api_key: # # No API key: return empty so caller can fallback to heuristics # logger.error("No API key provided") # return [] # url = "https://openrouter.ai/api/v1/chat/completions" # # Build headers following the OpenRouter example # headers = { # "Authorization": f"Bearer {api_key}", # "Content-Type": "application/json", # "HTTP-Referer": os.getenv("OPENROUTER_REFERER", ""), # "X-Title": os.getenv("OPENROUTER_X_TITLE", "") # } # # Log request details (without exposing full API key) # logger.info(f"Making request to OpenRouter with model: {model}") # 
logger.debug(f"Headers (API key masked): { {k: '***' if k == 'Authorization' else v for k, v in headers.items()} }") # # Wrap the prompt as the example 'content' array expected by OpenRouter # body = { # "model": model, # "messages": [ # { # "role": "user", # "content": [ # {"type": "text", "text": prompt} # ] # } # ] # } # # Debug: log request body (truncated) and write raw response for inspection # try: # # Changed: Log full body (excluding prompt text which is already logged) # logger.debug(f"Request body (without prompt text): { {k: v if k != 'messages' else '[...prompt...]' for k, v in body.items()} }") # # Removed timeout parameter # resp = requests.post( # url=url, # headers=headers, # data=json.dumps(body) # ) # logger.info(f"HTTP Response Status: {resp.status_code}") # resp.raise_for_status() # resp_text = resp.text # # Changed: Print entire response # print("=" * 80) # print("FULL LLM RESPONSE:") # print(resp_text) # print("=" * 80) # logger.info(f"LLM raw response length: {len(resp_text)}") # # Save raw response for offline inspection # try: # with open("llm_debug.json", "w", encoding="utf-8") as fh: # fh.write(resp_text) # logger.info("Raw response saved to llm_debug.json") # except Exception as e: # logger.error(f"Warning: could not write llm_debug.json: {e}") # rj = resp.json() # logger.info(f"LLM parsed response type: {type(rj)}") # if isinstance(rj, dict): # logger.debug(f"Response keys: {list(rj.keys())}") # except requests.exceptions.RequestException as e: # logger.error(f"HTTP request failed: {repr(e)}") # return [] # except Exception as e: # logger.error(f"LLM call failed: {repr(e)}") # return [] # # Extract textual reply robustly # text_reply = None # if isinstance(rj, dict): # choices = rj.get('choices') or [] # logger.debug(f"Number of choices in response: {len(choices)}") # if choices: # for i, c in enumerate(choices): # logger.debug(f"Choice {i}: {c}") # c0 = choices[0] # msg = c0.get('message') or c0.get('delta') or {} # content = 
msg.get('content') # if isinstance(content, list): # logger.debug(f"Content is a list with {len(content)} items") # for idx, c in enumerate(content): # if c.get('type') == 'text' and c.get('text'): # text_reply = c.get('text') # logger.debug(f"Found text reply in content[{idx}], length: {len(text_reply)}") # break # elif isinstance(content, str): # text_reply = content # logger.debug(f"Content is string, length: {len(text_reply)}") # elif isinstance(msg, dict) and msg.get('content') and isinstance(msg.get('content'), dict): # text_reply = msg.get('content').get('text') # logger.debug(f"Found text in nested content dict") # # Fallback extraction # if not text_reply: # logger.debug("Trying fallback extraction from choices") # for c in rj.get('choices', []): # if isinstance(c.get('text'), str): # text_reply = c.get('text') # logger.debug(f"Found text reply in choice.text, length: {len(text_reply)}") # break # if not text_reply: # logger.error("Could not extract text reply from response") # # Changed: Print the entire response structure for debugging # print("=" * 80) # print("FAILED TO EXTRACT TEXT REPLY. 
FULL RESPONSE STRUCTURE:") # print(json.dumps(rj, indent=2)) # print("=" * 80) # return [] # # Changed: Print the extracted text reply # print("=" * 80) # print("EXTRACTED TEXT REPLY:") # print(text_reply) # print("=" * 80) # logger.info(f"Extracted text reply length: {len(text_reply)}") # logger.debug(f"First 500 chars of reply: {text_reply[:500]}...") # s = text_reply.strip() # start = s.find('[') # end = s.rfind(']') # js = s[start:end+1] if start != -1 and end != -1 else s # logger.debug(f"Looking for JSON array: start={start}, end={end}") # logger.debug(f"Extracted JSON string (first 500 chars): {js[:500]}...") # try: # parsed = json.loads(js) # logger.info(f"Successfully parsed JSON, got {len(parsed)} items") # except json.JSONDecodeError as e: # logger.error(f"Failed to parse JSON: {e}") # logger.error(f"JSON string that failed to parse: {js[:1000]}") # # Try to find any JSON-like structure # try: # # Try to extract any JSON array # import re # json_pattern = r'\[\s*\{.*?\}\s*\]' # matches = re.findall(json_pattern, text_reply, re.DOTALL) # if matches: # logger.info(f"Found {len(matches)} potential JSON arrays via regex") # for i, match in enumerate(matches): # try: # parsed = json.loads(match) # logger.info(f"Successfully parsed regex match {i} with {len(parsed)} items") # break # except json.JSONDecodeError as e2: # logger.debug(f"Regex match {i} also failed: {e2}") # continue # else: # logger.error("All regex matches failed to parse") # return [] # else: # logger.error("No JSON-like pattern found via regex") # return [] # except Exception as e2: # logger.error(f"Regex extraction also failed: {e2}") # return [] # # Log parsed results # logger.info(f"Parsed {len(parsed)} header items:") # for i, obj in enumerate(parsed[:10]): # Log first 10 items # logger.info(f" Item {i}: {obj}") # # Normalize parsed entries and return # out = [] # for obj in parsed: # t = obj.get('text') # page = int(obj.get('page')) if obj.get('page') else None # level = 
#         obj.get('suggested_level')
#         conf = float(obj.get('confidence') or 0)
#         if t and page is not None:
#             out.append({'text': t, 'page': page-1, 'suggested_level': level, 'confidence': conf})
#     logger.info(f"Returning {len(out)} valid header entries")
#     return out


def _llm_collect_prompt_lines(doc):
    """Return one ``"PAGE <n>: <text>"`` string per non-empty text span of ``doc``.

    Page numbers in the strings are 1-based. Only blocks of type 0 are read.
    """
    prompt_lines = []
    for pno in range(len(doc)):
        page = doc.load_page(pno)
        lines_on_page = 0
        for block in page.get_text("dict")["blocks"]:
            if block["type"] != 0:
                continue
            for line in block["lines"]:
                for span in line["spans"]:
                    text = span["text"].strip()
                    if not text:
                        continue
                    # prefix with page for easier mapping back
                    prompt_lines.append(f"PAGE {pno+1}: {text}")
                    lines_on_page += 1
        if lines_on_page > 0:
            logger.debug(f"Page {pno}: collected {lines_on_page} lines")
    return prompt_lines


def _llm_extract_text_reply(rj):
    """Pull the textual answer out of a decoded chat-completions response, or return None."""
    if not isinstance(rj, dict):
        # Anything but a JSON object has no "choices" to read from.
        return None

    text_reply = None
    choices = rj.get('choices') or []
    logger.debug(f"Number of choices in response: {len(choices)}")
    if choices:
        for i, c in enumerate(choices):
            logger.debug(f"Choice {i}: {c}")
        c0 = choices[0]
        msg = c0.get('message') or c0.get('delta') or {}
        content = msg.get('content')
        if isinstance(content, list):
            logger.debug(f"Content is a list with {len(content)} items")
            for idx, c in enumerate(content):
                if c.get('type') == 'text' and c.get('text'):
                    text_reply = c.get('text')
                    logger.debug(f"Found text reply in content[{idx}], length: {len(text_reply)}")
                    break
        elif isinstance(content, str):
            text_reply = content
            logger.debug(f"Content is string, length: {len(text_reply)}")
        elif isinstance(content, dict) and content:
            text_reply = content.get('text')
            logger.debug(f"Found text in nested content dict")

    # Fallback extraction
    if not text_reply:
        logger.debug("Trying fallback extraction from choices")
        for c in choices:
            if isinstance(c.get('text'), str):
                text_reply = c.get('text')
                logger.debug(f"Found text reply in choice.text, length: {len(text_reply)}")
                break
    return text_reply or None


def _llm_parse_header_array(text_reply):
    """Parse the JSON array of headers contained in ``text_reply``.

    First tries the text between the first ``[`` and the last ``]``, then
    every ``[ {...} ]``-shaped substring. A single JSON object is wrapped in
    a list. Returns ``None`` when no list can be obtained.
    """
    s = text_reply.strip()
    start = s.find('[')
    end = s.rfind(']')
    js = s[start:end+1] if start != -1 and end != -1 else s
    logger.debug(f"Looking for JSON array: start={start}, end={end}")
    logger.debug(f"Extracted JSON string (first 500 chars): {js[:500]}...")

    try:
        parsed = json.loads(js)
    except json.JSONDecodeError as e:
        logger.error(f"Failed to parse JSON: {e}")
        logger.error(f"JSON string that failed to parse: {js[:1000]}")
    else:
        if isinstance(parsed, dict):
            # A lone object is treated as a one-element array.
            parsed = [parsed]
        if isinstance(parsed, list):
            logger.info(f"Successfully parsed JSON, got {len(parsed)} items")
            return parsed
        logger.error(f"Parsed JSON is not an array: {type(parsed)}")
        return None

    # Try to find any JSON-like structure
    matches = re.findall(r'\[\s*\{.*?\}\s*\]', text_reply, re.DOTALL)
    if not matches:
        logger.error("No JSON-like pattern found via regex")
        return None
    logger.info(f"Found {len(matches)} potential JSON arrays via regex")
    for i, match in enumerate(matches):
        try:
            parsed = json.loads(match)
        except json.JSONDecodeError as e2:
            logger.debug(f"Regex match {i} also failed: {e2}")
            continue
        logger.info(f"Successfully parsed regex match {i} with {len(parsed)} items")
        return parsed
    logger.error("All regex matches failed to parse")
    return None


def _llm_normalize_header_entries(parsed):
    """Convert raw LLM entries to ``{text, page, suggested_level, confidence}`` dicts.

    ``page`` is converted from 1-based to 0-based. Entries that are not
    dicts, lack ``text``/``page`` or carry a non-integer page are skipped;
    an unreadable confidence becomes ``0.0``.
    """
    out = []
    for obj in parsed:
        if not isinstance(obj, dict):
            logger.warning(f"Skipping non-object header entry: {obj!r}")
            continue
        t = obj.get('text')
        raw_page = obj.get('page')
        if not t or not raw_page:
            continue
        try:
            page = int(raw_page)
        except (TypeError, ValueError):
            logger.warning(f"Skipping header entry with invalid page: {obj!r}")
            continue
        try:
            conf = float(obj.get('confidence') or 0)
        except (TypeError, ValueError):
            conf = 0.0
        out.append({
            'text': t,
            'page': page-1,
            'suggested_level': obj.get('suggested_level'),
            'confidence': conf,
        })
    return out


def identify_headers_with_openrouterNEWW(pdf_path, model,LLM_prompt, pages_to_check=None, top_margin=0, bottom_margin=0):
    """Ask an LLM (OpenRouter) to identify headers in the document.

    The function sends plain page-line strings to the LLM (including page
    numbers), appended to ``LLM_prompt``, and expects a JSON array of header
    objects in the reply.

    Args:
        pdf_path: URL of the PDF, opened through ``openPDF``.
        model: OpenRouter model identifier (converted with ``str``).
        LLM_prompt: Instruction text placed before the collected lines.
        pages_to_check, top_margin, bottom_margin: Currently unused; the
            page and margin filters are disabled, so every page and every
            span is sent.

    Returns:
        A list of dicts ``{text, page, suggested_level, confidence}`` with a
        0-based ``page``. An empty list is returned when no text could be
        collected, the request fails, or the reply cannot be parsed.

    Side effects:
        Prints the full prompt and response, and writes ``full_prompt.txt``
        and ``llm_debug.json`` to the working directory (best effort).
    """
    logger.info("=" * 80)
    logger.info("STARTING IDENTIFY_HEADERS_WITH_OPENROUTER")
    logger.info(f"Model: {model}")

    doc = openPDF(pdf_path)
    try:
        logger.info(f"Total pages in document: {len(doc)}")
        lines_for_prompt = _llm_collect_prompt_lines(doc)
    finally:
        # The document is only needed to collect text; release it right away.
        doc.close()

    # NOTE(security): a credential is embedded in source control. The
    # environment variable now takes precedence (the old ``is None`` fallback
    # could never run); the literal is kept only so existing deployments keep
    # working. It should be rotated and removed.
    api_key = os.getenv("OPENROUTER_API_KEY") or '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'
    model = str(model)

    logger.info(f"Total lines collected for LLM: {len(lines_for_prompt)}")
    if not lines_for_prompt:
        logger.warning("No lines collected for prompt")
        return []

    # Log sample of lines
    logger.info("Sample lines (first 10):")
    for i, line in enumerate(lines_for_prompt[:10]):
        logger.info(f" {i}: {line}")

    prompt =LLM_prompt + "\n\nLines:\n" + "\n".join(lines_for_prompt)
    logger.debug(f"Full prompt length: {len(prompt)} characters")
    print("=" * 80)
    print("FULL LLM PROMPT:")
    print(prompt)
    print("=" * 80)

    # Also log to file (best effort: a failed debug dump must not abort the run)
    try:
        with open("full_prompt.txt", "w", encoding="utf-8") as f:
            f.write(prompt)
        logger.info("Full prompt saved to full_prompt.txt")
    except Exception as e:
        logger.error(f"Could not save prompt to file: {e}")

    if not api_key:
        # No API key: return empty so caller can fallback to heuristics
        logger.error("No API key provided")
        return []

    url = "https://openrouter.ai/api/v1/chat/completions"
    # Build headers following the OpenRouter example
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
        "HTTP-Referer": os.getenv("OPENROUTER_REFERER", ""),
        "X-Title": os.getenv("OPENROUTER_X_TITLE", ""),
    }

    # Log request details (without exposing full API key)
    logger.info(f"Making request to OpenRouter with model: {model}")
    masked_headers = {k: '***' if k == 'Authorization' else v for k, v in headers.items()}
    logger.debug(f"Headers (API key masked): {masked_headers}")

    # Wrap the prompt as the example 'content' array expected by OpenRouter
    body = {
        "model": model,
        "messages": [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": prompt}
                ]
            }
        ]
    }

    try:
        body_summary = {k: v if k != 'messages' else '[...prompt...]' for k, v in body.items()}
        logger.debug(f"Request body (without prompt text): {body_summary}")

        # No timeout on purpose: it was removed earlier for long-running completions.
        resp = requests.post(
            url=url,
            headers=headers,
            data=json.dumps(body)
        )
        logger.info(f"HTTP Response Status: {resp.status_code}")
        resp.raise_for_status()
        resp_text = resp.text
        print("=" * 80)
        print("FULL LLM RESPONSE:")
        print(resp_text)
        print("=" * 80)
        logger.info(f"LLM raw response length: {len(resp_text)}")

        # Save raw response for offline inspection
        try:
            with open("llm_debug.json", "w", encoding="utf-8") as fh:
                fh.write(resp_text)
            logger.info("Raw response saved to llm_debug.json")
        except Exception as e:
            logger.error(f"Warning: could not write llm_debug.json: {e}")

        rj = resp.json()
        logger.info(f"LLM parsed response type: {type(rj)}")
        if isinstance(rj, dict):
            logger.debug(f"Response keys: {list(rj.keys())}")
    except requests.exceptions.RequestException as e:
        logger.error(f"HTTP request failed: {repr(e)}")
        return []
    except Exception as e:
        logger.error(f"LLM call failed: {repr(e)}")
        return []

    # Extract textual reply robustly
    text_reply = _llm_extract_text_reply(rj)
    if not text_reply:
        logger.error("Could not extract text reply from response")
        print("=" * 80)
        print("FAILED TO EXTRACT TEXT REPLY. FULL RESPONSE STRUCTURE:")
        print(json.dumps(rj, indent=2))
        print("=" * 80)
        return []

    print("=" * 80)
    print("EXTRACTED TEXT REPLY:")
    print(text_reply)
    print("=" * 80)
    logger.info(f"Extracted text reply length: {len(text_reply)}")
    logger.debug(f"First 500 chars of reply: {text_reply[:500]}...")

    parsed = _llm_parse_header_array(text_reply)
    if parsed is None:
        return []

    # Log parsed results
    logger.info(f"Parsed {len(parsed)} header items:")
    for i, obj in enumerate(parsed[:10]):  # Log first 10 items
        logger.info(f" Item {i}: {obj}")

    # Normalize parsed entries and return
    out = _llm_normalize_header_entries(parsed)
    logger.info(f"Returning {len(out)} valid header entries")
    return out


# def identify_headers_and_save_excel(pdf_path, model, llm_prompt):
#     try:
#         # 1. Get the result from your LLM function
#         result = identify_headers_with_openrouter(pdf_path, model, llm_prompt)
#         # 2. Safety Check: If LLM failed or returned nothing
#         if not result:
#             logger.warning("No headers found or LLM failed. Creating an empty report.")
#             df = pd.DataFrame([{"System Message": "No headers were identified by the LLM."}])
#         else:
#             df = pd.DataFrame(result)
#         # 3. Use an Absolute Path for the output
#         # This ensures Gradio knows exactly where the file is
#         output_path = os.path.abspath("header_analysis_output.xlsx")
#         # 4.
#         # Save using the engine explicitly
#         df.to_excel(output_path, index=False, engine='openpyxl')
#         logger.info(f"File successfully saved to {output_path}")
#         return output_path
#     except Exception as e:
#         logger.error(f"Critical error in processing: {str(e)}")
#         # Return None or a custom error message to Gradio
#         return None


# Seconds allowed for connecting to / reading from the PDF host before giving up.
_DOWNLOAD_TIMEOUT_SECONDS = 60


def _normalize_ws(text):
    """Lower-case *text* and collapse every whitespace run to a single space."""
    return " ".join(text.lower().split())


def _billing_code(parent_heading, keywords):
    """Return 'Not to be billed' if *parent_heading* contains any keyword, else 'To be billed'."""
    lowered = parent_heading.lower()
    if any(keyword in lowered for keyword in keywords):
        return 'Not to be billed'
    return 'To be billed'


def _spans_bbox(spans):
    """Return [x0, y0, x1, y1] enclosing every span that has a bbox, or None if none has."""
    boxes = [span["bbox"] for span in spans if span.get("bbox")]
    if not boxes:
        return None
    return [
        min(box[0] for box in boxes),
        min(box[1] for box in boxes),
        max(box[2] for box in boxes),
        max(box[3] for box in boxes),
    ]


def _extend_page_bbox(current_bbox, last_y1s, page_num, bbox):
    """Grow the running highlight box of *page_num* so it also covers *bbox*."""
    existing = current_bbox.get(page_num)
    if existing is None:
        current_bbox[page_num] = bbox
    else:
        current_bbox[page_num] = [
            min(existing[0], bbox[0]),
            min(existing[1], bbox[1]),
            max(existing[2], bbox[2]),
            max(existing[3], bbox[3]),
        ]
    # Bottom edge of the most recent line seen on this page.
    last_y1s[page_num] = bbox[3]


def _flush_highlights(doc_highlights, current_bbox, last_y1s, page_highlights, code):
    """Clamp each page box to its last collected line and pass the boxes to highlight_boxes."""
    for page_no, bbox in current_bbox.items():
        bbox[3] = last_y1s.get(page_no, bbox[3])
        page_highlights[page_no] = bbox
    highlight_boxes(doc_highlights, page_highlights, code)


def _make_markup_entry(filename, heading_to_search, paths, page_num, bbox, code, body_lines):
    """Build the result row for a located header.

    *body_lines* is stored by reference on purpose: the caller keeps appending
    the section's lines to that same list after the row has been created.
    """
    # Local import: `datetime` is not among the imports visible at the top of the file.
    from datetime import datetime

    zoom = 200
    left = int(bbox[0])
    top = int(bbox[1])
    entry = {
        "PDF Name": filename,
        "NBSLink": f"{zoom},{left},{top}",
        "Subject": heading_to_search,
        "Page": str(page_num + 1),
        "Author": "ADR",
        "Creation Date": datetime.now().strftime("%d/%m/%Y %I:%M:%S %p"),
        "Layer": "Initial",
        "Code": code,
        "BodyText": body_lines,
        "MC Connnection": 'Go to ' + paths[0].strip().split()[0] + '/' + heading_to_search.strip().split()[0] + ' in ' + filename,
    }
    # One "head above N" column per ancestor; the last path element is the heading itself.
    # A dedicated index name is used so the caller's line cursor is never overwritten.
    for level_idx, path_text in enumerate(paths[:-1], start=1):
        entry[f"head above {level_idx}"] = path_text
    return entry


def _markup_heading(doc, doc_highlights, heading, toc_pages, child_headers,
                    llm_headers_norm, keywords, url_pattern, filename, entries):
    """Locate one leaf heading in *doc* and collect the text that follows it.

    Scanning starts on ``heading['page']`` and stops at the first line that
    matches another LLM-identified header (or at the end of the document).
    When the heading is found a row is appended to *entries*.

    Returns the list of collected lines (the same list object stored in the
    row's "BodyText").
    """
    heading_to_search = heading['text']
    paths = heading['path']
    if not paths:
        logger.warning(f"Skipping header without a path: {heading_to_search}")
        return []

    # The billing decision depends only on the parent heading, so compute it once.
    # A top-level heading has no parent; fall back to the heading itself instead
    # of raising IndexError on paths[-2].
    parent_heading = paths[-2] if len(paths) > 1 else paths[-1]
    stringtowrite = _billing_code(parent_heading, keywords)

    collected_lines = []
    page_highlights = {}
    current_bbox = {}
    last_y1s = {}

    if not stringtowrite.startswith('To'):
        # Collection only ever starts for billable sections, so there is nothing to scan.
        logger.info(f"Keyword found. Not to be billed activated. keywords: {keywords}")
        _flush_highlights(doc_highlights, current_bbox, last_y1s, page_highlights, stringtowrite)
        return collected_lines

    heading_norm = normalize_text(heading_to_search)
    heading_words = set(heading_norm.split())
    collecting = False

    for page_num in range(heading['page'], len(doc)):
        if page_num in toc_pages:
            continue
        page = doc[page_num]
        page_height = page.rect.height

        for block in page.get_text("dict")["blocks"]:
            lines = block.get("lines", [])
            i = 0
            while i < len(lines):
                spans = lines[i].get("spans", [])
                if not spans:
                    i += 1
                    continue

                # Ignore running headers/footers inside the page margins.
                y0 = spans[0]["bbox"][1]
                y1 = spans[0]["bbox"][3]
                if y0 < top_margin or y1 > (page_height - bottom_margin):
                    i += 1
                    continue

                line_text = get_spaced_text_from_spans(spans).lower()

                if not collecting:
                    # Headers may wrap, so match against this line joined with the next.
                    if i + 1 < len(lines):
                        next_spans = lines[i + 1].get("spans", [])
                        next_line_text = get_spaced_text_from_spans(next_spans).lower()
                        combined_line_norm = normalize_text(line_text + " " + next_line_text)
                    else:
                        combined_line_norm = normalize_text(line_text)

                    current_line_words = set(combined_line_norm.split())
                    all_words_match = (
                        current_line_words.issubset(heading_words)
                        and len(current_line_words) > 0
                    )
                    substring_match = (
                        heading_norm in combined_line_norm
                        or combined_line_norm in heading_norm
                        or all_words_match
                    )

                    if substring_match and len(combined_line_norm) > 0:
                        # NOTE(review): the raw (un-normalized) heading is compared with
                        # normalized line text here and in same_start_word, as before.
                        exact_match = (
                            combined_line_norm in child_headers
                            and heading_to_search in combined_line_norm
                        )
                        if exact_match:
                            header_found = True
                        else:
                            # The threshold is 100: every header word must be present.
                            word_match_percent = words_match_ratio(heading_norm, combined_line_norm) * 100
                            header_found = (
                                word_match_percent >= 100
                                or same_start_word(heading_to_search, combined_line_norm)
                            )

                        if header_found:
                            collecting = True
                            if not exact_match:
                                # Fuzzy matches keep the header line; the caller's final
                                # clean-up drops it again when it repeats the subject.
                                collected_lines.append(line_text)
                            header_bbox = _spans_bbox(spans)
                            if header_bbox is not None:
                                _extend_page_bbox(current_bbox, last_y1s, page_num, header_bbox)
                                if not exact_match:
                                    logger.info("Logging into table")
                                entries.append(_make_markup_entry(
                                    filename, heading_to_search, paths, page_num,
                                    header_bbox, stringtowrite, collected_lines,
                                ))
                            # Skip the header line and the line that was merged with it.
                            i += 2
                            continue

                if collecting:
                    norm_line = normalize_text(line_text)
                    if url_pattern.match(norm_line):
                        line_is_header = False
                    else:
                        # From here on the original-case text is what gets stored.
                        line_text = " ".join(span["text"] for span in spans).strip()
                        line_is_header = _normalize_ws(line_text) in llm_headers_norm

                    starts_next_section = (
                        line_is_header
                        and len(line_text.strip()) > 2
                        and norm_line != heading_to_search
                        and norm_line != heading_norm
                        and line_text not in heading_norm
                    )
                    if starts_next_section:
                        _flush_highlights(doc_highlights, current_bbox, last_y1s,
                                          page_highlights, stringtowrite)
                        return collected_lines

                    collected_lines.append(line_text)
                    line_bbox = _spans_bbox(spans)
                    if line_bbox is not None:
                        _extend_page_bbox(current_bbox, last_y1s, page_num, line_bbox)

                i += 1

    # Reached the end of the document without meeting another header.
    _flush_highlights(doc_highlights, current_bbox, last_y1s, page_highlights, stringtowrite)
    return collected_lines


def _process_pdf(pdf_path, identified_headers, keywords):
    """Download one PDF and return its list of result rows (one per located leaf header)."""
    filename = unquote(os.path.basename(urlparse(pdf_path).path))  # decode URL-encoded characters
    logger.debug(f"Starting with pdf: {filename}")

    if 'http' in pdf_path or 'dropbox' in pdf_path:
        # Dropbox share links serve an HTML preview unless dl=1 is requested.
        pdf_path = pdf_path.replace('dl=0', 'dl=1')

    response = requests.get(pdf_path, timeout=_DOWNLOAD_TIMEOUT_SECONDS)
    response.raise_for_status()
    if not response.content:
        raise ValueError("No valid PDF content found.")
    pdf_content = BytesIO(response.content)

    url_pattern = re.compile(r'https?://\S+|www\.\S+')

    doc = fitz.open(stream=pdf_content, filetype="pdf")
    try:
        logger.info(f"Total pages in document: {len(doc)}")
        # Second copy of the document that receives the highlight annotations.
        # NOTE(review): this copy is not saved or returned anywhere in this
        # function, so it is closed once the PDF has been processed.
        docHighlights = fitz.open(stream=pdf_content, filetype="pdf")
        try:
            # NOTE(review): the returned font statistics are unused here; the call
            # is kept only in case the helper has side effects - confirm and remove.
            get_regular_font_size_and_color(doc)

            toc_pages = get_toc_page_numbers(doc)
            logger.info(f"Skipping TOC pages: Range {toc_pages}")

            logger.info("Starting model run.")
            llm_headers_norm = {
                _normalize_ws(h['text'])
                for h in identified_headers
                if int(h["page"]) not in toc_pages and h['text']
            }
            logger.info("Done with model.")
            logger.debug("identified_headers %s", identified_headers)

            headers_json = headers_with_location(doc, identified_headers)
            headers = filter_headers_outside_toc(headers_json, toc_pages)
            hierarchy = build_hierarchy_from_llm(headers)
            listofHeaderstoMarkup = get_leaf_headers_with_paths(hierarchy)
            logger.info(f"Hierarchy built as {hierarchy}")

            child_headers = {normalize_text(item['text']) for item, _ in listofHeaderstoMarkup}

            data_list_JSON = []
            collected_lines = []
            for heading, _ in listofHeaderstoMarkup:
                collected_lines = _markup_heading(
                    doc, docHighlights, heading, toc_pages, child_headers,
                    llm_headers_norm, keywords, url_pattern, filename, data_list_JSON,
                )

            # Fallback: give the last row the last heading's lines if it ended up empty.
            if data_list_JSON and not data_list_JSON[-1]["BodyText"] and collected_lines:
                data_list_JSON[-1]["BodyText"] = collected_lines[1:]

            # Final clean-up: drop a first body line that merely repeats the subject.
            for entry in data_list_JSON:
                body = entry.get("BodyText")
                if isinstance(body, list) and len(body) > 0:
                    first_line = normalize_text(body[0])
                    subject = normalize_text(entry["Subject"])
                    if subject in first_line or first_line in subject:
                        entry["BodyText"] = body[1:]

            return data_list_JSON
        finally:
            docHighlights.close()
    finally:
        doc.close()


def extract_section_under_header_tobebilledMultiplePDFS(multiplePDF_Paths, model, identified_headers):
    """Extract the body text under every leaf header of one or more PDFs.

    Args:
        multiplePDF_Paths: Comma-separated PDF URLs. Surrounding whitespace is
            ignored and empty items are skipped.
        model: Unused; kept so existing callers keep working.
        identified_headers: Header dicts produced by the LLM step. The keys
            read here are "text" and "page"; headers_with_location also reads
            "suggested_level".

    Returns:
        A ``(jsons, identified_headers)`` tuple. ``jsons`` holds one list per
        PDF; each item is a dict describing a located header ("Subject",
        "Page", "Code", "BodyText", "head above N", ...).

    Raises:
        requests.RequestException: If a PDF cannot be downloaded.
        ValueError: If a download returns no content.
    """
    logger.debug("Starting function")
    # Sections whose parent heading contains one of these are "Not to be billed".
    keywords = {'installation', 'execution', 'miscellaneous items', 'workmanship', 'testing', 'labeling'}

    arrayofPDFS = [path.strip() for path in multiplePDF_Paths.split(',') if path.strip()]
    logger.debug("%s", multiplePDF_Paths)
    logger.debug("%s", arrayofPDFS)

    jsons = [_process_pdf(pdf_path, identified_headers, keywords) for pdf_path in arrayofPDFS]

    # NOTE(review): nothing is uploaded in this function; the messages are kept as they were.
    logger.info("Markups done! Uploading to dropbox")
    logger.info("Uploaded and Readyy!")
    return jsons, identified_headers


def build_subject_body_map(jsons):
    """Map each entry's stripped "Subject" to its "BodyText" lines joined by spaces.

    Args:
        jsons: Iterable of result dicts for a single PDF.

    Returns:
        Dict of subject -> body paragraph. Entries without a subject are
        skipped; a later duplicate subject overwrites an earlier one.
    """
    subject_body = {}
    for obj in jsons:
        subject = obj.get("Subject")
        if subject:
            # Join body text into a readable paragraph.
            subject_body[subject.strip()] = " ".join(obj.get("BodyText", []))
    return subject_body


def identify_headers_and_save_excel(pdf_path, model, LLM_prompt):
    """Run header identification and section extraction, then write an Excel report.

    Args:
        pdf_path: PDF URL (or comma-separated URLs) typed into the UI.
        model: Model identifier forwarded to the LLM step.
        LLM_prompt: Prompt forwarded to the LLM step.

    Returns:
        Absolute path of "header_analysis_output.xlsx", or None if any step failed.
    """
    try:
        result = identify_headers_with_openrouterNEWW(pdf_path, model, LLM_prompt)

        if not result:
            # Nothing to extract, so the PDF is not downloaded and scanned again.
            df = pd.DataFrame([{
                "text": None,
                "page": None,
                "suggested_level": None,
                "confidence": None,
                "body": None,
                "System Message": "No headers were identified by the LLM."
            }])
        else:
            # The extractor returns (jsons, identified_headers); only the sections are needed.
            jsons, _ = extract_section_under_header_tobebilledMultiplePDFS(pdf_path, model, result)
            logger.debug("%s", jsons)

            df = pd.DataFrame(result)
            subject_body_map = {}
            for pdf_sections in jsons:
                subject_body_map.update(build_subject_body_map(pdf_sections))
            # Keys are stripped subjects, so strip the lookup values as well.
            df["body"] = df["text"].str.strip().map(subject_body_map)

        output_path = os.path.abspath("header_analysis_output.xlsx")
        df.to_excel(output_path, index=False, engine="openpyxl")
        logger.debug("%s", df)
        return output_path

    except Exception as e:
        # Top-level UI boundary: log with traceback and let Gradio show an empty result.
        logger.exception("Critical error in processing: %s", e)
        return None


# Improved launch with debug mode enabled
iface = gr.Interface(
    fn=identify_headers_and_save_excel,
    inputs=[
        gr.Textbox(label="PDF URL"),
        gr.Textbox(label="Model Type"),  # Default example
        gr.Textbox(label="LLM Prompt")
    ],
    outputs=gr.File(label="Download Excel Results"),
    title="PDF Header Extractor"
)

# Launch with debug=True to see errors in the console
iface.launch(debug=True)