import gradio as gr import os import json import requests from io import BytesIO from datetime import datetime from difflib import SequenceMatcher import pandas as pd from io import BytesIO import fitz # PyMuPDF from collections import defaultdict, Counter from urllib.parse import urlparse, unquote import os from io import BytesIO import re import requests import pandas as pd import fitz # PyMuPDF import re import urllib.parse import difflib import copy # import tsadropboxretrieval import urllib.parse import logging # Set up logging to see everything logging.basicConfig( level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(), # Print to console logging.FileHandler('debug.log', mode='w') # Save to file ] ) logger = logging.getLogger(__name__) top_margin = 70 bottom_margin = 85 def getLocation_of_header(doc, headerText, expected_page=None): locations = [] # pages = ( # [(expected_page, doc.load_page(expected_page))] # if expected_page is not None # else enumerate(doc) # ) expectedpageNorm=expected_page page=doc[expectedpageNorm] # for page_number, page in pages: page_height = page.rect.height rects = page.search_for(headerText) for r in rects: y = r.y0 # Skip headers in top or bottom margin if y <= top_margin: continue if y >= page_height - bottom_margin: continue locations.append({ "headerText":headerText, "page": expectedpageNorm, "x": r.x0, "y": y }) return locations def filter_headers_outside_toc(headers, toc_pages): toc_pages_set = set(toc_pages) filtered = [] for h in headers: page = h[2] y = h[3] # Skip invalid / fallback headers if page is None or y is None: continue # Skip headers inside TOC pages if page in toc_pages_set: continue filtered.append(h) return filtered def headers_with_location(doc, llm_headers): """ Converts LLM headers into: [text, font_size, page, y, suggested_level, confidence] Always include all headers, even if location not found. """ headersJson = [] for h in llm_headers: text = h["text"] llm_page = h["page"] # Attempt to locate the header on the page locations = getLocation_of_header(doc, text,llm_page) if locations: for loc in locations: page = doc.load_page(loc["page"]) fontsize = None for block in page.get_text("dict")["blocks"]: if block.get("type") != 0: continue for line in block.get("lines", []): line_text = "".join(span["text"] for span in line["spans"]).strip() if normalize(line_text) == normalize(text): fontsize = line["spans"][0]["size"] break if fontsize: break entry = [ text, fontsize, loc["page"], loc["y"], h["suggested_level"], ] if entry not in headersJson: headersJson.append(entry) return headersJson def build_hierarchy_from_llm(headers): nodes = [] # ------------------------- # 1. Build nodes safely # ------------------------- for h in headers: # print("headerrrrrrrrrrrrrrr", h) if len(h) < 5: continue text, size, page, y, level = h if level is None: continue try: level = int(level) except Exception: continue node = { "text": text, "page": page if page is not None else -1, "y": y if y is not None else -1, "size": size, "bold": False, "color": None, "font": None, "children": [], "is_numbered": is_numbered(text), "original_size": size, "norm_text": normalize(text), "level": level, } nodes.append(node) if not nodes: return [] # ------------------------- # 2. Sort top-to-bottom # ------------------------- nodes.sort(key=lambda x: (x["page"], x["y"])) # ------------------------- # 3. NORMALIZE LEVELS # (smallest level → 0) # ------------------------- min_level = min(n["level"] for n in nodes) for n in nodes: n["level"] -= min_level # ------------------------- # 4. Build hierarchy # ------------------------- root = [] stack = [] added_level0 = set() for header in nodes: lvl = header["level"] if lvl < 0: continue # De-duplicate true top-level headers if lvl == 0: key = (header["norm_text"], header["page"]) if key in added_level0: continue added_level0.add(key) while stack and stack[-1]["level"] >= lvl: stack.pop() parent = stack[-1] if stack else None if parent: header["path"] = parent["path"] + [header["norm_text"]] parent["children"].append(header) else: header["path"] = [header["norm_text"]] root.append(header) stack.append(header) # ------------------------- # 5. Enforce nesting sanity # ------------------------- def enforce_nesting(node_list, parent_level=-1): for node in node_list: if node["level"] <= parent_level: node["level"] = parent_level + 1 enforce_nesting(node["children"], node["level"]) enforce_nesting(root) # ------------------------- # 6. OPTIONAL cleanup # (only if real level-0s exist) # ------------------------- if any(h["level"] == 0 for h in root): root = [ h for h in root if not (h["level"] == 0 and not h["children"]) ] # ------------------------- # 7. Final pass # ------------------------- header_tree = enforce_level_hierarchy(root) return header_tree def get_regular_font_size_and_color(doc): font_sizes = [] colors = [] fonts = [] # Loop through all pages for page_num in range(len(doc)): page = doc.load_page(page_num) for span in page.get_text("dict")["blocks"]: if "lines" in span: for line in span["lines"]: for span in line["spans"]: font_sizes.append(span['size']) colors.append(span['color']) fonts.append(span['font']) # Get the most common font size, color, and font most_common_font_size = Counter(font_sizes).most_common(1)[0][0] if font_sizes else None most_common_color = Counter(colors).most_common(1)[0][0] if colors else None most_common_font = Counter(fonts).most_common(1)[0][0] if fonts else None return most_common_font_size, most_common_color, most_common_font def normalize_text(text): if text is None: return "" return re.sub(r'\s+', ' ', text.strip().lower()) def get_spaced_text_from_spans(spans): return normalize_text(" ".join(span["text"].strip() for span in spans)) def is_numbered(text): return bool(re.match(r'^\d', text.strip())) def is_similar(a, b, threshold=0.85): return difflib.SequenceMatcher(None, a, b).ratio() > threshold def normalize(text): text = text.lower() text = re.sub(r'\.{2,}', '', text) # remove long dots text = re.sub(r'\s+', ' ', text) # replace multiple spaces with one return text.strip() def clean_toc_entry(toc_text): """Remove page numbers and formatting from TOC entries""" # Remove everything after last sequence of dots/whitespace followed by digits return re.sub(r'[\.\s]+\d+.*$', '', toc_text).strip('. ') def enforce_level_hierarchy(headers): """ Ensure level 2 headers only exist under level 1 headers and clean up any orphaned headers """ def process_node_list(node_list, parent_level=-1): i = 0 while i < len(node_list): node = node_list[i] # Remove level 2 headers that don't have a level 1 parent if node['level'] == 2 and parent_level != 1: node_list.pop(i) continue # Recursively process children process_node_list(node['children'], node['level']) i += 1 process_node_list(headers) return headers def highlight_boxes(doc, highlights, stringtowrite, fixed_width=500): # Set your desired width here for page_num, bbox in highlights.items(): page = doc.load_page(page_num) page_width = page.rect.width # Get original rect for vertical coordinates orig_rect = fitz.Rect(bbox) rect_height = orig_rect.height if rect_height > 30: if orig_rect.width > 10: # Center horizontally using fixed width center_x = page_width / 2 new_x0 = center_x - fixed_width / 2 new_x1 = center_x + fixed_width / 2 new_rect = fitz.Rect(new_x0, orig_rect.y0, new_x1, orig_rect.y1) # Add highlight rectangle annot = page.add_rect_annot(new_rect) if stringtowrite.startswith('Not'): annot.set_colors(stroke=(0.5, 0.5, 0.5), fill=(0.5, 0.5, 0.5)) else: annot.set_colors(stroke=(1, 1, 0), fill=(1, 1, 0)) annot.set_opacity(0.3) annot.update() # Add right-aligned freetext annotation inside the fixed-width box text = '['+stringtowrite +']' annot1 = page.add_freetext_annot( new_rect, text, fontsize=15, fontname='helv', text_color=(1, 0, 0), rotate=page.rotation, align=2 # right alignment ) annot1.update() def get_leaf_headers_with_paths(listtoloop, path=None, output=None): if path is None: path = [] if output is None: output = [] for header in listtoloop: current_path = path + [header['text']] if not header['children']: if header['level'] != 0 and header['level'] != 1: output.append((header, current_path)) else: get_leaf_headers_with_paths(header['children'], current_path, output) return output # Add this helper function at the top of your code def words_match_ratio(text1, text2): words1 = set(text1.split()) words2 = set(text2.split()) if not words1 or not words2: return 0.0 common_words = words1 & words2 return len(common_words) / len(words1) def same_start_word(s1, s2): # Split both strings into words words1 = s1.strip().split() words2 = s2.strip().split() # Check if both have at least one word and compare the first ones if words1 and words2: return words1[0].lower() == words2[0].lower() return False def get_toc_page_numbers(doc, max_pages_to_check=15): toc_pages = [] logger.debug(f"Starting TOC detection, checking first {max_pages_to_check} pages") # 1. Existing Dot Pattern (looking for ".....") dot_pattern = re.compile(r"\.{2,}") # 2. NEW: Title Pattern (looking for specific headers) # ^ and $ ensure the line is JUST that word (ignoring "The contents of the bag...") # re.IGNORECASE makes it match "CONTENTS", "Contents", "Index", etc. title_pattern = re.compile(r"^\s*(table of contents|contents|index)\s*$", re.IGNORECASE) for page_num in range(min(len(doc), max_pages_to_check)): page = doc.load_page(page_num) blocks = page.get_text("dict")["blocks"] dot_line_count = 0 has_toc_title = False logger.debug(f"Checking page {page_num} for TOC") for block in blocks: for line in block.get("lines", []): # Extract text from spans (mimicking get_spaced_text_from_spans) line_text = " ".join([span["text"] for span in line["spans"]]).strip() # CHECK A: Does the line have dots? if dot_pattern.search(line_text): dot_line_count += 1 logger.debug(f" Found dot pattern on page {page_num}: '{line_text[:50]}...'") # CHECK B: Is this line a Title? # We check this early in the loop. If a page has a title "Contents", # we mark it immediately. if title_pattern.match(line_text): has_toc_title = True logger.debug(f" Found TOC title on page {page_num}: '{line_text}'") # CONDITION: # It is a TOC page if it has a Title OR if it has dot leaders. # We use 'dot_line_count >= 1' to be sensitive to single-item lists. if has_toc_title or dot_line_count >= 1: toc_pages.append(page_num) logger.info(f"Page {page_num} identified as TOC page") # RETURN: # If we found TOC pages (e.g., [2, 3]), we return [0, 1, 2, 3] # This covers the cover page, inside cover, and the TOC itself. if toc_pages: last_toc_page = toc_pages[0] result = list(range(0, last_toc_page + 1)) logger.info(f"TOC pages found: {result}") return result logger.info("No TOC pages found") return [] # Return empty list if nothing found def is_header(span, most_common_font_size, most_common_color, most_common_font,allheadersLLM): fontname = span.get("font", "").lower() # is_italic = "italic" in fontname or "oblique" in fontname isheader=False is_bold = "bold" in fontname or span.get("bold", False) if span['text'] in allheadersLLM: isheader=True return ( ( span["size"] > most_common_font_size or span["font"].lower() != most_common_font.lower() or (isheader and span["size"] > most_common_font_size ) ) ) def openPDF(pdf_path): logger.info(f"Opening PDF from URL: {pdf_path}") pdf_path = pdf_path.replace('dl=0', 'dl=1') response = requests.get(pdf_path) logger.debug(f"PDF download response status: {response.status_code}") pdf_content = BytesIO(response.content) if not pdf_content: logger.error("No valid PDF content found.") raise ValueError("No valid PDF content found.") doc = fitz.open(stream=pdf_content, filetype="pdf") logger.info(f"PDF opened successfully, {len(doc)} pages") return doc # def identify_headers_with_openrouter(pdf_path, model, LLM_prompt, pages_to_check=None, top_margin=0, bottom_margin=0): # """Ask an LLM (OpenRouter) to identify headers in the document. # Returns a list of dicts: {text, page, suggested_level, confidence}. # The function sends plain page-line strings to the LLM (including page numbers) # and asks for a JSON array containing only header lines with suggested levels. # """ # logger.info("=" * 80) # logger.info("STARTING IDENTIFY_HEADERS_WITH_OPENROUTER") # logger.info(f"PDF Path: {pdf_path}") # logger.info(f"Model: {model}") # logger.info(f"LLM Prompt: {LLM_prompt[:200]}..." if len(LLM_prompt) > 200 else f"LLM Prompt: {LLM_prompt}") # doc = openPDF(pdf_path) # api_key = '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' # if api_key is None: # api_key = os.getenv("OPENROUTER_API_KEY") or None # model = str(model) # # toc_pages = get_toc_page_numbers(doc) # lines_for_prompt = [] # pgestoRun=20 # # logger.info(f"TOC pages to skip: {toc_pages}") # logger.info(f"Total pages in document: {pgestoRun}") # # Collect text lines from pages (skip TOC pages) # total_lines = 0 # for pno in range(len(doc)): # # if pages_to_check and pno not in pages_to_check: # # continue # # if pno in toc_pages: # # logger.debug(f"Skipping TOC page {pno}") # # continue # page = doc.load_page(pno) # page_height = page.rect.height # text_dict = page.get_text("dict") # lines_for_prompt = [] # lines_on_page = 0 # for block in text_dict.get("blocks", []): # if block.get("type") != 0: # text blocks only # continue # for line in block.get("lines", []): # spans = line.get("spans", []) # if not spans: # continue # # Use first span to check vertical position # y0 = spans[0]["bbox"][1] # y1 = spans[0]['bbox'][3] # # if y0 < top_margin or y1 > (page_height - bottom_margin): # # continue # text = " ".join(s.get('text','') for s in spans).strip() # if text: # # prefix with page for easier mapping back # lines_for_prompt.append(f"PAGE {pno+1}: {text}") # lines_on_page += 1 # # if lines_on_page > 0: # # page = doc.load_page(pno) # # page_height = page.rect.height # # lines_on_page = 0 # # text_dict = page.get_text("dict") # # lines = [] # # y_tolerance = 0.2 # tweak if needed (1–3 usually works) # # for block in page.get_text("dict").get('blocks', []): # # if block.get('type') != 0: # # continue # # for line in block.get('lines', []): # # spans = line.get('spans', []) # # if not spans: # # continue # # y0 = spans[0]['bbox'][1] # # y1 = spans[0]['bbox'][3] # # if y0 < top_margin or y1 > (page_height - bottom_margin): # # continue # # for s in spans: # # # text,font,size,flags,color # # # ArrayofTextWithFormat={'Font':s.get('font')},{'Size':s.get('size')},{'Flags':s.get('flags')},{'Color':s.get('color')},{'Text':s.get('text')} # # # prefix with page for easier mapping back # # text = s["text"].strip() # # lines_for_prompt.append(f"PAGE {pno+1}: {text}") # # # if not lines_for_prompt: # # # return [] # # if text: # # # prefix with page for easier mapping back # # # lines_for_prompt.append(f"PAGE {pno+1}: {line}") # # lines_on_page += 1 # if lines_on_page > 0: # logger.debug(f"Page {pno}: collected {lines_on_page} lines") # total_lines += lines_on_page # logger.info(f"Total lines collected for LLM: {total_lines}") # if not lines_for_prompt: # logger.warning("No lines collected for prompt") # return [] # # Log sample of lines # logger.info("Sample lines (first 10):") # for i, line in enumerate(lines_for_prompt[:10]): # logger.info(f" {i}: {line}") # prompt = LLM_prompt+"\n\nLines:\n" + "\n".join(lines_for_prompt) # logger.debug(f"Full prompt length: {len(prompt)} characters") # # Changed: Print entire prompt, not truncated # print("=" * 80) # print("FULL LLM PROMPT:") # print(prompt) # print("=" * 80) # # Also log to file # # try: # # with open("full_prompt.txt", "w", encoding="utf-8") as f: # # f.write(prompt) # # logger.info("Full prompt saved to full_prompt.txt") # # except Exception as e: # # logger.error(f"Could not save prompt to file: {e}") # if not api_key: # # No API key: return empty so caller can fallback to heuristics # logger.error("No API key provided") # return [] # url = "https://openrouter.ai/api/v1/chat/completions" # # Build headers following the OpenRouter example # headers = { # "Authorization": f"Bearer {api_key}", # "Content-Type": "application/json", # "HTTP-Referer": os.getenv("OPENROUTER_REFERER", ""), # "X-Title": os.getenv("OPENROUTER_X_TITLE", "") # } # # Log request details (without exposing full API key) # logger.info(f"Making request to OpenRouter with model: {model}") # logger.debug(f"Headers (API key masked): { {k: '***' if k == 'Authorization' else v for k, v in headers.items()} }") # # Wrap the prompt as the example 'content' array expected by OpenRouter # body = { # "model": model, # "messages": [ # { # "role": "user", # "content": [ # {"type": "text", "text": prompt} # ] # } # ] # } # # Debug: log request body (truncated) and write raw response for inspection # try: # # Changed: Log full body (excluding prompt text which is already logged) # logger.debug(f"Request body (without prompt text): { {k: v if k != 'messages' else '[...prompt...]' for k, v in body.items()} }") # # Removed timeout parameter # resp = requests.post( # url=url, # headers=headers, # data=json.dumps(body) # ) # logger.info(f"HTTP Response Status: {resp.status_code}") # resp.raise_for_status() # resp_text = resp.text # # Changed: Print entire response # print("=" * 80) # print("FULL LLM RESPONSE:") # print(resp_text) # print("=" * 80) # logger.info(f"LLM raw response length: {len(resp_text)}") # # Save raw response for offline inspection # try: # with open("llm_debug.json", "w", encoding="utf-8") as fh: # fh.write(resp_text) # logger.info("Raw response saved to llm_debug.json") # except Exception as e: # logger.error(f"Warning: could not write llm_debug.json: {e}") # rj = resp.json() # logger.info(f"LLM parsed response type: {type(rj)}") # if isinstance(rj, dict): # logger.debug(f"Response keys: {list(rj.keys())}") # except requests.exceptions.RequestException as e: # logger.error(f"HTTP request failed: {repr(e)}") # return [] # except Exception as e: # logger.error(f"LLM call failed: {repr(e)}") # return [] # # Extract textual reply robustly # text_reply = None # if isinstance(rj, dict): # choices = rj.get('choices') or [] # logger.debug(f"Number of choices in response: {len(choices)}") # if choices: # for i, c in enumerate(choices): # logger.debug(f"Choice {i}: {c}") # c0 = choices[0] # msg = c0.get('message') or c0.get('delta') or {} # content = msg.get('content') # if isinstance(content, list): # logger.debug(f"Content is a list with {len(content)} items") # for idx, c in enumerate(content): # if c.get('type') == 'text' and c.get('text'): # text_reply = c.get('text') # logger.debug(f"Found text reply in content[{idx}], length: {len(text_reply)}") # break # elif isinstance(content, str): # text_reply = content # logger.debug(f"Content is string, length: {len(text_reply)}") # elif isinstance(msg, dict) and msg.get('content') and isinstance(msg.get('content'), dict): # text_reply = msg.get('content').get('text') # logger.debug(f"Found text in nested content dict") # # Fallback extraction # if not text_reply: # logger.debug("Trying fallback extraction from choices") # for c in rj.get('choices', []): # if isinstance(c.get('text'), str): # text_reply = c.get('text') # logger.debug(f"Found text reply in choice.text, length: {len(text_reply)}") # break # if not text_reply: # logger.error("Could not extract text reply from response") # # Changed: Print the entire response structure for debugging # print("=" * 80) # print("FAILED TO EXTRACT TEXT REPLY. FULL RESPONSE STRUCTURE:") # print(json.dumps(rj, indent=2)) # print("=" * 80) # return [] # # Changed: Print the extracted text reply # print("=" * 80) # print("EXTRACTED TEXT REPLY:") # print(text_reply) # print("=" * 80) # logger.info(f"Extracted text reply length: {len(text_reply)}") # logger.debug(f"First 500 chars of reply: {text_reply[:500]}...") # s = text_reply.strip() # start = s.find('[') # end = s.rfind(']') # js = s[start:end+1] if start != -1 and end != -1 else s # logger.debug(f"Looking for JSON array: start={start}, end={end}") # logger.debug(f"Extracted JSON string (first 500 chars): {js[:500]}...") # try: # parsed = json.loads(js) # logger.info(f"Successfully parsed JSON, got {len(parsed)} items") # except json.JSONDecodeError as e: # logger.error(f"Failed to parse JSON: {e}") # logger.error(f"JSON string that failed to parse: {js[:1000]}") # # Try to find any JSON-like structure # try: # # Try to extract any JSON array # import re # json_pattern = r'\[\s*\{.*?\}\s*\]' # matches = re.findall(json_pattern, text_reply, re.DOTALL) # if matches: # logger.info(f"Found {len(matches)} potential JSON arrays via regex") # for i, match in enumerate(matches): # try: # parsed = json.loads(match) # logger.info(f"Successfully parsed regex match {i} with {len(parsed)} items") # break # except json.JSONDecodeError as e2: # logger.debug(f"Regex match {i} also failed: {e2}") # continue # else: # logger.error("All regex matches failed to parse") # return [] # else: # logger.error("No JSON-like pattern found via regex") # return [] # except Exception as e2: # logger.error(f"Regex extraction also failed: {e2}") # return [] # # Log parsed results # logger.info(f"Parsed {len(parsed)} header items:") # for i, obj in enumerate(parsed[:10]): # Log first 10 items # logger.info(f" Item {i}: {obj}") # # Normalize parsed entries and return # out = [] # for obj in parsed: # t = obj.get('text') # page = int(obj.get('page')) if obj.get('page') else None # level = obj.get('suggested_level') # conf = float(obj.get('confidence') or 0) # if t and page is not None: # out.append({'text': t, 'page': page-1, 'suggested_level': level, 'confidence': conf}) # logger.info(f"Returning {len(out)} valid header entries") # return out def process_document_in_chunks( lengthofDoc, pdf_path, LLM_prompt, model, chunk_size=15, ): total_pages = lengthofDoc all_results = [] print(f"DEBUG: process_document_in_chunks - Total pages: {total_pages}") for start in range(0, total_pages, chunk_size): end = start + chunk_size print(f"DEBUG: Processing pages {start + 1} → {min(end, total_pages)}") result = identify_headers_with_openrouterNEWW( pdf_path=pdf_path, model=model, LLM_prompt=LLM_prompt, pages_to_check=(start, end) ) print(f"DEBUG: Chunk returned {len(result) if result else 0} headers") if result: print(f"DEBUG: Sample header from chunk: {result[0]}") all_results.extend(result) print(f"DEBUG: Total headers collected: {len(all_results)}") return all_results def identify_headers_with_openrouterNEWW(pdf_path, model,LLM_prompt, pages_to_check=None, top_margin=0, bottom_margin=0): """Ask an LLM (OpenRouter) to identify headers in the document. Returns a list of dicts: {text, page, suggested_level, confidence}. The function sends plain page-line strings to the LLM (including page numbers) and asks for a JSON array containing only header lines with suggested levels. """ logger.info("=" * 80) logger.info("STARTING IDENTIFY_HEADERS_WITH_OPENROUTER") # logger.info(f"PDF Path: {pdf_path}") logger.info(f"Model: {model}") # logger.info(f"LLM Prompt: {LLM_prompt[:200]}..." if len(LLM_prompt) > 200 else f"LLM Prompt: {LLM_prompt}") doc = openPDF(pdf_path) api_key = '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' if api_key is None: api_key = os.getenv("OPENROUTER_API_KEY") or None model = str(model) # toc_pages = get_toc_page_numbers(doc) lines_for_prompt = [] # pgestoRun=20 # logger.info(f"TOC pages to skip: {toc_pages}") # logger.info(f"Total pages in document: {len(doc)}") logger.info(f"Total pages in document: {len(doc)}") # Collect text lines from pages (skip TOC pages) total_lines = 0 ArrayofTextWithFormat = [] total_pages = len(doc) if pages_to_check is None: start_page = 0 end_page = min(15, total_pages) else: start_page, end_page = pages_to_check end_page = min(end_page, total_pages) # 🔑 CRITICAL LINE for pno in range(start_page, end_page): page = doc.load_page(pno) # # Collect text lines from pages (skip TOC pages) # total_lines = 0 # for pno in range(len(doc)): # if pages_to_check and pno not in pages_to_check: # continue # if pno in toc_pages: # logger.debug(f"Skipping TOC page {pno}") # continue # page = doc.load_page(pno) # page_height = page.rect.height # lines_on_page = 0 # text_dict = page.get_text("dict") # lines = [] # # y_tolerance = 0.2 # tweak if needed (1–3 usually works) # for block in text_dict["blocks"]: # if block["type"] != 0: # continue # for line in block["lines"]: # for span in line["spans"]: # text = span["text"].strip() # if not text: # continue # if text: # # prefix with page for easier mapping back # lines_for_prompt.append(f"PAGE {pno+1}: {text}") # lines_on_page += 1 # if lines_on_page > 0: # logger.debug(f"Page {pno}: collected {lines_on_page} lines") # total_lines += lines_on_page # logger.info(f"Total lines collected for LLM: {total_lines}") page_height = page.rect.height lines_on_page = 0 text_dict = page.get_text("dict") lines = [] y_tolerance = 0.5 # tweak if needed (1–3 usually works) for block in text_dict["blocks"]: if block["type"] != 0: continue for line in block["lines"]: for span in line["spans"]: text = span["text"].strip() if not text: # Skip empty text continue # Extract all formatting attributes font = span.get('font') size = span.get('size') color = span.get('color') flags = span.get('flags', 0) bbox = span.get("bbox", (0, 0, 0, 0)) x0, y0, x1, y1 = bbox # Create text format dictionary text_format = { 'Font': font, 'Size': size, 'Flags': flags, 'Color': color, 'Text': text, 'BBox': bbox, 'Page': pno + 1 } # Add to ArrayofTextWithFormat ArrayofTextWithFormat.append(text_format) # For line grouping (keeping your existing logic) matched = False for l in lines: if abs(l["y"] - y0) <= y_tolerance: l["spans"].append((x0, text, font, size, color, flags)) matched = True break if not matched: lines.append({ "y": y0, "spans": [(x0, text, font, size, color, flags)] }) lines.sort(key=lambda l: l["y"]) # Join text inside each line with formatting info final_lines = [] for l in lines: l["spans"].sort(key=lambda s: s[0]) # left → right # Collect all text and formatting for this line line_text = " ".join(text for _, text, _, _, _, _ in l["spans"]) # Get dominant formatting for the line (based on first span) if l["spans"]: _, _, font, size, color, flags = l["spans"][0] # Store line with its formatting line_with_format = { 'text': line_text, 'font': font, 'size': size, 'color': color, 'flags': flags, 'page': pno + 1, 'y_position': l["y"] } final_lines.append(line_with_format) # Result for line_data in final_lines: line_text = line_data['text'] print(line_text) if line_text: # Create a formatted string with text properties format_info = f"Font: {line_data['font']}, Size: {line_data['size']}, Color: {line_data['color']}" lines_for_prompt.append(f"PAGE {pno+1}: {line_text} [{format_info}]") lines_on_page += 1 if lines_on_page > 0: logger.debug(f"Page {pno}: collected {lines_on_page} lines") total_lines += lines_on_page logger.info(f"Total lines collected for LLM: {total_lines}") if not lines_for_prompt: logger.warning("No lines collected for prompt") return [] # Log sample of lines logger.info("Sample lines (first 10):") for i, line in enumerate(lines_for_prompt[:10]): logger.info(f" {i}: {line}") prompt =LLM_prompt + "\n\nLines:\n" + "\n".join(lines_for_prompt) logger.debug(f"Full prompt length: {len(prompt)} characters") # Changed: Print entire prompt, not truncated print("=" * 80) print("FULL LLM PROMPT:") print(prompt) print("=" * 80) # Also log to file try: with open("full_prompt.txt", "w", encoding="utf-8") as f: f.write(prompt) logger.info("Full prompt saved to full_prompt.txt") except Exception as e: logger.error(f"Could not save prompt to file: {e}") if not api_key: # No API key: return empty so caller can fallback to heuristics logger.error("No API key provided") return [] url = "https://openrouter.ai/api/v1/chat/completions" # Build headers following the OpenRouter example headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", "HTTP-Referer": os.getenv("OPENROUTER_REFERER", ""), "X-Title": os.getenv("OPENROUTER_X_TITLE", ""), # "X-Request-Timestamp": str(unix_timestamp), # "X-Request-Datetime": current_time, } # Log request details (without exposing full API key) logger.info(f"Making request to OpenRouter with model: {model}") logger.debug(f"Headers (API key masked): { {k: '***' if k == 'Authorization' else v for k, v in headers.items()} }") # Wrap the prompt as the example 'content' array expected by OpenRouter body = { "model": model, "messages": [ { "role": "user", "content": [ {"type": "text", "text": prompt} ] } ] } # print(f"Request sent at: {current_time}") # print(f"Unix timestamp: {unix_timestamp}") # Debug: log request body (truncated) and write raw response for inspection try: # Changed: Log full body (excluding prompt text which is already logged) logger.debug(f"Request body (without prompt text): { {k: v if k != 'messages' else '[...prompt...]' for k, v in body.items()} }") # Removed timeout parameter resp = requests.post( url=url, headers=headers, data=json.dumps(body) ) logger.info(f"HTTP Response Status: {resp.status_code}") resp.raise_for_status() resp_text = resp.text # Changed: Print entire response print("=" * 80) print("FULL LLM RESPONSE:") print(resp_text) print("=" * 80) logger.info(f"LLM raw response length: {len(resp_text)}") # Save raw response for offline inspection try: with open("llm_debug.json", "w", encoding="utf-8") as fh: fh.write(resp_text) logger.info("Raw response saved to llm_debug.json") except Exception as e: logger.error(f"Warning: could not write llm_debug.json: {e}") rj = resp.json() logger.info(f"LLM parsed response type: {type(rj)}") if isinstance(rj, dict): logger.debug(f"Response keys: {list(rj.keys())}") except requests.exceptions.RequestException as e: logger.error(f"HTTP request failed: {repr(e)}") return [] except Exception as e: logger.error(f"LLM call failed: {repr(e)}") return [] # Extract textual reply robustly text_reply = None if isinstance(rj, dict): choices = rj.get('choices') or [] logger.debug(f"Number of choices in response: {len(choices)}") if choices: for i, c in enumerate(choices): logger.debug(f"Choice {i}: {c}") c0 = choices[0] msg = c0.get('message') or c0.get('delta') or {} content = msg.get('content') if isinstance(content, list): logger.debug(f"Content is a list with {len(content)} items") for idx, c in enumerate(content): if c.get('type') == 'text' and c.get('text'): text_reply = c.get('text') logger.debug(f"Found text reply in content[{idx}], length: {len(text_reply)}") break elif isinstance(content, str): text_reply = content logger.debug(f"Content is string, length: {len(text_reply)}") elif isinstance(msg, dict) and msg.get('content') and isinstance(msg.get('content'), dict): text_reply = msg.get('content').get('text') logger.debug(f"Found text in nested content dict") # Fallback extraction if not text_reply: logger.debug("Trying fallback extraction from choices") for c in rj.get('choices', []): if isinstance(c.get('text'), str): text_reply = c.get('text') logger.debug(f"Found text reply in choice.text, length: {len(text_reply)}") break if not text_reply: logger.error("Could not extract text reply from response") # Changed: Print the entire response structure for debugging print("=" * 80) print("FAILED TO EXTRACT TEXT REPLY. FULL RESPONSE STRUCTURE:") print(json.dumps(rj, indent=2)) print("=" * 80) return [] # Changed: Print the extracted text reply print("=" * 80) print("EXTRACTED TEXT REPLY:") print(text_reply) print("=" * 80) logger.info(f"Extracted text reply length: {len(text_reply)}") logger.debug(f"First 500 chars of reply: {text_reply[:500]}...") s = text_reply.strip() start = s.find('[') end = s.rfind(']') js = s[start:end+1] if start != -1 and end != -1 else s logger.debug(f"Looking for JSON array: start={start}, end={end}") logger.debug(f"Extracted JSON string (first 500 chars): {js[:500]}...") try: parsed = json.loads(js) logger.info(f"Successfully parsed JSON, got {len(parsed)} items") except json.JSONDecodeError as e: logger.error(f"Failed to parse JSON: {e}") logger.error(f"JSON string that failed to parse: {js[:1000]}") # Try to find any JSON-like structure try: # Try to extract any JSON array import re json_pattern = r'\[\s*\{.*?\}\s*\]' matches = re.findall(json_pattern, text_reply, re.DOTALL) if matches: logger.info(f"Found {len(matches)} potential JSON arrays via regex") for i, match in enumerate(matches): try: parsed = json.loads(match) logger.info(f"Successfully parsed regex match {i} with {len(parsed)} items") break except json.JSONDecodeError as e2: logger.debug(f"Regex match {i} also failed: {e2}") continue else: logger.error("All regex matches failed to parse") return [] else: logger.error("No JSON-like pattern found via regex") return [] except Exception as e2: logger.error(f"Regex extraction also failed: {e2}") return [] # Log parsed results logger.info(f"Parsed {len(parsed)} header items:") for i, obj in enumerate(parsed[:10]): # Log first 10 items logger.info(f" Item {i}: {obj}") # Normalize parsed entries and return out = [] for obj in parsed: t = obj.get('text') page = int(obj.get('page')) if obj.get('page') else None level = obj.get('suggested_level') conf = float(obj.get('confidence') or 0) if t and page is not None: out.append({'text': t, 'page': page-1, 'suggested_level': level, 'confidence': conf}) logger.info(f"Returning {len(out)} valid header entries") return out # def identify_headers_and_save_excel(pdf_path, model, llm_prompt): # try: # # 1. Get the result from your LLM function # result = identify_headers_with_openrouter(pdf_path, model, llm_prompt) # # 2. Safety Check: If LLM failed or returned nothing # if not result: # logger.warning("No headers found or LLM failed. Creating an empty report.") # df = pd.DataFrame([{"System Message": "No headers were identified by the LLM."}]) # else: # df = pd.DataFrame(result) # # 3. Use an Absolute Path for the output # # This ensures Gradio knows exactly where the file is # output_path = os.path.abspath("header_analysis_output.xlsx") # # 4. Save using the engine explicitly # df.to_excel(output_path, index=False, engine='openpyxl') # logger.info(f"File successfully saved to {output_path}") # return output_path # except Exception as e: # logger.error(f"Critical error in processing: {str(e)}") # # Return None or a custom error message to Gradio # return None def extract_section_under_header_tobebilledMultiplePDFS(multiplePDF_Paths,model,identified_headers): logger.debug(f"Starting function") # keywordstoSkip=["installation", "execution", "miscellaneous items", "workmanship", "testing", "labeling"] filenames=[] keywords = {'installation', 'execution', 'miscellaneous items', 'workmanship', 'testing', 'labeling'} arrayofPDFS=multiplePDF_Paths.split(',') print(multiplePDF_Paths) print(arrayofPDFS) docarray=[] jsons=[] df = pd.DataFrame(columns=["PDF Name","NBSLink","Subject","Page","Author","Creation Date","Layer",'Code', 'head above 1', "head above 2","BodyText"]) for pdf_path in arrayofPDFS: headertoContinue1 = False headertoContinue2=False Alltexttobebilled='' parsed_url = urlparse(pdf_path) filename = os.path.basename(parsed_url.path) filename = unquote(filename) # decode URL-encoded characters filenames.append(filename) logger.debug(f"Starting with pdf: {filename}") # Optimized URL handling if pdf_path and ('http' in pdf_path or 'dropbox' in pdf_path): pdf_path = pdf_path.replace('dl=0', 'dl=1') # Cache frequently used values response = requests.get(pdf_path) pdf_content = BytesIO(response.content) if not pdf_content: raise ValueError("No valid PDF content found.") doc = fitz.open(stream=pdf_content, filetype="pdf") logger.info(f"Total pages in document: {len(doc)}") docHighlights = fitz.open(stream=pdf_content, filetype="pdf") most_common_font_size, most_common_color, most_common_font = get_regular_font_size_and_color(doc) # Precompute regex patterns dot_pattern = re.compile(r'\.{3,}') url_pattern = re.compile(r'https?://\S+|www\.\S+') toc_pages = get_toc_page_numbers(doc) logger.info(f"Skipping TOC pages: Range {toc_pages}") # headers, top_3_font_sizes, smallest_font_size, headersSpans = extract_headers( # doc, toc_pages, most_common_font_size, most_common_color, most_common_font, top_margin, bottom_margin # ) logger.info(f"Starting model run.") # identified_headers = identify_headers_with_openrouterNEWW(doc, model) allheaders_LLM=[] for h in identified_headers: if int(h["page"]) in toc_pages: continue if h['text']: allheaders_LLM.append(h['text']) logger.info(f"Done with model.") print('identified_headers',identified_headers) headers_json=headers_with_location(doc,identified_headers) headers=filter_headers_outside_toc(headers_json,toc_pages) hierarchy=build_hierarchy_from_llm(headers) listofHeaderstoMarkup = get_leaf_headers_with_paths(hierarchy) logger.info(f"Hierarchy built as {hierarchy}") # Precompute all children headers once allchildrenheaders = [normalize_text(item['text']) for item, p in listofHeaderstoMarkup] allchildrenheaders_set = set(allchildrenheaders) # For faster lookups # df = pd.DataFrame(columns=["NBSLink","Subject","Page","Author","Creation Date","Layer",'Code', 'head above 1', "head above 2","BodyText"]) dictionaryNBS={} data_list_JSON = [] json_output=[] currentgroupname='' # if len(top_3_font_sizes)==3: # mainHeaderFontSize, subHeaderFontSize, subsubheaderFontSize = top_3_font_sizes # elif len(top_3_font_sizes)==2: # mainHeaderFontSize= top_3_font_sizes[0] # subHeaderFontSize= top_3_font_sizes[1] # subsubheaderFontSize= top_3_font_sizes[1] # Preload all pages to avoid repeated loading # pages = [doc.load_page(page_num) for page_num in range(len(doc)) if page_num not in toc_pages] for heading_to_searchDict,pathss in listofHeaderstoMarkup: heading_to_search = heading_to_searchDict['text'] heading_to_searchPageNum = heading_to_searchDict['page'] paths=heading_to_searchDict['path'] # Initialize variables headertoContinue1 = False headertoContinue2 = False matched_header_line = None done = False collecting = False collected_lines = [] page_highlights = {} current_bbox = {} last_y1s = {} mainHeader = '' subHeader = '' matched_header_line_norm = heading_to_search break_collecting = False heading_norm = normalize_text(heading_to_search) paths_norm = [normalize_text(p) for p in paths[0]] if paths and paths[0] else [] for page_num in range(heading_to_searchPageNum,len(doc)): # print(heading_to_search) if paths[0].strip().lower() != currentgroupname.strip().lower(): Alltexttobebilled+= paths[0] +'\n' currentgroupname=paths[0] # print(paths[0]) if page_num in toc_pages: continue if break_collecting: break page=doc[page_num] page_height = page.rect.height blocks = page.get_text("dict")["blocks"] for block in blocks: if break_collecting: break lines = block.get("lines", []) i = 0 while i < len(lines): if break_collecting: break spans = lines[i].get("spans", []) if not spans: i += 1 continue y0 = spans[0]["bbox"][1] y1 = spans[0]["bbox"][3] if y0 < top_margin or y1 > (page_height - bottom_margin): i += 1 continue line_text = get_spaced_text_from_spans(spans).lower() line_text_norm = normalize_text(line_text) # Combine with next line if available if i + 1 < len(lines): next_spans = lines[i + 1].get("spans", []) next_line_text = get_spaced_text_from_spans(next_spans).lower() combined_line_norm = normalize_text(line_text + " " + next_line_text) else: combined_line_norm = line_text_norm # Check if we should continue processing if combined_line_norm and combined_line_norm in paths[0]: headertoContinue1 = combined_line_norm if combined_line_norm and combined_line_norm in paths[-2]: headertoContinue2 = combined_line_norm # if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() : last_path = paths[-2].lower() # if any(word in paths[-2].lower() for word in keywordstoSkip): # if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() or 'workmanship' in paths[-2].lower() or 'testing' in paths[-2].lower() or 'labeling' in paths[-2].lower(): if any(keyword in last_path for keyword in keywords): stringtowrite='Not to be billed' logger.info(f"Keyword found. Not to be billed activated. keywords: {keywords}") else: stringtowrite='To be billed' if stringtowrite=='To be billed': # Alltexttobebilled+= combined_line_norm ################################################# if matched_header_line_norm in combined_line_norm: Alltexttobebilled+='\n' Alltexttobebilled+= ' '+combined_line_norm # Optimized header matching existsfull = ( ( combined_line_norm in allchildrenheaders_set or combined_line_norm in allchildrenheaders ) and heading_to_search in combined_line_norm ) # New word-based matching current_line_words = set(combined_line_norm.split()) heading_words = set(heading_norm.split()) all_words_match = current_line_words.issubset(heading_words) and len(current_line_words) > 0 substring_match = ( heading_norm in combined_line_norm or combined_line_norm in heading_norm or all_words_match # Include the new word-based matching ) # substring_match = ( # heading_norm in combined_line_norm or # combined_line_norm in heading_norm # ) if (substring_match and existsfull and not collecting and len(combined_line_norm) > 0 ):#and (headertoContinue1 or headertoContinue2) ): # Check header conditions more efficiently # header_spans = [ # span for span in spans # if (is_header(span, most_common_font_size, most_common_color, most_common_font) # # and span['size'] >= subsubheaderFontSize # and span['size'] < mainHeaderFontSize) # ] if stringtowrite.startswith('To') : collecting = True # if stringtowrite=='To be billed': # Alltexttobebilled+='\n' # matched_header_font_size = max(span["size"] for span in header_spans) # collected_lines.append(line_text) valid_spans = [span for span in spans if span.get("bbox")] if valid_spans: x0s = [span["bbox"][0] for span in valid_spans] x1s = [span["bbox"][2] for span in valid_spans] y0s = [span["bbox"][1] for span in valid_spans] y1s = [span["bbox"][3] for span in valid_spans] header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] if page_num in current_bbox: cb = current_bbox[page_num] current_bbox[page_num] = [ min(cb[0], header_bbox[0]), min(cb[1], header_bbox[1]), max(cb[2], header_bbox[2]), max(cb[3], header_bbox[3]) ] else: current_bbox[page_num] = header_bbox last_y1s[page_num] = header_bbox[3] x0, y0, x1, y1 = header_bbox zoom = 200 left = int(x0) top = int(y0) zoom_str = f"{zoom},{left},{top}" pageNumberFound = page_num + 1 # Build the query parameters params = { 'pdfLink': pdf_path, # Your PDF link 'keyword': heading_to_search, # Your keyword (could be a string or list) } # URL encode each parameter encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()} # Construct the final encoded link encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()]) # Correctly construct the final URL with page and zoom # final_url = f"{baselink}{encoded_link}#page={str(pageNumberFound)}&zoom={zoom_str}" # Get current date and time now = datetime.now() # Format the output formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p") # Optionally, add the URL to a DataFrame data_entry = { "PDF Name":filename, "NBSLink": zoom_str, "Subject": heading_to_search, "Page": str(pageNumberFound), "Author": "ADR", "Creation Date": formatted_time, "Layer": "Initial", "Code": stringtowrite, # "head above 1": paths[-2], # "head above 2": paths[0], "BodyText":collected_lines, "MC Connnection": 'Go to ' + paths[0].strip().split()[0] +'/'+ heading_to_search.strip().split()[0] + ' in '+ filename } # Dynamically add "head above 1", "head above 2", ... depending on the number of levels for i, path_text in enumerate(paths[:-1]): # skip the last one because that's the current heading data_entry[f"head above {i+1}"] = path_text data_list_JSON.append(data_entry) # Convert list to JSON # json_output = [data_list_JSON] # json_output = json.dumps(data_list_JSON, indent=4) i += 2 continue else: if (substring_match and not collecting and len(combined_line_norm) > 0): # and (headertoContinue1 or headertoContinue2) ): # Calculate word match percentage word_match_percent = words_match_ratio(heading_norm, combined_line_norm) * 100 # Check if at least 70% of header words exist in this line meets_word_threshold = word_match_percent >= 100 # Check header conditions (including word threshold) # header_spans = [ # span for span in spans # if (is_header(span, most_common_font_size, most_common_color, most_common_font) # # and span['size'] >= subsubheaderFontSize # and span['size'] < mainHeaderFontSize) # ] if (meets_word_threshold or same_start_word(heading_to_search, combined_line_norm) ) and stringtowrite.startswith('To'): collecting = True if stringtowrite=='To be billed': Alltexttobebilled+='\n' # if stringtowrite=='To be billed': # Alltexttobebilled+= ' '+ combined_line_norm # matched_header_font_size = max(span["size"] for span in header_spans) collected_lines.append(line_text) valid_spans = [span for span in spans if span.get("bbox")] if valid_spans: x0s = [span["bbox"][0] for span in valid_spans] x1s = [span["bbox"][2] for span in valid_spans] y0s = [span["bbox"][1] for span in valid_spans] y1s = [span["bbox"][3] for span in valid_spans] header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] if page_num in current_bbox: cb = current_bbox[page_num] current_bbox[page_num] = [ min(cb[0], header_bbox[0]), min(cb[1], header_bbox[1]), max(cb[2], header_bbox[2]), max(cb[3], header_bbox[3]) ] else: current_bbox[page_num] = header_bbox last_y1s[page_num] = header_bbox[3] x0, y0, x1, y1 = header_bbox zoom = 200 left = int(x0) top = int(y0) zoom_str = f"{zoom},{left},{top}" pageNumberFound = page_num + 1 # Build the query parameters params = { 'pdfLink': pdf_path, # Your PDF link 'keyword': heading_to_search, # Your keyword (could be a string or list) } # URL encode each parameter encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()} # Construct the final encoded link encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()]) # Correctly construct the final URL with page and zoom # final_url = f"{baselink}{encoded_link}#page={str(pageNumberFound)}&zoom={zoom_str}" # Get current date and time now = datetime.now() # Format the output formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p") # Optionally, add the URL to a DataFrame logger.info(f"Logging into table") data_entry = { "PDF Name":filename, "NBSLink": zoom_str, "Subject": heading_to_search, "Page": str(pageNumberFound), "Author": "ADR", "Creation Date": formatted_time, "Layer": "Initial", "Code": stringtowrite, # "head above 1": paths[-2], # "head above 2": paths[0], "BodyText":collected_lines, "MC Connnection": 'Go to ' + paths[0].strip().split()[0] +'/'+ heading_to_search.strip().split()[0] + ' in '+ filename } # Dynamically add "head above 1", "head above 2", ... depending on the number of levels for i, path_text in enumerate(paths[:-1]): # skip the last one because that's the current heading data_entry[f"head above {i+1}"] = path_text data_list_JSON.append(data_entry) # Convert list to JSON # json_output = [data_list_JSON] # json_output = json.dumps(data_list_JSON, indent=4) i += 2 continue if collecting: norm_line = normalize_text(line_text) # Optimized URL check if url_pattern.match(norm_line): line_is_header = False else: # line_is_header = any(is_header(span, most_common_font_size, most_common_color, most_common_font) for span in spans) def normalize(text): return " ".join(text.lower().split()) line_text = " ".join(span["text"] for span in spans).strip() line_is_header = any( normalize(line_text) == normalize(header) for header in allheaders_LLM ) if line_is_header: header_font_size = max(span["size"] for span in spans) is_probably_real_header = ( # header_font_size >= matched_header_font_size and # is_header(spans[0], most_common_font_size, most_common_color, most_common_font) and len(line_text.strip()) > 2 ) if (norm_line != matched_header_line_norm and norm_line != heading_norm and is_probably_real_header): if line_text not in heading_norm: collecting = False done = True headertoContinue1 = False headertoContinue2=False for page_num, bbox in current_bbox.items(): bbox[3] = last_y1s.get(page_num, bbox[3]) page_highlights[page_num] = bbox highlight_boxes(docHighlights, page_highlights,stringtowrite) break_collecting = True break if break_collecting: break collected_lines.append(line_text) valid_spans = [span for span in spans if span.get("bbox")] if valid_spans: x0s = [span["bbox"][0] for span in valid_spans] x1s = [span["bbox"][2] for span in valid_spans] y0s = [span["bbox"][1] for span in valid_spans] y1s = [span["bbox"][3] for span in valid_spans] line_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] if page_num in current_bbox: cb = current_bbox[page_num] current_bbox[page_num] = [ min(cb[0], line_bbox[0]), min(cb[1], line_bbox[1]), max(cb[2], line_bbox[2]), max(cb[3], line_bbox[3]) ] else: current_bbox[page_num] = line_bbox last_y1s[page_num] = line_bbox[3] i += 1 if not done: for page_num, bbox in current_bbox.items(): bbox[3] = last_y1s.get(page_num, bbox[3]) page_highlights[page_num] = bbox if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() : stringtowrite='Not to be billed' else: stringtowrite='To be billed' highlight_boxes(docHighlights, page_highlights,stringtowrite) docarray.append(docHighlights) if data_list_JSON and not data_list_JSON[-1]["BodyText"] and collected_lines: data_list_JSON[-1]["BodyText"] = collected_lines[1:] if len(collected_lines) > 0 else [] # Final cleanup of the JSON data before returning for entry in data_list_JSON: # Check if BodyText exists and has content if isinstance(entry.get("BodyText"), list) and len(entry["BodyText"]) > 0: # Check if the first line of the body is essentially the same as the Subject first_line = normalize_text(entry["BodyText"][0]) subject = normalize_text(entry["Subject"]) # If they match or the subject is inside the first line, remove it if subject in first_line or first_line in subject: entry["BodyText"] = entry["BodyText"][1:] jsons.append(data_list_JSON) logger.info(f"Markups done! Uploading to dropbox") logger.info(f"Uploaded and Readyy!") return jsons,identified_headers def testFunction(pdf_path, model,LLM_prompt): Alltexttobebilled='' alltextWithoutNotbilled='' # keywordstoSkip=["installation", "execution", "miscellaneous items", "workmanship", "testing", "labeling"] headertoContinue1 = False headertoContinue2=False parsed_url = urlparse(pdf_path) filename = os.path.basename(parsed_url.path) filename = unquote(filename) # decode URL-encoded characters # Optimized URL handling if pdf_path and ('http' in pdf_path or 'dropbox' in pdf_path): pdf_path = pdf_path.replace('dl=0', 'dl=1') # Cache frequently used values response = requests.get(pdf_path) pdf_content = BytesIO(response.content) if not pdf_content: raise ValueError("No valid PDF content found.") doc = fitz.open(stream=pdf_content, filetype="pdf") docHighlights = fitz.open(stream=pdf_content, filetype="pdf") parsed_url = urlparse(pdf_path) filename = os.path.basename(parsed_url.path) filename = unquote(filename) # decode URL-encoded characters #### Get regular tex font size, style , color most_common_font_size, most_common_color, most_common_font = get_regular_font_size_and_color(doc) # Precompute regex patterns dot_pattern = re.compile(r'\.{3,}') url_pattern = re.compile(r'https?://\S+|www\.\S+') highlighted=[] processed_subjects = set() # Initialize at the top of testFunction toc_pages = get_toc_page_numbers(doc) identified_headers=process_document_in_chunks(len(doc), pdf_path, LLM_prompt, model) # identified_headers = identify_headers_with_openrouterNEWW(doc, api_key='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')# ['text', fontsize, page number,y] # with open("identified_headers.txt", "w", encoding="utf-8") as f: # json.dump(identified_headers, f, indent=4) # with open("identified_headers.txt", "r", encoding="utf-8") as f: # identified_headers = json.load(f) print(identified_headers) allheaders_LLM=[] for h in identified_headers: if int(h["page"]) in toc_pages: continue if h['text']: allheaders_LLM.append(h['text']) headers_json=headers_with_location(doc,identified_headers) headers=filter_headers_outside_toc(headers_json,toc_pages) hierarchy=build_hierarchy_from_llm(headers) # identify_headers_and_save_excel(hierarchy) listofHeaderstoMarkup = get_leaf_headers_with_paths(hierarchy) allchildrenheaders = [normalize_text(item['text']) for item, p in listofHeaderstoMarkup] allchildrenheaders_set = set(allchildrenheaders) # For faster lookups # print('allchildrenheaders_set',allchildrenheaders_set) df = pd.DataFrame(columns=["NBSLink","Subject","Page","Author","Creation Date","Layer",'Code', 'head above 1', "head above 2",'BodyText']) dictionaryNBS={} data_list_JSON = [] for heading_to_searchDict,pathss in listofHeaderstoMarkup: heading_to_search = heading_to_searchDict['text'] heading_to_searchPageNum = heading_to_searchDict['page'] paths=heading_to_searchDict['path'] # xloc=heading_to_searchDict['x'] yloc=heading_to_searchDict['y'] # Initialize variables headertoContinue1 = False headertoContinue2 = False matched_header_line = None done = False collecting = False collected_lines = [] page_highlights = {} current_bbox = {} last_y1s = {} mainHeader = '' subHeader = '' matched_header_line_norm = heading_to_search break_collecting = False heading_norm = normalize_text(heading_to_search) paths_norm = [normalize_text(p) for p in paths[0]] if paths and paths[0] else [] for page_num in range(heading_to_searchPageNum,len(doc)): if page_num in toc_pages: continue if break_collecting: break page=doc[page_num] page_height = page.rect.height blocks = page.get_text("dict")["blocks"] for block in blocks: if break_collecting: break lines = block.get("lines", []) i = 0 while i < len(lines): if break_collecting: break spans = lines[i].get("spans", []) if not spans: i += 1 continue # y0 = spans[0]["bbox"][1] # y1 = spans[0]["bbox"][3] x0 = spans[0]["bbox"][0] # left x1 = spans[0]["bbox"][2] # right y0 = spans[0]["bbox"][1] # top y1 = spans[0]["bbox"][3] # bottom if y0 < top_margin or y1 > (page_height - bottom_margin): i += 1 continue line_text = get_spaced_text_from_spans(spans).lower() line_text_norm = normalize_text(line_text) # Combine with next line if available if i + 1 < len(lines): next_spans = lines[i + 1].get("spans", []) next_line_text = get_spaced_text_from_spans(next_spans).lower() combined_line_norm = normalize_text(line_text + " " + next_line_text) else: combined_line_norm = line_text_norm # Check if we should continue processing if combined_line_norm and combined_line_norm in paths[0]: headertoContinue1 = combined_line_norm if combined_line_norm and combined_line_norm in paths[-2]: headertoContinue2 = combined_line_norm # print('paths',paths) # if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() : # if any(word in paths[-2].lower() for word in keywordstoSkip): # stringtowrite='Not to be billed' # else: stringtowrite='To be billed' if stringtowrite!='To be billed': alltextWithoutNotbilled+= combined_line_norm ################################################# # Optimized header matching existsfull = ( ( combined_line_norm in allchildrenheaders_set or combined_line_norm in allchildrenheaders ) and heading_to_search in combined_line_norm ) # existsfull=False # if xloc==x0 and yloc ==y0: # existsfull=True # New word-based matching current_line_words = set(combined_line_norm.split()) heading_words = set(heading_norm.split()) all_words_match = current_line_words.issubset(heading_words) and len(current_line_words) > 0 substring_match = ( heading_norm in combined_line_norm or combined_line_norm in heading_norm or all_words_match # Include the new word-based matching ) # substring_match = ( # heading_norm in combined_line_norm or # combined_line_norm in heading_norm # ) if ( substring_match and existsfull and not collecting and len(combined_line_norm) > 0 ):#and (headertoContinue1 or headertoContinue2) ): # Check header conditions more efficiently # header_spans = [ # span for span in spans # if (is_header(span, most_common_font_size, most_common_color, most_common_font) ) # # and span['size'] >= subsubheaderFontSize # # and span['size'] < mainHeaderFontSize) # ] if stringtowrite.startswith('To'): collecting = True # matched_header_font_size = max(span["size"] for span in header_spans) Alltexttobebilled+= ' '+ combined_line_norm # collected_lines.append(line_text) valid_spans = [span for span in spans if span.get("bbox")] if valid_spans: x0s = [span["bbox"][0] for span in valid_spans] x1s = [span["bbox"][2] for span in valid_spans] y0s = [span["bbox"][1] for span in valid_spans] y1s = [span["bbox"][3] for span in valid_spans] header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] if page_num in current_bbox: cb = current_bbox[page_num] current_bbox[page_num] = [ min(cb[0], header_bbox[0]), min(cb[1], header_bbox[1]), max(cb[2], header_bbox[2]), max(cb[3], header_bbox[3]) ] else: current_bbox[page_num] = header_bbox last_y1s[page_num] = header_bbox[3] x0, y0, x1, y1 = header_bbox zoom = 200 left = int(x0) top = int(y0) zoom_str = f"{zoom},{left},{top}" pageNumberFound = page_num + 1 # Build the query parameters params = { 'pdfLink': pdf_path, # Your PDF link 'keyword': heading_to_search, # Your keyword (could be a string or list) } # URL encode each parameter encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()} # Construct the final encoded link encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()]) # Correctly construct the final URL with page and zoom # final_url = f"{baselink}{encoded_link}#page={str(pageNumberFound)}&zoom={zoom_str}" # Get current date and time now = datetime.now() # Format the output formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p") # Optionally, add the URL to a DataFrame # Create the data entry only if the subject is unique if heading_to_search not in processed_subjects: data_entry = { "NBSLink": zoom_str, "Subject": heading_to_search, "Page": str(pageNumberFound), "Author": "ADR", "Creation Date": formatted_time, "Layer": "Initial", "Code": stringtowrite, "BodyText": collected_lines, "MC Connnection": 'Go to ' + paths[0].strip().split()[0] + '/' + heading_to_search.strip().split()[0] + ' in ' + filename } # Dynamically add hierarchy paths for i, path_text in enumerate(paths[:-1]): data_entry[f"head above {i+1}"] = path_text # Append to the list and mark this subject as processed data_list_JSON.append(data_entry) processed_subjects.add(heading_to_search) else: print(f"Skipping duplicate data entry for Subject: {heading_to_search}") # Convert list to JSON json_output = json.dumps(data_list_JSON, indent=4) i += 1 continue else: if (substring_match and not collecting and len(combined_line_norm) > 0): # and (headertoContinue1 or headertoContinue2) ): # Calculate word match percentage word_match_percent = words_match_ratio(heading_norm, combined_line_norm) * 100 # Check if at least 70% of header words exist in this line meets_word_threshold = word_match_percent >= 100 # Check header conditions (including word threshold) # header_spans = [ # span for span in spans # if (is_header(span, most_common_font_size, most_common_color, most_common_font)) # # and span['size'] >= subsubheaderFontSize # # and span['size'] < mainHeaderFontSize) # ] if (meets_word_threshold or same_start_word(heading_to_search, combined_line_norm) ) and stringtowrite.startswith('To'): collecting = True # matched_header_font_size = max(span["size"] for span in header_spans) Alltexttobebilled+= ' '+ combined_line_norm collected_lines.append(line_text) valid_spans = [span for span in spans if span.get("bbox")] if valid_spans: x0s = [span["bbox"][0] for span in valid_spans] x1s = [span["bbox"][2] for span in valid_spans] y0s = [span["bbox"][1] for span in valid_spans] y1s = [span["bbox"][3] for span in valid_spans] header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] if page_num in current_bbox: cb = current_bbox[page_num] current_bbox[page_num] = [ min(cb[0], header_bbox[0]), min(cb[1], header_bbox[1]), max(cb[2], header_bbox[2]), max(cb[3], header_bbox[3]) ] else: current_bbox[page_num] = header_bbox last_y1s[page_num] = header_bbox[3] x0, y0, x1, y1 = header_bbox zoom = 200 left = int(x0) top = int(y0) zoom_str = f"{zoom},{left},{top}" pageNumberFound = page_num + 1 # Build the query parameters params = { 'pdfLink': pdf_path, # Your PDF link 'keyword': heading_to_search, # Your keyword (could be a string or list) } # URL encode each parameter encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()} # Construct the final encoded link encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()]) # Correctly construct the final URL with page and zoom # final_url = f"{baselink}{encoded_link}#page={str(pageNumberFound)}&zoom={zoom_str}" # Get current date and time now = datetime.now() # Format the output formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p") # Optionally, add the URL to a DataFrame # Create the data entry only if the subject is unique if heading_to_search not in processed_subjects: data_entry = { "NBSLink": zoom_str, "Subject": heading_to_search, "Page": str(pageNumberFound), "Author": "ADR", "Creation Date": formatted_time, "Layer": "Initial", "Code": stringtowrite, "BodyText": collected_lines, "MC Connnection": 'Go to ' + paths[0].strip().split()[0] + '/' + heading_to_search.strip().split()[0] + ' in ' + filename } # Dynamically add hierarchy paths for i, path_text in enumerate(paths[:-1]): data_entry[f"head above {i+1}"] = path_text # Append to the list and mark this subject as processed data_list_JSON.append(data_entry) processed_subjects.add(heading_to_search) else: print(f"Skipping duplicate data entry for Subject: {heading_to_search}") # Convert list to JSON json_output = json.dumps(data_list_JSON, indent=4) i += 2 continue if collecting: norm_line = normalize_text(line_text) def normalize(text): if isinstance(text, list): text = " ".join(text) return " ".join(text.lower().split()) def is_similar(a, b, threshold=0.75): return SequenceMatcher(None, a, b).ratio() >= threshold # Optimized URL check if url_pattern.match(norm_line): line_is_header = False else: line_is_header = any(is_header(span, most_common_font_size, most_common_color, most_common_font,allheaders_LLM) for span in spans) # def normalize(text): # return " ".join(text.lower().split()) # line_text = " ".join(span["text"] for span in spans).strip() # line_is_header = any( normalize(line_text) == normalize(header) for header in allheaders_LLM ) # for line_text in lines: # if collecting: # # Join all spans into one line # line_text = " ".join(span["text"] for span in spans).strip() # norm_line = normalize(line_text) # # Get max font size in this line # max_font_size = max(span.get("size", 0) for span in spans) # # Skip URLs # if url_pattern.match(norm_line): # line_is_header = False # else: # text_matches_header = any( # is_similar(norm_line, normalize(header)) # if not isinstance(header, list) # else is_similar(norm_line, normalize(" ".join(header))) # for header in allheaders_LLM # ) # # ✅ FINAL header condition # line_is_header = text_matches_header and max_font_size > 11 if line_is_header: header_font_size = max(span["size"] for span in spans) is_probably_real_header = ( # header_font_size >= matched_header_font_size and # is_header(spans[0], most_common_font_size, most_common_color, most_common_font) and len(line_text.strip()) > 2 ) if (norm_line != matched_header_line_norm and norm_line != heading_norm and is_probably_real_header): if line_text not in heading_norm: collecting = False done = True headertoContinue1 = False headertoContinue2=False for page_num, bbox in current_bbox.items(): bbox[3] = last_y1s.get(page_num, bbox[3]) page_highlights[page_num] = bbox can_highlight=False if [page_num,bbox] not in highlighted: highlighted.append([page_num,bbox]) can_highlight=True if can_highlight: highlight_boxes(docHighlights, page_highlights,stringtowrite) break_collecting = True break if break_collecting: break collected_lines.append(line_text) valid_spans = [span for span in spans if span.get("bbox")] if valid_spans: x0s = [span["bbox"][0] for span in valid_spans] x1s = [span["bbox"][2] for span in valid_spans] y0s = [span["bbox"][1] for span in valid_spans] y1s = [span["bbox"][3] for span in valid_spans] line_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] if page_num in current_bbox: cb = current_bbox[page_num] current_bbox[page_num] = [ min(cb[0], line_bbox[0]), min(cb[1], line_bbox[1]), max(cb[2], line_bbox[2]), max(cb[3], line_bbox[3]) ] else: current_bbox[page_num] = line_bbox last_y1s[page_num] = line_bbox[3] i += 1 if not done: for page_num, bbox in current_bbox.items(): bbox[3] = last_y1s.get(page_num, bbox[3]) page_highlights[page_num] = bbox # if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() : # stringtowrite='Not to be billed' # else: stringtowrite='To be billed' highlight_boxes(docHighlights, page_highlights,stringtowrite) print("Current working directory:", os.getcwd()) docHighlights.save("highlighted_output.pdf") # dbxTeam = tsadropboxretrieval.ADR_Access_DropboxTeam('user') # metadata = dbxTeam.sharing_get_shared_link_metadata(pdf_path) # dbPath = '/TSA JOBS/ADR Test/FIND/' # pdf_bytes = BytesIO() # docHighlights.save(pdf_bytes) # pdflink = tsadropboxretrieval.uploadanyFile(doc=docHighlights, path=dbPath, pdfname=filename) # json_output=changepdflinks(json_output,pdflink) # return pdf_bytes.getvalue(), docHighlights , json_output , Alltexttobebilled , alltextWithoutNotbilled , filename # Final safety check: if the very last entry in our list has an empty BodyText, # but we have collected_lines, sync them. if data_list_JSON and not data_list_JSON[-1]["BodyText"] and collected_lines: data_list_JSON[-1]["BodyText"] = collected_lines[1:] if len(collected_lines) > 0 else [] # Final cleanup of the JSON data before returning for entry in data_list_JSON: # Check if BodyText exists and has content if isinstance(entry.get("BodyText"), list) and len(entry["BodyText"]) > 0: # Check if the first line of the body is essentially the same as the Subject first_line = normalize_text(entry["BodyText"][0]) subject = normalize_text(entry["Subject"]) # If they match or the subject is inside the first line, remove it if subject in first_line or first_line in subject: entry["BodyText"] = entry["BodyText"][1:] print('data_list_JSON',data_list_JSON) # json_output.append(data_list_JSON) json_output = json.dumps(data_list_JSON, indent=4) logger.info(f"Markups done! Uploading to dropbox") logger.info(f"Uploaded and Readyy!") return json_output,identified_headers def build_subject_body_map(jsons): subject_body = {} for obj in jsons: subject = obj.get("Subject") body = obj.get("BodyText", []) if subject: # join body text into a readable paragraph subject_body[subject.strip()] = " ".join(body) return subject_body # def identify_headers_and_save_excel(pdf_path, model,LLM_prompt): # try: # # result = identify_headers_with_openrouterNEWW(pdf_path, model,LLM_prompt) # print('beginnging identify') # jsons,result = testFunction(pdf_path, model,LLM_prompt) # print('done , will start dataframe',jsons,result) # if not result: # df = pd.DataFrame([{ # "text": None, # "page": None, # "suggested_level": None, # "confidence": None, # "body": None, # "System Message": "No headers were identified by the LLM." # }]) # else: # df = pd.DataFrame(result) # subject_body_map = {} # # Safely navigate the nested structure: [ [ [ {dict}, {dict} ] ] ] # for pdf_level in jsons: # if not isinstance(pdf_level, list): # continue # for section_level in pdf_level: # # If the LLM returns a list of dictionaries here # if isinstance(section_level, list): # for obj in section_level: # if isinstance(obj, dict): # subject = obj.get("Subject") # body = obj.get("BodyText", []) # if subject: # # Ensure body is a list before joining # body_str = " ".join(body) if isinstance(body, list) else str(body) # subject_body_map[subject.strip()] = body_str # # If the LLM returns a single dictionary here # elif isinstance(section_level, dict): # subject = section_level.get("Subject") # body = section_level.get("BodyText", []) # if subject: # body_str = " ".join(body) if isinstance(body, list) else str(body) # subject_body_map[subject.strip()] = body_str # # Map the extracted body text to the "text" column in your main DataFrame # if "text" in df.columns: # df["body"] = df["text"].map(lambda x: subject_body_map.get(str(x).strip()) if x else None) # else: # df["body"] = None # # Save to Excel # output_path = os.path.abspath("header_analysis_output.xlsx") # df.to_excel(output_path, index=False, engine="openpyxl") # print("--- Processed DataFrame ---") # print(df) # return output_path # except Exception as e: # print(f"ERROR - Critical error in processing: {e}") # # Re-raise or handle as needed # return None def identify_headers_and_save_excel(pdf_path, model,LLM_prompt): try: jsons, result = testFunction(pdf_path, model,LLM_prompt) if not result: df = pd.DataFrame([{ "text": None, "page": None, "suggested_level": None, "confidence": None, "body": None, "System Message": "No headers were identified by the LLM." }]) else: print('here') df = pd.DataFrame(result) # Convert JSON string to list if needed if isinstance(jsons, str): jsons = json.loads(jsons) subject_body_map = {} # ✅ jsons is a flat list of dicts for obj in jsons: if not isinstance(obj, dict): continue subject = obj.get("Subject") body = obj.get("BodyText", []) if subject: subject_body_map[subject.strip()] = " ".join(body) # ✅ Map body to dataframe df["body"] = df["text"].map(subject_body_map).fillna("") # ✅ Save once at end output_path = os.path.abspath("header_analysis_output.xlsx") df.to_excel(output_path, index=False, engine="openpyxl") print("--- Processed DataFrame ---") print(df) return output_path except Exception as e: logger.error(f"Critical error in processing: {str(e)}") return None # Improved launch with debug mode enabled iface = gr.Interface( fn=identify_headers_and_save_excel, inputs=[ gr.Textbox(label="PDF URL"), gr.Textbox(label="Model Type"), # Default example gr.Textbox(label="LLM Prompt") ], outputs=gr.File(label="Download Excel Results"), title="PDF Header Extractor" ) # Launch with debug=True to see errors in the console iface.launch(debug=True)