Spaces:
Sleeping
Sleeping
| # response_formatter.py | |
| from typing import Dict, List, Any, Tuple, Callable | |
| import re | |
| class ResponseFormatter: | |
| # Class-level handler map for extensibility | |
| _handler_map = {} | |
def format_healthcare_response(scenario_text: str, analysis_results: Dict[str, Any]) -> str:
    """Assemble the complete Markdown report for a healthcare scenario.

    The report is laid out as: title, executive summary, one section per
    task found in ``scenario_text``, an optional validation section (only
    when ``analysis_results`` contains a ``"validation"`` key) and a fixed
    provenance footer.

    Args:
        scenario_text: Raw scenario description; its task list decides
            which sections are rendered.
        analysis_results: Analysis output handed to the summary, task and
            validation helpers.

    Returns:
        The report as a single Markdown string.
    """
    # The task list is parsed first; rendering starts afterwards.
    scenario_tasks = ResponseFormatter._extract_scenario_tasks(scenario_text)

    report = "# Healthcare Scenario Analysis\n\n" + "## Executive Summary\n\n"
    report = report + ResponseFormatter._generate_executive_summary(analysis_results) + "\n\n"

    for scenario_task in scenario_tasks:
        rendered = ResponseFormatter._process_task(scenario_task, analysis_results)
        if not rendered:
            # A handler may produce nothing; such tasks get no section.
            continue
        report = report + rendered + "\n\n"

    if "validation" in analysis_results:
        report = report + "## Analysis Validation\n\n"
        report = report + ResponseFormatter._format_validation_results(
            analysis_results["validation"]
        )
        report = report + "\n\n"

    provenance = (
        "## Provenance\n\n"
        "This analysis is based on:\n"
        "- Scenario description provided by the user\n"
        "- Uploaded data files\n"
        "- Calculations performed on the provided data\n"
    )
    return report + provenance
def _extract_scenario_tasks(scenario_text: str) -> List[Tuple[str, str]]:
    """Extract the tasks listed in a scenario.

    Scanning begins after a line that starts with "tasks" and ends at a
    line that starts with one of the closing headings below (both checks
    are case-insensitive and applied to the stripped line). Inside the
    section, a line starting with ``<digits>.`` or with a bullet character
    opens a new task; any other non-empty line is appended to the task
    currently being collected.

    Args:
        scenario_text: Raw scenario description.

    Returns:
        One ``(section_header, task_description)`` tuple per task, in the
        order the tasks appear. The header comes from
        ``ResponseFormatter._get_section_header``.
    """
    closing_prefixes = (
        'operational recommendations',
        'future integration',
        'prepare your findings',
        'conclusions',
        'next steps',
    )
    bullet_prefixes = ('•', '-', '*')

    tasks = []
    in_tasks = False
    current_task = ""

    for raw_line in scenario_text.split('\n'):
        line = raw_line.strip()
        lowered = line.lower()

        # Entering the tasks section; the heading line itself is not a task.
        if lowered.startswith('tasks'):
            in_tasks = True
            continue

        if not in_tasks:
            continue

        # Leaving the tasks section: flush whatever was being collected.
        if lowered.startswith(closing_prefixes):
            in_tasks = False
            if current_task:
                section_header = ResponseFormatter._get_section_header(current_task)
                tasks.append((section_header, current_task))
                # Clear the buffer so the end-of-input flush below (or a
                # later task section) does not emit this task a second time.
                current_task = ""
            continue

        if re.match(r'^\d+\.', line) or line.startswith(bullet_prefixes):
            # A numbered or bulleted line starts a new task.
            if current_task:
                section_header = ResponseFormatter._get_section_header(current_task)
                tasks.append((section_header, current_task))
            current_task = line
        elif line:
            # Continuation text; avoid a leading space when nothing has
            # been collected yet.
            current_task = f"{current_task} {line}" if current_task else line

    # Flush a task that was still open when the text ended.
    if current_task:
        section_header = ResponseFormatter._get_section_header(current_task)
        tasks.append((section_header, current_task))

    return tasks
| def _get_section_header(task_description: str) -> str: | |
| """Map task description to a clean section header with comprehensive keyword matching""" | |
| task_lower = task_description.lower() | |
| # Data Preparation patterns | |
| if any(keyword in task_lower for keyword in [ | |
| "load the data", "data preparation", "data loading", "data import", | |
| "data processing", "data cleaning", "data filtering" | |
| ]): | |
| return "Data Preparation" | |
| # Capacity Analysis patterns | |
| elif any(keyword in task_lower for keyword in [ | |
| "bed capacity", "bed utilization", "bed availability", "bed occupancy", | |
| "capacity analysis", "resource capacity", "service capacity" | |
| ]): | |
| return "Capacity Analysis" | |
| # Facility Decline patterns | |
| elif any(keyword in task_lower for keyword in [ | |
| "facilities with the greatest declines", "facility decline", | |
| "bed reduction", "capacity reduction", "facility closure" | |
| ]): | |
| return "Facilities with Greatest Declines" | |
| # Long-Term Care patterns | |
| elif any(keyword in task_lower for keyword in [ | |
| "assess long-term care", "long-term care capacity", "ltc assessment", | |
| "nursing home capacity", "residential care capacity", "care facility assessment" | |
| ]): | |
| return "Long-Term Care Capacity Assessment" | |
| # Recommendations patterns | |
| elif any(keyword in task_lower for keyword in [ | |
| "recommend actions", "operational recommendations", "recommendations", | |
| "action plan", "mitigation strategies", "improvement strategies" | |
| ]): | |
| return "Operational Recommendations" | |
| # Integration patterns | |
| elif any(keyword in task_lower for keyword in [ | |
| "future integration", "augmented ai", "decision support", "ai integration", | |
| "system integration", "data integration", "predictive analytics" | |
| ]): | |
| return "Future Integration Opportunities" | |
| # Patient Satisfaction patterns | |
| elif any(keyword in task_lower for keyword in [ | |
| "patient satisfaction", "patient experience", "patient feedback", | |
| "satisfaction analysis", "experience analysis" | |
| ]): | |
| return "Patient Satisfaction Analysis" | |
| # Staff Utilization patterns | |
| elif any(keyword in task_lower for keyword in [ | |
| "staff utilization", "workforce analysis", "staffing analysis", | |
| "human resources", "staff productivity", "staff allocation" | |
| ]): | |
| return "Staff Utilization Analysis" | |
| # Equipment patterns | |
| elif any(keyword in task_lower for keyword in [ | |
| "equipment inventory", "medical equipment", "device utilization", | |
| "equipment assessment", "resource inventory" | |
| ]): | |
| return "Equipment Inventory Assessment" | |
| # Financial Analysis patterns | |
| elif any(keyword in task_lower for keyword in [ | |
| "financial analysis", "cost analysis", "budget analysis", | |
| "economic analysis", "resource costing" | |
| ]): | |
| return "Financial Analysis" | |
| # Quality Metrics patterns | |
| elif any(keyword in task_lower for keyword in [ | |
| "quality metrics", "performance metrics", "outcome measures", | |
| "quality indicators", "performance indicators" | |
| ]): | |
| return "Quality Metrics Analysis" | |
| # Geographic Distribution patterns | |
| elif any(keyword in task_lower for keyword in [ | |
| "geographic distribution", "spatial analysis", "location analysis", | |
| "regional distribution", "service distribution" | |
| ]): | |
| return "Geographic Distribution Analysis" | |
| # Access patterns | |
| elif any(keyword in task_lower for keyword in [ | |
| "access analysis", "accessibility", "service access", | |
| "barriers to care", "equity analysis" | |
| ]): | |
| return "Healthcare Access Analysis" | |
| # Wait Time patterns | |
| elif any(keyword in task_lower for keyword in [ | |
| "wait time", "waiting time", "delay analysis", | |
| "queue analysis", "timeliness" | |
| ]): | |
| return "Wait Time Analysis" | |
| # Population Health patterns | |
| elif any(keyword in task_lower for keyword in [ | |
| "population health", "epidemiology", "disease burden", | |
| "health status", "community health" | |
| ]): | |
| return "Population Health Analysis" | |
| # Emergency Services patterns | |
| elif any(keyword in task_lower for keyword in [ | |
| "emergency services", "er analysis", "emergency department", | |
| "urgent care", "emergency response" | |
| ]): | |
| return "Emergency Services Analysis" | |
| # Specialized Care patterns | |
| elif any(keyword in task_lower for keyword in [ | |
| "specialized care", "specialty services", "tertiary care", | |
| "specialized services", "advanced care" | |
| ]): | |
| return "Specialized Care Analysis" | |
| # Preventive Care patterns | |
| elif any(keyword in task_lower for keyword in [ | |
| "preventive care", "prevention", "screening", | |
| "wellness programs", "health promotion" | |
| ]): | |
| return "Preventive Care Analysis" | |
| # Telehealth patterns | |
| elif any(keyword in task_lower for keyword in [ | |
| "telehealth", "virtual care", "remote care", | |
| "digital health", "e-health" | |
| ]): | |
| return "Telehealth Analysis" | |
| # Research patterns | |
| elif any(keyword in task_lower for keyword in [ | |
| "research", "clinical trials", "innovation", | |
| "evidence-based practice", "research analysis" | |
| ]): | |
| return "Research and Innovation Analysis" | |
| else: | |
| # Extract the first sentence as a fallback | |
| first_sentence = task_description.split('.')[0] | |
| return first_sentence.strip() | |
def _process_task(task_tuple: Tuple[str, str], analysis_results: Dict[str, Any]) -> str:
    """Render one task by dispatching on its section header.

    Built-in handlers are looked up by header; handlers stored in
    ``ResponseFormatter._handler_map`` are layered on top and win on
    conflicts. Headers without any handler go to the generic handler.

    Args:
        task_tuple: ``(section_header, task_description)`` pair.
        analysis_results: Analysis output passed through to the handler.

    Returns:
        Whatever the selected handler returns for this task.
    """
    section_header, task_description = task_tuple
    formatter = ResponseFormatter

    builtin_handlers = {
        "Data Preparation": formatter._handle_data_preparation_task,
        "Capacity Analysis": formatter._handle_bed_capacity_task,
        "Facilities with Greatest Declines": formatter._handle_facility_declines_task,
        "Long-Term Care Capacity Assessment": formatter._handle_long_term_care_task,
        "Operational Recommendations": formatter._handle_recommendations_task,
        "Future Integration Opportunities": formatter._handle_integration_task,
        "Patient Satisfaction Analysis": formatter._handle_patient_satisfaction_task,
        "Staff Utilization Analysis": formatter._handle_staff_utilization_task,
        "Equipment Inventory Assessment": formatter._handle_equipment_task,
        "Financial Analysis": formatter._handle_financial_task,
        "Quality Metrics Analysis": formatter._handle_quality_metrics_task,
        "Geographic Distribution Analysis": formatter._handle_geographic_distribution_task,
        "Healthcare Access Analysis": formatter._handle_access_task,
        "Wait Time Analysis": formatter._handle_wait_time_task,
        "Population Health Analysis": formatter._handle_population_health_task,
        "Emergency Services Analysis": formatter._handle_emergency_services_task,
        "Specialized Care Analysis": formatter._handle_specialized_care_task,
        "Preventive Care Analysis": formatter._handle_preventive_care_task,
        "Telehealth Analysis": formatter._handle_telehealth_task,
        "Research and Innovation Analysis": formatter._handle_research_task
    }

    # Registered handlers override built-ins that share a header.
    dispatch = dict(builtin_handlers)
    dispatch.update(formatter._handler_map)

    try:
        chosen = dispatch[section_header]
    except KeyError:
        # Unknown header: the generic handler also needs the header itself.
        return formatter._handle_generic_task(section_header, task_description, analysis_results)
    return chosen(task_description, analysis_results)
def register_handler(section_header: str, handler_function: Callable):
    """Install ``handler_function`` as the renderer for ``section_header``.

    The handler is stored in the class-level ``_handler_map``; an existing
    entry under the same header is replaced.

    Args:
        section_header: Header text the handler should be selected for.
        handler_function: Callable stored for that header.
    """
    registry = ResponseFormatter._handler_map
    registry[section_header] = handler_function
| def _handle_generic_task(section_header: str, task_description: str, analysis_results: Dict[str, Any]) -> str: | |
| """Handle generic tasks that don't match specific patterns""" | |
| response = f"## {section_header}\n\n" | |
| # Try to find relevant data in analysis_results | |
| relevant_data = {} | |
| # Look for data keys that might be related to the task | |
| task_keywords = section_header.lower().split() | |
| for key, value in analysis_results.items(): | |
| key_lower = key.lower() | |
| if any(keyword in key_lower for keyword in task_keywords): | |
| relevant_data[key] = value | |
| if relevant_data: | |
| for key, value in relevant_data.items(): | |
| if isinstance(value, dict) and value: | |
| response += f"### {key.replace('_', ' ').title()}\n\n" | |
| for subkey, subvalue in value.items(): | |
| if isinstance(subvalue, (int, float)): | |
| response += f"**{subkey.replace('_', ' ').title()}**: {subvalue:,}\n\n" | |
| elif isinstance(subvalue, str): | |
| response += f"**{subkey.replace('_', ' ').title()}**: {subvalue}\n\n" | |
| elif isinstance(subvalue, list): | |
| response += f"**{subkey.replace('_', ' ').title()}**:\n\n" | |
| for item in subvalue[:5]: # Show first 5 items | |
| if isinstance(item, dict): | |
| for item_key, item_value in item.items(): | |
| response += f"- {item_key.replace('_', ' ').title()}: {item_value}\n" | |
| else: | |
| response += f"- {item}\n" | |
| response += "\n" | |
| elif isinstance(value, (int, float)): | |
| response += f"**{key.replace('_', ' ').title()}**: {value:,}\n\n" | |
| elif isinstance(value, str): | |
| response += f"**{key.replace('_', ' ').title()}**: {value}\n\n" | |
| else: | |
| response += "No specific analysis results available for this task.\n\n" | |
| response += "This task requires specialized analysis that may not be available in the current dataset.\n" | |
| return response | |
| def _handle_data_preparation_task(task: str, analysis_results: Dict[str, Any]) -> str: | |
| """Handle data preparation task""" | |
| response = "## Data Preparation\n\n" | |
| if "data_preparation" not in analysis_results: | |
| return response + "No data preparation results available" | |
| data_prep = analysis_results["data_preparation"] | |
| # Facility type frequency | |
| if "facility_type_frequency" in data_prep: | |
| response += "### Facility Type Frequency\n\n" | |
| response += "| Facility Type | Count |\n" | |
| response += "|---------------|-------|\n" | |
| for ftype, count in data_prep["facility_type_frequency"].items(): | |
| response += f"| {ftype} | {count} |\n" | |
| response += "\n" | |
| # Top cities with facility breakdown | |
| if "top_cities" in data_prep and "city_facility_breakdown" in data_prep: | |
| response += "### Top Cities by Facility Count\n\n" | |
| response += "| City | Hospitals | Nursing/Residential | Ambulatory | Total |\n" | |
| response += "|------|-----------|-------------------|------------|-------|\n" | |
| for city in data_prep["top_cities"]: | |
| if city in data_prep["city_facility_breakdown"]: | |
| breakdown = data_prep["city_facility_breakdown"][city] | |
| hospitals = breakdown.get("Hospitals", 0) | |
| nursing = breakdown.get("Nursing and residential care facilities", 0) | |
| ambulatory = breakdown.get("Ambulatory health care services", 0) | |
| total = hospitals + nursing + ambulatory | |
| response += f"| {city} | {hospitals} | {nursing} | {ambulatory} | {total} |\n" | |
| response += "\n" | |
| return response | |
| def _handle_bed_capacity_task(task: str, analysis_results: Dict[str, Any]) -> str: | |
| """Handle bed capacity analysis task""" | |
| response = "## Capacity Analysis\n\n" | |
| if "capacity_analysis" not in analysis_results: | |
| return response + "No capacity analysis results available" | |
| capacity = analysis_results["capacity_analysis"] | |
| # Bed capacity by zone | |
| if "bed_capacity_by_zone" in capacity: | |
| response += "### Bed Capacity by Zone\n\n" | |
| response += "| Zone | Current Beds | Previous Beds | Absolute Change | Percentage Change |\n" | |
| response += "|------|--------------|---------------|-----------------|-------------------|\n" | |
| for zone, data in capacity["bed_capacity_by_zone"].items(): | |
| current = data.get("beds_current", 0) | |
| previous = data.get("beds_prev", 0) | |
| abs_change = current - previous | |
| perc_change = (abs_change / previous * 100) if previous > 0 else 0 | |
| response += f"| {zone} | {current:,} | {previous:,} | {abs_change:+,} | {perc_change:+.1f}% |\n" | |
| response += "\n" | |
| # Zones with largest decreases | |
| if "zone_with_largest_absolute_decrease" in capacity: | |
| response += f"**Zone with Largest Absolute Decrease**: {capacity['zone_with_largest_absolute_decrease']}\n\n" | |
| if "zone_with_largest_percentage_decrease" in capacity: | |
| response += f"**Zone with Largest Percentage Decrease**: {capacity['zone_with_largest_percentage_decrease']}\n\n" | |
| return response | |
| def _handle_facility_declines_task(task: str, analysis_results: Dict[str, Any]) -> str: | |
| """Handle facilities with greatest declines task""" | |
| response = "## Facilities with Greatest Declines\n\n" | |
| if "capacity_analysis" not in analysis_results or "facilities_with_declines" not in analysis_results["capacity_analysis"]: | |
| return response + "No facility decline data available" | |
| facilities = analysis_results["capacity_analysis"]["facilities_with_declines"] | |
| response += "| Facility Name | Zone | Teaching Status | Bed Change |\n" | |
| response += "|---------------|------|----------------|------------|\n" | |
| for facility in facilities[:5]: # Top 5 | |
| name = facility.get("facility_name", "") | |
| zone = facility.get("zone", "") | |
| teaching = facility.get("teaching_status", "") | |
| change = facility.get("bed_change", 0) | |
| response += f"| {name} | {zone} | {teaching} | {change:+,} |\n" | |
| return response | |
| def _handle_long_term_care_task(task: str, analysis_results: Dict[str, Any]) -> str: | |
| """Handle long-term care capacity assessment task""" | |
| response = "## Long-Term Care Capacity Assessment\n\n" | |
| if "long_term_care_assessment" not in analysis_results: | |
| return response + "No long-term care assessment results available" | |
| ltc = analysis_results["long_term_care_assessment"] | |
| # Zone with largest percentage decrease | |
| if "zone_with_largest_percentage_decrease" in ltc: | |
| response += f"**Zone with Largest Percentage Decrease**: {ltc['zone_with_largest_percentage_decrease']}\n\n" | |
| # Major city in that zone | |
| if "major_city_in_zone" in ltc: | |
| response += f"**Major City in Zone**: {ltc['major_city_in_zone']}\n\n" | |
| # Facility counts in major city | |
| if "facility_counts" in ltc: | |
| counts = ltc["facility_counts"] | |
| response += "### Facility Counts in Major City\n\n" | |
| response += f"- Hospitals: {counts.get('hospitals', 0)}\n" | |
| response += f"- Nursing/Residential Care Facilities: {counts.get('nursing_residential_care', 0)}\n" | |
| response += f"- Ambulatory Health Services: {counts.get('ambulatory', 0)}\n\n" | |
| # Nursing to hospital ratio | |
| if "nursing_to_hospital_ratio" in ltc: | |
| ratio = ltc["nursing_to_hospital_ratio"] | |
| response += f"**Nursing/Residential to Hospital Ratio**: {ratio:.2f}\n\n" | |
| # Capacity assessment | |
| if "capacity_assessment" in ltc: | |
| assessment = ltc["capacity_assessment"] | |
| response += f"**Long-Term Care Capacity Assessment**: {assessment.upper()}\n\n" | |
| if assessment == "insufficient": | |
| response += "The ratio of nursing/residential care facilities to hospitals is below the recommended threshold of 1.5. " | |
| response += "This indicates insufficient long-term care capacity to support the healthcare system in this area.\n\n" | |
| else: | |
| response += "The ratio of nursing/residential care facilities to hospitals meets or exceeds the recommended threshold. " | |
| response += "This indicates sufficient long-term care capacity.\n\n" | |
| return response | |
| def _handle_recommendations_task(task: str, analysis_results: Dict[str, Any]) -> str: | |
| """Handle operational recommendations task""" | |
| response = "## Operational Recommendations\n\n" | |
| if "recommendations" not in analysis_results or not analysis_results["recommendations"]: | |
| return response + "No recommendations available" | |
| recommendations = analysis_results["recommendations"] | |
| # Group by priority | |
| high_priority = [r for r in recommendations if r.get("priority") == "High"] | |
| medium_priority = [r for r in recommendations if r.get("priority") == "Medium"] | |
| low_priority = [r for r in recommendations if r.get("priority") == "Low"] | |
| if high_priority: | |
| response += "### High Priority Recommendations\n\n" | |
| for i, rec in enumerate(high_priority, 1): | |
| response += f"**{i}. {rec.get('title', 'Recommendation')}**\n\n" | |
| response += f"{rec.get('description', '')}\n\n" | |
| if "justification" in rec: | |
| response += f"**Justification**: {rec['justification']}\n\n" | |
| if "expected_impact" in rec: | |
| response += f"**Expected Impact**: {rec['expected_impact']}\n\n" | |
| response += "---\n\n" | |
| if medium_priority: | |
| response += "### Medium Priority Recommendations\n\n" | |
| for i, rec in enumerate(medium_priority, 1): | |
| response += f"**{i}. {rec.get('title', 'Recommendation')}**\n\n" | |
| response += f"{rec.get('description', '')}\n\n" | |
| if "justification" in rec: | |
| response += f"**Justification**: {rec['justification']}\n\n" | |
| if "expected_impact" in rec: | |
| response += f"**Expected Impact**: {rec['expected_impact']}\n\n" | |
| response += "---\n\n" | |
| if low_priority: | |
| response += "### Low Priority Recommendations\n\n" | |
| for i, rec in enumerate(low_priority, 1): | |
| response += f"**{i}. {rec.get('title', 'Recommendation')}**\n\n" | |
| response += f"{rec.get('description', '')}\n\n" | |
| if "justification" in rec: | |
| response += f"**Justification**: {rec['justification']}\n\n" | |
| if "expected_impact" in rec: | |
| response += f"**Expected Impact**: {rec['expected_impact']}\n\n" | |
| response += "---\n\n" | |
| return response | |
| def _handle_integration_task(task: str, analysis_results: Dict[str, Any]) -> str: | |
| """Handle future integration opportunities task""" | |
| response = "## Future Integration Opportunities\n\n" | |
| if "future_integration" not in analysis_results: | |
| return response + "No integration opportunities identified" | |
| integration = analysis_results["future_integration"] | |
| # Summary | |
| if "summary" in integration: | |
| response += f"{integration['summary']}\n\n" | |
| # Opportunities | |
| if "opportunities" in integration: | |
| for i, opp in enumerate(integration["opportunities"], 1): | |
| response += f"**{i}. {opp.get('title', 'Opportunity')}**\n\n" | |
| response += f"{opp.get('description', '')}\n\n" | |
| if "benefits" in opp: | |
| response += "**Benefits:**\n" | |
| for benefit in opp["benefits"]: | |
| response += f"- {benefit}\n" | |
| response += "\n" | |
| if "challenges" in opp: | |
| response += "**Challenges:**\n" | |
| for challenge in opp["challenges"]: | |
| response += f"- {challenge}\n" | |
| response += "\n" | |
| if "example_metric_or_model" in opp: | |
| response += f"**Example Metric/Model**: {opp['example_metric_or_model']}\n\n" | |
| if "timeline" in opp: | |
| response += f"**Timeline**: {opp['timeline']}\n\n" | |
| response += "---\n\n" | |
| return response | |
| def _handle_patient_satisfaction_task(task: str, analysis_results: Dict[str, Any]) -> str: | |
| """Handle patient satisfaction analysis task""" | |
| response = "## Patient Satisfaction Analysis\n\n" | |
| if "patient_satisfaction" not in analysis_results: | |
| return response + "No patient satisfaction data available" | |
| satisfaction = analysis_results["patient_satisfaction"] | |
| # Overall satisfaction | |
| if "overall_satisfaction" in satisfaction: | |
| response += f"**Overall Patient Satisfaction**: {satisfaction['overall_satisfaction']:.1f}/5.0\n\n" | |
| # Satisfaction by category | |
| if "satisfaction_by_category" in satisfaction: | |
| response += "### Satisfaction by Category\n\n" | |
| response += "| Category | Score |\n" | |
| response += "|----------|-------|\n" | |
| for category, score in satisfaction["satisfaction_by_category"].items(): | |
| response += f"| {category} | {score:.1f}/5.0 |\n" | |
| response += "\n" | |
| # Trends | |
| if "satisfaction_trends" in satisfaction: | |
| response += "### Satisfaction Trends\n\n" | |
| response += "| Period | Satisfaction Score |\n" | |
| response += "|--------|-------------------|\n" | |
| for period, score in satisfaction["satisfaction_trends"].items(): | |
| response += f"| {period} | {score:.1f}/5.0 |\n" | |
| response += "\n" | |
| return response | |
| def _handle_staff_utilization_task(task: str, analysis_results: Dict[str, Any]) -> str: | |
| """Handle staff utilization analysis task""" | |
| response = "## Staff Utilization Analysis\n\n" | |
| if "staff_utilization" not in analysis_results: | |
| return response + "No staff utilization data available" | |
| staff = analysis_results["staff_utilization"] | |
| # Overall utilization | |
| if "overall_utilization" in staff: | |
| response += f"**Overall Staff Utilization**: {staff['overall_utilization']:.1%}\n\n" | |
| # Utilization by role | |
| if "utilization_by_role" in staff: | |
| response += "### Utilization by Role\n\n" | |
| response += "| Role | Utilization Rate |\n" | |
| response += "|------|------------------|\n" | |
| for role, rate in staff["utilization_by_role"].items(): | |
| response += f"| {role} | {rate:.1%} |\n" | |
| response += "\n" | |
| # Staffing levels | |
| if "staffing_levels" in staff: | |
| response += "### Staffing Levels\n\n" | |
| response += "| Role | Current Staff | Recommended Staff | Gap |\n" | |
| response += "|------|---------------|-------------------|-----|\n" | |
| for role, data in staff["staffing_levels"].items(): | |
| current = data.get("current", 0) | |
| recommended = data.get("recommended", 0) | |
| gap = current - recommended | |
| response += f"| {role} | {current} | {recommended} | {gap:+} |\n" | |
| response += "\n" | |
| return response | |
| def _handle_equipment_task(task: str, analysis_results: Dict[str, Any]) -> str: | |
| """Handle equipment inventory assessment task""" | |
| response = "## Equipment Inventory Assessment\n\n" | |
| if "equipment_inventory" not in analysis_results: | |
| return response + "No equipment inventory data available" | |
| equipment = analysis_results["equipment_inventory"] | |
| # Total equipment value | |
| if "total_value" in equipment: | |
| response += f"**Total Equipment Value**: ${equipment['total_value']:,.2f}\n\n" | |
| # Equipment by category | |
| if "equipment_by_category" in equipment: | |
| response += "### Equipment by Category\n\n" | |
| response += "| Category | Count | Total Value |\n" | |
| response += "|----------|-------|-------------|\n" | |
| for category, data in equipment["equipment_by_category"].items(): | |
| count = data.get("count", 0) | |
| value = data.get("value", 0) | |
| response += f"| {category} | {count} | ${value:,.2f} |\n" | |
| response += "\n" | |
| # Utilization rates | |
| if "utilization_rates" in equipment: | |
| response += "### Equipment Utilization Rates\n\n" | |
| response += "| Equipment Type | Utilization Rate |\n" | |
| response += "|----------------|------------------|\n" | |
| for eq_type, rate in equipment["utilization_rates"].items(): | |
| response += f"| {eq_type} | {rate:.1%} |\n" | |
| response += "\n" | |
| return response | |
| def _handle_financial_task(task: str, analysis_results: Dict[str, Any]) -> str: | |
| """Handle financial analysis task""" | |
| response = "## Financial Analysis\n\n" | |
| if "financial_analysis" not in analysis_results: | |
| return response + "No financial analysis data available" | |
| financial = analysis_results["financial_analysis"] | |
| # Total budget | |
| if "total_budget" in financial: | |
| response += f"**Total Budget**: ${financial['total_budget']:,.2f}\n\n" | |
| # Expenditure by category | |
| if "expenditure_by_category" in financial: | |
| response += "### Expenditure by Category\n\n" | |
| response += "| Category | Amount | Percentage |\n" | |
| response += "|----------|--------|------------|\n" | |
| for category, data in financial["expenditure_by_category"].items(): | |
| amount = data.get("amount", 0) | |
| percentage = data.get("percentage", 0) | |
| response += f"| {category} | ${amount:,.2f} | {percentage:.1f}% |\n" | |
| response += "\n" | |
| # Cost per patient | |
| if "cost_per_patient" in financial: | |
| response += f"**Average Cost per Patient**: ${financial['cost_per_patient']:,.2f}\n\n" | |
| # Financial trends | |
| if "financial_trends" in financial: | |
| response += "### Financial Trends\n\n" | |
| response += "| Period | Revenue | Expenditure | Surplus/Deficit |\n" | |
| response += "|--------|---------|-------------|-----------------|\n" | |
| for period, data in financial["financial_trends"].items(): | |
| revenue = data.get("revenue", 0) | |
| expenditure = data.get("expenditure", 0) | |
| surplus = revenue - expenditure | |
| response += f"| {period} | ${revenue:,.2f} | ${expenditure:,.2f} | ${surplus:+,.2f} |\n" | |
| response += "\n" | |
| return response | |
| def _handle_quality_metrics_task(task: str, analysis_results: Dict[str, Any]) -> str: | |
| """Handle quality metrics analysis task""" | |
| response = "## Quality Metrics Analysis\n\n" | |
| if "quality_metrics" not in analysis_results: | |
| return response + "No quality metrics data available" | |
| quality = analysis_results["quality_metrics"] | |
| # Overall quality score | |
| if "overall_quality_score" in quality: | |
| response += f"**Overall Quality Score**: {quality['overall_quality_score']:.1f}/100\n\n" | |
| # Quality indicators | |
| if "quality_indicators" in quality: | |
| response += "### Quality Indicators\n\n" | |
| response += "| Indicator | Current Value | Target | Status |\n" | |
| response += "|-----------|---------------|--------|--------|\n" | |
| for indicator, data in quality["quality_indicators"].items(): | |
| current = data.get("current", 0) | |
| target = data.get("target", 0) | |
| status = "✅ Met" if current >= target else "❌ Not Met" | |
| response += f"| {indicator} | {current:.1f} | {target:.1f} | {status} |\n" | |
| response += "\n" | |
| # Trends | |
| if "quality_trends" in quality: | |
| response += "### Quality Trends\n\n" | |
| response += "| Period | Quality Score |\n" | |
| response += "|--------|---------------|\n" | |
| for period, score in quality["quality_trends"].items(): | |
| response += f"| {period} | {score:.1f}/100 |\n" | |
| response += "\n" | |
| return response | |
| def _handle_geographic_distribution_task(task: str, analysis_results: Dict[str, Any]) -> str: | |
| """Handle geographic distribution analysis task""" | |
| response = "## Geographic Distribution Analysis\n\n" | |
| if "geographic_distribution" not in analysis_results: | |
| return response + "No geographic distribution data available" | |
| geo = analysis_results["geographic_distribution"] | |
| # Total facilities | |
| if "total_facilities" in geo: | |
| response += f"**Total Facilities**: {geo['total_facilities']:,}\n\n" | |
| # Distribution by region | |
| if "distribution_by_region" in geo: | |
| response += "### Distribution by Region\n\n" | |
| response += "| Region | Facility Count | Population Served | Facilities per 100K |\n" | |
| response += "|--------|----------------|-------------------|-------------------|\n" | |
| for region, data in geo["distribution_by_region"].items(): | |
| count = data.get("facility_count", 0) | |
| population = data.get("population_served", 0) | |
| per_100k = (count / population * 100000) if population > 0 else 0 | |
| response += f"| {region} | {count:,} | {population:,} | {per_100k:.1f} |\n" | |
| response += "\n" | |
| # Access inequality | |
| if "gini_coefficient" in geo: | |
| gini = geo["gini_coefficient"] | |
| level = "High" if gini > 0.4 else "Moderate" if gini > 0.2 else "Low" | |
| response += f"**Access Inequality**: {level} (Gini coefficient: {gini:.2f})\n\n" | |
| return response | |
def _handle_access_task(task: str, analysis_results: Dict[str, Any]) -> str:
    """Render the "Healthcare Access Analysis" markdown section.

    Reads ``analysis_results["healthcare_access"]`` and, for each key that is
    present, emits: the overall access score, a per-service table (score,
    wait time in days, availability), and a numbered list of at most the
    first five barriers to care.

    Args:
        task: Task description; not referenced by this handler.
        analysis_results: Mapping of analysis names to their result dicts.

    Returns:
        The section as a markdown string. If the access entry is absent,
        the heading followed by a "no data" notice.
    """
    heading = "## Healthcare Access Analysis\n\n"
    if "healthcare_access" not in analysis_results:
        return heading + "No healthcare access data available"

    access = analysis_results["healthcare_access"]
    parts = [heading]

    # Headline score
    if "overall_access_score" in access:
        parts.append(f"**Overall Access Score**: {access['overall_access_score']:.1f}/100\n\n")

    # Per-service table
    if "access_by_service" in access:
        parts += [
            "### Access by Service Type\n\n",
            "| Service Type | Access Score | Wait Time (Days) | Availability |\n",
            "|--------------|-------------|-----------------|-------------|\n",
        ]
        for service, info in access["access_by_service"].items():
            rating = info.get("access_score", 0)
            days = info.get("wait_time_days", 0)
            avail = info.get("availability", "N/A")
            parts.append(f"| {service} | {rating:.1f}/100 | {days:.1f} | {avail} |\n")
        parts.append("\n")

    # Numbered barrier list, capped at five entries
    if "barriers_to_care" in access:
        parts.append("### Top Barriers to Care\n\n")
        top_barriers = access["barriers_to_care"][:5]
        for rank, barrier in enumerate(top_barriers, start=1):
            parts.append(f"{rank}. {barrier}\n")
        parts.append("\n")

    return "".join(parts)
def _handle_wait_time_task(task: str, analysis_results: Dict[str, Any]) -> str:
    """Render the "Wait Time Analysis" markdown section.

    Reads ``analysis_results["wait_time_analysis"]`` and, for each key that
    is present, emits: the average wait time, a per-service table comparing
    the current wait against its target (marked met when current <= target),
    and a per-period trend table.

    Args:
        task: Task description; not referenced by this handler.
        analysis_results: Mapping of analysis names to their result dicts.

    Returns:
        The section as a markdown string. If the wait-time entry is absent,
        the heading followed by a "no data" notice.
    """
    heading = "## Wait Time Analysis\n\n"
    if "wait_time_analysis" not in analysis_results:
        return heading + "No wait time data available"

    waits = analysis_results["wait_time_analysis"]
    chunks = [heading]

    # Headline average
    if "average_wait_time" in waits:
        chunks.append(f"**Average Wait Time**: {waits['average_wait_time']:.1f} days\n\n")

    # Current vs. target, per service
    if "wait_time_by_service" in waits:
        chunks += [
            "### Wait Time by Service\n\n",
            "| Service | Average Wait (Days) | Target Wait (Days) | Status |\n",
            "|---------|-------------------|-------------------|--------|\n",
        ]
        for service, entry in waits["wait_time_by_service"].items():
            actual = entry.get("current_wait", 0)
            goal = entry.get("target_wait", 0)
            if actual <= goal:
                verdict = "✅ Met"
            else:
                verdict = "❌ Exceeded"
            chunks.append(f"| {service} | {actual:.1f} | {goal:.1f} | {verdict} |\n")
        chunks.append("\n")

    # Trend over reporting periods
    if "wait_time_trends" in waits:
        chunks += [
            "### Wait Time Trends\n\n",
            "| Period | Average Wait (Days) |\n",
            "|--------|-------------------|\n",
        ]
        for period, days in waits["wait_time_trends"].items():
            chunks.append(f"| {period} | {days:.1f} |\n")
        chunks.append("\n")

    return "".join(chunks)
def _handle_population_health_task(task: str, analysis_results: Dict[str, Any]) -> str:
    """Render the "Population Health Analysis" markdown section.

    Reads ``analysis_results["population_health"]`` and, for each key that is
    present, emits: the total population, a table of health indicators
    (value vs. target), and a disease-burden table (DALYs and percentage).

    Args:
        task: Task description; not referenced by this handler.
        analysis_results: Mapping of analysis names to their result dicts.

    Returns:
        The section as a markdown string. If the population-health entry is
        absent, the heading followed by a "no data" notice.
    """
    heading = "## Population Health Analysis\n\n"
    if "population_health" not in analysis_results:
        return heading + "No population health data available"

    health = analysis_results["population_health"]
    pieces = [heading]

    # Headline population
    if "total_population" in health:
        pieces.append(f"**Total Population**: {health['total_population']:,}\n\n")

    # Indicator table
    if "health_indicators" in health:
        pieces += [
            "### Key Health Indicators\n\n",
            "| Indicator | Value | Target |\n",
            "|-----------|-------|--------|\n",
        ]
        for name, entry in health["health_indicators"].items():
            measured = entry.get("value", 0)
            goal = entry.get("target", 0)
            pieces.append(f"| {name} | {measured:.1f} | {goal:.1f} |\n")
        pieces.append("\n")

    # Disease burden table
    if "disease_burden" in health:
        pieces += [
            "### Top Causes of Disease Burden\n\n",
            "| Disease | DALYs | Percentage |\n",
            "|---------|-------|------------|\n",
        ]
        for disease, entry in health["disease_burden"].items():
            burden = entry.get("dalys", 0)
            share = entry.get("percentage", 0)
            pieces.append(f"| {disease} | {burden:,} | {share:.1f}% |\n")
        pieces.append("\n")

    return "".join(pieces)
def _handle_emergency_services_task(task: str, analysis_results: Dict[str, Any]) -> str:
    """Render the "Emergency Services Analysis" markdown section.

    Reads ``analysis_results["emergency_services"]`` and, for each key that
    is present, emits: the total ER visit count, a metric/value table of ER
    utilization (metric keys are title-cased with underscores turned into
    spaces; float values are shown with one decimal, anything else as-is),
    and a per-triage-level table of current vs. target wait.

    Args:
        task: Task description; not referenced by this handler.
        analysis_results: Mapping of analysis names to their result dicts.

    Returns:
        The section as a markdown string. If the emergency-services entry is
        absent, the heading followed by a "no data" notice.
    """
    heading = "## Emergency Services Analysis\n\n"
    if "emergency_services" not in analysis_results:
        return heading + "No emergency services data available"

    er = analysis_results["emergency_services"]
    out = [heading]

    # Headline visit count
    if "total_er_visits" in er:
        out.append(f"**Total ER Visits**: {er['total_er_visits']:,}\n\n")

    # Utilization metrics
    if "er_utilization" in er:
        out += [
            "### ER Utilization Metrics\n\n",
            "| Metric | Value |\n",
            "|--------|-------|\n",
        ]
        for key, val in er["er_utilization"].items():
            label = key.replace('_', ' ').title()
            # Only true floats get the one-decimal treatment; ints and
            # strings are rendered with their default formatting.
            shown = f"{val:.1f}" if isinstance(val, float) else f"{val}"
            out.append(f"| {label} | {shown} |\n")
        out.append("\n")

    # Wait per triage level
    if "er_wait_times" in er:
        out += [
            "### ER Wait Times\n\n",
            "| Triage Level | Average Wait (Hours) | Target Wait (Hours) |\n",
            "|--------------|---------------------|---------------------|\n",
        ]
        for triage, entry in er["er_wait_times"].items():
            actual = entry.get("current_wait", 0)
            goal = entry.get("target_wait", 0)
            out.append(f"| {triage} | {actual:.1f} | {goal:.1f} |\n")
        out.append("\n")

    return "".join(out)
def _handle_specialized_care_task(task: str, analysis_results: Dict[str, Any]) -> str:
    """Render the "Specialized Care Analysis" markdown section.

    Reads ``analysis_results["specialized_care"]`` and, for each key that is
    present, emits: the number of specialized facilities, a per-specialty
    table (facility count, patient volume, wait time in days), and a
    metric/value table of access metrics (metric keys are title-cased with
    underscores turned into spaces; float values get one decimal).

    Args:
        task: Task description; not referenced by this handler.
        analysis_results: Mapping of analysis names to their result dicts.

    Returns:
        The section as a markdown string. If the specialized-care entry is
        absent, the heading followed by a "no data" notice.
    """
    heading = "## Specialized Care Analysis\n\n"
    if "specialized_care" not in analysis_results:
        return heading + "No specialized care data available"

    care = analysis_results["specialized_care"]
    out = [heading]

    # Headline facility count
    if "total_specialized_facilities" in care:
        out.append(f"**Total Specialized Facilities**: {care['total_specialized_facilities']:,}\n\n")

    # Per-specialty table
    if "services_by_specialty" in care:
        out += [
            "### Services by Specialty\n\n",
            "| Specialty | Facility Count | Patient Volume | Wait Time (Days) |\n",
            "|-----------|----------------|----------------|-----------------|\n",
        ]
        for specialty, entry in care["services_by_specialty"].items():
            sites = entry.get("facility_count", 0)
            patients = entry.get("patient_volume", 0)
            days = entry.get("wait_time_days", 0)
            out.append(f"| {specialty} | {sites} | {patients:,} | {days:.1f} |\n")
        out.append("\n")

    # Generic metric/value table
    if "access_metrics" in care:
        out += [
            "### Access Metrics\n\n",
            "| Metric | Value |\n",
            "|--------|-------|\n",
        ]
        for key, val in care["access_metrics"].items():
            label = key.replace('_', ' ').title()
            # Only true floats are rounded to one decimal place.
            shown = f"{val:.1f}" if isinstance(val, float) else f"{val}"
            out.append(f"| {label} | {shown} |\n")
        out.append("\n")

    return "".join(out)
def _handle_preventive_care_task(task: str, analysis_results: Dict[str, Any]) -> str:
    """Render the "Preventive Care Analysis" markdown section.

    Reads ``analysis_results["preventive_care"]`` and, for each key that is
    present, emits: a screening coverage table, a vaccination table with a
    status column (achieved when coverage >= herd immunity threshold), and a
    wellness program participation table.

    Rates are rendered with the ``%`` format type, which multiplies by 100,
    so they are expected as fractions (e.g. ``0.85`` renders as ``85.0%``).

    Args:
        task: Task description; not referenced by this handler.
        analysis_results: Mapping of analysis names to their result dicts.

    Returns:
        The section as a markdown string. If the preventive-care entry is
        absent, the heading followed by a "no data" notice.
    """
    heading = "## Preventive Care Analysis\n\n"
    if "preventive_care" not in analysis_results:
        return heading + "No preventive care data available"

    preventive = analysis_results["preventive_care"]
    parts = [heading]

    # Screening coverage
    if "screening_coverage" in preventive:
        parts += [
            "### Screening Coverage Rates\n\n",
            "| Screening Type | Coverage Rate | Target |\n",
            "|----------------|---------------|--------|\n",
        ]
        for screening, data in preventive["screening_coverage"].items():
            current = data.get("current_rate", 0)
            target = data.get("target_rate", 0)
            parts.append(f"| {screening} | {current:.1%} | {target:.1%} |\n")
        parts.append("\n")

    # Vaccination rates
    if "vaccination_rates" in preventive:
        # The header and delimiter rows must declare the Status column:
        # every data row below emits four cells, and a markdown table whose
        # rows are wider than its header loses or mis-renders the extra cell.
        parts += [
            "### Vaccination Rates\n\n",
            "| Vaccine | Coverage Rate | Herd Immunity Threshold | Status |\n",
            "|---------|---------------|------------------------|--------|\n",
        ]
        for vaccine, data in preventive["vaccination_rates"].items():
            coverage = data.get("coverage_rate", 0)
            threshold = data.get("herd_immunity_threshold", 0)
            status = "✅ Achieved" if coverage >= threshold else "❌ Not Achieved"
            parts.append(f"| {vaccine} | {coverage:.1%} | {threshold:.1%} | {status} |\n")
        parts.append("\n")

    # Wellness programs
    if "wellness_programs" in preventive:
        parts += [
            "### Wellness Program Participation\n\n",
            "| Program | Participants | Completion Rate |\n",
            "|---------|-------------|-----------------|\n",
        ]
        for program, data in preventive["wellness_programs"].items():
            participants = data.get("participants", 0)
            completion = data.get("completion_rate", 0)
            parts.append(f"| {program} | {participants:,} | {completion:.1%} |\n")
        parts.append("\n")

    return "".join(parts)
def _handle_telehealth_task(task: str, analysis_results: Dict[str, Any]) -> str:
    """Render the "Telehealth Analysis" markdown section.

    Reads ``analysis_results["telehealth"]`` and, for each key that is
    present, emits: the total number of virtual visits, a metric/value table
    of adoption metrics (metric keys are title-cased with underscores turned
    into spaces; float values get one decimal), and a table of patient
    satisfaction scores shown as ``x.x/5.0``.

    Args:
        task: Task description; not referenced by this handler.
        analysis_results: Mapping of analysis names to their result dicts.

    Returns:
        The section as a markdown string. If the telehealth entry is absent,
        the heading followed by a "no data" notice.
    """
    heading = "## Telehealth Analysis\n\n"
    if "telehealth" not in analysis_results:
        return heading + "No telehealth data available"

    tele = analysis_results["telehealth"]
    out = [heading]

    # Headline visit count
    if "total_virtual_visits" in tele:
        out.append(f"**Total Virtual Visits**: {tele['total_virtual_visits']:,}\n\n")

    # Generic metric/value table
    if "adoption_metrics" in tele:
        out += [
            "### Telehealth Adoption Metrics\n\n",
            "| Metric | Value |\n",
            "|--------|-------|\n",
        ]
        for key, val in tele["adoption_metrics"].items():
            label = key.replace('_', ' ').title()
            # Only true floats are rounded to one decimal place.
            shown = f"{val:.1f}" if isinstance(val, float) else f"{val}"
            out.append(f"| {label} | {shown} |\n")
        out.append("\n")

    # Satisfaction scores
    if "patient_satisfaction" in tele:
        out += [
            "### Patient Satisfaction with Telehealth\n\n",
            "| Aspect | Satisfaction Score |\n",
            "|--------|-------------------|\n",
        ]
        for aspect, rating in tele["patient_satisfaction"].items():
            out.append(f"| {aspect} | {rating:.1f}/5.0 |\n")
        out.append("\n")

    return "".join(out)
def _handle_research_task(task: str, analysis_results: Dict[str, Any]) -> str:
    """Render the "Research and Innovation Analysis" markdown section.

    Reads ``analysis_results["research_innovation"]`` and, for each key that
    is present, emits: the number of active clinical trials, a table of
    research areas (project count and funding shown as a dollar amount with
    two decimals), and a metric/value table of innovation metrics (metric
    keys are title-cased with underscores turned into spaces; float values
    get one decimal).

    Args:
        task: Task description; not referenced by this handler.
        analysis_results: Mapping of analysis names to their result dicts.

    Returns:
        The section as a markdown string. If the research entry is absent,
        the heading followed by a "no data" notice.
    """
    heading = "## Research and Innovation Analysis\n\n"
    if "research_innovation" not in analysis_results:
        return heading + "No research and innovation data available"

    research = analysis_results["research_innovation"]
    out = [heading]

    # Headline trial count
    if "active_clinical_trials" in research:
        out.append(f"**Active Clinical Trials**: {research['active_clinical_trials']:,}\n\n")

    # Focus areas
    if "research_areas" in research:
        out += [
            "### Research Focus Areas\n\n",
            "| Area | Number of Projects | Funding |\n",
            "|------|-------------------|---------|\n",
        ]
        for area, entry in research["research_areas"].items():
            project_count = entry.get("projects", 0)
            budget = entry.get("funding", 0)
            out.append(f"| {area} | {project_count} | ${budget:,.2f} |\n")
        out.append("\n")

    # Generic metric/value table
    if "innovation_metrics" in research:
        out += [
            "### Innovation Metrics\n\n",
            "| Metric | Value |\n",
            "|--------|-------|\n",
        ]
        for key, val in research["innovation_metrics"].items():
            label = key.replace('_', ' ').title()
            # Only true floats are rounded to one decimal place.
            shown = f"{val:.1f}" if isinstance(val, float) else f"{val}"
            out.append(f"| {label} | {shown} |\n")
        out.append("\n")

    return "".join(out)
def _generate_executive_summary(results: Dict[str, Any]) -> str:
    """Build the one-line executive summary from the analysis results.

    Collects a highlight for each of the following that is present in
    ``results``: total capacity and average utilization (from
    ``capacity_analysis``), geographic inequality classified from its Gini
    value (from ``facility_distribution``), the count of recommendations
    whose ``priority`` equals ``"High"`` (only when non-zero), and the
    validation completion rate (defaulting to 0 when the key is missing).

    Args:
        results: Mapping of analysis names to their result payloads.

    Returns:
        The highlights joined with ``" | "``, or a fixed fallback sentence
        when nothing could be collected.
    """
    highlights = []

    if "capacity_analysis" in results:
        cap = results["capacity_analysis"]
        if "total_capacity" in cap:
            highlights.append(f"Total system capacity: {cap['total_capacity']:,} beds")
        if "average_utilization" in cap:
            highlights.append(f"Average utilization: {cap['average_utilization']:.1%}")

    if "facility_distribution" in results:
        distribution = results["facility_distribution"]
        if "geographic_inequality" in distribution:
            gini = distribution["geographic_inequality"]
            if gini > 0.4:
                band = "High"
            elif gini > 0.2:
                band = "Moderate"
            else:
                band = "Low"
            highlights.append(f"Geographic distribution inequality: {band} (Gini: {gini:.2f})")

    if "recommendations" in results:
        urgent_count = sum(
            1 for rec in results["recommendations"] if rec.get("priority") == "High"
        )
        # A zero count is omitted rather than reported.
        if urgent_count:
            highlights.append(f"{urgent_count} high-priority recommendations identified")

    if "validation" in results:
        rate = results["validation"].get("completion_rate", 0)
        highlights.append(f"Analysis completion rate: {rate:.1%}")

    if not highlights:
        return "No key metrics identified"
    return " | ".join(highlights)
def _format_validation_results(validation: Dict[str, Any]) -> str:
    """Render the validation outcome as markdown.

    Emits the completion rate, then either a success line or a bulleted list
    of the tasks that were not completed.

    Missing keys are tolerated instead of raising ``KeyError``:
    ``completion_rate`` defaults to 0, ``all_tasks_completed`` defaults to
    ``False`` (success is never claimed without an explicit flag), and
    ``missing_tasks`` defaults to an empty list.

    Args:
        validation: Validation result mapping; the keys read are
            ``completion_rate``, ``all_tasks_completed`` and
            ``missing_tasks``.

    Returns:
        The formatted markdown fragment, ending with a blank line.
    """
    completion_rate = validation.get("completion_rate", 0)
    parts = [f"**Analysis Completion Rate**: {completion_rate:.1%}\n\n"]

    if validation.get("all_tasks_completed", False):
        parts.append("✅ All required tasks were completed successfully.\n\n")
    else:
        parts.append("⚠️ The following tasks were not completed:\n\n")
        # ``or []`` also covers an explicit ``None`` value for the key.
        for task in validation.get("missing_tasks") or []:
            parts.append(f"- {task}\n")
        parts.append("\n")

    return "".join(parts)