akaburia's picture
Update app.py
005ad78 verified
import pandas as pd
import gradio as gr
import os
import io
import json
import gspread
from huggingface_hub import HfApi, hf_hub_download
import torch
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import re
from captum.attr import LayerIntegratedGradients, TokenReferenceBase
from captum.attr import visualization as viz
from huggingface_hub import InferenceClient
from datetime import datetime
import uuid
# HF_TOKEN = os.environ.get("HF_TOKEN", f"{HF}_{token}")
HF_TOKEN = os.environ.get("HF_TOKEN")
HF_DATASET_REPO = "akaburia/policy-evaluations"
HF_CSV_FILE = "policy_coherence_annotations.csv"
HF_USERS_FILE = "user_profiles.csv"
HF_CHAT_LOG_FILE = "chatbot_logs.csv"
# IMPORT GOOGLE CLOUD TRANSLATE
try:
from google.cloud import translate_v2 as translate
except ImportError:
raise ImportError("Please install the translation library by running: pip install google-cloud-translate")
try:
from zoneinfo import ZoneInfo
except ImportError:
import pytz # Fallback if zoneinfo is missing
# --- COMPREHENSIVE LOGGING ---
LOG_FILE = "logs.txt"
def write_log(action_type, details):
"""Appends a timestamped log entry to logs.txt"""
try:
try:
tz = ZoneInfo("Africa/Nairobi")
except:
import pytz
tz = pytz.timezone("Africa/Nairobi")
timestamp = datetime.now(tz).strftime('%Y-%m-%d %H:%M:%S')
log_entry = f"[{timestamp}] [{action_type}] {details}\n"
with open(LOG_FILE, "a", encoding="utf-8") as f:
f.write(log_entry)
print(log_entry.strip()) # Also print to console for debugging
except Exception as e:
print(f"Logging failed: {e}")
# --- CACHING HELPERS ---
DRAFT_FILE = "user_drafts.json"
def load_drafts():
if os.path.exists(DRAFT_FILE):
try:
with open(DRAFT_FILE, 'r') as f:
return json.load(f)
except:
return {}
return {}
def update_cache_row(user, session_id, dom_a, pol_a, dom_b, pol_b, tar_col, ctx_col, a_list, idx, b_text, rel, inter, just):
"""Fires automatically on keystrokes/clicks to save progress and workspace state"""
if not user or not a_list or idx >= len(a_list) or not b_text: return
curr_a = a_list[idx]
drafts = load_drafts()
# Upgraded structure to hold workspace settings AND row data
if user not in drafts: drafts[user] = {"workspace": {}, "rows": {}}
# Save the active workspace so we can restore it on reload
drafts[user]["workspace"] = {
"session_id": session_id,
"dom_a": dom_a, "pol_a": pol_a,
"dom_b": dom_b, "pol_b": pol_b,
"tar_col": tar_col, "ctx_col": ctx_col
}
cache_key = f"{pol_a}|{pol_b}|{curr_a}"
if cache_key not in drafts[user]["rows"]: drafts[user]["rows"][cache_key] = {}
# Store the exact state of this specific row with the unique session tag
drafts[user]["rows"][cache_key][b_text] = {
"rel": rel, "inter": inter, "just": just, "session_id": session_id
}
write_log("CACHE_UPDATE", f"User {user} auto-saved draft for row index {idx}.")
with open(DRAFT_FILE, 'w') as f:
json.dump(drafts, f)
# ==========================================
# 0. MODEL PRELOADING & INFERENCE MATH
# ==========================================
print("Loading Inference Model into Memory...")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
tokenizer = AutoTokenizer.from_pretrained("akaburia/policy-evaluations")
model = AutoModelForSequenceClassification.from_pretrained("akaburia/policy-evaluations").to(device)
id_to_label = {0: "neutral", 1: "coherent", 2: "incoherent"}
def custom_forward(input_ids, attention_mask):
inputs_embeds = model.roberta.embeddings.word_embeddings(input_ids)
return model(inputs_embeds=inputs_embeds, attention_mask=attention_mask).logits
# Explainability
lig = LayerIntegratedGradients(custom_forward, model.roberta.embeddings.word_embeddings)
llm_client = InferenceClient("Qwen/Qwen3-8B", token=HF_TOKEN)
def generate_row_explanation(a_list, idx, text_b, lang):
if not a_list or idx >= len(a_list) or not text_b:
return "", "", "", ""
policy_a = clean_policy_text(a_list[idx])
policy_b = clean_policy_text(text_b)
# 1. Run Captum Explainer
model.zero_grad()
inputs = tokenizer(policy_a, policy_b, return_tensors="pt", truncation=True, max_length=256)
input_ids = inputs["input_ids"].to(device)
attention_mask = inputs["attention_mask"].to(device)
ref_token_id = tokenizer.pad_token_id
special_token_mask = [1 if id in tokenizer.all_special_ids else 0 for id in input_ids[0].tolist()]
baseline_ids = torch.tensor([[id if is_special else ref_token_id for id, is_special in zip(input_ids[0].tolist(), special_token_mask)]]).to(device)
with torch.no_grad():
logits = model(input_ids=input_ids, attention_mask=attention_mask).logits
predicted_class_idx = torch.argmax(logits, dim=1).item()
prediction = id_to_label[predicted_class_idx]
attributions, _ = lig.attribute(
inputs=input_ids, baselines=baseline_ids,
additional_forward_args=(attention_mask,),
target=predicted_class_idx, return_convergence_delta=True
)
attributions = attributions.sum(dim=-1).squeeze(0)
attributions = attributions / torch.norm(attributions)
attributions = attributions.cpu().detach().numpy()
tokens = tokenizer.convert_ids_to_tokens(input_ids[0])
ig_dict = {t.replace('Ġ', '').strip(): float(s) for t, s in zip(tokens, attributions) if t.replace('Ġ', '').strip()}
ig_json_str = json.dumps(ig_dict)
score_list = [f"'{k}': {v:.3f}" for k, v in ig_dict.items()]
formatted_scores = ", ".join(score_list)
# 2. Call Qwen LLM
prompt = f"""You are an expert AI auditor interpreting an Explainable AI (XAI) output.
A sequence classification model evaluated two policies and predicted their relationship as: {prediction.upper()}
Policy A: "{policy_a}"
Policy B: "{policy_b}"
Token Scores: [{formatted_scores}]
Write a highly analytical, 2 to 3 sentence explanation of the model's reasoning. Explicitly quote the specific words that have the highest positive and highest negative scores. Do not hallucinate."""
try:
response = llm_client.chat_completion(messages=[{"role": "user", "content": prompt}], max_tokens=1500, temperature=0.1)
raw_output = response.choices[0].message.content.strip()
# 3. Format the <think> blocks and final output
match = re.search(r'<think>(.*?)</think>', raw_output, flags=re.DOTALL)
if match:
think_content = match.group(1).strip()
final_answer = raw_output.replace(match.group(0), '').strip()
# --- NEW: TRANSLATE EXPLANATION IF NEEDED ---
if lang != "English":
think_content = t_text(think_content, lang)
final_answer = t_text(final_answer, lang)
html_out = f"""<details style="margin-bottom: 12px; padding: 10px; background-color: #f3f4f6; border-radius: 6px; border: 1px solid #e5e7eb;"><summary style="cursor: pointer; font-weight: bold; color: #4b5563; outline: none;">🧠 Click to peek into the AI's thought process</summary><div style="margin-top: 10px; font-size: 0.9em; color: #6b7280; white-space: pre-wrap;">{think_content}</div></details>"""
return html_out, final_answer, raw_output, ig_json_str
if lang != "English":
raw_output = t_text(raw_output, lang)
return "", raw_output, raw_output, ig_json_str
except Exception as e:
err_msg = f"⚠️ Explainability Error: {str(e)}"
return "", t_text(err_msg, lang) if lang != "English" else err_msg, "", ""
def bucket_score(score):
"""Maps a continuous score [-1.0 to 1.0] to a 7-class drill down."""
if score >= 0.66:
return "+3 Indivisible", "coherent"
elif score >= 0.33:
return "+2 Reinforcing", "coherent"
elif score > 0.10:
return "+1 Enabling", "coherent"
elif score >= -0.10:
return "0 Consistent", "neutral"
elif score >= -0.33:
return "-1 Constraining", "incoherent"
elif score >= -0.66:
return "-2 Counteracting", "incoherent"
else:
return "-3 Cancelling", "incoherent"
def format_streaming_thoughts(text, is_streaming=True):
"""Safely formats <think> tags into HTML accordions even before the closing tag arrives."""
if "<think>" not in text:
return text
formatted = text.replace(
"<think>",
"<details open style='margin-bottom: 12px; padding: 10px; background-color: #f3f4f6; border-radius: 6px; border: 1px solid #e5e7eb;'>"
"<summary style='cursor: pointer; font-weight: bold; color: #4b5563; outline: none;'>🧠 AI is thinking...</summary>"
"<div style='margin-top: 10px; font-size: 0.9em; color: #6b7280; white-space: pre-wrap;'>"
)
if "</think>" in formatted:
# Close the accordion and change the title once thinking is done
formatted = formatted.replace("</think>", "</div></details>")
formatted = formatted.replace("🧠 AI is thinking...", "🧠 Click to peek into the AI's thought process")
formatted = formatted.replace("<details open", "<details")
elif is_streaming:
# Artificially close the div so the UI doesn't break while streaming mid-thought
formatted += "</div></details>"
return formatted
def log_chat_to_hf(user_tag, dom_a, pol_a, dom_b, pol_b, ctx_a, ctx_b, curr_a, user_query, ai_response):
"""Saves chatbot interactions to a dedicated CSV file in the HF Dataset."""
try:
try:
path = hf_hub_download(repo_id=HF_DATASET_REPO, filename=HF_CHAT_LOG_FILE, repo_type="dataset", token=HF_TOKEN)
chat_df = pd.read_csv(path)
except Exception:
chat_df = pd.DataFrame(columns=[
"Timestamp", "AnnotatorUsername", "Domain_A", "Policy_A",
"Domain_B", "Policy_B", "Context_A", "Context_B", "Target_A",
"User_Query", "AI_Response"
])
try:
tz = ZoneInfo("Africa/Nairobi")
except:
import pytz
tz = pytz.timezone("Africa/Nairobi")
current_time = datetime.now(tz).strftime('%Y-%m-%d %H:%M:%S')
new_row = {
"Timestamp": current_time, "AnnotatorUsername": user_tag,
"Domain_A": dom_a, "Policy_A": pol_a, "Domain_B": dom_b, "Policy_B": pol_b,
"Context_A": ctx_a, "Context_B": ctx_b, "Target_A": curr_a,
"User_Query": user_query, "AI_Response": ai_response
}
chat_df = pd.concat([chat_df, pd.DataFrame([new_row])], ignore_index=True)
csv_buffer = io.StringIO()
chat_df.to_csv(csv_buffer, index=False)
api = HfApi()
api.upload_file(
path_or_fileobj=io.BytesIO(csv_buffer.getvalue().encode('utf-8')),
path_in_repo=HF_CHAT_LOG_FILE, repo_id=HF_DATASET_REPO, token=HF_TOKEN, repo_type="dataset"
)
write_log("CHAT_SAVED", f"Logged chat for user {user_tag}.")
except Exception as e:
write_log("CHAT_SAVE_ERROR", f"Failed to log chat: {str(e)}")
def clean_policy_text(text):
if not text or not isinstance(text, str):
return ""
# 1. Remove leading numbering/bullets (e.g., "1. ", "ii. ", "a) ", "14. ")
# Matches optional spaces, numbers/letters/roman numerals, a dot or parenthesis, then spaces.
text = re.sub(r'^\s*(?:\d+|[a-zA-Z]|[ivxlcdmIVXLCDM]+)[\.\)]\s+', '', text)
# 2. Remove commas globally (as requested)
text = text.replace(',', '')
# 3. Remove trailing full stops, semicolons, and whitespace
text = text.rstrip('. ;')
# 4. Clean up any accidental double spaces
text = re.sub(r'\s+', ' ', text).strip()
return text
def get_model_predictions(text_a, b_texts):
if not text_a or not b_texts:
return []
updates = []
color_map = {"coherent": "#4CAF50", "neutral": "#9E9E9E", "incoherent": "#F44336"}
clean_a = clean_policy_text(text_a)
clean_b_list = [clean_policy_text(b) for b in b_texts]
# --- MASSIVE SPEED OPTIMIZATION: BATCH PROCESSING ---
# Feed the entire list of texts to the model at once instead of looping
inputs = tokenizer([clean_a] * len(clean_b_list), clean_b_list, return_tensors="pt", truncation=True, padding=True).to(device)
with torch.no_grad():
outputs = model(**inputs)
logits_batch = outputs.logits
probabilities_batch = F.softmax(logits_batch, dim=-1)
# Process the batched results
for idx, text_b in enumerate(b_texts):
probabilities = probabilities_batch[idx].squeeze()
# Extract raw probabilities
prob_dict = {}
results = []
for i, prob in enumerate(probabilities):
label = model.config.id2label.get(i, f"Class {i}").lower()
prob_val = prob.item()
prob_dict[label] = prob_val
results.append({"label": label, "prob": prob_val})
p_coherent = prob_dict.get("coherent", 0.0)
p_incoherent = prob_dict.get("incoherent", 0.0)
# 1. Find the model's absolute highest confidence class
top_raw_class = max(prob_dict, key=prob_dict.get)
# 2. If the model is mostly confident it's neutral, force it to neutral
if top_raw_class == "neutral":
drill_down_label = "0 Consistent"
coarse_label = "neutral"
else:
# 3. Otherwise, use the continuous math to figure out the drill-down intensity
continuous_score = p_coherent - p_incoherent
drill_down_label, coarse_label = bucket_score(continuous_score)
# Sort results for the UI bar chart
results = sorted(results, key=lambda x: x["prob"], reverse=True)
# Build HTML Horizontal Bars
html_parts = []
DISPLAY_LABEL_MAP = {
"coherent": "Supportive",
"neutral": "Non-interacting",
"incoherent": "Contradictory"
}
for r in results:
lbl = r["label"]
display_lbl = DISPLAY_LABEL_MAP.get(lbl, lbl)
pct = r["prob"] * 100
bg_color = color_map.get(lbl, "#333")
bar_html = f"""
<div style='margin-bottom: 6px;'>
<div style='display: flex; justify-content: space-between; font-size: 0.85em; margin-bottom: 2px; color: #555;'>
<span>{display_lbl}</span><span style='font-weight: 500;'>{pct:.1f}%</span>
</div>
<div style='width: 100%; background: #e0e0e0; height: 6px; border-radius: 3px; overflow: hidden;'>
<div style='width: {pct}%; background: {bg_color}; height: 100%; border-radius: 3px;'></div>
</div>
</div>
"""
html_parts.append(bar_html)
styled_conf = f"<div style='padding-top: 4px;'>{''.join(html_parts)}</div>"
# JSON string to log purely in the CSV
conf_json_str = json.dumps({k: round(v*100, 2) for k, v in prob_dict.items()})
drill_choices = DRILL_DOWN_MAP.get(coarse_label, [])
updates.append((
gr.update(value=coarse_label),
gr.update(value=styled_conf),
gr.update(choices=drill_choices, value=drill_down_label),
coarse_label,
drill_down_label,
conf_json_str
))
return updates
# ==========================================
# 1. AUTHENTICATION (GOOGLE SHEETS VIA SERVICE ACCOUNT)
# ==========================================
print("Authenticating with Google via Service Account...")
gcp_secret = os.environ.get("GCP_CREDENTIALS")
if not gcp_secret:
raise ValueError("GCP_CREDENTIALS secret not found. Please set it in Hugging Face Space Secrets.")
try:
creds_dict = json.loads(gcp_secret)
gc = gspread.service_account_from_dict(creds_dict)
translate_client = translate.Client.from_service_account_info(creds_dict)
except json.JSONDecodeError as e:
raise ValueError(f"Failed to parse GCP_CREDENTIALS JSON. Error: {e}")
spreadsheet = gc.open_by_key('12JM3u10WSpshCcSUEmjhRP5i2bWe9MAK_jrbI56WOCU')
def get_worksheet_by_number(spreadsheet, worksheet_number, format=True):
worksheet = spreadsheet.get_worksheet(worksheet_number)
rows = worksheet.get_all_values()
df = pd.DataFrame.from_records(rows[1:], columns=rows[0])
if format:
if worksheet_number == 4:
df = df.iloc[1:]
else:
df = df.iloc[2:]
df.columns = df.iloc[0].values
df = df.iloc[1:]
df.columns = [str(col).strip() for col in df.columns]
df = df.replace('', pd.NA)
if 'Sector' in df.columns:
df['Sector'] = df['Sector'].ffill()
else:
print(f"⚠️ Warning: 'Sector' column missing in worksheet {worksheet_number}. Found columns: {list(df.columns)}")
if 'Policy' in df.columns:
df['Policy'] = df['Policy'].ffill()
return df
print("Loading Data from Google Sheets...")
land_df = get_worksheet_by_number(spreadsheet, 3, format=True)
water_df = get_worksheet_by_number(spreadsheet, 5, format=True)
energy_df = get_worksheet_by_number(spreadsheet, 4, format=True)
DOMAIN_MAP = {"Land": land_df, "Water": water_df, "Energy": energy_df}
DOMAINS = list(DOMAIN_MAP.keys())
# --- EXPERTISE MAPPING ---
SECTOR_MAPPING = {
"Climate": "Climate",
"Water": "Water",
"Energy": "Energy",
"Land": "Land",
"Environment": "Land",
"Agriculture": "Land",
"Food": "Land",
}
SECTOR_CHOICES = list(SECTOR_MAPPING.keys())
# ==========================================
# 2. CONFIGURATION & TRANSLATION HELPERS
# ==========================================
HF_TOKEN = os.environ.get("HF_TOKEN")
HF_DATASET_REPO = "akaburia/policy-evaluations"
HF_CSV_FILE = "policy_coherence_annotations.csv"
HF_USERS_FILE = "user_profiles.csv"
AVAILABLE_COLUMNS = [
'Sector', 'Policy', 'General Vision', 'General policy objective',
'Strategic objectives / directions', 'Focus Area / Policy Action Category',
'Policy objectives (of the focus area)', 'Policy Actions and Measures (PAMs)',
'Policy Targets / Indicators'
]
DRILL_DOWN_MAP = {
"coherent": ["+3 Indivisible", "+2 Reinforcing", "+1 Enabling"],
"neutral": ["0 Consistent"],
"incoherent": ["-1 Constraining", "-2 Counteracting", "-3 Cancelling"]
}
MAX_ROWS = 2
LANG_CODES = {"English": "en", "French": "fr", "Portuguese": "pt", "Swahili": "sw"}
def t_text(text, target_lang_name):
code = LANG_CODES.get(target_lang_name, "en")
if code == "en" or not text:
return text
import html
result = translate_client.translate(text, target_language=code)
return html.unescape(result["translatedText"])
def t_batch(texts, target_lang_name):
code = LANG_CODES.get(target_lang_name, "en")
if code == "en" or not texts:
return texts
import html
results = translate_client.translate(texts, target_language=code)
return [html.unescape(res["translatedText"]) for res in results]
# ==========================================
# 3. STANDARD HELPERS
# ==========================================
def get_unique_items(df, policy_name, col_name):
if 'Policy' not in df.columns or col_name not in df.columns:
return []
if policy_name not in df['Policy'].values:
return []
items = df[df['Policy'] == policy_name][col_name].dropna().unique().tolist()
clean_items = []
for i in items:
val = str(i).strip()
if val and val.lower() not in ['missing', 'nan', 'n/a', 'none', 'null']:
clean_items.append(val)
return clean_items
def get_sector_for_policy(df, policy_name):
if 'Policy' not in df.columns or 'Sector' not in df.columns:
return "Unknown Sector"
if policy_name not in df['Policy'].values:
return "Unknown Sector"
return str(df[df['Policy'] == policy_name]['Sector'].iloc[0]).strip()
def get_policy_list(domain_key):
if not domain_key: return []
df = DOMAIN_MAP[domain_key]
if 'Policy' not in df.columns: return []
return [p for p in df['Policy'].unique() if pd.notna(p) and str(p).strip()]
def load_hf_dataset():
try:
path = hf_hub_download(repo_id=HF_DATASET_REPO, filename=HF_CSV_FILE, repo_type="dataset", token=HF_TOKEN)
return pd.read_csv(path)
except Exception as e:
# Added the 3 new Model tracking columns here
return pd.DataFrame(columns=[
"Domain_A", "Sector_A", "Policy_A_Name",
"Domain_B", "Sector_B", "Policy_B_Name",
"Target_Column", "Target_A_Row", "Target_B_Row",
"Context_Column", "Context_A_Chunk", "Context_B_Chunk",
"Model_Coarse_Label", "Model_Drill_Down_Label", "Model_Confidences",
"AI_Justification", "IG_JSON",
"Coherence_Label", "Drill_Down_Label", "Justification", "AnnotatorUsername",
"Timestamp", "SessionID", "Consent_Link_Email", "Consent_Follow_Up"
])
def load_user_profiles():
try:
path = hf_hub_download(repo_id=HF_DATASET_REPO, filename=HF_USERS_FILE, repo_type="dataset", token=HF_TOKEN)
return pd.read_csv(path)
except Exception:
return pd.DataFrame(columns=["Email", "UserID"])
def get_or_create_user(email):
email = email.strip().lower()
if not email: return None, "Email cannot be empty."
users_df = load_user_profiles()
if email in users_df['Email'].values:
user_id = users_df.loc[users_df['Email'] == email, 'UserID'].iloc[0]
return user_id, f"Welcome back. Logged in as {user_id}."
else:
new_num = len(users_df) + 1
new_user_id = f"user{new_num}"
new_row = {"Email": email, "UserID": new_user_id}
users_df = pd.concat([users_df, pd.DataFrame([new_row])], ignore_index=True)
try:
csv_buffer = io.StringIO()
users_df.to_csv(csv_buffer, index=False)
api = HfApi()
api.upload_file(
path_or_fileobj=io.BytesIO(csv_buffer.getvalue().encode('utf-8')),
path_in_repo=HF_USERS_FILE, repo_id=HF_DATASET_REPO,
token=HF_TOKEN, repo_type="dataset"
)
return new_user_id, f"New account created. Logged in as {new_user_id}."
except Exception as e:
return None, f"Error saving user profile: {e}"
# ==========================================
# 4. GRADIO UI DESIGN
# ==========================================
custom_css = """
.explain-btn {
background-color: #8b5cf6 !important;
color: white !important;
border: none !important;
}
.explain-btn:hover {
background-color: #7c3aed !important;
}
.scrollable-target textarea {
min-height: 80px !important;
overflow-y: auto !important;
}
.scrollable-rows-container {
padding: 5px !important;
background-color: #f9fafb !important;
}
/* Clean card layout for individual rows */
.row-card {
padding: 15px !important;
background: #ffffff !important;
border-radius: 8px !important;
box-shadow: 0 1px 3px rgba(0,0,0,0.1) !important;
margin-bottom: 20px !important;
border: 1px solid #e5e7eb !important;
}
"""
with gr.Blocks(theme=gr.themes.Soft(), css=custom_css) as demo:
# --- TOP BANNER ---
gr.HTML("""
<div style="width: 100%;
height: 220px;
background-image: url('https://www.epicafrica.eu/wp-content/uploads/2022/10/bigstock-Khun-Dan-Prakan-Chon-Dam-In-Na-203362501-1024x575.jpg');
background-size: cover;
background-position: center center;
background-repeat: no-repeat;
border-radius: 12px;
margin-bottom: 25px;
box-shadow: 0 4px 10px rgba(0,0,0,0.1);">
</div>
""")
# --- PERSISTENT HEADER ---
with gr.Row():
with gr.Column(scale=4):
main_title = gr.HTML("""
<div class="persistent-header">
<div style="display: flex; align-items: center; gap: 15px;">
<img src="https://www.epicafrica.eu/wp-content/uploads/2022/10/cropped-epicAfrica_HEProject-32x32.png" style="height: 50px;">
<h2 style="margin: 0; color: #374151;">Policy Coherence Tool</h2>
</div>
</div>
""")
with gr.Column(scale=1):
gr.HTML("""
<div style="display: flex; height: 100%; align-items: center; justify-content: flex-end; padding-top: 10px;">
<img src="https://k24.digital/wp-content/uploads/2025/09/FotoJet-2025-09-25T184625.119-1200x628.jpg" style="height: 35px; border-radius: 4px; border: 1px solid #ccc;" title="Kenya">
</div>
""")
lang_selector = gr.Dropdown(choices=list(LANG_CODES.keys()), value="English", label="Language / Langue", scale=1)
# --- 1. LANDING PAGE ---
with gr.Column(visible=True) as landing_page:
gr.HTML("""
<div style="text-align: center; margin-bottom: 20px;">
<img src="https://huggingface.co/spaces/akaburia/policy-coherence-annotations/resolve/main/landing_page_img.jpeg" style="max-height: 250px; border-radius: 8px; box-shadow: 0 2px 10px rgba(0,0,0,0.1);">
</div>
""")
main_desc = gr.Markdown(
"### Mapping Policy Synergies Across Sectors\n\n"
"Welcome to the EPIC Africa Policy Coherence Tool. This platform is designed to help researchers and policymakers "
"systematically evaluate how specific objectives within different policy documents interact with one another.\n\n"
"**How it works:**\n"
"You will be presented with a target objective from one policy and asked to score its interaction against objectives from a different policy. "
"By identifying whether these targets reinforce, enable, or constrain one another, you help build a comprehensive understanding of cross-sectoral coherence.\n\n"
"**Your contribution matters:**\n"
)
get_started_btn = gr.Button("Get Started", variant="primary", size="lg")
hf_df_state = gr.State()
user_tag_state = gr.State()
session_id_state = gr.State(lambda: str(uuid.uuid4().hex[:12]))
consent_link_state = gr.State("No")
consent_follow_state = gr.State("No")
target_a_list_state = gr.State([])
pending_tasks_state = gr.State({})
current_index_state = gr.State(0)
current_b_eng_list_state = gr.State([])
ctx_a_eng_state = gr.State("")
ctx_b_eng_state = gr.State("")
# --- LOGIN PANEL ---
with gr.Group(visible=False) as login_box:
login_title = gr.Markdown("### User Authentication & Informed Consent")
login_disclaimer = gr.Markdown(
"**Before you continue**\n\n"
"This survey is anonymous by default. When you sign in with your email, we will ask two brief questions about how your data is handled.\n\n"
"**1 · Linking your responses to your email**\n"
"You may choose to have your responses stored in association with your email address. This is entirely optional. Your data will be held securely and will not be shared with any third party.\n\n"
"**2 · Follow-up contact**\n"
"If you agreed to linking above, we may also ask whether you are willing to be contacted for a brief follow-up conversation — only in cases where your responses raise questions that would benefit from further discussion.\n\n"
"*You can say no to either question without affecting your participation.*"
)
with gr.Row():
consent_link_radio = gr.Radio(choices=["Yes", "No"], value="No", label="1. Link responses to my email?")
consent_follow_radio = gr.Radio(choices=["Yes", "No"], value="No", label="2. Willing to be contacted for follow-up?", interactive=False)
with gr.Row():
email_box = gr.Textbox(label="Email Address", placeholder="name@example.com")
login_btn = gr.Button("Login & Accept", variant="primary")
login_status = gr.Markdown(value="Waiting for authentication...")
# Logic to disable the second question if the first is 'No'
def toggle_followup(link_choice):
if link_choice == "Yes":
return gr.update(interactive=True)
else:
return gr.update(value="No", interactive=False)
consent_link_radio.change(fn=toggle_followup, inputs=consent_link_radio, outputs=consent_follow_radio)
# --- EXPERTISE / SECTOR SELECTION ---
with gr.Group(visible=False) as sector_box:
sector_title = gr.Markdown("### What is your expertise?")
sector_cb = gr.CheckboxGroup(
choices=SECTOR_CHOICES,
label="Please select the sector(s) that best match your expertise and work experience. Multiple selections are allowed."
)
proceed_btn = gr.Button("Proceed to Workspace", variant="primary", size="lg")
# --- MAIN APPLICATION ---
with gr.Group(visible=False) as app_box:
app_definitions = gr.Markdown(
"**Definitions:**\n"
"- **Nexus Domain:** The broad sector being analyzed (e.g., Land, Water, Energy).\n"
"- **Policy:** The specific document under review.\n"
"- **Target:** The exact objective or statement you are currently evaluating.\n"
"- **Context:** The broader set of measures belonging to the policy, provided as background reference.\n\n"
"**General Class Definitions:**\n"
"- **Supportive:** the policy objectives explicitly reinforce each other\n"
"- **Non-interacting:** policy objectives are independent of each other and non-related\n"
"- **Contradictory:** the policy objectives imply no explicit alignment or are contradicting of each other"
)
# NEW: Accordion containing the 7 classes table
with gr.Accordion("Interaction Class Definitions (Click to Expand)", open=False) as interaction_acc:
interaction_md = gr.Markdown(
"| Interaction Label | Meaning | Implication |\n"
"| :--- | :--- | :--- |\n"
"| **+3 (Indivisible)** | Progress on one target automatically delivers progress on another | There is high level of compatibility between the two targets. |\n"
"| **+2 (Reinforcing)** | Progress on one target makes it easier to make progress on another | There is relatively higher level of compatibility between the targets being compared. |\n"
"| **+1 (Enabling)** | Progress on one target creates conditions that enable progress on another | There is a small level of compatibility between the two targets compared. |\n"
"| **0 (Consistent)** | There is no significant link between two targets' progress | There is no significant compatibility between the two targets being evaluated. |\n"
"| **-1 (Constraining)** | Progress on one target constrains the options for how to deliver on another | The targets are relatively competitive resulting in counterproductive effects. |\n"
"| **-2 (Counteracting)** | Progress on one target makes it more difficult to make progress on another | The targets are counterproductive and do not support each other. |\n"
"| **-3 (Cancelling)** | Progress on one target automatically leads to a negative impact on another | The targets are highly opposite and are highly counterproductive. Cannot deliver synergistic effects. |"
)
with gr.Accordion("Data Selection", open=True) as data_acc:
# --- NEW LOCATION FOR BACK BUTTON ---
with gr.Row():
back_to_sectors_btn = gr.Button("⬅️ Back to Sector Selection", variant="secondary", size="sm")
gr.Markdown("") # Empty markdown to push button to the left
gr.Markdown("")
with gr.Row():
with gr.Column(scale=1):
src_a_title = gr.Markdown("### Source A")
domain_a_dd = gr.Dropdown(choices=DOMAINS, value=None, label="Domain A")
policy_a_dd = gr.Dropdown(choices=[], value=None, label="Policy A")
with gr.Column(scale=1):
src_b_title = gr.Markdown("### Source B")
domain_b_dd = gr.Dropdown(choices=DOMAINS, value=None, label="Domain B")
policy_b_dd = gr.Dropdown(choices=[], value=None, label="Policy B")
with gr.Row():
target_col_dd = gr.Dropdown(choices=AVAILABLE_COLUMNS, value='Strategic objectives / directions', label="Target Column")
context_col_dd = gr.Dropdown(choices=AVAILABLE_COLUMNS, value='Policy Actions and Measures (PAMs)', label="Context Column")
# Just the Load Button at the bottom now
load_btn = gr.Button("Fetch Data", variant="primary")
gr.Markdown("---")
progress_text = gr.Markdown("**Progress:** Waiting for data selection...")
with gr.Group(visible=False) as workspace_box:
# --- THE FIXED HEADER ---
# This stays naturally at the top of the workspace
with gr.Row():
with gr.Column(scale=1, variant="panel"):
meta_a = gr.Markdown("### Source A Information")
display_context_a = gr.Textbox(label="Context A", interactive=False, lines=4)
# display_target_a = gr.Textbox(label="Target A (Active)", interactive=False, lines=4)
with gr.Column(scale=1, variant="panel"):
meta_b = gr.Markdown("### Source B Information")
display_context_b = gr.Textbox(label="Context B", interactive=False, lines=4)
# --- AI CHATBOT QUERY OPTION ---
with gr.Accordion("💬 Ask AI about the Context & Policies", open=False):
chatbot = gr.Chatbot(height=300)
with gr.Row():
chat_input = gr.Textbox(placeholder="Ask a question about the policies or targets...", scale=4, show_label=False)
chat_submit = gr.Button("Send", scale=1)
# --- THE SCROLLABLE ROWS ---
with gr.Group(elem_classes="scrollable-rows-container"):
bulk_title = gr.Markdown("### Bulk Coherence Evaluation")
bulk_desc = gr.Markdown(
"Evaluate how the **Target A** above interacts with the **Target B** statements below.\n"
"**Rules:** If you evaluate a row, you **MUST** select the Class, the Extended Class Interaction, and write a Justification. "
"You may leave a row entirely blank to skip it."
)
# --- DYNAMIC BULK ROWS ---
eval_rows = []
for i in range(MAX_ROWS):
with gr.Group(visible=False, elem_classes="row-card") as row_container:
m_coarse_st = gr.State("")
m_drill_st = gr.State("")
m_conf_st = gr.State("")
m_ai_just_st = gr.State("")
m_ig_json_st = gr.State("")
# FIX: Show Target A and Target B side-by-side in every row block
with gr.Row(equal_height=True):
with gr.Column(scale=1):
a_text_display = gr.Textbox(label="Target A (Active)", interactive=False, lines=3, elem_classes="scrollable-target")
with gr.Column(scale=1):
b_text = gr.Textbox(label="Target B", interactive=False, lines=3, elem_classes="scrollable-target")
with gr.Row():
with gr.Column(scale=1, min_width=200):
rel_radio = gr.Radio(choices=[("Supportive", "coherent"), ("Non-interacting", "neutral"), ("Contradictory", "incoherent")], label="1. Class")
conf_md = gr.Markdown("")
with gr.Column(scale=1, min_width=200):
# ADDED allow_custom_value=True to prevent strict validation crashes on swap/clear
inter_dd = gr.Dropdown(choices=[], label="2. Extended Class Interaction", interactive=True, allow_custom_value=True)
explain_btn = gr.Button("✨ AI Explainability", size="sm", elem_classes="explain-btn")
explain_html = gr.HTML("")
with gr.Column(scale=2, min_width=250):
just_box = gr.Textbox(label="3. Justification", placeholder="Compulsory reasoning...", lines=3)
clear_row_btn = gr.Button("🗑️ Clear", size="sm", variant="stop")
explain_btn.click(
fn=generate_row_explanation,
inputs=[target_a_list_state, current_index_state, b_text, lang_selector], # Passed lang_selector here
outputs=[explain_html, just_box, m_ai_just_st, m_ig_json_st]
)
clear_row_btn.click(
fn=lambda: (gr.update(value=None), gr.update(choices=[], value=None), gr.update(value="")),
inputs=None,
outputs=[rel_radio, inter_dd, just_box]
)
# Added a_text_display to the tuple
eval_rows.append((row_container, a_text_display, b_text, rel_radio, conf_md, inter_dd, just_box, m_coarse_st, m_drill_st, m_conf_st, m_ai_just_st, m_ig_json_st))
# --- NAVIGATION BUTTONS ---
with gr.Row():
skip_btn = gr.Button("Skip Target A", size="lg")
save_btn = gr.Button("Save Filled Annotations", variant="primary", size="lg")
status_box = gr.Textbox(label="System Log", interactive=False)
workspace_info = gr.Markdown(
"<div style='text-align: center; padding: 15px; margin-top: 20px; background-color: #f8fafc; border: 1px solid #e2e8f0; border-radius: 8px;'>"
"<strong>Visualisations Report</strong><br>"
"As you keep on scoring, head on to check out the visualisations report! This is an exercise on scoring policy coherences and comparison with the AI model.<br>"
"<a href='https://datastudio.google.com/u/0/reporting/cc1d6cab-fe9d-4d77-91d9-ea6ccfb5a39c/page/U7RuF' target='_blank' style='color: #3b82f6; text-decoration: underline;'>https://datastudio.google.com/u/0/reporting/cc1d6cab-fe9d-4d77-91d9-ea6ccfb5a39c/page/U7RuF</a>"
"</div>"
)
# --- NAVIGATION BUTTONS ---
# Kept outside the scrollable box so they are always visible at the very bottom
# with gr.Row():
# skip_btn = gr.Button("Skip Target A", size="lg")
# save_btn = gr.Button("Save Filled Annotations", variant="primary", size="lg")
# status_box = gr.Textbox(label="System Log", interactive=False)
# ==========================================
# 5. EVENT CONTROLLERS
# ==========================================
# --- PERSISTENT FOOTER ---
footer_disclaimer = gr.Markdown(
"---\n"
"<div style='text-align: center; color: #6b7280; font-size: 0.9em; padding: 10px;'>"
"<strong>Disclaimer:</strong> This tool is developed and maintained by <a href='https://www.epicafrica.eu/' target='_blank' style='color: #4b5563; text-decoration: underline;'>EPIC Africa</a>. "
"The European Union (EU) is not liable for the content, use, or outputs generated by this tool."
"</div>"
)
gr.HTML("""
<div style="text-align: center; padding: 10px; margin: 0 auto; display: flex; justify-content: center;">
<a href="https://www.epicafrica.eu/" target="_blank">
<img src="https://www.epicafrica.eu/wp-content/uploads/2022/11/Untitled-design-1024x205.png" style="max-height: 80px; max-width: 100%; object-fit: contain; margin: 0 auto;">
</a>
</div>
""")
def translate_static_ui(lang):
titles = [
"""<div class="persistent-header">
<div style="display: flex; align-items: center; gap: 15px;">
<img src="https://www.epicafrica.eu/wp-content/uploads/2022/10/cropped-epicAfrica_HEProject-32x32.png" style="height: 50px;">
<h2 style="margin: 0; color: #374151;">Policy Coherence Tool</h2>
</div>
</div>""",
"### Mapping Policy Synergies Across Sectors\n\nWelcome to the EPIC Africa Policy Coherence Tool. This platform is designed to help researchers and policymakers systematically evaluate how specific objectives within different policy documents interact with one another.\n\n**How it works:**\nYou will be presented with a target objective from one policy and asked to score its interaction against objectives from a different policy. By identifying whether these targets reinforce, enable, or constrain one another, you help build a comprehensive understanding of cross-sectoral coherence.\n\n",
"Get Started",
"### User Authentication & Informed Consent",
"**Before you continue**\n\nThis survey is anonymous by default. When you sign in with your email, we will ask two brief questions about how your data is handled.\n\n**1 · Linking your responses to your email**\nYou may choose to have your responses stored in association with your email address. This is entirely optional. Your data will be held securely and will not be shared with any third party.\n\n**2 · Follow-up contact**\nIf you agreed to linking above, we may also ask whether you are willing to be contacted for a brief follow-up conversation — only in cases where your responses raise questions that would benefit from further discussion.\n\n*You can say no to either question without affecting your participation.*",
"Login & Accept",
"### What is your expertise?",
"Please select the sector(s) that best match your expertise and work experience. Multiple selections are allowed.",
"Proceed to Workspace",
"**Definitions:**\n- **Nexus Domain:** The broad sector being analyzed (e.g., Land, Water, Energy).\n- **Policy:** The specific document under review.\n- **Target:** The exact objective or statement you are currently evaluating.\n- **Context:** The broader set of measures belonging to the policy, provided as background reference.\n\n**General Class Definitions:**\n- **Supportive:** the policy objectives explicitly reinforce each other\n- **Non-interacting:** policy objectives are independent of each other and non-related\n- **Contradictory:** the policy objectives imply no explicit alignment or are contradicting of each other",
"Interaction Class Definitions (Click to Expand)",
"| Interaction Label | Meaning | Implication |\n| :--- | :--- | :--- |\n| **+3 (Indivisible)** | Progress on one target automatically delivers progress on another | There is high level of compatibility between the two targets. |\n| **+2 (Reinforcing)** | Progress on one target makes it easier to make progress on another | There is relatively higher level of compatibility between the targets being compared. |\n| **+1 (Enabling)** | Progress on one target creates conditions that enable progress on another | There is a small level of compatibility between the two targets compared. |\n| **0 (Consistent)** | There is no significant link between two targets' progress | There is no significant compatibility between the two targets being evaluated. |\n| **-1 (Constraining)** | Progress on one target constrains the options for how to deliver on another | The targets are relatively competitive resulting in counterproductive effects. |\n| **-2 (Counteracting)** | Progress on one target makes it more difficult to make progress on another | The targets are counterproductive and do not support each other. |\n| **-3 (Cancelling)** | Progress on one target automatically leads to a negative impact on another | The targets are highly opposite and are highly counterproductive. Cannot deliver synergistic effects. |",
"Data Selection",
"⬅️ Back to Sector Selection",
"### Source A",
"### Source B",
"Fetch Data",
"💬 Ask AI about the Context & Policies",
"### Bulk Coherence Evaluation",
"Evaluate how the **Target A** above interacts with the **Target B** statements below.\n**Rules:** If you evaluate a row, you **MUST** select the Class, the Extended Class Interaction, and write a Justification. You may leave a row entirely blank to skip it.",
"Skip Target A",
"Save Filled Annotations",
"<div style='text-align: center; padding: 15px; margin-top: 20px; background-color: #f8fafc; border: 1px solid #e2e8f0; border-radius: 8px;'>📊 <strong>Visualisations Report</strong><br>Watch the dataset grow as we map policy coherences across domains! Check out the live visualisations report to track our collective progress, uncover cross-sector synergies, and see exactly how our policy experts stack up against the AI baseline.<br><a href='https://datastudio.google.com/u/0/reporting/cc1d6cab-fe9d-4d77-91d9-ea6ccfb5a39c/page/U7RuF' target='_blank' style='color: #3b82f6; text-decoration: underline;'>https://datastudio.google.com/u/0/reporting/cc1d6cab-fe9d-4d77-91d9-ea6ccfb5a39c/page/U7RuF</a></div>",
"---\n<div style='text-align: center; color: #6b7280; font-size: 0.9em; padding: 10px;'><strong>Disclaimer:</strong> This tool is developed and maintained by EPIC Africa. The European Union (EU) is not liable for the content, use, or outputs generated by this tool.</div>"
]
translated = t_batch(titles, lang)
return translated
def handle_language_change(lang, ctx_a_eng, ctx_b_eng, a_list, tasks_dict, idx, user_tag, pol_a, pol_b, hf_df):
static_updates = translate_static_ui(lang)
ctx_a_trans = t_text(ctx_a_eng, lang)
ctx_b_trans = t_text(ctx_b_eng, lang)
rendered = render_target_a(a_list, tasks_dict, idx, lang, user_tag, pol_a, pol_b, hf_df)
prog_txt = rendered[0]
row_updates = rendered[1:]
return [
gr.update(value=static_updates[0]), # main_title HTML
gr.update(value=static_updates[1]), # main_desc
gr.update(value=static_updates[2]), # get_started_btn
gr.update(value=static_updates[3]), # login_title
gr.update(value=static_updates[4]), # login disclaimer
gr.update(value=static_updates[5]), # login_btn
gr.update(value=static_updates[6]), # sector_title
gr.update(label=static_updates[7]), # sector_cb label
gr.update(value=static_updates[8]), # proceed_btn
gr.update(value=static_updates[9]), # app_definitions
gr.update(label=static_updates[10]), # interaction_acc
gr.update(value=static_updates[11]), # interaction_md
gr.update(label=static_updates[12]), # data_acc
gr.update(value=static_updates[13]), # back_to_sectors_btn
gr.update(value=static_updates[14]), # src_a_title
gr.update(value=static_updates[15]), # src_b_title
gr.update(value=static_updates[16]), # load_btn
# Missing chat accordion translation mapping index, let's omit the chat accordion title for simplicity since we can't easily grab it.
gr.update(value=static_updates[18]), # bulk_title
gr.update(value=static_updates[19]), # bulk_desc
gr.update(value=static_updates[20]), # skip_btn
gr.update(value=static_updates[21]), # save_btn
gr.update(value=static_updates[22]), # workspace_info
gr.update(value=static_updates[23]), # footer_disclaimer
gr.update(value=ctx_a_trans), # ctx_a_trans
gr.update(value=ctx_b_trans), # ctx_b_trans
prog_txt
] + row_updates
def update_drill(label, current_val):
# Gracefully handle the clear button event to avoid validation errors
if not label:
return gr.update(choices=[], value=None)
choices = DRILL_DOWN_MAP.get(label, [])
if current_val in choices:
return gr.update(choices=choices, value=current_val)
new_val = choices[0] if choices else None
return gr.update(choices=choices, value=new_val)
# for i in range(MAX_ROWS):
# # Change this line to have 11 items (add two more underscores at the end)
# _, _, rel_radio, _, inter_dd, _, _, _, _, _, _ = eval_rows[i]
# # Notice we now pass BOTH the radio and the dropdown as inputs
# rel_radio.change(fn=update_drill, inputs=[rel_radio, inter_dd], outputs=inter_dd)
for i in range(MAX_ROWS):
# FIX: Added an extra '_' at the beginning to properly unpack 12 items instead of 11
_, _, b_text, rel_radio, _, inter_dd, just_box, _, _, _, _, _ = eval_rows[i]
# Restore the missing drill-down update event
rel_radio.change(fn=update_drill, inputs=[rel_radio, inter_dd], outputs=inter_dd)
# Gather the exact state needed to cache this row AND the workspace config
inputs_to_cache = [
user_tag_state, session_id_state,
domain_a_dd, policy_a_dd, domain_b_dd, policy_b_dd, target_col_dd, context_col_dd,
target_a_list_state, current_index_state, b_text, rel_radio, inter_dd, just_box
]
# Trigger cache save silently in the background on any change
rel_radio.change(fn=update_cache_row, inputs=inputs_to_cache)
inter_dd.change(fn=update_cache_row, inputs=inputs_to_cache)
just_box.change(fn=update_cache_row, inputs=inputs_to_cache)
# ADD link_val and follow_val to inputs
def authenticate(email, link_val, follow_val):
email = email.strip().lower()
# 1. Validate Email Format
email_pattern = r"^[^@\s]+@[^@\s]+\.[^@\s]+$"
if not re.match(email_pattern, email):
write_log("LOGIN_FAILED", f"Invalid email format attempted: '{email}'")
return (gr.update(value=f"<font color='red'>Please enter a valid email address.</font>"),
gr.update(visible=True), gr.update(visible=False), gr.update(visible=False),
None, None,
gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), link_val, follow_val)
user_tag, msg = get_or_create_user(email)
if not user_tag:
write_log("LOGIN_FAILED", f"System error creating user for '{email}'")
return (gr.update(value=f"<font color='red'>{msg}</font>"),
gr.update(visible=True), gr.update(visible=False), gr.update(visible=False),
None, None,
gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), link_val, follow_val)
write_log("LOGIN_SUCCESS", f"User {user_tag} ({email}) logged in. Consents - Link: {link_val}, Follow: {follow_val}")
hf_df = load_hf_dataset()
drafts = load_drafts()
user_data = drafts.get(user_tag, {})
ws = user_data.get("workspace", {})
# 2. Check for pending session and redirect straight to workspace
if ws.get("pol_a") and ws.get("pol_b"):
write_log("SESSION_RESTORED", f"User {user_tag} skipped sectors and restored previous session {ws.get('session_id')}.")
msg += f" Restored your pending session. Click 'Fetch Data' to resume your draft."
return (
gr.update(value=f"{msg} Loaded {len(hf_df)} annotations."),
gr.update(visible=False), # Hide login_box
gr.update(visible=False), # Hide sector_box (Bypass directly to app)
gr.update(visible=True), # Show app_box
user_tag,
hf_df,
gr.update(value=ws["dom_a"]),
gr.update(choices=get_policy_list(ws["dom_a"]), value=ws["pol_a"]),
gr.update(value=ws["dom_b"]),
gr.update(choices=get_policy_list(ws["dom_b"]), value=ws["pol_b"]),
gr.update(value=ws["tar_col"]),
gr.update(value=ws["ctx_col"]),
link_val,
follow_val
)
else:
write_log("NEW_SESSION", f"User {user_tag} starting fresh. Routing to Sector Selection.")
return (
gr.update(value=f"{msg} Loaded {len(hf_df)} annotations."),
gr.update(visible=False), # Hide login_box
gr.update(visible=True), # Show sector_box
gr.update(visible=False), # Hide app_box
user_tag,
hf_df,
gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(),
link_val,
follow_val
)
def route_to_workspace(selected_sectors):
if not selected_sectors:
raise gr.Error("Please select at least one sector.")
allowed_domains = set()
for s in selected_sectors:
mapped_domain = SECTOR_MAPPING.get(s)
if mapped_domain in DOMAINS:
allowed_domains.add(mapped_domain)
allowed_list = list(allowed_domains)
# Pre-select the first domain, but dynamically load its corresponding policies
default_domain = allowed_list[0] if allowed_list else None
available_policies = get_policy_list(default_domain) if default_domain else []
write_log("SECTOR_SELECTED", f"Mapped sectors {selected_sectors} to domains {allowed_list}")
return (
gr.update(visible=False), # Hide sector box
gr.update(visible=True), # Show main app
gr.update(choices=allowed_list, value=default_domain), # Restrict Domain A
gr.update(choices=available_policies, value=None), # <--- POPULATE Policy A choices, leave value empty
gr.update(choices=DOMAINS, value=DOMAINS[0] if DOMAINS else None), # Open Domain B
gr.update(choices=get_policy_list(DOMAINS[0]) if DOMAINS else [], value=None) # <--- POPULATE Policy B choices, leave value empty
)
def update_a_choices(dom_a, pol_b, curr_a):
choices = [p for p in get_policy_list(dom_a) if p != pol_b]
val = curr_a if curr_a in choices else None
return gr.update(choices=choices, value=val)
def update_b_choices(dom_b, pol_a, curr_b):
choices = [p for p in get_policy_list(dom_b) if p != pol_a]
val = curr_b if curr_b in choices else None
return gr.update(choices=choices, value=val)
get_started_btn.click(
fn=lambda: (gr.update(visible=False), gr.update(visible=True)),
inputs=None,
outputs=[landing_page, login_box]
)
login_btn.click(
fn=authenticate,
inputs=[email_box, consent_link_radio, consent_follow_radio],
outputs=[
login_status, login_box, sector_box, app_box, user_tag_state, hf_df_state, # <-- Added sector_box here
domain_a_dd, policy_a_dd, domain_b_dd, policy_b_dd, target_col_dd, context_col_dd,
consent_link_state, consent_follow_state
]
)
# --- WIRE THE NEW PROCEED BUTTON ---
proceed_btn.click(
fn=route_to_workspace,
inputs=[sector_cb],
outputs=[sector_box, app_box, domain_a_dd, policy_a_dd, domain_b_dd, policy_b_dd] # <-- Added policy dropdowns
)
back_to_sectors_btn.click(
fn=lambda: (gr.update(visible=True), gr.update(visible=False)),
inputs=None,
outputs=[sector_box, app_box]
)
domain_a_dd.change(fn=update_a_choices, inputs=[domain_a_dd, policy_b_dd, policy_a_dd], outputs=policy_a_dd)
policy_b_dd.change(fn=update_a_choices, inputs=[domain_a_dd, policy_b_dd, policy_a_dd], outputs=policy_a_dd)
domain_b_dd.change(fn=update_b_choices, inputs=[domain_b_dd, policy_a_dd, policy_b_dd], outputs=policy_b_dd)
policy_a_dd.change(fn=update_b_choices, inputs=[domain_b_dd, policy_a_dd, policy_b_dd], outputs=policy_b_dd)
def render_target_a(a_list, tasks_dict, idx, lang, user_tag, pol_a, pol_b, hf_df, progress=gr.Progress()):
updates = []
empty_row = [gr.update(visible=False), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), "", "", "", "", ""]
if not a_list:
prog_txt = t_text("**Progress:** No unannotated items found.", lang)
updates.append(prog_txt)
for i in range(MAX_ROWS): updates.extend(empty_row)
return updates + [[]]
if idx >= len(a_list):
prog_txt = t_text("**Progress:** Completed all items.", lang)
updates.append(prog_txt)
for i in range(MAX_ROWS): updates.extend(empty_row)
return updates + [[]]
curr_a_eng = a_list[idx]
bs_to_eval_eng = tasks_dict[curr_a_eng]
curr_a_display = t_text(curr_a_eng, lang)
bs_display = t_batch(bs_to_eval_eng, lang)
prog_txt = t_text(f"**Progress:** Annotating Target A group {idx + 1} of {len(a_list)}", lang)
updates.append(prog_txt)
drafts = load_drafts()
cache_key = f"{pol_a}|{pol_b}|{curr_a_eng}"
user_draft = drafts.get(user_tag, {}).get("rows", {}).get(cache_key, {})
user_saved_df = pd.DataFrame()
if not hf_df.empty:
temp_df = hf_df[
(hf_df["AnnotatorUsername"] == user_tag) &
(hf_df["Policy_A_Name"] == pol_a) &
(hf_df["Policy_B_Name"] == pol_b) &
(hf_df["Target_A_Row"] == curr_a_eng)
].copy()
if not temp_df.empty:
temp_df['Timestamp'] = pd.to_datetime(temp_df['Timestamp'])
temp_df = temp_df.sort_values(by='Timestamp')
user_saved_df = temp_df.drop_duplicates(subset=["Target_B_Row"], keep="last")
# --- PROGRESS BAR UPDATE ---
if progress is not None:
progress(0.4, desc="Running background AI predictions...")
preds = get_model_predictions(curr_a_eng, bs_to_eval_eng)
# --- PROGRESS BAR UPDATE ---
if progress is not None:
progress(0.8, desc="Rendering UI blocks...")
for i in range(MAX_ROWS):
if i < len(bs_display):
p_radio, p_conf_md, p_inter_dd, p_m_coarse, p_m_drill, p_m_conf = preds[i]
b_val_eng = bs_to_eval_eng[i]
cached_row = user_draft.get(b_val_eng)
saved_row = user_saved_df[user_saved_df["Target_B_Row"] == b_val_eng] if not user_saved_df.empty else pd.DataFrame()
if cached_row:
set_radio = gr.update(value=cached_row.get("rel")) if cached_row.get("rel") else p_radio
set_inter = gr.update(value=cached_row.get("inter")) if cached_row.get("inter") else p_inter_dd
set_just = gr.update(value=cached_row.get("just", ""))
elif not saved_row.empty:
set_radio = gr.update(value=saved_row.iloc[-1]["Coherence_Label"])
set_inter = gr.update(value=saved_row.iloc[-1]["Drill_Down_Label"])
set_just = gr.update(value=saved_row.iloc[-1]["Justification"])
else:
set_radio = p_radio
set_inter = p_inter_dd
set_just = gr.update(value="")
updates.extend([
gr.update(visible=True),
gr.update(value=curr_a_display),
gr.update(value=bs_display[i]),
set_radio, p_conf_md, set_inter, set_just,
p_m_coarse, p_m_drill, p_m_conf, "", ""
])
else:
updates.extend(empty_row)
# --- PROGRESS BAR UPDATE ---
if progress is not None:
progress(1.0, desc="Done!")
return updates + [bs_to_eval_eng]
# def render_target_a(a_list, tasks_dict, idx, lang):
# updates = []
# # 9 components per row to reset
# # empty_row = [gr.update(visible=False), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), "", "", ""]
# empty_row = [gr.update(visible=False), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), "", "", "", "", ""]
# if not a_list:
# prog_txt = t_text("**Progress:** No unannotated items found.", lang)
# updates.extend([prog_txt, "N/A"])
# for i in range(MAX_ROWS): updates.extend(empty_row)
# return updates + [[]]
# if idx >= len(a_list):
# prog_txt = t_text("**Progress:** Completed all items.", lang)
# updates.extend([prog_txt, t_text("End of list.", lang)])
# for i in range(MAX_ROWS): updates.extend(empty_row)
# return updates + [[]]
# curr_a_eng = a_list[idx]
# bs_to_eval_eng = tasks_dict[curr_a_eng]
# curr_a_display = t_text(curr_a_eng, lang)
# bs_display = t_batch(bs_to_eval_eng, lang)
# prog_txt = t_text(f"**Progress:** Annotating Target A group {idx + 1} of {len(a_list)}", lang)
# updates.extend([prog_txt, curr_a_display])
# for i in range(MAX_ROWS):
# if i < len(bs_display):
# updates.extend([
# gr.update(visible=True),
# gr.update(value=bs_display[i]),
# gr.update(value=None),
# gr.update(value=""), # conf_md
# gr.update(choices=[], value=None),
# gr.update(value=""), # just_box
# "", "", "", "", "" # Reset the 5 hidden model states
# ])
# else:
# updates.extend(empty_row)
# return updates + [bs_to_eval_eng]
def load_workspace(dom_a, pol_a, dom_b, pol_b, tar_col, ctx_col, hf_df, user_tag, lang, progress=gr.Progress()):
progress(0.1, desc="Validating selections...")
if not pol_a or not pol_b:
err = t_text("Error: Select both policies.", lang)
return [gr.update(value=err)] + [gr.skip()] * (14 + MAX_ROWS*12)
if tar_col == ctx_col:
err = t_text("Error: Target and Context cannot be the same.", lang)
return [gr.update(value=err)] + [gr.skip()] * (14 + MAX_ROWS*12)
progress(0.2, desc="Extracting policy structures...")
df_a = DOMAIN_MAP[dom_a]
df_b = DOMAIN_MAP[dom_b]
sec_a = get_sector_for_policy(df_a, pol_a)
sec_b = get_sector_for_policy(df_b, pol_b)
meta_a_str = f"**Sector:** {sec_a} | **Policy:** {pol_a}"
meta_b_str = f"**Sector:** {sec_b} | **Policy:** {pol_b}"
targets_a = get_unique_items(df_a, pol_a, tar_col)
targets_b = get_unique_items(df_b, pol_b, tar_col)
user_df = hf_df[hf_df["AnnotatorUsername"] == user_tag]
mask = (user_df["Policy_A_Name"] == pol_a) & (user_df["Policy_B_Name"] == pol_b)
annotated_pairs = set(zip(user_df.loc[mask, "Target_A_Row"], user_df.loc[mask, "Target_B_Row"]))
pending_tasks = {}
total_missing_pairs = 0
for a in targets_a:
missing_bs = [b for b in targets_b if (a, b) not in annotated_pairs]
if missing_bs:
pending_tasks[a] = missing_bs[:MAX_ROWS]
total_missing_pairs += len(pending_tasks[a])
target_a_list = list(pending_tasks.keys())
contexts_a = get_unique_items(df_a, pol_a, ctx_col)
contexts_b = get_unique_items(df_b, pol_b, ctx_col)
ctx_a_chunk_eng = "\n\n".join([f"• {c}" for c in contexts_a]) if contexts_a else "No context data available."
ctx_b_chunk_eng = "\n\n".join([f"• {c}" for c in contexts_b]) if contexts_b else "No context data available."
ctx_a_display = t_text(ctx_a_chunk_eng, lang)
ctx_b_display = t_text(ctx_b_chunk_eng, lang)
rendered_updates = render_target_a(target_a_list, pending_tasks, 0, lang, user_tag, pol_a, pol_b, hf_df, progress)
prog = rendered_updates[0]
row_updates = rendered_updates[1:-1]
b_eng_list = rendered_updates[-1]
status_msg = t_text(f"Data loaded. {total_missing_pairs} unannotated pairs remain across {len(target_a_list)} Target A groups.", lang)
write_log("WORKSPACE_LOADED", f"User {user_tag} fetched {pol_a} vs {pol_b}.")
return [
target_a_list, pending_tasks, 0,
ctx_a_chunk_eng, ctx_b_chunk_eng, b_eng_list,
prog, meta_a_str, ctx_a_display,
meta_b_str, ctx_b_display, status_msg,
gr.update(visible=len(target_a_list) > 0)
] + row_updates
def save_action(idx, a_list, tasks_dict, ctx_a_chunk_eng,
ctx_b_chunk_eng, b_eng_list, dom_a, pol_a,
dom_b, pol_b, tar_col, ctx_col, user_tag,
session_id, c_link, c_follow, hf_df, lang, *row_data):
if idx >= len(a_list):
return gr.update(value=t_text("End of list reached.", lang)), idx, hf_df
current_a_eng = a_list[idx]
new_rows = []
# Generate exact local timestamp
try:
tz = ZoneInfo("Africa/Nairobi")
except:
import pytz
tz = pytz.timezone("Africa/Nairobi")
current_time = datetime.now(tz).strftime('%Y-%m-%d %H:%M:%S')
for i in range(MAX_ROWS):
if i >= len(b_eng_list): break
b_val_eng = b_eng_list[i]
rel = row_data[i*10 + 1]
inter = row_data[i*10 + 3]
just = row_data[i*10 + 4]
model_coarse = row_data[i*10 + 5]
model_drill = row_data[i*10 + 6]
model_conf = row_data[i*10 + 7]
ai_just = row_data[i*10 + 8]
ig_json = row_data[i*10 + 9]
has_rel = bool(rel)
has_inter = bool(inter)
has_just = bool(just and just.strip())
if has_rel or has_inter or has_just:
if not (has_rel and has_inter and has_just):
raise gr.Error(f"Row {i+1} is incomplete! Please fill Class, Extended Class, and Justification, or clear the row to skip.")
new_rows.append({
"Domain_A": dom_a,
"Sector_A": get_sector_for_policy(DOMAIN_MAP[dom_a], pol_a),
"Policy_A_Name": pol_a,
"Domain_B": dom_b,
"Sector_B": get_sector_for_policy(DOMAIN_MAP[dom_b], pol_b),
"Policy_B_Name": pol_b,
"Target_Column": tar_col,
"Target_A_Row": current_a_eng,
"Target_B_Row": b_val_eng,
"Context_Column": ctx_col,
"Context_A_Chunk": ctx_a_chunk_eng,
"Context_B_Chunk": ctx_b_chunk_eng,
"Model_Coarse_Label": model_coarse,
"Model_Drill_Down_Label": model_drill,
"Model_Confidences": model_conf,
"AI_Justification": ai_just,
"IG_JSON": ig_json,
"Coherence_Label": rel,
"Drill_Down_Label": inter,
"Justification": just.strip(),
"AnnotatorUsername": user_tag,
"Timestamp": current_time,
"SessionID": session_id,
"Consent_Link_Email": c_link,
"Consent_Follow_Up": c_follow
})
if new_rows:
new_df = pd.DataFrame(new_rows)
hf_df = pd.concat([hf_df, new_df], ignore_index=True)
try:
csv_buffer = io.StringIO()
hf_df.to_csv(csv_buffer, index=False)
csv_bytes = csv_buffer.getvalue().encode('utf-8')
write_log("DATA_SAVED", f"User {user_tag} successfully saved {len(new_rows)} completed rows to Hugging Face.")
api = HfApi()
api.upload_file(
path_or_fileobj=io.BytesIO(csv_bytes), path_in_repo=HF_CSV_FILE,
repo_id=HF_DATASET_REPO, token=HF_TOKEN, repo_type="dataset"
)
log_msg = t_text(f"Successfully saved {len(new_rows)} annotations.", lang)
# CLEAR CACHE ON SUCCESSFUL SAVE
drafts = load_drafts()
cache_key = f"{pol_a}|{pol_b}|{current_a_eng}"
# Check inside the "rows" sub-dictionary
if user_tag in drafts and "rows" in drafts[user_tag] and cache_key in drafts[user_tag]["rows"]:
del drafts[user_tag]["rows"][cache_key]
with open(DRAFT_FILE, 'w') as f:
json.dump(drafts, f)
except Exception as e:
log_msg = f"Error saving data: {e}"
else:
log_msg = t_text("No annotations filled. Skipped to next group.", lang)
return gr.update(value=log_msg), idx + 1, hf_df
def skip_action(idx, lang):
write_log("TARGET_SKIPPED", f"User skipped group {idx + 1}")
return gr.update(value=t_text(f"Skipped group {idx + 1}.", lang)), idx + 1
# --- TRIGGER FIRST PASS ---
# def trigger_first_pass(a_list, idx, b_eng_list):
# if not a_list or idx >= len(a_list) or not b_eng_list:
# # Return 6 updates per row (radio, html, dropdown, state_c, state_d, state_json)
# return [gr.update()] * (MAX_ROWS * 6)
# curr_a_eng = a_list[idx]
# preds = get_model_predictions(curr_a_eng, b_eng_list)
# outputs = []
# for i in range(MAX_ROWS):
# if i < len(preds):
# outputs.extend([
# preds[i][0], # rel_radio
# preds[i][1], # conf_md
# preds[i][2], # inter_dd
# preds[i][3], # m_coarse_st
# preds[i][4], # m_drill_st
# preds[i][5], # m_conf_st
# ])
# else:
# outputs.extend([gr.update(), gr.update(value=""), gr.update(), "", "", ""])
# return outputs
# ── EVENT WIRING ──
row_outputs = []
row_inputs = []
# Notice we now unpack 12 items per row (added a_text_display)
for container, a_txt, b, r, c_md, inter, j, m_co, m_dr, m_cf, m_ai_j, m_ig_j in eval_rows:
row_outputs.extend([container, a_txt, b, r, c_md, inter, j, m_co, m_dr, m_cf, m_ai_j, m_ig_j])
row_inputs.extend([b, r, c_md, inter, j, m_co, m_dr, m_cf, m_ai_j, m_ig_j])
# first_pass_outputs = []
# Unpack 9 items per row
# for container, b, r, c_md, inter, j, m_co, m_dr, m_cf in eval_rows:
# row_outputs.extend([container, b, r, c_md, inter, j, m_co, m_dr, m_cf])
# row_inputs.extend([b, r, c_md, inter, j, m_co, m_dr, m_cf])
# first_pass_outputs.extend([r, c_md, inter, m_co, m_dr, m_cf])
# for container, b, r, c_md, inter, j, m_co, m_dr, m_cf, m_ai_j, m_ig_j in eval_rows:
# row_outputs.extend([container, b, r, c_md, inter, j, m_co, m_dr, m_cf, m_ai_j, m_ig_j])
# row_inputs.extend([b, r, c_md, inter, j, m_co, m_dr, m_cf, m_ai_j, m_ig_j])
# first_pass_outputs.extend([r, c_md, inter, m_co, m_dr, m_cf])
# --- CHATBOT LOGIC ---
def chat_with_ai(user_msg, history, ctx_a, ctx_b, a_list, idx, lang, user_tag, dom_a, pol_a, dom_b, pol_b):
if not user_msg:
yield "", history
return
curr_a = a_list[idx] if a_list and idx < len(a_list) else "None"
system_prompt = f"You are an AI policy assistant helping an annotator understand policy documents.\nContext A: {ctx_a}\nContext B: {ctx_b}\nActive Target A: {curr_a}\nAnswer the user's query clearly and concisely based on this context."
messages = [{"role": "system", "content": system_prompt}]
messages.extend(history)
messages.append({"role": "user", "content": user_msg})
# Append empty bot response to history for streaming
history.append({"role": "user", "content": user_msg})
history.append({"role": "assistant", "content": ""})
yield "", history
try:
res = llm_client.chat_completion(messages=messages, max_tokens=8000, temperature=0.1, stream=True)
partial_text = ""
for chunk in res:
token = chunk.choices[0].delta.content or ""
partial_text += token
# Dynamically format <think> tags into HTML accordions as it streams
history[-1]["content"] = format_streaming_thoughts(partial_text, is_streaming=True)
yield "", history
# Perform translation only after the stream finishes to save API calls
if lang != "English":
partial_text = t_text(partial_text, lang)
final_formatted = format_streaming_thoughts(partial_text, is_streaming=False)
history[-1]["content"] = final_formatted
yield "", history
# Run the background upload function
log_chat_to_hf(user_tag, dom_a, pol_a, dom_b, pol_b, ctx_a, ctx_b, curr_a, user_msg, partial_text)
except Exception as e:
history[-1]["content"] += f"\n\nError: {str(e)}"
yield "", history
# Update Inputs to capture required Contexts and Domains
chat_inputs = [chat_input, chatbot, ctx_a_eng_state, ctx_b_eng_state, target_a_list_state, current_index_state, lang_selector, user_tag_state, domain_a_dd, policy_a_dd, domain_b_dd, policy_b_dd]
chat_submit.click(fn=chat_with_ai, inputs=chat_inputs, outputs=[chat_input, chatbot])
chat_input.submit(fn=chat_with_ai, inputs=chat_inputs, outputs=[chat_input, chatbot])
lang_selector.change(
fn=handle_language_change,
inputs=[
lang_selector, ctx_a_eng_state, ctx_b_eng_state, target_a_list_state,
pending_tasks_state, current_index_state, user_tag_state, policy_a_dd,
policy_b_dd, hf_df_state
],
outputs=[
main_title, main_desc, get_started_btn, login_title, login_disclaimer, login_btn,
sector_title, sector_cb, proceed_btn, app_definitions, interaction_acc, interaction_md,
data_acc, back_to_sectors_btn, src_a_title, src_b_title, load_btn, bulk_title, bulk_desc,
skip_btn, save_btn, workspace_info, footer_disclaimer,
display_context_a, display_context_b, progress_text
] + row_outputs + [current_b_eng_list_state]
)
load_btn.click(
fn=load_workspace,
inputs=[
domain_a_dd, policy_a_dd, domain_b_dd, policy_b_dd,
target_col_dd, context_col_dd, hf_df_state, user_tag_state, lang_selector
],
outputs=[
target_a_list_state, pending_tasks_state, current_index_state,
ctx_a_eng_state, ctx_b_eng_state, current_b_eng_list_state,
progress_text, meta_a, display_context_a,
meta_b, display_context_b, status_box, workspace_box
] + row_outputs
)
save_btn.click(
fn=save_action,
inputs=[
current_index_state, target_a_list_state, pending_tasks_state,
ctx_a_eng_state, ctx_b_eng_state, current_b_eng_list_state,
domain_a_dd, policy_a_dd, domain_b_dd, policy_b_dd,
target_col_dd, context_col_dd, user_tag_state, session_id_state,
consent_link_state, consent_follow_state,
hf_df_state, lang_selector
] + row_inputs,
outputs=[status_box, current_index_state, hf_df_state]
).then(
fn=render_target_a,
inputs=[
target_a_list_state, pending_tasks_state, current_index_state, lang_selector,
user_tag_state, policy_a_dd, policy_b_dd, hf_df_state
],
outputs=[progress_text] + row_outputs + [current_b_eng_list_state]
)
skip_btn.click(
fn=skip_action, inputs=[current_index_state, lang_selector], outputs=[status_box, current_index_state]
).then(
fn=render_target_a,
inputs=[
target_a_list_state, pending_tasks_state, current_index_state, lang_selector,
user_tag_state, policy_a_dd, policy_b_dd, hf_df_state
],
outputs=[progress_text] + row_outputs + [current_b_eng_list_state]
)
# demo.launch(debug=True, ssr_mode=False, show_error=True)
demo.queue(default_concurrency_limit=40).launch(debug=True, show_error=True, ssr_mode=False)