| import pandas as pd |
| import gradio as gr |
| import os |
| import io |
| import json |
| import gspread |
| from huggingface_hub import HfApi, hf_hub_download |
| import torch |
| import torch.nn.functional as F |
| from transformers import AutoTokenizer, AutoModelForSequenceClassification |
| import re |
| from captum.attr import LayerIntegratedGradients, TokenReferenceBase |
| from captum.attr import visualization as viz |
| from huggingface_hub import InferenceClient |
| from datetime import datetime |
| import uuid |
|
|
|
|
| |
| HF_TOKEN = os.environ.get("HF_TOKEN") |
| HF_DATASET_REPO = "akaburia/policy-evaluations" |
| HF_CSV_FILE = "policy_coherence_annotations.csv" |
| HF_USERS_FILE = "user_profiles.csv" |
| HF_CHAT_LOG_FILE = "chatbot_logs.csv" |
|
|
| |
| try: |
| from google.cloud import translate_v2 as translate |
| except ImportError: |
| raise ImportError("Please install the translation library by running: pip install google-cloud-translate") |
|
|
| try: |
| from zoneinfo import ZoneInfo |
| except ImportError: |
| import pytz |
|
|
| |
| LOG_FILE = "logs.txt" |
|
|
| def write_log(action_type, details): |
| """Appends a timestamped log entry to logs.txt""" |
| try: |
| try: |
| tz = ZoneInfo("Africa/Nairobi") |
| except: |
| import pytz |
| tz = pytz.timezone("Africa/Nairobi") |
| |
| timestamp = datetime.now(tz).strftime('%Y-%m-%d %H:%M:%S') |
| log_entry = f"[{timestamp}] [{action_type}] {details}\n" |
| |
| with open(LOG_FILE, "a", encoding="utf-8") as f: |
| f.write(log_entry) |
| |
| print(log_entry.strip()) |
| except Exception as e: |
| print(f"Logging failed: {e}") |
|
|
| |
| DRAFT_FILE = "user_drafts.json" |
|
|
|
|
| def load_drafts(): |
| if os.path.exists(DRAFT_FILE): |
| try: |
| with open(DRAFT_FILE, 'r') as f: |
| return json.load(f) |
| except: |
| return {} |
| return {} |
|
|
| def update_cache_row(user, session_id, dom_a, pol_a, dom_b, pol_b, tar_col, ctx_col, a_list, idx, b_text, rel, inter, just): |
| """Fires automatically on keystrokes/clicks to save progress and workspace state""" |
| if not user or not a_list or idx >= len(a_list) or not b_text: return |
| curr_a = a_list[idx] |
| |
| drafts = load_drafts() |
| |
| if user not in drafts: drafts[user] = {"workspace": {}, "rows": {}} |
| |
| |
| drafts[user]["workspace"] = { |
| "session_id": session_id, |
| "dom_a": dom_a, "pol_a": pol_a, |
| "dom_b": dom_b, "pol_b": pol_b, |
| "tar_col": tar_col, "ctx_col": ctx_col |
| } |
| |
| cache_key = f"{pol_a}|{pol_b}|{curr_a}" |
| if cache_key not in drafts[user]["rows"]: drafts[user]["rows"][cache_key] = {} |
| |
| |
| drafts[user]["rows"][cache_key][b_text] = { |
| "rel": rel, "inter": inter, "just": just, "session_id": session_id |
| } |
| write_log("CACHE_UPDATE", f"User {user} auto-saved draft for row index {idx}.") |
| |
| with open(DRAFT_FILE, 'w') as f: |
| json.dump(drafts, f) |
|
|
| |
| |
| |
| print("Loading Inference Model into Memory...") |
| device = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
| tokenizer = AutoTokenizer.from_pretrained("akaburia/policy-evaluations") |
| model = AutoModelForSequenceClassification.from_pretrained("akaburia/policy-evaluations").to(device) |
|
|
| id_to_label = {0: "neutral", 1: "coherent", 2: "incoherent"} |
|
|
| def custom_forward(input_ids, attention_mask): |
| inputs_embeds = model.roberta.embeddings.word_embeddings(input_ids) |
| return model(inputs_embeds=inputs_embeds, attention_mask=attention_mask).logits |
|
|
| |
| lig = LayerIntegratedGradients(custom_forward, model.roberta.embeddings.word_embeddings) |
|
|
| llm_client = InferenceClient("Qwen/Qwen3-8B", token=HF_TOKEN) |
|
|
| def generate_row_explanation(a_list, idx, text_b, lang): |
| if not a_list or idx >= len(a_list) or not text_b: |
| return "", "", "", "" |
| |
| policy_a = clean_policy_text(a_list[idx]) |
| policy_b = clean_policy_text(text_b) |
|
|
| |
| model.zero_grad() |
| inputs = tokenizer(policy_a, policy_b, return_tensors="pt", truncation=True, max_length=256) |
| input_ids = inputs["input_ids"].to(device) |
| attention_mask = inputs["attention_mask"].to(device) |
|
|
| ref_token_id = tokenizer.pad_token_id |
| special_token_mask = [1 if id in tokenizer.all_special_ids else 0 for id in input_ids[0].tolist()] |
| baseline_ids = torch.tensor([[id if is_special else ref_token_id for id, is_special in zip(input_ids[0].tolist(), special_token_mask)]]).to(device) |
|
|
| with torch.no_grad(): |
| logits = model(input_ids=input_ids, attention_mask=attention_mask).logits |
| predicted_class_idx = torch.argmax(logits, dim=1).item() |
| prediction = id_to_label[predicted_class_idx] |
|
|
| attributions, _ = lig.attribute( |
| inputs=input_ids, baselines=baseline_ids, |
| additional_forward_args=(attention_mask,), |
| target=predicted_class_idx, return_convergence_delta=True |
| ) |
|
|
| attributions = attributions.sum(dim=-1).squeeze(0) |
| attributions = attributions / torch.norm(attributions) |
| attributions = attributions.cpu().detach().numpy() |
|
|
| tokens = tokenizer.convert_ids_to_tokens(input_ids[0]) |
| |
| ig_dict = {t.replace('Ġ', '').strip(): float(s) for t, s in zip(tokens, attributions) if t.replace('Ġ', '').strip()} |
| ig_json_str = json.dumps(ig_dict) |
|
|
| score_list = [f"'{k}': {v:.3f}" for k, v in ig_dict.items()] |
| formatted_scores = ", ".join(score_list) |
|
|
| |
| prompt = f"""You are an expert AI auditor interpreting an Explainable AI (XAI) output. |
| A sequence classification model evaluated two policies and predicted their relationship as: {prediction.upper()} |
| Policy A: "{policy_a}" |
| Policy B: "{policy_b}" |
| Token Scores: [{formatted_scores}] |
| Write a highly analytical, 2 to 3 sentence explanation of the model's reasoning. Explicitly quote the specific words that have the highest positive and highest negative scores. Do not hallucinate.""" |
|
|
| try: |
| response = llm_client.chat_completion(messages=[{"role": "user", "content": prompt}], max_tokens=1500, temperature=0.1) |
| raw_output = response.choices[0].message.content.strip() |
|
|
| |
| match = re.search(r'<think>(.*?)</think>', raw_output, flags=re.DOTALL) |
| if match: |
| think_content = match.group(1).strip() |
| final_answer = raw_output.replace(match.group(0), '').strip() |
| |
| |
| if lang != "English": |
| think_content = t_text(think_content, lang) |
| final_answer = t_text(final_answer, lang) |
| |
| html_out = f"""<details style="margin-bottom: 12px; padding: 10px; background-color: #f3f4f6; border-radius: 6px; border: 1px solid #e5e7eb;"><summary style="cursor: pointer; font-weight: bold; color: #4b5563; outline: none;">🧠 Click to peek into the AI's thought process</summary><div style="margin-top: 10px; font-size: 0.9em; color: #6b7280; white-space: pre-wrap;">{think_content}</div></details>""" |
| |
| return html_out, final_answer, raw_output, ig_json_str |
| |
| if lang != "English": |
| raw_output = t_text(raw_output, lang) |
| return "", raw_output, raw_output, ig_json_str |
| except Exception as e: |
| err_msg = f"⚠️ Explainability Error: {str(e)}" |
| return "", t_text(err_msg, lang) if lang != "English" else err_msg, "", "" |
| |
|
|
| def bucket_score(score): |
| """Maps a continuous score [-1.0 to 1.0] to a 7-class drill down.""" |
| if score >= 0.66: |
| return "+3 Indivisible", "coherent" |
| elif score >= 0.33: |
| return "+2 Reinforcing", "coherent" |
| elif score > 0.10: |
| return "+1 Enabling", "coherent" |
| elif score >= -0.10: |
| return "0 Consistent", "neutral" |
| elif score >= -0.33: |
| return "-1 Constraining", "incoherent" |
| elif score >= -0.66: |
| return "-2 Counteracting", "incoherent" |
| else: |
| return "-3 Cancelling", "incoherent" |
| |
| def format_streaming_thoughts(text, is_streaming=True): |
| """Safely formats <think> tags into HTML accordions even before the closing tag arrives.""" |
| if "<think>" not in text: |
| return text |
| |
| formatted = text.replace( |
| "<think>", |
| "<details open style='margin-bottom: 12px; padding: 10px; background-color: #f3f4f6; border-radius: 6px; border: 1px solid #e5e7eb;'>" |
| "<summary style='cursor: pointer; font-weight: bold; color: #4b5563; outline: none;'>🧠 AI is thinking...</summary>" |
| "<div style='margin-top: 10px; font-size: 0.9em; color: #6b7280; white-space: pre-wrap;'>" |
| ) |
| |
| if "</think>" in formatted: |
| |
| formatted = formatted.replace("</think>", "</div></details>") |
| formatted = formatted.replace("🧠 AI is thinking...", "🧠 Click to peek into the AI's thought process") |
| formatted = formatted.replace("<details open", "<details") |
| elif is_streaming: |
| |
| formatted += "</div></details>" |
| |
| return formatted |
|
|
| def log_chat_to_hf(user_tag, dom_a, pol_a, dom_b, pol_b, ctx_a, ctx_b, curr_a, user_query, ai_response): |
| """Saves chatbot interactions to a dedicated CSV file in the HF Dataset.""" |
| try: |
| try: |
| path = hf_hub_download(repo_id=HF_DATASET_REPO, filename=HF_CHAT_LOG_FILE, repo_type="dataset", token=HF_TOKEN) |
| chat_df = pd.read_csv(path) |
| except Exception: |
| chat_df = pd.DataFrame(columns=[ |
| "Timestamp", "AnnotatorUsername", "Domain_A", "Policy_A", |
| "Domain_B", "Policy_B", "Context_A", "Context_B", "Target_A", |
| "User_Query", "AI_Response" |
| ]) |
| |
| try: |
| tz = ZoneInfo("Africa/Nairobi") |
| except: |
| import pytz |
| tz = pytz.timezone("Africa/Nairobi") |
| |
| current_time = datetime.now(tz).strftime('%Y-%m-%d %H:%M:%S') |
| |
| new_row = { |
| "Timestamp": current_time, "AnnotatorUsername": user_tag, |
| "Domain_A": dom_a, "Policy_A": pol_a, "Domain_B": dom_b, "Policy_B": pol_b, |
| "Context_A": ctx_a, "Context_B": ctx_b, "Target_A": curr_a, |
| "User_Query": user_query, "AI_Response": ai_response |
| } |
| |
| chat_df = pd.concat([chat_df, pd.DataFrame([new_row])], ignore_index=True) |
| csv_buffer = io.StringIO() |
| chat_df.to_csv(csv_buffer, index=False) |
| |
| api = HfApi() |
| api.upload_file( |
| path_or_fileobj=io.BytesIO(csv_buffer.getvalue().encode('utf-8')), |
| path_in_repo=HF_CHAT_LOG_FILE, repo_id=HF_DATASET_REPO, token=HF_TOKEN, repo_type="dataset" |
| ) |
| write_log("CHAT_SAVED", f"Logged chat for user {user_tag}.") |
| except Exception as e: |
| write_log("CHAT_SAVE_ERROR", f"Failed to log chat: {str(e)}") |
| |
| def clean_policy_text(text): |
| if not text or not isinstance(text, str): |
| return "" |
| |
| |
| |
| text = re.sub(r'^\s*(?:\d+|[a-zA-Z]|[ivxlcdmIVXLCDM]+)[\.\)]\s+', '', text) |
| |
| |
| text = text.replace(',', '') |
| |
| |
| text = text.rstrip('. ;') |
| |
| |
| text = re.sub(r'\s+', ' ', text).strip() |
| |
| return text |
|
|
| def get_model_predictions(text_a, b_texts): |
| if not text_a or not b_texts: |
| return [] |
| |
| updates = [] |
| color_map = {"coherent": "#4CAF50", "neutral": "#9E9E9E", "incoherent": "#F44336"} |
| |
| clean_a = clean_policy_text(text_a) |
| clean_b_list = [clean_policy_text(b) for b in b_texts] |
| |
| |
| |
| inputs = tokenizer([clean_a] * len(clean_b_list), clean_b_list, return_tensors="pt", truncation=True, padding=True).to(device) |
| with torch.no_grad(): |
| outputs = model(**inputs) |
| |
| logits_batch = outputs.logits |
| probabilities_batch = F.softmax(logits_batch, dim=-1) |
| |
| |
| for idx, text_b in enumerate(b_texts): |
| probabilities = probabilities_batch[idx].squeeze() |
| |
| |
| prob_dict = {} |
| results = [] |
| for i, prob in enumerate(probabilities): |
| label = model.config.id2label.get(i, f"Class {i}").lower() |
| prob_val = prob.item() |
| prob_dict[label] = prob_val |
| results.append({"label": label, "prob": prob_val}) |
| |
| p_coherent = prob_dict.get("coherent", 0.0) |
| p_incoherent = prob_dict.get("incoherent", 0.0) |
| |
| |
| top_raw_class = max(prob_dict, key=prob_dict.get) |
| |
| |
| if top_raw_class == "neutral": |
| drill_down_label = "0 Consistent" |
| coarse_label = "neutral" |
| else: |
| |
| continuous_score = p_coherent - p_incoherent |
| drill_down_label, coarse_label = bucket_score(continuous_score) |
| |
| |
| results = sorted(results, key=lambda x: x["prob"], reverse=True) |
| |
| |
| html_parts = [] |
| |
| DISPLAY_LABEL_MAP = { |
| "coherent": "Supportive", |
| "neutral": "Non-interacting", |
| "incoherent": "Contradictory" |
| } |
| |
| for r in results: |
| lbl = r["label"] |
| display_lbl = DISPLAY_LABEL_MAP.get(lbl, lbl) |
| pct = r["prob"] * 100 |
| bg_color = color_map.get(lbl, "#333") |
| bar_html = f""" |
| <div style='margin-bottom: 6px;'> |
| <div style='display: flex; justify-content: space-between; font-size: 0.85em; margin-bottom: 2px; color: #555;'> |
| <span>{display_lbl}</span><span style='font-weight: 500;'>{pct:.1f}%</span> |
| </div> |
| <div style='width: 100%; background: #e0e0e0; height: 6px; border-radius: 3px; overflow: hidden;'> |
| <div style='width: {pct}%; background: {bg_color}; height: 100%; border-radius: 3px;'></div> |
| </div> |
| </div> |
| """ |
| html_parts.append(bar_html) |
| styled_conf = f"<div style='padding-top: 4px;'>{''.join(html_parts)}</div>" |
| |
| |
| conf_json_str = json.dumps({k: round(v*100, 2) for k, v in prob_dict.items()}) |
| |
| drill_choices = DRILL_DOWN_MAP.get(coarse_label, []) |
| updates.append(( |
| gr.update(value=coarse_label), |
| gr.update(value=styled_conf), |
| gr.update(choices=drill_choices, value=drill_down_label), |
| coarse_label, |
| drill_down_label, |
| conf_json_str |
| )) |
| |
| return updates |
|
|
| |
| |
| |
| print("Authenticating with Google via Service Account...") |
|
|
| gcp_secret = os.environ.get("GCP_CREDENTIALS") |
| if not gcp_secret: |
| raise ValueError("GCP_CREDENTIALS secret not found. Please set it in Hugging Face Space Secrets.") |
|
|
| try: |
| creds_dict = json.loads(gcp_secret) |
| gc = gspread.service_account_from_dict(creds_dict) |
| translate_client = translate.Client.from_service_account_info(creds_dict) |
| except json.JSONDecodeError as e: |
| raise ValueError(f"Failed to parse GCP_CREDENTIALS JSON. Error: {e}") |
|
|
| spreadsheet = gc.open_by_key('12JM3u10WSpshCcSUEmjhRP5i2bWe9MAK_jrbI56WOCU') |
|
|
| def get_worksheet_by_number(spreadsheet, worksheet_number, format=True): |
| worksheet = spreadsheet.get_worksheet(worksheet_number) |
| rows = worksheet.get_all_values() |
| df = pd.DataFrame.from_records(rows[1:], columns=rows[0]) |
| |
| if format: |
| if worksheet_number == 4: |
| df = df.iloc[1:] |
| else: |
| df = df.iloc[2:] |
| |
| df.columns = df.iloc[0].values |
| df = df.iloc[1:] |
| |
| df.columns = [str(col).strip() for col in df.columns] |
| df = df.replace('', pd.NA) |
| |
| if 'Sector' in df.columns: |
| df['Sector'] = df['Sector'].ffill() |
| else: |
| print(f"⚠️ Warning: 'Sector' column missing in worksheet {worksheet_number}. Found columns: {list(df.columns)}") |
| |
| if 'Policy' in df.columns: |
| df['Policy'] = df['Policy'].ffill() |
| |
| |
| return df |
| |
| print("Loading Data from Google Sheets...") |
| land_df = get_worksheet_by_number(spreadsheet, 3, format=True) |
| water_df = get_worksheet_by_number(spreadsheet, 5, format=True) |
| energy_df = get_worksheet_by_number(spreadsheet, 4, format=True) |
|
|
| DOMAIN_MAP = {"Land": land_df, "Water": water_df, "Energy": energy_df} |
| DOMAINS = list(DOMAIN_MAP.keys()) |
| |
| SECTOR_MAPPING = { |
| "Climate": "Climate", |
| "Water": "Water", |
| "Energy": "Energy", |
| "Land": "Land", |
| "Environment": "Land", |
| "Agriculture": "Land", |
| "Food": "Land", |
| } |
| SECTOR_CHOICES = list(SECTOR_MAPPING.keys()) |
|
|
| |
| |
| |
| HF_TOKEN = os.environ.get("HF_TOKEN") |
| HF_DATASET_REPO = "akaburia/policy-evaluations" |
| HF_CSV_FILE = "policy_coherence_annotations.csv" |
| HF_USERS_FILE = "user_profiles.csv" |
|
|
| AVAILABLE_COLUMNS = [ |
| 'Sector', 'Policy', 'General Vision', 'General policy objective', |
| 'Strategic objectives / directions', 'Focus Area / Policy Action Category', |
| 'Policy objectives (of the focus area)', 'Policy Actions and Measures (PAMs)', |
| 'Policy Targets / Indicators' |
| ] |
|
|
| DRILL_DOWN_MAP = { |
| "coherent": ["+3 Indivisible", "+2 Reinforcing", "+1 Enabling"], |
| "neutral": ["0 Consistent"], |
| "incoherent": ["-1 Constraining", "-2 Counteracting", "-3 Cancelling"] |
| } |
|
|
| MAX_ROWS = 2 |
|
|
| LANG_CODES = {"English": "en", "French": "fr", "Portuguese": "pt", "Swahili": "sw"} |
|
|
| def t_text(text, target_lang_name): |
| code = LANG_CODES.get(target_lang_name, "en") |
| if code == "en" or not text: |
| return text |
| import html |
| result = translate_client.translate(text, target_language=code) |
| return html.unescape(result["translatedText"]) |
|
|
| def t_batch(texts, target_lang_name): |
| code = LANG_CODES.get(target_lang_name, "en") |
| if code == "en" or not texts: |
| return texts |
| import html |
| results = translate_client.translate(texts, target_language=code) |
| return [html.unescape(res["translatedText"]) for res in results] |
|
|
| |
| |
| |
| def get_unique_items(df, policy_name, col_name): |
| if 'Policy' not in df.columns or col_name not in df.columns: |
| return [] |
| if policy_name not in df['Policy'].values: |
| return [] |
| items = df[df['Policy'] == policy_name][col_name].dropna().unique().tolist() |
| clean_items = [] |
| for i in items: |
| val = str(i).strip() |
| if val and val.lower() not in ['missing', 'nan', 'n/a', 'none', 'null']: |
| clean_items.append(val) |
| return clean_items |
|
|
| def get_sector_for_policy(df, policy_name): |
| if 'Policy' not in df.columns or 'Sector' not in df.columns: |
| return "Unknown Sector" |
| if policy_name not in df['Policy'].values: |
| return "Unknown Sector" |
| return str(df[df['Policy'] == policy_name]['Sector'].iloc[0]).strip() |
|
|
| def get_policy_list(domain_key): |
| if not domain_key: return [] |
| df = DOMAIN_MAP[domain_key] |
| if 'Policy' not in df.columns: return [] |
| return [p for p in df['Policy'].unique() if pd.notna(p) and str(p).strip()] |
|
|
| def load_hf_dataset(): |
| try: |
| path = hf_hub_download(repo_id=HF_DATASET_REPO, filename=HF_CSV_FILE, repo_type="dataset", token=HF_TOKEN) |
| return pd.read_csv(path) |
| except Exception as e: |
| |
| return pd.DataFrame(columns=[ |
| "Domain_A", "Sector_A", "Policy_A_Name", |
| "Domain_B", "Sector_B", "Policy_B_Name", |
| "Target_Column", "Target_A_Row", "Target_B_Row", |
| "Context_Column", "Context_A_Chunk", "Context_B_Chunk", |
| "Model_Coarse_Label", "Model_Drill_Down_Label", "Model_Confidences", |
| "AI_Justification", "IG_JSON", |
| "Coherence_Label", "Drill_Down_Label", "Justification", "AnnotatorUsername", |
| "Timestamp", "SessionID", "Consent_Link_Email", "Consent_Follow_Up" |
| ]) |
|
|
| def load_user_profiles(): |
| try: |
| path = hf_hub_download(repo_id=HF_DATASET_REPO, filename=HF_USERS_FILE, repo_type="dataset", token=HF_TOKEN) |
| return pd.read_csv(path) |
| except Exception: |
| return pd.DataFrame(columns=["Email", "UserID"]) |
|
|
| def get_or_create_user(email): |
| email = email.strip().lower() |
| if not email: return None, "Email cannot be empty." |
| users_df = load_user_profiles() |
| if email in users_df['Email'].values: |
| user_id = users_df.loc[users_df['Email'] == email, 'UserID'].iloc[0] |
| return user_id, f"Welcome back. Logged in as {user_id}." |
| else: |
| new_num = len(users_df) + 1 |
| new_user_id = f"user{new_num}" |
| new_row = {"Email": email, "UserID": new_user_id} |
| users_df = pd.concat([users_df, pd.DataFrame([new_row])], ignore_index=True) |
| try: |
| csv_buffer = io.StringIO() |
| users_df.to_csv(csv_buffer, index=False) |
| api = HfApi() |
| api.upload_file( |
| path_or_fileobj=io.BytesIO(csv_buffer.getvalue().encode('utf-8')), |
| path_in_repo=HF_USERS_FILE, repo_id=HF_DATASET_REPO, |
| token=HF_TOKEN, repo_type="dataset" |
| ) |
| return new_user_id, f"New account created. Logged in as {new_user_id}." |
| except Exception as e: |
| return None, f"Error saving user profile: {e}" |
|
|
|
|
| |
| |
| |
|
|
| custom_css = """ |
| .explain-btn { |
| background-color: #8b5cf6 !important; |
| color: white !important; |
| border: none !important; |
| } |
| .explain-btn:hover { |
| background-color: #7c3aed !important; |
| } |
| .scrollable-target textarea { |
| min-height: 80px !important; |
| overflow-y: auto !important; |
| } |
| |
| .scrollable-rows-container { |
| padding: 5px !important; |
| background-color: #f9fafb !important; |
| } |
| |
| /* Clean card layout for individual rows */ |
| .row-card { |
| padding: 15px !important; |
| background: #ffffff !important; |
| border-radius: 8px !important; |
| box-shadow: 0 1px 3px rgba(0,0,0,0.1) !important; |
| margin-bottom: 20px !important; |
| border: 1px solid #e5e7eb !important; |
| } |
| """ |
|
|
| with gr.Blocks(theme=gr.themes.Soft(), css=custom_css) as demo: |
| |
| |
| gr.HTML(""" |
| <div style="width: 100%; |
| height: 220px; |
| background-image: url('https://www.epicafrica.eu/wp-content/uploads/2022/10/bigstock-Khun-Dan-Prakan-Chon-Dam-In-Na-203362501-1024x575.jpg'); |
| background-size: cover; |
| background-position: center center; |
| background-repeat: no-repeat; |
| border-radius: 12px; |
| margin-bottom: 25px; |
| box-shadow: 0 4px 10px rgba(0,0,0,0.1);"> |
| </div> |
| """) |
|
|
| |
| with gr.Row(): |
| with gr.Column(scale=4): |
| main_title = gr.HTML(""" |
| <div class="persistent-header"> |
| <div style="display: flex; align-items: center; gap: 15px;"> |
| <img src="https://www.epicafrica.eu/wp-content/uploads/2022/10/cropped-epicAfrica_HEProject-32x32.png" style="height: 50px;"> |
| <h2 style="margin: 0; color: #374151;">Policy Coherence Tool</h2> |
| </div> |
| </div> |
| """) |
| with gr.Column(scale=1): |
| gr.HTML(""" |
| <div style="display: flex; height: 100%; align-items: center; justify-content: flex-end; padding-top: 10px;"> |
| <img src="https://k24.digital/wp-content/uploads/2025/09/FotoJet-2025-09-25T184625.119-1200x628.jpg" style="height: 35px; border-radius: 4px; border: 1px solid #ccc;" title="Kenya"> |
| </div> |
| """) |
| lang_selector = gr.Dropdown(choices=list(LANG_CODES.keys()), value="English", label="Language / Langue", scale=1) |
| |
| |
| with gr.Column(visible=True) as landing_page: |
| |
| gr.HTML(""" |
| <div style="text-align: center; margin-bottom: 20px;"> |
| <img src="https://huggingface.co/spaces/akaburia/policy-coherence-annotations/resolve/main/landing_page_img.jpeg" style="max-height: 250px; border-radius: 8px; box-shadow: 0 2px 10px rgba(0,0,0,0.1);"> |
| </div> |
| """) |
| main_desc = gr.Markdown( |
| "### Mapping Policy Synergies Across Sectors\n\n" |
| "Welcome to the EPIC Africa Policy Coherence Tool. This platform is designed to help researchers and policymakers " |
| "systematically evaluate how specific objectives within different policy documents interact with one another.\n\n" |
| "**How it works:**\n" |
| "You will be presented with a target objective from one policy and asked to score its interaction against objectives from a different policy. " |
| "By identifying whether these targets reinforce, enable, or constrain one another, you help build a comprehensive understanding of cross-sectoral coherence.\n\n" |
| "**Your contribution matters:**\n" |
| ) |
| |
| get_started_btn = gr.Button("Get Started", variant="primary", size="lg") |
| |
| hf_df_state = gr.State() |
| user_tag_state = gr.State() |
| session_id_state = gr.State(lambda: str(uuid.uuid4().hex[:12])) |
|
|
| consent_link_state = gr.State("No") |
| consent_follow_state = gr.State("No") |
| |
| target_a_list_state = gr.State([]) |
| pending_tasks_state = gr.State({}) |
| current_index_state = gr.State(0) |
| current_b_eng_list_state = gr.State([]) |
| ctx_a_eng_state = gr.State("") |
| ctx_b_eng_state = gr.State("") |
|
|
| |
| with gr.Group(visible=False) as login_box: |
| login_title = gr.Markdown("### User Authentication & Informed Consent") |
| |
| login_disclaimer = gr.Markdown( |
| "**Before you continue**\n\n" |
| "This survey is anonymous by default. When you sign in with your email, we will ask two brief questions about how your data is handled.\n\n" |
| "**1 · Linking your responses to your email**\n" |
| "You may choose to have your responses stored in association with your email address. This is entirely optional. Your data will be held securely and will not be shared with any third party.\n\n" |
| "**2 · Follow-up contact**\n" |
| "If you agreed to linking above, we may also ask whether you are willing to be contacted for a brief follow-up conversation — only in cases where your responses raise questions that would benefit from further discussion.\n\n" |
| "*You can say no to either question without affecting your participation.*" |
| ) |
| |
| with gr.Row(): |
| consent_link_radio = gr.Radio(choices=["Yes", "No"], value="No", label="1. Link responses to my email?") |
| consent_follow_radio = gr.Radio(choices=["Yes", "No"], value="No", label="2. Willing to be contacted for follow-up?", interactive=False) |
| |
| with gr.Row(): |
| email_box = gr.Textbox(label="Email Address", placeholder="name@example.com") |
| |
| login_btn = gr.Button("Login & Accept", variant="primary") |
| login_status = gr.Markdown(value="Waiting for authentication...") |
| |
| |
| def toggle_followup(link_choice): |
| if link_choice == "Yes": |
| return gr.update(interactive=True) |
| else: |
| return gr.update(value="No", interactive=False) |
| |
| consent_link_radio.change(fn=toggle_followup, inputs=consent_link_radio, outputs=consent_follow_radio) |
|
|
| |
| with gr.Group(visible=False) as sector_box: |
| sector_title = gr.Markdown("### What is your expertise?") |
| sector_cb = gr.CheckboxGroup( |
| choices=SECTOR_CHOICES, |
| label="Please select the sector(s) that best match your expertise and work experience. Multiple selections are allowed." |
| ) |
| proceed_btn = gr.Button("Proceed to Workspace", variant="primary", size="lg") |
|
|
| |
| with gr.Group(visible=False) as app_box: |
| |
| app_definitions = gr.Markdown( |
| "**Definitions:**\n" |
| "- **Nexus Domain:** The broad sector being analyzed (e.g., Land, Water, Energy).\n" |
| "- **Policy:** The specific document under review.\n" |
| "- **Target:** The exact objective or statement you are currently evaluating.\n" |
| "- **Context:** The broader set of measures belonging to the policy, provided as background reference.\n\n" |
| "**General Class Definitions:**\n" |
| "- **Supportive:** the policy objectives explicitly reinforce each other\n" |
| "- **Non-interacting:** policy objectives are independent of each other and non-related\n" |
| "- **Contradictory:** the policy objectives imply no explicit alignment or are contradicting of each other" |
| ) |
| |
| |
| with gr.Accordion("Interaction Class Definitions (Click to Expand)", open=False) as interaction_acc: |
| interaction_md = gr.Markdown( |
| "| Interaction Label | Meaning | Implication |\n" |
| "| :--- | :--- | :--- |\n" |
| "| **+3 (Indivisible)** | Progress on one target automatically delivers progress on another | There is high level of compatibility between the two targets. |\n" |
| "| **+2 (Reinforcing)** | Progress on one target makes it easier to make progress on another | There is relatively higher level of compatibility between the targets being compared. |\n" |
| "| **+1 (Enabling)** | Progress on one target creates conditions that enable progress on another | There is a small level of compatibility between the two targets compared. |\n" |
| "| **0 (Consistent)** | There is no significant link between two targets' progress | There is no significant compatibility between the two targets being evaluated. |\n" |
| "| **-1 (Constraining)** | Progress on one target constrains the options for how to deliver on another | The targets are relatively competitive resulting in counterproductive effects. |\n" |
| "| **-2 (Counteracting)** | Progress on one target makes it more difficult to make progress on another | The targets are counterproductive and do not support each other. |\n" |
| "| **-3 (Cancelling)** | Progress on one target automatically leads to a negative impact on another | The targets are highly opposite and are highly counterproductive. Cannot deliver synergistic effects. |" |
| ) |
|
|
| with gr.Accordion("Data Selection", open=True) as data_acc: |
| |
| |
| with gr.Row(): |
| back_to_sectors_btn = gr.Button("⬅️ Back to Sector Selection", variant="secondary", size="sm") |
| gr.Markdown("") |
| gr.Markdown("") |
| |
| with gr.Row(): |
| with gr.Column(scale=1): |
| src_a_title = gr.Markdown("### Source A") |
| domain_a_dd = gr.Dropdown(choices=DOMAINS, value=None, label="Domain A") |
| policy_a_dd = gr.Dropdown(choices=[], value=None, label="Policy A") |
| |
| with gr.Column(scale=1): |
| src_b_title = gr.Markdown("### Source B") |
| domain_b_dd = gr.Dropdown(choices=DOMAINS, value=None, label="Domain B") |
| policy_b_dd = gr.Dropdown(choices=[], value=None, label="Policy B") |
| |
| with gr.Row(): |
| target_col_dd = gr.Dropdown(choices=AVAILABLE_COLUMNS, value='Strategic objectives / directions', label="Target Column") |
| context_col_dd = gr.Dropdown(choices=AVAILABLE_COLUMNS, value='Policy Actions and Measures (PAMs)', label="Context Column") |
| |
| |
| load_btn = gr.Button("Fetch Data", variant="primary") |
|
|
| gr.Markdown("---") |
| progress_text = gr.Markdown("**Progress:** Waiting for data selection...") |
|
|
| with gr.Group(visible=False) as workspace_box: |
| |
| |
| |
| with gr.Row(): |
| with gr.Column(scale=1, variant="panel"): |
| meta_a = gr.Markdown("### Source A Information") |
| display_context_a = gr.Textbox(label="Context A", interactive=False, lines=4) |
| |
| |
| with gr.Column(scale=1, variant="panel"): |
| meta_b = gr.Markdown("### Source B Information") |
| display_context_b = gr.Textbox(label="Context B", interactive=False, lines=4) |
|
|
| |
| with gr.Accordion("💬 Ask AI about the Context & Policies", open=False): |
| chatbot = gr.Chatbot(height=300) |
| with gr.Row(): |
| chat_input = gr.Textbox(placeholder="Ask a question about the policies or targets...", scale=4, show_label=False) |
| chat_submit = gr.Button("Send", scale=1) |
|
|
| |
| with gr.Group(elem_classes="scrollable-rows-container"): |
| bulk_title = gr.Markdown("### Bulk Coherence Evaluation") |
| bulk_desc = gr.Markdown( |
| "Evaluate how the **Target A** above interacts with the **Target B** statements below.\n" |
| "**Rules:** If you evaluate a row, you **MUST** select the Class, the Extended Class Interaction, and write a Justification. " |
| "You may leave a row entirely blank to skip it." |
| ) |
| |
| |
| eval_rows = [] |
| for i in range(MAX_ROWS): |
| with gr.Group(visible=False, elem_classes="row-card") as row_container: |
| m_coarse_st = gr.State("") |
| m_drill_st = gr.State("") |
| m_conf_st = gr.State("") |
| m_ai_just_st = gr.State("") |
| m_ig_json_st = gr.State("") |
| |
| |
| with gr.Row(equal_height=True): |
| with gr.Column(scale=1): |
| a_text_display = gr.Textbox(label="Target A (Active)", interactive=False, lines=3, elem_classes="scrollable-target") |
| with gr.Column(scale=1): |
| b_text = gr.Textbox(label="Target B", interactive=False, lines=3, elem_classes="scrollable-target") |
| |
| with gr.Row(): |
| with gr.Column(scale=1, min_width=200): |
| rel_radio = gr.Radio(choices=[("Supportive", "coherent"), ("Non-interacting", "neutral"), ("Contradictory", "incoherent")], label="1. Class") |
| conf_md = gr.Markdown("") |
| |
| with gr.Column(scale=1, min_width=200): |
| |
| inter_dd = gr.Dropdown(choices=[], label="2. Extended Class Interaction", interactive=True, allow_custom_value=True) |
| explain_btn = gr.Button("✨ AI Explainability", size="sm", elem_classes="explain-btn") |
| explain_html = gr.HTML("") |
| |
| with gr.Column(scale=2, min_width=250): |
| just_box = gr.Textbox(label="3. Justification", placeholder="Compulsory reasoning...", lines=3) |
| clear_row_btn = gr.Button("🗑️ Clear", size="sm", variant="stop") |
| |
| explain_btn.click( |
| fn=generate_row_explanation, |
| inputs=[target_a_list_state, current_index_state, b_text, lang_selector], |
| outputs=[explain_html, just_box, m_ai_just_st, m_ig_json_st] |
| ) |
| |
| clear_row_btn.click( |
| fn=lambda: (gr.update(value=None), gr.update(choices=[], value=None), gr.update(value="")), |
| inputs=None, |
| outputs=[rel_radio, inter_dd, just_box] |
| ) |
| |
| |
| eval_rows.append((row_container, a_text_display, b_text, rel_radio, conf_md, inter_dd, just_box, m_coarse_st, m_drill_st, m_conf_st, m_ai_just_st, m_ig_json_st)) |
|
|
| |
| with gr.Row(): |
| skip_btn = gr.Button("Skip Target A", size="lg") |
| save_btn = gr.Button("Save Filled Annotations", variant="primary", size="lg") |
| |
| status_box = gr.Textbox(label="System Log", interactive=False) |
| |
| workspace_info = gr.Markdown( |
| "<div style='text-align: center; padding: 15px; margin-top: 20px; background-color: #f8fafc; border: 1px solid #e2e8f0; border-radius: 8px;'>" |
| "<strong>Visualisations Report</strong><br>" |
| "As you keep on scoring, head on to check out the visualisations report! This is an exercise on scoring policy coherences and comparison with the AI model.<br>" |
| "<a href='https://datastudio.google.com/u/0/reporting/cc1d6cab-fe9d-4d77-91d9-ea6ccfb5a39c/page/U7RuF' target='_blank' style='color: #3b82f6; text-decoration: underline;'>https://datastudio.google.com/u/0/reporting/cc1d6cab-fe9d-4d77-91d9-ea6ccfb5a39c/page/U7RuF</a>" |
| "</div>" |
| ) |
|
|
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| footer_disclaimer = gr.Markdown( |
| "---\n" |
| "<div style='text-align: center; color: #6b7280; font-size: 0.9em; padding: 10px;'>" |
| "<strong>Disclaimer:</strong> This tool is developed and maintained by <a href='https://www.epicafrica.eu/' target='_blank' style='color: #4b5563; text-decoration: underline;'>EPIC Africa</a>. " |
| "The European Union (EU) is not liable for the content, use, or outputs generated by this tool." |
| "</div>" |
| ) |
| |
| gr.HTML(""" |
| <div style="text-align: center; padding: 10px; margin: 0 auto; display: flex; justify-content: center;"> |
| <a href="https://www.epicafrica.eu/" target="_blank"> |
| <img src="https://www.epicafrica.eu/wp-content/uploads/2022/11/Untitled-design-1024x205.png" style="max-height: 80px; max-width: 100%; object-fit: contain; margin: 0 auto;"> |
| </a> |
| </div> |
| """) |
| |
| def translate_static_ui(lang): |
| titles = [ |
| """<div class="persistent-header"> |
| <div style="display: flex; align-items: center; gap: 15px;"> |
| <img src="https://www.epicafrica.eu/wp-content/uploads/2022/10/cropped-epicAfrica_HEProject-32x32.png" style="height: 50px;"> |
| <h2 style="margin: 0; color: #374151;">Policy Coherence Tool</h2> |
| </div> |
| </div>""", |
| "### Mapping Policy Synergies Across Sectors\n\nWelcome to the EPIC Africa Policy Coherence Tool. This platform is designed to help researchers and policymakers systematically evaluate how specific objectives within different policy documents interact with one another.\n\n**How it works:**\nYou will be presented with a target objective from one policy and asked to score its interaction against objectives from a different policy. By identifying whether these targets reinforce, enable, or constrain one another, you help build a comprehensive understanding of cross-sectoral coherence.\n\n", |
| "Get Started", |
| "### User Authentication & Informed Consent", |
| "**Before you continue**\n\nThis survey is anonymous by default. When you sign in with your email, we will ask two brief questions about how your data is handled.\n\n**1 · Linking your responses to your email**\nYou may choose to have your responses stored in association with your email address. This is entirely optional. Your data will be held securely and will not be shared with any third party.\n\n**2 · Follow-up contact**\nIf you agreed to linking above, we may also ask whether you are willing to be contacted for a brief follow-up conversation — only in cases where your responses raise questions that would benefit from further discussion.\n\n*You can say no to either question without affecting your participation.*", |
| "Login & Accept", |
| "### What is your expertise?", |
| "Please select the sector(s) that best match your expertise and work experience. Multiple selections are allowed.", |
| "Proceed to Workspace", |
| "**Definitions:**\n- **Nexus Domain:** The broad sector being analyzed (e.g., Land, Water, Energy).\n- **Policy:** The specific document under review.\n- **Target:** The exact objective or statement you are currently evaluating.\n- **Context:** The broader set of measures belonging to the policy, provided as background reference.\n\n**General Class Definitions:**\n- **Supportive:** the policy objectives explicitly reinforce each other\n- **Non-interacting:** policy objectives are independent of each other and non-related\n- **Contradictory:** the policy objectives imply no explicit alignment or are contradicting of each other", |
| "Interaction Class Definitions (Click to Expand)", |
| "| Interaction Label | Meaning | Implication |\n| :--- | :--- | :--- |\n| **+3 (Indivisible)** | Progress on one target automatically delivers progress on another | There is high level of compatibility between the two targets. |\n| **+2 (Reinforcing)** | Progress on one target makes it easier to make progress on another | There is relatively higher level of compatibility between the targets being compared. |\n| **+1 (Enabling)** | Progress on one target creates conditions that enable progress on another | There is a small level of compatibility between the two targets compared. |\n| **0 (Consistent)** | There is no significant link between two targets' progress | There is no significant compatibility between the two targets being evaluated. |\n| **-1 (Constraining)** | Progress on one target constrains the options for how to deliver on another | The targets are relatively competitive resulting in counterproductive effects. |\n| **-2 (Counteracting)** | Progress on one target makes it more difficult to make progress on another | The targets are counterproductive and do not support each other. |\n| **-3 (Cancelling)** | Progress on one target automatically leads to a negative impact on another | The targets are highly opposite and are highly counterproductive. Cannot deliver synergistic effects. |", |
| "Data Selection", |
| "⬅️ Back to Sector Selection", |
| "### Source A", |
| "### Source B", |
| "Fetch Data", |
| "💬 Ask AI about the Context & Policies", |
| "### Bulk Coherence Evaluation", |
| "Evaluate how the **Target A** above interacts with the **Target B** statements below.\n**Rules:** If you evaluate a row, you **MUST** select the Class, the Extended Class Interaction, and write a Justification. You may leave a row entirely blank to skip it.", |
| "Skip Target A", |
| "Save Filled Annotations", |
| "<div style='text-align: center; padding: 15px; margin-top: 20px; background-color: #f8fafc; border: 1px solid #e2e8f0; border-radius: 8px;'>📊 <strong>Visualisations Report</strong><br>Watch the dataset grow as we map policy coherences across domains! Check out the live visualisations report to track our collective progress, uncover cross-sector synergies, and see exactly how our policy experts stack up against the AI baseline.<br><a href='https://datastudio.google.com/u/0/reporting/cc1d6cab-fe9d-4d77-91d9-ea6ccfb5a39c/page/U7RuF' target='_blank' style='color: #3b82f6; text-decoration: underline;'>https://datastudio.google.com/u/0/reporting/cc1d6cab-fe9d-4d77-91d9-ea6ccfb5a39c/page/U7RuF</a></div>", |
| "---\n<div style='text-align: center; color: #6b7280; font-size: 0.9em; padding: 10px;'><strong>Disclaimer:</strong> This tool is developed and maintained by EPIC Africa. The European Union (EU) is not liable for the content, use, or outputs generated by this tool.</div>" |
| ] |
| translated = t_batch(titles, lang) |
| return translated |
|
|
| def handle_language_change(lang, ctx_a_eng, ctx_b_eng, a_list, tasks_dict, idx, user_tag, pol_a, pol_b, hf_df): |
| static_updates = translate_static_ui(lang) |
| ctx_a_trans = t_text(ctx_a_eng, lang) |
| ctx_b_trans = t_text(ctx_b_eng, lang) |
| |
| rendered = render_target_a(a_list, tasks_dict, idx, lang, user_tag, pol_a, pol_b, hf_df) |
| prog_txt = rendered[0] |
| row_updates = rendered[1:] |
| |
| return [ |
| gr.update(value=static_updates[0]), |
| gr.update(value=static_updates[1]), |
| gr.update(value=static_updates[2]), |
| gr.update(value=static_updates[3]), |
| gr.update(value=static_updates[4]), |
| gr.update(value=static_updates[5]), |
| gr.update(value=static_updates[6]), |
| gr.update(label=static_updates[7]), |
| gr.update(value=static_updates[8]), |
| gr.update(value=static_updates[9]), |
| gr.update(label=static_updates[10]), |
| gr.update(value=static_updates[11]), |
| gr.update(label=static_updates[12]), |
| gr.update(value=static_updates[13]), |
| gr.update(value=static_updates[14]), |
| gr.update(value=static_updates[15]), |
| gr.update(value=static_updates[16]), |
| |
| gr.update(value=static_updates[18]), |
| gr.update(value=static_updates[19]), |
| gr.update(value=static_updates[20]), |
| gr.update(value=static_updates[21]), |
| gr.update(value=static_updates[22]), |
| gr.update(value=static_updates[23]), |
| gr.update(value=ctx_a_trans), |
| gr.update(value=ctx_b_trans), |
| prog_txt |
| ] + row_updates |
| |
| def update_drill(label, current_val): |
| |
| if not label: |
| return gr.update(choices=[], value=None) |
| |
| choices = DRILL_DOWN_MAP.get(label, []) |
| if current_val in choices: |
| return gr.update(choices=choices, value=current_val) |
| |
| new_val = choices[0] if choices else None |
| return gr.update(choices=choices, value=new_val) |
|
|
| |
| |
| |
| |
| |
| |
| |
| for i in range(MAX_ROWS): |
| |
| _, _, b_text, rel_radio, _, inter_dd, just_box, _, _, _, _, _ = eval_rows[i] |
| |
| |
| rel_radio.change(fn=update_drill, inputs=[rel_radio, inter_dd], outputs=inter_dd) |
| |
| |
| inputs_to_cache = [ |
| user_tag_state, session_id_state, |
| domain_a_dd, policy_a_dd, domain_b_dd, policy_b_dd, target_col_dd, context_col_dd, |
| target_a_list_state, current_index_state, b_text, rel_radio, inter_dd, just_box |
| ] |
| |
| |
| rel_radio.change(fn=update_cache_row, inputs=inputs_to_cache) |
| inter_dd.change(fn=update_cache_row, inputs=inputs_to_cache) |
| just_box.change(fn=update_cache_row, inputs=inputs_to_cache) |
| |
| |
| |
| def authenticate(email, link_val, follow_val): |
| email = email.strip().lower() |
| |
| |
| email_pattern = r"^[^@\s]+@[^@\s]+\.[^@\s]+$" |
| if not re.match(email_pattern, email): |
| write_log("LOGIN_FAILED", f"Invalid email format attempted: '{email}'") |
| return (gr.update(value=f"<font color='red'>Please enter a valid email address.</font>"), |
| gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), |
| None, None, |
| gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), link_val, follow_val) |
| |
| user_tag, msg = get_or_create_user(email) |
| |
| if not user_tag: |
| write_log("LOGIN_FAILED", f"System error creating user for '{email}'") |
| return (gr.update(value=f"<font color='red'>{msg}</font>"), |
| gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), |
| None, None, |
| gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), link_val, follow_val) |
| |
| write_log("LOGIN_SUCCESS", f"User {user_tag} ({email}) logged in. Consents - Link: {link_val}, Follow: {follow_val}") |
| hf_df = load_hf_dataset() |
| drafts = load_drafts() |
| user_data = drafts.get(user_tag, {}) |
| ws = user_data.get("workspace", {}) |
| |
| |
| if ws.get("pol_a") and ws.get("pol_b"): |
| write_log("SESSION_RESTORED", f"User {user_tag} skipped sectors and restored previous session {ws.get('session_id')}.") |
| msg += f" Restored your pending session. Click 'Fetch Data' to resume your draft." |
| return ( |
| gr.update(value=f"{msg} Loaded {len(hf_df)} annotations."), |
| gr.update(visible=False), |
| gr.update(visible=False), |
| gr.update(visible=True), |
| user_tag, |
| hf_df, |
| gr.update(value=ws["dom_a"]), |
| gr.update(choices=get_policy_list(ws["dom_a"]), value=ws["pol_a"]), |
| gr.update(value=ws["dom_b"]), |
| gr.update(choices=get_policy_list(ws["dom_b"]), value=ws["pol_b"]), |
| gr.update(value=ws["tar_col"]), |
| gr.update(value=ws["ctx_col"]), |
| link_val, |
| follow_val |
| ) |
| else: |
| write_log("NEW_SESSION", f"User {user_tag} starting fresh. Routing to Sector Selection.") |
| return ( |
| gr.update(value=f"{msg} Loaded {len(hf_df)} annotations."), |
| gr.update(visible=False), |
| gr.update(visible=True), |
| gr.update(visible=False), |
| user_tag, |
| hf_df, |
| gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), |
| link_val, |
| follow_val |
| ) |
| def route_to_workspace(selected_sectors): |
| if not selected_sectors: |
| raise gr.Error("Please select at least one sector.") |
| |
| allowed_domains = set() |
| for s in selected_sectors: |
| mapped_domain = SECTOR_MAPPING.get(s) |
| if mapped_domain in DOMAINS: |
| allowed_domains.add(mapped_domain) |
| |
| |
| allowed_list = list(allowed_domains) |
| |
| |
| default_domain = allowed_list[0] if allowed_list else None |
| available_policies = get_policy_list(default_domain) if default_domain else [] |
| |
| write_log("SECTOR_SELECTED", f"Mapped sectors {selected_sectors} to domains {allowed_list}") |
| return ( |
| gr.update(visible=False), |
| gr.update(visible=True), |
| gr.update(choices=allowed_list, value=default_domain), |
| gr.update(choices=available_policies, value=None), |
| gr.update(choices=DOMAINS, value=DOMAINS[0] if DOMAINS else None), |
| gr.update(choices=get_policy_list(DOMAINS[0]) if DOMAINS else [], value=None) |
| ) |
| |
| def update_a_choices(dom_a, pol_b, curr_a): |
| choices = [p for p in get_policy_list(dom_a) if p != pol_b] |
| val = curr_a if curr_a in choices else None |
| return gr.update(choices=choices, value=val) |
|
|
| def update_b_choices(dom_b, pol_a, curr_b): |
| choices = [p for p in get_policy_list(dom_b) if p != pol_a] |
| val = curr_b if curr_b in choices else None |
| return gr.update(choices=choices, value=val) |
| |
| get_started_btn.click( |
| fn=lambda: (gr.update(visible=False), gr.update(visible=True)), |
| inputs=None, |
| outputs=[landing_page, login_box] |
| ) |
|
|
| login_btn.click( |
| fn=authenticate, |
| inputs=[email_box, consent_link_radio, consent_follow_radio], |
| outputs=[ |
| login_status, login_box, sector_box, app_box, user_tag_state, hf_df_state, |
| domain_a_dd, policy_a_dd, domain_b_dd, policy_b_dd, target_col_dd, context_col_dd, |
| consent_link_state, consent_follow_state |
| ] |
| ) |
|
|
| |
| proceed_btn.click( |
| fn=route_to_workspace, |
| inputs=[sector_cb], |
| outputs=[sector_box, app_box, domain_a_dd, policy_a_dd, domain_b_dd, policy_b_dd] |
| ) |
|
|
| back_to_sectors_btn.click( |
| fn=lambda: (gr.update(visible=True), gr.update(visible=False)), |
| inputs=None, |
| outputs=[sector_box, app_box] |
| ) |
|
|
| domain_a_dd.change(fn=update_a_choices, inputs=[domain_a_dd, policy_b_dd, policy_a_dd], outputs=policy_a_dd) |
| policy_b_dd.change(fn=update_a_choices, inputs=[domain_a_dd, policy_b_dd, policy_a_dd], outputs=policy_a_dd) |
|
|
| domain_b_dd.change(fn=update_b_choices, inputs=[domain_b_dd, policy_a_dd, policy_b_dd], outputs=policy_b_dd) |
| policy_a_dd.change(fn=update_b_choices, inputs=[domain_b_dd, policy_a_dd, policy_b_dd], outputs=policy_b_dd) |
|
|
| |
| def render_target_a(a_list, tasks_dict, idx, lang, user_tag, pol_a, pol_b, hf_df, progress=gr.Progress()): |
| updates = [] |
| |
| empty_row = [gr.update(visible=False), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), "", "", "", "", ""] |
| |
| if not a_list: |
| prog_txt = t_text("**Progress:** No unannotated items found.", lang) |
| updates.append(prog_txt) |
| for i in range(MAX_ROWS): updates.extend(empty_row) |
| return updates + [[]] |
| |
| if idx >= len(a_list): |
| prog_txt = t_text("**Progress:** Completed all items.", lang) |
| updates.append(prog_txt) |
| for i in range(MAX_ROWS): updates.extend(empty_row) |
| return updates + [[]] |
| |
| curr_a_eng = a_list[idx] |
| bs_to_eval_eng = tasks_dict[curr_a_eng] |
| |
| curr_a_display = t_text(curr_a_eng, lang) |
| bs_display = t_batch(bs_to_eval_eng, lang) |
| |
| prog_txt = t_text(f"**Progress:** Annotating Target A group {idx + 1} of {len(a_list)}", lang) |
| updates.append(prog_txt) |
| |
| drafts = load_drafts() |
| cache_key = f"{pol_a}|{pol_b}|{curr_a_eng}" |
| user_draft = drafts.get(user_tag, {}).get("rows", {}).get(cache_key, {}) |
| |
| user_saved_df = pd.DataFrame() |
| if not hf_df.empty: |
| temp_df = hf_df[ |
| (hf_df["AnnotatorUsername"] == user_tag) & |
| (hf_df["Policy_A_Name"] == pol_a) & |
| (hf_df["Policy_B_Name"] == pol_b) & |
| (hf_df["Target_A_Row"] == curr_a_eng) |
| ].copy() |
| |
| if not temp_df.empty: |
| temp_df['Timestamp'] = pd.to_datetime(temp_df['Timestamp']) |
| temp_df = temp_df.sort_values(by='Timestamp') |
| user_saved_df = temp_df.drop_duplicates(subset=["Target_B_Row"], keep="last") |
| |
| |
| if progress is not None: |
| progress(0.4, desc="Running background AI predictions...") |
| |
| preds = get_model_predictions(curr_a_eng, bs_to_eval_eng) |
| |
| |
| if progress is not None: |
| progress(0.8, desc="Rendering UI blocks...") |
| |
| for i in range(MAX_ROWS): |
| if i < len(bs_display): |
| p_radio, p_conf_md, p_inter_dd, p_m_coarse, p_m_drill, p_m_conf = preds[i] |
| b_val_eng = bs_to_eval_eng[i] |
| |
| cached_row = user_draft.get(b_val_eng) |
| saved_row = user_saved_df[user_saved_df["Target_B_Row"] == b_val_eng] if not user_saved_df.empty else pd.DataFrame() |
| |
| if cached_row: |
| set_radio = gr.update(value=cached_row.get("rel")) if cached_row.get("rel") else p_radio |
| set_inter = gr.update(value=cached_row.get("inter")) if cached_row.get("inter") else p_inter_dd |
| set_just = gr.update(value=cached_row.get("just", "")) |
| elif not saved_row.empty: |
| set_radio = gr.update(value=saved_row.iloc[-1]["Coherence_Label"]) |
| set_inter = gr.update(value=saved_row.iloc[-1]["Drill_Down_Label"]) |
| set_just = gr.update(value=saved_row.iloc[-1]["Justification"]) |
| else: |
| set_radio = p_radio |
| set_inter = p_inter_dd |
| set_just = gr.update(value="") |
|
|
| updates.extend([ |
| gr.update(visible=True), |
| gr.update(value=curr_a_display), |
| gr.update(value=bs_display[i]), |
| set_radio, p_conf_md, set_inter, set_just, |
| p_m_coarse, p_m_drill, p_m_conf, "", "" |
| ]) |
| else: |
| updates.extend(empty_row) |
| |
| |
| if progress is not None: |
| progress(1.0, desc="Done!") |
| |
| return updates + [bs_to_eval_eng] |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| def load_workspace(dom_a, pol_a, dom_b, pol_b, tar_col, ctx_col, hf_df, user_tag, lang, progress=gr.Progress()): |
| progress(0.1, desc="Validating selections...") |
| if not pol_a or not pol_b: |
| err = t_text("Error: Select both policies.", lang) |
| return [gr.update(value=err)] + [gr.skip()] * (14 + MAX_ROWS*12) |
| |
| if tar_col == ctx_col: |
| err = t_text("Error: Target and Context cannot be the same.", lang) |
| return [gr.update(value=err)] + [gr.skip()] * (14 + MAX_ROWS*12) |
| |
| progress(0.2, desc="Extracting policy structures...") |
| df_a = DOMAIN_MAP[dom_a] |
| df_b = DOMAIN_MAP[dom_b] |
| |
| sec_a = get_sector_for_policy(df_a, pol_a) |
| sec_b = get_sector_for_policy(df_b, pol_b) |
| meta_a_str = f"**Sector:** {sec_a} | **Policy:** {pol_a}" |
| meta_b_str = f"**Sector:** {sec_b} | **Policy:** {pol_b}" |
| |
| targets_a = get_unique_items(df_a, pol_a, tar_col) |
| targets_b = get_unique_items(df_b, pol_b, tar_col) |
| |
| user_df = hf_df[hf_df["AnnotatorUsername"] == user_tag] |
| mask = (user_df["Policy_A_Name"] == pol_a) & (user_df["Policy_B_Name"] == pol_b) |
| annotated_pairs = set(zip(user_df.loc[mask, "Target_A_Row"], user_df.loc[mask, "Target_B_Row"])) |
| |
| pending_tasks = {} |
| total_missing_pairs = 0 |
| |
| for a in targets_a: |
| missing_bs = [b for b in targets_b if (a, b) not in annotated_pairs] |
| if missing_bs: |
| pending_tasks[a] = missing_bs[:MAX_ROWS] |
| total_missing_pairs += len(pending_tasks[a]) |
| |
| target_a_list = list(pending_tasks.keys()) |
| |
| contexts_a = get_unique_items(df_a, pol_a, ctx_col) |
| contexts_b = get_unique_items(df_b, pol_b, ctx_col) |
| ctx_a_chunk_eng = "\n\n".join([f"• {c}" for c in contexts_a]) if contexts_a else "No context data available." |
| ctx_b_chunk_eng = "\n\n".join([f"• {c}" for c in contexts_b]) if contexts_b else "No context data available." |
| |
| ctx_a_display = t_text(ctx_a_chunk_eng, lang) |
| ctx_b_display = t_text(ctx_b_chunk_eng, lang) |
|
|
| rendered_updates = render_target_a(target_a_list, pending_tasks, 0, lang, user_tag, pol_a, pol_b, hf_df, progress) |
| prog = rendered_updates[0] |
| row_updates = rendered_updates[1:-1] |
| b_eng_list = rendered_updates[-1] |
| |
| status_msg = t_text(f"Data loaded. {total_missing_pairs} unannotated pairs remain across {len(target_a_list)} Target A groups.", lang) |
| |
| write_log("WORKSPACE_LOADED", f"User {user_tag} fetched {pol_a} vs {pol_b}.") |
| return [ |
| target_a_list, pending_tasks, 0, |
| ctx_a_chunk_eng, ctx_b_chunk_eng, b_eng_list, |
| prog, meta_a_str, ctx_a_display, |
| meta_b_str, ctx_b_display, status_msg, |
| gr.update(visible=len(target_a_list) > 0) |
| ] + row_updates |
|
|
| def save_action(idx, a_list, tasks_dict, ctx_a_chunk_eng, |
| ctx_b_chunk_eng, b_eng_list, dom_a, pol_a, |
| dom_b, pol_b, tar_col, ctx_col, user_tag, |
| session_id, c_link, c_follow, hf_df, lang, *row_data): |
| |
| if idx >= len(a_list): |
| return gr.update(value=t_text("End of list reached.", lang)), idx, hf_df |
| |
| current_a_eng = a_list[idx] |
| new_rows = [] |
| |
| |
| try: |
| tz = ZoneInfo("Africa/Nairobi") |
| except: |
| import pytz |
| tz = pytz.timezone("Africa/Nairobi") |
| |
| current_time = datetime.now(tz).strftime('%Y-%m-%d %H:%M:%S') |
| |
| for i in range(MAX_ROWS): |
| if i >= len(b_eng_list): break |
| b_val_eng = b_eng_list[i] |
| |
| rel = row_data[i*10 + 1] |
| inter = row_data[i*10 + 3] |
| just = row_data[i*10 + 4] |
| |
| model_coarse = row_data[i*10 + 5] |
| model_drill = row_data[i*10 + 6] |
| model_conf = row_data[i*10 + 7] |
| ai_just = row_data[i*10 + 8] |
| ig_json = row_data[i*10 + 9] |
| |
| has_rel = bool(rel) |
| has_inter = bool(inter) |
| has_just = bool(just and just.strip()) |
| |
| |
| if has_rel or has_inter or has_just: |
| if not (has_rel and has_inter and has_just): |
| raise gr.Error(f"Row {i+1} is incomplete! Please fill Class, Extended Class, and Justification, or clear the row to skip.") |
| |
| new_rows.append({ |
| "Domain_A": dom_a, |
| "Sector_A": get_sector_for_policy(DOMAIN_MAP[dom_a], pol_a), |
| "Policy_A_Name": pol_a, |
| "Domain_B": dom_b, |
| "Sector_B": get_sector_for_policy(DOMAIN_MAP[dom_b], pol_b), |
| "Policy_B_Name": pol_b, |
| "Target_Column": tar_col, |
| "Target_A_Row": current_a_eng, |
| "Target_B_Row": b_val_eng, |
| "Context_Column": ctx_col, |
| "Context_A_Chunk": ctx_a_chunk_eng, |
| "Context_B_Chunk": ctx_b_chunk_eng, |
| "Model_Coarse_Label": model_coarse, |
| "Model_Drill_Down_Label": model_drill, |
| "Model_Confidences": model_conf, |
| "AI_Justification": ai_just, |
| "IG_JSON": ig_json, |
| "Coherence_Label": rel, |
| "Drill_Down_Label": inter, |
| "Justification": just.strip(), |
| "AnnotatorUsername": user_tag, |
| "Timestamp": current_time, |
| "SessionID": session_id, |
| "Consent_Link_Email": c_link, |
| "Consent_Follow_Up": c_follow |
| }) |
| |
| if new_rows: |
| new_df = pd.DataFrame(new_rows) |
| hf_df = pd.concat([hf_df, new_df], ignore_index=True) |
| try: |
| csv_buffer = io.StringIO() |
| hf_df.to_csv(csv_buffer, index=False) |
| csv_bytes = csv_buffer.getvalue().encode('utf-8') |
| |
| write_log("DATA_SAVED", f"User {user_tag} successfully saved {len(new_rows)} completed rows to Hugging Face.") |
| api = HfApi() |
| api.upload_file( |
| path_or_fileobj=io.BytesIO(csv_bytes), path_in_repo=HF_CSV_FILE, |
| repo_id=HF_DATASET_REPO, token=HF_TOKEN, repo_type="dataset" |
| ) |
| log_msg = t_text(f"Successfully saved {len(new_rows)} annotations.", lang) |
| |
| |
| drafts = load_drafts() |
| cache_key = f"{pol_a}|{pol_b}|{current_a_eng}" |
| |
| |
| if user_tag in drafts and "rows" in drafts[user_tag] and cache_key in drafts[user_tag]["rows"]: |
| del drafts[user_tag]["rows"][cache_key] |
| with open(DRAFT_FILE, 'w') as f: |
| json.dump(drafts, f) |
| |
| except Exception as e: |
| log_msg = f"Error saving data: {e}" |
| else: |
| log_msg = t_text("No annotations filled. Skipped to next group.", lang) |
| |
| return gr.update(value=log_msg), idx + 1, hf_df |
|
|
| def skip_action(idx, lang): |
| write_log("TARGET_SKIPPED", f"User skipped group {idx + 1}") |
| return gr.update(value=t_text(f"Skipped group {idx + 1}.", lang)), idx + 1 |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| row_outputs = [] |
| row_inputs = [] |
| |
| |
| for container, a_txt, b, r, c_md, inter, j, m_co, m_dr, m_cf, m_ai_j, m_ig_j in eval_rows: |
| row_outputs.extend([container, a_txt, b, r, c_md, inter, j, m_co, m_dr, m_cf, m_ai_j, m_ig_j]) |
| row_inputs.extend([b, r, c_md, inter, j, m_co, m_dr, m_cf, m_ai_j, m_ig_j]) |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| def chat_with_ai(user_msg, history, ctx_a, ctx_b, a_list, idx, lang, user_tag, dom_a, pol_a, dom_b, pol_b): |
| if not user_msg: |
| yield "", history |
| return |
| |
| curr_a = a_list[idx] if a_list and idx < len(a_list) else "None" |
| |
| system_prompt = f"You are an AI policy assistant helping an annotator understand policy documents.\nContext A: {ctx_a}\nContext B: {ctx_b}\nActive Target A: {curr_a}\nAnswer the user's query clearly and concisely based on this context." |
| |
| messages = [{"role": "system", "content": system_prompt}] |
| messages.extend(history) |
| messages.append({"role": "user", "content": user_msg}) |
| |
| |
| history.append({"role": "user", "content": user_msg}) |
| history.append({"role": "assistant", "content": ""}) |
| yield "", history |
| |
| try: |
| res = llm_client.chat_completion(messages=messages, max_tokens=8000, temperature=0.1, stream=True) |
| partial_text = "" |
| |
| for chunk in res: |
| token = chunk.choices[0].delta.content or "" |
| partial_text += token |
| |
| |
| history[-1]["content"] = format_streaming_thoughts(partial_text, is_streaming=True) |
| yield "", history |
| |
| |
| if lang != "English": |
| partial_text = t_text(partial_text, lang) |
| |
| final_formatted = format_streaming_thoughts(partial_text, is_streaming=False) |
| history[-1]["content"] = final_formatted |
| yield "", history |
| |
| |
| log_chat_to_hf(user_tag, dom_a, pol_a, dom_b, pol_b, ctx_a, ctx_b, curr_a, user_msg, partial_text) |
|
|
| except Exception as e: |
| history[-1]["content"] += f"\n\nError: {str(e)}" |
| yield "", history |
|
|
| |
| chat_inputs = [chat_input, chatbot, ctx_a_eng_state, ctx_b_eng_state, target_a_list_state, current_index_state, lang_selector, user_tag_state, domain_a_dd, policy_a_dd, domain_b_dd, policy_b_dd] |
| chat_submit.click(fn=chat_with_ai, inputs=chat_inputs, outputs=[chat_input, chatbot]) |
| chat_input.submit(fn=chat_with_ai, inputs=chat_inputs, outputs=[chat_input, chatbot]) |
|
|
| lang_selector.change( |
| fn=handle_language_change, |
| inputs=[ |
| lang_selector, ctx_a_eng_state, ctx_b_eng_state, target_a_list_state, |
| pending_tasks_state, current_index_state, user_tag_state, policy_a_dd, |
| policy_b_dd, hf_df_state |
| ], |
| outputs=[ |
| main_title, main_desc, get_started_btn, login_title, login_disclaimer, login_btn, |
| sector_title, sector_cb, proceed_btn, app_definitions, interaction_acc, interaction_md, |
| data_acc, back_to_sectors_btn, src_a_title, src_b_title, load_btn, bulk_title, bulk_desc, |
| skip_btn, save_btn, workspace_info, footer_disclaimer, |
| display_context_a, display_context_b, progress_text |
| ] + row_outputs + [current_b_eng_list_state] |
| ) |
|
|
| load_btn.click( |
| fn=load_workspace, |
| inputs=[ |
| domain_a_dd, policy_a_dd, domain_b_dd, policy_b_dd, |
| target_col_dd, context_col_dd, hf_df_state, user_tag_state, lang_selector |
| ], |
| outputs=[ |
| target_a_list_state, pending_tasks_state, current_index_state, |
| ctx_a_eng_state, ctx_b_eng_state, current_b_eng_list_state, |
| progress_text, meta_a, display_context_a, |
| meta_b, display_context_b, status_box, workspace_box |
| ] + row_outputs |
| ) |
|
|
| save_btn.click( |
| fn=save_action, |
| inputs=[ |
| current_index_state, target_a_list_state, pending_tasks_state, |
| ctx_a_eng_state, ctx_b_eng_state, current_b_eng_list_state, |
| domain_a_dd, policy_a_dd, domain_b_dd, policy_b_dd, |
| target_col_dd, context_col_dd, user_tag_state, session_id_state, |
| consent_link_state, consent_follow_state, |
| hf_df_state, lang_selector |
| ] + row_inputs, |
| outputs=[status_box, current_index_state, hf_df_state] |
| ).then( |
| fn=render_target_a, |
| inputs=[ |
| target_a_list_state, pending_tasks_state, current_index_state, lang_selector, |
| user_tag_state, policy_a_dd, policy_b_dd, hf_df_state |
| ], |
| outputs=[progress_text] + row_outputs + [current_b_eng_list_state] |
| ) |
|
|
| skip_btn.click( |
| fn=skip_action, inputs=[current_index_state, lang_selector], outputs=[status_box, current_index_state] |
| ).then( |
| fn=render_target_a, |
| inputs=[ |
| target_a_list_state, pending_tasks_state, current_index_state, lang_selector, |
| user_tag_state, policy_a_dd, policy_b_dd, hf_df_state |
| ], |
| outputs=[progress_text] + row_outputs + [current_b_eng_list_state] |
| ) |
|
|
| |
|
|
| |
| demo.queue(default_concurrency_limit=40).launch(debug=True, show_error=True, ssr_mode=False) |
|
|
|
|
|
|