# Page-scrape residue (not part of the program): "Spaces: Sleeping"
import datetime
import json
import os
import random
import re
import sys
# Fine-grained emotion labels recognised by this module (28 entries, ending
# with "neutral"). The order is kept exactly as originally written.
# NOTE(review): presumably this order matches the output indices of the
# emotion classifier used elsewhere -- confirm before reordering.
EMOTION_LABELS = [
    "admiration", "amusement", "anger", "annoyance", "approval", "caring", "confusion",
    "curiosity", "desire", "disappointment", "disapproval", "disgust", "embarrassment",
    "excitement", "fear", "gratitude", "grief", "joy", "love", "nervousness", "optimism",
    "pride", "realization", "relief", "remorse", "sadness", "surprise", "neutral",
]
# Collapse each fine-grained emotion label onto one of the seven coarse
# response categories: joy, anger, sadness, fear, disgust, surprise, neutral.
EMOTION_MAPPING = {
    "admiration": "joy",
    "amusement": "joy",
    "anger": "anger",
    "annoyance": "anger",
    "approval": "joy",
    "caring": "joy",
    "confusion": "neutral",
    "curiosity": "neutral",
    "desire": "neutral",
    "disappointment": "sadness",
    "disapproval": "anger",
    "disgust": "disgust",
    "embarrassment": "sadness",
    "excitement": "joy",
    "fear": "fear",
    "gratitude": "joy",
    "grief": "sadness",
    "joy": "joy",
    "love": "joy",
    "nervousness": "fear",
    "optimism": "joy",
    "pride": "joy",
    "realization": "neutral",
    "relief": "joy",
    "remorse": "sadness",
    "sadness": "sadness",
    "surprise": "surprise",
    "neutral": "neutral",
}
class ChatbotContext:
    """Maintain conversation context and history for one chat session.

    Tracks the message history, the emotions detected on user messages,
    user feedback, and a coarse conversation stage ("initial", "middle",
    "advanced") derived from the number of exchanges so far.
    """

    # Labels counted as positive / negative when tracking the emotional
    # trajectory. Any label in neither set is treated as neutral.
    # NOTE(review): "approval" and "caring" are in neither set, so they count
    # as neutral here -- confirm that this is intended.
    _POSITIVE_EMOTIONS = frozenset({
        "joy", "admiration", "amusement", "excitement",
        "optimism", "gratitude", "pride", "love", "relief",
    })
    _NEGATIVE_EMOTIONS = frozenset({
        "sadness", "anger", "fear", "disgust", "disappointment",
        "annoyance", "disapproval", "embarrassment", "grief",
        "remorse", "nervousness",
    })

    def __init__(self):
        self.conversation_history = []
        self.detected_emotions = []
        self.user_feedback = []
        self.current_session_id = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        # Track emotional progression for therapeutic conversation flow
        self.conversation_stage = "initial"  # initial, middle, advanced
        self.emotion_trajectory = []  # primary emotion of each analysed user message
        self.consecutive_positive_count = 0
        self.consecutive_negative_count = 0
        # User / bot identity tracking
        self.user_name = None
        self.bot_name = "Mira"  # Friendly, easy to remember name
        self.introduced = False

    def add_message(self, role, text, emotions=None):
        """Append a message to the conversation history and return it.

        Args:
            role: Speaker of the message; only the value "user" triggers
                emotion tracking.
            text: Message text.
            emotions: Optional non-empty sequence of dicts, the first of
                which must carry an "emotion" key. Ignored unless
                ``role == "user"``.

        Returns:
            The stored message dict ("role", "text", "timestamp" and, for
            user messages with emotions, "emotions").
        """
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        message = {
            "role": role,
            "text": text,
            "timestamp": timestamp
        }
        if emotions and role == "user":
            message["emotions"] = emotions
            self.detected_emotions.append(emotions)
            # Runs before the message is appended, so the stage is computed
            # from the history length *excluding* this message.
            self._update_emotional_trajectory(emotions)
        self.conversation_history.append(message)
        return message

    def _update_emotional_trajectory(self, emotions):
        """Record the primary emotion and refresh counters and stage."""
        # The first entry is taken as the primary emotion
        primary_emotion = emotions[0]["emotion"] if emotions else "neutral"
        self.emotion_trajectory.append(primary_emotion)

        if primary_emotion in self._POSITIVE_EMOTIONS:
            self.consecutive_positive_count += 1
            self.consecutive_negative_count = 0
        elif primary_emotion in self._NEGATIVE_EMOTIONS:
            self.consecutive_negative_count += 1
            self.consecutive_positive_count = 0
        # Neutral / other labels deliberately leave both counters untouched
        # so a neutral message does not break an ongoing streak.

        # Stage depends on the number of exchanges (user/bot pairs) so far
        msg_count = len(self.conversation_history) // 2
        if msg_count <= 1:  # First real exchange
            self.conversation_stage = "initial"
        elif msg_count <= 3:  # First few exchanges
            self.conversation_stage = "middle"
        else:  # More established conversation
            self.conversation_stage = "advanced"

    def get_emotional_state(self):
        """Summarise the current emotional direction of the conversation.

        Returns:
            "unknown" when fewer than two emotions have been recorded;
            "positive" / "negative" when the matching streak counter is at
            least 2; otherwise "improving", "declining" or "neutral"
            according to the positive vs. negative tally over the last
            three recorded emotions.
        """
        if len(self.emotion_trajectory) < 2:
            return "unknown"
        # Only the last three emotions are tallied; neutral labels simply
        # contribute to neither count.
        recent_emotions = self.emotion_trajectory[-3:]
        pos_count = sum(1 for e in recent_emotions if e in self._POSITIVE_EMOTIONS)
        neg_count = sum(1 for e in recent_emotions if e in self._NEGATIVE_EMOTIONS)
        if self.consecutive_positive_count >= 2:
            return "positive"
        if self.consecutive_negative_count >= 2:
            return "negative"
        if pos_count > neg_count:
            return "improving"
        if neg_count > pos_count:
            return "declining"
        return "neutral"

    def add_feedback(self, rating, comments=None):
        """Store user feedback about the chatbot's response and return it."""
        feedback = {
            "rating": rating,
            "comments": comments,
            "timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        self.user_feedback.append(feedback)
        return feedback

    def get_recent_messages(self, count=5):
        """Return up to ``count`` of the most recent messages, oldest first.

        A non-positive ``count`` yields an empty list (a plain ``[-0:]``
        slice would return the entire history). The result is always a new
        list, so mutating it never alters the stored history.
        """
        if count <= 0:
            return []
        return self.conversation_history[-count:]

    def save_conversation(self, filepath=None):
        """Write the conversation to a JSON file and return its path.

        Args:
            filepath: Destination file. When falsy, "./conversations" is
                created if needed and a file named after the session id is
                used.

        Returns:
            The path that was written.
        """
        if not filepath:
            os.makedirs("./conversations", exist_ok=True)
            filepath = f"./conversations/conversation_{self.current_session_id}.json"
        data = {
            "conversation_history": self.conversation_history,
            "user_feedback": self.user_feedback,
            "emotion_trajectory": self.emotion_trajectory,
            "session_id": self.current_session_id,
            "start_time": self.conversation_history[0]["timestamp"] if self.conversation_history else None,
            "end_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        # Explicit encoding keeps the output independent of the platform default
        with open(filepath, 'w', encoding="utf-8") as f:
            json.dump(data, f, indent=2)
        print(f"Conversation saved to {filepath}")
        return filepath
def clean_response_text(response, user_name):
    """Clean up a generated response so it reads more naturally.

    Steps, in order: drop a leading "Hey/Hi/Hello <name>" greeting, drop
    repeated mentions of the user's name, remove a fixed set of formulaic
    phrases, collapse repeated punctuation separated by whitespace, insert
    a missing space after sentence punctuation, trim surrounding
    whitespace, and capitalise the first character.

    Args:
        response: The response text to clean.
        user_name: The user's name, or a falsy value to skip the
            name-related clean-up.

    Returns:
        The cleaned response string.
    """
    if user_name:
        name = re.escape(user_name)
        # Drop a leading greeting such as "Hey <name>," or "Hi <name> "
        response = re.sub(r'^(Hey|Hi|Hello)\s+' + name + r',?\s+', '', response, flags=re.IGNORECASE)
        # When the name appears twice, strip every "<name>" that is
        # directly followed by punctuation
        if re.search(name + r',?\s+.*' + name, response, re.IGNORECASE):
            response = re.sub(r',?\s+' + name + r'([,.!?])', r'\1', response, flags=re.IGNORECASE)
        # Drop a sentence-final ", <name>" if the name still appears more
        # than once. Counted case-insensitively to agree with the
        # IGNORECASE patterns used around it.
        if response.lower().count(user_name.lower()) > 1:
            response = re.sub(r',\s+' + name + r'([.!?])(\s|$)', r'\1\2', response, flags=re.IGNORECASE)

    # Phrases that feel repetitive or formulaic; each is removed only when
    # followed by whitespace (i.e. when more text comes after it)
    phrases_to_remove = (
        r"let me know what you'd prefer,?\s+",
        r"i'm here to listen,?\s+",
        r"let me know if there's anything else,?\s+",
        r"i'm all ears,?\s+",
        r"i'm here for you,?\s+",
    )
    for phrase in phrases_to_remove:
        response = re.sub(phrase, "", response, flags=re.IGNORECASE)

    # Collapse the same punctuation mark repeated across whitespace (". .")
    response = re.sub(r'([.!?])\s+\1', r'\1', response)
    # Insert the missing space in "word.Next"
    response = re.sub(r'([.!?])([A-Za-z])', r'\1 \2', response)

    # Trim first, so leading whitespace cannot hide the first letter from
    # the capitalisation step
    response = response.strip()
    if response:
        response = response[0].upper() + response[1:]
    return response