import os
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup

import fitz  # PyMuPDF
import tabula
from pdfreader import PDFDocument, SimplePDFViewer

from core.NER import cleanText
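
# Dependency notes (assumptions, not pinned requirements):
# - ``fitz`` is the import name of PyMuPDF (pip install PyMuPDF).
# - ``tabula`` is tabula-py, which wraps the Tabula JAR and needs a Java
#   runtime on the PATH.
# - ``core.NER.cleanText`` is this project's own text-cleaning module.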


class PDF:
    """Download a PDF (or scrape a landing page for one), then extract its
    text and tables."""

    def __init__(self, pdf, saveFolder, doi=None):
        self.pdf = pdf                # local file path or URL
        self.doi = doi                # used to pick the right link when scraping
        self.saveFolder = saveFolder  # where downloaded PDFs are stored

    def openPDFFile(self):
        """Return an open binary file handle to the PDF, downloading it
        first when self.pdf is a URL."""
        if "https" in self.pdf:
            name = self.downloadPDF(self.saveFolder)
            if name != "no pdfLink to download":
                fileToOpen = os.path.join(self.saveFolder, name)
            else:
                fileToOpen = self.pdf
        else:
            fileToOpen = self.pdf
        return open(fileToOpen, "rb")
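
    # Usage note: openPDFFile returns an open binary handle that the caller
    # must close. A minimal sketch (the path below is hypothetical):
    #
    #     with PDF("paper.pdf", "/tmp/pdfs").openPDFFile() as fh:
    #         header = fh.read(4)  # b"%PDF" for a well-formed file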

    def downloadPDF(self, saveFolder):
        """Download the PDF into saveFolder and return its file name.

        If self.pdf is a landing page rather than a direct .pdf link, scrape
        it for an <a> tag whose href contains both ".pdf" and the DOI.
        """
        pdfLink = ''
        if ".pdf" not in self.pdf:
            # Landing page: look for a direct PDF link on it.
            r = requests.get(self.pdf)
            soup = BeautifulSoup(r.content, 'html.parser')
            for link in soup.find_all("a"):
                href = link.get("href", "")
                if ".pdf" in href and self.doi and self.doi in href:
                    # Scraped hrefs may be relative; resolve them against
                    # the page URL.
                    pdfLink = urljoin(self.pdf, href)
                    break
        else:
            pdfLink = self.pdf

        if pdfLink != '':
            response = requests.get(pdfLink)
            name = pdfLink.split("/")[-1]
            print("inside downloadPDF, link and name are:", pdfLink, name)
            print("saveFolder is:", saveFolder)
            with open(os.path.join(saveFolder, name), 'wb') as pdf:
                print("len of response content:", len(response.content))
                pdf.write(response.content)
            print("pdf downloaded")
            return name
        else:
            return "no pdfLink to download"
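
    # Error handling here is deliberately thin. A more defensive variant
    # could verify the response before writing; a minimal sketch using only
    # the requests API:
    #
    #     response = requests.get(pdfLink, timeout=30)
    #     response.raise_for_status()  # fail loudly instead of saving an
    #                                  # HTML error page as a ".pdf"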

    def extractText(self):
        """Extract plain text with PyMuPDF, falling back to pdfreader when
        extraction fails or yields suspiciously little text."""
        try:
            with self.openPDFFile() as fh:
                fileToOpen = fh.name
            try:
                doc = fitz.open(fileToOpen)
                text = ""
                for page in doc:
                    text += page.get_text("text") + "\n\n"
                doc.close()

                if len(text.strip()) < 100:
                    # Under 100 characters for a whole document usually means
                    # the text layer is missing or broken.
                    print("Fallback to PDFReader due to weak text extraction.")
                    text = self.extractTextWithPDFReader()
                return text
            except Exception as e:
                print("Failed with PyMuPDF, fallback to PDFReader:", e)
                return self.extractTextWithPDFReader()
        except Exception:
            return ""

    def extract_text_excluding_tables(self):
        """Extract text while skipping blocks that look like tables.

        Heuristic: blocks whose lines average more than one span apiece are
        kept as prose; single-span-per-line blocks, typical of table cells,
        are skipped.
        """
        with self.openPDFFile() as fh:
            fileToOpen = fh.name
        text = ""
        try:
            doc = fitz.open(fileToOpen)
            for page in doc:
                blocks = page.get_text("dict")["blocks"]
                for block in blocks:
                    if block["type"] == 0:  # type 0 = text block
                        lines = block.get("lines", [])
                        if not lines:
                            continue
                        avg_spans_per_line = sum(len(l["spans"]) for l in lines) / len(lines)
                        if avg_spans_per_line > 1:
                            for line in lines:
                                text += " ".join(span["text"] for span in line["spans"]) + "\n"
            doc.close()
            if len(text.strip()) < 100:
                print("Fallback to PDFReader due to weak text extraction.")
                text = self.extractTextWithPDFReader()
            return text
        except Exception as e:
            print("Failed with PyMuPDF, fallback to PDFReader:", e)
            return self.extractTextWithPDFReader()

    def extractTextWithPDFReader(self):
        """Extract text page by page with pdfreader and merge the result via
        mergeTextinJson."""
        jsonPage = {}
        try:
            pdf = self.openPDFFile()
            print("open pdf file")
            print(pdf)
            doc = PDFDocument(pdf)
            viewer = SimplePDFViewer(pdf)
            all_pages = [p for p in doc.pages()]
            cl = cleanText.cleanGenText()
            for page in range(1, len(all_pages) + 1):  # navigate() is 1-based
                viewer.navigate(page)
                viewer.render()
                if str(page) not in jsonPage:
                    jsonPage[str(page)] = {}
                text = "".join(viewer.canvas.strings)
                clean, filteredWord = cl.textPreprocessing(text)
                jsonPage[str(page)]["normalText"] = [text]
                jsonPage[str(page)]["cleanText"] = [' '.join(filteredWord)]
                jsonPage[str(page)]["image"] = [viewer.canvas.images]
                jsonPage[str(page)]["form"] = [viewer.canvas.forms]
                jsonPage[str(page)]["content"] = [viewer.canvas.text_content]
                jsonPage[str(page)]["inline_image"] = [viewer.canvas.inline_images]
            pdf.close()
        except Exception:
            jsonPage = {}
        return self.mergeTextinJson(jsonPage)

    def extractTable(self, pages="all", saveFile=None, outputFormat=None):
        """Extract tables with tabula.

        pages (str, int, iterable of int, optional): pages to extract from.
        Accepts a str, an int, or an iterable of ints. Default: "all".
        Examples: '1-2,3', 'all', [1, 2].
        """
        if "https" in self.pdf:
            name = self.downloadPDF(self.saveFolder)
            if name != "no pdfLink to download":
                fileToOpen = os.path.join(self.saveFolder, name)
            else:
                fileToOpen = self.pdf
        else:
            fileToOpen = self.pdf
        try:
            df = tabula.read_pdf(fileToOpen, pages=pages)
        except Exception:
            df = []
            print("No tables found in PDF file")
        return df
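
    # saveFile and outputFormat are currently unused. One way to persist the
    # extracted tables (a sketch using tabula-py's convert_into helper):
    #
    #     tabula.convert_into(fileToOpen, saveFile, output_format="csv",
    #                         pages=pages)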

    def mergeTextinJson(self, jsonPDF):
        """Merge the per-page "normalText" entries of jsonPDF into a single
        string, inserting sentence-ending periods between fragments."""
        try:
            cl = cleanText.cleanGenText()
            pdfText = ""
            if jsonPDF:
                for page in jsonPDF:
                    if len(jsonPDF[page]["normalText"]) > 0:
                        for i in range(len(jsonPDF[page]["normalText"])):
                            text = jsonPDF[page]["normalText"][i]
                            if len(text) > 0:
                                text = cl.removeTabWhiteSpaceNewLine(text)
                                text = cl.removeExtraSpaceBetweenWords(text)
                                jsonPDF[page]["normalText"][i] = text
                            # Close the previous fragment with a period before
                            # appending the next one.
                            if i > 0 and jsonPDF[page]["normalText"][i - 1]:
                                if jsonPDF[page]["normalText"][i - 1][-1] != ".":
                                    pdfText += ". "
                            pdfText += jsonPDF[page]["normalText"][i]
                            if len(jsonPDF[page]["normalText"][i]) > 0:
                                if jsonPDF[page]["normalText"][i][-1] != ".":
                                    pdfText += "."
                    pdfText += "\n\n"
            return pdfText
        except Exception:
            return ""
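
    # Expected input shape, as produced by extractTextWithPDFReader:
    #
    #     {"1": {"normalText": ["raw page text"],
    #            "cleanText": ["cleaned page text"],
    #            "image": [...], "form": [...],
    #            "content": [...], "inline_image": [...]}}
    #
    # Only "normalText" is consumed here; the other keys are kept for
    # downstream use.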

    def getReference(self):
        pass

    def getSupMaterial(self):
        pass

    def removeHeaders(self):
        pass

    def removeFooters(self):
        pass

    def removeReference(self):
        pass
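

if __name__ == "__main__":
    # Minimal usage sketch; the file name and folder are hypothetical.
    pdf = PDF("example_paper.pdf", "/tmp/pdfs")
    print(pdf.extractText()[:500])
    for table in pdf.extractTable(pages="all"):
        print(table.head())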