| |
|
| | from sqlalchemy.exc import SQLAlchemyError
|
| | from openai import OpenAI
|
| | import json
|
| | from sqlalchemy import text
|
| |
|
| |
|
| | from app.config.env import env
|
| | from app.utils.app_logger.logger import logger
|
| | from ...utils.database import get_db_service
|
| |
|
| |
|
| | client = OpenAI(api_key=env.OPENAI_KEY)
|
| |
|
| |
|
| | def process_response(response):
|
| | """Extracts data from the assistant's response and handles potential errors."""
|
| | try:
|
| | logger.info(f"ai_response =======> {response}")
|
| | return json.loads(response)
|
| | except json.JSONDecodeError as e:
|
| | logger.exception(f"Error: Invalid JSON data received: {e}")
|
| | return None
|
| |
|
| |
|
| | async def call_document_agent(file_path, user_profile_details, document_type_id, document_subtype_id):
|
| | message_file = client.files.create(
|
| | file=open(file_path, "rb"), purpose="assistants")
|
| | user_input = f'''This document has a documentTypeId = {document_type_id} and documentSubtypeId = {document_subtype_id}. Please verify it. {str(user_profile_details)}'''
|
| |
|
| | thread = client.beta.threads.create(
|
| | messages=[
|
| | {
|
| | "role": "user",
|
| | "content": user_input,
|
| | "attachments": [{"file_id": message_file.id, "tools": [{"type": "file_search"}]}],
|
| | }
|
| | ]
|
| | )
|
| | run = client.beta.threads.runs.create_and_poll(
|
| | thread_id=thread.id, assistant_id=env.DOCUMENT_VERIFICATION_ASSISTANT_ID)
|
| | messages = list(client.beta.threads.messages.list(
|
| | thread_id=thread.id, run_id=run.id))
|
| | message_content = messages[0].content[0].text.value
|
| | return process_response(message_content)
|
| |
|
| |
|
| | async def verify_documents(doc_dict):
|
| | """This is a background job that will call the document assistant to verify the document and update the db"""
|
| | try:
|
| | logger.info("Document Verification Started")
|
| | logger.info(doc_dict)
|
| |
|
| |
|
| | db = get_db_service()
|
| |
|
| |
|
| | user_profile_id = doc_dict.get("user_profile_id")
|
| | user_document_id = doc_dict.get("user_document_id")
|
| | document_type_id = doc_dict.get("document_type_id")
|
| | document_subtype_id = doc_dict.get("document_subtype_id")
|
| | file_path = doc_dict.get("file_path")
|
| |
|
| | if not document_type_id:
|
| | document_type_id = 7
|
| |
|
| | db_query = f"""SELECT * FROM "UserProfiles" WHERE "userProfileId" = {user_profile_id}"""
|
| |
|
| | logger.info(f"db_query: {db_query}")
|
| |
|
| | result = db.execute(
|
| | text(db_query), {"user_profile_id": user_profile_id}).fetchall()
|
| | user_profile_details = result[0]._mapping
|
| |
|
| | logger.info(f"db_query_result: {result}")
|
| |
|
| |
|
| | if user_profile_details is not None:
|
| | processed_ai_response = await call_document_agent(file_path, user_profile_details, document_type_id, document_subtype_id)
|
| | logger.info(f"processed_ai_response: {processed_ai_response}")
|
| | verification_status = processed_ai_response.get(
|
| | "verificationStatus", False)
|
| | document_details = processed_ai_response.get(
|
| | "documentDetails", None)
|
| | remarks = processed_ai_response.get("remarks", "")
|
| |
|
| | new_document_status = 2 if verification_status == True else 3
|
| |
|
| | if document_type_id == 7:
|
| | delete_subtype = False
|
| | new_document_type_id = processed_ai_response.get(
|
| | "documentTypeId", None)
|
| | new_document_subtype_id = processed_ai_response.get(
|
| | "documentSubTypeId", None)
|
| | if new_document_subtype_id is None:
|
| | new_document_subtype_id = document_subtype_id
|
| | else:
|
| | delete_subtype = True
|
| |
|
| | db_query = text(
|
| | """UPDATE "UserDocuments" SET "documentStatusId"=:new_document_status,"documentTypeId"=:new_document_type_id, "documentSubTypeId"=:new_document_subtype_id, "remarks"=:remarks, "documentDetails"=:document_details WHERE "userDocumentId"=:user_document_id""")
|
| | logger.info(f"update_query: {db_query}")
|
| | db.execute(db_query, {
|
| | "remarks": remarks, "new_document_status": new_document_status, "user_document_id": user_document_id, "new_document_subtype_id": new_document_subtype_id, "new_document_type_id": new_document_type_id, "document_details": document_details})
|
| | if (delete_subtype):
|
| | db_query = text(
|
| | """DELETE FROM "DocumentSubTypes" WHERE "documentSubTypeId"=:document_subtype_id"""
|
| | )
|
| | db.execute(
|
| | db_query, {"document_subtype_id": document_subtype_id})
|
| |
|
| | else:
|
| | update_query = text(
|
| | """UPDATE "UserDocuments" SET "documentStatusId"=:new_document_status, "remarks"=:remarks, "documentDetails"=:document_details WHERE "userDocumentId"=:user_document_id"""
|
| | )
|
| | logger.info(f"update_query: {update_query}")
|
| | db.execute(update_query, {
|
| | "remarks": remarks,
|
| | "new_document_status": new_document_status,
|
| | "document_details": document_details,
|
| | "user_document_id": user_document_id
|
| | })
|
| |
|
| | db.commit()
|
| |
|
| | logger.info("Document Verification Ended")
|
| | return
|
| | except SQLAlchemyError as e:
|
| | logger.exception(f"Document Verification error SQL: {e}")
|
| | db.rollback()
|
| | return
|
| | except Exception as e:
|
| | logger.exception(f"Document Verification update error: {e}")
|
| | db.rollback()
|
| | return
|
| |
|