Spaces:
Sleeping
Sleeping
| # -*- coding: utf-8 -*- | |
| """FindSpecsTrial(Retrieving+boundingBoxes).ipynb | |
| Automatically generated by Colab. | |
| Original file is located at | |
| https://colab.research.google.com/drive/1mFuB1gtGuVh3NlOnNTzOFnDVuWSwn18q | |
| """ | |
| import fitz # PyMuPDF | |
| from io import BytesIO | |
| import re | |
| import requests | |
| import pandas as pd | |
| from collections import Counter | |
| import fitz # PyMuPDF | |
| import re | |
| import urllib.parse | |
| import pandas as pd | |
| import math | |
| import random | |
| # import tempfile | |
| # from fpdf import FPDF | |
| import json | |
| from datetime import datetime | |
# Base URL of the PDF viewer endpoint. extract_section_under_header appends the
# URL-encoded query parameters and a "#page=...&zoom=..." fragment to this prefix.
baselink='https://marthee-nbslink.hf.space/view-pdf?'
def get_repeated_texts(pdf_document, threshold=0.85):
    """
    Find span texts that recur on most pages and report their dominant styling.

    A text is counted at most once per page. It qualifies as "repeated" when it
    is found on at least ``max(2, int(threshold * page_count))`` pages.

    :param pdf_document: Opened PDF document exposing ``page_count`` and ``load_page``.
    :param threshold: Fraction of pages a text must appear on to be reported.
    :return: A list of dictionaries with keys ``text``, ``font_size`` and ``color``,
        where size and color are the values most often recorded for that text.
    """
    pages_per_text = Counter()
    styles_per_text = defaultdict(list)
    total_pages = pdf_document.page_count

    for page_index in range(total_pages):
        layout = pdf_document.load_page(page_index).get_text("dict")
        # Texts already counted on this page, so each page contributes at most once.
        counted_here = set()
        text_blocks = [blk for blk in layout["blocks"] if "lines" in blk]
        for blk in text_blocks:
            for line in blk["lines"]:
                for span in line["spans"]:
                    content = span["text"].strip()
                    if not content or content in counted_here:
                        continue
                    counted_here.add(content)
                    pages_per_text[content] += 1
                    styles_per_text[content].append({
                        "font_size": span.get("size"),
                        "color": span.get("color")
                    })

    # Never report a text seen on fewer than two pages, whatever the threshold.
    required_pages = max(2, int(threshold * total_pages))

    repeated_texts_info = []
    for content, page_hits in pages_per_text.items():
        if page_hits < required_pages:
            continue
        recorded = styles_per_text[content]
        sizes = [entry["font_size"] for entry in recorded]
        colors = [entry["color"] for entry in recorded]
        repeated_texts_info.append({
            "text": content,
            "font_size": max(set(sizes), key=sizes.count),
            "color": max(set(colors), key=colors.count)
        })
    return repeated_texts_info
def get_regular_font_size_and_color(doc):
    """
    Determine the most frequent span font size, color and font name in ``doc``.

    Every span of every text block on every page contributes one sample.

    :param doc: Opened PDF document supporting ``len()`` and ``load_page``.
    :return: Tuple ``(font_size, color, font)``; each element is ``None`` when
        the document yielded no spans.
    """
    def most_frequent(samples):
        # Counter.most_common(1) yields [(value, count)]; keep only the value.
        return Counter(samples).most_common(1)[0][0] if samples else None

    seen_sizes, seen_colors, seen_fonts = [], [], []

    for page_index in range(len(doc)):
        layout = doc.load_page(page_index).get_text("dict")
        for block in layout["blocks"]:
            # Blocks without a "lines" entry carry no spans to sample.
            if "lines" not in block:
                continue
            for line in block["lines"]:
                for piece in line["spans"]:
                    seen_sizes.append(piece['size'])
                    seen_colors.append(piece['color'])
                    seen_fonts.append(piece['font'])

    return most_frequent(seen_sizes), most_frequent(seen_colors), most_frequent(seen_fonts)
| import re | |
| from collections import defaultdict | |
| import fitz # PyMuPDF | |
| import requests | |
| from io import BytesIO | |
def normalize_text(text):
    """Trim ``text``, lower-case it and collapse each whitespace run to one space."""
    trimmed = text.strip()
    lowered = trimmed.lower()
    return re.sub(r'\s+', ' ', lowered)
def get_spaced_text_from_spans(spans):
    """Join the stripped text of each span with single spaces and normalize the result."""
    pieces = [span["text"].strip() for span in spans]
    joined = " ".join(pieces)
    return normalize_text(joined)
def is_header(span, most_common_font_size, most_common_color, most_common_font):
    """
    Decide whether ``span`` is styled like a header relative to the body text.

    Italic/oblique spans are never headers. Otherwise a span counts as a header
    when it is larger than the baseline size, uses a font name different from
    the baseline font (case-insensitive), or is bold.

    :param span: Span dictionary; reads ``font``, ``size`` and optional ``bold``.
    :param most_common_font_size: Baseline font size of the document.
    :param most_common_color: Baseline color (currently not used in the decision).
    :param most_common_font: Baseline font name.
    :return: Truthy when the span looks like a header, falsy otherwise.
    """
    font_lower = span.get("font", "").lower()
    slanted = "italic" in font_lower or "oblique" in font_lower
    heavy = "bold" in font_lower or span.get("bold", False)

    if slanted:
        return False
    if span["size"] > most_common_font_size:
        return True
    if span["font"].lower() != most_common_font.lower():
        return True
    return heavy
def merge_consecutive_words(headers):
    """
    Merge adjacent header strings whose space-joined form is itself a header.

    Walks ``headers`` left to right. When ``headers[i] + ' ' + headers[i + 1]``
    is also present somewhere in ``headers``, the joined string is emitted and
    both entries are consumed; otherwise ``headers[i]`` is emitted unchanged.

    :param headers: List of header strings.
    :return: New list with mergeable neighbours replaced by their joined form.
    """
    # Build the lookup once: testing membership against the list on every
    # iteration made the original quadratic in the number of headers.
    known_headers = set(headers)
    total = len(headers)
    result = []
    i = 0
    while i < total:
        if i + 1 < total:
            joined = headers[i] + ' ' + headers[i + 1]
            if joined in known_headers:
                result.append(joined)
                i += 2
                continue
        result.append(headers[i])
        i += 1
    return result
def extract_headers(doc, toc_pages, most_common_font_size, most_common_color, most_common_font, top_margin, bottom_margin):
    """
    Collect header-styled text from the non-TOC pages of ``doc``.

    Spans inside the top/bottom margins, empty spans, URL-like spans and spans
    matching the noise filters below are skipped. Remaining spans that
    ``is_header`` accepts are grouped by ``(page index, rounded top y)`` and each
    group is joined into one header entry.

    :param doc: Opened PDF document supporting ``len()`` and ``load_page``.
    :param toc_pages: Page indices to skip entirely.
    :param most_common_font_size: Baseline font size passed to ``is_header``.
    :param most_common_color: Baseline color passed to ``is_header``.
    :param most_common_font: Baseline font name passed to ``is_header``.
    :param top_margin: Spans starting above this y are ignored.
    :param bottom_margin: Spans ending below ``page height - bottom_margin`` are ignored.
    :return: ``(headers, top_3_font_sizes)`` where ``headers`` is a list of
        ``[text, size, page index, y]`` sorted by page then y, and
        ``top_3_font_sizes`` holds up to three largest distinct header sizes,
        largest first.
    """
    print("Font baseline:", most_common_font_size, most_common_color, most_common_font)

    spans_by_position = defaultdict(list)
    for page_index in range(len(doc)):
        if page_index in toc_pages:
            continue
        page = doc.load_page(page_index)
        bottom_limit = page.rect.height - bottom_margin

        for block in page.get_text("dict")['blocks']:
            # Only type-0 blocks are processed; other block types are skipped.
            if block['type'] != 0:
                continue
            for line in block['lines']:
                for span in line['spans']:
                    rounded_top = round(span['bbox'][1])
                    text = normalize_text(span.get('text', ''))

                    in_margin = span['bbox'][1] < top_margin or span['bbox'][3] > bottom_limit
                    if in_margin or not text:
                        continue
                    if text.startswith(('http://www', 'www')):
                        continue

                    # Page markers, symbol-only text, dates, month names and
                    # boilerplate labels are not treated as headers.
                    looks_like_noise = (
                        'page' in text
                        or not re.search(r'[a-z0-9]', text)
                        or 'end of section' in text
                        or re.search(r'page\s+\d+\s+of\s+\d+', text)
                        or re.search(r'\b(?:\d{1,2}[/-])?\d{1,2}[/-]\d{2,4}\b', text)
                        or re.search(r'\b(?:jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)', text)
                        or 'specification:' in text
                    )
                    if looks_like_noise:
                        continue

                    # Drop a run of four or more dots/dashes and everything after it.
                    text = normalize_text(re.sub(r'[.\-]{4,}.*$', '', text).strip())

                    if not is_header(span, most_common_font_size, most_common_color, most_common_font):
                        continue
                    spans_by_position[(page_index, rounded_top)].append({
                        "text": text,
                        "size": span["size"],
                        "pageNum": page_index
                    })

    headers = []
    for position in sorted(spans_by_position):
        group = spans_by_position[position]
        lead = group[0]
        joined_text = " ".join(item['text'] for item in group)
        headers.append([joined_text, lead['size'], lead['pageNum'], position[1]])

    # Distinct header sizes, largest first, capped at three.
    distinct_sizes = Counter(entry[1] for entry in headers)
    top_3_font_sizes = sorted(distinct_sizes, reverse=True)[:3]
    return headers, top_3_font_sizes
class ColorManager:
    """
    Hand out colors one at a time: first the given palette in order, then
    randomly generated RGB colors that keep a minimum distance from every
    color already known to the manager.
    """

    def __init__(self, palette, min_distance=100):
        """
        :param palette: List of RGB tuples served first, in order.
        :param min_distance: Minimum Euclidean distance a generated color must
            keep from every palette or previously generated color.
        """
        self.min_distance = min_distance
        self.idx = 0
        # Independent copies so the caller's list is never mutated.
        self.palette = palette.copy()
        self.used_colors = palette.copy()

    def color_distance(self, c1, c2):
        """Return the Euclidean distance between two color tuples."""
        squared_total = 0
        for left_channel, right_channel in zip(c1, c2):
            squared_total += (left_channel - right_channel) ** 2
        return math.sqrt(squared_total)

    def generate_new_color(self):
        """
        Draw random RGB colors until one is far enough from all used colors.

        :return: The accepted color, which is also recorded as used.
        :raises ValueError: If no acceptable color is found in 1000 attempts.
        """
        attempts_left = 1000
        while attempts_left > 0:
            attempts_left -= 1
            candidate = tuple(random.randint(0, 255) for _ in range(3))
            too_close = any(
                self.color_distance(candidate, taken) <= self.min_distance
                for taken in self.used_colors
            )
            if too_close:
                continue
            self.used_colors.append(candidate)
            return candidate
        raise ValueError("Couldn't find a distinct color after many attempts.")

    def get_next_color(self):
        """Return the next palette color, or a generated one once the palette is exhausted."""
        within_palette = self.idx < len(self.palette)
        color = self.palette[self.idx] if within_palette else self.generate_new_color()
        self.idx += 1
        return color
# Predefined RGB colors (0-255 per channel). ColorManager serves these in order
# before it starts generating random colors; highlight_boxes divides each
# channel by 255 before applying a color to an annotation.
color_palette = [
(255, 0, 0), (0, 0, 255), (0, 255, 255), (0, 64, 0), (255, 204, 0),
(255, 128, 64), (255, 0, 128), (255, 128, 192), (128, 128, 255),
(128, 64, 0), (0, 255, 0), (0, 200, 0), (255, 128, 255), (128, 0, 255),
(0, 128, 192), (128, 0, 128), (128, 0, 0), (0, 128, 255), (149, 1, 70),
(255, 182, 128), (222, 48, 71), (240, 0, 112), (255, 0, 255),
(192, 46, 65), (0, 0, 128), (0, 128, 64), (255, 255, 0), (128, 0, 80),
(255, 255, 128), (90, 255, 140), (255, 200, 20), (91, 16, 51),
(90, 105, 138), (114, 10, 138), (36, 82, 78), (225, 105, 190),
(108, 150, 170), (11, 35, 75), (42, 176, 170), (255, 176, 170),
(209, 151, 15), (81, 27, 85), (226, 106, 122), (67, 119, 149),
(159, 179, 140), (159, 179, 30), (255, 85, 198), (255, 27, 85),
(188, 158, 8), (140, 188, 120), (59, 61, 52), (65, 81, 21),
(212, 255, 174), (15, 164, 90), (41, 217, 245), (213, 23, 182),
(11, 85, 169), (78, 153, 239), (0, 66, 141), (64, 98, 232),
(140, 112, 255), (57, 33, 154), (194, 117, 252), (116, 92, 135),
(74, 43, 98), (188, 13, 123), (129, 58, 91), (255, 128, 100),
(171, 122, 145), (255, 98, 98), (222, 48, 77)
]
# Single module-level manager shared by every extract_section_under_header call.
# Its position in the palette is never reset, so colors keep advancing across calls.
color_manager = ColorManager(color_palette)
def highlight_boxes(doc, highlights, color):
    """
    Draw one semi-transparent filled rectangle annotation per highlighted page.

    :param doc: Opened PDF document; pages are fetched with ``load_page``.
    :param highlights: Mapping of page index to a bounding box accepted by ``fitz.Rect``.
    :param color: RGB color with 0-255 channels; scaled to 0-1 for the annotation.
    """
    for page_index, box in highlights.items():
        target_page = doc.load_page(page_index)
        marker = target_page.add_rect_annot(fitz.Rect(box))
        # Annotation colors are given as fractions, so scale each channel by 255.
        scaled = tuple(channel / 255 for channel in color)
        marker.set_colors(stroke=scaled, fill=scaled)
        marker.set_opacity(0.3)
        marker.update()
def find_full_line_in_toc(doc, toc_pages, substring):
    """
    Return the first TOC line containing ``substring``, without its dot leader.

    Lines are compared after normalization. The returned text is the matching
    line cut at the first run of two or more dots and stripped.

    :param doc: Opened PDF document; pages are fetched with ``load_page``.
    :param toc_pages: Page indices to search, in order.
    :param substring: Text to look for.
    :return: The matching line text, or ``None`` when no line matches.
    """
    needle = normalize_text(substring)
    for toc_index in toc_pages:
        layout = doc.load_page(toc_index).get_text("dict")
        for block in layout["blocks"]:
            for line in block.get("lines", []):
                candidate = get_spaced_text_from_spans(line.get("spans", [])).strip()
                if needle not in normalize_text(candidate):
                    continue
                # First hit wins: keep only the part before the dot leader.
                return re.split(r'\.{2,}', candidate)[0].strip()
    return None
def extract_section_under_header(pdf_path, target_header_LIST):
    """
    Download a PDF, highlight the section under each requested heading and
    build a link record for it.

    For every heading in ``target_header_LIST`` the non-TOC pages are scanned
    line by line. Collection starts at the first header-styled line that
    contains the heading and stops at the next header-styled line whose font
    size is at least as large. When that stop is reached the collected area is
    highlighted on the document and, if a main header was resolved from the
    TOC, a record containing a viewer link is appended to the result list.

    :param pdf_path: URL of the PDF. When it contains ``http`` or ``dropbox``,
        ``dl=0`` is rewritten to ``dl=1`` before downloading.
    :param target_header_LIST: Iterable of heading strings to locate.
    :return: ``(pdf_bytes, doc, df, data_list_JSON)``: the saved document bytes,
        the annotated document, an empty DataFrame with the record columns, and
        the list of record dictionaries.
        NOTE(review): when a heading has neither an exact header match nor any
        line sharing a word with it, the function returns ``None`` immediately
        (bare ``return``), abandoning the remaining headings. Kept as-is because
        callers may rely on it; confirm whether ``continue`` was intended.
    :raises ValueError: If ``pdf_path`` is empty or the download has no content.
    """
    top_margin=70
    bottom_margin=50
    df = pd.DataFrame(columns=["NBSLink","Subject","Page","Author","Creation Date","Layer",'Code', 'head above 1', "head above 2"])
    data_list_JSON = []

    if pdf_path and ('http' in pdf_path or 'dropbox' in pdf_path):
        pdf_path = pdf_path.replace('dl=0', 'dl=1')
    if not pdf_path:
        raise ValueError("No valid PDF content found.")
    response = requests.get(pdf_path)
    # Check the downloaded bytes themselves: a BytesIO object is always truthy,
    # so testing the wrapper could never detect an empty download.
    if not response.content:
        raise ValueError("No valid PDF content found.")
    pdf_content = BytesIO(response.content)
    doc = fitz.open(stream=pdf_content, filetype="pdf")

    most_common_font_size, most_common_color, most_common_font =get_regular_font_size_and_color(doc)

    def get_toc_page_numbers(doc, max_pages_to_check=15):
        """Return page indices 0..last page (within the first pages checked) having >= 3 dot-leader lines."""
        toc_pages = []
        for page_num in range(min(len(doc), max_pages_to_check)):
            page = doc.load_page(page_num)
            blocks = page.get_text("dict")["blocks"]
            dot_line_count = 0
            for block in blocks:
                for line in block.get("lines", []):
                    line_text = get_spaced_text_from_spans(line["spans"]).strip()
                    if re.search(r'\.{3,}', line_text):
                        dot_line_count += 1
            if dot_line_count >= 3:
                toc_pages.append(page_num)
        if bool(toc_pages):
            # Everything up to and including the last detected TOC page is skipped.
            return list(range(0, toc_pages[-1] + 1))
        return toc_pages

    toc_pages = get_toc_page_numbers(doc)
    headers,top_3_font_sizes=extract_headers(doc,toc_pages,most_common_font_size, most_common_color, most_common_font,top_margin,bottom_margin)

    # Pad to three entries so documents with fewer than three distinct header
    # sizes neither fail to unpack nor leave these names unbound; a missing
    # level is None and therefore never equals a real header size.
    padded_sizes = (list(top_3_font_sizes) + [None, None, None])[:3]
    mainHeaderFontSize, subHeaderFontSize, subsubheaderFontSize = padded_sizes

    print("Detected headers:", headers)
    headers_set = set()
    for h in headers:
        headers_set.add(normalize_text(h[0]))  # h[0] is the header text

    print("📌 Has TOC:", bool(toc_pages), " | Pages to skip:", toc_pages)

    for heading_to_search in target_header_LIST:
        print('headertosearch',heading_to_search)
        matched_header_line = None
        done=False
        target_header = normalize_text(heading_to_search)

        if target_header not in headers_set:
            print(f"Header '{target_header}' not found. Searching for best match...")
            # Fall back to the line (on any page) sharing the most words with the heading.
            heading_words = set(target_header.split())
            best_match_score = 0
            for page_num in range(len(doc)):
                page = doc.load_page(page_num)
                blocks = page.get_text("dict")["blocks"]
                for block in blocks:
                    for line in block.get("lines", []):
                        line_text = " ".join(span["text"].strip() for span in line.get("spans", []))
                        if not line_text:
                            continue
                        line_words = set(re.findall(r'\w+', line_text.lower()))
                        match_count = len(heading_words & line_words)
                        if match_count > best_match_score:
                            best_match_score = match_count
                            matched_header_line = line_text.strip()
            if matched_header_line:
                print(f"✅ Best match: '{matched_header_line}' with score {best_match_score}")
            else:
                print("❌ No suitable match found.")
                # NOTE(review): returns None and abandons the remaining headings.
                return
        else:
            matched_header_line = target_header  # Exact match

        matched_header_font_size = most_common_font_size
        collecting = False
        collected_lines = []
        page_highlights = {}
        current_bbox = {}
        last_y1s = {}
        mainHeader=''
        subHeader=''
        matched_header_line_norm = normalize_text(matched_header_line)
        color = color_manager.get_next_color()

        for page_num in range(len(doc)):
            if page_num in toc_pages:
                continue
            page = doc.load_page(page_num)
            page_height = page.rect.height
            blocks = page.get_text("dict")["blocks"]
            for block in blocks:
                lines = block.get("lines", [])
                i = 0
                while i < len(lines):
                    spans = lines[i].get("spans", [])
                    if not spans:
                        i += 1
                        continue
                    y0 = spans[0]["bbox"][1]
                    y1 = spans[0]["bbox"][3]
                    # Ignore lines lying in the page's top or bottom margin.
                    if y0 < top_margin or y1 > (page_height - bottom_margin):
                        i += 1
                        continue

                    line_text = get_spaced_text_from_spans(spans).lower()
                    line_text_norm = normalize_text(line_text)
                    # A heading may wrap, so also test this line joined with the next one.
                    if i + 1 < len(lines):
                        next_spans = lines[i + 1].get("spans", [])
                        next_line_text = get_spaced_text_from_spans(next_spans).lower()
                        combined_line = (line_text + " " + next_line_text).strip()
                        combined_line_norm = normalize_text(combined_line)
                    else:
                        combined_line = line_text
                        combined_line_norm = line_text_norm

                    # Until the target is reached, remember the latest main/sub header seen above it.
                    if not done and not collecting:
                        for span in spans:
                            if len(normalize_text(span['text'])) > 1:
                                if is_header(span, most_common_font_size, most_common_color, most_common_font):
                                    for header in headers:
                                        header_text, header_size, _, _ = header
                                        if combined_line_norm in header_text:
                                            print('comb:,',combined_line_norm)
                                            if header_size == mainHeaderFontSize:
                                                mainHeader=find_full_line_in_toc(doc, toc_pages, combined_line_norm)
                                                print('main:', mainHeader)
                                            elif header_size == subHeaderFontSize:
                                                subHeader = combined_line_norm
                                                print('sub:', subHeader)

                    # Start collecting if we find the target header.
                    if matched_header_line_norm in combined_line_norm and not collecting:
                        if any(is_header(span, most_common_font_size, most_common_color, most_common_font) for span in spans):
                            collecting = True
                            header_font_sizes = [span["size"] for span in spans if is_header(span, most_common_font_size, most_common_color, most_common_font)]
                            if header_font_sizes:
                                matched_header_font_size = max(header_font_sizes)
                            print(f"📥 Start collecting after header: {combined_line} (Font size: {matched_header_font_size})")
                            pageNumberFound = page_num +1
                            # The header line itself is part of the collected text and box.
                            collected_lines.append(line_text)
                            valid_spans = [span for span in spans if span.get("bbox")]
                            if valid_spans:
                                x0s = [span["bbox"][0] for span in valid_spans]
                                x1s = [span["bbox"][2] for span in valid_spans]
                                y0s = [span["bbox"][1] for span in valid_spans]
                                y1s = [span["bbox"][3] for span in valid_spans]
                                # Position of the header's first span, used for the link's zoom target.
                                left = int(x0s[0])
                                top = int(y0s[0])
                                print(left,type(left),top,type(top))
                                header_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)]
                                if page_num in current_bbox:
                                    cb = current_bbox[page_num]
                                    current_bbox[page_num] = [
                                        min(cb[0], header_bbox[0]),
                                        min(cb[1], header_bbox[1]),
                                        max(cb[2], header_bbox[2]),
                                        max(cb[3], header_bbox[3])
                                    ]
                                else:
                                    current_bbox[page_num] = header_bbox
                                last_y1s[page_num] = header_bbox[3]
                            # NOTE(review): skips the following line as well, presumably
                            # because it was already folded into combined_line — confirm.
                            i += 2
                            continue

                    if collecting:
                        norm_line = normalize_text(line_text)
                        norm_combined = normalize_text(combined_line)
                        # URL-like lines are never treated as headers.
                        if re.match(r'https?://\S+|www\.\S+', norm_line):
                            line_is_header = False
                        else:
                            line_is_header = any(is_header(span, most_common_font_size, most_common_color, most_common_font) for span in spans)

                        if line_is_header:
                            header_font_size = max(span["size"] for span in spans)
                            is_probably_real_header = (
                                header_font_size >= matched_header_font_size and
                                is_header(spans[0], most_common_font_size, most_common_color, most_common_font) and
                                len(line_text.strip()) > 2
                            )
                            if (norm_line != matched_header_line_norm and
                                    norm_combined != matched_header_line_norm and
                                    is_probably_real_header):
                                print(f"🛑 Stop at header with same or larger font: '{line_text}' ({header_font_size} ≥ {matched_header_font_size})")
                                collecting = False
                                done=True
                                result_text = (matched_header_line + "\n" + "\n".join(collected_lines)).strip().lower()
                                print("\n📄 Final collected section (early return):\n" , mainHeader,subHeader)
                                print(result_text)

                                # Dedicated loop names: reusing page_num here would clobber
                                # the index of the page currently being scanned.
                                for highlighted_page, bbox in current_bbox.items():
                                    # Clamp the bottom edge to the last collected line on that page.
                                    bbox[3] = last_y1s.get(highlighted_page, bbox[3])
                                    page_highlights[highlighted_page] = bbox
                                highlight_boxes(doc, page_highlights,color)

                                zoom = 200
                                zoom_str = f"{zoom},{left},{top}"
                                print('zoooom',zoom_str)
                                params = {
                                    'pdfLink': pdf_path,
                                    'keyword': heading_to_search,
                                }
                                # URL-encode each parameter, then join them into a query string.
                                encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()}
                                encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()])
                                final_url = f"{baselink}{encoded_link}#page={str(pageNumberFound)}&zoom={zoom_str}"
                                print(final_url)

                                now = datetime.now()
                                formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p")
                                # A record is only produced when a main header was resolved from the TOC.
                                if mainHeader:
                                    data_entry = {
                                        "NBSLink": final_url,
                                        "Subject": 'Markup (initial)',
                                        "Page": str(pageNumberFound),
                                        "Author": "ADR",
                                        "Creation Date": formatted_time,
                                        "Layer": "Initial",
                                        "Code": heading_to_search,
                                        "head above 1": mainHeader,
                                        "head above 2": subHeader
                                    }
                                    data_list_JSON.append(data_entry)
                                print('heree')

                        # NOTE(review): this also runs for the line that just stopped the
                        # collection, since scanning continues instead of returning.
                        collected_lines.append(line_text)
                        valid_spans = [span for span in spans if span.get("bbox")]
                        if valid_spans:
                            x0s = [span["bbox"][0] for span in valid_spans]
                            x1s = [span["bbox"][2] for span in valid_spans]
                            y0s = [span["bbox"][1] for span in valid_spans]
                            y1s = [span["bbox"][3] for span in valid_spans]
                            line_bbox = [min(x0s), min(y0s), max(x1s), max(y1s)]
                            if page_num in current_bbox:
                                cb = current_bbox[page_num]
                                current_bbox[page_num] = [
                                    min(cb[0], line_bbox[0]),
                                    min(cb[1], line_bbox[1]),
                                    max(cb[2], line_bbox[2]),
                                    max(cb[3], line_bbox[3])
                                ]
                            else:
                                current_bbox[page_num] = line_bbox
                            last_y1s[page_num] = line_bbox[3]
                    i += 1

    # NOTE(review): highlights and records are only written when a stop header is
    # found; a section running to the end of the document yields neither.
    print("\n📄 Final collected section:\n")
    pdf_bytes = BytesIO()
    doc.save(pdf_bytes)
    print('aa')
    print('JSONN',data_list_JSON)
    return pdf_bytes.getvalue(), doc , df, data_list_JSON