Spaces:
Sleeping
Sleeping
| # response_formatter.py | |
| from typing import Dict, List, Any | |
| class ResponseFormatter: | |
| def format_healthcare_response(scenario_text: str, analysis_results: Dict[str, Any]) -> str: | |
| """Format healthcare analysis response with detailed structure""" | |
| response = "# Healthcare Scenario Analysis\n\n" | |
| # Executive Summary | |
| response += "## Executive Summary\n\n" | |
| response += ResponseFormatter._generate_executive_summary(analysis_results) | |
| response += "\n\n" | |
| # Data Preparation | |
| response += "## 1. Data Preparation\n\n" | |
| response += ResponseFormatter._generate_data_section(analysis_results) | |
| response += "\n\n" | |
| # Analysis Sections | |
| if "facility_distribution" in analysis_results: | |
| response += "## 2. Facility Distribution Analysis\n\n" | |
| response += ResponseFormatter._format_facility_distribution(analysis_results["facility_distribution"]) | |
| response += "\n\n" | |
| if "capacity_analysis" in analysis_results: | |
| response += "## 3. Capacity Analysis\n\n" | |
| response += ResponseFormatter._format_capacity_analysis(analysis_results["capacity_analysis"]) | |
| response += "\n\n" | |
| if "resource_allocation" in analysis_results: | |
| response += "## 4. Resource Allocation Analysis\n\n" | |
| response += ResponseFormatter._format_resource_allocation(analysis_results["resource_allocation"]) | |
| response += "\n\n" | |
| if "trends" in analysis_results: | |
| response += "## 5. Trend Analysis\n\n" | |
| response += ResponseFormatter._format_trends(analysis_results["trends"]) | |
| response += "\n\n" | |
| # Recommendations | |
| if "recommendations" in analysis_results: | |
| response += "## 6. Operational Recommendations\n\n" | |
| response += ResponseFormatter._format_recommendations(analysis_results["recommendations"]) | |
| response += "\n\n" | |
| # Future Integration | |
| if "future_integration" in analysis_results: | |
| response += "## 7. Future Integration Opportunities\n\n" | |
| response += ResponseFormatter._format_integration_opportunities(analysis_results["future_integration"]) | |
| response += "\n\n" | |
| # Provenance | |
| response += "## 8. Provenance\n\n" | |
| response += "This analysis is based on:\n" | |
| response += "- Scenario description provided by the user\n" | |
| response += "- Uploaded data files\n" | |
| response += "- Calculations performed on the provided data\n" | |
| return response | |
| def _generate_executive_summary(results: Dict[str, Any]) -> str: | |
| """Generate executive summary based on analysis results""" | |
| summary = [] | |
| if "capacity_analysis" in results: | |
| capacity = results["capacity_analysis"] | |
| if "total_capacity" in capacity: | |
| summary.append(f"Total system capacity: {capacity['total_capacity']:,} beds") | |
| if "average_utilization" in capacity: | |
| summary.append(f"Average utilization: {capacity['average_utilization']:.1%}") | |
| if "facility_distribution" in results: | |
| dist = results["facility_distribution"] | |
| if "geographic_inequality" in dist: | |
| inequality = dist["geographic_inequality"] | |
| level = "High" if inequality > 0.4 else "Moderate" if inequality > 0.2 else "Low" | |
| summary.append(f"Geographic distribution inequality: {level} (Gini: {inequality:.2f})") | |
| if "recommendations" in results: | |
| high_priority = [r for r in results["recommendations"] if r.get("priority") == "High"] | |
| if high_priority: | |
| summary.append(f"{len(high_priority)} high-priority recommendations identified") | |
| return " | ".join(summary) if summary else "No key metrics identified" | |
| def _generate_data_section(results: Dict[str, Any]) -> str: | |
| """Generate data preparation section""" | |
| data_sources = [] | |
| if "facility_distribution" in results: | |
| data_sources.append("Facility registry data") | |
| if "capacity_analysis" in results: | |
| data_sources.append("Bed capacity and utilization data") | |
| if "resource_allocation" in results: | |
| data_sources.append("Resource allocation data") | |
| if "trends" in results: | |
| data_sources.append("Historical trend data") | |
| if not data_sources: | |
| return "No relevant data sources identified" | |
| response = "The following data sources were used in this analysis:\n\n" | |
| for source in data_sources: | |
| response += f"- {source}\n" | |
| response += "\nData quality checks and transformations were applied to ensure accuracy and consistency." | |
| return response | |
| def _format_facility_distribution(dist_results: Dict[str, Any]) -> str: | |
| """Format facility distribution analysis results""" | |
| response = "" | |
| if "total_facilities" in dist_results: | |
| response += f"**Total Facilities**: {dist_results['total_facilities']:,}\n\n" | |
| if "geographic_distribution" in dist_results: | |
| response += "### Geographic Distribution\n\n" | |
| response += "| Region | Facility Count |\n" | |
| response += "|--------|---------------|\n" | |
| for region, count in dist_results["geographic_distribution"].items(): | |
| response += f"| {region} | {count} |\n" | |
| response += "\n" | |
| if "geographic_inequality" in dist_results: | |
| inequality = dist_results["geographic_inequality"] | |
| level = "High" if inequality > 0.4 else "Moderate" if inequality > 0.2 else "Low" | |
| response += f"**Geographic Inequality**: {level} (Gini coefficient: {inequality:.2f})\n\n" | |
| if "facility_type_distribution" in dist_results: | |
| response += "### Facility Type Distribution\n\n" | |
| response += "| Facility Type | Count |\n" | |
| response += "|---------------|-------|\n" | |
| for ftype, count in dist_results["facility_type_distribution"].items(): | |
| response += f"| {ftype} | {count} |\n" | |
| response += "\n" | |
| if "facility_diversity" in dist_results: | |
| diversity = dist_results["facility_diversity"] | |
| response += f"**Facility Diversity Index**: {diversity:.2f} (higher values indicate more diversity)\n\n" | |
| if "top_cities" in dist_results and "city_breakdown" in dist_results: | |
| response += "### Top Cities by Facility Count\n\n" | |
| response += "| City | Hospitals | Nursing/Residential | Ambulatory | Total |\n" | |
| response += "|------|-----------|-------------------|------------|-------|\n" | |
| for city in dist_results["top_cities"]: | |
| if city in dist_results["city_breakdown"]: | |
| breakdown = dist_results["city_breakdown"][city] | |
| hospitals = breakdown.get("Hospitals", 0) | |
| nursing = breakdown.get("Nursing and residential care facilities", 0) | |
| ambulatory = breakdown.get("Ambulatory health care services", 0) | |
| total = hospitals + nursing + ambulatory | |
| response += f"| {city} | {hospitals} | {nursing} | {ambulatory} | {total} |\n" | |
| response += "\n" | |
| return response if response else "No facility distribution data available" | |
| def _format_capacity_analysis(capacity_results: Dict[str, Any]) -> str: | |
| """Format capacity analysis results""" | |
| response = "" | |
| if "total_capacity" in capacity_results: | |
| response += f"**Total System Capacity**: {capacity_results['total_capacity']:,} beds\n\n" | |
| if "capacity_by_type" in capacity_results: | |
| response += "### Capacity by Facility Type\n\n" | |
| response += "| Facility Type | Capacity |\n" | |
| response += "|---------------|----------|\n" | |
| for ftype, capacity in capacity_results["capacity_by_type"].items(): | |
| response += f"| {ftype} | {capacity:,} |\n" | |
| response += "\n" | |
| if "average_utilization" in capacity_results: | |
| response += f"**Average System Utilization**: {capacity_results['average_utilization']:.1%}\n\n" | |
| if "utilization_by_type" in capacity_results: | |
| response += "### Utilization by Facility Type\n\n" | |
| response += "| Facility Type | Utilization |\n" | |
| response += "|---------------|-------------|\n" | |
| for ftype, util in capacity_results["utilization_by_type"].items(): | |
| response += f"| {ftype} | {util:.1%} |\n" | |
| response += "\n" | |
| if "capacity_trends" in capacity_results: | |
| response += "### Capacity Trends\n\n" | |
| response += "| Year | Capacity |\n" | |
| response += "|------|----------|\n" | |
| for year, capacity in capacity_results["capacity_trends"].items(): | |
| response += f"| {year} | {capacity:,} |\n" | |
| response += "\n" | |
| if "capacity_growth_rate" in capacity_results: | |
| growth = capacity_results["capacity_growth_rate"] | |
| response += f"**Overall Growth Rate**: {growth:.1f}%\n\n" | |
| if "zone_summary" in capacity_results: | |
| response += "### Bed Capacity by Zone\n\n" | |
| response += "| Zone | Current Beds | Previous Beds | Absolute Change | Percentage Change |\n" | |
| response += "|------|--------------|---------------|-----------------|-------------------|\n" | |
| for zone_data in capacity_results["zone_summary"]: | |
| # Extract zone name - try different possible keys | |
| zone_name = "Unknown" | |
| for key in ["zone", "Zone", "ZONE", "region", "Region", "REGION"]: | |
| if key in zone_data: | |
| zone_name = zone_data[key] | |
| break | |
| # Extract bed counts | |
| current_beds = zone_data.get("beds_current", zone_data.get("current", "N/A")) | |
| prev_beds = zone_data.get("beds_prev", zone_data.get("previous", "N/A")) | |
| abs_change = zone_data.get("bed_change", "N/A") | |
| pct_change = zone_data.get("percent_change", "N/A") | |
| response += f"| {zone_name} | {current_beds} | {prev_beds} | {abs_change} | {pct_change:.1f}% |\n" | |
| response += "\n" | |
| if "max_absolute_decrease" in capacity_results and isinstance(capacity_results["max_absolute_decrease"], dict): | |
| response += "### Zone with Largest Absolute Decrease\n\n" | |
| max_abs = capacity_results["max_absolute_decrease"] | |
| # Extract zone name | |
| zone_name = "Unknown" | |
| for key in ["zone", "Zone", "ZONE", "region", "Region", "REGION"]: | |
| if key in max_abs: | |
| zone_name = max_abs[key] | |
| break | |
| # Extract values | |
| abs_change = max_abs.get("bed_change", "N/A") | |
| pct_change = max_abs.get("percent_change", "N/A") | |
| response += f"**Zone**: {zone_name}\n" | |
| response += f"**Absolute Decrease**: {abs_change} beds\n" | |
| response += f"**Percentage Decrease**: {pct_change:.1f}%\n\n" | |
| if "max_percentage_decrease" in capacity_results and isinstance(capacity_results["max_percentage_decrease"], dict): | |
| response += "### Zone with Largest Percentage Decrease\n\n" | |
| max_pct = capacity_results["max_percentage_decrease"] | |
| # Extract zone name | |
| zone_name = "Unknown" | |
| for key in ["zone", "Zone", "ZONE", "region", "Region", "REGION"]: | |
| if key in max_pct: | |
| zone_name = max_pct[key] | |
| break | |
| # Extract values | |
| abs_change = max_pct.get("bed_change", "N/A") | |
| pct_change = max_pct.get("percent_change", "N/A") | |
| response += f"**Zone**: {zone_name}\n" | |
| response += f"**Absolute Decrease**: {abs_change} beds\n" | |
| response += f"**Percentage Decrease**: {pct_change:.1f}%\n\n" | |
| if "facilities_with_largest_declines" in capacity_results: | |
| response += "### Facilities with Largest Bed Declines\n\n" | |
| response += "| Facility | Zone | Teaching Status | Beds Lost |\n" | |
| response += "|----------|------|----------------|-----------|\n" | |
| for facility in capacity_results["facilities_with_largest_declines"]: | |
| name = facility.get("facility_name", "N/A") | |
| # Extract zone name | |
| zone_name = "Unknown" | |
| for key in ["zone", "Zone", "ZONE", "region", "Region", "REGION"]: | |
| if key in facility: | |
| zone_name = facility[key] | |
| break | |
| teaching = facility.get("teaching_status", "N/A") | |
| change = facility.get("bed_change", "N/A") | |
| response += f"| {name} | {zone_name} | {teaching} | {change} |\n" | |
| response += "\n" | |
| return response if response else "No capacity data available" | |
| def _format_resource_allocation(resource_results: Dict[str, Any]) -> str: | |
| """Format resource allocation results""" | |
| response = "" | |
| if "total_staff" in resource_results: | |
| response += f"**Total Staff**: {resource_results['total_staff']:,} FTEs\n\n" | |
| if "staff_per_bed_ratio" in resource_results: | |
| ratio = resource_results["staff_per_bed_ratio"] | |
| response += f"**Staff per Bed Ratio**: {ratio:.2f}\n\n" | |
| if ratio < 1.5: | |
| response += "⚠️ This ratio is below recommended levels\n\n" | |
| if "equipment_summary" in resource_results: | |
| response += "### Equipment Summary\n\n" | |
| for equipment, count in resource_results["equipment_summary"].items(): | |
| response += f"- {equipment}: {count:,}\n" | |
| response += "\n" | |
| return response if response else "No resource allocation data available" | |
| def _format_trends(trend_results: Dict[str, Any]) -> str: | |
| """Format trend analysis results""" | |
| response = "" | |
| if "year_over_year_trends" in trend_results: | |
| response += "### Year-over-Year Changes\n\n" | |
| response += "| Period | Absolute Change | Percentage Change |\n" | |
| response += "|--------|-----------------|-------------------|\n" | |
| for period, change in trend_results["year_over_year_trends"].items(): | |
| abs_change = change["absolute_change"] | |
| pct_change = change["percentage_change"] | |
| response += f"| {period} | {abs_change:+,} | {pct_change:+.1f}% |\n" | |
| response += "\n" | |
| return response if response else "No trend data available" | |
| def _format_recommendations(recommendations: List[Dict[str, str]]) -> str: | |
| """Format operational recommendations""" | |
| if not recommendations: | |
| return "No specific recommendations generated" | |
| response = "" | |
| for i, rec in enumerate(recommendations, 1): | |
| response += f"### {i}. {rec['title']}\n\n" | |
| response += f"**Priority**: {rec.get('priority', 'Medium')}\n\n" | |
| response += f"{rec['description']}\n\n" | |
| response += f"*Data source: {rec.get('data_source', 'Analysis results')}*\n\n" | |
| return response | |
| def _format_integration_opportunities(opportunities: Dict[str, Any]) -> str: | |
| """Format future integration opportunities""" | |
| response = "" | |
| if "data_integration" in opportunities: | |
| response += "### Data Integration Opportunities\n\n" | |
| for opp in opportunities["data_integration"]: | |
| response += f"**{opp['opportunity']}**\n\n" | |
| response += f"{opp['description']}\n\n" | |
| response += f"*Expected benefit: {opp['benefit']}*\n\n" | |
| if "ai_applications" in opportunities: | |
| response += "### AI Application Opportunities\n\n" | |
| for opp in opportunities["ai_applications"]: | |
| response += f"**{opp['opportunity']}**\n\n" | |
| response += f"{opp['description']}\n\n" | |
| response += f"*Expected benefit: {opp['benefit']}*\n\n" | |
| if "enhanced_metrics" in opportunities: | |
| response += "### Enhanced Metrics\n\n" | |
| for metric in opportunities["enhanced_metrics"]: | |
| response += f"**{metric['metric']}**\n\n" | |
| response += f"{metric['description']}\n\n" | |
| response += f"*Expected benefit: {metric['benefit']}*\n\n" | |
| return response if response else "No integration opportunities identified" |