# --- Library Imports ---
from collections import defaultdict
from azure.storage.blob import BlobServiceClient
from datetime import datetime
from fastapi import HTTPException, status
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session
from sqlalchemy import text
from fastapi.responses import StreamingResponse
from app.models.application import Application
from app.models.user_profile import UserProfile
from fastapi import BackgroundTasks, UploadFile
# ---

# --- User Imports ---
from app.config.env import env
from app.schema.response_schema import ResponseSchema
from app.models.user_document import UserDocument
from app.models.onboarding_profile import OnboardingProfile
from app.models.document_type import DocumentType, DocumentSubType
from app.services.helpers.document_helper import *
from ..utils.utility import get_current_datetime, update_table
from ..services.background_services.verify_document import verify_documents
# ---

# --- Constants ---
CONNECTION_STRING = env.CONNECTION_STRING
CONTAINER_NAME = env.CONTAINER_NAME
MISCELLANEOUS_DOCUMENT_TYPE_ID = 7
VERIFY_DOC_QUEUE_NAME = "verify_doc_queue"
# Document-status id stored on freshly uploaded documents.
# NOTE(review): 3 == "Pending Verification" per the original inline comment — confirm against the DocumentStatus table.
PENDING_VERIFICATION_STATUS_ID = 3
# ---

# Initiate connection to blob storage once at import time; clients are
# thread-safe and shared across requests.
blob_service_client = BlobServiceClient.from_connection_string(
    CONNECTION_STRING)
container_client = blob_service_client.get_container_client(
    container=CONTAINER_NAME)


class DocumentService:
    """Service layer for user-document CRUD against Azure Blob Storage and the DB.

    All methods are stateless and therefore declared as static methods; they
    receive the SQLAlchemy session and request payloads explicitly.
    """

    @staticmethod
    async def upload_file_to_blob_storage(
        file: UploadFile, user, payload, db, background_tasks: BackgroundTasks
    ):
        """
        Upload a file to Azure Blob Storage and store its metadata in the database.

        Parameters:
        - file: The file to upload.
        - user: Decoded auth info; `user["user_id"]` is the default owner.
        - payload: Metadata dict with keys `user_id`, `document_type_id`,
          `document_subtype_id`, `document_title`, `application_id`,
          `user_profile_id`. `user_id` overrides the owner (loan-officer flow).
        - db: SQLAlchemy database session.
        - background_tasks: FastAPI BackgroundTasks; document verification is
          scheduled here so it runs after the response is sent.

        Returns:
        - ResponseSchema: Status of the upload operation.

        Raises:
        - HTTPException 400 on database errors (session is rolled back),
          500 on any other failure.
        """
        try:
            # Persist the upload to local disk first so the background
            # verification task can re-read it later.
            file_path = await write_file(file)
            file_name = file.filename
            file_size = file.size

            # A loan officer may upload on behalf of another user; the payload
            # user_id (when present) overrides the authenticated user.
            user_id = user["user_id"]
            if payload["user_id"] is not None:
                user_id = payload["user_id"]

            # Upload the file to blob storage.
            blob_url = await upload_file(user_id, payload, file)

            # For miscellaneous documents (no type/subtype supplied) create a
            # dedicated subtype row on the fly.
            subtype = None
            if (
                payload["document_type_id"] is None
                or payload["document_subtype_id"] is None
            ):
                document_title = payload["document_title"]
                if document_title is None:
                    document_title = "_"
                subtype = DocumentSubType(
                    documentTypeId=MISCELLANEOUS_DOCUMENT_TYPE_ID,
                    name=document_title,
                    code=generate_unique_code(document_title),
                    applicationId=payload["application_id"],
                )
                db.add(subtype)
                db.commit()

            # Store file metadata; fall back to the miscellaneous type/subtype
            # created above when the payload did not specify them.
            user_document_data = UserDocument(
                userId=user_id,
                documentTypeId=payload["document_type_id"]
                or MISCELLANEOUS_DOCUMENT_TYPE_ID,
                documentSubTypeId=payload["document_subtype_id"]
                or subtype.documentSubTypeId,
                documentStatusId=PENDING_VERIFICATION_STATUS_ID,
                date=datetime.now(),
                URL=blob_url,
                applicationId=payload["application_id"],
                userProfileId=payload["user_profile_id"],
                documentName=file_name,
                documentSize=file_size,
            )
            db.add(user_document_data)
            db.commit()
            # Refresh to obtain the auto-generated userDocumentId.
            db.refresh(user_document_data)

            doc_dict = {
                "file_path": file_path,
                "user_profile_id": payload["user_profile_id"],
                "user_document_id": user_document_data.userDocumentId,
                "document_type_id": user_document_data.documentTypeId,
                "document_subtype_id": user_document_data.documentSubTypeId,
            }
            # Queue the document for verification after the response is sent.
            background_tasks.add_task(verify_documents, doc_dict)

            return ResponseSchema(
                status=200,
                success=True,
                message="File uploaded successfully!",
            )
        except HTTPException:
            # Preserve deliberate HTTP errors instead of masking them as 500s.
            raise
        except SQLAlchemyError as e:
            db.rollback()
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)
            )
        except Exception as e:
            # str(e) is safe even for message-less exceptions (e.args may be empty).
            raise HTTPException(status_code=500, detail=str(e))

    @staticmethod
    async def update_file(file: UploadFile, user, payload, db):
        """
        Replace an existing document's file in Azure Blob Storage and update
        its metadata in the database.

        Parameters:
        - file: The replacement file.
        - user: Decoded auth info; `user["user_id"]` is the default owner.
        - payload: Metadata dict with `user_id` (optional override) and
          `document_id` identifying the document to update.
        - db: SQLAlchemy database session.

        Returns:
        - ResponseSchema: Status of the update operation.

        Raises:
        - HTTPException 404 if the document does not exist, 400 on database
          errors, 500 otherwise.
        """
        try:
            user_id = user["user_id"]
            # Payload user_id (when present) overrides the authenticated user.
            if payload["user_id"] is not None:
                user_id = payload["user_id"]

            # The document must exist and belong to the resolved user.
            existing_document = (
                db.query(UserDocument)
                .filter_by(userDocumentId=payload["document_id"], userId=user_id)
                .first()
            )
            if not existing_document:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail="Document does not exist!",
                )

            file_name = file.filename
            file_size = file.size

            # Upload the replacement file to blob storage.
            blob_url = await upload_file(user_id, payload, file)

            # Update metadata in the DB.
            update_table(
                db,
                existing_document,
                {
                    "URL": blob_url,
                    "date": get_current_datetime(),
                    "documentName": file_name,
                    "documentSize": file_size,
                },
            )

            return ResponseSchema(
                status=200,
                success=True,
                message="File updated successfully!",
            )
        except HTTPException:
            # Re-raise the 404 (and any other deliberate HTTP error) as-is
            # instead of letting the generic handler turn it into a 500.
            raise
        except SQLAlchemyError as e:
            db.rollback()
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)
            )
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e))

    @staticmethod
    async def read_file_from_blob_storage(document_id, user, user_id, db):
        """
        Read a file from Azure Blob Storage and return it as a streaming response.

        Parameters:
        - document_id: The ID of the document to read.
        - user (dict): Decoded auth info; `user["user_id"]` is the default owner.
        - user_id: Optional owner override (loan-officer flow).
        - db: SQLAlchemy database session.

        Returns:
        - StreamingResponse: The raw file content as application/octet-stream.

        Raises:
        - HTTPException 404 if the document does not exist, 500 otherwise.
        """
        try:
            owner_id = user["user_id"]
            if user_id is not None:
                owner_id = user_id

            # Fetch document metadata scoped to the owner.
            document_data = (
                db.query(UserDocument)
                .filter_by(userDocumentId=document_id, userId=owner_id)
                .first()
            )
            if document_data is None:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail="Document does not exist!",
                )
            document_data = document_data.dict()

            # Derive the blob path from the stored URL: everything after the
            # container-name segment.
            parts = document_data["URL"].split("/")
            index = parts.index(CONTAINER_NAME)
            file_path = "/".join(parts[index + 1:])

            # Stream the blob back to the client.
            file_stream = get_download_file_stream_from_blob_storage(file_path)
            return StreamingResponse(
                iter([file_stream]), media_type="application/octet-stream"
            )
        except HTTPException:
            # Keep the original status code (e.g. the 404 above).
            raise
        except Exception as e:
            # Generic failures: the original code read e.status_code/e.detail,
            # which do not exist on arbitrary exceptions.
            raise HTTPException(status_code=500, detail=str(e))

    @staticmethod
    async def get_document_types(application_id, user, db: Session):
        """
        Retrieve document types with their subtypes, filtered by the
        application's data, and annotate each subtype with upload status.

        Parameters:
        - application_id: The loan application id.
        - user: Decoded auth info (dict) or a raw user-id string; used to fall
          back to onboarding data when application fields are missing.
        - db: SQLAlchemy database session.

        Returns:
        - ResponseSchema: Filtered document types/subtypes with per-subtype
          `uploaded` flag and `documents` list.

        Raises:
        - HTTPException 404 if the application does not exist, 500 otherwise.
        """
        try:
            # All (type, subtype) pairs.
            result = (
                db.query(DocumentType, DocumentSubType)
                .join(
                    DocumentSubType,
                    DocumentType.documentTypeId == DocumentSubType.documentTypeId,
                )
                .all()
            )

            # Group subtypes under their document type:
            # documentTypeId -> type dict with a "sub_types" list.
            result_dicts = {}
            for document_type, sub_type in result:
                document_type_dict = document_type.dict()
                sub_type_dict = sub_type.dict()
                document_type_id = document_type_dict.get("documentTypeId")
                if document_type_id in result_dicts:
                    result_dicts[document_type_id]["sub_types"].append(
                        sub_type_dict)
                else:
                    result_dicts[document_type_id] = document_type_dict
                    result_dicts[document_type_id]["sub_types"] = [
                        sub_type_dict]
            document_dict_list = list(result_dicts.values())

            # Application data used to decide which documents are required.
            # Bound parameter (NOT str.format) to avoid SQL injection.
            query = """
                SELECT
                    "Applications"."applicationId" AS "applicationId",
                    "Applications"."loanPurposeId" AS "loanPurposeId",
                    "UserEmploymentAndIncomes"."employmentTypeId" AS "employmentTypeId",
                    "UserEmploymentAndIncomes"."OTIncome" AS "OTIncome",
                    "UserEmploymentAndIncomes"."bonusIncome" AS "bonusIncome",
                    "UserEmploymentAndIncomes"."otherIncome" AS "otherIncome",
                    "UserProfiles"."ownershipId" AS "ownershipId",
                    "UserProfiles"."citizenshipStatusId" AS "citizenshipStatusId",
                    "UserAssets"."lenderCredit" AS "lenderCredit",
                    "UserAssets"."realtorCredit" AS "realtorCredit",
                    "UserAssets"."otherCredit" AS "otherCredit",
                    "UserLiabilities"."isPayoffRequired" AS "isPayoffRequired",
                    "UserLiabilities"."isExclude" AS "isExclude",
                    "UserREOs"."escrowedPayment" AS "escrowedPayment"
                FROM "Applications"
                LEFT OUTER JOIN "UserProfiles"
                    ON "UserProfiles"."applicationId" = "Applications"."applicationId"
                LEFT OUTER JOIN "UserLiabilities"
                    ON "UserLiabilities"."applicationId" = "Applications"."applicationId"
                LEFT OUTER JOIN "UserAssets"
                    ON "UserAssets"."applicationId" = "Applications"."applicationId"
                LEFT OUTER JOIN "UserEmploymentAndIncomes"
                    ON "UserEmploymentAndIncomes"."applicationId" = "Applications"."applicationId"
                LEFT OUTER JOIN "UserREOs"
                    ON "UserREOs"."applicationId" = "Applications"."applicationId"
                WHERE "Applications"."applicationId" = :application_id;
            """
            application_result = db.execute(
                text(query), {"application_id": application_id}
            )
            column_names = application_result.keys()
            application_details = None
            # The LEFT JOINs may yield several rows; keep the last one, as the
            # original implementation did.
            for row in application_result.fetchall():
                application_details = dict(zip(column_names, row))
            if application_details is None:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail="Application not found",
                )

            # Fall back to onboarding data when loan purpose / employment type
            # are missing on the application.
            if (
                application_details.get("employmentTypeId", None) is None
                or application_details.get("loanPurposeId", None) is None
            ):
                # `user` may arrive either as a raw id or as a decoded dict.
                user_id = user if isinstance(user, str) else user["user_id"]
                # NOTE(review): assumes an onboarding profile exists for this
                # user; subscripting below fails otherwise — confirm upstream.
                onboarding_details = (
                    db.query(
                        OnboardingProfile.loanPurposeId,
                        OnboardingProfile.employmentTypeId,
                    )
                    .filter_by(userId=user_id)
                    .first()
                )
                if application_details["loanPurposeId"] is None:
                    application_details["loanPurposeId"] = onboarding_details[0]
                if application_details["employmentTypeId"] is None:
                    application_details["employmentTypeId"] = onboarding_details[1]

            # Keep only the documents relevant to this application.
            filtered_document_dict_list = filter_documents(
                document_dict_list, payload=application_details
            )

            # All documents already uploaded for this application.
            user_documents = (
                db.query(
                    UserDocument.userId,
                    UserDocument.userDocumentId,
                    UserDocument.documentStatusId,
                    UserDocument.documentTypeId,
                    UserDocument.documentSubTypeId,
                    UserDocument.date,
                    UserDocument.remarks,
                    UserDocument.documentName,
                    UserDocument.documentSize,
                )
                .filter_by(applicationId=application_id)
                .all()
            )

            # documentTypeId -> documentSubTypeId -> document row.
            document_lookup = defaultdict(dict)
            # documentSubTypeId -> list of serialized document details.
            all_documents_dict = {}
            for doc in user_documents:
                document_lookup[doc.documentTypeId][doc.documentSubTypeId] = doc
                all_documents_dict.setdefault(doc.documentSubTypeId, []).append({
                    "userDocumentId": doc.userDocumentId,
                    "timestamp": int(datetime.timestamp(doc.date)),
                    "status": doc.documentStatusId,
                    "remarks": doc.remarks,
                    "name": doc.documentName if doc.documentName else "",
                    "size": format_bytes(doc.documentSize)
                    if doc.documentSize else None,
                })

            # Annotate each subtype with upload status and its documents.
            for doc_type in filtered_document_dict_list:
                document_type_id = doc_type["documentTypeId"]
                for sub_type in doc_type.get("sub_types", []):
                    document_subtype_id = sub_type["documentSubTypeId"]
                    sub_type["applicationId"] = application_id
                    uploaded_doc = document_lookup.get(document_type_id, {}).get(
                        document_subtype_id
                    )
                    sub_type["uploaded"] = uploaded_doc is not None
                    sub_type["documents"] = all_documents_dict.get(
                        document_subtype_id, []
                    )

            return ResponseSchema(
                status=200,
                success=True,
                message="Document types retrieved successfully",
                data=filtered_document_dict_list,
            )
        except HTTPException:
            raise
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e))

    @staticmethod
    async def delete_document(db, document_id, user, user_id):
        """
        Delete a user document (and its ad-hoc subtype for miscellaneous docs).

        Parameters:
        - db: SQLAlchemy database session.
        - document_id: ID of the document to delete.
        - user: Decoded auth info; `user["user_id"]` is the default owner.
        - user_id: Optional owner override (loan-officer flow).

        Returns:
        - ResponseSchema: Success/failure of the deletion.

        Raises:
        - HTTPException 404 if the document does not exist, 500 otherwise.
        """
        try:
            owner_id = user["user_id"]
            if user_id is not None:
                owner_id = user_id

            # The document must exist and belong to the resolved user.
            result = (
                db.query(UserDocument)
                .filter_by(userDocumentId=document_id, userId=owner_id)
                .first()
            )
            if not result:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail="No documents found",
                )

            doc = result.dict()
            document_type_id = doc["documentTypeId"]
            document_subtype_id = doc["documentSubTypeId"]

            db.delete(result)
            db.commit()

            # Miscellaneous documents own their subtype row; remove it too.
            if document_type_id == MISCELLANEOUS_DOCUMENT_TYPE_ID:
                subtype_data = (
                    db.query(DocumentSubType)
                    .filter(DocumentSubType.documentSubTypeId == document_subtype_id)
                    .first()
                )
                # Guard: the subtype may already be gone.
                if subtype_data is not None:
                    db.delete(subtype_data)
                    db.commit()

            return ResponseSchema(
                status=200,
                success=True,
                message="User Document deleted successfully",
            )
        except HTTPException:
            raise
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e))

    @staticmethod
    async def get_user_info(db, user):
        """
        Retrieve the user's applications joined with their user profiles.

        Parameters:
        - db: SQLAlchemy database session.
        - user: Dict containing user information with a 'user_id' key.

        Returns:
        - ResponseSchema: List of per-application dicts (ids, names, address).

        Raises:
        - HTTPException 404 if the user has no applications, 500 otherwise.
        """
        try:
            result = (
                db.query(
                    Application,
                    UserProfile,
                )
                .filter_by(userId=user["user_id"])
                .outerjoin(
                    UserProfile, Application.applicationId == UserProfile.applicationId
                )
                .all()
            )
            if not result:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail="Applications not found",
                )

            data = []
            for application, profile in result:
                # `profile` is None when the outer join found no UserProfile.
                data.append({
                    "user_id": application.userId,
                    "application_id": application.applicationId,
                    "user_profile_id": profile.userProfileId if profile else None,
                    "first_name": profile.firstName if profile else None,
                    "middle_name": profile.middleName if profile else None,
                    "last_name": profile.lastName if profile else None,
                    "property_address": application.propertyAddress,
                })

            return ResponseSchema(
                status=200,
                success=True,
                message="User info fetched successfully",
                data=data,
            )
        except HTTPException:
            raise
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e))