# InitialMarkups2 / Find_Hyperlinking_text.py
# Marthee's picture
# Rename pdftotext.py to Find_Hyperlinking_text.py
# 80d514c verified
# raw
# history blame
# 15.7 kB
import fitz # PyMuPDF
from io import BytesIO
import re
import requests
import pandas as pd
from collections import Counter
import fitz # PyMuPDF
import re
def normalize_text(text):
    """Return ``text`` lowercased, whitespace-collapsed and without punctuation.

    Leading/trailing whitespace is trimmed, every run of whitespace becomes a
    single space, and every character that is neither a word character nor
    whitespace is dropped. Removing a punctuation mark that sat between two
    spaces leaves both spaces in place.
    """
    lowered = text.lower()
    trimmed = lowered.strip()
    # Collapse whitespace first, then strip punctuation from the result.
    single_spaced = re.sub(r'\s+', ' ', trimmed)
    without_punctuation = re.sub(r'[^\w\s]', '', single_spaced)
    return without_punctuation
def get_repeated_texts(pdf_document, threshold=0.9):
    """
    Collect the normalized lines that occur on most pages of a document.

    Each page contributes a given line at most once, so the tally for a line
    is the number of pages it appears on.

    :param pdf_document: The opened PDF document (must expose ``page_count`` and ``load_page``).
    :param threshold: Fraction of the pages a line must appear on to count as "repeated".
    :return: Set of normalized lines seen on at least ``max(1, int(threshold * page_count))`` pages.
    """
    page_total = pdf_document.page_count
    pages_seen_on = Counter()

    for index in range(page_total):
        raw_text = pdf_document.load_page(index).get_text("text")
        # A set de-duplicates lines within one page before they are tallied.
        unique_lines = set()
        for raw_line in raw_text.splitlines():
            if raw_line.strip():
                unique_lines.add(normalize_text(raw_line))
        pages_seen_on.update(unique_lines)

    # The product is truncated toward zero, and at least one page is always required.
    cutoff = max(1, int(threshold * page_total))
    return {line for line, seen in pages_seen_on.items() if seen >= cutoff}
def split_links(links_string):
    """Break a comma-separated string of links into a list of whitespace-trimmed pieces.

    Empty pieces are kept, so an empty input yields ``['']``.
    """
    pieces = links_string.split(',')
    return list(map(str.strip, pieces))
def _merge_toc_headings(groupheadings):
    """Join wrapped table-of-contents fragments into whole headings.

    An entry starting with a letter, two digits and a space opens a new
    heading; any other entry is appended to the heading being built. Entries
    that come before the first such heading are dropped.
    """
    section_start = re.compile(r"^[A-Za-z]\d{2} ")  # Match headings starting with letter + 2 digits
    merged = []
    current_item = None  # Start as None to avoid an initial blank entry
    for item in groupheadings:
        if section_start.match(item):
            if current_item:
                merged.append(current_item.strip())
            current_item = item
        elif current_item:
            current_item += " " + item
    if current_item:
        merged.append(current_item.strip())
    return merged


def annotate_text_from_pdf(pdfshareablelinks, LISTheading_to_search):
    """
    Highlight the text that follows each requested heading in a PDF.

    Only the first entry of ``pdfshareablelinks`` is downloaded. For every
    heading the document is scanned span by span: the first span whose text
    contains the heading is highlighted and recorded, and every following
    span is highlighted until a span with the same font and size as the
    heading, and starting with a digit, is reached.

    Args:
        pdfshareablelinks (list): Shareable links to PDFs; only the first is used.
        LISTheading_to_search (list): Heading strings to search for. Blank
            entries are skipped.

    Returns:
        Tuple: (annotated PDF bytes, page number, zoom string). The page
        number is 1-based and the zoom string has the form
        ``"200,<left>,<top>"``; both describe the first match of the LAST
        non-blank heading processed and are ``None`` when it was not found.

    Raises:
        ValueError: The first link contains neither 'http' nor 'dropbox'.
        requests.HTTPError: The download answered with an error status.
    """
    # NOTE(review): the nesting of this function was reconstructed from a copy
    # of the file that had lost its indentation - confirm against the original.
    print("Input links:", pdfshareablelinks)
    print(LISTheading_to_search)

    link = pdfshareablelinks[0]
    pdf_content = None

    if link and ('http' in link or 'dropbox' in link):
        # Modify Dropbox shareable link for direct download
        if 'dl=0' in link:
            link = link.replace('dl=0', 'dl=1')
        # Download the PDF content from the shareable link
        response = requests.get(link)
        # Fail here rather than hand an HTML error page to the PDF parser.
        response.raise_for_status()
        pdf_content = BytesIO(response.content)  # Store the content in memory

    if pdf_content is None:
        raise ValueError("No valid PDF content found.")

    # Open the PDF using PyMuPDF
    pdf_document = fitz.open(stream=pdf_content, filetype="pdf")

    # Bound up front so the return works even when no heading is processed.
    pageNumberFound = None
    zoom_str = None

    try:
        repeated_texts = get_repeated_texts(pdf_document)
        df = pd.DataFrame(columns=["NBS Link", "NBS", 'head above 1', "head above 2"])
        dictionaryNBS = {}

        for heading_to_search in LISTheading_to_search:
            heading_words = heading_to_search.split()  # Split heading into words
            if not heading_words:
                continue  # A blank heading has no first word to look for.

            # ---- per-heading state ----
            all_text = []
            current_line = ""            # TOC line currently being accumulated
            collecting_text = False      # True while highlighting under the heading
            f10_count = 0                # number of times the heading was matched
            current_y = None
            highlight_rect = None
            zoom_str = None
            toc_flag = False
            span_font_goal = None        # font/size of the matched heading span;
            span_size_goal = None        # used to recognise the next heading
            pageNumberFound = None
            groupheadings = []
            merged_groupheadings = []
            collectheader2 = False
            header2 = ''
            header2_first_span_size = 0
            previous_header = ''
            next_span_text = ''
            text = ''

            heading_to_searchNBS = heading_to_search
            first_word = heading_words[0]  # First word to search for
            print(heading_words)
            heading_compact = heading_to_search.replace(" ", "")

            # Process each page in the PDF
            for page_num in range(pdf_document.page_count):
                page = pdf_document.load_page(page_num)
                # Extract text in dictionary format
                text_dict = page.get_text("dict")
                current_line_text = ""
                previous_y = None

                # Process text blocks
                for block in text_dict['blocks']:
                    lines = block.get('lines', [])
                    for line_index, line in enumerate(lines):
                        spans = line.get('spans', [])
                        # Skip lines that carry no visible text at all.
                        if not any(span['text'].strip() for span in spans):
                            continue

                        for i, span in enumerate(spans):
                            span_text = span['text'].strip()
                            highlight_rect = span['bbox']
                            span_y = span['bbox'][1]
                            span_font = span['font']
                            span_size = span['size']

                            if previous_y is None:
                                previous_y = span_y  # Initialize on first span

                            # If same Y coordinate as previous, append to the current line
                            if abs(span_y - previous_y) < 5:  # Allow a small margin for OCR variations
                                current_line_text += " " + span_text
                                current_line_text = normalize_text(current_line_text)
                            else:
                                # Store the complete line and reset for the new line
                                if current_line_text.strip():
                                    all_text.append(current_line_text.strip())
                                current_line_text = span_text  # Start a new line
                                previous_y = span_y  # Update the reference Y
                            text = span_text

                            # A span styled like the heading and starting with a
                            # digit ends the collection. [:1] keeps an empty span
                            # from raising IndexError.
                            if (collecting_text
                                    and span_font == span_font_goal
                                    and span_size == span_size_goal
                                    and span_text[:1].isdigit()):
                                print(f"Ending collection at heading: {span_text}")
                                print("merged_groupheadings:", merged_groupheadings)
                                collecting_text = False
                                continue

                            if collecting_text:
                                annot = page.add_highlight_annot(highlight_rect)
                                annot.update()

                            # ---- table-of-contents harvesting ----
                            if 'Content' in span_text:
                                toc_flag = True
                                print('content', span_text, span_size)
                            if toc_flag and 'Content' not in span_text:
                                if current_y is None:
                                    current_y = span_y
                                    current_size = span_size  # Initialize the reference span size
                                # A font-size jump of more than 1 switches TOC mode off.
                                if abs(span_size - current_size) > 1:  # Threshold for size difference
                                    toc_flag = False
                                if abs(current_y - span_y) < 5:  # Allowing more flexibility for multi-line headings
                                    current_line += " " + span_text  # Keep accumulating text
                                else:
                                    if current_line.strip():  # Only process non-empty lines
                                        # Keep the text before the dot leaders.
                                        match = re.match(r"^([A-Za-z0-9\s\/\-,]+)(?=\.+)", current_line.strip())
                                        if match:
                                            groupheadings.append(match.group(1).strip())
                                    current_line = span_text
                                    current_y = span_y
                                    current_size = span_size  # Update reference span size

                            if groupheadings:
                                merged_groupheadings = _merge_toc_headings(groupheadings)

                            # ---- look-ahead when the heading's first word stands alone ----
                            if span_text == first_word:
                                print('First word found:', span_text)
                                print(i + 1, len(spans))
                                # Check if it's not the last span in the current line
                                if i + 1 < len(spans):
                                    next_span_text = spans[i + 1]['text'].strip()
                                    if next_span_text.replace(" ", "") in heading_compact:
                                        text = span_text + ' ' + next_span_text
                                # Then try the next line of the same block; the last
                                # span of that line wins.
                                if line_index + 1 < len(lines):
                                    next_line = block['lines'][line_index + 1]
                                    for next_span in next_line.get('spans', []):
                                        next_span_text = next_span['text'].strip()
                                        text = span_text + ' ' + next_span_text

                            # ---- track the headers above the searched heading ----
                            if merged_groupheadings:
                                if re.match(r"[A-Za-z]\d{2}", span_text) and span_size > 10:
                                    previous_header = span_text  # Store last detected header
                                    print('previous_header', span_text)
                                groupmainheadingFromArray = [item for item in merged_groupheadings if previous_header in item]
                                if previous_header and not collectheader2:
                                    spanSizeHeader = 10 if header2_first_span_size == 0 else header2_first_span_size
                                    # NOTE(review): the test below does not depend on the loop
                                    # variable, so it is just repeated once per entry; kept
                                    # because the repeat affects header2_first_span_size.
                                    for _ in groupmainheadingFromArray:
                                        if not any(
                                            normalize_text(current_line_text) in normalize_text(candidate)
                                            for candidate in groupmainheadingFromArray
                                        ):
                                            if span_size >= spanSizeHeader:
                                                if not re.match(r"^\d{2}", current_line_text) and current_line_text not in repeated_texts and "Bold" in span["font"]:
                                                    if len(header2) > 0:
                                                        header2_first_span_size = span_size
                                                    header2 = current_line_text
                                                    print('header2', header2, span_size, spanSizeHeader)

                            # ---- heading match ----
                            trimmed_text = text.replace(" ", "")
                            if text and text.split()[0] in heading_words and heading_compact in trimmed_text:
                                print(trimmed_text, heading_compact)
                                f10_count += 1
                                # Start collecting text under the first occurrence of the heading
                                if f10_count == 1:
                                    collecting_text = True
                                    print(f"Starting collection under heading: {text}, {span_font}, {span_size}")
                                    collectheader2 = True
                                    NBS_heading = heading_to_searchNBS
                                    x0, y0 = highlight_rect[:2]
                                    span_font_goal = span_font  # Capture the font at the first heading match
                                    span_size_goal = span_size  # Capture the size at the first heading match
                                    zoom = 200
                                    left = int(x0)
                                    top = int(y0)
                                    zoom_str = f"{zoom},{left},{top}"
                                    pageNumberFound = page_num + 1
                                    dictionaryNBS[heading_to_searchNBS] = [pageNumberFound, zoom_str]
                                    annot = page.add_highlight_annot(highlight_rect)
                                    annot.update()
                                    groupmainheadingFromArray = [item for item in merged_groupheadings if previous_header in item]
                                    if len(groupmainheadingFromArray) > 0:
                                        df = pd.concat([df, pd.DataFrame([{"NBS": NBS_heading, 'head above 1': header2, "head above 2": groupmainheadingFromArray[0]}])], ignore_index=True)

                            # Highlight the text
                            # NOTE(review): a collected span was already highlighted
                            # earlier in this iteration; the second annotation is kept
                            # so the output is unchanged - confirm it is intended.
                            if collecting_text:
                                annot = page.add_highlight_annot(highlight_rect)
                                annot.update()

            if current_line.strip():
                # Append the pending line as ONE entry; `+=` with a string would
                # extend the list one character at a time.
                all_text.append(current_line.strip())

        print(df)
        print(dictionaryNBS)

        # Save the annotated PDF to bytes
        pdf_bytes = BytesIO()
        pdf_document.save(pdf_bytes)
        return pdf_bytes.getvalue(), pageNumberFound, zoom_str
    finally:
        # Release the document even when scanning or saving fails.
        pdf_document.close()