# Emoti-Chatbot / app.py
# PR3DAT0R3's picture
# Upload 3 files
# 8dd094e verified
import torch
import torch.nn as nn
import torch.nn.functional as F
import random
from textblob import TextBlob
import pandas as pd
import requests
from io import StringIO
import gradio as gr
import speech_recognition as sr
import json
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from collections import Counter
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.feature_extraction.text import CountVectorizer
import numpy as np
import re
from torch.utils.data import Dataset, DataLoader
# --- Data Cleaning and Preprocessing ---
def clean_text(text):
    """Normalize a raw sentence into lowercase, space-separated alphabetic words.

    Null values (as judged by ``pd.isnull``) become the empty string.
    Otherwise the text is lowercased and run through four regex passes,
    in order: drop URLs, drop @mentions and '#' characters, drop every
    character that is not a-z or whitespace, collapse whitespace runs.
    The result is stripped of leading/trailing whitespace.
    """
    if pd.isnull(text):
        return ""
    # Order matters: URLs and mentions must go before punctuation is removed,
    # otherwise their leftover letters would survive as ordinary words.
    substitutions = (
        (r"http\S+|www\S+|https\S+", ''),  # URLs
        (r'\@\w+|\#', ''),                 # @mentions and the '#' symbol
        (r'[^a-z\s]', ''),                 # anything non-alphabetic
        (r'\s+', ' '),                     # whitespace runs -> single space
    )
    cleaned = text.lower()
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned.strip()
# --- Load datasets ---
# Both files are read the same way: no header row, ';' as the separator,
# and two columns named 'sentence' and 'label'.
_csv_options = {"delimiter": ';', "header": None, "names": ['sentence', 'label']}
df = pd.read_csv(
    "https://drive.google.com/uc?export=download&id=14D_HcvTFL63-KffCQLNFxGH-oY_knwmo",
    **_csv_options,
)
ts_df = pd.read_csv(
    "https://drive.google.com/uc?export=download&id=1Vmr1Rfv4pLSlAUrlOCxAcszvlxJOSHrm",
    **_csv_options,
)
# Stack the two frames under a fresh 0..n-1 index, then drop exact duplicate rows.
df = pd.concat([df, ts_df], ignore_index=True)
df.drop_duplicates(inplace=True)
# Cleaned copy of each sentence; the vocabulary is built from this column.
df['clean_sentence'] = df['sentence'].apply(clean_text)
# --- Build Vocabulary ---
# Whitespace-tokenize every cleaned sentence (one list of tokens per row).
tokenized = df['clean_sentence'].apply(str.split)
token_counts = Counter(token for sentence in tokenized for token in sentence)
# Ids are handed out by descending frequency starting at 2, because
# 0 and 1 are reserved for the padding and unknown-word markers below.
vocab = {
    word: index
    for index, (word, _) in enumerate(token_counts.most_common(), start=2)
}
vocab['<PAD>'] = 0
vocab['<UNK>'] = 1
def encode(text):
    """Map each element of ``text`` to its vocabulary id.

    ``text`` is iterated element by element; in this file it is called with
    a list of tokens. Elements missing from ``vocab`` map to the ``<UNK>`` id.
    """
    unknown_id = vocab['<UNK>']
    return [vocab.get(token, unknown_id) for token in text]


# One list of integer ids per tokenized sentence.
encoded_texts = tokenized.apply(encode)
# --- Pad Sequences ---
# Fixed length of every encoded sequence fed to the model.
MAX_LEN = 32


def pad_sequence(seq):
    """Return ``seq`` cut or right-padded to exactly ``MAX_LEN`` ids.

    Longer sequences keep only their first ``MAX_LEN`` items; shorter ones
    are extended on the right with the ``<PAD>`` id.
    """
    head = seq[:MAX_LEN]
    # len(head) never exceeds MAX_LEN, so the fill count is never negative.
    filler = [vocab['<PAD>']] * (MAX_LEN - len(head))
    return head + filler


padded = encoded_texts.apply(pad_sequence).tolist()
# --- Encode Labels ---
# Turn the string labels in df['label'] into integer class ids.
# `le` is kept at module level: its `classes_` array sizes the classifier
# output layer and maps predicted indices back to label strings at inference.
le = LabelEncoder()
labels = le.fit_transform(df['label'])
# --- Dataset + DataLoader ---
class EmotionDataset(Dataset):
    """Dataset pairing encoded sequences with their integer labels.

    Both inputs are converted to ``torch.long`` tensors once, at construction
    time, and stored as ``self.X`` and ``self.y``.
    """

    def __init__(self, X, y):
        """Store ``X`` (sequences) and ``y`` (labels) as long tensors."""
        self.X = torch.tensor(X, dtype=torch.long)
        self.y = torch.tensor(y, dtype=torch.long)

    def __len__(self):
        """Return the number of samples (length of ``self.X``)."""
        sample_count = len(self.X)
        return sample_count

    def __getitem__(self, idx):
        """Return the ``(sequence, label)`` pair stored at position ``idx``."""
        sequence = self.X[idx]
        label = self.y[idx]
        return sequence, label
# Hold out 20% of the samples for validation, keeping the label proportions
# equal in both parts (stratify) and the split reproducible (fixed seed).
X_train, X_val, y_train, y_val = train_test_split(
    padded,
    labels,
    test_size=0.2,
    stratify=labels,
    random_state=42,
)
train_dataset = EmotionDataset(X_train, y_train)
val_dataset = EmotionDataset(X_val, y_val)
# Only the training batches are shuffled.
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16)
# --- Positional Encoding ---
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position signals to a batch of embeddings.

    A ``(1, max_len, d_model)`` table is precomputed once: even feature
    columns hold sines and odd columns hold cosines of the position scaled
    by geometrically spaced frequencies.

    Args:
        d_model: Size of the embedding (last) dimension.
        max_len: Number of positions to precompute. Inputs to ``forward``
            must not be longer than this along dimension 1.
    """

    def __init__(self, d_model, max_len=MAX_LEN):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        # For odd d_model there is one fewer odd column than even columns,
        # so the frequency vector is trimmed to match.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        # Registered as a buffer so it follows the module across devices
        # (model.to(device)) instead of being copied on every forward pass.
        # persistent=False keeps it out of state_dict, so checkpoints saved
        # before this change still load with identical keys.
        self.register_buffer("pe", pe.unsqueeze(0), persistent=False)

    def forward(self, x):
        """Return ``x`` plus the position table sliced to ``x.size(1)`` steps."""
        # .to() is a no-op once the module and the input share a device.
        return x + self.pe[:, :x.size(1)].to(x.device)
# --- Transformer Model with Masking + Dropout for Bayesian Inference ---
class EmotionTransformer(nn.Module):
    """Transformer-encoder text classifier over padded id sequences.

    Pipeline: embedding -> positional encoding -> 2-layer Transformer
    encoder (padding masked out of attention) -> mean over the real tokens
    -> dropout -> linear layer producing one logit per class.

    Args:
        vocab_size: Number of rows in the embedding table.
        embed_dim: Embedding / model width.
        num_heads: Attention heads per encoder layer.
        num_classes: Number of output logits.
    """

    def __init__(self, vocab_size, embed_dim, num_heads, num_classes):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=vocab['<PAD>'])
        self.pos_encoder = PositionalEncoding(embed_dim)
        encoder_layer = nn.TransformerEncoderLayer(d_model=embed_dim, nhead=num_heads, batch_first=True)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=2)
        self.dropout = nn.Dropout(0.3)
        self.fc = nn.Linear(embed_dim, num_classes)

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)``.

        Args:
            x: Long tensor of token ids, ``(batch, seq_len)``, padded with
                the ``<PAD>`` id.
        """
        pad_mask = (x == vocab['<PAD>'])
        hidden = self.embedding(x)
        hidden = self.pos_encoder(hidden)
        hidden = self.transformer(hidden, src_key_padding_mask=pad_mask)
        # Pool over real tokens only. A plain mean over all positions lets
        # padding dilute short sentences, and what the encoder emits at
        # padded positions is not meaningful (it may differ between train
        # and eval execution paths), so those positions are zeroed and
        # excluded from the divisor.
        hidden = hidden.masked_fill(pad_mask.unsqueeze(-1), 0.0)
        # clamp avoids dividing by zero for a row that is entirely padding.
        token_counts = (~pad_mask).sum(dim=1, keepdim=True).clamp(min=1)
        pooled = hidden.sum(dim=1) / token_counts
        return self.fc(self.dropout(pooled))
# --- Train the Model ---
# Prefer the GPU when one is available.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
# Embedding table covers the whole vocabulary; one output logit per label class.
model = EmotionTransformer(
    vocab_size=len(vocab),
    embed_dim=64,
    num_heads=4,
    num_classes=len(le.classes_),
)
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
for epoch in range(5):
    # ---- Training pass over every batch ----
    model.train()
    total_loss = 0
    for X_batch, y_batch in train_loader:
        X_batch = X_batch.to(device)
        y_batch = y_batch.to(device)
        optimizer.zero_grad()
        logits = model(X_batch)
        loss = criterion(logits, y_batch)
        loss.backward()
        optimizer.step()
        # Running SUM of per-batch losses (not an average).
        total_loss += loss.item()

    # ---- Validation pass: dropout off, no gradients ----
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            X_batch = X_batch.to(device)
            y_batch = y_batch.to(device)
            outputs = model(X_batch)
            preds = outputs.argmax(dim=1)
            correct += (preds == y_batch).sum().item()
            total += y_batch.size(0)
    print(f"Epoch {epoch+1} | Train Loss: {total_loss:.4f} | Val Accuracy: {correct / total:.4f}")

# Persist the trained weights (state_dict only) next to the script.
torch.save(model.state_dict(), "emotion_transformer_model.pth")
# --- Load Solutions CSV ---
# Download the suggestions table; it is read below through its 'emotion'
# column (and through 'solution' in get_unique_solution).
file_id = "1yVJh_NVL4Y4YqEXGym47UCK5ZNZgVZYv"
url = f"https://drive.google.com/uc?export=download&id={file_id}"
# Bounded wait: without a timeout an unresponsive server would hang
# application start-up indefinitely.
response = requests.get(url, timeout=30)
csv_text = response.text
# A payload starting with '<' is markup (e.g. an HTML page), not CSV.
if csv_text.strip().startswith('<'):
    raise Exception("ERROR: Google Drive link is not returning CSV! Check your sharing settings.")
# First row is the header; malformed rows are dropped rather than aborting.
solutions_df = pd.read_csv(StringIO(csv_text), header=0, on_bad_lines='skip')
# Per-emotion record of suggestions already handed out in this process.
used_solutions = {emotion: set() for emotion in solutions_df['emotion'].unique()}
# Keywords checked against the lowercased user message by
# is_negative_input(); get_emotion() returns "sadness" immediately when
# that check succeeds, before the model is consulted.
negative_words = [
    "not", "bad", "sad", "anxious", "anxiety", "depressed", "upset", "shit", "stress",
    "worried", "unwell", "struggling", "low", "down", "terrible", "awful",
    "nervous", "panic", "afraid", "scared", "tense", "overwhelmed", "fear", "uneasy"
]
# Canned supportive replies keyed by emotion name. emoti_chat() picks one
# at random for the predicted emotion and uses the "neutral" list for any
# emotion that has no entry here.
# NOTE(review): the keys must match the label strings produced by the
# label encoder for a non-neutral reply to be chosen — confirm against
# the dataset's label set.
responses = {
    "sadness": [
        "It’s okay to feel down sometimes. I’m here to support you.",
        "I'm really sorry you're going through this. Want to talk more about it?",
        "You're not alone — I’m here for you."
    ],
    "anger": [
        "That must have been frustrating. Want to vent about it?",
        "It's okay to feel this way. I'm listening.",
        "Would it help to talk through it?"
    ],
    "love": [
        "That’s beautiful to hear! What made you feel that way?",
        "It’s amazing to experience moments like that.",
        "Sounds like something truly meaningful."
    ],
    "happiness": [
        "That's awesome! What’s bringing you joy today?",
        "I love hearing good news. 😊",
        "Yay! Want to share more about it?"
    ],
    "neutral": [
        "Got it. I’m here if you want to dive deeper.",
        "Thanks for sharing that. Tell me more if you’d like.",
        "I’m listening. How else can I support you?"
    ]
}
# Static relaxation material: a grounding exercise and a video link.
# NOTE(review): this dict and the three keyword lists below are not
# referenced by any code visible in this file — confirm whether they are
# used elsewhere or are leftovers.
relaxation_resources = {
    "exercise": "Try this 5-4-3-2-1 grounding method:\n- 5 things you see\n- 4 you can touch\n- 3 you hear\n- 2 you smell\n- 1 you taste",
    "video": "Here’s a short calming video that might help: https://youtu.be/O-6f5wQXSu8"
}
# Phrases suggesting the user is asking for help or tips.
help_keywords = ["suggest", "help", "calm", "exercise", "relax", "how can i", "any tips", "can u", "can you"]
# Phrases expressing thanks.
thank_you_inputs = ["thank", "thanks", "thank you"]
# Farewell phrases (emoti_chat defines its own, different exit list).
bye_inputs = ["bye", "goodbye", "see you", "take care", "ok bye", "exit", "quit"]
def correct_spelling(text):
    """Return ``text`` after TextBlob's spelling correction, as a plain ``str``."""
    blob = TextBlob(text)
    corrected = blob.correct()
    return str(corrected)
def get_sentiment(text):
    """Return TextBlob's sentiment polarity score for ``text``.

    The caller in this file treats values below -0.25 as negative and
    above 0.25 as positive.
    """
    return TextBlob(text).sentiment.polarity
def is_negative_input(text):
    """Return True when ``text`` contains any entry of ``negative_words`` as a whole word.

    Matching is case-insensitive and anchored on word boundaries. Plain
    substring matching would misfire on unrelated words that merely contain
    a keyword ("low" in "flowers", "not" in "nothing", "sad" in
    "ambassador"), which would force the reply to "sadness".

    Args:
        text: The user's message.

    Returns:
        bool: True if at least one keyword occurs as a standalone word.
    """
    if not negative_words:
        # An empty alternation would match at every word boundary.
        return False
    alternatives = "|".join(re.escape(word) for word in negative_words)
    # `re` caches compiled patterns, so rebuilding the string per call is cheap.
    return re.search(r"\b(?:" + alternatives + r")\b", text.lower()) is not None
def get_unique_solution(emotion):
    """Pick a random suggestion for ``emotion`` that has not been served yet.

    Rows of ``solutions_df`` whose 'emotion' equals ``emotion`` are the
    candidates. Suggestions recorded in ``used_solutions[emotion]`` are
    skipped; once every candidate has been used the record is cleared and
    the full pool becomes available again. The chosen text is recorded as
    used and returned.

    Raises whatever the lookups raise when ``emotion`` has no entry in
    ``used_solutions`` or no candidate rows; callers rely on that to fall
    back to another emotion.
    """
    candidates = solutions_df[solutions_df['emotion'] == emotion]
    already_served = candidates['solution'].isin(used_solutions[emotion])
    fresh = candidates[~already_served]
    if fresh.empty:
        # Pool exhausted: start a new round over all candidates.
        used_solutions[emotion] = set()
        fresh = candidates
    chosen = fresh.sample(1).iloc[0]['solution']
    used_solutions[emotion].add(chosen)
    return chosen
def preprocess_input(text):
    """Turn a user message into a ``(1, MAX_LEN)`` long tensor on the model's device.

    The message goes through the same ``clean_text`` normalization that
    was applied to the training sentences before tokenizing. Without it,
    tokens carrying punctuation or digits ("happy!", "sad,") never match a
    vocabulary entry and degrade to ``<UNK>``.

    Args:
        text: Raw user message.

    Returns:
        torch.Tensor: Token ids, truncated or right-padded with ``<PAD>``
        to ``MAX_LEN``, with a leading batch dimension of 1.
    """
    unknown_id = vocab['<UNK>']
    tokens = clean_text(text).split()
    # If cleaning leaves nothing (e.g. "!!!" or "123"), feed a single <UNK>
    # rather than an all-padding row, which the attention mask would blank
    # out entirely.
    encoded = [vocab.get(token, unknown_id) for token in tokens] or [unknown_id]
    padded = encoded[:MAX_LEN] + [vocab['<PAD>']] * max(0, MAX_LEN - len(encoded))
    target_device = next(model.parameters()).device
    return torch.tensor([padded], dtype=torch.long, device=target_device)
def get_emotion(user_input):
    """Classify the emotion expressed in ``user_input``.

    Decision order:
      1. A negative keyword hit (``is_negative_input``) returns "sadness".
      2. Otherwise the model is sampled 5 times with dropout active and the
         softmax outputs are averaged; a top probability below 0.6 returns
         "neutral".
      3. TextBlob polarity overrides a clearly contradictory prediction:
         "happiness" with polarity < -0.25 becomes "sadness", and
         "sadness" with polarity > 0.25 becomes "happiness".
      4. Otherwise the predicted label is returned.

    NOTE(review): step 3 compares against the literal labels "happiness"
    and "sadness"; confirm these match the dataset's label strings.

    Args:
        user_input: The (spell-corrected) user message.

    Returns:
        The emotion label as a string.
    """
    if is_negative_input(user_input):
        return "sadness"
    x = preprocess_input(user_input)
    # Dropout must be active for the stochastic forward passes, but the
    # shared model should not be left in training mode afterwards, so the
    # previous mode is restored even if a forward pass fails.
    was_training = model.training
    model.train()
    try:
        with torch.no_grad():
            probs = torch.stack([F.softmax(model(x), dim=1) for _ in range(5)])
    finally:
        model.train(was_training)
    avg_probs = probs.mean(dim=0)
    prob, idx = torch.max(avg_probs, dim=1)
    pred_emotion = le.classes_[idx.item()]
    if prob.item() < 0.6:
        return "neutral"
    # Polarity is only needed for the override checks below.
    sentiment = get_sentiment(user_input)
    if sentiment < -0.25 and pred_emotion == "happiness":
        return "sadness"
    if sentiment > 0.25 and pred_emotion == "sadness":
        return "happiness"
    return pred_emotion
def audio_to_text(audio_file):
    """Transcribe an audio file to text, returning "" when that is not possible.

    Args:
        audio_file: Path to the recorded audio, or None when nothing was
            recorded.

    Returns:
        str: The transcript from ``Recognizer.recognize_google``, or the
        empty string if there is no file or any step fails.
    """
    if audio_file is None:
        return ""
    recog = sr.Recognizer()
    # Best effort by design: opening/decoding the file is now inside the
    # try as well, so an unreadable recording yields "" (and the caller's
    # "please say something" prompt) instead of crashing the chat handler.
    try:
        with sr.AudioFile(audio_file) as source:
            audio = recog.record(source)
        return recog.recognize_google(audio)
    except Exception:
        return ""
# LLM API function
def call_llm_api(user_text):
    """POST ``user_text`` to the Hugging Face Inference API and return the reply as text.

    The access token is read from the ``HF_TOKEN`` environment variable
    rather than being written into the source; when it is unset the
    request is sent without an Authorization header.

    NOTE(review): the endpoint targets ``distilbert-base-uncased``; confirm
    this is the intended model for free-text prompts.

    Args:
        user_text: Prompt sent as the ``inputs`` field of the JSON payload.

    Returns:
        str: ``str()`` of the decoded JSON response, or a message starting
        with "API error: " / "API call failed: " on failure. Never raises
        for network or JSON-decoding problems.
    """
    import os  # local import: keeps this block self-contained

    api_url = "https://api-inference.huggingface.co/models/distilbert-base-uncased"
    token = os.environ.get("HF_TOKEN")
    headers = {"Authorization": f"Bearer {token}"} if token else {}
    payload = {"inputs": user_text}
    try:
        resp = requests.post(api_url, headers=headers, json=payload, timeout=15)
        output = resp.json()
    except (requests.RequestException, ValueError) as e:
        # RequestException: connection/timeout errors; ValueError: body was not JSON.
        return f"API call failed: {e}"
    if isinstance(output, dict) and 'error' in output:
        return "API error: " + str(output['error'])
    return str(output)
# Process-wide conversation log: one dict per answered user message.
GLOBAL_CONVO_HISTORY = []
# Per-user feedback state: {"emotion": <last emotion>, "pending": <bool>}.
# NOTE(review): emoti_chat always uses the single key "default_user", so
# every visitor of the app shares the same history and pending state.
USER_FEEDBACK_STATE = {}


def _recent_history_json():
    """Serialize the five most recent conversation entries as indented JSON."""
    return json.dumps(GLOBAL_CONVO_HISTORY[-5:], indent=2)


def _suggestion_for(emotion):
    """Return a suggestion for ``emotion``, falling back to the "neutral" pool."""
    try:
        return get_unique_solution(emotion)
    except (KeyError, ValueError):
        # Raised when the emotion has no tracked pool or no candidate rows.
        return get_unique_solution("neutral")


def emoti_chat(audio, text, history_json=""):
    """Handle one chat turn and return ``(reply, history_json, extra)``.

    Typed text takes priority over audio. After spelling correction the
    message is routed, in order, to: the exit reply, the LLM "fun fact"
    call, the pending yes/no/skip feedback handler, or the normal
    emotion -> supportive reply + suggestion flow.

    Args:
        audio: Path to a recorded audio file, or None.
        text: Typed message, possibly empty.
        history_json: Accepted for the interface's hidden input; not read.

    Returns:
        tuple: the reply text, the last five history entries as JSON, and a
        third value that is "" except on exit, where it is
        ``gr.update(visible=False)``.
    """
    # --- Get user input from voice or text ---
    if text and text.strip():
        user_input = text
    elif audio is not None:
        user_input = audio_to_text(audio)
    else:
        user_input = ""
    if not user_input.strip():
        return "Please say something or type your message.", _recent_history_json(), ""
    user_input = correct_spelling(user_input)
    normalized = user_input.lower().strip()
    user_id = "default_user"

    # --- Exit logic ---
    exit_phrases = ["exit", "quit", "goodbye", "bye", "close"]
    if normalized in exit_phrases:
        # Clear any pending feedback question so the next conversation's
        # first message is not swallowed as a yes/no answer.
        USER_FEEDBACK_STATE[user_id] = {"emotion": None, "pending": False}
        return "Take care! I’m here whenever you want to talk. 👋", _recent_history_json(), gr.update(visible=False)

    # --- HuggingFace LLM API call for "fun fact" or "more about" ---
    # "api" must be a standalone word: as a substring it also fires on
    # words such as "therapist" or "rapid".
    if "fun fact" in normalized or "more about" in normalized or re.search(r"\bapi\b", normalized):
        api_reply = call_llm_api("Tell me a fun fact about AI.")
        return f"(LLM API response)\n{api_reply}", _recent_history_json(), ""

    # Feedback logic
    state = USER_FEEDBACK_STATE.get(user_id, {"emotion": None, "pending": False})
    if state["pending"]:
        feedback = normalized
        GLOBAL_CONVO_HISTORY[-1]["feedback"] = feedback
        if feedback == "no":
            # Same neutral fallback as the first suggestion; the stored
            # emotion may have no pool of its own.
            suggestion = _suggestion_for(state["emotion"])
            reply = f"Here's another suggestion for you: {suggestion}\nDid this help? (yes/no/skip)"
            USER_FEEDBACK_STATE[user_id]["pending"] = True
            return reply, _recent_history_json(), ""
        USER_FEEDBACK_STATE[user_id] = {"emotion": None, "pending": False}
        return "How can I help you further?", _recent_history_json(), ""

    # Normal user message: get emotion, give suggestion
    pred_emotion = get_emotion(user_input)
    support = random.choice(responses.get(pred_emotion, responses["neutral"]))
    suggestion = _suggestion_for(pred_emotion)
    reply = f"{support}\n\nHere's a suggestion for you: {suggestion}\nDid this help? (yes/no/skip)"
    GLOBAL_CONVO_HISTORY.append({
        "user_input": user_input,
        "emotion": pred_emotion,
        "bot_support": support,
        "bot_suggestion": suggestion,
        "feedback": ""
    })
    USER_FEEDBACK_STATE[user_id] = {"emotion": pred_emotion, "pending": True}
    return reply, _recent_history_json(), ""
# ---- Gradio Web Interface ----
# emoti_chat returns three values (reply, history JSON, extra), so three
# output components are declared; the last two are hidden.
iface = gr.Interface(
    fn=emoti_chat,
    inputs=[
        gr.Audio(type="filepath", label="🎤 Speak your message"),
        gr.Textbox(lines=2, placeholder="Or type your message here...", label="💬 Type message"),
        gr.Textbox(lines=1, value="", visible=False)  # Hidden, passes history state
    ],
    outputs=[
        gr.Textbox(label="EmotiBot Reply"),
        gr.Textbox(label="Hidden", visible=False),
        gr.Textbox(label="Hidden extra", visible=False)  # receives the third return value
    ],
    title="EmotiBot Connect",
    description="Talk to EmotiBot using your voice or by typing. Detects your emotion, gives dynamic suggestions, remembers your feedback, and keeps a conversation history! Type 'fun fact' or 'api' for an AI-generated fact."
)
iface.launch()