# "Spaces: Runtime error / Runtime error" — Hugging Face Spaces status-page
# banner captured along with this paste; kept as a comment so the file parses.
"""
Simple speech-to-speech chatbot for Hugging Face Spaces using AWS Bedrock Nova Sonic
Adapted from the original old_sonic_use.py
"""
import os
import asyncio
import base64
import json
import uuid
import warnings
# NOTE(review): numpy, pytz and hashlib are imported but not referenced in this
# file — possibly used by code outside this view; confirm before removing.
import numpy as np
import pytz
import random
import hashlib
import datetime
import time
import inspect
# AWS Bedrock bidirectional-streaming SDK (experimental smithy-based client).
from aws_sdk_bedrock_runtime.client import BedrockRuntimeClient, InvokeModelWithBidirectionalStreamOperationInput
from aws_sdk_bedrock_runtime.models import InvokeModelWithBidirectionalStreamInputChunk, BidirectionalInputPayloadPart
from aws_sdk_bedrock_runtime.config import Config, HTTPAuthSchemeResolver, SigV4AuthScheme
from smithy_aws_core.credentials_resolvers.environment import EnvironmentCredentialsResolver
# Suppress warnings
warnings.filterwarnings("ignore")
# Audio configuration
INPUT_SAMPLE_RATE = 16000    # Hz; microphone/input PCM rate
OUTPUT_SAMPLE_RATE = 24000   # Hz; matches the sampleRateHertz requested from the model
CHANNELS = 1                 # mono
CHUNK_SIZE = 1024            # NOTE(review): defined but not used in this file
# Debug mode flag — gates debug_print() output
DEBUG = False
# Initial greeting sent as the first "user" message when the chatbot starts
INITIAL_GREETING = "Hi there! I'm your AI conversation partner. How are you doing today? I'm here to chat about anything you'd like to discuss."
# Conversation topics and contexts - these guide the conversation but aren't hard-coded responses.
# Each dict has a "topic" key plus topic-specific lists; SimpleChatbot picks one
# value from every list when initializing/evolving its conversation state.
CONVERSATION_CONTEXTS = [
    {
        "topic": "daily_life",
        "personas": ["student", "professional", "retiree", "parent"],
        "emotions": ["happy", "reflective", "curious", "excited"],
        "interests": ["technology", "art", "travel", "food", "health", "learning"]
    },
    {
        "topic": "future_thinking",
        "themes": ["technology", "environment", "society", "personal_growth"],
        "perspectives": ["optimistic", "cautious", "innovative", "traditional"],
        "timeframes": ["near future", "distant future", "theoretical possibilities"]
    },
    {
        "topic": "creative_exploration",
        "media": ["books", "films", "music", "visual arts", "games"],
        "approaches": ["analysis", "personal connection", "recommendations", "creation"],
        "genres": ["science fiction", "drama", "comedy", "documentary", "fantasy", "thriller"]
    },
    {
        "topic": "world_understanding",
        "areas": ["science", "philosophy", "history", "culture", "psychology"],
        "methods": ["questioning", "comparing viewpoints", "exploring implications", "personal relevance"],
        "goals": ["knowledge", "wisdom", "practical application", "enjoyment"]
    }
]
# For Hugging Face environment detection
HF_SPACES = "SPACE_ID" in os.environ or ("SYSTEM" in os.environ and os.environ.get("SYSTEM") == "spaces")
def debug_print(message):
    """Print *message* with a millisecond timestamp and the caller's name.

    Does nothing unless the module-level DEBUG flag is set. Calls made through
    the timing helpers are attributed to their own caller instead.
    """
    if not DEBUG:
        return
    frames = inspect.stack()
    caller = frames[1].function
    # Skip over the timing wrappers so the log names the real caller.
    if caller in ('time_it', 'time_it_async'):
        caller = frames[2].function
    stamp = '{:%Y-%m-%d %H:%M:%S.%f}'.format(datetime.datetime.now())[:-3]
    print(stamp + ' ' + caller + ' ' + message)
async def time_it_async(label, methodToRun):
    """Await ``methodToRun()`` and debug-log its wall-clock duration.

    Args:
        label: Human-readable name used in the timing log line.
        methodToRun: Zero-argument callable returning an awaitable.

    Returns:
        Whatever the awaited callable returned.
    """
    started = time.perf_counter()
    outcome = await methodToRun()
    elapsed = time.perf_counter() - started
    debug_print(f"Execution time for {label}: {elapsed:.4f} seconds")
    return outcome
class BedrockStreamManager:
    """Manages bidirectional streaming with AWS Bedrock using asyncio.

    Owns one invoke_model_with_bidirectional_stream session: sends JSON
    control/content events on the input stream and consumes text/audio events
    from the output stream into asyncio queues for other components.
    """

    # Event templates — raw JSON with %s slots filled in before sending.
    START_SESSION_EVENT = '''{
      "event": {
        "sessionStart": {
          "inferenceConfiguration": {
            "maxTokens": 1024,
            "topP": 0.9,
            "temperature": 0.7
          }
        }
      }
    }'''
    TEXT_CONTENT_START_EVENT = '''{
      "event": {
        "contentStart": {
          "promptName": "%s",
          "contentName": "%s",
          "type": "TEXT",
          "role": "%s",
          "interactive": true,
          "textInputConfiguration": {
            "mediaType": "text/plain"
          }
        }
      }
    }'''
    TEXT_INPUT_EVENT = '''{
      "event": {
        "textInput": {
          "promptName": "%s",
          "contentName": "%s",
          "content": "%s"
        }
      }
    }'''
    CONTENT_END_EVENT = '''{
      "event": {
        "contentEnd": {
          "promptName": "%s",
          "contentName": "%s"
        }
      }
    }'''
    PROMPT_END_EVENT = '''{
      "event": {
        "promptEnd": {
          "promptName": "%s"
        }
      }
    }'''
    SESSION_END_EVENT = '''{
      "event": {
        "sessionEnd": {}
      }
    }'''

    def __init__(self, model_id='amazon.nova-sonic-v1:0', region='us-east-1'):
        """Initialize the stream manager.

        Args:
            model_id: Bedrock model identifier to invoke.
            region: AWS region hosting the Bedrock runtime endpoint.
        """
        self.model_id = model_id
        self.region = region
        # Asyncio queues replace RxPy subjects: decoupled producer/consumer.
        self.audio_input_queue = asyncio.Queue()
        self.audio_output_queue = asyncio.Queue()  # decoded PCM bytes from the model
        self.output_queue = asyncio.Queue()        # parsed JSON events for downstream consumers
        self.response_task = None     # background task running _process_responses()
        self.stream_response = None   # bidirectional stream handle from the SDK
        self.is_active = False
        self.bedrock_client = None
        # Text response assembly state
        self.display_assistant_text = False
        self.role = None
        self.current_text = ""
        # Per-session identifiers required by the Nova Sonic event protocol
        self.prompt_name = str(uuid.uuid4())
        self.content_name = str(uuid.uuid4())
        # Transcript: list of {"role": ..., "content": ...} dicts
        self.conversation_history = []

    def _initialize_client(self):
        """Build and return a BedrockRuntimeClient using env-var credentials.

        Raises:
            ValueError: if AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY are unset,
                with guidance tailored to Hugging Face Spaces when detected.
        """
        required_vars = ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"]
        missing_vars = [var for var in required_vars if not os.environ.get(var)]
        if missing_vars:
            error_msg = f"Missing AWS credentials: {', '.join(missing_vars)}\n"
            if HF_SPACES:
                error_msg += "Please add these as secrets in your Hugging Face Space settings."
            else:
                error_msg += "Please set these environment variables."
            raise ValueError(error_msg)
        config = Config(
            endpoint_uri=f"https://bedrock-runtime.{self.region}.amazonaws.com",
            region=self.region,
            aws_credentials_identity_resolver=EnvironmentCredentialsResolver(),
            http_auth_scheme_resolver=HTTPAuthSchemeResolver(),
            http_auth_schemes={"aws.auth#sigv4": SigV4AuthScheme()}
        )
        return BedrockRuntimeClient(config=config)

    async def initialize_stream(self):
        """Open the bidirectional stream and send session/prompt start events.

        Sends sessionStart, promptStart (24 kHz LPCM audio output, voice
        "matthew"), and the SYSTEM prompt, then starts the response-processing
        task.

        Returns:
            bool: True on success, False on any failure (error is printed).
        """
        try:
            self.bedrock_client = self._initialize_client()
            self.stream_response = await time_it_async(
                "invoke_model_with_bidirectional_stream",
                lambda: self.bedrock_client.invoke_model_with_bidirectional_stream(
                    InvokeModelWithBidirectionalStreamOperationInput(model_id=self.model_id)
                )
            )
            self.is_active = True
            prompt_start_event = {
                "event": {
                    "promptStart": {
                        "promptName": self.prompt_name,
                        "textOutputConfiguration": {
                            "mediaType": "text/plain"
                        },
                        "audioOutputConfiguration": {
                            "mediaType": "audio/lpcm",
                            "sampleRateHertz": 24000,
                            "sampleSizeBits": 16,
                            "channelCount": 1,
                            "voiceId": "matthew",
                            "encoding": "base64",
                            "audioType": "SPEECH"
                        }
                    }
                }
            }
            system_prompt = "You are a friendly, helpful conversation partner. Keep your responses natural and conversational. Act as if this is a spoken conversation between friends. Ask follow-up questions to keep the conversation going."
            init_events = [
                self.START_SESSION_EVENT,
                json.dumps(prompt_start_event),
                self.TEXT_CONTENT_START_EVENT % (self.prompt_name, "system_prompt", "SYSTEM"),
                self.TEXT_INPUT_EVENT % (self.prompt_name, "system_prompt", system_prompt),
                self.CONTENT_END_EVENT % (self.prompt_name, "system_prompt")
            ]
            for event in init_events:
                await self.send_raw_event(event)
                # Small pacing delay between control events.
                await asyncio.sleep(0.1)
            # Start listening for responses in the background.
            self.response_task = asyncio.create_task(self._process_responses())
            print("Stream initialized successfully")
            return True
        except Exception as e:
            self.is_active = False
            print(f"Failed to initialize stream: {str(e)}")
            return False

    async def send_raw_event(self, event_json):
        """Send a raw event JSON string to the Bedrock input stream.

        Silently drops the event (with a debug note) when the stream is not
        initialized or no longer active.
        """
        if not self.stream_response or not self.is_active:
            debug_print("Stream not initialized or closed")
            return
        event = InvokeModelWithBidirectionalStreamInputChunk(
            value=BidirectionalInputPayloadPart(bytes_=event_json.encode('utf-8'))
        )
        try:
            await self.stream_response.input_stream.send(event)
            # For large events, log only the event type to keep debug output short.
            if DEBUG:
                if len(event_json) > 200:
                    event_type = json.loads(event_json).get("event", {}).keys()
                    debug_print(f"Sent event type: {list(event_type)}")
                else:
                    debug_print(f"Sent event: {event_json}")
        except Exception as e:
            debug_print(f"Error sending event: {str(e)}")
            if DEBUG:
                import traceback
                traceback.print_exc()

    async def _process_responses(self):
        """Consume the output stream until it ends or the session deactivates.

        Dispatch per event type: contentStart records the role; textOutput is
        treated as the cumulative assistant text (only the newly appended
        suffix is printed); audioOutput content is base64-decoded onto
        audio_output_queue; contentEnd flushes the assembled assistant text
        into conversation_history. Every parsed event is also forwarded to
        output_queue.
        """
        try:
            while self.is_active:
                try:
                    output = await self.stream_response.await_output()
                    result = await output[1].receive()
                    if result.value and result.value.bytes_:
                        try:
                            response_data = result.value.bytes_.decode('utf-8')
                            json_data = json.loads(response_data)
                            if 'event' in json_data:
                                if 'contentStart' in json_data['event']:
                                    content_start = json_data['event']['contentStart']
                                    self.role = content_start.get('role', '')
                                elif 'textOutput' in json_data['event']:
                                    text_content = json_data['event']['textOutput']['content']
                                    role = json_data['event']['textOutput']['role']
                                    if role == "ASSISTANT":
                                        if self.current_text == "":
                                            # First chunk of text
                                            self.current_text = text_content
                                            print(f"\nAI: {text_content}", end="", flush=True)
                                        else:
                                            # Continuation: print only the new suffix.
                                            delta = text_content[len(self.current_text):]
                                            self.current_text = text_content
                                            print(delta, end="", flush=True)
                                elif 'audioOutput' in json_data['event']:
                                    audio_content = json_data['event']['audioOutput']['content']
                                    await self.audio_output_queue.put(base64.b64decode(audio_content))
                                elif 'contentEnd' in json_data['event']:
                                    if self.role == "ASSISTANT" and self.current_text:
                                        # Save the completed assistant turn.
                                        self.conversation_history.append({
                                            "role": "assistant",
                                            "content": self.current_text
                                        })
                                        self.current_text = ""
                                        print()  # Terminate the streamed line.
                                # Forward the parsed event to other components.
                                await self.output_queue.put(json_data)
                        except json.JSONDecodeError:
                            await self.output_queue.put({"raw_data": response_data})
                except StopAsyncIteration:
                    # Stream has ended normally.
                    break
                except Exception as e:
                    if "ValidationException" in str(e):
                        print(f"Validation error: {str(e)}")
                    else:
                        print(f"Error receiving response: {e}")
                        if DEBUG:
                            import traceback
                            traceback.print_exc()
                    break
        except Exception as e:
            print(f"Response processing error: {e}")
        finally:
            self.is_active = False

    async def send_text_message(self, text):
        """Send a text message to Nova as a USER content block.

        Args:
            text: Arbitrary user text; serialized with json.dumps so quotes,
                backslashes and newlines cannot corrupt the event JSON.

        Returns:
            bool: True if all three events were sent, False otherwise.
        """
        if not self.is_active:
            debug_print("Stream is not active")
            return False
        try:
            message_id = f"msg_{int(time.time())}"
            # Content start
            start_event = self.TEXT_CONTENT_START_EVENT % (self.prompt_name, message_id, "USER")
            await self.send_raw_event(start_event)
            # Text content — built with json.dumps instead of %-splicing raw
            # text into the template: the old approach produced invalid JSON
            # whenever the text contained quotes or backslashes.
            content_event = json.dumps({
                "event": {
                    "textInput": {
                        "promptName": self.prompt_name,
                        "contentName": message_id,
                        "content": text
                    }
                }
            })
            await self.send_raw_event(content_event)
            # Content end
            end_event = self.CONTENT_END_EVENT % (self.prompt_name, message_id)
            await self.send_raw_event(end_event)
            # Record the user turn locally.
            self.conversation_history.append({
                "role": "user",
                "content": text
            })
            print(f"\nYou: {text}")
            return True
        except Exception as e:
            print(f"Error sending text message: {e}")
            return False

    async def close(self):
        """Close the stream properly.

        BUGFIX: the end events must be sent BEFORE is_active is cleared —
        send_raw_event() drops events when is_active is False, so the previous
        order meant promptEnd/sessionEnd were silently never delivered.
        """
        if not self.is_active:
            return
        try:
            # Send end events while the session is still marked active.
            await self.send_raw_event(self.PROMPT_END_EVENT % (self.prompt_name))
            await self.send_raw_event(self.SESSION_END_EVENT)
        except Exception as e:
            debug_print(f"Error sending end events: {e}")
        self.is_active = False
        # Cancel the response task after signalling the end of the session.
        if self.response_task and not self.response_task.done():
            self.response_task.cancel()
            try:
                await self.response_task
            except asyncio.CancelledError:
                pass
        try:
            if self.stream_response:
                await self.stream_response.input_stream.close()
        except Exception as e:
            debug_print(f"Error closing stream: {e}")
class SimpleChatbot:
    """Simple speech-to-speech chatbot for Hugging Face Spaces.

    Drives a BedrockStreamManager session and simulates the user side of the
    conversation with template-generated messages on a randomized timer.
    """

    def __init__(self):
        """Initialize the chatbot (no network activity happens here)."""
        self.stream_manager = None
        self.is_running = False
        self.conversation_task = None
        self.region = os.environ.get("AWS_DEFAULT_REGION", "us-east-1")
        self.model_id = "amazon.nova-sonic-v1:0"

    async def start(self):
        """Start the chatbot: open the stream, send the greeting, spawn the loop.

        Returns:
            bool: True if the stream initialized and the loop started.
        """
        print("\n==================================================")
        print(" Simple Speech-to-Speech Chatbot (HF Spaces Mode)")
        print("==================================================\n")
        try:
            self.stream_manager = BedrockStreamManager(
                model_id=self.model_id,
                region=self.region
            )
            success = await self.stream_manager.initialize_stream()
            if not success:
                print("Failed to initialize stream")
                return False
            self.is_running = True
            print("\nStarting conversation...")
            await self.stream_manager.send_text_message(INITIAL_GREETING)
            # Background task that keeps generating simulated "user" turns.
            self.conversation_task = asyncio.create_task(self.simulate_conversation())
            return True
        except Exception as e:
            print(f"Error starting chatbot: {e}")
            return False

    async def simulate_conversation(self):
        """Simulate a dynamic, adaptive conversation.

        Loops up to max_messages times: generate a contextual message, send it,
        wait a randomized interval, fold the assistant's reply into the local
        history, and evolve the conversation state.
        """
        try:
            # Wait for the initial greeting exchange to complete.
            await asyncio.sleep(8)
            conversation_state = self._initialize_conversation_state()
            message_count = 0
            max_messages = 100  # Limit to prevent infinite loops
            # Local history used for context when generating messages.
            conversation_history = [
                {"role": "system", "content": "You are having a natural conversation. Be genuine and thoughtful."},
                {"role": "assistant", "content": INITIAL_GREETING}
            ]
            while self.is_running and message_count < max_messages:
                user_message = self._generate_contextual_message(conversation_state, conversation_history)
                await self.stream_manager.send_text_message(user_message)
                conversation_history.append({"role": "user", "content": user_message})
                # Variable wait so the pacing feels natural.
                wait_time = random.uniform(10, 15)
                await asyncio.sleep(wait_time)
                # Pull Nova's latest reply from the stream manager if present.
                if self.stream_manager.conversation_history:
                    last_response = self.stream_manager.conversation_history[-1]
                    if last_response["role"] == "assistant":
                        conversation_history.append(last_response)
                conversation_state = self._evolve_conversation_state(conversation_state, conversation_history)
                message_count += 1
                # Occasionally add some variety with pauses.
                if random.random() < 0.1:  # 10% chance
                    pause_time = random.uniform(5, 10)
                    await asyncio.sleep(pause_time)
        except asyncio.CancelledError:
            # Task was cancelled, exit gracefully.
            pass
        except Exception as e:
            print(f"Error in conversation simulation: {e}")

    def _initialize_conversation_state(self):
        """Initialize the state dict that guides conversation flow.

        Picks a random context from CONVERSATION_CONTEXTS and flattens each of
        its list-valued keys to a single randomly chosen value.
        """
        context = random.choice(CONVERSATION_CONTEXTS)
        state = {
            "current_topic": context["topic"],
            "context": context,
            "depth_level": 1,  # Start shallow, go deeper over time
            "engagement": random.uniform(0.5, 0.8),  # How engaged the conversation is
            "emotion": random.choice(["neutral", "curious", "interested"]),
            "turns_in_topic": 0
        }
        # Flatten list-valued context entries to one chosen element each.
        for key, values in context.items():
            if isinstance(values, list) and values:
                state[key] = random.choice(values)
        return state

    def _evolve_conversation_state(self, state, conversation_history):
        """Evolve the conversation state based on recent exchanges.

        Switches topic after 3-7 turns, otherwise deepens the current one;
        nudges engagement by average response length and occasionally shifts
        the emotion. Returns the mutated state.
        """
        recent_exchanges = conversation_history[-4:] if len(conversation_history) >= 4 else conversation_history
        state["turns_in_topic"] += 1
        # Occasionally change topics to keep conversation fresh.
        if state["turns_in_topic"] > random.randint(3, 7):
            new_context = random.choice(CONVERSATION_CONTEXTS)
            while new_context["topic"] == state["current_topic"]:
                new_context = random.choice(CONVERSATION_CONTEXTS)
            state["current_topic"] = new_context["topic"]
            state["context"] = new_context
            state["turns_in_topic"] = 0
            state["depth_level"] = 1  # Reset depth when changing topics
            for key, values in new_context.items():
                if isinstance(values, list) and values:
                    state[key] = random.choice(values)
        else:
            # Staying on topic: incrementally go deeper (capped at 4).
            state["depth_level"] = min(state["depth_level"] + random.uniform(0.1, 0.5), 4)
        # Longer recent responses nudge engagement up; shorter nudge it down.
        if recent_exchanges:
            avg_response_length = sum(len(ex.get("content", "")) for ex in recent_exchanges) / len(recent_exchanges)
            engagement_delta = (avg_response_length / 200) - 0.5  # Normalize
            state["engagement"] = max(0.1, min(1.0, state["engagement"] + engagement_delta * 0.2))
        # Occasionally shift emotion for variety.
        if random.random() < 0.2:  # 20% chance
            emotions = ["neutral", "curious", "interested", "excited", "reflective", "amused", "thoughtful"]
            state["emotion"] = random.choice(emotions)
        return state

    def _generate_contextual_message(self, state, conversation_history):
        """Generate a contextually appropriate message from the current state.

        Fills a random template with topic-specific content; if any {placeholder}
        remains unfilled, falls back to a general message.
        """
        last_message = conversation_history[-1]["content"] if conversation_history else ""
        context = state["context"]
        topic = state["current_topic"]
        templates = [
            # Questions
            "What do you think about {subject}?",
            "Have you ever experienced {experience}?",
            "I'm curious about your perspective on {subject}. What's your take?",
            "How would you approach {situation}?",
            # Statements
            "I've been thinking about {subject} lately.",
            "I recently {experience} and it made me wonder about {related_subject}.",
            "It's interesting to consider {subject}, especially when you think about {perspective}.",
            # Follow-ups
            "That's fascinating. What about {related_subject}?",
            "I see your point about {referenced_point}. That makes me wonder about {question}.",
            "I hadn't thought about it that way. Does that mean you believe {implication}?"
        ]
        template = random.choice(templates)
        content_generators = {
            "daily_life": self._generate_daily_life_content,
            "future_thinking": self._generate_future_thinking_content,
            "creative_exploration": self._generate_creative_content,
            "world_understanding": self._generate_world_content
        }
        # BUGFIX: the default used to reference self._generate_general_content,
        # which did not exist — dict.get evaluates its default eagerly, so every
        # call raised AttributeError. The fallback method is now defined below.
        generator = content_generators.get(topic, self._generate_general_content)
        content_elements = generator(state, last_message)
        for key, value in content_elements.items():
            template = template.replace("{" + key + "}", value)
        # Unfilled placeholders mean the generator lacked a needed key: fall back.
        if "{" in template and "}" in template:
            return self._generate_general_message(state)
        return template

    def _generate_daily_life_content(self, state, last_message):
        """Generate content related to daily life topics."""
        persona = state.get("personas", random.choice(["student", "professional", "parent", "traveler"]))
        interest = state.get("interests", random.choice(["technology", "art", "travel", "food", "health"]))
        subjects = {
            "technology": ["digital assistants", "smartphones", "social media", "online learning", "working remotely"],
            "art": ["favorite music", "recent movies", "books", "creative hobbies", "art exhibitions"],
            "travel": ["dream destinations", "travel experiences", "local exploration", "cultural differences", "adventure sports"],
            "food": ["cooking at home", "favorite cuisines", "dietary choices", "restaurant experiences", "food traditions"],
            "health": ["exercise routines", "sleep habits", "mental wellness", "work-life balance", "mindfulness practices"]
        }
        experiences = {
            "student": ["studying for an exam", "joining a new club", "learning a difficult concept", "managing deadlines", "group projects"],
            "professional": ["handling work challenges", "career development", "workplace collaboration", "learning new skills", "professional networking"],
            "parent": ["family activities", "teaching moments", "balancing responsibilities", "childhood development", "family traditions"],
            "traveler": ["planning trips", "cultural experiences", "travel mishaps", "discovering new places", "meeting people while traveling"]
        }
        subject_list = subjects.get(interest, subjects["technology"])
        experience_list = experiences.get(persona, experiences["professional"])
        return {
            "subject": random.choice(subject_list),
            "experience": random.choice(experience_list),
            "related_subject": random.choice(subject_list),
            "situation": f"{random.choice(experience_list)} for the first time",
            "perspective": f"how it affects daily {random.choice(['routines', 'habits', 'interactions', 'decisions'])}"
        }

    def _generate_future_thinking_content(self, state, last_message):
        """Generate content related to future and forward-looking topics."""
        theme = state.get("themes", random.choice(["technology", "environment", "society", "personal_growth"]))
        perspective = state.get("perspectives", random.choice(["optimistic", "cautious", "innovative"]))
        timeframe = state.get("timeframes", random.choice(["near future", "distant future", "theoretical possibilities"]))
        subjects = {
            "technology": ["artificial intelligence", "space exploration", "robotics", "virtual reality", "human augmentation"],
            "environment": ["renewable energy", "climate adaptation", "sustainable living", "conservation efforts", "environmental policy"],
            "society": ["future of work", "education evolution", "changing social norms", "global cooperation", "new economic models"],
            "personal_growth": ["lifelong learning", "adaptability", "future skills", "evolving careers", "human potential"]
        }
        implications = {
            "optimistic": ["create new opportunities", "solve major problems", "enhance human capabilities", "bring people together", "accelerate positive change"],
            "cautious": ["require careful consideration", "present both benefits and risks", "need ethical guidelines", "transform familiar systems", "challenge our adaptability"],
            "innovative": ["reshape our understanding", "combine unexpected elements", "create entirely new categories", "transcend current limitations", "evolve in surprising ways"]
        }
        subject_list = subjects.get(theme, subjects["technology"])
        implication_list = implications.get(perspective, implications["optimistic"])
        return {
            "subject": f"{random.choice(subject_list)} in the {timeframe}",
            "experience": f"read about breakthroughs in {random.choice(subject_list)}",
            "related_subject": f"how {random.choice(subject_list)} might evolve",
            "perspective": f"from a {perspective} standpoint",
            "implication": f"these developments will {random.choice(implication_list)}"
        }

    def _generate_creative_content(self, state, last_message):
        """Generate content related to creative and artistic topics."""
        medium = state.get("media", random.choice(["books", "films", "music", "visual arts", "games"]))
        approach = state.get("approaches", random.choice(["analysis", "personal connection", "recommendations"]))
        genre = state.get("genres", random.choice(["science fiction", "drama", "comedy", "fantasy"]))
        creative_works = {
            "books": ["novels", "non-fiction books", "poetry collections", "autobiographies", "short stories"],
            "films": ["movies", "documentaries", "animated films", "classic cinema", "independent films"],
            "music": ["songs", "albums", "concerts", "musical genres", "musical instruments"],
            "visual arts": ["paintings", "sculptures", "photography", "digital art", "street art"],
            "games": ["video games", "board games", "role-playing games", "puzzle games", "game design"]
        }
        creative_aspects = {
            "analysis": ["themes", "creative techniques", "cultural impact", "historical context", "artistic innovation"],
            "personal connection": ["emotional resonance", "personal interpretations", "memorable experiences", "formative influences", "changing perceptions"],
            "recommendations": ["underrated works", "genre-defining examples", "recent discoveries", "personal favorites", "influential classics"]
        }
        work_list = creative_works.get(medium, creative_works["books"])
        aspect_list = creative_aspects.get(approach, creative_aspects["personal connection"])
        return {
            "subject": f"{genre} {random.choice(work_list)}",
            "experience": f"discovered a fascinating {medium} about {random.choice(['relationships', 'adventure', 'human nature', 'society', 'identity'])}",
            "related_subject": f"{random.choice(aspect_list)} in {medium}",
            "referenced_point": f"how {medium} can {random.choice(['inspire', 'challenge', 'comfort', 'provoke', 'transform'])} us",
            "question": f"what makes a {medium} truly {random.choice(['memorable', 'impactful', 'meaningful', 'innovative', 'timeless'])}"
        }

    def _generate_world_content(self, state, last_message):
        """Generate content related to understanding the world topics."""
        area = state.get("areas", random.choice(["science", "philosophy", "history", "culture", "psychology"]))
        method = state.get("methods", random.choice(["questioning", "comparing viewpoints", "exploring implications"]))
        goal = state.get("goals", random.choice(["knowledge", "wisdom", "practical application", "enjoyment"]))
        subjects = {
            "science": ["scientific discoveries", "theoretical physics", "evolutionary biology", "neuroscience", "environmental science"],
            "philosophy": ["ethical dilemmas", "existential questions", "logic and reasoning", "consciousness", "human nature"],
            "history": ["historical patterns", "civilizational development", "pivotal moments", "overlooked histories", "cultural evolution"],
            "culture": ["cultural differences", "traditions", "artistic expressions", "language and communication", "social norms"],
            "psychology": ["human behavior", "cognitive biases", "emotional intelligence", "decision-making", "relationship dynamics"]
        }
        approaches = {
            "questioning": ["fundamental assumptions", "conventional wisdom", "apparent contradictions", "underlying principles", "hidden connections"],
            "comparing viewpoints": ["different cultural perspectives", "contrasting theories", "historical vs. modern views", "expert disagreements", "interdisciplinary approaches"],
            "exploring implications": ["real-world applications", "personal relevance", "societal impacts", "future possibilities", "ethical considerations"]
        }
        subject_list = subjects.get(area, subjects["science"])
        approach_list = approaches.get(method, approaches["questioning"])
        return {
            "subject": f"{random.choice(subject_list)}",
            "experience": f"learned about {random.choice(subject_list)} through {random.choice(['reading', 'a conversation', 'an online course', 'personal reflection', 'direct experience'])}",
            "related_subject": f"{random.choice(subject_list)} in relation to {random.choice(['modern life', 'personal growth', 'social change', 'technological development', 'human understanding'])}",
            "perspective": f"examining {random.choice(approach_list)}",
            "implication": f"understanding this better could lead to {random.choice(['deeper insights', 'practical solutions', 'personal transformation', 'societal progress', 'new questions'])}"
        }

    def _generate_general_content(self, state, last_message):
        """Fallback content generator for topics without a dedicated generator.

        Returns an empty mapping so _generate_contextual_message leaves its
        template placeholders unfilled and falls through to
        _generate_general_message. (This method was referenced but missing,
        which made every message-generation call raise AttributeError.)
        """
        return {}

    def _generate_general_message(self, state):
        """Generate a general message as fallback."""
        general_messages = [
            "That's an interesting perspective. Could you tell me more about your thoughts on this?",
            "I've been reflecting on what you said. It reminds me of how complex these topics can be.",
            "I wonder how different people might approach this same situation.",
            "It's fascinating how conversations can lead us to new insights.",
            "That makes me think about how our experiences shape our perspectives.",
            "I'm curious how these ideas connect to other aspects of life.",
            f"Speaking of {state.get('current_topic', 'interesting topics')}, what aspects do you find most intriguing?",
            "I appreciate your thoughts on this. It's given me something new to consider.",
            "It's remarkable how much there is to explore in even seemingly simple topics.",
            "I find it helpful to look at these questions from multiple angles."
        ]
        return random.choice(general_messages)

    async def stop(self):
        """Stop the chatbot: cancel the loop task and close the stream."""
        if not self.is_running:
            return
        self.is_running = False
        if self.conversation_task and not self.conversation_task.done():
            self.conversation_task.cancel()
            try:
                await self.conversation_task
            except asyncio.CancelledError:
                pass
        if self.stream_manager:
            await self.stream_manager.close()
        print("\nChatbot stopped")
# Setup credentials for Hugging Face Spaces
def setup_environment_variables():
    """Set up AWS credentials from various sources including Hugging Face Spaces secrets.

    On Spaces, mirrors HF_-prefixed secret names onto the standard AWS
    environment variables when the standard names are unset; always ensures
    AWS_DEFAULT_REGION has a value.
    """
    if HF_SPACES:
        print("Detected HuggingFace Spaces environment, checking for secrets...")
        # Spaces secrets may be exposed under HF_-prefixed names.
        mappings = (
            ("HF_AWS_ACCESS_KEY_ID", "AWS_ACCESS_KEY_ID"),
            ("HF_AWS_SECRET_ACCESS_KEY", "AWS_SECRET_ACCESS_KEY"),
        )
        for hf_name, aws_name in mappings:
            if os.environ.get(hf_name) and not os.environ.get(aws_name):
                os.environ[aws_name] = os.environ.get(hf_name)
                print(f"Using {hf_name}")
    # Fall back to us-east-1 when no region is configured.
    if not os.environ.get("AWS_DEFAULT_REGION"):
        os.environ["AWS_DEFAULT_REGION"] = "us-east-1"
        print("Set default AWS region to us-east-1")
# Main function
async def main():
    """Entry point: configure credentials, start the chatbot, run until interrupted."""
    setup_environment_variables()
    chatbot = SimpleChatbot()
    try:
        if not await chatbot.start():
            print("Failed to start chatbot")
            return
        print("\nChatbot is running. Press Ctrl+C to stop...\n")
        # Idle loop; the chatbot's own tasks do the work.
        while True:
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        print("\nUser interrupted. Stopping chatbot...")
    except Exception as e:
        print(f"Error running chatbot: {e}")
    finally:
        # Always release the stream, even after a failed start.
        await chatbot.stop()
if __name__ == "__main__":
    # Script entry point: run the async main loop; main() has its own
    # KeyboardInterrupt handler, anything else surfaces here.
    try:
        asyncio.run(main())
    except Exception as e:
        print(f"Application error: {e}")
        if DEBUG:
            import traceback
            traceback.print_exc()