Spaces:
Sleeping
Sleeping
| """ | |
| Chat Agent | |
| OFP-compliant conversational agent with LLM integration | |
| """ | |
| import logging | |
| from typing import Dict, List, Optional | |
| from datetime import datetime, timezone | |
| import uuid | |
| from .ofp_client import OFPClient | |
| from .llm_client import LLMClient | |
| from .models import Envelope, DialogEvent | |
logger = logging.getLogger(__name__)


class ChatAgent:
    """OFP chat agent with AI conversation capabilities.

    The agent receives OFP envelopes, answers ``utterance`` events with text
    produced by the configured LLM client, and can send utterances to a
    recipient through its ``OFPClient``. It keeps an in-memory conversation
    history, simple counters, and a bounded activity log.
    """

    # Maximum number of entries retained in ``activity_log``.
    _MAX_LOG_ENTRIES = 100
    # Number of most recent activity-log entries reported by ``get_status``.
    _STATUS_LOG_ENTRIES = 10
    # Longest message excerpt written to the activity log before truncation.
    _PREVIEW_LENGTH = 50
    # Reply handed back to the caller when an utterance cannot be handled.
    _ERROR_REPLY = "Sorry, I encountered an error processing your message."

    def __init__(
        self,
        speaker_uri: str,
        service_url: str,
        llm_client: "LLMClient",
        convener_uri: Optional[str] = None,
        convener_url: Optional[str] = None
    ):
        """
        Initialize chat agent

        Args:
            speaker_uri: Agent's unique speaker URI
            service_url: Agent's service endpoint URL
            llm_client: Configured LLM client instance
            convener_uri: Convener's speaker URI (optional)
            convener_url: Convener's service endpoint URL (optional); used by
                ``send_message`` when no recipient URL is given
        """
        self.speaker_uri = speaker_uri
        self.service_url = service_url
        self.convener_uri = convener_uri
        self.convener_url = convener_url

        # Initialize LLM client
        self.llm_client = llm_client

        # Initialize OFP client. The manifest is built only after speaker_uri
        # and service_url are assigned because _create_manifest reads them.
        manifest = self._create_manifest()
        self.ofp_client = OFPClient(speaker_uri, service_url, manifest)

        # Conversation state
        self.conversation_history: List[Dict[str, str]] = []
        self.current_conversation_id: Optional[str] = None
        self.messages_processed = 0
        self.responses_sent = 0
        self.activity_log: List[str] = []

        logger.info(f"Chat Agent initialized: {speaker_uri}")

    def _create_manifest(self) -> Dict:
        """Create assistant manifest for chat agent

        Returns:
            Dict with an ``identification`` section (built from this agent's
            speaker URI and service URL) and a ``capabilities`` list.
        """
        return {
            "identification": {
                "speakerUri": self.speaker_uri,
                "serviceUrl": self.service_url,
                "conversationalName": "Talker AI Assistant",
                "role": "Conversational Agent",
                "synopsis": "AI-powered conversational assistant for Open Floor Protocol"
            },
            "capabilities": [{
                "keyphrases": ["chat", "conversation", "AI assistant", "questions", "help"],
                "supportedLayers": ["text"],
                "descriptions": ["Engages in natural conversations and answers questions"]
            }]
        }

    def process_envelope(self, envelope: "Envelope") -> Optional[str]:
        """
        Process incoming OFP envelope

        Args:
            envelope: OFP envelope to process

        Returns:
            The result of handling the first ``utterance`` event found, or
            None when the envelope contains no utterance or processing fails.
        """
        try:
            self.messages_processed += 1

            # Update conversation ID
            conversation_id = envelope.conversation.get('id')
            if conversation_id:
                self.current_conversation_id = conversation_id

            for event in envelope.events:
                event_type = event.get('eventType')
                if event_type == 'utterance':
                    # The first utterance decides the reply; any events after
                    # it in the same envelope are not examined.
                    return self._handle_utterance(envelope, event)
                if event_type == 'getManifests':
                    # Only recorded here; no manifest is returned from this method.
                    self._log_activity("Received getManifests request")
            return None
        except Exception as e:
            # Top-level boundary: never let a malformed envelope propagate.
            logger.exception("Error processing envelope: %s", e)
            self._log_activity(f"ERROR: {e}")
            return None

    def _handle_utterance(self, envelope: "Envelope", event: Dict) -> Optional[str]:
        """
        Handle utterance event and generate response

        Args:
            envelope: OFP envelope (not read by this method)
            event: Utterance event

        Returns:
            Generated response text; None when the utterance has no text or
            was spoken by this agent; a fixed apology string when generating
            the response fails.
        """
        try:
            dialog_event = event.get('parameters', {}).get('dialogEvent', {})

            text = self._extract_text(dialog_event)
            if not text.strip():
                return None

            # Don't respond to own messages
            if dialog_event.get('speakerUri', 'unknown') == self.speaker_uri:
                return None

            self._log_activity(f"Received: {self._preview(text)}")

            # The LLM gets a copy of the history that excludes the current message.
            response = self.llm_client.generate_response(
                message=text,
                conversation_history=list(self.conversation_history)
            )
            if not isinstance(response, str) or not response.strip():
                raise ValueError("LLM client returned an empty response")

            # Record both turns only after a usable reply exists, so a failed
            # generation never leaves an unanswered user turn in the history.
            self.conversation_history.append({"role": "user", "content": text})
            self.conversation_history.append({"role": "assistant", "content": response})
            self.responses_sent += 1
            self._log_activity(f"Sent: {self._preview(response)}")
            return response
        except Exception as e:
            logger.exception("Error handling utterance: %s", e)
            self._log_activity(f"ERROR: {e}")
            return self._ERROR_REPLY

    @staticmethod
    def _extract_text(dialog_event: Dict) -> str:
        """Join the text-feature token values of a dialog event with single spaces.

        Tokens whose ``value`` is missing, None, or empty are skipped.
        """
        tokens = dialog_event.get('features', {}).get('text', {}).get('tokens', [])
        values = (token.get('value') for token in tokens)
        return ' '.join(str(value) for value in values if value not in (None, ''))

    @staticmethod
    def _preview(text: str, limit: int = _PREVIEW_LENGTH) -> str:
        """Return *text* unchanged, or its first *limit* characters plus ``...``."""
        return f"{text[:limit]}..." if len(text) > limit else text

    def send_message(self, text: str, recipient_url: Optional[str] = None) -> bool:
        """
        Send message to the floor

        Args:
            text: Message text to send
            recipient_url: Recipient URL (uses convener_url if not specified)

        Returns:
            True if sent successfully
        """
        try:
            # Start a new conversation when none is active yet.
            if not self.current_conversation_id:
                self.current_conversation_id = f"conv:{uuid.uuid4()}"

            url = recipient_url or self.convener_url
            if not url:
                logger.error("No recipient URL specified")
                return False

            success = self.ofp_client.send_utterance(
                conversation_id=self.current_conversation_id,
                recipient_url=url,
                text=text
            )
            if success:
                self.responses_sent += 1
                self._log_activity(f"Sent: {self._preview(text)}")
            return success
        except Exception as e:
            logger.exception("Error sending message: %s", e)
            return False

    def clear_history(self):
        """Clear conversation history"""
        self.conversation_history = []
        self._log_activity("Conversation history cleared")

    def get_status(self) -> Dict:
        """Get current agent status

        Returns:
            Dict with the speaker URI, current conversation ID, message and
            response counters, history length, and the most recent
            activity-log entries.
        """
        return {
            "speaker_uri": self.speaker_uri,
            "conversation_id": self.current_conversation_id,
            "messages_processed": self.messages_processed,
            "responses_sent": self.responses_sent,
            "history_length": len(self.conversation_history),
            # Slicing an empty list already yields [], so no guard is needed.
            "recent_logs": self.activity_log[-self._STATUS_LOG_ENTRIES:]
        }

    def get_manifest(self) -> Dict:
        """Get assistant manifest as reported by the OFP client"""
        return self.ofp_client.get_manifest()

    def _log_activity(self, message: str) -> None:
        """Log activity with timestamp

        Appends ``[YYYY-MM-DD HH:MM:SS] message`` to ``activity_log`` and
        drops the oldest entries beyond the retention limit.
        """
        # Naive local time, matching the timestamps this log has always shown.
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self.activity_log.append(f"[{timestamp}] {message}")

        # Keep only the newest entries, trimming in place.
        overflow = len(self.activity_log) - self._MAX_LOG_ENTRIES
        if overflow > 0:
            del self.activity_log[:overflow]