Spaces:
Sleeping
Sleeping
File size: 14,500 Bytes
6dc9d46 696f787 6dc9d46 ad2e847 6dc9d46 696f787 6dc9d46 696f787 6dc9d46 696f787 6dc9d46 696f787 6dc9d46 696f787 aefac4f 6dc9d46 696f787 6dc9d46 696f787 6dc9d46 ad2e847 6dc9d46 ad2e847 696f787 ad2e847 6dc9d46 696f787 6dc9d46 696f787 6dc9d46 aefac4f 696f787 6dc9d46 aefac4f 6dc9d46 696f787 6dc9d46 696f787 6dc9d46 696f787 6dc9d46 696f787 6dc9d46 696f787 9659593 6dc9d46 9659593 6dc9d46 9659593 6dc9d46 696f787 6dc9d46 696f787 6dc9d46 9659593 6dc9d46 696f787 6dc9d46 696f787 6dc9d46 696f787 6dc9d46 9659593 6dc9d46 696f787 6dc9d46 696f787 6dc9d46 696f787 6dc9d46 696f787 9659593 6dc9d46 9659593 aefac4f 6dc9d46 696f787 aefac4f 696f787 6dc9d46 9659593 6dc9d46 696f787 aefac4f 9659593 aefac4f 696f787 aefac4f 9659593 aefac4f 696f787 aefac4f 6dc9d46 696f787 aefac4f 6dc9d46 9659593 6dc9d46 696f787 aefac4f 6dc9d46 9659593 6dc9d46 696f787 aefac4f 6dc9d46 9659593 6dc9d46 696f787 6dc9d46 aefac4f 696f787 6dc9d46 9659593 6dc9d46 696f787 aefac4f 6dc9d46 9659593 aefac4f 6dc9d46 696f787 6dc9d46 9659593 6dc9d46 696f787 6dc9d46 aefac4f 696f787 6dc9d46 9659593 6dc9d46 696f787 6dc9d46 9659593 6dc9d46 696f787 6dc9d46 696f787 6dc9d46 9659593 6dc9d46 696f787 6dc9d46 aefac4f 6dc9d46 696f787 6dc9d46 aefac4f 6dc9d46 696f787 6dc9d46 aefac4f 6dc9d46 aefac4f 696f787 6dc9d46 aefac4f 6dc9d46 aefac4f 696f787 6dc9d46 aefac4f 6dc9d46 696f787 aefac4f 6dc9d46 696f787 6dc9d46 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 | """
RagBot Workflow Service
Wraps the RagBot workflow and formats comprehensive responses
"""
import sys
import time
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any
# Ensure project root is in path for src imports
_project_root = str(Path(__file__).parent.parent.parent.parent)
if _project_root not in sys.path:
sys.path.insert(0, _project_root)
from app.models.schemas import (
AgentOutput,
Analysis,
AnalysisResponse,
BiomarkerFlag,
ConfidenceAssessment,
DiseaseExplanation,
KeyDriver,
Prediction,
Recommendations,
SafetyAlert,
)
from src.state import PatientInput
from src.workflow import create_guild
class RagBotService:
"""
Service class to manage RagBot workflow lifecycle.
Initializes once, then handles multiple analysis requests.
"""
def __init__(self):
"""Initialize the workflow (loads vector store, models, etc.)"""
self.guild = None
self.initialized = False
self.init_time = None
def initialize(self):
"""Initialize the Clinical Insight Guild (expensive operation)"""
if self.initialized:
return
print("INFO: Initializing RagBot workflow...")
start_time = time.time()
import os
try:
# Set working directory via environment so vector store paths resolve
# without a process-global os.chdir() (which is thread-unsafe).
ragbot_root = Path(__file__).parent.parent.parent.parent
os.environ["RAGBOT_ROOT"] = str(ragbot_root)
print(f"INFO: Project root: {ragbot_root}")
# Temporarily chdir only during initialization (single-threaded at startup)
original_dir = os.getcwd()
os.chdir(ragbot_root)
self.guild = create_guild()
self.initialized = True
self.init_time = datetime.now()
elapsed = (time.time() - start_time) * 1000
print(f"OK: RagBot initialized successfully ({elapsed:.0f}ms)")
except Exception as e:
print(f"ERROR: Failed to initialize RagBot: {e}")
raise
finally:
# Restore original directory
os.chdir(original_dir)
def get_uptime_seconds(self) -> float:
"""Get API uptime in seconds"""
if not self.init_time:
return 0.0
return (datetime.now() - self.init_time).total_seconds()
def is_ready(self) -> bool:
"""Check if service is ready to handle requests"""
return self.initialized and self.guild is not None
def analyze(
self,
biomarkers: dict[str, float],
patient_context: dict[str, Any],
model_prediction: dict[str, Any],
extracted_biomarkers: dict[str, float] | None = None,
) -> AnalysisResponse:
"""
Run complete analysis workflow and format full detailed response.
Args:
biomarkers: Dictionary of biomarker names to values
patient_context: Patient demographic information
model_prediction: Disease prediction (disease, confidence, probabilities)
extracted_biomarkers: Original extracted biomarkers (for natural language input)
Returns:
Complete AnalysisResponse with all details
"""
if not self.is_ready():
raise RuntimeError("RagBot service not initialized. Call initialize() first.")
request_id = f"req_{uuid.uuid4().hex[:12]}"
start_time = time.time()
try:
# Create PatientInput
patient_input = PatientInput(
biomarkers=biomarkers, model_prediction=model_prediction, patient_context=patient_context
)
# Run workflow
workflow_result = self.guild.run(patient_input)
# Calculate processing time
processing_time_ms = (time.time() - start_time) * 1000
# Format response
response = self._format_response(
request_id=request_id,
workflow_result=workflow_result,
input_biomarkers=biomarkers,
extracted_biomarkers=extracted_biomarkers,
patient_context=patient_context,
model_prediction=model_prediction,
processing_time_ms=processing_time_ms,
)
return response
except Exception as e:
# Re-raise with context
raise RuntimeError(f"Analysis failed during workflow execution: {e!s}") from e
def _format_response(
self,
request_id: str,
workflow_result: dict[str, Any],
input_biomarkers: dict[str, float],
extracted_biomarkers: dict[str, float],
patient_context: dict[str, Any],
model_prediction: dict[str, Any],
processing_time_ms: float,
) -> AnalysisResponse:
"""
Format complete detailed response from workflow result.
Preserves ALL data from workflow execution.
workflow_result is now the full LangGraph state dict containing:
- final_response: dict from response_synthesizer
- agent_outputs: list of AgentOutput objects
- biomarker_flags: list of BiomarkerFlag objects
- safety_alerts: list of SafetyAlert objects
- sop_version, processing_timestamp, etc.
"""
# The synthesizer output is nested inside final_response
final_response = workflow_result.get("final_response", {}) or {}
# Extract main prediction
prediction = Prediction(
disease=model_prediction["disease"],
confidence=model_prediction["confidence"],
probabilities=model_prediction.get("probabilities", {}),
)
# Biomarker flags: prefer state-level data (BiomarkerFlag objects from validator),
# fall back to synthesizer output
state_flags = workflow_result.get("biomarker_flags", [])
if state_flags:
biomarker_flags = []
for flag in state_flags:
if hasattr(flag, "model_dump"):
biomarker_flags.append(BiomarkerFlag(**flag.model_dump()))
elif isinstance(flag, dict):
biomarker_flags.append(BiomarkerFlag(**flag))
else:
biomarker_flags_source = final_response.get("biomarker_flags", [])
if not biomarker_flags_source:
biomarker_flags_source = final_response.get("analysis", {}).get("biomarker_flags", [])
biomarker_flags = [
BiomarkerFlag(**flag) if isinstance(flag, dict) else BiomarkerFlag(**flag.model_dump())
for flag in biomarker_flags_source
]
# Safety alerts: prefer state-level data, fall back to synthesizer
state_alerts = workflow_result.get("safety_alerts", [])
if state_alerts:
safety_alerts = []
for alert in state_alerts:
if hasattr(alert, "model_dump"):
safety_alerts.append(SafetyAlert(**alert.model_dump()))
elif isinstance(alert, dict):
safety_alerts.append(SafetyAlert(**alert))
else:
safety_alerts_source = final_response.get("safety_alerts", [])
if not safety_alerts_source:
safety_alerts_source = final_response.get("analysis", {}).get("safety_alerts", [])
safety_alerts = [
SafetyAlert(**alert) if isinstance(alert, dict) else SafetyAlert(**alert.model_dump())
for alert in safety_alerts_source
]
# Extract key drivers from synthesizer output
key_drivers_data = final_response.get("key_drivers", [])
if not key_drivers_data:
key_drivers_data = final_response.get("analysis", {}).get("key_drivers", [])
key_drivers = []
for driver in key_drivers_data:
if isinstance(driver, dict):
key_drivers.append(KeyDriver(**driver))
# Disease explanation from synthesizer
disease_exp_data = final_response.get("disease_explanation", {})
if not disease_exp_data:
disease_exp_data = final_response.get("analysis", {}).get("disease_explanation", {})
disease_explanation = DiseaseExplanation(
pathophysiology=disease_exp_data.get("pathophysiology", ""),
citations=disease_exp_data.get("citations", []),
retrieved_chunks=disease_exp_data.get("retrieved_chunks"),
)
# Recommendations from synthesizer
recs_data = final_response.get("recommendations", {})
if not recs_data:
recs_data = final_response.get("clinical_recommendations", {})
if not recs_data:
recs_data = final_response.get("analysis", {}).get("recommendations", {})
recommendations = Recommendations(
immediate_actions=recs_data.get("immediate_actions", []),
lifestyle_changes=recs_data.get("lifestyle_changes", []),
monitoring=recs_data.get("monitoring", []),
follow_up=recs_data.get("follow_up"),
)
# Confidence assessment from synthesizer
conf_data = final_response.get("confidence_assessment", {})
if not conf_data:
conf_data = final_response.get("analysis", {}).get("confidence_assessment", {})
confidence_assessment = ConfidenceAssessment(
prediction_reliability=conf_data.get("prediction_reliability", "UNKNOWN"),
evidence_strength=conf_data.get("evidence_strength", "UNKNOWN"),
limitations=conf_data.get("limitations", []),
reasoning=conf_data.get("reasoning"),
)
# Alternative diagnoses
alternative_diagnoses = final_response.get("alternative_diagnoses")
if alternative_diagnoses is None:
alternative_diagnoses = final_response.get("analysis", {}).get("alternative_diagnoses")
# Assemble complete analysis
analysis = Analysis(
biomarker_flags=biomarker_flags,
safety_alerts=safety_alerts,
key_drivers=key_drivers,
disease_explanation=disease_explanation,
recommendations=recommendations,
confidence_assessment=confidence_assessment,
alternative_diagnoses=alternative_diagnoses,
)
# Agent outputs from state (these are src.state.AgentOutput objects)
agent_outputs_data = workflow_result.get("agent_outputs", [])
agent_outputs = []
for agent_out in agent_outputs_data:
if hasattr(agent_out, "model_dump"):
agent_outputs.append(AgentOutput(**agent_out.model_dump()))
elif isinstance(agent_out, dict):
agent_outputs.append(AgentOutput(**agent_out))
# Workflow metadata
workflow_metadata = {
"sop_version": workflow_result.get("sop_version"),
"processing_timestamp": workflow_result.get("processing_timestamp"),
"agents_executed": len(agent_outputs),
"workflow_success": True,
}
# Conversational summary (if available)
conversational_summary = final_response.get("conversational_summary")
if not conversational_summary:
conversational_summary = final_response.get("patient_summary", {}).get("narrative")
# Generate conversational summary if not present
if not conversational_summary:
conversational_summary = self._generate_conversational_summary(
prediction=prediction,
safety_alerts=safety_alerts,
key_drivers=key_drivers,
recommendations=recommendations,
)
# Assemble final response
response = AnalysisResponse(
status="success",
request_id=request_id,
timestamp=datetime.now().isoformat(),
extracted_biomarkers=extracted_biomarkers,
input_biomarkers=input_biomarkers,
patient_context=patient_context,
prediction=prediction,
analysis=analysis,
agent_outputs=agent_outputs,
workflow_metadata=workflow_metadata,
conversational_summary=conversational_summary,
processing_time_ms=processing_time_ms,
sop_version=workflow_result.get("sop_version", "Baseline"),
)
return response
def _generate_conversational_summary(
self, prediction: Prediction, safety_alerts: list, key_drivers: list, recommendations: Recommendations
) -> str:
"""Generate a simple conversational summary"""
summary_parts = []
summary_parts.append("Hi there!\n")
summary_parts.append("Based on your biomarkers, I analyzed your results.\n")
# Prediction
summary_parts.append(f"\nPrimary Finding: {prediction.disease}")
summary_parts.append(f" Confidence: {prediction.confidence:.0%}\n")
# Safety alerts
if safety_alerts:
summary_parts.append("\nIMPORTANT SAFETY ALERTS:")
for alert in safety_alerts[:3]: # Top 3
summary_parts.append(f" - {alert.biomarker}: {alert.message}")
summary_parts.append(f" Action: {alert.action}")
# Key drivers
if key_drivers:
summary_parts.append("\nWhy this prediction?")
for driver in key_drivers[:3]: # Top 3
summary_parts.append(f" - {driver.biomarker} ({driver.value}): {driver.explanation[:100]}...")
# Recommendations
if recommendations.immediate_actions:
summary_parts.append("\nWhat You Should Do:")
for i, action in enumerate(recommendations.immediate_actions[:3], 1):
summary_parts.append(f" {i}. {action}")
summary_parts.append("\nImportant: This is an AI-assisted analysis, NOT medical advice.")
summary_parts.append(" Please consult a healthcare professional for proper diagnosis and treatment.")
return "\n".join(summary_parts)
# Global service instance (singleton)
_ragbot_service = None
def get_ragbot_service() -> RagBotService:
"""Get or create the global RagBot service instance"""
global _ragbot_service
if _ragbot_service is None:
_ragbot_service = RagBotService()
return _ragbot_service
|