"""
🧠 Agent 2 (Interpretation) - Gemini 2.0 EDITION
-------------------------------------------------
This version of the AgentBrain handles all deep sociolinguistic logic,
prompt engineering, and dataset searching.
"""
import asyncio
import concurrent.futures
import glob
import json
import os
import re
import traceback

import pandas as pd
from rapidfuzz import process, fuzz

from src.rag_manager import SociolinguisticRAG


class AgentInterpretation:
    """Interpretation agent: local dataset lookup, persona lookup and LLM analysis.

    The instance keeps the following state:

    * ``rag_engines``          - dialect name -> ``SociolinguisticRAG`` built
                                 from ``* Persona.json`` files.
    * ``df`` / ``lookup_list`` - rows of every dataset CSV and the list of
                                 their ``Utterance`` values.
    * ``lab_profile``          - the profile last loaded by
                                 ``load_profile_by_name``.
    * ``active_profiles_list`` - every profile loaded by
                                 ``load_all_profiles_simultaneously``.
    * ``executor``             - thread pool used by ``detect_and_analyze``.
    """

    # Columns kept from every dataset CSV.
    # NOTE(review): other methods also read Tone_Category, Linguistic_Context,
    # Pragmatic_Analysis and Syntax_Pattern through ``.get``; with this subset
    # those lookups always fall back to their defaults - confirm this is
    # intended before widening the selection.
    _DATASET_COLUMNS = ["Utterance", "Clarification", "Dialect"]

    def __init__(self, config, gemini_manager_instance=None):
        """Build the agent.

        Args:
            config: object exposing ``BASE_DIR``, ``PROFILES_DIR`` and
                ``DATASET_DIR``.
            gemini_manager_instance: optional manager exposing an async
                ``generate_fast(prompt)``; when ``None`` the AI-backed
                methods return their offline fallbacks.
        """
        self.config = config
        self.gemini_manager = gemini_manager_instance
        self.gemini_manager_instance = gemini_manager_instance  # Kept for backward compatibility

        # State read by other methods; initialised here so that no method can
        # hit an AttributeError before the matching load_* call has run.
        self.lab_profile = {}
        self.active_profiles_list = []
        self.df = pd.DataFrame(columns=self._DATASET_COLUMNS)
        self.lookup_list = []

        # 🟢 1. Initialize RAG Engines (The Token Saver)
        self.profiles_dir = os.path.join(self.config.BASE_DIR, "lab_profiles")
        self.rag_engines = {}
        self._initialize_rag_databases()

        # 🟢 2. Persistent thread pool for the concurrent AI calls
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)
        print("🧠 Agent 2 (Interpretation) Online: Persistent Pool & RAG Engines Ready.")

        # 🟢 3. Load the CSV knowledge base
        self.refresh_knowledge_base()

    def _safe_generate(self, prompt):
        """Run the manager's async ``generate_fast(prompt)`` synchronously.

        A fresh event loop is created, installed as the current loop, used to
        drive the coroutine to completion and then closed.

        Returns:
            Whatever ``generate_fast`` resolves to (callers read ``.text``).
        """
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            return loop.run_until_complete(self.gemini_manager.generate_fast(prompt))
        finally:
            loop.close()

    def _initialize_rag_databases(self):
        """Build one RAG engine per ``*Persona.json`` file in ``profiles_dir``.

        Each persona's ``lexicon``, ``pragmatics`` and ``syntax`` sections are
        flattened into plain rule strings. A file that fails to load is
        reported and skipped.
        """
        if not os.path.exists(self.profiles_dir):
            return

        for filename in os.listdir(self.profiles_dir):
            if not filename.endswith("Persona.json"):
                continue

            dialect_name = filename.replace(" Persona.json", "")
            filepath = os.path.join(self.profiles_dir, filename)
            try:
                with open(filepath, "r", encoding="utf-8") as f:
                    data = json.load(f)

                # 🟢 Flatten the heavy JSON into a list of simple string rules
                rules = []
                if "lexicon" in data:
                    for word, meaning in data["lexicon"].items():
                        rules.append(f"Lexicon: '{word}' means {meaning}.")
                if "pragmatics" in data:
                    for rule in data["pragmatics"]:
                        rules.append(f"Pragmatics: {rule}")
                if "syntax" in data:
                    for rule in data["syntax"]:
                        rules.append(f"Syntax: {rule}")

                # Initialize RAG for this specific dialect
                rag = SociolinguisticRAG()
                rag.load_persona_rules(dialect_name, rules)
                self.rag_engines[dialect_name] = rag
            except Exception as e:
                print(f"āš ļø Failed to load RAG for {filename}: {e}")

    def get_available_profiles(self):
        """Return the base names of all ``*.json`` files in ``PROFILES_DIR``."""
        files = glob.glob(os.path.join(self.config.PROFILES_DIR, "*.json"))
        return [os.path.basename(f) for f in files]

    def load_profile_by_name(self, filename):
        """Load ``filename`` from ``PROFILES_DIR`` into ``self.lab_profile``.

        Returns:
            The parsed profile, or ``{}`` when the file cannot be read or
            parsed (``self.lab_profile`` is left untouched in that case).
        """
        path = os.path.join(self.config.PROFILES_DIR, filename)
        try:
            with open(path, "r", encoding="utf-8") as f:
                self.lab_profile = json.load(f)
            return self.lab_profile
        except (OSError, ValueError):
            # ValueError covers both JSONDecodeError and UnicodeDecodeError.
            return {}

    def save_specific_profile(self, filename, json_str):
        """Write ``json_str`` (a JSON document) to ``PROFILES_DIR/filename``.

        ``.json`` is appended to ``filename`` when missing.

        Returns:
            A status string describing success or the error encountered.
        """
        if not filename.endswith(".json"):
            filename += ".json"
        path = os.path.join(self.config.PROFILES_DIR, filename)
        try:
            # Parse BEFORE opening the file: opening in "w" mode truncates it,
            # so invalid JSON must be rejected while the old content is intact.
            payload = json.loads(json_str)
            with open(path, "w", encoding="utf-8") as f:
                json.dump(payload, f, indent=2)
            return "āœ… Saved locally (HF Sync pending)"
        except Exception as e:
            return f"āŒ Error: {e}"

    def get_current_profile_text(self):
        """Return ``self.lab_profile`` as pretty-printed JSON text."""
        return json.dumps(self.lab_profile, indent=2)

    def load_all_profiles_simultaneously(self):
        """Load every ``*.json`` profile in ``PROFILES_DIR`` into ``active_profiles_list``.

        Files that cannot be read, cannot be parsed, or whose top-level value
        is not a JSON object are skipped.

        Returns:
            A status string listing the loaded profile names.
        """
        files = glob.glob(os.path.join(self.config.PROFILES_DIR, "*.json"))
        names = []
        self.active_profiles_list = []
        for f_path in files:
            try:
                with open(f_path, "r", encoding="utf-8") as f:
                    data = json.load(f)
            except (OSError, ValueError):
                continue
            # Only objects are usable: every consumer calls ``profile.get``.
            if not isinstance(data, dict):
                continue
            self.active_profiles_list.append(data)
            names.append(data.get("lab_name", os.path.basename(f_path)))
        loaded_count = len(names)
        return f"āœ… Loaded {loaded_count} profiles simultaneously: {', '.join(names)}"

    def refresh_knowledge_base(self):
        """Reload every ``*.csv`` in ``DATASET_DIR`` into ``self.df``.

        Only the ``_DATASET_COLUMNS`` are kept. Files that cannot be read or
        lack one of those columns are reported and skipped. When nothing
        loads, ``self.df`` becomes an empty frame and ``lookup_list`` an
        empty list.
        """
        all_files = glob.glob(os.path.join(self.config.DATASET_DIR, "*.csv"))
        df_list = []
        for filename in all_files:
            try:
                temp_df = pd.read_csv(
                    filename,
                    encoding="utf-8-sig",
                    on_bad_lines="skip",
                    low_memory=True,
                )
                df_list.append(temp_df[self._DATASET_COLUMNS])
            except Exception as e:
                print(f"Dataset Load Error ({os.path.basename(filename)}): {e}")

        if df_list:
            self.df = pd.concat(df_list, ignore_index=True)
            self.lookup_list = self.df["Utterance"].tolist()
        else:
            # Keep ``df`` defined so ``self.df.empty`` checks never raise.
            self.df = pd.DataFrame(columns=self._DATASET_COLUMNS)
            self.lookup_list = []

    def normalize_keys(self, data_list):
        """Map interpretation dicts onto the canonical result keys.

        Accepts capitalised or lower-case source keys. ``"Pragmatics"`` (the
        key requested by ``generate_unknown_analysis``) is honoured alongside
        ``"Pragmatic Analysis"`` and ``"pragmatics"``.

        Args:
            data_list: a list of dicts, or a single dict (treated as a
                one-element list). Non-dict items are ignored.

        Returns:
            A list of dicts with the keys ``Dialect``, ``Clarification``,
            ``Tone``, ``Context``, ``Pragmatic Analysis`` and ``Source``.
        """
        if isinstance(data_list, dict):
            data_list = [data_list]

        cleaned_list = []
        for item in data_list or []:
            if not isinstance(item, dict):
                continue
            cleaned_list.append({
                "Dialect": item.get("Dialect") or item.get("dialect", "Unknown"),
                "Clarification": item.get("Clarification") or item.get("clarification", "---"),
                "Tone": item.get("Tone") or item.get("tone", "---"),
                "Context": item.get("Context") or item.get("context", "---"),
                "Pragmatic Analysis": (
                    item.get("Pragmatic Analysis")
                    or item.get("Pragmatics")
                    or item.get("pragmatics", "---")
                ),
                "Source": item.get("Source", "✨ AI Generated"),
            })
        return cleaned_list

    # ==========================================
    # MODULAR LOGIC (MOVED FROM UX_AGENT)
    # ==========================================

    @staticmethod
    def _fallback_starter(topic_str):
        """Return the canned JSON starter used when no AI reply is available."""
        if "You are Echo" in topic_str:
            return json.dumps({"text": "Haha, I hear you! What else is going on?"})
        return json.dumps({"text": f"Let's talk about {topic_str}. What comes to mind?"})

    def generate_conversation_starter(self, topic_str):
        """Generates the mission prompt for the React Games.

        Returns:
            A JSON string ``{"text": ...}``. A canned text is used when there
            is no manager or the generation fails.
        """
        if not self.gemini_manager:
            return self._fallback_starter(topic_str)

        try:
            if "You are Echo" in topic_str:
                prompt = f"""
                {topic_str}

                CRITICAL ROLEPLAY INSTRUCTIONS FOR THIS TURN:
                1. You are Echo, a human peer and friend.
                2. NEVER break character. NEVER say you are an AI, a computer program, or a language model.
                3. Play along naturally. If they ask for favors or money (e.g., begging/joking), play along like a real person.
                4. Keep your response to ONE short, highly conversational sentence. Do not be overly formal.
                5. On login, you will switch the user's English dialect.
                """
            else:
                prompt = f"Generate a very short, engaging, single-sentence conversation starter or question about: {topic_str}."

            response = self._safe_generate(prompt)
            # Strip markdown code fences the model may wrap around its reply.
            clean_text = response.text.replace("```json", "").replace("```", "").strip()
            return json.dumps({"text": clean_text})
        except Exception as e:
            print(f"āš ļø Mission Gen Error: {e}")
            return self._fallback_starter(topic_str)

    def search_local_dataset(self, text):
        """Tier 1: Returns a single local match only if it is nearly identical.

        The best ``WRatio`` match over the ``Utterance`` column is accepted
        when its score is at least 90.

        Returns:
            A result dict for the matched row, or ``None``.
        """
        try:
            if self.df.empty:
                return None

            utterances = self.df["Utterance"].astype(str).tolist()
            match = process.extractOne(text, utterances, scorer=fuzz.WRatio)
            if match and match[1] >= 90:
                # ``utterances`` is position-aligned with ``self.df``, so the
                # index returned by rapidfuzz addresses the row directly. This
                # also works for cells whose original value was not a string.
                row = self.df.iloc[match[2]]

                def clean_val(val, default=""):
                    return default if pd.isna(val) else str(val)

                return {
                    "Source": "šŸ—„ļø Local Dataset",
                    "Speaker": "User",
                    "dialect": clean_val(row.get("Dialect"), "Unknown"),
                    "clarification": clean_val(row.get("Clarification")),
                    "tone": clean_val(row.get("Tone_Category"), "Neutral / Conversational"),
                    "context": clean_val(row.get("Linguistic_Context")),
                    "pragmatics": clean_val(row.get("Pragmatic_Analysis"), "Verified via local Regex"),
                }
        except Exception as e:
            print(f"Dataset Search Error: {e}")
        return None

    def search_personas(self, text):
        """Tier 2: Checks the currently loaded JSON persona for jargon.

        Looks for any key of the profile's ``"jargons"`` mapping as a whole
        word in ``text`` (case-insensitive).
        NOTE(review): the other profile consumers read the key ``"jargon"``
        (singular) - confirm which schema ``lab_profile`` follows.

        Returns:
            A result dict for the first matching term, or ``None``.
        """
        try:
            if not self.lab_profile:
                return None

            profile = self.lab_profile
            text_lower = text.lower()

            for slang, meaning in profile.get("jargons", {}).items():
                if not slang:
                    # An empty key would compile to r'\b\b' and match any word.
                    continue
                if re.search(r'\b' + re.escape(slang.lower()) + r'\b', text_lower):
                    rules = profile.get("pragmatic_rules")
                    rule = rules[0] if rules else ""
                    return {
                        "Source": "šŸŽ­ Persona Injection",
                        "Speaker": "User",
                        "dialect": profile.get("dialect_name", "Unknown Persona"),
                        "clarification": f"Contains slang '{slang}' -> {meaning}",
                        "tone": "Casual / Slang",
                        "context": profile.get("cultural_context", "Inferred from active Persona"),
                        "pragmatics": f"Rule triggered: {rule}",
                    }
        except Exception as e:
            print(f"Persona Search Error: {e}")
        return None

    # ==========================================
    # DEEP GENERATIVE AI LOGIC
    # ==========================================

    def analyze_dialect_single(self, text, language_code):
        """Generates a high-confidence, culturally nuanced interpretation from AI.

        When a RAG engine exists for ``language_code`` its top-3 retrieved
        rules are injected into the prompt.

        Returns:
            The parsed JSON object from the model with a ``Source`` key
            added, or an error dict with the same keys when the call or the
            parsing fails.
        """
        print(f" -> 🧠 Preparing Sociolinguistic Prompt for: {language_code}")

        # 🟢 1. RAG RETRIEVAL: Get the specific rules for this dialect
        relevant_rules = "General dialect rules apply."
        if language_code in self.rag_engines:
            relevant_rules = self.rag_engines[language_code].retrieve_context(text, k=3)
            print(f" -> šŸŽÆ RAG Context Retrieved:\n{relevant_rules}")

        # 🟢 2. OPTIMIZED PROMPT: Combine RAG context with the strict academic instructions
        prompt = f"""Analyze the following utterance: '{text}'
        Language/Dialect Context: {language_code}

        CRITICAL CONTEXT (Apply ONLY these retrieved sociolinguistic rules to your analysis):
        {relevant_rules}

        CRITICAL SOCIOLINGUISTIC INSTRUCTIONS:
        1. DIRECT MEANING: Provide the interpretation directly. Do not start with "This means..." or "The user is saying...".
        2. PRESERVE CULTURAL NUANCE: Capture the exact social intent (e.g., humor, banter, sarcasm, respect, solidarity). Do not sterilize the translation into robotic, literal Standard English. Preserve the 'flavor' of the interaction.
        3. LINGUISTIC EQUALITY: Treat this as a valid, deeply rule-governed language system.
        4. FORBIDDEN WORDS: You must NEVER use "incorrect", "broken", "grammar error", "non-standard", or "learner".

        Return ONLY a valid JSON object with exactly these keys. Do not use markdown formatting blocks:
        {{
            "dialect": "{language_code}",
            "clarification": "[Direct Meaning, capturing the original intent]",
            "tone": "[e.g., Playful Banter, Respectful, Sarcastic, Urgent, Casual]",
            "context": "[The typical cultural or situational setting for this phrase]",
            "pragmatics": "[The underlying social function, e.g., 'Establishing solidarity', 'Softening a request', 'Playful teasing']"
        }}"""

        # 🟢 3. API CALL & ERROR HANDLING
        try:
            print(" -> 🧠 Awaiting API Response...")
            response = self._safe_generate(prompt)

            print(" -> 🧠 Parsing JSON Response...")
            clean_text = response.text.replace("```json", "").replace("```", "").strip()
            # Grab the outermost {...} span in case the model added chatter.
            match = re.search(r'\{.*\}', clean_text, re.DOTALL)
            if match:
                res = json.loads(match.group(0))
                res["Source"] = "🧠 Groq AI Engine (Cultural Context)"
                return res
        except Exception as e:
            print("\n🚨 CRITICAL GEMINI ERROR 🚨")
            traceback.print_exc()
            return {
                "Source": "🧠 AI Engine Error",
                "dialect": language_code,
                "clarification": f"AI Gen Failed: {e}",
                "tone": "Neutral",
                "context": "",
                "pragmatics": "",
            }

        print(" -> āŒ AI returned invalid format.")
        return {
            "Source": "🧠 AI Engine Error",
            "dialect": language_code,
            "clarification": "AI format failed",
            "tone": "Neutral",
            "context": "",
            "pragmatics": "",
        }

    def generate_unknown_analysis(self, text):
        """Ask the model for three interpretations of an unmatched utterance.

        Up to 50 jargon keys from the active profiles are passed as context.

        Returns:
            ``[]`` without a manager; otherwise the normalised model output,
            or a single "Analysis Failed" entry when the call or the parsing
            fails.
        """
        if not self.gemini_manager:
            return []

        all_jargon_keys = set()
        for p in self.active_profiles_list:
            all_jargon_keys.update(p.get("jargon", {}).keys())
        # Sorted so the 50-key sample is stable across runs (set order is not).
        jargon_sample = sorted(all_jargon_keys)[:50]

        prompt = f"""
        Analyze utterance: "{text}"
        Context/Jargon Keys: {jargon_sample}

        Task: Provide 3 distinct interpretations (Casual, Formal, or Cultural).
        CRITICAL INSTRUCTION: Treat the input as valid, meaningful Dialectal English. Do NOT label it as "incorrect".

        Output Strictly JSON:
        [
            {{ "Dialect": "General", "Clarification": "...", "Tone": "...", "Context": "...", "Pragmatics": "..." }}
        ]
        """
        try:
            response = self._safe_generate(prompt)
            clean_text = re.sub(r"```json|```", "", response.text).strip()
            data = json.loads(clean_text)
            return self.normalize_keys(data)
        except Exception:
            return [{
                "Dialect": "Unknown",
                "Clarification": "Analysis Failed",
                "Tone": "---",
                "Context": "---",
                "Pragmatic Analysis": "Error",
            }]

    def adapt_with_ai(self, full_text, db_row):
        """Adapt a dataset entry's meaning to the user's full sentence.

        Args:
            full_text: what the user said.
            db_row: mapping/Series with ``Utterance`` and ``Clarification``;
                ``Pragmatic_Analysis`` is optional.

        Returns:
            ``(clarification, pragmatics)``. Without a manager, or when the
            AI call fails, the row's own values are returned (``"---"`` for a
            missing ``Pragmatic_Analysis``).
        """
        # ``Pragmatic_Analysis`` is not among the columns kept by
        # ``refresh_knowledge_base``, so it must be read with a default.
        fallback = (db_row["Clarification"], db_row.get("Pragmatic_Analysis", "---"))
        if not self.gemini_manager:
            return fallback

        prompt = f"""
        Ref Term: "{db_row['Utterance']}" = "{db_row['Clarification']}"
        User said: "{full_text}"
        Task: Adapt meaning to full sentence. Treat as valid dialect.
        Output JSON: {{ "clarification": "...", "pragmatics": "..." }}
        """
        try:
            response = self._safe_generate(prompt)
            clean_json = re.search(r"\{.*\}", response.text, re.DOTALL)
            if clean_json:
                data = json.loads(clean_json.group(0))
                return (
                    data.get("clarification", db_row["Clarification"]),
                    data.get("pragmatics", "AI Adapted Analysis"),
                )
        except Exception:
            # Best effort: any failure falls back to the dataset values.
            pass
        return fallback

    def detect_and_analyze(self, text, threshold=60):
        """Combine dataset, profile and AI matches into at most three results.

        Steps:
            1. Dataset rows whose ``Syntax_Pattern`` regex matches, or whose
               ``Utterance`` (longer than 3 chars) is a substring of the text.
            2. Up to five fuzzy (``token_set_ratio``) candidates scoring at
               least ``threshold``; the top three are adapted by the AI.
            3. Jargon terms of the active profiles found in the text.
            4. A generic AI analysis, used only when fewer than three results
               were gathered.
           AI work runs on the thread pool and is given 4.5 seconds.

        Returns:
            A list of up to three result dicts (never empty).
        """
        clean_text = text.lower().strip()
        seen_indices = set()
        immediate_results = []
        partial_candidates = []

        if not self.df.empty:
            for index, row in self.df.iterrows():
                match_type = None

                raw_pattern = row.get("Syntax_Pattern", "")
                # A missing cell is NaN; str(NaN) == "nan" would otherwise be
                # used as a regex and match words such as "banana".
                regex = "" if pd.isna(raw_pattern) else str(raw_pattern)
                if len(regex) > 2:
                    try:
                        if re.search(regex, clean_text, re.IGNORECASE):
                            match_type = "Regex_Match"
                    except re.error:
                        pass  # invalid pattern in the dataset: ignore it

                if not match_type:
                    db_str = str(row["Utterance"]).strip().lower()
                    if len(db_str) > 3 and db_str in clean_text:
                        match_type = "Exact_Substring"

                if match_type:
                    seen_indices.add(index)
                    immediate_results.append({
                        "Source": f"šŸ’Ž Database ({match_type})",
                        "Dialect": row["Dialect"],
                        "Clarification": row["Clarification"],
                        "Tone": row.get("Tone_Category", "---"),
                        "Context": row.get("Linguistic_Context", "---"),
                        "Pragmatic Analysis": row.get("Pragmatic_Analysis", "---"),
                    })

        lookup = self.lookup_list or []
        matches = process.extract(clean_text, lookup, scorer=fuzz.token_set_ratio, limit=5)
        for _match_str, score, index in matches:
            if score >= threshold and index not in seen_indices and index < len(self.df):
                seen_indices.add(index)
                partial_candidates.append({
                    "row": self.df.iloc[index],
                    "match_len": score,
                    "type": f"Fuzzy ({score}%)",
                })

        for profile in self.active_profiles_list:
            for term, definition in profile.get("jargon", {}).items():
                if term.lower() in clean_text:
                    immediate_results.append({
                        "Source": f"šŸ“œ Profile Rule ({term})",
                        "Dialect": profile.get("lab_name", "Profile"),
                        "Clarification": definition,
                        "Tone": "Detected Jargon",
                        "Context": f"Found in {profile.get('lab_name')} Profile",
                        "Pragmatic Analysis": "Direct Profile Match",
                    })

        final_results = list(immediate_results)
        partial_candidates.sort(key=lambda x: x["match_len"], reverse=True)
        top_candidates = partial_candidates[:3]

        fallback_future = self.executor.submit(self.generate_unknown_analysis, text)
        db_futures = {
            self.executor.submit(self.adapt_with_ai, text, cand["row"]): cand
            for cand in top_candidates
        }

        done, not_done = concurrent.futures.wait(
            list(db_futures) + [fallback_future],
            timeout=4.5,
            return_when=concurrent.futures.ALL_COMPLETED,
        )
        # Drop work that has not started yet; running calls cannot be stopped.
        for pending in not_done:
            pending.cancel()

        for f, cand in db_futures.items():
            if f not in done:
                continue
            try:
                clar, prag = f.result()
            except Exception:
                continue
            final_results.append({
                "Source": f"šŸ’Ž DB + AI ({cand['type']})",
                "Dialect": cand["row"]["Dialect"],
                "Clarification": clar,
                "Tone": cand["row"].get("Tone_Category", "---"),
                "Context": cand["row"].get("Linguistic_Context", "---"),
                "Pragmatic Analysis": prag,
            })

        if len(final_results) < 3 and fallback_future in done:
            try:
                final_results += self.normalize_keys(fallback_future.result())
            except Exception:
                pass

        if not final_results:
            final_results.append({
                "Source": "āš ļø AI Timeout",
                "Dialect": "---",
                "Clarification": "System Busy",
                "Tone": "---",
                "Context": "---",
                "Pragmatic Analysis": "---",
            })

        return final_results[:3]