Spaces:
Running
Running
| """ | |
| 🧠 Agent 2 (Interpretation) - Gemini 2.0 EDITION | |
| ------------------------------------------------- | |
| This version of the AgentBrain handles all deep sociolinguistic logic, | |
| prompt engineering, and dataset searching. | |
| """ | |
| import os | |
| import glob | |
| import pandas as pd | |
| import json | |
| import re | |
| import concurrent.futures | |
| from rapidfuzz import process, fuzz | |
| from src.rag_manager import SociolinguisticRAG | |
| class AgentInterpretation: | |
def __init__(self, config, gemini_manager_instance=None):
    """Wire up configuration, the LLM manager, RAG engines and the worker pool.

    Args:
        config: Settings object. ``BASE_DIR`` is read here to locate the
            ``lab_profiles`` directory.
        gemini_manager_instance: Optional LLM manager. It is stored under
            both ``gemini_manager`` and ``gemini_manager_instance``.
    """
    self.config = config
    self.gemini_manager = gemini_manager_instance
    self.gemini_manager_instance = gemini_manager_instance  # Kept for backward compatibility

    # Give every piece of lazily-populated state a safe default so the
    # lookup/analysis methods can run before any profile or dataset has
    # been loaded instead of raising AttributeError.
    self.lab_profile = {}
    self.active_profiles_list = []
    self.df = pd.DataFrame()
    self.lookup_list = []

    # 🟢 1. Initialize RAG Engines (The Token Saver)
    self.profiles_dir = os.path.join(self.config.BASE_DIR, "lab_profiles")
    self.rag_engines = {}
    self._initialize_rag_databases()

    # 🟢 2. Persistent thread pool used for concurrent AI calls
    self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)
    print("🧠 Agent 2 (Interpretation) Online: Persistent Pool & RAG Engines Ready.")

    # 🟢 3. Load the CSV knowledge base
    self.refresh_knowledge_base()
def _safe_generate(self, prompt):
    """Run the manager's async ``generate_fast(prompt)`` from synchronous code.

    A private event loop is created for the call, driven until the
    coroutine finishes, and then torn down.

    Args:
        prompt: Prompt passed unchanged to ``self.gemini_manager.generate_fast``.

    Returns:
        Whatever ``generate_fast`` resolves to.

    Raises:
        Any exception raised by ``generate_fast`` or by the event loop.
    """
    import asyncio

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # Drive the async generate_fast call to completion inside the isolated loop.
        return loop.run_until_complete(self.gemini_manager.generate_fast(prompt))
    finally:
        # Unregister before closing so this thread is never left with a
        # closed loop as its "current" loop (later asyncio users would
        # otherwise fail with "Event loop is closed").
        asyncio.set_event_loop(None)
        loop.close()
def _initialize_rag_databases(self):
    """Build one RAG engine per ``*Persona.json`` file found in ``self.profiles_dir``.

    Each persona's ``lexicon``, ``pragmatics`` and ``syntax`` sections are
    flattened into plain rule strings and loaded into a
    ``SociolinguisticRAG`` stored in ``self.rag_engines`` under the dialect
    name. A file that fails to load is reported and skipped.
    """
    # isdir (not exists) so a stray file at this path cannot crash listdir.
    if not os.path.isdir(self.profiles_dir):
        return

    suffix = "Persona.json"
    for filename in sorted(os.listdir(self.profiles_dir)):
        if not filename.endswith(suffix):
            continue

        # Strip the same suffix we matched on, plus any separator before it,
        # so "X Persona.json", "X_Persona.json" and "XPersona.json" all
        # yield the dialect name "X".
        dialect_name = filename[: -len(suffix)].rstrip(" _-")
        filepath = os.path.join(self.profiles_dir, filename)
        try:
            with open(filepath, "r", encoding="utf-8") as f:
                data = json.load(f)

            # 🟢 Flatten the heavy JSON into a list of simple string rules
            rules = []
            if "lexicon" in data:
                rules.extend(
                    f"Lexicon: '{word}' means {meaning}."
                    for word, meaning in data["lexicon"].items()
                )
            if "pragmatics" in data:
                rules.extend(f"Pragmatics: {rule}" for rule in data["pragmatics"])
            if "syntax" in data:
                rules.extend(f"Syntax: {rule}" for rule in data["syntax"])

            # Initialize RAG for this specific dialect
            rag = SociolinguisticRAG()
            rag.load_persona_rules(dialect_name, rules)
            self.rag_engines[dialect_name] = rag
        except Exception as e:
            # Best effort: one bad persona must not block the others.
            print(f"⚠️ Failed to load RAG for {filename}: {e}")
def get_available_profiles(self):
    """Return the base file names of every ``*.json`` file in ``config.PROFILES_DIR``."""
    pattern = os.path.join(self.config.PROFILES_DIR, "*.json")
    profile_names = []
    for profile_path in glob.glob(pattern):
        profile_names.append(os.path.basename(profile_path))
    return profile_names
def load_profile_by_name(self, filename):
    """Load one profile JSON from ``config.PROFILES_DIR`` and make it the active profile.

    Args:
        filename: File name (not a full path) inside the profiles directory.

    Returns:
        The parsed profile, also stored on ``self.lab_profile``. If the file
        cannot be read or is not valid JSON, returns ``{}`` and leaves the
        previously active profile untouched.
    """
    path = os.path.join(self.config.PROFILES_DIR, filename)
    try:
        with open(path, "r", encoding="utf-8") as f:
            self.lab_profile = json.load(f)
    except (OSError, ValueError) as e:
        # OSError: missing/unreadable file. ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        print(f"⚠️ Failed to load profile {filename}: {e}")
        return {}
    return self.lab_profile
def save_specific_profile(self, filename, json_str):
    """Validate ``json_str`` and write it, pretty-printed, into ``config.PROFILES_DIR``.

    Args:
        filename: Target file name; ``.json`` is appended when missing.
        json_str: JSON document as text.

    Returns:
        A status string: the success message, or ``"❌ Error: ..."`` with the
        failure reason. This method does not raise for bad input.
    """
    if not filename.endswith(".json"):
        filename += ".json"
    path = os.path.join(self.config.PROFILES_DIR, filename)
    try:
        # Parse BEFORE opening the file: opening with "w" truncates it, so
        # parsing afterwards would wipe an existing profile whenever the
        # submitted text is not valid JSON.
        payload = json.loads(json_str)
        with open(path, "w", encoding="utf-8") as f:
            json.dump(payload, f, indent=2)
        return "✅ Saved locally (HF Sync pending)"
    except Exception as e:
        return f"❌ Error: {e}"
def get_current_profile_text(self):
    """Return the active profile as pretty-printed JSON text.

    When no profile has been loaded yet, an empty object (``"{}"``) is
    returned rather than raising ``AttributeError``.
    """
    return json.dumps(getattr(self, "lab_profile", {}), indent=2)
def load_all_profiles_simultaneously(self):
    """Load every ``*.json`` profile in ``config.PROFILES_DIR`` into ``self.active_profiles_list``.

    Files that cannot be read, are not valid JSON, or do not contain a JSON
    object are reported and skipped, so the active list only ever holds
    dicts.

    Returns:
        A status string with the number of profiles loaded and their names
        (``lab_name`` when present, otherwise the file name).
    """
    pattern = os.path.join(self.config.PROFILES_DIR, "*.json")
    profiles = []
    names = []
    # Sorted so the load order (and the status message) is deterministic.
    for f_path in sorted(glob.glob(pattern)):
        base_name = os.path.basename(f_path)
        try:
            with open(f_path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            print(f"⚠️ Skipping profile {base_name}: {e}")
            continue
        # Validate before appending: consumers call .get() on every entry.
        if not isinstance(data, dict):
            print(f"⚠️ Skipping profile {base_name}: top-level JSON value is not an object")
            continue
        profiles.append(data)
        # str() keeps the join below safe if lab_name is not a string.
        names.append(str(data.get("lab_name", base_name)))

    self.active_profiles_list = profiles
    return f"✅ Loaded {len(profiles)} profiles simultaneously: {', '.join(names)}"
def refresh_knowledge_base(self):
    """Reload every ``*.csv`` in ``config.DATASET_DIR`` into ``self.df`` and ``self.lookup_list``.

    Only the ``Utterance``, ``Clarification`` and ``Dialect`` columns are
    kept. A file that cannot be parsed or lacks one of those columns is
    reported and skipped. When nothing loads, ``self.df`` becomes an empty
    frame with those columns, so it never goes stale or missing while
    ``lookup_list`` is reset.
    """
    required = ["Utterance", "Clarification", "Dialect"]
    frames = []
    # Sorted so row order (and therefore row indices) is deterministic.
    for csv_path in sorted(glob.glob(os.path.join(self.config.DATASET_DIR, "*.csv"))):
        try:
            frame = pd.read_csv(csv_path, encoding='utf-8-sig', on_bad_lines='skip', low_memory=True)
            frames.append(frame[required])
        except (OSError, ValueError, KeyError) as e:
            # ValueError covers pandas parser/empty-data errors and decode
            # errors; KeyError is a missing required column.
            print(f"⚠️ Skipping dataset {os.path.basename(csv_path)}: {e}")

    if frames:
        self.df = pd.concat(frames, ignore_index=True)
    else:
        self.df = pd.DataFrame(columns=required)
    self.lookup_list = self.df["Utterance"].tolist()
def normalize_keys(self, data_list):
    """Map interpretation dicts onto the canonical result keys.

    Accepts the capitalised and lower-case spellings of each field. For the
    pragmatics field it also accepts ``"Pragmatics"``, which is the key the
    unknown-analysis prompt asks the model to emit.

    Args:
        data_list: A list of dicts, a single dict (treated as a one-item
            list), or ``None``/empty. Non-dict entries are skipped.

    Returns:
        A new list of dicts with the keys ``Dialect``, ``Clarification``,
        ``Tone``, ``Context``, ``Pragmatic Analysis`` and ``Source``.
        Missing or empty fields receive a placeholder default.
    """
    if isinstance(data_list, dict):
        data_list = [data_list]

    def first_value(item, keys, default):
        # First non-empty value among the accepted spellings, else the default.
        for key in keys:
            value = item.get(key)
            if value:
                return value
        return default

    cleaned_list = []
    for item in data_list or []:
        if not isinstance(item, dict):
            continue
        cleaned_list.append({
            "Dialect": first_value(item, ("Dialect", "dialect"), "Unknown"),
            "Clarification": first_value(item, ("Clarification", "clarification"), "---"),
            "Tone": first_value(item, ("Tone", "tone"), "---"),
            "Context": first_value(item, ("Context", "context"), "---"),
            "Pragmatic Analysis": first_value(
                item, ("Pragmatic Analysis", "Pragmatics", "pragmatics"), "---"
            ),
            "Source": item.get("Source", "✨ AI Generated"),
        })
    return cleaned_list
| # ========================================== | |
| # MODULAR LOGIC (MOVED FROM UX_AGENT) | |
| # ========================================== | |
def generate_conversation_starter(self, topic_str):
    """Generates the mission prompt for the React Games.

    Args:
        topic_str: Either a plain topic, or a full persona prompt containing
            the marker ``"You are Echo"`` (which switches to role-play mode).

    Returns:
        A JSON string of the form ``{"text": "..."}``. A canned fallback is
        returned when no LLM manager is configured, when generation fails,
        or when the model returns empty text.
    """
    is_echo = "You are Echo" in topic_str

    def fallback():
        # Single source of truth for the canned reply used on every failure path.
        if is_echo:
            return json.dumps({"text": "Haha, I hear you! What else is going on?"})
        return json.dumps({"text": f"Let's talk about {topic_str}. What comes to mind?"})

    if not self.gemini_manager:
        return fallback()

    try:
        if is_echo:
            prompt = f"""
{topic_str}
CRITICAL ROLEPLAY INSTRUCTIONS FOR THIS TURN:
1. You are Echo, a human peer and friend.
2. NEVER break character. NEVER say you are an AI, a computer program, or a language model.
3. Play along naturally. If they ask for favors or money (e.g., begging/joking), play along like a real person.
4. Keep your response to ONE short, highly conversational sentence. Do not be overly formal.
5. On login, you will switch the user's English dialect.
"""
        else:
            prompt = f"Generate a very short, engaging, single-sentence conversation starter or question about: {topic_str}."

        response = self._safe_generate(prompt)
        clean_text = response.text.replace("```json", "").replace("```", "").strip()
        if not clean_text:
            # An empty starter is useless to the caller; use the canned one.
            return fallback()
        return json.dumps({"text": clean_text})
    except Exception as e:
        print(f"⚠️ Mission Gen Error: {e}")
        return fallback()
def search_local_dataset(self, text):
    """Tier 1: Returns a single local match only if it is nearly identical.

    Fuzzy-matches ``text`` against the ``Utterance`` column of ``self.df``
    (``fuzz.WRatio``) and accepts the best hit only at a score of 90 or more.

    Returns:
        A result dict for the matched row, or ``None`` when there is no
        dataset, no input, no sufficiently close match, or the search fails.
    """
    try:
        df = getattr(self, "df", None)
        if df is None or df.empty or not text:
            return None

        utterances = df["Utterance"].astype(str).tolist()
        match = process.extractOne(text, utterances, scorer=fuzz.WRatio)
        if match and match[1] >= 90:
            # For a list of choices, match is (choice, score, position).
            # Use the position: comparing the stringified choice back
            # against the raw column misses rows whose cell is not a str.
            row = df.iloc[match[2]]

            def clean_val(val, default=""):
                return default if pd.isna(val) else str(val)

            return {
                "Source": "🗄️ Local Dataset",
                "Speaker": "User",
                "dialect": clean_val(row.get("Dialect"), "Unknown"),
                "clarification": clean_val(row.get("Clarification")),
                "tone": clean_val(row.get("Tone_Category"), "Neutral / Conversational"),
                "context": clean_val(row.get("Linguistic_Context")),
                "pragmatics": clean_val(row.get("Pragmatic_Analysis"), "Verified via local Regex"),
            }
    except Exception as e:
        print(f"Dataset Search Error: {e}")
    return None
def search_personas(self, text):
    """Tier 2: Checks the currently loaded JSON persona for jargon.

    The jargon table is read from ``"jargons"`` or, failing that,
    ``"jargon"``. Likewise the dialect label falls back from
    ``"dialect_name"`` to ``"lab_name"``. Other methods in this file use the
    second spelling of each key for profiles loaded from the same directory.

    Returns:
        A result dict for the first jargon term found in ``text`` as a whole
        word (case-insensitive), or ``None`` when nothing matches, no
        persona is loaded, or the search fails.
    """
    try:
        profile = getattr(self, "lab_profile", None)
        if not profile or not text:
            return None

        jargon = profile.get("jargons") or profile.get("jargon") or {}
        rules = profile.get("pragmatic_rules") or [""]
        rule = rules[0]
        text_lower = text.lower()

        for slang, meaning in jargon.items():
            if not slang:
                # An empty term would compile to r'\b\b' and match any text.
                continue
            if re.search(r'\b' + re.escape(slang.lower()) + r'\b', text_lower):
                return {
                    "Source": "🎭 Persona Injection",
                    "Speaker": "User",
                    "dialect": profile.get("dialect_name") or profile.get("lab_name", "Unknown Persona"),
                    "clarification": f"Contains slang '{slang}' -> {meaning}",
                    "tone": "Casual / Slang",
                    "context": profile.get("cultural_context", "Inferred from active Persona"),
                    "pragmatics": f"Rule triggered: {rule}",
                }
    except Exception as e:
        print(f"Persona Search Error: {e}")
    return None
| # ========================================== | |
| # DEEP GENERATIVE AI LOGIC | |
| # ========================================== | |
def analyze_dialect_single(self, text, language_code):
    """Generates a high-confidence, culturally nuanced interpretation from AI.

    Args:
        text: The utterance to interpret.
        language_code: Dialect label. It selects the RAG engine (when one
            is registered under that key) and is echoed into the prompt.

    Returns:
        A dict with ``dialect``, ``clarification``, ``tone``, ``context``,
        ``pragmatics`` and ``Source``. On a generation error or an
        unparseable reply, an error dict with the same keys is returned.
        This method does not raise for AI/RAG failures.
    """
    print(f" -> 🧠 Preparing Sociolinguistic Prompt for: {language_code}")

    # 🟢 1. RAG RETRIEVAL: Get the specific rules for this dialect
    relevant_rules = "General dialect rules apply."
    engine = self.rag_engines.get(language_code)
    if engine is not None:
        try:
            relevant_rules = engine.retrieve_context(text, k=3)
            print(f" -> 🎯 RAG Context Retrieved:\n{relevant_rules}")
        except Exception as e:
            # Retrieval is an enhancement; degrade to the generic rules
            # instead of failing the whole analysis.
            print(f" -> ⚠️ RAG retrieval failed, using general rules: {e}")

    # 🟢 2. OPTIMIZED PROMPT: Combine RAG context with the strict academic instructions
    prompt = f"""Analyze the following utterance: '{text}'
Language/Dialect Context: {language_code}
CRITICAL CONTEXT (Apply ONLY these retrieved sociolinguistic rules to your analysis):
{relevant_rules}
CRITICAL SOCIOLINGUISTIC INSTRUCTIONS:
1. DIRECT MEANING: Provide the interpretation directly. Do not start with "This means..." or "The user is saying...".
2. PRESERVE CULTURAL NUANCE: Capture the exact social intent (e.g., humor, banter, sarcasm, respect, solidarity). Do not sterilize the translation into robotic, literal Standard English. Preserve the 'flavor' of the interaction.
3. LINGUISTIC EQUALITY: Treat this as a valid, deeply rule-governed language system.
4. FORBIDDEN WORDS: You must NEVER use "incorrect", "broken", "grammar error", "non-standard", or "learner".
Return ONLY a valid JSON object with exactly these keys. Do not use markdown formatting blocks:
{{
"dialect": "{language_code}",
"clarification": "[Direct Meaning, capturing the original intent]",
"tone": "[e.g., Playful Banter, Respectful, Sarcastic, Urgent, Casual]",
"context": "[The typical cultural or situational setting for this phrase]",
"pragmatics": "[The underlying social function, e.g., 'Establishing solidarity', 'Softening a request', 'Playful teasing']"
}}"""

    # 🟢 3. API CALL & ERROR HANDLING
    try:
        print(" -> 🧠 Awaiting API Response...")
        response = self._safe_generate(prompt)
        print(" -> 🧠 Parsing JSON Response...")
        clean_text = response.text.replace("```json", "").replace("```", "").strip()
        match = re.search(r'\{.*\}', clean_text, re.DOTALL)
        if match:
            res = json.loads(match.group(0))
            # The model may omit keys; fill them with the same defaults the
            # error results use so callers always see the full key set.
            for key, default in (
                ("dialect", language_code),
                ("clarification", ""),
                ("tone", "Neutral"),
                ("context", ""),
                ("pragmatics", ""),
            ):
                res.setdefault(key, default)
            res["Source"] = "🧠 Groq AI Engine (Cultural Context)"
            return res
    except Exception as e:
        import traceback
        print("\n🚨 CRITICAL GEMINI ERROR 🚨")
        traceback.print_exc()
        return {"Source": "🧠 AI Engine Error", "dialect": language_code, "clarification": f"AI Gen Failed: {e}", "tone": "Neutral", "context": "", "pragmatics": ""}

    print(" -> ❌ AI returned invalid format.")
    return {"Source": "🧠 AI Engine Error", "dialect": language_code, "clarification": "AI format failed", "tone": "Neutral", "context": "", "pragmatics": ""}
def generate_unknown_analysis(self, text):
    """Ask the model for up to three free-form interpretations of ``text``.

    Jargon keys from the active profiles (at most 50, in sorted order so the
    prompt is reproducible) are passed along as context.

    Returns:
        ``[]`` when no LLM manager is configured. Otherwise the model's
        interpretations passed through ``normalize_keys``, or a single
        "Analysis Failed" placeholder entry when generation or parsing fails.
    """
    if not self.gemini_manager:
        return []

    jargon_keys = set()
    for profile in getattr(self, "active_profiles_list", []):
        if isinstance(profile, dict):
            jargon_keys.update(profile.get("jargon", {}))

    prompt = f"""
Analyze utterance: "{text}"
Context/Jargon Keys: {sorted(jargon_keys)[:50]}
Task: Provide 3 distinct interpretations (Casual, Formal, or Cultural).
CRITICAL INSTRUCTION: Treat the input as valid, meaningful Dialectal English. Do NOT label it as "incorrect".
Output Strictly JSON: [ {{ "Dialect": "General", "Clarification": "...", "Tone": "...", "Context": "...", "Pragmatics": "..." }} ]
"""
    try:
        response = self._safe_generate(prompt)
        clean_text = re.sub(r"```json|```", "", response.text).strip()
        try:
            data = json.loads(clean_text)
        except ValueError:
            # The model sometimes wraps the array in prose; salvage the
            # outermost [...] span before giving up.
            bracketed = re.search(r"\[.*\]", clean_text, re.DOTALL)
            if not bracketed:
                raise
            data = json.loads(bracketed.group(0))
        if isinstance(data, dict):
            # A single object instead of a one-element array is still usable.
            data = [data]
        return self.normalize_keys(data)
    except Exception as e:
        print(f"⚠️ Unknown Analysis Error: {e}")
        return [{"Dialect": "Unknown", "Clarification": "Analysis Failed", "Tone": "---", "Context": "---", "Pragmatic Analysis": "Error"}]
def adapt_with_ai(self, full_text, db_row):
    """Adapt a dataset entry's meaning to the user's full sentence via the LLM.

    Args:
        full_text: The complete user utterance.
        db_row: Mapping-like dataset row (dict or pandas Series) providing
            ``Utterance`` and ``Clarification``. ``Pragmatic_Analysis`` is
            optional and defaults to ``"---"``.

    Returns:
        A ``(clarification, pragmatics)`` tuple. The row's own values are
        returned when no LLM manager is configured or when generation or
        parsing fails.
    """
    base_clarification = db_row["Clarification"]
    # .get, not []: the loaded knowledge base keeps only Utterance,
    # Clarification and Dialect, so this column is usually absent.
    base_pragmatics = db_row.get("Pragmatic_Analysis", "---")

    if not self.gemini_manager:
        return base_clarification, base_pragmatics

    try:
        prompt = f"""
Ref Term: "{db_row['Utterance']}" = "{db_row['Clarification']}"
User said: "{full_text}"
Task: Adapt meaning to full sentence. Treat as valid dialect.
Output JSON: {{ "clarification": "...", "pragmatics": "..." }}
"""
        response = self._safe_generate(prompt)
        clean_json = re.search(r"\{.*\}", response.text, re.DOTALL)
        if clean_json:
            data = json.loads(clean_json.group(0))
            return data.get("clarification", base_clarification), data.get("pragmatics", "AI Adapted Analysis")
    except Exception as e:
        print(f"⚠️ AI Adaptation Error: {e}")
    return base_clarification, base_pragmatics
def detect_and_analyze(self, text, threshold=60, ai_timeout=4.5):
    """Interpret ``text`` using the dataset, the active profiles and, if needed, the LLM.

    Pipeline:
      1. Dataset rows whose ``Syntax_Pattern`` regex matches, or whose
         utterance is a substring of the input, become immediate results.
      2. Fuzzy dataset matches (``fuzz.token_set_ratio`` >= ``threshold``)
         become candidates for AI adaptation.
      3. Jargon terms from the active profiles found in the input become
         immediate results.
      4. If fewer than three immediate results exist, the top three
         candidates are adapted by the LLM in parallel with a free-form
         fallback analysis, waiting at most ``ai_timeout`` seconds.

    Args:
        text: The user utterance.
        threshold: Minimum fuzzy score for a dataset row to be a candidate.
        ai_timeout: Seconds to wait for the concurrent AI calls.

    Returns:
        A list of at most three result dicts. If nothing was found and no
        AI result arrived in time, a single "AI Timeout" placeholder entry.
    """
    clean_text = text.lower().strip()
    seen_indices = set()
    immediate_results = []
    partial_candidates = []

    df = getattr(self, "df", None)
    if df is None:
        df = pd.DataFrame()

    # --- 1. Direct dataset hits (regex, then substring) ---
    if not df.empty:
        for index, row in df.iterrows():
            match_type = None
            raw_pattern = row.get("Syntax_Pattern", "")
            # Skip missing cells: str(NaN) == "nan" and str(None) == "None"
            # would otherwise be used as patterns and match ordinary words.
            if not pd.isna(raw_pattern):
                regex = str(raw_pattern)
                if len(regex) > 2:
                    try:
                        if re.search(regex, clean_text, re.IGNORECASE):
                            match_type = "Regex_Match"
                    except re.error:
                        pass  # malformed pattern in the dataset; fall back to substring
            if not match_type:
                db_str = str(row["Utterance"]).strip().lower()
                if len(db_str) > 3 and db_str in clean_text:
                    match_type = "Exact_Substring"
            if match_type:
                seen_indices.add(index)
                immediate_results.append({
                    "Source": f"💎 Database ({match_type})", "Dialect": row["Dialect"],
                    "Clarification": row["Clarification"], "Tone": row.get("Tone_Category", "---"),
                    "Context": row.get("Linguistic_Context", "---"), "Pragmatic Analysis": row.get("Pragmatic_Analysis", "---")
                })

    # --- 2. Fuzzy dataset candidates ---
    if not getattr(self, "lookup_list", None):
        self.lookup_list = []
    if self.lookup_list:  # nothing to match against otherwise
        matches = process.extract(clean_text, self.lookup_list, scorer=fuzz.token_set_ratio, limit=5)
        for _match_str, score, index in matches:
            if score >= threshold and index not in seen_indices and index < len(df):
                seen_indices.add(index)
                partial_candidates.append({"row": df.iloc[index], "match_len": score, "type": f"Fuzzy ({score}%)"})

    # --- 3. Profile jargon hits ---
    for profile in getattr(self, "active_profiles_list", []):
        if not isinstance(profile, dict):
            continue
        for term, definition in profile.get("jargon", {}).items():
            # An empty term is a substring of every string; ignore it.
            if term and term.lower() in clean_text:
                immediate_results.append({
                    "Source": f"📜 Profile Rule ({term})", "Dialect": profile.get("lab_name", "Profile"),
                    "Clarification": definition, "Tone": "Detected Jargon",
                    "Context": f"Found in {profile.get('lab_name')} Profile", "Pragmatic Analysis": "Direct Profile Match"
                })

    final_results = list(immediate_results)
    if len(final_results) >= 3:
        # Only the first three results are ever returned, so AI output could
        # not change the answer; skip the API calls and the wait entirely.
        return final_results[:3]

    # --- 4. Concurrent AI adaptation + fallback ---
    partial_candidates.sort(key=lambda x: x["match_len"], reverse=True)
    top_candidates = partial_candidates[:3]

    fallback_future = self.executor.submit(self.generate_unknown_analysis, text)
    db_futures = {
        self.executor.submit(self.adapt_with_ai, text, cand["row"]): cand
        for cand in top_candidates
    }

    done, not_done = concurrent.futures.wait(
        [*db_futures, fallback_future],
        timeout=ai_timeout,
        return_when=concurrent.futures.ALL_COMPLETED,
    )
    for pending in not_done:
        # Frees queue slots for work that has not started; running calls
        # cannot be interrupted and simply finish in the background.
        pending.cancel()

    for future, cand in db_futures.items():
        if future not in done:
            continue
        try:
            clar, prag = future.result()
        except Exception as e:
            print(f"⚠️ DB + AI adaptation failed: {e}")
            continue
        row = cand["row"]
        final_results.append({
            "Source": f"💎 DB + AI ({cand['type']})", "Dialect": row["Dialect"],
            "Clarification": clar, "Tone": row.get("Tone_Category", "---"),
            "Context": row.get("Linguistic_Context", "---"), "Pragmatic Analysis": prag
        })

    if len(final_results) < 3 and fallback_future in done:
        try:
            final_results += self.normalize_keys(fallback_future.result())
        except Exception as e:
            print(f"⚠️ Fallback analysis failed: {e}")

    if not final_results:
        final_results.append({"Source": "⚠️ AI Timeout", "Dialect": "---", "Clarification": "System Busy", "Tone": "---", "Context": "---", "Pragmatic Analysis": "---"})
    return final_results[:3]