| |
|
| | from collections import defaultdict
|
| | from azure.storage.blob import BlobServiceClient
|
| | from datetime import datetime
|
| | from fastapi import HTTPException, status
|
| | from sqlalchemy.exc import SQLAlchemyError
|
| | from sqlalchemy.orm import Session
|
| | from sqlalchemy import text
|
| | from fastapi.responses import StreamingResponse
|
| | from app.models.application import Application
|
| | from app.models.user_profile import UserProfile
|
| | from fastapi import BackgroundTasks, UploadFile
|
| |
|
| |
|
| |
|
| |
|
| | from app.config.env import env
|
| | from app.schema.response_schema import ResponseSchema
|
| | from app.models.user_document import UserDocument
|
| | from app.models.onboarding_profile import OnboardingProfile
|
| | from app.models.document_type import DocumentType, DocumentSubType
|
| | from app.services.helpers.document_helper import *
|
| | from ..utils.utility import get_current_datetime, update_table
|
| | from ..services.background_services.verify_document import verify_documents
|
| |
|
| |
|
| |
|
# Azure Blob Storage target, injected via environment configuration.
CONNECTION_STRING = env.CONNECTION_STRING
CONTAINER_NAME = env.CONTAINER_NAME

# DocumentType id used when a file is uploaded without an explicit type/subtype.
MISCELLANEOUS_DOCUMENT_TYPE_ID = 7
# Queue name for asynchronous document verification jobs.
VERIFY_DOC_QUEUE_NAME = "verify_doc_queue"

# Module-level clients created once at import time and shared by all requests.
# NOTE(review): from_connection_string raises at import if the connection
# string is missing/invalid — confirm failing fast here is intended.
blob_service_client = BlobServiceClient.from_connection_string(
    CONNECTION_STRING)
container_client = blob_service_client.get_container_client(
    container=CONTAINER_NAME)
|
| |
|
| |
|
class DocumentService:
    @staticmethod
    async def upload_file_to_blob_storage(file: UploadFile, user, payload, db, background_tasks: BackgroundTasks):
        """
        Upload a file to Azure Blob Storage, record its metadata in the
        database, and schedule background verification.

        Parameters:
        - file: The file to upload.
        - user: Authenticated user info (dict with a "user_id" key).
        - payload: Metadata for the file: user_id override, document_type_id,
          document_subtype_id, document_title, application_id, user_profile_id.
        - db: SQLAlchemy database session.
        - background_tasks: FastAPI BackgroundTasks used to run verification.

        Returns:
        - ResponseSchema: status of the upload operation.

        Raises:
        - HTTPException 400 on database errors (session rolled back),
          500 on any other failure.
        """
        try:
            # Persist the upload locally first so the background verifier
            # can read it from disk.
            file_path = await write_file(file)

            file_name = file.filename
            file_size = file.size

            # payload["user_id"] lets a caller upload on behalf of another
            # user — presumably an admin flow; TODO confirm authorization
            # is enforced upstream.
            user_id = user["user_id"]
            if payload["user_id"] is not None:
                user_id = payload["user_id"]

            blob_url = await upload_file(user_id, payload, file)

            # Files without an explicit type/subtype are filed under the
            # "miscellaneous" document type with an ad-hoc subtype row.
            subtype = None
            if (
                payload["document_type_id"] is None
                or payload["document_subtype_id"] is None
            ):
                document_title = payload["document_title"]
                if document_title is None:
                    document_title = "_"

                subtype = DocumentSubType(
                    documentTypeId=MISCELLANEOUS_DOCUMENT_TYPE_ID,
                    name=document_title,
                    code=generate_unique_code(document_title),
                    applicationId=payload["application_id"],
                )
                db.add(subtype)
                db.commit()

            # Bug fix: previously `subtype` could be referenced while unbound
            # (NameError) when a falsy-but-present subtype id was supplied.
            document_subtype_id = payload["document_subtype_id"]
            if document_subtype_id is None and subtype is not None:
                document_subtype_id = subtype.documentSubTypeId

            user_document_data = UserDocument(
                userId=user_id,
                documentTypeId=payload["document_type_id"]
                or MISCELLANEOUS_DOCUMENT_TYPE_ID,
                documentSubTypeId=document_subtype_id,
                documentStatusId=3,  # assumed "pending verification" — TODO confirm
                date=datetime.now(),
                URL=blob_url,
                applicationId=payload["application_id"],
                userProfileId=payload["user_profile_id"],
                documentName=file_name,
                documentSize=file_size,
            )
            db.add(user_document_data)
            db.commit()
            # Refresh to obtain the DB-generated primary key.
            db.refresh(user_document_data)

            background_tasks.add_task(
                verify_documents,
                {
                    "file_path": file_path,
                    "user_profile_id": payload["user_profile_id"],
                    "user_document_id": user_document_data.userDocumentId,
                    "document_type_id": user_document_data.documentTypeId,
                    "document_subtype_id": user_document_data.documentSubTypeId,
                },
            )

            return ResponseSchema(
                status=200,
                success=True,
                message="File uploaded successfully!",
            )

        except HTTPException:
            # Let HTTP errors raised by helpers propagate with their own
            # status instead of degrading to a 500.
            raise
        except SQLAlchemyError as e:
            db.rollback()
            # str(e) is safe even when e.args is empty (e.args[0] was not).
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)
            )
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e))
|
| |
|
| | async def update_file(file: UploadFile, user, payload, db):
|
| | """
|
| | Updates a file in Azure Blob Storage and updates metadata in the database.
|
| |
|
| | Parameters:
|
| | - file: The file to upload.
|
| | - user: User information.
|
| | - payload: Additional metadata for the file, including document type and subtype.
|
| | - db: SQLAlchemy database session.
|
| |
|
| | Returns:
|
| | - ResponseSchema: A schema containing the status of the update operation.
|
| | """
|
| |
|
| | try:
|
| | user_id = user["user_id"]
|
| |
|
| |
|
| | if payload["user_id"] is not None:
|
| | user_id = payload["user_id"]
|
| |
|
| |
|
| | existing_document = (
|
| | db.query(UserDocument)
|
| | .filter_by(userDocumentId=payload["document_id"], userId=user_id)
|
| | .first()
|
| | )
|
| |
|
| | if not existing_document:
|
| | raise HTTPException(
|
| | status_code=status.HTTP_404_NOT_FOUND,
|
| | detail="Document does not exist!",
|
| | )
|
| |
|
| | file_name = file.filename
|
| | file_size = file.size
|
| |
|
| |
|
| | blob_url = await upload_file(user_id, payload, file)
|
| |
|
| |
|
| | update_dict = {
|
| | "URL": blob_url,
|
| | "date": get_current_datetime(),
|
| | "documentName": file_name,
|
| | "documentSize": file_size,
|
| | }
|
| |
|
| | update_table(db, existing_document, update_dict)
|
| |
|
| |
|
| | return ResponseSchema(
|
| | status=200,
|
| | success=True,
|
| | message="File updated succesfully!",
|
| | )
|
| |
|
| |
|
| | except SQLAlchemyError as e:
|
| | db.rollback()
|
| | raise HTTPException(
|
| | status_code=status.HTTP_400_BAD_REQUEST, detail=e.args[0]
|
| | )
|
| |
|
| |
|
| | except Exception as e:
|
| | raise HTTPException(status_code=500, detail=e.args[0])
|
| |
|
| | async def read_file_from_blob_storage(document_id, user, user_id, db):
|
| | """
|
| | Reads a file from Azure Blob Storage and returns it as a streaming response.
|
| |
|
| | Parameters:
|
| | - document_id (str): The ID of the document to be read.
|
| | - user (dict): The user object containing user information.
|
| | - db: SQLAlchemy database session.
|
| |
|
| | Returns:
|
| | - StreamingResponse: A streaming response containing the file content.
|
| | """
|
| |
|
| | try:
|
| | userId = user["user_id"]
|
| | if user_id is not None:
|
| | userId = user_id
|
| |
|
| |
|
| | document_data = (
|
| | db.query(UserDocument)
|
| | .filter_by(userDocumentId=document_id, userId=userId)
|
| | .first()
|
| | )
|
| |
|
| | if document_data is None:
|
| | raise HTTPException(
|
| | status_code=status.HTTP_404_NOT_FOUND,
|
| | detail="Document does not exist!",
|
| | )
|
| |
|
| | document_data = document_data.dict()
|
| |
|
| |
|
| | parts = document_data["URL"].split("/")
|
| | index = parts.index(CONTAINER_NAME)
|
| | file_path = "/".join(parts[index + 1:])
|
| |
|
| |
|
| | file_stream = get_download_file_stream_from_blob_storage(file_path)
|
| |
|
| |
|
| | return StreamingResponse(
|
| | iter([file_stream]), media_type="application/octet-stream"
|
| | )
|
| | except Exception as e:
|
| | raise HTTPException(
|
| | status_code=e.status_code or 500, detail=e.detail or e.args[0]
|
| | )
|
| |
|
| | async def get_document_types(application_id, user, db: Session):
|
| | """
|
| | Retrieves document types along with their corresponding subtypes.
|
| |
|
| | Parameters:
|
| | - application_id: The Loan Application Id
|
| | - db: SQLAlchemy database session
|
| |
|
| | Returns:
|
| | - ResponseSchema: Response object containing document types and subtypes
|
| | """
|
| |
|
| | try:
|
| |
|
| | result = (
|
| | db.query(DocumentType, DocumentSubType)
|
| | .join(
|
| | DocumentSubType,
|
| | DocumentType.documentTypeId == DocumentSubType.documentTypeId,
|
| | )
|
| | .all()
|
| | )
|
| |
|
| |
|
| | result_dicts = {}
|
| | document_dict_list = []
|
| |
|
| |
|
| | for document_type, sub_type in result:
|
| |
|
| | document_type_dict = document_type.dict()
|
| | sub_type_dict = sub_type.dict()
|
| |
|
| |
|
| | document_type_id = document_type_dict.get("documentTypeId")
|
| |
|
| |
|
| | if document_type_id in result_dicts:
|
| |
|
| | result_dicts[document_type_id]["sub_types"].append(
|
| | sub_type_dict)
|
| | else:
|
| |
|
| | result_dicts[document_type_id] = document_type_dict
|
| | result_dicts[document_type_id]["sub_types"] = [
|
| | sub_type_dict]
|
| |
|
| | for documents in result_dicts:
|
| | document_dict_list.append(result_dicts[documents])
|
| |
|
| |
|
| | application_details = None
|
| | query = """
|
| | SELECT "Applications"."applicationId" AS "applicationId", "Applications"."loanPurposeId" AS "loanPurposeId", "UserEmploymentAndIncomes"."employmentTypeId" AS "employmentTypeId", "UserEmploymentAndIncomes"."OTIncome" AS "OTIncome", "UserEmploymentAndIncomes"."bonusIncome" AS "bonusIncome", "UserEmploymentAndIncomes"."otherIncome" AS "otherIncome", "UserProfiles"."ownershipId" AS "ownershipId", "UserProfiles"."citizenshipStatusId" AS "citizenshipStatusId", "UserAssets"."lenderCredit" AS "lenderCredit", "UserAssets"."realtorCredit" AS "realtorCredit", "UserAssets"."otherCredit" AS "otherCredit", "UserLiabilities"."isPayoffRequired" AS "isPayoffRequired", "UserLiabilities"."isExclude" AS "isExclude", "UserREOs"."escrowedPayment" AS "escrowedPayment" FROM "Applications" LEFT OUTER JOIN "UserProfiles" ON "UserProfiles"."applicationId" = "Applications"."applicationId" LEFT OUTER JOIN "UserLiabilities" ON "UserLiabilities"."applicationId" = "Applications"."applicationId" LEFT OUTER JOIN "UserAssets" ON "UserAssets"."applicationId" = "Applications"."applicationId" LEFT OUTER JOIN "UserEmploymentAndIncomes" ON "UserEmploymentAndIncomes"."applicationId" = "Applications"."applicationId" LEFT OUTER JOIN "UserREOs" ON "UserREOs"."applicationId" = "Applications"."applicationId" WHERE "Applications"."applicationId" = {application_id};
|
| | """.format(application_id=application_id)
|
| | application_result = db.execute(text(query))
|
| | column_names = application_result.keys()
|
| | for row in application_result.fetchall():
|
| | application_details = dict(zip(column_names, row))
|
| |
|
| |
|
| | if (
|
| | application_details.get("employmentTypeId", None) is None
|
| | or application_details.get("loanPurposeId", None) is None
|
| | ):
|
| | if isinstance(user, str):
|
| | user_id = user
|
| | else:
|
| | user_id = user["user_id"]
|
| | onboarding_details = (
|
| | db.query(
|
| | OnboardingProfile.loanPurposeId,
|
| | OnboardingProfile.employmentTypeId,
|
| | )
|
| | .filter_by(userId=user_id)
|
| | .first()
|
| | )
|
| |
|
| | application_details["loanPurposeId"] = (
|
| | application_details["loanPurposeId"]
|
| | if application_details["loanPurposeId"] is not None
|
| | else onboarding_details[0]
|
| | )
|
| | application_details["employmentTypeId"] = (
|
| | application_details["employmentTypeId"]
|
| | if application_details["employmentTypeId"] is not None
|
| | else onboarding_details[1]
|
| | )
|
| |
|
| |
|
| | filtered_document_dict_list = filter_documents(
|
| | document_dict_list, payload=application_details
|
| | )
|
| |
|
| |
|
| | user_documents = (
|
| | db.query(
|
| | UserDocument.userId,
|
| | UserDocument.userDocumentId,
|
| | UserDocument.documentStatusId,
|
| | UserDocument.documentTypeId,
|
| | UserDocument.documentSubTypeId,
|
| | UserDocument.date,
|
| | UserDocument.remarks,
|
| | UserDocument.documentName,
|
| | UserDocument.documentSize,
|
| | )
|
| | .filter_by(applicationId=application_id)
|
| | .all()
|
| | )
|
| |
|
| |
|
| |
|
| | document_lookup = defaultdict(dict)
|
| |
|
| |
|
| | all_documents_dict = {}
|
| |
|
| | for doc in user_documents:
|
| | document_lookup[doc.documentTypeId][doc.documentSubTypeId] = doc
|
| |
|
| | document_sub_type_id = doc.documentSubTypeId
|
| |
|
| |
|
| | if document_sub_type_id not in all_documents_dict:
|
| | all_documents_dict[document_sub_type_id] = []
|
| |
|
| | all_documents_dict[document_sub_type_id].append({
|
| | "userDocumentId": doc.userDocumentId,
|
| | "timestamp": int(datetime.timestamp(doc.date)),
|
| | "status": doc.documentStatusId,
|
| | "remarks": doc.remarks,
|
| | "name": doc.documentName if doc.documentName else "",
|
| | "size": format_bytes(doc.documentSize) if doc.documentSize else None,
|
| | })
|
| |
|
| |
|
| |
|
| | for doc_type in filtered_document_dict_list:
|
| | document_type_id = doc_type["documentTypeId"]
|
| |
|
| |
|
| | for sub_type in doc_type.get("sub_types", []):
|
| | document_subType_id = sub_type["documentSubTypeId"]
|
| | sub_type["applicationId"] = application_id
|
| |
|
| |
|
| | result = document_lookup.get(document_type_id, {}).get(
|
| | document_subType_id
|
| | )
|
| |
|
| |
|
| | sub_type["uploaded"] = result is not None
|
| |
|
| |
|
| | if sub_type["documentSubTypeId"] in all_documents_dict:
|
| | sub_type["documents"] = all_documents_dict[sub_type["documentSubTypeId"]]
|
| | else:
|
| | sub_type["documents"] = []
|
| |
|
| |
|
| | return ResponseSchema(
|
| | status=200,
|
| | success=True,
|
| | message="Document types retrieved successfully",
|
| | data=filtered_document_dict_list,
|
| | )
|
| | except Exception as e:
|
| |
|
| | raise HTTPException(status_code=500, detail=e.args[0])
|
| |
|
| | async def delete_document(db, document_id, user, user_id):
|
| | """
|
| | Deletes a user document.
|
| |
|
| | Parameters:
|
| | - db: SQLAlchemy database session
|
| | - document_id: ID of the document to be deleted
|
| |
|
| | Returns:
|
| | - ResponseSchema: Response object indicating success or failure of the deletion operation
|
| | """
|
| | try:
|
| | userId = user["user_id"]
|
| | if user_id is not None:
|
| | userId = user_id
|
| |
|
| |
|
| | result = (
|
| | db.query(UserDocument)
|
| | .filter_by(userDocumentId=document_id, userId=userId)
|
| | .first()
|
| | )
|
| |
|
| | if not result:
|
| | raise HTTPException(
|
| | status_code=status.HTTP_404_NOT_FOUND,
|
| | detail="No documents found",
|
| | )
|
| |
|
| | document_type_id = result.dict()["documentTypeId"]
|
| | document_subtype_id = result.dict()["documentSubTypeId"]
|
| |
|
| |
|
| | db.delete(result)
|
| | db.commit()
|
| |
|
| |
|
| | if document_type_id == MISCELLANEOUS_DOCUMENT_TYPE_ID:
|
| | subtype_data = (
|
| | db.query(DocumentSubType)
|
| | .filter(DocumentSubType.documentSubTypeId == document_subtype_id)
|
| | .first()
|
| | )
|
| | db.delete(subtype_data)
|
| | db.commit()
|
| |
|
| |
|
| | return ResponseSchema(
|
| | status=200,
|
| | success=True,
|
| | message="User Document deleted successfully",
|
| | )
|
| | except HTTPException as e:
|
| |
|
| | raise e
|
| | except Exception as e:
|
| |
|
| | raise HTTPException(status_code=500, detail=str(e))
|
| |
|
| | async def get_user_info(db, user):
|
| | """
|
| | Retrieves user information including application details and user profile.
|
| |
|
| | Parameters:
|
| | - db: SQLAlchemy database session
|
| | - user: Dictionary containing user information with 'user_id' key
|
| |
|
| | Returns:
|
| | - ResponseSchema: Response object containing user information
|
| | """
|
| | try:
|
| | result = (
|
| | db.query(
|
| | Application,
|
| | UserProfile,
|
| | )
|
| | .filter_by(userId=user["user_id"])
|
| | .outerjoin(
|
| | UserProfile, Application.applicationId == UserProfile.applicationId
|
| | )
|
| | .all()
|
| | )
|
| |
|
| | if not result:
|
| | raise HTTPException(
|
| | status_code=status.HTTP_404_NOT_FOUND,
|
| | detail="Applications not found",
|
| | )
|
| |
|
| | data = []
|
| | for app in result:
|
| | application = app[0]
|
| | app = app[1:]
|
| | user_profile_details = None
|
| | for index, value in enumerate(app):
|
| | if value is None:
|
| | break
|
| | if index == 0:
|
| | user_profile_details = value
|
| |
|
| | data_dict = {
|
| | "user_id": application.userId,
|
| | "application_id": application.applicationId,
|
| | "user_profile_id": user_profile_details.userProfileId
|
| | if user_profile_details
|
| | else None,
|
| | "first_name": user_profile_details.firstName
|
| | if user_profile_details
|
| | else None,
|
| | "middle_name": user_profile_details.middleName
|
| | if user_profile_details
|
| | else None,
|
| | "last_name": user_profile_details.lastName
|
| | if user_profile_details
|
| | else None,
|
| | "property_address": application.propertyAddress,
|
| | }
|
| | data.append(data_dict)
|
| |
|
| |
|
| | return ResponseSchema(
|
| | status=200,
|
| | success=True,
|
| | message="User info fetched successfully",
|
| | data=data,
|
| | )
|
| | except HTTPException as e:
|
| |
|
| | raise e
|
| | except Exception as e:
|
| |
|
| | raise HTTPException(status_code=500, detail=str(e))
|
| |
|