Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import os | |
| import json | |
| import requests | |
| from io import BytesIO | |
| from datetime import datetime | |
| from difflib import SequenceMatcher | |
| import pandas as pd | |
| from io import BytesIO | |
| import fitz # PyMuPDF | |
| from collections import defaultdict, Counter | |
| from urllib.parse import urlparse, unquote | |
| import os | |
| from io import BytesIO | |
| import re | |
| import requests | |
| import pandas as pd | |
| import fitz # PyMuPDF | |
| import re | |
| import urllib.parse | |
| import difflib | |
| import copy | |
| # import tsadropboxretrieval | |
| import urllib.parse | |
| import logging | |
| # Set up logging to see everything | |
| logging.basicConfig( | |
| level=logging.DEBUG, | |
| format='%(asctime)s - %(levelname)s - %(message)s', | |
| handlers=[ | |
| logging.StreamHandler(), # Print to console | |
| logging.FileHandler('debug.log', mode='w') # Save to file | |
| ] | |
| ) | |
| logger = logging.getLogger(__name__) | |
| top_margin = 70 | |
| bottom_margin = 85 | |
| def getLocation_of_header(doc, headerText, expected_page=None): | |
| locations = [] | |
| # pages = ( | |
| # [(expected_page, doc.load_page(expected_page))] | |
| # if expected_page is not None | |
| # else enumerate(doc) | |
| # ) | |
| expectedpageNorm=expected_page | |
| page=doc[expectedpageNorm] | |
| # for page_number, page in pages: | |
| page_height = page.rect.height | |
| rects = page.search_for(headerText) | |
| for r in rects: | |
| y = r.y0 | |
| # Skip headers in top or bottom margin | |
| if y <= top_margin: | |
| continue | |
| if y >= page_height - bottom_margin: | |
| continue | |
| locations.append({ | |
| "headerText":headerText, | |
| "page": expectedpageNorm, | |
| "x": r.x0, | |
| "y": y | |
| }) | |
| return locations | |
| def filter_headers_outside_toc(headers, toc_pages): | |
| toc_pages_set = set(toc_pages) | |
| filtered = [] | |
| for h in headers: | |
| page = h[2] | |
| y = h[3] | |
| # Skip invalid / fallback headers | |
| if page is None or y is None: | |
| continue | |
| # Skip headers inside TOC pages | |
| if page in toc_pages_set: | |
| continue | |
| filtered.append(h) | |
| return filtered | |
| def headers_with_location(doc, llm_headers): | |
| """ | |
| Converts LLM headers into: | |
| [text, font_size, page, y, suggested_level, confidence] | |
| Always include all headers, even if location not found. | |
| """ | |
| headersJson = [] | |
| for h in llm_headers: | |
| text = h["text"] | |
| llm_page = h["page"] | |
| # Attempt to locate the header on the page | |
| locations = getLocation_of_header(doc, text,llm_page) | |
| if locations: | |
| for loc in locations: | |
| page = doc.load_page(loc["page"]) | |
| fontsize = None | |
| for block in page.get_text("dict")["blocks"]: | |
| if block.get("type") != 0: | |
| continue | |
| for line in block.get("lines", []): | |
| line_text = "".join(span["text"] for span in line["spans"]).strip() | |
| if normalize(line_text) == normalize(text): | |
| fontsize = line["spans"][0]["size"] | |
| break | |
| if fontsize: | |
| break | |
| entry = [ | |
| text, | |
| fontsize, | |
| loc["page"], | |
| loc["y"], | |
| h["suggested_level"], | |
| ] | |
| if entry not in headersJson: | |
| headersJson.append(entry) | |
| return headersJson | |
| def build_hierarchy_from_llm(headers): | |
| nodes = [] | |
| # ------------------------- | |
| # 1. Build nodes safely | |
| # ------------------------- | |
| for h in headers: | |
| # print("headerrrrrrrrrrrrrrr", h) | |
| if len(h) < 5: | |
| continue | |
| text, size, page, y, level = h | |
| if level is None: | |
| continue | |
| try: | |
| level = int(level) | |
| except Exception: | |
| continue | |
| node = { | |
| "text": text, | |
| "page": page if page is not None else -1, | |
| "y": y if y is not None else -1, | |
| "size": size, | |
| "bold": False, | |
| "color": None, | |
| "font": None, | |
| "children": [], | |
| "is_numbered": is_numbered(text), | |
| "original_size": size, | |
| "norm_text": normalize(text), | |
| "level": level, | |
| } | |
| nodes.append(node) | |
| if not nodes: | |
| return [] | |
| # ------------------------- | |
| # 2. Sort top-to-bottom | |
| # ------------------------- | |
| nodes.sort(key=lambda x: (x["page"], x["y"])) | |
| # ------------------------- | |
| # 3. NORMALIZE LEVELS | |
| # (smallest level → 0) | |
| # ------------------------- | |
| min_level = min(n["level"] for n in nodes) | |
| for n in nodes: | |
| n["level"] -= min_level | |
| # ------------------------- | |
| # 4. Build hierarchy | |
| # ------------------------- | |
| root = [] | |
| stack = [] | |
| added_level0 = set() | |
| for header in nodes: | |
| lvl = header["level"] | |
| if lvl < 0: | |
| continue | |
| # De-duplicate true top-level headers | |
| if lvl == 0: | |
| key = (header["norm_text"], header["page"]) | |
| if key in added_level0: | |
| continue | |
| added_level0.add(key) | |
| while stack and stack[-1]["level"] >= lvl: | |
| stack.pop() | |
| parent = stack[-1] if stack else None | |
| if parent: | |
| header["path"] = parent["path"] + [header["norm_text"]] | |
| parent["children"].append(header) | |
| else: | |
| header["path"] = [header["norm_text"]] | |
| root.append(header) | |
| stack.append(header) | |
| # ------------------------- | |
| # 5. Enforce nesting sanity | |
| # ------------------------- | |
| def enforce_nesting(node_list, parent_level=-1): | |
| for node in node_list: | |
| if node["level"] <= parent_level: | |
| node["level"] = parent_level + 1 | |
| enforce_nesting(node["children"], node["level"]) | |
| enforce_nesting(root) | |
| # ------------------------- | |
| # 6. OPTIONAL cleanup | |
| # (only if real level-0s exist) | |
| # ------------------------- | |
| if any(h["level"] == 0 for h in root): | |
| root = [ | |
| h for h in root | |
| if not (h["level"] == 0 and not h["children"]) | |
| ] | |
| # ------------------------- | |
| # 7. Final pass | |
| # ------------------------- | |
| header_tree = enforce_level_hierarchy(root) | |
| return header_tree | |
| def get_regular_font_size_and_color(doc): | |
| font_sizes = [] | |
| colors = [] | |
| fonts = [] | |
| # Loop through all pages | |
| for page_num in range(len(doc)): | |
| page = doc.load_page(page_num) | |
| for span in page.get_text("dict")["blocks"]: | |
| if "lines" in span: | |
| for line in span["lines"]: | |
| for span in line["spans"]: | |
| font_sizes.append(span['size']) | |
| colors.append(span['color']) | |
| fonts.append(span['font']) | |
| # Get the most common font size, color, and font | |
| most_common_font_size = Counter(font_sizes).most_common(1)[0][0] if font_sizes else None | |
| most_common_color = Counter(colors).most_common(1)[0][0] if colors else None | |
| most_common_font = Counter(fonts).most_common(1)[0][0] if fonts else None | |
| return most_common_font_size, most_common_color, most_common_font | |
| def normalize_text(text): | |
| if text is None: | |
| return "" | |
| return re.sub(r'\s+', ' ', text.strip().lower()) | |
| def get_spaced_text_from_spans(spans): | |
| return normalize_text(" ".join(span["text"].strip() for span in spans)) | |
| def is_numbered(text): | |
| return bool(re.match(r'^\d', text.strip())) | |
| def is_similar(a, b, threshold=0.85): | |
| return difflib.SequenceMatcher(None, a, b).ratio() > threshold | |
| def normalize(text): | |
| text = text.lower() | |
| text = re.sub(r'\.{2,}', '', text) # remove long dots | |
| text = re.sub(r'\s+', ' ', text) # replace multiple spaces with one | |
| return text.strip() | |
| def clean_toc_entry(toc_text): | |
| """Remove page numbers and formatting from TOC entries""" | |
| # Remove everything after last sequence of dots/whitespace followed by digits | |
| return re.sub(r'[\.\s]+\d+.*$', '', toc_text).strip('. ') | |
| def enforce_level_hierarchy(headers): | |
| """ | |
| Ensure level 2 headers only exist under level 1 headers | |
| and clean up any orphaned headers | |
| """ | |
| def process_node_list(node_list, parent_level=-1): | |
| i = 0 | |
| while i < len(node_list): | |
| node = node_list[i] | |
| # Remove level 2 headers that don't have a level 1 parent | |
| if node['level'] == 2 and parent_level != 1: | |
| node_list.pop(i) | |
| continue | |
| # Recursively process children | |
| process_node_list(node['children'], node['level']) | |
| i += 1 | |
| process_node_list(headers) | |
| return headers | |
| def highlight_boxes(doc, highlights, stringtowrite, fixed_width=500): # Set your desired width here | |
| for page_num, bbox in highlights.items(): | |
| page = doc.load_page(page_num) | |
| page_width = page.rect.width | |
| # Get original rect for vertical coordinates | |
| orig_rect = fitz.Rect(bbox) | |
| rect_height = orig_rect.height | |
| if rect_height > 30: | |
| if orig_rect.width > 10: | |
| # Center horizontally using fixed width | |
| center_x = page_width / 2 | |
| new_x0 = center_x - fixed_width / 2 | |
| new_x1 = center_x + fixed_width / 2 | |
| new_rect = fitz.Rect(new_x0, orig_rect.y0, new_x1, orig_rect.y1) | |
| # Add highlight rectangle | |
| annot = page.add_rect_annot(new_rect) | |
| if stringtowrite.startswith('Not'): | |
| annot.set_colors(stroke=(0.5, 0.5, 0.5), fill=(0.5, 0.5, 0.5)) | |
| else: | |
| annot.set_colors(stroke=(1, 1, 0), fill=(1, 1, 0)) | |
| annot.set_opacity(0.3) | |
| annot.update() | |
| # Add right-aligned freetext annotation inside the fixed-width box | |
| text = '['+stringtowrite +']' | |
| annot1 = page.add_freetext_annot( | |
| new_rect, | |
| text, | |
| fontsize=15, | |
| fontname='helv', | |
| text_color=(1, 0, 0), | |
| rotate=page.rotation, | |
| align=2 # right alignment | |
| ) | |
| annot1.update() | |
| def get_leaf_headers_with_paths(listtoloop, path=None, output=None): | |
| if path is None: | |
| path = [] | |
| if output is None: | |
| output = [] | |
| for header in listtoloop: | |
| current_path = path + [header['text']] | |
| if not header['children']: | |
| if header['level'] != 0 and header['level'] != 1: | |
| output.append((header, current_path)) | |
| else: | |
| get_leaf_headers_with_paths(header['children'], current_path, output) | |
| return output | |
| # Add this helper function at the top of your code | |
| def words_match_ratio(text1, text2): | |
| words1 = set(text1.split()) | |
| words2 = set(text2.split()) | |
| if not words1 or not words2: | |
| return 0.0 | |
| common_words = words1 & words2 | |
| return len(common_words) / len(words1) | |
| def same_start_word(s1, s2): | |
| # Split both strings into words | |
| words1 = s1.strip().split() | |
| words2 = s2.strip().split() | |
| # Check if both have at least one word and compare the first ones | |
| if words1 and words2: | |
| return words1[0].lower() == words2[0].lower() | |
| return False | |
| def get_toc_page_numbers(doc, max_pages_to_check=15): | |
| toc_pages = [] | |
| logger.debug(f"Starting TOC detection, checking first {max_pages_to_check} pages") | |
| # 1. Existing Dot Pattern (looking for ".....") | |
| dot_pattern = re.compile(r"\.{2,}") | |
| # 2. NEW: Title Pattern (looking for specific headers) | |
| # ^ and $ ensure the line is JUST that word (ignoring "The contents of the bag...") | |
| # re.IGNORECASE makes it match "CONTENTS", "Contents", "Index", etc. | |
| title_pattern = re.compile(r"^\s*(table of contents|contents|index)\s*$", re.IGNORECASE) | |
| for page_num in range(min(len(doc), max_pages_to_check)): | |
| page = doc.load_page(page_num) | |
| blocks = page.get_text("dict")["blocks"] | |
| dot_line_count = 0 | |
| has_toc_title = False | |
| logger.debug(f"Checking page {page_num} for TOC") | |
| for block in blocks: | |
| for line in block.get("lines", []): | |
| # Extract text from spans (mimicking get_spaced_text_from_spans) | |
| line_text = " ".join([span["text"] for span in line["spans"]]).strip() | |
| # CHECK A: Does the line have dots? | |
| if dot_pattern.search(line_text): | |
| dot_line_count += 1 | |
| logger.debug(f" Found dot pattern on page {page_num}: '{line_text[:50]}...'") | |
| # CHECK B: Is this line a Title? | |
| # We check this early in the loop. If a page has a title "Contents", | |
| # we mark it immediately. | |
| if title_pattern.match(line_text): | |
| has_toc_title = True | |
| logger.debug(f" Found TOC title on page {page_num}: '{line_text}'") | |
| # CONDITION: | |
| # It is a TOC page if it has a Title OR if it has dot leaders. | |
| # We use 'dot_line_count >= 1' to be sensitive to single-item lists. | |
| if has_toc_title or dot_line_count >= 1: | |
| toc_pages.append(page_num) | |
| logger.info(f"Page {page_num} identified as TOC page") | |
| # RETURN: | |
| # If we found TOC pages (e.g., [2, 3]), we return [0, 1, 2, 3] | |
| # This covers the cover page, inside cover, and the TOC itself. | |
| if toc_pages: | |
| last_toc_page = toc_pages[0] | |
| result = list(range(0, last_toc_page + 1)) | |
| logger.info(f"TOC pages found: {result}") | |
| return result | |
| logger.info("No TOC pages found") | |
| return [] # Return empty list if nothing found | |
| def is_header(span, most_common_font_size, most_common_color, most_common_font,allheadersLLM): | |
| fontname = span.get("font", "").lower() | |
| # is_italic = "italic" in fontname or "oblique" in fontname | |
| isheader=False | |
| is_bold = "bold" in fontname or span.get("bold", False) | |
| if span['text'] in allheadersLLM: | |
| isheader=True | |
| return ( | |
| ( | |
| span["size"] > most_common_font_size or | |
| span["font"].lower() != most_common_font.lower() or | |
| (isheader and span["size"] > most_common_font_size ) | |
| ) | |
| ) | |
| def openPDF(pdf_path): | |
| logger.info(f"Opening PDF from URL: {pdf_path}") | |
| pdf_path = pdf_path.replace('dl=0', 'dl=1') | |
| response = requests.get(pdf_path) | |
| logger.debug(f"PDF download response status: {response.status_code}") | |
| pdf_content = BytesIO(response.content) | |
| if not pdf_content: | |
| logger.error("No valid PDF content found.") | |
| raise ValueError("No valid PDF content found.") | |
| doc = fitz.open(stream=pdf_content, filetype="pdf") | |
| logger.info(f"PDF opened successfully, {len(doc)} pages") | |
| return doc | |
| # def identify_headers_with_openrouter(pdf_path, model, LLM_prompt, pages_to_check=None, top_margin=0, bottom_margin=0): | |
| # """Ask an LLM (OpenRouter) to identify headers in the document. | |
| # Returns a list of dicts: {text, page, suggested_level, confidence}. | |
| # The function sends plain page-line strings to the LLM (including page numbers) | |
| # and asks for a JSON array containing only header lines with suggested levels. | |
| # """ | |
| # logger.info("=" * 80) | |
| # logger.info("STARTING IDENTIFY_HEADERS_WITH_OPENROUTER") | |
| # logger.info(f"PDF Path: {pdf_path}") | |
| # logger.info(f"Model: {model}") | |
| # logger.info(f"LLM Prompt: {LLM_prompt[:200]}..." if len(LLM_prompt) > 200 else f"LLM Prompt: {LLM_prompt}") | |
| # doc = openPDF(pdf_path) | |
| # api_key = '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' | |
| # if api_key is None: | |
| # api_key = os.getenv("OPENROUTER_API_KEY") or None | |
| # model = str(model) | |
| # # toc_pages = get_toc_page_numbers(doc) | |
| # lines_for_prompt = [] | |
| # pgestoRun=20 | |
| # # logger.info(f"TOC pages to skip: {toc_pages}") | |
| # logger.info(f"Total pages in document: {pgestoRun}") | |
| # # Collect text lines from pages (skip TOC pages) | |
| # total_lines = 0 | |
| # for pno in range(len(doc)): | |
| # # if pages_to_check and pno not in pages_to_check: | |
| # # continue | |
| # # if pno in toc_pages: | |
| # # logger.debug(f"Skipping TOC page {pno}") | |
| # # continue | |
| # page = doc.load_page(pno) | |
| # page_height = page.rect.height | |
| # text_dict = page.get_text("dict") | |
| # lines_for_prompt = [] | |
| # lines_on_page = 0 | |
| # for block in text_dict.get("blocks", []): | |
| # if block.get("type") != 0: # text blocks only | |
| # continue | |
| # for line in block.get("lines", []): | |
| # spans = line.get("spans", []) | |
| # if not spans: | |
| # continue | |
| # # Use first span to check vertical position | |
| # y0 = spans[0]["bbox"][1] | |
| # y1 = spans[0]['bbox'][3] | |
| # # if y0 < top_margin or y1 > (page_height - bottom_margin): | |
| # # continue | |
| # text = " ".join(s.get('text','') for s in spans).strip() | |
| # if text: | |
| # # prefix with page for easier mapping back | |
| # lines_for_prompt.append(f"PAGE {pno+1}: {text}") | |
| # lines_on_page += 1 | |
| # # if lines_on_page > 0: | |
| # # page = doc.load_page(pno) | |
| # # page_height = page.rect.height | |
| # # lines_on_page = 0 | |
| # # text_dict = page.get_text("dict") | |
| # # lines = [] | |
| # # y_tolerance = 0.2 # tweak if needed (1–3 usually works) | |
| # # for block in page.get_text("dict").get('blocks', []): | |
| # # if block.get('type') != 0: | |
| # # continue | |
| # # for line in block.get('lines', []): | |
| # # spans = line.get('spans', []) | |
| # # if not spans: | |
| # # continue | |
| # # y0 = spans[0]['bbox'][1] | |
| # # y1 = spans[0]['bbox'][3] | |
| # # if y0 < top_margin or y1 > (page_height - bottom_margin): | |
| # # continue | |
| # # for s in spans: | |
| # # # text,font,size,flags,color | |
| # # # ArrayofTextWithFormat={'Font':s.get('font')},{'Size':s.get('size')},{'Flags':s.get('flags')},{'Color':s.get('color')},{'Text':s.get('text')} | |
| # # # prefix with page for easier mapping back | |
| # # text = s["text"].strip() | |
| # # lines_for_prompt.append(f"PAGE {pno+1}: {text}") | |
| # # # if not lines_for_prompt: | |
| # # # return [] | |
| # # if text: | |
| # # # prefix with page for easier mapping back | |
| # # # lines_for_prompt.append(f"PAGE {pno+1}: {line}") | |
| # # lines_on_page += 1 | |
| # if lines_on_page > 0: | |
| # logger.debug(f"Page {pno}: collected {lines_on_page} lines") | |
| # total_lines += lines_on_page | |
| # logger.info(f"Total lines collected for LLM: {total_lines}") | |
| # if not lines_for_prompt: | |
| # logger.warning("No lines collected for prompt") | |
| # return [] | |
| # # Log sample of lines | |
| # logger.info("Sample lines (first 10):") | |
| # for i, line in enumerate(lines_for_prompt[:10]): | |
| # logger.info(f" {i}: {line}") | |
| # prompt = LLM_prompt+"\n\nLines:\n" + "\n".join(lines_for_prompt) | |
| # logger.debug(f"Full prompt length: {len(prompt)} characters") | |
| # # Changed: Print entire prompt, not truncated | |
| # print("=" * 80) | |
| # print("FULL LLM PROMPT:") | |
| # print(prompt) | |
| # print("=" * 80) | |
| # # Also log to file | |
| # # try: | |
| # # with open("full_prompt.txt", "w", encoding="utf-8") as f: | |
| # # f.write(prompt) | |
| # # logger.info("Full prompt saved to full_prompt.txt") | |
| # # except Exception as e: | |
| # # logger.error(f"Could not save prompt to file: {e}") | |
| # if not api_key: | |
| # # No API key: return empty so caller can fallback to heuristics | |
| # logger.error("No API key provided") | |
| # return [] | |
| # url = "https://openrouter.ai/api/v1/chat/completions" | |
| # # Build headers following the OpenRouter example | |
| # headers = { | |
| # "Authorization": f"Bearer {api_key}", | |
| # "Content-Type": "application/json", | |
| # "HTTP-Referer": os.getenv("OPENROUTER_REFERER", ""), | |
| # "X-Title": os.getenv("OPENROUTER_X_TITLE", "") | |
| # } | |
| # # Log request details (without exposing full API key) | |
| # logger.info(f"Making request to OpenRouter with model: {model}") | |
| # logger.debug(f"Headers (API key masked): { {k: '***' if k == 'Authorization' else v for k, v in headers.items()} }") | |
| # # Wrap the prompt as the example 'content' array expected by OpenRouter | |
| # body = { | |
| # "model": model, | |
| # "messages": [ | |
| # { | |
| # "role": "user", | |
| # "content": [ | |
| # {"type": "text", "text": prompt} | |
| # ] | |
| # } | |
| # ] | |
| # } | |
| # # Debug: log request body (truncated) and write raw response for inspection | |
| # try: | |
| # # Changed: Log full body (excluding prompt text which is already logged) | |
| # logger.debug(f"Request body (without prompt text): { {k: v if k != 'messages' else '[...prompt...]' for k, v in body.items()} }") | |
| # # Removed timeout parameter | |
| # resp = requests.post( | |
| # url=url, | |
| # headers=headers, | |
| # data=json.dumps(body) | |
| # ) | |
| # logger.info(f"HTTP Response Status: {resp.status_code}") | |
| # resp.raise_for_status() | |
| # resp_text = resp.text | |
| # # Changed: Print entire response | |
| # print("=" * 80) | |
| # print("FULL LLM RESPONSE:") | |
| # print(resp_text) | |
| # print("=" * 80) | |
| # logger.info(f"LLM raw response length: {len(resp_text)}") | |
| # # Save raw response for offline inspection | |
| # try: | |
| # with open("llm_debug.json", "w", encoding="utf-8") as fh: | |
| # fh.write(resp_text) | |
| # logger.info("Raw response saved to llm_debug.json") | |
| # except Exception as e: | |
| # logger.error(f"Warning: could not write llm_debug.json: {e}") | |
| # rj = resp.json() | |
| # logger.info(f"LLM parsed response type: {type(rj)}") | |
| # if isinstance(rj, dict): | |
| # logger.debug(f"Response keys: {list(rj.keys())}") | |
| # except requests.exceptions.RequestException as e: | |
| # logger.error(f"HTTP request failed: {repr(e)}") | |
| # return [] | |
| # except Exception as e: | |
| # logger.error(f"LLM call failed: {repr(e)}") | |
| # return [] | |
| # # Extract textual reply robustly | |
| # text_reply = None | |
| # if isinstance(rj, dict): | |
| # choices = rj.get('choices') or [] | |
| # logger.debug(f"Number of choices in response: {len(choices)}") | |
| # if choices: | |
| # for i, c in enumerate(choices): | |
| # logger.debug(f"Choice {i}: {c}") | |
| # c0 = choices[0] | |
| # msg = c0.get('message') or c0.get('delta') or {} | |
| # content = msg.get('content') | |
| # if isinstance(content, list): | |
| # logger.debug(f"Content is a list with {len(content)} items") | |
| # for idx, c in enumerate(content): | |
| # if c.get('type') == 'text' and c.get('text'): | |
| # text_reply = c.get('text') | |
| # logger.debug(f"Found text reply in content[{idx}], length: {len(text_reply)}") | |
| # break | |
| # elif isinstance(content, str): | |
| # text_reply = content | |
| # logger.debug(f"Content is string, length: {len(text_reply)}") | |
| # elif isinstance(msg, dict) and msg.get('content') and isinstance(msg.get('content'), dict): | |
| # text_reply = msg.get('content').get('text') | |
| # logger.debug(f"Found text in nested content dict") | |
| # # Fallback extraction | |
| # if not text_reply: | |
| # logger.debug("Trying fallback extraction from choices") | |
| # for c in rj.get('choices', []): | |
| # if isinstance(c.get('text'), str): | |
| # text_reply = c.get('text') | |
| # logger.debug(f"Found text reply in choice.text, length: {len(text_reply)}") | |
| # break | |
| # if not text_reply: | |
| # logger.error("Could not extract text reply from response") | |
| # # Changed: Print the entire response structure for debugging | |
| # print("=" * 80) | |
| # print("FAILED TO EXTRACT TEXT REPLY. FULL RESPONSE STRUCTURE:") | |
| # print(json.dumps(rj, indent=2)) | |
| # print("=" * 80) | |
| # return [] | |
| # # Changed: Print the extracted text reply | |
| # print("=" * 80) | |
| # print("EXTRACTED TEXT REPLY:") | |
| # print(text_reply) | |
| # print("=" * 80) | |
| # logger.info(f"Extracted text reply length: {len(text_reply)}") | |
| # logger.debug(f"First 500 chars of reply: {text_reply[:500]}...") | |
| # s = text_reply.strip() | |
| # start = s.find('[') | |
| # end = s.rfind(']') | |
| # js = s[start:end+1] if start != -1 and end != -1 else s | |
| # logger.debug(f"Looking for JSON array: start={start}, end={end}") | |
| # logger.debug(f"Extracted JSON string (first 500 chars): {js[:500]}...") | |
| # try: | |
| # parsed = json.loads(js) | |
| # logger.info(f"Successfully parsed JSON, got {len(parsed)} items") | |
| # except json.JSONDecodeError as e: | |
| # logger.error(f"Failed to parse JSON: {e}") | |
| # logger.error(f"JSON string that failed to parse: {js[:1000]}") | |
| # # Try to find any JSON-like structure | |
| # try: | |
| # # Try to extract any JSON array | |
| # import re | |
| # json_pattern = r'\[\s*\{.*?\}\s*\]' | |
| # matches = re.findall(json_pattern, text_reply, re.DOTALL) | |
| # if matches: | |
| # logger.info(f"Found {len(matches)} potential JSON arrays via regex") | |
| # for i, match in enumerate(matches): | |
| # try: | |
| # parsed = json.loads(match) | |
| # logger.info(f"Successfully parsed regex match {i} with {len(parsed)} items") | |
| # break | |
| # except json.JSONDecodeError as e2: | |
| # logger.debug(f"Regex match {i} also failed: {e2}") | |
| # continue | |
| # else: | |
| # logger.error("All regex matches failed to parse") | |
| # return [] | |
| # else: | |
| # logger.error("No JSON-like pattern found via regex") | |
| # return [] | |
| # except Exception as e2: | |
| # logger.error(f"Regex extraction also failed: {e2}") | |
| # return [] | |
| # # Log parsed results | |
| # logger.info(f"Parsed {len(parsed)} header items:") | |
| # for i, obj in enumerate(parsed[:10]): # Log first 10 items | |
| # logger.info(f" Item {i}: {obj}") | |
| # # Normalize parsed entries and return | |
| # out = [] | |
| # for obj in parsed: | |
| # t = obj.get('text') | |
| # page = int(obj.get('page')) if obj.get('page') else None | |
| # level = obj.get('suggested_level') | |
| # conf = float(obj.get('confidence') or 0) | |
| # if t and page is not None: | |
| # out.append({'text': t, 'page': page-1, 'suggested_level': level, 'confidence': conf}) | |
| # logger.info(f"Returning {len(out)} valid header entries") | |
| # return out | |
| def process_document_in_chunks( | |
| lengthofDoc, | |
| pdf_path, | |
| LLM_prompt, | |
| model, | |
| chunk_size=15, | |
| ): | |
| total_pages = lengthofDoc | |
| all_results = [] | |
| print(f"DEBUG: process_document_in_chunks - Total pages: {total_pages}") | |
| for start in range(0, total_pages, chunk_size): | |
| end = start + chunk_size | |
| print(f"DEBUG: Processing pages {start + 1} → {min(end, total_pages)}") | |
| result = identify_headers_with_openrouterNEWW( | |
| pdf_path=pdf_path, | |
| model=model, | |
| LLM_prompt=LLM_prompt, | |
| pages_to_check=(start, end) | |
| ) | |
| print(f"DEBUG: Chunk returned {len(result) if result else 0} headers") | |
| if result: | |
| print(f"DEBUG: Sample header from chunk: {result[0]}") | |
| all_results.extend(result) | |
| print(f"DEBUG: Total headers collected: {len(all_results)}") | |
| return all_results | |
| def identify_headers_with_openrouterNEWW(pdf_path, model,LLM_prompt, pages_to_check=None, top_margin=0, bottom_margin=0): | |
| """Ask an LLM (OpenRouter) to identify headers in the document. | |
| Returns a list of dicts: {text, page, suggested_level, confidence}. | |
| The function sends plain page-line strings to the LLM (including page numbers) | |
| and asks for a JSON array containing only header lines with suggested levels. | |
| """ | |
| logger.info("=" * 80) | |
| logger.info("STARTING IDENTIFY_HEADERS_WITH_OPENROUTER") | |
| # logger.info(f"PDF Path: {pdf_path}") | |
| logger.info(f"Model: {model}") | |
| # logger.info(f"LLM Prompt: {LLM_prompt[:200]}..." if len(LLM_prompt) > 200 else f"LLM Prompt: {LLM_prompt}") | |
| doc = openPDF(pdf_path) | |
| api_key = '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' | |
| if api_key is None: | |
| api_key = os.getenv("OPENROUTER_API_KEY") or None | |
| model = str(model) | |
| # toc_pages = get_toc_page_numbers(doc) | |
| lines_for_prompt = [] | |
| # pgestoRun=20 | |
| # logger.info(f"TOC pages to skip: {toc_pages}") | |
| # logger.info(f"Total pages in document: {len(doc)}") | |
| logger.info(f"Total pages in document: {len(doc)}") | |
| # Collect text lines from pages (skip TOC pages) | |
| total_lines = 0 | |
| ArrayofTextWithFormat = [] | |
| total_pages = len(doc) | |
| if pages_to_check is None: | |
| start_page = 0 | |
| end_page = min(15, total_pages) | |
| else: | |
| start_page, end_page = pages_to_check | |
| end_page = min(end_page, total_pages) # 🔑 CRITICAL LINE | |
| for pno in range(start_page, end_page): | |
| page = doc.load_page(pno) | |
| # # Collect text lines from pages (skip TOC pages) | |
| # total_lines = 0 | |
| # for pno in range(len(doc)): | |
| # if pages_to_check and pno not in pages_to_check: | |
| # continue | |
| # if pno in toc_pages: | |
| # logger.debug(f"Skipping TOC page {pno}") | |
| # continue | |
| # page = doc.load_page(pno) | |
| # page_height = page.rect.height | |
| # lines_on_page = 0 | |
| # text_dict = page.get_text("dict") | |
| # lines = [] | |
| # # y_tolerance = 0.2 # tweak if needed (1–3 usually works) | |
| # for block in text_dict["blocks"]: | |
| # if block["type"] != 0: | |
| # continue | |
| # for line in block["lines"]: | |
| # for span in line["spans"]: | |
| # text = span["text"].strip() | |
| # if not text: | |
| # continue | |
| # if text: | |
| # # prefix with page for easier mapping back | |
| # lines_for_prompt.append(f"PAGE {pno+1}: {text}") | |
| # lines_on_page += 1 | |
| # if lines_on_page > 0: | |
| # logger.debug(f"Page {pno}: collected {lines_on_page} lines") | |
| # total_lines += lines_on_page | |
| # logger.info(f"Total lines collected for LLM: {total_lines}") | |
| page_height = page.rect.height | |
| lines_on_page = 0 | |
| text_dict = page.get_text("dict") | |
| lines = [] | |
| y_tolerance = 0.5 # tweak if needed (1–3 usually works) | |
| for block in text_dict["blocks"]: | |
| if block["type"] != 0: | |
| continue | |
| for line in block["lines"]: | |
| for span in line["spans"]: | |
| text = span["text"].strip() | |
| if not text: # Skip empty text | |
| continue | |
| # Extract all formatting attributes | |
| font = span.get('font') | |
| size = span.get('size') | |
| color = span.get('color') | |
| flags = span.get('flags', 0) | |
| bbox = span.get("bbox", (0, 0, 0, 0)) | |
| x0, y0, x1, y1 = bbox | |
| # Create text format dictionary | |
| text_format = { | |
| 'Font': font, | |
| 'Size': size, | |
| 'Flags': flags, | |
| 'Color': color, | |
| 'Text': text, | |
| 'BBox': bbox, | |
| 'Page': pno + 1 | |
| } | |
| # Add to ArrayofTextWithFormat | |
| ArrayofTextWithFormat.append(text_format) | |
| # For line grouping (keeping your existing logic) | |
| matched = False | |
| for l in lines: | |
| if abs(l["y"] - y0) <= y_tolerance: | |
| l["spans"].append((x0, text, font, size, color, flags)) | |
| matched = True | |
| break | |
| if not matched: | |
| lines.append({ | |
| "y": y0, | |
| "spans": [(x0, text, font, size, color, flags)] | |
| }) | |
| lines.sort(key=lambda l: l["y"]) | |
| # Join text inside each line with formatting info | |
| final_lines = [] | |
| for l in lines: | |
| l["spans"].sort(key=lambda s: s[0]) # left → right | |
| # Collect all text and formatting for this line | |
| line_text = " ".join(text for _, text, _, _, _, _ in l["spans"]) | |
| # Get dominant formatting for the line (based on first span) | |
| if l["spans"]: | |
| _, _, font, size, color, flags = l["spans"][0] | |
| # Store line with its formatting | |
| line_with_format = { | |
| 'text': line_text, | |
| 'font': font, | |
| 'size': size, | |
| 'color': color, | |
| 'flags': flags, | |
| 'page': pno + 1, | |
| 'y_position': l["y"] | |
| } | |
| final_lines.append(line_with_format) | |
| # Result | |
| for line_data in final_lines: | |
| line_text = line_data['text'] | |
| print(line_text) | |
| if line_text: | |
| # Create a formatted string with text properties | |
| format_info = f"Font: {line_data['font']}, Size: {line_data['size']}, Color: {line_data['color']}" | |
| lines_for_prompt.append(f"PAGE {pno+1}: {line_text} [{format_info}]") | |
| lines_on_page += 1 | |
| if lines_on_page > 0: | |
| logger.debug(f"Page {pno}: collected {lines_on_page} lines") | |
| total_lines += lines_on_page | |
| logger.info(f"Total lines collected for LLM: {total_lines}") | |
| if not lines_for_prompt: | |
| logger.warning("No lines collected for prompt") | |
| return [] | |
| # Log sample of lines | |
| logger.info("Sample lines (first 10):") | |
| for i, line in enumerate(lines_for_prompt[:10]): | |
| logger.info(f" {i}: {line}") | |
| prompt =LLM_prompt + "\n\nLines:\n" + "\n".join(lines_for_prompt) | |
| logger.debug(f"Full prompt length: {len(prompt)} characters") | |
| # Changed: Print entire prompt, not truncated | |
| print("=" * 80) | |
| print("FULL LLM PROMPT:") | |
| print(prompt) | |
| print("=" * 80) | |
| # Also log to file | |
| try: | |
| with open("full_prompt.txt", "w", encoding="utf-8") as f: | |
| f.write(prompt) | |
| logger.info("Full prompt saved to full_prompt.txt") | |
| except Exception as e: | |
| logger.error(f"Could not save prompt to file: {e}") | |
| if not api_key: | |
| # No API key: return empty so caller can fallback to heuristics | |
| logger.error("No API key provided") | |
| return [] | |
| url = "https://openrouter.ai/api/v1/chat/completions" | |
| # Build headers following the OpenRouter example | |
| headers = { | |
| "Authorization": f"Bearer {api_key}", | |
| "Content-Type": "application/json", | |
| "HTTP-Referer": os.getenv("OPENROUTER_REFERER", ""), | |
| "X-Title": os.getenv("OPENROUTER_X_TITLE", ""), | |
| # "X-Request-Timestamp": str(unix_timestamp), | |
| # "X-Request-Datetime": current_time, | |
| } | |
| # Log request details (without exposing full API key) | |
| logger.info(f"Making request to OpenRouter with model: {model}") | |
| logger.debug(f"Headers (API key masked): { {k: '***' if k == 'Authorization' else v for k, v in headers.items()} }") | |
| # Wrap the prompt as the example 'content' array expected by OpenRouter | |
| body = { | |
| "model": model, | |
| "messages": [ | |
| { | |
| "role": "user", | |
| "content": [ | |
| {"type": "text", "text": prompt} | |
| ] | |
| } | |
| ] | |
| } | |
| # print(f"Request sent at: {current_time}") | |
| # print(f"Unix timestamp: {unix_timestamp}") | |
| # Debug: log request body (truncated) and write raw response for inspection | |
| try: | |
| # Changed: Log full body (excluding prompt text which is already logged) | |
| logger.debug(f"Request body (without prompt text): { {k: v if k != 'messages' else '[...prompt...]' for k, v in body.items()} }") | |
| # Removed timeout parameter | |
| resp = requests.post( | |
| url=url, | |
| headers=headers, | |
| data=json.dumps(body) | |
| ) | |
| logger.info(f"HTTP Response Status: {resp.status_code}") | |
| resp.raise_for_status() | |
| resp_text = resp.text | |
| # Changed: Print entire response | |
| print("=" * 80) | |
| print("FULL LLM RESPONSE:") | |
| print(resp_text) | |
| print("=" * 80) | |
| logger.info(f"LLM raw response length: {len(resp_text)}") | |
| # Save raw response for offline inspection | |
| try: | |
| with open("llm_debug.json", "w", encoding="utf-8") as fh: | |
| fh.write(resp_text) | |
| logger.info("Raw response saved to llm_debug.json") | |
| except Exception as e: | |
| logger.error(f"Warning: could not write llm_debug.json: {e}") | |
| rj = resp.json() | |
| logger.info(f"LLM parsed response type: {type(rj)}") | |
| if isinstance(rj, dict): | |
| logger.debug(f"Response keys: {list(rj.keys())}") | |
| except requests.exceptions.RequestException as e: | |
| logger.error(f"HTTP request failed: {repr(e)}") | |
| return [] | |
| except Exception as e: | |
| logger.error(f"LLM call failed: {repr(e)}") | |
| return [] | |
| # Extract textual reply robustly | |
| text_reply = None | |
| if isinstance(rj, dict): | |
| choices = rj.get('choices') or [] | |
| logger.debug(f"Number of choices in response: {len(choices)}") | |
| if choices: | |
| for i, c in enumerate(choices): | |
| logger.debug(f"Choice {i}: {c}") | |
| c0 = choices[0] | |
| msg = c0.get('message') or c0.get('delta') or {} | |
| content = msg.get('content') | |
| if isinstance(content, list): | |
| logger.debug(f"Content is a list with {len(content)} items") | |
| for idx, c in enumerate(content): | |
| if c.get('type') == 'text' and c.get('text'): | |
| text_reply = c.get('text') | |
| logger.debug(f"Found text reply in content[{idx}], length: {len(text_reply)}") | |
| break | |
| elif isinstance(content, str): | |
| text_reply = content | |
| logger.debug(f"Content is string, length: {len(text_reply)}") | |
| elif isinstance(msg, dict) and msg.get('content') and isinstance(msg.get('content'), dict): | |
| text_reply = msg.get('content').get('text') | |
| logger.debug(f"Found text in nested content dict") | |
| # Fallback extraction | |
| if not text_reply: | |
| logger.debug("Trying fallback extraction from choices") | |
| for c in rj.get('choices', []): | |
| if isinstance(c.get('text'), str): | |
| text_reply = c.get('text') | |
| logger.debug(f"Found text reply in choice.text, length: {len(text_reply)}") | |
| break | |
| if not text_reply: | |
| logger.error("Could not extract text reply from response") | |
| # Changed: Print the entire response structure for debugging | |
| print("=" * 80) | |
| print("FAILED TO EXTRACT TEXT REPLY. FULL RESPONSE STRUCTURE:") | |
| print(json.dumps(rj, indent=2)) | |
| print("=" * 80) | |
| return [] | |
| # Changed: Print the extracted text reply | |
| print("=" * 80) | |
| print("EXTRACTED TEXT REPLY:") | |
| print(text_reply) | |
| print("=" * 80) | |
| logger.info(f"Extracted text reply length: {len(text_reply)}") | |
| logger.debug(f"First 500 chars of reply: {text_reply[:500]}...") | |
| s = text_reply.strip() | |
| start = s.find('[') | |
| end = s.rfind(']') | |
| js = s[start:end+1] if start != -1 and end != -1 else s | |
| logger.debug(f"Looking for JSON array: start={start}, end={end}") | |
| logger.debug(f"Extracted JSON string (first 500 chars): {js[:500]}...") | |
| try: | |
| parsed = json.loads(js) | |
| logger.info(f"Successfully parsed JSON, got {len(parsed)} items") | |
| except json.JSONDecodeError as e: | |
| logger.error(f"Failed to parse JSON: {e}") | |
| logger.error(f"JSON string that failed to parse: {js[:1000]}") | |
| # Try to find any JSON-like structure | |
| try: | |
| # Try to extract any JSON array | |
| import re | |
| json_pattern = r'\[\s*\{.*?\}\s*\]' | |
| matches = re.findall(json_pattern, text_reply, re.DOTALL) | |
| if matches: | |
| logger.info(f"Found {len(matches)} potential JSON arrays via regex") | |
| for i, match in enumerate(matches): | |
| try: | |
| parsed = json.loads(match) | |
| logger.info(f"Successfully parsed regex match {i} with {len(parsed)} items") | |
| break | |
| except json.JSONDecodeError as e2: | |
| logger.debug(f"Regex match {i} also failed: {e2}") | |
| continue | |
| else: | |
| logger.error("All regex matches failed to parse") | |
| return [] | |
| else: | |
| logger.error("No JSON-like pattern found via regex") | |
| return [] | |
| except Exception as e2: | |
| logger.error(f"Regex extraction also failed: {e2}") | |
| return [] | |
| # Log parsed results | |
| logger.info(f"Parsed {len(parsed)} header items:") | |
| for i, obj in enumerate(parsed[:10]): # Log first 10 items | |
| logger.info(f" Item {i}: {obj}") | |
| # Normalize parsed entries and return | |
| out = [] | |
| for obj in parsed: | |
| t = obj.get('text') | |
| page = int(obj.get('page')) if obj.get('page') else None | |
| level = obj.get('suggested_level') | |
| conf = float(obj.get('confidence') or 0) | |
| if t and page is not None: | |
| out.append({'text': t, 'page': page-1, 'suggested_level': level, 'confidence': conf}) | |
| logger.info(f"Returning {len(out)} valid header entries") | |
| return out | |
| # def identify_headers_and_save_excel(pdf_path, model, llm_prompt): | |
| # try: | |
| # # 1. Get the result from your LLM function | |
| # result = identify_headers_with_openrouter(pdf_path, model, llm_prompt) | |
| # # 2. Safety Check: If LLM failed or returned nothing | |
| # if not result: | |
| # logger.warning("No headers found or LLM failed. Creating an empty report.") | |
| # df = pd.DataFrame([{"System Message": "No headers were identified by the LLM."}]) | |
| # else: | |
| # df = pd.DataFrame(result) | |
| # # 3. Use an Absolute Path for the output | |
| # # This ensures Gradio knows exactly where the file is | |
| # output_path = os.path.abspath("header_analysis_output.xlsx") | |
| # # 4. Save using the engine explicitly | |
| # df.to_excel(output_path, index=False, engine='openpyxl') | |
| # logger.info(f"File successfully saved to {output_path}") | |
| # return output_path | |
| # except Exception as e: | |
| # logger.error(f"Critical error in processing: {str(e)}") | |
| # # Return None or a custom error message to Gradio | |
| # return None | |
| def extract_section_under_header_tobebilledMultiplePDFS(multiplePDF_Paths,model,identified_headers): | |
| logger.debug(f"Starting function") | |
| # keywordstoSkip=["installation", "execution", "miscellaneous items", "workmanship", "testing", "labeling"] | |
| filenames=[] | |
| keywords = {'installation', 'execution', 'miscellaneous items', 'workmanship', 'testing', 'labeling'} | |
| arrayofPDFS=multiplePDF_Paths.split(',') | |
| print(multiplePDF_Paths) | |
| print(arrayofPDFS) | |
| docarray=[] | |
| jsons=[] | |
| df = pd.DataFrame(columns=["PDF Name","NBSLink","Subject","Page","Author","Creation Date","Layer",'Code', 'head above 1', "head above 2","BodyText"]) | |
| for pdf_path in arrayofPDFS: | |
| headertoContinue1 = False | |
| headertoContinue2=False | |
| Alltexttobebilled='' | |
| parsed_url = urlparse(pdf_path) | |
| filename = os.path.basename(parsed_url.path) | |
| filename = unquote(filename) # decode URL-encoded characters | |
| filenames.append(filename) | |
| logger.debug(f"Starting with pdf: {filename}") | |
| # Optimized URL handling | |
| if pdf_path and ('http' in pdf_path or 'dropbox' in pdf_path): | |
| pdf_path = pdf_path.replace('dl=0', 'dl=1') | |
| # Cache frequently used values | |
| response = requests.get(pdf_path) | |
| pdf_content = BytesIO(response.content) | |
| if not pdf_content: | |
| raise ValueError("No valid PDF content found.") | |
| doc = fitz.open(stream=pdf_content, filetype="pdf") | |
| logger.info(f"Total pages in document: {len(doc)}") | |
| docHighlights = fitz.open(stream=pdf_content, filetype="pdf") | |
| most_common_font_size, most_common_color, most_common_font = get_regular_font_size_and_color(doc) | |
| # Precompute regex patterns | |
| dot_pattern = re.compile(r'\.{3,}') | |
| url_pattern = re.compile(r'https?://\S+|www\.\S+') | |
| toc_pages = get_toc_page_numbers(doc) | |
| logger.info(f"Skipping TOC pages: Range {toc_pages}") | |
| # headers, top_3_font_sizes, smallest_font_size, headersSpans = extract_headers( | |
| # doc, toc_pages, most_common_font_size, most_common_color, most_common_font, top_margin, bottom_margin | |
| # ) | |
| logger.info(f"Starting model run.") | |
| # identified_headers = identify_headers_with_openrouterNEWW(doc, model) | |
| allheaders_LLM=[] | |
| for h in identified_headers: | |
| if int(h["page"]) in toc_pages: | |
| continue | |
| if h['text']: | |
| allheaders_LLM.append(h['text']) | |
| logger.info(f"Done with model.") | |
| print('identified_headers',identified_headers) | |
| headers_json=headers_with_location(doc,identified_headers) | |
| headers=filter_headers_outside_toc(headers_json,toc_pages) | |
| hierarchy=build_hierarchy_from_llm(headers) | |
| listofHeaderstoMarkup = get_leaf_headers_with_paths(hierarchy) | |
| logger.info(f"Hierarchy built as {hierarchy}") | |
| # Precompute all children headers once | |
| allchildrenheaders = [normalize_text(item['text']) for item, p in listofHeaderstoMarkup] | |
| allchildrenheaders_set = set(allchildrenheaders) # For faster lookups | |
| # df = pd.DataFrame(columns=["NBSLink","Subject","Page","Author","Creation Date","Layer",'Code', 'head above 1', "head above 2","BodyText"]) | |
| dictionaryNBS={} | |
| data_list_JSON = [] | |
| json_output=[] | |
| currentgroupname='' | |
| # if len(top_3_font_sizes)==3: | |
| # mainHeaderFontSize, subHeaderFontSize, subsubheaderFontSize = top_3_font_sizes | |
| # elif len(top_3_font_sizes)==2: | |
| # mainHeaderFontSize= top_3_font_sizes[0] | |
| # subHeaderFontSize= top_3_font_sizes[1] | |
| # subsubheaderFontSize= top_3_font_sizes[1] | |
| # Preload all pages to avoid repeated loading | |
| # pages = [doc.load_page(page_num) for page_num in range(len(doc)) if page_num not in toc_pages] | |
| for heading_to_searchDict,pathss in listofHeaderstoMarkup: | |
| heading_to_search = heading_to_searchDict['text'] | |
| heading_to_searchPageNum = heading_to_searchDict['page'] | |
| paths=heading_to_searchDict['path'] | |
| # Initialize variables | |
| headertoContinue1 = False | |
| headertoContinue2 = False | |
| matched_header_line = None | |
| done = False | |
| collecting = False | |
| collected_lines = [] | |
| page_highlights = {} | |
| current_bbox = {} | |
| last_y1s = {} | |
| mainHeader = '' | |
| subHeader = '' | |
| matched_header_line_norm = heading_to_search | |
| break_collecting = False | |
| heading_norm = normalize_text(heading_to_search) | |
| paths_norm = [normalize_text(p) for p in paths[0]] if paths and paths[0] else [] | |
| for page_num in range(heading_to_searchPageNum,len(doc)): | |
| # print(heading_to_search) | |
| if paths[0].strip().lower() != currentgroupname.strip().lower(): | |
| Alltexttobebilled+= paths[0] +'\n' | |
| currentgroupname=paths[0] | |
| # print(paths[0]) | |
| if page_num in toc_pages: | |
| continue | |
| if break_collecting: | |
| break | |
| page=doc[page_num] | |
| page_height = page.rect.height | |
| blocks = page.get_text("dict")["blocks"] | |
| for block in blocks: | |
| if break_collecting: | |
| break | |
| lines = block.get("lines", []) | |
| i = 0 | |
| while i < len(lines): | |
| if break_collecting: | |
| break | |
| spans = lines[i].get("spans", []) | |
| if not spans: | |
| i += 1 | |
| continue | |
| y0 = spans[0]["bbox"][1] | |
| y1 = spans[0]["bbox"][3] | |
| if y0 < top_margin or y1 > (page_height - bottom_margin): | |
| i += 1 | |
| continue | |
| line_text = get_spaced_text_from_spans(spans).lower() | |
| line_text_norm = normalize_text(line_text) | |
| # Combine with next line if available | |
| if i + 1 < len(lines): | |
| next_spans = lines[i + 1].get("spans", []) | |
| next_line_text = get_spaced_text_from_spans(next_spans).lower() | |
| combined_line_norm = normalize_text(line_text + " " + next_line_text) | |
| else: | |
| combined_line_norm = line_text_norm | |
| # Check if we should continue processing | |
| if combined_line_norm and combined_line_norm in paths[0]: | |
| headertoContinue1 = combined_line_norm | |
| if combined_line_norm and combined_line_norm in paths[-2]: | |
| headertoContinue2 = combined_line_norm | |
| # if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() : | |
| last_path = paths[-2].lower() | |
| # if any(word in paths[-2].lower() for word in keywordstoSkip): | |
| # if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() or 'workmanship' in paths[-2].lower() or 'testing' in paths[-2].lower() or 'labeling' in paths[-2].lower(): | |
| if any(keyword in last_path for keyword in keywords): | |
| stringtowrite='Not to be billed' | |
| logger.info(f"Keyword found. Not to be billed activated. keywords: {keywords}") | |
| else: | |
| stringtowrite='To be billed' | |
| if stringtowrite=='To be billed': | |
| # Alltexttobebilled+= combined_line_norm ################################################# | |
| if matched_header_line_norm in combined_line_norm: | |
| Alltexttobebilled+='\n' | |
| Alltexttobebilled+= ' '+combined_line_norm | |
| # Optimized header matching | |
| existsfull = ( | |
| ( combined_line_norm in allchildrenheaders_set or | |
| combined_line_norm in allchildrenheaders ) and heading_to_search in combined_line_norm | |
| ) | |
| # New word-based matching | |
| current_line_words = set(combined_line_norm.split()) | |
| heading_words = set(heading_norm.split()) | |
| all_words_match = current_line_words.issubset(heading_words) and len(current_line_words) > 0 | |
| substring_match = ( | |
| heading_norm in combined_line_norm or | |
| combined_line_norm in heading_norm or | |
| all_words_match # Include the new word-based matching | |
| ) | |
| # substring_match = ( | |
| # heading_norm in combined_line_norm or | |
| # combined_line_norm in heading_norm | |
| # ) | |
| if (substring_match and existsfull and not collecting and | |
| len(combined_line_norm) > 0 ):#and (headertoContinue1 or headertoContinue2) ): | |
| # Check header conditions more efficiently | |
| # header_spans = [ | |
| # span for span in spans | |
| # if (is_header(span, most_common_font_size, most_common_color, most_common_font) | |
| # # and span['size'] >= subsubheaderFontSize | |
| # and span['size'] < mainHeaderFontSize) | |
| # ] | |
| if stringtowrite.startswith('To') : | |
| collecting = True | |
| # if stringtowrite=='To be billed': | |
| # Alltexttobebilled+='\n' | |
| # matched_header_font_size = max(span["size"] for span in header_spans) | |
| # collected_lines.append(line_text) | |
| valid_spans = [span for span in spans if span.get("bbox")] | |
| if valid_spans: | |
| x0s = [span["bbox"][0] for span in valid_spans] | |
| x1s = [span["bbox"][2] for span in valid_spans] | |
| y0s = [span["bbox"][1] for span in valid_spans] | |
| y1s = [span["bbox"][3] for span in valid_spans] | |
| header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] | |
| if page_num in current_bbox: | |
| cb = current_bbox[page_num] | |
| current_bbox[page_num] = [ | |
| min(cb[0], header_bbox[0]), | |
| min(cb[1], header_bbox[1]), | |
| max(cb[2], header_bbox[2]), | |
| max(cb[3], header_bbox[3]) | |
| ] | |
| else: | |
| current_bbox[page_num] = header_bbox | |
| last_y1s[page_num] = header_bbox[3] | |
| x0, y0, x1, y1 = header_bbox | |
| zoom = 200 | |
| left = int(x0) | |
| top = int(y0) | |
| zoom_str = f"{zoom},{left},{top}" | |
| pageNumberFound = page_num + 1 | |
| # Build the query parameters | |
| params = { | |
| 'pdfLink': pdf_path, # Your PDF link | |
| 'keyword': heading_to_search, # Your keyword (could be a string or list) | |
| } | |
| # URL encode each parameter | |
| encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()} | |
| # Construct the final encoded link | |
| encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()]) | |
| # Correctly construct the final URL with page and zoom | |
| # final_url = f"{baselink}{encoded_link}#page={str(pageNumberFound)}&zoom={zoom_str}" | |
| # Get current date and time | |
| now = datetime.now() | |
| # Format the output | |
| formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p") | |
| # Optionally, add the URL to a DataFrame | |
| data_entry = { | |
| "PDF Name":filename, | |
| "NBSLink": zoom_str, | |
| "Subject": heading_to_search, | |
| "Page": str(pageNumberFound), | |
| "Author": "ADR", | |
| "Creation Date": formatted_time, | |
| "Layer": "Initial", | |
| "Code": stringtowrite, | |
| # "head above 1": paths[-2], | |
| # "head above 2": paths[0], | |
| "BodyText":collected_lines, | |
| "MC Connnection": 'Go to ' + paths[0].strip().split()[0] +'/'+ heading_to_search.strip().split()[0] + ' in '+ filename | |
| } | |
| # Dynamically add "head above 1", "head above 2", ... depending on the number of levels | |
| for i, path_text in enumerate(paths[:-1]): # skip the last one because that's the current heading | |
| data_entry[f"head above {i+1}"] = path_text | |
| data_list_JSON.append(data_entry) | |
| # Convert list to JSON | |
| # json_output = [data_list_JSON] | |
| # json_output = json.dumps(data_list_JSON, indent=4) | |
| i += 2 | |
| continue | |
| else: | |
| if (substring_match and not collecting and | |
| len(combined_line_norm) > 0): # and (headertoContinue1 or headertoContinue2) ): | |
| # Calculate word match percentage | |
| word_match_percent = words_match_ratio(heading_norm, combined_line_norm) * 100 | |
| # Check if at least 70% of header words exist in this line | |
| meets_word_threshold = word_match_percent >= 100 | |
| # Check header conditions (including word threshold) | |
| # header_spans = [ | |
| # span for span in spans | |
| # if (is_header(span, most_common_font_size, most_common_color, most_common_font) | |
| # # and span['size'] >= subsubheaderFontSize | |
| # and span['size'] < mainHeaderFontSize) | |
| # ] | |
| if (meets_word_threshold or same_start_word(heading_to_search, combined_line_norm) ) and stringtowrite.startswith('To'): | |
| collecting = True | |
| if stringtowrite=='To be billed': | |
| Alltexttobebilled+='\n' | |
| # if stringtowrite=='To be billed': | |
| # Alltexttobebilled+= ' '+ combined_line_norm | |
| # matched_header_font_size = max(span["size"] for span in header_spans) | |
| collected_lines.append(line_text) | |
| valid_spans = [span for span in spans if span.get("bbox")] | |
| if valid_spans: | |
| x0s = [span["bbox"][0] for span in valid_spans] | |
| x1s = [span["bbox"][2] for span in valid_spans] | |
| y0s = [span["bbox"][1] for span in valid_spans] | |
| y1s = [span["bbox"][3] for span in valid_spans] | |
| header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] | |
| if page_num in current_bbox: | |
| cb = current_bbox[page_num] | |
| current_bbox[page_num] = [ | |
| min(cb[0], header_bbox[0]), | |
| min(cb[1], header_bbox[1]), | |
| max(cb[2], header_bbox[2]), | |
| max(cb[3], header_bbox[3]) | |
| ] | |
| else: | |
| current_bbox[page_num] = header_bbox | |
| last_y1s[page_num] = header_bbox[3] | |
| x0, y0, x1, y1 = header_bbox | |
| zoom = 200 | |
| left = int(x0) | |
| top = int(y0) | |
| zoom_str = f"{zoom},{left},{top}" | |
| pageNumberFound = page_num + 1 | |
| # Build the query parameters | |
| params = { | |
| 'pdfLink': pdf_path, # Your PDF link | |
| 'keyword': heading_to_search, # Your keyword (could be a string or list) | |
| } | |
| # URL encode each parameter | |
| encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()} | |
| # Construct the final encoded link | |
| encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()]) | |
| # Correctly construct the final URL with page and zoom | |
| # final_url = f"{baselink}{encoded_link}#page={str(pageNumberFound)}&zoom={zoom_str}" | |
| # Get current date and time | |
| now = datetime.now() | |
| # Format the output | |
| formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p") | |
| # Optionally, add the URL to a DataFrame | |
| logger.info(f"Logging into table") | |
| data_entry = { | |
| "PDF Name":filename, | |
| "NBSLink": zoom_str, | |
| "Subject": heading_to_search, | |
| "Page": str(pageNumberFound), | |
| "Author": "ADR", | |
| "Creation Date": formatted_time, | |
| "Layer": "Initial", | |
| "Code": stringtowrite, | |
| # "head above 1": paths[-2], | |
| # "head above 2": paths[0], | |
| "BodyText":collected_lines, | |
| "MC Connnection": 'Go to ' + paths[0].strip().split()[0] +'/'+ heading_to_search.strip().split()[0] + ' in '+ filename | |
| } | |
| # Dynamically add "head above 1", "head above 2", ... depending on the number of levels | |
| for i, path_text in enumerate(paths[:-1]): # skip the last one because that's the current heading | |
| data_entry[f"head above {i+1}"] = path_text | |
| data_list_JSON.append(data_entry) | |
| # Convert list to JSON | |
| # json_output = [data_list_JSON] | |
| # json_output = json.dumps(data_list_JSON, indent=4) | |
| i += 2 | |
| continue | |
| if collecting: | |
| norm_line = normalize_text(line_text) | |
| # Optimized URL check | |
| if url_pattern.match(norm_line): | |
| line_is_header = False | |
| else: | |
| # line_is_header = any(is_header(span, most_common_font_size, most_common_color, most_common_font) for span in spans) | |
| def normalize(text): | |
| return " ".join(text.lower().split()) | |
| line_text = " ".join(span["text"] for span in spans).strip() | |
| line_is_header = any( | |
| normalize(line_text) == normalize(header) | |
| for header in allheaders_LLM | |
| ) | |
| if line_is_header: | |
| header_font_size = max(span["size"] for span in spans) | |
| is_probably_real_header = ( | |
| # header_font_size >= matched_header_font_size and | |
| # is_header(spans[0], most_common_font_size, most_common_color, most_common_font) and | |
| len(line_text.strip()) > 2 | |
| ) | |
| if (norm_line != matched_header_line_norm and | |
| norm_line != heading_norm and | |
| is_probably_real_header): | |
| if line_text not in heading_norm: | |
| collecting = False | |
| done = True | |
| headertoContinue1 = False | |
| headertoContinue2=False | |
| for page_num, bbox in current_bbox.items(): | |
| bbox[3] = last_y1s.get(page_num, bbox[3]) | |
| page_highlights[page_num] = bbox | |
| highlight_boxes(docHighlights, page_highlights,stringtowrite) | |
| break_collecting = True | |
| break | |
| if break_collecting: | |
| break | |
| collected_lines.append(line_text) | |
| valid_spans = [span for span in spans if span.get("bbox")] | |
| if valid_spans: | |
| x0s = [span["bbox"][0] for span in valid_spans] | |
| x1s = [span["bbox"][2] for span in valid_spans] | |
| y0s = [span["bbox"][1] for span in valid_spans] | |
| y1s = [span["bbox"][3] for span in valid_spans] | |
| line_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] | |
| if page_num in current_bbox: | |
| cb = current_bbox[page_num] | |
| current_bbox[page_num] = [ | |
| min(cb[0], line_bbox[0]), | |
| min(cb[1], line_bbox[1]), | |
| max(cb[2], line_bbox[2]), | |
| max(cb[3], line_bbox[3]) | |
| ] | |
| else: | |
| current_bbox[page_num] = line_bbox | |
| last_y1s[page_num] = line_bbox[3] | |
| i += 1 | |
| if not done: | |
| for page_num, bbox in current_bbox.items(): | |
| bbox[3] = last_y1s.get(page_num, bbox[3]) | |
| page_highlights[page_num] = bbox | |
| if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() : | |
| stringtowrite='Not to be billed' | |
| else: | |
| stringtowrite='To be billed' | |
| highlight_boxes(docHighlights, page_highlights,stringtowrite) | |
| docarray.append(docHighlights) | |
| if data_list_JSON and not data_list_JSON[-1]["BodyText"] and collected_lines: | |
| data_list_JSON[-1]["BodyText"] = collected_lines[1:] if len(collected_lines) > 0 else [] | |
| # Final cleanup of the JSON data before returning | |
| for entry in data_list_JSON: | |
| # Check if BodyText exists and has content | |
| if isinstance(entry.get("BodyText"), list) and len(entry["BodyText"]) > 0: | |
| # Check if the first line of the body is essentially the same as the Subject | |
| first_line = normalize_text(entry["BodyText"][0]) | |
| subject = normalize_text(entry["Subject"]) | |
| # If they match or the subject is inside the first line, remove it | |
| if subject in first_line or first_line in subject: | |
| entry["BodyText"] = entry["BodyText"][1:] | |
| jsons.append(data_list_JSON) | |
| logger.info(f"Markups done! Uploading to dropbox") | |
| logger.info(f"Uploaded and Readyy!") | |
| return jsons,identified_headers | |
| def testFunction(pdf_path, model,LLM_prompt): | |
| Alltexttobebilled='' | |
| alltextWithoutNotbilled='' | |
| # keywordstoSkip=["installation", "execution", "miscellaneous items", "workmanship", "testing", "labeling"] | |
| headertoContinue1 = False | |
| headertoContinue2=False | |
| parsed_url = urlparse(pdf_path) | |
| filename = os.path.basename(parsed_url.path) | |
| filename = unquote(filename) # decode URL-encoded characters | |
| # Optimized URL handling | |
| if pdf_path and ('http' in pdf_path or 'dropbox' in pdf_path): | |
| pdf_path = pdf_path.replace('dl=0', 'dl=1') | |
| # Cache frequently used values | |
| response = requests.get(pdf_path) | |
| pdf_content = BytesIO(response.content) | |
| if not pdf_content: | |
| raise ValueError("No valid PDF content found.") | |
| doc = fitz.open(stream=pdf_content, filetype="pdf") | |
| docHighlights = fitz.open(stream=pdf_content, filetype="pdf") | |
| parsed_url = urlparse(pdf_path) | |
| filename = os.path.basename(parsed_url.path) | |
| filename = unquote(filename) # decode URL-encoded characters | |
| #### Get regular tex font size, style , color | |
| most_common_font_size, most_common_color, most_common_font = get_regular_font_size_and_color(doc) | |
| # Precompute regex patterns | |
| dot_pattern = re.compile(r'\.{3,}') | |
| url_pattern = re.compile(r'https?://\S+|www\.\S+') | |
| highlighted=[] | |
| processed_subjects = set() # Initialize at the top of testFunction | |
| toc_pages = get_toc_page_numbers(doc) | |
| identified_headers=process_document_in_chunks(len(doc), pdf_path, LLM_prompt, model) | |
| # identified_headers = identify_headers_with_openrouterNEWW(doc, api_key='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')# ['text', fontsize, page number,y] | |
| # with open("identified_headers.txt", "w", encoding="utf-8") as f: | |
| # json.dump(identified_headers, f, indent=4) | |
| # with open("identified_headers.txt", "r", encoding="utf-8") as f: | |
| # identified_headers = json.load(f) | |
| print(identified_headers) | |
| allheaders_LLM=[] | |
| for h in identified_headers: | |
| if int(h["page"]) in toc_pages: | |
| continue | |
| if h['text']: | |
| allheaders_LLM.append(h['text']) | |
| headers_json=headers_with_location(doc,identified_headers) | |
| headers=filter_headers_outside_toc(headers_json,toc_pages) | |
| hierarchy=build_hierarchy_from_llm(headers) | |
| # identify_headers_and_save_excel(hierarchy) | |
| listofHeaderstoMarkup = get_leaf_headers_with_paths(hierarchy) | |
| allchildrenheaders = [normalize_text(item['text']) for item, p in listofHeaderstoMarkup] | |
| allchildrenheaders_set = set(allchildrenheaders) # For faster lookups | |
| # print('allchildrenheaders_set',allchildrenheaders_set) | |
| df = pd.DataFrame(columns=["NBSLink","Subject","Page","Author","Creation Date","Layer",'Code', 'head above 1', "head above 2",'BodyText']) | |
| dictionaryNBS={} | |
| data_list_JSON = [] | |
| for heading_to_searchDict,pathss in listofHeaderstoMarkup: | |
| heading_to_search = heading_to_searchDict['text'] | |
| heading_to_searchPageNum = heading_to_searchDict['page'] | |
| paths=heading_to_searchDict['path'] | |
| # xloc=heading_to_searchDict['x'] | |
| yloc=heading_to_searchDict['y'] | |
| # Initialize variables | |
| headertoContinue1 = False | |
| headertoContinue2 = False | |
| matched_header_line = None | |
| done = False | |
| collecting = False | |
| collected_lines = [] | |
| page_highlights = {} | |
| current_bbox = {} | |
| last_y1s = {} | |
| mainHeader = '' | |
| subHeader = '' | |
| matched_header_line_norm = heading_to_search | |
| break_collecting = False | |
| heading_norm = normalize_text(heading_to_search) | |
| paths_norm = [normalize_text(p) for p in paths[0]] if paths and paths[0] else [] | |
| for page_num in range(heading_to_searchPageNum,len(doc)): | |
| if page_num in toc_pages: | |
| continue | |
| if break_collecting: | |
| break | |
| page=doc[page_num] | |
| page_height = page.rect.height | |
| blocks = page.get_text("dict")["blocks"] | |
| for block in blocks: | |
| if break_collecting: | |
| break | |
| lines = block.get("lines", []) | |
| i = 0 | |
| while i < len(lines): | |
| if break_collecting: | |
| break | |
| spans = lines[i].get("spans", []) | |
| if not spans: | |
| i += 1 | |
| continue | |
| # y0 = spans[0]["bbox"][1] | |
| # y1 = spans[0]["bbox"][3] | |
| x0 = spans[0]["bbox"][0] # left | |
| x1 = spans[0]["bbox"][2] # right | |
| y0 = spans[0]["bbox"][1] # top | |
| y1 = spans[0]["bbox"][3] # bottom | |
| if y0 < top_margin or y1 > (page_height - bottom_margin): | |
| i += 1 | |
| continue | |
| line_text = get_spaced_text_from_spans(spans).lower() | |
| line_text_norm = normalize_text(line_text) | |
| # Combine with next line if available | |
| if i + 1 < len(lines): | |
| next_spans = lines[i + 1].get("spans", []) | |
| next_line_text = get_spaced_text_from_spans(next_spans).lower() | |
| combined_line_norm = normalize_text(line_text + " " + next_line_text) | |
| else: | |
| combined_line_norm = line_text_norm | |
| # Check if we should continue processing | |
| if combined_line_norm and combined_line_norm in paths[0]: | |
| headertoContinue1 = combined_line_norm | |
| if combined_line_norm and combined_line_norm in paths[-2]: | |
| headertoContinue2 = combined_line_norm | |
| # print('paths',paths) | |
| # if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() : | |
| # if any(word in paths[-2].lower() for word in keywordstoSkip): | |
| # stringtowrite='Not to be billed' | |
| # else: | |
| stringtowrite='To be billed' | |
| if stringtowrite!='To be billed': | |
| alltextWithoutNotbilled+= combined_line_norm ################################################# | |
| # Optimized header matching | |
| existsfull = ( | |
| ( combined_line_norm in allchildrenheaders_set or | |
| combined_line_norm in allchildrenheaders ) and heading_to_search in combined_line_norm | |
| ) | |
| # existsfull=False | |
| # if xloc==x0 and yloc ==y0: | |
| # existsfull=True | |
| # New word-based matching | |
| current_line_words = set(combined_line_norm.split()) | |
| heading_words = set(heading_norm.split()) | |
| all_words_match = current_line_words.issubset(heading_words) and len(current_line_words) > 0 | |
| substring_match = ( | |
| heading_norm in combined_line_norm or | |
| combined_line_norm in heading_norm or | |
| all_words_match # Include the new word-based matching | |
| ) | |
| # substring_match = ( | |
| # heading_norm in combined_line_norm or | |
| # combined_line_norm in heading_norm | |
| # ) | |
| if ( substring_match and existsfull and not collecting and | |
| len(combined_line_norm) > 0 ):#and (headertoContinue1 or headertoContinue2) ): | |
| # Check header conditions more efficiently | |
| # header_spans = [ | |
| # span for span in spans | |
| # if (is_header(span, most_common_font_size, most_common_color, most_common_font) ) | |
| # # and span['size'] >= subsubheaderFontSize | |
| # # and span['size'] < mainHeaderFontSize) | |
| # ] | |
| if stringtowrite.startswith('To'): | |
| collecting = True | |
| # matched_header_font_size = max(span["size"] for span in header_spans) | |
| Alltexttobebilled+= ' '+ combined_line_norm | |
| # collected_lines.append(line_text) | |
| valid_spans = [span for span in spans if span.get("bbox")] | |
| if valid_spans: | |
| x0s = [span["bbox"][0] for span in valid_spans] | |
| x1s = [span["bbox"][2] for span in valid_spans] | |
| y0s = [span["bbox"][1] for span in valid_spans] | |
| y1s = [span["bbox"][3] for span in valid_spans] | |
| header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] | |
| if page_num in current_bbox: | |
| cb = current_bbox[page_num] | |
| current_bbox[page_num] = [ | |
| min(cb[0], header_bbox[0]), | |
| min(cb[1], header_bbox[1]), | |
| max(cb[2], header_bbox[2]), | |
| max(cb[3], header_bbox[3]) | |
| ] | |
| else: | |
| current_bbox[page_num] = header_bbox | |
| last_y1s[page_num] = header_bbox[3] | |
| x0, y0, x1, y1 = header_bbox | |
| zoom = 200 | |
| left = int(x0) | |
| top = int(y0) | |
| zoom_str = f"{zoom},{left},{top}" | |
| pageNumberFound = page_num + 1 | |
| # Build the query parameters | |
| params = { | |
| 'pdfLink': pdf_path, # Your PDF link | |
| 'keyword': heading_to_search, # Your keyword (could be a string or list) | |
| } | |
| # URL encode each parameter | |
| encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()} | |
| # Construct the final encoded link | |
| encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()]) | |
| # Correctly construct the final URL with page and zoom | |
| # final_url = f"{baselink}{encoded_link}#page={str(pageNumberFound)}&zoom={zoom_str}" | |
| # Get current date and time | |
| now = datetime.now() | |
| # Format the output | |
| formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p") | |
| # Optionally, add the URL to a DataFrame | |
| # Create the data entry only if the subject is unique | |
| if heading_to_search not in processed_subjects: | |
| data_entry = { | |
| "NBSLink": zoom_str, | |
| "Subject": heading_to_search, | |
| "Page": str(pageNumberFound), | |
| "Author": "ADR", | |
| "Creation Date": formatted_time, | |
| "Layer": "Initial", | |
| "Code": stringtowrite, | |
| "BodyText": collected_lines, | |
| "MC Connnection": 'Go to ' + paths[0].strip().split()[0] + '/' + heading_to_search.strip().split()[0] + ' in ' + filename | |
| } | |
| # Dynamically add hierarchy paths | |
| for i, path_text in enumerate(paths[:-1]): | |
| data_entry[f"head above {i+1}"] = path_text | |
| # Append to the list and mark this subject as processed | |
| data_list_JSON.append(data_entry) | |
| processed_subjects.add(heading_to_search) | |
| else: | |
| print(f"Skipping duplicate data entry for Subject: {heading_to_search}") | |
| # Convert list to JSON | |
| json_output = json.dumps(data_list_JSON, indent=4) | |
| i += 1 | |
| continue | |
| else: | |
| if (substring_match and not collecting and | |
| len(combined_line_norm) > 0): # and (headertoContinue1 or headertoContinue2) ): | |
| # Calculate word match percentage | |
| word_match_percent = words_match_ratio(heading_norm, combined_line_norm) * 100 | |
| # Check if at least 70% of header words exist in this line | |
| meets_word_threshold = word_match_percent >= 100 | |
| # Check header conditions (including word threshold) | |
| # header_spans = [ | |
| # span for span in spans | |
| # if (is_header(span, most_common_font_size, most_common_color, most_common_font)) | |
| # # and span['size'] >= subsubheaderFontSize | |
| # # and span['size'] < mainHeaderFontSize) | |
| # ] | |
| if (meets_word_threshold or same_start_word(heading_to_search, combined_line_norm) ) and stringtowrite.startswith('To'): | |
| collecting = True | |
| # matched_header_font_size = max(span["size"] for span in header_spans) | |
| Alltexttobebilled+= ' '+ combined_line_norm | |
| collected_lines.append(line_text) | |
| valid_spans = [span for span in spans if span.get("bbox")] | |
| if valid_spans: | |
| x0s = [span["bbox"][0] for span in valid_spans] | |
| x1s = [span["bbox"][2] for span in valid_spans] | |
| y0s = [span["bbox"][1] for span in valid_spans] | |
| y1s = [span["bbox"][3] for span in valid_spans] | |
| header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] | |
| if page_num in current_bbox: | |
| cb = current_bbox[page_num] | |
| current_bbox[page_num] = [ | |
| min(cb[0], header_bbox[0]), | |
| min(cb[1], header_bbox[1]), | |
| max(cb[2], header_bbox[2]), | |
| max(cb[3], header_bbox[3]) | |
| ] | |
| else: | |
| current_bbox[page_num] = header_bbox | |
| last_y1s[page_num] = header_bbox[3] | |
| x0, y0, x1, y1 = header_bbox | |
| zoom = 200 | |
| left = int(x0) | |
| top = int(y0) | |
| zoom_str = f"{zoom},{left},{top}" | |
| pageNumberFound = page_num + 1 | |
| # Build the query parameters | |
| params = { | |
| 'pdfLink': pdf_path, # Your PDF link | |
| 'keyword': heading_to_search, # Your keyword (could be a string or list) | |
| } | |
| # URL encode each parameter | |
| encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()} | |
| # Construct the final encoded link | |
| encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()]) | |
| # Correctly construct the final URL with page and zoom | |
| # final_url = f"{baselink}{encoded_link}#page={str(pageNumberFound)}&zoom={zoom_str}" | |
| # Get current date and time | |
| now = datetime.now() | |
| # Format the output | |
| formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p") | |
| # Optionally, add the URL to a DataFrame | |
| # Create the data entry only if the subject is unique | |
| if heading_to_search not in processed_subjects: | |
| data_entry = { | |
| "NBSLink": zoom_str, | |
| "Subject": heading_to_search, | |
| "Page": str(pageNumberFound), | |
| "Author": "ADR", | |
| "Creation Date": formatted_time, | |
| "Layer": "Initial", | |
| "Code": stringtowrite, | |
| "BodyText": collected_lines, | |
| "MC Connnection": 'Go to ' + paths[0].strip().split()[0] + '/' + heading_to_search.strip().split()[0] + ' in ' + filename | |
| } | |
| # Dynamically add hierarchy paths | |
| for i, path_text in enumerate(paths[:-1]): | |
| data_entry[f"head above {i+1}"] = path_text | |
| # Append to the list and mark this subject as processed | |
| data_list_JSON.append(data_entry) | |
| processed_subjects.add(heading_to_search) | |
| else: | |
| print(f"Skipping duplicate data entry for Subject: {heading_to_search}") | |
| # Convert list to JSON | |
| json_output = json.dumps(data_list_JSON, indent=4) | |
| i += 2 | |
| continue | |
| if collecting: | |
| norm_line = normalize_text(line_text) | |
| def normalize(text): | |
| if isinstance(text, list): | |
| text = " ".join(text) | |
| return " ".join(text.lower().split()) | |
| def is_similar(a, b, threshold=0.75): | |
| return SequenceMatcher(None, a, b).ratio() >= threshold | |
| # Optimized URL check | |
| if url_pattern.match(norm_line): | |
| line_is_header = False | |
| else: | |
| line_is_header = any(is_header(span, most_common_font_size, most_common_color, most_common_font,allheaders_LLM) for span in spans) | |
| # def normalize(text): | |
| # return " ".join(text.lower().split()) | |
| # line_text = " ".join(span["text"] for span in spans).strip() | |
| # line_is_header = any( normalize(line_text) == normalize(header) for header in allheaders_LLM ) | |
| # for line_text in lines: | |
| # if collecting: | |
| # # Join all spans into one line | |
| # line_text = " ".join(span["text"] for span in spans).strip() | |
| # norm_line = normalize(line_text) | |
| # # Get max font size in this line | |
| # max_font_size = max(span.get("size", 0) for span in spans) | |
| # # Skip URLs | |
| # if url_pattern.match(norm_line): | |
| # line_is_header = False | |
| # else: | |
| # text_matches_header = any( | |
| # is_similar(norm_line, normalize(header)) | |
| # if not isinstance(header, list) | |
| # else is_similar(norm_line, normalize(" ".join(header))) | |
| # for header in allheaders_LLM | |
| # ) | |
| # # ✅ FINAL header condition | |
| # line_is_header = text_matches_header and max_font_size > 11 | |
| if line_is_header: | |
| header_font_size = max(span["size"] for span in spans) | |
| is_probably_real_header = ( | |
| # header_font_size >= matched_header_font_size and | |
| # is_header(spans[0], most_common_font_size, most_common_color, most_common_font) and | |
| len(line_text.strip()) > 2 | |
| ) | |
| if (norm_line != matched_header_line_norm and | |
| norm_line != heading_norm and | |
| is_probably_real_header): | |
| if line_text not in heading_norm: | |
| collecting = False | |
| done = True | |
| headertoContinue1 = False | |
| headertoContinue2=False | |
| for page_num, bbox in current_bbox.items(): | |
| bbox[3] = last_y1s.get(page_num, bbox[3]) | |
| page_highlights[page_num] = bbox | |
| can_highlight=False | |
| if [page_num,bbox] not in highlighted: | |
| highlighted.append([page_num,bbox]) | |
| can_highlight=True | |
| if can_highlight: | |
| highlight_boxes(docHighlights, page_highlights,stringtowrite) | |
| break_collecting = True | |
| break | |
| if break_collecting: | |
| break | |
| collected_lines.append(line_text) | |
| valid_spans = [span for span in spans if span.get("bbox")] | |
| if valid_spans: | |
| x0s = [span["bbox"][0] for span in valid_spans] | |
| x1s = [span["bbox"][2] for span in valid_spans] | |
| y0s = [span["bbox"][1] for span in valid_spans] | |
| y1s = [span["bbox"][3] for span in valid_spans] | |
| line_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] | |
| if page_num in current_bbox: | |
| cb = current_bbox[page_num] | |
| current_bbox[page_num] = [ | |
| min(cb[0], line_bbox[0]), | |
| min(cb[1], line_bbox[1]), | |
| max(cb[2], line_bbox[2]), | |
| max(cb[3], line_bbox[3]) | |
| ] | |
| else: | |
| current_bbox[page_num] = line_bbox | |
| last_y1s[page_num] = line_bbox[3] | |
| i += 1 | |
| if not done: | |
| for page_num, bbox in current_bbox.items(): | |
| bbox[3] = last_y1s.get(page_num, bbox[3]) | |
| page_highlights[page_num] = bbox | |
| # if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() : | |
| # stringtowrite='Not to be billed' | |
| # else: | |
| stringtowrite='To be billed' | |
| highlight_boxes(docHighlights, page_highlights,stringtowrite) | |
| print("Current working directory:", os.getcwd()) | |
| docHighlights.save("highlighted_output.pdf") | |
| # dbxTeam = tsadropboxretrieval.ADR_Access_DropboxTeam('user') | |
| # metadata = dbxTeam.sharing_get_shared_link_metadata(pdf_path) | |
| # dbPath = '/TSA JOBS/ADR Test/FIND/' | |
| # pdf_bytes = BytesIO() | |
| # docHighlights.save(pdf_bytes) | |
| # pdflink = tsadropboxretrieval.uploadanyFile(doc=docHighlights, path=dbPath, pdfname=filename) | |
| # json_output=changepdflinks(json_output,pdflink) | |
| # return pdf_bytes.getvalue(), docHighlights , json_output , Alltexttobebilled , alltextWithoutNotbilled , filename | |
| # Final safety check: if the very last entry in our list has an empty BodyText, | |
| # but we have collected_lines, sync them. | |
| if data_list_JSON and not data_list_JSON[-1]["BodyText"] and collected_lines: | |
| data_list_JSON[-1]["BodyText"] = collected_lines[1:] if len(collected_lines) > 0 else [] | |
| # Final cleanup of the JSON data before returning | |
| for entry in data_list_JSON: | |
| # Check if BodyText exists and has content | |
| if isinstance(entry.get("BodyText"), list) and len(entry["BodyText"]) > 0: | |
| # Check if the first line of the body is essentially the same as the Subject | |
| first_line = normalize_text(entry["BodyText"][0]) | |
| subject = normalize_text(entry["Subject"]) | |
| # If they match or the subject is inside the first line, remove it | |
| if subject in first_line or first_line in subject: | |
| entry["BodyText"] = entry["BodyText"][1:] | |
| print('data_list_JSON',data_list_JSON) | |
| # json_output.append(data_list_JSON) | |
| json_output = json.dumps(data_list_JSON, indent=4) | |
| logger.info(f"Markups done! Uploading to dropbox") | |
| logger.info(f"Uploaded and Readyy!") | |
| return json_output,identified_headers | |
| def build_subject_body_map(jsons): | |
| subject_body = {} | |
| for obj in jsons: | |
| subject = obj.get("Subject") | |
| body = obj.get("BodyText", []) | |
| if subject: | |
| # join body text into a readable paragraph | |
| subject_body[subject.strip()] = " ".join(body) | |
| return subject_body | |
| # def identify_headers_and_save_excel(pdf_path, model,LLM_prompt): | |
| # try: | |
| # # result = identify_headers_with_openrouterNEWW(pdf_path, model,LLM_prompt) | |
| # print('beginnging identify') | |
| # jsons,result = testFunction(pdf_path, model,LLM_prompt) | |
| # print('done , will start dataframe',jsons,result) | |
| # if not result: | |
| # df = pd.DataFrame([{ | |
| # "text": None, | |
| # "page": None, | |
| # "suggested_level": None, | |
| # "confidence": None, | |
| # "body": None, | |
| # "System Message": "No headers were identified by the LLM." | |
| # }]) | |
| # else: | |
| # df = pd.DataFrame(result) | |
| # subject_body_map = {} | |
| # # Safely navigate the nested structure: [ [ [ {dict}, {dict} ] ] ] | |
| # for pdf_level in jsons: | |
| # if not isinstance(pdf_level, list): | |
| # continue | |
| # for section_level in pdf_level: | |
| # # If the LLM returns a list of dictionaries here | |
| # if isinstance(section_level, list): | |
| # for obj in section_level: | |
| # if isinstance(obj, dict): | |
| # subject = obj.get("Subject") | |
| # body = obj.get("BodyText", []) | |
| # if subject: | |
| # # Ensure body is a list before joining | |
| # body_str = " ".join(body) if isinstance(body, list) else str(body) | |
| # subject_body_map[subject.strip()] = body_str | |
| # # If the LLM returns a single dictionary here | |
| # elif isinstance(section_level, dict): | |
| # subject = section_level.get("Subject") | |
| # body = section_level.get("BodyText", []) | |
| # if subject: | |
| # body_str = " ".join(body) if isinstance(body, list) else str(body) | |
| # subject_body_map[subject.strip()] = body_str | |
| # # Map the extracted body text to the "text" column in your main DataFrame | |
| # if "text" in df.columns: | |
| # df["body"] = df["text"].map(lambda x: subject_body_map.get(str(x).strip()) if x else None) | |
| # else: | |
| # df["body"] = None | |
| # # Save to Excel | |
| # output_path = os.path.abspath("header_analysis_output.xlsx") | |
| # df.to_excel(output_path, index=False, engine="openpyxl") | |
| # print("--- Processed DataFrame ---") | |
| # print(df) | |
| # return output_path | |
| # except Exception as e: | |
| # print(f"ERROR - Critical error in processing: {e}") | |
| # # Re-raise or handle as needed | |
| # return None | |
| def identify_headers_and_save_excel(pdf_path, model,LLM_prompt): | |
| try: | |
| jsons, result = testFunction(pdf_path, model,LLM_prompt) | |
| if not result: | |
| df = pd.DataFrame([{ | |
| "text": None, | |
| "page": None, | |
| "suggested_level": None, | |
| "confidence": None, | |
| "body": None, | |
| "System Message": "No headers were identified by the LLM." | |
| }]) | |
| else: | |
| print('here') | |
| df = pd.DataFrame(result) | |
| # Convert JSON string to list if needed | |
| if isinstance(jsons, str): | |
| jsons = json.loads(jsons) | |
| subject_body_map = {} | |
| # ✅ jsons is a flat list of dicts | |
| for obj in jsons: | |
| if not isinstance(obj, dict): | |
| continue | |
| subject = obj.get("Subject") | |
| body = obj.get("BodyText", []) | |
| if subject: | |
| subject_body_map[subject.strip()] = " ".join(body) | |
| # ✅ Map body to dataframe | |
| df["body"] = df["text"].map(subject_body_map).fillna("") | |
| # ✅ Save once at end | |
| output_path = os.path.abspath("header_analysis_output.xlsx") | |
| df.to_excel(output_path, index=False, engine="openpyxl") | |
| print("--- Processed DataFrame ---") | |
| print(df) | |
| return output_path | |
| except Exception as e: | |
| logger.error(f"Critical error in processing: {str(e)}") | |
| return None | |
| # Improved launch with debug mode enabled | |
| iface = gr.Interface( | |
| fn=identify_headers_and_save_excel, | |
| inputs=[ | |
| gr.Textbox(label="PDF URL"), | |
| gr.Textbox(label="Model Type"), # Default example | |
| gr.Textbox(label="LLM Prompt") | |
| ], | |
| outputs=gr.File(label="Download Excel Results"), | |
| title="PDF Header Extractor" | |
| ) | |
| # Launch with debug=True to see errors in the console | |
| iface.launch(debug=True) |