# DPO_ga / app.py
# Hugging Face Hub page residue kept for provenance:
#   "jmcinern's picture" / "Update app.py"
#   commit fac4afa (verified)
import gradio as gr
import pandas as pd
import json
import random
from datetime import datetime
from pathlib import Path
import os
from huggingface_hub import HfApi, hf_hub_download, create_repo
try:
    from huggingface_hub.utils import HfHubHTTPError
except ImportError:
    # For older versions of huggingface_hub
    # The import above failed, so define a local stand-in; this keeps the
    # `except HfHubHTTPError` clause in this file resolvable.
    class HfHubHTTPError(Exception):
        """Placeholder exception used when huggingface_hub does not export HfHubHTTPError."""
        pass
# --- Configuration ---
# Source data file containing instructions and responses.
# It is read line by line, each line being parsed as one JSON record.
TRANSLATED_FILE = "translated_IRT_ga.jsonl"
# Local and remote filename for annotations (same name is used on disk and in the Hub repo)
ANNOTATION_FILE = "DPO_annotations.csv"
# Hugging Face Hub details
HF_REPO_ID = "jmcinern/DPO_ga" # Your HF repo ID
# Read from the environment; None when the variable is unset, in which case
# upload_annotations() prints a warning and skips the push.
HF_TOKEN = os.getenv("HF_TOKEN")
# Deterministic sampling settings
# NUM_SAMPLES: how many records are kept after the seeded shuffle.
NUM_SAMPLES = 200
# RANDOM_SEED: seed for the shuffle, so every session sees the same subset.
RANDOM_SEED = 42
# --- UI Content ---
# Markdown text rendered on the consent page (passed to gr.Markdown in the layout).
CONSENT_MD = """
### Irish QA Pair Comparison (Master’s Thesis)
You are invited to take part in a study on Large Language Model Irish-language QA quality.
By continuing, you consent to the following:
- Your annotations are anonymised.
- The dataset (reference text + model outputs + your choices) will be released **open-source** for both research and commercial purposes.
- No personal data is collected. You may stop at any time.
- You will answer the following question:
#### Which answer, A or B, is better in terms of grammar, naturalness, and coherence?
- Only base your decision on this question and not other factors.
Please confirm consent, select your role, then press **Begin**.
"""
# --- Helper Functions ---
def load_master_samples() -> list:
    """Load the source records and return a deterministic random subset.

    Reads ``TRANSLATED_FILE`` as JSON Lines (one JSON document per line),
    shuffles the records with a ``random.Random`` seeded by ``RANDOM_SEED``
    and returns the first ``NUM_SAMPLES`` of them. Because the seed is fixed,
    the same file always yields the same subset in the same order.

    Returns:
        A list of parsed records, at most ``NUM_SAMPLES`` long.

    Raises:
        FileNotFoundError: if ``TRANSLATED_FILE`` does not exist.
        json.JSONDecodeError: if a non-blank line is not valid JSON.
    """
    source_path = Path(TRANSLATED_FILE)
    if not source_path.exists():
        raise FileNotFoundError(f"Source file not found: {TRANSLATED_FILE}")
    with source_path.open("r", encoding="utf-8") as f:
        # Blank lines (e.g. a trailing newline at end of file) carry no
        # record and would make json.loads fail, so they are skipped.
        data = [json.loads(line) for line in f if line.strip()]
    # Shuffle with a fixed seed to get a deterministic "random" subset
    rng = random.Random(RANDOM_SEED)
    rng.shuffle(data)
    return data[:NUM_SAMPLES]
def download_annotations() -> pd.DataFrame:
    """Fetch the annotation CSV from the Hub dataset repo.

    Returns:
        The CSV parsed into a DataFrame, or an empty DataFrame with the
        annotation columns when the Hub answers 404 for the file.

    Raises:
        HfHubHTTPError: for any Hub HTTP error other than a 404.
    """
    try:
        # Only the network call is guarded; parsing happens afterwards.
        local_path = hf_hub_download(
            repo_id=HF_REPO_ID,
            filename=ANNOTATION_FILE,
            repo_type="dataset",
            token=HF_TOKEN,
        )
    except HfHubHTTPError as e:
        # The exception may carry no response object (for instance the
        # local fallback class has none), so read the status defensively
        # instead of failing with AttributeError and hiding the real error.
        response = getattr(e, "response", None)
        status_code = getattr(response, "status_code", None)
        # If the file doesn't exist on the Hub (404), it's the first run.
        if status_code == 404:
            print("No remote annotation file found. Creating a new one.")
            # Define the schema for the new CSV file, including annotator_type
            return pd.DataFrame(
                columns=["hash", "annotator_type", "choice", "preferred_response", "timestamp"]
            )
        raise  # Re-raise other HTTP errors
    print(f"Downloaded existing annotations from {HF_REPO_ID}")
    return pd.read_csv(local_path)
def upload_annotations(df: pd.DataFrame):
    """Write ``df`` to the local CSV file and push that file to the Hub.

    When ``HF_TOKEN`` is not set, nothing is written or uploaded; a warning
    is printed instead.
    """
    if HF_TOKEN:
        # Persist to disk first: the upload below sends this file.
        df.to_csv(ANNOTATION_FILE, index=False)

        hub_client = HfApi()
        # Make sure the dataset repo exists before pushing into it.
        create_repo(
            repo_id=HF_REPO_ID,
            repo_type="dataset",
            exist_ok=True,
            token=HF_TOKEN,
        )
        hub_client.upload_file(
            path_or_fileobj=ANNOTATION_FILE,
            path_in_repo=ANNOTATION_FILE,
            repo_id=HF_REPO_ID,
            repo_type="dataset",
            token=HF_TOKEN,
            commit_message="Append new DPO annotation",
        )
        print(f"Successfully uploaded updated annotations to {HF_REPO_ID}")
    else:
        print("WARNING: No HF_TOKEN found. Skipping upload.")
# --- Gradio Core Logic ---
def prepare_tasks():
    """Build the list of tasks that still need an annotation.

    Loads the master samples, removes those whose hash already appears in the
    downloaded annotations, and for each remaining sample randomly assigns
    its two responses to the "A" and "B" slots.

    Returns:
        A list of dicts with keys ``hash``, ``instruction``, ``response_A``,
        ``response_B`` and ``shuffle_map`` (slot letter -> original key).
    """

    def _as_task(sample):
        # Randomise which response is shown as A and which as B so that
        # position does not bias the annotator.
        pair = [('response1', sample['response1']), ('response2', sample['response2'])]
        random.shuffle(pair)
        (key_a, text_a), (key_b, text_b) = pair
        return {
            "hash": sample['hash'],
            "instruction": sample['instruction'],
            "response_A": text_a,
            "response_B": text_b,
            # Remember which original response sits behind each letter.
            "shuffle_map": {'A': key_a, 'B': key_b},
        }

    samples = load_master_samples()
    existing = download_annotations()
    done = set(existing['hash'].unique())
    pending = [s for s in samples if s['hash'] not in done]
    return [_as_task(s) for s in pending]
def start_session(annotator_type):
    """Handle the 'Begin' button: build the task list and show the first task.

    Returns a dict of component -> update. With no tasks left it switches
    straight to the completion page; otherwise it shows the task page filled
    with the first task and stores the session state.
    """
    tasks = prepare_tasks()
    has_tasks = bool(tasks)

    # Page visibility and session state are set in both outcomes.
    updates = {
        consent_group: gr.update(visible=False),
        task_group: gr.update(visible=has_tasks),
        done_group: gr.update(visible=not has_tasks),
        state_tasks: tasks if has_tasks else [],
        state_task_index: 0,
        state_annotator_type: annotator_type if has_tasks else "",
    }
    if not has_tasks:
        # All samples are already annotated
        return updates

    opening = tasks[0]
    updates[progress_counter] = gr.update(value=f"Progress: 1 / {len(tasks)}")
    updates[instruction_box] = gr.update(value=opening['instruction'])
    updates[response_a_box] = gr.update(value=opening['response_A'])
    updates[response_b_box] = gr.update(value=opening['response_B'])
    return updates
def record_choice(tasks, current_index, annotator_type, choice):
    """Record the annotator's choice, upload it, and advance to the next task.

    Args:
        tasks: the session's task list as built by ``prepare_tasks``.
        current_index: index into ``tasks`` of the task being answered.
        annotator_type: role selected on the consent page.
        choice: ``'A'`` or ``'B'``, the slot the annotator preferred.

    Returns:
        A dict of component -> update: either the next task's content and
        progress text, or the switch to the completion page when no task is
        left.
    """
    finished = {
        task_group: gr.update(visible=False),
        done_group: gr.update(visible=True),
    }
    # A stale or repeated click can arrive with no task left to answer;
    # show the completion page instead of failing on tasks[current_index].
    if not tasks or current_index >= len(tasks):
        return finished

    # 1. Get current task and determine which original response was preferred
    current_task = tasks[current_index]
    preferred_response_key = current_task['shuffle_map'][choice]  # 'response1' or 'response2'

    # 2. Create a new annotation row, including the annotator_type
    new_annotation = {
        "hash": current_task['hash'],
        "annotator_type": annotator_type,
        "choice": choice,  # 'A' or 'B'
        "preferred_response": preferred_response_key,
        # Naive UTC timestamp in ISO format; left as-is so the column
        # format does not change.
        "timestamp": datetime.utcnow().isoformat(),
    }

    # 3. Load existing annotations, append, and upload
    # NOTE(review): this is a download / append / re-upload of the whole
    # file; two sessions saving at the same moment could overwrite each
    # other's row. Confirm whether concurrent annotators are expected.
    annotations_df = download_annotations()
    new_df = pd.concat([annotations_df, pd.DataFrame([new_annotation])], ignore_index=True)
    upload_annotations(new_df)

    # 4. Move to the next task
    next_index = current_index + 1
    if next_index >= len(tasks):
        # All tasks for this session are done. Store the advanced index so
        # that a repeated click is caught by the guard above and does not
        # record the final task a second time.
        return {state_task_index: next_index, **finished}

    next_task = tasks[next_index]
    progress_str = f"Progress: {next_index + 1} / {len(tasks)}"
    return {
        state_task_index: next_index,
        progress_counter: gr.update(value=progress_str),
        instruction_box: gr.update(value=next_task['instruction']),
        response_a_box: gr.update(value=next_task['response_A']),
        response_b_box: gr.update(value=next_task['response_B']),
    }
def update_begin_button_status(consent_given, role_selected):
    """Return an update making 'Begin' clickable once consent is ticked and a role is chosen."""
    ready = consent_given and role_selected is not None
    return gr.update(interactive=ready)
# --- Gradio UI Layout ---
with gr.Blocks(theme=gr.themes.Soft(), title="DPO Annotation") as demo:
    # The app is three "pages" implemented as groups whose visibility is
    # toggled by the handlers: consent -> annotation task -> completion.
    # NOTE(review): the nesting of the layout containers below was
    # reconstructed from a copy of this file that had lost its indentation;
    # confirm it against the deployed layout.

    # State management
    # Per-session values handed to and returned from the handlers.
    state_tasks = gr.State([])  # task list for this session
    state_task_index = gr.State(0)  # index of the task currently shown
    state_annotator_type = gr.State("")  # role chosen on the consent page

    # Page 1: Consent
    with gr.Group(visible=True) as consent_group:
        gr.Markdown(CONSENT_MD)
        with gr.Row():
            consent_checkbox = gr.Checkbox(label="I consent to the terms above")
            annotator_type_dropdown = gr.Dropdown(["Tester", "Native"], label="Select Your Role")
        # Starts disabled; update_begin_button_status enables it.
        begin_btn = gr.Button("Begin", interactive=False)

    # Page 2: Annotation Task
    with gr.Group(visible=False) as task_group:
        progress_counter = gr.Markdown("Progress: 0 / 0", elem_id="progress_counter")
        with gr.Column():
            instruction_box = gr.Textbox(label="Instruction", interactive=False, lines=3)
            with gr.Row():
                response_a_box = gr.Textbox(label="Answer A", interactive=False, lines=8)
                response_b_box = gr.Textbox(label="Answer B", interactive=False, lines=8)
            with gr.Row():
                choose_a_btn = gr.Button("A is Better", variant="primary")
                choose_b_btn = gr.Button("B is Better", variant="primary")

    # Page 3: Completion Message
    with gr.Group(visible=False) as done_group:
        gr.Markdown("## ✅ Thank You!\n\nAll available samples have been annotated. Your contribution is greatly appreciated.")

    # --- Event Handlers ---
    # Enable 'Begin' button only when consent is checked AND a role is selected
    # Both inputs trigger the same check, so either one changing re-evaluates it.
    consent_checkbox.change(
        fn=update_begin_button_status,
        inputs=[consent_checkbox, annotator_type_dropdown],
        outputs=begin_btn
    )
    annotator_type_dropdown.change(
        fn=update_begin_button_status,
        inputs=[consent_checkbox, annotator_type_dropdown],
        outputs=begin_btn
    )
    # Start the session when 'Begin' is clicked
    # start_session returns a dict keyed by these components.
    begin_btn.click(
        fn=start_session,
        inputs=[annotator_type_dropdown],
        outputs=[
            consent_group, task_group, done_group,
            state_tasks, state_task_index, state_annotator_type,
            progress_counter, instruction_box, response_a_box, response_b_box
        ]
    )
    # Handle choice A
    # The constant gr.State('A') supplies record_choice's `choice` argument.
    choose_a_btn.click(
        fn=record_choice,
        inputs=[state_tasks, state_task_index, state_annotator_type, gr.State('A')],
        outputs=[
            state_task_index, progress_counter,
            instruction_box, response_a_box, response_b_box,
            task_group, done_group
        ]
    )
    # Handle choice B
    # Same wiring as A, with the constant 'B' as the choice.
    choose_b_btn.click(
        fn=record_choice,
        inputs=[state_tasks, state_task_index, state_annotator_type, gr.State('B')],
        outputs=[
            state_task_index, progress_counter,
            instruction_box, response_a_box, response_b_box,
            task_group, done_group
        ]
    )
if __name__ == "__main__":
    # Launch only when the source data is present; otherwise say why not.
    if Path(TRANSLATED_FILE).exists():
        demo.launch()
    else:
        print(f"FATAL: Source data file '{TRANSLATED_FILE}' not found.")
        print("Please ensure the file is in the correct directory before running.")