Spaces:
Running
Running
| import os | |
| import json | |
| from typing import List, Dict, Tuple, Optional, Any | |
| from collections import Counter, defaultdict | |
| import gradio as gr | |
# -- Local config --------------------------------------------------------------
# Path of the JSON file that load_initial_data() opens; being a bare file name,
# it is resolved against the current working directory.
DATA_FILE = "gradio_ner_data.json"
def load_initial_data() -> List[Dict]:
    """Read the records from ``DATA_FILE`` and tag each with ``is_mixed_type``.

    A type counts as "mixed" when, across all records carrying that type, the
    ``llm_is_dataset_contextual`` field holds both a true and a false value.

    Returns:
        The parsed JSON content, with an ``is_mixed_type`` boolean written
        into every record.

    Raises:
        FileNotFoundError: if ``DATA_FILE`` does not exist.
    """
    if not os.path.exists(DATA_FILE):
        raise FileNotFoundError(f"{DATA_FILE} not found in current directory.")

    with open(DATA_FILE, "r", encoding="utf-8") as handle:
        records = json.load(handle)

    # Collect, per type, the set of LLM verdicts that were actually observed.
    verdicts_by_type = defaultdict(set)
    for record in records:
        verdict = record.get("llm_is_dataset_contextual")
        if record.get("type") and verdict is not None:
            verdicts_by_type[record["type"]].add(verdict)

    # A type is mixed when both verdicts occur for it.
    mixed = {
        name
        for name, seen in verdicts_by_type.items()
        if {True, False} <= seen
    }

    for record in records:
        record["is_mixed_type"] = record.get("type") in mixed

    return records
class DynamicDataset:
    """Record list with a remembered cursor position.

    Attributes are plain and read directly by callers:
    ``data`` is the record list, ``len`` its length at construction time and
    ``current`` the index of the record last returned by ``example``.
    """

    def __init__(self, data: List[Dict]):
        self.data = data
        self.len = len(data)
        self.current = 0

    def example(self, idx: int) -> Dict:
        """Clamp ``idx`` to ``[0, len - 1]``, store it as the cursor, return that record."""
        upper = self.len - 1
        position = idx if idx < upper else upper
        if position <= 0:
            position = 0
        self.current = position
        return self.data[position]
class MixedTypeManager:
    """Groups records by type and LLM verdict to support side-by-side comparison.

    ``grouped_data`` maps a type name to ``{'true': [...], 'false': [...]}``.
    ``mixed_types`` lists the types that have at least one record in both
    buckets, largest total first.
    """

    def __init__(self, data: List[Dict]):
        self.grouped_data = defaultdict(lambda: {'true': [], 'false': []})

        # Bucket every record that has both a type and an LLM verdict.
        for record in data:
            type_name = record.get("type")
            verdict = record.get("llm_is_dataset_contextual")
            if not type_name or verdict is None:
                continue
            bucket = 'true' if verdict else 'false'
            self.grouped_data[type_name][bucket].append(record)

        def total(type_name):
            buckets = self.grouped_data[type_name]
            return len(buckets['true']) + len(buckets['false'])

        # Keep only types with examples on both sides, biggest groups first.
        self.mixed_types = sorted(
            (
                name
                for name, buckets in self.grouped_data.items()
                if buckets['true'] and buckets['false']
            ),
            key=total,
            reverse=True,
        )

    def _bucket(self, dtype: str, is_dataset: bool) -> List[Dict]:
        """Return the records of ``dtype`` with the given verdict (empty if unknown)."""
        # Membership test first so the defaultdict is not grown by lookups.
        if dtype not in self.grouped_data:
            return []
        return self.grouped_data[dtype]['true' if is_dataset else 'false']

    def get_example(self, dtype: str, is_dataset: bool, idx: int) -> Dict:
        """Return example number ``idx`` (wrapping around) or ``{}`` if there is none."""
        candidates = self._bucket(dtype, is_dataset)
        if not candidates:
            return {}
        return candidates[idx % len(candidates)]

    def get_count(self, dtype: str, is_dataset: bool) -> int:
        """Return how many records ``dtype`` has for the given verdict."""
        return len(self._bucket(dtype, is_dataset))
# -- Highlight utils -----------------------------------------------------------
def prepare_for_highlight(rec: Dict) -> List[Tuple[str, Optional[str]]]:
    """Split a record's text into ``(segment, label)`` pairs for highlighting.

    Spans are read from ``rec["ner_annotated"]`` when that key is present,
    otherwise from ``rec["ner_text"]``; each span is expected to be a
    ``(start, end, label)`` triple of character offsets into ``rec["text"]``.

    Args:
        rec: record dict; missing or empty ``text`` / span fields are tolerated.

    Returns:
        Consecutive, non-overlapping segments covering the whole text in
        order. Unlabelled stretches carry ``None`` as their label. An empty
        text yields an empty list.
    """
    text = rec.get("text", "") or ""
    ner_spans = rec.get("ner_annotated", rec.get("ner_text", [])) or []

    # Coerce offsets BEFORE sorting so one malformed span (wrong arity,
    # non-numeric offset) is skipped instead of breaking the sort or the loop.
    spans = []
    for span in ner_spans:
        try:
            start, end, label = span
            spans.append((int(start), int(end), label))
        except (TypeError, ValueError):
            continue
    spans.sort(key=lambda item: item[0])

    segments: List[Tuple[str, Optional[str]]] = []
    cursor = 0  # first character not yet emitted
    for start, end, label in spans:
        # A span starting at or past the end of the text has nothing to show.
        if start < 0 or start >= len(text):
            continue
        end = min(end, len(text))
        # Trim the part already covered by an earlier span so no character is
        # emitted twice; a span fully inside an earlier one is dropped.
        start = max(start, cursor)
        if end <= start:
            continue
        if start > cursor:
            segments.append((text[cursor:start], None))
        segments.append((text[start:end], str(label)))
        cursor = end

    if cursor < len(text):
        segments.append((text[cursor:], None))
    return segments
# -- Filtering helpers ---------------------------------------------------------
def record_matches_filters(rec: Dict, llm_dataset_filter: str, type_filter: str) -> bool:
    """Tell whether ``rec`` passes the assessment filter and the type filter.

    Args:
        rec: record dict; its ``llm_is_dataset_contextual``, ``is_mixed_type``
            and ``type`` keys are consulted.
        llm_dataset_filter: selected assessment mode. ``"LLM: Datasets only"``
            and ``"LLM: Non-datasets only"`` filter on the LLM verdict; a
            value containing ``"Show Confusion/Mixed Cases"`` keeps only
            records flagged ``is_mixed_type``; anything else imposes no
            assessment constraint.
        type_filter: ``"All types"`` or the exact type a record must have.

    Returns:
        ``True`` when the record satisfies both filters. Records without an
        LLM verdict never match.
    """
    llm_is_ds = rec.get("llm_is_dataset_contextual")
    # Records the LLM has not assessed are never shown.
    if llm_is_ds is None:
        return False

    if llm_dataset_filter == "LLM: Datasets only" and not llm_is_ds:
        return False
    if llm_dataset_filter == "LLM: Non-datasets only" and llm_is_ds:
        return False

    # Match on the ASCII part of the choice only: the leading emoji is the
    # part most likely to be altered by encoding round-trips.
    if "Show Confusion/Mixed Cases" in (llm_dataset_filter or ""):
        if not rec.get("is_mixed_type", False):
            return False
        # No early return here: the type filter below must still apply,
        # otherwise the type dropdown has no effect in this mode.

    if type_filter != "All types":
        return rec.get("type") == type_filter
    return True
# -- Documentation -------------------------------------------------------------
| DOCUMENTATION = """ | |
| # π Monitoring of Data Use - User Guide | |
| ## What is this tool? | |
This application helps you **review and explore dataset mentions** extracted from documents.
| It displays text excerpts where potential datasets have been identified, along with metadata about each mention. | |
| ## What you'll see | |
| Each record shows: | |
| - **π Source Document**: The filename and page number where the text was found | |
| - **π Highlighted Text**: The original text with dataset mentions highlighted | |
| - **π Data Type**: The category of the dataset (e.g., census, survey, database) | |
| - **β Dataset Status**: Whether this mention actually refers to a dataset | |
| - **π‘ Context**: The surrounding text that provides context | |
| - **π Explanation**: Why this was classified as a dataset (or not) | |
| ## How to use this tool | |
| ### π― Navigation | |
| - **Browse Records**: Use the slider to jump to any record by number | |
| - **Previous/Next Buttons**: Navigate through records one at a time | |
| - **Filters**: The Previous/Next buttons respect your active filters | |
| ### π Filtering Options | |
| 1. **Dataset Status Filter** | |
| - **All**: Show all records | |
| - **Datasets only**: Show only records that contain actual dataset references | |
| - **Non-datasets only**: Show records that were identified but don't actually refer to datasets | |
| 2. **Data Type Filter** | |
| - Filter by specific data types (census, survey, database, etc.) | |
| - Types are sorted by frequency (most common first) | |
| ### π‘ Tips | |
| - Use filters to focus on specific types of data mentions | |
| - The "Contains Dataset" field tells you if the mention is a true dataset reference | |
| - Review the "Explanation" to understand the classification reasoning | |
| - Highlighted text shows exactly where the dataset mention appears in context | |
| ## π Try It Yourself! | |
| Want to extract datasets from your own text? Try our **Dataset Extraction Tool**: | |
| π **[Launch Dataset Extraction Tool](https://huggingface.co/spaces/ai4data/datause-extraction)** | |
| This interactive tool allows you to: | |
| - β¨ **Extract datasets** from your own text or documents | |
| - π **Use predefined samples** to see how it works | |
| - π¬ **Explore the extraction process** in real-time | |
| Perfect for testing the extraction capabilities on new documents or experimenting with different types of text! | |
| ## Data Source | |
| This viewer uses data from World Bank project documents. | |
| """ | |
# -- Gradio App ----------------------------------------------------------------
def create_demo() -> gr.Blocks:
    """Build the "Monitoring of Data Use" Gradio application.

    Loads the records with ``load_initial_data`` and assembles three tabs:
    the user guide, a record viewer (slider, assessment/type filters and
    previous/next buttons) and a side-by-side comparison of types that have
    both positive and negative LLM assessments.

    Returns:
        The assembled ``gr.Blocks`` object; the caller launches it.

    Raises:
        FileNotFoundError: propagated from ``load_initial_data`` when the
            data file is missing.
    """
    # Function-scope import so this block brings its own dependency.
    from html import escape

    data = load_initial_data()
    dynamic_dataset = DynamicDataset(data)
    mixed_manager = MixedTypeManager(data)

    # Type choices, most frequent first.
    type_counter = Counter(rec.get("type") for rec in data if rec.get("type"))
    type_choices = ["All types"] + [t for t, _ in type_counter.most_common()]

    no_match_message = "β οΈ No matching records found with the selected filters."
    pos_title = "### β IS Dataset"
    neg_title = "### β NOT Dataset"

    def make_info(rec):
        """Format record metadata as an HTML fragment for the details pane.

        Every value taken from the record is HTML-escaped before being
        interpolated, since it comes from the data file and may contain
        ``<``, ``>`` or ``&``.
        """
        fn = escape(str(rec.get("filename", "β")))
        pg = escape(str(rec.get("page", "β")))
        v_type = escape(str(rec.get("type", "β")))
        empirical_context = escape(str(rec.get("empirical_context", "β")))
        is_mixed = rec.get("is_mixed_type", False)
        llm_is_dataset = rec.get("llm_is_dataset_contextual")

        # Highlight the first NER term inside the surrounding text:
        # green when the LLM says "dataset", red otherwise.
        if rec.get("ner_text") and rec.get("text") and llm_is_dataset is not None:
            try:
                start, end = rec["ner_text"][0][0], rec["ner_text"][0][1]
                term = rec["text"][start:end]
            except (IndexError, KeyError, TypeError):
                # Malformed span: show the context without highlighting.
                term = None
            if isinstance(term, str) and term:
                if llm_is_dataset:
                    highlight_style = 'background-color: #90ee90; color: black; padding: 2px 4px; border-radius: 4px; font-weight: bold; border: 1px solid #5cb85c;'
                else:
                    highlight_style = 'background-color: #ff7f7f; color: black; padding: 2px 4px; border-radius: 4px; font-weight: bold; border: 1px solid #d9534f;'
                # The context is already escaped, so search for the escaped term.
                marked = escape(term)
                if marked in empirical_context:
                    empirical_context = empirical_context.replace(
                        marked, f'<span style="{highlight_style}">{marked}</span>'
                    )

        type_html = f"<code>{v_type}</code>"
        if is_mixed:
            type_html += " β οΈ <b>Mixed/Confusing Type</b>"

        parts = [
            "<h3>π Document Information</h3>",
            f"<p><b>File:</b> <code>{fn}</code><br>",
            f"<b>Page:</b> <code>{pg}</code></p>",
            "<h3>π·οΈ Type</h3>",
            f"<p>{type_html}</p>",
            "<h3>π Surrounding Text</h3>",
            f"<p>{empirical_context}</p>",
        ]

        # LLM contextual analysis section, only when a verdict exists.
        if llm_is_dataset is not None:
            status_icon = 'β ' if llm_is_dataset else 'β'
            status_text = 'Is a dataset' if llm_is_dataset else 'Not a dataset'
            parts.append("<h3>π€ Contextual Analysis</h3>")
            parts.append(f"<p><b>Assessment:</b> {status_icon} {status_text}</p>")

            llm_reasons = rec.get("llm_contextual_reason", [])
            if isinstance(llm_reasons, str):
                # A single string would otherwise be listed character by character.
                llm_reasons = [llm_reasons]
            if llm_reasons:
                parts.append("<p><b>Reasoning:</b></p><ul>")
                parts.extend(f"<li>{escape(str(reason))}</li>" for reason in llm_reasons)
                parts.append("</ul>")

            llm_thinking = rec.get("llm_thinking_contextual", "")
            if llm_thinking:
                parts.append("<p><b>Detailed Analysis:</b></p>")
                parts.append('<blockquote style="border-left: 3px solid #ccc; padding-left: 10px; color: #666;">')
                parts.append(escape(str(llm_thinking)))
                parts.append("</blockquote>")

        return "\n".join(parts)

    def load_example(idx: int):
        """Load the record at slider position ``idx`` (filters are ignored)."""
        rec = dynamic_dataset.example(idx)
        # Echo the clamped cursor so the slider always shows the record displayed.
        return prepare_for_highlight(rec), dynamic_dataset.current, make_info(rec)

    def jump_on_filters(llm_dataset_filter, type_filter):
        """Jump to the first record matching the filters, or show a notice."""
        for i, rec in enumerate(data):
            if record_matches_filters(rec, llm_dataset_filter, type_filter):
                dynamic_dataset.current = i
                return prepare_for_highlight(rec), i, make_info(rec)
        return [], 0, no_match_message

    # NOTE(review): the viewer cursor lives on the shared ``dynamic_dataset``
    # object rather than in ``gr.State``, so concurrent browser sessions
    # presumably move each other's position - confirm whether that matters.
    def _navigate(step, llm_dataset_filter, type_filter):
        """Move the cursor to the nearest matching record in direction ``step``.

        The cursor stays where it is when no further record matches.
        """
        i = dynamic_dataset.current + step
        while 0 <= i < dynamic_dataset.len:
            if record_matches_filters(data[i], llm_dataset_filter, type_filter):
                dynamic_dataset.current = i
                break
            i += step
        rec = data[dynamic_dataset.current]
        return prepare_for_highlight(rec), dynamic_dataset.current, make_info(rec)

    def nav_next(llm_dataset_filter, type_filter):
        """Go to the next record that passes the filters."""
        return _navigate(1, llm_dataset_filter, type_filter)

    def nav_prev(llm_dataset_filter, type_filter):
        """Go to the previous record that passes the filters."""
        return _navigate(-1, llm_dataset_filter, type_filter)

    def _counted_header(title, idx, total):
        """Append a 1-based "(position/total)" counter to ``title``."""
        if not total:
            # Avoid a modulo by zero when one side has no examples.
            return f"{title} (0/0)"
        return f"{title} ({idx % total + 1}/{total})"

    def load_comparison(dtype, pos_idx, neg_idx):
        """Return both comparison panes plus their headers for ``dtype``.

        Always returns six values, matching the six output components this
        callback is wired to.
        """
        if not dtype:
            return [], "Select a type", [], "Select a type", pos_title, neg_title

        pos_rec = mixed_manager.get_example(dtype, True, pos_idx)
        neg_rec = mixed_manager.get_example(dtype, False, neg_idx)
        pos_header_md = _counted_header(pos_title, pos_idx, mixed_manager.get_count(dtype, True))
        neg_header_md = _counted_header(neg_title, neg_idx, mixed_manager.get_count(dtype, False))
        return (
            prepare_for_highlight(pos_rec),
            make_info(pos_rec),
            prepare_for_highlight(neg_rec),
            make_info(neg_rec),
            pos_header_md,
            neg_header_md,
        )

    def next_pos(dtype, current_idx):
        """Advance the positive-example index (wrapping happens on lookup)."""
        return current_idx + 1

    def next_neg(dtype, current_idx):
        """Advance the negative-example index (wrapping happens on lookup)."""
        return current_idx + 1

    # ---- UI ----
    with gr.Blocks(title="Monitoring of Data Use") as demo:
        gr.Markdown("# π Monitoring of Data Use")

        with gr.Tabs():
            with gr.Tab("π How to Use"):
                gr.Markdown(DOCUMENTATION)

            with gr.Tab("π Viewer"):
                with gr.Row():
                    prog = gr.Slider(
                        minimum=0,
                        maximum=dynamic_dataset.len - 1,
                        value=0,
                        step=1,
                        # The label states the slider's real 0-based range.
                        label=f"π Browse Records (0 to {dynamic_dataset.len - 1:,})",
                        interactive=True,
                    )
                with gr.Row():
                    llm_dataset_filter = gr.Dropdown(
                        choices=["π₯ Show Confusion/Mixed Cases", "All", "LLM: Datasets only", "LLM: Non-datasets only"],
                        value="π₯ Show Confusion/Mixed Cases",
                        label="π€ Filter by Assessment",
                    )
                    type_filter = gr.Dropdown(
                        choices=type_choices,
                        value="All types",
                        label="π Filter by Data Type",
                    )
                inp_box = gr.HighlightedText(
                    label="π Document Text (with highlighted dataset mentions)",
                    interactive=False,
                    show_legend=False,
                    value="",
                )
                info_md = gr.HTML(label="βΉοΈ Record Details")
                with gr.Row():
                    prev_btn = gr.Button("β¬ οΈ Previous", variant="secondary", size="lg")
                    next_btn = gr.Button("Next β‘οΈ", variant="primary", size="lg")

                viewer_outputs = [inp_box, prog, info_md]
                filter_inputs = [llm_dataset_filter, type_filter]

                # Initial load and slider navigation (both ignore filters).
                demo.load(fn=load_example, inputs=prog, outputs=viewer_outputs)
                prog.release(fn=load_example, inputs=prog, outputs=viewer_outputs)

                # Changing either filter jumps to the first matching record.
                llm_dataset_filter.change(fn=jump_on_filters, inputs=filter_inputs, outputs=viewer_outputs)
                type_filter.change(fn=jump_on_filters, inputs=filter_inputs, outputs=viewer_outputs)

                # Prev / Next navigation respecting filters.
                prev_btn.click(fn=nav_prev, inputs=filter_inputs, outputs=viewer_outputs)
                next_btn.click(fn=nav_next, inputs=filter_inputs, outputs=viewer_outputs)

            with gr.Tab("βοΈ Comparison"):
                gr.Markdown("### Side-by-Side Comparison of Mixed Types")
                gr.Markdown("Compare examples where the **same type** is classified differently based on context.")
                with gr.Row():
                    comp_type_selector = gr.Dropdown(
                        choices=mixed_manager.mixed_types,
                        value=mixed_manager.mixed_types[0] if mixed_manager.mixed_types else None,
                        label="Select Mixed Type to Compare",
                    )

                # Per-session example indices for each side.
                pos_idx_state = gr.State(0)
                neg_idx_state = gr.State(0)

                with gr.Row():
                    # Left column: positive examples.
                    with gr.Column():
                        pos_header = gr.Markdown(pos_title)
                        pos_hl_box = gr.HighlightedText(label="Context", interactive=False, show_legend=False, value="")
                        pos_info_box = gr.HTML()
                        pos_next_btn = gr.Button("Next Example β‘οΈ")
                    # Right column: negative examples.
                    with gr.Column():
                        neg_header = gr.Markdown(neg_title)
                        neg_hl_box = gr.HighlightedText(label="Context", interactive=False, show_legend=False, value="")
                        neg_info_box = gr.HTML()
                        neg_next_btn = gr.Button("Next Example β‘οΈ")

                comparison_inputs = [comp_type_selector, pos_idx_state, neg_idx_state]
                comparison_outputs = [pos_hl_box, pos_info_box, neg_hl_box, neg_info_box, pos_header, neg_header]

                # Selecting another type resets both indices, then reloads.
                comp_type_selector.change(
                    fn=lambda: (0, 0),
                    outputs=[pos_idx_state, neg_idx_state],
                ).then(
                    fn=load_comparison,
                    inputs=comparison_inputs,
                    outputs=comparison_outputs,
                )
                pos_next_btn.click(
                    fn=next_pos,
                    inputs=[comp_type_selector, pos_idx_state],
                    outputs=[pos_idx_state],
                ).then(
                    fn=load_comparison,
                    inputs=comparison_inputs,
                    outputs=comparison_outputs,
                )
                neg_next_btn.click(
                    fn=next_neg,
                    inputs=[comp_type_selector, neg_idx_state],
                    outputs=[neg_idx_state],
                ).then(
                    fn=load_comparison,
                    inputs=comparison_inputs,
                    outputs=comparison_outputs,
                )

                # Initial load of the comparison panes.
                demo.load(
                    fn=load_comparison,
                    inputs=comparison_inputs,
                    outputs=comparison_outputs,
                )

    return demo
# Script entry point: build the app and launch it with sharing and debug mode
# both turned off.
if __name__ == "__main__":
    create_demo().launch(share=False, debug=False)