Spaces:
Sleeping
Sleeping
| # app.py - Gradio version for Hugging Face Spaces | |
| import gradio as gr | |
| import requests | |
| import json | |
| import time | |
| from typing import Dict, Any, Tuple, List | |
# Base URLs of the validation API; the active one is stored in
# current_session['api_base_url'] and can be switched at runtime.
LOCAL_API = "http://localhost:8080"
AWS_API = "https://rj92jktas7.us-east-2.awsapprunner.com"

# Module-level state shared by every handler in this file.
current_session = dict(
    session_id=None,            # ID returned by /start-validation
    validation_started=False,
    all_questions=[],           # questions returned when the session starts
    current_question_id=None,   # question the next answer is submitted to
    validation_complete=False,
    api_base_url=AWS_API,       # defaults to the AWS deployment
)
# Built-in example records offered in the "Record Input" tab, keyed by the
# label shown in the dropdown. Several fields deliberately hold placeholder
# or vague text ("maybe", ".", "[insert explanation here]") or None.
SAMPLE_RECORDS = {
    "Medical Record 1 - Multiple Issues": {
        "UUID": "sample-001",
        "patient_info": {"name": "John Doe", "age": 28},
        "clinical_risk": {
            "thyroid_disease": "how are you?",
            "anemia": None,
            "preeclampsia_current": "not sure",
            "geriatric_mother": "maybe",
            "malaria_chronic_history": ".",
            "preeclampsia_prior": "[insert explanation here]",
        },
        "rmc1": {
            "rmc1_respect": True,
            "rmc1_explanation": "[insert explanation here]",
            "rmc1_facility": "Test Hospital",
        },
        "immunization_birth": {
            "birth_bcg": False,
            "birth_oral_polio": True,
            "birth_no_vax_reason": "not sure",
        },
    },
    "Medical Record 2 - Few Issues": {
        "UUID": "sample-002",
        "patient_info": {"name": "Jane Smith", "age": 35},
        "clinical_risk": {
            "thyroid_disease": True,
            "anemia": "maybe",
            "preeclampsia_current": False,
            "malaria_chronic_history": "[insert explanation here]",
        },
        "rmc2": {
            "rmc2_respect": False,
            "rmc2_explanation": ".",
            "rmc2_facility": "Another Hospital",
        },
    },
}
def check_api_health():
    """Probe the active API's /health endpoint.

    Returns:
        (True, parsed_json) when the endpoint answers with HTTP 200,
        otherwise (False, message) where the message carries either the
        unexpected status code or the text of the exception that was raised.
    """
    try:
        health_url = f"{current_session['api_base_url']}/health"
        reply = requests.get(health_url, timeout=10)
        if reply.status_code != 200:
            return False, f"Status code: {reply.status_code}"
        return True, reply.json()
    except Exception as exc:
        # Any failure (connection, timeout, bad JSON) is reported, not raised.
        return False, f"Error: {exc!s}"
def start_validation_session(record: Dict[str, Any]):
    """POST a record to /start-validation on the active API.

    Returns:
        The parsed JSON body on HTTP 200. On any failure a
        (None, error_message) tuple is returned instead, so callers
        distinguish the two cases by checking for a tuple.
    """
    try:
        endpoint = f"{current_session['api_base_url']}/start-validation"
        reply = requests.post(endpoint, json=record, timeout=30)
        if reply.status_code != 200:
            return None, f"Failed to start validation: {reply.text}"
        return reply.json()
    except Exception as exc:
        return None, f"Error connecting to API: {exc!s}"
def get_question_by_id(question_id: str):
    """Fetch a single question from /get-question/<question_id>.

    Returns the parsed JSON body on HTTP 200; None on any other status
    code or if the request or JSON decoding raises.
    """
    try:
        endpoint = f"{current_session['api_base_url']}/get-question/{question_id}"
        reply = requests.get(endpoint, timeout=15)
        return reply.json() if reply.status_code == 200 else None
    except Exception:
        # Best-effort lookup: callers treat None as "not available".
        return None
def list_all_questions(session_id: str):
    """Fetch the question listing from /list-questions/<session_id>.

    Returns the parsed JSON body on HTTP 200; None on any other status
    code or if the request or JSON decoding raises.
    """
    try:
        endpoint = f"{current_session['api_base_url']}/list-questions/{session_id}"
        reply = requests.get(endpoint, timeout=15)
        return reply.json() if reply.status_code == 200 else None
    except Exception:
        # Best-effort lookup: callers treat None as "not available".
        return None
def submit_response_to_question(question_id: str, user_response: str):
    """POST an answer to /submit-response/<question_id>.

    The answer is sent as the JSON body {"response": user_response}.
    Returns the parsed JSON reply on HTTP 200; None on any other status
    code or if the request or JSON decoding raises.
    """
    try:
        endpoint = f"{current_session['api_base_url']}/submit-response/{question_id}"
        payload = {"response": user_response}
        reply = requests.post(endpoint, json=payload, timeout=30)
        return reply.json() if reply.status_code == 200 else None
    except Exception:
        # Best-effort call: callers treat None as a failed submission.
        return None
def get_final_results(session_id: str):
    """Fetch the final results from /get-results/<session_id>.

    Returns the parsed JSON body on HTTP 200; None on any other status
    code or if the request or JSON decoding raises.
    """
    try:
        endpoint = f"{current_session['api_base_url']}/get-results/{session_id}"
        reply = requests.get(endpoint, timeout=15)
        return reply.json() if reply.status_code == 200 else None
    except Exception:
        # Best-effort lookup: callers treat None as "no results".
        return None
| # Gradio Interface Functions | |
def update_api_endpoint(api_choice, custom_url=""):
    """Select the API base URL and report its health.

    Args:
        api_choice: "AWS Deployment", "Local Development", or anything else
            (treated as the custom-URL option).
        custom_url: Base URL used for the custom option. Surrounding
            whitespace and trailing slashes are removed so that endpoint
            paths appended later do not contain a double slash. A blank or
            None value falls back to AWS_API.

    Returns:
        A status string naming the selected endpoint, followed by the
        details reported by check_api_health() or by its error message.
    """
    if api_choice == "AWS Deployment":
        current_session['api_base_url'] = AWS_API
        status = f"β Using AWS App Runner: {AWS_API}"
    elif api_choice == "Local Development":
        current_session['api_base_url'] = LOCAL_API
        status = f"βΉοΈ Using Local API: {LOCAL_API}"
    else:  # Custom URL
        cleaned_url = (custom_url or "").strip().rstrip("/")
        current_session['api_base_url'] = cleaned_url or AWS_API
        status = f"π§ Using Custom URL: {current_session['api_base_url']}"

    # Check API health
    is_healthy, health_info = check_api_health()
    if not is_healthy:
        return status + f"\n\nβ API Error: {health_info}"

    if isinstance(health_info, dict):
        status += "\n\nπ API Status:\n"
        status += f"β’ Version: {health_info.get('version', 'N/A')}\n"
        status += f"β’ Active Sessions: {health_info.get('active_sessions', 0)}\n"
        status += f"β’ Total Questions: {health_info.get('total_questions', 0)}\n"
        features = health_info.get('features', [])
        if features:
            # str() keeps the join from failing on non-string feature entries.
            status += f"β’ Features: {', '.join(str(item) for item in features)}"
    return status
def load_sample_record(sample_choice):
    """Return the chosen sample record as pretty-printed JSON plus a status line.

    When nothing is selected, or the selection is not a key of
    SAMPLE_RECORDS, an empty string and a prompt message are returned.
    """
    if not sample_choice or sample_choice not in SAMPLE_RECORDS:
        return "", "Please select a sample record"

    pretty_record = json.dumps(SAMPLE_RECORDS[sample_choice], indent=2)
    return pretty_record, f"β Loaded: {sample_choice}"
def parse_json_record(json_text):
    """Check that the text is valid JSON and build a formatted preview.

    Returns:
        (status_message, preview). The preview is the record re-serialized
        with two-space indentation, or an empty string when the input is
        blank or cannot be parsed.
    """
    if json_text.strip() == "":
        return "β Please provide a JSON record", ""

    try:
        parsed = json.loads(json_text)
    except json.JSONDecodeError as err:
        return f"β Invalid JSON: {err!s}", ""

    formatted = json.dumps(parsed, indent=2)
    return "β Valid JSON record parsed successfully!", formatted
def start_validation(json_text):
    """Start a validation session for the given JSON text.

    Parses the record, opens a session through start_validation_session(),
    stores the session details in current_session, and loads the first
    question for display.

    Args:
        json_text: The record as a JSON string; None or blank is rejected.

    Returns:
        A 3-tuple (status_message, question_markdown, visibility_update)
        matching the outputs wired to the "Start Validation" button.
    """
    if not json_text or not json_text.strip():
        return "β Please provide a JSON record first", "", gr.update(visible=False)
    try:
        record = json.loads(json_text)
    except json.JSONDecodeError as e:
        return f"β Invalid JSON: {str(e)}", "", gr.update(visible=False)

    # Start validation session
    result = start_validation_session(record)
    if isinstance(result, tuple):  # Error case: (None, message)
        return f"β {result[1]}", "", gr.update(visible=False)
    if not result:
        return "β Failed to start validation session", "", gr.update(visible=False)

    current_session['session_id'] = result["session_id"]
    current_session['all_questions'] = result.get("questions", [])
    current_session['validation_started'] = True
    # Drop state left over from an earlier session. Otherwise, if the first
    # question cannot be loaded below, the next submitted answer would be
    # sent to a question that belongs to the previous session.
    current_session['current_question_id'] = None
    current_session['validation_complete'] = False

    if result["total_questions"] == 0:
        current_session['validation_complete'] = True
        return "β No missing fields found! Record is already complete.", "", gr.update(visible=False)

    status = f"β Created {result['total_questions']} individual questions with unique tracking IDs\n"
    # Show tracking info
    tracking_info = result.get("tracking", {})
    if tracking_info.get("question_tracking_enabled"):
        status += "π Each question now has a unique ID for precise tracking"

    # Get first question
    question_status = list_all_questions(current_session['session_id'])
    if question_status:
        questions_list = question_status["questions"]
        if questions_list:
            first_question = questions_list[0]
            current_session['current_question_id'] = first_question['question_id']
            # Get question details
            question_details = get_question_by_id(first_question['question_id'])
            if question_details:
                question_display = f"**Field:** {question_details['field_path']}\n"
                question_display += f"**Question ID:** {question_details['question_id']}\n\n"
                question_display += f"### {question_details['question']}"
                progress_info = f"Progress: 0/{result['total_questions']} (0.0%)"
                return status, f"{progress_info}\n\n{question_display}", gr.update(visible=True)
    return status, "Loading questions...", gr.update(visible=True)
def submit_answer(answer_text):
    """Submit an answer to the current question and advance to the next one.

    Args:
        answer_text: The user's answer; None or blank is rejected.

    Returns:
        A 3-tuple (result_markdown, question_markdown, visibility_update)
        matching the outputs wired to the "Submit Answer" button.
    """
    if not answer_text or not answer_text.strip():
        return "β Please provide an answer", "", gr.update(visible=True)
    if not current_session.get('current_question_id'):
        return "β No active question", "", gr.update(visible=True)

    # Submit response
    response = submit_response_to_question(current_session['current_question_id'], answer_text)
    if not response:
        return "β Failed to submit response", "", gr.update(visible=True)

    # Process response
    outcome = response.get("status")
    if outcome == "needs_clarification":
        result_text = "β οΈ Your answer needs clarification\n\n"
        result_text += f"**Follow-up question:** {response['clarification_question']}\n\n"
        result_text += "Please provide a clearer, more specific answer."
        # Point the next submission at the clarification question when the
        # API supplies one; otherwise keep answering the same question.
        current_session['current_question_id'] = response.get(
            'clarification_question_id', current_session['current_question_id'])
        return result_text, "", gr.update(visible=True)

    if outcome not in ("completed", "low_confidence"):
        return "β Unexpected response", "", gr.update(visible=True)

    confidence = response["validation"]["confidence"]
    if outcome == "completed":
        result_text = f"β Answer accepted! (Confidence: {confidence:.2f})\n\n"
    else:
        result_text = f"β οΈ Answer accepted with low confidence: {confidence:.2f}\n\n"

    # Show tracking details
    tracking_info = response.get("tracking", {})
    if tracking_info:
        result_text += "π **Tracking Details:**\n"
        result_text += f"β’ Question ID: {tracking_info.get('question_unique_id', 'N/A')}\n"
        result_text += f"β’ Response Time: {tracking_info.get('response_timestamp', 'N/A')}\n"
        result_text += f"β’ Attempt Count: {tracking_info.get('attempt_count', 'N/A')}\n"
        result_text += f"β’ Validation Confidence: {tracking_info.get('validation_confidence', 0):.3f}\n\n"

    # Check if all completed
    progress = response.get("progress", {})
    if progress.get("all_completed"):
        current_session['validation_complete'] = True
        result_text += "π **All questions completed!**"
        return (result_text,
                "All questions have been answered. You can now view the final results.",
                gr.update(visible=False))

    # Get next question
    question_status = list_all_questions(current_session['session_id'])
    if question_status:
        questions_list = question_status["questions"]
        completed_count = question_status["completed_questions"]
        total_count = question_status["total_questions"]
        # First question that still needs an answer, if any.
        next_question = next(
            (q for q in questions_list if q['status'] in ('pending', 'needs_clarification')),
            None,
        )
        if next_question:
            current_session['current_question_id'] = next_question['question_id']
            # Get question details
            question_details = get_question_by_id(next_question['question_id'])
            if question_details:
                question_display = f"**Field:** {question_details['field_path']}\n"
                question_display += f"**Question ID:** {question_details['question_id']}\n\n"
                question_display += f"### {question_details['question']}"
                # Guard against a zero total so the progress line cannot
                # raise ZeroDivisionError.
                percent = completed_count / total_count * 100 if total_count else 0.0
                progress_info = f"Progress: {completed_count}/{total_count} ({percent:.1f}%)"
                return result_text, f"{progress_info}\n\n{question_display}", gr.update(visible=True)
    return result_text, "Loading next question...", gr.update(visible=True)
def get_results():
    """Fetch the final results and prepare the report plus download files.

    Returns:
        A 4-tuple matching the outputs wired to the "Get Final Results"
        button: the markdown report followed by three updates for the
        download components (updated record, tracking details, complete
        results). On failure the first element is an error message and the
        three download components are cleared and hidden, so the number of
        returned values always matches the number of outputs.
    """
    # Local imports keep this block self-contained.
    import os
    import tempfile

    def _failure(message):
        # One value per wired output, with the file components reset.
        return (message,
                gr.update(value=None, visible=False),
                gr.update(value=None, visible=False),
                gr.update(value=None, visible=False))

    if not current_session.get('session_id'):
        return _failure("β No active session")
    results = get_final_results(current_session['session_id'])
    if not results:
        return _failure("β Failed to get results")

    summary = results['summary']

    # Format results
    result_text = "# π Validation Complete!\n\n"
    # Summary metrics
    result_text += "## π Summary\n"
    result_text += f"β’ **Fields Processed:** {summary['total_fields_processed']}\n"
    result_text += f"β’ **Success Rate:** {summary['success_rate']:.1f}%\n"
    result_text += f"β’ **Average Confidence:** {summary['average_confidence']:.2f}\n"
    result_text += f"β’ **Average Attempts per Question:** {summary['average_attempts_per_question']:.1f}\n\n"
    # Tracking metrics
    result_text += "## π Question Tracking\n"
    result_text += f"β’ **Total Questions:** {summary['total_questions']}\n"
    result_text += f"β’ **Questions with Clarification:** {summary['questions_with_clarification']}\n"
    result_text += f"β’ **Completed Questions:** {summary['completed_questions']}\n\n"
    # Updated record
    updated_record_json = json.dumps(results["updated_record"], indent=2)
    result_text += "## π Updated Record\n"
    result_text += "```json\n"
    result_text += updated_record_json
    result_text += "\n```\n\n"

    # Download files
    tracking_data = {
        "session_info": results.get("session_info", {}),
        "question_details": results.get("question_details", {}),
        "summary": results.get("summary", {})
    }
    tracking_json = json.dumps(tracking_data, indent=2)
    full_results_json = json.dumps(results, indent=2)

    # The File components take a path on disk rather than file content, so
    # each JSON document is written to its own file in a fresh temporary
    # directory. These directories are not cleaned up by this function.
    export_dir = tempfile.mkdtemp(prefix="validation_results_")

    def _as_download(filename, payload):
        path = os.path.join(export_dir, filename)
        with open(path, "w", encoding="utf-8") as handle:
            handle.write(payload)
        return gr.update(value=path, visible=True)

    return (
        result_text,
        _as_download("updated_record.json", updated_record_json),
        _as_download("tracking_details.json", tracking_json),
        _as_download("complete_results.json", full_results_json),
    )
def reset_session():
    """Reset the session state while keeping the selected API endpoint.

    Returns:
        A 4-tuple matching the outputs wired to the "Start New Validation"
        button: a reset message, two empty strings that clear the
        validation status and the question display, and an update that
        hides the question section.
    """
    global current_session
    current_session = {
        'session_id': None,
        'validation_started': False,
        'all_questions': [],
        'current_question_id': None,
        'validation_complete': False,
        'api_base_url': current_session['api_base_url']  # Keep the API setting
    }
    return "π Session reset. Ready for new validation.", "", "", gr.update(visible=False)
| # Create Gradio Interface | |
def create_interface():
    """Build the Gradio Blocks UI and wire its event handlers.

    The interface has four tabs: API configuration, record input, question
    answering, and results. The answer box and submit button live inside
    the `question_section` group, so the visibility updates returned by
    start_validation, submit_answer and reset_session show or hide them.

    Returns:
        The assembled gr.Blocks instance (not yet launched).
    """

    def update_custom_url_visibility(choice):
        # The custom URL box is only relevant for the "Custom URL" option.
        return gr.update(visible=(choice == "Custom URL"))

    def load_uploaded_record(uploaded):
        # Copy the uploaded file's text into the JSON textbox. The value is
        # handled both as a plain path string and as an object exposing the
        # path through `.name`.
        if uploaded is None:
            return gr.update(), gr.update()
        path = uploaded if isinstance(uploaded, str) else getattr(uploaded, "name", None)
        try:
            with open(path, encoding="utf-8") as handle:
                return handle.read(), "Loaded uploaded file"
        except (OSError, TypeError, UnicodeDecodeError) as exc:
            return gr.update(), f"Could not read uploaded file: {exc}"

    with gr.Blocks(title="Medical Record Validation", theme=gr.themes.Soft()) as demo:
        gr.Markdown("# π₯ Medical Record Data Validation")
        gr.Markdown("**Enhanced with Question-Level Tracking**")

        with gr.Tab("π Setup & Configuration"):
            gr.Markdown("## API Configuration")
            api_choice = gr.Radio(
                choices=["AWS Deployment", "Local Development", "Custom URL"],
                value="AWS Deployment",
                label="Choose API endpoint"
            )
            custom_url = gr.Textbox(
                label="Custom API URL",
                placeholder="https://your-api-url.com",
                visible=False
            )
            api_status = gr.Markdown("π Checking API status...")

            # Update API status when choice changes
            api_choice.change(
                update_custom_url_visibility,
                inputs=[api_choice],
                outputs=[custom_url]
            )
            api_choice.change(
                update_api_endpoint,
                inputs=[api_choice, custom_url],
                outputs=[api_status]
            )
            custom_url.change(
                update_api_endpoint,
                inputs=[api_choice, custom_url],
                outputs=[api_status]
            )

        with gr.Tab("π Record Input"):
            gr.Markdown("## Step 1: Provide Medical Record")
            with gr.Row():
                with gr.Column():
                    sample_choice = gr.Dropdown(
                        choices=list(SAMPLE_RECORDS.keys()),
                        label="Choose from sample records",
                        value=None
                    )
                    load_sample_btn = gr.Button("Load Sample Record", variant="secondary")
                with gr.Column():
                    upload_file = gr.File(
                        label="Upload JSON file",
                        file_types=[".json"]
                    )
            json_input = gr.Textbox(
                label="Medical Record JSON",
                placeholder="Paste your JSON record here or use the options above...",
                lines=10
            )
            parse_status = gr.Markdown("")
            json_preview = gr.Code(label="Record Preview", language="json", visible=False)
            validate_btn = gr.Button("π Start Validation", variant="primary", size="lg")
            validation_status = gr.Markdown("")

            # Event handlers
            load_sample_btn.click(
                load_sample_record,
                inputs=[sample_choice],
                outputs=[json_input, parse_status]
            )
            # Without this handler an uploaded file would be ignored.
            upload_file.upload(
                load_uploaded_record,
                inputs=[upload_file],
                outputs=[json_input, parse_status]
            )
            json_input.change(
                parse_json_record,
                inputs=[json_input],
                outputs=[parse_status, json_preview]
            )

        with gr.Tab("β Answer Questions"):
            gr.Markdown("## Step 2: Answer Validation Questions")
            question_display = gr.Markdown("Start validation to see questions here...")
            # The answer controls sit inside the group so that toggling the
            # group's visibility actually shows or hides them.
            with gr.Group(visible=False) as question_section:
                with gr.Row():
                    answer_input = gr.Textbox(
                        label="Your Answer",
                        placeholder="Enter your response here...",
                        scale=3
                    )
                    submit_btn = gr.Button("Submit Answer", variant="primary", scale=1)
            answer_result = gr.Markdown("")
            submit_btn.click(
                submit_answer,
                inputs=[answer_input],
                outputs=[answer_result, question_display, question_section]
            )

        with gr.Tab("π Results"):
            gr.Markdown("## Step 3: Validation Results")
            get_results_btn = gr.Button("π₯ Get Final Results", variant="primary")
            results_display = gr.Markdown("Complete validation to see results here...")
            with gr.Row():
                download_record = gr.File(label="Updated Record", visible=False)
                download_tracking = gr.File(label="Tracking Details", visible=False)
                download_full = gr.File(label="Complete Results", visible=False)
            reset_btn = gr.Button("π Start New Validation", variant="secondary")
            reset_status = gr.Markdown("")

        # Main event handlers
        validate_btn.click(
            start_validation,
            inputs=[json_input],
            outputs=[validation_status, question_display, question_section]
        )
        get_results_btn.click(
            get_results,
            outputs=[results_display, download_record, download_tracking, download_full]
        )
        reset_btn.click(
            reset_session,
            outputs=[reset_status, validation_status, question_display, question_section]
        )

        # Initialize API status
        demo.load(
            lambda: update_api_endpoint("AWS Deployment"),
            outputs=[api_status]
        )
    return demo
# Script entry point: build the UI and start the server.
if __name__ == "__main__":
    demo = create_interface()
    # Listen on all interfaces, port 7860, without a public share link.
    launch_kwargs = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
    }
    demo.launch(**launch_kwargs)