|
|
import gradio as gr |
|
|
import gspread |
|
|
from google.oauth2.service_account import Credentials |
|
|
import os |
|
|
from typing import Dict, List, Tuple |
|
|
import random |
|
|
import traceback |
|
|
import difflib |
|
|
|
|
|
|
|
|
# OAuth scopes requested for the service account (Sheets + Drive access).
SCOPES = ['https://www.googleapis.com/auth/spreadsheets', 'https://www.googleapis.com/auth/drive']
# Service-account key file, resolved relative to the working directory.
SERVICE_ACCOUNT_FILE = 'ext-collab-human-data-annotate-5fb589a03d60.json'
# Key of the spreadsheet that holds the source worksheet.
ORIGINAL_SHEET_ID = '1WbHKeZ0VKWWD8JdbH5KhsIpkMECRzwXjFwHue2lknPw'
# Title of the source worksheet; per-annotator copies are named "<title> - <annotator>".
SHEET_NAME = 'Gradio TEST - Tables v2.0 vs v2.1 (n=21)'
# Local directory that screenshot file names from the sheet are joined onto.
SCREENSHOTS_DIR = 'screenshots'

# Style overrides injected into the page through gr.HTML: horizontally
# scrollable table wrappers, auto table layout, and whitespace-preserving cells.
CUSTOM_CSS = """
<style>
.gradio-container .table-wrapper { overflow-x: auto !important; }
.gradio-container table { table-layout: auto !important; width: 100% !important; }
.gradio-container th, .gradio-container td { white-space: pre !important; padding: 4px !important; }
</style>
"""
|
|
|
|
|
|
|
|
def format_as_markdown_table(raw_text: str) -> str:
    """Convert pipe-delimited text into a Markdown table.

    Lines containing ``|`` become table rows; the first row of each table is
    followed by a generated ``|---|`` separator. Any other non-blank line is
    emitted as a block quote. Blank lines are dropped.

    Args:
        raw_text: Pipe-delimited text, possibly already a Markdown table.

    Returns:
        The Markdown string, or a "no data" block quote when ``raw_text`` is
        empty or whitespace-only.
    """
    if not raw_text or not raw_text.strip():
        return "> _No data to display._"

    separator_chars = {'-', ':'}
    chunks = []
    # Explicit state instead of searching the output for '---': quoted text
    # that happens to contain '---' must not suppress the header separator.
    in_table = False

    for line in raw_text.strip().split('\n'):
        row = line.strip()
        if '|' in row:
            # Outer pipes are delimiters, not empty cells; dropping them avoids
            # phantom empty first/last columns for input like "| a | b |".
            if row.startswith('|'):
                row = row[1:]
            if row.endswith('|'):
                row = row[:-1]
            cells = [cell.strip() for cell in row.split('|')]

            if not in_table:
                chunks.append(f"| {' | '.join(cells)} |\n|{'---|' * len(cells)}\n")
                in_table = True
            elif all(cell.count('-') >= 3 and set(cell) <= separator_chars for cell in cells):
                # The input already carried its own separator row; ours was
                # emitted above, so repeating it would render as a data row.
                continue
            else:
                chunks.append(f"| {' | '.join(cells)} |\n")
        elif row:
            chunks.append(f"\n> {row}\n")
            # A quoted line ends the current table; the next pipe row starts a
            # new one and needs its own separator.
            in_table = False

    return "".join(chunks)
|
|
|
|
|
def create_diff_html(base_text: str, new_text: str) -> str:
    """Render a line-by-line diff of two texts as an HTML fragment.

    Added lines get a green background, removed lines a red one, and unchanged
    lines are emitted plain. ``difflib.ndiff`` hint lines (``? ``) are skipped.

    Args:
        base_text: Reference text (left side of the diff).
        new_text: Text compared against the reference.

    Returns:
        An HTML ``<div>`` with the diff, or a short notice paragraph when
        either input is empty.
    """
    # Local import keeps this function self-contained without touching the
    # file-level import block.
    from html import escape

    if not base_text or not new_text:
        return "<p><i>Not enough data to create a diff.</i></p>"

    pieces = ["<div style='font-family: monospace; white-space: pre-wrap; line-height: 1.4; font-size: 0.9em;'>"]
    for line in difflib.ndiff(base_text.splitlines(), new_text.splitlines()):
        if line.startswith('? '):
            continue
        # The compared text comes from sheet cells; escape it so characters
        # such as '<' or '&' are displayed rather than parsed as markup.
        safe = escape(line, quote=False)
        if line.startswith('+ '):
            pieces.append(f"<span style='background-color: #e6ffed;'>{safe}</span>\n")
        elif line.startswith('- '):
            pieces.append(f"<span style='background-color: #ffeef0;'>{safe}</span>\n")
        else:
            pieces.append(f"{safe}\n")
    pieces.append("</div>")
    return "".join(pieces)
|
|
|
|
|
class AnnotationApp:
    """Shared state for the annotation tool plus its Google Sheets access.

    Holds the source spreadsheet/worksheet, the rows grouped by ``eval ID``,
    the set of claimed document ids, per-annotator sessions and per-annotator
    worksheet copies. When the Sheets connection cannot be established the
    instance runs in demo mode, in which sheet operations are no-ops.
    """

    def __init__(self):
        """Initialise empty state and try to connect to Google Sheets."""
        self.spreadsheet = None
        self.worksheet = None
        self.user_sheets = {}
        self.all_triplets = {}
        self.claimed_triplets = set()
        self.user_sessions = {}
        # Always present, so later lookups never hit a missing attribute even
        # if the sheet could not be read.
        self.column_indices = {}
        self.demo_mode = True
        self.init_google_sheets()

    def init_google_sheets(self):
        """Open the source worksheet and group its rows; fall back to demo mode.

        Any failure (missing key file, network, missing worksheet, ...) is
        reported on stdout and leaves ``demo_mode`` set to True.
        """
        try:
            creds = Credentials.from_service_account_file(SERVICE_ACCOUNT_FILE, scopes=SCOPES)
            gc = gspread.authorize(creds)
            self.spreadsheet = gc.open_by_key(ORIGINAL_SHEET_ID)
            self.worksheet = self.spreadsheet.worksheet(SHEET_NAME)
            self.group_data_into_triplets()
            self.demo_mode = False
            print("π Google Sheets initialized successfully!")
        except Exception as e:
            # Deliberate best-effort: the UI should still start without Sheets.
            print(f"β Error initializing Google Sheets: {e}")
            self.demo_mode = True

    def group_data_into_triplets(self):
        """Read every row of the worksheet and group rows by ``eval ID``.

        Populates ``column_indices`` (stripped header -> 0-based column) and
        ``all_triplets`` (eval ID -> list of row dicts). Each row dict carries
        a 1-based ``row_num`` matching its position in the sheet. Only groups
        with at least three rows are kept.
        """
        print("Grouping data...")
        all_values = self.worksheet.get_all_values()
        if not all_values:
            print("Sheet is empty.")
            return

        headers = [h.strip() for h in all_values[0]]
        self.column_indices = {h: i for i, h in enumerate(headers)}
        if 'eval ID' not in self.column_indices:
            print("ERROR: 'eval ID' column not found.")
            return

        grouped = {}
        # Sheet rows are 1-based and row 1 is the header, so data starts at 2.
        for row_num, values in enumerate(all_values[1:], start=2):
            row = dict(zip(headers, values))
            eval_id = row.get('eval ID')
            if eval_id:
                grouped.setdefault(eval_id, []).append({'row_num': row_num, **row})

        self.all_triplets = {doc_id: rows for doc_id, rows in grouped.items() if len(rows) >= 3}
        print(f"Found {len(self.all_triplets)} complete triplets.")

    def get_available_triplets(self) -> List[str]:
        """Return the document ids that have not been claimed yet."""
        return [doc_id for doc_id in self.all_triplets if doc_id not in self.claimed_triplets]

    def create_user_sheet(self, annotator_name: str) -> Tuple[bool, str]:
        """Open the annotator's worksheet copy, creating it if it is missing.

        Args:
            annotator_name: Name used both as the ``user_sheets`` key and as
                the suffix of the worksheet title.

        Returns:
            ``(True, message)`` on success (or in demo mode), otherwise
            ``(False, error_text)``.
        """
        if self.demo_mode:
            return True, "Demo mode"
        if not annotator_name or not annotator_name.strip():
            # A blank name would create a sheet titled "<title> - ".
            return False, "Please enter your name before starting a session."

        try:
            sheet_name = f"{self.worksheet.title} - {annotator_name}"
            try:
                worksheet = self.spreadsheet.worksheet(sheet_name)
                message = f"Resuming work: {sheet_name}"
            except gspread.exceptions.WorksheetNotFound:
                # Nested so that a failing duplicate_sheet is still caught by
                # the outer handler instead of escaping to the caller.
                worksheet = self.spreadsheet.duplicate_sheet(self.worksheet.id, new_sheet_name=sheet_name)
                message = f"Created new sheet: {sheet_name}"
        except Exception as e:
            return False, str(e)

        self.user_sheets[annotator_name] = worksheet
        return True, message

    def write_annotations_to_sheet(self, annotator_name: str, session: Dict):
        """Write the session's annotations into the annotator's worksheet.

        Per-parse evaluations go to the row of the parse shown under each UI
        letter; the overall comparison is written to every row of the triplet.
        Failures are reported on stdout and do not raise.

        Args:
            annotator_name: Key into ``user_sheets``.
            session: Session dict holding ``annotations``,
                ``randomized_parse_map`` and ``triplet_rows``.
        """
        if self.demo_mode:
            return
        try:
            user_sheet = self.user_sheets[annotator_name]
            annotations = session.get('annotations', {})
            cells_to_update = []

            if 'randomized_parse_map' not in session:
                print(f"β No randomized_parse_map found in session for {annotator_name}")
                return

            for ui_char, original_row in session['randomized_parse_map'].items():
                eval_data = annotations.get(ui_char)
                if not eval_data:
                    continue
                row_num = original_row.get('row_num')
                if row_num is None:
                    # Placeholder entries for missing parses have no sheet row;
                    # skipping them keeps the real rows from being lost.
                    continue
                cells_to_update.extend([
                    gspread.Cell(row_num, self.column_indices['annotator evaluation'] + 1, eval_data['evaluation']),
                    gspread.Cell(row_num, self.column_indices['annotator explanation'] + 1, eval_data['explanation']),
                ])

            if 'comparison' in annotations:
                comp_data = annotations['comparison']
                for original_row in session['triplet_rows']:
                    row_num = original_row['row_num']
                    cells_to_update.extend([
                        gspread.Cell(row_num, self.column_indices['which /parse version produced the best output?'] + 1, comp_data['best']),
                        gspread.Cell(row_num, self.column_indices['explanation of v2.0 vs v2.1'] + 1, comp_data['explanation']),
                    ])

            if cells_to_update:
                user_sheet.update_cells(cells_to_update, value_input_option='USER_ENTERED')
                # NOTE(review): this literal was split across two lines by a
                # mis-decoded check-mark emoji; restored as a single string.
                print(f"✅ Wrote {len(cells_to_update)} cells to the sheet.")
        except Exception as e:
            print(f"β FAILED TO WRITE TO SHEET: {e}")
|
|
|
|
|
|
|
|
# Single module-level instance shared by the Gradio callbacks in this file.
# Constructing it immediately attempts the Google Sheets connection.
app = AnnotationApp()
|
|
|
|
|
def on_setup(name):
    """Prepare the annotator's sheet and reveal the document picker.

    Returns a 3-tuple for (status textbox, triplet dropdown, start button).
    On failure the dropdown and button stay hidden and the status carries the
    error text.
    """
    ok, status = app.create_user_sheet(name)
    if ok:
        open_docs = app.get_available_triplets()
        summary = f"{status}. Found {len(open_docs)} triplets."
        return summary, gr.update(visible=True, choices=open_docs), gr.update(visible=True)
    return status, gr.update(visible=False), gr.update(visible=False)
|
|
|
|
|
def start_annotation(annotator_name: str, selected_doc_id: str):
    """Record the chosen document for this annotator and load it.

    Creates the annotator's session dict on first use, stores
    ``selected_doc_id`` as the current document, then delegates to
    ``load_next_triplet`` and returns its result unchanged.
    """
    user_session = app.user_sessions.setdefault(annotator_name, {})
    user_session['current_doc_id'] = selected_doc_id
    return load_next_triplet(annotator_name)
|
|
|
|
|
def load_next_triplet(annotator_name: str):
    """Claim a document for the annotator and build the UI payload for it.

    Uses the session's ``current_doc_id`` when it is still unclaimed,
    otherwise the first available document. The document's parses are
    shuffled and assigned to the UI letters A/B/C, and the mapping is stored
    in the session as ``randomized_parse_map``.

    Args:
        annotator_name: Key of the annotator's session in ``app.user_sessions``.

    Returns:
        A 15-item tuple: progress text, the view-data dict, the screenshot
        path (or None), four rendered views (gold, A, B, C), then eight
        values that reset the evaluation and comparison inputs. When no
        documents remain, a completion message, an empty dict and thirteen
        updates that hide the remaining components.
    """
    session = app.user_sessions.setdefault(annotator_name, {})
    doc_id = session.get('current_doc_id')
    available = app.get_available_triplets()

    if not doc_id or doc_id not in available:
        if not available:
            return ("π All triplets completed!", {}, *[gr.update(visible=False)] * 13)
        doc_id = available[0]

    app.claimed_triplets.add(doc_id)
    session['current_doc_id'] = doc_id
    triplet_rows = app.all_triplets[doc_id]
    session['triplet_rows'] = triplet_rows

    # None (not "") means "no image": an empty string would be handed to the
    # image component as if it were a file path.
    screenshot = None
    gold_render = ""
    for r in triplet_rows:
        if not screenshot and r.get('PDF: screenshot (link)'):
            fname = r.get('PDF: screenshot (link)', '').strip()
            fpath = os.path.join(SCREENSHOTS_DIR, fname)
            # isfile, not exists: a whitespace-only cell resolves to the
            # screenshots directory itself, which is not a displayable image.
            if fname and os.path.isfile(fpath):
                screenshot = fpath
        if not gold_render and r.get('gold render'):
            gold_render = format_as_markdown_table(r.get('gold render'))

    # Keyed by tool name, so rows that share a tool name collapse to one entry.
    parses = {r.get('tool name').strip(): r for r in triplet_rows if r.get('tool name')}
    parse_keys = list(parses.keys())
    # Shuffle so the A/B/C labels do not reveal which tool produced which parse.
    random.shuffle(parse_keys)

    if len(parse_keys) < 3:
        print(f"β Warning: Only {len(parse_keys)} parses found for {doc_id}, expected 3")

    # Pad with placeholder entries so the three UI slots are always filled.
    while len(parse_keys) < 3:
        placeholder_key = f'missing_{len(parse_keys)}'
        parse_keys.append(placeholder_key)
        parses[placeholder_key] = {'tool name': 'Missing', 'tool output': ''}

    session['randomized_parse_map'] = {
        'A': parses[parse_keys[0]],
        'B': parses[parse_keys[1]],
        'C': parses[parse_keys[2]],
    }
    parse_map = session['randomized_parse_map']

    ui_data = {
        'screenshot': screenshot,
        'gold_raw': gold_render,
        'parse_a_raw': format_as_markdown_table(parse_map['A'].get('tool output', '')),
        'parse_b_raw': format_as_markdown_table(parse_map['B'].get('tool output', '')),
        'parse_c_raw': format_as_markdown_table(parse_map['C'].get('tool output', '')),
    }

    progress = f"Annotating Document ID: {doc_id}"
    initial_views = update_view(ui_data, 'rendered')

    return (progress, ui_data, ui_data.get('screenshot'), *initial_views,
            gr.update(value=None), "", gr.update(value=None), "", gr.update(value=None), "",
            gr.update(value=None), "")
|
|
|
|
|
def update_view(current_data, view_type):
    """Build the four display values (gold, A, B, C) for a view mode.

    ``'rendered'`` returns the stored Markdown as-is, ``'raw'`` wraps each
    entry in a fenced ``markdown`` code block, and ``'diff'`` returns an empty
    gold pane followed by an HTML diff of each parse against the gold text.
    Any other ``view_type`` returns None.
    """
    field_names = ('gold_raw', 'parse_a_raw', 'parse_b_raw', 'parse_c_raw')

    if view_type == 'rendered':
        rendered = []
        for name in field_names:
            rendered.append(current_data.get(name, ''))
        return rendered

    if view_type == 'raw':
        fenced = []
        for name in field_names:
            fenced.append(f"```markdown\n{current_data.get(name, '')}\n```")
        return fenced

    if view_type == 'diff':
        reference = current_data.get('gold_raw', '')
        panes = [""]
        for name in field_names[1:]:
            panes.append(create_diff_html(reference, current_data.get(name)))
        return panes

    return None
|
|
|
|
|
def submit_annotations(annotator_name, eval_a, exp_a, eval_b, exp_b, eval_c, exp_c, best_choice, best_exp):
    """Validate and store one document's annotations, then load the next.

    Args:
        annotator_name: Key of the annotator's session.
        eval_a, eval_b, eval_c: Severity selected for each parse (may be empty).
        exp_a, exp_b, exp_c: Free-text explanation for each parse.
        best_choice: Radio label naming the best parse.
        best_exp: Free-text explanation of the overall choice.

    Returns:
        The 15-item tuple from ``load_next_triplet`` on success. On a
        validation error, the error text for the progress box followed by
        fourteen no-op updates so nothing else on the page changes.
    """
    def reject(message):
        # The second output is the view-data state. Returning {} there would
        # wipe the currently loaded document and blank the view toggles, so
        # every output after the message is left untouched instead.
        return (message, *[gr.update()] * 14)

    if not annotator_name or annotator_name not in app.user_sessions:
        print(f"β Invalid annotator name or session not found: {annotator_name}")
        return reject("Error: Session not found")

    if not best_choice:
        print("β No best choice selected")
        return reject("Error: Please select which parse is best")

    session = app.user_sessions[annotator_name]

    if 'randomized_parse_map' not in session:
        print(f"β No randomized_parse_map in session for {annotator_name}")
        return reject("Error: Session data corrupted, please reload")

    choice_map = {"Parse A is best": "A", "Parse B is best": "B", "Parse C is best": "C"}
    ui_char = choice_map.get(best_choice)

    if not ui_char or ui_char not in session['randomized_parse_map']:
        print(f"β Invalid choice: {best_choice} -> {ui_char}")
        return reject("Error: Invalid selection")

    # Translate the anonymised UI letter back to the tool that produced it.
    best_tool_name = session['randomized_parse_map'][ui_char].get('tool name', 'Unknown')

    session['annotations'] = {
        'A': {'evaluation': eval_a or 'Not specified', 'explanation': exp_a or ''},
        'B': {'evaluation': eval_b or 'Not specified', 'explanation': exp_b or ''},
        'C': {'evaluation': eval_c or 'Not specified', 'explanation': exp_c or ''},
        'comparison': {'best': best_tool_name, 'explanation': best_exp or ''},
    }

    app.write_annotations_to_sheet(annotator_name, session)
    return load_next_triplet(annotator_name)
|
|
|
|
|
|
|
|
# UI definition: a setup section (name entry + document picker) and an
# annotation section (screenshot, four text panes, evaluation inputs).
# NOTE(review): several labels contain mis-decoded emoji. The ones whose
# original is unambiguous are restored below; labels that only show "π" are
# left exactly as found because the original glyph cannot be determined.
with gr.Blocks(title="3-Way Parse Comparison", theme=gr.themes.Soft()) as demo:
    gr.HTML(CUSTOM_CSS)
    gr.Markdown("# π 3-Way Document Parse Comparison Tool")
    annotator_name_state = gr.State("")
    # Holds the view-data dict of the currently loaded document.
    current_data_state = gr.State({})

    with gr.Column(visible=True) as setup_section:
        name_input = gr.Textbox(label="Enter your name")
        setup_btn = gr.Button("Start Session", variant="primary")
        setup_message = gr.Textbox(label="Status", interactive=False)
        triplet_selector = gr.Dropdown(label="Available Document Triplets (Optional, will auto-load if empty)", visible=False)
        start_btn = gr.Button("Start Annotation", variant="primary", visible=False)

    with gr.Column(visible=False) as annotation_section:
        progress_display = gr.Textbox(label="Progress", interactive=False)
        gr.Markdown("### π Original Document")
        screenshot_display = gr.Image()
        gr.Markdown("---")
        with gr.Row():
            gr.Markdown("### Select View Type:")
            show_rendered_btn = gr.Button("🖼️ Rendered")
            show_raw_btn = gr.Button("π Raw Markdown")
            show_diff_btn = gr.Button("✨ Diffs vs. Gold")
        with gr.Row():
            with gr.Column():
                gr.Markdown("### π Gold Standard")
                gold_display = gr.Markdown()
            with gr.Column():
                gr.Markdown("### Parse A")
                parse_a_display = gr.Markdown()
            with gr.Column():
                gr.Markdown("### Parse B")
                parse_b_display = gr.Markdown()
            with gr.Column():
                gr.Markdown("### Parse C")
                parse_c_display = gr.Markdown()
        gr.Markdown("---")
        with gr.Row():
            with gr.Column(scale=2):
                with gr.Row():
                    with gr.Column():
                        eval_a = gr.Dropdown(label="Evaluate A", choices=["No issues", "Minor", "Severe"])
                        exp_a = gr.Textbox(label="Explanation A", lines=3)
                    with gr.Column():
                        eval_b = gr.Dropdown(label="Evaluate B", choices=["No issues", "Minor", "Severe"])
                        exp_b = gr.Textbox(label="Explanation B", lines=3)
                    with gr.Column():
                        eval_c = gr.Dropdown(label="Evaluate C", choices=["No issues", "Minor", "Severe"])
                        exp_c = gr.Textbox(label="Explanation C", lines=3)
            with gr.Column(scale=1):
                comparison_choice = gr.Radio(["Parse A is best", "Parse B is best", "Parse C is best"], label="Which parse is best overall?")
                comparison_explanation = gr.Textbox(label="Explain final choice", lines=4)
        # This literal was split across two lines by a mis-decoded check-mark
        # emoji; restored as a single string.
        # NOTE(review): the button's original nesting level could not be
        # recovered; it is placed directly under the annotation section.
        submit_btn = gr.Button("✅ Submit & Load Next", variant="primary")

    # Create/resume the annotator's sheet, then remember the entered name.
    setup_btn.click(on_setup, [name_input], [setup_message, triplet_selector, start_btn]).then(lambda name: name, [name_input], [annotator_name_state])

    # Order must match the 15-item tuples returned by the load/submit callbacks.
    outputs_on_start = [progress_display, current_data_state, screenshot_display,
                        gold_display, parse_a_display, parse_b_display, parse_c_display,
                        eval_a, exp_a, eval_b, exp_b, eval_c, exp_c,
                        comparison_choice, comparison_explanation]

    # Load the first document, then swap the setup section for the annotation one.
    start_btn.click(start_annotation, [annotator_name_state, triplet_selector], outputs_on_start).then(
        lambda: gr.update(visible=False), None, [setup_section]).then(
        lambda: gr.update(visible=True), None, [annotation_section])

    view_displays = [gold_display, parse_a_display, parse_b_display, parse_c_display]
    show_rendered_btn.click(lambda data: update_view(data, 'rendered'), [current_data_state], view_displays)
    show_raw_btn.click(lambda data: update_view(data, 'raw'), [current_data_state], view_displays)
    show_diff_btn.click(lambda data: update_view(data, 'diff'), [current_data_state], view_displays)

    submit_inputs = [annotator_name_state, eval_a, exp_a, eval_b, exp_b, eval_c, exp_c, comparison_choice, comparison_explanation]
    submit_btn.click(submit_annotations, submit_inputs, outputs_on_start)
|
|
|
|
|
# Start the Gradio server only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch(debug=True)