# hf_voicebot / app.py
# SreekarB's picture
# Upload 3 files
# 8ed5f07 verified
"""
Simple speech-to-speech chatbot for Hugging Face Spaces using AWS Bedrock Nova Sonic
Adapted from the original old_sonic_use.py
"""
import os
import asyncio
import base64
import json
import uuid
import warnings
import numpy as np
import pytz
import random
import hashlib
import datetime
import time
import inspect
from aws_sdk_bedrock_runtime.client import BedrockRuntimeClient, InvokeModelWithBidirectionalStreamOperationInput
from aws_sdk_bedrock_runtime.models import InvokeModelWithBidirectionalStreamInputChunk, BidirectionalInputPayloadPart
from aws_sdk_bedrock_runtime.config import Config, HTTPAuthSchemeResolver, SigV4AuthScheme
from smithy_aws_core.credentials_resolvers.environment import EnvironmentCredentialsResolver
# Suppress warnings: no category or module filter is passed, so every
# warning raised in this process is ignored.
warnings.filterwarnings("ignore")
# Audio configuration
# NOTE(review): none of these four constants is referenced in the visible part
# of this file; the promptStart event hard-codes 24000 / 1 instead. Presumably
# sample rates are in Hz and CHUNK_SIZE is in frames -- confirm before relying
# on them.
INPUT_SAMPLE_RATE = 16000
OUTPUT_SAMPLE_RATE = 24000
CHANNELS = 1
CHUNK_SIZE = 1024
# Debug mode flag: gates debug_print() output and the traceback dumps in the
# exception handlers below.
DEBUG = False
# Initial greeting: the first text message SimpleChatbot.start() sends over
# the stream (send_text_message sends it with the USER role).
INITIAL_GREETING = "Hi there! I'm your AI conversation partner. How are you doing today? I'm here to chat about anything you'd like to discuss."
# Conversation topics and contexts - these guide the conversation but aren't hard-coded responses
# Each entry has a "topic" string plus three topic-specific lists. The list
# key names differ per topic; SimpleChatbot copies one random value from every
# list into its conversation state under the same key.
CONVERSATION_CONTEXTS = [
{
"topic": "daily_life",
"personas": ["student", "professional", "retiree", "parent"],
"emotions": ["happy", "reflective", "curious", "excited"],
"interests": ["technology", "art", "travel", "food", "health", "learning"]
},
{
"topic": "future_thinking",
"themes": ["technology", "environment", "society", "personal_growth"],
"perspectives": ["optimistic", "cautious", "innovative", "traditional"],
"timeframes": ["near future", "distant future", "theoretical possibilities"]
},
{
"topic": "creative_exploration",
"media": ["books", "films", "music", "visual arts", "games"],
"approaches": ["analysis", "personal connection", "recommendations", "creation"],
"genres": ["science fiction", "drama", "comedy", "documentary", "fantasy", "thriller"]
},
{
"topic": "world_understanding",
"areas": ["science", "philosophy", "history", "culture", "psychology"],
"methods": ["questioning", "comparing viewpoints", "exploring implications", "personal relevance"],
"goals": ["knowledge", "wisdom", "practical application", "enjoyment"]
}
]
# For Hugging Face environment detection: true when the SPACE_ID variable is
# present, or when SYSTEM is set to exactly "spaces".
HF_SPACES = "SPACE_ID" in os.environ or ("SYSTEM" in os.environ and os.environ.get("SYSTEM") == "spaces")
def debug_print(message):
    """Print a timestamped debug line, but only when DEBUG is enabled.

    The line has the form ``<timestamp to the millisecond> <caller> <message>``.
    The caller is the name of the function that invoked ``debug_print``; when
    that function is one of the timing wrappers (``time_it`` or
    ``time_it_async``) the wrapper's own caller is reported instead.
    """
    if not DEBUG:
        return
    frames = inspect.stack()
    caller = frames[1].function
    if caller in ('time_it', 'time_it_async'):
        # Skip the timing wrapper and attribute the line to whoever called it.
        caller = frames[2].function
    # %f yields microseconds; dropping the last three digits leaves milliseconds.
    stamp = '{:%Y-%m-%d %H:%M:%S.%f}'.format(datetime.datetime.now())[:-3]
    print(stamp + ' ' + caller + ' ' + message)
async def time_it_async(label, methodToRun):
    """Await ``methodToRun()`` and report how long it took via ``debug_print``.

    Args:
        label: Text used to identify the measurement in the debug line.
        methodToRun: Zero-argument callable returning an awaitable.

    Returns:
        Whatever the awaited call produced. If the call raises, the exception
        propagates and no timing line is written.
    """
    began = time.perf_counter()
    outcome = await methodToRun()
    elapsed = time.perf_counter() - began
    # debug_print must be called from this frame: it recognises this
    # function's name and reports our caller instead.
    debug_print(f"Execution time for {label}: {elapsed:.4f} seconds")
    return outcome
class BedrockStreamManager:
    """Manages bidirectional streaming with AWS Bedrock using asyncio.

    Lifecycle: ``initialize_stream()`` opens the stream and sends the session,
    prompt and system-prompt events, ``send_text_message()`` sends one user
    turn, a background task (``_process_responses``) consumes model events,
    and ``close()`` ends the prompt/session and closes the input stream.

    The ``*_EVENT`` class attributes are JSON templates filled with the ``%``
    operator. Any free text substituted into them must first go through
    ``_escape_json_text`` so the result stays valid JSON.
    """

    # Event templates
    START_SESSION_EVENT = '''{
"event": {
"sessionStart": {
"inferenceConfiguration": {
"maxTokens": 1024,
"topP": 0.9,
"temperature": 0.7
}
}
}
}'''
    TEXT_CONTENT_START_EVENT = '''{
"event": {
"contentStart": {
"promptName": "%s",
"contentName": "%s",
"type": "TEXT",
"role": "%s",
"interactive": true,
"textInputConfiguration": {
"mediaType": "text/plain"
}
}
}
}'''
    TEXT_INPUT_EVENT = '''{
"event": {
"textInput": {
"promptName": "%s",
"contentName": "%s",
"content": "%s"
}
}
}'''
    CONTENT_END_EVENT = '''{
"event": {
"contentEnd": {
"promptName": "%s",
"contentName": "%s"
}
}
}'''
    PROMPT_END_EVENT = '''{
"event": {
"promptEnd": {
"promptName": "%s"
}
}
}'''
    SESSION_END_EVENT = '''{
"event": {
"sessionEnd": {}
}
}'''

    def __init__(self, model_id='amazon.nova-sonic-v1:0', region='us-east-1'):
        """Initialize the stream manager.

        Args:
            model_id: Bedrock model identifier passed when opening the stream.
            region: AWS region used to build the Bedrock runtime endpoint.
        """
        self.model_id = model_id
        self.region = region
        # Replace RxPy subjects with asyncio queues
        # NOTE(review): these queues are unbounded and nothing in this file
        # reads from them, so they grow for as long as the session produces
        # output -- confirm a consumer exists elsewhere.
        self.audio_input_queue = asyncio.Queue()
        self.audio_output_queue = asyncio.Queue()
        self.output_queue = asyncio.Queue()
        self.response_task = None
        self.stream_response = None
        self.is_active = False
        self.bedrock_client = None
        # Text response components
        self.display_assistant_text = False
        self.role = None
        self.current_text = ""
        # Session information
        self.prompt_name = str(uuid.uuid4())
        self.content_name = str(uuid.uuid4())
        # Conversation history: dicts with "role" and "content" keys.
        self.conversation_history = []

    @staticmethod
    def _escape_json_text(text):
        """Return ``text`` escaped for use inside a JSON string literal.

        ``json.dumps`` produces a quoted, fully escaped literal; stripping the
        surrounding quotes leaves text that can be dropped into the ``"%s"``
        slots of the event templates without breaking the JSON.
        """
        return json.dumps(str(text))[1:-1]

    def _initialize_client(self):
        """Build the Bedrock runtime client from environment credentials.

        Returns:
            A ``BedrockRuntimeClient`` configured for ``self.region``.

        Raises:
            ValueError: If AWS_ACCESS_KEY_ID or AWS_SECRET_ACCESS_KEY is unset
                or empty.
        """
        # Check for required environment variables
        required_vars = ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"]
        missing_vars = [var for var in required_vars if not os.environ.get(var)]
        if missing_vars:
            error_msg = f"Missing AWS credentials: {', '.join(missing_vars)}\n"
            if HF_SPACES:
                error_msg += "Please add these as secrets in your Hugging Face Space settings."
            else:
                error_msg += "Please set these environment variables."
            raise ValueError(error_msg)
        config = Config(
            endpoint_uri=f"https://bedrock-runtime.{self.region}.amazonaws.com",
            region=self.region,
            aws_credentials_identity_resolver=EnvironmentCredentialsResolver(),
            http_auth_scheme_resolver=HTTPAuthSchemeResolver(),
            http_auth_schemes={"aws.auth#sigv4": SigV4AuthScheme()}
        )
        return BedrockRuntimeClient(config=config)

    async def initialize_stream(self):
        """Open the bidirectional stream and send the session setup events.

        Returns:
            True when the stream is open, every setup event was sent and the
            response task was started; False on any failure (the error is
            printed and the input stream is closed again).
        """
        try:
            # Create client
            self.bedrock_client = self._initialize_client()
            self.stream_response = await time_it_async(
                "invoke_model_with_bidirectional_stream",
                lambda: self.bedrock_client.invoke_model_with_bidirectional_stream(
                    InvokeModelWithBidirectionalStreamOperationInput(model_id=self.model_id)
                )
            )
            self.is_active = True
            # Start the prompt
            prompt_start_event = {
                "event": {
                    "promptStart": {
                        "promptName": self.prompt_name,
                        "textOutputConfiguration": {
                            "mediaType": "text/plain"
                        },
                        "audioOutputConfiguration": {
                            "mediaType": "audio/lpcm",
                            "sampleRateHertz": 24000,
                            "sampleSizeBits": 16,
                            "channelCount": 1,
                            "voiceId": "matthew",
                            "encoding": "base64",
                            "audioType": "SPEECH"
                        }
                    }
                }
            }
            # Initialize with system prompt
            system_prompt = "You are a friendly, helpful conversation partner. Keep your responses natural and conversational. Act as if this is a spoken conversation between friends. Ask follow-up questions to keep the conversation going."
            # Send initialization events
            init_events = [
                self.START_SESSION_EVENT,
                json.dumps(prompt_start_event),
                self.TEXT_CONTENT_START_EVENT % (self.prompt_name, "system_prompt", "SYSTEM"),
                self.TEXT_INPUT_EVENT % (
                    self.prompt_name, "system_prompt", self._escape_json_text(system_prompt)
                ),
                self.CONTENT_END_EVENT % (self.prompt_name, "system_prompt")
            ]
            for event in init_events:
                # A setup event that was not delivered leaves the session
                # unusable, so fail instead of reporting success.
                if not await self.send_raw_event(event):
                    raise RuntimeError("could not send a session setup event")
                await asyncio.sleep(0.1)
            # Start listening for responses
            self.response_task = asyncio.create_task(self._process_responses())
            print("Stream initialized successfully")
            return True
        except Exception as e:
            self.is_active = False
            print(f"Failed to initialize stream: {str(e)}")
            # Do not leave a half-initialized stream open.
            await self._close_input_stream()
            return False

    async def send_raw_event(self, event_json):
        """Send a raw event JSON string to the Bedrock stream.

        Args:
            event_json: The complete event, already serialized as JSON text.

        Returns:
            True if the event was handed to the input stream, False if the
            stream is not open/active or the send raised (the error is only
            reported through ``debug_print``).
        """
        if not self.stream_response or not self.is_active:
            debug_print("Stream not initialized or closed")
            return False
        event = InvokeModelWithBidirectionalStreamInputChunk(
            value=BidirectionalInputPayloadPart(bytes_=event_json.encode('utf-8'))
        )
        try:
            await self.stream_response.input_stream.send(event)
        except Exception as e:
            debug_print(f"Error sending event: {str(e)}")
            if DEBUG:
                import traceback
                traceback.print_exc()
            return False
        # For debugging large events, log just the event type
        if DEBUG:
            if len(event_json) > 200:
                try:
                    event_types = list(json.loads(event_json).get("event", {}))
                except (ValueError, AttributeError):
                    # Logging must never turn a successful send into a failure.
                    event_types = []
                debug_print(f"Sent event type: {event_types}")
            else:
                debug_print(f"Sent event: {event_json}")
        return True

    async def _handle_event(self, event):
        """Update local state for one decoded ``event`` payload from the model."""
        if 'contentStart' in event:
            # Remember whose content block is open; contentEnd relies on it.
            self.role = event['contentStart'].get('role', '')
        elif 'textOutput' in event:
            text_content = event['textOutput']['content']
            role = event['textOutput']['role']
            if role == "ASSISTANT":
                if self.current_text == "":
                    # First chunk of text
                    self.current_text = text_content
                    print(f"\nAI: {text_content}", end="", flush=True)
                else:
                    # Continuation of text.
                    # NOTE(review): this assumes each textOutput repeats the
                    # text so far plus new characters -- confirm against the
                    # model's actual output format.
                    delta = text_content[len(self.current_text):]
                    self.current_text = text_content
                    print(delta, end="", flush=True)
        elif 'audioOutput' in event:
            audio_content = event['audioOutput']['content']
            await self.audio_output_queue.put(base64.b64decode(audio_content))
        elif 'contentEnd' in event:
            if self.role == "ASSISTANT" and self.current_text:
                # Save to conversation history
                self.conversation_history.append({
                    "role": "assistant",
                    "content": self.current_text
                })
                self.current_text = ""
                print()  # Add a new line

    async def _process_responses(self):
        """Receive model output until the stream ends, fails or is closed.

        Every decoded JSON message is also put on ``output_queue``; payloads
        that are not valid JSON are queued as ``{"raw_data": ...}``. Any other
        error is printed and ends the loop. ``is_active`` is always cleared on
        exit.
        """
        try:
            while self.is_active:
                try:
                    output = await self.stream_response.await_output()
                    result = await output[1].receive()
                    if result.value and result.value.bytes_:
                        try:
                            response_data = result.value.bytes_.decode('utf-8')
                            json_data = json.loads(response_data)
                            # Handle different response types
                            if 'event' in json_data:
                                await self._handle_event(json_data['event'])
                            # Put the response in the output queue for other components
                            await self.output_queue.put(json_data)
                        except json.JSONDecodeError:
                            await self.output_queue.put({"raw_data": response_data})
                except StopAsyncIteration:
                    # Stream has ended
                    break
                except Exception as e:
                    if "ValidationException" in str(e):
                        error_message = str(e)
                        print(f"Validation error: {error_message}")
                    else:
                        print(f"Error receiving response: {e}")
                    if DEBUG:
                        import traceback
                        traceback.print_exc()
                    break
        except Exception as e:
            print(f"Response processing error: {e}")
        finally:
            self.is_active = False

    async def send_text_message(self, text):
        """Send one USER text turn (contentStart, textInput, contentEnd).

        Args:
            text: The message text; it is JSON-escaped before being sent.

        Returns:
            True if all three events were sent and the message was recorded in
            ``conversation_history``; False otherwise.
        """
        if not self.is_active:
            debug_print("Stream is not active")
            return False
        try:
            # A random id keeps content names unique even for messages sent
            # within the same second.
            message_id = f"msg_{uuid.uuid4()}"
            events = (
                self.TEXT_CONTENT_START_EVENT % (self.prompt_name, message_id, "USER"),
                self.TEXT_INPUT_EVENT % (
                    self.prompt_name, message_id, self._escape_json_text(text)
                ),
                self.CONTENT_END_EVENT % (self.prompt_name, message_id),
            )
            for event in events:
                if not await self.send_raw_event(event):
                    print("Error sending text message: event could not be sent")
                    return False
            # Add to conversation history
            self.conversation_history.append({
                "role": "user",
                "content": text
            })
            print(f"\nYou: {text}")
            return True
        except Exception as e:
            print(f"Error sending text message: {e}")
            return False

    async def _close_input_stream(self):
        """Close the input stream if one is open; errors are only debug-logged."""
        stream = self.stream_response
        # Clearing the reference first makes repeated calls harmless.
        self.stream_response = None
        if stream is None:
            return
        try:
            await stream.input_stream.close()
        except Exception as e:
            debug_print(f"Error closing stream: {e}")

    async def close(self):
        """Close the stream properly.

        While the stream is still active the promptEnd and sessionEnd events
        are sent first, because ``send_raw_event`` refuses to send once
        ``is_active`` is False. The response task is then cancelled and the
        input stream is closed, even if the response loop had already stopped
        on its own. Calling ``close()`` again afterwards is a no-op.
        """
        if self.stream_response is None:
            self.is_active = False
            return
        if self.is_active:
            try:
                # Send end events
                await self.send_raw_event(self.PROMPT_END_EVENT % self.prompt_name)
                await self.send_raw_event(self.SESSION_END_EVENT)
            except Exception as e:
                debug_print(f"Error closing stream: {e}")
        self.is_active = False
        # Cancel the response task
        if self.response_task and not self.response_task.done():
            self.response_task.cancel()
            try:
                await self.response_task
            except asyncio.CancelledError:
                pass
        # Close the stream
        await self._close_input_stream()
class SimpleChatbot:
    """Simple speech-to-speech chatbot for Hugging Face Spaces.

    In this file the "user" side is simulated: ``start()`` opens a
    ``BedrockStreamManager`` session, sends ``INITIAL_GREETING`` and launches
    ``simulate_conversation()``, which keeps sending generated text messages
    until ``stop()`` is called, a send fails, or the message limit is reached.
    """

    def __init__(self):
        """Initialize the chatbot; the region comes from AWS_DEFAULT_REGION."""
        self.stream_manager = None
        self.is_running = False
        self.conversation_task = None
        self.region = os.environ.get("AWS_DEFAULT_REGION", "us-east-1")
        self.model_id = "amazon.nova-sonic-v1:0"

    async def start(self):
        """Open the stream, send the greeting and start the simulation task.

        Returns:
            True when the stream was initialized and the simulation task was
            created, False otherwise (the reason is printed).
        """
        print("\n==================================================")
        print(" Simple Speech-to-Speech Chatbot (HF Spaces Mode)")
        print("==================================================\n")
        try:
            # Initialize the stream manager
            self.stream_manager = BedrockStreamManager(
                model_id=self.model_id,
                region=self.region
            )
            # Initialize the stream
            success = await self.stream_manager.initialize_stream()
            if not success:
                print("Failed to initialize stream")
                return False
            self.is_running = True
            # Send initial greeting
            print("\nStarting conversation...")
            await self.stream_manager.send_text_message(INITIAL_GREETING)
            # Start conversation simulation task
            self.conversation_task = asyncio.create_task(self.simulate_conversation())
            return True
        except Exception as e:
            print(f"Error starting chatbot: {e}")
            return False

    async def simulate_conversation(self):
        """Simulate a dynamic, adaptive conversation.

        Each turn generates a message from the current conversation state,
        sends it, waits 10-15 seconds for a reply, copies the newest assistant
        reply (if any) from the stream manager, and evolves the state. The
        loop ends when ``is_running`` is cleared, after 100 messages, or as
        soon as a message cannot be sent.
        """
        try:
            # Wait for initial greeting to complete
            await asyncio.sleep(8)
            # Get initial context for conversation
            conversation_state = self._initialize_conversation_state()
            # Loop through dynamically generated conversation
            message_count = 0
            max_messages = 100  # Limit to prevent infinite loops
            # Maintain conversation history for context
            conversation_history = [
                {"role": "system", "content": "You are having a natural conversation. Be genuine and thoughtful."},
                {"role": "assistant", "content": INITIAL_GREETING}
            ]
            while self.is_running and message_count < max_messages:
                # Generate a contextually appropriate message based on conversation state
                user_message = self._generate_contextual_message(conversation_state, conversation_history)
                # Send the message. A failed send means the stream is no longer
                # usable, so stop instead of sleeping through the remaining turns.
                sent = await self.stream_manager.send_text_message(user_message)
                if not sent:
                    print("Conversation simulation stopped: message could not be sent")
                    break
                # Add to conversation tracking
                conversation_history.append({"role": "user", "content": user_message})
                # Wait for response (variable timing to seem more natural)
                wait_time = random.uniform(10, 15)
                await asyncio.sleep(wait_time)
                # Get Nova's response from the stream_manager if available
                if self.stream_manager.conversation_history:
                    last_response = self.stream_manager.conversation_history[-1]
                    if last_response["role"] == "assistant":
                        conversation_history.append(last_response)
                # Evolve the conversation state based on the exchange
                conversation_state = self._evolve_conversation_state(conversation_state, conversation_history)
                message_count += 1
                # Occasionally add some variety with pauses
                if random.random() < 0.1:  # 10% chance
                    pause_time = random.uniform(5, 10)
                    await asyncio.sleep(pause_time)
        except asyncio.CancelledError:
            # Task was cancelled, exit gracefully
            pass
        except Exception as e:
            print(f"Error in conversation simulation: {e}")

    def _initialize_conversation_state(self):
        """Initialize the state that guides conversation flow.

        Returns:
            A dict with the chosen topic/context, depth, engagement, emotion
            and turn counter, plus one randomly chosen value for every list in
            the context (stored under the context's own key, e.g. "personas").
        """
        # Select a random conversation context to start from
        context = random.choice(CONVERSATION_CONTEXTS)
        # Set initial conversation parameters
        state = {
            "current_topic": context["topic"],
            "context": context,
            "depth_level": 1,  # Start shallow, go deeper
            "engagement": random.uniform(0.5, 0.8),  # How engaged the conversation is
            "emotion": random.choice(["neutral", "curious", "interested"]),
            "turns_in_topic": 0
        }
        # Add more specific elements from the context
        for key, values in context.items():
            if isinstance(values, list) and values:
                state[key] = random.choice(values)
        return state

    def _evolve_conversation_state(self, state, conversation_history):
        """Evolve the conversation state based on recent exchanges.

        Args:
            state: State dict from ``_initialize_conversation_state``; it is
                modified in place.
            conversation_history: List of ``{"role", "content"}`` dicts.

        Returns:
            The same ``state`` object, updated.
        """
        # Extract the most recent exchanges (at most the last four)
        recent_exchanges = conversation_history[-4:]
        # Increment turns in current topic
        state["turns_in_topic"] += 1
        # Occasionally change topics to keep conversation fresh
        if state["turns_in_topic"] > random.randint(3, 7):
            # Choose a new context. Picking from the other topics directly
            # cannot loop forever; with a single configured context the
            # current one is simply reused.
            alternatives = [
                candidate for candidate in CONVERSATION_CONTEXTS
                if candidate["topic"] != state["current_topic"]
            ]
            new_context = random.choice(alternatives or CONVERSATION_CONTEXTS)
            state["current_topic"] = new_context["topic"]
            state["context"] = new_context
            state["turns_in_topic"] = 0
            state["depth_level"] = 1  # Reset depth when changing topics
            # Add more specific elements from the new context
            for key, values in new_context.items():
                if isinstance(values, list) and values:
                    state[key] = random.choice(values)
        else:
            # If staying on topic, incrementally go deeper
            state["depth_level"] = min(state["depth_level"] + random.uniform(0.1, 0.5), 4)
        # Adjust engagement level based on complexity of responses
        if recent_exchanges:
            avg_response_length = sum(len(ex.get("content", "")) for ex in recent_exchanges) / len(recent_exchanges)
            # Longer responses typically indicate more engagement
            engagement_delta = (avg_response_length / 200) - 0.5  # Normalize
            state["engagement"] = max(0.1, min(1.0, state["engagement"] + engagement_delta * 0.2))
        # Occasionally shift emotion for variety
        if random.random() < 0.2:  # 20% chance
            emotions = ["neutral", "curious", "interested", "excited", "reflective", "amused", "thoughtful"]
            state["emotion"] = random.choice(emotions)
        return state

    def _generate_contextual_message(self, state, conversation_history):
        """Generate a contextually appropriate message based on conversation state.

        A random template is filled with the elements produced by the content
        generator for ``state["current_topic"]``. If any placeholder remains
        unfilled, a generic fallback message is returned instead.

        Args:
            state: Conversation state dict; must contain "current_topic".
            conversation_history: List of ``{"role", "content"}`` dicts.

        Returns:
            The message text, never containing an unfilled placeholder.
        """
        # Extract most recent exchange
        last_message = conversation_history[-1]["content"] if conversation_history else ""
        topic = state["current_topic"]
        # Base templates for different conversation flows
        templates = [
            # Questions
            "What do you think about {subject}?",
            "Have you ever experienced {experience}?",
            "I'm curious about your perspective on {subject}. What's your take?",
            "How would you approach {situation}?",
            # Statements
            "I've been thinking about {subject} lately.",
            "I recently {experience} and it made me wonder about {related_subject}.",
            "It's interesting to consider {subject}, especially when you think about {perspective}.",
            # Follow-ups
            "That's fascinating. What about {related_subject}?",
            "I see your point about {referenced_point}. That makes me wonder about {question}.",
            "I hadn't thought about it that way. Does that mean you believe {implication}?"
        ]
        # Select template based on state
        template = random.choice(templates)
        # Content generators for different topics
        content_generators = {
            "daily_life": self._generate_daily_life_content,
            "future_thinking": self._generate_future_thinking_content,
            "creative_exploration": self._generate_creative_content,
            "world_understanding": self._generate_world_content
        }
        # Use the appropriate content generator. The default is looked up on
        # every call, even for known topics, so the fallback method must exist.
        generator = content_generators.get(topic, self._generate_general_content)
        content_elements = generator(state, last_message)
        # Fill in the template with contextual elements
        for key, value in content_elements.items():
            template = template.replace("{" + key + "}", value)
        # If the template still has unfilled placeholders, use a more general message
        if "{" in template and "}" in template:
            return self._generate_general_message(state)
        return template

    def _generate_general_content(self, state, last_message):
        """Content generator used for topics without a dedicated generator.

        It supplies no template elements, so every template keeps its
        placeholders and ``_generate_contextual_message`` falls back to
        ``_generate_general_message``.
        """
        return {}

    def _generate_daily_life_content(self, state, last_message):
        """Generate template elements for daily-life topics.

        Unknown personas/interests fall back to "professional"/"technology".
        """
        persona = state.get("personas", random.choice(["student", "professional", "parent", "traveler"]))
        interest = state.get("interests", random.choice(["technology", "art", "travel", "food", "health"]))
        subjects = {
            "technology": ["digital assistants", "smartphones", "social media", "online learning", "working remotely"],
            "art": ["favorite music", "recent movies", "books", "creative hobbies", "art exhibitions"],
            "travel": ["dream destinations", "travel experiences", "local exploration", "cultural differences", "adventure sports"],
            "food": ["cooking at home", "favorite cuisines", "dietary choices", "restaurant experiences", "food traditions"],
            "health": ["exercise routines", "sleep habits", "mental wellness", "work-life balance", "mindfulness practices"]
        }
        experiences = {
            "student": ["studying for an exam", "joining a new club", "learning a difficult concept", "managing deadlines", "group projects"],
            "professional": ["handling work challenges", "career development", "workplace collaboration", "learning new skills", "professional networking"],
            "parent": ["family activities", "teaching moments", "balancing responsibilities", "childhood development", "family traditions"],
            "traveler": ["planning trips", "cultural experiences", "travel mishaps", "discovering new places", "meeting people while traveling"]
        }
        # Select specific elements
        subject_list = subjects.get(interest, subjects["technology"])
        experience_list = experiences.get(persona, experiences["professional"])
        return {
            "subject": random.choice(subject_list),
            "experience": random.choice(experience_list),
            "related_subject": random.choice(subject_list),
            "situation": f"{random.choice(experience_list)} for the first time",
            "perspective": f"how it affects daily {random.choice(['routines', 'habits', 'interactions', 'decisions'])}"
        }

    def _generate_future_thinking_content(self, state, last_message):
        """Generate template elements for future and forward-looking topics.

        Unknown themes/perspectives fall back to "technology"/"optimistic"
        when choosing the subject and implication lists.
        """
        theme = state.get("themes", random.choice(["technology", "environment", "society", "personal_growth"]))
        perspective = state.get("perspectives", random.choice(["optimistic", "cautious", "innovative"]))
        timeframe = state.get("timeframes", random.choice(["near future", "distant future", "theoretical possibilities"]))
        subjects = {
            "technology": ["artificial intelligence", "space exploration", "robotics", "virtual reality", "human augmentation"],
            "environment": ["renewable energy", "climate adaptation", "sustainable living", "conservation efforts", "environmental policy"],
            "society": ["future of work", "education evolution", "changing social norms", "global cooperation", "new economic models"],
            "personal_growth": ["lifelong learning", "adaptability", "future skills", "evolving careers", "human potential"]
        }
        implications = {
            "optimistic": ["create new opportunities", "solve major problems", "enhance human capabilities", "bring people together", "accelerate positive change"],
            "cautious": ["require careful consideration", "present both benefits and risks", "need ethical guidelines", "transform familiar systems", "challenge our adaptability"],
            "innovative": ["reshape our understanding", "combine unexpected elements", "create entirely new categories", "transcend current limitations", "evolve in surprising ways"]
        }
        # Select specific elements
        subject_list = subjects.get(theme, subjects["technology"])
        implication_list = implications.get(perspective, implications["optimistic"])
        return {
            "subject": f"{random.choice(subject_list)} in the {timeframe}",
            "experience": f"read about breakthroughs in {random.choice(subject_list)}",
            "related_subject": f"how {random.choice(subject_list)} might evolve",
            "perspective": f"from a {perspective} standpoint",
            "implication": f"these developments will {random.choice(implication_list)}"
        }

    def _generate_creative_content(self, state, last_message):
        """Generate template elements for creative and artistic topics.

        Unknown media/approaches fall back to "books"/"personal connection".
        """
        medium = state.get("media", random.choice(["books", "films", "music", "visual arts", "games"]))
        approach = state.get("approaches", random.choice(["analysis", "personal connection", "recommendations"]))
        genre = state.get("genres", random.choice(["science fiction", "drama", "comedy", "fantasy"]))
        creative_works = {
            "books": ["novels", "non-fiction books", "poetry collections", "autobiographies", "short stories"],
            "films": ["movies", "documentaries", "animated films", "classic cinema", "independent films"],
            "music": ["songs", "albums", "concerts", "musical genres", "musical instruments"],
            "visual arts": ["paintings", "sculptures", "photography", "digital art", "street art"],
            "games": ["video games", "board games", "role-playing games", "puzzle games", "game design"]
        }
        creative_aspects = {
            "analysis": ["themes", "creative techniques", "cultural impact", "historical context", "artistic innovation"],
            "personal connection": ["emotional resonance", "personal interpretations", "memorable experiences", "formative influences", "changing perceptions"],
            "recommendations": ["underrated works", "genre-defining examples", "recent discoveries", "personal favorites", "influential classics"]
        }
        # Select specific elements
        work_list = creative_works.get(medium, creative_works["books"])
        aspect_list = creative_aspects.get(approach, creative_aspects["personal connection"])
        return {
            "subject": f"{genre} {random.choice(work_list)}",
            "experience": f"discovered a fascinating {medium} about {random.choice(['relationships', 'adventure', 'human nature', 'society', 'identity'])}",
            "related_subject": f"{random.choice(aspect_list)} in {medium}",
            "referenced_point": f"how {medium} can {random.choice(['inspire', 'challenge', 'comfort', 'provoke', 'transform'])} us",
            "question": f"what makes a {medium} truly {random.choice(['memorable', 'impactful', 'meaningful', 'innovative', 'timeless'])}"
        }

    def _generate_world_content(self, state, last_message):
        """Generate template elements for understanding-the-world topics.

        Unknown areas/methods fall back to "science"/"questioning".
        """
        area = state.get("areas", random.choice(["science", "philosophy", "history", "culture", "psychology"]))
        method = state.get("methods", random.choice(["questioning", "comparing viewpoints", "exploring implications"]))
        goal = state.get("goals", random.choice(["knowledge", "wisdom", "practical application", "enjoyment"]))
        subjects = {
            "science": ["scientific discoveries", "theoretical physics", "evolutionary biology", "neuroscience", "environmental science"],
            "philosophy": ["ethical dilemmas", "existential questions", "logic and reasoning", "consciousness", "human nature"],
            "history": ["historical patterns", "civilizational development", "pivotal moments", "overlooked histories", "cultural evolution"],
            "culture": ["cultural differences", "traditions", "artistic expressions", "language and communication", "social norms"],
            "psychology": ["human behavior", "cognitive biases", "emotional intelligence", "decision-making", "relationship dynamics"]
        }
        approaches = {
            "questioning": ["fundamental assumptions", "conventional wisdom", "apparent contradictions", "underlying principles", "hidden connections"],
            "comparing viewpoints": ["different cultural perspectives", "contrasting theories", "historical vs. modern views", "expert disagreements", "interdisciplinary approaches"],
            "exploring implications": ["real-world applications", "personal relevance", "societal impacts", "future possibilities", "ethical considerations"]
        }
        # Select specific elements
        subject_list = subjects.get(area, subjects["science"])
        approach_list = approaches.get(method, approaches["questioning"])
        return {
            "subject": f"{random.choice(subject_list)}",
            "experience": f"learned about {random.choice(subject_list)} through {random.choice(['reading', 'a conversation', 'an online course', 'personal reflection', 'direct experience'])}",
            "related_subject": f"{random.choice(subject_list)} in relation to {random.choice(['modern life', 'personal growth', 'social change', 'technological development', 'human understanding'])}",
            "perspective": f"examining {random.choice(approach_list)}",
            "implication": f"understanding this better could lead to {random.choice(['deeper insights', 'practical solutions', 'personal transformation', 'societal progress', 'new questions'])}"
        }

    def _generate_general_message(self, state):
        """Return a random topic-agnostic message used as the fallback."""
        general_messages = [
            "That's an interesting perspective. Could you tell me more about your thoughts on this?",
            "I've been reflecting on what you said. It reminds me of how complex these topics can be.",
            "I wonder how different people might approach this same situation.",
            "It's fascinating how conversations can lead us to new insights.",
            "That makes me think about how our experiences shape our perspectives.",
            "I'm curious how these ideas connect to other aspects of life.",
            f"Speaking of {state.get('current_topic', 'interesting topics')}, what aspects do you find most intriguing?",
            "I appreciate your thoughts on this. It's given me something new to consider.",
            "It's remarkable how much there is to explore in even seemingly simple topics.",
            "I find it helpful to look at these questions from multiple angles."
        ]
        return random.choice(general_messages)

    async def stop(self):
        """Stop the chatbot: cancel the simulation task and close the stream.

        Does nothing if the chatbot is not running.
        """
        if not self.is_running:
            return
        self.is_running = False
        # Cancel the conversation task
        if self.conversation_task and not self.conversation_task.done():
            self.conversation_task.cancel()
            try:
                await self.conversation_task
            except asyncio.CancelledError:
                pass
        # Close the stream manager
        if self.stream_manager:
            await self.stream_manager.close()
        print("\nChatbot stopped")
# Setup credentials for Hugging Face Spaces
def setup_environment_variables():
    """Populate the standard AWS environment variables before the client is built.

    On Hugging Face Spaces, an ``HF_``-prefixed secret is copied to its
    standard AWS name when the standard variable is unset or empty. In every
    environment AWS_DEFAULT_REGION defaults to ``us-east-1`` when unset or
    empty.
    """
    env = os.environ
    if HF_SPACES:
        print("Detected HuggingFace Spaces environment, checking for secrets...")
        # Secrets may have been stored under an HF_ prefix; a value already
        # present under the standard name always wins.
        for aws_name in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"):
            hf_name = "HF_" + aws_name
            if env.get(hf_name) and not env.get(aws_name):
                env[aws_name] = env.get(hf_name)
                print("Using " + hf_name)
    # Fall back to a default region when none is configured.
    if not env.get("AWS_DEFAULT_REGION"):
        env["AWS_DEFAULT_REGION"] = "us-east-1"
        print("Set default AWS region to us-east-1")
# Main function
async def main():
    """Run the chatbot until it is interrupted, then clean up.

    Sets up the environment, starts a ``SimpleChatbot`` and idles until the
    task is cancelled or interrupted. ``chatbot.stop()`` always runs on exit.
    """
    # Initialize environment variables
    setup_environment_variables()
    # Create chatbot instance
    chatbot = SimpleChatbot()
    try:
        # Start the chatbot
        success = await chatbot.start()
        if not success:
            print("Failed to start chatbot")
            return
        # Keep running until interrupted
        print("\nChatbot is running. Press Ctrl+C to stop...\n")
        # Wait indefinitely
        while True:
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        print("\nUser interrupted. Stopping chatbot...")
    except asyncio.CancelledError:
        # Under asyncio.run(), Ctrl+C reaches this coroutine as a cancellation
        # rather than KeyboardInterrupt. Report it and re-raise so the runner
        # can finish shutting down; cleanup still runs in ``finally``.
        print("\nUser interrupted. Stopping chatbot...")
        raise
    except Exception as e:
        print(f"Error running chatbot: {e}")
    finally:
        # Clean up resources
        await chatbot.stop()
if __name__ == "__main__":
    # Script entry point: run main() on a fresh event loop.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # asyncio.run() re-raises KeyboardInterrupt once main() has been
        # cancelled and cleaned up. Ctrl+C is the documented way to stop this
        # program, so exit quietly instead of printing a traceback.
        pass
    except Exception as e:
        print(f"Application error: {e}")
        if DEBUG:
            import traceback
            traceback.print_exc()