# InitialMarkups / Find_Hyperlinking_text.py
# (Hugging Face Space file; last commit "Update Find_Hyperlinking_text.py",
#  1bf5d60, verified, by Marthee)
import fitz # PyMuPDF
from io import BytesIO
import re
import requests
import pandas as pd
from collections import Counter
import fitz # PyMuPDF
import re
import urllib.parse
import pandas as pd
import tempfile
from fpdf import FPDF
import json
from datetime import datetime
# Base URL of the companion viewer Space; per-heading query parameters and a
# "#page=..&zoom=.." fragment are appended to this when building each NBSLink.
baselink='https://marthee-nbslink.hf.space/view-pdf?'
class PDF(FPDF):
    """FPDF subclass whose header() stamps a centered bold title on every page."""

    def header(self):
        # Called automatically by FPDF at the top of each new page.
        self.set_font("Arial", "B", 12)
        self.cell(0, 10, "NBS Document Links", ln=True, align="C")
        self.ln(5) # Space after header
def save_df_to_pdf(df):
    """
    Render the links DataFrame as a horizontally centered table in a fresh PDF
    and return the document as bytes.

    df must contain the columns listed in ``headers`` below with string
    values; the "NBSLink" column is rendered as a clickable "Click Here" cell.

    NOTE(review): ``multi_cell(..., split_only=True)`` and
    ``output(dest="S").encode("latin1")`` follow the classic PyFPDF 1.x API —
    confirm the installed ``fpdf`` package is not fpdf2, which changes both.
    """
    pdf = PDF()
    pdf.set_auto_page_break(auto=True, margin=15)
    # Set equal margins
    margin = 15
    pdf.set_left_margin(margin)
    pdf.set_right_margin(margin)
    pdf.add_page()
    pdf.set_font("Arial", size=8) # Reduce font size to fit more text
    # Table headers
    headers = ["NBSLink", "Subject","Page","Author","Creation Date","Layer",'Code', 'head above 1', "head above 2"]
    num_cols = len(headers)
    # Calculate column width dynamically
    max_table_width = pdf.w - 2 * margin # Total available width
    col_width = max_table_width / num_cols # Distribute evenly
    table_width = col_width * num_cols
    # Get page width and calculate left alignment
    page_width = pdf.w
    start_x = (page_width - table_width) / 2 # Centering the table
    pdf.set_x(start_x) # Move to calculated start position
    # Table headers
    pdf.set_fill_color(200, 200, 200) # Light gray background
    pdf.set_font("Arial", "B", 8)
    for header in headers:
        pdf.cell(col_width, 8, header, border=1, fill=True, align="C")
    pdf.ln()
    pdf.set_font("Arial", size=7) # Reduce font size for data rows
    for _, row in df.iterrows():
        x_start = start_x # Ensure every row starts at the same position
        y_start = pdf.get_y()
        # Calculate max height needed for this row: dry-run each text column
        # (split_only=True returns the wrapped lines without drawing anything).
        text_lines = {col: pdf.multi_cell(col_width, 5, row[col], border=0, align="L", split_only=True) for col in ["Subject","Page","Author","Creation Date","Layer",'Code', 'head above 1', "head above 2"]}
        max_lines = max(len(lines) for lines in text_lines.values())
        max_height = max_lines * 5
        pdf.set_x(x_start) # Ensure correct alignment for each row
        # Clickable link cell (keeps same height as others)
        pdf.cell(col_width, max_height, "Click Here", border=1, link=row["NBSLink"], align="C")
        # Move to next column
        pdf.set_xy(x_start + col_width, y_start)
        # Draw each cell manually, ensuring equal height: multi_cell draws the
        # wrapped text borderless, then rect() draws a uniform-height border.
        for i, col_name in enumerate(["Subject","Page","Author","Creation Date","Layer",'Code', 'head above 1', "head above 2"]):
            x_col = x_start + col_width * (i + 1)
            y_col = y_start
            pdf.multi_cell(col_width, 5, row[col_name], border=0, align="L") # Draw text
            pdf.rect(x_col, y_col, col_width, max_height) # Draw border
            pdf.set_xy(x_col + col_width, y_start) # Move to next column
        # Move to the next row
        pdf.ln(max_height)
    # Save PDF to memory
    pdf_output = pdf.output(dest="S").encode("latin1")
    return pdf_output
def normalize_text(text):
    """Return *text* lowercased, whitespace-collapsed, and punctuation-free."""
    # Fold case and trim both ends first.
    lowered = text.lower().strip()
    # Collapse every whitespace run to a single space, then drop anything
    # that is neither a word character nor whitespace (i.e. punctuation).
    collapsed = re.sub(r'\s+', ' ', lowered)
    return re.sub(r'[^\w\s]', '', collapsed)
def get_repeated_texts(pdf_document, threshold=0.85):
    """
    Return the set of normalized text lines that occur on at least
    ``threshold`` (as a fraction) of the document's pages — typically
    boilerplate such as running headers and footers.

    :param pdf_document: The opened PDF document.
    :param threshold: Fraction of pages a line must appear on to count
        as "repeated".
    """
    total_pages = pdf_document.page_count
    line_counts = Counter()
    for index in range(total_pages):
        raw_text = pdf_document.load_page(index).get_text("text")
        # Use a set per page so a line duplicated within one page counts once.
        page_lines = {normalize_text(entry) for entry in raw_text.splitlines() if entry.strip()}
        line_counts.update(page_lines)
    # A line qualifies once it shows up on enough pages (at least one).
    required = max(1, int(threshold * total_pages))
    return {line for line, hits in line_counts.items() if hits >= required}
def split_links(links_string):
    """
    Split a comma-separated string of links into a list of trimmed links.

    Empty segments — produced by trailing commas, doubled commas, or an
    empty input string — are dropped, so ``split_links("")`` returns ``[]``
    instead of ``['']``.

    :param links_string: Comma-separated links, possibly with whitespace
        around each one.
    :return: List of non-empty, whitespace-trimmed link strings.
    """
    # Filter out blanks so stray separators don't yield '' entries.
    return [link.strip() for link in links_string.split(',') if link.strip()]
def annotate_text_from_pdf(pdfshareablelinks, LISTheading_to_search):
    """
    Download the first linked PDF, locate each heading in LISTheading_to_search,
    draw red rectangle annotations around the heading and the text collected
    under it, and build a deep link (page + zoom fragment) for every match.

    Args:
        pdfshareablelinks (list): Shareable links to PDFs; only the first
            entry is used (Dropbox 'dl=0' links are rewritten to 'dl=1'
            for direct download).
        LISTheading_to_search (list): Headings to search for in the PDF.

    Returns:
        tuple: (annotated PDF bytes, open fitz Document, DataFrame — created
        with the expected columns but never populated here — and the JSON
        string of collected link entries).

    Raises:
        ValueError: If no downloadable PDF content could be obtained.

    NOTE(review): ``json_output`` is assigned only inside the first-match
    branch; if no heading is ever matched, the final ``return`` raises
    NameError — confirm callers always pass at least one heading that
    exists in the document.
    """
    print("Input links:", pdfshareablelinks)
    print(LISTheading_to_search)
    # Only the first shareable link is processed.
    link = pdfshareablelinks[0]
    pdf_content = None
    headings_TOC = []  # NOTE(review): never written below — appears unused.
    # Modify Dropbox shareable link for direct download
    if link and ('http' in link or 'dropbox' in link):
        if 'dl=0' in link:
            link = link.replace('dl=0', 'dl=1')
        # Download the PDF content from the shareable link
        response = requests.get(link)
        pdf_content = BytesIO(response.content) # Store the content in memory
    if pdf_content is None:
        raise ValueError("No valid PDF content found.")
    # Open the PDF using PyMuPDF
    pdf_document = fitz.open(stream=pdf_content, filetype="pdf")
    # Boilerplate lines (running headers/footers) to skip while scanning spans.
    repeated_texts = get_repeated_texts(pdf_document)
    # NOTE(review): df keeps only its column schema; row data is accumulated
    # in data_list_JSON instead, and df is returned empty.
    df = pd.DataFrame(columns=["NBSLink","Subject","Page","Author","Creation Date","Layer",'Code', 'head above 1', "head above 2"])
    dictionaryNBS={}  # heading -> [page number, zoom string] per match
    data_list_JSON = []  # one dict per matched heading; serialized to JSON
    for NBSindex, heading_to_search in enumerate(LISTheading_to_search):
        if NBSindex == len(LISTheading_to_search) - 1:
            # NOTE(review): immediately reset to False below; has no effect.
            flagAllNBSvisited = True
        # --- per-heading scan state ---
        all_text = []  # completed visual lines gathered while scanning
        current_line = ""  # TOC line being accumulated span-by-span
        collecting_text = False  # True while under the matched heading
        f10_count = 0  # number of heading matches seen so far
        current_y = None  # y-coordinate of the TOC line being accumulated
        highlight_rect = None
        highlight_rectEnding=None
        highlight_rectBegin=None
        zoom_str = None
        toc_flag = False  # True while inside a 'Content' (TOC) region
        span_font_goal = None  # font captured at the heading match
        span_size_goal = None  # size captured at the heading match
        pageNumberFound = None
        groupheadings = []  # raw TOC heading lines
        merged_groupheadings = []  # TOC lines merged into complete headings
        collectheader2 = False
        endingcontentFlag=True
        header2 = ''  # nearest bold sub-heading seen above the match
        header2_first_span_size = 0
        previous_header = ''  # last top-level header code seen (e.g. 'A10')
        next_span_text = ''
        current_line_span_size = 0
        flagAllNBSvisited = False
        text = ''  # candidate heading text assembled from adjacent spans
        heading_to_searchNBS = heading_to_search  # original (spaced) heading
        heading_words = heading_to_search.split() # Split heading into words
        first_word = heading_words[0] # First word to search for
        remaining_words = heading_words[1:] # Remaining words to verify
        print(heading_words)
        # Space-free form used for tolerant substring matching below.
        heading_to_search = heading_to_search.replace(" ", "")
        # Process each page in the PDF
        for page_num in range(pdf_document.page_count):
            page = pdf_document.load_page(page_num)
            # Get page dimensions
            page_height = page.rect.height
            header_threshold = page_height * 0.1 # Top 10% of the page height
            footer_threshold = page_height * 0.9 # Bottom 10% of the page height
            # Extract text in dictionary format
            text_dict = page.get_text("dict")
            # NOTE(review): overwrites the 10% value computed above, so both
            # thresholds end up unused in this loop — confirm intent.
            header_threshold = 0 # Header area: top 10% of the page height
            current_line_text = ""  # visual line rebuilt from same-y spans
            previous_y = None
            # Process text blocks
            for block in text_dict['blocks']:
                for line_index, line in enumerate(block.get('lines', [])):
                    spans = line.get('spans', [])
                    if spans and any(span['text'].strip() for span in spans):
                        for i, span in enumerate(spans):
                            span_text = span['text'].strip()
                            highlight_rect = span['bbox']
                            span_y = span['bbox'][1]
                            span_font = span['font']
                            span_size = span['size']
                            # Skip boilerplate lines and 'Page N'-style spans.
                            if normalize_text(span_text) not in repeated_texts and not (span_text.startswith('Page')):
                                if previous_y is None:
                                    previous_y = span_y # Initialize on first span
                                # If same Y coordinate as previous, append to the current line
                                if abs(span_y - previous_y) < 5: # Allow a small margin for OCR variations
                                    current_line_text += " " + span_text
                                    current_line_text = normalize_text(current_line_text)
                                    current_line_span_size = span_size
                                else:
                                    # Store the complete line and reset for the new line
                                    if current_line_text.strip():
                                        all_text.append(current_line_text.strip())
                                    current_line_text = span_text # Start a new line
                                    previous_y = span_y # Update the reference Y
                                text = span_text
                                # A digit-led span in the same font/size as the matched
                                # heading marks the next section: stop collecting there.
                                if collecting_text and span_font == span_font_goal and span_size == span_size_goal and span_text[0].isdigit():
                                    print(f"Ending collection at heading: {span_text}")
                                    highlight_rectEnding=highlight_rect
                                    print("merged_groupheadings:", merged_groupheadings)
                                    print('groupheadingss',groupheadings)
                                    collecting_text = False
                                    continue
                                # While collecting, box every span in red.
                                if collecting_text:
                                    annot = page.add_rect_annot(highlight_rect) # Create a rectangle annotation
                                    annot.set_colors(stroke=(1, 0, 0)) # Set border color (Red)
                                    annot.update() # Apply changes
                                # Entering a table-of-contents ('Content') region.
                                if 'Content' in span_text:
                                    toc_flag = True
                                    TOC_start = span_text
                                    print('content', TOC_start, span_size)
                                if toc_flag or endingcontentFlag:
                                    if 'Content' not in span_text:
                                        if current_y is None:
                                            current_y = span_y
                                            current_size = span_size # Initialize the reference span size
                                        # Check if the current span size deviates significantly
                                        if abs(span_size - current_size) > 1: # Threshold for size difference
                                            toc_flag = False
                                        if abs(current_y - span_y) < 5: # Allowing more flexibility for multi-line headings
                                            current_line += " " + span_text # Keep accumulating text
                                        else:
                                            if current_line.strip(): # Only process non-empty lines
                                                clean_text = re.sub(r'\.{5,}\d*$', '', current_line, flags=re.MULTILINE) # Remove dots and trailing numbers
                                                print(clean_text.strip())
                                                if clean_text:
                                                    groupheadings.append(clean_text)
                                            # else:
                                            # toc_flag = False
                                            current_line = span_text
                                            current_y = span_y
                                            current_size = span_size # Update reference span size
                                # print('outofcurrent')
                                # Re-merge TOC fragments into complete headings; a new
                                # heading starts with a code like 'A10 ' (letter + 2 digits).
                                if len(groupheadings) > 0:
                                    pattern = re.compile(r"^[A-Za-z]\d{2} ") # Match headings starting with letter + 2 digits
                                    merged_groupheadings = []
                                    current_item = None # Start as None to avoid an initial blank entry
                                    for item in groupheadings:
                                        if pattern.match(item): # If item starts with correct pattern, it's a new heading
                                            if current_item: # Append only if current_item is not empty
                                                if current_item not in merged_groupheadings:
                                                    # Strip dot leaders ('....') and anything after them.
                                                    extracted_text = re.split(r"\.{3,}", current_item)[0].strip()
                                                    merged_groupheadings.append(extracted_text.strip())
                                            current_item = item # Start new heading
                                        else:
                                            if current_item:
                                                if item not in current_item:
                                                    current_item += " " + item # Merge with previous heading
                                    # Append last merged item after loop
                                    if current_item:
                                        if current_item not in merged_groupheadings:
                                            extracted_text = re.split(r"\.{3,}", current_item)[0].strip()
                                            merged_groupheadings.append(extracted_text.strip())
                                # The heading may be split across spans on one line...
                                if span_text == first_word:
                                    print('First word found:', span_text)
                                    # Check if it's not the last span in the current line
                                    print(i + 1, len(spans))
                                    if i + 1 < len(spans):
                                        next_span_text = (spans[i + 1]['text'].strip())
                                        # Check if the next span's text is in the heading list
                                        if next_span_text.replace(" ", "") in heading_to_search.replace(" ", ""):
                                            text = (span_text + ' ' + next_span_text)
                                # ...or across two consecutive lines of the same block.
                                if first_word == span_text:
                                    if line_index + 1 < len(block.get('lines', [])):
                                        next_line = block['lines'][line_index + 1]
                                        # You can process the spans of the next line here
                                        for next_span in next_line.get('spans', []):
                                            next_span_text = next_span['text'].strip()
                                            text = span_text + ' ' + next_span_text
                                # Track the last large top-level header code seen in the body.
                                if len(merged_groupheadings) > 0:
                                    if re.match(r"[A-Za-z]\d{2}", span_text) and span_size > 10:
                                        toc_flag = False
                                        endingcontentFlag=False
                                        previous_header = span_text # Store last detected header
                                        print('previous_header', span_text)
                                # TOC headings that belong to the current top-level header.
                                groupmainheadingFromArray = [item for item in merged_groupheadings if previous_header in item]
                                if previous_header:
                                    if not collectheader2:
                                        if header2_first_span_size == 0:
                                            spanSizeHeader = 10
                                        else:
                                            spanSizeHeader = header2_first_span_size
                                    # NOTE(review): the loop variable `item` is shadowed by the
                                    # generator inside any(), so the condition is identical on
                                    # every iteration — confirm a single pass was intended.
                                    for item in groupmainheadingFromArray:
                                        if not any(normalize_text(current_line_text) in normalize_text(item) for item in groupmainheadingFromArray):
                                            if not current_line_text[0].isdigit() :
                                                if span_size >= spanSizeHeader:
                                                    # Bold, non-numeric, non-boilerplate line: treat as sub-heading.
                                                    if not re.match(r"^\d{2}", current_line_text) and current_line_text not in repeated_texts and "Bold" in span["font"] :
                                                        if len(header2) > 0 :
                                                            header2_first_span_size = span_size
                                                        header2 = current_line_text
                                                        print('header2', header2, span_size, spanSizeHeader)
                                trimmed_text = text.replace(" ", "")
                                if len(text) > 0:
                                    if text.split()[0] in heading_words:
                                        # Space-free containment check tolerates spacing differences.
                                        if len(trimmed_text) > 0 and (heading_to_search.replace(" ", "") in trimmed_text):
                                            print(trimmed_text, heading_to_search)
                                            f10_count += 1
                                            # Start collecting text under the second occurrence of the heading
                                            if f10_count == 1:
                                                collecting_text = True
                                                print(f"Starting collection under heading: {text}, {span_font}, {span_size}")
                                                collectheader2 = True
                                                NBS_heading = heading_to_searchNBS
                                                highlight_rectBegin=highlight_rect
                                                x0, y0, x1, y1 = highlight_rectBegin
                                                span_font_goal = span_font # Capture the font at the first heading match
                                                span_size_goal = span_size # Capture the size at the first heading match
                                                zoom = 200
                                                left = int(x0)
                                                top = int(y0)
                                                zoom_str = f"{zoom},{left},{top}"
                                                pageNumberFound = page_num + 1
                                                dictionaryNBS[heading_to_searchNBS] = [pageNumberFound, zoom_str]
                                                annot = page.add_rect_annot(highlight_rect) # Create a rectangle annotation
                                                annot.set_colors(stroke=(1, 0, 0)) # Set border color (Red)
                                                annot.update() # Apply changes
                                                groupmainheadingFromArray = [item for item in merged_groupheadings if previous_header in item]
                                                # Build the query parameters
                                                params = {
                                                    'pdfLink': link, # Your PDF link
                                                    'keyword': NBS_heading, # Your keyword (could be a string or list)
                                                }
                                                # URL encode each parameter
                                                encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()}
                                                # Construct the final encoded link
                                                encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()])
                                                # Correctly construct the final URL with page and zoom
                                                final_url = f"{baselink}{encoded_link}#page={str(pageNumberFound)}&zoom={zoom_str}"
                                                # Get current date and time
                                                now = datetime.now()
                                                # Format the output
                                                formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p")
                                                # Optionally, add the URL to a DataFrame
                                                if len(groupmainheadingFromArray) > 0:
                                                    data_entry = {
                                                        "NBSLink": final_url,
                                                        "Subject": NBS_heading,
                                                        "Page": str(pageNumberFound),
                                                        "Author": "ADR",
                                                        "Creation Date": formatted_time,
                                                        "Layer": "Initial",
                                                        "Code": "to be added",
                                                        "head above 1": header2,
                                                        "head above 2": groupmainheadingFromArray[0]
                                                    }
                                                    data_list_JSON.append(data_entry)
                                                # Convert list to JSON
                                                json_output = json.dumps(data_list_JSON, indent=4)
                                                print("Final URL:", final_url)
                                            # Box the matched heading span itself.
                                            if collecting_text:
                                                annot = page.add_rect_annot(highlight_rect) # Create a rectangle annotation
                                                annot.set_colors(stroke=(1, 0, 0)) # Set border color (Red)
                                                annot.update() # Apply changes
        # Flush any trailing accumulated TOC line after the page loop.
        # NOTE(review): `all_text` is a list, so `+=` with a string extends it
        # character-by-character — likely intended `all_text.append(...)`.
        if current_line.strip():
            all_text += current_line.strip() + '\n' # Append the current line
    print(df)
    print(dictionaryNBS)
    # xx=save_df_to_pdf(df)
    # outputpdfFitz =fitz.open('pdf',xx)
    # Serialize the annotated document to bytes.
    pdf_bytes = BytesIO()
    pdf_document.save(pdf_bytes)
    print('JSONN',json_output)
    return pdf_bytes.getvalue(), pdf_document , df, json_output