Spaces:
Build error
Build error
| import tempfile | |
| import os | |
| import pdf2image | |
| import zipfile | |
| from .logger import setup_logger | |
| import pandas as pd | |
| logger = setup_logger(__name__) | |
def process_pdf(file_path: str, file_groups: dict, file_paths: list):
    """Render every page of a PDF to a PNG file and register the results.

    Each page is saved as ``<file_path>_page_<index>.png`` with a zero-based
    page index. Every generated image path is appended to ``file_paths``, and
    the full list of page images for this PDF is stored in ``file_groups``
    under the key ``file_path``.

    Args:
        file_path: Path of the PDF to convert.
        file_groups: Mapping from a source file path to the list of image
            paths derived from it; mutated in place.
        file_paths: Flat list of all image paths collected so far; mutated in
            place.

    Returns:
        The same ``file_groups`` and ``file_paths`` objects, as a tuple.
    """
    page_pngs = []
    for page_no, page in enumerate(pdf2image.convert_from_path(file_path)):
        png_path = f"{file_path}_page_{page_no}.png"
        page.save(png_path, "PNG")
        # Register the page in the flat list first, then in this PDF's group.
        file_paths.append(png_path)
        page_pngs.append(png_path)
    file_groups[file_path] = page_pngs
    return file_groups, file_paths
# MIME types under which browsers submit ZIP archives. "application/zip" is
# the registered type; Windows clients commonly send the x-zip-compressed one.
_ZIP_MIME_TYPES = ("application/zip", "application/x-zip-compressed")


def _load_application_form(csv_path):
    """Read a header-less CSV and prefix column 0 with 'application_summary_'."""
    application_form = pd.read_csv(csv_path, header=None)
    logger.info(f"application_form: {application_form}")
    application_form[0] = 'application_summary_' + \
        application_form[0].str.strip()
    return application_form


def _build_memo(xlsx_path):
    """Extract the three memo tables from fixed row ranges of a workbook.

    The workbook must contain sheets named 'Sheet1', 'Sheet6' and 'Sheet7';
    a missing sheet raises ``KeyError``. Only columns 0 and 1 of each row
    range are kept, and every table is re-indexed from zero.
    """
    df_dict = pd.read_excel(xlsx_path, sheet_name=None, header=None)

    # Build each table directly from its row slices rather than growing an
    # empty placeholder DataFrame one concat at a time.
    sheet1 = df_dict['Sheet1']
    yellow_df = pd.concat(
        [sheet1.iloc[31:32], sheet1.iloc[33:34], sheet1.iloc[50:51]],
        axis=0, ignore_index=True)[[0, 1]]
    # Deposit details
    blue_df = df_dict['Sheet6'].iloc[44:47].reset_index(drop=True)[[0, 1]]
    # Monthly costs for both applicants
    green_df = df_dict['Sheet7'].iloc[5:23].reset_index(drop=True)[[0, 1]]

    memo = {
        "Mortgage Details": yellow_df,
        "Deposit details": blue_df,
        "Monthly costs for both applicants": green_df,
    }
    logger.info(f"memo : {memo}")
    return memo


def _collect_extracted_files(extract_dir, file_groups, file_paths):
    """Walk an extracted archive and register every file by its extension.

    PDFs are rendered to page images via ``process_pdf``; ``.csv`` files are
    loaded as the application form; ``.xlsx`` files are parsed into the memo;
    every other file (images included) is registered as its own
    single-element group. ``file_groups`` and ``file_paths`` are mutated in
    place.

    Returns:
        ``(application_form, memo)``; each is ``None`` when the archive held
        no matching file. If several match, the last one walked wins.
    """
    application_form = None
    memo = None
    for root, _, files in os.walk(extract_dir):
        for file in files:
            extracted_path = os.path.join(root, file)
            lowered = file.lower()
            if lowered.endswith(".pdf"):
                process_pdf(
                    file_path=extracted_path,
                    file_groups=file_groups,
                    file_paths=file_paths)
            elif lowered.endswith(".csv"):
                application_form = _load_application_form(extracted_path)
            elif lowered.endswith(".xlsx"):
                memo = _build_memo(extracted_path)
            else:
                # Images and any unrecognised file type are passed through
                # unchanged as a group of one.
                file_paths.append(extracted_path)
                file_groups[extracted_path] = [extracted_path]
    return application_form, memo


def process_uploaded_files(uploaded_files):
    """Persist uploaded files to a fresh temp directory and classify them.

    Each upload is written to disk and then handled by its MIME type:
    PDFs are rendered to one PNG per page, images are registered as-is, and
    ZIP archives are extracted and their contents handled by file extension
    (PDF, CSV application form, XLSX memo, anything else passed through).
    Uploads of any other type are written to disk but not registered.

    Args:
        uploaded_files: Iterable of upload objects exposing ``name``,
            ``type`` (MIME type string) and ``getbuffer()``.

    Returns:
        A 5-tuple ``(file_paths, file_groups, temp_dir, application_form,
        memo)`` where ``file_paths`` is the flat list of registered paths,
        ``file_groups`` maps each source file to the paths derived from it,
        ``temp_dir`` is the directory everything was written to (it is not
        removed here), ``application_form`` is a DataFrame or ``None``, and
        ``memo`` is a dict of DataFrames or ``None``.
    """
    file_paths = []
    file_groups = {}
    application_form = None
    memo = None
    temp_dir = tempfile.mkdtemp()
    logger.info(f"temp_dir {temp_dir}")

    for uploaded_file in uploaded_files:
        # The name is client-supplied: keep only its final component so the
        # file cannot be written outside temp_dir.
        safe_name = os.path.basename(uploaded_file.name)
        file_path = os.path.join(temp_dir, safe_name)
        with open(file_path, "wb") as f:
            f.write(uploaded_file.getbuffer())
        logger.info(
            f"file_path: {file_path}, uploaded_file.type : {uploaded_file.type}")

        # Tolerate a missing MIME type instead of failing on .startswith().
        mime_type = uploaded_file.type or ""
        if mime_type == "application/pdf":
            process_pdf(
                file_path=file_path, file_groups=file_groups, file_paths=file_paths)
        elif mime_type.startswith("image"):
            file_paths.append(file_path)
            file_groups[file_path] = [file_path]
        elif mime_type in _ZIP_MIME_TYPES:
            # Drop only the trailing extension. When there is none to drop the
            # directory would collide with the archive file itself, so give
            # it a distinct name.
            stem = os.path.splitext(safe_name)[0]
            extract_dir = os.path.join(temp_dir, stem)
            if extract_dir == file_path:
                extract_dir = f"{file_path}_extracted"
            logger.info(f"extract_dir : {extract_dir}")
            with zipfile.ZipFile(file_path, 'r') as zip_ref:
                zip_ref.extractall(extract_dir)

            found_form, found_memo = _collect_extracted_files(
                extract_dir, file_groups, file_paths)
            # An archive without a form/memo must not erase one found in an
            # earlier upload.
            if found_form is not None:
                application_form = found_form
            if found_memo is not None:
                memo = found_memo

    logger.info(f"file_groups : {file_groups}")
    return file_paths, file_groups, temp_dir, application_form, memo