Spaces:
Sleeping
Sleeping
| import torch | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
| import random | |
| from textblob import TextBlob | |
| import pandas as pd | |
| import requests | |
| from io import StringIO | |
| import gradio as gr | |
| import speech_recognition as sr | |
| import json | |
# ----- Dummy Model and vocab -----
# Token strings in id order: position in this list is the token's integer id.
# '<PAD>' (id 0) fills short inputs and '<UNK>' (id 1) stands in for any word
# that is not listed here.
_VOCAB_TOKENS = [
    '<PAD>', '<UNK>', 'i', 'am', 'feeling', 'sad', 'happy', 'angry', 'love',
    'stressed', 'anxious',
]
vocab = {token: token_id for token_id, token in enumerate(_VOCAB_TOKENS)}

# Every encoded input is truncated or padded to exactly this many token ids.
MAX_LEN = 16
class DummyLabelEncoder:
    """Minimal label encoder mapping emotion names to integer ids and back.

    The id of a label is its position in ``classes_``.
    """

    def __init__(self):
        self.classes_ = ['sadness', 'anger', 'love', 'happiness', 'neutral']

    def transform(self, x):
        """Return the integer id of every label in ``x``.

        Raises ``ValueError`` (from ``list.index``) for a label that is not
        in ``classes_``.
        """
        return list(map(self.classes_.index, x))

    def inverse_transform(self, x):
        """Return the label name for every integer id in ``x``."""
        labels = []
        for class_id in x:
            labels.append(self.classes_[class_id])
        return labels


# Shared encoder instance used by the model and the prediction code.
le = DummyLabelEncoder()
class DummyModel(nn.Module):
    """Placeholder classifier: token embeddings, mean-pooled, then one linear layer.

    The weights are randomly initialised and never trained in this file, so
    the predictions are not meaningful; the class only provides the call
    shape the rest of the app expects.
    """

    def __init__(self):
        super().__init__()
        # One 8-dimensional vector per vocabulary entry.
        self.embedding = nn.Embedding(len(vocab), 8)
        # One output logit per emotion class known to the label encoder.
        self.fc = nn.Linear(8, len(le.classes_))

    def forward(self, x):
        """Return class logits for a batch of token-id sequences.

        The embeddings are averaged over dim 1, which includes any padding
        positions present in ``x``.
        """
        pooled = self.embedding(x).mean(dim=1)
        return self.fc(pooled)


# Single module-level model instance used for all predictions.
model = DummyModel()
def preprocess_input(text):
    """Encode ``text`` as a ``(1, MAX_LEN)`` long tensor of vocabulary ids.

    The text is lower-cased and split on whitespace. Words missing from
    ``vocab`` map to the '<UNK>' id; the id list is cut to ``MAX_LEN`` entries
    and right-padded with the '<PAD>' id. The tensor is moved to the device
    holding the model's parameters.
    """
    unk_id = vocab['<UNK>']
    token_ids = [vocab.get(word, unk_id) for word in text.lower().split()]
    token_ids = token_ids[:MAX_LEN]

    missing = MAX_LEN - len(token_ids)
    if missing > 0:
        token_ids = token_ids + [vocab['<PAD>']] * missing

    device = next(model.parameters()).device
    batch = torch.tensor([token_ids], dtype=torch.long)
    return batch.to(device)
# ----- Load CSV from Google Drive -----
# The suggestions table is downloaded once at import time. It must provide an
# 'emotion' column and a 'solution' column; both are read further down.
file_id = "1yVJh_NVL4Y4YqEXGym47UCK5ZNZgVZYv"
url = f"https://drive.google.com/uc?export=download&id={file_id}"

# Without a timeout a stalled connection would block application start-up
# forever.
response = requests.get(url, timeout=30)
csv_text = response.text

# An HTML body means Drive served a web page (for example a sign-in or
# permission page) instead of the file. This is checked before the status
# code so the more helpful message wins.
if csv_text.strip().startswith('<'):
    raise RuntimeError("ERROR: Google Drive link is not returning CSV! Check your sharing settings.")
# Any remaining non-2xx answer is reported as an HTTP error rather than being
# handed to the CSV parser.
response.raise_for_status()

solutions_df = pd.read_csv(StringIO(csv_text), header=0, on_bad_lines='skip')

# Fail at start-up, with a clear message, rather than on the first chat turn.
_missing_columns = {"emotion", "solution"} - set(solutions_df.columns)
if _missing_columns:
    raise RuntimeError(
        f"Solutions CSV is missing required column(s): {sorted(_missing_columns)}"
    )

# Per-emotion set of suggestions already shown, so they are not repeated until
# every suggestion for that emotion has been used.
used_solutions = {emotion: set() for emotion in solutions_df['emotion'].unique()}
# ----- Data and responses -----
# Keywords that make get_emotion() classify a message as "sadness" without
# consulting the model.
negative_words = [
    "not", "bad", "sad", "anxious", "anxiety", "depressed",
    "upset", "shit", "stress", "worried", "unwell", "struggling",
    "low", "down", "terrible", "awful", "nervous", "panic",
    "afraid", "scared", "tense", "overwhelmed", "fear", "uneasy",
]

# Supportive opening lines, keyed by detected emotion. One entry is picked at
# random per reply; "neutral" doubles as the fallback for unknown emotions.
responses = {
    "sadness": [
        "It’s okay to feel down sometimes. I’m here to support you.",
        "I'm really sorry you're going through this. Want to talk more about it?",
        "You're not alone — I’m here for you.",
    ],
    "anger": [
        "That must have been frustrating. Want to vent about it?",
        "It's okay to feel this way. I'm listening.",
        "Would it help to talk through it?",
    ],
    "love": [
        "That’s beautiful to hear! What made you feel that way?",
        "It’s amazing to experience moments like that.",
        "Sounds like something truly meaningful.",
    ],
    "happiness": [
        "That's awesome! What’s bringing you joy today?",
        "I love hearing good news. 😊",
        "Yay! Want to share more about it?",
    ],
    "neutral": [
        "Got it. I’m here if you want to dive deeper.",
        "Thanks for sharing that. Tell me more if you’d like.",
        "I’m listening. How else can I support you?",
    ],
}
| # --- Helper functions --- | |
def correct_spelling(text):
    """Return ``text`` after TextBlob's spelling correction, as a plain ``str``."""
    blob = TextBlob(text)
    corrected = blob.correct()
    return str(corrected)
def get_sentiment(text):
    """Return the sentiment polarity TextBlob assigns to ``text``."""
    sentiment = TextBlob(text).sentiment
    return sentiment.polarity
def is_negative_input(text):
    """Return True if any word of ``text`` starts with a negative keyword.

    The text is lower-cased and split into alphanumeric word tokens; a token
    matches when it begins with an entry of ``negative_words``. Matching at
    the start of a word keeps inflected forms working ("stressed",
    "sadness", "panicking") while no longer firing on keywords buried inside
    unrelated words, which plain substring search did ("following" contains
    "low", "knot" contains "not", "ambassador" contains "sad").

    Words that merely begin with a keyword (e.g. "nothing", "download") are
    still treated as negative.
    """
    lowered = text.lower()
    # Replace every non-alphanumeric character with a space so punctuation
    # attached to a word ("sad!", "(upset)") does not hide the keyword.
    words = "".join(ch if ch.isalnum() else " " for ch in lowered).split()
    # str.startswith accepts a tuple of candidates; an empty tuple never matches.
    prefixes = tuple(negative_words)
    return any(word.startswith(prefixes) for word in words)
def get_unique_solution(emotion):
    """Return a random suggestion for ``emotion`` that was not handed out yet.

    Suggestions come from the rows of ``solutions_df`` whose 'emotion' column
    equals ``emotion``. Those already recorded in ``used_solutions[emotion]``
    are skipped; once all have been used the record is cleared and the full
    set becomes available again. The chosen suggestion is recorded before it
    is returned.

    Raises ``KeyError`` when ``emotion`` has no entry in ``used_solutions``.
    """
    candidates = solutions_df[solutions_df['emotion'] == emotion]
    already_used = candidates['solution'].isin(used_solutions[emotion])
    fresh = candidates[~already_used]

    if fresh.empty:
        # Every suggestion has been shown: start a new round.
        used_solutions[emotion] = set()
        fresh = candidates

    chosen = fresh.sample(1).iloc[0]
    used_solutions[emotion].add(chosen['solution'])
    return chosen['solution']
def get_emotion(user_input):
    """Return the emotion label for ``user_input``.

    Decision order:
      1. Any negative keyword (see ``is_negative_input``) -> "sadness".
      2. Otherwise the model's softmax output is averaged over five forward
         passes; if the top probability is below 0.6 -> "neutral".
      3. A "happiness"/"sadness" prediction that contradicts a clearly
         negative/positive TextBlob polarity (beyond +/-0.25) is flipped.
      4. Otherwise the model's predicted label.
    """
    if is_negative_input(user_input):
        return "sadness"

    polarity = get_sentiment(user_input)
    token_ids = preprocess_input(user_input)

    # The model is put in training mode and sampled several times. With the
    # DummyModel in this file (embedding + linear only) all passes give the
    # same result; averaging only matters for a model with stochastic layers.
    model.train()
    with torch.no_grad():
        passes = [F.softmax(model(token_ids), dim=1) for _ in range(5)]
        mean_probs = torch.stack(passes).mean(dim=0)
        confidence, class_idx = torch.max(mean_probs, dim=1)

    label = le.classes_[class_idx.item()]

    if confidence.item() < 0.6:
        return "neutral"
    if label == "happiness" and polarity < -0.25:
        return "sadness"
    if label == "sadness" and polarity > 0.25:
        return "happiness"
    return label
def audio_to_text(audio_file):
    """Transcribe an audio file with ``Recognizer.recognize_google``.

    Args:
        audio_file: Path of the recording, or ``None`` when nothing was
            recorded.

    Returns:
        The recognised text, or ``""`` when there is no recording, the file
        cannot be read, or recognition fails. The function is best-effort and
        does not raise for those cases, so the caller can fall back to its
        "please say something" reply.
    """
    if audio_file is None:
        return ""

    recognizer = sr.Recognizer()

    # Loading the file can fail too (missing file, unsupported or corrupt
    # audio). Treat that like a failed transcription instead of letting it
    # crash the chat handler.
    try:
        with sr.AudioFile(audio_file) as source:
            audio = recognizer.record(source)
    except (ValueError, OSError):
        return ""

    # Deliberately broad: any recognition problem (unintelligible speech,
    # service unreachable, ...) simply yields no text.
    try:
        return recognizer.recognize_google(audio)
    except Exception:
        return ""
# ----- Chat function -----
# Conversation state lives at module level and is shared by every caller of
# emoti_chat (all requests use the single user id "default_user").
# GLOBAL_CONVO_HISTORY: one dict per bot suggestion turn, oldest first.
GLOBAL_CONVO_HISTORY = []
# USER_FEEDBACK_STATE: user id -> {"emotion": <label or None>, "pending": <bool>};
# "pending" is True while the bot is waiting for a yes/no/skip answer.
USER_FEEDBACK_STATE = {}


def _recent_history_json():
    """Return the five most recent history entries as indented JSON."""
    return json.dumps(GLOBAL_CONVO_HISTORY[-5:], indent=2)


def _suggest_with_fallback(emotion):
    """Return a suggestion for ``emotion``, or a "neutral" one if that fails."""
    try:
        return get_unique_solution(emotion)
    except Exception:
        # No suggestions available for this emotion: use the generic ones.
        return get_unique_solution("neutral")


def emoti_chat(audio, text, history_json=""):
    """Handle one chat turn and return the bot's reply.

    Typed ``text`` takes precedence over ``audio``; audio is transcribed only
    when no text was entered. ``history_json`` is accepted for the interface
    but not used.

    Flow:
      * empty input -> prompt the user to say or type something;
      * exit phrase -> farewell message;
      * a suggestion is awaiting feedback -> the input is stored as feedback
        on the latest history entry; "no" yields another suggestion, anything
        else closes the feedback round;
      * otherwise -> detect the emotion, reply with a supportive line plus a
        suggestion, record the turn and wait for feedback.

    Returns:
        A 3-tuple ``(reply, recent_history_json, extra)`` where ``extra`` is
        ``""`` or, on exit, ``gr.update(visible=False)``.
    """
    if text and text.strip():
        user_input = text
    elif audio is not None:
        user_input = audio_to_text(audio)
    else:
        user_input = ""

    if not user_input.strip():
        return "Please say something or type your message.", _recent_history_json(), ""

    user_input = correct_spelling(user_input)

    exit_phrases = ["exit", "quit", "goodbye", "bye", "close"]
    if user_input.lower().strip() in exit_phrases:
        return (
            "Take care! I’m here whenever you want to talk. 👋",
            _recent_history_json(),
            gr.update(visible=False),
        )

    user_id = "default_user"
    state = USER_FEEDBACK_STATE.get(user_id, {"emotion": None, "pending": False})

    if state["pending"]:
        feedback = user_input.lower().strip()
        GLOBAL_CONVO_HISTORY[-1]["feedback"] = feedback
        if feedback == "no":
            # Same fallback as the first suggestion: the stored emotion may
            # have no suggestions of its own (the first one then came from
            # "neutral"), and an unguarded lookup would crash here.
            suggestion = _suggest_with_fallback(state["emotion"])
            reply = f"Here's another suggestion for you: {suggestion}\nDid this help? (yes/no/skip)"
            USER_FEEDBACK_STATE[user_id]["pending"] = True
            return reply, _recent_history_json(), ""
        USER_FEEDBACK_STATE[user_id] = {"emotion": None, "pending": False}
        return "How can I help you further?", _recent_history_json(), ""

    pred_emotion = get_emotion(user_input)
    support = random.choice(responses.get(pred_emotion, responses["neutral"]))
    suggestion = _suggest_with_fallback(pred_emotion)
    reply = f"{support}\n\nHere's a suggestion for you: {suggestion}\nDid this help? (yes/no/skip)"

    GLOBAL_CONVO_HISTORY.append({
        "user_input": user_input,
        "emotion": pred_emotion,
        "bot_support": support,
        "bot_suggestion": suggestion,
        "feedback": ""
    })
    USER_FEEDBACK_STATE[user_id] = {"emotion": pred_emotion, "pending": True}
    return reply, _recent_history_json(), ""
# ---- Gradio interface ----
# emoti_chat returns three values (reply, recent-history JSON, extra), so
# three output components are declared: return values are paired with output
# components by position, and with only two components the third value had
# nowhere to go.
iface = gr.Interface(
    fn=emoti_chat,
    inputs=[
        gr.Audio(type="filepath", label="🎤 Speak your message"),
        gr.Textbox(lines=2, placeholder="Or type your message here...", label="💬 Type message"),
        gr.Textbox(lines=1, value="", visible=False)  # hidden, history state
    ],
    outputs=[
        gr.Textbox(label="EmotiBot Reply"),
        gr.Textbox(label="Hidden", visible=False),  # recent history as JSON
        gr.Textbox(label="Hidden status", visible=False)  # third return value
    ],
    title="EmotiBot Connect",
    description="Talk to EmotiBot using your voice or by typing. Detects your emotion, gives dynamic suggestions, remembers your feedback, and keeps a conversation history! Type 'exit' to leave."
)

if __name__ == "__main__":
    iface.launch(debug=True)