Spaces:
Sleeping
Sleeping
import hashlib
import os
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path, PurePath
from typing import List, Optional, Union

import cv2
import gradio as gr
import img2pdf
import pandas as pd
import pytesseract
from pdf2image import convert_from_path

@dataclass
class MetaData:
    """
    Class to store metadata for each processed pdf file.
    """

    # Absolute path of the folder holding the rendered page images.
    image_path: Optional[str] = None
    # Absolute path of the Excel workbook generated for the PDF, if any.
    xlsx_path: Optional[str] = None


# Maps the MD5 hash of a PDF's contents to its MetaData. Because this is a
# defaultdict, *indexing* a missing hash inserts an empty MetaData entry;
# use ``in`` / ``.get()`` for lookups that must not register the hash.
cache = defaultdict(MetaData)
def get_latest_file(directory, pattern="*") -> Optional[Path]:
    """
    Return the most recently modified file in a directory.

    :param directory: Directory to search (non-recursive unless the pattern says otherwise)
    :param pattern: Glob pattern the file name must match
    :return: Absolute path of the newest matching file, or None when nothing matches
    """
    # Keep regular files only: the default "*" pattern also matches
    # sub-directories, which must never be reported as the "latest file".
    files = [p for p in Path(directory).glob(pattern) if p.is_file()]
    if not files:
        return None
    latest_file = max(files, key=lambda p: p.stat().st_mtime)
    return latest_file.absolute()
def convert_images_to_pdf(image_paths: List[str], output_pdf_path: str):
    """
    Convert images to PDF using img2pdf for better quality preservation.

    Only paths ending in .png, .jpg, .jpeg, .tiff or .bmp (case-insensitive)
    are included; everything else is ignored. When no supported image
    remains, a message is printed and no output file is created.

    Args:
        image_paths (list): List of paths to image files
        output_pdf_path (str): Path where the output PDF will be saved
    """
    supported_suffixes = (".png", ".jpg", ".jpeg", ".tiff", ".bmp")
    # Filter first so the emptiness check also covers "nothing usable" input.
    images = [
        str(path)
        for path in (image_paths or [])
        if str(path).lower().endswith(supported_suffixes)
    ]
    if not images:
        print("No images provided!")
        return

    # Convert before opening the destination so a failed conversion does not
    # leave an empty or truncated PDF behind.
    pdf_bytes = img2pdf.convert(images)
    with open(output_pdf_path, "wb") as f:
        f.write(pdf_bytes)
def hash_file(filepath) -> str:
    """Return the hexadecimal MD5 digest of the contents of ``filepath``."""
    digest = hashlib.md5()
    with open(filepath, "rb") as stream:
        # Read in 8 KiB blocks until read() yields the empty-bytes sentinel,
        # so the whole file never has to fit in memory at once.
        for block in iter(lambda: stream.read(8192), b""):
            digest.update(block)
    return digest.hexdigest()
def convert_txt(pdf_paths: List[str], is_save_image: bool = False):
    """
    OCR a list of PDFs page by page and write the recognised text to files.

    Each PDF is identified by the MD5 hash of its contents. For a PDF that is
    not yet in ``cache``, every page is rendered to
    ``./tmp/<stem>/<stem>_<page>.jpg`` and its OCR text is written to
    ``./text/<stem>/<stem>_<page>.txt`` (pages are numbered from 1).

    :param pdf_paths: List of PDF paths
    :param is_save_image: Currently unused; the rendered page images are always kept
    :return: Absolute paths of the text files written by this call. PDFs whose
        hash is already in ``cache`` are skipped and contribute no paths.
    """
    text_filepaths = []
    for pdf_path in pdf_paths:
        pdf_path = Path(pdf_path)
        suffix = pdf_path.stem

        # Identify the PDF by content hash so re-uploads are not processed twice.
        pdf_hash = hash_file(pdf_path)
        print(f"Hash for {pdf_path}: {pdf_hash}")
        if pdf_hash in cache:
            print(f"Skipping {pdf_path}, already processed.")
            continue

        # Convert PDF to images
        images = convert_from_path(pdf_path)

        image_folder_path = Path(f"./tmp/{suffix}")
        text_folder_path = Path(f"./text/{suffix}")
        text_folder_path.mkdir(parents=True, exist_ok=True)
        image_folder_path.mkdir(parents=True, exist_ok=True)

        for page_number, image in enumerate(images, start=1):
            image_path = image_folder_path / f"{suffix}_{page_number}.jpg"
            image.save(image_path)
            if page_number > 2:
                # From the third page on, the saved image is re-read with
                # OpenCV and rotated 90 degrees counter-clockwise before OCR.
                # NOTE(review): presumably those pages are scanned sideways
                # in the expected documents - confirm.
                image = cv2.imread(str(image_path))
                image = cv2.rotate(image, cv2.ROTATE_90_COUNTERCLOCKWISE)

            text_filepath = text_folder_path / f"{suffix}_{page_number}.txt"
            txt = pytesseract.image_to_string(
                image, lang="vie", config="--oem 1 --psm 6"
            )
            text_filepath.write_text(txt, encoding="utf-8")
            text_filepaths.append(str(text_filepath.absolute()))

        # Register the PDF only after every page succeeded; registering it up
        # front would make a failed conversion look "already processed".
        metadata = MetaData()
        metadata.image_path = str(image_folder_path.absolute())
        # NOTE: the Excel export is disabled, so xlsx_path is left unset here.
        cache[pdf_hash] = metadata

    return text_filepaths
def filter_by_keyword(keywords: Union[str, List[str]], hash_id: str = ""):
    """
    List the pages of an Excel text dump that contain the given keywords.

    The workbook is expected to have a ``text`` and a ``page`` column. For
    every keyword a section is produced consisting of a blank line, the
    keyword, and then one line per page whose text contains the keyword
    (case-insensitive). The result is also written to
    ``./page_id/<workbook stem>.txt``.

    :param keywords: Keyword, or list of keywords, to filter by
    :param hash_id: Hash of the processed PDF. When ``cache`` holds an Excel
        path for it, that workbook is used; otherwise the most recently
        modified ``*.xlsx`` in ``./excel`` is used.
    :return: The generated report text
    :raises FileNotFoundError: If no Excel workbook can be located
    """
    if isinstance(keywords, str):
        keywords = [keywords]

    excel_path = None
    if hash_id:
        # .get() instead of indexing: ``cache`` is a defaultdict and indexing
        # would register an empty entry for a hash that was never processed.
        metadata = cache.get(hash_id)
        if metadata is not None and metadata.xlsx_path:
            excel_path = Path(metadata.xlsx_path)
    if excel_path is None:
        excel_path = get_latest_file(Path("./excel"), pattern="*.xlsx")
    if excel_path is None:
        raise FileNotFoundError("No Excel file available to filter in ./excel")
    print(f"-------Excel path --------- {excel_path}")

    df = pd.read_excel(str(excel_path.absolute()))
    rows = list(zip(df["text"], df["page"]))

    report_lines = []
    for k in keywords:
        needle = k.lower()
        report_lines.append(f"\n{k}\n")
        report_lines.extend(
            f"{page}\n"
            for text, page in rows
            # Empty cells are read back as NaN (a float), hence the str check.
            if isinstance(text, str) and needle in text.lower()
        )
    content = "".join(report_lines)

    page_id_folder = Path("./page_id")
    page_id_folder.mkdir(parents=True, exist_ok=True)
    page_id_path = page_id_folder / f"{excel_path.stem}.txt"
    # Explicit UTF-8: keywords may contain non-ASCII (e.g. Vietnamese) text.
    page_id_path.write_text(content, encoding="utf-8")
    return content
def gradio_interface(file, keyword=None):
    """
    Gradio interface for the PDF processing and filtering.

    :param file: Uploaded PDF file, either an object exposing the path as
        ``.name`` or a plain path string; ``None`` when nothing was uploaded
    :param keyword: Optional keyword (or list of keywords) to look up
    :return: List of paths of the text files generated for the PDF; empty
        when no file was uploaded or the PDF was already processed
    """
    if file is None:
        return []

    # NOTE(review): depending on the Gradio version/configuration the upload
    # arrives as a wrapper object with ``.name`` or as a path string - both
    # are accepted here.
    pdf_path = getattr(file, "name", file)
    hash_id = hash_file(pdf_path)

    filepaths = []
    # Membership must be tested against the keys; ``cache.items()`` yields
    # (hash, metadata) tuples, so a hash string is never found in it.
    if hash_id in cache:
        print(f"Skipping {pdf_path}, already processed.")
    else:
        filepaths = convert_txt([pdf_path])

    if keyword:
        # The report text is not returned because the interface outputs a
        # list of files; the call is kept for the page-id file it writes.
        filter_by_keyword(keyword, hash_id)
    return filepaths
if __name__ == "__main__":
    # Install the system and Python packages the OCR pipeline shells out to
    # (poppler for PDF rendering, tesseract with the Vietnamese language data).
    setup_commands = (
        "apt-get update && apt-get install -y poppler-utils tesseract-ocr tesseract-ocr-vie",
        "pip install -q pytesseract openpyxl",
    )
    for setup_command in setup_commands:
        os.system(setup_command)

    # Build the UI: a PDF upload plus a keyword box in, a list of files out.
    pdf_input = gr.File(label="Upload PDF")
    keyword_input = gr.Textbox(label="Keyword")
    files_output = gr.Files(label="Filtered Text File")

    demo = gr.Interface(
        fn=gradio_interface,
        inputs=[pdf_input, keyword_input],
        outputs=files_output,
        title="PDF Keyword Filter",
        description="Upload a PDF file and enter a keyword to filter the text.",
    )
    demo.launch(debug=True)