# -*- coding: utf-8 -*- """Copy of FindSpecsTrial(Retrieving+boundingBoxes)-InitialMarkups(ALL)_CleanedUp.ipynb Automatically generated by Colab. Original file is located at https://colab.research.google.com/drive/12XfVkmKmN3oVjHhLVE0_GgkftgArFEK2 """ baselink='https://adr.trevorsadd.co.uk/api/view-pdf?' newlink='https://adr.trevorsadd.co.uk/api/view-highlight?' tobebilledonlyLink='https://adr.trevorsadd.co.uk/api/view-pdf-tobebilled?' import time from datetime import datetime, timezone from difflib import SequenceMatcher from urllib.parse import urlparse, unquote import os from io import BytesIO import re import requests import pandas as pd import fitz # PyMuPDF import re import urllib.parse import pandas as pd import math import random import json from datetime import datetime from collections import defaultdict, Counter import difflib from fuzzywuzzy import fuzz import copy import json # import tsadropboxretrieval import urllib.parse import logging # Set up logging to see everything logging.basicConfig( level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(), # Print to console logging.FileHandler('debug.log', mode='w') # Save to file ] ) logger = logging.getLogger(__name__) top_margin = 50 bottom_margin = 75 def changepdflinks(json_data, pdf_path): print('ll , ' ,json_data,pdf_path) # base_viewer_link = "https://findconsole-initialmarkups.hf.space/view-pdf?" updated_json = [] for entry in json_data: # Extract needed fields zoom_str = entry.get("NBSLink", "") page_str=entry.get("Page","") # Encode the pdf link safely for URL usage encoded_pdf_link = urllib.parse.quote(pdf_path, safe='') # Construct the final link final_url = f"{baselink}pdfLink={encoded_pdf_link}#page={str(page_str)}&zoom={zoom_str}" # Replace the old NBSLink value with the full URL entry["NBSLink"] = final_url updated_json.append(entry) return updated_json import unicodedata import re def normalize_text(text): if not text: return "" text = unicodedata.normalize("NFKC", text) text = text.replace("\u00a0", " ") # non-breaking space text = re.sub(r"-\s*\n\s*", "", text) # de-hyphenation text = re.sub(r"\s+", " ", text) # collapse whitespace return text.strip().lower() import fitz # PyMuPDF # def getLocation_of_header(doc, headerText, expected_page): # locations = [] # if expected_page < 0 or expected_page >= doc.page_count: # raise ValueError(f"Invalid page number: {expected_page}") # normalized_header = normalize_text(headerText) # page = doc.load_page(expected_page) # page_height = page.rect.height # page_text = page.get_text("text") # normalized_page_text = normalize_text(page_text) # # ❌ Header not on this page at all # if normalized_header not in normalized_page_text: # return [] # rects = page.search_for(headerText) # # Fallback: case-insensitive # if not rects: # rects = page.search_for( # headerText, # flags=fitz.TEXT_IGNORECASE # ) # for r in rects: # y = r.y0 # if y <= top_margin: # continue # if y >= page_height - bottom_margin: # continue # locations.append({ # "headerText": headerText, # "page": expected_page, # "x": r.x0, # "y": y # }) # # ✅ STOP HERE — do not keep searching # print(locations) # return locations def getLocation_of_header(doc, headerText, expected_page=None): locations = [] # pages = ( # [(expected_page, doc.load_page(expected_page))] # if expected_page is not None # else enumerate(doc) # ) expectedpageNorm=expected_page page=doc[expectedpageNorm] # for page_number, page in pages: page_height = page.rect.height rects = page.search_for(headerText) for r in rects: y = r.y0 # Skip headers in top or bottom margin if y <= top_margin: continue if y >= page_height - bottom_margin: continue locations.append({ "headerText":headerText, "page": expectedpageNorm, "x": r.x0, "y": y }) return locations def filter_headers_outside_toc(headers, toc_pages): toc_pages_set = set(toc_pages) filtered = [] for h in headers: page = h[2] y = h[3] # Skip invalid / fallback headers if page is None or y is None: continue # Skip headers inside TOC pages if page in toc_pages_set: continue filtered.append(h) return filtered def headers_with_location(doc, llm_headers): """ Converts LLM headers into: [text, font_size, page, y, suggested_level, confidence] Always include all headers, even if location not found. """ headersJson = [] for h in llm_headers: text = h["text"] llm_page = h["page"] # Attempt to locate the header on the page locations = getLocation_of_header(doc, text,llm_page) if locations: for loc in locations: page = doc.load_page(loc["page"]) fontsize = None for block in page.get_text("dict")["blocks"]: if block.get("type") != 0: continue for line in block.get("lines", []): line_text = "".join(span["text"] for span in line["spans"]).strip() if normalize(line_text) == normalize(text): fontsize = line["spans"][0]["size"] break if fontsize: break entry = [ text, fontsize, loc["page"], loc["y"], loc["x"], ################## added x ################# h["suggested_level"], ] if entry not in headersJson: headersJson.append(entry) return headersJson # def build_hierarchy_from_llm(headers): # nodes = [] # for h in headers: # # if len(h) != 6: # # continue # print('headerrrrrrrrrrrrrrr',h) # text, size, page, y, level = h # if level is None: # continue # try: # level = int(level) # except Exception: # continue # node = { # "text": text, # "page": page if page is not None else -1, # "y": y if y is not None else -1, # "size": size, # "bold": False, # "color": None, # "font": None, # "children": [], # "is_numbered": is_numbered(text), # "original_size": size, # "norm_text": normalize(text), # "level": level, # } # nodes.append(node) # nodes.sort(key=lambda x: (x["page"], x["y"])) # # print('nodes',nodes) # root = [] # stack = [] # added_level0 = set() # for header in nodes: # if header['level'] < 0: # continue # if header['level'] == 0: # norm_text = header['norm_text'] # if norm_text in added_level0: # continue # added_level0.add(norm_text) # while stack and stack[-1]['level'] >= header['level']: # stack.pop() # parent = stack[-1] if stack else None # if parent: # header["path"] = parent["path"] + [header["norm_text"]] # parent["children"].append(header) # else: # header["path"] = [header["norm_text"]] # root.append(header) # stack.append(header) # def enforce_nesting(node_list, parent_level=-1): # for node in node_list: # if node['level'] <= parent_level: # node['level'] = parent_level + 1 # enforce_nesting(node['children'], node['level']) # enforce_nesting(root) # root = [h for h in root if not (h['level'] == 0 and not h['children'])] # header_tree = enforce_level_hierarchy(root) # return header_tree # <-- RETURN ONLY THE TREE, no extra array def build_hierarchy_from_llm(headers): nodes = [] # ------------------------- # 1. Build nodes safely # ------------------------- for h in headers: # print("headerrrrrrrrrrrrrrr", h) if len(h) < 5: continue text, size, page, y,x, level = h if level is None: continue try: level = int(level) except Exception: continue node = { "text": text, "page": page if page is not None else -1, "y": y if y is not None else -1, "x": x if x is not None else -1, "size": size, "bold": False, "color": None, "font": None, "children": [], "is_numbered": is_numbered(text), "original_size": size, "norm_text": normalize(text), "level": level, } nodes.append(node) if not nodes: return [] # ------------------------- # 2. Sort top-to-bottom # ------------------------- nodes.sort(key=lambda x: (x["page"], x["y"])) # ------------------------- # 3. NORMALIZE LEVELS # (smallest level → 0) # ------------------------- min_level = min(n["level"] for n in nodes) for n in nodes: n["level"] -= min_level # ------------------------- # 4. Build hierarchy # ------------------------- root = [] stack = [] added_level0 = set() for header in nodes: lvl = header["level"] if lvl < 0: continue # De-duplicate true top-level headers if lvl == 0: key = (header["norm_text"], header["page"]) if key in added_level0: continue added_level0.add(key) while stack and stack[-1]["level"] >= lvl: stack.pop() parent = stack[-1] if stack else None if parent: header["path"] = parent["path"] + [header["norm_text"]] parent["children"].append(header) else: header["path"] = [header["norm_text"]] root.append(header) stack.append(header) # ------------------------- # 5. Enforce nesting sanity # ------------------------- def enforce_nesting(node_list, parent_level=-1): for node in node_list: if node["level"] <= parent_level: node["level"] = parent_level + 1 enforce_nesting(node["children"], node["level"]) enforce_nesting(root) # ------------------------- # 6. OPTIONAL cleanup # (only if real level-0s exist) # ------------------------- if any(h["level"] == 0 for h in root): root = [ h for h in root if not (h["level"] == 0 and not h["children"]) ] # ------------------------- # 7. Final pass # ------------------------- header_tree = enforce_level_hierarchy(root) return header_tree import ast # identified_headers_str = """[ # { # "text": "DENEIA PRIMARY ACADEMY", # "page": 4, # "suggested_level": 1, # }, # { # "text": "ARCHITECTURAL SPECIFICATION", # "page": 4, # "suggested_level": 2, # }, # { # "text": "Client: CAMBRIDGESHIRE COUNTY COUNCIL", # "page": 4, # "suggested_level": 3, # }, # { # "text": "R10 Rainwater drainage systems", # "page": 4, # "suggested_level": 4, # }, # { # "text": "Z10 Purpose made joinery", # "page": 4, # "suggested_level": 4, # }, # { # "text": "Z11 Purpose made metalwork", # "page": 4, # "suggested_level": 4, # }, # { # "text": "Z20 Fixings and adhesives", # "page": 4, # "suggested_level": 4, # }, # { # "text": "Z21 Mortars", # "page": 4, # "suggested_level": 4, # }, # { # "text": "Z22 Sealants", # "page": 4, # "suggested_level": 4, # }, # { # "text": "Z31 Powder coatings", # "page": 4, # "suggested_level": 4, # }, # { # "text": "B91", # "page": 5, # "suggested_level": 1, # }, # { # "text": "Buildings in the landscape", # "page": 5, # "suggested_level": 2, # }, # { # "text": "System outline", # "page": 5, # "suggested_level": 3, # }, # { # "text": "105 Proprietary small buildings/ structures", # "page": 5, # "suggested_level": 4, # }, # { # "text": "Products", # "page": 5, # "suggested_level": 3, # }, # { # "text": "330 Semi-closed shelters", # "page": 5, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Materials", # "page": 5, # "suggested_level": 3, # "confidence": 1.0 # }, # { # "text": "Aluminium to Canopies", # "page": 5, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Execution/ erection/ installation", # "page": 6, # "suggested_level": 3, # "confidence": 1.0 # }, # { # "text": "Canopies: Erection/ installation generally", # "page": 6, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Canopies: Jointing/ Fixing generally", # "page": 6, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Canopies: Concrete foundations generally", # "page": 6, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Canopies: Erection of prefabricated buildings/ structures", # "page": 6, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Canopies: Making good galvanized surfaces", # "page": 6, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Completion", # "page": 6, # "suggested_level": 3, # "confidence": 1.0 # }, # { # "text": "Documentation", # "page": 6, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "F10", # "page": 8, # "suggested_level": 1, # "confidence": 1.0 # }, # { # "text": "Brick/ block walling", # "page": 8, # "suggested_level": 2, # "confidence": 1.0 # }, # { # "text": "Types of walling", # "page": 8, # "suggested_level": 3, # "confidence": 1.0 # }, # { # "text": "Clay facing brickwork", # "page": 8, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Clay bricks", # "page": 8, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Aggregate concrete blocks Below DPC", # "page": 9, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Testing", # "page": 10, # "suggested_level": 3, # "confidence": 1.0 # }, # { # "text": "Hard landscaping materials specification", # "page": 10, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Compressive strength of mortar for each walling type", # "page": 10, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Fresh mortar cement content", # "page": 11, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Workmanship generally", # "page": 11, # "suggested_level": 3, # "confidence": 1.0 # }, # { # "text": "Conditioning of clay bricks and blocks", # "page": 11, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Conditioning of concrete bricks/ blocks", # "page": 11, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Mortar designations", # "page": 11, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Laying generally", # "page": 12, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Accuracy", # "page": 12, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Height of lifts in walling using cement-gauged or hydraulic lime mortar", # "page": 12, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Height of lifts in walling using thin-layer mortar", # "page": 12, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Levelling of separate leaves", # "page": 13, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Coursing brickwork", # "page": 13, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Lintels", # "page": 13, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Jointing", # "page": 13, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Accessible joints not exposed to view", # "page": 13, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Pointing", # "page": 13, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Fire-stopping", # "page": 13, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Adverse weather", # "page": 13, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Additional requirements for facework", # "page": 14, # "suggested_level": 3, # "confidence": 1.0 # }, # { # "text": "The term facework", # "page": 14, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Brick/ Concrete block samples", # "page": 14, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Colour consistency of masonry units", # "page": 14, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Appearance", # "page": 14, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Ground level", # "page": 14, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Putlog scaffolding", # "page": 14, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Cleanliness", # "page": 14, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "F30", # "page": 15, # "suggested_level": 1, # "confidence": 1.0 # }, # { # "text": "Accessories/ sundry items for brick/ block/ stone walling", # "page": 15, # "suggested_level": 2, # "confidence": 1.0 # }, # { # "text": "Cavities", # "page": 15, # "suggested_level": 3, # "confidence": 1.0 # }, # { # "text": "Concrete fill to base of cavity", # "page": 15, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Cleanliness", # "page": 15, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Perpend joint plastics weep holes", # "page": 15, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Partial fill cavity insulation Below DPC", # "page": 15, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Air bricks in external walling", # "page": 16, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Ventilation ducts", # "page": 16, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Cavity closers", # "page": 16, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Reinforcing/ fixing accessories", # "page": 16, # "suggested_level": 3, # "confidence": 1.0 # }, # { # "text": "Cavity wall ties used with partial fill insulation", # "page": 16, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Cavity wall ties to in-situ structure", # "page": 17, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Steel Ties - Masonry to Steel Columns", # "page": 17, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Surface-mounted channels", # "page": 17, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Carbon steel fabric reinforcement", # "page": 18, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "General-purpose slip ties", # "page": 18, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Flexible damp-proof courses/ cavity trays", # "page": 19, # "suggested_level": 3, # "confidence": 1.0 # }, # { # "text": "Damp Proof Membrane-Masonry walls Separation layer for non-ventilated cavity fire barriers (NVFB). CORTEX 0100FR Class B, DPC/ DPM", # "page": 19, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Polypropylene (PP) damp-proof courses", # "page": 19, # "suggested_level": 4, # "confidence": 1.0 # }, # { # "text": "Polypropylene (PP) Site formed and cavity trays", # "page": 20, # "suggested_level": 4, # "confidence": 1.0 # } # ] # """ # identified_headers = ast.literal_eval(identified_headers_str) def get_toc_page_numbers(doc, max_pages_to_check=15): toc_pages = [] # 1. Existing Dot Pattern (looking for ".....") dot_pattern = re.compile(r"\.{2,}") # 2. NEW: Title Pattern (looking for specific headers) # ^ and $ ensure the line is JUST that word (ignoring "The contents of the bag...") # re.IGNORECASE makes it match "CONTENTS", "Contents", "Index", etc. title_pattern = re.compile(r"^\s*(table of contents|contents|index)\s*$", re.IGNORECASE) for page_num in range(min(len(doc), max_pages_to_check)): page = doc.load_page(page_num) blocks = page.get_text("dict")["blocks"] dot_line_count = 0 has_toc_title = False for block in blocks: for line in block.get("lines", []): # Extract text from spans (mimicking get_spaced_text_from_spans) line_text = " ".join([span["text"] for span in line["spans"]]).strip() # CHECK A: Does the line have dots? if dot_pattern.search(line_text): dot_line_count += 1 # CHECK B: Is this line a Title? # We check this early in the loop. If a page has a title "Contents", # we mark it immediately. if title_pattern.match(line_text): has_toc_title = True # CONDITION: # It is a TOC page if it has a Title OR if it has dot leaders. # We use 'dot_line_count >= 1' to be sensitive to single-item lists. if has_toc_title or dot_line_count >= 1: toc_pages.append(page_num) # RETURN: # If we found TOC pages (e.g., [2, 3]), we return [0, 1, 2, 3] # This covers the cover page, inside cover, and the TOC itself. if toc_pages: last_toc_page = toc_pages[0] return list(range(0, last_toc_page + 1)) return [] # Return empty list if nothing found def get_regular_font_size_and_color(doc): font_sizes = [] colors = [] fonts = [] # Loop through all pages for page_num in range(len(doc)): page = doc.load_page(page_num) for span in page.get_text("dict")["blocks"]: if "lines" in span: for line in span["lines"]: for span in line["spans"]: font_sizes.append(span['size']) colors.append(span['color']) fonts.append(span['font']) # Get the most common font size, color, and font most_common_font_size = Counter(font_sizes).most_common(1)[0][0] if font_sizes else None most_common_color = Counter(colors).most_common(1)[0][0] if colors else None most_common_font = Counter(fonts).most_common(1)[0][0] if fonts else None return most_common_font_size, most_common_color, most_common_font def normalize_text(text): if text is None: return "" return re.sub(r'\s+', ' ', text.strip().lower()) def get_spaced_text_from_spans(spans): return normalize_text(" ".join(span["text"].strip() for span in spans)) def is_header(span, most_common_font_size, most_common_color, most_common_font,allheadersLLM): fontname = span.get("font", "").lower() # is_italic = "italic" in fontname or "oblique" in fontname isheader=False is_bold = "bold" in fontname or span.get("bold", False) # print('TEXT to CHECK',span['text'] ) if span['text'] in allheadersLLM: # normalize text in both span[text] isheader=True return ( ( span["size"] > most_common_font_size or # span["font"].lower() != most_common_font.lower() or (isheader and span["size"] > most_common_font_size ) ) ) def add_span_to_nearest_group(span_y, grouped_dict, pageNum=None, threshold=0.5): for (p, y) in grouped_dict: if pageNum is not None and p != pageNum: continue if abs(y - span_y) <= threshold: return (p, y) return (pageNum, span_y) def extract_headers(doc, toc_pages, most_common_font_size, most_common_color, most_common_font, top_margin, bottom_margin): grouped_headers = defaultdict(list) spans = [] line_merge_threshold = 1.5 # Maximum vertical distance between lines to consider as part of same header for pageNum in range(len(doc)): if pageNum in toc_pages: continue page = doc.load_page(pageNum) page_height = page.rect.height text_instances = page.get_text("dict") # First pass: collect all potential header spans potential_header_spans = [] for block in text_instances['blocks']: if block['type'] != 0: continue for line in block['lines']: for span in line['spans']: span_y0 = span['bbox'][1] span_y1 = span['bbox'][3] if span_y0 < top_margin or span_y1 > (page_height - bottom_margin): continue span_text = normalize_text(span.get('text', '')) if not span_text: continue if span_text.startswith('http://www') or span_text.startswith('www'): continue if any(( 'page' in span_text, not re.search(r'[a-z0-9]', span_text), 'end of section' in span_text, re.search(r'page\s+\d+\s+of\s+\d+', span_text), re.search(r'\b(?:\d{1,2}[/-])?\d{1,2}[/-]\d{2,4}\b', span_text), # re.search(r'\b(?:jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)', span_text), 'specification:' in span_text )): continue cleaned_text = re.sub(r'[.\-]{4,}.*$', '', span_text).strip() cleaned_text = normalize_text(cleaned_text) if is_header(span, most_common_font_size, most_common_color, most_common_font): potential_header_spans.append({ 'text': cleaned_text, 'size': span['size'], 'pageNum': pageNum, 'y0': span_y0, 'y1': span_y1, 'x0': span['bbox'][0], 'x1': span['bbox'][2], 'span': span }) # Sort spans by vertical position (top to bottom) potential_header_spans.sort(key=lambda s: (s['pageNum'], s['y0'])) # Second pass: group spans that are vertically close and likely part of same header i = 0 while i < len(potential_header_spans): current = potential_header_spans[i] header_text = current['text'] header_size = current['size'] header_page = current['pageNum'] min_y = current['y0'] max_y = current['y1'] spans_group = [current['span']] # Look ahead to find adjacent lines that might be part of same header j = i + 1 while j < len(potential_header_spans): next_span = potential_header_spans[j] # Check if on same page and vertically close with similar styling if (next_span['pageNum'] == header_page and next_span['y0'] - max_y < line_merge_threshold and abs(next_span['size'] - header_size) < 0.5): header_text += " " + next_span['text'] max_y = next_span['y1'] spans_group.append(next_span['span']) j += 1 else: break # Add the merged header grouped_headers[(header_page, min_y)].append({ "text": header_text.strip(), "size": header_size, "pageNum": header_page, "spans": spans_group }) spans.extend(spans_group) i = j # Skip the spans we've already processed # Prepare final headers list headers = [] for (pageNum, y), header_groups in sorted(grouped_headers.items()): for group in header_groups: headers.append([ group['text'], group['size'], group['pageNum'], y ]) font_sizes = [size for _, size, _, _ in headers] font_size_counts = Counter(font_sizes) # Filter font sizes that appear at least 3 times valid_font_sizes = [size for size, count in font_size_counts.items() if count >= 1] # Sort in descending order valid_font_sizes_sorted = sorted(valid_font_sizes, reverse=True) # If only 2 sizes, repeat the second one if len(valid_font_sizes_sorted) == 2: top_3_font_sizes = [valid_font_sizes_sorted[0], valid_font_sizes_sorted[1], valid_font_sizes_sorted[1]] else: top_3_font_sizes = valid_font_sizes_sorted[:3] # Get the smallest font size among valid ones smallest_font_size = min(valid_font_sizes) if valid_font_sizes else None return headers, top_3_font_sizes, smallest_font_size, spans # def identify_headers_with_openrouter(doc, api_key=None, model="google/gemini-2.5-flash", pages_to_check=None, top_margin=70, bottom_margin=85, timeout=30): # """Ask an LLM (OpenRouter) to identify headers in the document. # Returns a list of dicts: {text, page, suggested_level, confidence}. # The function sends plain page-line strings to the LLM (including page numbers) # and asks for a JSON array containing only header lines with suggested levels. # """ # if api_key is None: # api_key = os.getenv("OPENROUTER_API_KEY") or None # toc_pages = get_toc_page_numbers(doc) # lines_for_prompt = [] # # Collect text lines from pages (skip TOC pages) # for pno in range(len(doc)): # if pages_to_check and pno not in pages_to_check: # continue # if pno in toc_pages: # continue # page = doc.load_page(pno) # page_height = page.rect.height # for block in page.get_text("dict").get('blocks', []): # if block.get('type') != 0: # continue # for line in block.get('lines', []): # spans = line.get('spans', []) # if not spans: # continue # y0 = spans[0]['bbox'][1] # y1 = spans[0]['bbox'][3] # if y0 < top_margin or y1 > (page_height - bottom_margin): # continue # for s in spans: # # text,font,size,flags,color # ArrayofTextWithFormat={'Font':s.get('font')},{'Size':s.get('size')},{'Flags':s.get('flags')},{'Color':s.get('color')},{'Text':s.get('text')} # # prefix with page for easier mapping back # lines_for_prompt.append(f"PAGE {pno+1}: {ArrayofTextWithFormat}") # if not lines_for_prompt: # return [] # prompt = ( # "You are a document parser specialized in Uniclass and NBS (National Building Specification) specification documents. " # "Your task is to extract ONLY headers and their hierarchical levels, NOT the body content of clauses.\n\n" # "UNDERSTANDING THE DOCUMENT STRUCTURE:\n" # "Uniclass/NBS documents follow a hierarchical structure. You could encounter some client requirement document that don't have section codes or numbers but you would find the product catgeory header.\n" # "The hierarchy can extend to whatever depth the document requires. Your job is to identify the RELATIVE hierarchy " # "based on the document's structure, not fit it into a predetermined number of levels.\n\n" # "CRITICAL RULE: Product codes starting with \"Pr_\" (Uniclass product codes) represent specific products and are " # "typically at deeper levels of the hierarchy compared to their parent sections.\n\n" # "IDENTIFYING HEADERS (NOT BODY TEXT):\n" # "Headers typically have these characteristics:\n" # "1. Document titles and main section headers (often in larger font, ALL CAPS, or bold)\n" # "2. Uniclass section codes (like F10, H11, L20, M10, etc.) followed by descriptions should all be in the same level.\n" # "3. Section headers like \"Types of...\", \"Products\", \"Workmanship\", \"Testing\", \"Execution\", \"Completion\"\n" # "4. Product category headers (e.g., \"110 Clay facing brickwork\" or \"Pr_\" followed by product names)\n" # "EXCLUDING NBS CLAUSE BODIES:\n" # "DO NOT extract these as headers:\n" # "- Detailed specification text that describes requirements, standards, or instructions\n" # "- Multi-sentence explanatory text under headers\n" # "- Reference to standards (e.g., \"To BS EN 771-1\", \"Comply with BS 8000-3\")\n" # "- Performance requirements and technical specifications\n" # "- Installation instructions and procedural text\n" # "- Material descriptions and property specifications\n" # "- Bullet points or numbered lists that are clearly body content, not headers\n" # "- Text that reads like a complete sentence or instruction rather than a title\n\n" # "HIERARCHY ASSIGNMENT:\n" # "Assign hierarchy levels based on the RELATIVE position in the document structure:\n" # "- Lower numbers (1, 2, 3) for higher-level sections (document titles, major sections)\n" # "- Higher numbers for deeper nested sections and subsections\n" # "- The actual level numbers should reflect the nesting depth you observe in the document\n" # "WHAT TO EXTRACT:\n" # "Extract ONLY lines that serve as:\n" # "- Document titles and section titles\n" # "- Section codes (F10, H11, L20, M10, P10, R10, etc.)\n" # "- Major section headers (\"GENERAL\", \"PRODUCTS\", \"EXECUTION\", \"Workmanship\", \"Testing\", \"Completion\")\n" # "- Product category headers\n" # "- Numbered sections and subsections that act as titles (not body text)\n" # "- Any line that clearly serves as a heading, title, or section marker (typically short, title-case or capitalized)\n\n" # "EXCLUDE:\n" # "- All descriptive paragraphs and body text of NBS clauses\n" # "- Specification details and requirements\n" # "- Instructions and procedural text\n" # "- Standards references embedded in body text\n" # "- Any multi-sentence explanatory content\n\n" # "Return a JSON array of objects with keys:\n" # " text: the exact line text (without the 'PAGE N:' prefix),\n" # " page: the page number (integer),\n" # " suggested_level: integer representing the hierarchical depth (1 for top-level, increasing numbers for deeper nesting),\n" # " confidence: 0-1 (based on how certain you are this is a header, not body text).\n\n" # "Be selective - only extract actual headers/titles. This is a one-time run. It must be right." # "Determine the appropriate hierarchy levels based on the structure you observe. Use as many levels as necessary " # "to accurately represent the document's structure.\n\n" # "Return only a single JSON array (no commentary). Include only headers.\n\n" # "Lines:\n" + "\n".join(lines_for_prompt) # ) # if not api_key: # # No API key: return empty so caller can fallback to heuristics # return [] # url = "https://openrouter.ai/api/v1/chat/completions" # # Build headers following the OpenRouter example # headers = { # "Authorization": f"Bearer {api_key}", # "Content-Type": "application/json", # "HTTP-Referer": os.getenv("OPENROUTER_REFERER", ""), # "X-Title": os.getenv("OPENROUTER_X_TITLE", "") # } # # Wrap the prompt as the example 'content' array expected by OpenRouter # body = { # "model": model, # "messages": [ # { # "role": "user", # "content": [ # {"type": "text", "text": prompt} # ] # } # ] # } # # Debug: log request body (truncated) and write raw response for inspection # try: # print("LLM request (truncated):", prompt[:1000]) # resp = requests.post( # url=url, # headers=headers, # data=json.dumps(body), # timeout=timeout # ) # resp.raise_for_status() # resp_text = resp.text # print("LLM raw response length:", len(resp_text)) # # Save raw response for offline inspection # try: # with open("llm_debug.json", "w", encoding="utf-8") as fh: # fh.write(resp_text) # except Exception as e: # print("Warning: could not write llm_debug.json:", e) # rj = resp.json() # print("LLM parsed response keys:", list(rj.keys()) if isinstance(rj, dict) else type(rj)) # except Exception as e: # print("LLM call failed:", repr(e)) # return [] # # Extract textual reply robustly # text_reply = None # if isinstance(rj, dict): # choices = rj.get('choices') or [] # if choices: # c0 = choices[0] # msg = c0.get('message') or c0.get('delta') or {} # content = msg.get('content') # if isinstance(content, list): # for c in content: # if c.get('type') == 'text' and c.get('text'): # text_reply = c.get('text') # break # elif isinstance(content, str): # text_reply = content # elif isinstance(msg, dict) and msg.get('content') and isinstance(msg.get('content'), dict): # text_reply = msg.get('content').get('text') # if not text_reply: # for c in rj.get('choices', []): # if isinstance(c.get('text'), str): # text_reply = c.get('text') # break # if not text_reply: # return [] # s = text_reply.strip() # start = s.find('[') # end = s.rfind(']') # js = s[start:end+1] if start != -1 and end != -1 else s # try: # parsed = json.loads(js) # except Exception: # return [] # # Normalize parsed entries and return # out = [] # for obj in parsed: # t = obj.get('text') # page = int(obj.get('page')) if obj.get('page') else None # level = obj.get('suggested_level') # conf = float(obj.get('confidence') or 0) # if t and page is not None: # out.append({'text': t, 'page': page-1, 'suggested_level': level, 'confidence': conf}) # return out def identify_headers_with_openrouter(doc,api_key, model="google/gemini-2.5-flash", pages_to_check=None, top_margin=0, bottom_margin=0): """Ask an LLM (OpenRouter) to identify headers in the document. Returns a list of dicts: {text, page, suggested_level, confidence}. The function sends plain page-line strings to the LLM (including page numbers) and asks for a JSON array containing only header lines with suggested levels. """ logger.info("=" * 80) logger.info("STARTING IDENTIFY_HEADERS_WITH_OPENROUTER") # logger.info(f"PDF Path: {pdf_path}") logger.info(f"Model: {model}") # logger.info(f"LLM Prompt: {LLM_prompt[:200]}..." if len(LLM_prompt) > 200 else f"LLM Prompt: {LLM_prompt}") # doc = openPDF(pdf_path) api_key = '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' if api_key is None: api_key = os.getenv("OPENROUTER_API_KEY") or None model = str(model) toc_pages = get_toc_page_numbers(doc) lines_for_prompt = [] # pgestoRun=20 logger.info(f"TOC pages to skip: {toc_pages}") logger.info(f"Total pages in document: {len(doc)}") # Collect text lines from pages (skip TOC pages) total_lines = 0 for pno in range(len(doc)): if pages_to_check and pno not in pages_to_check: continue if pno in toc_pages: logger.debug(f"Skipping TOC page {pno}") continue page = doc.load_page(pno) page_height = page.rect.height lines_on_page = 0 text_dict = page.get_text("dict") lines = [] y_tolerance = 0.2 # tweak if needed (1–3 usually works) for block in text_dict["blocks"]: if block["type"] != 0: continue for line in block["lines"]: for span in line["spans"]: text = span["text"].strip() if not text: continue x0, y0, x1, y1 = span["bbox"] matched = False for l in lines: if abs(l["y"] - y0) <= y_tolerance: l["spans"].append((x0, text)) matched = True break if not matched: lines.append({ "y": y0, "spans": [(x0, text)] }) lines.sort(key=lambda l: l["y"]) # Join text inside each line final_lines = [] for l in lines: l["spans"].sort(key=lambda s: s[0]) # left → right line_text = " ".join(text for _, text in l["spans"]) final_lines.append(line_text) # Result for line in final_lines: if text: # prefix with page for easier mapping back lines_for_prompt.append(f"PAGE {pno+1}: {line}") lines_on_page += 1 if lines_on_page > 0: logger.debug(f"Page {pno}: collected {lines_on_page} lines") total_lines += lines_on_page logger.info(f"Total lines collected for LLM: {total_lines}") if not lines_for_prompt: logger.warning("No lines collected for prompt") return [] # Log sample of lines logger.info("Sample lines (first 10):") for i, line in enumerate(lines_for_prompt[:10]): logger.info(f" {i}: {line}") prompt =( "You are a document parser specialized in Uniclass and NBS (National Building Specification) specification documents. " "Your task is to extract ONLY headers and their hierarchical levels, NOT the body content of clauses.\n\n" "UNDERSTANDING THE DOCUMENT STRUCTURE:\n" "Uniclass/NBS documents follow a hierarchical structure. You could encounter some client requirement document that don't have section codes or numbers but you would find the product category header.\n" "The hierarchy can extend to whatever depth the document requires. Your job is to identify the RELATIVE hierarchy " "based on the document's structure, not fit it into a predetermined number of levels.\n\n" "IDENTIFY DOCUMENT FORMAT\n\n:" "Identify the document format from the EXPECTED FORMATS in order to use the relevant example when IDENTIFYING HEADERS (NOT BODY TEXT)\n\n" "EXPECTED FORMATS\n" "There are more than one type for a specification document format, which are NBS Specification, Uni class, Outline Specification, Scope\n\n" "CRITICAL RULE: Product codes starting with \"Pr_\" (Uniclass product codes) represent specific products and are " "typically at deeper levels of the hierarchy compared to their parent sections.\n\n" "IDENTIFYING HEADERS (NOT BODY TEXT):\n" "Headers typically have these characteristics:\n" "1. Document titles and main section headers (often in larger font, ALL CAPS, or bold, or different color)\n" "2. NBS section codes (like F10, H11, L20, M10, etc.) often are level 1 headers.\n" "3. Section headers like \"Types of...\", \"Products\", \"Workmanship\", \"Testing\", \"Execution\", \"Completion\"\n" "4. Product category headers (e.g., \"105A Proprietary small buildings/structures\" or \"110 Clay facing brickwork\" or \"Pr_\" followed by product names or \"Ss_\" followed by system names)\n" "EXCLUDING NBS CLAUSE BODIES:\n" "DO NOT extract these as headers:\n" "- Detailed specification text that describes requirements, standards, or instructions\n" "- Multi-sentence explanatory text under headers\n" "- Reference to standards (e.g., \"To BS EN 771-1\", \"Comply with BS 8000-3\")\n" "- Performance requirements and technical specifications\n" "- Installation instructions and procedural text\n" "- Material descriptions and property specifications\n" "- Bullet points or numbered lists that are clearly body content, not headers\n" "- Text that reads like a complete sentence or instruction rather than a title\n\n" "HIERARCHY ASSIGNMENT:\n" "Assign hierarchy levels based on the RELATIVE position in the document structure:\n" "- Lower numbers (1, 2, 3) for higher-level sections (document titles, major sections)\n" "- Higher numbers for deeper nested sections and subsections\n" "- The actual level numbers should reflect the nesting depth you observe in the document\n" "WHAT TO EXTRACT:\n" "Extract ONLY lines that serve as:\n" "- Document titles and section titles\n" "- Section codes (F10, H11, L20, M10, P10, R10, etc.)\n" "- Major section headers (\"GENERAL\", \"PRODUCTS\", \"EXECUTION\", \"Workmanship\", \"Testing\", \"Completion\")\n" "- Product category headers\n" "- Numbered sections and subsections that act as titles (not body text)\n" "- Any line that clearly serves as a heading, title, or section marker (typically short, title-case or capitalized)\n\n" "EXCLUDE:\n" "- All descriptive paragraphs and body text of NBS clauses\n" "- Specification details and requirements\n" "- Instructions and procedural text\n" "- Standards references embedded in body text\n" "- Any multi-sentence explanatory content\n\n" "Return a JSON array of objects with keys:\n" " text: the exact line text (without the 'PAGE N:' prefix),\n" " page: the page number (integer),\n" " suggested_level: integer representing the hierarchical depth (1 for top-level, increasing numbers for deeper nesting),\n" " confidence: 0-1 (based on how certain you are this is a header, not body text).\n\n" "Be selective - only extract actual headers/titles. This is a one-time run. It must be right." "Determine the appropriate hierarchy levels based on the structure you observe. Use as many levels as necessary " "to accurately represent the document's structure.\n\n" "THESE ARE THE EXAMPLES OF EACH DOCUMENT HEADERS:" "Example 1 of an NBS Specification document headers:" "

B91

" "

Buildings in the landscape

" "

System outline

" "

105A Proprietary small buildings/ structures

" "Example 2 of an NBS Specification document headers:" "

B91

" "

Buildings in the landscape

" "

Products

" "

330 Door Canopy Type A

" "Example 3 of an NBS Specification document headers:" "

P30

" "

Trenches, pipeways and pits for buried engineering services

" "

Products

" "

301 Access covers, manhole tops and frames

" "Example 1 of an Uni class document headers:" "

Ss_25_10_20_85

" "

Stick curtain walling systems Wall Type WT2

" "

Systems

" "

Ss_25_10_20_85
Stick curtain walling systems Wall Type WT2

" "Example 2 of an Uni class document headers:" "

Pr_20_85_06_85

" "

Stainless steel hoops Trolley Parks

" "

Products

" "

Pr_20_85_06_85 Stainless steel hoops Trolley Parks

" "Example 1 of an Outline Specification document headers:" "

External Walls

" "

Brickwork External Walls

" "Example 2 of an Outline Specification document headers:" "

INTERNAL WALL SURFACE FINISHES

" "

PAINT FINISHES/ DURABLE WALL FINISHES

" "Return only a single JSON array (no commentary). Include only headers.\n\n" + "\n\nLines:\n" + "\n".join(lines_for_prompt) ) logger.debug(f"Full prompt length: {len(prompt)} characters") # Changed: Print entire prompt, not truncated print("=" * 80) print("FULL LLM PROMPT:") print(prompt) print("=" * 80) # Also log to file try: with open("full_prompt.txt", "w", encoding="utf-8") as f: f.write(prompt) logger.info("Full prompt saved to full_prompt.txt") except Exception as e: logger.error(f"Could not save prompt to file: {e}") if not api_key: # No API key: return empty so caller can fallback to heuristics logger.error("No API key provided") return [] url = "https://openrouter.ai/api/v1/chat/completions" # Unix timestamp (seconds since epoch) unix_timestamp = int(time.time()) # Current datetime in ISO format (UTC) current_time = datetime.now(timezone.utc).isoformat() # Build headers following the OpenRouter example headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", "HTTP-Referer": os.getenv("OPENROUTER_REFERER", ""), "X-Title": os.getenv("OPENROUTER_X_TITLE", ""), "X-Request-Timestamp": str(unix_timestamp), "X-Request-Datetime": current_time, } # Log request details (without exposing full API key) logger.info(f"Making request to OpenRouter with model: {model}") logger.debug(f"Headers (API key masked): { {k: '***' if k == 'Authorization' else v for k, v in headers.items()} }") # Wrap the prompt as the example 'content' array expected by OpenRouter body = { "model": model, "messages": [ { "role": "user", "content": [ {"type": "text", "text": prompt} ] } ] } print(f"Request sent at: {current_time}") print(f"Unix timestamp: {unix_timestamp}") # Debug: log request body (truncated) and write raw response for inspection try: # Changed: Log full body (excluding prompt text which is already logged) logger.debug(f"Request body (without prompt text): { {k: v if k != 'messages' else '[...prompt...]' for k, v in body.items()} }") # Removed timeout parameter resp = requests.post( url=url, headers=headers, data=json.dumps(body) ) logger.info(f"HTTP Response Status: {resp.status_code}") resp.raise_for_status() resp_text = resp.text # Changed: Print entire response print("=" * 80) print("FULL LLM RESPONSE:") print(resp_text) print("=" * 80) logger.info(f"LLM raw response length: {len(resp_text)}") # Save raw response for offline inspection try: with open("llm_debug.json", "w", encoding="utf-8") as fh: fh.write(resp_text) logger.info("Raw response saved to llm_debug.json") except Exception as e: logger.error(f"Warning: could not write llm_debug.json: {e}") rj = resp.json() logger.info(f"LLM parsed response type: {type(rj)}") if isinstance(rj, dict): logger.debug(f"Response keys: {list(rj.keys())}") except requests.exceptions.RequestException as e: logger.error(f"HTTP request failed: {repr(e)}") return [] except Exception as e: logger.error(f"LLM call failed: {repr(e)}") return [] # Extract textual reply robustly text_reply = None if isinstance(rj, dict): choices = rj.get('choices') or [] logger.debug(f"Number of choices in response: {len(choices)}") if choices: for i, c in enumerate(choices): logger.debug(f"Choice {i}: {c}") c0 = choices[0] msg = c0.get('message') or c0.get('delta') or {} content = msg.get('content') if isinstance(content, list): logger.debug(f"Content is a list with {len(content)} items") for idx, c in enumerate(content): if c.get('type') == 'text' and c.get('text'): text_reply = c.get('text') logger.debug(f"Found text reply in content[{idx}], length: {len(text_reply)}") break elif isinstance(content, str): text_reply = content logger.debug(f"Content is string, length: {len(text_reply)}") elif isinstance(msg, dict) and msg.get('content') and isinstance(msg.get('content'), dict): text_reply = msg.get('content').get('text') logger.debug(f"Found text in nested content dict") # Fallback extraction if not text_reply: logger.debug("Trying fallback extraction from choices") for c in rj.get('choices', []): if isinstance(c.get('text'), str): text_reply = c.get('text') logger.debug(f"Found text reply in choice.text, length: {len(text_reply)}") break if not text_reply: logger.error("Could not extract text reply from response") # Changed: Print the entire response structure for debugging print("=" * 80) print("FAILED TO EXTRACT TEXT REPLY. FULL RESPONSE STRUCTURE:") print(json.dumps(rj, indent=2)) print("=" * 80) return [] # Changed: Print the extracted text reply print("=" * 80) print("EXTRACTED TEXT REPLY:") print(text_reply) print("=" * 80) logger.info(f"Extracted text reply length: {len(text_reply)}") logger.debug(f"First 500 chars of reply: {text_reply[:500]}...") s = text_reply.strip() start = s.find('[') end = s.rfind(']') js = s[start:end+1] if start != -1 and end != -1 else s logger.debug(f"Looking for JSON array: start={start}, end={end}") logger.debug(f"Extracted JSON string (first 500 chars): {js[:500]}...") try: parsed = json.loads(js) logger.info(f"Successfully parsed JSON, got {len(parsed)} items") except json.JSONDecodeError as e: logger.error(f"Failed to parse JSON: {e}") logger.error(f"JSON string that failed to parse: {js[:1000]}") # Try to find any JSON-like structure try: # Try to extract any JSON array import re json_pattern = r'\[\s*\{.*?\}\s*\]' matches = re.findall(json_pattern, text_reply, re.DOTALL) if matches: logger.info(f"Found {len(matches)} potential JSON arrays via regex") for i, match in enumerate(matches): try: parsed = json.loads(match) logger.info(f"Successfully parsed regex match {i} with {len(parsed)} items") break except json.JSONDecodeError as e2: logger.debug(f"Regex match {i} also failed: {e2}") continue else: logger.error("All regex matches failed to parse") return [] else: logger.error("No JSON-like pattern found via regex") return [] except Exception as e2: logger.error(f"Regex extraction also failed: {e2}") return [] # Log parsed results logger.info(f"Parsed {len(parsed)} header items:") for i, obj in enumerate(parsed[:10]): # Log first 10 items logger.info(f" Item {i}: {obj}") # Normalize parsed entries and return out = [] for obj in parsed: t = obj.get('text') page = int(obj.get('page')) if obj.get('page') else None level = obj.get('suggested_level') conf = float(obj.get('confidence') or 0) if t and page is not None: out.append({'text': t, 'page': page-1, 'suggested_level': level, 'confidence': conf}) logger.info(f"Returning {len(out)} valid header entries") return out import pandas as pd import os def flatten_node(node, parent_path=None): """ Recursively flatten JSON nodes for Excel. Args: node (dict): The current JSON node. parent_path (list): List of parent texts for hierarchy. Returns: list[dict]: Flattened list of nodes with hierarchy info. """ if parent_path is None: parent_path = [] # Current node info current = { "text": node.get("text"), "page": node.get("page"), "y": node.get("y"), "size": node.get("size"), "bold": node.get("bold"), "font": node.get("font"), "level": node.get("level"), "is_numbered": node.get("is_numbered"), "norm_text": node.get("norm_text"), "path": " > ".join(node.get("path", [])), "parent_path": " > ".join(parent_path) } flat_list = [current] # Recurse into children for child in node.get("children", []): flat_list.extend(flatten_node(child, parent_path + [node.get("text", "")])) return flat_list def identify_headers_and_save_excel(result): """ Converts nested JSON to flattened Excel. """ try: all_flat = [] # Flatten all top-level nodes for item in result: all_flat.extend(flatten_node(item)) # Convert to DataFrame df = pd.DataFrame(all_flat) # Save to Excel output_path = os.path.abspath("nested_header_analysis.xlsx") df.to_excel(output_path, index=False, engine="openpyxl") print(f"Excel saved at: {output_path}") print(df.head()) return output_path except Exception as e: print(f"Critical error: {str(e)}") return None def process_document_in_chunks( doc, api_key, chunk_size=13, model="google/gemini-2.5-flash" ): total_pages = len(doc) all_results = [] for start in range(0, total_pages, chunk_size): end = start + chunk_size logger.info(f"Processing pages {start + 1} → {min(end, total_pages)}") result = identify_headers_with_openrouterNEWW( doc=doc, api_key=api_key, model=model, pages_to_check=(start, end) ) # page 1 -> 15 1,2,3 # page 16 : header 1 # if result: all_results.extend(result) return all_results def identify_hierarchy_levels_openrouter(allheadersLLM,api_key, model="google/gemini-2.5-flash", top_margin=0, bottom_margin=0): """Ask an LLM (OpenRouter) to identify headers in the document. Returns a list of dicts: {text, page, suggested_level, confidence}. The function sends plain page-line strings to the LLM (including page numbers) and asks for a JSON array containing only header lines with suggested levels. """ logger.info("=" * 80) logger.info("STARTING IDENTIFY_HEADERS_WITH_OPENROUTER") # logger.info(f"PDF Path: {pdf_path}") logger.info(f"Model: {model}") # logger.info(f"LLM Prompt: {LLM_prompt[:200]}..." if len(LLM_prompt) > 200 else f"LLM Prompt: {LLM_prompt}") # doc = openPDF(pdf_path) api_key = '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' if api_key is None: api_key = os.getenv("OPENROUTER_API_KEY") or None model = str(model) lines_for_prompt = [] total_lines = len(allheadersLLM) # lines_for_prompt=allheadersLLM # text_to_page = {} # for i, item in enumerate(allheaders_LLM): # if isinstance(item, list): # t = item[0] # page = item[1] # if t is not None: # lines_for_prompt.append(t) # text_to_page[t] = page lines_for_prompt = [] # keep a list of pages in same order as lines_for_prompt pages_for_prompt = [] for item in allheadersLLM: # if isinstance(item, dict): t = item[0] page = item[1] if t is not None: lines_for_prompt.append(t) pages_for_prompt.append(page) # elif isinstance(item, str): # lines_for_prompt.append(item) # pages_for_prompt.append(None) logger.info(f"Total lines collected for LLM: {total_lines}") if not lines_for_prompt: logger.warning("No lines collected for prompt") return [] # Log sample of lines logger.info("Sample lines (first 10):") for i, line in enumerate(lines_for_prompt[:10]): logger.info(f" {i}: {line}") prompt =( "You are a header hierarchy specialized in Uniclass and NBS (National Building Specification) specification headers." "Your task is to suggest ONLY headers and their hierarchical levels.\n\n" "UNDERSTANDING THE HEADER STRUCTURE:\n" "Uniclass/NBS headers follow a hierarchical structure. You could encounter some client requirement headers that don't have section codes or numbers but you would find the product category header.\n" "The hierarchy can extend to whatever depth the headers require. Your job is to identify the RELATIVE hierarchy." "based on the headers' structure, not fit it into a predetermined number of levels.\n\n" "Determine the appropriate hierarchy levels based on the structure you observe. Use as many levels as necessary " "to accurately represent the document's structure.\n\n" "THESE ARE THE EXAMPLES OF EACH DOCUMENT HEADERS:" "Example 1 of an NBS Specification document headers:" "

B91

" "

Buildings in the landscape

" "

System outline

" "

105A Proprietary small buildings/ structures

" "Example 2 of an NBS Specification document headers:" "

B91

" "

Buildings in the landscape

" "

Products

" "

330 Door Canopy Type A

" "Example 3 of an NBS Specification document headers:" "

P30

" "

Trenches, pipeways and pits for buried engineering services

" "

Products

" "

301 Access covers, manhole tops and frames

" "Example 1 of an Uni class document headers:" "

Ss_25_10_20_85

" "

Stick curtain walling systems Wall Type WT2

" "

Systems

" "

Ss_25_10_20_85
Stick curtain walling systems Wall Type WT2

" "Example 2 of an Uni class document headers:" "

Pr_20_85_06_85

" "

Stainless steel hoops Trolley Parks

" "

Products

" "

Pr_20_85_06_85 Stainless steel hoops Trolley Parks

" "Example 1 of an Outline Specification document headers:" "

External Walls

" "

Brickwork External Walls

" "Example 2 of an Outline Specification document headers:" "

INTERNAL WALL SURFACE FINISHES

" "

PAINT FINISHES/ DURABLE WALL FINISHES

" " RETURN FORMAT (STRICT JSON ONLY — NO MARKDOWN, NO TEXT):" " Return a JSON array where each item is:" " {text: header text exactly as given," "suggested_level: integer (1 for top level, 2 for sub level, etc)," "confidence: number between 0 and 1}" " Example:" " [ {" "text: G20 Carpentry/ timber-framing/ first fixing," "suggested_level: 1," "confidence: 0.95},{ text: General," "suggested_level: 2," "confidence: 0.92}]" " IMPORTANT RULES:" " - Output ONLY valid JSON" " - No explanations" "- No markdown" "- No headings" "- No comments" + "\n\nLines:\n" + "\n".join(lines_for_prompt) ) logger.debug(f"Full prompt length: {len(prompt)} characters") # Changed: Print entire prompt, not truncated print("=" * 80) print("FULL LLM PROMPT:") print(prompt) print("=" * 80) # Also log to file try: with open("full_prompt.txt", "w", encoding="utf-8") as f: f.write(prompt) logger.info("Full prompt saved to full_prompt.txt") except Exception as e: logger.error(f"Could not save prompt to file: {e}") if not api_key: # No API key: return empty so caller can fallback to heuristics logger.error("No API key provided") return [] url = "https://openrouter.ai/api/v1/chat/completions" # Unix timestamp (seconds since epoch) unix_timestamp = int(time.time()) # Current datetime in ISO format (UTC) current_time = datetime.now(timezone.utc).isoformat() # Build headers following the OpenRouter example headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", "HTTP-Referer": os.getenv("OPENROUTER_REFERER", ""), "X-Title": os.getenv("OPENROUTER_X_TITLE", ""), # "X-Request-Timestamp": str(unix_timestamp), # "X-Request-Datetime": current_time, } # Log request details (without exposing full API key) logger.info(f"Making request to OpenRouter with model: {model}") logger.debug(f"Headers (API key masked): { {k: '***' if k == 'Authorization' else v for k, v in headers.items()} }") # Wrap the prompt as the example 'content' array expected by OpenRouter body = { "model": model, "messages": [ { "role": "user", "content": [ {"type": "text", "text": prompt} ] } ] } # print(f"Request sent at: {current_time}") # print(f"Unix timestamp: {unix_timestamp}") # Debug: log request body (truncated) and write raw response for inspection try: # Changed: Log full body (excluding prompt text which is already logged) logger.debug(f"Request body (without prompt text): { {k: v if k != 'messages' else '[...prompt...]' for k, v in body.items()} }") # Removed timeout parameter resp = requests.post( url=url, headers=headers, data=json.dumps(body) ) logger.info(f"HTTP Response Status: {resp.status_code}") resp.raise_for_status() resp_text = resp.text # Changed: Print entire response print("=" * 80) print("FULL LLM RESPONSE:") print(resp_text) print("=" * 80) logger.info(f"LLM raw response length: {len(resp_text)}") # Save raw response for offline inspection try: with open("llm_debug.json", "w", encoding="utf-8") as fh: fh.write(resp_text) logger.info("Raw response saved to llm_debug.json") except Exception as e: logger.error(f"Warning: could not write llm_debug.json: {e}") rj = resp.json() logger.info(f"LLM parsed response type: {type(rj)}") if isinstance(rj, dict): logger.debug(f"Response keys: {list(rj.keys())}") except requests.exceptions.RequestException as e: logger.error(f"HTTP request failed: {repr(e)}") return [] except Exception as e: logger.error(f"LLM call failed: {repr(e)}") return [] # Extract textual reply robustly text_reply = None if isinstance(rj, dict): choices = rj.get('choices') or [] logger.debug(f"Number of choices in response: {len(choices)}") if choices: for i, c in enumerate(choices): logger.debug(f"Choice {i}: {c}") c0 = choices[0] msg = c0.get('message') or c0.get('delta') or {} content = msg.get('content') if isinstance(content, list): logger.debug(f"Content is a list with {len(content)} items") for idx, c in enumerate(content): if c.get('type') == 'text' and c.get('text'): text_reply = c.get('text') logger.debug(f"Found text reply in content[{idx}], length: {len(text_reply)}") break elif isinstance(content, str): text_reply = content logger.debug(f"Content is string, length: {len(text_reply)}") elif isinstance(msg, dict) and msg.get('content') and isinstance(msg.get('content'), dict): text_reply = msg.get('content').get('text') logger.debug(f"Found text in nested content dict") # Fallback extraction if not text_reply: logger.debug("Trying fallback extraction from choices") for c in rj.get('choices', []): if isinstance(c.get('text'), str): text_reply = c.get('text') logger.debug(f"Found text reply in choice.text, length: {len(text_reply)}") break if not text_reply: logger.error("Could not extract text reply from response") # Changed: Print the entire response structure for debugging print("=" * 80) print("FAILED TO EXTRACT TEXT REPLY. FULL RESPONSE STRUCTURE:") print(json.dumps(rj, indent=2)) print("=" * 80) return [] # Changed: Print the extracted text reply print("=" * 80) print("EXTRACTED TEXT REPLY:") print(text_reply) print("=" * 80) logger.info(f"Extracted text reply length: {len(text_reply)}") logger.debug(f"First 500 chars of reply: {text_reply[:500]}...") s = text_reply.strip() start = s.find('[') end = s.rfind(']') js = s[start:end+1] if start != -1 and end != -1 else s logger.debug(f"Looking for JSON array: start={start}, end={end}") logger.debug(f"Extracted JSON string (first 500 chars): {js[:500]}...") try: parsed = json.loads(js) logger.info(f"Successfully parsed JSON, got {len(parsed)} items") except json.JSONDecodeError as e: logger.error(f"Failed to parse JSON: {e}") logger.error(f"JSON string that failed to parse: {js[:1000]}") # Try to find any JSON-like structure print('text reply:',text_reply) try: # Try to extract any JSON array import re json_pattern = r'\[\s*\{.*?\}\s*\]' matches = re.findall(json_pattern, text_reply, re.DOTALL) if matches: logger.info(f"Found {len(matches)} potential JSON arrays via regex") for i, match in enumerate(matches): try: parsed = json.loads(match) logger.info(f"Successfully parsed regex match {i} with {len(parsed)} items") break except json.JSONDecodeError as e2: logger.debug(f"Regex match {i} also failed: {e2}") continue else: logger.error("All regex matches failed to parse") return [] else: logger.error("No JSON-like pattern found via regex") return [] except Exception as e2: logger.error(f"Regex extraction also failed: {e2}") return [] # Log parsed results logger.info(f"Parsed {len(parsed)} header items:") for i, obj in enumerate(parsed[:10]): # Log first 10 items logger.info(f" Item {i}: {obj}") out = [] for i, obj in enumerate(parsed): t = obj.get('text') level = obj.get('suggested_level') conf = float(obj.get('confidence') or 0) # Assign page number directly from the same index page = pages_for_prompt[i] if i < len(pages_for_prompt) else None out.append({ 'text': t, 'page': page, 'suggested_level': int(level), 'confidence': conf }) logger.info(f"Returning {len(out)} valid header entries") return out def identify_headers_with_openrouterNEWW(doc,api_key, pages_to_check, model="google/gemini-2.5-flash", top_margin=0, bottom_margin=0): """Ask an LLM (OpenRouter) to identify headers in the document. Returns a list of dicts: {text, page, suggested_level, confidence}. The function sends plain page-line strings to the LLM (including page numbers) and asks for a JSON array containing only header lines with suggested levels. """ logger.info("=" * 80) logger.info("STARTING IDENTIFY_HEADERS_WITH_OPENROUTER") # logger.info(f"PDF Path: {pdf_path}") logger.info(f"Model: {model}") # logger.info(f"LLM Prompt: {LLM_prompt[:200]}..." if len(LLM_prompt) > 200 else f"LLM Prompt: {LLM_prompt}") # doc = openPDF(pdf_path) api_key = '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' if api_key is None: api_key = os.getenv("OPENROUTER_API_KEY") or None model = str(model) # toc_pages = get_toc_page_numbers(doc) lines_for_prompt = [] # pgestoRun=20 # logger.info(f"TOC pages to skip: {toc_pages}") logger.info(f"Total pages in document: {len(doc)}") # Collect text lines from pages (skip TOC pages) total_lines = 0 ArrayofTextWithFormat = [] total_pages = len(doc) if pages_to_check is None: start_page = 0 end_page = min(15, total_pages) else: start_page, end_page = pages_to_check end_page = min(end_page, total_pages) # 🔑 CRITICAL LINE for pno in range(start_page, end_page): page = doc.load_page(pno) # for pno in range(start,end): # page = doc.load_page(pno) page_height = page.rect.height lines_on_page = 0 text_dict = page.get_text("dict") lines = [] y_tolerance = 0.5 # tweak if needed (1–3 usually works) for block in text_dict["blocks"]: if block["type"] != 0: continue for line in block["lines"]: for span in line["spans"]: text = span["text"].strip() if not text: # Skip empty text continue # Extract all formatting attributes font = span.get('font') size = span.get('size') color = span.get('color') flags = span.get('flags', 0) bbox = span.get("bbox", (0, 0, 0, 0)) x0, y0, x1, y1 = bbox # Create text format dictionary text_format = { 'Font': font, 'Size': size, 'Flags': flags, 'Color': color, 'Text': text, 'BBox': bbox, 'Page': pno + 1 } # Add to ArrayofTextWithFormat ArrayofTextWithFormat.append(text_format) # For line grouping (keeping your existing logic) matched = False for l in lines: if abs(l["y"] - y0) <= y_tolerance: l["spans"].append((x0, text, font, size, color, flags)) matched = True break if not matched: lines.append({ "y": y0, "spans": [(x0, text, font, size, color, flags)] }) lines.sort(key=lambda l: l["y"]) # Join text inside each line with formatting info final_lines = [] for l in lines: l["spans"].sort(key=lambda s: s[0]) # left → right # Collect all text and formatting for this line line_text = " ".join(text for _, text, _, _, _, _ in l["spans"]) # Get dominant formatting for the line (based on first span) if l["spans"]: _, _, font, size, color, flags = l["spans"][0] # Store line with its formatting line_with_format = { 'text': line_text, 'font': font, 'size': size, 'color': color, 'flags': flags, 'page': pno + 1, 'y_position': l["y"] } final_lines.append(line_with_format) # Result for line_data in final_lines: line_text = line_data['text'] print(line_text) if line_text: # Create a formatted string with text properties # format_info = f"Font: {line_data['font']}, Size: {line_data['size']}, Color: {line_data['color']}" lines_for_prompt.append(f"PAGE {pno+1}: {line_text}") lines_on_page += 1 if lines_on_page > 0: logger.debug(f"Page {pno}: collected {lines_on_page} lines") total_lines += lines_on_page logger.info(f"Total lines collected for LLM: {total_lines}") # Now ArrayofTextWithFormat contains dictionaries for each text span with full formatting print(f"\nTotal text spans with formatting: {len(ArrayofTextWithFormat)}") print("\nSample of formatted text entries:") for i, entry in enumerate(ArrayofTextWithFormat[:3]): # Show first 3 entries print(f"Entry {i+1}: {entry}") # for pno in range(15): # # if pages_to_check and pno not in pages_to_check: # # continue # # if pno in toc_pages: # # logger.debug(f"Skipping TOC page {pno}") # # continue # page = doc.load_page(pno) # page_height = page.rect.height # lines_on_page = 0 # text_dict = page.get_text("dict") # lines = [] # y_tolerance = 2 # tweak if needed (1–3 usually works) # for block in text_dict["blocks"]: # if block["type"] != 0: # continue # for line in block["lines"]: # for span in line["spans"]: # text = span["text"].strip() # size= span['size'] # x0, y0, x1, y1 = span["bbox"] # matched = False # for l in lines: # if abs(l["y"] - y0) <= y_tolerance: # l["spans"].append((x0, text,)) # matched = True # break # if not matched: # lines.append({ # "y": y0, # "spans": [(x0, text)] # }) # lines.sort(key=lambda l: l["y"]) # # Join text inside each line # final_lines = [] # for l in lines: # l["spans"].sort(key=lambda s: s[0]) # left → right # line_text = " ".join(text for _, text in l["spans"]) # final_lines.append(line_text) # # Result # for line in final_lines: # print(line) # if text: # # prefix with page for easier mapping back # lines_for_prompt.append(f"PAGE {pno+1}: {line}") # lines_on_page += 1 # if lines_on_page > 0: # logger.debug(f"Page {pno}: collected {lines_on_page} lines") # total_lines += lines_on_page # logger.info(f"Total lines collected for LLM: {total_lines}") # page = doc.load_page(pno) # page_height = page.rect.height # lines_on_page = 0 # text_dict = page.get_text("dict") # lines = [] # # y_tolerance = 0.2 # tweak if needed (1–3 usually works) # for block in text_dict["blocks"]: # if block["type"] != 0: # continue # for line in block["lines"]: # for s in line["spans"]: # # text = span["text"].strip() # # size= span['size'] # # if not text: # # continue # # if text: # # prefix with page for easier mapping back # # lines_for_prompt.append(f"PAGE {pno+1}: {text}") # # for s in spans: # # text,font,size,flags,color # ArrayofTextWithFormat={'Font':s.get('font')},{'Size':s.get('size')},{'Flags':s.get('flags')},{'Color':s.get('color')},{'Text':s.get('text')} # # prefix with page for easier mapping back # lines_for_prompt.append(f"PAGE {pno+1}: {ArrayofTextWithFormat}") # lines_on_page += 1 # if lines_on_page > 0: # logger.debug(f"Page {pno}: collected {lines_on_page} lines") # total_lines += lines_on_page # logger.info(f"Total lines collected for LLM: {total_lines}") if not lines_for_prompt: logger.warning("No lines collected for prompt") return [] # Log sample of lines logger.info("Sample lines (first 10):") for i, line in enumerate(lines_for_prompt[:10]): logger.info(f" {i}: {line}") prompt =( "You are a document parser specialized in Uniclass and NBS (National Building Specification) specification documents. " "Your task is to extract ONLY headers and their hierarchical levels, NOT the body content of clauses.\n\n" "UNDERSTANDING THE DOCUMENT STRUCTURE:\n" "Uniclass/NBS documents follow a hierarchical structure. You could encounter some client requirement document that don't have section codes or numbers but you would find the product category header.\n" # "The hierarchy can extend to whatever depth the document requires. Your job is to identify the RELATIVE hierarchy " # "based on the document's structure, not fit it into a predetermined number of levels.\n\n" "IDENTIFY DOCUMENT FORMAT\n\n:" "Identify the document format from the EXPECTED FORMATS in order to use the relevant example when IDENTIFYING HEADERS (NOT BODY TEXT)\n\n" "EXPECTED FORMATS\n" "There are more than one type for a specification document format, which are NBS Specification, Uni class, Outline Specification, Scope\n\n" "CRITICAL RULE: Product codes starting with \"Pr_\" (Uniclass product codes) represent specific products and are " "typically at deeper levels of the hierarchy compared to their parent sections.\n\n" "IDENTIFYING HEADERS (NOT BODY TEXT):\n" "Headers typically have these characteristics:\n" "1. Document titles and main section headers (often in larger font, ALL CAPS, or bold, or different color)\n" "2. NBS section codes (like F10, H11, L20, M10, etc.) often are level 1 headers.\n" "3. Section headers like \"Types of...\", \"Products\", \"Workmanship\", \"Testing\", \"Execution\", \"Completion\"\n" "4. Product category headers (e.g., \"105A Proprietary small buildings/structures\" or \"110 Clay facing brickwork\" or \"Pr_\" followed by product names or \"Ss_\" followed by system names)\n" "EXCLUDING NBS CLAUSE BODIES:\n" "DO NOT extract these as headers:\n" "- Detailed specification text that describes requirements, standards, or instructions\n" "- Multi-sentence explanatory text under headers\n" "- Reference to standards (e.g., \"To BS EN 771-1\", \"Comply with BS 8000-3\")\n" "- Performance requirements and technical specifications\n" "- Installation instructions and procedural text\n" "- Material descriptions and property specifications\n" "- Bullet points or numbered lists that are clearly body content, not headers\n" "- Text that reads like a complete sentence or instruction rather than a title\n\n" "HIERARCHY ASSIGNMENT:\n" "Assign hierarchy levels based on the RELATIVE position in the document structure:\n" "- Lower numbers (1, 2, 3) for higher-level sections (document titles, major sections)\n" "- Higher numbers for deeper nested sections and subsections\n" "- The actual level numbers should reflect the nesting depth you observe in the document\n" "WHAT TO EXTRACT:\n" "Extract ONLY lines that serve as:\n" "- Document titles and section titles\n" "- Section codes (F10, H11, L20, M10, P10, R10, etc.)\n" "- Major section headers (\"GENERAL\", \"PRODUCTS\", \"EXECUTION\", \"Workmanship\", \"Testing\", \"Completion\")\n" "- Product category headers\n" "- Numbered sections and subsections that act as titles (not body text)\n" "- Any line that clearly serves as a heading, title, or section marker (typically short, title-case or capitalized)\n\n" "EXCLUDE:\n" "- All descriptive paragraphs and body text of NBS clauses\n" "- Specification details and requirements\n" "- Instructions and procedural text\n" "- Standards references embedded in body text\n" "- Any multi-sentence explanatory content\n\n" "Return a JSON array of objects with keys:\n" " text: the exact line text (without the 'PAGE N:' prefix),\n" " page: the page number (integer),\n" # " suggested_level: integer representing the hierarchical depth (1 for top-level, increasing numbers for deeper nesting),\n" " confidence: 0-1 (based on how certain you are this is a header, not body text).\n\n" "Be selective - only extract actual headers/titles. This is a one-time run. It must be right." # "Determine the appropriate hierarchy levels based on the structure you observe. Use as many levels as necessary " # "to accurately represent the document's structure.\n\n" "DO NOT SUGGEST A HIERARCHY LEVEL. YOUR JOB IS TO IDENTIFY HEADERS ONLY." "THESE ARE THE EXAMPLES OF EACH DOCUMENT HEADERS:" "Example 1 of an NBS Specification document headers:" "

B91

" "

Buildings in the landscape

" "

System outline

" "

105A Proprietary small buildings/ structures

" "Example 2 of an NBS Specification document headers:" "

B91

" "

Buildings in the landscape

" "

Products

" "

330 Door Canopy Type A

" "Example 3 of an NBS Specification document headers:" "

P30

" "

Trenches, pipeways and pits for buried engineering services

" "

Products

" "

301 Access covers, manhole tops and frames

" "Example 1 of an Uni class document headers:" "

Ss_25_10_20_85

" "

Stick curtain walling systems Wall Type WT2

" "

Systems

" "

Ss_25_10_20_85
Stick curtain walling systems Wall Type WT2

" "Example 2 of an Uni class document headers:" "

Pr_20_85_06_85

" "

Stainless steel hoops Trolley Parks

" "

Products

" "

Pr_20_85_06_85 Stainless steel hoops Trolley Parks

" "Example 1 of an Outline Specification document headers:" "

External Walls

" "

Brickwork External Walls

" "Example 2 of an Outline Specification document headers:" "

INTERNAL WALL SURFACE FINISHES

" "

PAINT FINISHES/ DURABLE WALL FINISHES

" "Return only a single JSON array (no commentary). Include only headers.\n\n" + "\n\nLines:\n" + "\n".join(lines_for_prompt) ) logger.debug(f"Full prompt length: {len(prompt)} characters") # Changed: Print entire prompt, not truncated print("=" * 80) print("FULL LLM PROMPT:") print(prompt) print("=" * 80) # Also log to file try: with open("full_prompt.txt", "w", encoding="utf-8") as f: f.write(prompt) logger.info("Full prompt saved to full_prompt.txt") except Exception as e: logger.error(f"Could not save prompt to file: {e}") if not api_key: # No API key: return empty so caller can fallback to heuristics logger.error("No API key provided") return [] url = "https://openrouter.ai/api/v1/chat/completions" # Unix timestamp (seconds since epoch) unix_timestamp = int(time.time()) # Current datetime in ISO format (UTC) current_time = datetime.now(timezone.utc).isoformat() # Build headers following the OpenRouter example headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", "HTTP-Referer": os.getenv("OPENROUTER_REFERER", ""), "X-Title": os.getenv("OPENROUTER_X_TITLE", ""), # "X-Request-Timestamp": str(unix_timestamp), # "X-Request-Datetime": current_time, } # Log request details (without exposing full API key) logger.info(f"Making request to OpenRouter with model: {model}") logger.debug(f"Headers (API key masked): { {k: '***' if k == 'Authorization' else v for k, v in headers.items()} }") # Wrap the prompt as the example 'content' array expected by OpenRouter body = { "model": model, "messages": [ { "role": "user", "content": [ {"type": "text", "text": prompt} ] } ] } # print(f"Request sent at: {current_time}") # print(f"Unix timestamp: {unix_timestamp}") # Debug: log request body (truncated) and write raw response for inspection try: # Changed: Log full body (excluding prompt text which is already logged) logger.debug(f"Request body (without prompt text): { {k: v if k != 'messages' else '[...prompt...]' for k, v in body.items()} }") # Removed timeout parameter resp = requests.post( url=url, headers=headers, data=json.dumps(body) ) logger.info(f"HTTP Response Status: {resp.status_code}") resp.raise_for_status() resp_text = resp.text # Changed: Print entire response print("=" * 80) print("FULL LLM RESPONSE:") print(resp_text) print("=" * 80) logger.info(f"LLM raw response length: {len(resp_text)}") # Save raw response for offline inspection try: with open("llm_debug.json", "w", encoding="utf-8") as fh: fh.write(resp_text) logger.info("Raw response saved to llm_debug.json") except Exception as e: logger.error(f"Warning: could not write llm_debug.json: {e}") rj = resp.json() logger.info(f"LLM parsed response type: {type(rj)}") if isinstance(rj, dict): logger.debug(f"Response keys: {list(rj.keys())}") except requests.exceptions.RequestException as e: logger.error(f"HTTP request failed: {repr(e)}") return [] except Exception as e: logger.error(f"LLM call failed: {repr(e)}") return [] # Extract textual reply robustly text_reply = None if isinstance(rj, dict): choices = rj.get('choices') or [] logger.debug(f"Number of choices in response: {len(choices)}") if choices: for i, c in enumerate(choices): logger.debug(f"Choice {i}: {c}") c0 = choices[0] msg = c0.get('message') or c0.get('delta') or {} content = msg.get('content') if isinstance(content, list): logger.debug(f"Content is a list with {len(content)} items") for idx, c in enumerate(content): if c.get('type') == 'text' and c.get('text'): text_reply = c.get('text') logger.debug(f"Found text reply in content[{idx}], length: {len(text_reply)}") break elif isinstance(content, str): text_reply = content logger.debug(f"Content is string, length: {len(text_reply)}") elif isinstance(msg, dict) and msg.get('content') and isinstance(msg.get('content'), dict): text_reply = msg.get('content').get('text') logger.debug(f"Found text in nested content dict") # Fallback extraction if not text_reply: logger.debug("Trying fallback extraction from choices") for c in rj.get('choices', []): if isinstance(c.get('text'), str): text_reply = c.get('text') logger.debug(f"Found text reply in choice.text, length: {len(text_reply)}") break if not text_reply: logger.error("Could not extract text reply from response") # Changed: Print the entire response structure for debugging print("=" * 80) print("FAILED TO EXTRACT TEXT REPLY. FULL RESPONSE STRUCTURE:") print(json.dumps(rj, indent=2)) print("=" * 80) return [] # Changed: Print the extracted text reply print("=" * 80) print("EXTRACTED TEXT REPLY:") print(text_reply) print("=" * 80) logger.info(f"Extracted text reply length: {len(text_reply)}") logger.debug(f"First 500 chars of reply: {text_reply[:500]}...") s = text_reply.strip() start = s.find('[') end = s.rfind(']') js = s[start:end+1] if start != -1 and end != -1 else s logger.debug(f"Looking for JSON array: start={start}, end={end}") logger.debug(f"Extracted JSON string (first 500 chars): {js[:500]}...") try: parsed = json.loads(js) logger.info(f"Successfully parsed JSON, got {len(parsed)} items") except json.JSONDecodeError as e: logger.error(f"Failed to parse JSON: {e}") logger.error(f"JSON string that failed to parse: {js[:1000]}") # Try to find any JSON-like structure try: # Try to extract any JSON array import re json_pattern = r'\[\s*\{.*?\}\s*\]' matches = re.findall(json_pattern, text_reply, re.DOTALL) if matches: logger.info(f"Found {len(matches)} potential JSON arrays via regex") for i, match in enumerate(matches): try: parsed = json.loads(match) logger.info(f"Successfully parsed regex match {i} with {len(parsed)} items") break except json.JSONDecodeError as e2: logger.debug(f"Regex match {i} also failed: {e2}") continue else: logger.error("All regex matches failed to parse") return [] else: logger.error("No JSON-like pattern found via regex") return [] except Exception as e2: logger.error(f"Regex extraction also failed: {e2}") return [] # Log parsed results logger.info(f"Parsed {len(parsed)} header items:") for i, obj in enumerate(parsed[:10]): # Log first 10 items logger.info(f" Item {i}: {obj}") # Normalize parsed entries and return out = [] for obj in parsed: t = obj.get('text') page = int(obj.get('page')) if obj.get('page') else None level = obj.get('suggested_level') conf = float(obj.get('confidence') or 0) size=obj.get('size') if t and page is not None: out.append({'text': t, 'page': page-1, 'suggested_level': level, 'confidence': conf,'size':size}) logger.info(f"Returning {len(out)} valid header entries") return out def mapPages_header_hierarchy(headers,hierarchy): mapped_hierarchy = [] header_idx = 0 # pointer in headers for h_item in hierarchy: h_text = h_item.get("text") h_level = h_item.get("suggested_level") h_conf = float(h_item.get("confidence", 0)) page = None combined_text = "" start_idx = header_idx # Try to match hierarchy text by concatenating headers while header_idx < len(headers) and len(combined_text) < len(h_text): header = headers[header_idx] header_text = header.get("text") if isinstance(header, dict) else str(header) header_page = header.get("page") if isinstance(header, dict) else None if combined_text: combined_text += " " # add space between concatenated headers combined_text += header_text if page is None: page = header_page # take page of first matching header header_idx += 1 # Optional: check if merged headers partially match hierarchy if h_text not in combined_text: # fallback: use last header page or None if start_idx < len(headers): page = headers[start_idx].get("page") if isinstance(headers[start_idx], dict) else None mapped_hierarchy.append({ "text": h_text, "page": page, "suggested_level": int(h_level), "confidence": h_conf }) return mapped_hierarchy # # Save results # with open("mapped_hierarchy_with_pages.json", "w", encoding="utf-8") as f: # json.dump(mapped_hierarchy, f, indent=2) # print(f"Original headers: {len(headers)}") # print(f"Hierarchy items: {len(hierarchy)}") # print(f"Mapped hierarchy: {len(mapped_hierarchy)}") # def identify_headers_with_openrouter(doc, api_key=None, model="google/gemini-2.5-pro", pages_to_check=None, top_margin=70, bottom_margin=85, timeout=30): # # def identify_headers_with_openrouter(pdf_path, model, LLM_prompt, pages_to_check=None, top_margin=0, bottom_margin=0): # """Ask an LLM (OpenRouter) to identify headers in the document. # Returns a list of dicts: {text, page, suggested_level, confidence, body}. # The function sends plain page-line strings to the LLM (including page numbers) # and asks for a JSON array containing headers with suggested levels and body for the last header. # """ # logger.info("=" * 80) # logger.info("STARTING IDENTIFY_HEADERS_WITH_OPENROUTER") # # logger.info(f"PDF Path: {pdf_path}") # # logger.info(f"Model: {model}") # # logger.info(f"LLM Prompt: {LLM_prompt[:200]}..." if len(LLM_prompt) > 200 else f"LLM Prompt: {LLM_prompt}") # api_key = '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' # if api_key is None: # api_key = os.getenv("OPENROUTER_API_KEY") or None # model = str(model) # toc_pages = get_toc_page_numbers(doc) # lines_for_prompt = [] # logger.info(f"TOC pages to skip: {toc_pages}") # logger.info(f"Total pages in document: {len(doc)}") # # Collect text lines from pages (skip TOC pages) # total_lines = 0 # for pno in range(len(doc)): # if pages_to_check and pno not in pages_to_check: # continue # if pno in toc_pages: # logger.debug(f"Skipping TOC page {pno}") # continue # page = doc.load_page(pno) # page_height = page.rect.height # lines_on_page = 0 # text_dict = page.get_text("dict") # lines = [] # y_tolerance = 2 # tweak if needed (1–3 usually works) # for block in text_dict["blocks"]: # if block["type"] != 0: # continue # for line in block["lines"]: # for span in line["spans"]: # text = span["text"].strip() # if not text: # continue # x0, y0, x1, y1 = span["bbox"] # matched = False # for l in lines: # if abs(l["y"] - y0) <= y_tolerance: # l["spans"].append((x0, text)) # matched = True # break # if not matched: # lines.append({ # "y": y0, # "spans": [(x0, text)] # }) # lines.sort(key=lambda l: l["y"]) # # Join text inside each line # final_lines = [] # for l in lines: # l["spans"].sort(key=lambda s: s[0]) # left → right # line_text = " ".join(text for _, text in l["spans"]) # final_lines.append(line_text) # # Result # for line in final_lines: # print(line) # if text: # # prefix with page for easier mapping back # lines_for_prompt.append(f"PAGE {pno+1}: {line}") # lines_on_page += 1 # if lines_on_page > 0: # logger.debug(f"Page {pno}: collected {lines_on_page} lines") # total_lines += lines_on_page # logger.info(f"Total lines collected for LLM: {total_lines}") # if not lines_for_prompt: # logger.warning("No lines collected for prompt") # return [] # # Log sample of lines # logger.info("Sample lines (first 10):") # for i, line in enumerate(lines_for_prompt[:10]): # logger.info(f" {i}: {line}") # prompt = ( # "You are a document parser specialized in Uniclass and NBS (National Building Specification) specification documents. Your task is to extract ONLY headers, their hierarchical levels, and the body content belonging to the last header in a sequence. The body extraction must stop when another header is encountered.\n\n" # "UNDERSTANDING THE DOCUMENT STRUCTURE:\n" # "Uniclass/NBS documents follow a hierarchical structure. You could encounter some client requirement documents that don't have section codes or numbers, but you would find the product category header.\n" # "The hierarchy can extend to whatever depth the document requires. Your job is to identify the RELATIVE hierarchy " # "based on the document's structure, not fit it into a predetermined number of levels.\n\n" # "IDENTIFY DOCUMENT FORMAT:\n" # "Identify the document format from the EXPECTED FORMATS in order to use the relevant example when IDENTIFYING HEADERS (NOT BODY TEXT)\n\n" # "EXPECTED FORMATS:\n" # "There are more than one type of specification document format, which are: NBS Specification, Uniclass, Outline Specification, Scope.\n\n" # "CRITICAL RULE:\n" # "Product codes starting with \"Pr_\" (Uniclass product codes) represent specific products and are " # "typically at deeper levels of the hierarchy compared to their parent sections.\n\n" # "IDENTIFYING HEADERS (NOT BODY TEXT):\n" # "Headers typically have these characteristics:\n" # "1. Document titles and main section headers (often in larger font, ALL CAPS, or bold, or different color)\n" # "2. NBS section codes (like F10, H11, L20, M10, etc.) often are level 1 headers.\n" # "3. Section headers like \"Types of...\", \"Products\", \"Workmanship\", \"Testing\", \"Execution\", \"Completion\"\n" # "4. Product category headers (e.g., \"105A Proprietary small buildings/structures\" or \"110 Clay facing brickwork\" or \"Pr_\" followed by product names or \"Ss_\" followed by system names)\n\n" # "EXTRACTING BODY CONTENT FOR THE LAST HEADER:\n" # "- For the deepest, most recent header you identify (i.e., the last header before encountering body text), you MUST also extract its associated body content.\n" # "- The body content begins on the line immediately following the header.\n" # "- The body content stops when you encounter the next header of any hierarchical level, or the end of the document.\n" # "- The body content includes the detailed specification text that describes requirements, standards, instructions, etc., which are otherwise excluded from header extraction.\n\n" # "EXCLUDING NBS CLAUSE BODIES (EXCEPT FOR THE LAST HEADER):\n" # "DO NOT extract body content for any header except the last one in a sequence. For all other headers, exclude:\n" # "- Detailed specification text that describes requirements, standards, or instructions\n" # "- Multi-sentence explanatory text under headers\n" # "- Reference to standards (e.g., \"To BS EN 771-1\", \"Comply with BS 8000-3\")\n" # "- Performance requirements and technical specifications\n" # "- Installation instructions and procedural text\n" # "- Material descriptions and property specifications\n" # "- Bullet points or numbered lists that are clearly body content, not headers\n" # "- Text that reads like a complete sentence or instruction rather than a title\n\n" # "HIERARCHY ASSIGNMENT:\n" # "Assign hierarchy levels based on the RELATIVE position in the document structure:\n" # "- Lower numbers (1, 2, 3) for higher-level sections (document titles, major sections)\n" # "- Higher numbers for deeper nested sections and subsections\n" # "- The actual level numbers should reflect the nesting depth you observe in the document\n\n" # "WHAT TO EXTRACT:\n" # "1. For ALL identified headers: Extract the header text, page, hierarchical level, and confidence.\n" # "2. For the LAST HEADER only: Also extract its associated body content, which starts after the header and stops at the next header or document end.\n\n" # "EXCLUDE:\n" # "- For all but the last header, exclude all descriptive paragraphs and body text.\n" # "- Do not extract body content for any header that is followed by another header before its body text begins.\n\n" # "Return a JSON array of objects with the following keys:\n" # " text: The exact header line text (without the 'PAGE N:' prefix),\n" # " page: The page number (integer) where the header is found,\n" # " suggested_level: An integer representing the hierarchical depth (1 for top-level, increasing numbers for deeper nesting),\n" # " confidence: A number between 0-1 based on how certain you are this is a header, not body text,\n" # " body: (For the last header only) A string containing the body content belonging to that header. The body starts on the line after the header and stops at the next header or document end. For all other headers, this should be an empty string \"\".\n\n" # "Be selective - only extract actual headers/titles and the body for the final header. This is a one-time run. It must be right.\n\n" # "Determine the appropriate hierarchy levels based on the structure you observe. Use as many levels as necessary " # "to accurately represent the document's structure.\n\n" # "THESE ARE THE EXAMPLES OF EACH DOCUMENT HEADERS:\n" # "Example 1 of an NBS Specification document headers:\n" # "

B91

\n" # "

Buildings in the landscape

\n" # "

System outline

\n" # "

105A Proprietary small buildings/ structures

\n\n" # "

1. Description: Central Substation BLD 09 Contractor Design Portion (CDP)" # "2. Drawing Reference: DABS-ATK-09-ZZ-D-A-011000DABS-ATK-09-00-D-A-031000DABS-ATK-09-00-D-A-031001" # "3. Building type: Modular GRP Enclosure" # "4. Manufacturer: Glasdon UK Limited Poulton Business Park Poulton-le-Fylde Lancashire FY6 8JW Suzanne Warren Customer Support Office Manager suzanne.warren@glasdon - uk.co.uk T: +44(0)1253 600418 Web: www.glasdon.com" # "5. Product: Garrison Housing" # "6. Internal Dimensions: Low Voltage enclosure: 8570 x 3893mm High Voltage enclosure: 8570 x 5247mm" # "7. Internal Height: 3000mm" # "8. Design Requirements: Colour: 04_Dark Brown Flooring: Not required, fixed to concrete slab Electrics: Standard electrical installation; see MEP drawings for other requirements. Ventilation: Not required Glazing: Not Required Doors: Double (Louvered Doors) and single (Panel Doors) Miscellaneous: Exit Furniture to doors / Resin - coated internal finish" # "Panels: 18mm plywood core locations as per MEP drawings, otherwise 18mm foam core panels" # "Other: Aluminum flashing to seal the enclosures back to - back gap (roof and sides)" # "9. MEP Penetrations: Sealant to manufacturers recommendations.

" # "Example 2 of an NBS Specification document headers:\n" # "

B91

\n" # "

Buildings in the landscape

\n" # "

Products

\n" # "

330 Door Canopy Type A

\n\n" # "

1. Description: MAIN ENTRANCE CANOPY - PEDESTRIAN ACCESS BUILDINGS 01 TO 05" # "2. Manufacturer: Able Canopies Ltd 2.1. Contact details 2.1.1. Address: 9 - 11 Faraday Close Gorse Lane Industrial Estate Clacton - on - Sea 2.1.2. Telephone: 0800 389 9072 2.1.3. Web: www.ablecanopies.co.uk 2.1.4. Email: sales@ablecanopies.co.uk" # "2.2. Product reference: Colwyn Wall Mounted Entrance/ Walkway Canopy" # "3. Shelter description: Entrance/ walkway canopy." # "4. Dimensions" # "4.1. Plan size: Single Door Entrance: Width 1500mm Depth 1000mm Height 400mm or as standard fabrication Double Door Entrance: Width 2500mm Depth: 1000mm Height: 400mm or as standard fabrication" # "5. Frame" # "5.1. Material: Aluminium." # "5.2. Finish: Powder coated." # "5.3. Colour:" # "RAL 8014 / Van Dyke Brown" # "6. Roof covering: 4 mm thick solid polycarbonate sheets." # "7. Rainwater drainage: Integrated aluminium gutter fixed to wall, with a drain side and drip spout; colour as frame" # "8. Method of fixing to ground, base or walls: Fixed to envelope wall, using the provided secondary steel(s) by structural engineer; all fixings to manufacturers recommendations. Fixing height: bottom/base of fixing support brackets at 2500mm from external FFL

" # "Example 3 of an NBS Specification document headers:\n" # "

P30

\n" # "

Trenches, pipeways and pits for buried engineering services

\n" # "

Products

\n" # "

301 Access covers, manhole tops and frames

\n\n" # "Example 1 of a Uniclass document headers:\n" # "

Ss_25_10_20_85

\n" # "

Stick curtain walling systems Wall Type WT2

\n" # "

Systems

\n" # "

Ss_25_10_20_85
Stick curtain walling systems Wall Type WT2

\n\n" # "

1.Description: Proposed curtain walling to be installed. Refer to drawing no. 421001" # "2.System performance: Ss_25_10_20/205 Design submittals" # "3.System manufacturer: Senior Architectural Systems Ltd" # "4. Contact details" # "4.1.Address: Eland Road Denaby Main Doncaster South Yorkshire DN12 4HA" # "4.2.Telephone: +44 (0)1709 772600" # "4.3.Web: https://www.seniorarchitectural.co.uk" # "4.4.Email: info@sasmail.co.uk" # "5.Product reference: SF52 Fully Capped Curtain Wall System (Mullion drained)" # "6.System performance: Air permeability: 600 Pa, Water tightness: 600 Pa; Wind resistance: 2400 Pa." # "7. Framing" # "7.1.Frame members: Aluminium curtain wall frame sections, mullion drained." # "7.2.Frame accessories: Fully capped." # "8. Cladding units" # "8.1.Glass units: As shown window drawing" # "8.2.Glazing accessories: As shown window drawing" # "9.System accessories: Aluminium/ co-extruded plastic pressure plate." # "10.Colour/ Finish: Polyester Powder Coated Aluminium RAL 7016 Anthracite Grey

" # "Example 2 of a Uniclass document headers:\n" # "

Pr_20_85_06_85

\n" # "

Stainless steel hoops Trolley Parks

\n" # "

Products

\n" # "

Pr_20_85_06_85 Stainless steel hoops Trolley Parks

\n\n" # "Example 1 of an Outline Specification document headers:\n" # "

External Walls

\n" # "

Brickwork External Walls

\n\n" # "Example 2 of an Outline Specification document headers:\n" # "

INTERNAL WALL SURFACE FINISHES

\n" # "

PAINT FINISHES/ DURABLE WALL FINISHES

\n\n" # "Return only a single JSON array (no commentary). Include only headers and the body for the last header.\n" # + "\n\nLines:\n" + "\n".join(lines_for_prompt) # ) # logger.debug(f"Full prompt length: {len(prompt)} characters") # print("=" * 80) # print("FULL LLM PROMPT:") # print(prompt) # print("=" * 80) # # Also log to file # try: # with open("full_prompt.txt", "w", encoding="utf-8") as f: # f.write(prompt) # logger.info("Full prompt saved to full_prompt.txt") # except Exception as e: # logger.error(f"Could not save prompt to file: {e}") # if not api_key: # logger.error("No API key provided") # return [] # url = "https://openrouter.ai/api/v1/chat/completions" # headers = { # "Authorization": f"Bearer {api_key}", # "Content-Type": "application/json", # "HTTP-Referer": os.getenv("OPENROUTER_REFERER", ""), # "X-Title": os.getenv("OPENROUTER_X_TITLE", "") # } # logger.info(f"Making request to OpenRouter with model: {model}") # logger.debug(f"Headers (API key masked): { {k: '***' if k == 'Authorization' else v for k, v in headers.items()} }") # body = { # "model": model, # "messages": [ # { # "role": "user", # "content": [ # {"type": "text", "text": prompt} # ] # } # ] # } # try: # logger.debug(f"Request body (without prompt text): { {k: v if k != 'messages' else '[...prompt...]' for k, v in body.items()} }") # resp = requests.post( # url=url, # headers=headers, # data=json.dumps(body) # ) # logger.info(f"HTTP Response Status: {resp.status_code}") # resp.raise_for_status() # resp_text = resp.text # print("=" * 80) # print("FULL LLM RESPONSE:") # print(resp_text) # print("=" * 80) # logger.info(f"LLM raw response length: {len(resp_text)}") # try: # with open("llm_debug.json", "w", encoding="utf-8") as fh: # fh.write(resp_text) # logger.info("Raw response saved to llm_debug.json") # except Exception as e: # logger.error(f"Warning: could not write llm_debug.json: {e}") # rj = resp.json() # logger.info(f"LLM parsed response type: {type(rj)}") # if isinstance(rj, dict): # logger.debug(f"Response keys: {list(rj.keys())}") # except requests.exceptions.RequestException as e: # logger.error(f"HTTP request failed: {repr(e)}") # return [] # except Exception as e: # logger.error(f"LLM call failed: {repr(e)}") # return [] # # Extract textual reply robustly # text_reply = None # if isinstance(rj, dict): # choices = rj.get('choices') or [] # logger.debug(f"Number of choices in response: {len(choices)}") # if choices: # for i, c in enumerate(choices): # logger.debug(f"Choice {i}: {c}") # c0 = choices[0] # msg = c0.get('message') or c0.get('delta') or {} # content = msg.get('content') # if isinstance(content, list): # logger.debug(f"Content is a list with {len(content)} items") # for idx, c in enumerate(content): # if c.get('type') == 'text' and c.get('text'): # text_reply = c.get('text') # logger.debug(f"Found text reply in content[{idx}], length: {len(text_reply)}") # break # elif isinstance(content, str): # text_reply = content # logger.debug(f"Content is string, length: {len(text_reply)}") # elif isinstance(msg, dict) and msg.get('content') and isinstance(msg.get('content'), dict): # text_reply = msg.get('content').get('text') # logger.debug(f"Found text in nested content dict") # # Fallback extraction # if not text_reply: # logger.debug("Trying fallback extraction from choices") # for c in rj.get('choices', []): # if isinstance(c.get('text'), str): # text_reply = c.get('text') # logger.debug(f"Found text reply in choice.text, length: {len(text_reply)}") # break # if not text_reply: # logger.error("Could not extract text reply from response") # print("=" * 80) # print("FAILED TO EXTRACT TEXT REPLY. FULL RESPONSE STRUCTURE:") # print(json.dumps(rj, indent=2)) # print("=" * 80) # return [] # print("=" * 80) # print("EXTRACTED TEXT REPLY:") # print(text_reply) # print("=" * 80) # logger.info(f"Extracted text reply length: {len(text_reply)}") # logger.debug(f"First 500 chars of reply: {text_reply[:500]}...") # s = text_reply.strip() # start = s.find('[') # end = s.rfind(']') # js = s[start:end+1] if start != -1 and end != -1 else s # logger.debug(f"Looking for JSON array: start={start}, end={end}") # logger.debug(f"Extracted JSON string (first 500 chars): {js[:500]}...") # try: # parsed = json.loads(js) # logger.info(f"Successfully parsed JSON, got {len(parsed)} items") # except json.JSONDecodeError as e: # logger.error(f"Failed to parse JSON: {e}") # logger.error(f"JSON string that failed to parse: {js[:1000]}") # try: # import re # json_pattern = r'\[\s*\{.*?\}\s*\]' # matches = re.findall(json_pattern, text_reply, re.DOTALL) # if matches: # logger.info(f"Found {len(matches)} potential JSON arrays via regex") # for i, match in enumerate(matches): # try: # parsed = json.loads(match) # logger.info(f"Successfully parsed regex match {i} with {len(parsed)} items") # break # except json.JSONDecodeError as e2: # logger.debug(f"Regex match {i} also failed: {e2}") # continue # else: # logger.error("All regex matches failed to parse") # return [] # else: # logger.error("No JSON-like pattern found via regex") # return [] # except Exception as e2: # logger.error(f"Regex extraction also failed: {e2}") # return [] # # Log parsed results # logger.info(f"Parsed {len(parsed)} header items:") # for i, obj in enumerate(parsed[:10]): # logger.info(f" Item {i}: {obj}") # # Normalize parsed entries and return # out = [] # for obj in parsed: # t = obj.get('text') # page = int(obj.get('page')) if obj.get('page') else None # level = obj.get('suggested_level') # conf = float(obj.get('confidence') or 0) # body = obj.get('body', '') # Get body content, default to empty string # if t and page is not None: # out.append({ # 'text': t, # 'page': page-1, # 'suggested_level': level, # 'confidence': conf, # 'body': body # Add body to output # }) # logger.info(f"Returning {len(out)} valid header entries with body content for last header") # # Log which entries have body content # for i, item in enumerate(out): # if item.get('body'): # logger.info(f"Entry {i} has body content (length: {len(item['body'])})") # with open("full_output.txt", "w", encoding="utf-8") as f: # f.write(out) # return out def openrouter_test_prompt(prompt, api_key=None, model="google/gemini-2.5-pro", timeout=30): """Send a raw prompt to OpenRouter using the example content-array format and print the response. Useful for quickly testing whether the API key/endpoint works and what the raw reply looks like. """ if api_key is None: api_key = os.getenv("OPENROUTER_API_KEY") or None if not api_key: print("No API key available for OpenRouter test.") return None url = "https://openrouter.ai/api/v1/chat/completions" headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", "HTTP-Referer": os.getenv("OPENROUTER_REFERER", ""), "X-Title": os.getenv("OPENROUTER_X_TITLE", "") } body = { "model": model, "messages": [ { "role": "user", "content": [ {"type": "text", "text": prompt} ] } ] } try: print("Sending OpenRouter test prompt (truncated):", prompt[:1000]) resp = requests.post(url, headers=headers, data=json.dumps(body), timeout=timeout) print("Status:", resp.status_code) print(resp.text[:4000]) try: with open("llm_test_response.json", "w", encoding="utf-8") as fh: fh.write(resp.text) except Exception: pass return resp except Exception as e: print("OpenRouter test failed:", repr(e)) return None # def locate_identified_headers(doc, identified_headers, min_match=0.65): # """Given identified headers (text+page+level), locate best matching spans to return x/y/page/level/confidence. # identified_headers: list of {'text','page','suggested_level','confidence'} # Returns list of {'text','page','x','y','level','confidence'} # """ # if not identified_headers: # return [] # toc_pages = get_toc_page_numbers(doc) # located = [] # for item in identified_headers: # text = item['text'] # page_hint = item.get('page', 0) # norm = normalize_text(text) # best = None # best_score = 0.0 # # search starting from hinted page # page_range = list(range(page_hint, len(doc))) + list(range(0, page_hint)) # for pno in page_range: # if pno in toc_pages: # continue # page = doc.load_page(pno) # for block in page.get_text('dict').get('blocks', []): # if block.get('type') != 0: # continue # for line in block.get('lines', []): # line_text = normalize_text(' '.join(s.get('text','') for s in line.get('spans', []))) # if not line_text: # continue # if norm in line_text or line_text in norm: # score = 1.0 # else: # score = fuzz.ratio(norm, line_text) / 100.0 # if score > best_score: # best_score = score # best = (pno, line, score) # if best_score >= 0.95: # break # if best and best_score >= min_match: # pno, line, score = best # spans = line.get('spans', []) # if spans: # x = min((s.get('bbox',[0,0,0,0])[0] for s in spans)) # y = min((s.get('bbox',[0,0,0,0])[1] for s in spans)) # else: # x = None # y = None # located.append({'text': text, 'page': pno, 'x': x, 'y': y, 'level': item.get('suggested_level'), 'confidence': item.get('confidence') or score}) # return located # def classify_and_locate_headers_with_openrouter(doc, candidates=None, api_key=None): # """Compatibility wrapper: classify header candidates (heuristic if no LLM) # and locate them. Returns list of {'text','page','x','y','level','confidence'}. # """ # # If no candidates provided, extract them heuristically # if candidates is None: # most_common_font_size, most_common_color, most_common_font = get_regular_font_size_and_color(doc) # toc_pages = get_toc_page_numbers(doc) # candidates, _, _, _ = extract_headers(doc, toc_pages, most_common_font_size, most_common_color, most_common_font, top_margin, bottom_margin) # if not candidates: # return [] # # Heuristic classification by font size: largest -> higher level # sizes = sorted({c[1] for c in candidates}, reverse=True) # def pick_level(sz): # if not sizes: # return None # if sz >= sizes[0] - 0.1: # return 1 # if len(sizes) > 1 and sz >= sizes[1] - 0.1: # return 2 # return 3 # identified = [] # for c in candidates: # text, size, pageNum, y = c[:4] # identified.append({ # 'text': text, # 'page': pageNum, # 'suggested_level': pick_level(size), # 'confidence': 0.6 # }) # # Now locate identified headers # located = locate_identified_headers(doc, identified) # return located def is_numbered(text): return bool(re.match(r'^\d', text.strip())) def is_similar(a, b, threshold=0.85): return difflib.SequenceMatcher(None, a, b).ratio() > threshold def normalize(text): text = text.lower() text = re.sub(r'\.{2,}', '', text) # remove long dots text = re.sub(r'\s+', ' ', text) # replace multiple spaces with one return text.strip() def clean_toc_entry(toc_text): """Remove page numbers and formatting from TOC entries""" # Remove everything after last sequence of dots/whitespace followed by digits return re.sub(r'[\.\s]+\d+.*$', '', toc_text).strip('. ') def enforce_level_hierarchy(headers): """ Ensure level 2 headers only exist under level 1 headers and clean up any orphaned headers """ def process_node_list(node_list, parent_level=-1): i = 0 while i < len(node_list): node = node_list[i] # Remove level 2 headers that don't have a level 1 parent if node['level'] == 2 and parent_level != 1: node_list.pop(i) continue # Recursively process children process_node_list(node['children'], node['level']) i += 1 process_node_list(headers) return headers def build_header_hierarchy(doc, toc_pages, most_common_font_size, most_common_color, most_common_font, top_margin=70, bottom_margin=70): # Extract headers with margin handling headers_list, top_3_font_sizes, smallest_font_size, spans = extract_headers( doc, toc_pages=toc_pages, most_common_font_size=most_common_font_size, most_common_color=most_common_color, most_common_font=most_common_font, top_margin=top_margin, bottom_margin=50 ) # Step 1: Collect and filter potential headers headers = [] seen_headers = set() # First extract TOC entries to get exact level 0 header texts toc_entries = {} for pno in toc_pages: print(pno) page = doc[pno] toc_text = page.get_text() for line in toc_text.split('\n'): clean_line = line.strip() if clean_line: norm_line = normalize(clean_line) toc_entries[norm_line] = clean_line # Store original text print(toc_pages) for h in headers_list: text, size, pageNum, y = h[:4] page = doc.load_page(pageNum) page_height = page.rect.height # Skip margin areas if y < top_margin or y > (page_height - bottom_margin): continue norm_text = normalize(text) if len(norm_text) > 2 and size >= most_common_font_size: headers.append({ "text": text, "page": pageNum, "y": y, "size": size, "bold": h[4] if len(h) > 4 else False, # "italic": h[5] if len(h) > 5 else False, "color": h[6] if len(h) > 6 else None, "font": h[7] if len(h) > 7 else None, "children": [], "is_numbered": is_numbered(text), "original_size": size, "norm_text": norm_text, "level": -1 # Initialize as unassigned }) # Sort by page and vertical position headers.sort(key=lambda h: (h['page'], h['y'])) # Step 2: Detect consecutive headers and assign levels i = 0 while i < len(headers) - 1: current = headers[i] next_header = headers[i+1] # Check if they are on the same page and very close vertically (likely consecutive lines) if (current['page'] == next_header['page'] and abs(current['y'] - next_header['y']) < 20): # 20pt threshold for "same line" # Case 1: Both unassigned - make current level 1 and next level 2 if current['level'] == -1 and next_header['level'] == -1: current['level'] = 1 next_header['level'] = 2 i += 1 # Skip next header since we processed it # Case 2: Current unassigned, next assigned - make current one level above elif current['level'] == -1 and next_header['level'] != -1: current['level'] = max(1, next_header['level'] - 1) # Case 3: Current assigned, next unassigned - make next one level below elif current['level'] != -1 and next_header['level'] == -1: next_header['level'] = current['level'] + 1 i += 1 # Skip next header since we processed it i += 1 # Step 2: Identify level 0 headers (largest and in TOC) # max_size = max(h['size'] for h in headers) if headers else 0 print(top_3_font_sizes) max_size,subheaderSize,nbsheadersize=top_3_font_sizes print(max_size) toc_text_match=[] # Improved TOC matching with exact and substring matching toc_matches = [] for h in headers: norm_text = h['norm_text'] matching_toc_texts = [] # Check both exact matches and substring matches for toc_norm, toc_text in toc_entries.items(): # Exact match case if norm_text == toc_norm and len(toc_text)>4 and h['size']==max_size: matching_toc_texts.append(toc_text) # Substring match case (header is substring of TOC entry) elif norm_text in toc_norm and len(toc_text)>4 and h['size']==max_size: matching_toc_texts.append(toc_text) # Substring match case (TOC entry is substring of header) elif toc_norm in norm_text and len(toc_text)>4 and h['size']==max_size: matching_toc_texts.append(toc_text) if matching_toc_texts and h['size'] >= max_size * 0.9: best_match = max(matching_toc_texts, key=lambda x: (len(x), -len(x.replace(norm_text, '')))) h['text'] = normalize_text(clean_toc_entry(best_match)) h['level'] = 0 if h['text'] not in toc_text_match: toc_matches.append(h) toc_text_match.append(h['text']) elif matching_toc_texts and h['size'] < max_size * 0.9 and h['size'] > nbsheadersize : # h['size'] < max_size * 0.9 and h['size'] > max_size*0.75: print(h['text'],matching_toc_texts) headers.remove(h) continue # Remove duplicates - keep only first occurrence of each level 0 header unique_level0 = [] seen_level0 = set() for h in toc_matches: # Use the cleaned text for duplicate checking cleaned_text = clean_toc_entry(h['text']) norm_cleaned_text = normalize(cleaned_text) if norm_cleaned_text not in seen_level0: seen_level0.add(norm_cleaned_text) # Update the header text with cleaned version h['text'] = cleaned_text unique_level0.append(h) print(f"Added unique header: {cleaned_text} (normalized: {norm_cleaned_text})") # Step 3: Process headers under each level 0 to identify level 1 format # First, group headers by their level 0 parent level0_headers = [h for h in headers if h['level'] == 0] header_groups = [] for i, level0 in enumerate(level0_headers): start_idx = headers.index(level0) end_idx = headers.index(level0_headers[i+1]) if i+1 < len(level0_headers) else len(headers) group = headers[start_idx:end_idx] header_groups.append(group) # Now process each group to identify level 1 format for group in header_groups: level0 = group[0] level1_candidates = [h for h in group[1:] if h['level'] == -1] if not level1_candidates: continue # The first candidate is our reference level 1 first_level1 = level1_candidates[0] level1_format = { 'font': first_level1['font'], 'color': first_level1['color'], 'starts_with_number': is_numbered(first_level1['text']), 'size': first_level1['size'], 'bold': first_level1['bold'] # 'italic': first_level1['italic'] } # Assign levels based on the reference format for h in level1_candidates: current_format = { 'font': h['font'], 'color': h['color'], 'starts_with_number': is_numbered(h['text']), 'size': h['size'], 'bold': h['bold'] # 'italic': h['italic'] } # Compare with level1 format if (current_format['font'] == level1_format['font'] and current_format['color'] == level1_format['color'] and current_format['starts_with_number'] == level1_format['starts_with_number'] and abs(current_format['size'] - level1_format['size']) <= 0.1 and current_format['bold'] == level1_format['bold'] ): #and # current_format['italic'] == level1_format['italic']): h['level'] = 1 else: h['level'] = 2 # Step 4: Assign levels to remaining unassigned headers unassigned = [h for h in headers if h['level'] == -1] if unassigned: # Cluster by size with tolerance sizes = sorted({h['size'] for h in unassigned}, reverse=True) clusters = [] for size in sizes: found_cluster = False for cluster in clusters: if abs(size - cluster['size']) <= max(size, cluster['size']) * 0.1: cluster['headers'].extend([h for h in unassigned if abs(h['size'] - size) <= size * 0.1]) found_cluster = True break if not found_cluster: clusters.append({ 'size': size, 'headers': [h for h in unassigned if abs(h['size'] - size) <= size * 0.1] }) # Assign levels starting from 1 clusters.sort(key=lambda x: -x['size']) for i, cluster in enumerate(clusters): for h in cluster['headers']: base_level = i + 1 if h['bold']: base_level = max(1, base_level - 1) h['level'] = base_level # Step 5: Build hierarchy root = [] stack = [] # Create a set of normalized texts from unique_level0 to avoid duplicates unique_level0_texts = {h['norm_text'] for h in unique_level0} # Filter out any headers from the original list that match unique_level0 headers filtered_headers = [] for h in headers: if h['norm_text'] in unique_level0_texts and h not in unique_level0: h['level'] = 0 filtered_headers.append(h) # Combine all headers - unique_level0 first, then the filtered headers all_headers = unique_level0 + filtered_headers all_headers.sort(key=lambda h: (h['page'], h['y'])) # Track which level 0 headers we've already added added_level0 = set() for header in all_headers: if header['level'] < 0: continue if header['level'] == 0: norm_text = header['norm_text'] if norm_text in added_level0: continue added_level0.add(norm_text) # Pop stack until we find a parent while stack and stack[-1]['level'] >= header['level']: stack.pop() current_parent = stack[-1] if stack else None if current_parent: current_parent['children'].append(header) else: root.append(header) stack.append(header) # Step 6: Enforce proper nesting def enforce_nesting(node_list, parent_level=-1): for node in node_list: if node['level'] <= parent_level: node['level'] = parent_level + 1 enforce_nesting(node['children'], node['level']) enforce_nesting(root) root = [h for h in root if not (h['level'] == 0 and not h['children'])] header_tree = enforce_level_hierarchy(root) return header_tree def adjust_levels_if_level0_not_in_toc(doc, toc_pages, root): def normalize(text): return re.sub(r'\s+', ' ', text.strip().lower()) toc_text = "" for pno in toc_pages: page = doc.load_page(pno) toc_text += page.get_text() toc_text_normalized = normalize(toc_text) def is_level0_in_toc_text(header): return header['level'] == 0 and normalize(header['text']) in toc_text_normalized if any(is_level0_in_toc_text(h) for h in root): return # No change needed def increase_levels(node_list): for node in node_list: node['level'] += 1 increase_levels(node['children']) def assign_numbers_to_headers(headers, prefix=None): for idx, header in enumerate(headers, 1): current_number = f"{prefix}.{idx}" if prefix else str(idx) header["number"] = current_number assign_numbers_to_headers(header["children"], current_number) def print_tree_with_numbers(headers, indent=0): for header in headers: size_info = f"size:{header['original_size']:.1f}" if 'original_size' in header else "" print(" " * indent + f"{header.get('number', '?')} {header['text']} " + f"(Level {header['level']}, p:{header['page']+1}, {size_info})") print_tree_with_numbers(header["children"], indent + 1) def process_document_headers(doc, toc_pages, most_common_font_size, most_common_color, most_common_font, top_margin=70, bottom_margin=50): print(f"Processing with margins - top:{top_margin}pt, bottom:{bottom_margin}pt") header_tree = build_header_hierarchy(doc, toc_pages, most_common_font_size, most_common_color, most_common_font, top_margin, bottom_margin) adjust_levels_if_level0_not_in_toc(doc, toc_pages, header_tree) print("Assigning numbers...") assign_numbers_to_headers(header_tree) print("Document structure (excluding margins):") print_tree_with_numbers(header_tree) return header_tree def highlight_boxes(doc, highlights, stringtowrite, fixed_width=500): # Set your desired width here for page_num, bbox in highlights.items(): page = doc.load_page(page_num) page_width = page.rect.width # Get original rect for vertical coordinates orig_rect = fitz.Rect(bbox) rect_height = orig_rect.height if rect_height > 30: if orig_rect.width > 10: # Center horizontally using fixed width center_x = page_width / 2 new_x0 = center_x - fixed_width / 2 new_x1 = center_x + fixed_width / 2 new_rect = fitz.Rect(new_x0, orig_rect.y0, new_x1, orig_rect.y1) # Add highlight rectangle annot = page.add_rect_annot(new_rect) if stringtowrite.startswith('Not'): annot.set_colors(stroke=(0.5, 0.5, 0.5), fill=(0.5, 0.5, 0.5)) else: annot.set_colors(stroke=(1, 1, 0), fill=(1, 1, 0)) annot.set_opacity(0.3) annot.update() # Add right-aligned freetext annotation inside the fixed-width box text = '['+stringtowrite +']' annot1 = page.add_freetext_annot( new_rect, text, fontsize=15, fontname='helv', text_color=(1, 0, 0), rotate=page.rotation, align=2 # right alignment ) annot1.update() def get_leaf_headers_with_paths(listtoloop, path=None, output=None): if path is None: path = [] if output is None: output = [] for header in listtoloop: current_path = path + [header['text']] if not header['children']: if header['level'] != 0 and header['level'] != 1: output.append((header, current_path)) else: get_leaf_headers_with_paths(header['children'], current_path, output) return output # Add this helper function at the top of your code def words_match_ratio(text1, text2): words1 = set(text1.split()) words2 = set(text2.split()) if not words1 or not words2: return 0.0 common_words = words1 & words2 return len(common_words) / len(words1) def same_start_word(s1, s2): # Split both strings into words words1 = s1.strip().split() words2 = s2.strip().split() # Check if both have at least one word and compare the first ones if words1 and words2: return words1[0].lower() == words2[0].lower() return False def extract_section_under_header(multiplePDF_Paths): filenames=[] keywords = {'installation', 'execution', 'miscellaneous items', 'workmanship', 'testing', 'labeling'} arrayofPDFS=multiplePDF_Paths.split(',') print(multiplePDF_Paths) print(arrayofPDFS,len(arrayofPDFS)) docarray=[] jsons=[] df = pd.DataFrame(columns=["PDF Name","NBSLink","Subject","Page","Author","Creation Date","Layer",'Code', 'head above 1', "head above 2","BodyText"]) for pdf_path in arrayofPDFS: headertoContinue1 = False headertoContinue2=False Alltexttobebilled='' parsed_url = urlparse(pdf_path) filename = os.path.basename(parsed_url.path) filename = unquote(filename) # decode URL-encoded characters filenames.append(filename) # Optimized URL handling if pdf_path and ('http' in pdf_path or 'dropbox' in pdf_path): pdf_path = pdf_path.replace('dl=0', 'dl=1') # Cache frequently used values response = requests.get(pdf_path) pdf_content = BytesIO(response.content) if not pdf_content: raise ValueError("No valid PDF content found.") doc = fitz.open(stream=pdf_content, filetype="pdf") docHighlights = fitz.open(stream=pdf_content, filetype="pdf") most_common_font_size, most_common_color, most_common_font = get_regular_font_size_and_color(doc) # Precompute regex patterns dot_pattern = re.compile(r'\.{3,}') url_pattern = re.compile(r'https?://\S+|www\.\S+') toc_pages = get_toc_page_numbers(doc) # replace this with LLM that will extract the HEADERS # FOR RAWAN headers, top_3_font_sizes, smallest_font_size, headersSpans = extract_headers( doc, toc_pages, most_common_font_size, most_common_color, most_common_font, top_margin, bottom_margin ) # replace this with LLM that will extract the HIERARCHY # FOR RAWAN hierarchy = build_header_hierarchy(doc, toc_pages, most_common_font_size, most_common_color, most_common_font) listofHeaderstoMarkup = get_leaf_headers_with_paths(hierarchy) # Precompute all children headers once allchildrenheaders = [normalize_text(item['text']) for item, p in listofHeaderstoMarkup] allchildrenheaders_set = set(allchildrenheaders) # For faster lookups df = pd.DataFrame(columns=["NBSLink","Subject","Page","Author","Creation Date","Layer",'Code', 'head above 1', "head above 2"]) dictionaryNBS={} data_list_JSON = [] if len(top_3_font_sizes)==3: mainHeaderFontSize, subHeaderFontSize, subsubheaderFontSize = top_3_font_sizes elif len(top_3_font_sizes)==2: mainHeaderFontSize= top_3_font_sizes[0] subHeaderFontSize= top_3_font_sizes[1] subsubheaderFontSize= top_3_font_sizes[1] # Preload all pages to avoid repeated loading # pages = [doc.load_page(page_num) for page_num in range(len(doc)) if page_num not in toc_pages] for heading_to_searchDict, paths in listofHeaderstoMarkup: heading_to_search = heading_to_searchDict['text'] heading_to_searchPageNum = heading_to_searchDict['page'] # Initialize variables headertoContinue1 = False headertoContinue2 = False matched_header_line = None done = False collecting = False collected_lines = [] page_highlights = {} current_bbox = {} last_y1s = {} mainHeader = '' subHeader = '' matched_header_line_norm = heading_to_search break_collecting = False heading_norm = normalize_text(heading_to_search) paths_norm = [normalize_text(p) for p in paths[0]] if paths and paths[0] else [] for page_num in range(heading_to_searchPageNum,len(doc)): if page_num in toc_pages: continue if break_collecting: break page=doc[page_num] page_height = page.rect.height blocks = page.get_text("dict")["blocks"] for block in blocks: if break_collecting: break lines = block.get("lines", []) i = 0 while i < len(lines): if break_collecting: break spans = lines[i].get("spans", []) if not spans: i += 1 continue y0 = spans[0]["bbox"][1] y1 = spans[0]["bbox"][3] if y0 < top_margin or y1 > (page_height - bottom_margin): i += 1 continue line_text = get_spaced_text_from_spans(spans).lower() line_text_norm = normalize_text(line_text) # Combine with next line if available if i + 1 < len(lines): next_spans = lines[i + 1].get("spans", []) next_line_text = get_spaced_text_from_spans(next_spans).lower() combined_line_norm = normalize_text(line_text + " " + next_line_text) else: combined_line_norm = line_text_norm # Check if we should continue processing if combined_line_norm and combined_line_norm in paths[0]: headertoContinue1 = combined_line_norm if combined_line_norm and combined_line_norm in paths[-2]: headertoContinue2 = combined_line_norm if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() : stringtowrite='Not to be billed' else: stringtowrite='To be billed' # Optimized header matching existsfull = ( ( combined_line_norm in allchildrenheaders_set or combined_line_norm in allchildrenheaders ) and heading_to_search in combined_line_norm ) # New word-based matching current_line_words = set(combined_line_norm.split()) heading_words = set(heading_norm.split()) all_words_match = current_line_words.issubset(heading_words) and len(current_line_words) > 0 substring_match = ( heading_norm in combined_line_norm or combined_line_norm in heading_norm or all_words_match # Include the new word-based matching ) # substring_match = ( # heading_norm in combined_line_norm or # combined_line_norm in heading_norm # ) if (substring_match and existsfull and not collecting and len(combined_line_norm) > 0 ):#and (headertoContinue1 or headertoContinue2) ): # Check header conditions more efficiently header_spans = [ span for span in spans if (is_header(span, most_common_font_size, most_common_color, most_common_font) # and span['size'] >= subsubheaderFontSize and span['size'] < mainHeaderFontSize) ] if header_spans: collecting = True matched_header_font_size = max(span["size"] for span in header_spans) collected_lines.append(line_text) valid_spans = [span for span in spans if span.get("bbox")] if valid_spans: x0s = [span["bbox"][0] for span in valid_spans] x1s = [span["bbox"][2] for span in valid_spans] y0s = [span["bbox"][1] for span in valid_spans] y1s = [span["bbox"][3] for span in valid_spans] header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] if page_num in current_bbox: cb = current_bbox[page_num] current_bbox[page_num] = [ min(cb[0], header_bbox[0]), min(cb[1], header_bbox[1]), max(cb[2], header_bbox[2]), max(cb[3], header_bbox[3]) ] else: current_bbox[page_num] = header_bbox last_y1s[page_num] = header_bbox[3] x0, y0, x1, y1 = header_bbox zoom = 200 left = int(x0) top = int(y0) zoom_str = f"{zoom},{left},{top}" pageNumberFound = page_num + 1 # Build the query parameters # params = { # 'pdfLink': pdf_path, # Your PDF link # 'keyword': heading_to_search, # Your keyword (could be a string or list) # } # # URL encode each parameter # encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()} # # Construct the final encoded link # encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()]) # # Correctly construct the final URL with page and zoom # final_url = f"{baselink}{encoded_link}#page={str(pageNumberFound)}&zoom={zoom_str}" # Get current date and time now = datetime.now() # Format the output formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p") # Optionally, add the URL to a DataFrame data_entry = { "NBSLink": zoom_str, "Subject": heading_to_search, "Page": str(pageNumberFound), "Author": "ADR", "Creation Date": formatted_time, "Layer": "Initial", "Code": stringtowrite, "head above 1": paths[-2], "head above 2": paths[0], # "MC Connnection": 'Go to ' + paths[0].strip().split()[0] +'/'+ heading_to_search.strip().split()[0] + ' in '+ filename } data_list_JSON.append(data_entry) # Convert list to JSON # json_output = json.dumps(data_list_JSON, indent=4) i += 2 continue else: if (substring_match and not collecting and len(combined_line_norm) > 0): # and (headertoContinue1 or headertoContinue2) ): # Calculate word match percentage word_match_percent = words_match_ratio(heading_norm, combined_line_norm) * 100 # Check if at least 70% of header words exist in this line meets_word_threshold = word_match_percent >= 100 # Check header conditions (including word threshold) header_spans = [ span for span in spans if (is_header(span, most_common_font_size, most_common_color, most_common_font) # and span['size'] >= subsubheaderFontSize and span['size'] < mainHeaderFontSize) ] if header_spans and (meets_word_threshold or same_start_word(heading_to_search, combined_line_norm) ): collecting = True matched_header_font_size = max(span["size"] for span in header_spans) collected_lines.append(line_text) valid_spans = [span for span in spans if span.get("bbox")] if valid_spans: x0s = [span["bbox"][0] for span in valid_spans] x1s = [span["bbox"][2] for span in valid_spans] y0s = [span["bbox"][1] for span in valid_spans] y1s = [span["bbox"][3] for span in valid_spans] header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] if page_num in current_bbox: cb = current_bbox[page_num] current_bbox[page_num] = [ min(cb[0], header_bbox[0]), min(cb[1], header_bbox[1]), max(cb[2], header_bbox[2]), max(cb[3], header_bbox[3]) ] else: current_bbox[page_num] = header_bbox last_y1s[page_num] = header_bbox[3] x0, y0, x1, y1 = header_bbox zoom = 200 left = int(x0) top = int(y0) zoom_str = f"{zoom},{left},{top}" pageNumberFound = page_num + 1 # Build the query parameters # params = { # 'pdfLink': pdf_path, # Your PDF link # 'keyword': heading_to_search, # Your keyword (could be a string or list) # } # # URL encode each parameter # encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()} # # Construct the final encoded link # encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()]) # # Correctly construct the final URL with page and zoom # final_url = f"{baselink}{encoded_link}#page={str(pageNumberFound)}&zoom={zoom_str}" # Get current date and time now = datetime.now() # Format the output formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p") # Optionally, add the URL to a DataFrame data_entry = { "NBSLink": zoom_str, "Subject": heading_to_search, "Page": str(pageNumberFound), "Author": "ADR", "Creation Date": formatted_time, "Layer": "Initial", "Code": stringtowrite, "head above 1": paths[-2], "head above 2": paths[0], # "MC Connnection": 'Go to ' + paths[0].strip().split()[0] +'/'+ heading_to_search.strip().split()[0] + ' in '+ filename } data_list_JSON.append(data_entry) # Convert list to JSON # json_output = json.dumps(data_list_JSON, indent=4) i += 2 continue if collecting: norm_line = normalize_text(line_text) # Optimized URL check if url_pattern.match(norm_line): line_is_header = False else: line_is_header = any(is_header(span, most_common_font_size, most_common_color, most_common_font) for span in spans) if line_is_header: header_font_size = max(span["size"] for span in spans) is_probably_real_header = ( header_font_size >= matched_header_font_size and is_header(spans[0], most_common_font_size, most_common_color, most_common_font) and len(line_text.strip()) > 2 ) if (norm_line != matched_header_line_norm and norm_line != heading_norm and is_probably_real_header): if line_text not in heading_norm: collecting = False done = True headertoContinue1 = False headertoContinue2=False for page_num, bbox in current_bbox.items(): bbox[3] = last_y1s.get(page_num, bbox[3]) page_highlights[page_num] = bbox highlight_boxes(docHighlights, page_highlights,stringtowrite) break_collecting = True break if break_collecting: break collected_lines.append(line_text) valid_spans = [span for span in spans if span.get("bbox")] if valid_spans: x0s = [span["bbox"][0] for span in valid_spans] x1s = [span["bbox"][2] for span in valid_spans] y0s = [span["bbox"][1] for span in valid_spans] y1s = [span["bbox"][3] for span in valid_spans] line_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] if page_num in current_bbox: cb = current_bbox[page_num] current_bbox[page_num] = [ min(cb[0], line_bbox[0]), min(cb[1], line_bbox[1]), max(cb[2], line_bbox[2]), max(cb[3], line_bbox[3]) ] else: current_bbox[page_num] = line_bbox last_y1s[page_num] = line_bbox[3] i += 1 if not done: for page_num, bbox in current_bbox.items(): bbox[3] = last_y1s.get(page_num, bbox[3]) page_highlights[page_num] = bbox if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() : stringtowrite='Not to be billed' else: stringtowrite='To be billed' highlight_boxes(docHighlights, page_highlights,stringtowrite) docarray.append(docHighlights) jsons.append(data_list_JSON) print('lenght of json:',len(jsons)) dbxTeam = tsadropboxretrieval.ADR_Access_DropboxTeam('user') dbPath = '/TSA JOBS/ADR Test/FIND/' jsonCombined=[] for i in range(len(arrayofPDFS)): pdflink = tsadropboxretrieval.uploadanyFile(doc=docarray[i], path=dbPath, pdfname=filenames[i]) json_input = copy.deepcopy(jsons[i]) # make a deep copy json_output1 = changepdflinks(json_input, pdflink) jsonCombined.extend(json_output1) pdf_bytes = BytesIO() docHighlights.save(pdf_bytes) combined_json_str = json.dumps(jsonCombined, indent=1) print('lenght of json:',len(combined_json_str)) return pdf_bytes.getvalue(), docHighlights , combined_json_str ######################################################################################################################################################## ######################################################################################################################################################## # filter out the output with the same font style, da el most common (capital, font size, bold , style....) # kol el headers with the most common font size and style da el min header, else (mini headers we dont need with a differnt style ) ignore def extract_section_under_header_tobebilledOnly(pdf_path): Alltexttobebilled='' alltextWithoutNotbilled='' # keywordstoSkip=["installation", "execution", "miscellaneous items", "workmanship", "testing", "labeling"] headertoContinue1 = False headertoContinue2=False parsed_url = urlparse(pdf_path) filename = os.path.basename(parsed_url.path) filename = unquote(filename) # decode URL-encoded characters # Optimized URL handling if pdf_path and ('http' in pdf_path or 'dropbox' in pdf_path): pdf_path = pdf_path.replace('dl=0', 'dl=1') # Cache frequently used values response = requests.get(pdf_path) pdf_content = BytesIO(response.content) if not pdf_content: raise ValueError("No valid PDF content found.") doc = fitz.open(stream=pdf_content, filetype="pdf") docHighlights = fitz.open(stream=pdf_content, filetype="pdf") parsed_url = urlparse(pdf_path) filename = os.path.basename(parsed_url.path) filename = unquote(filename) # decode URL-encoded characters #### Get regular tex font size, style , color most_common_font_size, most_common_color, most_common_font = get_regular_font_size_and_color(doc) # Precompute regex patterns dot_pattern = re.compile(r'\.{3,}') url_pattern = re.compile(r'https?://\S+|www\.\S+') toc_pages = get_toc_page_numbers(doc) ## will be replaced with ur LLM to get headers instead of using this function #### RAWAN ##CALL LLM ################## JSON File - saved output from openrouter ####################3 # with open("identified_headers.json", "r", encoding="utf-8") as f: # identified_headers = json.load(f) # allheaders_LLM=[] # for h in identified_headers: # if int(h["page"]) in toc_pages: # continue # if h['text']: # allheaders_LLM.append(h['text']) ### identify_headers_with_openrouter - having the code of ys on the same line (works on weird pdf ) ### identify_headers_with_openrouterNEWW - new version we worked on on jan 14 wednesday , removed ys on the same line (didnt work on weird pdf - worked on regular pdf) identified_headers = identify_headers_with_openrouterNEWW(doc, api_key='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')# ['text', fontsize, page number,y] print(identified_headers) allheaders_LLM=[] for h in identified_headers: if int(h["page"]) in toc_pages: continue if h['text']: allheaders_LLM.append(h['text']) # print('identified_headers', identified_headers) headers_json=headers_with_location(doc,identified_headers) # print('headers_json',headers_json) headers=filter_headers_outside_toc(headers_json,toc_pages) # print('headers',headers) hierarchy=build_hierarchy_from_llm(headers) print('hierarchy',hierarchy) identify_headers_and_save_excel(hierarchy) # IF IT WORKED: CNTINUE CODE # if len(identified_headers) # ELSE FALLBACK CHECK IF HEADERS ARE INCORRECT # ## # headers, top_3_font_sizes, smallest_font_size, headersSpans = extract_headers( # doc, toc_pages, most_common_font_size, most_common_color, most_common_font, top_margin, bottom_margin # ) # print('headers',headers) # ### ignore this and let LLM build the hierarchy tree # ### or pass headers here directly # hierarchy = build_header_hierarchy(doc, toc_pages, most_common_font_size, most_common_color, most_common_font) listofHeaderstoMarkup = get_leaf_headers_with_paths(hierarchy) print('listofHeaderstoMarkup',listofHeaderstoMarkup) # Precompute all children headers once allchildrenheaders = [normalize_text(item['text']) for item, p in listofHeaderstoMarkup] allchildrenheaders_set = set(allchildrenheaders) # For faster lookups # print('allchildrenheaders_set',allchildrenheaders_set) df = pd.DataFrame(columns=["NBSLink","Subject","Page","Author","Creation Date","Layer",'Code', 'head above 1', "head above 2",'BodyText']) dictionaryNBS={} data_list_JSON = [] # if len(top_3_font_sizes)==3: # mainHeaderFontSize, subHeaderFontSize, subsubheaderFontSize = top_3_font_sizes # elif len(top_3_font_sizes)==2: # mainHeaderFontSize= top_3_font_sizes[0] # subHeaderFontSize= top_3_font_sizes[1] # subsubheaderFontSize= top_3_font_sizes[1] # Preload all pages to avoid repeated loading # pages = [doc.load_page(page_num) for page_num in range(len(doc)) if page_num not in toc_pages] for heading_to_searchDict,pathss in listofHeaderstoMarkup: heading_to_search = heading_to_searchDict['text'] heading_to_searchPageNum = heading_to_searchDict['page'] paths=heading_to_searchDict['path'] # Initialize variables headertoContinue1 = False headertoContinue2 = False matched_header_line = None done = False collecting = False collected_lines = [] page_highlights = {} current_bbox = {} last_y1s = {} mainHeader = '' subHeader = '' matched_header_line_norm = heading_to_search break_collecting = False heading_norm = normalize_text(heading_to_search) paths_norm = [normalize_text(p) for p in paths[0]] if paths and paths[0] else [] for page_num in range(heading_to_searchPageNum,len(doc)): if page_num in toc_pages: continue if break_collecting: break page=doc[page_num] page_height = page.rect.height blocks = page.get_text("dict")["blocks"] for block in blocks: if break_collecting: break lines = block.get("lines", []) i = 0 while i < len(lines): if break_collecting: break spans = lines[i].get("spans", []) if not spans: i += 1 continue y0 = spans[0]["bbox"][1] y1 = spans[0]["bbox"][3] if y0 < top_margin or y1 > (page_height - bottom_margin): i += 1 continue line_text = get_spaced_text_from_spans(spans).lower() line_text_norm = normalize_text(line_text) # Combine with next line if available if i + 1 < len(lines): next_spans = lines[i + 1].get("spans", []) next_line_text = get_spaced_text_from_spans(next_spans).lower() combined_line_norm = normalize_text(line_text + " " + next_line_text) else: combined_line_norm = line_text_norm # Check if we should continue processing if combined_line_norm and combined_line_norm in paths[0]: headertoContinue1 = combined_line_norm if combined_line_norm and combined_line_norm in paths[-2]: headertoContinue2 = combined_line_norm # print('paths',paths) if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() : # if any(word in paths[-2].lower() for word in keywordstoSkip): stringtowrite='Not to be billed' else: stringtowrite='To be billed' if stringtowrite!='To be billed': alltextWithoutNotbilled+= combined_line_norm ################################################# # Optimized header matching existsfull = ( ( combined_line_norm in allchildrenheaders_set or combined_line_norm in allchildrenheaders ) and heading_to_search in combined_line_norm ) # New word-based matching current_line_words = set(combined_line_norm.split()) heading_words = set(heading_norm.split()) all_words_match = current_line_words.issubset(heading_words) and len(current_line_words) > 0 substring_match = ( heading_norm in combined_line_norm or combined_line_norm in heading_norm or all_words_match # Include the new word-based matching ) # substring_match = ( # heading_norm in combined_line_norm or # combined_line_norm in heading_norm # ) if (substring_match and existsfull and not collecting and len(combined_line_norm) > 0 ):#and (headertoContinue1 or headertoContinue2) ): # Check header conditions more efficiently # header_spans = [ # span for span in spans # if (is_header(span, most_common_font_size, most_common_color, most_common_font) ) # # and span['size'] >= subsubheaderFontSize # # and span['size'] < mainHeaderFontSize) # ] if stringtowrite.startswith('To'): collecting = True # matched_header_font_size = max(span["size"] for span in header_spans) Alltexttobebilled+= ' '+ combined_line_norm # collected_lines.append(line_text) valid_spans = [span for span in spans if span.get("bbox")] if valid_spans: x0s = [span["bbox"][0] for span in valid_spans] x1s = [span["bbox"][2] for span in valid_spans] y0s = [span["bbox"][1] for span in valid_spans] y1s = [span["bbox"][3] for span in valid_spans] header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] if page_num in current_bbox: cb = current_bbox[page_num] current_bbox[page_num] = [ min(cb[0], header_bbox[0]), min(cb[1], header_bbox[1]), max(cb[2], header_bbox[2]), max(cb[3], header_bbox[3]) ] else: current_bbox[page_num] = header_bbox last_y1s[page_num] = header_bbox[3] x0, y0, x1, y1 = header_bbox zoom = 200 left = int(x0) top = int(y0) zoom_str = f"{zoom},{left},{top}" pageNumberFound = page_num + 1 # Build the query parameters params = { 'pdfLink': pdf_path, # Your PDF link 'keyword': heading_to_search, # Your keyword (could be a string or list) } # URL encode each parameter encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()} # Construct the final encoded link encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()]) # Correctly construct the final URL with page and zoom final_url = f"{baselink}{encoded_link}#page={str(pageNumberFound)}&zoom={zoom_str}" # Get current date and time now = datetime.now() # Format the output formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p") # Optionally, add the URL to a DataFrame data_entry = { "NBSLink": zoom_str, "Subject": heading_to_search, "Page": str(pageNumberFound), "Author": "ADR", "Creation Date": formatted_time, "Layer": "Initial", "Code": stringtowrite, # "head above 1": paths[-2], # "head above 2": paths[0], "BodyText": collected_lines, "MC Connnection": 'Go to ' + paths[0].strip().split()[0] +'/'+ heading_to_search.strip().split()[0] + ' in '+ filename } for i, path_text in enumerate(paths[:-1]): # skip the last one because that's the current heading data_entry[f"head above {i+1}"] = path_text data_list_JSON.append(data_entry) # Convert list to JSON json_output = json.dumps(data_list_JSON, indent=4) i += 2 continue else: if (substring_match and not collecting and len(combined_line_norm) > 0): # and (headertoContinue1 or headertoContinue2) ): # Calculate word match percentage word_match_percent = words_match_ratio(heading_norm, combined_line_norm) * 100 # Check if at least 70% of header words exist in this line meets_word_threshold = word_match_percent >= 100 # Check header conditions (including word threshold) # header_spans = [ # span for span in spans # if (is_header(span, most_common_font_size, most_common_color, most_common_font)) # # and span['size'] >= subsubheaderFontSize # # and span['size'] < mainHeaderFontSize) # ] if (meets_word_threshold or same_start_word(heading_to_search, combined_line_norm) ) and stringtowrite.startswith('To'): collecting = True # matched_header_font_size = max(span["size"] for span in header_spans) Alltexttobebilled+= ' '+ combined_line_norm collected_lines.append(line_text) valid_spans = [span for span in spans if span.get("bbox")] if valid_spans: x0s = [span["bbox"][0] for span in valid_spans] x1s = [span["bbox"][2] for span in valid_spans] y0s = [span["bbox"][1] for span in valid_spans] y1s = [span["bbox"][3] for span in valid_spans] header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] if page_num in current_bbox: cb = current_bbox[page_num] current_bbox[page_num] = [ min(cb[0], header_bbox[0]), min(cb[1], header_bbox[1]), max(cb[2], header_bbox[2]), max(cb[3], header_bbox[3]) ] else: current_bbox[page_num] = header_bbox last_y1s[page_num] = header_bbox[3] x0, y0, x1, y1 = header_bbox zoom = 200 left = int(x0) top = int(y0) zoom_str = f"{zoom},{left},{top}" pageNumberFound = page_num + 1 # Build the query parameters params = { 'pdfLink': pdf_path, # Your PDF link 'keyword': heading_to_search, # Your keyword (could be a string or list) } # URL encode each parameter encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()} # Construct the final encoded link encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()]) # Correctly construct the final URL with page and zoom final_url = f"{baselink}{encoded_link}#page={str(pageNumberFound)}&zoom={zoom_str}" # Get current date and time now = datetime.now() # Format the output formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p") # Optionally, add the URL to a DataFrame data_entry = { "NBSLink": zoom_str, "Subject": heading_to_search, "Page": str(pageNumberFound), "Author": "ADR", "Creation Date": formatted_time, "Layer": "Initial", "Code": stringtowrite, # "head above 1": paths[-2], # "head above 2": paths[0], "BodyText": collected_lines, "MC Connnection": 'Go to ' + paths[0].strip().split()[0] +'/'+ heading_to_search.strip().split()[0] + ' in '+ filename } # Dynamically add "head above 1", "head above 2", ... depending on the number of levels for i, path_text in enumerate(paths[:-1]): # skip the last one because that's the current heading data_entry[f"head above {i+1}"] = path_text data_list_JSON.append(data_entry) # Convert list to JSON json_output = json.dumps(data_list_JSON, indent=4) i += 2 continue if collecting: norm_line = normalize_text(line_text) # Optimized URL check if url_pattern.match(norm_line): line_is_header = False else: # line_is_header = any(is_header(span, most_common_font_size, most_common_color, most_common_font) for span in spans) def normalize(text): return " ".join(text.lower().split()) line_text = " ".join(span["text"] for span in spans).strip() line_is_header = any( normalize(line_text) == normalize(header) for header in allheaders_LLM ) if line_is_header: header_font_size = max(span["size"] for span in spans) is_probably_real_header = ( # header_font_size >= matched_header_font_size and # is_header(spans[0], most_common_font_size, most_common_color, most_common_font) and len(line_text.strip()) > 2 ) if (norm_line != matched_header_line_norm and norm_line != heading_norm and is_probably_real_header): if line_text not in heading_norm: collecting = False done = True headertoContinue1 = False headertoContinue2=False for page_num, bbox in current_bbox.items(): bbox[3] = last_y1s.get(page_num, bbox[3]) page_highlights[page_num] = bbox highlight_boxes(docHighlights, page_highlights,stringtowrite) break_collecting = True break if break_collecting: break collected_lines.append(line_text) valid_spans = [span for span in spans if span.get("bbox")] if valid_spans: x0s = [span["bbox"][0] for span in valid_spans] x1s = [span["bbox"][2] for span in valid_spans] y0s = [span["bbox"][1] for span in valid_spans] y1s = [span["bbox"][3] for span in valid_spans] line_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] if page_num in current_bbox: cb = current_bbox[page_num] current_bbox[page_num] = [ min(cb[0], line_bbox[0]), min(cb[1], line_bbox[1]), max(cb[2], line_bbox[2]), max(cb[3], line_bbox[3]) ] else: current_bbox[page_num] = line_bbox last_y1s[page_num] = line_bbox[3] i += 1 if not done: for page_num, bbox in current_bbox.items(): bbox[3] = last_y1s.get(page_num, bbox[3]) page_highlights[page_num] = bbox if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() : stringtowrite='Not to be billed' else: stringtowrite='To be billed' highlight_boxes(docHighlights, page_highlights,stringtowrite) print("Current working directory:", os.getcwd()) docHighlights.save("highlighted_output.pdf") # dbxTeam = tsadropboxretrieval.ADR_Access_DropboxTeam('user') # metadata = dbxTeam.sharing_get_shared_link_metadata(pdf_path) # dbPath = '/TSA JOBS/ADR Test/FIND/' # pdf_bytes = BytesIO() # docHighlights.save(pdf_bytes) # pdflink = tsadropboxretrieval.uploadanyFile(doc=docHighlights, path=dbPath, pdfname=filename) # json_output=changepdflinks(json_output,pdflink) # return pdf_bytes.getvalue(), docHighlights , json_output , Alltexttobebilled , alltextWithoutNotbilled , filename # Final safety check: if the very last entry in our list has an empty BodyText, # but we have collected_lines, sync them. if data_list_JSON and not data_list_JSON[-1]["BodyText"] and collected_lines: data_list_JSON[-1]["BodyText"] = collected_lines[1:] if len(collected_lines) > 0 else [] # Final cleanup of the JSON data before returning for entry in data_list_JSON: # Check if BodyText exists and has content if isinstance(entry.get("BodyText"), list) and len(entry["BodyText"]) > 0: # Check if the first line of the body is essentially the same as the Subject first_line = normalize_text(entry["BodyText"][0]) subject = normalize_text(entry["Subject"]) # If they match or the subject is inside the first line, remove it if subject in first_line or first_line in subject: entry["BodyText"] = entry["BodyText"][1:] # return json_output json_output = json.dumps(data_list_JSON, indent=4) print('json_output',json_output) with open("json_output.txt", "w", encoding="utf-8") as f: json.dump(json_output, f, indent=4) return json_output def extract_section_under_header_tobebilled2(pdf_path): # keywordstoSkip=["installation", "execution", "miscellaneous items", "workmanship", "testing", "labeling"] keywords = {'installation', 'execution', 'miscellaneous items', 'workmanship', 'testing', 'labeling'} headertoContinue1 = False headertoContinue2=False Alltexttobebilled='' parsed_url = urlparse(pdf_path) filename = os.path.basename(parsed_url.path) filename = unquote(filename) # decode URL-encoded characters # Optimized URL handling if pdf_path and ('http' in pdf_path or 'dropbox' in pdf_path): pdf_path = pdf_path.replace('dl=0', 'dl=1') # Cache frequently used values response = requests.get(pdf_path) pdf_content = BytesIO(response.content) if not pdf_content: raise ValueError("No valid PDF content found.") doc = fitz.open(stream=pdf_content, filetype="pdf") docHighlights = fitz.open(stream=pdf_content, filetype="pdf") most_common_font_size, most_common_color, most_common_font = get_regular_font_size_and_color(doc) # Precompute regex patterns dot_pattern = re.compile(r'\.{3,}') url_pattern = re.compile(r'https?://\S+|www\.\S+') toc_pages = get_toc_page_numbers(doc) headers, top_3_font_sizes, smallest_font_size, headersSpans = extract_headers( doc, toc_pages, most_common_font_size, most_common_color, most_common_font, top_margin, bottom_margin ) hierarchy = build_header_hierarchy(doc, toc_pages, most_common_font_size, most_common_color, most_common_font) listofHeaderstoMarkup = get_leaf_headers_with_paths(hierarchy) # Precompute all children headers once allchildrenheaders = [normalize_text(item['text']) for item, p in listofHeaderstoMarkup] allchildrenheaders_set = set(allchildrenheaders) # For faster lookups df = pd.DataFrame(columns=["NBSLink","Subject","Page","Author","Creation Date","Layer",'Code', 'head above 1', "head above 2","BodyText"]) dictionaryNBS={} data_list_JSON = [] currentgroupname='' if len(top_3_font_sizes)==3: mainHeaderFontSize, subHeaderFontSize, subsubheaderFontSize = top_3_font_sizes elif len(top_3_font_sizes)==2: mainHeaderFontSize= top_3_font_sizes[0] subHeaderFontSize= top_3_font_sizes[1] subsubheaderFontSize= top_3_font_sizes[1] # Preload all pages to avoid repeated loading # pages = [doc.load_page(page_num) for page_num in range(len(doc)) if page_num not in toc_pages] for heading_to_searchDict, paths in listofHeaderstoMarkup: heading_to_search = heading_to_searchDict['text'] heading_to_searchPageNum = heading_to_searchDict['page'] # Initialize variables headertoContinue1 = False headertoContinue2 = False matched_header_line = None done = False collecting = False collected_lines = [] page_highlights = {} current_bbox = {} last_y1s = {} mainHeader = '' subHeader = '' matched_header_line_norm = heading_to_search break_collecting = False heading_norm = normalize_text(heading_to_search) paths_norm = [normalize_text(p) for p in paths[0]] if paths and paths[0] else [] for page_num in range(heading_to_searchPageNum,len(doc)): print(heading_to_search) if paths[0].strip().lower() != currentgroupname.strip().lower(): Alltexttobebilled+= paths[0] +'\n' currentgroupname=paths[0] print(paths[0]) if page_num in toc_pages: continue if break_collecting: break page=doc[page_num] page_height = page.rect.height blocks = page.get_text("dict")["blocks"] for block in blocks: if break_collecting: break lines = block.get("lines", []) i = 0 while i < len(lines): if break_collecting: break spans = lines[i].get("spans", []) if not spans: i += 1 continue y0 = spans[0]["bbox"][1] y1 = spans[0]["bbox"][3] if y0 < top_margin or y1 > (page_height - bottom_margin): i += 1 continue line_text = get_spaced_text_from_spans(spans).lower() line_text_norm = normalize_text(line_text) # Combine with next line if available if i + 1 < len(lines): next_spans = lines[i + 1].get("spans", []) next_line_text = get_spaced_text_from_spans(next_spans).lower() combined_line_norm = normalize_text(line_text + " " + next_line_text) else: combined_line_norm = line_text_norm # Check if we should continue processing if combined_line_norm and combined_line_norm in paths[0]: headertoContinue1 = combined_line_norm if combined_line_norm and combined_line_norm in paths[-2]: headertoContinue2 = combined_line_norm # if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() : last_path = paths[-2].lower() # if any(word in paths[-2].lower() for word in keywordstoSkip): # if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() or 'workmanship' in paths[-2].lower() or 'testing' in paths[-2].lower() or 'labeling' in paths[-2].lower(): if any(keyword in last_path for keyword in keywords): stringtowrite='Not to be billed' else: stringtowrite='To be billed' if stringtowrite=='To be billed': # Alltexttobebilled+= combined_line_norm ################################################# if matched_header_line_norm in combined_line_norm: Alltexttobebilled+='\n' Alltexttobebilled+= ' '+combined_line_norm # Optimized header matching existsfull = ( ( combined_line_norm in allchildrenheaders_set or combined_line_norm in allchildrenheaders ) and heading_to_search in combined_line_norm ) # New word-based matching current_line_words = set(combined_line_norm.split()) heading_words = set(heading_norm.split()) all_words_match = current_line_words.issubset(heading_words) and len(current_line_words) > 0 substring_match = ( heading_norm in combined_line_norm or combined_line_norm in heading_norm or all_words_match # Include the new word-based matching ) # substring_match = ( # heading_norm in combined_line_norm or # combined_line_norm in heading_norm # ) if (substring_match and existsfull and not collecting and len(combined_line_norm) > 0 ):#and (headertoContinue1 or headertoContinue2) ): # Check header conditions more efficiently header_spans = [ span for span in spans if (is_header(span, most_common_font_size, most_common_color, most_common_font) # and span['size'] >= subsubheaderFontSize and span['size'] < mainHeaderFontSize) ] if header_spans and stringtowrite.startswith('To'): collecting = True # if stringtowrite=='To be billed': # Alltexttobebilled+='\n' matched_header_font_size = max(span["size"] for span in header_spans) # collected_lines.append(line_text) valid_spans = [span for span in spans if span.get("bbox")] if valid_spans: x0s = [span["bbox"][0] for span in valid_spans] x1s = [span["bbox"][2] for span in valid_spans] y0s = [span["bbox"][1] for span in valid_spans] y1s = [span["bbox"][3] for span in valid_spans] header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] if page_num in current_bbox: cb = current_bbox[page_num] current_bbox[page_num] = [ min(cb[0], header_bbox[0]), min(cb[1], header_bbox[1]), max(cb[2], header_bbox[2]), max(cb[3], header_bbox[3]) ] else: current_bbox[page_num] = header_bbox last_y1s[page_num] = header_bbox[3] x0, y0, x1, y1 = header_bbox zoom = 200 left = int(x0) top = int(y0) zoom_str = f"{zoom},{left},{top}" pageNumberFound = page_num + 1 # Build the query parameters params = { 'pdfLink': pdf_path, # Your PDF link 'keyword': heading_to_search, # Your keyword (could be a string or list) } # URL encode each parameter encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()} # Construct the final encoded link encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()]) # Correctly construct the final URL with page and zoom final_url = f"{baselink}{encoded_link}#page={str(pageNumberFound)}&zoom={zoom_str}" # Get current date and time now = datetime.now() # Format the output formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p") # Optionally, add the URL to a DataFrame data_entry = { "NBSLink": zoom_str, "Subject": heading_to_search, "Page": str(pageNumberFound), "Author": "ADR", "Creation Date": formatted_time, "Layer": "Initial", "Code": stringtowrite, "head above 1": paths[-2], "head above 2": paths[0], "BodyText":collected_lines, "MC Connnection": 'Go to ' + paths[0].strip().split()[0] +'/'+ heading_to_search.strip().split()[0] + ' in '+ filename } data_list_JSON.append(data_entry) # Convert list to JSON json_output = json.dumps(data_list_JSON, indent=4) i += 2 continue else: if (substring_match and not collecting and len(combined_line_norm) > 0): # and (headertoContinue1 or headertoContinue2) ): # Calculate word match percentage word_match_percent = words_match_ratio(heading_norm, combined_line_norm) * 100 # Check if at least 70% of header words exist in this line meets_word_threshold = word_match_percent >= 100 # Check header conditions (including word threshold) header_spans = [ span for span in spans if (is_header(span, most_common_font_size, most_common_color, most_common_font) # and span['size'] >= subsubheaderFontSize and span['size'] < mainHeaderFontSize) ] if header_spans and (meets_word_threshold or same_start_word(heading_to_search, combined_line_norm) ) and stringtowrite.startswith('To'): collecting = True if stringtowrite=='To be billed': Alltexttobebilled+='\n' # if stringtowrite=='To be billed': # Alltexttobebilled+= ' '+ combined_line_norm matched_header_font_size = max(span["size"] for span in header_spans) collected_lines.append(line_text) valid_spans = [span for span in spans if span.get("bbox")] if valid_spans: x0s = [span["bbox"][0] for span in valid_spans] x1s = [span["bbox"][2] for span in valid_spans] y0s = [span["bbox"][1] for span in valid_spans] y1s = [span["bbox"][3] for span in valid_spans] header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] if page_num in current_bbox: cb = current_bbox[page_num] current_bbox[page_num] = [ min(cb[0], header_bbox[0]), min(cb[1], header_bbox[1]), max(cb[2], header_bbox[2]), max(cb[3], header_bbox[3]) ] else: current_bbox[page_num] = header_bbox last_y1s[page_num] = header_bbox[3] x0, y0, x1, y1 = header_bbox zoom = 200 left = int(x0) top = int(y0) zoom_str = f"{zoom},{left},{top}" pageNumberFound = page_num + 1 # Build the query parameters params = { 'pdfLink': pdf_path, # Your PDF link 'keyword': heading_to_search, # Your keyword (could be a string or list) } # URL encode each parameter encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()} # Construct the final encoded link encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()]) # Correctly construct the final URL with page and zoom final_url = f"{baselink}{encoded_link}#page={str(pageNumberFound)}&zoom={zoom_str}" # Get current date and time now = datetime.now() # Format the output formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p") # Optionally, add the URL to a DataFrame data_entry = { "NBSLink": zoom_str, "Subject": heading_to_search, "Page": str(pageNumberFound), "Author": "ADR", "Creation Date": formatted_time, "Layer": "Initial", "Code": stringtowrite, "head above 1": paths[-2], "head above 2": paths[0], "BodyText":collected_lines, "MC Connnection": 'Go to ' + paths[0].strip().split()[0] +'/'+ heading_to_search.strip().split()[0] + ' in '+ filename } data_list_JSON.append(data_entry) # Convert list to JSON json_output = json.dumps(data_list_JSON, indent=4) i += 2 continue if collecting: norm_line = normalize_text(line_text) # Optimized URL check if url_pattern.match(norm_line): line_is_header = False else: line_is_header = any(is_header(span, most_common_font_size, most_common_color, most_common_font) for span in spans) if line_is_header: header_font_size = max(span["size"] for span in spans) is_probably_real_header = ( header_font_size >= matched_header_font_size and is_header(spans[0], most_common_font_size, most_common_color, most_common_font) and len(line_text.strip()) > 2 ) if (norm_line != matched_header_line_norm and norm_line != heading_norm and is_probably_real_header): if line_text not in heading_norm: collecting = False done = True headertoContinue1 = False headertoContinue2=False for page_num, bbox in current_bbox.items(): bbox[3] = last_y1s.get(page_num, bbox[3]) page_highlights[page_num] = bbox highlight_boxes(docHighlights, page_highlights,stringtowrite) break_collecting = True break if break_collecting: break collected_lines.append(line_text) valid_spans = [span for span in spans if span.get("bbox")] if valid_spans: x0s = [span["bbox"][0] for span in valid_spans] x1s = [span["bbox"][2] for span in valid_spans] y0s = [span["bbox"][1] for span in valid_spans] y1s = [span["bbox"][3] for span in valid_spans] line_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] if page_num in current_bbox: cb = current_bbox[page_num] current_bbox[page_num] = [ min(cb[0], line_bbox[0]), min(cb[1], line_bbox[1]), max(cb[2], line_bbox[2]), max(cb[3], line_bbox[3]) ] else: current_bbox[page_num] = line_bbox last_y1s[page_num] = line_bbox[3] i += 1 if not done: for page_num, bbox in current_bbox.items(): bbox[3] = last_y1s.get(page_num, bbox[3]) page_highlights[page_num] = bbox if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() : stringtowrite='Not to be billed' else: stringtowrite='To be billed' highlight_boxes(docHighlights, page_highlights,stringtowrite) # docHighlights.save("highlighted_output.pdf", garbage=4, deflate=True) dbxTeam = tsadropboxretrieval.ADR_Access_DropboxTeam('user') metadata = dbxTeam.sharing_get_shared_link_metadata(pdf_path) dbPath = '/TSA JOBS/ADR Test/FIND/' pdf_bytes = BytesIO() docHighlights.save(pdf_bytes) pdflink = tsadropboxretrieval.uploadanyFile(doc=docHighlights, path=dbPath, pdfname=filename) json_output=changepdflinks(json_output,pdflink) return pdf_bytes.getvalue(), docHighlights , json_output, Alltexttobebilled , filename def extract_section_under_header_tobebilledMultiplePDFS(multiplePDF_Paths): # keywordstoSkip=["installation", "execution", "miscellaneous items", "workmanship", "testing", "labeling"] filenames=[] keywords = {'installation', 'execution', 'miscellaneous items', 'workmanship', 'testing', 'labeling'} arrayofPDFS=multiplePDF_Paths.split(',') print(multiplePDF_Paths) print(arrayofPDFS) docarray=[] jsons=[] df = pd.DataFrame(columns=["PDF Name","NBSLink","Subject","Page","Author","Creation Date","Layer",'Code', 'head above 1', "head above 2","BodyText"]) for pdf_path in arrayofPDFS: headertoContinue1 = False headertoContinue2=False Alltexttobebilled='' parsed_url = urlparse(pdf_path) filename = os.path.basename(parsed_url.path) filename = unquote(filename) # decode URL-encoded characters filenames.append(filename) # Optimized URL handling if pdf_path and ('http' in pdf_path or 'dropbox' in pdf_path): pdf_path = pdf_path.replace('dl=0', 'dl=1') # Cache frequently used values response = requests.get(pdf_path) pdf_content = BytesIO(response.content) if not pdf_content: raise ValueError("No valid PDF content found.") doc = fitz.open(stream=pdf_content, filetype="pdf") docHighlights = fitz.open(stream=pdf_content, filetype="pdf") most_common_font_size, most_common_color, most_common_font = get_regular_font_size_and_color(doc) # Precompute regex patterns dot_pattern = re.compile(r'\.{3,}') url_pattern = re.compile(r'https?://\S+|www\.\S+') toc_pages = get_toc_page_numbers(doc) # headers, top_3_font_sizes, smallest_font_size, headersSpans = extract_headers( # doc, toc_pages, most_common_font_size, most_common_color, most_common_font, top_margin, bottom_margin # ) # identified_headers = identify_headers_with_openrouter(doc, api_key='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')# ['text', fontsize, page number,y] headers_json=headers_with_location(doc,identified_headers) headers=filter_headers_outside_toc(headers_json,toc_pages) hierarchy=build_hierarchy_from_llm(headers) listofHeaderstoMarkup = get_leaf_headers_with_paths(hierarchy) # Precompute all children headers once allchildrenheaders = [normalize_text(item['text']) for item, p in listofHeaderstoMarkup] allchildrenheaders_set = set(allchildrenheaders) # For faster lookups # df = pd.DataFrame(columns=["NBSLink","Subject","Page","Author","Creation Date","Layer",'Code', 'head above 1', "head above 2","BodyText"]) dictionaryNBS={} data_list_JSON = [] json_output=[] currentgroupname='' # if len(top_3_font_sizes)==3: # mainHeaderFontSize, subHeaderFontSize, subsubheaderFontSize = top_3_font_sizes # elif len(top_3_font_sizes)==2: # mainHeaderFontSize= top_3_font_sizes[0] # subHeaderFontSize= top_3_font_sizes[1] # subsubheaderFontSize= top_3_font_sizes[1] # Preload all pages to avoid repeated loading # pages = [doc.load_page(page_num) for page_num in range(len(doc)) if page_num not in toc_pages] for heading_to_searchDict,pathss in listofHeaderstoMarkup: heading_to_search = heading_to_searchDict['text'] heading_to_searchPageNum = heading_to_searchDict['page'] paths=heading_to_searchDict['path'] # Initialize variables headertoContinue1 = False headertoContinue2 = False matched_header_line = None done = False collecting = False collected_lines = [] page_highlights = {} current_bbox = {} last_y1s = {} mainHeader = '' subHeader = '' matched_header_line_norm = heading_to_search break_collecting = False heading_norm = normalize_text(heading_to_search) paths_norm = [normalize_text(p) for p in paths[0]] if paths and paths[0] else [] for page_num in range(heading_to_searchPageNum,len(doc)): # print(heading_to_search) if paths[0].strip().lower() != currentgroupname.strip().lower(): Alltexttobebilled+= paths[0] +'\n' currentgroupname=paths[0] # print(paths[0]) if page_num in toc_pages: continue if break_collecting: break page=doc[page_num] page_height = page.rect.height blocks = page.get_text("dict")["blocks"] for block in blocks: if break_collecting: break lines = block.get("lines", []) i = 0 while i < len(lines): if break_collecting: break spans = lines[i].get("spans", []) if not spans: i += 1 continue y0 = spans[0]["bbox"][1] y1 = spans[0]["bbox"][3] if y0 < top_margin or y1 > (page_height - bottom_margin): i += 1 continue line_text = get_spaced_text_from_spans(spans).lower() line_text_norm = normalize_text(line_text) # Combine with next line if available if i + 1 < len(lines): next_spans = lines[i + 1].get("spans", []) next_line_text = get_spaced_text_from_spans(next_spans).lower() combined_line_norm = normalize_text(line_text + " " + next_line_text) else: combined_line_norm = line_text_norm # Check if we should continue processing # if combined_line_norm and combined_line_norm in paths[0]: # headertoContinue1 = combined_line_norm # if combined_line_norm and combined_line_norm in paths[-2]: # headertoContinue2 = combined_line_norm # if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() : last_path = paths[-2].lower() # if any(word in paths[-2].lower() for word in keywordstoSkip): # if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() or 'workmanship' in paths[-2].lower() or 'testing' in paths[-2].lower() or 'labeling' in paths[-2].lower(): if any(keyword in last_path for keyword in keywords): stringtowrite='Not to be billed' else: stringtowrite='To be billed' if stringtowrite=='To be billed': # Alltexttobebilled+= combined_line_norm ################################################# if matched_header_line_norm in combined_line_norm: Alltexttobebilled+='\n' Alltexttobebilled+= ' '+combined_line_norm # Optimized header matching existsfull = ( ( combined_line_norm in allchildrenheaders_set or combined_line_norm in allchildrenheaders ) and heading_to_search in combined_line_norm ) # New word-based matching current_line_words = set(combined_line_norm.split()) heading_words = set(heading_norm.split()) all_words_match = current_line_words.issubset(heading_words) and len(current_line_words) > 0 substring_match = ( heading_norm in combined_line_norm or combined_line_norm in heading_norm or all_words_match # Include the new word-based matching ) # substring_match = ( # heading_norm in combined_line_norm or # combined_line_norm in heading_norm # ) if (substring_match and existsfull and not collecting and len(combined_line_norm) > 0 ):#and (headertoContinue1 or headertoContinue2) ): # Check header conditions more efficiently # header_spans = [ # span for span in spans # if (is_header(span, most_common_font_size, most_common_color, most_common_font) # # and span['size'] >= subsubheaderFontSize # and span['size'] < mainHeaderFontSize) # ] if stringtowrite.startswith('To') : collecting = True # if stringtowrite=='To be billed': # Alltexttobebilled+='\n' # matched_header_font_size = max(span["size"] for span in header_spans) # collected_lines.append(line_text) valid_spans = [span for span in spans if span.get("bbox")] if valid_spans: x0s = [span["bbox"][0] for span in valid_spans] x1s = [span["bbox"][2] for span in valid_spans] y0s = [span["bbox"][1] for span in valid_spans] y1s = [span["bbox"][3] for span in valid_spans] header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] if page_num in current_bbox: cb = current_bbox[page_num] current_bbox[page_num] = [ min(cb[0], header_bbox[0]), min(cb[1], header_bbox[1]), max(cb[2], header_bbox[2]), max(cb[3], header_bbox[3]) ] else: current_bbox[page_num] = header_bbox last_y1s[page_num] = header_bbox[3] x0, y0, x1, y1 = header_bbox zoom = 200 left = int(x0) top = int(y0) zoom_str = f"{zoom},{left},{top}" pageNumberFound = page_num + 1 # Build the query parameters params = { 'pdfLink': pdf_path, # Your PDF link 'keyword': heading_to_search, # Your keyword (could be a string or list) } # URL encode each parameter encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()} # Construct the final encoded link encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()]) # Correctly construct the final URL with page and zoom final_url = f"{baselink}{encoded_link}#page={str(pageNumberFound)}&zoom={zoom_str}" # Get current date and time now = datetime.now() # Format the output formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p") # Optionally, add the URL to a DataFrame data_entry = { "PDF Name":filename, "NBSLink": zoom_str, "Subject": heading_to_search, "Page": str(pageNumberFound), "Author": "ADR", "Creation Date": formatted_time, "Layer": "Initial", "Code": stringtowrite, # "head above 1": paths[-2], # "head above 2": paths[0], "BodyText":collected_lines, "MC Connnection": 'Go to ' + paths[0].strip().split()[0] +'/'+ heading_to_search.strip().split()[0] + ' in '+ filename } # Dynamically add "head above 1", "head above 2", ... depending on the number of levels for i, path_text in enumerate(paths[:-1]): # skip the last one because that's the current heading data_entry[f"head above {i+1}"] = path_text data_list_JSON.append(data_entry) # Convert list to JSON # json_output = [data_list_JSON] # json_output = json.dumps(data_list_JSON, indent=4) i += 2 continue else: if (substring_match and not collecting and len(combined_line_norm) > 0): # and (headertoContinue1 or headertoContinue2) ): # Calculate word match percentage word_match_percent = words_match_ratio(heading_norm, combined_line_norm) * 100 # Check if at least 70% of header words exist in this line meets_word_threshold = word_match_percent >= 100 # Check header conditions (including word threshold) # header_spans = [ # span for span in spans # if (is_header(span, most_common_font_size, most_common_color, most_common_font) # # and span['size'] >= subsubheaderFontSize # and span['size'] < mainHeaderFontSize) # ] if (meets_word_threshold or same_start_word(heading_to_search, combined_line_norm) ) and stringtowrite.startswith('To'): collecting = True if stringtowrite=='To be billed': Alltexttobebilled+='\n' # if stringtowrite=='To be billed': # Alltexttobebilled+= ' '+ combined_line_norm # matched_header_font_size = max(span["size"] for span in header_spans) collected_lines.append(line_text) valid_spans = [span for span in spans if span.get("bbox")] if valid_spans: x0s = [span["bbox"][0] for span in valid_spans] x1s = [span["bbox"][2] for span in valid_spans] y0s = [span["bbox"][1] for span in valid_spans] y1s = [span["bbox"][3] for span in valid_spans] header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] if page_num in current_bbox: cb = current_bbox[page_num] current_bbox[page_num] = [ min(cb[0], header_bbox[0]), min(cb[1], header_bbox[1]), max(cb[2], header_bbox[2]), max(cb[3], header_bbox[3]) ] else: current_bbox[page_num] = header_bbox last_y1s[page_num] = header_bbox[3] x0, y0, x1, y1 = header_bbox zoom = 200 left = int(x0) top = int(y0) zoom_str = f"{zoom},{left},{top}" pageNumberFound = page_num + 1 # Build the query parameters params = { 'pdfLink': pdf_path, # Your PDF link 'keyword': heading_to_search, # Your keyword (could be a string or list) } # URL encode each parameter encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()} # Construct the final encoded link encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()]) # Correctly construct the final URL with page and zoom final_url = f"{baselink}{encoded_link}#page={str(pageNumberFound)}&zoom={zoom_str}" # Get current date and time now = datetime.now() # Format the output formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p") # Optionally, add the URL to a DataFrame data_entry = { "PDF Name":filename, "NBSLink": zoom_str, "Subject": heading_to_search, "Page": str(pageNumberFound), "Author": "ADR", "Creation Date": formatted_time, "Layer": "Initial", "Code": stringtowrite, # "head above 1": paths[-2], # "head above 2": paths[0], "BodyText":collected_lines, "MC Connnection": 'Go to ' + paths[0].strip().split()[0] +'/'+ heading_to_search.strip().split()[0] + ' in '+ filename } # Dynamically add "head above 1", "head above 2", ... depending on the number of levels for i, path_text in enumerate(paths[:-1]): # skip the last one because that's the current heading data_entry[f"head above {i+1}"] = path_text data_list_JSON.append(data_entry) # Convert list to JSON # json_output = [data_list_JSON] # json_output = json.dumps(data_list_JSON, indent=4) i += 2 continue if collecting: norm_line = normalize_text(line_text) # Optimized URL check if url_pattern.match(norm_line): line_is_header = False else: line_is_header = any(is_header(span, most_common_font_size, most_common_color, most_common_font) for span in spans) if line_is_header: header_font_size = max(span["size"] for span in spans) is_probably_real_header = ( # header_font_size >= matched_header_font_size and # is_header(spans[0], most_common_font_size, most_common_color, most_common_font) and len(line_text.strip()) > 2 ) if (norm_line != matched_header_line_norm and norm_line != heading_norm and is_probably_real_header): if line_text not in heading_norm: collecting = False done = True headertoContinue1 = False headertoContinue2=False for page_num, bbox in current_bbox.items(): bbox[3] = last_y1s.get(page_num, bbox[3]) page_highlights[page_num] = bbox highlight_boxes(docHighlights, page_highlights,stringtowrite) break_collecting = True break if break_collecting: break collected_lines.append(line_text) valid_spans = [span for span in spans if span.get("bbox")] if valid_spans: x0s = [span["bbox"][0] for span in valid_spans] x1s = [span["bbox"][2] for span in valid_spans] y0s = [span["bbox"][1] for span in valid_spans] y1s = [span["bbox"][3] for span in valid_spans] line_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] if page_num in current_bbox: cb = current_bbox[page_num] current_bbox[page_num] = [ min(cb[0], line_bbox[0]), min(cb[1], line_bbox[1]), max(cb[2], line_bbox[2]), max(cb[3], line_bbox[3]) ] else: current_bbox[page_num] = line_bbox last_y1s[page_num] = line_bbox[3] i += 1 if not done: for page_num, bbox in current_bbox.items(): bbox[3] = last_y1s.get(page_num, bbox[3]) page_highlights[page_num] = bbox if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() : stringtowrite='Not to be billed' else: stringtowrite='To be billed' highlight_boxes(docHighlights, page_highlights,stringtowrite) docarray.append(docHighlights) jsons.append(data_list_JSON) dbxTeam = tsadropboxretrieval.ADR_Access_DropboxTeam('user') dbPath = '/TSA JOBS/ADR Test/FIND/' jsonCombined=[] for i in range(len(arrayofPDFS)): singlepdf=arrayofPDFS[i] metadata = dbxTeam.sharing_get_shared_link_metadata(singlepdf) pdf_bytes = BytesIO() docHighlights.save(pdf_bytes) pdflink = tsadropboxretrieval.uploadanyFile(doc=docarray[i], path=dbPath, pdfname=filenames[i]) # json_copy = copy.deepcopy(jsons[i]) # Update links for this JSON # json_output1 = changepdflinks(json_copy, pdflink) json_output1=changepdflinks(jsons[i],pdflink) jsonCombined.extend(json_output1) combined_json_str = json.dumps(jsonCombined, indent=1) print(combined_json_str) return pdf_bytes.getvalue(), docHighlights , combined_json_str, Alltexttobebilled , filenames def testFunction(pdf_path): Alltexttobebilled='' alltextWithoutNotbilled='' # keywordstoSkip=["installation", "execution", "miscellaneous items", "workmanship", "testing", "labeling"] headertoContinue1 = False headertoContinue2=False parsed_url = urlparse(pdf_path) filename = os.path.basename(parsed_url.path) filename = unquote(filename) # decode URL-encoded characters # Optimized URL handling if pdf_path and ('http' in pdf_path or 'dropbox' in pdf_path): pdf_path = pdf_path.replace('dl=0', 'dl=1') # Cache frequently used values response = requests.get(pdf_path) pdf_content = BytesIO(response.content) if not pdf_content: raise ValueError("No valid PDF content found.") doc = fitz.open(stream=pdf_content, filetype="pdf") docHighlights = fitz.open(stream=pdf_content, filetype="pdf") parsed_url = urlparse(pdf_path) filename = os.path.basename(parsed_url.path) filename = unquote(filename) # decode URL-encoded characters #### Get regular tex font size, style , color most_common_font_size, most_common_color, most_common_font = get_regular_font_size_and_color(doc) # Precompute regex patterns dot_pattern = re.compile(r'\.{3,}') url_pattern = re.compile(r'https?://\S+|www\.\S+') highlighted=[] processed_subjects = set() # Initialize at the top of testFunction toc_pages = get_toc_page_numbers(doc) headers=process_document_in_chunks(doc, api_key='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') # identified_headers = identify_headers_with_openrouterNEWW(doc, api_key='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')# ['text', fontsize, page number,y] with open("identified_headers.json", "w", encoding="utf-8") as f: json.dump(headers, f, indent=4) # with open("identified_headers.json", "r", encoding="utf-8") as f: # headers = json.load(f) # print(identified_headers) allheaders_LLM=[] for h in headers: # if int(h["page"]) in toc_pages: # continue if h['text']: allheaders_LLM.append([h['text'],h["page"]]) hierarchy=identify_hierarchy_levels_openrouter(allheadersLLM=allheaders_LLM,api_key='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') with open("identified_hierarchy.json", "w", encoding="utf-8") as f: json.dump(hierarchy, f, indent=4) # with open("identified_hierarchy.json", "r", encoding="utf-8") as f: # hierarchy = json.load(f) identified_headers=mapPages_header_hierarchy(headers,hierarchy) print('identified_headers',identified_headers) headers_json=headers_with_location(doc,identified_headers) headers=filter_headers_outside_toc(headers_json,toc_pages) hierarchy=build_hierarchy_from_llm(headers) # identify_headers_and_save_excel(hierarchy) listofHeaderstoMarkup = get_leaf_headers_with_paths(hierarchy) allchildrenheaders = [normalize_text(item['text']) for item, p in listofHeaderstoMarkup] allchildrenheaders_set = set(allchildrenheaders) # For faster lookups # print('allchildrenheaders_set',allchildrenheaders_set) df = pd.DataFrame(columns=["NBSLink","Subject","Page","Author","Creation Date","Layer",'Code', 'head above 1', "head above 2",'BodyText']) dictionaryNBS={} data_list_JSON = [] for heading_to_searchDict,pathss in listofHeaderstoMarkup: heading_to_search = heading_to_searchDict['text'] heading_to_searchPageNum = heading_to_searchDict['page'] paths=heading_to_searchDict['path'] # xloc=heading_to_searchDict['x'] yloc=heading_to_searchDict['y'] # Initialize variables headertoContinue1 = False headertoContinue2 = False matched_header_line = None done = False collecting = False collected_lines = [] page_highlights = {} current_bbox = {} last_y1s = {} mainHeader = '' subHeader = '' matched_header_line_norm = heading_to_search break_collecting = False heading_norm = normalize_text(heading_to_search) paths_norm = [normalize_text(p) for p in paths[0]] if paths and paths[0] else [] for page_num in range(heading_to_searchPageNum,len(doc)): if page_num in toc_pages: continue if break_collecting: break page=doc[page_num] page_height = page.rect.height blocks = page.get_text("dict")["blocks"] for block in blocks: if break_collecting: break lines = block.get("lines", []) i = 0 while i < len(lines): if break_collecting: break spans = lines[i].get("spans", []) if not spans: i += 1 continue # y0 = spans[0]["bbox"][1] # y1 = spans[0]["bbox"][3] x0 = spans[0]["bbox"][0] # left x1 = spans[0]["bbox"][2] # right y0 = spans[0]["bbox"][1] # top y1 = spans[0]["bbox"][3] # bottom if y0 < top_margin or y1 > (page_height - bottom_margin): i += 1 continue line_text = get_spaced_text_from_spans(spans).lower() line_text_norm = normalize_text(line_text) # Combine with next line if available if i + 1 < len(lines): next_spans = lines[i + 1].get("spans", []) next_line_text = get_spaced_text_from_spans(next_spans).lower() combined_line_norm = normalize_text(line_text + " " + next_line_text) else: combined_line_norm = line_text_norm # Check if we should continue processing if combined_line_norm and combined_line_norm in paths[0]: headertoContinue1 = combined_line_norm if combined_line_norm and combined_line_norm in paths[-2]: headertoContinue2 = combined_line_norm # print('paths',paths) # if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() : # if any(word in paths[-2].lower() for word in keywordstoSkip): # stringtowrite='Not to be billed' # else: stringtowrite='To be billed' if stringtowrite!='To be billed': alltextWithoutNotbilled+= combined_line_norm ################################################# # Optimized header matching existsfull = ( ( combined_line_norm in allchildrenheaders_set or combined_line_norm in allchildrenheaders ) and heading_to_search in combined_line_norm ) # existsfull=False # if xloc==x0 and yloc ==y0: # existsfull=True # New word-based matching current_line_words = set(combined_line_norm.split()) heading_words = set(heading_norm.split()) all_words_match = current_line_words.issubset(heading_words) and len(current_line_words) > 0 substring_match = ( heading_norm in combined_line_norm or combined_line_norm in heading_norm or all_words_match # Include the new word-based matching ) # substring_match = ( # heading_norm in combined_line_norm or # combined_line_norm in heading_norm # ) if ( substring_match and existsfull and not collecting and len(combined_line_norm) > 0 ):#and (headertoContinue1 or headertoContinue2) ): # Check header conditions more efficiently # header_spans = [ # span for span in spans # if (is_header(span, most_common_font_size, most_common_color, most_common_font) ) # # and span['size'] >= subsubheaderFontSize # # and span['size'] < mainHeaderFontSize) # ] if stringtowrite.startswith('To'): collecting = True # matched_header_font_size = max(span["size"] for span in header_spans) Alltexttobebilled+= ' '+ combined_line_norm # collected_lines.append(line_text) valid_spans = [span for span in spans if span.get("bbox")] if valid_spans: x0s = [span["bbox"][0] for span in valid_spans] x1s = [span["bbox"][2] for span in valid_spans] y0s = [span["bbox"][1] for span in valid_spans] y1s = [span["bbox"][3] for span in valid_spans] header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] if page_num in current_bbox: cb = current_bbox[page_num] current_bbox[page_num] = [ min(cb[0], header_bbox[0]), min(cb[1], header_bbox[1]), max(cb[2], header_bbox[2]), max(cb[3], header_bbox[3]) ] else: current_bbox[page_num] = header_bbox last_y1s[page_num] = header_bbox[3] x0, y0, x1, y1 = header_bbox zoom = 200 left = int(x0) top = int(y0) zoom_str = f"{zoom},{left},{top}" pageNumberFound = page_num + 1 # Build the query parameters params = { 'pdfLink': pdf_path, # Your PDF link 'keyword': heading_to_search, # Your keyword (could be a string or list) } # URL encode each parameter encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()} # Construct the final encoded link encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()]) # Correctly construct the final URL with page and zoom final_url = f"{baselink}{encoded_link}#page={str(pageNumberFound)}&zoom={zoom_str}" # Get current date and time now = datetime.now() # Format the output formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p") # Optionally, add the URL to a DataFrame # Create the data entry only if the subject is unique if heading_to_search not in processed_subjects: data_entry = { "NBSLink": zoom_str, "Subject": heading_to_search, "Page": str(pageNumberFound), "Author": "ADR", "Creation Date": formatted_time, "Layer": "Initial", "Code": stringtowrite, "BodyText": collected_lines, "MC Connnection": 'Go to ' + paths[0].strip().split()[0] + '/' + heading_to_search.strip().split()[0] + ' in ' + filename } # Dynamically add hierarchy paths for i, path_text in enumerate(paths[:-1]): data_entry[f"head above {i+1}"] = path_text # Append to the list and mark this subject as processed data_list_JSON.append(data_entry) processed_subjects.add(heading_to_search) else: print(f"Skipping duplicate data entry for Subject: {heading_to_search}") # Convert list to JSON json_output = json.dumps(data_list_JSON, indent=4) i += 1 continue else: if (substring_match and not collecting and len(combined_line_norm) > 0): # and (headertoContinue1 or headertoContinue2) ): # Calculate word match percentage word_match_percent = words_match_ratio(heading_norm, combined_line_norm) * 100 # Check if at least 70% of header words exist in this line meets_word_threshold = word_match_percent >= 100 # Check header conditions (including word threshold) # header_spans = [ # span for span in spans # if (is_header(span, most_common_font_size, most_common_color, most_common_font)) # # and span['size'] >= subsubheaderFontSize # # and span['size'] < mainHeaderFontSize) # ] if (meets_word_threshold or same_start_word(heading_to_search, combined_line_norm) ) and stringtowrite.startswith('To'): collecting = True # matched_header_font_size = max(span["size"] for span in header_spans) Alltexttobebilled+= ' '+ combined_line_norm collected_lines.append(line_text) valid_spans = [span for span in spans if span.get("bbox")] if valid_spans: x0s = [span["bbox"][0] for span in valid_spans] x1s = [span["bbox"][2] for span in valid_spans] y0s = [span["bbox"][1] for span in valid_spans] y1s = [span["bbox"][3] for span in valid_spans] header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] if page_num in current_bbox: cb = current_bbox[page_num] current_bbox[page_num] = [ min(cb[0], header_bbox[0]), min(cb[1], header_bbox[1]), max(cb[2], header_bbox[2]), max(cb[3], header_bbox[3]) ] else: current_bbox[page_num] = header_bbox last_y1s[page_num] = header_bbox[3] x0, y0, x1, y1 = header_bbox zoom = 200 left = int(x0) top = int(y0) zoom_str = f"{zoom},{left},{top}" pageNumberFound = page_num + 1 # Build the query parameters params = { 'pdfLink': pdf_path, # Your PDF link 'keyword': heading_to_search, # Your keyword (could be a string or list) } # URL encode each parameter encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()} # Construct the final encoded link encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()]) # Correctly construct the final URL with page and zoom final_url = f"{baselink}{encoded_link}#page={str(pageNumberFound)}&zoom={zoom_str}" # Get current date and time now = datetime.now() # Format the output formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p") # Optionally, add the URL to a DataFrame # Create the data entry only if the subject is unique if heading_to_search not in processed_subjects: data_entry = { "NBSLink": zoom_str, "Subject": heading_to_search, "Page": str(pageNumberFound), "Author": "ADR", "Creation Date": formatted_time, "Layer": "Initial", "Code": stringtowrite, "BodyText": collected_lines, "MC Connnection": 'Go to ' + paths[0].strip().split()[0] + '/' + heading_to_search.strip().split()[0] + ' in ' + filename } # Dynamically add hierarchy paths for i, path_text in enumerate(paths[:-1]): data_entry[f"head above {i+1}"] = path_text # Append to the list and mark this subject as processed data_list_JSON.append(data_entry) processed_subjects.add(heading_to_search) else: print(f"Skipping duplicate data entry for Subject: {heading_to_search}") # Convert list to JSON json_output = json.dumps(data_list_JSON, indent=4) i += 2 continue if collecting: norm_line = normalize_text(line_text) def normalize(text): if isinstance(text, list): text = " ".join(text) return " ".join(text.lower().split()) def is_similar(a, b, threshold=0.75): return SequenceMatcher(None, a, b).ratio() >= threshold # Optimized URL check if url_pattern.match(norm_line): line_is_header = False else: line_is_header = any(is_header(span, most_common_font_size, most_common_color, most_common_font,allheaders_LLM) for span in spans) # def normalize(text): # return " ".join(text.lower().split()) # line_text = " ".join(span["text"] for span in spans).strip() # line_is_header = any( normalize(line_text) == normalize(header) for header in allheaders_LLM ) # for line_text in lines: # if collecting: # # Join all spans into one line # line_text = " ".join(span["text"] for span in spans).strip() # norm_line = normalize(line_text) # # Get max font size in this line # max_font_size = max(span.get("size", 0) for span in spans) # # Skip URLs # if url_pattern.match(norm_line): # line_is_header = False # else: # text_matches_header = any( # is_similar(norm_line, normalize(header)) # if not isinstance(header, list) # else is_similar(norm_line, normalize(" ".join(header))) # for header in allheaders_LLM # ) # # ✅ FINAL header condition # line_is_header = text_matches_header and max_font_size > 11 if line_is_header: header_font_size = max(span["size"] for span in spans) is_probably_real_header = ( # header_font_size >= matched_header_font_size and # is_header(spans[0], most_common_font_size, most_common_color, most_common_font) and len(line_text.strip()) > 2 ) if (norm_line != matched_header_line_norm and norm_line != heading_norm and is_probably_real_header): if line_text not in heading_norm: collecting = False done = True headertoContinue1 = False headertoContinue2=False for page_num, bbox in current_bbox.items(): bbox[3] = last_y1s.get(page_num, bbox[3]) page_highlights[page_num] = bbox can_highlight=False if [page_num,bbox] not in highlighted: highlighted.append([page_num,bbox]) can_highlight=True if can_highlight: highlight_boxes(docHighlights, page_highlights,stringtowrite) break_collecting = True break if break_collecting: break collected_lines.append(line_text) valid_spans = [span for span in spans if span.get("bbox")] if valid_spans: x0s = [span["bbox"][0] for span in valid_spans] x1s = [span["bbox"][2] for span in valid_spans] y0s = [span["bbox"][1] for span in valid_spans] y1s = [span["bbox"][3] for span in valid_spans] line_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] if page_num in current_bbox: cb = current_bbox[page_num] current_bbox[page_num] = [ min(cb[0], line_bbox[0]), min(cb[1], line_bbox[1]), max(cb[2], line_bbox[2]), max(cb[3], line_bbox[3]) ] else: current_bbox[page_num] = line_bbox last_y1s[page_num] = line_bbox[3] i += 1 if not done: for page_num, bbox in current_bbox.items(): bbox[3] = last_y1s.get(page_num, bbox[3]) page_highlights[page_num] = bbox # if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() : # stringtowrite='Not to be billed' # else: stringtowrite='To be billed' highlight_boxes(docHighlights, page_highlights,stringtowrite) print("Current working directory:", os.getcwd()) docHighlights.save("highlighted_output.pdf") # dbxTeam = tsadropboxretrieval.ADR_Access_DropboxTeam('user') # metadata = dbxTeam.sharing_get_shared_link_metadata(pdf_path) # dbPath = '/TSA JOBS/ADR Test/FIND/' # pdf_bytes = BytesIO() # docHighlights.save(pdf_bytes) # pdflink = tsadropboxretrieval.uploadanyFile(doc=docHighlights, path=dbPath, pdfname=filename) # json_output=changepdflinks(json_output,pdflink) # return pdf_bytes.getvalue(), docHighlights , json_output , Alltexttobebilled , alltextWithoutNotbilled , filename # Final safety check: if the very last entry in our list has an empty BodyText, # but we have collected_lines, sync them. if data_list_JSON and not data_list_JSON[-1]["BodyText"] and collected_lines: data_list_JSON[-1]["BodyText"] = collected_lines[1:] if len(collected_lines) > 0 else [] # Final cleanup of the JSON data before returning for entry in data_list_JSON: # Check if BodyText exists and has content if isinstance(entry.get("BodyText"), list) and len(entry["BodyText"]) > 0: # Check if the first line of the body is essentially the same as the Subject first_line = normalize_text(entry["BodyText"][0]) subject = normalize_text(entry["Subject"]) # If they match or the subject is inside the first line, remove it if subject in first_line or first_line in subject: entry["BodyText"] = entry["BodyText"][1:] # return json_output json_output = json.dumps(data_list_JSON, indent=4) # print('json_output',json_output) with open("json_output.txt", "w", encoding="utf-8") as f: json.dump(json_output, f, indent=4) return json_output, identified_headers def identify_headers_and_save_excel(pdf_path): try: jsons, result = testFunction(pdf_path) if not result: df = pd.DataFrame([{ "text": None, "page": None, "suggested_level": None, "confidence": None, "body": None, "System Message": "No headers were identified by the LLM." }]) else: print('here') df = pd.DataFrame(result) # Convert JSON string to list if needed if isinstance(jsons, str): jsons = json.loads(jsons) subject_body_map = {} # ✅ jsons is a flat list of dicts for obj in jsons: if not isinstance(obj, dict): continue subject = obj.get("Subject") body = obj.get("BodyText", []) if subject: subject_body_map[subject.strip()] = " ".join(body) # ✅ Map body to dataframe df["body"] = df["text"].map(subject_body_map).fillna("") # ✅ Save once at end output_path = os.path.abspath("header_analysis_output.xlsx") df.to_excel(output_path, index=False, engine="openpyxl") print("--- Processed DataFrame ---") print(df) return output_path except Exception as e: logger.error(f"Critical error in processing: {str(e)}") return None # ########### Main find ffunction to get the body - pass link here to the pdf from dropbox#### pdf_path='https://www.dropbox.com/scl/fi/z7u2b7nf5egnybkcp3aw6/DL0371-LRA-XX-XX-SP-AR-00050_NBS-SPECIFICATION_For-Tender_T01.pdf?rlkey=lbzxvdab4bnpri769ws9ogdqt&st=503l8jqq&dl=0' model='google/gemini-2.5-flash' output_path=identify_headers_and_save_excel(pdf_path) # testFunction('https://www.dropbox.com/scl/fi/z7u2b7nf5egnybkcp3aw6/DL0371-LRA-XX-XX-SP-AR-00050_NBS-SPECIFICATION_For-Tender_T01.pdf?rlkey=lbzxvdab4bnpri769ws9ogdqt&st=503l8jqq&dl=0') #### Calling openrouter only to test################ # pdf_path='https://www.dropbox.com/scl/fi/z7u2b7nf5egnybkcp3aw6/DL0371-LRA-XX-XX-SP-AR-00050_NBS-SPECIFICATION_For-Tender_T01.pdf?rlkey=lbzxvdab4bnpri769ws9ogdqt&st=503l8jqq&dl=0' # pdf_path = pdf_path.replace('dl=0', 'dl=1') # # # Cache frequently used values # response = requests.get(pdf_path) # pdf_content = BytesIO(response.content) # if not pdf_content: # raise ValueError("No valid PDF content found.") # doc = fitz.open(stream=pdf_content, filetype="pdf") # identified_headers=process_document_in_chunks(doc=doc,api_key='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') # # identified_headers = identify_headers_with_openrouterNEWW(doc, api_key='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') # # ###Saving in json the output of openrouter headers#### # with open("identified_headers.json", "w", encoding="utf-8") as f: # json.dump( # identified_headers, # f, # ensure_ascii=False, # indent=2, # default=str # 👈 THIS FIXES IT # ) # f.write("\n") # toc_pages=get_toc_page_numbers(doc) # with open("identified_headers.json", "r", encoding="utf-8") as f: # identified_headers = json.load(f) # allheaders_LLM=[] # for h in identified_headers: # # if int(h["page"]) in toc_pages: # # continue # if h['text']: # allheaders_LLM.append([h['text'],h['page']]) # hierarchy=identify_hierarchy_levels_openrouter(allheadersLLM=allheaders_LLM,api_key='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') # with open("identified_hierarchy.json", "w", encoding="utf-8") as f: # json.dump( # hierarchy, # f, # ensure_ascii=False, # indent=2, # default=str # 👈 THIS FIXES IT # ) # f.write("\n") #############################################################################################