financial-qa-agent / Data_Cleaning.py
codewithpurav's picture
Add Dockerfile for Streamlit deployment
3efe7a4
import pdfplumber
import os
import multiprocessing
from tqdm import tqdm
from Logger import GetLogger
class GetDataCleaning():
    """Convert the PDFs below a root folder into raw and cleaned text files.

    For every selected sub-folder ``<name>`` of ``root_folder``, ``run``
    writes the text extracted from each PDF to ``<name>_txt`` and a cleaned,
    single-line version of it to ``<name>_cleaned_txt``.
    """

    # A line that consists solely of one of these strings is dropped by
    # ``clean_txt``.
    BOILERPLATE_LINES = ("Infosys", "ICICI Bank")

    def __init__(self, root_folder, excluding_folder=None, logger=None):
        """Collect the sub-folders of ``root_folder`` that should be processed.

        Args:
            root_folder: Directory whose sub-folders contain the PDF files.
            excluding_folder: Folder names to skip. ``None`` means "skip
                nothing"; a fresh list is used so no default is shared
                between instances.
            logger: Logger to report progress to. When omitted, one is
                obtained from ``GetLogger().get_logger()``.
        """
        if logger is None:
            logger = GetLogger().get_logger()
        self.logger = logger
        self.root_folder = root_folder
        self.excluding_folder = [] if excluding_folder is None else excluding_folder
        # Skip generated output folders (any name with a "txt" token between
        # underscores), explicitly excluded names, and plain files, which
        # cannot be listed for PDFs later on.
        self.folder_list = [
            item
            for item in os.listdir(self.root_folder)
            if "txt" not in item.split("_")
            and item not in self.excluding_folder
            and os.path.isdir(os.path.join(self.root_folder, item))
        ]
        self.logger.info("all the folder list is generated sucessfully")

    @staticmethod
    def _txt_name(file):
        """Return ``file`` with its ``.pdf`` extension swapped for ``.txt``."""
        stem, ext = os.path.splitext(file)
        # Only the trailing extension is replaced; a ".pdf" elsewhere in the
        # name is left alone. Names without a PDF extension get ".txt" added.
        return (stem if ext.lower() == ".pdf" else file) + ".txt"

    def pdf_to_txt(self, pdf_path, txt_path):
        """Extract the text of every page of ``pdf_path`` into ``txt_path``.

        Pages for which no text is extracted are skipped; every other page's
        text is followed by a newline. The output is written as UTF-8.
        """
        chunks = []
        with pdfplumber.open(pdf_path) as pdf:
            for page in pdf.pages:
                page_text = page.extract_text()
                if page_text:
                    chunks.append(page_text)
        with open(txt_path, "w", encoding="utf-8") as f:
            f.write("".join(chunk + "\n" for chunk in chunks))

    def clean_txt(self, text):
        """Return ``text`` collapsed to one line without noise lines.

        Each line is stripped; blank lines, lines made only of digits and
        lines equal to an entry of ``BOILERPLATE_LINES`` are dropped. The
        remaining lines are joined with single spaces.
        """
        cleaned = []
        for raw_line in text.split("\n"):
            line = raw_line.strip()
            if not line or line.isdigit() or line in self.BOILERPLATE_LINES:
                continue
            cleaned.append(line)
        return " ".join(cleaned)

    def process_file(self, folder, file, logger=None):
        """Single file processing pipeline.

        Converts ``<root_folder>/<folder>/<file>`` to text in
        ``<folder>_txt`` and writes the cleaned text to
        ``<folder>_cleaned_txt``. Both output folders must already exist.

        Args:
            folder: Name of the sub-folder of ``root_folder`` holding the PDF.
            file: File name of the PDF inside ``folder``.
            logger: Logger used for the completion message; defaults to the
                instance logger.
        """
        if logger is None:
            logger = self.logger
        txt_name = self._txt_name(file)
        input_pdf = os.path.join(self.root_folder, folder, file)
        output_txt = os.path.join(self.root_folder, folder + "_txt", txt_name)
        output_cleaned = os.path.join(self.root_folder, folder + "_cleaned_txt", txt_name)

        # Convert PDF → TXT
        self.pdf_to_txt(input_pdf, output_txt)

        # Clean text
        with open(output_txt, encoding="utf-8") as f:
            raw_text = f.read()
        cleaned_text = self.clean_txt(raw_text)
        with open(output_cleaned, "w", encoding="utf-8") as f:
            f.write(cleaned_text)
        logger.info(f"✅ Processed: {folder}/{file}")

    def run(self, workers=4):
        """Process every PDF of every selected folder with a process pool.

        Args:
            workers: Number of worker processes in the pool.

        Any exception is logged and ends the run; it is not re-raised.
        """
        try:
            self.logger.info("🚀 Starting Cleaning Process")
            # One pool serves all folders instead of spawning workers anew
            # for each folder.
            with multiprocessing.Pool(processes=workers) as pool:
                for folder in self.folder_list:
                    os.makedirs(os.path.join(self.root_folder, folder + "_txt"), exist_ok=True)
                    os.makedirs(os.path.join(self.root_folder, folder + "_cleaned_txt"), exist_ok=True)
                    pdf_files = [
                        f for f in os.listdir(os.path.join(self.root_folder, folder))
                        if f.lower().endswith(".pdf")
                    ]
                    # Run parallel processing; starmap blocks until every
                    # file of this folder is done, so no close()/join() is
                    # needed before moving on.
                    pool.starmap(self.process_file, [(folder, f, self.logger) for f in pdf_files])
                    self.logger.info(f"Data Cleaning completed for folder:{folder}")
        except Exception as e:
            self.logger.error(f"Got Error: {e}")
# if __name__ == "__main__":
#     obj = GetDataCleaning(root_folder="financial_reports", excluding_folder=["ICICI"])
#     obj.run()
#     obj.process_file("ICICI", "icici-bank-23.pdf", obj.logger) # for experiment only