rafmacalaba's picture
switch to new data.json
b877288
raw
history blame
27.4 kB
import os
import json
from typing import List, Dict, Tuple, Optional, Any
from collections import Counter, defaultdict
import gradio as gr
# ── Local CONFIG ──────────────────────────────────────────────────────────────
DATA_FILE = "consolidated_data_optimized.json"


def load_initial_data() -> List[Dict]:
    """Load the records from ``DATA_FILE`` and order them for browsing.

    Returns:
        The parsed JSON content, sorted in place so that records with the
        most ``ner_text`` spans come first. Records whose ``ner_text`` is
        missing or ``null`` count as having zero spans; ties keep their
        order from the file.

    Raises:
        FileNotFoundError: If ``DATA_FILE`` does not exist in the current
            working directory.
    """
    if not os.path.exists(DATA_FILE):
        raise FileNotFoundError(f"{DATA_FILE} not found in current directory.")
    with open(DATA_FILE, "r", encoding="utf-8") as f:
        data = json.load(f)
    # Show the records with the most highlighted spans first. ``or []`` keeps
    # an explicit JSON null from crashing len(), matching the guard used by
    # prepare_for_highlight.
    data.sort(key=lambda rec: len(rec.get('ner_text') or []), reverse=True)
    return data
class DynamicDataset:
    """Record list paired with a cursor pointing at the record being viewed."""

    def __init__(self, data: List[Dict]):
        """Store *data*, cache its length and start the cursor at index 0."""
        self.data = data
        self.len = len(data)
        self.current = 0

    def example(self, idx: int) -> Dict:
        """Move the cursor to *idx*, clamped to the valid range, and return that record."""
        last_position = self.len - 1
        capped = min(last_position, idx)
        self.current = max(0, capped)
        record = self.data[self.current]
        return record
class ComparisonManager:
    """Index borderline records for validated / not-validated comparison.

    Only records tagged ``"borderline"`` are indexed. They are grouped twice:

    * ``type_groups``: by the record's ``type``.
    * ``term_groups``: by the text of the record's first ``ner_text`` span,
      when that span is labelled ``"named"`` and the record is also tagged
      ``"confusing_term"``.

    Every group is a dict with a ``'validated'`` and a ``'not_validated'``
    list. ``mixed_types`` and ``confusing_terms`` list the keys that have at
    least one record on both sides, largest group first.
    """

    def __init__(self, data: List[Dict]):
        """Build both indexes from *data*.

        Records with missing/``null`` tags or a malformed first span are
        tolerated: they are simply left out of the index they cannot join.
        """
        self.data = data
        # Group by type
        self.type_groups = defaultdict(lambda: {'validated': [], 'not_validated': []})
        # Group by term (extracted from ner_text)
        self.term_groups = defaultdict(lambda: {'validated': [], 'not_validated': []})

        for rec in data:
            # ``or []`` so an explicit JSON null does not break the ``in`` test.
            tags = rec.get("tags") or []
            # Only include borderline cases
            if "borderline" not in tags:
                continue

            side = 'validated' if rec.get("validated", False) else 'not_validated'

            dtype = rec.get("type")
            if dtype:
                self.type_groups[dtype][side].append(rec)

            term = self._first_named_term(rec)
            if term and "confusing_term" in tags:
                self.term_groups[term][side].append(rec)

        # Keys seen with both outcomes, sorted by total record count.
        self.mixed_types = self._keys_with_both_outcomes(self.type_groups)
        self.confusing_terms = self._keys_with_both_outcomes(self.term_groups)

    @staticmethod
    def _first_named_term(rec: Dict) -> Optional[str]:
        """Return the text under the record's first span if it is labelled 'named'."""
        spans = rec.get('ner_text')
        text = rec.get('text')
        if not spans or not text:
            return None
        try:
            start, end, label = spans[0]
            if label != 'named':
                return None
            # Offsets are coerced with int(), as prepare_for_highlight does.
            return text[int(start):int(end)] or None
        except (TypeError, ValueError, KeyError, IndexError):
            # Malformed span: skip term grouping instead of failing start-up.
            return None

    @staticmethod
    def _keys_with_both_outcomes(groups: Dict) -> List:
        """Return keys having validated AND not-validated records, biggest total first."""
        totals = [
            (key, len(sides['validated']) + len(sides['not_validated']))
            for key, sides in groups.items()
            if sides['validated'] and sides['not_validated']
        ]
        # list.sort is stable, so equal totals keep first-seen order.
        totals.sort(key=lambda pair: pair[1], reverse=True)
        return [key for key, _ in totals]

    @staticmethod
    def _bucket(groups: Dict, key: Any, is_validated: bool) -> List[Dict]:
        """Return one side of a group, or an empty list for an unknown key."""
        # Membership test first: indexing the defaultdict would insert the key.
        if key not in groups:
            return []
        return groups[key]['validated' if is_validated else 'not_validated']

    def get_example_by_type(self, dtype: str, is_validated: bool, idx: int) -> Dict:
        """Return the *idx*-th record (wrapping around) for a type, or ``{}`` if none."""
        bucket = self._bucket(self.type_groups, dtype, is_validated)
        return bucket[idx % len(bucket)] if bucket else {}

    def get_count_by_type(self, dtype: str, is_validated: bool) -> int:
        """Return how many records a type has on the requested side."""
        return len(self._bucket(self.type_groups, dtype, is_validated))

    def get_example_by_term(self, term: str, is_validated: bool, idx: int) -> Dict:
        """Return the *idx*-th record (wrapping around) for a term, or ``{}`` if none."""
        bucket = self._bucket(self.term_groups, term, is_validated)
        return bucket[idx % len(bucket)] if bucket else {}

    def get_count_by_term(self, term: str, is_validated: bool) -> int:
        """Return how many records a term has on the requested side."""
        return len(self._bucket(self.term_groups, term, is_validated))
# ── Highlight utils ──────────────────────────────────────────────────────────
def prepare_for_highlight(rec: Dict) -> List[Tuple[str, Optional[str]]]:
    """Split a record's text into ``(segment, label)`` pairs for highlighting.

    Args:
        rec: Record whose ``"text"`` is the string to split and whose
            ``"ner_text"`` is an iterable of ``(start, end, label)`` spans.
            Either may be missing or ``None``.

    Returns:
        Segments in text order. Unlabelled stretches carry ``None``; spans
        carry ``str(label)``. Concatenating the segment strings always
        yields the original text.

    Spans that cannot be unpacked or whose offsets are not integer-like are
    skipped. Spans starting outside the text are skipped, ends are clipped to
    the text length, and a span overlapping an earlier one is trimmed to its
    not-yet-covered remainder so that no character is emitted twice.
    """
    text = rec.get("text", "") or ""
    raw_spans = rec.get("ner_text", []) or []

    # Validate and coerce BEFORE sorting, so one bad span cannot abort the call.
    spans = []
    for span in raw_spans:
        try:
            start, end, label = span
            spans.append((int(start), int(end), label))
        except (TypeError, ValueError):
            continue
    spans.sort(key=lambda item: item[0])

    segments = []
    last_idx = 0
    for start, end, label in spans:
        if start < 0 or start >= len(text):
            continue
        # Trim the part already emitted by a previous (overlapping) span.
        start = max(start, last_idx)
        end = min(end, len(text))
        if end <= start:
            continue
        if start > last_idx:
            segments.append((text[last_idx:start], None))
        segments.append((text[start:end], str(label)))
        last_idx = end

    if last_idx < len(text):
        segments.append((text[last_idx:], None))
    return segments
# ── Filtering helpers ─────────────────────────────────────────────────────────
def record_matches_filters(rec: Dict, dataset_filter: str, type_filter: str) -> bool:
    """Tell whether *rec* passes both viewer filters.

    Args:
        rec: Record to test; reads its ``"validated"``, ``"tags"`` and
            ``"type"`` entries.
        dataset_filter: ``"Datasets only"``, ``"Non-datasets only"`` or
            ``"Borderline Cases Only"``; any other value (e.g. ``"All"``)
            applies no status restriction.
        type_filter: ``"All types"`` for no restriction, otherwise the exact
            ``type`` value the record must have.

    Returns:
        ``True`` only when the record satisfies the status filter AND the
        type filter.
    """
    is_validated = rec.get("validated", False)
    # ``or []`` so an explicit JSON null does not break the ``in`` test.
    tags = rec.get("tags") or []

    if dataset_filter == "Datasets only" and not is_validated:
        return False
    if dataset_filter == "Non-datasets only" and is_validated:
        return False
    # Reject only; do not return early, so the type filter below still applies.
    if dataset_filter == "Borderline Cases Only" and "borderline" not in tags:
        return False
    if type_filter != "All types" and rec.get("type") != type_filter:
        return False
    return True
# ── Documentation ─────────────────────────────────────────────────────────────
# Markdown user guide. create_demo() passes this string to gr.Markdown inside
# the "How to Use" tab, so everything between the triple quotes is user-facing
# text rather than a comment.
DOCUMENTATION = """
# πŸ“Š Monitoring of Data Use - User Guide
## What is this tool?
This application helps you **review and explore dataset mentions** extracted from documents.
It displays text excerpts where potential datasets have been identified, along with metadata about each mention.
## What you'll see
Each record shows:
- **πŸ“„ Source Document**: The filename and page number where the text was found
- **πŸ” Highlighted Text**: The original text with dataset mentions highlighted
- **πŸ“‹ Data Type**: The category of the dataset (e.g., census, survey, database)
- **βœ… Dataset Status**: Whether this mention actually refers to a dataset
- **πŸ’‘ Context**: The surrounding text that provides context
- **πŸ“ Explanation**: Why this was classified as a dataset (or not)
- **🏷️ Tags**: Borderline, mixed type, or confusing term indicators
## How to use this tool
### 🎯 Navigation
- **Browse Records**: Use the slider to jump to any record by number
- **Previous/Next Buttons**: Navigate through records one at a time
- **Filters**: The Previous/Next buttons respect your active filters
### πŸ” Filtering Options
1. **Dataset Status Filter**
- **All**: Show all records
- **Datasets only**: Show only records that contain actual dataset references
- **Non-datasets only**: Show records that were identified but don't actually refer to datasets
- **πŸ”₯ Borderline Cases Only**: Show only confusing/mixed cases
2. **Data Type Filter**
- Filter by specific data types (census, survey, database, etc.)
- Types are sorted by frequency (most common first)
### βš–οΈ Comparison Tab
The Comparison tab helps you understand **why the same type or term** can be validated differently:
1. **By Type**: Compare examples of the same data type (e.g., "system") with different validation outcomes
2. **By Term**: Compare the exact same term (e.g., "Project MIS") appearing in different contexts
This helps identify:
- What contextual signals distinguish valid from invalid datasets
- Why borderline cases are confusing
- Patterns in validation decisions
### πŸ’‘ Tips
- Use filters to focus on specific types of data mentions
- The "Validated" field tells you if the mention is a true dataset reference
- Review the "Explanation" to understand the classification reasoning
- Highlighted text shows exactly where the dataset mention appears in context
- Check tags to identify borderline/confusing cases
## Data Source
This viewer uses data from World Bank project documents with revalidation analysis.
"""
# ── Gradio App ───────────────────────────────────────────────────────────────
def create_demo() -> gr.Blocks:
    """Build the Gradio app: a usage guide, a record viewer and a comparison tab.

    The data is loaded once with ``load_initial_data`` and shared by a
    ``DynamicDataset`` (viewer cursor) and a ``ComparisonManager``
    (borderline comparison index).

    Returns:
        The assembled ``gr.Blocks`` application, not yet launched.

    Raises:
        FileNotFoundError: Propagated from ``load_initial_data`` when the
            data file is missing.
    """
    # Function-scope import: the info panel is raw HTML, so every value that
    # comes from a record must be escaped before being embedded in it.
    from html import escape

    data = load_initial_data()
    # NOTE(review): this cursor lives in the closure, so it appears to be
    # shared by every browser session using the app - confirm whether
    # per-session state (gr.State) is needed.
    dynamic_dataset = DynamicDataset(data)
    comparison_manager = ComparisonManager(data)

    # Count types and sort by frequency (most common first)
    type_counter = Counter(rec.get("type") for rec in data if rec.get("type"))
    type_values = [t for t, _ in type_counter.most_common()]
    type_choices = ["All types"] + type_values

    valid_style = 'background-color: #90ee90; color: black; padding: 2px 4px; border-radius: 4px; font-weight: bold; border: 1px solid #5cb85c;'
    invalid_style = 'background-color: #ff7f7f; color: black; padding: 2px 4px; border-radius: 4px; font-weight: bold; border: 1px solid #d9534f;'
    # (tag, badge markup) in display order.
    tag_badges = (
        ("borderline", "⚠️ <b>Borderline</b>"),
        ("mixed_type", "πŸ” <b>Mixed Type</b>"),
        ("confusing_term", "πŸ€” <b>Confusing Term</b>"),
    )

    def _esc(value: Any) -> str:
        """Return *value* as text that is safe to embed in HTML."""
        return escape(str(value))

    def _context_html(rec: Dict, is_validated: Any) -> str:
        """Return the escaped surrounding text with the first mention highlighted.

        The mention is the text under the record's first ``ner_text`` span;
        it is shown green for validated records and red otherwise.
        """
        context = str(rec.get("empirical_context", "β€”"))
        term = ""
        if rec.get("ner_text") and rec.get("text") and is_validated is not None:
            try:
                first_span = rec["ner_text"][0]
                term = rec["text"][int(first_span[0]):int(first_span[1])]
            except (TypeError, ValueError, IndexError, KeyError):
                # Malformed span: show the context without highlighting.
                term = ""
        if not isinstance(term, str) or not term or term not in context:
            return escape(context)
        style = valid_style if is_validated else invalid_style
        marked = f'<span style="{style}">{escape(term)}</span>'
        # Split on the raw term and escape the pieces separately: searching
        # the already-escaped text could match inside an entity like "&amp;".
        return marked.join(escape(piece) for piece in context.split(term))

    def make_info(rec):
        """Format record metadata for display."""
        fn = rec.get("filename", "β€”")
        pg = rec.get("page", "β€”")
        v_type = rec.get("type", "β€”")
        tags = rec.get("tags") or []
        is_validated = rec.get("validated", False)
        contextual_signal = rec.get("contextual_signal", "β€”")
        contextual_reason_model = rec.get("contextual_reason_model", "β€”")
        contextual_reason_agent = rec.get("contextual_reason_agent", "β€”")

        type_html = f"<code>{_esc(v_type)}</code>"
        # Add type stats if available
        type_stats = rec.get("type_stats")
        if type_stats:
            type_html += f" <small>(Type: {_esc(type_stats['validated'])} βœ… / {_esc(type_stats['not_validated'])} ❌)</small>"

        tags_html = " ".join(badge for tag, badge in tag_badges if tag in tags)

        sections = [
            "<h3>πŸ“„ Document Information</h3>",
            f"<p><b>File:</b> <code>{_esc(fn)}</code><br>",
            f"<b>Page:</b> <code>{_esc(pg)}</code></p>",
            "<h3>🏷️ Type</h3>",
            f"<p>{type_html}</p>",
        ]
        if tags_html:
            sections += [
                "<h3>🚩 Tags</h3>",
                f"<p>{tags_html}</p>",
            ]
        sections += [
            "<h3>πŸ“ Surrounding Text</h3>",
            f"<p>{_context_html(rec, is_validated)}</p>",
        ]

        # Add validation analysis
        status_icon = 'βœ…' if is_validated else '❌'
        status_text = 'Is a dataset' if is_validated else 'Not a dataset'
        sections += [
            "<h3>πŸ€– Validation Analysis</h3>",
            f"<p><b>Assessment:</b> {status_icon} {status_text}</p>",
            f"<p><b>Contextual Signal:</b> <code>{_esc(contextual_signal)}</code></p>",
        ]
        if contextual_reason_agent:
            sections += [
                "<p><b>Agent Reasoning:</b></p>",
                '<blockquote style="border-left: 3px solid #ccc; padding-left: 10px; color: #666;">',
                _esc(contextual_reason_agent),
                "</blockquote>",
            ]
        if contextual_reason_model:
            sections += [
                "<p><b>Model Reasoning:</b></p>",
                '<blockquote style="border-left: 3px solid #999; padding-left: 10px; color: #888;">',
                _esc(contextual_reason_model),
                "</blockquote>",
            ]
        return "\n".join(sections)

    def _show(i: int):
        """Make record *i* current and return (highlight segments, index, info HTML)."""
        dynamic_dataset.current = i
        rec = data[i]
        return prepare_for_highlight(rec), i, make_info(rec)

    # Basic load by slider index (ignores filters)
    def load_example(idx: int):
        rec = dynamic_dataset.example(idx)
        # Echo the clamped index so the slider always matches the record shown.
        return prepare_for_highlight(rec), dynamic_dataset.current, make_info(rec)

    # When filters change -> jump to first matching record
    def jump_on_filters(dataset_filter, type_filter):
        for i, rec in enumerate(data):
            if record_matches_filters(rec, dataset_filter, type_filter):
                return _show(i)
        # No match β†’ return blank
        return [], 0, "⚠️ No matching records found with the selected filters."

    # Navigation respecting filters; stays on the current record when no
    # further match exists in the chosen direction.
    def nav_next(dataset_filter, type_filter):
        for i in range(dynamic_dataset.current + 1, dynamic_dataset.len):
            if record_matches_filters(data[i], dataset_filter, type_filter):
                return _show(i)
        return _show(dynamic_dataset.current)

    def nav_prev(dataset_filter, type_filter):
        for i in range(dynamic_dataset.current - 1, -1, -1):
            if record_matches_filters(data[i], dataset_filter, type_filter):
                return _show(i)
        return _show(dynamic_dataset.current)

    def _load_comparison(selected, pos_idx, neg_idx, get_example, get_count, prompt):
        """Build both comparison columns for *selected* (a type or a term).

        Returns (pos highlight, pos info, neg highlight, neg info,
        pos header, neg header).
        """
        if not selected:
            return [], prompt, [], prompt, "### βœ… IS Dataset", "### ❌ NOT Dataset"

        pos_rec = get_example(selected, True, pos_idx)
        neg_rec = get_example(selected, False, neg_idx)
        pos_hl = prepare_for_highlight(pos_rec) if pos_rec else []
        neg_hl = prepare_for_highlight(neg_rec) if neg_rec else []
        pos_info = make_info(pos_rec) if pos_rec else "No examples"
        neg_info = make_info(neg_rec) if neg_rec else "No examples"

        # Add count info; the index wraps around the same way the lookup does.
        pos_total = get_count(selected, True)
        neg_total = get_count(selected, False)
        pos_header = f"### βœ… IS Dataset ({(pos_idx % pos_total) + 1 if pos_total > 0 else 0}/{pos_total})"
        neg_header = f"### ❌ NOT Dataset ({(neg_idx % neg_total) + 1 if neg_total > 0 else 0}/{neg_total})"
        return pos_hl, pos_info, neg_hl, neg_info, pos_header, neg_header

    # Comparison Logic - By Type
    def load_type_comparison(dtype, pos_idx, neg_idx):
        return _load_comparison(
            dtype, pos_idx, neg_idx,
            comparison_manager.get_example_by_type,
            comparison_manager.get_count_by_type,
            "Select a type",
        )

    # Comparison Logic - By Term
    def load_term_comparison(term, pos_idx, neg_idx):
        return _load_comparison(
            term, pos_idx, neg_idx,
            comparison_manager.get_example_by_term,
            comparison_manager.get_count_by_term,
            "Select a term",
        )

    def advance(current_idx):
        """Step an example counter; the lookup wraps it with modulo."""
        return current_idx + 1

    def reset_indices():
        """Restart both example counters when a new type/term is selected."""
        return 0, 0

    # ---- UI ----
    with gr.Blocks(title="Monitoring of Data Use") as demo:
        gr.Markdown("# πŸ“Š Monitoring of Data Use")
        with gr.Tabs():
            with gr.Tab("πŸ“– How to Use"):
                gr.Markdown(DOCUMENTATION)

            with gr.Tab("πŸ” Viewer"):
                with gr.Row():
                    prog = gr.Slider(
                        minimum=0,
                        maximum=dynamic_dataset.len - 1,
                        value=0,
                        step=1,
                        label=f"πŸ“‘ Browse Records (1 to {dynamic_dataset.len:,})",
                        interactive=True,
                    )
                with gr.Row():
                    dataset_filter = gr.Dropdown(
                        choices=["All", "Datasets only", "Non-datasets only", "Borderline Cases Only"],
                        value="Datasets only",
                        label="🎯 Filter by Validation Status",
                    )
                    type_filter = gr.Dropdown(
                        choices=type_choices,
                        value="All types",
                        label="πŸ“‚ Filter by Data Type",
                    )
                inp_box = gr.HighlightedText(
                    label="πŸ“„ Document Text (with highlighted dataset mentions)",
                    interactive=False,
                    show_legend=False,
                    value=""
                )
                info_md = gr.HTML(label="ℹ️ Record Details")
                with gr.Row():
                    prev_btn = gr.Button("⬅️ Previous", variant="secondary", size="lg")
                    next_btn = gr.Button("Next ➑️", variant="primary", size="lg")

                filter_inputs = [dataset_filter, type_filter]
                viewer_outputs = [inp_box, prog, info_md]

                # Initial load: open on the first record matching the default
                # filters, so the view agrees with what the dropdowns show.
                demo.load(fn=jump_on_filters, inputs=filter_inputs, outputs=viewer_outputs)
                # Slider navigation
                prog.release(fn=load_example, inputs=prog, outputs=viewer_outputs)
                # Filters
                dataset_filter.change(fn=jump_on_filters, inputs=filter_inputs, outputs=viewer_outputs)
                type_filter.change(fn=jump_on_filters, inputs=filter_inputs, outputs=viewer_outputs)
                # Prev / Next navigation respecting filters
                prev_btn.click(fn=nav_prev, inputs=filter_inputs, outputs=viewer_outputs)
                next_btn.click(fn=nav_next, inputs=filter_inputs, outputs=viewer_outputs)

            with gr.Tab("βš–οΈ Comparison"):
                gr.Markdown("### Side-by-Side Comparison of Borderline Cases")
                gr.Markdown("Compare examples to understand **why the same type or term** is validated differently based on context.")
                comparison_mode = gr.Radio(
                    choices=["By Type", "By Term"],
                    value="By Type",
                    label="Comparison Mode"
                )

                # Type comparison
                with gr.Group(visible=True) as type_comparison_group:
                    gr.Markdown("**Compare by Data Type**: See how the same type (e.g., 'system') can be valid or invalid")
                    comp_type_selector = gr.Dropdown(
                        choices=comparison_manager.mixed_types,
                        value=comparison_manager.mixed_types[0] if comparison_manager.mixed_types else None,
                        label="Select Mixed Type to Compare",
                    )
                    type_pos_idx_state = gr.State(0)
                    type_neg_idx_state = gr.State(0)
                    with gr.Row():
                        with gr.Column():
                            type_pos_header = gr.Markdown("### βœ… IS Dataset")
                            type_pos_hl_box = gr.HighlightedText(label="Context", interactive=False, show_legend=False, value="")
                            type_pos_info_box = gr.HTML()
                            type_pos_next_btn = gr.Button("Next Example ➑️")
                        with gr.Column():
                            type_neg_header = gr.Markdown("### ❌ NOT Dataset")
                            type_neg_hl_box = gr.HighlightedText(label="Context", interactive=False, show_legend=False, value="")
                            type_neg_info_box = gr.HTML()
                            type_neg_next_btn = gr.Button("Next Example ➑️")

                # Term comparison
                with gr.Group(visible=False) as term_comparison_group:
                    gr.Markdown("**Compare by Term**: See how the exact same term appears in different validation contexts")
                    comp_term_selector = gr.Dropdown(
                        choices=comparison_manager.confusing_terms,
                        value=comparison_manager.confusing_terms[0] if comparison_manager.confusing_terms else None,
                        label="Select Confusing Term to Compare",
                    )
                    term_pos_idx_state = gr.State(0)
                    term_neg_idx_state = gr.State(0)
                    with gr.Row():
                        with gr.Column():
                            term_pos_header = gr.Markdown("### βœ… IS Dataset")
                            term_pos_hl_box = gr.HighlightedText(label="Context", interactive=False, show_legend=False, value="")
                            term_pos_info_box = gr.HTML()
                            term_pos_next_btn = gr.Button("Next Example ➑️")
                        with gr.Column():
                            term_neg_header = gr.Markdown("### ❌ NOT Dataset")
                            term_neg_hl_box = gr.HighlightedText(label="Context", interactive=False, show_legend=False, value="")
                            term_neg_info_box = gr.HTML()
                            term_neg_next_btn = gr.Button("Next Example ➑️")

                # Toggle visibility based on mode
                def toggle_comparison_mode(mode):
                    return gr.update(visible=mode == "By Type"), gr.update(visible=mode == "By Term")

                comparison_mode.change(
                    fn=toggle_comparison_mode,
                    inputs=[comparison_mode],
                    outputs=[type_comparison_group, term_comparison_group]
                )

                # Type comparison events: update the counter(s) first, then
                # re-render both columns from the new state.
                type_inputs = [comp_type_selector, type_pos_idx_state, type_neg_idx_state]
                type_outputs = [type_pos_hl_box, type_pos_info_box, type_neg_hl_box, type_neg_info_box, type_pos_header, type_neg_header]
                comp_type_selector.change(
                    fn=reset_indices,
                    outputs=[type_pos_idx_state, type_neg_idx_state]
                ).then(fn=load_type_comparison, inputs=type_inputs, outputs=type_outputs)
                type_pos_next_btn.click(
                    fn=advance,
                    inputs=[type_pos_idx_state],
                    outputs=[type_pos_idx_state]
                ).then(fn=load_type_comparison, inputs=type_inputs, outputs=type_outputs)
                type_neg_next_btn.click(
                    fn=advance,
                    inputs=[type_neg_idx_state],
                    outputs=[type_neg_idx_state]
                ).then(fn=load_type_comparison, inputs=type_inputs, outputs=type_outputs)

                # Term comparison events (same pattern as above)
                term_inputs = [comp_term_selector, term_pos_idx_state, term_neg_idx_state]
                term_outputs = [term_pos_hl_box, term_pos_info_box, term_neg_hl_box, term_neg_info_box, term_pos_header, term_neg_header]
                comp_term_selector.change(
                    fn=reset_indices,
                    outputs=[term_pos_idx_state, term_neg_idx_state]
                ).then(fn=load_term_comparison, inputs=term_inputs, outputs=term_outputs)
                term_pos_next_btn.click(
                    fn=advance,
                    inputs=[term_pos_idx_state],
                    outputs=[term_pos_idx_state]
                ).then(fn=load_term_comparison, inputs=term_inputs, outputs=term_outputs)
                term_neg_next_btn.click(
                    fn=advance,
                    inputs=[term_neg_idx_state],
                    outputs=[term_neg_idx_state]
                ).then(fn=load_term_comparison, inputs=term_inputs, outputs=term_outputs)

    return demo
if __name__ == "__main__":
    # Script entry point: build the app, then serve it without a public
    # share link and with debug mode off.
    app = create_demo()
    app.launch(share=False, debug=False)