Spaces:
Sleeping
Sleeping
| """ | |
| Hugging Face Space for viewing Mediform/seed_data_v5 dataset. | |
| Displays doctor-patient conversations with EHR reference tracking. | |
| """ | |
| import gradio as gr | |
| import re | |
| import json | |
| from datasets import load_dataset | |
def parse_json_fields(item: dict) -> dict:
    """Return a shallow copy of *item* with JSON-encoded string fields decoded.

    Only the ``conversations``, ``ehr_dict`` and ``orders`` fields are
    considered. A field is decoded when it is present and holds a string;
    a string that is not valid JSON is left exactly as it was.
    """
    parsed = dict(item)
    for name in ("conversations", "ehr_dict", "orders"):
        raw = parsed.get(name)
        if not isinstance(raw, str):
            # Missing, or already a structured value: nothing to decode.
            continue
        try:
            parsed[name] = json.loads(raw)
        except json.JSONDecodeError:
            # Best effort: keep the undecodable string untouched.
            continue
    return parsed
def load_data():
    """Load the conversation dataset, preferring the Hugging Face Hub.

    The ``train`` split of ``Mediform/seed_data_v5`` is tried first; each row
    is converted to a plain dict and passed through ``parse_json_fields``.
    If that fails for any reason, the local file
    ``term_groups_ehr_dataset_v3.json`` is read instead and the value stored
    under its top-level ``"data"`` key is returned as-is.

    Returns:
        The loaded records, or an empty list when neither source is usable.
    """
    try:
        ds = load_dataset("Mediform/seed_data_v5", split="train")
        # Convert to list of dicts and parse JSON string fields
        return [parse_json_fields(dict(row)) for row in ds]
    except Exception as e:
        # Top-level boundary: any Hub failure (network, auth, schema, ...)
        # should fall through to the local file rather than abort startup.
        print(f"Failed to load from HF Hub: {e}")

    # Fallback to local file if available
    try:
        with open("term_groups_ehr_dataset_v3.json", "r", encoding="utf-8") as f:
            local_data = json.load(f)
        return local_data.get("data", [])
    except (OSError, ValueError, AttributeError) as e:
        # OSError: file missing/unreadable; ValueError: invalid JSON or bad
        # encoding; AttributeError: top-level JSON value is not an object.
        print(f"Failed to load local fallback: {e}")
        return []
# Dataset records, loaded once when the module is imported.
DATA = load_data()

# Display label for each EHR category key.
CATEGORY_LABELS = dict(
    history="History (Anamnese)",
    findings="Findings (Befunde)",
    treatment="Treatment (Therapie)",
    plan="Plan (Prozedere)",
    order="Orders (Anordnungen)",
)

# Conversation variants selectable in the UI.
VARIANTS = [
    "natural",
    "inline_dictation",
    "post_dictation",
]
def get_conversation_options(data=None):
    """Build the dropdown labels, one per conversation.

    Each label has the form ``"<1-based index>. <scenario>"``. The scenario
    comes from the record's ``brief_scenario`` field; when that field is
    missing, ``None`` or empty, ``"Conversation <index>"`` is used instead.
    Scenarios longer than 80 characters are cut to 77 characters plus
    ``"..."``.

    Args:
        data: Records to label. Defaults to the module-level ``DATA``.

    Returns:
        A list of label strings in record order.
    """
    records = DATA if data is None else data
    options = []
    for number, item in enumerate(records, start=1):
        # `or` also covers a present-but-None/empty field, which would
        # otherwise crash on len() or yield a blank label.
        scenario = str(item.get("brief_scenario") or f"Conversation {number}")
        # Truncate long scenarios
        if len(scenario) > 80:
            scenario = scenario[:77] + "..."
        options.append(f"{number}. {scenario}")
    return options
def extract_refs_from_turn(content: str) -> dict:
    """Collect the ``<ref keys="...">text</ref>`` tags found in *content*.

    The ``keys`` attribute is a comma-separated list. Each key is stripped
    of surrounding whitespace and filed under the category named by its
    prefix (``history_``, ``findings_``, ``treatment_``, ``plan_`` or
    ``order_``); keys with any other prefix are ignored.

    Returns:
        A dict with exactly those five category names as keys, each mapping
        to a list of ``(key, text)`` tuples in order of appearance.
    """
    buckets = {
        name: [] for name in ("history", "findings", "treatment", "plan", "order")
    }
    ref_tag = re.compile(r'<ref\s+keys="([^"]+)">([^<]+)</ref>')
    for keys_attr, inner_text in ref_tag.findall(content):
        for raw_key in keys_attr.split(","):
            ref_key = raw_key.strip()
            # The category is whatever precedes the first underscore; a key
            # without an underscore has no category prefix at all.
            category, underscore, _ = ref_key.partition("_")
            if underscore and category in buckets:
                buckets[category].append((ref_key, inner_text))
    return buckets
def clean_turn_content(content: str) -> str:
    """Strip ``<ref keys="...">`` wrappers from *content*, keeping the inner text."""
    ref_tag = re.compile(r'<ref\s+keys="[^"]+">([^<]+)</ref>')
    return ref_tag.sub(lambda tag: tag.group(1), content)
def format_role(role: str) -> str:
    """Return the display label for *role*; unknown roles are returned unchanged."""
    labels = {
        "patient": "Patient",
        "doctor": "Arzt",
        "doctor_dictation": "Arzt (Diktat)",
    }
    try:
        return labels[role]
    except KeyError:
        return role
def get_role_color(role: str) -> str:
    """Return the CSS background color used for a turn spoken by *role*."""
    known_colors = (
        ("patient", "#e3f2fd"),  # Light blue
        ("doctor", "#e8f5e9"),  # Light green
    )
    for name, color in known_colors:
        if role == name:
            return color
    # Every other role, dictation included, gets light orange.
    return "#fff3e0"
def _ehr_entry_text(key, value):
    """Return the text to display for one EHR entry.

    Entries whose key starts with ``order_`` may hold a JSON object, either
    still encoded as a string or already decoded to a dict; for those the
    object's ``"details"`` value is shown when present. Everything else is
    returned unchanged.
    """
    if not key.startswith("order_"):
        return value

    decoded = value
    if isinstance(value, str) and value.startswith("{"):
        try:
            decoded = json.loads(value)
        except ValueError:
            # Looks like JSON but is not: show the raw text.
            return value

    if isinstance(decoded, dict):
        if "details" in decoded:
            return decoded["details"]
        # No details field: fall back to the JSON text of the whole object.
        if isinstance(value, str):
            return value
        return json.dumps(value, ensure_ascii=False)
    return value


def _format_bucket(refs_dict):
    """Render one EHR bucket (key -> entry) as an HTML list, sorted by key."""
    if not refs_dict:
        return "<em style='color: #999;'>Keine Einträge</em>"
    items = [
        f"<li style='margin-bottom: 8px;'>{_ehr_entry_text(key, value)}</li>"
        for key, value in sorted(refs_dict.items())
    ]
    return f"<ul style='margin: 0; padding-left: 20px;'>{''.join(items)}</ul>"


def render_conversation(conv_idx: int, variant: str, turn_idx: int):
    """Render a conversation up to a given turn and collect its EHR references.

    Args:
        conv_idx: 0-based index into ``DATA``.
        variant: Key of the conversation variant inside the record's
            ``conversations`` mapping.
        turn_idx: 0-based index of the last turn to show; clamped to the
            valid range.

    Returns:
        An 8-tuple ``(conversation_html, history_html, findings_html,
        treatment_html, plan_html, orders_html, max_turns, current_turn)``.
        When the conversation, variant or turns are unavailable, the first
        element is a short message, the five bucket strings are empty and
        both counters are 0.
    """
    no_buckets = ("", "", "", "", "")

    if not DATA or conv_idx < 0 or conv_idx >= len(DATA):
        return ("<p>No data available</p>", *no_buckets, 0, 0)

    item = DATA[conv_idx]
    conversations = item.get("conversations", {})
    # A field whose JSON failed to decode is still a string; treat anything
    # that is not a mapping as "variant missing" instead of crashing.
    if not isinstance(conversations, dict) or variant not in conversations:
        return (f"<p>Variant '{variant}' not available</p>", *no_buckets, 0, 0)

    variant_data = conversations[variant]
    turns = (variant_data.get("turns") or []) if isinstance(variant_data, dict) else []
    max_turns = len(turns)
    if max_turns == 0:
        return ("<p>No turns in this conversation</p>", *no_buckets, 0, 0)

    # Clamp turn_idx
    turn_idx = max(0, min(turn_idx, max_turns - 1))

    # Get EHR data for reference lookup
    ehr_dict = item.get("ehr_dict", {})
    if not isinstance(ehr_dict, dict):
        ehr_dict = {}

    # Collect all refs up to current turn, one key -> entry mapping per category
    all_refs = {"history": {}, "findings": {}, "treatment": {}, "plan": {}, "order": {}}

    # NOTE(review): turn content and EHR text are inserted into the HTML
    # unescaped, so markup present in the dataset is rendered as markup.
    turn_blocks = []
    for turn in turns[: turn_idx + 1]:
        role = turn.get("role", "unknown")
        content = turn.get("content") or ""

        # First occurrence of a key wins; prefer the full EHR entry over the
        # text quoted inside the tag.
        for category, ref_list in extract_refs_from_turn(content).items():
            bucket = all_refs[category]
            for key, text in ref_list:
                if key not in bucket:
                    bucket[key] = ehr_dict.get(key, text)

        turn_blocks.append(
            "\n"
            '<div style="margin-bottom: 12px; padding: 10px; border-radius: 8px; '
            f'background-color: {get_role_color(role)};">\n'
            f'<strong style="color: #333;">{format_role(role)}:</strong>\n'
            f'<p style="margin: 5px 0 0 0; color: #444;">{clean_turn_content(content)}</p>\n'
            "</div>\n"
        )

    conv_html = (
        '<div style="max-height: 500px; overflow-y: auto; padding: 10px;">'
        + "".join(turn_blocks)
        + "</div>"
    )

    return (
        conv_html,
        _format_bucket(all_refs["history"]),
        _format_bucket(all_refs["findings"]),
        _format_bucket(all_refs["treatment"]),
        _format_bucket(all_refs["plan"]),
        _format_bucket(all_refs["order"]),
        max_turns,
        turn_idx,
    )
# Result returned by every handler while no conversation is selected; same
# 8-element layout as render_conversation's return value.
_NO_SELECTION = ("<p>Select a conversation</p>", "", "", "", "", "", 0, 0)


def _conversation_index(conv_selection):
    """Return the 0-based index encoded in a dropdown label like ``"3. text"``.

    Falls back to 0 when the label does not start with an integer followed
    by a dot, or is not a string at all.
    """
    try:
        return int(conv_selection.split(".")[0]) - 1
    except (AttributeError, ValueError):
        return 0


def on_conversation_change(conv_selection: str, variant: str):
    """Handle conversation dropdown change: show the first turn."""
    if not conv_selection:
        return _NO_SELECTION
    return render_conversation(_conversation_index(conv_selection), variant, 0)


def on_variant_change(conv_selection: str, variant: str, current_turn: int):
    """Handle variant dropdown change.

    The view is reset to the first turn; ``current_turn`` is accepted but
    not used.
    """
    if not conv_selection:
        return _NO_SELECTION
    return render_conversation(_conversation_index(conv_selection), variant, 0)


def on_next(conv_selection: str, variant: str, current_turn: int, max_turns: int):
    """Go to the next turn, stopping at the last one."""
    if not conv_selection:
        return _NO_SELECTION
    new_turn = min(current_turn + 1, max_turns - 1)
    return render_conversation(_conversation_index(conv_selection), variant, new_turn)


def on_back(conv_selection: str, variant: str, current_turn: int, max_turns: int):
    """Go to the previous turn, stopping at the first one."""
    if not conv_selection:
        return _NO_SELECTION
    new_turn = max(current_turn - 1, 0)
    return render_conversation(_conversation_index(conv_selection), variant, new_turn)


def on_reset(conv_selection: str, variant: str):
    """Reset to the first turn."""
    if not conv_selection:
        return _NO_SELECTION
    return render_conversation(_conversation_index(conv_selection), variant, 0)


def on_end(conv_selection: str, variant: str, max_turns: int):
    """Go to the last turn."""
    if not conv_selection:
        return _NO_SELECTION
    return render_conversation(_conversation_index(conv_selection), variant, max_turns - 1)
# Build Gradio interface
with gr.Blocks(title="Medical Conversation Viewer") as demo:
    gr.Markdown("""
# Medical Conversation Dataset Viewer
View synthetic German doctor-patient conversations with EHR (Electronic Health Record) reference tracking.
**Instructions:**
1. Select a conversation from the dropdown
2. Choose a conversation variant (natural, inline_dictation, post_dictation)
3. Use the navigation buttons to step through the conversation
4. Watch the EHR buckets populate as references appear in the dialogue
""")

    # State variables: turn count of the shown conversation and the 0-based
    # index of the turn currently displayed.
    max_turns_state = gr.State(0)
    current_turn_state = gr.State(0)

    # Top controls
    # Build the label list once; it is needed for both choices and default.
    conversation_options = get_conversation_options()
    with gr.Row():
        conv_dropdown = gr.Dropdown(
            choices=conversation_options,
            label="Select Conversation",
            value=conversation_options[0] if conversation_options else None,
            scale=3,
        )
        variant_dropdown = gr.Dropdown(
            choices=VARIANTS,
            label="Variant",
            value="natural",
            scale=1,
        )

    # Navigation controls
    with gr.Row():
        reset_btn = gr.Button("⏮ Start", size="sm")
        back_btn = gr.Button("◀ Back", size="sm")
        turn_display = gr.Markdown("Turn: 1 / 1")
        next_btn = gr.Button("Next ▶", size="sm")
        end_btn = gr.Button("End ⏭", size="sm")

    # Main content area
    empty_bucket_html = "<em style='color: #999;'>Keine Einträge</em>"
    with gr.Row():
        # Left: Conversation
        with gr.Column(scale=1):
            gr.Markdown("### Conversation")
            conversation_html = gr.HTML("<p>Select a conversation to begin</p>")
        # Right: EHR Buckets
        with gr.Column(scale=1):
            gr.Markdown("### EHR Summary")
            with gr.Accordion("History (Anamnese)", open=True):
                history_html = gr.HTML(empty_bucket_html)
            with gr.Accordion("Findings (Befunde)", open=True):
                findings_html = gr.HTML(empty_bucket_html)
            with gr.Accordion("Treatment (Therapie)", open=True):
                treatment_html = gr.HTML(empty_bucket_html)
            with gr.Accordion("Plan (Prozedere)", open=True):
                plan_html = gr.HTML(empty_bucket_html)
            with gr.Accordion("Orders (Anordnungen)", open=True):
                orders_html = gr.HTML(empty_bucket_html)

    # Output components, in the order of the 8-tuple the on_* handlers return.
    outputs = [
        conversation_html,
        history_html,
        findings_html,
        treatment_html,
        plan_html,
        orders_html,
        max_turns_state,
        current_turn_state,
    ]

    def update_turn_display(current_turn, max_turns):
        """Return the Markdown "Turn: x / y" label for a 0-based turn index."""
        # With nothing loaded max_turns is 0; show "0 / 0" rather than "1 / 0".
        shown_turn = min(current_turn + 1, max_turns)
        return f"**Turn: {shown_turn} / {max_turns}**"

    def _with_turn_text(result):
        """Append the turn label to a handler result (max_turns at [6], turn at [7])."""
        return tuple(result) + (update_turn_display(result[7], result[6]),)

    # Event handlers: each forwards to its on_* function and adds the label.
    def handle_conversation_change(conv, var):
        return _with_turn_text(on_conversation_change(conv, var))

    def handle_variant_change(conv, var, curr):
        return _with_turn_text(on_variant_change(conv, var, curr))

    def handle_next(conv, var, curr, max_t):
        return _with_turn_text(on_next(conv, var, curr, max_t))

    def handle_back(conv, var, curr, max_t):
        return _with_turn_text(on_back(conv, var, curr, max_t))

    def handle_reset(conv, var):
        return _with_turn_text(on_reset(conv, var))

    def handle_end(conv, var, max_t):
        return _with_turn_text(on_end(conv, var, max_t))

    # Wire up events; every handler fills the same nine components.
    all_outputs = outputs + [turn_display]

    conv_dropdown.change(
        fn=handle_conversation_change,
        inputs=[conv_dropdown, variant_dropdown],
        outputs=all_outputs,
    )
    variant_dropdown.change(
        fn=handle_variant_change,
        inputs=[conv_dropdown, variant_dropdown, current_turn_state],
        outputs=all_outputs,
    )
    next_btn.click(
        fn=handle_next,
        inputs=[conv_dropdown, variant_dropdown, current_turn_state, max_turns_state],
        outputs=all_outputs,
    )
    back_btn.click(
        fn=handle_back,
        inputs=[conv_dropdown, variant_dropdown, current_turn_state, max_turns_state],
        outputs=all_outputs,
    )
    reset_btn.click(
        fn=handle_reset,
        inputs=[conv_dropdown, variant_dropdown],
        outputs=all_outputs,
    )
    end_btn.click(
        fn=handle_end,
        inputs=[conv_dropdown, variant_dropdown, max_turns_state],
        outputs=all_outputs,
    )

    # Load initial conversation
    demo.load(
        fn=handle_conversation_change,
        inputs=[conv_dropdown, variant_dropdown],
        outputs=all_outputs,
    )

if __name__ == "__main__":
    demo.launch()