# File size: 16,149 Bytes
# 80e58bc
import gradio as gr
import gspread
from google.oauth2.service_account import Credentials
import os
from typing import Dict, List, Tuple
import random
import traceback
import difflib
# --- Configuration ---
# OAuth scopes requested when the service-account credentials are loaded.
SCOPES = ['https://www.googleapis.com/auth/spreadsheets', 'https://www.googleapis.com/auth/drive']
# Service-account key file passed to Credentials.from_service_account_file().
SERVICE_ACCOUNT_FILE = 'ext-collab-human-data-annotate-5fb589a03d60.json'
# Key of the spreadsheet opened with open_by_key().
ORIGINAL_SHEET_ID = '1WbHKeZ0VKWWD8JdbH5KhsIpkMECRzwXjFwHue2lknPw'
# Title of the worksheet whose rows are grouped by 'eval ID' and annotated.
SHEET_NAME = 'Gradio TEST - Tables v2.0 vs v2.1 (n=21)'
# Directory joined with the file name found in the 'PDF: screenshot (link)' column.
SCREENSHOTS_DIR = 'screenshots'
# --- Custom CSS for better table rendering ---
# Injected into the page with gr.HTML(CUSTOM_CSS) when the UI is built.
CUSTOM_CSS = """
<style>
.gradio-container .table-wrapper { overflow-x: auto !important; }
.gradio-container table { table-layout: auto !important; width: 100% !important; }
.gradio-container th, .gradio-container td { white-space: pre !important; padding: 4px !important; }
</style>
"""
# --- Helper Functions ---
def format_as_markdown_table(raw_text: str) -> str:
    """Convert pipe-delimited text into a Markdown table with quoted prose.

    Every line containing ``|`` becomes a table row whose cells are the
    stripped ``|``-separated fields. The first such line is followed by a
    ``|---|`` separator row so it renders as the table header. Non-empty
    lines without ``|`` are emitted as block quotes; blank lines are dropped.

    Args:
        raw_text: The raw text to convert. May be empty or ``None``.

    Returns:
        The Markdown string, or a placeholder quote when there is no input.
    """
    if not raw_text or not raw_text.strip():
        return "> _No data to display._"

    pieces = []
    # Track the header explicitly. Searching the accumulated output for
    # '---' breaks as soon as earlier prose happens to contain dashes.
    header_emitted = False
    for line in raw_text.strip().split('\n'):
        if '|' in line:
            cells = [cell.strip() for cell in line.split('|')]
            pieces.append(f"| {' | '.join(cells)} |\n")
            if not header_emitted:
                pieces.append(f"|{'---|' * len(cells)}\n")
                header_emitted = True
        elif line.strip():
            pieces.append(f"\n> {line.strip()}\n")
    return "".join(pieces)
def create_diff_html(base_text: str, new_text: str) -> str:
    """Render a line-by-line diff of two texts as an HTML fragment.

    The texts are compared with ``difflib.ndiff``. Added lines are shown on
    a green background, removed lines on a red background, and unchanged
    lines as plain text. The ``? `` hint lines produced by ``ndiff`` are
    omitted.

    Args:
        base_text: The reference text.
        new_text: The text compared against the reference.

    Returns:
        An HTML ``<div>`` containing the diff, or a short notice when
        either input is empty.
    """
    # Local import keeps this block self-contained; the module-level imports
    # do not include `html`.
    from html import escape

    if not base_text or not new_text:
        return "<p><i>Not enough data to create a diff.</i></p>"

    backgrounds = {'+ ': '#e6ffed', '- ': '#ffeef0'}
    parts = ["<div style='font-family: monospace; white-space: pre-wrap; line-height: 1.4; font-size: 0.9em;'>"]
    for line in difflib.ndiff(base_text.splitlines(), new_text.splitlines()):
        marker = line[:2]
        if marker == '? ':
            continue
        # The compared text is data, not markup: escape it so characters such
        # as '<' and '&' are displayed instead of being parsed as HTML.
        text = escape(line, quote=False)
        colour = backgrounds.get(marker)
        if colour:
            parts.append(f"<span style='background-color: {colour};'>{text}</span>\n")
        else:
            parts.append(f"{text}\n")
    parts.append("</div>")
    return "".join(parts)
class AnnotationApp:
    """Application state shared by all Gradio handlers.

    Holds the Google Sheets handles, the rows grouped by ``'eval ID'``, the
    set of document ids already handed out, and one session dict per
    annotator. When the Sheets connection cannot be established the app
    stays in ``demo_mode`` and every sheet operation becomes a no-op.
    """

    def __init__(self):
        """Set up empty state and try to connect to Google Sheets."""
        self.spreadsheet = None
        self.worksheet = None
        self.user_sheets = {}
        self.all_triplets = {}
        self.claimed_triplets = set()
        self.user_sessions = {}
        # Defined up front so later lookups never hit a missing attribute when
        # grouping returned early (empty sheet or no 'eval ID' column).
        self.column_indices = {}
        self.demo_mode = True
        self.init_google_sheets()

    def init_google_sheets(self):
        """Open the configured worksheet and group its rows.

        Any failure is reported on stdout and leaves the app in demo mode.
        """
        try:
            creds = Credentials.from_service_account_file(SERVICE_ACCOUNT_FILE, scopes=SCOPES)
            gc = gspread.authorize(creds)
            self.spreadsheet = gc.open_by_key(ORIGINAL_SHEET_ID)
            self.worksheet = self.spreadsheet.worksheet(SHEET_NAME)
            self.group_data_into_triplets()
            self.demo_mode = False
            print("π Google Sheets initialized successfully!")
        except Exception as e:
            # Deliberate best-effort: the UI still starts without a sheet.
            print(f"β Error initializing Google Sheets: {e}")
            self.demo_mode = True

    def group_data_into_triplets(self):
        """Group worksheet rows by ``'eval ID'`` into ``self.all_triplets``.

        Also records ``self.column_indices`` (stripped header -> 0-based
        column index). Each stored row is the header->value dict of that row
        plus a ``'row_num'`` key holding its 1-based sheet row number. Only
        ids with at least three rows are kept.
        """
        print("Grouping data...")
        all_values = self.worksheet.get_all_values()
        if not all_values:
            print("Sheet is empty.")
            return

        headers = [h.strip() for h in all_values[0]]
        self.column_indices = {h: i for i, h in enumerate(headers)}
        if 'eval ID' not in self.column_indices:
            print("ERROR: 'eval ID' column not found.")
            return

        grouped = {}
        # Sheet rows are 1-based and row 1 is the header, so data starts at 2.
        for row_num, values in enumerate(all_values[1:], start=2):
            record = dict(zip(headers, values))
            eval_id = record.get('eval ID')
            if eval_id:
                grouped.setdefault(eval_id, []).append({'row_num': row_num, **record})

        self.all_triplets = {doc_id: rows for doc_id, rows in grouped.items() if len(rows) >= 3}
        print(f"Found {len(self.all_triplets)} complete triplets.")

    def get_available_triplets(self) -> List[str]:
        """Return the document ids that have not been claimed yet."""
        return [doc_id for doc_id in self.all_triplets if doc_id not in self.claimed_triplets]

    def create_user_sheet(self, annotator_name: str) -> Tuple[bool, str]:
        """Open the annotator's worksheet, duplicating the source if needed.

        Args:
            annotator_name: Name appended to the source worksheet title to
                form the annotator's worksheet title.

        Returns:
            ``(True, message)`` on success (or in demo mode), otherwise
            ``(False, error_text)``.
        """
        if self.demo_mode:
            return True, "Demo mode"
        try:
            sheet_name = f"{self.worksheet.title} - {annotator_name}"
            try:
                worksheet = self.spreadsheet.worksheet(sheet_name)
                message = f"Resuming work: {sheet_name}"
            except gspread.exceptions.WorksheetNotFound:
                # Nested inside the outer try so that a failure while
                # duplicating is reported as (False, ...) instead of escaping.
                worksheet = self.spreadsheet.duplicate_sheet(self.worksheet.id, new_sheet_name=sheet_name)
                message = f"Created new sheet: {sheet_name}"
        except Exception as e:
            return False, str(e)
        self.user_sheets[annotator_name] = worksheet
        return True, message

    def write_annotations_to_sheet(self, annotator_name: str, session: Dict):
        """Write the session's annotations into the annotator's worksheet.

        Per-parse evaluations go to the row of the parse shown under each UI
        letter; the overall comparison is written to every row of the
        triplet. All cells are sent in one ``update_cells`` call. Failures
        are reported on stdout and never raised. Does nothing in demo mode.

        Args:
            annotator_name: Key into ``self.user_sheets``.
            session: Session dict with ``'randomized_parse_map'``,
                ``'triplet_rows'`` and optionally ``'annotations'``.
        """
        if self.demo_mode:
            return
        try:
            user_sheet = self.user_sheets[annotator_name]
            annotations = session.get('annotations', {})
            if 'randomized_parse_map' not in session:
                print(f"β No randomized_parse_map found in session for {annotator_name}")
                return

            cells_to_update = []
            for ui_char, original_row in session['randomized_parse_map'].items():
                eval_data = annotations.get(ui_char)
                # Placeholder entries for missing parses have no sheet row;
                # skip them rather than letting a KeyError abort the write.
                row_num = original_row.get('row_num')
                if not eval_data or row_num is None:
                    continue
                cells_to_update.extend([
                    gspread.Cell(row_num, self.column_indices['annotator evaluation'] + 1, eval_data['evaluation']),
                    gspread.Cell(row_num, self.column_indices['annotator explanation'] + 1, eval_data['explanation']),
                ])

            if 'comparison' in annotations:
                comp_data = annotations['comparison']
                for original_row in session['triplet_rows']:
                    row_num = original_row['row_num']
                    cells_to_update.extend([
                        gspread.Cell(row_num, self.column_indices['which /parse version produced the best output?'] + 1, comp_data['best']),
                        gspread.Cell(row_num, self.column_indices['explanation of v2.0 vs v2.1'] + 1, comp_data['explanation']),
                    ])

            if cells_to_update:
                user_sheet.update_cells(cells_to_update, value_input_option='USER_ENTERED')
                print(f"✅ Wrote {len(cells_to_update)} cells to the sheet.")
        except Exception as e:
            print(f"β FAILED TO WRITE TO SHEET: {e}")
# --- Backend Logic ---
# Single shared instance used by every handler below. Constructing it calls
# init_google_sheets(), so the Sheets connection is attempted at import time.
app = AnnotationApp()
def on_setup(name):
    """Prepare the annotator's worksheet and reveal the triplet picker.

    Args:
        name: The annotator name typed into the setup form.

    Returns:
        A 3-tuple for (status message, triplet dropdown, start button). On
        failure, or when the name is blank, both controls stay hidden.
    """
    # A blank name would otherwise create a worksheet titled "<title> - ".
    if not name or not name.strip():
        return ("Please enter your name before starting a session.",
                gr.update(visible=False), gr.update(visible=False))

    success, message = app.create_user_sheet(name)
    if not success:
        return message, gr.update(visible=False), gr.update(visible=False)

    triplets = app.get_available_triplets()
    return (f"{message}. Found {len(triplets)} triplets.",
            gr.update(visible=True, choices=triplets),
            gr.update(visible=True))
def start_annotation(annotator_name: str, selected_doc_id: str):
    """Record the chosen document for the annotator and load it.

    Creates the annotator's session dict on first use, stores
    ``selected_doc_id`` under ``'current_doc_id'`` and returns whatever
    ``load_next_triplet`` returns for that annotator.
    """
    session = app.user_sessions.setdefault(annotator_name, {})
    session['current_doc_id'] = selected_doc_id
    return load_next_triplet(annotator_name)
def load_next_triplet(annotator_name: str):
    """Claim a document for the annotator and build the values to display.

    Uses the session's ``'current_doc_id'`` when it is still available,
    otherwise the first available document. The chosen document is added to
    ``app.claimed_triplets``. Its parses are shuffled and assigned to the UI
    letters A/B/C; the mapping is stored in the session under
    ``'randomized_parse_map'``.

    Args:
        annotator_name: Key of the annotator's session in ``app.user_sessions``.

    Returns:
        A 15-item tuple matching ``outputs_on_start``: progress text, the UI
        data dict, the screenshot path (or ``None``), four display texts and
        eight reset values for the evaluation inputs. When nothing is left to
        annotate: a completion message, ``{}`` and 13 hide updates.
    """
    session = app.user_sessions.setdefault(annotator_name, {})
    doc_id = session.get('current_doc_id')
    available = app.get_available_triplets()
    if not doc_id or doc_id not in available:
        if not available:
            return ("π All triplets completed!", {}, *[gr.update(visible=False)] * 13)
        doc_id = available[0]
    app.claimed_triplets.add(doc_id)
    session['current_doc_id'] = doc_id

    triplet_rows = app.all_triplets[doc_id]
    session['triplet_rows'] = triplet_rows

    # Take the first row that yields an existing screenshot file and the first
    # row that has a gold render.
    screenshot, gold_render = "", ""
    for r in triplet_rows:
        if not screenshot and r.get('PDF: screenshot (link)'):
            fname = r.get('PDF: screenshot (link)', '').strip()
            fpath = os.path.join(SCREENSHOTS_DIR, fname)
            if os.path.exists(fpath):
                screenshot = fpath
        if not gold_render and r.get('gold render'):
            gold_render = format_as_markdown_table(r.get('gold render'))

    parses = {r.get('tool name').strip(): r for r in triplet_rows if r.get('tool name')}
    parse_keys = list(parses.keys())
    # Shuffle so the UI letters do not reveal which tool produced which parse.
    random.shuffle(parse_keys)

    if len(parse_keys) < 3:
        print(f"β Warning: Only {len(parse_keys)} parses found for {doc_id}, expected 3")
        # Pad with placeholder entries so A, B and C always exist.
        while len(parse_keys) < 3:
            placeholder = f'missing_{len(parse_keys)}'
            parse_keys.append(placeholder)
            parses[placeholder] = {'tool name': 'Missing', 'tool output': ''}

    # zip() stops after three letters, so any extra parses are ignored.
    session['randomized_parse_map'] = {
        label: parses[key] for label, key in zip('ABC', parse_keys)
    }

    ui_data = {'screenshot': screenshot, 'gold_raw': gold_render}
    for label, row in session['randomized_parse_map'].items():
        ui_data[f'parse_{label.lower()}_raw'] = format_as_markdown_table(row.get('tool output', ''))

    progress = f"Annotating Document ID: {doc_id}"
    initial_views = update_view(ui_data, 'rendered')
    # An empty string is not a usable image path; pass None so the image
    # component is cleared when no screenshot file was found.
    return (progress, ui_data, screenshot or None, *initial_views,
            gr.update(value=None), "", gr.update(value=None), "", gr.update(value=None), "",
            gr.update(value=None), "")
def update_view(current_data, view_type):
    """Return the four display texts (gold, A, B, C) for a view mode.

    ``'rendered'`` returns the stored Markdown as is, ``'raw'`` wraps each
    text in a fenced ``markdown`` code block, and ``'diff'`` returns an empty
    gold slot followed by an HTML diff of each parse against the gold text.
    Any other ``view_type`` returns ``None``.
    """
    field_names = ('gold_raw', 'parse_a_raw', 'parse_b_raw', 'parse_c_raw')

    if view_type == 'diff':
        reference = current_data.get('gold_raw', '')
        diffs = [create_diff_html(reference, current_data.get(name))
                 for name in field_names[1:]]
        return ["", *diffs]

    if view_type in ('rendered', 'raw'):
        texts = [current_data.get(name, '') for name in field_names]
        if view_type == 'rendered':
            return texts
        return [f"```markdown\n{text}\n```" for text in texts]

    return None
def submit_annotations(annotator_name, eval_a, exp_a, eval_b, exp_b, eval_c, exp_c, best_choice, best_exp):
    """Validate and store one set of annotations, then load the next document.

    Args:
        annotator_name: Key of the annotator's session in ``app.user_sessions``.
        eval_a, eval_b, eval_c: Evaluation chosen for each displayed parse.
        exp_a, exp_b, exp_c: Free-text explanation for each displayed parse.
        best_choice: Radio label naming the best parse.
        best_exp: Free-text explanation of the overall choice.

    Returns:
        The result of ``load_next_triplet`` on success. On a validation
        error: a 15-item tuple whose first item is the error message and
        whose remaining items are no-op updates.
    """
    def reject(message):
        # Only the progress text changes. The data state and the 13 visible
        # components get no-op updates so the loaded document stays usable
        # (returning {} for the state would wipe it).
        return (message, *([gr.update()] * 14))

    if not annotator_name or annotator_name not in app.user_sessions:
        print(f"β Invalid annotator name or session not found: {annotator_name}")
        return reject("Error: Session not found")
    if not best_choice:
        print("β No best choice selected")
        return reject("Error: Please select which parse is best")

    session = app.user_sessions[annotator_name]
    if 'randomized_parse_map' not in session:
        print(f"β No randomized_parse_map in session for {annotator_name}")
        return reject("Error: Session data corrupted, please reload")

    # Translate the radio label back to the UI letter (A, B or C).
    choice_map = {"Parse A is best": "A", "Parse B is best": "B", "Parse C is best": "C"}
    ui_char = choice_map.get(best_choice)
    if not ui_char or ui_char not in session['randomized_parse_map']:
        print(f"β Invalid choice: {best_choice} -> {ui_char}")
        return reject("Error: Invalid selection")

    # Store the tool name behind the chosen letter, not the letter itself.
    best_tool_name = session['randomized_parse_map'][ui_char].get('tool name', 'Unknown')
    session['annotations'] = {
        'A': {'evaluation': eval_a or 'Not specified', 'explanation': exp_a or ''},
        'B': {'evaluation': eval_b or 'Not specified', 'explanation': exp_b or ''},
        'C': {'evaluation': eval_c or 'Not specified', 'explanation': exp_c or ''},
        'comparison': {'best': best_tool_name, 'explanation': best_exp or ''},
    }
    app.write_annotations_to_sheet(annotator_name, session)
    return load_next_triplet(annotator_name)
# --- Gradio UI Definition ---
with gr.Blocks(title="3-Way Parse Comparison", theme=gr.themes.Soft()) as demo:
    gr.HTML(CUSTOM_CSS) # Inject the CSS for better table rendering
    gr.Markdown("# π 3-Way Document Parse Comparison Tool")

    # Per-browser-session values: the annotator name and the texts currently shown.
    annotator_name_state = gr.State("")
    current_data_state = gr.State({})

    # Setup form, visible first.
    with gr.Column(visible=True) as setup_section:
        name_input = gr.Textbox(label="Enter your name")
        setup_btn = gr.Button("Start Session", variant="primary")
        setup_message = gr.Textbox(label="Status", interactive=False)
        triplet_selector = gr.Dropdown(label="Available Document Triplets (Optional, will auto-load if empty)", visible=False)
        start_btn = gr.Button("Start Annotation", variant="primary", visible=False)

    # Annotation view, revealed after "Start Annotation".
    with gr.Column(visible=False) as annotation_section:
        progress_display = gr.Textbox(label="Progress", interactive=False)
        gr.Markdown("### π Original Document")
        screenshot_display = gr.Image()
        gr.Markdown("---")
        with gr.Row():
            gr.Markdown("### Select View Type:")
            show_rendered_btn = gr.Button("πΌοΈ Rendered")
            show_raw_btn = gr.Button("π Raw Markdown")
            show_diff_btn = gr.Button("β¨ Diffs vs. Gold")
        with gr.Row():
            with gr.Column():
                gr.Markdown("### π Gold Standard")
                gold_display = gr.Markdown()
            with gr.Column():
                gr.Markdown("### Parse A")
                parse_a_display = gr.Markdown()
            with gr.Column():
                gr.Markdown("### Parse B")
                parse_b_display = gr.Markdown()
            with gr.Column():
                gr.Markdown("### Parse C")
                parse_c_display = gr.Markdown()
        gr.Markdown("---")
        with gr.Row():
            with gr.Column(scale=2):
                with gr.Row():
                    with gr.Column():
                        eval_a = gr.Dropdown(label="Evaluate A", choices=["No issues", "Minor", "Severe"])
                        exp_a = gr.Textbox(label="Explanation A", lines=3)
                    with gr.Column():
                        eval_b = gr.Dropdown(label="Evaluate B", choices=["No issues", "Minor", "Severe"])
                        exp_b = gr.Textbox(label="Explanation B", lines=3)
                    with gr.Column():
                        eval_c = gr.Dropdown(label="Evaluate C", choices=["No issues", "Minor", "Severe"])
                        exp_c = gr.Textbox(label="Explanation C", lines=3)
            with gr.Column(scale=1):
                comparison_choice = gr.Radio(["Parse A is best", "Parse B is best", "Parse C is best"], label="Which parse is best overall?")
                comparison_explanation = gr.Textbox(label="Explain final choice", lines=4)
        # The label was split across two lines in the source (its emoji
        # contained a byte rendered as a line break); rejoined here.
        submit_btn = gr.Button("✅ Submit & Load Next", variant="primary")

    # --- Event Handlers ---
    # After setup, copy the typed name into state so later handlers can use it.
    setup_btn.click(on_setup, [name_input], [setup_message, triplet_selector, start_btn]).then(
        lambda name: name, [name_input], [annotator_name_state])

    # Order must match the 15-item tuples returned by load_next_triplet and
    # submit_annotations.
    outputs_on_start = [progress_display, current_data_state, screenshot_display,
                        gold_display, parse_a_display, parse_b_display, parse_c_display,
                        eval_a, exp_a, eval_b, exp_b, eval_c, exp_c,
                        comparison_choice, comparison_explanation]
    start_btn.click(start_annotation, [annotator_name_state, triplet_selector], outputs_on_start).then(
        lambda: gr.update(visible=False), None, [setup_section]).then(
        lambda: gr.update(visible=True), None, [annotation_section])

    # The three view buttons re-render the same stored texts in different forms.
    view_displays = [gold_display, parse_a_display, parse_b_display, parse_c_display]
    show_rendered_btn.click(lambda data: update_view(data, 'rendered'), [current_data_state], view_displays)
    show_raw_btn.click(lambda data: update_view(data, 'raw'), [current_data_state], view_displays)
    show_diff_btn.click(lambda data: update_view(data, 'diff'), [current_data_state], view_displays)

    submit_inputs = [annotator_name_state, eval_a, exp_a, eval_b, exp_b, eval_c, exp_c, comparison_choice, comparison_explanation]
    submit_btn.click(submit_annotations, submit_inputs, outputs_on_start)
# Launch the UI only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch(debug=True)