ans123's picture
Initial upload from Colab
ef1ad9e verified
# --- Library Imports --
from sqlalchemy.exc import SQLAlchemyError
from openai import OpenAI
import json
from sqlalchemy import text
# --- User Imports ---
from app.config.env import env
from app.utils.app_logger.logger import logger
from ...utils.database import get_db_service
# --- Constants ---
# Module-level OpenAI client, constructed once at import time with the API key
# read from the project's env settings; the functions below use this instance.
client = OpenAI(api_key=env.OPENAI_KEY)
def process_response(response):
    """Parse the assistant's raw reply as JSON.

    Args:
        response: Raw message text returned by the assistant.

    Returns:
        The decoded JSON value, or ``None`` when ``response`` is not valid
        JSON or is not text at all (e.g. ``None``).
    """
    logger.info(f"ai_response =======> {response}")
    try:
        return json.loads(response)
    except (json.JSONDecodeError, TypeError) as e:
        # json.loads raises TypeError (not JSONDecodeError) for non-text input
        # such as None; treat it the same way as malformed JSON.
        logger.exception(f"Error: Invalid JSON data received: {e}")
        return None
async def call_document_agent(file_path, user_profile_details, document_type_id, document_subtype_id):
    """Upload a document to OpenAI and ask the verification assistant to check it.

    Args:
        file_path: Path of the document file to upload.
        user_profile_details: Profile data; its ``str()`` form is appended to
            the prompt sent to the assistant.
        document_type_id: Document type id quoted in the prompt.
        document_subtype_id: Document subtype id quoted in the prompt.

    Returns:
        The assistant's reply as parsed by ``process_response``, or ``None``
        when the run produced no message or the reply is not valid JSON.
    """
    # NOTE(review): `client` is the synchronous OpenAI client and nothing here
    # is awaited, so these calls block the event loop while they run.

    # Close the local file handle as soon as the upload call returns.
    with open(file_path, "rb") as document_file:
        message_file = client.files.create(
            file=document_file, purpose="assistants")

    user_input = f'''This document has a documentTypeId = {document_type_id} and documentSubtypeId = {document_subtype_id}. Please verify it. {str(user_profile_details)}'''
    thread = client.beta.threads.create(
        messages=[
            {
                "role": "user",
                "content": user_input,
                "attachments": [{"file_id": message_file.id, "tools": [{"type": "file_search"}]}],
            }
        ]
    )
    run = client.beta.threads.runs.create_and_poll(
        thread_id=thread.id, assistant_id=env.DOCUMENT_VERIFICATION_ASSISTANT_ID)
    messages = list(client.beta.threads.messages.list(
        thread_id=thread.id, run_id=run.id))

    # A run that did not complete leaves no message behind; report that
    # instead of failing with an IndexError on messages[0].
    if not messages or not messages[0].content:
        logger.error(
            f"Document assistant returned no message (run status: {run.status})")
        return None

    message_content = messages[0].content[0].text.value
    return process_response(message_content)
# Document type id used when the job payload carries none; documents with this
# type also take their type/subtype from the assistant's answer.
_DEFAULT_DOCUMENT_TYPE_ID = 7


async def verify_documents(doc_dict):
    """Background job: verify a document with the assistant and update the DB.

    Args:
        doc_dict: Job payload read with ``.get`` for the keys
            ``user_profile_id``, ``user_document_id``, ``document_type_id``,
            ``document_subtype_id`` and ``file_path``.

    Returns:
        None. Every failure is logged and the DB transaction rolled back;
        nothing is raised to the caller.
    """
    # Bound before the try so the except handlers can tell whether a DB
    # session was ever obtained.
    db = None
    try:
        logger.info("Document Verification Started")
        logger.info(doc_dict)
        # Initiate DB
        db = get_db_service()

        # Extract data from doc_dict
        user_profile_id = doc_dict.get("user_profile_id")
        user_document_id = doc_dict.get("user_document_id")
        document_type_id = doc_dict.get(
            "document_type_id") or _DEFAULT_DOCUMENT_TYPE_ID
        document_subtype_id = doc_dict.get("document_subtype_id")
        file_path = doc_dict.get("file_path")

        # Bind the id as a parameter instead of formatting it into the SQL.
        db_query = text(
            """SELECT * FROM "UserProfiles" WHERE "userProfileId" = :user_profile_id""")
        logger.info(f"db_query: {db_query}")
        result = db.execute(
            db_query, {"user_profile_id": user_profile_id}).fetchall()
        logger.info(f"db_query_result: {result}")
        if not result:
            logger.error(
                f"Document Verification aborted: no user profile found for userProfileId={user_profile_id}")
            db.rollback()
            return
        user_profile_details = result[0]._mapping

        # Verify document via AI and store the updated document status in DB
        processed_ai_response = await call_document_agent(file_path, user_profile_details, document_type_id, document_subtype_id)
        logger.info(f"processed_ai_response: {processed_ai_response}")
        if not isinstance(processed_ai_response, dict):
            # None (unparseable reply) or a non-object JSON value: there is
            # nothing to read the verification fields from.
            logger.error(
                "Document Verification aborted: assistant response is not a JSON object")
            db.rollback()
            return

        verification_status = processed_ai_response.get(
            "verificationStatus", False)
        document_details = processed_ai_response.get("documentDetails", None)
        remarks = processed_ai_response.get("remarks", "")

        # 2 = UPLOADED, 3 = PENDING VERIFICATION, 4 = VERIFICATION FAILED
        # NOTE(review): a failed verification maps to 3 here, never 4 —
        # confirm this is intended.
        new_document_status = 2 if verification_status == True else 3

        if document_type_id == _DEFAULT_DOCUMENT_TYPE_ID:
            new_document_type_id = processed_ai_response.get(
                "documentTypeId", None)
            new_document_subtype_id = processed_ai_response.get(
                "documentSubTypeId", None)
            # The old subtype row is removed only when the assistant supplied
            # a subtype; otherwise the existing subtype is kept.
            # NOTE(review): the row is also deleted when the assistant returns
            # the same subtype id as before — confirm this is intended.
            delete_subtype = new_document_subtype_id is not None
            if not delete_subtype:
                new_document_subtype_id = document_subtype_id

            db_query = text(
                """UPDATE "UserDocuments" SET "documentStatusId"=:new_document_status,"documentTypeId"=:new_document_type_id, "documentSubTypeId"=:new_document_subtype_id, "remarks"=:remarks, "documentDetails"=:document_details WHERE "userDocumentId"=:user_document_id""")
            logger.info(f"update_query: {db_query}")
            db.execute(db_query, {
                "remarks": remarks,
                "new_document_status": new_document_status,
                "user_document_id": user_document_id,
                "new_document_subtype_id": new_document_subtype_id,
                "new_document_type_id": new_document_type_id,
                "document_details": document_details,
            })
            if delete_subtype:
                db_query = text(
                    """DELETE FROM "DocumentSubTypes" WHERE "documentSubTypeId"=:document_subtype_id"""
                )
                db.execute(
                    db_query, {"document_subtype_id": document_subtype_id})
        else:
            update_query = text(
                """UPDATE "UserDocuments" SET "documentStatusId"=:new_document_status, "remarks"=:remarks, "documentDetails"=:document_details WHERE "userDocumentId"=:user_document_id"""
            )
            logger.info(f"update_query: {update_query}")
            db.execute(update_query, {
                "remarks": remarks,
                "new_document_status": new_document_status,
                "document_details": document_details,
                "user_document_id": user_document_id,
            })

        db.commit()
        logger.info("Document Verification Ended")
        return
    except SQLAlchemyError as e:
        logger.exception(f"Document Verification error SQL: {e}")
        if db is not None:
            db.rollback()
        return
    except Exception as e:
        logger.exception(f"Document Verification update error: {e}")
        # db is still None when get_db_service() itself failed.
        if db is not None:
            db.rollback()
        return