| | |
| | """Copy of FindSpecsTrial(Retrieving+boundingBoxes)-InitialMarkups(ALL)_CleanedUp.ipynb |
| | |
| | Automatically generated by Colab. |
| | |
| | Original file is located at |
| | https://colab.research.google.com/drive/12XfVkmKmN3oVjHhLVE0_GgkftgArFEK2 |
| | """ |
# Base URLs of the ADR PDF viewer service; query strings are appended when
# building markup links (extract_section_under_header uses ``baselink``).
baselink='https://adr.trevorsadd.co.uk/view-pdf?'

# NOTE(review): ``newlink`` and ``tobebilledonlyLink`` are not referenced in
# this chunk -- presumably consumed by the highlight / billing-only flows
# elsewhere in the file; confirm before removing.
newlink='https://adr.trevorsadd.co.uk/view-highlight?'
tobebilledonlyLink='https://adr.trevorsadd.co.uk/view-pdf-tobebilled?'
| |
|
| | |
| | from urllib.parse import urlparse, unquote |
| | import os |
| | from io import BytesIO |
| | import re |
| | import requests |
| | import pandas as pd |
| | import fitz |
| | import re |
| | import urllib.parse |
| | import pandas as pd |
| | import math |
| | import random |
| | import json |
| | from datetime import datetime |
| | from collections import defaultdict, Counter |
| | import difflib |
| | from fuzzywuzzy import fuzz |
| |
|
def filteredJsons(pdf_path, filteredjsonsfromrawan):
    """Run the Rawan-style section extraction for ``pdf_path``.

    Thin wrapper around ``extract_section_under_headerRawan`` (defined
    elsewhere in this module), forwarding the heading list under the
    parameter name that helper expects.

    Parameters
    ----------
    pdf_path : str
        URL or path of the PDF to process.
    filteredjsonsfromrawan : list
        Headings to extract sections for.
    """
    # Bug fix: the original called the helper and discarded its result, so
    # callers could never observe the extraction output.
    return extract_section_under_headerRawan(
        pdf_path=pdf_path,
        listofheadingsfromrawan=filteredjsonsfromrawan,
    )
| |
|
| | |
| |
|
| | |
def get_regular_font_size_and_color(doc):
    """Return the (font size, colour, font name) used most often in *doc*.

    Walks every text span on every page, tallies the three attributes
    independently, and returns the mode of each.  Any element is ``None``
    when the document contains no spans at all.
    """
    sizes, colours, names = [], [], []

    for index in range(len(doc)):
        current_page = doc.load_page(index)
        for block in current_page.get_text("dict")["blocks"]:
            # Image blocks carry no "lines" key and are skipped.
            if "lines" not in block:
                continue
            for text_line in block["lines"]:
                for piece in text_line["spans"]:
                    sizes.append(piece['size'])
                    colours.append(piece['color'])
                    names.append(piece['font'])

    def _mode(values):
        # Counter.most_common(1) yields [(value, count)]; empty -> None.
        return Counter(values).most_common(1)[0][0] if values else None

    return _mode(sizes), _mode(colours), _mode(names)
| |
|
def normalize_text(text):
    """Lower-case *text*, trim it, and collapse internal whitespace runs.

    ``None`` is treated as an empty string.
    """
    if text is None:
        return ""
    lowered = text.strip().lower()
    return re.sub(r'\s+', ' ', lowered)
| |
|
def get_spaced_text_from_spans(spans):
    """Join each span's stripped text with single spaces, normalised.

    The result is lower-cased, trimmed, and has whitespace runs collapsed
    (same contract as ``normalize_text`` applied to the joined text).
    """
    joined = " ".join(piece["text"].strip() for piece in spans)
    return re.sub(r'\s+', ' ', joined.strip().lower())
| |
|
def is_header(span, most_common_font_size, most_common_color, most_common_font):
    """Return True when *span* looks like a heading rather than body text.

    A span is treated as a header when its font size exceeds the document's
    most common size, or when its font family differs from the most common
    font (case-insensitive comparison).

    ``most_common_color`` is accepted for interface compatibility but does
    not take part in the decision.
    """
    # The original also tested ``is_bold and span["size"] > most_common_font_size``,
    # which is fully subsumed by the first size comparison; the dead clause
    # (and the bold detection feeding it) has been removed.
    return (
        span["size"] > most_common_font_size
        or span["font"].lower() != most_common_font.lower()
    )
| |
|
def add_span_to_nearest_group(span_y, grouped_dict, pageNum=None, threshold=0.5):
    """Locate an existing ``(page, y)`` key within *threshold* of *span_y*.

    Keys on other pages are ignored when *pageNum* is given; with
    ``pageNum=None`` every page is considered.  Returns the first matching
    key in insertion order, or a fresh ``(pageNum, span_y)`` key when no
    existing group is close enough.
    """
    nearest = next(
        (
            key
            for key in grouped_dict
            if (pageNum is None or key[0] == pageNum)
            and abs(key[1] - span_y) <= threshold
        ),
        None,
    )
    return nearest if nearest is not None else (pageNum, span_y)
| |
|
def extract_headers(doc, toc_pages, most_common_font_size, most_common_color, most_common_font, top_margin, bottom_margin):
    """Scan *doc* for heading-like spans and merge them into header entries.

    Pages listed in *toc_pages* are skipped, as are spans lying inside the
    top/bottom page margins.  Spans that ``is_header`` flags are merged with
    vertically-adjacent spans of near-identical size into multi-line headers.

    Returns a 4-tuple:
      headers            -- list of ``[text, size, pageNum, y0]`` rows
      top_3_font_sizes   -- up to three most frequent header sizes, descending
                            (with exactly two sizes, the smaller is doubled)
      smallest_font_size -- smallest header size used >= 3 times, or None
      spans              -- the raw span dicts that formed the headers
    """
    grouped_headers = defaultdict(list)
    spans = []
    # Max vertical gap (points) between spans merged into one header.
    line_merge_threshold = 1.5

    for pageNum in range(len(doc)):
        if pageNum in toc_pages:
            continue
        page = doc.load_page(pageNum)
        page_height = page.rect.height
        text_instances = page.get_text("dict")

        # Pass 1: collect candidate header spans on this page.
        potential_header_spans = []
        for block in text_instances['blocks']:
            if block['type'] != 0:  # 0 = text block in PyMuPDF's dict output
                continue

            for line in block['lines']:
                for span in line['spans']:
                    span_y0 = span['bbox'][1]
                    span_y1 = span['bbox'][3]

                    # Skip running headers / footers inside the margins.
                    if span_y0 < top_margin or span_y1 > (page_height - bottom_margin):
                        continue

                    span_text = normalize_text(span.get('text', ''))
                    if not span_text:
                        continue
                    if span_text.startswith('http://www') or span_text.startswith('www'):
                        continue
                    # Filter boilerplate: page markers, punctuation-only
                    # lines, "end of section" notes, dates, spec labels.
                    if any((
                        'page' in span_text,
                        not re.search(r'[a-z0-9]', span_text),
                        'end of section' in span_text,
                        re.search(r'page\s+\d+\s+of\s+\d+', span_text),
                        re.search(r'\b(?:\d{1,2}[/-])?\d{1,2}[/-]\d{2,4}\b', span_text),
                        'specification:' in span_text
                    )):
                        continue

                    # Strip TOC-style dot/dash leaders and the trailing text.
                    cleaned_text = re.sub(r'[.\-]{4,}.*$', '', span_text).strip()
                    cleaned_text = normalize_text(cleaned_text)

                    if is_header(span, most_common_font_size, most_common_color, most_common_font):
                        potential_header_spans.append({
                            'text': cleaned_text,
                            'size': span['size'],
                            'pageNum': pageNum,
                            'y0': span_y0,
                            'y1': span_y1,
                            'x0': span['bbox'][0],
                            'x1': span['bbox'][2],
                            'span': span
                        })

        # Reading order: top-to-bottom (page key kept for stability).
        potential_header_spans.sort(key=lambda s: (s['pageNum'], s['y0']))

        # Pass 2: merge runs of vertically-adjacent spans whose sizes differ
        # by < 0.5pt into a single multi-span header.
        i = 0
        while i < len(potential_header_spans):
            current = potential_header_spans[i]
            header_text = current['text']
            header_size = current['size']
            header_page = current['pageNum']
            min_y = current['y0']
            max_y = current['y1']
            spans_group = [current['span']]

            j = i + 1
            while j < len(potential_header_spans):
                next_span = potential_header_spans[j]

                if (next_span['pageNum'] == header_page and
                    next_span['y0'] - max_y < line_merge_threshold and
                    abs(next_span['size'] - header_size) < 0.5):
                    header_text += " " + next_span['text']
                    max_y = next_span['y1']
                    spans_group.append(next_span['span'])
                    j += 1
                else:
                    break

            grouped_headers[(header_page, min_y)].append({
                "text": header_text.strip(),
                "size": header_size,
                "pageNum": header_page,
                "spans": spans_group
            })
            spans.extend(spans_group)
            i = j  # skip everything merged into this header

    # Flatten groups into [text, size, page, y] rows in document order.
    headers = []
    for (pageNum, y), header_groups in sorted(grouped_headers.items()):
        for group in header_groups:
            headers.append([
                group['text'],
                group['size'],
                group['pageNum'],
                y
            ])

    font_sizes = [size for _, size, _, _ in headers]
    font_size_counts = Counter(font_sizes)

    # Only sizes used by at least 3 headers count as real heading levels.
    valid_font_sizes = [size for size, count in font_size_counts.items() if count >= 3]

    valid_font_sizes_sorted = sorted(valid_font_sizes, reverse=True)

    # With exactly two sizes, duplicate the smaller one so callers can
    # always unpack three values (main / sub / sub-sub).
    if len(valid_font_sizes_sorted) == 2:
        top_3_font_sizes = [valid_font_sizes_sorted[0], valid_font_sizes_sorted[1], valid_font_sizes_sorted[1]]
    else:
        top_3_font_sizes = valid_font_sizes_sorted[:3]

    smallest_font_size = min(valid_font_sizes) if valid_font_sizes else None

    return headers, top_3_font_sizes, smallest_font_size, spans
| |
|
def is_numbered(text):
    """True when *text* (ignoring surrounding whitespace) begins with a digit."""
    stripped = text.strip()
    # str.isdecimal on the first character matches the regex r'^\d'
    # (both cover exactly Unicode category Nd); empty input is False.
    return stripped[:1].isdecimal()
| |
|
def is_similar(a, b, threshold=0.85):
    """True when the SequenceMatcher similarity of *a* vs *b* exceeds *threshold*."""
    matcher = difflib.SequenceMatcher(None, a, b)
    return matcher.ratio() > threshold
| |
|
def normalize(text):
    """Lower-case *text*, remove runs of two-plus dots, collapse whitespace, trim."""
    lowered = text.lower()
    without_leaders = re.sub(r'\.{2,}', '', lowered)   # strip TOC dot leaders
    collapsed = re.sub(r'\s+', ' ', without_leaders)
    return collapsed.strip()
| |
|
def clean_toc_entry(toc_text):
    """Strip trailing dot-leaders / page numbers from a TOC line.

    e.g. ``"Introduction ....... 12"`` -> ``"Introduction"``.
    """
    without_page_number = re.sub(r'[\.\s]+\d+.*$', '', toc_text)
    return without_page_number.strip('. ')
| |
|
def build_header_hierarchy(doc, toc_pages, most_common_font_size, most_common_color, most_common_font, top_margin=70, bottom_margin=70):
    """Assemble the document's headers into a level-annotated tree.

    Pipeline:
      1. extract candidate headers via ``extract_headers``;
      2. read the TOC pages and index their normalised lines;
      3. keep headers outside the page margins whose size is at least the
         body-text size;
      4. assign levels: TOC-matched largest-size headers become level 0,
         close vertical neighbours get consecutive levels, level-1 lookalikes
         are matched by format, the rest are clustered by font size;
      5. nest headers into parent/child lists and return the roots.

    Returns a list of root header dicts, each carrying a ``children`` list.

    NOTE(review): ``top_3_font_sizes`` is unpacked into exactly three names
    below; a document that yields fewer than three frequent header sizes
    raises ValueError there -- confirm upstream guarantees.
    """
    headers_list, top_3_font_sizes, smallest_font_size, spans = extract_headers(
        doc,
        toc_pages=toc_pages,
        most_common_font_size=most_common_font_size,
        most_common_color=most_common_color,
        most_common_font=most_common_font,
        top_margin=top_margin,
        bottom_margin=bottom_margin
    )

    headers = []
    seen_headers = set()  # NOTE(review): assigned but never used below

    # Map normalised TOC line -> original TOC line text.
    toc_entries = {}
    for pno in toc_pages:
        page = doc.load_page(pno)
        toc_text = page.get_text()
        for line in toc_text.split('\n'):
            clean_line = line.strip()
            if clean_line:
                norm_line = normalize(clean_line)
                toc_entries[norm_line] = clean_line

    # Turn raw header rows into dicts, dropping margin hits and tiny text.
    for h in headers_list:
        text, size, pageNum, y = h[:4]
        page = doc.load_page(pageNum)
        page_height = page.rect.height

        if y < top_margin or y > (page_height - bottom_margin):
            continue

        norm_text = normalize(text)
        if len(norm_text) > 2 and size >= most_common_font_size:
            headers.append({
                "text": text,
                "page": pageNum,
                "y": y,
                "size": size,
                # extract_headers only emits 4 columns, so these extras fall
                # back to their defaults (False / None).
                "bold": h[4] if len(h) > 4 else False,

                "color": h[6] if len(h) > 6 else None,
                "font": h[7] if len(h) > 7 else None,
                "children": [],
                "is_numbered": is_numbered(text),
                "original_size": size,
                "norm_text": norm_text,
                "level": -1  # -1 = level not assigned yet
            })

    headers.sort(key=lambda h: (h['page'], h['y']))

    # Headers within 20pt vertically on the same page are assumed to form a
    # title/subtitle pair and receive consecutive levels.
    i = 0
    while i < len(headers) - 1:
        current = headers[i]
        next_header = headers[i+1]

        if (current['page'] == next_header['page'] and
            abs(current['y'] - next_header['y']) < 20):

            if current['level'] == -1 and next_header['level'] == -1:
                current['level'] = 1
                next_header['level'] = 2
                i += 1

            elif current['level'] == -1 and next_header['level'] != -1:
                current['level'] = max(1, next_header['level'] - 1)

            elif current['level'] != -1 and next_header['level'] == -1:
                next_header['level'] = current['level'] + 1
                i += 1
        i += 1

    max_size,subheaderSize,nbsheadersize=top_3_font_sizes

    toc_text_match=[]

    # Promote headers whose text matches a TOC entry to level 0.
    toc_matches = []
    for h in headers:
        norm_text = h['norm_text']
        matching_toc_texts = []

        # Exact match or containment either way; only the largest font size
        # qualifies, and trivially-short TOC lines (<= 4 chars) are ignored.
        for toc_norm, toc_text in toc_entries.items():

            if norm_text == toc_norm and len(toc_text)>4 and h['size']==max_size:
                matching_toc_texts.append(toc_text)

            elif norm_text in toc_norm and len(toc_text)>4 and h['size']==max_size:
                matching_toc_texts.append(toc_text)

            elif toc_norm in norm_text and len(toc_text)>4 and h['size']==max_size:
                matching_toc_texts.append(toc_text)

        if matching_toc_texts and h['size'] >= max_size * 0.9:
            # Prefer the longest TOC line sharing the most text with the header.
            best_match = max(matching_toc_texts,
                             key=lambda x: (len(x), -len(x.replace(norm_text, ''))))
            h['text'] = normalize_text(clean_toc_entry(best_match))
            h['level'] = 0
            if h['text'] not in toc_text_match:
                toc_matches.append(h)
                toc_text_match.append(h['text'])
        elif matching_toc_texts and h['size'] < max_size * 0.9 and h['size'] > nbsheadersize :
            # NOTE(review): removing from ``headers`` while iterating over it
            # silently skips the element after ``h`` -- verify this is intended.
            headers.remove(h)
            continue

    # De-duplicate the level-0 matches by cleaned, normalised text.
    unique_level0 = []
    seen_level0 = set()
    for h in toc_matches:

        cleaned_text = clean_toc_entry(h['text'])
        norm_cleaned_text = normalize(cleaned_text)

        if norm_cleaned_text not in seen_level0:
            seen_level0.add(norm_cleaned_text)

            h['text'] = cleaned_text
            unique_level0.append(h)

    # Slice the flat header list into groups delimited by level-0 headers.
    level0_headers = [h for h in headers if h['level'] == 0]
    header_groups = []

    for i, level0 in enumerate(level0_headers):
        start_idx = headers.index(level0)
        end_idx = headers.index(level0_headers[i+1]) if i+1 < len(level0_headers) else len(headers)
        group = headers[start_idx:end_idx]
        header_groups.append(group)

    # Within each group the first unassigned header defines the level-1
    # "format"; headers matching it become level 1, the rest level 2.
    for group in header_groups:
        level0 = group[0]
        level1_candidates = [h for h in group[1:] if h['level'] == -1]

        if not level1_candidates:
            continue

        first_level1 = level1_candidates[0]
        level1_format = {
            'font': first_level1['font'],
            'color': first_level1['color'],
            'starts_with_number': is_numbered(first_level1['text']),
            'size': first_level1['size'],
            'bold': first_level1['bold']

        }

        for h in level1_candidates:
            current_format = {
                'font': h['font'],
                'color': h['color'],
                'starts_with_number': is_numbered(h['text']),
                'size': h['size'],
                'bold': h['bold']

            }

            # All attributes must match (size within 0.1pt).
            if (current_format['font'] == level1_format['font'] and
                current_format['color'] == level1_format['color'] and
                current_format['starts_with_number'] == level1_format['starts_with_number'] and
                abs(current_format['size'] - level1_format['size']) <= 0.1 and
                current_format['bold'] == level1_format['bold'] ):

                h['level'] = 1
            else:
                h['level'] = 2

    # Anything still unassigned is clustered by font size (within ~10%);
    # bigger sizes get shallower levels, bold headers are promoted one level.
    unassigned = [h for h in headers if h['level'] == -1]
    if unassigned:

        sizes = sorted({h['size'] for h in unassigned}, reverse=True)
        clusters = []

        for size in sizes:
            found_cluster = False
            for cluster in clusters:
                if abs(size - cluster['size']) <= max(size, cluster['size']) * 0.1:
                    cluster['headers'].extend([h for h in unassigned if abs(h['size'] - size) <= size * 0.1])
                    found_cluster = True
                    break
            if not found_cluster:
                clusters.append({
                    'size': size,
                    'headers': [h for h in unassigned if abs(h['size'] - size) <= size * 0.1]
                })

        clusters.sort(key=lambda x: -x['size'])
        for i, cluster in enumerate(clusters):
            for h in cluster['headers']:
                base_level = i + 1
                if h['bold']:
                    base_level = max(1, base_level - 1)
                h['level'] = base_level

    # Nest the headers using a stack of currently-open ancestors.
    root = []
    stack = []

    unique_level0_texts = {h['norm_text'] for h in unique_level0}

    # Re-tag duplicates of level-0 texts that were not kept in unique_level0.
    filtered_headers = []
    for h in headers:
        if h['norm_text'] in unique_level0_texts and h not in unique_level0:
            h['level'] = 0
        filtered_headers.append(h)

    all_headers = unique_level0 + filtered_headers
    all_headers.sort(key=lambda h: (h['page'], h['y']))

    # Suppress repeated level-0 entries while building the tree.
    added_level0 = set()

    for header in all_headers:
        if header['level'] < 0:
            continue

        if header['level'] == 0:
            norm_text = header['norm_text']
            if norm_text in added_level0:
                continue
            added_level0.add(norm_text)

        # Pop ancestors that are not strictly shallower than this header.
        while stack and stack[-1]['level'] >= header['level']:
            stack.pop()

        current_parent = stack[-1] if stack else None

        if current_parent:
            current_parent['children'].append(header)
        else:
            root.append(header)

        stack.append(header)

    # Guarantee strictly increasing levels down every branch.
    def enforce_nesting(node_list, parent_level=-1):
        for node in node_list:
            if node['level'] <= parent_level:
                node['level'] = parent_level + 1
            enforce_nesting(node['children'], node['level'])

    enforce_nesting(root)
    # Drop top-level section titles that ended up with no content.
    root = [h for h in root if not (h['level'] == 0 and not h['children'])]
    return root
| |
|
def adjust_levels_if_level0_not_in_toc(doc, toc_pages, root):
    """Demote every header by one level when no level-0 header appears in the TOC.

    The text of all *toc_pages* is concatenated and normalised; if at least
    one level-0 header's normalised text occurs in it, the tree is left
    untouched.  Otherwise every node's ``level`` is incremented in place,
    recursively through ``children``.

    Parameters
    ----------
    doc : document object
        Anything whose ``load_page(n).get_text()`` returns the page text
        (e.g. a ``fitz.Document``).
    toc_pages : iterable[int]
        Page numbers that make up the table of contents.
    root : list[dict]
        Header tree; each node has ``level``, ``text`` and ``children``.
    """
    def _normalize(text):
        return re.sub(r'\s+', ' ', text.strip().lower())

    toc_text = ""
    for pno in toc_pages:
        toc_text += doc.load_page(pno).get_text()
    toc_text_normalized = _normalize(toc_text)

    def _is_level0_in_toc_text(header):
        return header['level'] == 0 and _normalize(header['text']) in toc_text_normalized

    if any(_is_level0_in_toc_text(h) for h in root):
        return

    def _increase_levels(node_list):
        for node in node_list:
            node['level'] += 1
            _increase_levels(node['children'])

    # Bug fix: the original defined the helper above but never invoked it,
    # making the whole function a no-op in the "level 0 missing" case.
    _increase_levels(root)
| |
|
def assign_numbers_to_headers(headers, prefix=None):
    """Assign hierarchical section numbers ("1", "1.1", ...) in place.

    Every header gains a ``number`` key; children are numbered recursively
    using the parent's number as the prefix.
    """
    for position, node in enumerate(headers, start=1):
        number = f"{prefix}.{position}" if prefix else str(position)
        node["number"] = number
        assign_numbers_to_headers(node["children"], number)
| |
|
def print_tree_with_numbers(headers, indent=0):
    """Pretty-print the numbered header tree, one line per header.

    Each line shows the assigned number (or '?'), the text, the level, the
    1-based page and -- when present -- the original font size; children are
    indented one extra space per depth level.
    """
    for node in headers:
        size_part = f"size:{node['original_size']:.1f}" if 'original_size' in node else ""
        line = (
            " " * indent
            + f"{node.get('number', '?')} {node['text']} "
            + f"(Level {node['level']}, p:{node['page']+1}, {size_part})"
        )
        print(line)
        print_tree_with_numbers(node["children"], indent + 1)
| |
|
| |
|
def highlight_boxes(doc, highlights, stringtowrite, fixed_width=500):
    """Draw a translucent, horizontally-centred rectangle plus a label per page.

    ``highlights`` maps page number -> bbox.  Boxes with height <= 30pt or
    width <= 10pt are skipped.  The fill is grey when *stringtowrite* starts
    with 'Not' (i.e. "Not to be billed"), yellow otherwise, and the label
    ``[stringtowrite]`` is drawn in red inside the same rectangle.
    """
    for page_index, box in highlights.items():
        page = doc.load_page(page_index)

        source_rect = fitz.Rect(box)
        # Guard clauses replace the original nested ifs: too-small regions
        # get neither rectangle nor label.
        if source_rect.height <= 30:
            continue
        if source_rect.width <= 10:
            continue

        # Re-centre the box horizontally at a fixed width so every highlight
        # spans the same band of the page.
        mid_x = page.rect.width / 2
        banded = fitz.Rect(mid_x - fixed_width / 2, source_rect.y0,
                           mid_x + fixed_width / 2, source_rect.y1)

        shade = page.add_rect_annot(banded)
        colour = (0.5, 0.5, 0.5) if stringtowrite.startswith('Not') else (1, 1, 0)
        shade.set_colors(stroke=colour, fill=colour)
        shade.set_opacity(0.3)
        shade.update()

        label = page.add_freetext_annot(
            banded,
            '[' + stringtowrite + ']',
            fontsize=15,
            fontname='helv',
            text_color=(1, 0, 0),
            rotate=page.rotation,
            align=2
        )
        label.update()
| |
|
def get_leaf_headers_with_paths(listtoloop, path=None, output=None):
    """Collect leaf headers (no children) deeper than level 1.

    Returns a list of ``(header, path)`` tuples, where *path* is the list of
    header texts from the root down to that leaf.  Leaves at level 0 or 1
    are deliberately excluded.
    """
    path = [] if path is None else path
    output = [] if output is None else output
    for node in listtoloop:
        trail = path + [node['text']]
        if node['children']:
            get_leaf_headers_with_paths(node['children'], trail, output)
        elif node['level'] not in (0, 1):
            output.append((node, trail))
    return output
| |
|
| | |
def words_match_ratio(text1, text2):
    """Fraction of *text1*'s unique words that also occur in *text2*.

    The measure is asymmetric: the denominator is the number of unique words
    in *text1*.  Returns 0.0 when either text has no words at all.
    """
    first = set(text1.split())
    second = set(text2.split())
    if not first or not second:
        return 0.0
    shared = first & second
    return len(shared) / len(first)
| |
|
def same_start_word(s1, s2):
    """True when both strings are non-empty and share the same first word.

    Comparison is case-insensitive; blank or whitespace-only input on either
    side yields False.
    """
    first_words = s1.strip().split()
    second_words = s2.strip().split()
    if not first_words or not second_words:
        return False
    return first_words[0].lower() == second_words[0].lower()
| |
|
| |
|
def extract_section_under_header(pdf_path):
    """Highlight every leaf-header section of a PDF and collect markup data.

    Downloads *pdf_path*, builds the header hierarchy, then for each leaf
    header (level >= 2) scans the body text for the heading line, collects
    the lines underneath it until the next "real" header, draws a highlight
    box over the collected region and records a JSON entry that links back
    to the ADR viewer (``baselink``).

    Returns a 3-tuple ``(pdf_bytes, docHighlights, json_output)``: the
    annotated PDF as bytes, the open fitz document carrying the annotations,
    and the JSON string of all recorded entries.

    NOTE(review): ``json_output`` and ``stringtowrite`` are only assigned
    inside conditional branches; a document where no heading line ever
    matches raises UnboundLocalError at the return / data_entry lines --
    confirm inputs always produce at least one match.
    """
    top_margin = 70
    bottom_margin = 50
    headertoContinue1 = False
    headertoContinue2 = False

    # Filename (used in the "MC Connnection" text) taken from the URL path.
    parsed_url = urlparse(pdf_path)
    filename = os.path.basename(parsed_url.path)
    filename = unquote(filename)

    # Force direct download for Dropbox-style share links.
    if pdf_path and ('http' in pdf_path or 'dropbox' in pdf_path):
        pdf_path = pdf_path.replace('dl=0', 'dl=1')

    response = requests.get(pdf_path)
    pdf_content = BytesIO(response.content)
    # NOTE(review): a BytesIO object is always truthy, so this guard never
    # fires even for an empty download -- consider checking response.content.
    if not pdf_content:
        raise ValueError("No valid PDF content found.")

    # Two copies of the document: ``doc`` is read, ``docHighlights`` is
    # annotated and returned.
    doc = fitz.open(stream=pdf_content, filetype="pdf")
    docHighlights = fitz.open(stream=pdf_content, filetype="pdf")
    most_common_font_size, most_common_color, most_common_font = get_regular_font_size_and_color(doc)

    dot_pattern = re.compile(r'\.{3,}')               # TOC dot leaders
    url_pattern = re.compile(r'https?://\S+|www\.\S+')

    def get_toc_page_numbers(doc, max_pages_to_check=15):
        # A page holding >= 3 dot-leader lines is treated as part of the
        # TOC; every page up to the last such page is returned.
        toc_pages = []
        for page_num in range(min(len(doc), max_pages_to_check)):
            page = doc.load_page(page_num)
            blocks = page.get_text("dict")["blocks"]

            dot_line_count = 0
            for block in blocks:
                for line in block.get("lines", []):
                    line_text = get_spaced_text_from_spans(line["spans"]).strip()
                    if dot_pattern.search(line_text):
                        dot_line_count += 1

            if dot_line_count >= 3:
                toc_pages.append(page_num)

        return list(range(0, toc_pages[-1] +1)) if toc_pages else toc_pages

    toc_pages = get_toc_page_numbers(doc)

    headers, top_3_font_sizes, smallest_font_size, headersSpans = extract_headers(
        doc, toc_pages, most_common_font_size, most_common_color, most_common_font, top_margin, bottom_margin
    )

    hierarchy = build_header_hierarchy(doc, toc_pages, most_common_font_size, most_common_color, most_common_font)
    listofHeaderstoMarkup = get_leaf_headers_with_paths(hierarchy)

    # Normalised texts of every leaf heading, for exact-membership tests.
    allchildrenheaders = [normalize_text(item['text']) for item, p in listofHeaderstoMarkup]
    allchildrenheaders_set = set(allchildrenheaders)

    # NOTE(review): ``df`` and ``dictionaryNBS`` are never used below.
    df = pd.DataFrame(columns=["NBSLink","Subject","Page","Author","Creation Date","Layer",'Code', 'head above 1', "head above 2"])
    dictionaryNBS={}
    data_list_JSON = []

    # Unpack the heading size ladder; with only two sizes the smaller one
    # doubles as the sub-sub size.  NOTE(review): fewer than two sizes
    # leaves mainHeaderFontSize unbound.
    if len(top_3_font_sizes)==3:
        mainHeaderFontSize, subHeaderFontSize, subsubheaderFontSize = top_3_font_sizes
    elif len(top_3_font_sizes)==2:
        mainHeaderFontSize= top_3_font_sizes[0]
        subHeaderFontSize= top_3_font_sizes[1]
        subsubheaderFontSize= top_3_font_sizes[1]

    for heading_to_searchDict, paths in listofHeaderstoMarkup:
        heading_to_search = heading_to_searchDict['text']
        heading_to_searchPageNum = heading_to_searchDict['page']

        # Per-heading scan state.
        headertoContinue1 = False
        headertoContinue2 = False
        matched_header_line = None
        done = False               # True once the section end was found
        collecting = False         # True while inside the target section
        collected_lines = []
        page_highlights = {}       # page -> final bbox to highlight
        current_bbox = {}          # page -> running union bbox
        last_y1s = {}              # page -> bottom of last collected line
        mainHeader = ''
        subHeader = ''
        matched_header_line_norm = heading_to_search
        break_collecting = False
        heading_norm = normalize_text(heading_to_search)
        # NOTE(review): paths[0] is a string, so this iterates characters,
        # producing a list of single-character strings; it is unused below.
        paths_norm = [normalize_text(p) for p in paths[0]] if paths and paths[0] else []

        # Scan from the page the heading was detected on to the end.
        for page_num in range(heading_to_searchPageNum,len(doc)):
            if page_num in toc_pages:
                continue
            if break_collecting:
                break
            page=doc[page_num]
            page_height = page.rect.height
            blocks = page.get_text("dict")["blocks"]

            for block in blocks:
                if break_collecting:
                    break

                lines = block.get("lines", [])
                i = 0
                while i < len(lines):
                    if break_collecting:
                        break

                    spans = lines[i].get("spans", [])
                    if not spans:
                        i += 1
                        continue

                    # Skip lines inside the top/bottom page margins.
                    y0 = spans[0]["bbox"][1]
                    y1 = spans[0]["bbox"][3]
                    if y0 < top_margin or y1 > (page_height - bottom_margin):
                        i += 1
                        continue

                    line_text = get_spaced_text_from_spans(spans).lower()
                    line_text_norm = normalize_text(line_text)

                    # Headings may wrap over two visual lines: also consider
                    # this line merged with the next one.
                    if i + 1 < len(lines):
                        next_spans = lines[i + 1].get("spans", [])
                        next_line_text = get_spaced_text_from_spans(next_spans).lower()
                        combined_line_norm = normalize_text(line_text + " " + next_line_text)
                    else:
                        combined_line_norm = line_text_norm

                    # Track ancestor headings passed during the scan.
                    if combined_line_norm and combined_line_norm in paths[0]:
                        headertoContinue1 = combined_line_norm
                    if combined_line_norm and combined_line_norm in paths[-2]:
                        headertoContinue2 = combined_line_norm
                        # Billing status derives from the immediate parent
                        # heading text.  NOTE(review): assigned only inside
                        # this branch; see docstring re: UnboundLocalError.
                        if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() :
                            stringtowrite='Not to be billed'
                        else:
                            stringtowrite='To be billed'

                    # Strong match: the merged line is itself a known leaf
                    # heading AND contains the heading we are searching for.
                    existsfull = (
                        ( combined_line_norm in allchildrenheaders_set or
                          combined_line_norm in allchildrenheaders ) and heading_to_search in combined_line_norm
                    )

                    current_line_words = set(combined_line_norm.split())
                    heading_words = set(heading_norm.split())
                    all_words_match = current_line_words.issubset(heading_words) and len(current_line_words) > 0

                    substring_match = (
                        heading_norm in combined_line_norm or
                        combined_line_norm in heading_norm or
                        all_words_match
                    )

                    if (substring_match and existsfull and not collecting and
                        len(combined_line_norm) > 0 ):

                        # Accept only header-styled spans smaller than the
                        # main (level-0) heading size.
                        header_spans = [
                            span for span in spans
                            if (is_header(span, most_common_font_size, most_common_color, most_common_font)

                                and span['size'] < mainHeaderFontSize)
                        ]
                        if header_spans:
                            collecting = True
                            matched_header_font_size = max(span["size"] for span in header_spans)

                            collected_lines.append(line_text)
                            valid_spans = [span for span in spans if span.get("bbox")]

                            if valid_spans:
                                # Union bbox of the heading line.
                                x0s = [span["bbox"][0] for span in valid_spans]
                                x1s = [span["bbox"][2] for span in valid_spans]
                                y0s = [span["bbox"][1] for span in valid_spans]
                                y1s = [span["bbox"][3] for span in valid_spans]

                                header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)]

                                # Grow the running per-page union bbox.
                                if page_num in current_bbox:
                                    cb = current_bbox[page_num]
                                    current_bbox[page_num] = [
                                        min(cb[0], header_bbox[0]),
                                        min(cb[1], header_bbox[1]),
                                        max(cb[2], header_bbox[2]),
                                        max(cb[3], header_bbox[3])
                                    ]
                                else:
                                    current_bbox[page_num] = header_bbox
                                last_y1s[page_num] = header_bbox[3]
                                x0, y0, x1, y1 = header_bbox

                                # Viewer deep link: zoom + offset fragment.
                                zoom = 200
                                left = int(x0)
                                top = int(y0)
                                zoom_str = f"{zoom},{left},{top}"
                                pageNumberFound = page_num + 1

                                params = {
                                    'pdfLink': pdf_path,
                                    'keyword': heading_to_search,
                                }

                                encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()}

                                encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()])

                                final_url = f"{baselink}{encoded_link}#page={str(pageNumberFound)}&zoom={zoom_str}"

                                now = datetime.now()

                                formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p")

                                data_entry = {
                                    "NBSLink": final_url,
                                    "Subject": heading_to_search,
                                    "Page": str(pageNumberFound),
                                    "Author": "ADR",
                                    "Creation Date": formatted_time,
                                    "Layer": "Initial",
                                    "Code": stringtowrite,
                                    "head above 1": paths[-2],
                                    "head above 2": paths[0],
                                    "MC Connnection": 'Go to ' + paths[0].strip().split()[0] +'/'+ heading_to_search.strip().split()[0] + ' in '+ filename
                                }
                                data_list_JSON.append(data_entry)

                                # Serialised after every entry so the latest
                                # snapshot is always available for the return.
                                json_output = json.dumps(data_list_JSON, indent=4)

                            # Skip the merged second line too.
                            i += 2
                            continue
                    else:
                        # Fallback match: not a full known leaf line; accept
                        # an exact word-set match or a shared first word.
                        # NOTE(review): this branch duplicates the block
                        # above almost verbatim -- candidate for a helper.
                        if (substring_match and not collecting and
                            len(combined_line_norm) > 0):

                            word_match_percent = words_match_ratio(heading_norm, combined_line_norm) * 100

                            meets_word_threshold = word_match_percent >= 100

                            header_spans = [
                                span for span in spans
                                if (is_header(span, most_common_font_size, most_common_color, most_common_font)

                                    and span['size'] < mainHeaderFontSize)
                            ]

                            if header_spans and (meets_word_threshold or same_start_word(heading_to_search, combined_line_norm) ):
                                collecting = True
                                matched_header_font_size = max(span["size"] for span in header_spans)

                                collected_lines.append(line_text)
                                valid_spans = [span for span in spans if span.get("bbox")]

                                if valid_spans:
                                    x0s = [span["bbox"][0] for span in valid_spans]
                                    x1s = [span["bbox"][2] for span in valid_spans]
                                    y0s = [span["bbox"][1] for span in valid_spans]
                                    y1s = [span["bbox"][3] for span in valid_spans]

                                    header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)]

                                    if page_num in current_bbox:
                                        cb = current_bbox[page_num]
                                        current_bbox[page_num] = [
                                            min(cb[0], header_bbox[0]),
                                            min(cb[1], header_bbox[1]),
                                            max(cb[2], header_bbox[2]),
                                            max(cb[3], header_bbox[3])
                                        ]
                                    else:
                                        current_bbox[page_num] = header_bbox

                                    last_y1s[page_num] = header_bbox[3]
                                    x0, y0, x1, y1 = header_bbox
                                    zoom = 200
                                    left = int(x0)
                                    top = int(y0)
                                    zoom_str = f"{zoom},{left},{top}"
                                    pageNumberFound = page_num + 1

                                    params = {
                                        'pdfLink': pdf_path,
                                        'keyword': heading_to_search,
                                    }

                                    encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()}

                                    encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()])

                                    final_url = f"{baselink}{encoded_link}#page={str(pageNumberFound)}&zoom={zoom_str}"

                                    now = datetime.now()

                                    formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p")

                                    data_entry = {
                                        "NBSLink": final_url,
                                        "Subject": heading_to_search,
                                        "Page": str(pageNumberFound),
                                        "Author": "ADR",
                                        "Creation Date": formatted_time,
                                        "Layer": "Initial",
                                        "Code": stringtowrite,
                                        "head above 1": paths[-2],
                                        "head above 2": paths[0],
                                        "MC Connnection": 'Go to ' + paths[0].strip().split()[0] +'/'+ heading_to_search.strip().split()[0] + ' in '+ filename
                                    }
                                    data_list_JSON.append(data_entry)

                                    json_output = json.dumps(data_list_JSON, indent=4)

                                i += 2
                                continue
                    if collecting:
                        norm_line = normalize_text(line_text)

                        # URLs never terminate a section.
                        if url_pattern.match(norm_line):
                            line_is_header = False
                        else:
                            line_is_header = any(is_header(span, most_common_font_size, most_common_color, most_common_font) for span in spans)

                        if line_is_header:
                            header_font_size = max(span["size"] for span in spans)
                            # A "real" header (section terminator) must be at
                            # least as large as the matched heading.
                            is_probably_real_header = (
                                header_font_size >= matched_header_font_size and
                                is_header(spans[0], most_common_font_size, most_common_color, most_common_font) and
                                len(line_text.strip()) > 2
                            )

                            # NOTE(review): matched_header_line_norm holds the
                            # raw (un-normalised) heading text -- confirm the
                            # comparison against norm_line is intentional.
                            if (norm_line != matched_header_line_norm and
                                norm_line != heading_norm and
                                is_probably_real_header):
                                if line_text not in heading_norm:
                                    # Section ended: finalise bboxes and draw
                                    # the highlight, then stop this heading.
                                    collecting = False
                                    done = True
                                    headertoContinue1 = False
                                    headertoContinue2=False
                                    for page_num, bbox in current_bbox.items():
                                        bbox[3] = last_y1s.get(page_num, bbox[3])
                                        page_highlights[page_num] = bbox
                                    highlight_boxes(docHighlights, page_highlights,stringtowrite)

                                    break_collecting = True
                                    break

                        if break_collecting:
                            break

                        # Body line inside the section: collect it and grow
                        # the per-page union bbox.
                        collected_lines.append(line_text)
                        valid_spans = [span for span in spans if span.get("bbox")]
                        if valid_spans:
                            x0s = [span["bbox"][0] for span in valid_spans]
                            x1s = [span["bbox"][2] for span in valid_spans]
                            y0s = [span["bbox"][1] for span in valid_spans]
                            y1s = [span["bbox"][3] for span in valid_spans]

                            line_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)]

                            if page_num in current_bbox:
                                cb = current_bbox[page_num]
                                current_bbox[page_num] = [
                                    min(cb[0], line_bbox[0]),
                                    min(cb[1], line_bbox[1]),
                                    max(cb[2], line_bbox[2]),
                                    max(cb[3], line_bbox[3])
                                ]
                            else:
                                current_bbox[page_num] = line_bbox

                            last_y1s[page_num] = line_bbox[3]
                    i += 1

        # Section ran to the end of the document without a terminating
        # header: highlight whatever was collected.
        if not done:
            for page_num, bbox in current_bbox.items():
                bbox[3] = last_y1s.get(page_num, bbox[3])
                page_highlights[page_num] = bbox
            if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower() :
                stringtowrite='Not to be billed'
            else:
                stringtowrite='To be billed'
            highlight_boxes(docHighlights, page_highlights,stringtowrite)

    pdf_bytes = BytesIO()
    docHighlights.save(pdf_bytes)
    return pdf_bytes.getvalue(), docHighlights , json_output
| |
|
| |
|
| |
|
| |
|
| | |
| | |
| |
|
| |
|
| |
|
def extract_section_under_header_tobebilledOnly(pdf_path):
    """Highlight billable sections of a remote PDF and collect their text.

    Downloads the PDF at *pdf_path*, detects its table-of-contents pages and
    header hierarchy (via the module-level helpers), then for every leaf
    heading finds the matching header line in the body, highlights the section
    under it in a copy of the document, and records a markup entry with a deep
    link built from the module-level ``baselink``.

    Sections whose parent header mentions 'installation', 'execution' or
    'miscellaneous items' are classified 'Not to be billed'; all others are
    'To be billed' and only those are collected/highlighted here.

    Parameters
    ----------
    pdf_path : str
        HTTP(S)/Dropbox URL of the PDF ('dl=0' is rewritten to 'dl=1').

    Returns
    -------
    tuple
        ``(highlighted_pdf_bytes, docHighlights, json_output,
        Alltexttobebilled, alltextWithoutNotbilled)`` where *json_output* is a
        JSON string of the markup entries created (``"[]"`` when none).

    Raises
    ------
    ValueError
        If the download body is empty.
    """
    Alltexttobebilled = ''
    alltextWithoutNotbilled = ''
    top_margin = 70       # lines above this y are page-header furniture
    bottom_margin = 50    # lines within this distance of the bottom are footer
    headertoContinue1 = False
    headertoContinue2 = False

    parsed_url = urlparse(pdf_path)
    filename = os.path.basename(parsed_url.path)
    filename = unquote(filename)

    # Force a direct download for Dropbox-style share links.
    if pdf_path and ('http' in pdf_path or 'dropbox' in pdf_path):
        pdf_path = pdf_path.replace('dl=0', 'dl=1')

    response = requests.get(pdf_path)
    # Fix: a BytesIO instance is always truthy, so the previous
    # `if not pdf_content:` guard could never fire; validate the raw payload.
    if not response.content:
        raise ValueError("No valid PDF content found.")
    pdf_content = BytesIO(response.content)

    doc = fitz.open(stream=pdf_content, filetype="pdf")
    # Second handle of the same bytes receives the highlight annotations.
    docHighlights = fitz.open(stream=pdf_content, filetype="pdf")
    most_common_font_size, most_common_color, most_common_font = get_regular_font_size_and_color(doc)

    dot_pattern = re.compile(r'\.{3,}')                 # dotted TOC leaders
    url_pattern = re.compile(r'https?://\S+|www\.\S+')  # URLs never end a section

    def get_toc_page_numbers(doc, max_pages_to_check=15):
        """Return 0-based numbers of table-of-contents pages.

        A page counts as TOC when at least 3 of its lines contain dotted
        leaders; every page up to and including the last such page is returned.
        """
        toc_pages = []
        for page_num in range(min(len(doc), max_pages_to_check)):
            page = doc.load_page(page_num)
            blocks = page.get_text("dict")["blocks"]

            dot_line_count = 0
            for block in blocks:
                for line in block.get("lines", []):
                    line_text = get_spaced_text_from_spans(line["spans"]).strip()
                    if dot_pattern.search(line_text):
                        dot_line_count += 1

            if dot_line_count >= 3:
                toc_pages.append(page_num)

        return list(range(0, toc_pages[-1] + 1)) if toc_pages else toc_pages

    toc_pages = get_toc_page_numbers(doc)

    headers, top_3_font_sizes, smallest_font_size, headersSpans = extract_headers(
        doc, toc_pages, most_common_font_size, most_common_color, most_common_font, top_margin, bottom_margin
    )

    hierarchy = build_header_hierarchy(doc, toc_pages, most_common_font_size, most_common_color, most_common_font)
    listofHeaderstoMarkup = get_leaf_headers_with_paths(hierarchy)

    allchildrenheaders = [normalize_text(item['text']) for item, p in listofHeaderstoMarkup]
    allchildrenheaders_set = set(allchildrenheaders)

    data_list_JSON = []
    # Fix: json_output was only bound inside the match branches, so a document
    # with no matching heading raised UnboundLocalError at the return.
    json_output = json.dumps(data_list_JSON, indent=4)

    # Fix: only the 3- and 2-size cases were handled before, so fewer than two
    # detected header sizes crashed later on an unbound mainHeaderFontSize.
    if len(top_3_font_sizes) == 3:
        mainHeaderFontSize, subHeaderFontSize, subsubheaderFontSize = top_3_font_sizes
    elif len(top_3_font_sizes) == 2:
        mainHeaderFontSize = top_3_font_sizes[0]
        subHeaderFontSize = top_3_font_sizes[1]
        subsubheaderFontSize = top_3_font_sizes[1]
    elif len(top_3_font_sizes) == 1:
        # Single header size: disable the "< mainHeaderFontSize" cutoff so the
        # only header size is still accepted as a section header.
        mainHeaderFontSize = float('inf')
        subHeaderFontSize = top_3_font_sizes[0]
        subsubheaderFontSize = top_3_font_sizes[0]
    else:
        # No header sizes detected at all; accept any header span.
        mainHeaderFontSize = subHeaderFontSize = subsubheaderFontSize = float('inf')

    for heading_to_searchDict, paths in listofHeaderstoMarkup:
        heading_to_search = heading_to_searchDict['text']
        heading_to_searchPageNum = heading_to_searchDict['page']

        # Per-heading scan state.
        headertoContinue1 = False
        headertoContinue2 = False
        done = False                          # closing header already found
        collecting = False                    # inside the target section
        collected_lines = []
        page_highlights = {}                  # page -> final bbox to highlight
        current_bbox = {}                     # page -> running union bbox
        last_y1s = {}                         # page -> bottom of last collected line
        matched_header_line_norm = heading_to_search
        break_collecting = False
        heading_norm = normalize_text(heading_to_search)

        for page_num in range(heading_to_searchPageNum, len(doc)):
            if page_num in toc_pages:
                continue
            if break_collecting:
                break
            page = doc[page_num]
            page_height = page.rect.height
            blocks = page.get_text("dict")["blocks"]

            for block in blocks:
                if break_collecting:
                    break

                lines = block.get("lines", [])
                i = 0
                while i < len(lines):
                    if break_collecting:
                        break

                    spans = lines[i].get("spans", [])
                    if not spans:
                        i += 1
                        continue

                    # Skip anything inside the header/footer margins.
                    y0 = spans[0]["bbox"][1]
                    y1 = spans[0]["bbox"][3]
                    if y0 < top_margin or y1 > (page_height - bottom_margin):
                        i += 1
                        continue

                    line_text = get_spaced_text_from_spans(spans).lower()
                    line_text_norm = normalize_text(line_text)

                    # Headers may wrap onto the next line; match both joined.
                    if i + 1 < len(lines):
                        next_spans = lines[i + 1].get("spans", [])
                        next_line_text = get_spaced_text_from_spans(next_spans).lower()
                        combined_line_norm = normalize_text(line_text + " " + next_line_text)
                    else:
                        combined_line_norm = line_text_norm

                    if combined_line_norm and combined_line_norm in paths[0]:
                        headertoContinue1 = combined_line_norm
                    if combined_line_norm and combined_line_norm in paths[-2]:
                        headertoContinue2 = combined_line_norm
                    # Billing class comes from the immediate parent header.
                    if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower():
                        stringtowrite = 'Not to be billed'
                    else:
                        stringtowrite = 'To be billed'
                    if stringtowrite != 'To be billed':
                        alltextWithoutNotbilled += combined_line_norm

                    # Exact case: the line is a known leaf header containing
                    # the heading we are searching for.
                    existsfull = (
                        (combined_line_norm in allchildrenheaders_set or
                         combined_line_norm in allchildrenheaders) and heading_to_search in combined_line_norm
                    )

                    current_line_words = set(combined_line_norm.split())
                    heading_words = set(heading_norm.split())
                    all_words_match = current_line_words.issubset(heading_words) and len(current_line_words) > 0

                    substring_match = (
                        heading_norm in combined_line_norm or
                        combined_line_norm in heading_norm or
                        all_words_match
                    )

                    if (substring_match and existsfull and not collecting and
                            len(combined_line_norm) > 0):

                        # Only spans styled as headers, below the main-header
                        # size, start a section.
                        header_spans = [
                            span for span in spans
                            if (is_header(span, most_common_font_size, most_common_color, most_common_font)
                                and span['size'] < mainHeaderFontSize)
                        ]
                        if header_spans and stringtowrite.startswith('To'):
                            collecting = True
                            matched_header_font_size = max(span["size"] for span in header_spans)
                            Alltexttobebilled += ' ' + combined_line_norm
                            collected_lines.append(line_text)
                            valid_spans = [span for span in spans if span.get("bbox")]

                            if valid_spans:
                                x0s = [span["bbox"][0] for span in valid_spans]
                                x1s = [span["bbox"][2] for span in valid_spans]
                                y0s = [span["bbox"][1] for span in valid_spans]
                                y1s = [span["bbox"][3] for span in valid_spans]

                                header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)]

                                if page_num in current_bbox:
                                    cb = current_bbox[page_num]
                                    current_bbox[page_num] = [
                                        min(cb[0], header_bbox[0]),
                                        min(cb[1], header_bbox[1]),
                                        max(cb[2], header_bbox[2]),
                                        max(cb[3], header_bbox[3])
                                    ]
                                else:
                                    current_bbox[page_num] = header_bbox
                                last_y1s[page_num] = header_bbox[3]
                                x0, y0, x1, y1 = header_bbox

                                # Deep link that opens the viewer zoomed on the header.
                                zoom = 200
                                left = int(x0)
                                top = int(y0)
                                zoom_str = f"{zoom},{left},{top}"
                                pageNumberFound = page_num + 1

                                params = {
                                    'pdfLink': pdf_path,
                                    'keyword': heading_to_search,
                                }
                                encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()}
                                encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()])
                                final_url = f"{baselink}{encoded_link}#page={str(pageNumberFound)}&zoom={zoom_str}"

                                now = datetime.now()
                                formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p")

                                # NOTE: "MC Connnection" spelling kept as-is —
                                # it is part of the consumed JSON schema.
                                data_entry = {
                                    "NBSLink": final_url,
                                    "Subject": heading_to_search,
                                    "Page": str(pageNumberFound),
                                    "Author": "ADR",
                                    "Creation Date": formatted_time,
                                    "Layer": "Initial",
                                    "Code": stringtowrite,
                                    "head above 1": paths[-2],
                                    "head above 2": paths[0],
                                    "MC Connnection": 'Go to ' + paths[0].strip().split()[0] + '/' + heading_to_search.strip().split()[0] + ' in ' + filename
                                }
                                data_list_JSON.append(data_entry)

                                json_output = json.dumps(data_list_JSON, indent=4)

                            i += 2  # the matched header consumed this line and the next
                            continue
                    else:
                        if (substring_match and not collecting and
                                len(combined_line_norm) > 0):

                            # Fuzzy case: full word overlap, or same leading
                            # (clause-number) token.
                            word_match_percent = words_match_ratio(heading_norm, combined_line_norm) * 100
                            meets_word_threshold = word_match_percent >= 100

                            header_spans = [
                                span for span in spans
                                if (is_header(span, most_common_font_size, most_common_color, most_common_font)
                                    and span['size'] < mainHeaderFontSize)
                            ]

                            if header_spans and (meets_word_threshold or same_start_word(heading_to_search, combined_line_norm)) and stringtowrite.startswith('To'):
                                collecting = True
                                matched_header_font_size = max(span["size"] for span in header_spans)
                                Alltexttobebilled += ' ' + combined_line_norm
                                collected_lines.append(line_text)
                                valid_spans = [span for span in spans if span.get("bbox")]

                                if valid_spans:
                                    x0s = [span["bbox"][0] for span in valid_spans]
                                    x1s = [span["bbox"][2] for span in valid_spans]
                                    y0s = [span["bbox"][1] for span in valid_spans]
                                    y1s = [span["bbox"][3] for span in valid_spans]

                                    header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)]

                                    if page_num in current_bbox:
                                        cb = current_bbox[page_num]
                                        current_bbox[page_num] = [
                                            min(cb[0], header_bbox[0]),
                                            min(cb[1], header_bbox[1]),
                                            max(cb[2], header_bbox[2]),
                                            max(cb[3], header_bbox[3])
                                        ]
                                    else:
                                        current_bbox[page_num] = header_bbox

                                    last_y1s[page_num] = header_bbox[3]
                                    x0, y0, x1, y1 = header_bbox
                                    zoom = 200
                                    left = int(x0)
                                    top = int(y0)
                                    zoom_str = f"{zoom},{left},{top}"
                                    pageNumberFound = page_num + 1

                                    params = {
                                        'pdfLink': pdf_path,
                                        'keyword': heading_to_search,
                                    }
                                    encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()}
                                    encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()])
                                    final_url = f"{baselink}{encoded_link}#page={str(pageNumberFound)}&zoom={zoom_str}"

                                    now = datetime.now()
                                    formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p")

                                    data_entry = {
                                        "NBSLink": final_url,
                                        "Subject": heading_to_search,
                                        "Page": str(pageNumberFound),
                                        "Author": "ADR",
                                        "Creation Date": formatted_time,
                                        "Layer": "Initial",
                                        "Code": stringtowrite,
                                        "head above 1": paths[-2],
                                        "head above 2": paths[0],
                                        "MC Connnection": 'Go to ' + paths[0].strip().split()[0] + '/' + heading_to_search.strip().split()[0] + ' in ' + filename
                                    }
                                    data_list_JSON.append(data_entry)

                                    json_output = json.dumps(data_list_JSON, indent=4)

                                i += 2
                                continue
                    if collecting:
                        norm_line = normalize_text(line_text)

                        # URLs are body text even when styled like a header.
                        if url_pattern.match(norm_line):
                            line_is_header = False
                        else:
                            line_is_header = any(is_header(span, most_common_font_size, most_common_color, most_common_font) for span in spans)

                        if line_is_header:
                            header_font_size = max(span["size"] for span in spans)
                            is_probably_real_header = (
                                header_font_size >= matched_header_font_size and
                                is_header(spans[0], most_common_font_size, most_common_color, most_common_font) and
                                len(line_text.strip()) > 2
                            )

                            # A new header of at least the matched size closes
                            # the section: flush the accumulated highlights.
                            if (norm_line != matched_header_line_norm and
                                    norm_line != heading_norm and
                                    is_probably_real_header):
                                if line_text not in heading_norm:
                                    collecting = False
                                    done = True
                                    headertoContinue1 = False
                                    headertoContinue2 = False
                                    for page_num, bbox in current_bbox.items():
                                        bbox[3] = last_y1s.get(page_num, bbox[3])
                                        page_highlights[page_num] = bbox
                                    highlight_boxes(docHighlights, page_highlights, stringtowrite)

                                    break_collecting = True
                                    break

                        if break_collecting:
                            break

                        collected_lines.append(line_text)
                        valid_spans = [span for span in spans if span.get("bbox")]
                        if valid_spans:
                            x0s = [span["bbox"][0] for span in valid_spans]
                            x1s = [span["bbox"][2] for span in valid_spans]
                            y0s = [span["bbox"][1] for span in valid_spans]
                            y1s = [span["bbox"][3] for span in valid_spans]

                            line_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)]

                            if page_num in current_bbox:
                                cb = current_bbox[page_num]
                                current_bbox[page_num] = [
                                    min(cb[0], line_bbox[0]),
                                    min(cb[1], line_bbox[1]),
                                    max(cb[2], line_bbox[2]),
                                    max(cb[3], line_bbox[3])
                                ]
                            else:
                                current_bbox[page_num] = line_bbox

                            last_y1s[page_num] = line_bbox[3]
                    i += 1

        # Section ran to end of document without a closing header:
        # flush whatever was collected for this heading.
        if not done:
            for page_num, bbox in current_bbox.items():
                bbox[3] = last_y1s.get(page_num, bbox[3])
                page_highlights[page_num] = bbox
            if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower():
                stringtowrite = 'Not to be billed'
            else:
                stringtowrite = 'To be billed'
            highlight_boxes(docHighlights, page_highlights, stringtowrite)

    pdf_bytes = BytesIO()
    docHighlights.save(pdf_bytes)
    return pdf_bytes.getvalue(), docHighlights, json_output, Alltexttobebilled, alltextWithoutNotbilled
| |
|
| |
|
def extract_section_under_header_tobebilled2(pdf_path):
    """Highlight sections of a remote PDF and collect billable text grouped by top-level path.

    Variant of the section-markup routine: every heading's group name
    (``paths[0]``) is written once into the collected text, and each scanned
    line classified 'To be billed' is appended to the running text while the
    page range for that heading is walked.  Highlights and JSON markup entries
    are produced the same way as the sibling routines.

    Parameters
    ----------
    pdf_path : str
        HTTP(S)/Dropbox URL of the PDF ('dl=0' is rewritten to 'dl=1').

    Returns
    -------
    tuple
        ``(highlighted_pdf_bytes, docHighlights, json_output, Alltexttobebilled)``
        where *json_output* is a JSON string of the markup entries (``"[]"``
        when none matched).

    Raises
    ------
    ValueError
        If the download body is empty.
    """
    top_margin = 70       # lines above this y are page-header furniture
    bottom_margin = 50    # lines within this distance of the bottom are footer
    headertoContinue1 = False
    headertoContinue2 = False
    Alltexttobebilled = ''
    parsed_url = urlparse(pdf_path)
    filename = os.path.basename(parsed_url.path)
    filename = unquote(filename)

    # Force a direct download for Dropbox-style share links.
    if pdf_path and ('http' in pdf_path or 'dropbox' in pdf_path):
        pdf_path = pdf_path.replace('dl=0', 'dl=1')

    response = requests.get(pdf_path)
    # Fix: a BytesIO instance is always truthy, so the previous
    # `if not pdf_content:` guard could never fire; validate the raw payload.
    if not response.content:
        raise ValueError("No valid PDF content found.")
    pdf_content = BytesIO(response.content)

    doc = fitz.open(stream=pdf_content, filetype="pdf")
    # Second handle of the same bytes receives the highlight annotations.
    docHighlights = fitz.open(stream=pdf_content, filetype="pdf")
    most_common_font_size, most_common_color, most_common_font = get_regular_font_size_and_color(doc)

    dot_pattern = re.compile(r'\.{3,}')                 # dotted TOC leaders
    url_pattern = re.compile(r'https?://\S+|www\.\S+')  # URLs never end a section

    def get_toc_page_numbers(doc, max_pages_to_check=15):
        """Return 0-based numbers of table-of-contents pages.

        A page counts as TOC when at least 3 of its lines contain dotted
        leaders; every page up to and including the last such page is returned.
        """
        toc_pages = []
        for page_num in range(min(len(doc), max_pages_to_check)):
            page = doc.load_page(page_num)
            blocks = page.get_text("dict")["blocks"]

            dot_line_count = 0
            for block in blocks:
                for line in block.get("lines", []):
                    line_text = get_spaced_text_from_spans(line["spans"]).strip()
                    if dot_pattern.search(line_text):
                        dot_line_count += 1

            if dot_line_count >= 3:
                toc_pages.append(page_num)

        return list(range(0, toc_pages[-1] + 1)) if toc_pages else toc_pages

    toc_pages = get_toc_page_numbers(doc)

    headers, top_3_font_sizes, smallest_font_size, headersSpans = extract_headers(
        doc, toc_pages, most_common_font_size, most_common_color, most_common_font, top_margin, bottom_margin
    )

    hierarchy = build_header_hierarchy(doc, toc_pages, most_common_font_size, most_common_color, most_common_font)
    listofHeaderstoMarkup = get_leaf_headers_with_paths(hierarchy)

    allchildrenheaders = [normalize_text(item['text']) for item, p in listofHeaderstoMarkup]
    allchildrenheaders_set = set(allchildrenheaders)

    data_list_JSON = []
    # Fix: json_output was only bound inside the match branches, so a document
    # with no matching heading raised UnboundLocalError at the return.
    json_output = json.dumps(data_list_JSON, indent=4)
    currentgroupname = ''   # last group (paths[0]) written into the text

    # Fix: only the 3- and 2-size cases were handled before, so fewer than two
    # detected header sizes crashed later on an unbound mainHeaderFontSize.
    if len(top_3_font_sizes) == 3:
        mainHeaderFontSize, subHeaderFontSize, subsubheaderFontSize = top_3_font_sizes
    elif len(top_3_font_sizes) == 2:
        mainHeaderFontSize = top_3_font_sizes[0]
        subHeaderFontSize = top_3_font_sizes[1]
        subsubheaderFontSize = top_3_font_sizes[1]
    elif len(top_3_font_sizes) == 1:
        # Single header size: disable the "< mainHeaderFontSize" cutoff so the
        # only header size is still accepted as a section header.
        mainHeaderFontSize = float('inf')
        subHeaderFontSize = top_3_font_sizes[0]
        subsubheaderFontSize = top_3_font_sizes[0]
    else:
        # No header sizes detected at all; accept any header span.
        mainHeaderFontSize = subHeaderFontSize = subsubheaderFontSize = float('inf')

    for heading_to_searchDict, paths in listofHeaderstoMarkup:
        heading_to_search = heading_to_searchDict['text']
        heading_to_searchPageNum = heading_to_searchDict['page']

        # Per-heading scan state.
        headertoContinue1 = False
        headertoContinue2 = False
        done = False                          # closing header already found
        collecting = False                    # inside the target section
        collected_lines = []
        page_highlights = {}                  # page -> final bbox to highlight
        current_bbox = {}                     # page -> running union bbox
        last_y1s = {}                         # page -> bottom of last collected line
        matched_header_line_norm = heading_to_search
        break_collecting = False
        heading_norm = normalize_text(heading_to_search)
        for page_num in range(heading_to_searchPageNum, len(doc)):
            print(heading_to_search)
            # Emit the group header once when the top-level path changes.
            if paths[0].strip().lower() != currentgroupname.strip().lower():
                Alltexttobebilled += paths[0] + '\n'
                currentgroupname = paths[0]
                print(paths[0])

            if page_num in toc_pages:
                continue
            if break_collecting:
                break
            page = doc[page_num]
            page_height = page.rect.height
            blocks = page.get_text("dict")["blocks"]

            for block in blocks:
                if break_collecting:
                    break

                lines = block.get("lines", [])
                i = 0
                while i < len(lines):
                    if break_collecting:
                        break

                    spans = lines[i].get("spans", [])
                    if not spans:
                        i += 1
                        continue

                    # Skip anything inside the header/footer margins.
                    y0 = spans[0]["bbox"][1]
                    y1 = spans[0]["bbox"][3]
                    if y0 < top_margin or y1 > (page_height - bottom_margin):
                        i += 1
                        continue

                    line_text = get_spaced_text_from_spans(spans).lower()
                    line_text_norm = normalize_text(line_text)

                    # Headers may wrap onto the next line; match both joined.
                    if i + 1 < len(lines):
                        next_spans = lines[i + 1].get("spans", [])
                        next_line_text = get_spaced_text_from_spans(next_spans).lower()
                        combined_line_norm = normalize_text(line_text + " " + next_line_text)
                    else:
                        combined_line_norm = line_text_norm

                    if combined_line_norm and combined_line_norm in paths[0]:
                        headertoContinue1 = combined_line_norm
                    if combined_line_norm and combined_line_norm in paths[-2]:
                        headertoContinue2 = combined_line_norm
                    # Billing class comes from the immediate parent header.
                    if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower():
                        stringtowrite = 'Not to be billed'
                    else:
                        stringtowrite = 'To be billed'
                    if stringtowrite == 'To be billed':
                        # Start a new paragraph when the searched header appears.
                        if matched_header_line_norm in combined_line_norm:
                            Alltexttobebilled += '\n'
                        Alltexttobebilled += ' ' + combined_line_norm

                    # Exact case: the line is a known leaf header containing
                    # the heading we are searching for.
                    existsfull = (
                        (combined_line_norm in allchildrenheaders_set or
                         combined_line_norm in allchildrenheaders) and heading_to_search in combined_line_norm
                    )

                    current_line_words = set(combined_line_norm.split())
                    heading_words = set(heading_norm.split())
                    all_words_match = current_line_words.issubset(heading_words) and len(current_line_words) > 0

                    substring_match = (
                        heading_norm in combined_line_norm or
                        combined_line_norm in heading_norm or
                        all_words_match
                    )

                    if (substring_match and existsfull and not collecting and
                            len(combined_line_norm) > 0):

                        # Only spans styled as headers, below the main-header
                        # size, start a section.
                        header_spans = [
                            span for span in spans
                            if (is_header(span, most_common_font_size, most_common_color, most_common_font)
                                and span['size'] < mainHeaderFontSize)
                        ]
                        if header_spans:
                            collecting = True
                            matched_header_font_size = max(span["size"] for span in header_spans)

                            collected_lines.append(line_text)
                            valid_spans = [span for span in spans if span.get("bbox")]

                            if valid_spans:
                                x0s = [span["bbox"][0] for span in valid_spans]
                                x1s = [span["bbox"][2] for span in valid_spans]
                                y0s = [span["bbox"][1] for span in valid_spans]
                                y1s = [span["bbox"][3] for span in valid_spans]

                                header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)]

                                if page_num in current_bbox:
                                    cb = current_bbox[page_num]
                                    current_bbox[page_num] = [
                                        min(cb[0], header_bbox[0]),
                                        min(cb[1], header_bbox[1]),
                                        max(cb[2], header_bbox[2]),
                                        max(cb[3], header_bbox[3])
                                    ]
                                else:
                                    current_bbox[page_num] = header_bbox
                                last_y1s[page_num] = header_bbox[3]
                                x0, y0, x1, y1 = header_bbox

                                # Deep link that opens the viewer zoomed on the header.
                                zoom = 200
                                left = int(x0)
                                top = int(y0)
                                zoom_str = f"{zoom},{left},{top}"
                                pageNumberFound = page_num + 1

                                params = {
                                    'pdfLink': pdf_path,
                                    'keyword': heading_to_search,
                                }
                                encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()}
                                encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()])
                                final_url = f"{baselink}{encoded_link}#page={str(pageNumberFound)}&zoom={zoom_str}"

                                now = datetime.now()
                                formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p")

                                # NOTE: "MC Connnection" spelling kept as-is —
                                # it is part of the consumed JSON schema.
                                data_entry = {
                                    "NBSLink": final_url,
                                    "Subject": heading_to_search,
                                    "Page": str(pageNumberFound),
                                    "Author": "ADR",
                                    "Creation Date": formatted_time,
                                    "Layer": "Initial",
                                    "Code": stringtowrite,
                                    "head above 1": paths[-2],
                                    "head above 2": paths[0],
                                    "MC Connnection": 'Go to ' + paths[0].strip().split()[0] + '/' + heading_to_search.strip().split()[0] + ' in ' + filename
                                }
                                data_list_JSON.append(data_entry)

                                json_output = json.dumps(data_list_JSON, indent=4)

                            i += 2  # the matched header consumed this line and the next
                            continue
                    else:
                        if (substring_match and not collecting and
                                len(combined_line_norm) > 0):

                            # Fuzzy case: full word overlap, or same leading
                            # (clause-number) token.
                            word_match_percent = words_match_ratio(heading_norm, combined_line_norm) * 100
                            meets_word_threshold = word_match_percent >= 100

                            header_spans = [
                                span for span in spans
                                if (is_header(span, most_common_font_size, most_common_color, most_common_font)
                                    and span['size'] < mainHeaderFontSize)
                            ]

                            if header_spans and (meets_word_threshold or same_start_word(heading_to_search, combined_line_norm)):
                                collecting = True
                                if stringtowrite == 'To be billed':
                                    Alltexttobebilled += '\n'

                                matched_header_font_size = max(span["size"] for span in header_spans)

                                collected_lines.append(line_text)
                                valid_spans = [span for span in spans if span.get("bbox")]

                                if valid_spans:
                                    x0s = [span["bbox"][0] for span in valid_spans]
                                    x1s = [span["bbox"][2] for span in valid_spans]
                                    y0s = [span["bbox"][1] for span in valid_spans]
                                    y1s = [span["bbox"][3] for span in valid_spans]

                                    header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)]

                                    if page_num in current_bbox:
                                        cb = current_bbox[page_num]
                                        current_bbox[page_num] = [
                                            min(cb[0], header_bbox[0]),
                                            min(cb[1], header_bbox[1]),
                                            max(cb[2], header_bbox[2]),
                                            max(cb[3], header_bbox[3])
                                        ]
                                    else:
                                        current_bbox[page_num] = header_bbox

                                    last_y1s[page_num] = header_bbox[3]
                                    x0, y0, x1, y1 = header_bbox
                                    zoom = 200
                                    left = int(x0)
                                    top = int(y0)
                                    zoom_str = f"{zoom},{left},{top}"
                                    pageNumberFound = page_num + 1

                                    params = {
                                        'pdfLink': pdf_path,
                                        'keyword': heading_to_search,
                                    }
                                    encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()}
                                    encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()])
                                    final_url = f"{baselink}{encoded_link}#page={str(pageNumberFound)}&zoom={zoom_str}"

                                    now = datetime.now()
                                    formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p")

                                    data_entry = {
                                        "NBSLink": final_url,
                                        "Subject": heading_to_search,
                                        "Page": str(pageNumberFound),
                                        "Author": "ADR",
                                        "Creation Date": formatted_time,
                                        "Layer": "Initial",
                                        "Code": stringtowrite,
                                        "head above 1": paths[-2],
                                        "head above 2": paths[0],
                                        "MC Connnection": 'Go to ' + paths[0].strip().split()[0] + '/' + heading_to_search.strip().split()[0] + ' in ' + filename
                                    }
                                    data_list_JSON.append(data_entry)

                                    json_output = json.dumps(data_list_JSON, indent=4)

                                i += 2
                                continue
                    if collecting:
                        norm_line = normalize_text(line_text)

                        # URLs are body text even when styled like a header.
                        if url_pattern.match(norm_line):
                            line_is_header = False
                        else:
                            line_is_header = any(is_header(span, most_common_font_size, most_common_color, most_common_font) for span in spans)

                        if line_is_header:
                            header_font_size = max(span["size"] for span in spans)
                            is_probably_real_header = (
                                header_font_size >= matched_header_font_size and
                                is_header(spans[0], most_common_font_size, most_common_color, most_common_font) and
                                len(line_text.strip()) > 2
                            )

                            # A new header of at least the matched size closes
                            # the section: flush the accumulated highlights.
                            if (norm_line != matched_header_line_norm and
                                    norm_line != heading_norm and
                                    is_probably_real_header):
                                if line_text not in heading_norm:
                                    collecting = False
                                    done = True
                                    headertoContinue1 = False
                                    headertoContinue2 = False
                                    for page_num, bbox in current_bbox.items():
                                        bbox[3] = last_y1s.get(page_num, bbox[3])
                                        page_highlights[page_num] = bbox
                                    highlight_boxes(docHighlights, page_highlights, stringtowrite)

                                    break_collecting = True
                                    break

                        if break_collecting:
                            break

                        collected_lines.append(line_text)
                        valid_spans = [span for span in spans if span.get("bbox")]
                        if valid_spans:
                            x0s = [span["bbox"][0] for span in valid_spans]
                            x1s = [span["bbox"][2] for span in valid_spans]
                            y0s = [span["bbox"][1] for span in valid_spans]
                            y1s = [span["bbox"][3] for span in valid_spans]

                            line_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)]

                            if page_num in current_bbox:
                                cb = current_bbox[page_num]
                                current_bbox[page_num] = [
                                    min(cb[0], line_bbox[0]),
                                    min(cb[1], line_bbox[1]),
                                    max(cb[2], line_bbox[2]),
                                    max(cb[3], line_bbox[3])
                                ]
                            else:
                                current_bbox[page_num] = line_bbox

                            last_y1s[page_num] = line_bbox[3]
                    i += 1

        # Section ran to end of document without a closing header:
        # flush whatever was collected for this heading.
        if not done:
            for page_num, bbox in current_bbox.items():
                bbox[3] = last_y1s.get(page_num, bbox[3])
                page_highlights[page_num] = bbox
            if 'installation' in paths[-2].lower() or 'execution' in paths[-2].lower() or 'miscellaneous items' in paths[-2].lower():
                stringtowrite = 'Not to be billed'
            else:
                stringtowrite = 'To be billed'
            highlight_boxes(docHighlights, page_highlights, stringtowrite)

    pdf_bytes = BytesIO()
    docHighlights.save(pdf_bytes)

    return pdf_bytes.getvalue(), docHighlights, json_output, Alltexttobebilled
| |
|
| |
|
| |
|
| |
|