# Uploaded by: KilgorePennington
# Commit message: Upload app.py with huggingface_hub
# Commit: 80e58bc (verified)
import gradio as gr
import gspread
from google.oauth2.service_account import Credentials
import os
from typing import Dict, List, Tuple
import random
import traceback
import difflib
# --- Configuration ---
# OAuth scopes requested when loading the service-account credentials.
SCOPES = ['https://www.googleapis.com/auth/spreadsheets', 'https://www.googleapis.com/auth/drive']
# Service-account key file handed to Credentials.from_service_account_file.
SERVICE_ACCOUNT_FILE = 'ext-collab-human-data-annotate-5fb589a03d60.json'
# Key of the spreadsheet opened with open_by_key.
ORIGINAL_SHEET_ID = '1WbHKeZ0VKWWD8JdbH5KhsIpkMECRzwXjFwHue2lknPw'
# Title of the worksheet inside that spreadsheet that holds the rows to annotate.
SHEET_NAME = 'Gradio TEST - Tables v2.0 vs v2.1 (n=21)'
# Directory that is joined with each row's screenshot file name to locate the image.
SCREENSHOTS_DIR = 'screenshots'
# --- Custom CSS for better table rendering ---
# Injected into the page with gr.HTML(CUSTOM_CSS): lets wide tables scroll
# horizontally and keeps cell whitespace intact.
CUSTOM_CSS = """
<style>
.gradio-container .table-wrapper { overflow-x: auto !important; }
.gradio-container table { table-layout: auto !important; width: 100% !important; }
.gradio-container th, .gradio-container td { white-space: pre !important; padding: 4px !important; }
</style>
"""
# --- Helper Functions ---
def format_as_markdown_table(raw_text: str) -> str:
    """Convert pipe-delimited text into a Markdown table.

    Every line that contains ``|`` becomes one table row; the first such line
    is treated as the header and is followed by a ``---`` separator row.
    Non-empty lines without ``|`` are emitted as block quotes. Blank lines are
    dropped.

    Args:
        raw_text: The raw text to convert.

    Returns:
        The Markdown string, or a placeholder block quote when ``raw_text`` is
        empty or whitespace only.
    """
    if not raw_text or not raw_text.strip():
        return "> _No data to display._"

    parts = []
    # Track the header explicitly. Searching the accumulated output for '---'
    # breaks as soon as a note line that happens to contain '---' precedes the
    # table: the separator row is then skipped and the table does not render.
    header_written = False
    for line in raw_text.strip().split('\n'):
        if '|' in line:
            cells = [cell.strip() for cell in line.split('|')]
            parts.append(f"| {' | '.join(cells)} |\n")
            if not header_written:
                parts.append(f"|{'---|' * len(cells)}\n")
                header_written = True
        elif line.strip():
            parts.append(f"\n> {line.strip()}\n")
    return "".join(parts)
def create_diff_html(base_text: str, new_text: str) -> str:
    """Render a line-by-line diff of two texts as an HTML fragment.

    Lines only in ``new_text`` get a green background, lines only in
    ``base_text`` a red one, and unchanged lines are emitted as-is. The
    ``? `` hint lines produced by ``difflib.ndiff`` are dropped.

    Args:
        base_text: Reference text.
        new_text: Text compared against the reference.

    Returns:
        An HTML ``<div>`` with the diff, or a short placeholder paragraph when
        either input is empty.
    """
    # Imported locally under its own name so the module-level namespace does
    # not need a new dependency line.
    from html import escape

    if not base_text or not new_text:
        return "<p><i>Not enough data to create a diff.</i></p>"

    pieces = ["<div style='font-family: monospace; white-space: pre-wrap; line-height: 1.4; font-size: 0.9em;'>"]
    for line in difflib.ndiff(base_text.splitlines(), new_text.splitlines()):
        if line.startswith('? '):
            continue
        # The diffed text is arbitrary parser output; escape it so characters
        # such as '<' and '&' are displayed instead of interpreted as markup.
        safe = escape(line)
        if line.startswith('+ '):
            pieces.append(f"<span style='background-color: #e6ffed;'>{safe}</span>\n")
        elif line.startswith('- '):
            pieces.append(f"<span style='background-color: #ffeef0;'>{safe}</span>\n")
        else:
            pieces.append(f"{safe}\n")
    pieces.append("</div>")
    return "".join(pieces)
class AnnotationApp:
    """Google Sheets connection plus in-memory annotation state.

    Attributes:
        spreadsheet / worksheet: gspread handles, or ``None`` until connected.
        user_sheets: annotator name -> that annotator's worksheet.
        all_triplets: eval ID -> list of row dicts (each carries ``row_num``).
        claimed_triplets: eval IDs already handed out in this process.
        user_sessions: annotator name -> session dict used by the callbacks.
        column_indices: stripped header text -> zero-based column index.
        demo_mode: ``True`` when the sheet could not be opened; sheet reads and
            writes are skipped in that case.
    """

    def __init__(self):
        self.spreadsheet = None
        self.worksheet = None
        self.user_sheets = {}
        self.all_triplets = {}
        self.claimed_triplets = set()
        self.user_sessions = {}
        # Always present, even when the sheet is empty or unreachable.
        self.column_indices = {}
        self.demo_mode = True
        self.init_google_sheets()

    def init_google_sheets(self):
        """Open the configured worksheet and load its rows.

        Any failure is logged and leaves the app in demo mode.
        """
        try:
            creds = Credentials.from_service_account_file(SERVICE_ACCOUNT_FILE, scopes=SCOPES)
            gc = gspread.authorize(creds)
            self.spreadsheet = gc.open_by_key(ORIGINAL_SHEET_ID)
            self.worksheet = self.spreadsheet.worksheet(SHEET_NAME)
            self.group_data_into_triplets()
            self.demo_mode = False
            print("🎉 Google Sheets initialized successfully!")
        except Exception as e:
            print(f"❌ Error initializing Google Sheets: {e}")
            self.demo_mode = True

    def group_data_into_triplets(self):
        """Group worksheet rows by their 'eval ID' column.

        Fills ``column_indices`` from the header row and ``all_triplets`` with
        every eval ID that has at least three rows. Row numbers are 1-based
        sheet rows (the header is row 1, so the first data row is 2).
        """
        print("Grouping data...")
        all_values = self.worksheet.get_all_values()
        if not all_values:
            print("Sheet is empty.")
            return
        headers = [h.strip() for h in all_values[0]]
        self.column_indices = {h: i for i, h in enumerate(headers)}
        if 'eval ID' not in self.column_indices:
            print("ERROR: 'eval ID' column not found.")
            return
        grouped = {}
        for i, values in enumerate(all_values[1:]):
            row = dict(zip(headers, values))
            eval_id = row.get('eval ID')
            if eval_id:
                grouped.setdefault(eval_id, []).append({'row_num': i + 2, **row})
        self.all_triplets = {doc_id: rows for doc_id, rows in grouped.items() if len(rows) >= 3}
        print(f"Found {len(self.all_triplets)} complete triplets.")

    def get_available_triplets(self) -> List[str]:
        """Return the eval IDs that have not been claimed yet, in load order."""
        return [doc_id for doc_id in self.all_triplets if doc_id not in self.claimed_triplets]

    def create_user_sheet(self, annotator_name: str) -> Tuple[bool, str]:
        """Open the annotator's worksheet, duplicating the source sheet if needed.

        Returns:
            ``(True, message)`` on success (including demo mode), or
            ``(False, error text)`` when the sheet could not be opened or
            created.
        """
        if self.demo_mode:
            return True, "Demo mode"
        try:
            sheet_name = f"{self.worksheet.title} - {annotator_name}"
            try:
                worksheet = self.spreadsheet.worksheet(sheet_name)
                message = f"Resuming work: {sheet_name}"
            except gspread.exceptions.WorksheetNotFound:
                # Kept inside the outer try: an exception raised in a handler
                # is not caught by sibling except clauses, so a failed
                # duplication would otherwise escape to the caller.
                worksheet = self.spreadsheet.duplicate_sheet(self.worksheet.id, new_sheet_name=sheet_name)
                message = f"Created new sheet: {sheet_name}"
        except Exception as e:
            return False, str(e)
        self.user_sheets[annotator_name] = worksheet
        return True, message

    def write_annotations_to_sheet(self, annotator_name: str, session: Dict):
        """Write the session's annotations into the annotator's worksheet.

        Per-parse evaluations go to the row of the parse shown under each UI
        letter; the overall comparison is written to every row of the triplet.
        Does nothing in demo mode. Failures are logged, not raised.
        """
        if self.demo_mode:
            return
        try:
            user_sheet = self.user_sheets[annotator_name]
            annotations = session.get('annotations', {})
            cells_to_update = []
            # Check if randomized_parse_map exists
            if 'randomized_parse_map' not in session:
                print(f"❌ No randomized_parse_map found in session for {annotator_name}")
                return
            for ui_char, original_row in session['randomized_parse_map'].items():
                eval_data = annotations.get(ui_char)
                # Placeholder rows added for missing parses have no sheet row;
                # skip them instead of letting a KeyError abort the whole write.
                row_num = original_row.get('row_num')
                if not eval_data or row_num is None:
                    continue
                cells_to_update.extend([
                    gspread.Cell(row_num, self.column_indices['annotator evaluation'] + 1, eval_data['evaluation']),
                    gspread.Cell(row_num, self.column_indices['annotator explanation'] + 1, eval_data['explanation']),
                ])
            if 'comparison' in annotations:
                comp_data = annotations['comparison']
                for original_row in session['triplet_rows']:
                    row_num = original_row.get('row_num')
                    if row_num is None:
                        continue
                    cells_to_update.extend([
                        gspread.Cell(row_num, self.column_indices['which /parse version produced the best output?'] + 1, comp_data['best']),
                        gspread.Cell(row_num, self.column_indices['explanation of v2.0 vs v2.1'] + 1, comp_data['explanation']),
                    ])
            if cells_to_update:
                user_sheet.update_cells(cells_to_update, value_input_option='USER_ENTERED')
                print(f"✅ Wrote {len(cells_to_update)} cells to the sheet.")
        except Exception as e:
            print(f"❌ FAILED TO WRITE TO SHEET: {e}")
# --- Backend Logic ---
# Module-level application object referenced by every callback below.
# Constructing it runs AnnotationApp.__init__, which immediately attempts the
# Google Sheets connection.
app = AnnotationApp()
def on_setup(name):
    """Prepare the annotator's sheet and reveal the triplet picker.

    Args:
        name: Text typed into the name box.

    Returns:
        A 3-tuple for (status message, triplet dropdown, start button). The
        dropdown and button stay hidden when the name is blank or the sheet
        could not be prepared.
    """
    # A blank name would otherwise create/open a sheet called "<title> - ".
    # The value is only checked, not altered: later callbacks look the
    # session and sheet up by the raw text of the name box.
    if not name or not name.strip():
        return "Please enter your name to start a session.", gr.update(visible=False), gr.update(visible=False)

    success, message = app.create_user_sheet(name)
    if not success:
        return message, gr.update(visible=False), gr.update(visible=False)

    triplets = app.get_available_triplets()
    return (
        f"{message}. Found {len(triplets)} triplets.",
        gr.update(visible=True, choices=triplets),
        gr.update(visible=True),
    )
def start_annotation(annotator_name: str, selected_doc_id: str):
    """Record the chosen document for this annotator and load it.

    Creates the annotator's session dict on first use, stores
    ``selected_doc_id`` as the current document and returns whatever
    ``load_next_triplet`` produces for the UI.
    """
    user_session = app.user_sessions.setdefault(annotator_name, {})
    user_session['current_doc_id'] = selected_doc_id
    return load_next_triplet(annotator_name)
def load_next_triplet(annotator_name: str):
    """Claim a triplet for the annotator and build the values for the UI.

    Uses the session's ``current_doc_id`` when it is still unclaimed, otherwise
    the first available triplet. The three parses are shuffled and stored in
    the session as ``randomized_parse_map`` under the letters A, B and C.

    Returns:
        A 15-item tuple matching ``outputs_on_start``: progress text, the UI
        data dict, the screenshot path (or ``None``), four display strings,
        and eight reset values for the evaluation inputs. When nothing is left
        to annotate, a completion message, an empty dict and 13 hide-updates.
    """
    # Ensure session exists
    session = app.user_sessions.setdefault(annotator_name, {})
    doc_id = session.get('current_doc_id')
    available = app.get_available_triplets()
    if not doc_id or doc_id not in available:
        if not available:
            return "🎉 All triplets completed!", {}, *[gr.update(visible=False)] * 13
        doc_id = available[0]
    app.claimed_triplets.add(doc_id)
    session['current_doc_id'] = doc_id
    triplet_rows = app.all_triplets[doc_id]
    session['triplet_rows'] = triplet_rows

    # Take the first usable screenshot and the first gold render in the triplet.
    screenshot, gold_render = "", ""
    for r in triplet_rows:
        if not screenshot and r.get('PDF: screenshot (link)'):
            fname = r.get('PDF: screenshot (link)', '').strip()
            fpath = os.path.join(SCREENSHOTS_DIR, fname)
            # isfile, not exists: a blank file name resolves to the directory
            # itself, which exists but is not an image.
            if os.path.isfile(fpath):
                screenshot = fpath
        if not gold_render and r.get('gold render'):
            gold_render = format_as_markdown_table(r.get('gold render'))

    parses = {r.get('tool name').strip(): r for r in triplet_rows if r.get('tool name')}
    parse_keys = list(parses.keys())
    # Shuffled so the letter shown in the UI does not reveal the tool.
    random.shuffle(parse_keys)
    # Ensure we have exactly 3 parses
    if len(parse_keys) < 3:
        print(f"❌ Warning: Only {len(parse_keys)} parses found for {doc_id}, expected 3")
        # Pad with empty entries if needed
        while len(parse_keys) < 3:
            placeholder = f'missing_{len(parse_keys)}'
            parse_keys.append(placeholder)
            parses[placeholder] = {'tool name': 'Missing', 'tool output': ''}
    session['randomized_parse_map'] = {
        'A': parses[parse_keys[0]],
        'B': parses[parse_keys[1]],
        'C': parses[parse_keys[2]],
    }
    parse_map = session['randomized_parse_map']
    ui_data = {
        'screenshot': screenshot,
        'gold_raw': gold_render,
        'parse_a_raw': format_as_markdown_table(parse_map['A'].get('tool output', '')),
        'parse_b_raw': format_as_markdown_table(parse_map['B'].get('tool output', '')),
        'parse_c_raw': format_as_markdown_table(parse_map['C'].get('tool output', '')),
    }
    progress = f"Annotating Document ID: {doc_id}"
    initial_views = update_view(ui_data, 'rendered')
    return (
        progress,
        ui_data,
        # None clears the image component; an empty string is not a valid path.
        ui_data.get('screenshot') or None,
        *initial_views,
        gr.update(value=None), "",
        gr.update(value=None), "",
        gr.update(value=None), "",
        gr.update(value=None), "",
    )
def update_view(current_data, view_type):
    """Build the four display strings (gold, A, B, C) for a view mode.

    ``'rendered'`` returns the stored Markdown, ``'raw'`` wraps each one in a
    fenced ``markdown`` code block, and ``'diff'`` returns an empty gold panel
    followed by HTML diffs of each parse against the gold text. Any other
    ``view_type`` yields ``None``.
    """
    gold_key = 'gold_raw'
    parse_keys = ('parse_a_raw', 'parse_b_raw', 'parse_c_raw')

    if view_type == 'diff':
        reference = current_data.get(gold_key, '')
        panels = [""]
        for key in parse_keys:
            panels.append(create_diff_html(reference, current_data.get(key)))
        return panels

    if view_type in ('rendered', 'raw'):
        texts = [current_data.get(key, '') for key in (gold_key, *parse_keys)]
        if view_type == 'raw':
            texts = [f"```markdown\n{text}\n```" for text in texts]
        return texts

    return None
def submit_annotations(annotator_name, eval_a, exp_a, eval_b, exp_b, eval_c, exp_c, best_choice, best_exp):
    """Validate and store one triplet's annotations, then load the next one.

    Args:
        annotator_name: Key of the annotator's session.
        eval_a, eval_b, eval_c: Severity picked for each parse (may be empty).
        exp_a, exp_b, exp_c: Free-text explanation for each parse.
        best_choice: Radio label naming the best parse.
        best_exp: Free-text explanation of the final choice.

    Returns:
        On success, the result of ``load_next_triplet``. On a validation
        error, a 15-item tuple with the error text followed by no-op updates,
        so the current document and the entered values stay on screen.
    """
    def _error(message):
        # The second slot feeds current_data_state. A no-op update is returned
        # there rather than {}, so the stored document data is not meant to be
        # replaced and the view buttons keep working after a validation error.
        return message, gr.update(), *[gr.update()] * 13

    # Validate inputs
    if not annotator_name or annotator_name not in app.user_sessions:
        print(f"❌ Invalid annotator name or session not found: {annotator_name}")
        return _error("Error: Session not found")
    if not best_choice:
        print("❌ No best choice selected")
        return _error("Error: Please select which parse is best")

    session = app.user_sessions[annotator_name]
    # Verify session has required data
    if 'randomized_parse_map' not in session:
        print(f"❌ No randomized_parse_map in session for {annotator_name}")
        return _error("Error: Session data corrupted, please reload")

    # Extract the UI choice (A, B, or C) from the radio button text
    choice_map = {"Parse A is best": "A", "Parse B is best": "B", "Parse C is best": "C"}
    ui_char = choice_map.get(best_choice)
    if not ui_char or ui_char not in session['randomized_parse_map']:
        print(f"❌ Invalid choice: {best_choice} -> {ui_char}")
        return _error("Error: Invalid selection")

    # Translate the anonymised letter back to the tool it was showing.
    best_tool_name = session['randomized_parse_map'][ui_char].get('tool name', 'Unknown')
    session['annotations'] = {
        'A': {'evaluation': eval_a or 'Not specified', 'explanation': exp_a or ''},
        'B': {'evaluation': eval_b or 'Not specified', 'explanation': exp_b or ''},
        'C': {'evaluation': eval_c or 'Not specified', 'explanation': exp_c or ''},
        'comparison': {'best': best_tool_name, 'explanation': best_exp or ''},
    }
    app.write_annotations_to_sheet(annotator_name, session)
    return load_next_triplet(annotator_name)
# --- Gradio UI Definition ---
# NOTE(review): the nesting below is reconstructed from the layout logic
# (setup column, then annotation column); confirm against the deployed app.
# Emoji in the labels were stored mis-encoded and are restored here.
with gr.Blocks(title="3-Way Parse Comparison", theme=gr.themes.Soft()) as demo:
    gr.HTML(CUSTOM_CSS)  # Inject the CSS for better table rendering
    gr.Markdown("# 📊 3-Way Document Parse Comparison Tool")
    annotator_name_state = gr.State("")
    current_data_state = gr.State({})

    # Step 1: identify the annotator and optionally pick a document.
    with gr.Column(visible=True) as setup_section:
        name_input = gr.Textbox(label="Enter your name")
        setup_btn = gr.Button("Start Session", variant="primary")
        setup_message = gr.Textbox(label="Status", interactive=False)
        triplet_selector = gr.Dropdown(label="Available Document Triplets (Optional, will auto-load if empty)", visible=False)
        start_btn = gr.Button("Start Annotation", variant="primary", visible=False)

    # Step 2: the annotation workspace, hidden until a session starts.
    with gr.Column(visible=False) as annotation_section:
        progress_display = gr.Textbox(label="Progress", interactive=False)
        gr.Markdown("### 📄 Original Document")
        screenshot_display = gr.Image()
        gr.Markdown("---")
        with gr.Row():
            gr.Markdown("### Select View Type:")
            show_rendered_btn = gr.Button("🖼️ Rendered")
            show_raw_btn = gr.Button("📄 Raw Markdown")
            show_diff_btn = gr.Button("✨ Diffs vs. Gold")
        with gr.Row():
            with gr.Column():
                gr.Markdown("### 🏆 Gold Standard")
                gold_display = gr.Markdown()
            with gr.Column():
                gr.Markdown("### Parse A")
                parse_a_display = gr.Markdown()
            with gr.Column():
                gr.Markdown("### Parse B")
                parse_b_display = gr.Markdown()
            with gr.Column():
                gr.Markdown("### Parse C")
                parse_c_display = gr.Markdown()
        gr.Markdown("---")
        with gr.Row():
            with gr.Column(scale=2):
                with gr.Row():
                    with gr.Column():
                        eval_a = gr.Dropdown(label="Evaluate A", choices=["No issues", "Minor", "Severe"])
                        exp_a = gr.Textbox(label="Explanation A", lines=3)
                    with gr.Column():
                        eval_b = gr.Dropdown(label="Evaluate B", choices=["No issues", "Minor", "Severe"])
                        exp_b = gr.Textbox(label="Explanation B", lines=3)
                    with gr.Column():
                        eval_c = gr.Dropdown(label="Evaluate C", choices=["No issues", "Minor", "Severe"])
                        exp_c = gr.Textbox(label="Explanation C", lines=3)
            with gr.Column(scale=1):
                comparison_choice = gr.Radio(["Parse A is best", "Parse B is best", "Parse C is best"], label="Which parse is best overall?")
                comparison_explanation = gr.Textbox(label="Explain final choice", lines=4)
        submit_btn = gr.Button("✅ Submit & Load Next", variant="primary")

    # --- Event Handlers ---
    # Prepare the sheet first, then remember the name for later callbacks.
    setup_btn.click(on_setup, [name_input], [setup_message, triplet_selector, start_btn]).then(
        lambda name: name, [name_input], [annotator_name_state])

    # 15 outputs; load_next_triplet and submit_annotations return tuples of
    # this exact length and order.
    outputs_on_start = [progress_display, current_data_state, screenshot_display,
                        gold_display, parse_a_display, parse_b_display, parse_c_display,
                        eval_a, exp_a, eval_b, exp_b, eval_c, exp_c,
                        comparison_choice, comparison_explanation]
    start_btn.click(start_annotation, [annotator_name_state, triplet_selector], outputs_on_start).then(
        lambda: gr.update(visible=False), None, [setup_section]).then(
        lambda: gr.update(visible=True), None, [annotation_section])

    view_displays = [gold_display, parse_a_display, parse_b_display, parse_c_display]
    show_rendered_btn.click(lambda data: update_view(data, 'rendered'), [current_data_state], view_displays)
    show_raw_btn.click(lambda data: update_view(data, 'raw'), [current_data_state], view_displays)
    show_diff_btn.click(lambda data: update_view(data, 'diff'), [current_data_state], view_displays)

    submit_inputs = [annotator_name_state, eval_a, exp_a, eval_b, exp_b, eval_c, exp_c, comparison_choice, comparison_explanation]
    submit_btn.click(submit_annotations, submit_inputs, outputs_on_start)

if __name__ == "__main__":
    demo.launch(debug=True)