gemini-agent / src /domain /interfaces.py
likhon saheikh
Deploy full Gemini Agentic Platform
ad8ba8a
from abc import ABC, abstractmethod
from typing import List, Dict, Any, AsyncGenerator
from src.domain.models import Session, Message, SessionStatus
class AgentRepository(ABC):
@abstractmethod
async def chat(self, session: Session, message: Message) -> AsyncGenerator[Dict[str, Any], None]:
"""Send a message to the agent and get a stream of events"""
pass
class SandboxService(ABC):
@abstractmethod
async def execute_shell(self, session_id: str, command: str) -> str:
pass
@abstractmethod
async def read_file(self, session_id: str, path: str) -> str:
pass
@abstractmethod
async def start_session(self, session_id: str) -> None:
pass
@abstractmethod
async def stop_session(self, session_id: str) -> None:
pass
class SessionRepository(ABC):
@abstractmethod
async def create_session(self) -> Session:
pass
@abstractmethod
async def get_session(self, session_id: str) -> Session:
pass
@abstractmethod
async def list_sessions(self) -> List[Session]:
pass
@abstractmethod
async def delete_session(self, session_id: str) -> None:
pass
@abstractmethod
async def update_session(self, session: Session) -> None:
pass