# gemini-agent/src/application/chat_service.py
# likhon saheikh
# Deploy full Gemini Agentic Platform
# ad8ba8a
from typing import AsyncGenerator, Dict, Any
from src.domain.models import Session, Message, MessageRole
from src.domain.interfaces import AgentRepository, SessionRepository
from src.application.session_manager import SessionManager
class ChatService:
    """Run one chat turn between a stored session and an agent.

    A turn loads the session through the session manager, appends the user's
    message, relays the agent's streamed events to the caller, and finally
    appends the assistant's reply assembled from those events.
    """

    def __init__(self, session_manager: SessionManager, agent: AgentRepository):
        """Store the collaborators used by ``chat``.

        Args:
            session_manager: Used to look up the session; its ``repository``
                attribute is used to persist session updates.
            agent: Object whose ``chat(session, message)`` method is iterated
                asynchronously to obtain response events.
        """
        self.session_manager = session_manager
        self.agent = agent

    async def chat(
        self, session_id: str, message_content: str
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """Stream the agent's events for one user message.

        Args:
            session_id: Identifier passed to
                ``session_manager.get_session_details``.
            message_content: Text of the user's message.

        Yields:
            A single ``{"event": "error", "data": "Session not found"}`` dict
            when the lookup returns a falsy session; otherwise every event
            produced by the agent, unmodified and in order.
        """
        session = await self.session_manager.get_session_details(session_id)
        if not session:
            yield {"event": "error", "data": "Session not found"}
            return

        repository = self.session_manager.repository

        # Record the user's message before calling the agent, so the session
        # handed to the agent (and the stored copy) already contains it.
        user_message = Message(role=MessageRole.USER, content=message_content)
        session.messages.append(user_message)
        await repository.update_session(session)

        # Forward every event to the caller; only "message" events contribute
        # text to the stored reply. Chunks are collected in a list and joined
        # once instead of growing a string with ``+=`` on every chunk.
        chunks = []
        async for event in self.agent.chat(session, user_message):
            if event["event"] == "message":
                chunks.append(event["data"])
            yield event

        # Persist the assistant's reply only when some text was streamed.
        full_response = "".join(chunks)
        if full_response:
            assistant_message = Message(
                role=MessageRole.ASSISTANT, content=full_response
            )
            session.messages.append(assistant_message)
            await repository.update_session(session)