|
|
|
|
|
import fitz |
|
|
from io import BytesIO |
|
|
import re |
|
|
import requests |
|
|
import pandas as pd |
|
|
from collections import Counter |
|
|
import fitz |
|
|
import re |
|
|
import urllib.parse |
|
|
import pandas as pd |
|
|
import tempfile |
|
|
from fpdf import FPDF |
|
|
import json |
|
|
from datetime import datetime |
|
|
|
|
|
baselink='https://marthee-nbslink.hf.space/view-pdf?' |
|
|
class PDF(FPDF): |
|
|
def header(self): |
|
|
self.set_font("Arial", "B", 12) |
|
|
self.cell(0, 10, "NBS Document Links", ln=True, align="C") |
|
|
self.ln(5) |
|
|
|
|
|
def save_df_to_pdf(df): |
|
|
pdf = PDF() |
|
|
pdf.set_auto_page_break(auto=True, margin=15) |
|
|
|
|
|
|
|
|
margin = 15 |
|
|
pdf.set_left_margin(margin) |
|
|
pdf.set_right_margin(margin) |
|
|
|
|
|
pdf.add_page() |
|
|
pdf.set_font("Arial", size=8) |
|
|
|
|
|
|
|
|
headers = ["NBSLink", "Subject","Page","Author","Creation Date","Layer",'Code', 'head above 1', "head above 2"] |
|
|
num_cols = len(headers) |
|
|
|
|
|
|
|
|
max_table_width = pdf.w - 2 * margin |
|
|
col_width = max_table_width / num_cols |
|
|
table_width = col_width * num_cols |
|
|
|
|
|
|
|
|
page_width = pdf.w |
|
|
start_x = (page_width - table_width) / 2 |
|
|
|
|
|
pdf.set_x(start_x) |
|
|
|
|
|
|
|
|
pdf.set_fill_color(200, 200, 200) |
|
|
pdf.set_font("Arial", "B", 8) |
|
|
|
|
|
for header in headers: |
|
|
pdf.cell(col_width, 8, header, border=1, fill=True, align="C") |
|
|
pdf.ln() |
|
|
|
|
|
pdf.set_font("Arial", size=7) |
|
|
|
|
|
for _, row in df.iterrows(): |
|
|
x_start = start_x |
|
|
y_start = pdf.get_y() |
|
|
|
|
|
|
|
|
text_lines = {col: pdf.multi_cell(col_width, 5, row[col], border=0, align="L", split_only=True) for col in ["Subject","Page","Author","Creation Date","Layer",'Code', 'head above 1', "head above 2"]} |
|
|
max_lines = max(len(lines) for lines in text_lines.values()) |
|
|
max_height = max_lines * 5 |
|
|
|
|
|
pdf.set_x(x_start) |
|
|
|
|
|
|
|
|
pdf.cell(col_width, max_height, "Click Here", border=1, link=row["NBSLink"], align="C") |
|
|
|
|
|
|
|
|
pdf.set_xy(x_start + col_width, y_start) |
|
|
|
|
|
|
|
|
for i, col_name in enumerate(["Subject","Page","Author","Creation Date","Layer",'Code', 'head above 1', "head above 2"]): |
|
|
x_col = x_start + col_width * (i + 1) |
|
|
y_col = y_start |
|
|
pdf.multi_cell(col_width, 5, row[col_name], border=0, align="L") |
|
|
pdf.rect(x_col, y_col, col_width, max_height) |
|
|
pdf.set_xy(x_col + col_width, y_start) |
|
|
|
|
|
|
|
|
pdf.ln(max_height) |
|
|
|
|
|
pdf_output = pdf.output(dest="S").encode("latin1") |
|
|
return pdf_output |
|
|
|
|
|
|
|
|
|
|
|
def normalize_text(text): |
|
|
"""Lowercase, remove extra spaces, and strip special characters.""" |
|
|
text = text.lower().strip() |
|
|
text = re.sub(r'\s+', ' ', text) |
|
|
return re.sub(r'[^\w\s]', '', text) |
|
|
def get_repeated_texts(pdf_document, threshold=0.85): |
|
|
""" |
|
|
Identify text that appears on most pages. |
|
|
:param pdf_document: The opened PDF document. |
|
|
:param threshold: The percentage of pages a text must appear on to be considered "repeated". |
|
|
""" |
|
|
text_counts = Counter() |
|
|
total_pages = pdf_document.page_count |
|
|
|
|
|
for page_num in range(total_pages): |
|
|
page = pdf_document.load_page(page_num) |
|
|
page_text = page.get_text("text") |
|
|
normalized_lines = {normalize_text(line) for line in page_text.splitlines() if line.strip()} |
|
|
|
|
|
text_counts.update(normalized_lines) |
|
|
|
|
|
|
|
|
min_occurrence = max(1, int(threshold * total_pages)) |
|
|
repeated_texts = {text for text, count in text_counts.items() if count >= min_occurrence} |
|
|
return repeated_texts |
|
|
|
|
|
|
|
|
def split_links(links_string): |
|
|
"""Split a comma-separated string of links into an array of trimmed links.""" |
|
|
return [link.strip() for link in links_string.split(',')] |
|
|
def annotate_text_from_pdf(pdfshareablelinks, LISTheading_to_search): |
|
|
""" |
|
|
Annotates text under a specific heading in a PDF, highlights it, |
|
|
and constructs zoom coordinates for the first occurrence of the heading. |
|
|
Args: |
|
|
pdfshareablelinks (list): List of shareable links to PDFs. |
|
|
heading_to_search (str): The heading to search for in the PDF. |
|
|
Returns: |
|
|
Tuple: Annotated PDF bytes, count of heading occurrences, and zoom string. |
|
|
""" |
|
|
print("Input links:", pdfshareablelinks) |
|
|
print(LISTheading_to_search) |
|
|
|
|
|
link = pdfshareablelinks[0] |
|
|
pdf_content = None |
|
|
headings_TOC = [] |
|
|
|
|
|
if link and ('http' in link or 'dropbox' in link): |
|
|
if 'dl=0' in link: |
|
|
link = link.replace('dl=0', 'dl=1') |
|
|
|
|
|
|
|
|
response = requests.get(link) |
|
|
pdf_content = BytesIO(response.content) |
|
|
if pdf_content is None: |
|
|
raise ValueError("No valid PDF content found.") |
|
|
|
|
|
|
|
|
pdf_document = fitz.open(stream=pdf_content, filetype="pdf") |
|
|
repeated_texts = get_repeated_texts(pdf_document) |
|
|
df = pd.DataFrame(columns=["NBSLink","Subject","Page","Author","Creation Date","Layer",'Code', 'head above 1', "head above 2"]) |
|
|
dictionaryNBS={} |
|
|
data_list_JSON = [] |
|
|
for NBSindex, heading_to_search in enumerate(LISTheading_to_search): |
|
|
if NBSindex == len(LISTheading_to_search) - 1: |
|
|
flagAllNBSvisited = True |
|
|
all_text = [] |
|
|
current_line = "" |
|
|
collecting_text = False |
|
|
f10_count = 0 |
|
|
current_y = None |
|
|
highlight_rect = None |
|
|
highlight_rectEnding=None |
|
|
highlight_rectBegin=None |
|
|
zoom_str = None |
|
|
toc_flag = False |
|
|
span_font_goal = None |
|
|
span_size_goal = None |
|
|
pageNumberFound = None |
|
|
groupheadings = [] |
|
|
merged_groupheadings = [] |
|
|
collectheader2 = False |
|
|
endingcontentFlag=True |
|
|
header2 = '' |
|
|
header2_first_span_size = 0 |
|
|
previous_header = '' |
|
|
next_span_text = '' |
|
|
current_line_span_size = 0 |
|
|
flagAllNBSvisited = False |
|
|
|
|
|
text = '' |
|
|
heading_to_searchNBS = heading_to_search |
|
|
heading_words = heading_to_search.split() |
|
|
first_word = heading_words[0] |
|
|
remaining_words = heading_words[1:] |
|
|
print(heading_words) |
|
|
heading_to_search = heading_to_search.replace(" ", "") |
|
|
|
|
|
|
|
|
for page_num in range(pdf_document.page_count): |
|
|
page = pdf_document.load_page(page_num) |
|
|
|
|
|
page_height = page.rect.height |
|
|
header_threshold = page_height * 0.1 |
|
|
footer_threshold = page_height * 0.9 |
|
|
|
|
|
|
|
|
text_dict = page.get_text("dict") |
|
|
|
|
|
|
|
|
header_threshold = 0 |
|
|
current_line_text = "" |
|
|
previous_y = None |
|
|
|
|
|
for block in text_dict['blocks']: |
|
|
for line_index, line in enumerate(block.get('lines', [])): |
|
|
spans = line.get('spans', []) |
|
|
if spans and any(span['text'].strip() for span in spans): |
|
|
for i, span in enumerate(spans): |
|
|
span_text = span['text'].strip() |
|
|
highlight_rect = span['bbox'] |
|
|
span_y = span['bbox'][1] |
|
|
span_font = span['font'] |
|
|
span_size = span['size'] |
|
|
if normalize_text(span_text) not in repeated_texts and not (span_text.startswith('Page')): |
|
|
if previous_y is None: |
|
|
previous_y = span_y |
|
|
|
|
|
|
|
|
if abs(span_y - previous_y) < 5: |
|
|
current_line_text += " " + span_text |
|
|
current_line_text = normalize_text(current_line_text) |
|
|
current_line_span_size = span_size |
|
|
else: |
|
|
|
|
|
if current_line_text.strip(): |
|
|
all_text.append(current_line_text.strip()) |
|
|
|
|
|
current_line_text = span_text |
|
|
previous_y = span_y |
|
|
text = span_text |
|
|
if collecting_text and span_font == span_font_goal and span_size == span_size_goal and span_text[0].isdigit(): |
|
|
print(f"Ending collection at heading: {span_text}") |
|
|
highlight_rectEnding=highlight_rect |
|
|
print("merged_groupheadings:", merged_groupheadings) |
|
|
print('groupheadingss',groupheadings) |
|
|
collecting_text = False |
|
|
continue |
|
|
if collecting_text: |
|
|
annot = page.add_rect_annot(highlight_rect) |
|
|
annot.set_colors(stroke=(1, 0, 0)) |
|
|
annot.update() |
|
|
|
|
|
if 'Content' in span_text: |
|
|
toc_flag = True |
|
|
TOC_start = span_text |
|
|
print('content', TOC_start, span_size) |
|
|
|
|
|
if toc_flag or endingcontentFlag: |
|
|
if 'Content' not in span_text: |
|
|
if current_y is None: |
|
|
current_y = span_y |
|
|
current_size = span_size |
|
|
|
|
|
if abs(span_size - current_size) > 1: |
|
|
toc_flag = False |
|
|
|
|
|
if abs(current_y - span_y) < 5: |
|
|
current_line += " " + span_text |
|
|
else: |
|
|
if current_line.strip(): |
|
|
clean_text = re.sub(r'\.{5,}\d*$', '', current_line, flags=re.MULTILINE) |
|
|
|
|
|
print(clean_text.strip()) |
|
|
if clean_text: |
|
|
groupheadings.append(clean_text) |
|
|
|
|
|
|
|
|
|
|
|
current_line = span_text |
|
|
current_y = span_y |
|
|
current_size = span_size |
|
|
|
|
|
if len(groupheadings) > 0: |
|
|
pattern = re.compile(r"^[A-Za-z]\d{2} ") |
|
|
merged_groupheadings = [] |
|
|
current_item = None |
|
|
|
|
|
for item in groupheadings: |
|
|
if pattern.match(item): |
|
|
if current_item: |
|
|
if current_item not in merged_groupheadings: |
|
|
extracted_text = re.split(r"\.{3,}", current_item)[0].strip() |
|
|
merged_groupheadings.append(extracted_text.strip()) |
|
|
current_item = item |
|
|
else: |
|
|
if current_item: |
|
|
if item not in current_item: |
|
|
current_item += " " + item |
|
|
|
|
|
|
|
|
if current_item: |
|
|
if current_item not in merged_groupheadings: |
|
|
extracted_text = re.split(r"\.{3,}", current_item)[0].strip() |
|
|
merged_groupheadings.append(extracted_text.strip()) |
|
|
if span_text == first_word: |
|
|
print('First word found:', span_text) |
|
|
|
|
|
print(i + 1, len(spans)) |
|
|
if i + 1 < len(spans): |
|
|
next_span_text = (spans[i + 1]['text'].strip()) |
|
|
|
|
|
if next_span_text.replace(" ", "") in heading_to_search.replace(" ", ""): |
|
|
text = (span_text + ' ' + next_span_text) |
|
|
|
|
|
if first_word == span_text: |
|
|
if line_index + 1 < len(block.get('lines', [])): |
|
|
next_line = block['lines'][line_index + 1] |
|
|
|
|
|
for next_span in next_line.get('spans', []): |
|
|
next_span_text = next_span['text'].strip() |
|
|
text = span_text + ' ' + next_span_text |
|
|
if len(merged_groupheadings) > 0: |
|
|
if re.match(r"[A-Za-z]\d{2}", span_text) and span_size > 10: |
|
|
toc_flag = False |
|
|
endingcontentFlag=False |
|
|
previous_header = span_text |
|
|
print('previous_header', span_text) |
|
|
|
|
|
groupmainheadingFromArray = [item for item in merged_groupheadings if previous_header in item] |
|
|
|
|
|
if previous_header: |
|
|
if not collectheader2: |
|
|
if header2_first_span_size == 0: |
|
|
spanSizeHeader = 10 |
|
|
else: |
|
|
spanSizeHeader = header2_first_span_size |
|
|
|
|
|
for item in groupmainheadingFromArray: |
|
|
if not any(normalize_text(current_line_text) in normalize_text(item) for item in groupmainheadingFromArray): |
|
|
if not current_line_text[0].isdigit() : |
|
|
if span_size >= spanSizeHeader: |
|
|
if not re.match(r"^\d{2}", current_line_text) and current_line_text not in repeated_texts and "Bold" in span["font"] : |
|
|
if len(header2) > 0 : |
|
|
header2_first_span_size = span_size |
|
|
header2 = current_line_text |
|
|
print('header2', header2, span_size, spanSizeHeader) |
|
|
|
|
|
trimmed_text = text.replace(" ", "") |
|
|
if len(text) > 0: |
|
|
if text.split()[0] in heading_words: |
|
|
if len(trimmed_text) > 0 and (heading_to_search.replace(" ", "") in trimmed_text): |
|
|
print(trimmed_text, heading_to_search) |
|
|
f10_count += 1 |
|
|
|
|
|
if f10_count == 1: |
|
|
collecting_text = True |
|
|
print(f"Starting collection under heading: {text}, {span_font}, {span_size}") |
|
|
collectheader2 = True |
|
|
NBS_heading = heading_to_searchNBS |
|
|
highlight_rectBegin=highlight_rect |
|
|
x0, y0, x1, y1 = highlight_rectBegin |
|
|
|
|
|
span_font_goal = span_font |
|
|
span_size_goal = span_size |
|
|
zoom = 200 |
|
|
left = int(x0) |
|
|
top = int(y0) |
|
|
zoom_str = f"{zoom},{left},{top}" |
|
|
pageNumberFound = page_num + 1 |
|
|
dictionaryNBS[heading_to_searchNBS] = [pageNumberFound, zoom_str] |
|
|
|
|
|
annot = page.add_rect_annot(highlight_rect) |
|
|
annot.set_colors(stroke=(1, 0, 0)) |
|
|
annot.update() |
|
|
groupmainheadingFromArray = [item for item in merged_groupheadings if previous_header in item] |
|
|
|
|
|
|
|
|
params = { |
|
|
'pdfLink': link, |
|
|
'keyword': NBS_heading, |
|
|
} |
|
|
|
|
|
|
|
|
encoded_params = {key: urllib.parse.quote(value, safe='') for key, value in params.items()} |
|
|
|
|
|
|
|
|
encoded_link = '&'.join([f"{key}={value}" for key, value in encoded_params.items()]) |
|
|
|
|
|
|
|
|
final_url = f"{baselink}{encoded_link}#page={str(pageNumberFound)}&zoom={zoom_str}" |
|
|
|
|
|
|
|
|
now = datetime.now() |
|
|
|
|
|
|
|
|
formatted_time = now.strftime("%d/%m/%Y %I:%M:%S %p") |
|
|
|
|
|
|
|
|
|
|
|
if len(groupmainheadingFromArray) > 0: |
|
|
data_entry = { |
|
|
"NBSLink": final_url, |
|
|
"Subject": NBS_heading, |
|
|
"Page": str(pageNumberFound), |
|
|
"Author": "ADR", |
|
|
"Creation Date": formatted_time, |
|
|
"Layer": "Initial", |
|
|
"Code": "to be added", |
|
|
"head above 1": header2, |
|
|
"head above 2": groupmainheadingFromArray[0] |
|
|
} |
|
|
data_list_JSON.append(data_entry) |
|
|
|
|
|
|
|
|
json_output = json.dumps(data_list_JSON, indent=4) |
|
|
|
|
|
print("Final URL:", final_url) |
|
|
|
|
|
if collecting_text: |
|
|
annot = page.add_rect_annot(highlight_rect) |
|
|
annot.set_colors(stroke=(1, 0, 0)) |
|
|
annot.update() |
|
|
if current_line.strip(): |
|
|
all_text += current_line.strip() + '\n' |
|
|
print(df) |
|
|
print(dictionaryNBS) |
|
|
|
|
|
|
|
|
pdf_bytes = BytesIO() |
|
|
pdf_document.save(pdf_bytes) |
|
|
print('JSONN',json_output) |
|
|
return pdf_bytes.getvalue(), pdf_document , df, json_output |