# File size: 16,149 Bytes
# 80e58bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
import gradio as gr
import gspread
from google.oauth2.service_account import Credentials
import os
from typing import Dict, List, Tuple
import random
import traceback
import difflib

# --- Configuration ---
# OAuth scopes requested when loading the service-account credentials in
# AnnotationApp.init_google_sheets.
SCOPES = ['https://www.googleapis.com/auth/spreadsheets', 'https://www.googleapis.com/auth/drive']
# Path of the service-account key file, passed to
# Credentials.from_service_account_file.
# NOTE(review): key-file name and spreadsheet ID are hard-coded here — confirm
# this is intended rather than reading them from the environment.
SERVICE_ACCOUNT_FILE = 'ext-collab-human-data-annotate-5fb589a03d60.json' 
# Spreadsheet key opened with gspread's open_by_key.
ORIGINAL_SHEET_ID = '1WbHKeZ0VKWWD8JdbH5KhsIpkMECRzwXjFwHue2lknPw'
# Title of the source worksheet; per-annotator sheets are named
# "<worksheet title> - <annotator name>" (see AnnotationApp.create_user_sheet).
SHEET_NAME = 'Gradio TEST - Tables v2.0 vs v2.1 (n=21)'
# Directory joined with the screenshot file name found in each sheet row.
SCREENSHOTS_DIR = 'screenshots'

# --- Custom CSS for better table rendering ---
# Injected into the page through gr.HTML(CUSTOM_CSS) in the UI definition, so
# the <style> wrapper is part of the value.
CUSTOM_CSS = """
<style>
    .gradio-container .table-wrapper { overflow-x: auto !important; }
    .gradio-container table { table-layout: auto !important; width: 100% !important; }
    .gradio-container th, .gradio-container td { white-space: pre !important; padding: 4px !important; }
</style>
"""

# --- Helper Functions ---
def format_as_markdown_table(raw_text: str) -> str:
    """Convert pipe-delimited text into a Markdown table string.

    The first line containing ``|`` becomes the header row and is followed by
    a generated ``---`` delimiter row; every later pipe line becomes a body
    row. Non-empty lines without a pipe are emitted as block quotes and blank
    lines are dropped. Cells are produced by a plain split on ``|``, so a line
    that starts or ends with a pipe yields an empty first/last cell.

    Args:
        raw_text: Raw text to convert; may be empty or ``None``.

    Returns:
        The Markdown text, or a placeholder block quote when there is nothing
        to display.
    """
    if not raw_text or not raw_text.strip():
        return "> _No data to display._"

    parts = []
    # Track the header explicitly: searching the accumulated output for '---'
    # misfires as soon as an earlier text line happens to contain dashes.
    header_emitted = False
    for line in raw_text.strip().split('\n'):
        if '|' in line:
            cells = [cell.strip() for cell in line.split('|')]
            row = f"| {' | '.join(cells)} |\n"
            if not header_emitted:
                parts.append(f"{row}|{'---|' * len(cells)}\n")
                header_emitted = True
                continue
            # The input may already carry its own delimiter row (e.g.
            # "---|:---:"); a second one would render as a row of dashes.
            filled = [cell for cell in cells if cell]
            if filled and all('---' in cell and not cell.strip('-:') for cell in filled):
                continue
            parts.append(row)
        elif line.strip():
            parts.append(f"\n> {line.strip()}\n")
    return "".join(parts)

def create_diff_html(base_text: str, new_text: str) -> str:
    """Render a line-by-line diff of two texts as an HTML fragment.

    Lines are compared with ``difflib.ndiff``. Added lines (``+ ``) get a
    green background, removed lines (``- ``) a red one, ndiff's ``? `` hint
    lines are dropped, and unchanged lines are emitted as-is. Every emitted
    line is HTML-escaped so that text coming from the sheet cannot inject
    markup into the page.

    Args:
        base_text: Reference text (the left side of the diff).
        new_text: Text to compare against the reference.

    Returns:
        A ``<div>`` containing the diff, or a short notice paragraph when
        either input is empty.
    """
    from html import escape  # function-scope import keeps this block self-contained

    if not base_text or not new_text:
        return "<p><i>Not enough data to create a diff.</i></p>"

    highlight = {'+ ': '#e6ffed', '- ': '#ffeef0'}
    parts = ["<div style='font-family: monospace; white-space: pre-wrap; line-height: 1.4; font-size: 0.9em;'>"]
    for line in difflib.ndiff(base_text.splitlines(), new_text.splitlines()):
        marker = line[:2]
        if marker == '? ':
            continue
        safe = escape(line)
        color = highlight.get(marker)
        if color:
            parts.append(f"<span style='background-color: {color};'>{safe}</span>\n")
        else:
            parts.append(f"{safe}\n")
    parts.append("</div>")
    return "".join(parts)

class AnnotationApp:
    """Google Sheets handles plus in-memory annotation state for the UI.

    Attributes are plain instance fields:
      * ``spreadsheet`` / ``worksheet``: gspread handles, ``None`` until
        ``init_google_sheets`` succeeds.
      * ``user_sheets``: annotator name -> that annotator's worksheet.
      * ``all_triplets``: eval ID -> list of row dicts (each with ``row_num``).
      * ``claimed_triplets``: eval IDs already handed out in this process.
      * ``user_sessions``: annotator name -> per-annotator session dict.
      * ``column_indices``: header text -> 0-based column index.
      * ``demo_mode``: ``True`` when the sheet connection is unavailable; sheet
        reads and writes are skipped in that mode.
    """

    def __init__(self):
        self.spreadsheet = None
        self.worksheet = None
        self.user_sheets = {}
        self.all_triplets = {}
        self.claimed_triplets = set()
        self.user_sessions = {}
        # Defined up front so lookups fail with a KeyError on the column name
        # rather than an AttributeError when the sheet was empty or unread.
        self.column_indices = {}
        self.demo_mode = True
        self.init_google_sheets()

    def init_google_sheets(self):
        """Connect to the spreadsheet and load its rows.

        Any failure is printed and leaves the app in demo mode; this is a
        deliberate best-effort boundary, so the broad ``except`` is kept.
        """
        try:
            creds = Credentials.from_service_account_file(SERVICE_ACCOUNT_FILE, scopes=SCOPES)
            gc = gspread.authorize(creds)
            self.spreadsheet = gc.open_by_key(ORIGINAL_SHEET_ID)
            self.worksheet = self.spreadsheet.worksheet(SHEET_NAME)
            self.group_data_into_triplets()
            self.demo_mode = False
            print("πŸŽ‰ Google Sheets initialized successfully!")
        except Exception as e:
            print(f"❌ Error initializing Google Sheets: {e}")
            self.demo_mode = True

    def group_data_into_triplets(self):
        """Read the worksheet and group its rows by the 'eval ID' column.

        Populates ``column_indices`` from the (stripped) header row and
        ``all_triplets`` with every eval ID that has at least three rows. Rows
        with an empty eval ID are ignored. Returns early, after printing a
        message, when the sheet is empty or has no 'eval ID' column.
        """
        print("Grouping data...")
        all_values = self.worksheet.get_all_values()
        if not all_values:
            print("Sheet is empty.")
            return
        headers = [h.strip() for h in all_values[0]]
        self.column_indices = {h: i for i, h in enumerate(headers)}
        if 'eval ID' not in self.column_indices:
            print("ERROR: 'eval ID' column not found.")
            return
        grouped = {}
        # Sheet rows are 1-based and row 1 is the header, so data starts at 2.
        for row_num, values in enumerate(all_values[1:], start=2):
            record = dict(zip(headers, values))
            eval_id = record.get('eval ID')
            if eval_id:
                grouped.setdefault(eval_id, []).append({'row_num': row_num, **record})
        self.all_triplets = {doc_id: rows for doc_id, rows in grouped.items() if len(rows) >= 3}
        print(f"Found {len(self.all_triplets)} complete triplets.")

    def get_available_triplets(self) -> List[str]:
        """Return the eval IDs that have not been claimed yet, in load order."""
        return [doc_id for doc_id in self.all_triplets if doc_id not in self.claimed_triplets]

    def create_user_sheet(self, annotator_name: str) -> Tuple[bool, str]:
        """Open, or create by duplicating the source sheet, the annotator's sheet.

        Args:
            annotator_name: Name typed by the annotator; used as the sheet
                name suffix and as the key into ``user_sheets``.

        Returns:
            ``(success, message)``. A blank name and any Sheets failure yield
            ``(False, reason)`` instead of raising.
        """
        # A blank name would create a sheet called "<title> - " and is
        # rejected later at submit time, so stop it here.
        if not annotator_name or not annotator_name.strip():
            return False, "Please enter your name before starting a session."
        if self.demo_mode:
            return True, "Demo mode"
        try:
            sheet_name = f"{self.worksheet.title} - {annotator_name}"
            try:
                worksheet = self.spreadsheet.worksheet(sheet_name)
                message = f"Resuming work: {sheet_name}"
            except gspread.exceptions.WorksheetNotFound:
                # Kept inside the outer try: an error raised while duplicating
                # must also come back as (False, reason), not propagate.
                worksheet = self.spreadsheet.duplicate_sheet(self.worksheet.id, new_sheet_name=sheet_name)
                message = f"Created new sheet: {sheet_name}"
        except Exception as e:
            return False, str(e)
        self.user_sheets[annotator_name] = worksheet
        return True, message

    def _collect_cell_updates(self, session: Dict) -> List[Tuple[int, int, str]]:
        """Translate a session's annotations into ``(row, column, value)`` triples.

        Rows and columns are 1-based sheet coordinates. Parses without a
        ``row_num`` (placeholder entries) are skipped. Raises ``KeyError`` when
        an expected column header is missing from ``column_indices``.
        """
        annotations = session.get('annotations', {})
        updates = []
        for ui_char, original_row in session['randomized_parse_map'].items():
            eval_data = annotations.get(ui_char)
            row_num = original_row.get('row_num')
            # Placeholder parses have no backing sheet row; indexing
            # 'row_num' on them used to abort the whole write.
            if not eval_data or row_num is None:
                continue
            updates.append((row_num, self.column_indices['annotator evaluation'] + 1, eval_data['evaluation']))
            updates.append((row_num, self.column_indices['annotator explanation'] + 1, eval_data['explanation']))
        if 'comparison' in annotations:
            comp_data = annotations['comparison']
            # The overall verdict is repeated on every row of the triplet.
            for original_row in session['triplet_rows']:
                row_num = original_row['row_num']
                updates.append((row_num, self.column_indices['which /parse version produced the best output?'] + 1, comp_data['best']))
                updates.append((row_num, self.column_indices['explanation of v2.0 vs v2.1'] + 1, comp_data['explanation']))
        return updates

    def write_annotations_to_sheet(self, annotator_name: str, session: Dict):
        """Write the session's annotations to the annotator's sheet in one batch.

        Does nothing in demo mode. Failures are printed rather than raised so
        the UI keeps working; the method always returns ``None``.
        """
        if self.demo_mode:
            return
        try:
            user_sheet = self.user_sheets[annotator_name]

            # Check if randomized_parse_map exists
            if 'randomized_parse_map' not in session:
                print(f"❌ No randomized_parse_map found in session for {annotator_name}")
                return

            cells_to_update = [gspread.Cell(row, col, value)
                               for row, col, value in self._collect_cell_updates(session)]
            if cells_to_update:
                user_sheet.update_cells(cells_to_update, value_input_option='USER_ENTERED')
                print(f"βœ… Wrote {len(cells_to_update)} cells to the sheet.")
        except Exception as e:
            print(f"❌ FAILED TO WRITE TO SHEET: {e}")

# --- Backend Logic ---
# Single module-level application state shared by every handler below.
# Constructing it runs AnnotationApp.__init__, which immediately attempts the
# Google Sheets connection and stays in demo mode if that fails.
app = AnnotationApp()

def on_setup(name):
    """Prepare the annotator's sheet and reveal the document picker.

    Returns three values for the status box, the triplet dropdown and the
    start button. On failure the status carries the error message and both
    controls are hidden; on success the dropdown is filled with the unclaimed
    document IDs and both controls are shown.
    """
    ok, status = app.create_user_sheet(name)
    if ok:
        open_docs = app.get_available_triplets()
        summary = f"{status}. Found {len(open_docs)} triplets."
        return summary, gr.update(visible=True, choices=open_docs), gr.update(visible=True)

    hidden_selector = gr.update(visible=False)
    hidden_button = gr.update(visible=False)
    return status, hidden_selector, hidden_button

def start_annotation(annotator_name: str, selected_doc_id: str):
    """Record the chosen document in the annotator's session and load it.

    A session dict is created for the annotator on first use. The return
    value is whatever ``load_next_triplet`` produces.
    """
    # setdefault creates the session on first use and reuses it afterwards.
    session = app.user_sessions.setdefault(annotator_name, {})
    session['current_doc_id'] = selected_doc_id
    return load_next_triplet(annotator_name)

def load_next_triplet(annotator_name: str):
    """Claim a document for the annotator and build all 15 UI outputs.

    The session's ``current_doc_id`` is used when it is still unclaimed;
    otherwise the first unclaimed document is taken. The document's parses
    are shuffled and stored in the session as ``randomized_parse_map`` under
    the labels 'A', 'B' and 'C', so the tool behind each label is hidden from
    the annotator.

    Args:
        annotator_name: Key of the annotator's entry in ``app.user_sessions``;
            a session is created if none exists.

    Returns:
        A 15-item tuple: progress text, the view-data dict for the state
        component, the screenshot path (``None`` when no file was found), the
        four rendered views, and eight values that reset the evaluation
        widgets. When nothing is left to annotate: a completion message, an
        empty dict and 13 updates that hide the remaining outputs.
    """
    # Ensure session exists
    session = app.user_sessions.setdefault(annotator_name, {})
    doc_id = session.get('current_doc_id')
    available = app.get_available_triplets()

    if not doc_id or doc_id not in available:
        if not available:
            return "πŸŽ‰ All triplets completed!", {}, *[gr.update(visible=False)]*13
        doc_id = available[0]

    app.claimed_triplets.add(doc_id)
    session['current_doc_id'] = doc_id
    triplet_rows = app.all_triplets[doc_id]
    session['triplet_rows'] = triplet_rows

    # Take the first usable screenshot and the first non-empty gold render.
    screenshot, gold_render = "", ""
    for r in triplet_rows:
        if not screenshot:
            fname = (r.get('PDF: screenshot (link)') or '').strip()
            if fname:
                fpath = os.path.join(SCREENSHOTS_DIR, fname)
                # isfile, not exists: a blank or odd cell must not resolve to
                # the screenshots directory itself.
                if os.path.isfile(fpath):
                    screenshot = fpath
        if not gold_render and r.get('gold render'):
            gold_render = format_as_markdown_table(r.get('gold render'))

    parses = {r.get('tool name').strip(): r for r in triplet_rows if r.get('tool name')}
    parse_keys = list(parses.keys())
    random.shuffle(parse_keys)

    # Ensure we have exactly 3 parses
    if len(parse_keys) < 3:
        print(f"❌ Warning: Only {len(parse_keys)} parses found for {doc_id}, expected 3")
        # Pad with empty entries if needed; these placeholders carry no 'row_num'.
        while len(parse_keys) < 3:
            placeholder_key = f'missing_{len(parse_keys)}'
            parse_keys.append(placeholder_key)
            parses[placeholder_key] = {'tool name': 'Missing', 'tool output': ''}

    session['randomized_parse_map'] = {
        'A': parses[parse_keys[0]],
        'B': parses[parse_keys[1]],
        'C': parses[parse_keys[2]]
    }

    ui_data = {
        'screenshot': screenshot, 'gold_raw': gold_render,
        'parse_a_raw': format_as_markdown_table(session['randomized_parse_map']['A'].get('tool output', '')),
        'parse_b_raw': format_as_markdown_table(session['randomized_parse_map']['B'].get('tool output', '')),
        'parse_c_raw': format_as_markdown_table(session['randomized_parse_map']['C'].get('tool output', ''))}

    progress = f"Annotating Document ID: {doc_id}"
    initial_views = update_view(ui_data, 'rendered')

    # The image component gets None when there is no screenshot: an empty
    # string is not a loadable path, None means "no image".
    return (progress, ui_data, ui_data.get('screenshot') or None, *initial_views,
            gr.update(value=None), "", gr.update(value=None), "", gr.update(value=None), "",
            gr.update(value=None), "")

def update_view(current_data, view_type):
    """Build the four panel contents (gold, A, B, C) for a view mode.

    ``'rendered'`` returns the stored Markdown unchanged, ``'raw'`` wraps each
    value in a fenced ``markdown`` code block, and ``'diff'`` returns an empty
    gold panel followed by each parse's HTML diff against the gold text. Any
    other ``view_type`` yields ``None``.
    """
    fields = ('gold_raw', 'parse_a_raw', 'parse_b_raw', 'parse_c_raw')

    if view_type == 'diff':
        reference = current_data.get('gold_raw', '')
        panels = [""]
        for name in fields[1:]:
            panels.append(create_diff_html(reference, current_data.get(name)))
        return panels

    if view_type == 'raw':
        panels = []
        for name in fields:
            panels.append("```markdown\n{}\n```".format(current_data.get(name, '')))
        return panels

    if view_type == 'rendered':
        panels = []
        for name in fields:
            panels.append(current_data.get(name, ''))
        return panels

    return None
                
def submit_annotations(annotator_name, eval_a, exp_a, eval_b, exp_b, eval_c, exp_c, best_choice, best_exp):
    """Validate and store one document's annotations, then load the next one.

    Args:
        annotator_name: Key of the annotator's session in ``app.user_sessions``.
        eval_a, eval_b, eval_c: Severity chosen for parses A, B and C; a
            missing value is stored as 'Not specified'.
        exp_a, exp_b, exp_c: Free-text explanations for each parse.
        best_choice: Radio label naming the best parse ("Parse A is best", ...).
        best_exp: Free-text explanation for the overall choice.

    Returns:
        The 15 outputs of ``load_next_triplet`` on success. On a validation
        error: the error text for the progress box followed by 14 no-op
        updates, so the displayed document, its view data and the annotator's
        inputs all stay as they are.
    """
    def _reject(message):
        # Leave every other output untouched, including the view-data state.
        # Returning {} for that state would wipe the current document's data
        # and make the Rendered/Raw/Diff buttons show empty panels.
        return (message, *[gr.update()] * 14)

    # Validate inputs
    if not annotator_name or annotator_name not in app.user_sessions:
        print(f"❌ Invalid annotator name or session not found: {annotator_name}")
        return _reject("Error: Session not found")

    if not best_choice:
        print("❌ No best choice selected")
        return _reject("Error: Please select which parse is best")

    session = app.user_sessions[annotator_name]

    # Verify session has required data
    if 'randomized_parse_map' not in session:
        print(f"❌ No randomized_parse_map in session for {annotator_name}")
        return _reject("Error: Session data corrupted, please reload")

    # Extract the UI choice (A, B, or C) from the radio button text
    choice_map = {"Parse A is best": "A", "Parse B is best": "B", "Parse C is best": "C"}
    ui_char = choice_map.get(best_choice)

    if not ui_char or ui_char not in session['randomized_parse_map']:
        print(f"❌ Invalid choice: {best_choice} -> {ui_char}")
        return _reject("Error: Invalid selection")

    # Store the real tool name, not the shuffled UI label.
    best_tool_name = session['randomized_parse_map'][ui_char].get('tool name', 'Unknown')

    session['annotations'] = {
        'A': {'evaluation': eval_a or 'Not specified', 'explanation': exp_a or ''},
        'B': {'evaluation': eval_b or 'Not specified', 'explanation': exp_b or ''},
        'C': {'evaluation': eval_c or 'Not specified', 'explanation': exp_c or ''},
        'comparison': {'best': best_tool_name, 'explanation': best_exp or ''}
    }

    app.write_annotations_to_sheet(annotator_name, session)
    return load_next_triplet(annotator_name)

# --- Gradio UI Definition ---
# --- Gradio UI Definition ---
# Components are created in display order; the event wiring at the bottom is
# registered in the same order as before.
with gr.Blocks(title="3-Way Parse Comparison", theme=gr.themes.Soft()) as demo:
    gr.HTML(CUSTOM_CSS)  # Inject the CSS for better table rendering
    gr.Markdown("# πŸ“Š 3-Way Document Parse Comparison Tool")

    # Per-browser-session values: the annotator's name and the view data of
    # the document currently on screen.
    annotator_name_state = gr.State("")
    current_data_state = gr.State({})

    # Setup screen: name entry, status line and optional document picker.
    with gr.Column(visible=True) as setup_section:
        name_input = gr.Textbox(label="Enter your name")
        setup_btn = gr.Button("Start Session", variant="primary")
        setup_message = gr.Textbox(label="Status", interactive=False)
        triplet_selector = gr.Dropdown(
            label="Available Document Triplets (Optional, will auto-load if empty)",
            visible=False,
        )
        start_btn = gr.Button("Start Annotation", variant="primary", visible=False)

    # Annotation screen: hidden until a session has been started.
    with gr.Column(visible=False) as annotation_section:
        progress_display = gr.Textbox(label="Progress", interactive=False)
        gr.Markdown("### πŸ“„ Original Document")
        screenshot_display = gr.Image()
        gr.Markdown("---")

        with gr.Row():
            gr.Markdown("### Select View Type:")
            show_rendered_btn = gr.Button("πŸ–ΌοΈ Rendered")
            show_raw_btn = gr.Button("πŸ“„ Raw Markdown")
            show_diff_btn = gr.Button("✨ Diffs vs. Gold")

        with gr.Row():
            with gr.Column():
                gr.Markdown("### πŸ† Gold Standard")
                gold_display = gr.Markdown()
            with gr.Column():
                gr.Markdown("### Parse A")
                parse_a_display = gr.Markdown()
            with gr.Column():
                gr.Markdown("### Parse B")
                parse_b_display = gr.Markdown()
            with gr.Column():
                gr.Markdown("### Parse C")
                parse_c_display = gr.Markdown()
        gr.Markdown("---")

        with gr.Row():
            with gr.Column(scale=2):
                with gr.Row():
                    with gr.Column():
                        eval_a = gr.Dropdown(label="Evaluate A", choices=["No issues", "Minor", "Severe"])
                        exp_a = gr.Textbox(label="Explanation A", lines=3)
                    with gr.Column():
                        eval_b = gr.Dropdown(label="Evaluate B", choices=["No issues", "Minor", "Severe"])
                        exp_b = gr.Textbox(label="Explanation B", lines=3)
                    with gr.Column():
                        eval_c = gr.Dropdown(label="Evaluate C", choices=["No issues", "Minor", "Severe"])
                        exp_c = gr.Textbox(label="Explanation C", lines=3)
            with gr.Column(scale=1):
                comparison_choice = gr.Radio(
                    ["Parse A is best", "Parse B is best", "Parse C is best"],
                    label="Which parse is best overall?",
                )
                comparison_explanation = gr.Textbox(label="Explain final choice", lines=4)
        submit_btn = gr.Button("βœ… Submit & Load Next", variant="primary")

    # --- Event Handlers ---
    # Create/resume the annotator's sheet, then remember the entered name.
    setup_event = setup_btn.click(on_setup, [name_input], [setup_message, triplet_selector, start_btn])
    setup_event.then(lambda name: name, [name_input], [annotator_name_state])

    # The 15 outputs produced by start_annotation / submit_annotations.
    outputs_on_start = [
        progress_display,
        current_data_state,
        screenshot_display,
        gold_display,
        parse_a_display,
        parse_b_display,
        parse_c_display,
        eval_a,
        exp_a,
        eval_b,
        exp_b,
        eval_c,
        exp_c,
        comparison_choice,
        comparison_explanation,
    ]

    # Load the first document, then swap the setup screen for the annotation screen.
    start_event = start_btn.click(start_annotation, [annotator_name_state, triplet_selector], outputs_on_start)
    hide_setup_event = start_event.then(lambda: gr.update(visible=False), None, [setup_section])
    hide_setup_event.then(lambda: gr.update(visible=True), None, [annotation_section])

    # View toggles re-render the four panels from the stored view data.
    view_displays = [gold_display, parse_a_display, parse_b_display, parse_c_display]
    show_rendered_btn.click(lambda data: update_view(data, 'rendered'), [current_data_state], view_displays)
    show_raw_btn.click(lambda data: update_view(data, 'raw'), [current_data_state], view_displays)
    show_diff_btn.click(lambda data: update_view(data, 'diff'), [current_data_state], view_displays)

    submit_inputs = [
        annotator_name_state,
        eval_a,
        exp_a,
        eval_b,
        exp_b,
        eval_c,
        exp_c,
        comparison_choice,
        comparison_explanation,
    ]
    submit_btn.click(submit_annotations, submit_inputs, outputs_on_start)

if __name__ == "__main__":
    demo.launch(debug=True)