openai-openapi-template / app /service /chat_service.py
cevheri's picture
chore: api layer refactoring
8f4cefb
import datetime
from typing import Any, List
from app.agent.chat_agent_scheme import UserChatAgentRequest
from app.repository.chat_repository import ChatRepository
from app.schema.chat_schema import ChatCompletionRequest, ChatCompletionResponse, ChatMessageResponse, ChatMessageRequest
from app.mapper.chat_mapper import ChatMapper
from app.mapper.conversation_mapper import ConversationMapper
import uuid
from loguru import logger
from app.schema.conversation_schema import ConversationItemResponse, ConversationResponse
from app.service.chat_validation import ChatValidation
from app.agent.chat_agent_client import ChatAgentClient
class ChatService:
def __init__(self):
self.chat_repository = ChatRepository()
self.chat_mapper = ChatMapper()
self.conversation_mapper = ConversationMapper()
self.chat_validation = ChatValidation()
self.chat_agent_client = ChatAgentClient()
async def find(self, query: dict, page: int, limit: int, sort: dict, project: dict = None) -> List[ChatCompletionResponse]:
logger.debug(f"BEGIN SERVICE: find for query: {query}, page: {page}, limit: {limit}, sort: {sort}, project: {project}")
entities = await self.chat_repository.find(query, page, limit, sort, project)
return self.chat_mapper.to_schema_list(entities)
async def find_by_id(self, completion_id: str, project: dict = None) -> ChatCompletionResponse:
entity = await self.chat_repository.find_by_id(completion_id, project)
return self.chat_mapper.to_schema(entity) if entity else None
async def find_messages(self, completion_id: str) -> List[ChatMessageResponse]:
logger.debug(f"BEGIN SERVICE: find_messages for completion_id: {completion_id}")
messages = await self.chat_repository.find_messages(completion_id)
logger.debug(f"END SERVICE: find_messages for completion_id: {completion_id}, messages: {len(messages)}")
messages_response = [
ChatMessageResponse(
message_id=message.message_id,
role=message.role,
content=message.content,
created_date=message.created_date,
figure=(message.figure),
)
for message in messages
]
return messages_response
# conversation service
async def find_all_conversations(self, username: str) -> ConversationResponse:
"""Find all conversations for a given username."""
query = {"created_by": username}
sort = {"last_updated_date": -1} # Sort by last updated date in descending order
entities = await self.chat_repository.find(query, page=1, limit=100, sort=sort)
result = self.conversation_mapper.to_schema_list(entities)
return ConversationResponse(items=result, total=len(result), limit=100, offset=0)
# conversation service
async def find_conversation_by_id(self, completion_id: str) -> ConversationItemResponse | None:
"""Find a conversation by its completion ID."""
logger.debug(f"BEGIN SERVICE: find_conversation_by_id for completion_id: {completion_id}")
projection = {"messages": 0, "_id": 0}
entity = await self.chat_repository.find_by_id(completion_id, projection=projection)
if entity:
conversation_item = self.conversation_mapper.to_schema(entity)
logger.debug(f"END SERVICE: find_conversation_by_id for completion_id: {completion_id}, entity: {conversation_item}")
return conversation_item
return None
async def find_plot_by_message(self, completion_id: str, message_id: str) -> dict[str, Any]:
logger.debug(f"BEGIN SERVICE: find_plot_by_message for completion_id: {completion_id}, message_id: {message_id}")
figure = await self.chat_repository.find_plot_by_message(completion_id, message_id)
if figure:
result = figure
else:
result = None
logger.warning(f"END SERVICE: no figure found for completion_id: {completion_id}, message_id: {message_id}")
logger.debug(f"END SERVICE: find_plot_by_message for completion_id: {completion_id}, message_id: {message_id} with figure")
return result
async def _save_chat_completion(self, chat_schema: ChatCompletionRequest, username: str) -> ChatCompletionResponse:
"""
Save a chat completion to the database.
"""
logger.debug("BEGIN SERVICE: Saving Chat Completion")
try:
# Convert request to model
chat_model = self.chat_mapper.to_model(chat_schema)
chat_model.last_updated_by = username
chat_model.last_updated_date = datetime.datetime.now()
if not chat_model.completion_id:
# generate a new chat completion_id this is a new chat starting
logger.info("Generating new chat completion_id for new chat starting")
chat_model.completion_id = str(uuid.uuid4())
# generate message_id and created_date for latest user message
last_user_message_model = chat_model.messages[-1]
last_user_message_model.message_id = str(uuid.uuid4())
last_user_message_model.created_date = datetime.datetime.now()
logger.trace(f"last_user_message_model: {last_user_message_model}")
logger.trace(f"finding by id. entity: {chat_model.completion_id}")
current_db_entity = await self.chat_repository.find_by_id(chat_model.completion_id)
logger.trace(f"found by id. Current entity: {current_db_entity.completion_id if current_db_entity else 'None'}")
# if chat completion is not found, create a new one
if not current_db_entity:
logger.info("Create new chat completion with new user request message")
chat_model.created_by = username
chat_model.created_date = datetime.datetime.now()
chat_model.last_updated_by = username
chat_model.last_updated_date = datetime.datetime.now()
# title can generate with LLM from user request message.content
chat_model.title = last_user_message_model.content[:20]
final_entity = await self.chat_repository.create(chat_model)
else:
logger.info("Update existing chat completion with new user request message")
logger.trace(f"before update. current db entity messages count: {len(current_db_entity.messages)}")
current_db_entity.messages.append(last_user_message_model)
logger.trace(f"after update. current db entity messages count: {len(current_db_entity.messages)}")
current_db_entity.last_updated_date = datetime.datetime.now()
final_entity = await self.chat_repository.update(current_db_entity)
logger.trace(f"after update. final entity messages count: {len(final_entity.messages)}")
# Convert model to response
result = self.chat_mapper.to_schema(final_entity, convert_last_message=True)
logger.debug("END SERVICE")
return result
except Exception as e:
logger.error(f"Error saving chat completion: {e}")
raise
async def chat_agent_client_process(self, user_chat_completion: ChatCompletionRequest, username: str):
logger.debug(f"BEGIN SERVICE: Agentic Chat AI process. username: {username}")
last_user_message = user_chat_completion.messages[-1].content
user_chat_agent_request = UserChatAgentRequest(message=last_user_message)
result = self.chat_agent_client.process(user_chat_agent_request)
logger.debug("END SERVICE: Agentic Chat AI process")
return result
async def handle_chat_completion(self, user_chat_completion: ChatCompletionRequest, username: str) -> ChatCompletionResponse:
last_user_message = user_chat_completion
logger.debug(f"BEGIN SERVICE: last_user_message: {last_user_message}, username: {username}")
# validate user message
self.chat_validation.validate_request(user_chat_completion)
# save user message to database
logger.info("Saving user message to database")
repo_user_message = await self._save_chat_completion(user_chat_completion, username)
logger.info(f"Saved user message to database with completion_id: {repo_user_message.completion_id}")
# region agentic-ai process start #########################################################
try:
logger.info("Agentic Chat AI process started")
agent_result = await self.chat_agent_client_process(user_chat_completion, username)
assistant_message = ChatMessageRequest(role="assistant", content=agent_result.message)
assistant_chat_completion = user_chat_completion
assistant_chat_completion.messages = [assistant_message] # replace user messages with assistant message
logger.info(f"Agentic Chat AI process completed. Part of Assistant Message...: {assistant_message.content[:50]}...")
except Exception as e:
logger.error(f"Error agentic-ai process: {e}")
raise
# endregion agentic-ai process start ######################################################
# validate agent response
self.chat_validation.validate_response(agent_result)
# save assistant message to database
repo_assistant_message = await self._save_chat_completion(assistant_chat_completion, username)
# generate api response with user, agent, db etc... TBD
result = repo_assistant_message
logger.debug("END SERVICE")
return result