PureVersation / src /brain_agent.py
GitHub Actions
🚀 Automated sync from GitHub
f8f5b17
"""
🧠 Agent 2 (Interpretation) - Gemini 2.0 EDITION
-------------------------------------------------
This version of the AgentBrain handles all deep sociolinguistic logic,
prompt engineering, and dataset searching.
"""
import os
import glob
import pandas as pd
import json
import re
import concurrent.futures
from rapidfuzz import process, fuzz
from src.rag_manager import SociolinguisticRAG
class AgentInterpretation:
def __init__(self, config, gemini_manager_instance=None):
self.config = config
self.gemini_manager = gemini_manager_instance
self.gemini_manager_instance = gemini_manager_instance # Kept for your backward compatibility
# 🟢 1. Initialize RAG Engines (The Token Saver)
self.profiles_dir = os.path.join(self.config.BASE_DIR, "lab_profiles")
self.rag_engines = {}
self._initialize_rag_databases()
# 🟢 2. Restore your Concurrency and Background Tasks
import concurrent.futures # Ensure this is imported at the top of the file
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)
print("🧠 Agent 2 (Interpretation) Online: Persistent Pool & RAG Engines Ready.")
# 🟢 3. Restore your background knowledge refresh (if it relies on updating CSVs)
self.refresh_knowledge_base()
def _safe_generate(self, prompt):
"""
Safely runs the async Groq generation inside a synchronous thread
by creating an isolated event loop. This prevents Gradio UI freezing.
"""
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
# We await the async generate_fast method inside this isolated loop
return loop.run_until_complete(self.gemini_manager.generate_fast(prompt))
finally:
loop.close()
def _initialize_rag_databases(self):
"""Loads all heavy JSON personas into lightweight RAM vectors on startup."""
if not os.path.exists(self.profiles_dir):
return
for filename in os.listdir(self.profiles_dir):
if filename.endswith("Persona.json"):
dialect_name = filename.replace(" Persona.json", "")
filepath = os.path.join(self.profiles_dir, filename)
try:
with open(filepath, "r", encoding="utf-8") as f:
data = json.load(f)
# 🟢 Flatten the heavy JSON into a list of simple string rules
rules = []
if "lexicon" in data:
for word, meaning in data["lexicon"].items():
rules.append(f"Lexicon: '{word}' means {meaning}.")
if "pragmatics" in data:
for rule in data["pragmatics"]:
rules.append(f"Pragmatics: {rule}")
if "syntax" in data:
for rule in data["syntax"]:
rules.append(f"Syntax: {rule}")
# Initialize RAG for this specific dialect
rag = SociolinguisticRAG()
rag.load_persona_rules(dialect_name, rules)
self.rag_engines[dialect_name] = rag
except Exception as e:
print(f"⚠️ Failed to load RAG for {filename}: {e}")
def get_available_profiles(self):
files = glob.glob(os.path.join(self.config.PROFILES_DIR, "*.json"))
return [os.path.basename(f) for f in files]
def load_profile_by_name(self, filename):
path = os.path.join(self.config.PROFILES_DIR, filename)
try:
with open(path, 'r', encoding='utf-8') as f:
self.lab_profile = json.load(f)
return self.lab_profile
except: return {}
def save_specific_profile(self, filename, json_str):
if not filename.endswith(".json"): filename += ".json"
path = os.path.join(self.config.PROFILES_DIR, filename)
try:
with open(path, "w", encoding="utf-8") as f: json.dump(json.loads(json_str), f, indent=2)
return "✅ Saved locally (HF Sync pending)"
except Exception as e: return f"❌ Error: {e}"
def get_current_profile_text(self):
return json.dumps(self.lab_profile, indent=2)
def load_all_profiles_simultaneously(self):
files = glob.glob(os.path.join(self.config.PROFILES_DIR, "*.json"))
loaded_count = 0
names = []
self.active_profiles_list = []
for f_path in files:
try:
with open(f_path, 'r', encoding='utf-8') as f:
data = json.load(f)
self.active_profiles_list.append(data)
names.append(data.get("lab_name", os.path.basename(f_path)))
loaded_count += 1
except: pass
return f"✅ Loaded {loaded_count} profiles simultaneously: {', '.join(names)}"
def refresh_knowledge_base(self):
all_files = glob.glob(os.path.join(self.config.DATASET_DIR, "*.csv"))
df_list = []
for filename in all_files:
try:
temp_df = pd.read_csv(filename, encoding='utf-8-sig', on_bad_lines='skip', low_memory=True)
temp_df = temp_df[['Utterance', 'Clarification', 'Dialect']]
df_list.append(temp_df)
except: pass
if df_list:
self.df = pd.concat(df_list, ignore_index=True)
self.lookup_list = self.df["Utterance"].tolist()
else:
self.lookup_list = []
def normalize_keys(self, data_list):
cleaned_list = []
for item in data_list:
new_item = {
"Dialect": item.get("Dialect") or item.get("dialect", "Unknown"),
"Clarification": item.get("Clarification") or item.get("clarification", "---"),
"Tone": item.get("Tone") or item.get("tone", "---"),
"Context": item.get("Context") or item.get("context", "---"),
"Pragmatic Analysis": item.get("Pragmatic Analysis") or item.get("pragmatics", "---"),
"Source": item.get("Source", "✨ AI Generated")
}
cleaned_list.append(new_item)
return cleaned_list
# ==========================================
# MODULAR LOGIC (MOVED FROM UX_AGENT)
# ==========================================
def generate_conversation_starter(self, topic_str):
"""Generates the mission prompt for the React Games."""
if not self.gemini_manager:
if "You are Echo" in topic_str: return json.dumps({"text": "Haha, I hear you! What else is going on?"})
return json.dumps({"text": f"Let's talk about {topic_str}. What comes to mind?"})
try:
if "You are Echo" in topic_str:
prompt = f"""
{topic_str}
CRITICAL ROLEPLAY INSTRUCTIONS FOR THIS TURN:
1. You are Echo, a human peer and friend.
2. NEVER break character. NEVER say you are an AI, a computer program, or a language model.
3. Play along naturally. If they ask for favors or money (e.g., begging/joking), play along like a real person.
4. Keep your response to ONE short, highly conversational sentence. Do not be overly formal.
5. On login, you will switch the user's English dialect.
"""
else:
prompt = f"Generate a very short, engaging, single-sentence conversation starter or question about: {topic_str}."
response = self._safe_generate(prompt)
#response = self.gemini_manager.client.models.generate_content(
#model='gemini-2.0-flash',
#contents=prompt
#)
clean_text = response.text.replace("```json", "").replace("```", "").strip()
return json.dumps({"text": clean_text})
except Exception as e:
print(f"⚠️ Mission Gen Error: {e}")
if "You are Echo" in topic_str: return json.dumps({"text": "Haha, I hear you! What else is going on?"})
return json.dumps({"text": f"Let's talk about {topic_str}. What comes to mind?"})
def search_local_dataset(self, text):
"""Tier 1: Returns a single local match only if it is nearly identical."""
try:
if self.df.empty: return None
utterances = self.df["Utterance"].astype(str).tolist()
match = process.extractOne(text, utterances, scorer=fuzz.WRatio)
if match and match[1] >= 90:
row = self.df[self.df["Utterance"] == match[0]].iloc[0]
def clean_val(val, default=""):
return default if pd.isna(val) else str(val)
return {
"Source": "🗄️ Local Dataset",
"Speaker": "User",
"dialect": clean_val(row.get("Dialect"), "Unknown"),
"clarification": clean_val(row.get("Clarification")),
"tone": clean_val(row.get("Tone_Category"), "Neutral / Conversational"),
"context": clean_val(row.get("Linguistic_Context")),
"pragmatics": clean_val(row.get("Pragmatic_Analysis"), "Verified via local Regex")
}
except Exception as e:
print(f"Dataset Search Error: {e}")
return None
def search_personas(self, text):
"""Tier 2: Checks the currently loaded JSON persona for jargon."""
try:
if not self.lab_profile: return None
profile = self.lab_profile
text_lower = text.lower()
if "jargons" in profile:
for slang, meaning in profile["jargons"].items():
if re.search(r'\b' + re.escape(slang.lower()) + r'\b', text_lower):
rule = profile.get('pragmatic_rules', [''])[0] if profile.get('pragmatic_rules') else ""
return {
"Source": "🎭 Persona Injection",
"Speaker": "User",
"dialect": profile.get("dialect_name", "Unknown Persona"),
"clarification": f"Contains slang '{slang}' -> {meaning}",
"tone": "Casual / Slang",
"context": profile.get("cultural_context", "Inferred from active Persona"),
"pragmatics": f"Rule triggered: {rule}"
}
except Exception as e:
print(f"Persona Search Error: {e}")
return None
# ==========================================
# DEEP GENERATIVE AI LOGIC
# ==========================================
def analyze_dialect_single(self, text, language_code):
"""Generates a high-confidence, culturally nuanced interpretation from AI."""
print(f" -> 🧠 Preparing Sociolinguistic Prompt for: {language_code}")
# 🟢 1. RAG RETRIEVAL: Get the specific rules for this dialect
relevant_rules = "General dialect rules apply."
if language_code in self.rag_engines:
relevant_rules = self.rag_engines[language_code].retrieve_context(text, k=3)
print(f" -> 🎯 RAG Context Retrieved:\n{relevant_rules}")
# 🟢 2. OPTIMIZED PROMPT: Combine RAG context with your strict academic instructions
prompt = f"""Analyze the following utterance: '{text}'
Language/Dialect Context: {language_code}
CRITICAL CONTEXT (Apply ONLY these retrieved sociolinguistic rules to your analysis):
{relevant_rules}
CRITICAL SOCIOLINGUISTIC INSTRUCTIONS:
1. DIRECT MEANING: Provide the interpretation directly. Do not start with "This means..." or "The user is saying...".
2. PRESERVE CULTURAL NUANCE: Capture the exact social intent (e.g., humor, banter, sarcasm, respect, solidarity). Do not sterilize the translation into robotic, literal Standard English. Preserve the 'flavor' of the interaction.
3. LINGUISTIC EQUALITY: Treat this as a valid, deeply rule-governed language system.
4. FORBIDDEN WORDS: You must NEVER use "incorrect", "broken", "grammar error", "non-standard", or "learner".
Return ONLY a valid JSON object with exactly these keys. Do not use markdown formatting blocks:
{{
"dialect": "{language_code}",
"clarification": "[Direct Meaning, capturing the original intent]",
"tone": "[e.g., Playful Banter, Respectful, Sarcastic, Urgent, Casual]",
"context": "[The typical cultural or situational setting for this phrase]",
"pragmatics": "[The underlying social function, e.g., 'Establishing solidarity', 'Softening a request', 'Playful teasing']"
}}"""
# 🟢 3. API CALL & ERROR HANDLING (Kept exactly as it was)
try:
print(" -> 🧠 Awaiting API Response...")
response = self._safe_generate(prompt)
print(" -> 🧠 Parsing JSON Response...")
clean_text = response.text.replace("```json", "").replace("```", "").strip()
import re
match = re.search(r'\{.*\}', clean_text, re.DOTALL)
if match:
import json
res = json.loads(match.group(0))
res["Source"] = "🧠 Groq AI Engine (Cultural Context)"
return res
except Exception as e:
import traceback
print("\n🚨 CRITICAL GEMINI ERROR 🚨")
traceback.print_exc()
return {"Source": "🧠 AI Engine Error", "dialect": language_code, "clarification": f"AI Gen Failed: {e}", "tone": "Neutral", "context": "", "pragmatics": ""}
print(" -> ❌ AI returned invalid format.")
return {"Source": "🧠 AI Engine Error", "dialect": language_code, "clarification": "AI format failed", "tone": "Neutral", "context": "", "pragmatics": ""}
def generate_unknown_analysis(self, text):
if not self.gemini_manager: return []
all_jargon_keys = []
for p in self.active_profiles_list:
all_jargon_keys.extend(list(p.get("jargon", {}).keys()))
prompt = f"""
Analyze utterance: "{text}"
Context/Jargon Keys: {list(set(all_jargon_keys))[:50]}
Task: Provide 3 distinct interpretations (Casual, Formal, or Cultural).
CRITICAL INSTRUCTION: Treat the input as valid, meaningful Dialectal English. Do NOT label it as "incorrect".
Output Strictly JSON: [ {{ "Dialect": "General", "Clarification": "...", "Tone": "...", "Context": "...", "Pragmatics": "..." }} ]
"""
try:
response = self._safe_generate(prompt)
clean_text = re.sub(r"```json|```", "", response.text).strip()
data = json.loads(clean_text)
return self.normalize_keys(data)
except:
return [{"Dialect": "Unknown", "Clarification": "Analysis Failed", "Tone": "---", "Context": "---", "Pragmatic Analysis": "Error"}]
def adapt_with_ai(self, full_text, db_row):
if not self.gemini_manager: return db_row["Clarification"], db_row["Pragmatic_Analysis"]
prompt = f"""
Ref Term: "{db_row['Utterance']}" = "{db_row['Clarification']}"
User said: "{full_text}"
Task: Adapt meaning to full sentence. Treat as valid dialect.
Output JSON: {{ "clarification": "...", "pragmatics": "..." }}
"""
try:
response = self._safe_generate(prompt)
clean_json = re.search(r"\{.*\}", response.text, re.DOTALL)
if clean_json:
data = json.loads(clean_json.group(0))
return data.get("clarification", db_row["Clarification"]), data.get("pragmatics", "AI Adapted Analysis")
except: pass
return db_row["Clarification"], db_row["Pragmatic_Analysis"]
def detect_and_analyze(self, text, threshold=60):
clean_text = text.lower().strip()
seen_indices = set()
immediate_results = []
partial_candidates = []
if not self.df.empty:
for index, row in self.df.iterrows():
match_type = None
try:
regex = str(row.get("Syntax_Pattern", ""))
if len(regex) > 2 and re.search(regex, clean_text, re.IGNORECASE):
match_type = "Regex_Match"
except: pass
if not match_type:
db_str = str(row["Utterance"]).strip().lower()
if len(db_str) > 3 and db_str in clean_text:
match_type = "Exact_Substring"
if match_type:
seen_indices.add(index)
immediate_results.append({
"Source": f"💎 Database ({match_type})", "Dialect": row["Dialect"],
"Clarification": row["Clarification"], "Tone": row.get("Tone_Category", "---"),
"Context": row.get("Linguistic_Context", "---"), "Pragmatic Analysis": row.get("Pragmatic_Analysis", "---")
})
if not self.lookup_list: self.lookup_list = []
matches = process.extract(clean_text, self.lookup_list, scorer=fuzz.token_set_ratio, limit=5)
for match_str, score, index in matches:
if score >= threshold and index not in seen_indices and index < len(self.df):
seen_indices.add(index)
row = self.df.iloc[index]
partial_candidates.append({"row": row, "match_len": score, "type": f"Fuzzy ({score}%)"})
for profile in self.active_profiles_list:
jargon_dict = profile.get("jargon", {})
for term, definition in jargon_dict.items():
if term.lower() in clean_text:
immediate_results.append({
"Source": f"📜 Profile Rule ({term})", "Dialect": profile.get("lab_name", "Profile"),
"Clarification": definition, "Tone": "Detected Jargon",
"Context": f"Found in {profile.get('lab_name')} Profile", "Pragmatic Analysis": "Direct Profile Match"
})
final_results = list(immediate_results)
partial_candidates.sort(key=lambda x: x["match_len"], reverse=True)
top_candidates = partial_candidates[:3]
fallback_future = self.executor.submit(self.generate_unknown_analysis, text)
db_futures = {}
for cand in top_candidates:
f = self.executor.submit(self.adapt_with_ai, text, cand["row"])
db_futures[f] = cand
done, not_done = concurrent.futures.wait(list(db_futures.keys()) + [fallback_future], timeout=4.5, return_when=concurrent.futures.ALL_COMPLETED)
for f in db_futures:
if f in done:
try:
clar, prag = f.result()
cand = db_futures[f]
final_results.append({
"Source": f"💎 DB + AI ({cand['type']})", "Dialect": cand["row"]["Dialect"],
"Clarification": clar, "Tone": cand["row"].get("Tone_Category", "---"),
"Context": cand["row"].get("Linguistic_Context", "---"), "Pragmatic Analysis": prag
})
except: pass
if len(final_results) < 3 and fallback_future in done:
try:
res = self.normalize_keys(fallback_future.result())
final_results += res
except: pass
if not final_results:
final_results.append({"Source": "⚠️ AI Timeout", "Dialect": "---", "Clarification": "System Busy", "Tone": "---", "Context": "---", "Pragmatic Analysis": "---"})
return final_results[:3]