gemini-agent / src /application /session_manager.py
likhon saheikh
Deploy full Gemini Agentic Platform
ad8ba8a
import uuid
import time
from typing import List, Dict, Optional
from src.domain.models import Session, SessionStatus, Message
from src.domain.interfaces import SessionRepository
class InMemorySessionRepository(SessionRepository):
    """Session repository backed by a plain in-process dictionary.

    Sessions are keyed by their ``session_id``. Nothing is written outside
    of this object's ``_sessions`` mapping.
    """

    def __init__(self):
        # Maps session_id -> Session.
        self._sessions: Dict[str, Session] = {}

    async def create_session(self) -> Session:
        """Create, store, and return a new active session.

        The session gets a freshly generated UUID4 string as its id and the
        title "New Chat".
        """
        new_id = str(uuid.uuid4())
        created = Session(
            session_id=new_id,
            title="New Chat",
            status=SessionStatus.ACTIVE,
        )
        self._sessions[new_id] = created
        return created

    async def get_session(self, session_id: str) -> Optional[Session]:
        """Return the stored session for ``session_id``, or ``None`` if absent."""
        found = self._sessions.get(session_id, None)
        return found

    async def list_sessions(self) -> List[Session]:
        """Return a new list containing every stored session."""
        return [*self._sessions.values()]

    async def delete_session(self, session_id: str) -> None:
        """Remove the session with ``session_id``; unknown ids are ignored."""
        self._sessions.pop(session_id, None)

    async def update_session(self, session: Session) -> None:
        """Store ``session`` under its own ``session_id``, replacing any entry."""
        key = session.session_id
        self._sessions[key] = session
class SessionManager:
    """Application-level entry point that delegates to a session repository."""

    def __init__(self, repository: SessionRepository):
        # Every operation below is forwarded to this repository.
        self.repository = repository

    async def create_new_session(self) -> Session:
        """Ask the repository to create a session and return it."""
        created = await self.repository.create_session()
        return created

    async def get_session_details(self, session_id: str) -> Optional[Session]:
        """Return whatever the repository's ``get_session`` yields for the id."""
        details = await self.repository.get_session(session_id)
        return details

    async def list_all_sessions(self) -> List[Session]:
        """Return the repository's list of sessions."""
        sessions = await self.repository.list_sessions()
        return sessions

    async def delete_session(self, session_id: str) -> None:
        """Ask the repository to delete the session with ``session_id``."""
        await self.repository.delete_session(session_id)

    async def stop_session(self, session_id: str) -> None:
        """Mark a session as stopped and hand it back to the repository.

        Does nothing when the repository lookup returns a falsy value.
        """
        target = await self.repository.get_session(session_id)
        if not target:
            return
        target.status = SessionStatus.STOPPED
        await self.repository.update_session(target)
# Module-level shared instances, created once when this module is imported
# (kept as singletons for simplicity in this demo). The repository holds
# sessions in an in-memory dict, and the manager wraps that same repository.
session_repository = InMemorySessionRepository()
session_manager = SessionManager(session_repository)