|
|
import pdfplumber |
|
|
import os |
|
|
import multiprocessing |
|
|
from tqdm import tqdm |
|
|
|
|
|
from Logger import GetLogger |
|
|
|
|
|
class GetDataCleaning():
    """Convert PDF files under a root folder into cleaned plain-text files.

    For every sub-folder of ``root_folder`` (skipping folders whose name
    contains a ``txt`` component — i.e. previously generated output folders —
    and any folder listed in ``excluding_folder``), each PDF is extracted to
    ``<folder>_txt/<name>.txt`` and a cleaned single-line version is written
    to ``<folder>_cleaned_txt/<name>.txt``.
    """

    def __init__(self, root_folder, excluding_folder=None, logger=None):
        """Scan ``root_folder`` and build the list of folders to process.

        Args:
            root_folder: Directory containing the per-company PDF folders.
            excluding_folder: Folder names to skip (defaults to none).
            logger: Optional pre-configured logger; a new one is created
                via ``GetLogger`` when omitted.
        """
        # Guard against the shared mutable-default-argument pitfall
        # (the original signature used `excluding_folder=[]`).
        if excluding_folder is None:
            excluding_folder = []

        if not logger:
            obj = GetLogger()
            logger = obj.get_logger()
        self.logger = logger

        self.root_folder = root_folder
        self.excluding_folder = excluding_folder

        # Output folders are named "<folder>_txt" / "<folder>_cleaned_txt",
        # so any entry with a "txt" component is a previous run's output
        # and must not be treated as an input folder.
        self.folder_list = [
            item for item in os.listdir(self.root_folder)
            if ("txt" not in item.split("_")) and (item not in excluding_folder)
        ]
        self.logger.info("all the folder list is generated successfully")

    def pdf_to_txt(self, pdf_path, txt_path):
        """Extract all page text from ``pdf_path`` and write it to ``txt_path``.

        Pages with no extractable text (e.g. scanned images) are skipped.
        """
        text = ""
        with pdfplumber.open(pdf_path) as pdf:
            for page in pdf.pages:
                page_text = page.extract_text()
                if page_text:
                    text += page_text + "\n"

        with open(txt_path, "w", encoding="utf-8") as f:
            f.write(text)

    def clean_txt(self, text):
        """Collapse extracted text into one line, dropping boilerplate.

        Removes blank lines, purely numeric lines (page numbers), and
        known header/footer lines, then joins the rest with single spaces.

        Returns:
            The cleaned text as a single space-joined string.
        """
        lines = text.split("\n")
        cleaned = []

        for line in lines:
            line = line.strip()
            if not line:
                continue
            # Bare numbers are almost always page numbers.
            if line.isdigit():
                continue
            # Known repeating header/footer lines.
            if line in ["Infosys", "ICICI Bank"]:
                continue
            cleaned.append(line)

        return " ".join(cleaned)

    def process_file(self, folder, file, logger):
        """Single file processing pipeline: extract, clean, and save one PDF."""
        input_pdf = os.path.join(self.root_folder, folder, file)
        output_txt = os.path.join(self.root_folder, folder + "_txt", file.replace(".pdf", ".txt"))
        output_cleaned = os.path.join(self.root_folder, folder + "_cleaned_txt", file.replace(".pdf", ".txt"))

        self.pdf_to_txt(input_pdf, output_txt)

        # Use a context manager so the file handle is closed deterministically
        # (the original left the handle to be reclaimed by the GC).
        with open(output_txt, encoding="utf-8") as f:
            raw_text = f.read()
        cleaned_text = self.clean_txt(raw_text)

        with open(output_cleaned, "w", encoding="utf-8") as f:
            f.write(cleaned_text)

        logger.info(f"Processed: {folder}/{file}")

    def run(self, workers=4):
        """Process every folder in ``self.folder_list`` with a worker pool.

        Args:
            workers: Number of worker processes per folder batch.
        """
        try:
            self.logger.info("Starting Cleaning Process")
            for folder in self.folder_list:
                os.makedirs(os.path.join(self.root_folder, folder + "_txt"), exist_ok=True)
                os.makedirs(os.path.join(self.root_folder, folder + "_cleaned_txt"), exist_ok=True)

                pdf_files = [
                    f for f in os.listdir(os.path.join(self.root_folder, folder))
                    if f.endswith(".pdf")
                ]

                # starmap blocks until all tasks complete, and the `with`
                # block tears the pool down on exit, so explicit
                # close()/join() calls are unnecessary.
                with multiprocessing.Pool(processes=workers) as pool:
                    pool.starmap(self.process_file, [(folder, f, self.logger) for f in pdf_files])

                self.logger.info(f"Data Cleaning completed for folder:{folder}")
        except Exception as e:
            # Boundary handler: log and continue rather than crash the caller.
            self.logger.error(f"Got Error: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|