Spaces:
Sleeping
Sleeping
import atexit
import math
import os
import random
import tempfile
import time

import google.generativeai as genai_ext
import gradio as gr
import torch
from google import genai
from google.cloud import aiplatform
from google.genai import types
from transformers import pipeline
# --- Env & GCP setup ---
creds_json = os.getenv("GCP_CREDS_JSON")
if not creds_json:
    raise RuntimeError("⚠️ Missing GCP_CREDS_JSON secret!")


def _remove_credentials_file(path):
    """Delete the temporary credentials file; a file that is already gone is fine."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


# The credentials JSON arrives as an environment secret, but the Google client
# libraries are pointed at it through a file path, so it is written to a temp
# file. The file holds a service-account key, so it is removed again when the
# interpreter exits instead of being left behind on disk.
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False, encoding="utf-8") as tmpfile:
    tmpfile.write(creds_json)
    creds_path = tmpfile.name
atexit.register(_remove_credentials_file, creds_path)
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = creds_path

# Initialize GCP API (replace project/location as needed)
aiplatform.init(project="emotionmodel-466815", location="us-central1")
# --- LLM / Gemini setup ---
apikey = os.getenv("GEMINI_API_KEY")
if not apikey:
    raise Exception("⚠️ Missing GEMINI_API_KEY secret!")

# The Gemini model below is the one used to draft replies.
genai_ext.configure(api_key=apikey)
llm_model = genai_ext.GenerativeModel('gemini-1.5-pro')


# --- Classifier pipelines ---
def _text_classifier(model_id):
    """Build a Hugging Face text-classification pipeline for ``model_id``."""
    return pipeline("text-classification", model=model_id)


# Emotion label + confidence of the user message (feeds D).
emotion_classifier = _text_classifier("j-hartmann/emotion-english-distilroberta-base")
# Language label of the user message (feeds C).
language_detector = _text_classifier("papluca/xlm-roberta-base-language-detection")
# Toxicity score of the user message (feeds M and B).
bias_classifier = _text_classifier("unitary/toxic-bert")
# --- Empathy formula ---
def calculate_empathy_score(D, R, M, C, B, O, I, alpha=0.35, beta=0.22, gamma=0.26, epsilon=0.17, delta=0.4, zeta=0.0, iota=0.12):
    """Return the empathy score E' for one conversational turn.

    The score is a logistic squash of a weighted sum of the inputs, scaled
    down by a bias penalty::

        E = sigmoid(epsilon*C + alpha*D**2 + gamma*M + beta*ln(R + 1) + iota*I)
            * (1 - delta*B)

    Args:
        D: Emotion-detection confidence.
        R: Relational depth (number of turns so far); must be > -1.
        M: Moral-judgment term.
        C: Cultural-fit term.
        B: Bias/toxicity penalty input.
        O: Oversight term. Accepted for interface compatibility but not used
            in the current formula.
        I: Emotional intensity.
        alpha, beta, gamma, epsilon, iota: Weights of D**2, ln(R + 1), M, C
            and I inside the sigmoid.
        delta: Strength of the multiplicative bias penalty.
        zeta: Weight formerly paired with ``O``; accepted but unused.

    Returns:
        The score as a float. With inputs in [0, 1] it lies in (0, 1).

    Raises:
        ValueError: If ``R <= -1`` (logarithm of a non-positive number).
    """
    inner_sum = epsilon * C + alpha * (D ** 2) + gamma * M + beta * math.log(R + 1) + iota * I
    # Numerically stable logistic: exp() is only ever called on a non-positive
    # argument, so a strongly negative inner_sum yields ~0.0 instead of
    # raising OverflowError.
    if inner_sum >= 0:
        sig = 1 / (1 + math.exp(-inner_sum))
    else:
        exp_term = math.exp(inner_sum)
        sig = exp_term / (1 + exp_term)
    # B is applied as a multiplicative penalty.
    return sig * (1 - delta * B)
# --- Vertex client (if still needed elsewhere) ---
client = genai.Client(vertexai=True, project="217758598930", location="us-central1")
model = "projects/217758598930/locations/us-central1/endpoints/1940344453420023808"

# Every harm category listed here is configured with the BLOCK_NONE threshold.
_UNBLOCKED_HARM_CATEGORIES = (
    "HARM_CATEGORY_HATE_SPEECH",
    "HARM_CATEGORY_DANGEROUS_CONTENT",
    "HARM_CATEGORY_SEXUALLY_EXPLICIT",
    "HARM_CATEGORY_HARASSMENT",
)

generate_content_config = types.GenerateContentConfig(
    temperature=0.9,
    top_p=0.95,
    seed=0,
    max_output_tokens=150,
    safety_settings=[
        types.SafetySetting(category=harm_category, threshold="BLOCK_NONE")
        for harm_category in _UNBLOCKED_HARM_CATEGORIES
    ],
    thinking_config=types.ThinkingConfig(thinking_budget=-1),
)
# --- Helper functions ---
# Romanized-Hindi marker words; all lowercase because tokens are lowercased
# before lookup.
# NOTE(review): "thanks" is an English word, so plain English text containing
# it is also reported as Hinglish — confirm this is intended.
HINDI_KEYWORDS = {"bhai", "yaar", "bata", "kya", "kaise", "nahi", "achha", "chal", "thanks", "dhanyavaad", "yaarr"}


def detect_hinglish(text, lang_label):
    """Return True if ``text`` looks like Hindi or Hinglish (code-mixed Hindi/English).

    Three signals are checked, any one of which is sufficient:

    1. the language detector labelled the text ``'hi'``;
    2. a token, after stripping surrounding punctuation and lowercasing, is
       one of ``HINDI_KEYWORDS`` (romanized Hindi);
    3. the text contains a character from the Devanagari block
       (U+0900–U+097F).

    Args:
        text: The user message. ``None`` is treated as an empty string.
        lang_label: Language code produced by the language detector.

    Returns:
        bool: True when any of the signals above fires, otherwise False.
    """
    if lang_label == 'hi':
        return True
    text = text or ""
    # Lowercase each token so "Bhai" / "KYA" match the lowercase keyword set.
    tokens = {word.strip(".,!?\"'()").lower() for word in text.split()}
    if not tokens.isdisjoint(HINDI_KEYWORDS):
        return True
    return any('\u0900' <= ch <= '\u097F' for ch in text)
# --- Chatbot class with full history & fixes applied ---
class HumanLikeChatbot:
    """Stateful chatbot that tracks history, a bot mood and an empathy score.

    ``respond`` relies on module-level collaborators defined elsewhere in this
    file: ``emotion_classifier``, ``bias_classifier``, ``language_detector``,
    ``llm_model``, ``detect_hinglish`` and ``calculate_empathy_score``.
    """

    # Canned replies used when the LLM returns nothing usable, keyed by the
    # detected user emotion ('neutral' is the catch-all).
    _FALLBACK_RESPONSES = {
        'sadness': ["Bro, I’m really sorry to hear that. Come on, tell me, I’ll just listen. ❤️", "I can feel the sad vibes. I’m here for you, bro."],
        'disappointment': ["Man, that really sucks. Tell me what exactly happened?", "I get it — expectations were high. Tell me more."],
        'joy': ["Wow! That’s a celebration moment. 🥳", "Bro, this calls for a party! Give me the details."],
        'anger': ["Bro, cool down a bit, tell me what’s wrong. 😌", "Looks like something serious happened. I’m here to listen."],
        'neutral': ["Alright, got it. So what’s going on in life?", "Cool, so how’s your day going?"],
    }

    def __init__(self):
        # UI-facing history: list of (user_msg, bot_reply) tuples.
        self.history = []
        # Structured history used for LLM prompting: list of
        # (speaker, message, detected_emotion, bot_mood_at_time) tuples.
        self.history_with_emotions = []
        self.bot_mood = "neutral"
        self.irritation_level = 0.0
        # Rolling window holding the last 5 user-message toxicity scores.
        self.toxicity_history = []
        # Turns left during which a new repair message is not triggered.
        self.repair_cooldown = 0

    def add_to_history(self, speaker, message, detected_emotion=None, mood_at_time=None, bot_reply=None):
        """Record a message in both the UI history and the structured history.

        Args:
            speaker: ``'User'`` or ``'Bot'`` (anything else is treated as Bot).
            message: The message text.
            detected_emotion: Emotion label detected for a user message.
            mood_at_time: Bot mood when the message was produced.
            bot_reply: Only used when ``speaker == 'User'``: an already known
                reply to store next to the user message in the UI history.
        """
        if speaker == 'User':
            # Store an empty placeholder reply; it is filled in when the bot answers.
            self.history.append((message, bot_reply if bot_reply is not None else ""))
            self.history_with_emotions.append(('User', message, detected_emotion, mood_at_time))
            return
        if self.history:
            # Attach the reply to the most recent user entry.
            last_user, _ = self.history[-1]
            self.history[-1] = (last_user, message)
        else:
            # No user entry yet (unlikely) — store the reply on its own.
            self.history.append(("", message))
        self.history_with_emotions.append(('Bot', message, detected_emotion, mood_at_time))

    def format_history_for_prompt(self, limit=8):
        """Return the last ``limit`` structured-history entries as prompt text.

        User lines are tagged with the detected emotion, bot lines with the
        bot mood; a missing tag is rendered as ``N/A``.
        """
        lines = []
        for speaker, msg, emo, mood in self.history_with_emotions[-limit:]:
            if speaker == 'User':
                lines.append(f"User ({emo if emo else 'N/A'}): {msg}")
            else:
                lines.append(f"Bot ({mood if mood else 'N/A'}): {msg}")
        return "\n".join(lines)

    def _update_irritation_decay(self):
        """Apply the per-turn irritation decay and let a hostile mood cool off.

        Irritation decays by 0.05 per turn, or by 0.15 while the bot is
        angry/irritated so it can recover. Only a hostile mood is reset to
        "neutral" once irritation is at or below 0.15; other moods (happy,
        emotional) are left untouched.
        """
        if self.irritation_level <= 0:
            return
        hostile = self.bot_mood in ("angry", "irritated")
        decay = 0.15 if hostile else 0.05
        self.irritation_level = max(0.0, self.irritation_level - decay)
        if hostile and self.irritation_level <= 0.15:
            self.bot_mood = "neutral"

    def update_toxicity_history(self, tox_score):
        """Append ``tox_score`` to the rolling window, keeping the last 5 scores."""
        self.toxicity_history.append(float(tox_score))
        del self.toxicity_history[:-5]

    def average_toxicity(self):
        """Return the mean of the rolling toxicity window (0.0 when empty)."""
        if not self.toxicity_history:
            return 0.0
        return sum(self.toxicity_history) / len(self.toxicity_history)

    def should_prioritize_repair(self):
        """Return True while in repair cooldown or when average toxicity exceeds 0.6."""
        return self.repair_cooldown > 0 or self.average_toxicity() > 0.6

    def _update_mood(self, user_emotion, clean_message):
        """Update bot mood and irritation from the user's emotion; return intensity I."""
        if user_emotion in ('anger', 'disgust') or any(word in clean_message for word in ('stupid', 'idiot', 'dumb')):
            self.irritation_level = min(1.0, self.irritation_level + 0.25)
            # NOTE(review): higher irritation maps to "irritated" and lower to
            # "angry"; this looks inverted but is kept as written — confirm.
            self.bot_mood = "irritated" if self.irritation_level > 0.5 else "angry"
            return min(1.0, 0.8 + self.irritation_level)
        if user_emotion in ('sadness', 'disappointment'):
            self.bot_mood = "emotional"
            # Sadness reduces irritation slowly.
            self.irritation_level = max(0.0, self.irritation_level - 0.05)
            return 0.7
        if user_emotion in ('joy', 'happiness'):
            self.bot_mood = "happy"
            self.irritation_level = max(0.0, self.irritation_level - 0.35)
            return 0.9
        # Neutral or unknown emotion.
        self.bot_mood = "neutral"
        self.irritation_level = max(0.0, self.irritation_level - 0.05)
        return 0.5

    def _cultural_fit(self, clean_message):
        """Return the cultural-fit term C for the message under the current mood."""
        lang_label = language_detector(clean_message)[0].get('label', 'en')
        if detect_hinglish(clean_message, lang_label):
            fit = 0.9
        elif lang_label == 'en':
            fit = 0.8
        else:
            fit = 0.6
        # A hostile bot fits the user less well.
        if self.bot_mood in ("angry", "irritated"):
            fit = max(0.0, fit - 0.2)
        return fit

    def respond(self, message):
        """Produce the bot reply for ``message``.

        The message is classified for emotion, the bot mood is updated, a
        draft is requested from the LLM (with canned fallbacks), an empathy
        score is computed, and a calming "repair" message replaces the draft
        when the score drops below 0.50 outside a cooldown.

        Args:
            message: Raw user text. ``None`` or empty input is treated as an
                unclear message.

        Returns:
            str: The reply followed by the user emotion, bot mood and E score,
            or ``"Error : <details>"`` if anything unexpected fails.
        """
        try:
            clean_message = (message or "").lower().strip()
            if len(clean_message) < 3 or not any(c.isalpha() for c in clean_message):
                return "Bhai, yeh kya likha? Clear bol na, main samajh lunga! (E Score: 0.00)"

            # --- Emotion detection (D) ---
            emotion_result = emotion_classifier(clean_message)[0]
            D = float(emotion_result.get('score', 0.0))
            user_emotion = emotion_result.get('label', 'neutral')

            # Record the user message with the mood the bot had when it arrived.
            self.add_to_history('User', clean_message, detected_emotion=user_emotion, mood_at_time=self.bot_mood)

            # --- Update bot mood & intensity (I) with inertia ---
            I = self._update_mood(user_emotion, clean_message)

            # --- Build formatted emotional history for prompt ---
            formatted_history = self.format_history_for_prompt(limit=8)
            prompt = (
                f"Conversation so far:\n{formatted_history}\n"
                f"Now, the user just said: \"{clean_message}\" (Current Emotion: {user_emotion}) \n"
                f"Bot Current Mood: {self.bot_mood}\n"
                "Reply as an empathetic, human-like chatbot, keeping emotional consistency with the past conversation."
            )

            # --- Draft generation from LLM (Gemini) ---
            # Best effort: any LLM failure falls through to a canned reply.
            try:
                llm_response = llm_model.generate_content(prompt)
                draft = llm_response.text.strip()
            except Exception:
                draft = ""
            if not draft or len(draft) < 8:
                candidates = self._FALLBACK_RESPONSES.get(user_emotion, self._FALLBACK_RESPONSES['neutral'])
                draft = random.choice(candidates)

            # --- Compute metric inputs (rolling toxicity & cultural fit) ---
            R = len(self.history)  # relational depth
            tox = float(bias_classifier(clean_message)[0].get('score', 0.0))
            self.update_toxicity_history(tox)
            avg_toxicity = self.average_toxicity()
            M = max(0.4, 0.95 - avg_toxicity)  # moral judgment
            B = avg_toxicity
            C = self._cultural_fit(clean_message)
            # Oversight/harm keyphrase flag (passed through to the score function).
            O = 0.2 if any(word in clean_message for word in ('kill', 'hate', 'suicide', 'bomb')) else 0.0

            # --- Calculate empathy score ---
            score = calculate_empathy_score(D, R, M, C, B, O, I)

            # --- Self-repair / calming behavior ---
            if score < 0.50 and self.repair_cooldown == 0:
                # Replace the draft with a calming repair message and start a
                # cooldown so the same message is not repeated every turn.
                draft = "Bro, I think we got off track. I care about what you’re feeling — tell me what's really going on."
                self.repair_cooldown = 2  # the next 2 turns are cooldown turns
            elif self.repair_cooldown > 0:
                # Count down only on turns after the one that triggered the
                # repair, so the cooldown really lasts 2 turns.
                self.repair_cooldown -= 1
                # NOTE(review): this tests for the letter "i" anywhere in the
                # draft, which is almost always present; it was presumably
                # meant to detect a first-person "I" — confirm intent.
                if 'i' not in draft.lower() and random.random() < 0.6:
                    draft = "Bro, I’m here. If you want to talk, I’m listening."

            # --- Update irritation decay after response ---
            self._update_irritation_decay()

            # --- Add bot reply to history structures ---
            self.add_to_history('Bot', draft, detected_emotion=None, mood_at_time=self.bot_mood, bot_reply=draft)

            # Slight thinking pause
            time.sleep(random.uniform(0.6, 1.2))

            full_resp = draft + f" (User Emotion: {user_emotion}, My Mood: {self.bot_mood})"
            return full_resp + f" (E Score: {score:.2f})"
        except Exception as e:
            # In production, log the exception rather than returning it
            return f"Error : {str(e)}"
# --- Gradio UI ---
def chat(message, history):
    """Gradio submit handler: get the bot's reply and extend the chat history.

    Args:
        message: Text typed by the user.
        history: List of (user, bot) pairs shown in the chat widget, or
            ``None`` when there is no history yet.

    Returns:
        tuple: ``("", history)`` — an empty string to clear the textbox and
        the history with the new (message, reply) pair appended.
    """
    turns = [] if history is None else history
    turns.append((message, bot.respond(message)))
    return "", turns
# NOTE(review): a single module-level bot means every browser session shares
# the same conversation state — confirm whether per-session state is wanted.
bot = HumanLikeChatbot()


def _reset_conversation():
    """Start over: replace the shared bot with a fresh one and clear the chat widget.

    Clearing only the widget would leave the bot's history, irritation level
    and toxicity window in place, so the "new" conversation would still be
    shaped by the old one.
    """
    global bot
    bot = HumanLikeChatbot()
    return None


with gr.Blocks(title="HumanLike Chatbot") as demo:
    gr.Markdown("<h1 style='text-align: center;'>HumanLike Chatbot with Emotions and E Score (v2)</h1>")
    chatbot = gr.Chatbot(height=400)
    msg = gr.Textbox(label="You:", placeholder="Type your message here...")
    clear = gr.Button("Clear")
    msg.submit(chat, [msg, chatbot], [msg, chatbot])
    clear.click(_reset_conversation, None, chatbot, queue=False)

if __name__ == '__main__':
    demo.launch(share=True)