Spaces:
Build error
Build error
| import streamlit as st | |
| import os | |
| from PIL import Image | |
| import pymupdf | |
| from datetime import datetime | |
| import re | |
| from utils import setup_logger | |
| from .passport import prune_passport_for_display, display_passport | |
| from .driving_license import prune_driving_license_for_display, display_driving_license | |
| from .bank_statement import prune_bank_statement_for_display, display_bank_statement | |
| from .payslip import prune_payslip_for_display, display_payslip | |
| from .p60 import prune_p60_for_display, display_p60 | |
| from .others import display_others | |
| logger = setup_logger(__name__) | |
def load_pdf_as_image(file_path):
    """Render the first page of a PDF as a PIL RGB image.

    Args:
        file_path: Path to the PDF file on disk.

    Returns:
        PIL.Image.Image of page 0 rendered at the default resolution.

    Raises:
        Whatever pymupdf raises for a missing/corrupt file (not caught here).
    """
    doc = pymupdf.Document(file_path)
    try:
        # Only the first page is rasterized; this is used as a preview.
        page = doc[0]
        pix = page.get_pixmap()
        # pix.samples is the raw RGB byte buffer (width * height * 3 bytes).
        img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
    finally:
        # Bug fix: the original never closed the document, leaking the handle.
        doc.close()
    return img
def generate_metadata(file_path):
    """Generate metadata dictionary from file path and properties.

    Args:
        file_path: Path to an existing file (os.stat raises if it is missing).

    Returns:
        dict of human-readable metadata. Always contains the file-system
        fields; additionally contains image fields for .png/.jpg/.jpeg/.gif
        files and PDF fields for .pdf files. Errors while reading those
        extras are reported via st.error and the base metadata is still
        returned.
    """
    file_stat = os.stat(file_path)
    file_name = os.path.basename(file_path)
    parent_dir = os.path.basename(os.path.dirname(file_path))
    metadata = {
        "File Name": file_name,
        "Directory": parent_dir,
        "File Size": f"{file_stat.st_size / 1024:.2f} KB",
        "Last Modified": datetime.fromtimestamp(file_stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'),
        # NOTE(review): st_ctime is inode-change time on Unix, creation time
        # only on Windows — the "Created" label is best-effort.
        "Created": datetime.fromtimestamp(file_stat.st_ctime).strftime('%Y-%m-%d %H:%M:%S'),
        "File Extension": os.path.splitext(file_name)[1],
        "Full Path": file_path
    }
    # Add image-specific metadata if it's an image
    if file_name.lower().endswith(('.png', '.jpg', '.jpeg', '.gif')):
        try:
            with Image.open(file_path) as img:
                metadata.update({
                    "Image Size": f"{img.size[0]}x{img.size[1]}",
                    "Image Mode": img.mode,
                    "Image Format": img.format
                })
        except Exception as e:
            st.error(f"Error reading image metadata: {str(e)}")
    # Add PDF-specific metadata if it's a PDF
    elif file_name.lower().endswith('.pdf'):
        doc = None
        try:
            doc = pymupdf.Document(file_path)
            metadata.update({
                "Page Count": len(doc),
                # NOTE(review): confirm pymupdf.Document exposes pdf_version;
                # an AttributeError here is swallowed by the except below.
                "PDF Version": doc.pdf_version,
                "Document Info": doc.metadata if doc.metadata else "No PDF metadata available"
            })
        except Exception as e:
            st.error(f"Error reading PDF metadata: {str(e)}")
        finally:
            # Bug fix: the original never closed the document, leaking the handle.
            if doc is not None:
                doc.close()
    return metadata
def merge_dict_values(dict1, dict2):
    """Merge two dictionaries, combining values key by key.

    Rules:
        1. A key present in only one dict keeps its value.
        2. A key present in both keeps the non-None value; if both values
           are non-None and differ, they are collected into a [v1, v2] list.
        3. Nested dicts on both sides are merged recursively.
        4. If both values are None (or equal), a single value is kept.
    """
    merged = {}
    for key in set(dict1) | set(dict2):
        if key not in dict2:
            # Unique to dict1.
            merged[key] = dict1[key]
        elif key not in dict1:
            # Unique to dict2.
            merged[key] = dict2[key]
        else:
            left, right = dict1[key], dict2[key]
            if isinstance(left, dict) and isinstance(right, dict):
                # Recurse into nested structures.
                merged[key] = merge_dict_values(left, right)
            elif left is None:
                # Prefer the non-None side (or None if both are None).
                merged[key] = right
            elif right is None:
                merged[key] = left
            elif left != right:
                # Conflicting concrete values: keep both.
                merged[key] = [left, right]
            else:
                # Identical values: keep one.
                merged[key] = left
    return merged
def merge_json_file(json_data):
    """Consolidate per-page extraction results into one dict per file.

    Args:
        json_data: Mapping of file path -> {page key -> page result dict}.

    Returns:
        Mapping of file path -> single merged dict, where pages are folded
        together with merge_dict_values (first page is the base).
    """
    result = {}
    for file_path, pages in json_data.items():
        page_dicts = list(pages.values())
        # Bug fix: the original indexed page_dicts[0] unconditionally and
        # raised IndexError for a file with no pages; map that case to {}.
        if not page_dicts:
            result[file_path] = {}
            continue
        # Fold all pages into the first one.
        merged_dict = page_dicts[0]
        for page_dict in page_dicts[1:]:
            merged_dict = merge_dict_values(merged_dict, page_dict)
        result[file_path] = merged_dict
    return result
def display_based_on_card(original_file, analysis_results_for_original_file, extracted_files, current_upload):
    """Merge per-page analysis results for one file and render the matching card.

    Normalizes the detected document_type (lowercased, alphanumerics only) and
    dispatches to the type-specific prune/display pair; anything unrecognized —
    or any display-time error — falls back to display_others. The merged result
    is cached in st.session_state under the current upload.

    Args:
        original_file: Key identifying the source file in the results mapping.
        analysis_results_for_original_file: {page key -> page result dict}.
        extracted_files: Passed through unchanged to the display_* helpers.
        current_upload: Key into st.session_state['uploads'] for caching.

    Returns:
        analysis_results_for_original_file unmerged if the merge step fails;
        otherwise None (the commented-out return was never re-enabled).
    """
    try:
        # Fold all page dicts for this file into a single result dict.
        analysis_results_for_id = merge_json_file(
            {original_file: analysis_results_for_original_file})
        analysis_results_for_id = analysis_results_for_id[original_file]
        logger.info(f"analysis_results_for_id : {analysis_results_for_id}")
    except Exception as e:
        # Merge failed: log and hand the raw per-page results back to the caller.
        logger.info(
            f"Exception while trying to merge results of {original_file}")
        logger.info(
            f"analysis_results_for_original_file : {analysis_results_for_original_file}")
        logger.info(f"error : {e}")
        return analysis_results_for_original_file
    analysis_results_for_id_updated = analysis_results_for_id
    try:
        # Normalize document_type to a bare lowercase alphanumeric token so
        # e.g. "Driving License" matches the "drivinglicense" branch below.
        document_type = analysis_results_for_id.get(
            "document_type", "None")
        logger.info(f"document_type for {original_file}: {document_type}")
        document_type = document_type.lower()
        document_type = re.sub('[^A-Za-z0-9]+', '', document_type)
        print(f"document_type : {document_type}")
        # analysis_results_pruned = {}
        # Dispatch: each known type gets its prune-for-display + display pair.
        if document_type == "passport":
            analysis_results_pruned = prune_passport_for_display(
                analysis_results_for_id)
            display_passport(extracted_files, analysis_results_pruned)
        elif document_type == "drivinglicense":
            analysis_results_pruned = prune_driving_license_for_display(
                analysis_results_for_id)
            # if original_file not in st.session_state['tab_ocr']['values_display']:
            # st.session_state['tab_ocr']['values_display'][original_file] = analysis_results_for_id_updated
            display_driving_license(extracted_files, analysis_results_pruned)
        elif document_type == "bankstatement":
            analysis_results_pruned = prune_bank_statement_for_display(
                analysis_results_for_id)
            # if original_file not in st.session_state['tab_ocr']['values_display']:
            # st.session_state['tab_ocr']['values_display'][original_file] = analysis_results_for_id_updated
            display_bank_statement(extracted_files, analysis_results_pruned)
        elif document_type == "payslip":
            analysis_results_pruned = prune_payslip_for_display(
                analysis_results_for_id)
            # if original_file not in st.session_state['tab_ocr']['values_display']:
            # st.session_state['tab_ocr']['values_display'][original_file] = analysis_results_for_id_updated
            display_payslip(extracted_files, analysis_results_pruned)
        elif document_type == "p60":
            analysis_results_pruned = prune_p60_for_display(
                analysis_results_for_id)
            # if original_file not in st.session_state['tab_ocr']['values_display']:
            # st.session_state['tab_ocr']['values_display'][original_file] = analysis_results_for_id_updated
            display_p60(extracted_files, analysis_results_pruned)
        else:
            # Unknown type: preserve whatever document_type value was present
            # (possibly None) and show the generic card.
            analysis_results_for_id_updated["document_type"] = analysis_results_for_id.get(
                "document_type", None)
            # if original_file not in st.session_state['tab_ocr']['values_display']:
            # st.session_state['tab_ocr']['values_display'][original_file] = analysis_results_for_id_updated
            display_others(extracted_files, analysis_results_for_id_updated)
    except Exception as e:
        # Any dispatch/display failure degrades to the generic card rather
        # than surfacing an error to the user.
        logger.info(
            f"Exception for processing analysis results of {analysis_results_for_id}: {e}")
        analysis_results_for_id_updated = analysis_results_for_id
        # if original_file not in st.session_state['tab_ocr']['values_display']:
        # st.session_state['tab_ocr']['values_display'][original_file] = analysis_results_for_id_updated
        display_others(extracted_files, analysis_results_for_id_updated)
    # Cache the merged result for this upload, first-write-wins.
    if original_file not in st.session_state['uploads'][current_upload]['values_display']:
        st.session_state['uploads'][current_upload]['values_display'][original_file] = analysis_results_for_id_updated
    # return analysis_results_for_id_updated