cps-api-tx / services /txagent_service.py
Ali2206's picture
Initial CPS-API deployment with TxAgent integration
11102bd
import aiohttp
import asyncio
import logging
from typing import Optional, Dict, Any, List
from core.txagent_config import txagent_config
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class TxAgentService:
    """Async HTTP client for the TxAgent service with local-to-cloud fallback.

    All requests go through one ``aiohttp.ClientSession`` that is created on
    first use. When a request fails while the configuration reports
    ``"local"`` mode, ``config.mode`` is set to ``"cloud"`` and the request is
    retried exactly once.

    NOTE: the mode switch is applied to the shared ``txagent_config`` object
    and is never reverted by this class, so its ``mode`` attribute stays
    ``"cloud"`` afterwards.

    NOTE(review): HTTP status codes are not inspected anywhere in this class;
    the body of an error response is decoded and returned like a successful
    one. Confirm whether callers rely on that before adding a status check.
    """

    # HTTP verbs that _make_request knows how to send.
    _SUPPORTED_METHODS = ("GET", "POST")

    def __init__(self):
        self.config = txagent_config
        # Created on first use by _get_session().
        self.session = None

    async def _get_session(self):
        """Return the shared HTTP session, creating it when needed.

        A session is created on first use and again whenever the stored one
        has been closed, so a closed session is never handed back.
        """
        if self.session is None or self.session.closed:
            self.session = aiohttp.ClientSession()
        return self.session

    async def _with_cloud_fallback(self, attempt, error_prefix: str) -> Any:
        """Await ``attempt()`` and retry it once in cloud mode if it fails locally.

        Args:
            attempt: Zero-argument coroutine function performing one request.
                It is called a second time for the retry, so it must rebuild
                everything that depends on the current mode (e.g. the URL).
            error_prefix: Text placed before the exception in the error log.

        Returns:
            Whatever ``attempt()`` returns.

        Raises:
            Exception: The original error when the mode is not ``"local"``,
                or the retry's error when the second attempt fails as well.
        """
        try:
            return await attempt()
        except Exception as exc:
            logger.error("%s: %s", error_prefix, exc)
            if self.config.get_txagent_mode() != "local":
                raise

        # Retry exactly once rather than recursing: if assigning ``mode`` does
        # not change what get_txagent_mode() reports, recursion never ends.
        logger.info("Falling back to cloud TxAgent service")
        self.config.mode = "cloud"
        try:
            return await attempt()
        except Exception as exc:
            logger.error("%s: %s", error_prefix, exc)
            raise

    async def _make_request(
        self,
        endpoint: str,
        method: str = "GET",
        data: Optional[Dict] = None,
        params: Optional[Dict] = None,
    ) -> Any:
        """Send a JSON request to the TxAgent service, with cloud fallback.

        Args:
            endpoint: Path appended to the configured TxAgent base URL.
            method: ``"GET"`` or ``"POST"`` (case-insensitive).
            data: JSON body for POST requests; ignored for GET.
            params: Optional query-string parameters.

        Returns:
            The decoded JSON body of the response.

        Raises:
            ValueError: If ``method`` is neither GET nor POST.
            Exception: Any request or decoding error that remains after the
                fallback attempt.
        """
        verb = method.upper()
        # Validate before touching the network so that a programming error is
        # not mistaken for a service failure and does not trigger the fallback.
        if verb not in self._SUPPORTED_METHODS:
            raise ValueError(f"Unsupported HTTP method: {method!r}")

        async def attempt():
            session = await self._get_session()
            # Built per attempt: the base URL is read again after a mode switch.
            url = f"{self.config.get_txagent_url()}{endpoint}"
            if verb == "GET":
                request = session.get(url, params=params or None)
            else:
                request = session.post(url, json=data, params=params or None)
            async with request as response:
                return await response.json()

        return await self._with_cloud_fallback(
            attempt, "Error calling TxAgent service"
        )

    async def chat(
        self,
        message: str,
        history: Optional[list] = None,
        patient_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Send a chat message to TxAgent via ``POST /chat``.

        Args:
            message: The message text.
            history: Previous conversation turns; sent as ``[]`` when omitted.
            patient_id: Optional patient identifier sent with the message.

        Returns:
            The decoded JSON response.
        """
        payload = {
            "message": message,
            "history": history or [],
            "patient_id": patient_id,
        }
        return await self._make_request("/chat", "POST", payload)

    async def analyze_patient(self, patient_data: Dict[str, Any]) -> Dict[str, Any]:
        """Submit patient data via ``POST /patients/analyze`` and return the JSON reply."""
        return await self._make_request("/patients/analyze", "POST", patient_data)

    async def voice_transcribe(self, audio_data: bytes) -> Dict[str, Any]:
        """Upload audio to ``POST /voice/transcribe`` and return the JSON reply.

        Args:
            audio_data: Raw audio bytes, sent as the multipart field ``audio``
                with the filename ``audio.wav``.

        Raises:
            Exception: Any request or decoding error that remains after the
                fallback attempt.
        """

        async def attempt():
            session = await self._get_session()
            url = f"{self.config.get_txagent_url()}/voice/transcribe"
            # A fresh form is built for every attempt, as the original
            # recursive retry did.
            form = aiohttp.FormData()
            form.add_field('audio', audio_data, filename='audio.wav')
            async with session.post(url, data=form) as response:
                return await response.json()

        return await self._with_cloud_fallback(
            attempt, "Error in voice transcription"
        )

    async def voice_synthesize(self, text: str, language: str = "en-US") -> bytes:
        """Request speech for ``text`` from ``POST /voice/synthesize``.

        Args:
            text: Text to synthesize.
            language: Language tag sent to the service.

        Returns:
            The raw response body. The request asks for ``"mp3"`` via
            ``return_format``.

        Raises:
            Exception: Any request error that remains after the fallback
                attempt.
        """
        payload = {
            "text": text,
            "language": language,
            "return_format": "mp3",
        }

        async def attempt():
            session = await self._get_session()
            url = f"{self.config.get_txagent_url()}/voice/synthesize"
            async with session.post(url, json=payload) as response:
                return await response.read()

        return await self._with_cloud_fallback(attempt, "Error in voice synthesis")

    async def get_status(self) -> Dict[str, Any]:
        """Return the decoded JSON reply of ``GET /status``."""
        return await self._make_request("/status")

    async def get_chats(self) -> List[Dict[str, Any]]:
        """Return the decoded JSON reply of ``GET /chats`` (the chat history)."""
        return await self._make_request("/chats")

    async def get_analysis_results(
        self, risk_filter: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """Return patient analysis results from ``GET /patients/analysis-results``.

        Args:
            risk_filter: When truthy, sent as the ``risk_filter`` query
                parameter. It was previously passed as a request body, which
                GET requests drop, so it never reached the service.
                NOTE(review): confirm the service reads it from the query
                string.
        """
        query = {"risk_filter": risk_filter} if risk_filter else None
        return await self._make_request(
            "/patients/analysis-results", "GET", params=query
        )

    async def close(self):
        """Close the HTTP session, if one exists, and forget it."""
        if self.session:
            await self.session.close()
            self.session = None
# Global instance, created when this module is imported.
txagent_service = TxAgentService()