# Grouping, transformation, and validation helpers for extracted application
# documents (payslips, bank statements, passports, driving licenses).
| from typing import Any, Dict | |
| import schemas | |
| from utils.logger import setup_logger | |
| logger = setup_logger(__name__) | |
def group_documents_by_type(obj, result=None):
    """Recursively collect dicts tagged with a recognised "document_type".

    Walks *obj* (arbitrarily nested dicts/lists) and buckets every dict whose
    "document_type" value is one of the four known types. Other values are
    traversed but not collected.

    Args:
        obj: Nested structure of dicts/lists produced by extraction.
        result: Accumulator used during recursion; callers normally omit it.

    Returns:
        Dict mapping each known document type to a list of matching dicts.
    """
    if result is None:
        result = {
            kind: []
            for kind in ("payslip", "bank_statement", "passport", "driving_license")
        }
    if isinstance(obj, dict):
        bucket = result.get(obj.get("document_type"))
        if bucket is not None:
            bucket.append(obj)
        children = obj.values()
    elif isinstance(obj, list):
        children = obj
    else:
        children = ()  # scalar leaf: nothing to recurse into
    for child in children:
        group_documents_by_type(child, result)
    return result
| # Transformation Functions | |
def transform_validate_payslip(
    data: Dict[str, Any], application_form_dict: Dict[str, str]
) -> Dict[str, Any]:
    """Map raw payslip extraction output onto UKPayslipSchema and validate it.

    Args:
        data: Raw extracted fields for a single payslip document.
        application_form_dict: Application-form values passed as Pydantic
            validation context so schema validators can cross-check them.

    Returns:
        The validated payslip fields as a plain dict (``model_dump()``).

    Raises:
        pydantic.ValidationError: If the payload fails schema validation.
    """
    payslip_payload = {
        "pay_period_start_date": data.get("pay_period_start"),
        "pay_period_end_date": data.get("pay_period_end"),
        "pay_date": data.get("payslip_date"),
        "full_name": data.get("employee_name"),
        "employer_name": data.get("employer_name"),
        # True only when both basic pay and net pay were extracted.
        "is_basic_pay_net_pay_other_salary_components_present": bool(
            data.get("basic_pay") and data.get("net_pay")
        ),
        # NOTE: "deducation" spelling matches the schema field name.
        "is_tax_deducation_present": bool(data.get("tax_deduction")),
        "is_ni_deduction_present": bool(data.get("ni_contribution")),
        "complete_employee_address": data.get("employee_address"),
    }
    return schemas.UKPayslipSchema.model_validate(
        payslip_payload,
        context=application_form_dict,
    ).model_dump()
def transform_validate_passport(
    data: Dict[str, Any], application_form_dict: Dict[str, str]
) -> Dict[str, Any]:
    """Map raw passport extraction output onto UKPassportSchema and validate it.

    Args:
        data: Raw extracted fields for a single passport document.
        application_form_dict: Application-form values passed as Pydantic
            validation context so schema validators can cross-check them.

    Returns:
        The validated passport fields as a plain dict (``model_dump()``).

    Raises:
        pydantic.ValidationError: If the payload fails schema validation.
    """
    passport_payload = {
        # NOTE(review): only "given_names" is used; the surname is not
        # appended — confirm this matches the extractor's output shape.
        "full_name": data.get("given_names"),
        "expiry_date": data.get("date_of_expiry"),
    }
    return schemas.UKPassportSchema.model_validate(
        passport_payload,
        context=application_form_dict,
    ).model_dump()
def transform_validate_driving_license(
    data: Dict[str, Any], application_form_dict: Dict[str, str]
) -> Dict[str, Any]:
    """Map raw driving-license extraction output onto UKDrivingLicense and validate it.

    Args:
        data: Raw extracted fields for a single driving-license document.
        application_form_dict: Application-form values passed as Pydantic
            validation context so schema validators can cross-check them.

    Returns:
        The validated license fields as a plain dict (``model_dump()``).

    Raises:
        pydantic.ValidationError: If the payload fails schema validation.
    """
    # Prefer a pre-combined full name; otherwise join first name and surname.
    name = (
        data.get("full_name")
        or f"{data.get('first_name', '')} {data.get('surname', '')}".strip()
    )
    driving_license_payload = {"full_name": name}
    return schemas.UKDrivingLicense.model_validate(
        driving_license_payload,
        context=application_form_dict,
    ).model_dump()
def transform_validate_bank_statement(
    data: Dict[str, Any], application_form_dict: Dict[str, str]
) -> Dict[str, Any]:
    """Map raw bank-statement extraction output onto UKBankAccountStatement and validate it.

    Args:
        data: Raw extracted fields for a single bank-statement document.
        application_form_dict: Application-form values passed as Pydantic
            validation context so schema validators can cross-check them.

    Returns:
        The validated statement fields as a plain dict (``model_dump()``).

    Raises:
        pydantic.ValidationError: If the payload fails schema validation.
    """
    # Date of the first salary deposit, taken from "salary_credits" when present.
    salary_credits = data.get("salary_credits", [])
    first_salary_date = None
    if salary_credits:
        try:
            first_salary_date = salary_credits[0]["date"]
        except (IndexError, KeyError):
            # Malformed or empty credit entry: leave the date as None.
            pass
    bank_statement_payload = {
        "statement_start_date": data.get("statement_start_date"),
        "statement_end_date": data.get("statement_end_date"),
        "first_salary_deposit_date_present": first_salary_date,
        "bank_name": data.get("bank_name"),
        "full_name": data.get("account_holder_name"),
        "account_number": data.get("account_number"),
        "sort_code": data.get("sort_code"),
    }
    return schemas.UKBankAccountStatement.model_validate(
        bank_statement_payload,
        context=application_form_dict,
    ).model_dump()
def process_extracted_data(
    extracted_data: Dict[str, Any],
    application_form: Dict[str, Any],
    full_data_transformed,  # retained for interface compatibility; unused here
):
    """Group extracted documents by type, validate each, and cross-check names.

    Args:
        extracted_data: Nested extraction output containing tagged documents.
        application_form: Application-form values forwarded to each schema's
            validation context.
        full_data_transformed: Unused; kept so existing callers keep working.

    Returns:
        Tuple of (transformed_validated_data, cross_docs_name_eq_check) where
        the first item maps document categories to lists of validated dicts
        and the second summarises whether names match across documents.
    """
    grouped_docs = group_documents_by_type(extracted_data)
    transformed_validated_data = {
        "payslips": [
            transform_validate_payslip(doc, application_form)
            for doc in grouped_docs["payslip"]
        ],
        "bank_statements": [
            transform_validate_bank_statement(doc, application_form)
            for doc in grouped_docs["bank_statement"]
        ],
        "passports": [
            transform_validate_passport(doc, application_form)
            for doc in grouped_docs["passport"]
        ],
        "driving_licenses": [
            transform_validate_driving_license(doc, application_form)
            for doc in grouped_docs["driving_license"]
        ],
    }
    # Lazy %-formatting avoids building the string when INFO is disabled.
    logger.info("transformed_validated_data: %s", transformed_validated_data)
    # Unique, normalised (lowercased, space-stripped) names seen across all
    # validated documents; a single entry means the names are consistent.
    names_across_docs = set()
    names_all = []
    for docs in transformed_validated_data.values():
        for doc in docs:
            full_name = doc.get("full_name")
            if full_name is not None:
                names_across_docs.add(full_name.lower().replace(" ", ""))
                names_all.append(full_name)
    names_across_docs_match = len(names_across_docs) <= 1
    if names_across_docs_match:
        cross_docs_name_eq_check = {
            "Policy": "Document Consistency",
            # Guard: names_all may be empty when no document carried a name
            # (previously this raised IndexError on names_all[-1]).
            "Value": names_all[-1] if names_all else None,
            "Status": names_across_docs_match,
            "Message": "Applicant's name matches across the uploaded documents",
        }
    else:
        cross_docs_name_eq_check = {
            "Policy": "Document Consistency",
            "Value": names_all,
            "Status": names_across_docs_match,
            "Message": "Applicant's name does not match across the uploaded documents"
        }
    return transformed_validated_data, cross_docs_name_eq_check