# --- Library Imports ---
from sqlalchemy.exc import SQLAlchemyError
from openai import OpenAI
import json
from sqlalchemy import text

# --- User Imports ---
from app.config.env import env
from app.utils.app_logger.logger import logger
from ...utils.database import get_db_service

# --- Constants ---
client = OpenAI(api_key=env.OPENAI_KEY)

# Document type id used when the caller supplies none. Documents carrying this
# type id also take their final type/subtype from the assistant's response.
# NOTE(review): presumably this is the "unknown / to be classified" type -
# confirm against the document-types table.
_DEFAULT_DOCUMENT_TYPE_ID = 7


def process_response(response):
    """Parse the assistant's raw reply as JSON.

    Args:
        response: The text returned by the assistant, expected to be a JSON
            document.

    Returns:
        The decoded JSON value, or ``None`` when the reply is not valid JSON
        (or is not a string at all). The failure is logged, not raised.
    """
    try:
        logger.info(f"ai_response =======> {response}")
        return json.loads(response)
    except (json.JSONDecodeError, TypeError) as e:
        # TypeError covers a missing (None) or non-text reply, which
        # json.loads rejects before it even tries to decode.
        logger.exception(f"Error: Invalid JSON data received: {e}")
        return None


async def call_document_agent(file_path, user_profile_details, document_type_id, document_subtype_id):
    """Upload a document to the verification assistant and return its verdict.

    The file is uploaded with the ``assistants`` purpose, attached to a new
    thread together with the document type ids and the user's profile, and the
    configured assistant is run to completion.

    NOTE(review): the OpenAI client calls used here are synchronous, so this
    coroutine blocks the event loop while the run is polled.

    Args:
        file_path: Path of the document on local disk.
        user_profile_details: Profile data included verbatim (via ``str``) in
            the prompt.
        document_type_id: Type id the document was uploaded under.
        document_subtype_id: Subtype id the document was uploaded under.

    Returns:
        The decoded JSON reply of the assistant, or ``None`` when the run
        produced no message or the reply is not valid JSON.
    """
    # Context manager so the local file handle is closed once the upload is
    # done, even if the upload raises.
    with open(file_path, "rb") as document_file:
        message_file = client.files.create(
            file=document_file, purpose="assistants")

    user_input = (
        f"This document has a documentTypeId = {document_type_id} "
        f"and documentSubtypeId = {document_subtype_id}. "
        f"Please verify it. {str(user_profile_details)}"
    )

    thread = client.beta.threads.create(
        messages=[
            {
                "role": "user",
                "content": user_input,
                "attachments": [
                    {"file_id": message_file.id,
                     "tools": [{"type": "file_search"}]},
                ],
            }
        ]
    )

    run = client.beta.threads.runs.create_and_poll(
        thread_id=thread.id,
        assistant_id=env.DOCUMENT_VERIFICATION_ASSISTANT_ID)

    messages = list(client.beta.threads.messages.list(
        thread_id=thread.id, run_id=run.id))

    # A run that did not complete yields no assistant message; report that
    # as "no result" instead of failing with an IndexError.
    if not messages or not messages[0].content:
        logger.error(
            f"Document assistant returned no message "
            f"(run status: {getattr(run, 'status', None)})")
        return None

    message_content = messages[0].content[0].text.value
    return process_response(message_content)


async def verify_documents(doc_dict):
    """Background job: verify one uploaded document via the assistant and
    store the outcome in the database.

    Args:
        doc_dict: Mapping read with ``.get`` for the keys ``user_profile_id``,
            ``user_document_id``, ``document_type_id``,
            ``document_subtype_id`` and ``file_path``. A missing or falsy
            ``document_type_id`` is replaced by the default type id.

    Returns:
        ``None``. Every error is logged and the transaction rolled back;
        nothing is raised to the caller.
    """
    # Bound before the try so the handlers can tell whether a session exists
    # (get_db_service itself may raise).
    db = None
    try:
        logger.info("Document Verification Started")
        logger.info(doc_dict)

        # Initiate DB
        db = get_db_service()

        # Extract data from doc_dict
        user_profile_id = doc_dict.get("user_profile_id")
        user_document_id = doc_dict.get("user_document_id")
        document_type_id = (
            doc_dict.get("document_type_id") or _DEFAULT_DOCUMENT_TYPE_ID)
        document_subtype_id = doc_dict.get("document_subtype_id")
        file_path = doc_dict.get("file_path")

        # Bind the id as a parameter; never interpolate it into the SQL text.
        profile_query = text(
            """SELECT * FROM "UserProfiles" WHERE "userProfileId" = :user_profile_id""")
        logger.info(f"db_query: {profile_query}")
        result = db.execute(
            profile_query, {"user_profile_id": user_profile_id}).fetchall()
        logger.info(f"db_query_result: {result}")

        if not result:
            logger.error(
                f"Document Verification aborted: no user profile found for "
                f"userProfileId={user_profile_id}")
            db.rollback()
            return
        user_profile_details = result[0]._mapping

        # Verify document via AI and store the updated document status in DB
        processed_ai_response = await call_document_agent(
            file_path, user_profile_details, document_type_id, document_subtype_id)
        logger.info(f"processed_ai_response: {processed_ai_response}")

        if not isinstance(processed_ai_response, dict):
            # No reply, invalid JSON, or JSON that is not an object: there is
            # nothing to store, so leave the document row untouched.
            logger.error(
                "Document Verification aborted: assistant response is not a "
                "JSON object")
            db.rollback()
            return

        verification_status = processed_ai_response.get(
            "verificationStatus", False)
        # NOTE(review): if documentDetails is a JSON object it is passed to
        # the driver as a dict - confirm the column/driver accepts that.
        document_details = processed_ai_response.get("documentDetails", None)
        remarks = processed_ai_response.get("remarks", "")

        # 2 = UPLOADED, 3 = PENDING VERIFICATION, 4 = VERIFICATION FAILED
        # Equality (not truthiness) on purpose: a truthy string such as
        # "false" must not count as verified.
        new_document_status = 2 if verification_status == True else 3  # noqa: E712

        if document_type_id == _DEFAULT_DOCUMENT_TYPE_ID:
            # NOTE(review): documentTypeId is written as NULL when the
            # assistant returns no documentTypeId - confirm this is intended.
            new_document_type_id = processed_ai_response.get(
                "documentTypeId", None)
            new_document_subtype_id = processed_ai_response.get(
                "documentSubTypeId", None)

            # The original subtype row is removed only when the assistant
            # supplied a replacement subtype; otherwise it stays in use.
            delete_subtype = new_document_subtype_id is not None
            if not delete_subtype:
                new_document_subtype_id = document_subtype_id

            db_query = text(
                """UPDATE "UserDocuments" SET "documentStatusId"=:new_document_status,"documentTypeId"=:new_document_type_id, "documentSubTypeId"=:new_document_subtype_id, "remarks"=:remarks, "documentDetails"=:document_details WHERE "userDocumentId"=:user_document_id""")
            logger.info(f"update_query: {db_query}")
            db.execute(db_query, {
                "remarks": remarks,
                "new_document_status": new_document_status,
                "user_document_id": user_document_id,
                "new_document_subtype_id": new_document_subtype_id,
                "new_document_type_id": new_document_type_id,
                "document_details": document_details,
            })

            if delete_subtype:
                db_query = text(
                    """DELETE FROM "DocumentSubTypes" WHERE "documentSubTypeId"=:document_subtype_id""")
                db.execute(
                    db_query, {"document_subtype_id": document_subtype_id})
        else:
            update_query = text(
                """UPDATE "UserDocuments" SET "documentStatusId"=:new_document_status, "remarks"=:remarks, "documentDetails"=:document_details WHERE "userDocumentId"=:user_document_id""")
            logger.info(f"update_query: {update_query}")
            db.execute(update_query, {
                "remarks": remarks,
                "new_document_status": new_document_status,
                "document_details": document_details,
                "user_document_id": user_document_id,
            })

        db.commit()
        logger.info("Document Verification Ended")
        return
    except SQLAlchemyError as e:
        logger.exception(f"Document Verification error SQL: {e}")
        if db is not None:
            db.rollback()
        return
    except Exception as e:
        # Top-level boundary of a background job: log and swallow so the
        # worker keeps running.
        logger.exception(f"Document Verification update error: {e}")
        if db is not None:
            db.rollback()
        return