Spaces:
Sleeping
Sleeping
| import fitz # PyMuPDF | |
| from io import BytesIO | |
| import re | |
| import requests | |
| import pandas as pd | |
| from collections import Counter | |
| import fitz # PyMuPDF | |
| import re | |
| import urllib.parse | |
| import pandas as pd | |
| import tempfile | |
| from fpdf import FPDF | |
| import json | |
| from datetime import datetime | |
| baselink='https://marthee-nbslink.hf.space/view-pdf?' | |
| class PDF(FPDF): | |
| def header(self): | |
| self.set_font("Arial", "B", 12) | |
| self.cell(0, 10, "NBS Document Links", ln=True, align="C") | |
| self.ln(5) # Space after header | |
| def save_df_to_pdf(df): | |
| pdf = PDF() | |
| pdf.set_auto_page_break(auto=True, margin=15) | |
| # Set equal margins | |
| margin = 15 | |
| pdf.set_left_margin(margin) | |
| pdf.set_right_margin(margin) | |
| pdf.add_page() | |
| pdf.set_font("Arial", size=8) # Reduce font size to fit more text | |
| # Table headers | |
| headers = ["NBSLink", "Subject","Page","Author","Creation Date","Layer",'Code', 'head above 1', "head above 2"] | |
| num_cols = len(headers) | |
| # Calculate column width dynamically | |
| max_table_width = pdf.w - 2 * margin # Total available width | |
| col_width = max_table_width / num_cols # Distribute evenly | |
| table_width = col_width * num_cols | |
| # Get page width and calculate left alignment | |
| page_width = pdf.w | |
| start_x = (page_width - table_width) / 2 # Centering the table | |
| pdf.set_x(start_x) # Move to calculated start position | |
| # Table headers | |
| pdf.set_fill_color(200, 200, 200) # Light gray background | |
| pdf.set_font("Arial", "B", 8) | |
| for header in headers: | |
| pdf.cell(col_width, 8, header, border=1, fill=True, align="C") | |
| pdf.ln() | |
| pdf.set_font("Arial", size=7) # Reduce font size for data rows | |
| for _, row in df.iterrows(): | |
| x_start = start_x # Ensure every row starts at the same position | |
| y_start = pdf.get_y() | |
| # Calculate max height needed for this row | |
| text_lines = {col: pdf.multi_cell(col_width, 5, row[col], border=0, align="L", split_only=True) for col in ["Subject","Page","Author","Creation Date","Layer",'Code', 'head above 1', "head above 2"]} | |
| max_lines = max(len(lines) for lines in text_lines.values()) | |
| max_height = max_lines * 5 | |
| pdf.set_x(x_start) # Ensure correct alignment for each row | |
| # Clickable link cell (keeps same height as others) | |
| pdf.cell(col_width, max_height, "Click Here", border=1, link=row["NBSLink"], align="C") | |
| # Move to next column | |
| pdf.set_xy(x_start + col_width, y_start) | |
| # Draw each cell manually, ensuring equal height | |
| for i, col_name in enumerate(["Subject","Page","Author","Creation Date","Layer",'Code', 'head above 1', "head above 2"]): | |
| x_col = x_start + col_width * (i + 1) | |
| y_col = y_start | |
| pdf.multi_cell(col_width, 5, row[col_name], border=0, align="L") # Draw text | |
| pdf.rect(x_col, y_col, col_width, max_height) # Draw border | |
| pdf.set_xy(x_col + col_width, y_start) # Move to next column | |
| # Move to the next row | |
| pdf.ln(max_height) | |
| # Save PDF to memory | |
| pdf_output = pdf.output(dest="S").encode("latin1") | |
| return pdf_output | |
| def normalize_text(text): | |
| """Lowercase, remove extra spaces, and strip special characters.""" | |
| text = text.lower().strip() | |
| text = re.sub(r'\s+', ' ', text) # Normalize multiple spaces | |
| return re.sub(r'[^\w\s]', '', text) # Remove punctuation | |
| def get_repeated_texts(pdf_document, threshold=0.85): | |
| """ | |
| Identify text that appears on most pages. | |
| :param pdf_document: The opened PDF document. | |
| :param threshold: The percentage of pages a text must appear on to be considered "repeated". | |
| """ | |
| text_counts = Counter() | |
| total_pages = pdf_document.page_count | |
| for page_num in range(total_pages): | |
| page = pdf_document.load_page(page_num) | |
| page_text = page.get_text("text") | |
| normalized_lines = {normalize_text(line) for line in page_text.splitlines() if line.strip()} | |
| text_counts.update(normalized_lines) | |
| # Find texts that appear in at least `threshold * total_pages` pages | |
| min_occurrence = max(1, int(threshold * total_pages)) | |
| repeated_texts = {text for text, count in text_counts.items() if count >= min_occurrence} | |
| return repeated_texts | |
| def split_links(links_string): | |
| """Split a comma-separated string of links into an array of trimmed links.""" | |
| return [link.strip() for link in links_string.split(',')] | |
| def annotate_text_from_pdf(pdfshareablelinks, LISTheading_to_search): | |
| """ | |
| Annotates text under a specific heading in a PDF, highlights it, | |
| and constructs zoom coordinates for the first occurrence of the heading. | |
| Args: | |
| pdfshareablelinks (list): List of shareable links to PDFs. | |
| heading_to_search (str): The heading to search for in the PDF. | |
| Returns: | |
| Tuple: Annotated PDF bytes, count of heading occurrences, and zoom string. | |
| """ | |
| print("Input links:", pdfshareablelinks) | |
| print(LISTheading_to_search) | |
| link = pdfshareablelinks[0] | |
| pdf_content = None | |
| headings_TOC = [] | |
| # Modify Dropbox shareable link for direct download | |
| if link and ('http' in link or 'dropbox' in link): | |
| if 'dl=0' in link: | |
| link = link.replace('dl=0', 'dl=1') | |
| # Download the PDF content from the shareable link | |
| response = requests.get(link) | |
| pdf_content = BytesIO(response.content) # Store the content in memory | |
| if pdf_content is None: | |
| raise ValueError("No valid PDF content found.") | |
| # Open the PDF using PyMuPDF | |
| pdf_document = fitz.open(stream=pdf_content, filetype="pdf") | |
| repeated_texts = get_repeated_texts(pdf_document) | |
| df = pd.DataFrame(columns=["NBSLink","Subject","Page","Author","Creation Date","Layer",'Code', 'head above 1', "head above 2"]) | |
| dictionaryNBS={} | |
| data_list_JSON = [] | |
| for NBSindex, heading_to_search in enumerate(LISTheading_to_search): | |
| if NBSindex == len(LISTheading_to_search) - 1: | |
| flagAllNBSvisited = True | |
| all_text = [] | |
| current_line = "" | |
| collecting_text = False | |
| f10_count = 0 | |
| current_y = None | |
| highlight_rect = None | |
| highlight_rectEnding=None | |
| highlight_rectBegin=None | |
| zoom_str = None | |
| toc_flag = False | |
| span_font_goal = None | |
| span_size_goal = None | |
| pageNumberFound = None | |
| groupheadings = [] | |
| merged_groupheadings = [] | |
| collectheader2 = False | |
| endingcontentFlag=True | |
| header2 = '' | |
| header2_first_span_size = 0 | |
| previous_header = '' | |
| next_span_text = '' | |
| current_line_span_size = 0 | |
| flagAllNBSvisited = False | |
| text = '' | |
| heading_to_searchNBS = heading_to_search | |
| heading_words = heading_to_search.split() # Split heading into words | |
| first_word = heading_words[0] # First word to search for | |
| remaining_words = heading_words[1:] # Remaining words to verify | |
| print(heading_words) | |
| heading_to_search = heading_to_search.replace(" ", "") | |
| # Process each page in the PDF | |
| for page_num in range(pdf_document.page_count): | |
| page = pdf_document.load_page(page_num) | |
| # Get page dimensions | |
| page_height = page.rect.height | |
| header_threshold = page_height * 0.1 # Top 10% of the page height | |
| footer_threshold = page_height * 0.9 # Bottom 10% of the page height | |
| # Extract text in dictionary format | |
| text_dict = page.get_text("dict") | |
| # Collect header y-coordinates to detect header area | |
| header_threshold = 0 # Header area: top 10% of the page height | |
| current_line_text = "" | |
| previous_y = None | |
| # Process text blocks | |
| for block in text_dict['blocks']: | |
| for line_index, line in enumerate(block.get('lines', [])): | |
| spans = line.get('spans', []) | |
| if spans and any(span['text'].strip() for span in spans): | |
| for i, span in enumerate(spans): | |
| span_text = span['text'].strip() | |
| highlight_rect = span['bbox'] | |
| span_y = span['bbox'][1] | |
| span_font = span['font'] | |
| span_size = span['size'] | |
| if normalize_text(span_text) not in repeated_texts and not (span_text.startswith('Page')): | |
| if previous_y is None: | |
| previous_y = span_y # Initialize on first span | |
| # If same Y coordinate as previous, append to the current line | |
| if abs(span_y - previous_y) < 5: # Allow a small margin for OCR variations | |
| current_line_text += " " + span_text | |
| current_line_text = normalize_text(current_line_text) | |
| current_line_span_size = span_size | |
| else: | |
| # Store the complete line and reset for the new line | |
| if current_line_text.strip(): | |
| all_text.append(current_line_text.strip()) | |
| current_line_text = span_text # Start a new line | |
| previous_y = span_y # Update the reference Y | |
| text = span_text | |
| if collecting_text and span_font == span_font_goal and span_size == span_size_goal and span_text[0].isdigit(): | |
| print(f"Ending collection at heading: {span_text}") | |
| highlight_rectEnding=highlight_rect | |
| print("merged_groupheadings:", merged_groupheadings) | |
| print('groupheadingss',groupheadings) | |
| collecting_text = False | |
| continue | |
| if collecting_text: | |
| annot = page.add_rect_annot(highlight_rect) # Create a rectangle annotation | |
| annot.set_colors(stroke=(1, 0, 0)) # Set border color (Red) | |
| annot.update() # Apply changes | |
| if 'Content' in span_text: | |
| toc_flag = True | |
| TOC_start = span_text | |
| print('content', TOC_start, span_size) | |
| if toc_flag or endingcontentFlag: | |
| if 'Content' not in span_text: | |
| if current_y is None: | |
| current_y = span_y | |
| current_size = span_size # Initialize the reference span size | |
| # Check if the current span size deviates significantly | |
| if abs(span_size - current_size) > 1: # Threshold for size difference | |
| toc_flag = False | |
| if abs(current_y - span_y) < 5: # Allowing more flexibility for multi-line headings | |
| current_line += " " + span_text # Keep accumulating text | |
| else: | |
| if current_line.strip(): # Only process non-empty lines | |
| clean_text = re.sub(r'\.{5,}\d*$', '', current_line, flags=re.MULTILINE) # Remove dots and trailing numbers | |
| print(clean_text.strip()) | |
| if clean_text: | |
| groupheadings.append(clean_text) | |
| # else: | |
| # toc_flag = False | |
| current_line = span_text | |
| current_y = span_y | |
| current_size = span_size # Update reference span size | |
| # print('outofcurrent') | |
| if len(groupheadings) > 0: | |
| pattern = re.compile(r"^[A-Za-z]\d{2} ") # Match headings starting with letter + 2 digits | |
| merged_groupheadings = [] | |
| current_item = None # Start as None to avoid an initial blank entry | |
| for item in groupheadings: | |
| if pattern.match(item): # If item starts with correct pattern, it's a new heading | |
| if current_item: # Append only if current_item is not empty | |
| if current_item not in merged_groupheadings: | |
| extracted_text = re.split(r"\.{3,}", current_item)[0].strip() | |
| merged_groupheadings.append(extracted_text.strip()) | |
| current_item = item # Start new heading | |
| else: | |
| if current_item: | |
| if item not in current_item: | |
| current_item += " " + item # Merge with previous heading | |
| # Append last merged item after loop | |
| if current_item: | |
| if current_item not in merged_groupheadings: | |
| extracted_text = re.split(r"\.{3,}", current_item)[0].strip() | |
| merged_groupheadings.append(extracted_text.strip()) | |
| if span_text == first_word: | |
| print('First word found:', span_text) | |
| # Check if it's not the last span in the current line | |
| print(i + 1, len(spans)) | |
| if i + 1 < len(spans): | |
| next_span_text = (spans[i + 1]['text'].strip()) | |
| # Check if the next span's text is in the heading list | |
| if next_span_text.replace(" ", "") in heading_to_search.replace(" ", ""): | |
| text = (span_text + ' ' + next_span_text) | |
| # After processing the current line, check if there's a next line | |
| if first_word == span_text: | |
| if line_index + 1 < len(block.get('lines', [])): | |
| next_line = block['lines'][line_index + 1] | |
| # You can process the spans of the next line here | |
| for next_span in next_line.get('spans', []): | |
| next_span_text = next_span['text'].strip() | |
| text = span_text + ' ' + next_span_text | |
| if len(merged_groupheadings) > 0: | |
| if re.match(r"[A-Za-z]\d{2}", span_text) and span_size > 10: | |
| toc_flag = False | |
| endingcontentFlag=False | |
| previous_header = span_text # Store last detected header | |
| print('previous_header', span_text) | |
| groupmainheadingFromArray = [item for item in merged_groupheadings if previous_header in item] | |
| if previous_header: | |
| if not collectheader2: | |
| if header2_first_span_size == 0: | |
| spanSizeHeader = 10 | |
| else: | |
| spanSizeHeader = header2_first_span_size | |
| for item in groupmainheadingFromArray: | |
| if not any(normalize_text(current_line_text) in normalize_text(item) for item in groupmainheadingFromArray): | |
| if not current_line_text[0].isdigit() : | |
| if span_size >= spanSizeHeader: | |
| if not re.match(r"^\d{2}", current_line_text) and current_line_text not in repeated_texts and "Bold" in span["font"] : | |
| if len(header2) > 0 : | |
| header2_first_span_size = span_size | |
| header2 = current_line_text | |
| print('header2', header2, span_size, spanSizeHeader) | |
| trimmed_text = text.replace(" ", "") | |
| if len(text) > 0: | |
| if text.split()[0] in heading_words: | |
| if len(trimmed_text) > 0 and (heading_to_search.replace(" ", "") in trimmed_text): | |
| print(trimmed_text, heading_to_search) | |
| f10_count += 1 | |
| # Start collecting text under the second occurrence of the heading | |
| if f10_count == 1: | |
| collecting_text = True | |
| print(f"Starting collection under heading: {text}, {span_font}, {span_size}") | |
| collectheader2 = True | |
| NBS_heading = heading_to_searchNBS | |
| highlight_rectBegin=highlight_rect | |
| x0, y0, x1, y1 = highlight_rectBegin | |
| span_font_goal = span_font # Capture the font at the first heading match | |
| span_size_goal = span_size # Capture the size at the first heading match | |
| zoom = 200 | |
| left = int(x0) | |
| top = int(y0) | |
| zoom_str = f"{zoom},{left},{top}" | |
| pageNumberFound = page_num + 1 | |
| dictionaryNBS[heading_to_searchNBS] = [pageNumberFound, zoom_str] | |
| annot = page.add_rect_annot(highlight_rect) # Create a rectangle annotation | |
| annot.set_colors(stroke=(1, 0, 0)) # Set border color (Red) | |
| annot.update() # Apply changes | |
| groupmainheadingFromArray = [item for item in merged_groupheadings if previous_header in item] | |
| # Build the query parameters | |
| params = { | |
| 'pdfLink': link, # Your PDF link | |
| 'keyword': NBS_heading, # Your keyword (could be a string or list) | |
| } | |
| # URL encode each parameter | |
| encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()} | |
| # Construct the final encoded link | |
| encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()]) | |
| # Correctly construct the final URL with page and zoom | |
| final_url = f"{baselink}{encoded_link}#page={str(pageNumberFound)}&zoom={zoom_str}" | |
| # Get current date and time | |
| now = datetime.now() | |
| # Format the output | |
| formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p") | |
| # Optionally, add the URL to a DataFrame | |
| if len(groupmainheadingFromArray) > 0: | |
| data_entry = { | |
| "NBSLink": final_url, | |
| "Subject": NBS_heading, | |
| "Page": str(pageNumberFound), | |
| "Author": "ADR", | |
| "Creation Date": formatted_time, | |
| "Layer": "Initial", | |
| "Code": "to be added", | |
| "head above 1": header2, | |
| "head above 2": groupmainheadingFromArray[0] | |
| } | |
| data_list_JSON.append(data_entry) | |
| # Convert list to JSON | |
| json_output = json.dumps(data_list_JSON, indent=4) | |
| print("Final URL:", final_url) | |
| if collecting_text: | |
| annot = page.add_rect_annot(highlight_rect) # Create a rectangle annotation | |
| annot.set_colors(stroke=(1, 0, 0)) # Set border color (Red) | |
| annot.update() # Apply changes | |
| if current_line.strip(): | |
| all_text += current_line.strip() + '\n' # Append the current line | |
| print(df) | |
| print(dictionaryNBS) | |
| # xx=save_df_to_pdf(df) | |
| # outputpdfFitz =fitz.open('pdf',xx) | |
| pdf_bytes = BytesIO() | |
| pdf_document.save(pdf_bytes) | |
| print('JSONN',json_output) | |
| return pdf_bytes.getvalue(), pdf_document , df, json_output |