""" Chat Agent OFP-compliant conversational agent with LLM integration """ import logging from typing import Dict, List, Optional from datetime import datetime, timezone import uuid from .ofp_client import OFPClient from .llm_client import LLMClient from .models import Envelope, DialogEvent logger = logging.getLogger(__name__) class ChatAgent: """OFP Chat Agent with AI conversation capabilities""" def __init__( self, speaker_uri: str, service_url: str, llm_client: LLMClient, convener_uri: Optional[str] = None, convener_url: Optional[str] = None ): """ Initialize chat agent Args: speaker_uri: Agent's unique speaker URI service_url: Agent's service endpoint URL llm_client: Configured LLM client instance convener_uri: Convener's speaker URI (optional) convener_url: Convener's service endpoint URL (optional) """ self.speaker_uri = speaker_uri self.service_url = service_url self.convener_uri = convener_uri self.convener_url = convener_url # Initialize LLM client self.llm_client = llm_client # Initialize OFP client manifest = self._create_manifest() self.ofp_client = OFPClient(speaker_uri, service_url, manifest) # Conversation state self.conversation_history: List[Dict[str, str]] = [] self.current_conversation_id: Optional[str] = None self.messages_processed = 0 self.responses_sent = 0 self.activity_log = [] logger.info(f"Chat Agent initialized: {speaker_uri}") def _create_manifest(self) -> Dict: """Create assistant manifest for chat agent""" return { "identification": { "speakerUri": self.speaker_uri, "serviceUrl": self.service_url, "conversationalName": "Talker AI Assistant", "role": "Conversational Agent", "synopsis": "AI-powered conversational assistant for Open Floor Protocol" }, "capabilities": [{ "keyphrases": ["chat", "conversation", "AI assistant", "questions", "help"], "supportedLayers": ["text"], "descriptions": ["Engages in natural conversations and answers questions"] }] } def process_envelope(self, envelope: Envelope) -> Optional[str]: """ Process incoming OFP envelope Args: envelope: OFP envelope to process Returns: Response text if generated, None otherwise """ try: self.messages_processed += 1 # Update conversation ID if envelope.conversation.get('id'): self.current_conversation_id = envelope.conversation['id'] for event in envelope.events: event_type = event.get('eventType') # Handle utterance events if event_type == 'utterance': return self._handle_utterance(envelope, event) # Handle getManifests elif event_type == 'getManifests': self._log_activity("Received getManifests request") return None except Exception as e: logger.error(f"Error processing envelope: {e}") self._log_activity(f"ERROR: {str(e)}") return None def _handle_utterance(self, envelope: Envelope, event: Dict) -> Optional[str]: """ Handle utterance event and generate response Args: envelope: OFP envelope event: Utterance event Returns: Generated response text """ try: # Extract text from dialog event params = event.get('parameters', {}) dialog_event = params.get('dialogEvent', {}) features = dialog_event.get('features', {}) text_feature = features.get('text', {}) tokens = text_feature.get('tokens', []) # Combine tokens into text text = ' '.join(token.get('value', '') for token in tokens) if not text: return None # Get speaker info speaker_uri = dialog_event.get('speakerUri', 'unknown') # Don't respond to own messages if speaker_uri == self.speaker_uri: return None self._log_activity(f"Received: {text[:50]}..." if len(text) > 50 else f"Received: {text}") # Add to conversation history self.conversation_history.append({ "role": "user", "content": text }) # Generate response using LLM response = self.llm_client.generate_response( message=text, conversation_history=self.conversation_history[:-1] # Exclude current message ) # Add response to history self.conversation_history.append({ "role": "assistant", "content": response }) self.responses_sent += 1 self._log_activity(f"Sent: {response[:50]}..." if len(response) > 50 else f"Sent: {response}") return response except Exception as e: logger.error(f"Error handling utterance: {e}") return f"Sorry, I encountered an error processing your message." def send_message(self, text: str, recipient_url: Optional[str] = None) -> bool: """ Send message to the floor Args: text: Message text to send recipient_url: Recipient URL (uses convener_url if not specified) Returns: True if sent successfully """ try: if not self.current_conversation_id: self.current_conversation_id = f"conv:{uuid.uuid4()}" url = recipient_url or self.convener_url if not url: logger.error("No recipient URL specified") return False success = self.ofp_client.send_utterance( conversation_id=self.current_conversation_id, recipient_url=url, text=text ) if success: self.responses_sent += 1 self._log_activity(f"Sent: {text[:50]}..." if len(text) > 50 else f"Sent: {text}") return success except Exception as e: logger.error(f"Error sending message: {e}") return False def clear_history(self): """Clear conversation history""" self.conversation_history = [] self._log_activity("Conversation history cleared") def get_status(self) -> Dict: """Get current agent status""" return { "speaker_uri": self.speaker_uri, "conversation_id": self.current_conversation_id, "messages_processed": self.messages_processed, "responses_sent": self.responses_sent, "history_length": len(self.conversation_history), "recent_logs": self.activity_log[-10:] if self.activity_log else [] } def get_manifest(self) -> Dict: """Get assistant manifest""" return self.ofp_client.get_manifest() def _log_activity(self, message: str): """Log activity with timestamp""" timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") log_entry = f"[{timestamp}] {message}" self.activity_log.append(log_entry) # Keep only last 100 entries if len(self.activity_log) > 100: self.activity_log = self.activity_log[-100:]