"""
๐จ Agent UX (Interface & API Gateway)
-------------------------------------------------
This agent acts as the central controller and router. It manages the Gradio
Lab Admin dashboard, provides headless API endpoints for the React frontend games,
and seamlessly orchestrates data flow between the users, the Brain, and the Trust agents.
"""
import os
import time
import uuid
import glob
import json
import shutil
import threading
import traceback
from datetime import datetime
import gradio as gr
import pandas as pd
from config import AppConfig
from huggingface_hub import HfApi, hf_hub_download
from src import prepare_training_data # ๐ข Import your new script
# ==========================================
# โ๏ธ CLOUD SYNC CHECK
# ==========================================
def check_cloud_sync_status():
"""Checks Hugging Face for the last commit time and returns a HUD status string."""
try:
# ๐ข FIX: Use AppConfig to guarantee the token is found, instead of relying on os.environ
hf_token = AppConfig.HF_TOKEN
if not hf_token:
print("Sync check warning: HF_TOKEN is None")
return "๐ด SYNC OFFLINE (NO TOKEN)"
api = HfApi(token=hf_token)
repo_id = "toecm/IEDID"
commits = api.list_repo_commits(repo_id=repo_id, repo_type="dataset")
if commits:
last_sync = commits[0].created_at.strftime("%H:%M")
return f"๐ข CLOUD SYNCED (Last: {last_sync})"
return "๐ก REPO EMPTY"
except Exception as e:
print(f"Sync check error: {e}")
return "๐ด SYNC OFFLINE"
class AgentUX:
def __init__(self, input_agent, brain_agent, trust_agent):
print("\n" + "="*40)
print("๐ต๏ธโโ๏ธ UX AGENT: SECRETS AUDIT")
pk = os.environ.get("PRIVATE_KEY")
rpc = os.environ.get("PURECHAIN_RPC_URL")
print(f"๐ PRIVATE_KEY : {'โ
LOADED' if pk else 'โ MISSING'}")
print(f"๐ RPC_URL : {rpc if rpc else 'โ MISSING'}")
print(f"๐ง GEMINI_KEY : {'โ
LOADED' if os.environ.get('GOOGLE_API_KEY') else 'โ MISSING'}")
print(f"โ๏ธ HF_TOKEN : {'โ
LOADED' if os.environ.get('HF_TOKEN') else 'โ MISSING'}")
print("="*40 + "\n")
self.input = input_agent
self.brain = brain_agent
self.trust = trust_agent
self.TONES = ["Neutral / Conversational", "Casual / Slang", "Formal / Professional", "Proverb / Idiom"]
self.PENDING_FILE = "/app/pending_approvals.csv"
self.last_audio_path = None
self.last_pending_count = 0
# ๐ข NEW: Matchmaking & Live Room Memory
self.waiting_pool = [] # List of operators looking for a match
self.active_matches = {} # Maps Operator ID -> Room Code
self.live_rooms = {} # Stores the actual chat logs per room
self.alert_sound = None
print("๐จ Agent UX Online: PhD Research Hub Ready.")
self.sync_pending_queue(direction="down")
def get_quota_status(self):
if hasattr(self.brain, 'check_quota'): return self.brain.check_quota()
if hasattr(self.brain.gemini_manager, 'get_status_string'): return self.brain.gemini_manager.get_status_string()
return "Active"
def get_blockchain_health(self):
try:
if hasattr(self.trust, 'w3') and self.trust.w3 and self.trust.w3.is_connected():
return "
๐ข PureChain Network: ONLINE & SYNCED
"
except Exception:
pass
return "๐ด PureChain Network: OFFLINE / DISCONNECTED
"
def check_background_status(self):
tasks = self.trust.active_tasks
current_time = datetime.now().strftime('%H:%M:%S')
if tasks > 0:
return f"๐ Processing {tasks} background task(s)... | {current_time}"
return f"โ
System Active (Ready) | {current_time}"
# ==========================================
# REACT API ENDPOINTS
# ==========================================
# ๐ข FIX: Added 'dummy_trigger' to fix the React Zero-Input bug
def api_get_dialects(self):
print("\n" + "="*40)
print("๐ก REACT API WAKEUP: Requesting Dialects...")
dialects = set()
ignore_list = ["minted_history", "system_feedback", "pending_approvals", "train", "dataset_export"]
try:
# ๐ข Directly target the IEDID directory
dataset_dir = getattr(self.brain.config, 'DATASET_DIR', "/app/IEDID")
print(f"๐ Scanning directory: {dataset_dir}")
if os.path.exists(dataset_dir):
files = glob.glob(os.path.join(dataset_dir, "*.csv"))
print(f"๐ Found {len(files)} CSV files in folder.")
for f in files:
name = os.path.basename(f).replace(".csv", "")
if name not in ignore_list:
dialects.add(name)
else:
print(f"โ ๏ธ Directory {dataset_dir} does not exist!")
if not dialects:
print("โ ๏ธ No valid dialects found. Using defaults.")
dialects = {"American English", "British English", "Nigerian Pidgin English"}
final_list = sorted(list(dialects)) + ["+ Add New Dialect"]
print(f"โ
SUCCESS: Sending to React -> {final_list}")
print("="*40 + "\n")
return json.dumps(final_list)
except Exception as e:
print(f"๐จ CRITICAL API ERROR: {e}")
return json.dumps(["American English", "British English", "+ Add New Dialect"])
def api_generate_mission(self, topic):
topic_str = str(topic).strip("['\"]")
if hasattr(self.brain, 'generate_conversation_starter'):
return self.brain.generate_conversation_starter(topic_str)
return json.dumps({"text": f"Let's talk about {topic_str}."})
def api_transcribe(self, audio_path, dialect):
if not audio_path: return ""
try:
if hasattr(self.input, 'transcribe'):
res = self.input.transcribe(audio_path, language="en")
return res[0]['text'] if isinstance(res, list) else str(res)
except Exception as e:
return f"Transcription error: {str(e)}"
return ""
def api_clarify(self, text, dialect):
try:
if hasattr(self.brain, 'analyze_dialect_single'):
res = self.brain.analyze_dialect_single(text, dialect)
clarification = res.get("clarification", text)
return json.dumps({"clarification": clarification})
except: pass
return json.dumps({"clarification": text})
def api_translate_peer(self, text, source_dialect, target_dialect):
"""Translates an utterance directly from one dialect to another."""
if not text: return ""
prompt = f"""
Translate the following utterance from {source_dialect} to {target_dialect}.
Utterance: "{text}"
CRITICAL INSTRUCTIONS:
1. Output ONLY the translated text. No conversational filler.
2. Preserve the cultural pragmatics and emotional tone.
3. Do not explain the translation, just provide the direct equivalent in {target_dialect}.
"""
try:
if hasattr(self.brain, 'gemini_manager') and self.brain.gemini_manager:
response = self.brain.gemini_manager.client.models.generate_content(
model='gemini-2.0-flash',
contents=prompt
)
return response.text.replace("```json", "").replace("```", "").replace('"', '').strip()
except Exception as e:
print(f"Peer Translation Error: {e}")
return text # Fallback to original text if API fails
# ==========================================
# DIALECT RELAY (MATCHMAKING & CHAT)
# ==========================================
def api_join_queue(self, operator_id, dialect, target_partner_id=""):
import uuid, json, time
# Clean up any old ghost matches
if operator_id in self.active_matches:
del self.active_matches[operator_id]
# 1. Check if a specific target partner is requested and available
partner = None
if target_partner_id:
for p in self.waiting_pool:
if p['operator_id'] == target_partner_id:
partner = p
break
# If targeting someone and they exist, or just picking someone (if we wanted auto)
if partner:
# 2. Match found! Create a room.
self.waiting_pool.remove(partner)
room_id = f"FREQ-{uuid.uuid4().hex[:6].upper()}"
self.active_matches[operator_id] = {"room_id": room_id, "partner_dialect": partner['dialect']}
self.active_matches[partner['operator_id']] = {"room_id": room_id, "partner_dialect": dialect}
self.live_rooms[room_id] = []
return json.dumps({"status": "matched", "room_id": room_id, "partner_dialect": partner['dialect']})
else:
# 3. Add self to pool to wait.
self.waiting_pool = [p for p in self.waiting_pool if p['operator_id'] != operator_id] # Prevent duplicates
self.waiting_pool.append({"operator_id": operator_id, "dialect": dialect, "time": time.time()})
return json.dumps({"status": "waiting"})
def api_get_lobby(self):
import json
# Return only dialect and masked operator ID for privacy/display
lobby_data = [{"id": p["operator_id"], "dialect": p["dialect"]} for p in self.waiting_pool]
return json.dumps(lobby_data)
def api_check_match(self, operator_id):
import json
if operator_id in self.active_matches:
match = self.active_matches[operator_id]
# Consume the match so it cannot be re-used on a second poll (prevents ghost room reconnection)
del self.active_matches[operator_id]
return json.dumps({"status": "matched", "room_id": match["room_id"], "partner_dialect": match["partner_dialect"]})
return json.dumps({"status": "waiting"})
def api_leave_queue(self, operator_id):
import json
self.waiting_pool = [p for p in self.waiting_pool if p['operator_id'] != operator_id]
if operator_id in self.active_matches:
del self.active_matches[operator_id]
return json.dumps({"status": "left"})
def api_remote_eval_and_send(self, room_code, sender_id, text, source_dialect, target_dialect, meaning_to_send=""):
import json, os, re, uuid, threading
import pandas as pd
if not text: return json.dumps({"status": "error", "msg": "Empty text"})
standard_meaning = meaning_to_send.strip() if meaning_to_send and meaning_to_send.strip() else text
# 1. Forward Lookup (Understand Source - used for dataset clarity if meaning wasn't provided)
if not meaning_to_send or not meaning_to_send.strip():
eval_result = self.brain.search_local_dataset(text)
if not eval_result: eval_result = self.brain.search_personas(text)
if eval_result:
standard_meaning = eval_result.get("clarification", text)
# ๐ข THE DATA COLLECTION HOOK (Silent Background Saving)
clarification_to_save = standard_meaning
try:
threading.Thread(
target=self.check_and_submit_logic,
args=(text, source_dialect, "", clarification_to_save, "Conversational", "Relay Peer-to-Peer Chat", "Automated Relay Extraction", "Game: Relay Pair", "AI Relay Bouncer", sender_id, None, False),
daemon=True
).start()
except Exception as save_err:
print(f"Data Hook Error: {save_err}")
# 4. Route to Room
if room_code not in self.live_rooms: self.live_rooms[room_code] = []
msg = {"sender": sender_id, "original": text, "translation": standard_meaning, "dialect": source_dialect, "target_dialect": "Standard English", "id": str(uuid.uuid4())[:8]}
self.live_rooms[room_code].append(msg)
return json.dumps({"status": "success"})
def api_remote_poll(self, room_code, last_index):
import json
if room_code not in self.live_rooms: return json.dumps([])
idx = int(last_index)
return json.dumps(self.live_rooms[room_code][idx:])
# ==========================================
# SOCIOLINGUISTIC PIPELINE
# ==========================================
def automated_pipeline(self, audio_path, language_code, request: gr.Request):
client_ip = request.headers.get("x-forwarded-for") or request.client.host if request else "Unknown_IP"
source_tag = f"Lab_Admin_{client_ip}"
print(f"๐ PIPELINE TRIGGERED by {source_tag}")
headers = ["Source", "Speaker", "Utterance", "Dialect", "Clarification", "Tone", "Context", "Pragmatic Analysis"]
empty_df = pd.DataFrame(columns=headers)
if not audio_path:
yield empty_df, "Waiting for input...", self.get_quota_status(), "", None, "Neutral / Conversational", "", ""
return
print("\n" + "="*40)
print(f"๐ PIPELINE TRIGGERED for {language_code}!")
print(f"Audio Path: {audio_path}")
self.last_audio_path = audio_path
status_log = "๐ง Transcribing Audio...\n"
try:
print("โณ STEP 1: Calling Whisper Transcription...")
if hasattr(self.input, 'transcribe'):
transcription_result = self.input.transcribe(audio_path, language=language_code)
print(f"โ
Whisper Result: {transcription_result}")
if isinstance(transcription_result, list) and len(transcription_result) > 0:
transcribed_text = transcription_result[0].get('text', str(transcription_result))
else:
transcribed_text = str(transcription_result)
else:
transcribed_text = "Audio Received."
status_log += f"๐ฃ๏ธ Heard: '{transcribed_text}'\n\n"
print(f"โณ STEP 2: Checking Local Dataset...")
status_log += "๐๏ธ Checking Local Dataset (Fast Match)...\n"
final_result = self.brain.search_local_dataset(transcribed_text)
if not final_result:
print("โณ STEP 3: Checking Persona...")
status_log += "โ No local match. ๐ญ Checking Persona Context...\n"
final_result = self.brain.search_personas(transcribed_text)
if not final_result:
print("โณ STEP 4: Sending to Gemini API...")
status_log += "โ No persona hit. ๐ง Generating AI interpretation...\n"
if hasattr(self.brain, 'analyze_dialect_single'):
final_result = self.brain.analyze_dialect_single(transcribed_text, language_code)
print(f"โ
Gemini Result: {final_result}")
else:
raise Exception("analyze_dialect_single missing from brain_agent.")
status_log += f"\nโ
Analysis Complete via {final_result.get('Source', 'Unknown')}."
def safe_str(val, default=""):
return default if pd.isna(val) or val is None else str(val)
df_data = [[
safe_str(final_result.get("Source", "Unknown")),
source_tag,
safe_str(transcribed_text),
safe_str(final_result.get("dialect", "")),
safe_str(final_result.get("clarification", "")),
safe_str(final_result.get("tone", "")),
safe_str(final_result.get("context", "")),
safe_str(final_result.get("pragmatics", ""))
]]
df = pd.DataFrame(df_data, columns=headers)
print("โ
PIPELINE COMPLETED SUCCESSFULLY.")
print("="*40 + "\n")
yield (
df, safe_str(status_log), self.get_quota_status(), safe_str(transcribed_text),
safe_str(final_result.get("dialect")), safe_str(final_result.get("clarification")),
safe_str(final_result.get("tone", "Neutral / Conversational")),
safe_str(final_result.get("context")), safe_str(final_result.get("pragmatics"))
)
return
except Exception as e:
print("\n๐จ CRITICAL PIPELINE ERROR ๐จ")
traceback.print_exc()
print("="*40 + "\n")
status_log += f"\nโ System Error: {e}\n"
yield empty_df, status_log, self.get_quota_status(), "", None, "Neutral / Conversational", "", ""
return
# ==========================================
# RESEARCH DATA SUBMISSION
# ==========================================
def check_and_submit_logic(
self, transcribed, dialect, customD, clarification, tone, context, pragmatics,
sourceTag="Web", clar_source="User", userKey="", blob=None, confirm=False,
request: gr.Request = None
):
print("\n" + "="*40)
print(f"๐ฅ API HIT: /check_and_submit_logic")
print(f" Source: {sourceTag}")
print(f" Text: '{transcribed}'")
print(f" Dialect: {dialect}")
print("="*40)
clean_key = str(userKey).strip()
if not clean_key.startswith("0x"):
clean_key = "0x" + uuid.uuid4().hex + uuid.uuid4().hex[:8]
final_user = clean_key
if sourceTag == "Web" or sourceTag == "":
final_origin = "Gradio Admin UI"
else:
final_origin = sourceTag
if not transcribed or not clarification:
return "โ ๏ธ Cannot submit empty analysis.", gr.update(visible=False)
final_d = customD if (dialect == "+ Add New Dialect" and customD) else dialect
if not final_d or final_d == "+ Add New Dialect":
return "โ ๏ธ Select a dialect first.", gr.update(visible=False)
permanent_audio_path = ""
if blob is not None:
actual_path = None
if isinstance(blob, str): actual_path = blob
elif isinstance(blob, dict) and 'path' in blob: actual_path = blob['path']
elif hasattr(blob, 'name'): actual_path = blob.name
if actual_path and os.path.exists(actual_path):
save_dir = os.path.join(self.brain.config.DATASET_DIR, "audio")
os.makedirs(save_dir, exist_ok=True)
unique_name = f"rec_{int(time.time())}_{uuid.uuid4().hex[:6]}.wav"
permanent_audio_path = os.path.join(save_dir, unique_name)
shutil.copy(actual_path, permanent_audio_path)
print(f"โ
Audio securely extracted to: {permanent_audio_path}")
else:
print(f"โ ๏ธ Audio skipped! Blob invalid: {blob}")
elif self.last_audio_path:
permanent_audio_path = self.last_audio_path
is_game_submission = ("Game" in final_origin)
if not is_game_submission:
success = self.trust.update_dataset_csv(
final_d, transcribed, clarification, tone, context, "",
permanent_audio_path, pragmatics, final_origin, clar_source, final_user
)
if success:
payload = { "original": transcribed, "dialect": final_d, "clarification": clarification, "tone": tone, "user": final_user }
threading.Thread(target=self.trust.stamp_on_chain, args=(payload,), daemon=True).start()
return f"๐ Approved and Minted to {final_d}", gr.update(visible=False)
else:
new_entry = {
"User": final_user,
"Data_Origin": final_origin,
"Utterance": transcribed,
"Dialect": final_d,
"Clarification": clarification,
"Clarification_Source": clar_source,
"Tone": tone,
"Context": context,
"Pragmatic_Analysis": pragmatics,
"Audio": permanent_audio_path,
"Timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
try:
if os.path.exists(self.PENDING_FILE):
df = pd.read_csv(self.PENDING_FILE)
else:
df = pd.DataFrame(columns=new_entry.keys())
df = pd.concat([df, pd.DataFrame([new_entry])], ignore_index=True)
df.to_csv(self.PENDING_FILE, index=False)
self.sync_pending_queue(direction="up")
if permanent_audio_path and os.path.exists(permanent_audio_path):
try:
api = HfApi(token=os.environ.get("HF_TOKEN"))
api.upload_file(
path_or_fileobj=permanent_audio_path,
path_in_repo=f"pending_audio/{os.path.basename(permanent_audio_path)}",
repo_id="toecm/PureChain_Dataset",
repo_type="dataset",
commit_message=f"๐๏ธ Staging pending audio from {final_origin}"
)
except Exception as e:
print(f"โ ๏ธ Failed to stage audio: {e}")
return "๐ฅ Submitted for Admin Review (+50 XP)", gr.update(visible=False)
except Exception as e:
print(f"Pending Save Error: {e}")
return "โ Failed to queue submission.", gr.update(visible=False)
def force_overwrite_logic(self, *args):
return self.check_and_submit_logic(*args, confirm=True)
# ==========================================
# FEEDBACK & AUDIT HELPERS
# ==========================================
def handle_feedback_submission(self, op_id, text, img_blob):
"""Catches secure feedback from React games and logs it with images."""
feedback_file = "/app/system_feedback.csv"
image_path = ""
print("\n" + "="*40)
print("๐ก๏ธ SECURE FEEDBACK RECEIVED")
print(f"Operator: {op_id}")
# 1. Process Image if attached
if img_blob is not None:
actual_path = None
if isinstance(img_blob, str): actual_path = img_blob
elif hasattr(img_blob, 'name'): actual_path = img_blob.name
if actual_path and os.path.exists(actual_path):
unique_name = f"bug_img_{int(time.time())}.png"
image_path = os.path.join("/app", unique_name)
shutil.copy(actual_path, image_path)
try:
api = HfApi(token=os.environ.get("HF_TOKEN"))
api.upload_file(
path_or_fileobj=image_path,
path_in_repo=f"feedback_images/{unique_name}",
repo_id="toecm/PureChain_Dataset",
repo_type="dataset",
commit_message="๐ธ New Bug Report Image"
)
except Exception as e:
print(f"Image upload failed: {e}")
# 2. Log to CSV
new_row = {
"Timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"Operator_ID": op_id,
"Feedback_Text": text,
"Image_Reference": image_path
}
try:
if os.path.exists(feedback_file):
df = pd.read_csv(feedback_file)
else:
df = pd.DataFrame(columns=new_row.keys())
df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True)
df.to_csv(feedback_file, index=False)
try:
api = HfApi(token=os.environ.get("HF_TOKEN"))
api.upload_file(
path_or_fileobj=feedback_file,
path_in_repo="system_feedback.csv",
repo_id="toecm/PureChain_Dataset",
repo_type="dataset",
commit_message="๐ Updated System Feedback Log"
)
except: pass
except Exception as e:
print(f"Feedback save error: {e}")
return "Success"
def get_feedback_dataframe(self):
feedback_file = "/app/system_feedback.csv"
hf_token = os.environ.get("HF_TOKEN")
repo_id = "toecm/PureChain_Dataset"
if hf_token:
try:
downloaded = hf_hub_download(repo_id=repo_id, filename="system_feedback.csv", repo_type="dataset", token=hf_token)
shutil.copy(downloaded, feedback_file)
except Exception as e:
pass
if os.path.exists(feedback_file):
try:
df = pd.read_csv(feedback_file)
for col in ["Timestamp", "Operator_ID", "Feedback_Text", "Image_Reference"]:
if col not in df.columns: df[col] = ""
return df.sort_values(by="Timestamp", ascending=False)
except:
pass
return pd.DataFrame(columns=["Timestamp", "Operator_ID", "Feedback_Text", "Image_Reference"])
def get_pending_dataframe(self, dialect_filter="All"):
if os.path.exists(self.PENDING_FILE):
df = pd.read_csv(self.PENDING_FILE)
cols = ["User", "Data_Origin", "Utterance", "Dialect", "Clarification", "Clarification_Source", "Tone", "Audio", "Timestamp"]
for c in cols:
if c not in df.columns: df[c] = ""
if dialect_filter and dialect_filter != "All":
df = df[df["Dialect"] == dialect_filter]
return df[cols]
return pd.DataFrame(columns=["User", "Data_Origin", "Utterance", "Dialect", "Clarification", "Clarification_Source", "Tone", "Audio", "Timestamp"])
def sync_pending_queue(self, direction="up"):
hf_token = os.environ.get("HF_TOKEN")
repo_id = "toecm/PureChain_Dataset"
if not hf_token:
print("โ ๏ธ Skipping Pending Sync: No HF_TOKEN found.")
return
api = HfApi(token=hf_token)
if direction == "up":
if os.path.exists(self.PENDING_FILE):
try:
api.upload_file(
path_or_fileobj=self.PENDING_FILE,
path_in_repo="pending_approvals.csv",
repo_id=repo_id,
repo_type="dataset",
commit_message="๐ Auto-sync: Updated pending approvals queue"
)
print("โ๏ธ Pending queue backed up to PureChain_Dataset.")
except Exception as e:
print(f"โ ๏ธ Failed to upload pending queue: {e}")
elif direction == "down":
try:
downloaded_path = hf_hub_download(
repo_id=repo_id,
filename="pending_approvals.csv",
repo_type="dataset",
token=hf_token
)
shutil.copy(downloaded_path, self.PENDING_FILE)
print("โ๏ธ Pending queue CSV restored from PureChain_Dataset.")
df = pd.read_csv(self.PENDING_FILE)
for audio_path in df['Audio'].dropna():
if audio_path and not os.path.exists(audio_path):
try:
audio_filename = os.path.basename(audio_path)
print(f"โ๏ธ Recovering missing audio: {audio_filename}...")
audio_dl = hf_hub_download(
repo_id=repo_id,
filename=f"pending_audio/{audio_filename}",
repo_type="dataset",
token=hf_token
)
os.makedirs(os.path.dirname(audio_path), exist_ok=True)
shutil.copy(audio_dl, audio_path)
except Exception as dl_err:
print(f"โ ๏ธ Could not recover {audio_filename}: {dl_err}")
except Exception as e:
print("โน๏ธ No remote pending queue found. Starting fresh.")
def get_pending_label(self):
if os.path.exists(self.PENDING_FILE):
count = len(pd.read_csv(self.PENDING_FILE))
if count > 0:
return f"๐ฎ Pending ({count})", count
return "๐ฎ Pending", 0
def monitor_pending_state(self):
label, count = self.get_pending_label()
if count > self.last_pending_count and self.alert_sound:
sound = gr.update(value=self.alert_sound, autoplay=True)
else:
sound = gr.skip()
self.last_pending_count = count
return f"### {label} - Review Submissions from React Games", sound
def admin_approve_pending(self, timestamp, orig_utt, edited_utt, edited_dialect, edited_clar, edited_tone, trimmed_audio_path):
try:
df = pd.read_csv(self.PENDING_FILE)
match = df[(df["Timestamp"] == timestamp) & (df["Utterance"] == orig_utt)]
if len(match) == 0:
return "โ Approval failed: Entry not found in pending database."
index_in_csv = match.index[0]
row = df.loc[index_in_csv]
# ๐ข Use the edited text instead of the original row data
final_utt = edited_utt if edited_utt else row["Utterance"]
final_dialect = edited_dialect if edited_dialect else row["Dialect"]
final_clar = edited_clar if edited_clar else row["Clarification"]
final_tone = edited_tone if edited_tone else row["Tone"]
final_audio = trimmed_audio_path if isinstance(trimmed_audio_path, str) and os.path.exists(trimmed_audio_path) else row.get("Audio", "")
self.trust.update_dataset_csv(
final_dialect, final_utt, final_clar, final_tone,
row.get("Context", ""), "", final_audio, row.get("Pragmatic_Analysis", ""),
row.get("Data_Origin", ""), "Admin Edit", row.get("User", "")
)
payload = {
"original": final_utt,
"dialect": final_dialect,
"clarification": final_clar,
"tone": final_tone,
"user": str(row.get("User", ""))
}
threading.Thread(target=self.trust.stamp_on_chain, args=(payload,), daemon=True).start()
df.drop(index_in_csv).to_csv(self.PENDING_FILE, index=False)
self.sync_pending_queue(direction="up")
audio_to_delete = row.get("Audio")
if audio_to_delete and str(audio_to_delete) != "nan":
try:
api = HfApi(token=os.environ.get("HF_TOKEN"))
api.delete_file(
path_in_repo=f"pending_audio/{os.path.basename(audio_to_delete)}",
repo_id="toecm/PureChain_Dataset",
repo_type="dataset",
commit_message="๐๏ธ Cleaned up processed pending audio"
)
except Exception as e:
pass
return f"โ
Approved & Minted: {final_utt[:20]}..."
except Exception as e:
return f"โ Approval failed: {e}"
def admin_reject_pending(self, timestamp, orig_utt):
try:
df = pd.read_csv(self.PENDING_FILE)
match = df[(df["Timestamp"] == timestamp) & (df["Utterance"] == orig_utt)]
if len(match) == 0:
return "โ Rejection failed: Entry not found."
index_in_csv = match.index[0]
row = df.loc[index_in_csv]
df.drop(index_in_csv).to_csv(self.PENDING_FILE, index=False)
self.sync_pending_queue(direction="up")
audio_to_delete = row.get("Audio")
if audio_to_delete and str(audio_to_delete) != "nan":
try:
api = HfApi(token=os.environ.get("HF_TOKEN"))
api.delete_file(
path_in_repo=f"pending_audio/{os.path.basename(audio_to_delete)}",
repo_id="toecm/PureChain_Dataset",
repo_type="dataset",
commit_message="๐๏ธ Cleaned up rejected pending audio"
)
except Exception as e:
pass
return "๐๏ธ Entry Rejected & Audio Cleaned."
except Exception as e:
return f"โ Rejection failed: {e}"
def admin_clear_all_pending(self):
try:
if os.path.exists(self.PENDING_FILE):
os.remove(self.PENDING_FILE)
self.sync_pending_queue(direction="up")
return "๐งน All pending entries swept!"
except Exception as e: return f"โ Clear failed: {e}"
def export_analysis_to_csv(self, df):
if df is None or not hasattr(df, 'columns') or df.empty:
return None
path = "/app/sociolinguistic_export.csv"
df.to_csv(path, index=False, encoding='utf-8-sig')
return path
def api_generate_training_data(self):
try:
prepare_training_data.main()
return f"โ
Success! 'train.csv' created."
except Exception as e:
return f"โ Error generating data: {e}"
def api_get_full_dataset_zip(self):
try:
shutil.make_archive("/app/dataset_export", 'zip', self.brain.config.DATASET_DIR)
return "/app/dataset_export.zip"
except Exception as e:
return f"Error zipping: {e}"
def auto_regenerate_analysis(self, text, clar, tone, ctx, prag, new_dialect):
show_new = (new_dialect == "+ Add New Dialect")
# Guard: don't fire heavy AI calls if there is no utterance text (e.g. tab-switching with empty field)
if not text or not text.strip() or show_new or not new_dialect:
return clar, tone, ctx, prag, gr.update(visible=show_new)
print(f"๐ UI Trigger: Re-analyzing '{text}' for dialect: {new_dialect}")
try:
if hasattr(self.brain, 'analyze_dialect_single'):
res = self.brain.analyze_dialect_single(text, new_dialect)
return (
res.get("clarification", clar),
res.get("tone", tone),
res.get("context", ctx),
res.get("pragmatics", prag),
gr.update(visible=show_new)
)
except Exception as e:
print(f"Auto-regenerate error: {e}")
return clar, tone, ctx, prag, gr.update(visible=show_new)
def lab_analyze_and_mint(self, text, dialect, force_ai, userKey, request: gr.Request):
status_log = f"๐ LAB PIPELINE TRIGGERED for '{text}'\n"
final_result = None
if not text or not dialect:
return {"error": "Missing input"}, "โ ๏ธ Please provide text and select a dialect."
if not force_ai:
status_log += "๐๏ธ Checking Local Dataset...\n"
final_result = self.brain.search_local_dataset(text)
if final_result:
status_log += "โ
Found in Local Dataset.\n"
else:
status_log += "๐ญ Checking Persona Context...\n"
final_result = self.brain.search_personas(text)
else:
status_log += "๐ Force AI Enabled: Bypassing local lookups...\n"
if not final_result:
status_log += "๐ง Generating AI interpretation...\n"
if hasattr(self.brain, 'analyze_dialect_single'):
final_result = self.brain.analyze_dialect_single(text, dialect)
status_log += "โ
AI Engine Analysis Complete.\n"
else:
return {"error": "Missing AI function"}, status_log + "โ Error."
clarification = final_result.get("clarification", "")
tone = final_result.get("tone", "Neutral")
context = final_result.get("context", "")
pragmatics = final_result.get("pragmatics", "")
client_ip = request.headers.get("x-forwarded-for") or request.client.host if request else "Unknown_IP"
source_tag = f"Lab_Admin_{client_ip}"
success = self.trust.update_dataset_csv(
dialect=dialect, utterance=text, clarification=clarification,
tone=tone, context=context, syntax="", audio_path=None,
pragmatics=pragmatics, sourceTag=source_tag,
clar_source=final_result.get("Source", "AI"), userKey=userKey
)
if success:
status_log += "\n๐ SUCCESS: Entry saved to CSV and synced to HF Cloud!"
payload = {
"original": text,
"dialect": dialect,
"clarification": clarification,
"tone": tone,
"user": userKey
}
threading.Thread(target=self.trust.stamp_on_chain, args=(payload,), daemon=True).start()
status_log += "\nโ๏ธ PureChain minting triggered in background."
else:
status_log += "\nโ ๏ธ ERROR: Database save failed."
return final_result, status_log
# ==========================================
# THE RESEARCH UI
# ==========================================
def create_ui(self):
def generate_admin_op_id():
return "0x" + uuid.uuid4().hex + uuid.uuid4().hex[:8]
custom_css = """
.gradio-container { max-width: 95% !important; }
table { width: 100% !important; table-layout: auto !important; }
td { white-space: normal !important; word-wrap: break-word !important; }
"""
existing_dialects = []
if hasattr(self.brain, 'config') and os.path.exists(self.brain.config.DATASET_DIR):
found = [os.path.basename(f).replace(".csv", "") for f in glob.glob(os.path.join(self.brain.config.DATASET_DIR, "*.csv"))]
if found: existing_dialects = found
dropdown_choices = sorted(list(set(existing_dialects))) + ["+ Add New Dialect"]
available_profiles = self.brain.get_available_profiles() if hasattr(self.brain, 'get_available_profiles') else []
backup_files = []
if hasattr(self.brain, 'config'):
if os.path.exists(self.brain.config.DATASET_DIR):
backup_files.extend([os.path.basename(f) for f in glob.glob(os.path.join(self.brain.config.DATASET_DIR, "*.csv"))])
if os.path.exists(self.brain.config.PROFILES_DIR):
backup_files.extend([os.path.basename(f) for f in glob.glob(os.path.join(self.brain.config.PROFILES_DIR, "*.json"))])
backup_files = sorted(list(set(backup_files))) if backup_files else ["No files found"]
with gr.Blocks() as ui:
gr.Markdown("## ๐ PureVersation: Decentralized Dialect Mediator (Lab View)")
ui_source_tag = gr.Textbox(visible=False, value="Web")
ui_clar_source = gr.Textbox(visible=False, value="Lab Admin")
ui_operator_id = gr.Textbox(visible=False, value=generate_admin_op_id)
api_audio_blob = gr.Audio(visible=False, type="filepath")
api_confirm = gr.State(False)
with gr.Tabs():
with gr.Tab("๐๏ธ Live Field Analysis"):
health_display = gr.HTML(self.get_blockchain_health())
with gr.Row():
with gr.Column(scale=1):
audio_in = gr.Audio(label="Step 1: Speak/Upload", sources=["microphone", "upload"], type="filepath")
lang_sel = gr.Dropdown(["en", "yo", "ig", "ko", "ha"], value="en", label="Language Context")
btn_run = gr.Button("Analyze Audio ๐", variant="primary")
quota_box = gr.Textbox(label="๐ API Quota", value=self.get_quota_status(), interactive=False)
with gr.Row(variant="compact"):
background_status_display = gr.Textbox(label="Status", value="Checking...", interactive=False, show_label=False)
with gr.Column(scale=5):
log_box = gr.Textbox(label="Linguistic Analysis Log", interactive=False)
gr.Markdown("### ๐ฅ AI Interpretation Baseline")
results_table = gr.Dataframe(
headers=["Source", "Speaker", "Utterance", "Dialect", "Clarification", "Tone", "Context", "Pragmatic Analysis"],
interactive=True,
wrap=False,
row_count=(1, "dynamic")
)
with gr.Row():
export_btn = gr.Button("๐ฅ Download Analysis CSV", variant="secondary")
export_file = gr.File(label="Export Result", visible=False)
gr.Markdown("### โ๏ธ Active Sociolinguistic Feedback Loop (Edit & Approve)")
with gr.Row():
with gr.Column(scale=1):
orig_text = gr.Textbox(visible=True, label="Utterance (Transcribed)")
dialect_sel = gr.Dropdown(choices=dropdown_choices, label="Assigned Dialect", interactive=True, allow_custom_value=True)
new_dialect = gr.Textbox(label="Enter New Dialect Name", visible=False, interactive=True)
with gr.Column():
clar_text = gr.Textbox(label="Final Clarification", interactive=True, lines=2)
tone_sel = gr.Dropdown(choices=self.TONES, value="Neutral / Conversational", label="Pragmatic Tone", interactive=True, allow_custom_value=True)
ctx_area = gr.TextArea(label="Linguistic Context", interactive=True, lines=1)
prag_area = gr.TextArea(label="Pragmatic Analysis ([Force], [Deixis], [Register])", interactive=True, lines=1)
with gr.Row():
btn_save = gr.Button("๐พ Validate & Save", variant="primary")
btn_over = gr.Button("โ ๏ธ Confirm Overwrite", variant="stop", visible=False)
feedback_msg = gr.Markdown()
gr.Markdown("### ๐ฅ PhD Data Export & Training")
with gr.Row():
export_data_btn = gr.Button("๐ฆ Generate Full Dataset ZIP", variant="secondary")
train_btn = gr.Button("๐ง Generate AutoTrain CSV", variant="primary")
export_zip_file = gr.File(label="Download")
train_status = gr.Textbox(label="Training Data Status", lines=1)
with gr.Tab("๐งช THE LAB (Force AI)"):
gr.Markdown("### ๐ฌ Test text inputs directly and force AI generation")
with gr.Row():
with gr.Column():
lab_input = gr.Textbox(label="Test Phrase (Text Only)")
lab_dialect = gr.Dropdown(choices=dropdown_choices, label="Target Dialect")
force_ai_toggle = gr.Checkbox(label="Force Live AI (Skip Local Cache)", value=False)
lab_user_key = gr.Textbox(label="Admin User Key", value="Admin_001")
lab_btn = gr.Button("RUN ANALYSIS & MINT", variant="primary")
with gr.Column():
lab_output = gr.JSON(label="Analysis Result")
lab_log = gr.Textbox(label="System Logs", lines=10)
lab_btn.click(
fn=self.lab_analyze_and_mint,
inputs=[lab_input, lab_dialect, force_ai_toggle, lab_user_key],
outputs=[lab_output, lab_log]
)
with gr.Tab("โ๏ธ Persona Management"):
with gr.Row():
load_all_btn = gr.Button("๐ Load ALL Profiles", variant="secondary")
profile_selector = gr.Dropdown(choices=available_profiles, label="Select Profile", allow_custom_value=True)
profile_filename = gr.Textbox(label="Filename")
load_profile_btn = gr.Button("๐ฅ Load Selected Profile", variant="primary") # ๐ข NEW
profile_editor = gr.Textbox(label="Profile Content (JSON)", lines=20) # ๐ข CHANGED TO TEXTBOX
save_profile_btn = gr.Button("๐พ Save Profile modifications", variant="primary")
profile_status = gr.Textbox(label="System Response", interactive=False)
def change_profile(val):
if not val: return "", ""
return json.dumps(self.brain.load_profile_by_name(val), indent=2), val
def save_and_refresh_profile(filename, content):
msg = self.brain.save_specific_profile(filename, content)
return msg, gr.update(choices=self.brain.get_available_profiles(), value=filename)
# ๐ข Changed to click instead of auto-change
load_profile_btn.click(change_profile, inputs=[profile_selector], outputs=[profile_editor, profile_filename])
save_profile_btn.click(save_and_refresh_profile, inputs=[profile_filename, profile_editor], outputs=[profile_status, profile_selector])
if hasattr(self.brain, 'load_all_profiles_simultaneously'):
load_all_btn.click(lambda: (self.brain.load_all_profiles_simultaneously(), self.brain.get_current_profile_text()), outputs=[profile_status, profile_editor])
with gr.Tab("๐ฎ Pending Audit/Approvals"):
pending_header = gr.Markdown("### ๐ฎ Pending (0) - Review Submissions from React Games")
with gr.Row():
with gr.Column(scale=3):
filter_dialect = gr.Dropdown(choices=["All"] + dropdown_choices, value="All", label="Filter by Dialect")
pending_df = gr.Dataframe(headers=["User", "Data_Origin", "Utterance", "Dialect", "Clarification", "Tone", "Audio", "Timestamp"], interactive=False, wrap=False, row_count=(1, "dynamic"))
with gr.Column(scale=1):
gr.Markdown("#### ๐ง Audio Auditor")
btn_refresh = gr.Button("๐ Refresh List")
pending_audio_player = gr.Audio(label="Trim or Preview Audio", type="filepath", interactive=True)
audit_log = gr.Textbox(label="Audit Status", interactive=False)
# ๐ข NEW: Editable Textboxes for Admin Corrections
gr.Markdown("#### โ๏ธ Edit Selected Entry Before Minting")
with gr.Row():
pending_timestamp = gr.Textbox(label="Timestamp ID", interactive=False)
pending_orig_utt = gr.Textbox(visible=False)
edit_utt = gr.Textbox(label="Utterance", interactive=True)
edit_dialect = gr.Textbox(label="Dialect / Language", interactive=True)
edit_clar = gr.Textbox(label="Clarification / Meaning", interactive=True)
edit_tone = gr.Textbox(label="Tone", interactive=True)
with gr.Row():
btn_appr_p = gr.Button("โ
Approve & Mint (With Edits)", variant="primary")
btn_rejt_p = gr.Button("๐๏ธ Reject entry", variant="stop")
btn_clear_pending = gr.Button("Sweep All Pending", variant="secondary")
with gr.Tab("โ๏ธ PureChain History"):
gr.Markdown("### ๐ Immutable Transaction Log & Audit Reports")
with gr.Row():
start_date = gr.DateTime(label="Start Date", type="string")
end_date = gr.DateTime(label="End Date", type="string")
btn_filter = gr.Button("๐ Filter & Refresh", variant="primary")
gr.HTML("๐ Open PureChain Explorer")
history_df = gr.Dataframe(
headers=["Timestamp", "Utterance", "Dialect", "Data_Origin", "Block", "TX Hash"],
interactive=False,
wrap=False,
row_count=(5, "dynamic")
)
with gr.Row():
export_report_btn = gr.Button("๐ฅ Generate CSV Report", variant="secondary")
report_file = gr.File(label="Download Audit Report")
explorer_link = gr.Markdown("Select a row to generate Explorer Link")
def run_filter(s, e):
df = self.trust.get_filtered_history(s, e)
display_cols = ["Timestamp", "Utterance", "Dialect", "Data_Origin", "Block", "TX Hash"]
available = [c for c in display_cols if c in df.columns]
return df[available]
def generate_report(s, e):
df = self.trust.get_filtered_history(s, e)
report_path = "/app/purechain_audit_report.csv"
export_cols = ["Timestamp", "Utterance", "Dialect", "Clarification", "Data_Origin", "Block", "TX Hash"]
available_cols = [c for c in export_cols if c in df.columns]
df[available_cols].to_csv(report_path, index=False, encoding='utf-8-sig')
return report_path
def make_explorer_link(evt: gr.SelectData, df):
try:
tx_hash = df.iloc[evt.index[0]]["TX Hash"]
return f"๐ **[View Transaction on Explorer](http://3.34.161.207:3000/tx/{tx_hash})**"
except: return "Select a valid row"
btn_filter.click(run_filter, [start_date, end_date], [history_df])
export_report_btn.click(generate_report, inputs=[start_date, end_date], outputs=[report_file]).then(run_filter, [start_date, end_date], [history_df])
history_df.select(make_explorer_link, [history_df], [explorer_link])
with gr.Tab("๐พ System Backups"):
with gr.Row():
backup_target = gr.Dropdown(choices=backup_files, label="Select File")
backup_desc = gr.Textbox(label="Backup Note", value="Routine check")
backup_btn = gr.Button("๐ Create Immutable Backup", variant="primary")
recover_btn = gr.Button("๐ Recover Data from Blockchain", variant="secondary")
backup_log = gr.Textbox(label="Backup Status", interactive=False)
gr.Markdown("---")
with gr.Row():
bytecode_input = gr.Textbox(label="Paste Contract Bytecode", lines=3)
deploy_btn = gr.Button("๐ Force Deploy (Zero Gas)", variant="stop")
deployment_output = gr.Textbox(label="Deployment Result", interactive=False)
# ๐ข FIX: Connect the Deploy Button to the Trust Agent
deploy_btn.click(
fn=self.trust.force_deploy_contract,
inputs=[bytecode_input],
outputs=[deployment_output]
)
# --- TAB 6: BUG REPORTS & FEEDBACK ---
with gr.Tab("๐ Bug Reports & Feedback"):
gr.Markdown("### ๐ก๏ธ Secure System Feedback Log")
with gr.Row():
with gr.Column(scale=3):
btn_refresh_fb = gr.Button("๐ Refresh Feedback List", variant="secondary")
feedback_df = gr.Dataframe(
headers=["Timestamp", "Operator_ID", "Feedback_Text", "Image_Reference"],
interactive=False,
wrap=False,
row_count=(5, "dynamic")
)
with gr.Column(scale=1):
gr.Markdown("#### ๐ธ Attached Screenshot")
feedback_image = gr.Image(label="Click a row to view screenshot", interactive=False)
# ==========================================
# EVENT BINDINGS
# ==========================================
export_data_btn.click(self.api_get_full_dataset_zip, outputs=[export_zip_file])
train_btn.click(self.api_generate_training_data, outputs=[train_status])
btn_run.click(
self.automated_pipeline,
[audio_in, lang_sel],
[results_table, log_box, quota_box, orig_text, dialect_sel, clar_text, tone_sel, ctx_area, prag_area]
)
audio_in.stop_recording(
self.automated_pipeline,
[audio_in, lang_sel],
[results_table, log_box, quota_box, orig_text, dialect_sel, clar_text, tone_sel, ctx_area, prag_area]
)
audio_in.upload(
self.automated_pipeline,
[audio_in, lang_sel],
[results_table, log_box, quota_box, orig_text, dialect_sel, clar_text, tone_sel, ctx_area, prag_area]
)
def handle_selection(evt: gr.SelectData, df):
if df is None or not hasattr(df, 'columns') or len(df) == 0:
return "", "", "", "Neutral / Conversational", "", ""
try:
row = df.iloc[evt.index[0]]
d = row["Dialect"] if row["Dialect"] in dropdown_choices else None
return row["Utterance"], d, row["Clarification"], row["Tone"], row.get("Context", ""), row.get("Pragmatic Analysis", "")
except: return "", "", "", "Neutral / Conversational", "", ""
results_table.select(handle_selection, [results_table], [orig_text, dialect_sel, clar_text, tone_sel, ctx_area, prag_area])
export_btn.click(self.export_analysis_to_csv, [results_table], [export_file]).then(lambda: gr.update(visible=True), None, [export_file])
btn_save.click(
fn=self.check_and_submit_logic,
inputs=[
orig_text, dialect_sel, new_dialect, clar_text, tone_sel, ctx_area, prag_area,
ui_source_tag, # 8. sourceTag ("Web")
ui_clar_source, # 9. clar_source ("Lab Admin")
ui_operator_id, # 10. userKey (Generated ID)
audio_in # 11. blob
],
outputs=[feedback_msg, btn_over]
)
btn_over.click(
fn=self.force_overwrite_logic,
inputs=[
orig_text, dialect_sel, new_dialect, clar_text, tone_sel, ctx_area, prag_area,
ui_source_tag, # 8. sourceTag
ui_clar_source, # 9. clar_source
ui_operator_id, # 10. userKey
audio_in # 11. blob
],
outputs=[feedback_msg, btn_over]
)
# Audit / Pending
def select_pending_row(evt: gr.SelectData, df):
try:
idx = evt.index[0]
row = df.iloc[idx]
audio_path = row.get("Audio")
timestamp = str(row.get("Timestamp", ""))
utt = str(row.get("Utterance", ""))
clar = str(row.get("Clarification", ""))
tone = str(row.get("Tone", ""))
dialect = str(row.get("Dialect", ""))
return audio_path, timestamp, utt, utt, dialect, clar, tone
except:
return None, "", "", "", "", "", ""
filter_dialect.change(self.get_pending_dataframe, inputs=[filter_dialect], outputs=[pending_df])
btn_refresh.click(self.get_pending_dataframe, inputs=[filter_dialect], outputs=[pending_df])
# ๐ข FIX: Fill the textboxes when a row is clicked
pending_df.select(select_pending_row, [pending_df], [pending_audio_player, pending_timestamp, pending_orig_utt, edit_utt, edit_dialect, edit_clar, edit_tone])
# ๐ข FIX: Pass the edited textboxes to the approval function, then auto-refresh the table
btn_appr_p.click(self.admin_approve_pending, inputs=[pending_timestamp, pending_orig_utt, edit_utt, edit_dialect, edit_clar, edit_tone, pending_audio_player], outputs=[audit_log]).then(self.get_pending_dataframe, inputs=[filter_dialect], outputs=[pending_df])
btn_rejt_p.click(self.admin_reject_pending, inputs=[pending_timestamp, pending_orig_utt], outputs=[audit_log]).then(self.get_pending_dataframe, inputs=[filter_dialect], outputs=[pending_df])
btn_clear_pending.click(self.admin_clear_all_pending, outputs=[audit_log]).then(self.get_pending_dataframe, inputs=[filter_dialect], outputs=[pending_df])
def trigger_recovery():
try:
import recover_chain # Left local specifically to prevent circular dependency at startup
result_message = recover_chain.main()
return result_message
except Exception as e:
return f"โ Recovery Error: {e}"
recover_btn.click(trigger_recovery, inputs=[], outputs=[backup_log])
# --- Feedback Tab Events ---
def show_feedback_image(evt: gr.SelectData, df):
try:
img_path = str(df.iloc[evt.index[0]].get("Image_Reference", ""))
if img_path == "nan" or not img_path:
return None
if not os.path.exists(img_path) and os.environ.get("HF_TOKEN"):
img_name = os.path.basename(img_path)
try:
dl_img = hf_hub_download(
repo_id="toecm/PureChain_Dataset",
filename=f"feedback_images/{img_name}",
repo_type="dataset",
token=os.environ.get("HF_TOKEN")
)
os.makedirs(os.path.dirname(img_path), exist_ok=True)
shutil.copy(dl_img, img_path)
except:
pass
return img_path if os.path.exists(img_path) else None
except:
return None
btn_refresh_fb.click(self.get_feedback_dataframe, outputs=[feedback_df])
feedback_df.select(show_feedback_image, [feedback_df], [feedback_image])
# โฌ๏ธ COMPREHENSIVE API BRIDGE FOR REACT FRONTEND โฌ๏ธ
gr.Markdown("---")
gr.Markdown("### ๐ก API Gateway (Headless endpoints for React)")
with gr.Row(visible=False):
api_sync_out = gr.Textbox()
api_btn_sync = gr.Button()
api_btn_sync.click(fn=check_cloud_sync_status, inputs=[], outputs=[api_sync_out], api_name="check_cloud_sync")
# ๐ข FIX: Added an invisible input box to satisfy the JS client routing
api_btn_dialects = gr.Button()
api_dialects_out = gr.Textbox()
api_btn_dialects.click(fn=self.api_get_dialects, inputs=[], outputs=[api_dialects_out], api_name="api_get_dialects")
api_btn_mission = gr.Button()
api_topic_in = gr.Textbox()
api_mission_out = gr.Textbox()
api_btn_mission.click(fn=self.api_generate_mission, inputs=[api_topic_in], outputs=[api_mission_out], api_name="generate_mission")
api_btn_transcribe = gr.Button()
api_audio_in = gr.File()
api_dialect_in = gr.Textbox()
api_transcribe_out = gr.Textbox()
api_btn_transcribe.click(fn=self.api_transcribe, inputs=[api_audio_in, api_dialect_in], outputs=[api_transcribe_out], api_name="transcribe_check")
api_btn_clarify = gr.Button()
api_text_in = gr.Textbox()
api_clarify_out = gr.Textbox()
api_btn_clarify.click(fn=self.api_clarify, inputs=[api_text_in, api_dialect_in], outputs=[api_clarify_out], api_name="generate_clarifications")
api_btn_submit = gr.Button()
api_custom_d = gr.Textbox()
api_tone = gr.Textbox()
api_context = gr.Textbox()
api_pragmatics = gr.Textbox()
api_source_tag = gr.Textbox(visible=False, value="Web")
api_clar_source = gr.Textbox(visible=False, value="AI")
api_user_key = gr.Textbox(visible=False, value="")
api_confirm = gr.State(False)
# ๐ข NEW: Peer-to-Peer Translation Endpoint
api_btn_translate = gr.Button()
api_translate_text_in = gr.Textbox()
api_translate_source_in = gr.Textbox()
api_translate_target_in = gr.Textbox()
api_translate_out = gr.Textbox()
api_btn_translate.click(
fn=self.api_translate_peer,
inputs=[api_translate_text_in, api_translate_source_in, api_translate_target_in],
outputs=[api_translate_out],
api_name="translate_peer"
)
# ๐ข NEW: Dialect Relay Endpoints
api_btn_join = gr.Button()
api_q_op = gr.Textbox()
api_q_dialect = gr.Textbox()
api_q_target = gr.Textbox()
api_q_out = gr.Textbox()
api_btn_join.click(fn=self.api_join_queue, inputs=[api_q_op, api_q_dialect, api_q_target], outputs=[api_q_out], api_name="join_queue")
api_btn_lobby = gr.Button()
api_btn_lobby.click(fn=self.api_get_lobby, inputs=[], outputs=[api_q_out], api_name="get_lobby")
api_btn_check = gr.Button()
api_c_out = gr.Textbox()
api_btn_check.click(fn=self.api_check_match, inputs=[api_q_op], outputs=[api_c_out], api_name="check_match")
api_btn_leave = gr.Button()
api_btn_leave.click(fn=self.api_leave_queue, inputs=[api_q_op], outputs=[api_c_out], api_name="leave_queue")
api_btn_relay_send = gr.Button()
api_relay_room = gr.Textbox()
api_relay_text = gr.Textbox()
api_relay_target = gr.Textbox()
api_relay_meaning = gr.Textbox()
api_relay_out = gr.Textbox()
api_btn_relay_send.click(fn=self.api_remote_eval_and_send, inputs=[api_relay_room, api_q_op, api_relay_text, api_q_dialect, api_relay_target, api_relay_meaning], outputs=[api_relay_out], api_name="relay_send")
api_btn_relay_poll = gr.Button()
api_poll_idx = gr.Number()
api_btn_relay_poll.click(fn=self.api_remote_poll, inputs=[api_relay_room, api_poll_idx], outputs=[api_c_out], api_name="relay_poll")
# 7. Secure Feedback Endpoint
api_fb_btn = gr.Button()
api_fb_op = gr.Textbox()
api_fb_text = gr.Textbox()
api_fb_img = gr.File()
api_fb_out = gr.Textbox()
api_fb_btn.click(
fn=self.handle_feedback_submission,
inputs=[api_fb_op, api_fb_text, api_fb_img],
outputs=[api_fb_out],
api_name="submit_feedback"
)
api_btn_submit.click(
fn=self.check_and_submit_logic,
inputs=[
api_text_in, # 1. transcribed
api_dialect_in, # 2. dialect
api_custom_d, # 3. customD
api_clarify_out, # 4. clarification
api_tone, # 5. tone
api_context, # 6. context
api_pragmatics, # 7. pragmatics
api_source_tag, # 8. sourceTag
api_clar_source, # 9. clar_source
api_user_key, # 10. userKey
api_audio_in, # 11. blob (audio)
api_confirm # 12. confirm
],
outputs=[feedback_msg, btn_over],
api_name="check_and_submit_logic"
)
# โฌ๏ธ END OF API BRIDGE โฌ๏ธ
return ui