# Talker / src/chat_agent.py
# BolyosCsaba
# initial commit
# e214abd
"""
Chat Agent
OFP-compliant conversational agent with LLM integration
"""
import logging
from typing import Dict, List, Optional
from datetime import datetime, timezone
import uuid
from .ofp_client import OFPClient
from .llm_client import LLMClient
from .models import Envelope, DialogEvent
logger = logging.getLogger(__name__)


class ChatAgent:
    """OFP chat agent that answers incoming utterances through an LLM client.

    State kept per instance:

    * ``conversation_history`` - completed user/assistant turns stored as
      ``{"role": ..., "content": ...}`` dicts and handed to the LLM as context.
    * ``current_conversation_id`` - ID taken from the last processed envelope,
      or minted by :meth:`send_message` when none is known yet.
    * ``messages_processed`` / ``responses_sent`` - simple counters.
    * ``activity_log`` - bounded list of timestamped activity strings.
    """

    # Maximum number of entries retained in ``activity_log``.
    MAX_LOG_ENTRIES = 100
    # Number of most recent log entries reported by ``get_status``.
    STATUS_LOG_ENTRIES = 10
    # Longest message excerpt written to the activity log before truncation.
    PREVIEW_LENGTH = 50

    def __init__(
        self,
        speaker_uri: str,
        service_url: str,
        llm_client: "LLMClient",
        convener_uri: Optional[str] = None,
        convener_url: Optional[str] = None,
    ):
        """
        Initialize chat agent

        Args:
            speaker_uri: Agent's unique speaker URI
            service_url: Agent's service endpoint URL
            llm_client: Configured LLM client instance
            convener_uri: Convener's speaker URI (optional)
            convener_url: Convener's service endpoint URL (optional); used by
                send_message as the default recipient
        """
        self.speaker_uri = speaker_uri
        self.service_url = service_url
        self.convener_uri = convener_uri
        self.convener_url = convener_url

        self.llm_client = llm_client

        # The manifest reads speaker_uri/service_url, so it must be built
        # after those attributes are assigned.
        manifest = self._create_manifest()
        self.ofp_client = OFPClient(speaker_uri, service_url, manifest)

        # Conversation state
        self.conversation_history: List[Dict[str, str]] = []
        self.current_conversation_id: Optional[str] = None
        self.messages_processed = 0
        self.responses_sent = 0
        self.activity_log: List[str] = []

        logger.info("Chat Agent initialized: %s", speaker_uri)

    def _create_manifest(self) -> Dict:
        """Build the assistant manifest dict describing this chat agent."""
        return {
            "identification": {
                "speakerUri": self.speaker_uri,
                "serviceUrl": self.service_url,
                "conversationalName": "Talker AI Assistant",
                "role": "Conversational Agent",
                "synopsis": "AI-powered conversational assistant for Open Floor Protocol"
            },
            "capabilities": [{
                "keyphrases": ["chat", "conversation", "AI assistant", "questions", "help"],
                "supportedLayers": ["text"],
                "descriptions": ["Engages in natural conversations and answers questions"]
            }]
        }

    def process_envelope(self, envelope: "Envelope") -> Optional[str]:
        """
        Process incoming OFP envelope

        Increments ``messages_processed``, adopts the envelope's conversation
        ID when it has one, then walks the events in order. The reply to the
        first utterance that yields one is returned; ``getManifests`` events
        are only recorded in the activity log.

        Args:
            envelope: OFP envelope to process. Its ``conversation`` is read
                with ``.get('id')`` and each item of ``events`` with
                ``.get('eventType')``.

        Returns:
            Response text if generated, None otherwise (including when any
            error occurs while processing)
        """
        try:
            self.messages_processed += 1

            conversation_id = envelope.conversation.get('id')
            if conversation_id:
                self.current_conversation_id = conversation_id

            for event in envelope.events:
                event_type = event.get('eventType')
                if event_type == 'utterance':
                    response = self._handle_utterance(envelope, event)
                    # None means the utterance was empty or our own echo;
                    # keep scanning so a later utterance can still be answered.
                    if response is not None:
                        return response
                elif event_type == 'getManifests':
                    self._log_activity("Received getManifests request")
            return None
        except Exception as e:
            # Boundary handler: a malformed envelope must not crash the agent.
            logger.exception("Error processing envelope: %s", e)
            self._log_activity(f"ERROR: {e}")
            return None

    def _handle_utterance(self, envelope: "Envelope", event: Dict) -> Optional[str]:
        """
        Handle utterance event and generate response

        Args:
            envelope: OFP envelope the event came from (not read here)
            event: Utterance event; the text is taken from
                ``parameters.dialogEvent.features.text.tokens[*].value``

        Returns:
            The LLM response, None when the utterance has no text or was
            spoken by this agent, or a fixed apology string when anything
            fails while handling the utterance
        """
        try:
            params = event.get('parameters', {})
            dialog_event = params.get('dialogEvent', {})
            features = dialog_event.get('features', {})
            text_feature = features.get('text', {})
            tokens = text_feature.get('tokens', [])

            # Tokens without a value are skipped rather than breaking the
            # join, and the result is stripped so whitespace-only input is
            # treated as empty instead of being sent to the LLM.
            values = (token.get('value') for token in tokens)
            text = ' '.join(str(value) for value in values if value is not None).strip()
            if not text:
                return None

            # Don't respond to own messages
            speaker_uri = dialog_event.get('speakerUri', 'unknown')
            if speaker_uri == self.speaker_uri:
                return None

            self._log_activity(f"Received: {self._preview(text)}")

            # The LLM gets the turns completed so far; the current message is
            # passed separately. The turn is recorded only after a successful
            # reply, so a failed call never leaves an unanswered user message
            # in the history.
            prior_turns = list(self.conversation_history)
            response = self.llm_client.generate_response(
                message=text,
                conversation_history=prior_turns,
            )

            # Build the log excerpt first: a non-text response fails here,
            # before history or counters are touched.
            sent_preview = self._preview(response)

            self.conversation_history.extend((
                {"role": "user", "content": text},
                {"role": "assistant", "content": response},
            ))
            self.responses_sent += 1
            self._log_activity(f"Sent: {sent_preview}")
            return response
        except Exception as e:
            logger.exception("Error handling utterance: %s", e)
            self._log_activity(f"ERROR: {e}")
            return "Sorry, I encountered an error processing your message."

    def send_message(self, text: str, recipient_url: Optional[str] = None) -> bool:
        """
        Send message to the floor

        A conversation ID of the form ``conv:<uuid4>`` is minted when none is
        known yet. On success ``responses_sent`` is incremented and the
        message is logged.

        NOTE(review): ``_handle_utterance`` also increments
        ``responses_sent``; confirm callers do not forward its reply through
        this method if double counting matters.

        Args:
            text: Message text to send
            recipient_url: Recipient URL (uses convener_url if not specified)

        Returns:
            The result of ``ofp_client.send_utterance`` (truthy is treated as
            success); False when no URL is available or sending raised
        """
        try:
            if not self.current_conversation_id:
                self.current_conversation_id = f"conv:{uuid.uuid4()}"

            url = recipient_url or self.convener_url
            if not url:
                logger.error("No recipient URL specified")
                return False

            success = self.ofp_client.send_utterance(
                conversation_id=self.current_conversation_id,
                recipient_url=url,
                text=text,
            )
            if success:
                self.responses_sent += 1
                self._log_activity(f"Sent: {self._preview(text)}")
            return success
        except Exception as e:
            logger.exception("Error sending message: %s", e)
            return False

    def clear_history(self):
        """Clear conversation history"""
        self.conversation_history = []
        self._log_activity("Conversation history cleared")

    def get_status(self) -> Dict:
        """Return a snapshot of identifiers, counters and the latest log entries."""
        return {
            "speaker_uri": self.speaker_uri,
            "conversation_id": self.current_conversation_id,
            "messages_processed": self.messages_processed,
            "responses_sent": self.responses_sent,
            "history_length": len(self.conversation_history),
            # Slicing an empty list already yields [], so no guard is needed.
            "recent_logs": self.activity_log[-self.STATUS_LOG_ENTRIES:],
        }

    def get_manifest(self) -> Dict:
        """Return the manifest as reported by the OFP client."""
        return self.ofp_client.get_manifest()

    def _preview(self, text: str) -> str:
        """Return text cut to PREVIEW_LENGTH characters plus ``...`` when longer."""
        if len(text) > self.PREVIEW_LENGTH:
            return f"{text[:self.PREVIEW_LENGTH]}..."
        return text

    def _log_activity(self, message: str):
        """Append a local-time-stamped entry, keeping the newest MAX_LOG_ENTRIES."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self.activity_log.append(f"[{timestamp}] {message}")

        # Trim in place so existing references to the list stay valid.
        overflow = len(self.activity_log) - self.MAX_LOG_ENTRIES
        if overflow > 0:
            del self.activity_log[:overflow]