Spaces:
Runtime error
Runtime error
| # -*- coding: utf-8 -*- | |
| """Copy of FindSpecsTrial(Retrieving+boundingBoxes)-InitialMarkups(ALL)_CleanedUp.ipynb | |
| Automatically generated by Colab. | |
| Original file is located at | |
| https://colab.research.google.com/drive/12XfVkmKmN3oVjHhLVE0_GgkftgArFEK2 | |
| """ | |
# Endpoint prefixes; each one ends in "?" so query parameters can be appended
# directly after it.
# baselink is the prefix changepdflinks() uses when it rewrites NBSLink values.
baselink='https://adr.trevorsadd.co.uk/api/view-pdf?'
# NOTE(review): newlink and tobebilledonlyLink are not referenced in the part of
# the file visible here - presumably used further down; confirm before removing.
newlink='https://adr.trevorsadd.co.uk/api/view-highlight?'
tobebilledonlyLink='https://adr.trevorsadd.co.uk/api/view-pdf-tobebilled?'
| from urllib.parse import urlparse, unquote | |
| import os | |
| from io import BytesIO | |
| import re | |
| import requests | |
| import pandas as pd | |
| import fitz # PyMuPDF | |
| import re | |
| import urllib.parse | |
| import pandas as pd | |
| import math | |
| import random | |
| import json | |
| from datetime import datetime | |
| from collections import defaultdict, Counter | |
| import difflib | |
| from fuzzywuzzy import fuzz | |
| import copy | |
| import tsadropboxretrieval | |
| import urllib.parse | |
# Module-level page margins. extract_section_under_header() passes them to
# extract_headers() and also uses them to skip lines whose bbox starts above
# top_margin or ends below (page height - bottom_margin).
# Presumably expressed in PDF points (the log message in
# process_document_headers prints margins with a "pt" suffix) - TODO confirm.
top_margin = 70
bottom_margin = 85
def changepdflinks(json_data, pdf_path):
    """Turn every entry's ``NBSLink`` into a full viewer URL for ``pdf_path``.

    On entry, each dict in ``json_data`` carries the zoom fragment under
    ``"NBSLink"`` and the page number under ``"Page"`` (both default to an
    empty string when missing).  The entries are modified in place.

    Args:
        json_data: Iterable of dict entries to rewrite.
        pdf_path: Link to the PDF; it is fully percent-encoded before use.

    Returns:
        A new list holding the same (mutated) entry objects in input order.
    """
    print('ll , ' ,json_data,pdf_path)
    rewritten = []
    for record in json_data:
        zoom_fragment = record.get("NBSLink", "")
        page_fragment = record.get("Page", "")
        # safe='' also encodes "/" and ":" so the link survives as one query value.
        quoted_pdf = urllib.parse.quote(pdf_path, safe='')
        # The old NBSLink value (the zoom fragment) is replaced by the full URL.
        record["NBSLink"] = (
            f"{baselink}pdfLink={quoted_pdf}#page={str(page_fragment)}&zoom={zoom_fragment}"
        )
        rewritten.append(record)
    return rewritten
def get_toc_page_numbers(doc, max_pages_to_check=15):
    """Return the leading page indices that should be treated as front matter/TOC.

    The first ``max_pages_to_check`` pages are scanned in order.  A page counts
    as a table-of-contents page when any of its text lines either

    * contains a run of two or more dots (dot leaders such as ``".... 12"``), or
    * consists solely of "table of contents", "contents" or "index"
      (case-insensitive, surrounding whitespace ignored).

    Args:
        doc: Document object supporting ``len()`` and ``load_page(i)``, whose
            pages provide ``get_text("dict")`` with a ``"blocks"`` list.
        max_pages_to_check: Upper bound on how many leading pages are scanned.

    Returns:
        ``[0, 1, ..., k]`` where ``k`` is the index of the FIRST page detected
        as a TOC page, or ``[]`` when no page matches.

    NOTE(review): the previous comments and the ``last_toc_page`` variable name
    suggested the range should extend to the LAST detected TOC page, but the
    code has always used the first one.  That behaviour is kept unchanged here
    (the dot test is sensitive enough that extending to the last match could
    swallow body pages); confirm which is intended.
    """
    dot_pattern = re.compile(r"\.{2,}")
    title_pattern = re.compile(r"^\s*(table of contents|contents|index)\s*$", re.IGNORECASE)

    for page_num in range(min(len(doc), max_pages_to_check)):
        page = doc.load_page(page_num)
        for block in page.get_text("dict")["blocks"]:
            # Image blocks carry no "lines" key, hence the default.
            for line in block.get("lines", []):
                line_text = " ".join(span["text"] for span in line["spans"]).strip()
                if dot_pattern.search(line_text) or title_pattern.match(line_text):
                    # Only the first matching page determines the result, so
                    # there is no need to look at any later page.
                    return list(range(page_num + 1))
    return []
def get_regular_font_size_and_color(doc):
    """Find the most frequent span size, colour and font name in the document.

    Every span of every text block on every page contributes one vote to each
    of the three tallies; blocks without a ``"lines"`` key are ignored.

    Args:
        doc: Document supporting ``len()`` and ``load_page(i)``; pages must
            provide ``get_text("dict")`` with a ``"blocks"`` list.

    Returns:
        Tuple ``(size, color, font)`` holding the most common value of each
        attribute, with ``None`` in every slot when the document has no spans.
    """
    size_tally = Counter()
    color_tally = Counter()
    font_tally = Counter()

    for page_index in range(len(doc)):
        page_blocks = doc.load_page(page_index).get_text("dict")["blocks"]
        for block in page_blocks:
            if "lines" not in block:
                continue
            for line in block["lines"]:
                for piece in line["spans"]:
                    size_tally[piece['size']] += 1
                    color_tally[piece['color']] += 1
                    font_tally[piece['font']] += 1

    def _most_frequent(tally):
        # An empty tally has no winner.
        return tally.most_common(1)[0][0] if tally else None

    return (
        _most_frequent(size_tally),
        _most_frequent(color_tally),
        _most_frequent(font_tally),
    )
def normalize_text(text):
    """Lower-case ``text``, trim it and collapse whitespace runs to one space.

    ``None`` is mapped to the empty string.
    """
    if text is not None:
        trimmed = text.strip().lower()
        return re.sub(r'\s+', ' ', trimmed)
    return ""
def get_spaced_text_from_spans(spans):
    """Join the stripped ``"text"`` of each span with single spaces and normalise it.

    The joined string goes through ``normalize_text``.
    """
    pieces = [span["text"].strip() for span in spans]
    return normalize_text(" ".join(pieces))
def is_header(span, most_common_font_size, most_common_color, most_common_font):
    """Decide whether a text span looks like a heading rather than body text.

    A span qualifies when it is larger than the document's most common font
    size, or when its font name differs (case-insensitively) from the most
    common font.

    The earlier implementation also had a "bold and larger" clause, but that
    clause could only be true when the plain "larger" test was already true, so
    bold styling never influenced the result; it has been dropped without
    changing any outcome.

    Args:
        span: Span dict; ``"size"`` is required, ``"font"`` is optional and
            treated as an empty name when absent (previously a missing
            ``"font"`` raised ``KeyError``).
        most_common_font_size: Body-text font size, or ``None`` when unknown
            (in which case the size test is skipped instead of raising).
        most_common_color: Unused; kept for signature compatibility.
        most_common_font: Body-text font name, or ``None`` when unknown.

    Returns:
        ``True`` if the span should be treated as a header candidate.
    """
    fontname = span.get("font", "").lower()
    body_font = (most_common_font or "").lower()
    is_larger = (
        most_common_font_size is not None
        and span["size"] > most_common_font_size
    )
    return is_larger or fontname != body_font
def add_span_to_nearest_group(span_y, grouped_dict, pageNum=None, threshold=0.5):
    """Pick the existing ``(page, y)`` group key that ``span_y`` belongs to.

    Keys of ``grouped_dict`` are visited in iteration order; the first key
    whose ``y`` lies within ``threshold`` of ``span_y`` wins.  When ``pageNum``
    is given, only keys on that page are considered.

    Returns:
        The matching ``(page, y)`` key, or the new key ``(pageNum, span_y)``
        when no existing group is close enough.
    """
    nearby_keys = (
        (page, y)
        for page, y in grouped_dict
        if (pageNum is None or page == pageNum) and abs(y - span_y) <= threshold
    )
    return next(nearby_keys, (pageNum, span_y))
def extract_headers(doc, toc_pages, most_common_font_size, most_common_color, most_common_font, top_margin, bottom_margin):
    """Collect header candidates from every non-TOC page of ``doc``.

    For each page not listed in ``toc_pages`` the function
      1. gathers spans from text blocks (``type == 0``) that lie inside the
         vertical margins, are not filtered out as boilerplate (URLs, anything
         containing "page", date-like strings, "end of section",
         "specification:", text without a letter or digit) and pass
         ``is_header``;
      2. sorts them top to bottom and merges consecutive candidates whose
         vertical gap is below ``line_merge_threshold`` and whose sizes differ
         by less than 0.5 into one multi-line header.

    Args:
        doc: Document supporting ``len()`` and ``load_page(i)``.
        toc_pages: Page indices to skip entirely.
        most_common_font_size, most_common_color, most_common_font: Body-text
            statistics forwarded to ``is_header``.
        top_margin: Spans whose top edge is above this y value are ignored.
        bottom_margin: Spans whose bottom edge is below
            ``page height - bottom_margin`` are ignored.

    Returns:
        Tuple ``(headers, top_3_font_sizes, smallest_font_size, spans)``:
        ``headers`` is a list of ``[text, size, pageNum, y]`` rows sorted by
        ``(pageNum, y)``; ``top_3_font_sizes`` holds the largest distinct
        header sizes in descending order (exactly two distinct sizes are padded
        to three by repeating the second; zero or one distinct size yields a
        list of that length); ``smallest_font_size`` is the smallest header
        size or ``None``; ``spans`` is the flat list of raw span dicts that
        went into the headers.
    """
    grouped_headers = defaultdict(list)
    spans = []
    line_merge_threshold = 1.5 # Maximum vertical distance between lines to consider as part of same header
    for pageNum in range(len(doc)):
        if pageNum in toc_pages:
            continue
        page = doc.load_page(pageNum)
        page_height = page.rect.height
        text_instances = page.get_text("dict")
        # First pass: collect all potential header spans on this page
        potential_header_spans = []
        for block in text_instances['blocks']:
            # Only text blocks (type 0) carry lines/spans.
            if block['type'] != 0:
                continue
            for line in block['lines']:
                for span in line['spans']:
                    span_y0 = span['bbox'][1]
                    span_y1 = span['bbox'][3]
                    # Ignore anything that starts in the top margin or ends in the bottom margin.
                    if span_y0 < top_margin or span_y1 > (page_height - bottom_margin):
                        continue
                    span_text = normalize_text(span.get('text', ''))
                    if not span_text:
                        continue
                    if span_text.startswith('http://www') or span_text.startswith('www'):
                        continue
                    # Boilerplate filter: any single truthy entry rejects the span.
                    # Note that the plain 'page' substring test already covers the
                    # "page N of M" pattern listed further down.
                    if any((
                        'page' in span_text,
                        not re.search(r'[a-z0-9]', span_text),
                        'end of section' in span_text,
                        re.search(r'page\s+\d+\s+of\s+\d+', span_text),
                        re.search(r'\b(?:\d{1,2}[/-])?\d{1,2}[/-]\d{2,4}\b', span_text),
                        # re.search(r'\b(?:jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)', span_text),
                        'specification:' in span_text
                    )):
                        continue
                    # Drop dot/dash leaders (4 or more) and everything after them.
                    cleaned_text = re.sub(r'[.\-]{4,}.*$', '', span_text).strip()
                    cleaned_text = normalize_text(cleaned_text)
                    if is_header(span, most_common_font_size, most_common_color, most_common_font):
                        potential_header_spans.append({
                            'text': cleaned_text,
                            'size': span['size'],
                            'pageNum': pageNum,
                            'y0': span_y0,
                            'y1': span_y1,
                            'x0': span['bbox'][0],
                            'x1': span['bbox'][2],
                            'span': span
                        })
        # Sort spans by vertical position (top to bottom)
        potential_header_spans.sort(key=lambda s: (s['pageNum'], s['y0']))
        # Second pass: group spans that are vertically close and likely part of same header
        i = 0
        while i < len(potential_header_spans):
            current = potential_header_spans[i]
            header_text = current['text']
            header_size = current['size']
            header_page = current['pageNum']
            min_y = current['y0']
            max_y = current['y1']
            spans_group = [current['span']]
            # Look ahead to find adjacent lines that might be part of same header
            j = i + 1
            while j < len(potential_header_spans):
                next_span = potential_header_spans[j]
                # Check if on same page and vertically close with similar styling
                if (next_span['pageNum'] == header_page and
                    next_span['y0'] - max_y < line_merge_threshold and
                    abs(next_span['size'] - header_size) < 0.5):
                    header_text += " " + next_span['text']
                    max_y = next_span['y1']
                    spans_group.append(next_span['span'])
                    j += 1
                else:
                    break
            # Add the merged header, keyed by page and the y of its first line
            grouped_headers[(header_page, min_y)].append({
                "text": header_text.strip(),
                "size": header_size,
                "pageNum": header_page,
                "spans": spans_group
            })
            spans.extend(spans_group)
            i = j # Skip the spans we've already processed
    # Prepare final headers list
    headers = []
    for (pageNum, y), header_groups in sorted(grouped_headers.items()):
        for group in header_groups:
            headers.append([
                group['text'],
                group['size'],
                group['pageNum'],
                y
            ])
    font_sizes = [size for _, size, _, _ in headers]
    font_size_counts = Counter(font_sizes)
    # Keep font sizes that appear at least once (the threshold is 1, so every
    # observed header size is currently considered valid)
    valid_font_sizes = [size for size, count in font_size_counts.items() if count >= 1]
    # Sort in descending order
    valid_font_sizes_sorted = sorted(valid_font_sizes, reverse=True)
    # If only 2 sizes, repeat the second one
    if len(valid_font_sizes_sorted) == 2:
        top_3_font_sizes = [valid_font_sizes_sorted[0], valid_font_sizes_sorted[1], valid_font_sizes_sorted[1]]
    else:
        # May hold fewer than three entries when there are 0 or 1 distinct sizes.
        top_3_font_sizes = valid_font_sizes_sorted[:3]
    # Get the smallest font size among valid ones
    smallest_font_size = min(valid_font_sizes) if valid_font_sizes else None
    return headers, top_3_font_sizes, smallest_font_size, spans
def is_numbered(text):
    """Return ``True`` when ``text`` (ignoring surrounding whitespace) starts with a digit."""
    leading_digit = re.match(r'^\d', text.strip())
    return leading_digit is not None
def is_similar(a, b, threshold=0.85):
    """Return ``True`` when the difflib similarity ratio of ``a`` and ``b`` exceeds ``threshold``.

    The comparison is strict: a ratio equal to ``threshold`` is not a match.
    """
    matcher = difflib.SequenceMatcher(None, a, b)
    similarity = matcher.ratio()
    return similarity > threshold
def normalize(text):
    """Lower-case ``text``, drop runs of two or more dots and squeeze whitespace.

    The result has no leading or trailing whitespace.
    """
    lowered = text.lower()
    # Dot leaders ("....") are removed outright rather than replaced by a space.
    without_leaders = re.sub(r'\.{2,}', '', lowered)
    squeezed = re.sub(r'\s+', ' ', without_leaders)
    return squeezed.strip()
def clean_toc_entry(toc_text):
    """Strip the page-number tail and leader characters from a TOC line.

    Everything from the first run of dots/whitespace that is immediately
    followed by a digit up to the end of the string is removed; leading and
    trailing dots and spaces are then trimmed from what remains.

    NOTE(review): because the FIRST such run is used, titles that contain a
    number after a space or dot are cut there as well, e.g.
    "Section 2 Scope .... 14" becomes "Section".  Confirm whether only the
    trailing page number was meant to be removed.
    """
    without_tail = re.sub(r'[\.\s]+\d+.*$', '', toc_text)
    return without_tail.strip('. ')
def enforce_level_hierarchy(headers):
    """Drop level-2 headers that do not sit directly under a level-1 header.

    The tree is pruned in place: at every depth, a node with ``level == 2`` is
    removed (together with its whole subtree) unless its parent's level is 1.
    Top-level nodes are treated as having a parent of level -1.

    Returns:
        The same ``headers`` list object, after pruning.
    """
    def prune(nodes, parent_level=-1):
        # Slice assignment keeps the caller's list object while filtering it.
        nodes[:] = [
            node for node in nodes
            if node['level'] != 2 or parent_level == 1
        ]
        for node in nodes:
            prune(node['children'], node['level'])

    prune(headers)
    return headers
def build_header_hierarchy(doc, toc_pages, most_common_font_size, most_common_color, most_common_font, top_margin=70, bottom_margin=70):
    """Build a nested header tree (list of root header dicts) for ``doc``.

    Steps:
      1. Run ``extract_headers`` and keep candidates that lie inside the
         margins, are longer than two characters and at least body-text size.
      2. Pre-assign levels 1/2 to headers that sit within 20 units of each
         other vertically on the same page.
      3. Promote headers of the largest size whose text matches a line on a
         TOC page to level 0 (de-duplicated by text).
      4. Within each level-0 section, the first unassigned header defines the
         "level 1 format"; headers matching it become level 1, others level 2.
      5. Remaining unassigned headers are clustered by size and levelled.
      6. Nest everything with a stack, force children to be deeper than their
         parent, drop childless level-0 roots and prune stray level-2 nodes.

    Args:
        doc: Document supporting ``load_page(i)``, ``doc[i]`` and ``len()``.
        toc_pages: Page indices holding the table of contents.
        most_common_font_size, most_common_color, most_common_font: Body-text
            statistics (see ``get_regular_font_size_and_color``).
        top_margin: Top margin for both extraction and the header filter.
        bottom_margin: Bottom margin used by the header filter below.

    Returns:
        List of root header dicts; each has ``text``, ``page``, ``y``,
        ``size``, ``level``, ``children`` and related keys.

    Fixes over the previous version: ``top_3_font_sizes`` is padded before
    unpacking (documents with zero or one header size used to raise
    ``ValueError``), the TOC-matching loop iterates over a snapshot because it
    may remove items from ``headers``, and unused locals were removed.
    """
    # Extract headers with margin handling.
    # NOTE(review): the bottom margin handed to extract_headers is the literal
    # 50, not this function's ``bottom_margin`` argument - kept as-is; confirm.
    headers_list, top_3_font_sizes, smallest_font_size, spans = extract_headers(
        doc,
        toc_pages=toc_pages,
        most_common_font_size=most_common_font_size,
        most_common_color=most_common_color,
        most_common_font=most_common_font,
        top_margin=top_margin,
        bottom_margin=50
    )
    # Step 1: Collect and filter potential headers
    headers = []
    # First extract TOC entries to get exact level 0 header texts
    toc_entries = {}
    for pno in toc_pages:
        print(pno)
        page = doc[pno]
        toc_text = page.get_text()
        for line in toc_text.split('\n'):
            clean_line = line.strip()
            if clean_line:
                norm_line = normalize(clean_line)
                toc_entries[norm_line] = clean_line # Store original text
    print(toc_pages)
    for h in headers_list:
        text, size, pageNum, y = h[:4]
        page = doc.load_page(pageNum)
        page_height = page.rect.height
        # Skip margin areas
        if y < top_margin or y > (page_height - bottom_margin):
            continue
        norm_text = normalize(text)
        if len(norm_text) > 2 and size >= most_common_font_size:
            # extract_headers yields four-element rows, so the optional
            # bold/color/font slots below fall back to their defaults.
            headers.append({
                "text": text,
                "page": pageNum,
                "y": y,
                "size": size,
                "bold": h[4] if len(h) > 4 else False,
                # "italic": h[5] if len(h) > 5 else False,
                "color": h[6] if len(h) > 6 else None,
                "font": h[7] if len(h) > 7 else None,
                "children": [],
                "is_numbered": is_numbered(text),
                "original_size": size,
                "norm_text": norm_text,
                "level": -1 # Initialize as unassigned
            })
    # Sort by page and vertical position
    headers.sort(key=lambda h: (h['page'], h['y']))
    # Step 2: Detect consecutive headers and assign levels
    i = 0
    while i < len(headers) - 1:
        current = headers[i]
        next_header = headers[i+1]
        # Check if they are on the same page and very close vertically (likely consecutive lines)
        if (current['page'] == next_header['page'] and
            abs(current['y'] - next_header['y']) < 20): # 20pt threshold for "same line"
            # Case 1: Both unassigned - make current level 1 and next level 2
            if current['level'] == -1 and next_header['level'] == -1:
                current['level'] = 1
                next_header['level'] = 2
                i += 1 # Skip next header since we processed it
            # Case 2: Current unassigned, next assigned - make current one level above
            elif current['level'] == -1 and next_header['level'] != -1:
                current['level'] = max(1, next_header['level'] - 1)
            # Case 3: Current assigned, next unassigned - make next one level below
            elif current['level'] != -1 and next_header['level'] == -1:
                next_header['level'] = current['level'] + 1
                i += 1 # Skip next header since we processed it
        i += 1
    # Step 3: Identify level 0 headers (largest and in TOC)
    print(top_3_font_sizes)
    # extract_headers returns fewer than three sizes when the document has zero
    # or one distinct header size; pad by repeating the smallest so the
    # three-way unpacking cannot fail.  With no sizes at all there are no
    # headers either, so the placeholders are never compared.
    if top_3_font_sizes:
        padded_sizes = list(top_3_font_sizes)
        padded_sizes += [padded_sizes[-1]] * (3 - len(padded_sizes))
        max_size, subheaderSize, nbsheadersize = padded_sizes[:3]
    else:
        max_size = subheaderSize = nbsheadersize = None
    print(max_size)
    toc_text_match=[]
    # Improved TOC matching with exact and substring matching
    toc_matches = []
    # Iterate over a snapshot: the elif branch below removes from ``headers``.
    for h in list(headers):
        norm_text = h['norm_text']
        matching_toc_texts = []
        # Check both exact matches and substring matches
        for toc_norm, toc_text in toc_entries.items():
            # Exact match case
            if norm_text == toc_norm and len(toc_text)>4 and h['size']==max_size:
                matching_toc_texts.append(toc_text)
            # Substring match case (header is substring of TOC entry)
            elif norm_text in toc_norm and len(toc_text)>4 and h['size']==max_size:
                matching_toc_texts.append(toc_text)
            # Substring match case (TOC entry is substring of header)
            elif toc_norm in norm_text and len(toc_text)>4 and h['size']==max_size:
                matching_toc_texts.append(toc_text)
        if matching_toc_texts and h['size'] >= max_size * 0.9:
            best_match = max(matching_toc_texts,
                             key=lambda x: (len(x), -len(x.replace(norm_text, ''))))
            h['text'] = normalize_text(clean_toc_entry(best_match))
            h['level'] = 0
            if h['text'] not in toc_text_match:
                toc_matches.append(h)
                toc_text_match.append(h['text'])
        elif matching_toc_texts and h['size'] < max_size * 0.9 and h['size'] > nbsheadersize :
            # NOTE(review): matches are only recorded for headers of exactly
            # max_size, so this branch looks unreachable for positive sizes.
            print(h['text'],matching_toc_texts)
            headers.remove(h)
            continue
    # Remove duplicates - keep only first occurrence of each level 0 header
    unique_level0 = []
    seen_level0 = set()
    for h in toc_matches:
        # Use the cleaned text for duplicate checking
        cleaned_text = clean_toc_entry(h['text'])
        norm_cleaned_text = normalize(cleaned_text)
        if norm_cleaned_text not in seen_level0:
            seen_level0.add(norm_cleaned_text)
            # Update the header text with cleaned version
            h['text'] = cleaned_text
            unique_level0.append(h)
            print(f"Added unique header: {cleaned_text} (normalized: {norm_cleaned_text})")
    # Step 4: Process headers under each level 0 to identify level 1 format
    # First, group headers by their level 0 parent
    level0_headers = [h for h in headers if h['level'] == 0]
    header_groups = []
    for i, level0 in enumerate(level0_headers):
        start_idx = headers.index(level0)
        end_idx = headers.index(level0_headers[i+1]) if i+1 < len(level0_headers) else len(headers)
        group = headers[start_idx:end_idx]
        header_groups.append(group)
    # Now process each group to identify level 1 format
    for group in header_groups:
        # group[0] is the level 0 header itself; only what follows is examined.
        level1_candidates = [h for h in group[1:] if h['level'] == -1]
        if not level1_candidates:
            continue
        # The first candidate is our reference level 1
        first_level1 = level1_candidates[0]
        level1_format = {
            'font': first_level1['font'],
            'color': first_level1['color'],
            'starts_with_number': is_numbered(first_level1['text']),
            'size': first_level1['size'],
            'bold': first_level1['bold']
            # 'italic': first_level1['italic']
        }
        # Assign levels based on the reference format
        for h in level1_candidates:
            current_format = {
                'font': h['font'],
                'color': h['color'],
                'starts_with_number': is_numbered(h['text']),
                'size': h['size'],
                'bold': h['bold']
                # 'italic': h['italic']
            }
            # Compare with level1 format
            if (current_format['font'] == level1_format['font'] and
                current_format['color'] == level1_format['color'] and
                current_format['starts_with_number'] == level1_format['starts_with_number'] and
                abs(current_format['size'] - level1_format['size']) <= 0.1 and
                current_format['bold'] == level1_format['bold'] ):
                h['level'] = 1
            else:
                h['level'] = 2
    # Step 5: Assign levels to remaining unassigned headers
    unassigned = [h for h in headers if h['level'] == -1]
    if unassigned:
        # Cluster by size with tolerance
        sizes = sorted({h['size'] for h in unassigned}, reverse=True)
        clusters = []
        for size in sizes:
            found_cluster = False
            for cluster in clusters:
                if abs(size - cluster['size']) <= max(size, cluster['size']) * 0.1:
                    cluster['headers'].extend([h for h in unassigned if abs(h['size'] - size) <= size * 0.1])
                    found_cluster = True
                    break
            if not found_cluster:
                clusters.append({
                    'size': size,
                    'headers': [h for h in unassigned if abs(h['size'] - size) <= size * 0.1]
                })
        # Assign levels starting from 1
        clusters.sort(key=lambda x: -x['size'])
        for i, cluster in enumerate(clusters):
            for h in cluster['headers']:
                base_level = i + 1
                if h['bold']:
                    base_level = max(1, base_level - 1)
                h['level'] = base_level
    # Step 6: Build hierarchy
    root = []
    stack = []
    # Create a set of normalized texts from unique_level0 to avoid duplicates
    unique_level0_texts = {h['norm_text'] for h in unique_level0}
    # Every header is carried over; those repeating a level 0 title (without
    # being one of the unique level 0 objects) are marked level 0 so the
    # added_level0 check below skips them.
    filtered_headers = []
    for h in headers:
        if h['norm_text'] in unique_level0_texts and h not in unique_level0:
            h['level'] = 0
        filtered_headers.append(h)
    # Combine all headers - unique_level0 first, then the filtered headers
    all_headers = unique_level0 + filtered_headers
    all_headers.sort(key=lambda h: (h['page'], h['y']))
    # Track which level 0 headers we've already added
    added_level0 = set()
    for header in all_headers:
        if header['level'] < 0:
            continue
        if header['level'] == 0:
            norm_text = header['norm_text']
            if norm_text in added_level0:
                continue
            added_level0.add(norm_text)
        # Pop stack until we find a parent
        while stack and stack[-1]['level'] >= header['level']:
            stack.pop()
        current_parent = stack[-1] if stack else None
        if current_parent:
            current_parent['children'].append(header)
        else:
            root.append(header)
        stack.append(header)
    # Step 7: Enforce proper nesting
    def enforce_nesting(node_list, parent_level=-1):
        for node in node_list:
            if node['level'] <= parent_level:
                node['level'] = parent_level + 1
            enforce_nesting(node['children'], node['level'])
    enforce_nesting(root)
    # Level 0 roots without any children are dropped.
    root = [h for h in root if not (h['level'] == 0 and not h['children'])]
    header_tree = enforce_level_hierarchy(root)
    return header_tree
def adjust_levels_if_level0_not_in_toc(doc, toc_pages, root):
    """Shift the whole tree one level deeper when no level-0 root appears in the TOC.

    The text of all ``toc_pages`` is concatenated and normalised (lower-case,
    whitespace collapsed).  If at least one root header with ``level == 0`` has
    its normalised text contained in that TOC text, nothing is changed.
    Otherwise every node's ``level`` in the tree is incremented by one, in
    place.

    Previously the level-shifting helper was defined but never invoked, so the
    function never modified anything; the missing call is now made.

    Args:
        doc: Document supporting ``load_page(i)``; pages provide ``get_text()``.
        toc_pages: Page indices of the table of contents.
        root: List of root header dicts with ``text``, ``level``, ``children``.

    Returns:
        ``None``; ``root`` is modified in place.
    """
    def _squash(text):
        return re.sub(r'\s+', ' ', text.strip().lower())

    toc_text_normalized = _squash(
        "".join(doc.load_page(pno).get_text() for pno in toc_pages)
    )

    if any(
        header['level'] == 0 and _squash(header['text']) in toc_text_normalized
        for header in root
    ):
        return  # At least one level 0 header is listed in the TOC.

    def increase_levels(node_list):
        for node in node_list:
            node['level'] += 1
            increase_levels(node['children'])

    increase_levels(root)
def assign_numbers_to_headers(headers, prefix=None):
    """Write dotted outline numbers ("1", "1.1", "1.2", "2", ...) into the tree.

    Each header dict receives a ``"number"`` key; children are numbered
    recursively with their parent's number as prefix.  A falsy ``prefix``
    means the headers are numbered as top-level entries.
    """
    for position, node in enumerate(headers, start=1):
        if prefix:
            label = f"{prefix}.{position}"
        else:
            label = str(position)
        node["number"] = label
        assign_numbers_to_headers(node["children"], label)
def print_tree_with_numbers(headers, indent=0):
    """Print the header tree, one line per header, indented one space per depth.

    Each line shows the header's number ("?" when none was assigned), its
    text, level, 1-based page and - when ``original_size`` is present - the
    font size with one decimal.
    """
    padding = " " * indent
    for node in headers:
        if 'original_size' in node:
            size_info = f"size:{node['original_size']:.1f}"
        else:
            size_info = ""
        label = f"{node.get('number', '?')} {node['text']} "
        details = f"(Level {node['level']}, p:{node['page']+1}, {size_info})"
        print(padding + label + details)
        print_tree_with_numbers(node["children"], indent + 1)
def process_document_headers(doc, toc_pages, most_common_font_size, most_common_color, most_common_font, top_margin=70, bottom_margin=50):
    """Build, number and print the header tree of ``doc``.

    The tree comes from ``build_header_hierarchy``; it is then handed to
    ``adjust_levels_if_level0_not_in_toc``, numbered with
    ``assign_numbers_to_headers`` and dumped via ``print_tree_with_numbers``.

    Returns:
        The header tree (list of root header dicts).
    """
    print(f"Processing with margins - top:{top_margin}pt, bottom:{bottom_margin}pt")
    tree = build_header_hierarchy(
        doc,
        toc_pages,
        most_common_font_size,
        most_common_color,
        most_common_font,
        top_margin,
        bottom_margin,
    )
    adjust_levels_if_level0_not_in_toc(doc, toc_pages, tree)

    print("Assigning numbers...")
    assign_numbers_to_headers(tree)

    print("Document structure (excluding margins):")
    print_tree_with_numbers(tree)
    return tree
def highlight_boxes(doc, highlights, stringtowrite, fixed_width=500):
    """Draw a fixed-width highlight band plus a text label for each page region.

    For every ``page number -> bbox`` pair in ``highlights`` whose rectangle is
    taller than 30 and wider than 10, a rectangle annotation of width
    ``fixed_width`` is centred horizontally on the page at the bbox's vertical
    extent.  It is filled grey when ``stringtowrite`` starts with ``'Not'`` and
    yellow otherwise, at 30% opacity.  A red, right-aligned free-text
    annotation reading ``[stringtowrite]`` is placed in the same rectangle.

    Args:
        doc: Open PyMuPDF document; annotated in place.
        highlights: Mapping of page index to a bbox accepted by ``fitz.Rect``.
        stringtowrite: Label text; also selects the highlight colour.
        fixed_width: Width of the drawn band.
    """
    for page_num, bbox in highlights.items():
        page = doc.load_page(page_num)
        page_width = page.rect.width
        # The original rect only supplies the vertical coordinates.
        source_rect = fitz.Rect(bbox)
        if not source_rect.height > 30:
            continue
        if not source_rect.width > 10:
            continue

        # Centre the band horizontally using the fixed width.
        middle_x = page_width / 2
        band = fitz.Rect(
            middle_x - fixed_width / 2,
            source_rect.y0,
            middle_x + fixed_width / 2,
            source_rect.y1,
        )

        box_annot = page.add_rect_annot(band)
        shade = (0.5, 0.5, 0.5) if stringtowrite.startswith('Not') else (1, 1, 0)
        box_annot.set_colors(stroke=shade, fill=shade)
        box_annot.set_opacity(0.3)
        box_annot.update()

        # Right-aligned label inside the same fixed-width box.
        label_annot = page.add_freetext_annot(
            band,
            '[' + stringtowrite + ']',
            fontsize=15,
            fontname='helv',
            text_color=(1, 0, 0),
            rotate=page.rotation,
            align=2,
        )
        label_annot.update()
def get_leaf_headers_with_paths(listtoloop, path=None, output=None):
    """Collect childless headers below level 1 together with their title path.

    The tree is walked depth-first.  A header without children is reported
    unless its level is 0 or 1; a header with children is only descended into.

    Args:
        listtoloop: List of header dicts with ``text``, ``level``, ``children``.
        path: Titles of the ancestors leading to ``listtoloop`` (internal).
        output: Accumulator list shared across the recursion (internal).

    Returns:
        List of ``(header_dict, [ancestor titles..., own title])`` tuples.
    """
    trail = [] if path is None else path
    collected = [] if output is None else output
    for node in listtoloop:
        node_trail = trail + [node['text']]
        if node['children']:
            get_leaf_headers_with_paths(node['children'], node_trail, collected)
        elif node['level'] not in (0, 1):
            collected.append((node, node_trail))
    return collected
# Helper for word-based comparison of two already-normalised strings.
def words_match_ratio(text1, text2):
    """Return the share of ``text1``'s distinct words that also occur in ``text2``.

    Words are whitespace-separated tokens.  The result is ``0.0`` when either
    text contains no words.
    """
    first_words = set(text1.split())
    second_words = set(text2.split())
    if first_words and second_words:
        shared = first_words & second_words
        return len(shared) / len(first_words)
    return 0.0
def same_start_word(s1, s2):
    """Tell whether both strings begin with the same word, ignoring case.

    Returns ``False`` when either string contains no words.
    """
    left_words = s1.strip().split()
    right_words = s2.strip().split()
    if not (left_words and right_words):
        return False
    return left_words[0].lower() == right_words[0].lower()
| def extract_section_under_header(multiplePDF_Paths): | |
| filenames=[] | |
| keywords = {'installation', 'execution', 'miscellaneous items', 'workmanship', 'testing', 'labeling'} | |
| arrayofPDFS=multiplePDF_Paths.split(',') | |
| print(multiplePDF_Paths) | |
| print(arrayofPDFS,len(arrayofPDFS)) | |
| docarray=[] | |
| jsons=[] | |
| df = pd.DataFrame(columns=["PDF Name","NBSLink","Subject","Page","Author","Creation Date","Layer",'Code', 'head above 1', "head above 2","BodyText"]) | |
| for pdf_path in arrayofPDFS: | |
| headertoContinue1 = False | |
| headertoContinue2=False | |
| Alltexttobebilled='' | |
| parsed_url = urlparse(pdf_path) | |
| filename = os.path.basename(parsed_url.path) | |
| filename = unquote(filename) # decode URL-encoded characters | |
| filenames.append(filename) | |
| # Optimized URL handling | |
| if pdf_path and ('http' in pdf_path or 'dropbox' in pdf_path): | |
| pdf_path = pdf_path.replace('dl=0', 'dl=1') | |
| # Cache frequently used values | |
| response = requests.get(pdf_path) | |
| pdf_content = BytesIO(response.content) | |
| if not pdf_content: | |
| raise ValueError("No valid PDF content found.") | |
| doc = fitz.open(stream=pdf_content, filetype="pdf") | |
| docHighlights = fitz.open(stream=pdf_content, filetype="pdf") | |
| most_common_font_size, most_common_color, most_common_font = get_regular_font_size_and_color(doc) | |
| # Precompute regex patterns | |
| dot_pattern = re.compile(r'\.{3,}') | |
| url_pattern = re.compile(r'https?://\S+|www\.\S+') | |
| toc_pages = get_toc_page_numbers(doc) | |
| headers, top_3_font_sizes, smallest_font_size, headersSpans = extract_headers( | |
| doc, toc_pages, most_common_font_size, most_common_color, most_common_font, top_margin, bottom_margin | |
| ) | |
| hierarchy = build_header_hierarchy(doc, toc_pages, most_common_font_size, most_common_color, most_common_font) | |
| listofHeaderstoMarkup = get_leaf_headers_with_paths(hierarchy) | |
| # Precompute all children headers once | |
| allchildrenheaders = [normalize_text(item['text']) for item, p in listofHeaderstoMarkup] | |
| allchildrenheaders_set = set(allchildrenheaders) # For faster lookups | |
| df = pd.DataFrame(columns=["NBSLink","Subject","Page","Author","Creation Date","Layer",'Code', 'head above 1', "head above 2"]) | |
| dictionaryNBS={} | |
| data_list_JSON = [] | |
| if len(top_3_font_sizes)==3: | |
| mainHeaderFontSize, subHeaderFontSize, subsubheaderFontSize = top_3_font_sizes | |
| elif len(top_3_font_sizes)==2: | |
| mainHeaderFontSize= top_3_font_sizes[0] | |
| subHeaderFontSize= top_3_font_sizes[1] | |
| subsubheaderFontSize= top_3_font_sizes[1] | |
| # Preload all pages to avoid repeated loading | |
| # pages = [doc.load_page(page_num) for page_num in range(len(doc)) if page_num not in toc_pages] | |
| for heading_to_searchDict, paths in listofHeaderstoMarkup: | |
| heading_to_search = heading_to_searchDict['text'] | |
| heading_to_searchPageNum = heading_to_searchDict['page'] | |
| # Initialize variables | |
| headertoContinue1 = False | |
| headertoContinue2 = False | |
| matched_header_line = None | |
| done = False | |
| collecting = False | |
| collected_lines = [] | |
| page_highlights = {} | |
| current_bbox = {} | |
| last_y1s = {} | |
| mainHeader = '' | |
| subHeader = '' | |
| matched_header_line_norm = heading_to_search | |
| break_collecting = False | |
| heading_norm = normalize_text(heading_to_search) | |
| paths_norm = [normalize_text(p) for p in paths[0]] if paths and paths[0] else [] | |
| for page_num in range(heading_to_searchPageNum,len(doc)): | |
| if page_num in toc_pages: | |
| continue | |
| if break_collecting: | |
| break | |
| page=doc[page_num] | |
| page_height = page.rect.height | |
| blocks = page.get_text("dict")["blocks"] | |
| for block in blocks: | |
| if break_collecting: | |
| break | |
| lines = block.get("lines", []) | |
| i = 0 | |
| while i < len(lines): | |
| if break_collecting: | |
| break | |
| spans = lines[i].get("spans", []) | |
| if not spans: | |
| i += 1 | |
| continue | |
| y0 = spans[0]["bbox"][1] | |
| y1 = spans[0]["bbox"][3] | |
| if y0 < top_margin or y1 > (page_height - bottom_margin): | |
| i += 1 | |
| continue | |
| line_text = get_spaced_text_from_spans(spans).lower() | |
| line_text_norm = normalize_text(line_text) | |
| # Combine with next line if available | |
| if i + 1 < len(lines): | |
| next_spans = lines[i + 1].get("spans", []) | |
| next_line_text = get_spaced_text_from_spans(next_spans).lower() | |
| combined_line_norm = normalize_text(line_text + " " + next_line_text) | |
| else: | |
| combined_line_norm = line_text_norm | |
| # Check if we should continue processing | |
| if combined_line_norm and combined_line_norm in paths[0]: | |
| headertoContinue1 = combined_line_norm | |
| if combined_line_norm and combined_line_norm in paths[-2]: | |
| headertoContinue2 = combined_line_norm | |
| if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() : | |
| stringtowrite='Not to be billed' | |
| else: | |
| stringtowrite='To be billed' | |
| # Optimized header matching | |
| existsfull = ( | |
| ( combined_line_norm in allchildrenheaders_set or | |
| combined_line_norm in allchildrenheaders ) and heading_to_search in combined_line_norm | |
| ) | |
| # New word-based matching | |
| current_line_words = set(combined_line_norm.split()) | |
| heading_words = set(heading_norm.split()) | |
| all_words_match = current_line_words.issubset(heading_words) and len(current_line_words) > 0 | |
| substring_match = ( | |
| heading_norm in combined_line_norm or | |
| combined_line_norm in heading_norm or | |
| all_words_match # Include the new word-based matching | |
| ) | |
| # substring_match = ( | |
| # heading_norm in combined_line_norm or | |
| # combined_line_norm in heading_norm | |
| # ) | |
| if (substring_match and existsfull and not collecting and | |
| len(combined_line_norm) > 0 ):#and (headertoContinue1 or headertoContinue2) ): | |
| # Check header conditions more efficiently | |
| header_spans = [ | |
| span for span in spans | |
| if (is_header(span, most_common_font_size, most_common_color, most_common_font) | |
| # and span['size'] >= subsubheaderFontSize | |
| and span['size'] < mainHeaderFontSize) | |
| ] | |
| if header_spans: | |
| collecting = True | |
| matched_header_font_size = max(span["size"] for span in header_spans) | |
| collected_lines.append(line_text) | |
| valid_spans = [span for span in spans if span.get("bbox")] | |
| if valid_spans: | |
| x0s = [span["bbox"][0] for span in valid_spans] | |
| x1s = [span["bbox"][2] for span in valid_spans] | |
| y0s = [span["bbox"][1] for span in valid_spans] | |
| y1s = [span["bbox"][3] for span in valid_spans] | |
| header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] | |
| if page_num in current_bbox: | |
| cb = current_bbox[page_num] | |
| current_bbox[page_num] = [ | |
| min(cb[0], header_bbox[0]), | |
| min(cb[1], header_bbox[1]), | |
| max(cb[2], header_bbox[2]), | |
| max(cb[3], header_bbox[3]) | |
| ] | |
| else: | |
| current_bbox[page_num] = header_bbox | |
| last_y1s[page_num] = header_bbox[3] | |
| x0, y0, x1, y1 = header_bbox | |
| zoom = 200 | |
| left = int(x0) | |
| top = int(y0) | |
| zoom_str = f"{zoom},{left},{top}" | |
| pageNumberFound = page_num + 1 | |
| # Build the query parameters | |
| # params = { | |
| # 'pdfLink': pdf_path, # Your PDF link | |
| # 'keyword': heading_to_search, # Your keyword (could be a string or list) | |
| # } | |
| # # URL encode each parameter | |
| # encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()} | |
| # # Construct the final encoded link | |
| # encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()]) | |
| # # Correctly construct the final URL with page and zoom | |
| # final_url = f"{baselink}{encoded_link}#page={str(pageNumberFound)}&zoom={zoom_str}" | |
| # Get current date and time | |
| now = datetime.now() | |
| # Format the output | |
| formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p") | |
| # Optionally, add the URL to a DataFrame | |
| data_entry = { | |
| "NBSLink": zoom_str, | |
| "Subject": heading_to_search, | |
| "Page": str(pageNumberFound), | |
| "Author": "ADR", | |
| "Creation Date": formatted_time, | |
| "Layer": "Initial", | |
| "Code": stringtowrite, | |
| "head above 1": paths[-2], | |
| "head above 2": paths[0], | |
| # "MC Connnection": 'Go to ' + paths[0].strip().split()[0] +'/'+ heading_to_search.strip().split()[0] + ' in '+ filename | |
| } | |
| data_list_JSON.append(data_entry) | |
| # Convert list to JSON | |
| # json_output = json.dumps(data_list_JSON, indent=4) | |
| i += 2 | |
| continue | |
| else: | |
| if (substring_match and not collecting and | |
| len(combined_line_norm) > 0): # and (headertoContinue1 or headertoContinue2) ): | |
| # Calculate word match percentage | |
| word_match_percent = words_match_ratio(heading_norm, combined_line_norm) * 100 | |
| # Check if at least 70% of header words exist in this line | |
| meets_word_threshold = word_match_percent >= 100 | |
| # Check header conditions (including word threshold) | |
| header_spans = [ | |
| span for span in spans | |
| if (is_header(span, most_common_font_size, most_common_color, most_common_font) | |
| # and span['size'] >= subsubheaderFontSize | |
| and span['size'] < mainHeaderFontSize) | |
| ] | |
| if header_spans and (meets_word_threshold or same_start_word(heading_to_search, combined_line_norm) ): | |
| collecting = True | |
| matched_header_font_size = max(span["size"] for span in header_spans) | |
| collected_lines.append(line_text) | |
| valid_spans = [span for span in spans if span.get("bbox")] | |
| if valid_spans: | |
| x0s = [span["bbox"][0] for span in valid_spans] | |
| x1s = [span["bbox"][2] for span in valid_spans] | |
| y0s = [span["bbox"][1] for span in valid_spans] | |
| y1s = [span["bbox"][3] for span in valid_spans] | |
| header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] | |
| if page_num in current_bbox: | |
| cb = current_bbox[page_num] | |
| current_bbox[page_num] = [ | |
| min(cb[0], header_bbox[0]), | |
| min(cb[1], header_bbox[1]), | |
| max(cb[2], header_bbox[2]), | |
| max(cb[3], header_bbox[3]) | |
| ] | |
| else: | |
| current_bbox[page_num] = header_bbox | |
| last_y1s[page_num] = header_bbox[3] | |
| x0, y0, x1, y1 = header_bbox | |
| zoom = 200 | |
| left = int(x0) | |
| top = int(y0) | |
| zoom_str = f"{zoom},{left},{top}" | |
| pageNumberFound = page_num + 1 | |
| # Build the query parameters | |
| # params = { | |
| # 'pdfLink': pdf_path, # Your PDF link | |
| # 'keyword': heading_to_search, # Your keyword (could be a string or list) | |
| # } | |
| # # URL encode each parameter | |
| # encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()} | |
| # # Construct the final encoded link | |
| # encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()]) | |
| # # Correctly construct the final URL with page and zoom | |
| # final_url = f"{baselink}{encoded_link}#page={str(pageNumberFound)}&zoom={zoom_str}" | |
| # Get current date and time | |
| now = datetime.now() | |
| # Format the output | |
| formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p") | |
| # Optionally, add the URL to a DataFrame | |
| data_entry = { | |
| "NBSLink": zoom_str, | |
| "Subject": heading_to_search, | |
| "Page": str(pageNumberFound), | |
| "Author": "ADR", | |
| "Creation Date": formatted_time, | |
| "Layer": "Initial", | |
| "Code": stringtowrite, | |
| "head above 1": paths[-2], | |
| "head above 2": paths[0], | |
| # "MC Connnection": 'Go to ' + paths[0].strip().split()[0] +'/'+ heading_to_search.strip().split()[0] + ' in '+ filename | |
| } | |
| data_list_JSON.append(data_entry) | |
| # Convert list to JSON | |
| # json_output = json.dumps(data_list_JSON, indent=4) | |
| i += 2 | |
| continue | |
| if collecting: | |
| norm_line = normalize_text(line_text) | |
| # Optimized URL check | |
| if url_pattern.match(norm_line): | |
| line_is_header = False | |
| else: | |
| line_is_header = any(is_header(span, most_common_font_size, most_common_color, most_common_font) for span in spans) | |
| if line_is_header: | |
| header_font_size = max(span["size"] for span in spans) | |
| is_probably_real_header = ( | |
| header_font_size >= matched_header_font_size and | |
| is_header(spans[0], most_common_font_size, most_common_color, most_common_font) and | |
| len(line_text.strip()) > 2 | |
| ) | |
| if (norm_line != matched_header_line_norm and | |
| norm_line != heading_norm and | |
| is_probably_real_header): | |
| if line_text not in heading_norm: | |
| collecting = False | |
| done = True | |
| headertoContinue1 = False | |
| headertoContinue2=False | |
| for page_num, bbox in current_bbox.items(): | |
| bbox[3] = last_y1s.get(page_num, bbox[3]) | |
| page_highlights[page_num] = bbox | |
| highlight_boxes(docHighlights, page_highlights,stringtowrite) | |
| break_collecting = True | |
| break | |
| if break_collecting: | |
| break | |
| collected_lines.append(line_text) | |
| valid_spans = [span for span in spans if span.get("bbox")] | |
| if valid_spans: | |
| x0s = [span["bbox"][0] for span in valid_spans] | |
| x1s = [span["bbox"][2] for span in valid_spans] | |
| y0s = [span["bbox"][1] for span in valid_spans] | |
| y1s = [span["bbox"][3] for span in valid_spans] | |
| line_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] | |
| if page_num in current_bbox: | |
| cb = current_bbox[page_num] | |
| current_bbox[page_num] = [ | |
| min(cb[0], line_bbox[0]), | |
| min(cb[1], line_bbox[1]), | |
| max(cb[2], line_bbox[2]), | |
| max(cb[3], line_bbox[3]) | |
| ] | |
| else: | |
| current_bbox[page_num] = line_bbox | |
| last_y1s[page_num] = line_bbox[3] | |
| i += 1 | |
| if not done: | |
| for page_num, bbox in current_bbox.items(): | |
| bbox[3] = last_y1s.get(page_num, bbox[3]) | |
| page_highlights[page_num] = bbox | |
| if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() : | |
| stringtowrite='Not to be billed' | |
| else: | |
| stringtowrite='To be billed' | |
| highlight_boxes(docHighlights, page_highlights,stringtowrite) | |
| docarray.append(docHighlights) | |
| jsons.append(data_list_JSON) | |
| print('lenght of json:',len(jsons)) | |
| dbxTeam = tsadropboxretrieval.ADR_Access_DropboxTeam('user') | |
| dbPath = '/TSA JOBS/ADR Test/FIND/' | |
| jsonCombined=[] | |
| for i in range(len(arrayofPDFS)): | |
| pdflink = tsadropboxretrieval.uploadanyFile(doc=docarray[i], path=dbPath, pdfname=filenames[i]) | |
| json_input = copy.deepcopy(jsons[i]) # make a deep copy | |
| json_output1 = changepdflinks(json_input, pdflink) | |
| jsonCombined.extend(json_output1) | |
| pdf_bytes = BytesIO() | |
| docHighlights.save(pdf_bytes) | |
| combined_json_str = json.dumps(jsonCombined, indent=1) | |
| print('lenght of json:',len(combined_json_str)) | |
| return pdf_bytes.getvalue(), docHighlights , combined_json_str | |
| ######################################################################################################################################################## | |
| ######################################################################################################################################################## | |
def _billing_code_for(parent_heading):
    """Return the billing label implied by the text of the parent heading."""
    lowered = parent_heading.lower()
    non_billable_markers = ('installation', 'execution', 'miscellaneous items')
    if any(marker in lowered for marker in non_billable_markers):
        return 'Not to be billed'
    return 'To be billed'


def _grow_page_bbox(spans, page_num, current_bbox, last_y1s):
    """Merge the box enclosing `spans` into the running box of `page_num`.

    Updates `current_bbox` and `last_y1s` in place and returns the box of this
    line as ``[x0, y0, x1, y1]``, or ``None`` when no span carries a bbox.
    """
    boxes = [span["bbox"] for span in spans if span.get("bbox")]
    if not boxes:
        return None
    line_bbox = [
        min(box[0] for box in boxes),
        min(box[1] for box in boxes),
        max(box[2] for box in boxes),
        max(box[3] for box in boxes),
    ]
    previous = current_bbox.get(page_num)
    if previous is None:
        current_bbox[page_num] = line_bbox
    else:
        current_bbox[page_num] = [
            min(previous[0], line_bbox[0]),
            min(previous[1], line_bbox[1]),
            max(previous[2], line_bbox[2]),
            max(previous[3], line_bbox[3]),
        ]
    last_y1s[page_num] = line_bbox[3]
    return line_bbox


def _flush_section_highlights(docHighlights, current_bbox, last_y1s, code):
    """Clamp each page box to its last collected line and draw the highlights."""
    page_highlights = {}
    for boxed_page, bbox in current_bbox.items():
        bbox[3] = last_y1s.get(boxed_page, bbox[3])
        page_highlights[boxed_page] = bbox
    highlight_boxes(docHighlights, page_highlights, code)


def extract_section_under_header_tobebilledOnly(pdf_path):
    """Highlight the billable sections of a PDF and describe them as markup entries.

    The PDF is downloaded from `pdf_path`, its leaf headers are obtained from
    `build_header_hierarchy` / `get_leaf_headers_with_paths`, and for every leaf
    header whose parent heading (``paths[-2]``) does not contain 'installation',
    'execution' or 'miscellaneous items' the lines from the header down to the
    next line that looks like a header of at least the same font size are
    collected and highlighted on a second copy of the document. The highlighted
    copy is uploaded through `tsadropboxretrieval.uploadanyFile`.

    Args:
        pdf_path: URL of the PDF. A ``dl=0`` marker is rewritten to ``dl=1``
            before downloading.

    Returns:
        A 6-tuple ``(pdf_bytes, docHighlights, json_output, Alltexttobebilled,
        alltextWithoutNotbilled, filename)`` where `json_output` is the list of
        entry dicts returned by `changepdflinks` (its "NBSLink" values rewritten
        to viewer URLs), `Alltexttobebilled` is the space-prefixed text of every
        matched billable header line, and `alltextWithoutNotbilled` is the
        concatenated text of every line scanned while processing a
        non-billable header.

    Raises:
        ValueError: if the download returned an empty body.

    NOTE(review): the indentation of the original source was lost; the nesting
    here is reconstructed from the statement semantics. In particular the final
    highlight flush is performed once per leaf header -- confirm against the
    upstream notebook.
    """
    if pdf_path and ('http' in pdf_path or 'dropbox' in pdf_path):
        pdf_path = pdf_path.replace('dl=0', 'dl=1')
    filename = unquote(os.path.basename(urlparse(pdf_path).path))

    response = requests.get(pdf_path)
    # A BytesIO object is always truthy, so the emptiness test has to look at
    # the downloaded bytes themselves.
    if not response.content:
        raise ValueError("No valid PDF content found.")
    pdf_content = BytesIO(response.content)
    doc = fitz.open(stream=pdf_content, filetype="pdf")
    docHighlights = fitz.open(stream=pdf_content, filetype="pdf")

    most_common_font_size, most_common_color, most_common_font = get_regular_font_size_and_color(doc)
    url_pattern = re.compile(r'https?://\S+|www\.\S+')
    toc_pages = get_toc_page_numbers(doc)
    _, top_3_font_sizes, _, _ = extract_headers(
        doc, toc_pages, most_common_font_size, most_common_color, most_common_font, top_margin, bottom_margin
    )
    hierarchy = build_header_hierarchy(doc, toc_pages, most_common_font_size, most_common_color, most_common_font)
    listofHeaderstoMarkup = get_leaf_headers_with_paths(hierarchy)
    allchildrenheaders_set = {normalize_text(item['text']) for item, _ in listofHeaderstoMarkup}

    # Only the largest header size is used below (as an exclusive upper bound).
    # Taking element 0 matches the former 2- and 3-element cases and no longer
    # leaves the name unbound for other lengths; with no sizes at all there is
    # no upper bound.
    if len(top_3_font_sizes) > 0:
        mainHeaderFontSize = top_3_font_sizes[0]
    else:
        mainHeaderFontSize = float('inf')

    def looks_like_header(span):
        return is_header(span, most_common_font_size, most_common_color, most_common_font)

    data_list_JSON = []
    billed_parts = []
    unbilled_parts = []

    for heading_to_searchDict, paths in listofHeaderstoMarkup:
        heading_to_search = heading_to_searchDict['text']
        first_page = heading_to_searchDict['page']
        heading_norm = normalize_text(heading_to_search)
        heading_words = set(heading_norm.split())
        # The label depends only on the parent heading, so compute it once.
        stringtowrite = _billing_code_for(paths[-2])
        billable = stringtowrite.startswith('To')

        collecting = False
        done = False
        collected_lines = []
        current_bbox = {}
        last_y1s = {}
        matched_header_font_size = None

        for page_num in range(first_page, len(doc)):
            if done:
                break
            if page_num in toc_pages:
                continue
            page = doc[page_num]
            page_height = page.rect.height

            for block in page.get_text("dict")["blocks"]:
                if done:
                    break
                lines = block.get("lines", [])
                i = 0
                while i < len(lines):
                    spans = lines[i].get("spans", [])
                    if not spans:
                        i += 1
                        continue

                    # Skip anything inside the page header / footer margins.
                    y0 = spans[0]["bbox"][1]
                    y1 = spans[0]["bbox"][3]
                    if y0 < top_margin or y1 > (page_height - bottom_margin):
                        i += 1
                        continue

                    line_text = get_spaced_text_from_spans(spans).lower()
                    # Headers may wrap, so match against this line joined with
                    # the next one when there is a next one.
                    if i + 1 < len(lines):
                        next_spans = lines[i + 1].get("spans", [])
                        next_line_text = get_spaced_text_from_spans(next_spans).lower()
                        combined_line_norm = normalize_text(line_text + " " + next_line_text)
                    else:
                        combined_line_norm = normalize_text(line_text)

                    if not billable:
                        unbilled_parts.append(combined_line_norm)

                    if billable and not collecting and combined_line_norm:
                        line_words = set(combined_line_norm.split())
                        substring_match = (
                            heading_norm in combined_line_norm
                            or combined_line_norm in heading_norm
                            or (bool(line_words) and line_words.issubset(heading_words))
                        )
                        if substring_match:
                            header_spans = [
                                span for span in spans
                                if looks_like_header(span) and span['size'] < mainHeaderFontSize
                            ]
                            if header_spans:
                                exact_match = (
                                    combined_line_norm in allchildrenheaders_set
                                    and heading_to_search in combined_line_norm
                                )
                                if (exact_match
                                        or words_match_ratio(heading_norm, combined_line_norm) * 100 >= 100
                                        or same_start_word(heading_to_search, combined_line_norm)):
                                    collecting = True
                                    matched_header_font_size = max(span["size"] for span in header_spans)
                                    billed_parts.append(' ' + combined_line_norm)
                                    # As in the original, only the fuzzy match
                                    # records the header line itself as body text.
                                    if not exact_match:
                                        collected_lines.append(line_text)
                                    # spans[0] carries a bbox (read above for the
                                    # margin test), so a box is always returned.
                                    header_bbox = _grow_page_bbox(spans, page_num, current_bbox, last_y1s)
                                    zoom_str = f"{200},{int(header_bbox[0])},{int(header_bbox[1])}"
                                    data_list_JSON.append({
                                        "NBSLink": zoom_str,
                                        "Subject": heading_to_search,
                                        "Page": str(page_num + 1),
                                        "Author": "ADR",
                                        "Creation Date": datetime.now().strftime("%d/%m/%Y %I:%M:%S %p"),
                                        "Layer": "Initial",
                                        "Code": stringtowrite,
                                        "head above 1": paths[-2],
                                        "head above 2": paths[0],
                                        # Same list object that keeps growing
                                        # while the section is collected.
                                        "BodyText": collected_lines,
                                        "MC Connnection": 'Go to ' + paths[0].strip().split()[0] + '/' + heading_to_search.strip().split()[0] + ' in ' + filename
                                    })
                                    # The following line was part of the
                                    # combined header text, so skip it too.
                                    i += 2
                                    continue

                    if collecting:
                        norm_line = normalize_text(line_text)
                        line_is_header = (
                            not url_pattern.match(norm_line)
                            and any(looks_like_header(span) for span in spans)
                        )
                        if line_is_header:
                            header_font_size = max(span["size"] for span in spans)
                            ends_section = (
                                norm_line != heading_to_search
                                and norm_line != heading_norm
                                and header_font_size >= matched_header_font_size
                                and looks_like_header(spans[0])
                                and len(line_text.strip()) > 2
                                and line_text not in heading_norm
                            )
                            if ends_section:
                                collecting = False
                                done = True
                                break
                        collected_lines.append(line_text)
                        _grow_page_bbox(spans, page_num, current_bbox, last_y1s)
                    i += 1

        # Reached either because the next header closed the section or because
        # the document ended; both cases draw whatever was collected.
        _flush_section_highlights(docHighlights, current_bbox, last_y1s, stringtowrite)

    dbxTeam = tsadropboxretrieval.ADR_Access_DropboxTeam('user')
    # NOTE(review): the returned metadata is unused; the call is kept because
    # removing it could change behavior for links this API rejects -- confirm.
    dbxTeam.sharing_get_shared_link_metadata(pdf_path)
    dbPath = '/TSA JOBS/ADR Test/FIND/'
    pdf_bytes = BytesIO()
    docHighlights.save(pdf_bytes)
    pdflink = tsadropboxretrieval.uploadanyFile(doc=docHighlights, path=dbPath, pdfname=filename)
    # changepdflinks iterates entry dicts, so it must receive the list itself
    # (not its json.dumps string); this also covers the no-match case.
    json_output = changepdflinks(data_list_JSON, pdflink)

    Alltexttobebilled = ''.join(billed_parts)
    alltextWithoutNotbilled = ''.join(unbilled_parts)
    return pdf_bytes.getvalue(), docHighlights, json_output, Alltexttobebilled, alltextWithoutNotbilled, filename
| def extract_section_under_header_tobebilled2(pdf_path): | |
| # keywordstoSkip=["installation", "execution", "miscellaneous items", "workmanship", "testing", "labeling"] | |
| keywords = {'installation', 'execution', 'miscellaneous items', 'workmanship', 'testing', 'labeling'} | |
| headertoContinue1 = False | |
| headertoContinue2=False | |
| Alltexttobebilled='' | |
| parsed_url = urlparse(pdf_path) | |
| filename = os.path.basename(parsed_url.path) | |
| filename = unquote(filename) # decode URL-encoded characters | |
| # Optimized URL handling | |
| if pdf_path and ('http' in pdf_path or 'dropbox' in pdf_path): | |
| pdf_path = pdf_path.replace('dl=0', 'dl=1') | |
| # Cache frequently used values | |
| response = requests.get(pdf_path) | |
| pdf_content = BytesIO(response.content) | |
| if not pdf_content: | |
| raise ValueError("No valid PDF content found.") | |
| doc = fitz.open(stream=pdf_content, filetype="pdf") | |
| docHighlights = fitz.open(stream=pdf_content, filetype="pdf") | |
| most_common_font_size, most_common_color, most_common_font = get_regular_font_size_and_color(doc) | |
| # Precompute regex patterns | |
| dot_pattern = re.compile(r'\.{3,}') | |
| url_pattern = re.compile(r'https?://\S+|www\.\S+') | |
| toc_pages = get_toc_page_numbers(doc) | |
| headers, top_3_font_sizes, smallest_font_size, headersSpans = extract_headers( | |
| doc, toc_pages, most_common_font_size, most_common_color, most_common_font, top_margin, bottom_margin | |
| ) | |
| hierarchy = build_header_hierarchy(doc, toc_pages, most_common_font_size, most_common_color, most_common_font) | |
| listofHeaderstoMarkup = get_leaf_headers_with_paths(hierarchy) | |
| # Precompute all children headers once | |
| allchildrenheaders = [normalize_text(item['text']) for item, p in listofHeaderstoMarkup] | |
| allchildrenheaders_set = set(allchildrenheaders) # For faster lookups | |
| df = pd.DataFrame(columns=["NBSLink","Subject","Page","Author","Creation Date","Layer",'Code', 'head above 1', "head above 2","BodyText"]) | |
| dictionaryNBS={} | |
| data_list_JSON = [] | |
| currentgroupname='' | |
| if len(top_3_font_sizes)==3: | |
| mainHeaderFontSize, subHeaderFontSize, subsubheaderFontSize = top_3_font_sizes | |
| elif len(top_3_font_sizes)==2: | |
| mainHeaderFontSize= top_3_font_sizes[0] | |
| subHeaderFontSize= top_3_font_sizes[1] | |
| subsubheaderFontSize= top_3_font_sizes[1] | |
| # Preload all pages to avoid repeated loading | |
| # pages = [doc.load_page(page_num) for page_num in range(len(doc)) if page_num not in toc_pages] | |
| for heading_to_searchDict, paths in listofHeaderstoMarkup: | |
| heading_to_search = heading_to_searchDict['text'] | |
| heading_to_searchPageNum = heading_to_searchDict['page'] | |
| # Initialize variables | |
| headertoContinue1 = False | |
| headertoContinue2 = False | |
| matched_header_line = None | |
| done = False | |
| collecting = False | |
| collected_lines = [] | |
| page_highlights = {} | |
| current_bbox = {} | |
| last_y1s = {} | |
| mainHeader = '' | |
| subHeader = '' | |
| matched_header_line_norm = heading_to_search | |
| break_collecting = False | |
| heading_norm = normalize_text(heading_to_search) | |
| paths_norm = [normalize_text(p) for p in paths[0]] if paths and paths[0] else [] | |
| for page_num in range(heading_to_searchPageNum,len(doc)): | |
| print(heading_to_search) | |
| if paths[0].strip().lower() != currentgroupname.strip().lower(): | |
| Alltexttobebilled+= paths[0] +'\n' | |
| currentgroupname=paths[0] | |
| print(paths[0]) | |
| if page_num in toc_pages: | |
| continue | |
| if break_collecting: | |
| break | |
| page=doc[page_num] | |
| page_height = page.rect.height | |
| blocks = page.get_text("dict")["blocks"] | |
| for block in blocks: | |
| if break_collecting: | |
| break | |
| lines = block.get("lines", []) | |
| i = 0 | |
| while i < len(lines): | |
| if break_collecting: | |
| break | |
| spans = lines[i].get("spans", []) | |
| if not spans: | |
| i += 1 | |
| continue | |
| y0 = spans[0]["bbox"][1] | |
| y1 = spans[0]["bbox"][3] | |
| if y0 < top_margin or y1 > (page_height - bottom_margin): | |
| i += 1 | |
| continue | |
| line_text = get_spaced_text_from_spans(spans).lower() | |
| line_text_norm = normalize_text(line_text) | |
| # Combine with next line if available | |
| if i + 1 < len(lines): | |
| next_spans = lines[i + 1].get("spans", []) | |
| next_line_text = get_spaced_text_from_spans(next_spans).lower() | |
| combined_line_norm = normalize_text(line_text + " " + next_line_text) | |
| else: | |
| combined_line_norm = line_text_norm | |
| # Check if we should continue processing | |
| if combined_line_norm and combined_line_norm in paths[0]: | |
| headertoContinue1 = combined_line_norm | |
| if combined_line_norm and combined_line_norm in paths[-2]: | |
| headertoContinue2 = combined_line_norm | |
| # if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() : | |
| last_path = paths[-2].lower() | |
| # if any(word in paths[-2].lower() for word in keywordstoSkip): | |
| # if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() or 'workmanship' in paths[-2].lower() or 'testing' in paths[-2].lower() or 'labeling' in paths[-2].lower(): | |
| if any(keyword in last_path for keyword in keywords): | |
| stringtowrite='Not to be billed' | |
| else: | |
| stringtowrite='To be billed' | |
| if stringtowrite=='To be billed': | |
| # Alltexttobebilled+= combined_line_norm ################################################# | |
| if matched_header_line_norm in combined_line_norm: | |
| Alltexttobebilled+='\n' | |
| Alltexttobebilled+= ' '+combined_line_norm | |
| # Optimized header matching | |
| existsfull = ( | |
| ( combined_line_norm in allchildrenheaders_set or | |
| combined_line_norm in allchildrenheaders ) and heading_to_search in combined_line_norm | |
| ) | |
| # New word-based matching | |
| current_line_words = set(combined_line_norm.split()) | |
| heading_words = set(heading_norm.split()) | |
| all_words_match = current_line_words.issubset(heading_words) and len(current_line_words) > 0 | |
| substring_match = ( | |
| heading_norm in combined_line_norm or | |
| combined_line_norm in heading_norm or | |
| all_words_match # Include the new word-based matching | |
| ) | |
| # substring_match = ( | |
| # heading_norm in combined_line_norm or | |
| # combined_line_norm in heading_norm | |
| # ) | |
| if (substring_match and existsfull and not collecting and | |
| len(combined_line_norm) > 0 ):#and (headertoContinue1 or headertoContinue2) ): | |
| # Check header conditions more efficiently | |
| header_spans = [ | |
| span for span in spans | |
| if (is_header(span, most_common_font_size, most_common_color, most_common_font) | |
| # and span['size'] >= subsubheaderFontSize | |
| and span['size'] < mainHeaderFontSize) | |
| ] | |
| if header_spans and stringtowrite.startswith('To'): | |
| collecting = True | |
| # if stringtowrite=='To be billed': | |
| # Alltexttobebilled+='\n' | |
| matched_header_font_size = max(span["size"] for span in header_spans) | |
| # collected_lines.append(line_text) | |
| valid_spans = [span for span in spans if span.get("bbox")] | |
| if valid_spans: | |
| x0s = [span["bbox"][0] for span in valid_spans] | |
| x1s = [span["bbox"][2] for span in valid_spans] | |
| y0s = [span["bbox"][1] for span in valid_spans] | |
| y1s = [span["bbox"][3] for span in valid_spans] | |
| header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] | |
| if page_num in current_bbox: | |
| cb = current_bbox[page_num] | |
| current_bbox[page_num] = [ | |
| min(cb[0], header_bbox[0]), | |
| min(cb[1], header_bbox[1]), | |
| max(cb[2], header_bbox[2]), | |
| max(cb[3], header_bbox[3]) | |
| ] | |
| else: | |
| current_bbox[page_num] = header_bbox | |
| last_y1s[page_num] = header_bbox[3] | |
| x0, y0, x1, y1 = header_bbox | |
| zoom = 200 | |
| left = int(x0) | |
| top = int(y0) | |
| zoom_str = f"{zoom},{left},{top}" | |
| pageNumberFound = page_num + 1 | |
| # Build the query parameters | |
| params = { | |
| 'pdfLink': pdf_path, # Your PDF link | |
| 'keyword': heading_to_search, # Your keyword (could be a string or list) | |
| } | |
| # URL encode each parameter | |
| encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()} | |
| # Construct the final encoded link | |
| encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()]) | |
| # Correctly construct the final URL with page and zoom | |
| final_url = f"{baselink}{encoded_link}#page={str(pageNumberFound)}&zoom={zoom_str}" | |
| # Get current date and time | |
| now = datetime.now() | |
| # Format the output | |
| formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p") | |
| # Optionally, add the URL to a DataFrame | |
| data_entry = { | |
| "NBSLink": zoom_str, | |
| "Subject": heading_to_search, | |
| "Page": str(pageNumberFound), | |
| "Author": "ADR", | |
| "Creation Date": formatted_time, | |
| "Layer": "Initial", | |
| "Code": stringtowrite, | |
| "head above 1": paths[-2], | |
| "head above 2": paths[0], | |
| "BodyText":collected_lines, | |
| "MC Connnection": 'Go to ' + paths[0].strip().split()[0] +'/'+ heading_to_search.strip().split()[0] + ' in '+ filename | |
| } | |
| data_list_JSON.append(data_entry) | |
| # Convert list to JSON | |
| json_output = json.dumps(data_list_JSON, indent=4) | |
| i += 2 | |
| continue | |
| else: | |
| if (substring_match and not collecting and | |
| len(combined_line_norm) > 0): # and (headertoContinue1 or headertoContinue2) ): | |
| # Calculate word match percentage | |
| word_match_percent = words_match_ratio(heading_norm, combined_line_norm) * 100 | |
| # Check if at least 70% of header words exist in this line | |
| meets_word_threshold = word_match_percent >= 100 | |
| # Check header conditions (including word threshold) | |
| header_spans = [ | |
| span for span in spans | |
| if (is_header(span, most_common_font_size, most_common_color, most_common_font) | |
| # and span['size'] >= subsubheaderFontSize | |
| and span['size'] < mainHeaderFontSize) | |
| ] | |
| if header_spans and (meets_word_threshold or same_start_word(heading_to_search, combined_line_norm) ) and stringtowrite.startswith('To'): | |
| collecting = True | |
| if stringtowrite=='To be billed': | |
| Alltexttobebilled+='\n' | |
| # if stringtowrite=='To be billed': | |
| # Alltexttobebilled+= ' '+ combined_line_norm | |
| matched_header_font_size = max(span["size"] for span in header_spans) | |
| collected_lines.append(line_text) | |
| valid_spans = [span for span in spans if span.get("bbox")] | |
| if valid_spans: | |
| x0s = [span["bbox"][0] for span in valid_spans] | |
| x1s = [span["bbox"][2] for span in valid_spans] | |
| y0s = [span["bbox"][1] for span in valid_spans] | |
| y1s = [span["bbox"][3] for span in valid_spans] | |
| header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] | |
| if page_num in current_bbox: | |
| cb = current_bbox[page_num] | |
| current_bbox[page_num] = [ | |
| min(cb[0], header_bbox[0]), | |
| min(cb[1], header_bbox[1]), | |
| max(cb[2], header_bbox[2]), | |
| max(cb[3], header_bbox[3]) | |
| ] | |
| else: | |
| current_bbox[page_num] = header_bbox | |
| last_y1s[page_num] = header_bbox[3] | |
| x0, y0, x1, y1 = header_bbox | |
| zoom = 200 | |
| left = int(x0) | |
| top = int(y0) | |
| zoom_str = f"{zoom},{left},{top}" | |
| pageNumberFound = page_num + 1 | |
| # Build the query parameters | |
| params = { | |
| 'pdfLink': pdf_path, # Your PDF link | |
| 'keyword': heading_to_search, # Your keyword (could be a string or list) | |
| } | |
| # URL encode each parameter | |
| encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()} | |
| # Construct the final encoded link | |
| encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()]) | |
| # Correctly construct the final URL with page and zoom | |
| final_url = f"{baselink}{encoded_link}#page={str(pageNumberFound)}&zoom={zoom_str}" | |
| # Get current date and time | |
| now = datetime.now() | |
| # Format the output | |
| formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p") | |
| # Optionally, add the URL to a DataFrame | |
| data_entry = { | |
| "NBSLink": zoom_str, | |
| "Subject": heading_to_search, | |
| "Page": str(pageNumberFound), | |
| "Author": "ADR", | |
| "Creation Date": formatted_time, | |
| "Layer": "Initial", | |
| "Code": stringtowrite, | |
| "head above 1": paths[-2], | |
| "head above 2": paths[0], | |
| "BodyText":collected_lines, | |
| "MC Connnection": 'Go to ' + paths[0].strip().split()[0] +'/'+ heading_to_search.strip().split()[0] + ' in '+ filename | |
| } | |
| data_list_JSON.append(data_entry) | |
| # Convert list to JSON | |
| json_output = json.dumps(data_list_JSON, indent=4) | |
| i += 2 | |
| continue | |
| if collecting: | |
| norm_line = normalize_text(line_text) | |
| # Optimized URL check | |
| if url_pattern.match(norm_line): | |
| line_is_header = False | |
| else: | |
| line_is_header = any(is_header(span, most_common_font_size, most_common_color, most_common_font) for span in spans) | |
| if line_is_header: | |
| header_font_size = max(span["size"] for span in spans) | |
| is_probably_real_header = ( | |
| header_font_size >= matched_header_font_size and | |
| is_header(spans[0], most_common_font_size, most_common_color, most_common_font) and | |
| len(line_text.strip()) > 2 | |
| ) | |
| if (norm_line != matched_header_line_norm and | |
| norm_line != heading_norm and | |
| is_probably_real_header): | |
| if line_text not in heading_norm: | |
| collecting = False | |
| done = True | |
| headertoContinue1 = False | |
| headertoContinue2=False | |
| for page_num, bbox in current_bbox.items(): | |
| bbox[3] = last_y1s.get(page_num, bbox[3]) | |
| page_highlights[page_num] = bbox | |
| highlight_boxes(docHighlights, page_highlights,stringtowrite) | |
| break_collecting = True | |
| break | |
| if break_collecting: | |
| break | |
| collected_lines.append(line_text) | |
| valid_spans = [span for span in spans if span.get("bbox")] | |
| if valid_spans: | |
| x0s = [span["bbox"][0] for span in valid_spans] | |
| x1s = [span["bbox"][2] for span in valid_spans] | |
| y0s = [span["bbox"][1] for span in valid_spans] | |
| y1s = [span["bbox"][3] for span in valid_spans] | |
| line_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)] | |
| if page_num in current_bbox: | |
| cb = current_bbox[page_num] | |
| current_bbox[page_num] = [ | |
| min(cb[0], line_bbox[0]), | |
| min(cb[1], line_bbox[1]), | |
| max(cb[2], line_bbox[2]), | |
| max(cb[3], line_bbox[3]) | |
| ] | |
| else: | |
| current_bbox[page_num] = line_bbox | |
| last_y1s[page_num] = line_bbox[3] | |
| i += 1 | |
| if not done: | |
| for page_num, bbox in current_bbox.items(): | |
| bbox[3] = last_y1s.get(page_num, bbox[3]) | |
| page_highlights[page_num] = bbox | |
| if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() : | |
| stringtowrite='Not to be billed' | |
| else: | |
| stringtowrite='To be billed' | |
| highlight_boxes(docHighlights, page_highlights,stringtowrite) | |
| # docHighlights.save("highlighted_output.pdf", garbage=4, deflate=True) | |
| dbxTeam = tsadropboxretrieval.ADR_Access_DropboxTeam('user') | |
| metadata = dbxTeam.sharing_get_shared_link_metadata(pdf_path) | |
| dbPath = '/TSA JOBS/ADR Test/FIND/' | |
| pdf_bytes = BytesIO() | |
| docHighlights.save(pdf_bytes) | |
| pdflink = tsadropboxretrieval.uploadanyFile(doc=docHighlights, path=dbPath, pdfname=filename) | |
| json_output=changepdflinks(json_output,pdflink) | |
| return pdf_bytes.getvalue(), docHighlights , json_output, Alltexttobebilled , filename | |
def _billing_label(section_title, skip_keywords):
    """Return 'Not to be billed' if *section_title* contains a skip keyword, else 'To be billed'."""
    lowered = section_title.lower()
    if any(keyword in lowered for keyword in skip_keywords):
        return 'Not to be billed'
    return 'To be billed'


def _union_span_bbox(spans):
    """Return the [x0, y0, x1, y1] box enclosing every span that carries a bbox, or None."""
    boxes = [span["bbox"] for span in spans if span.get("bbox")]
    if not boxes:
        return None
    return [
        min(box[0] for box in boxes),
        min(box[1] for box in boxes),
        max(box[2] for box in boxes),
        max(box[3] for box in boxes),
    ]


def _grow_page_bbox(current_bbox, last_y1s, page_num, bbox):
    """Merge *bbox* into the running box of *page_num* and record its bottom edge."""
    known = current_bbox.get(page_num)
    if known is None:
        current_bbox[page_num] = bbox
    else:
        current_bbox[page_num] = [
            min(known[0], bbox[0]),
            min(known[1], bbox[1]),
            max(known[2], bbox[2]),
            max(known[3], bbox[3]),
        ]
    last_y1s[page_num] = bbox[3]


def _build_markup_entry(filename, heading, page_number, header_bbox, label, paths, body_lines):
    """Build the JSON record describing one matched heading."""
    return {
        "PDF Name": filename,
        # Only "zoom,left,top" is stored here; changepdflinks() later expands it to a full URL.
        "NBSLink": f"200,{int(header_bbox[0])},{int(header_bbox[1])}",
        "Subject": heading,
        "Page": str(page_number),
        "Author": "ADR",
        "Creation Date": datetime.now().strftime("%d/%m/%Y %I:%M:%S %p"),
        "Layer": "Initial",
        "Code": label,
        "head above 1": paths[-2],
        "head above 2": paths[0],
        # Same list object the caller keeps appending to, so the body text
        # collected after this point also shows up in the record.
        "BodyText": body_lines,
        "MC Connnection": 'Go to ' + paths[0].strip().split()[0] + '/' + heading.strip().split()[0] + ' in ' + filename
    }


def extract_section_under_header_tobebilledMultiplePDFS(multiplePDF_Paths):
    """Highlight the "To be billed" sections of several PDFs and upload the results.

    For every PDF link the document is downloaded, its leaf headings are
    located, and for each heading whose parent section (``paths[-2]``) does not
    contain one of the skip keywords the heading line and the lines following
    it are collected until a line that looks like another header of at least
    the same font size is met. The collected region is passed to
    ``highlight_boxes`` and one JSON record per matched heading is produced.
    Each highlighted document is then uploaded through
    ``tsadropboxretrieval.uploadanyFile`` and the records are rewritten with
    ``changepdflinks`` to point at the uploaded file.

    Args:
        multiplePDF_Paths: Comma-separated string of PDF links. Surrounding
            whitespace and empty entries are ignored.

    Returns:
        A 5-tuple ``(pdf_bytes, docHighlights, combined_json_str,
        Alltexttobebilled, filenames)`` where ``pdf_bytes`` and
        ``docHighlights`` are the serialised bytes and the document object of
        the LAST processed PDF, ``combined_json_str`` is the JSON string of the
        records of all PDFs, ``Alltexttobebilled`` is the text accumulated for
        the LAST PDF only, and ``filenames`` lists the decoded file name of
        every link.

    Raises:
        ValueError: If no link is supplied, or a download fails or is empty.
    """
    # Parent-section keywords that mark a heading as "Not to be billed".
    keywords = {'installation', 'execution', 'miscellaneous items', 'workmanship', 'testing', 'labeling'}
    print(multiplePDF_Paths)
    arrayofPDFS = [link.strip() for link in multiplePDF_Paths.split(',') if link.strip()]
    print(arrayofPDFS)
    if not arrayofPDFS:
        raise ValueError("No PDF links were supplied.")

    url_pattern = re.compile(r'https?://\S+|www\.\S+')
    filenames = []
    docarray = []
    jsons = []
    Alltexttobebilled = ''
    docHighlights = None

    for pdf_path in arrayofPDFS:
        filename = unquote(os.path.basename(urlparse(pdf_path).path))  # decode URL-encoded characters
        filenames.append(filename)

        if 'http' in pdf_path or 'dropbox' in pdf_path:
            # Ask for the direct download instead of the preview page.
            pdf_path = pdf_path.replace('dl=0', 'dl=1')
        response = requests.get(pdf_path)
        # A BytesIO object is always truthy, so validate the response itself.
        if not response.ok or not response.content:
            raise ValueError("No valid PDF content found.")
        pdf_content = BytesIO(response.content)

        doc = fitz.open(stream=pdf_content, filetype="pdf")
        docHighlights = fitz.open(stream=pdf_content, filetype="pdf")

        most_common_font_size, most_common_color, most_common_font = get_regular_font_size_and_color(doc)
        toc_pages = get_toc_page_numbers(doc)
        _headers, top_3_font_sizes, _smallest_font_size, _headers_spans = extract_headers(
            doc, toc_pages, most_common_font_size, most_common_color, most_common_font, top_margin, bottom_margin
        )
        hierarchy = build_header_hierarchy(doc, toc_pages, most_common_font_size, most_common_color, most_common_font)
        listofHeaderstoMarkup = get_leaf_headers_with_paths(hierarchy)
        allchildrenheaders_set = {normalize_text(item['text']) for item, _ in listofHeaderstoMarkup}

        # The first entry is treated as the main-header size. With no header
        # size at all there is no upper bound on a sub-header's font size.
        if len(top_3_font_sizes) > 0:
            mainHeaderFontSize = top_3_font_sizes[0]
        else:
            mainHeaderFontSize = float("inf")

        data_list_JSON = []
        billed_parts = []  # pieces of this PDF's "to be billed" text, joined once at the end
        currentgroupname = ''

        for heading_to_searchDict, paths in listofHeaderstoMarkup:
            heading_to_search = heading_to_searchDict['text']
            heading_start_page = heading_to_searchDict['page']
            heading_norm = normalize_text(heading_to_search)
            heading_words = set(heading_norm.split())
            stringtowrite = _billing_label(paths[-2], keywords)
            billable = stringtowrite == 'To be billed'

            collecting = False
            section_closed = False
            matched_header_font_size = None
            collected_lines = []
            current_bbox = {}
            last_y1s = {}

            for page_num in range(heading_start_page, len(doc)):
                # Emit the top-level group title once each time the group changes.
                if paths[0].strip().lower() != currentgroupname.strip().lower():
                    billed_parts.append(paths[0] + '\n')
                    currentgroupname = paths[0]
                if page_num in toc_pages:
                    continue

                page = doc[page_num]
                page_height = page.rect.height
                for block in page.get_text("dict")["blocks"]:
                    lines = block.get("lines", [])
                    i = 0
                    while i < len(lines):
                        spans = lines[i].get("spans", [])
                        if not spans:
                            i += 1
                            continue
                        # Skip running headers/footers inside the page margins.
                        y0 = spans[0]["bbox"][1]
                        y1 = spans[0]["bbox"][3]
                        if y0 < top_margin or y1 > (page_height - bottom_margin):
                            i += 1
                            continue

                        line_text = get_spaced_text_from_spans(spans).lower()
                        # Headings may wrap, so match against this line joined with the next.
                        if i + 1 < len(lines):
                            next_line_text = get_spaced_text_from_spans(lines[i + 1].get("spans", [])).lower()
                            combined_line_norm = normalize_text(line_text + " " + next_line_text)
                        else:
                            combined_line_norm = normalize_text(line_text)

                        # NOTE(review): the raw heading text (not heading_norm) is compared
                        # with normalised line text here, in `existsfull` and in the
                        # end-of-section test below. Kept as in the original - confirm
                        # against normalize_text() whether that is intended.
                        if billable:
                            if heading_to_search in combined_line_norm:
                                billed_parts.append('\n')
                            billed_parts.append(' ' + combined_line_norm)

                        existsfull = (
                            combined_line_norm in allchildrenheaders_set
                            and heading_to_search in combined_line_norm
                        )
                        current_line_words = set(combined_line_norm.split())
                        all_words_match = len(current_line_words) > 0 and current_line_words.issubset(heading_words)
                        substring_match = (
                            heading_norm in combined_line_norm
                            or combined_line_norm in heading_norm
                            or all_words_match
                        )

                        header_spans = []
                        header_found = False
                        if substring_match and not collecting and combined_line_norm:
                            header_spans = [
                                span for span in spans
                                if is_header(span, most_common_font_size, most_common_color, most_common_font)
                                and span['size'] < mainHeaderFontSize
                            ]
                            if header_spans and billable:
                                # Exact leaf-heading hit, otherwise every heading word
                                # present or the same first word.
                                header_found = (
                                    existsfull
                                    or words_match_ratio(heading_norm, combined_line_norm) * 100 >= 100
                                    or same_start_word(heading_to_search, combined_line_norm)
                                )

                        if header_found:
                            collecting = True
                            matched_header_font_size = max(span["size"] for span in header_spans)
                            if not existsfull:
                                # The fuzzy path also records the header line itself.
                                billed_parts.append('\n')
                                collected_lines.append(line_text)
                            header_bbox = _union_span_bbox(spans)
                            if header_bbox is not None:
                                _grow_page_bbox(current_bbox, last_y1s, page_num, header_bbox)
                                data_list_JSON.append(_build_markup_entry(
                                    filename, heading_to_search, page_num + 1,
                                    header_bbox, stringtowrite, paths, collected_lines,
                                ))
                            # The next line was already consumed by the combined match.
                            i += 2
                            continue

                        if collecting:
                            norm_line = normalize_text(line_text)
                            line_is_header = (
                                not url_pattern.match(norm_line)
                                and any(
                                    is_header(span, most_common_font_size, most_common_color, most_common_font)
                                    for span in spans
                                )
                            )
                            if line_is_header:
                                is_probably_real_header = (
                                    max(span["size"] for span in spans) >= matched_header_font_size
                                    and is_header(spans[0], most_common_font_size, most_common_color, most_common_font)
                                    and len(line_text.strip()) > 2
                                )
                                if (
                                    is_probably_real_header
                                    and norm_line != heading_to_search
                                    and norm_line != heading_norm
                                    and line_text not in heading_norm
                                ):
                                    # A different header of equal or larger size ends the section.
                                    section_closed = True
                                    break
                            collected_lines.append(line_text)
                            line_bbox = _union_span_bbox(spans)
                            if line_bbox is not None:
                                _grow_page_bbox(current_bbox, last_y1s, page_num, line_bbox)
                        i += 1
                    if section_closed:
                        break
                if section_closed:
                    break

            # Clamp every page box to the bottom of the last line seen on that
            # page, then highlight once per heading.
            for boxed_page, bbox in current_bbox.items():
                bbox[3] = last_y1s.get(boxed_page, bbox[3])
            highlight_boxes(docHighlights, dict(current_bbox), stringtowrite)

        docarray.append(docHighlights)
        jsons.append(data_list_JSON)
        Alltexttobebilled = ''.join(billed_parts)

    dbxTeam = tsadropboxretrieval.ADR_Access_DropboxTeam('user')
    dbPath = '/TSA JOBS/ADR Test/FIND/'
    jsonCombined = []
    for source_link, highlighted_doc, pdf_name, entries in zip(arrayofPDFS, docarray, filenames, jsons):
        # Result unused; call kept because it may act as a validity check on
        # the shared link - TODO confirm and drop if not needed.
        dbxTeam.sharing_get_shared_link_metadata(source_link)
        pdflink = tsadropboxretrieval.uploadanyFile(doc=highlighted_doc, path=dbPath, pdfname=pdf_name)
        jsonCombined.extend(changepdflinks(entries, pdflink))

    # Serialise the last highlighted document once, after the loop.
    pdf_bytes = BytesIO()
    docHighlights.save(pdf_bytes)

    combined_json_str = json.dumps(jsonCombined, indent=1)
    print(combined_json_str)
    return pdf_bytes.getvalue(), docHighlights, combined_json_str, Alltexttobebilled, filenames