|
|
"""Statistical analysis service using Modal code execution. |
|
|
|
|
|
This module provides Modal-based statistical analysis WITHOUT depending on |
|
|
agent_framework. This allows it to be used in the simple orchestrator mode |
|
|
without requiring the magentic optional dependency. |
|
|
|
|
|
The AnalysisAgent (in src/agents/) wraps this service for magentic mode. |
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
import re |
|
|
from functools import lru_cache, partial |
|
|
from typing import Any, Literal |
|
|
|
|
|
|
|
|
# Closed set of verdicts the analyzer can emit; shared by AnalysisResult and
# the interpretation logic in StatisticalAnalyzer.
# NOTE(review): this alias sits between the stdlib and third-party import
# groups — conventionally it would live below all imports (PEP 8).
VerdictType = Literal["SUPPORTED", "REFUTED", "INCONCLUSIVE"]
|
|
|
|
|
from pydantic import BaseModel, Field |
|
|
from pydantic_ai import Agent |
|
|
|
|
|
from src.agent_factory.judges import get_model |
|
|
from src.tools.code_execution import ( |
|
|
CodeExecutionError, |
|
|
get_code_executor, |
|
|
get_sandbox_library_prompt, |
|
|
) |
|
|
from src.utils.models import Evidence |
|
|
|
|
|
|
|
|
class AnalysisResult(BaseModel):
    """Result of statistical analysis.

    Produced by ``StatisticalAnalyzer``: carries the verdict plus the
    generated code and raw execution output so the analysis is auditable.
    """

    # Final judgement; constrained to the VerdictType literal values.
    verdict: VerdictType = Field(
        description="SUPPORTED, REFUTED, or INCONCLUSIVE",
    )
    # Heuristic confidence in the verdict, bounded to [0.0, 1.0] by pydantic.
    confidence: float = Field(ge=0.0, le=1.0, description="Confidence in verdict (0-1)")
    # Human-readable summary of what the executed code reported.
    statistical_evidence: str = Field(
        description="Summary of statistical findings from code execution"
    )
    # The exact Python source that was run in the sandbox.
    code_generated: str = Field(description="Python code that was executed")
    # Raw stdout (or stderr on failure) captured from the sandbox run.
    execution_output: str = Field(description="Output from code execution")
    # Selected stdout lines that mention statistics (p-values, effects, ...).
    key_findings: list[str] = Field(default_factory=list, description="Key takeaways")
    # Caveats the caller should surface alongside the verdict.
    limitations: list[str] = Field(default_factory=list, description="Limitations")
|
|
|
|
|
|
|
|
class StatisticalAnalyzer:
    """Performs statistical analysis using Modal code execution.

    This service:
    1. Generates Python code for statistical analysis using LLM
    2. Executes code in Modal sandbox
    3. Interprets results
    4. Returns verdict (SUPPORTED/REFUTED/INCONCLUSIVE)

    Note: This class has NO agent_framework dependency, making it safe
    to use in the simple orchestrator without the magentic extra.
    """

    # "p-value" followed by ":", "=", "<" or whitespace, then a number in
    # plain or scientific notation. Matched against lowercased output, so a
    # lowercase "e" exponent suffices. The original pattern missed
    # "p-value = 0.03", "p-value < 0.05", and "2e-04".
    _P_VALUE_RE = re.compile(r"p[-\s]?value[\s:=<]+(\d+(?:\.\d+)?(?:e-?\d+)?)")
    # Affirmative / negated / refuting verdict markers (searched in uppercased
    # stdout). The negated pattern accepts "NOT SUPPORTED" with whitespace —
    # previously only the fused "NOTSUPPORTED" was caught, so "NOT SUPPORTED"
    # output was misclassified as SUPPORTED.
    _SUPPORTED_RE = re.compile(r"\bSUPPORTED\b")
    _NEGATED_SUPPORTED_RE = re.compile(r"\b(?:NOT\s+|UN)SUPPORTED\b")
    _REFUTED_RE = re.compile(r"\bREFUTED\b")
    # Substrings that mark a stdout line as a statistically relevant finding.
    _FINDING_KEYWORDS = ("p-value", "significant", "effect", "mean")

    def __init__(self) -> None:
        """Initialize the analyzer; collaborators are created lazily."""
        self._code_executor: Any = None  # Modal executor, built on first use
        self._agent: Agent[None, str] | None = None  # LLM code generator

    def _get_code_executor(self) -> Any:
        """Lazy initialization of code executor."""
        if self._code_executor is None:
            self._code_executor = get_code_executor()
        return self._code_executor

    def _get_agent(self) -> Agent[None, str]:
        """Lazy initialization of LLM agent for code generation."""
        if self._agent is None:
            library_versions = get_sandbox_library_prompt()
            self._agent = Agent(
                model=get_model(),
                output_type=str,
                system_prompt=f"""You are a biomedical data scientist.

Generate Python code to analyze research evidence and test hypotheses.

Guidelines:
1. Use pandas, numpy, scipy.stats for analysis
2. Print clear, interpretable results
3. Include statistical tests (t-tests, chi-square, etc.)
4. Calculate effect sizes and confidence intervals
5. Keep code concise (<50 lines)
6. Set 'result' variable to SUPPORTED, REFUTED, or INCONCLUSIVE

Available libraries:
{library_versions}

Output format: Return ONLY executable Python code, no explanations.""",
            )
        return self._agent

    async def analyze(
        self,
        query: str,
        evidence: list[Evidence],
        hypothesis: dict[str, Any] | None = None,
    ) -> AnalysisResult:
        """Run statistical analysis on evidence.

        Args:
            query: The research question
            evidence: List of Evidence objects to analyze
            hypothesis: Optional hypothesis dict with drug, target, pathway, effect

        Returns:
            AnalysisResult with verdict and statistics. Sandbox failures are
            mapped to an INCONCLUSIVE result rather than raised; errors from
            the LLM call itself propagate to the caller.
        """
        evidence_summary = self._summarize_evidence(evidence)
        hypothesis_text = ""
        if hypothesis:
            # Causal-chain arrows restored here — the original string
            # contained mojibake ("β") where "→" was intended.
            hypothesis_text = (
                f"\nHypothesis: {hypothesis.get('drug', 'Unknown')} → "
                f"{hypothesis.get('target', '?')} → "
                f"{hypothesis.get('pathway', '?')} → "
                f"{hypothesis.get('effect', '?')}\n"
                f"Confidence: {hypothesis.get('confidence', 0.5):.0%}\n"
            )

        prompt = f"""Generate Python code to statistically analyze:

**Research Question**: {query}
{hypothesis_text}

**Evidence Summary**:
{evidence_summary}

Generate executable Python code to analyze this evidence."""

        try:
            agent = self._get_agent()
            code_result = await agent.run(prompt)
            generated_code = code_result.output

            # executor.execute is blocking; run it off the event loop thread.
            loop = asyncio.get_running_loop()
            executor = self._get_code_executor()
            execution = await loop.run_in_executor(
                None, partial(executor.execute, generated_code, timeout=120)
            )

            if not execution["success"]:
                return AnalysisResult(
                    verdict="INCONCLUSIVE",
                    confidence=0.0,
                    statistical_evidence=(
                        f"Execution failed: {execution.get('error', 'Unknown error')}"
                    ),
                    code_generated=generated_code,
                    execution_output=execution.get("stderr", ""),
                    key_findings=[],
                    limitations=["Code execution failed"],
                )

            return self._interpret_results(generated_code, execution)

        except CodeExecutionError as e:
            return AnalysisResult(
                verdict="INCONCLUSIVE",
                confidence=0.0,
                statistical_evidence=str(e),
                code_generated="",
                execution_output="",
                key_findings=[],
                limitations=[f"Analysis error: {e}"],
            )

    def _summarize_evidence(self, evidence: list[Evidence]) -> str:
        """Summarize up to 5 evidence items for the code-generation prompt.

        Each item contributes its first 200 characters, source title, and
        relevance percentage.
        """
        if not evidence:
            return "No evidence available."

        lines = []
        for i, ev in enumerate(evidence[:5], 1):
            content = ev.content
            truncated = content[:200] + ("..." if len(content) > 200 else "")
            lines.append(f"{i}. {truncated}")
            lines.append(f"   Source: {ev.citation.title}")
            lines.append(f"   Relevance: {ev.relevance:.0%}\n")

        return "\n".join(lines)

    def _interpret_results(
        self,
        code: str,
        execution: dict[str, Any],
    ) -> AnalysisResult:
        """Interpret a successful code execution into an AnalysisResult."""
        stdout = execution.get("stdout", "")

        return AnalysisResult(
            verdict=self._parse_verdict(stdout),
            confidence=self._calculate_confidence(stdout),
            statistical_evidence=stdout.strip(),
            code_generated=code,
            execution_output=stdout,
            key_findings=self._extract_key_findings(stdout)[:5],
            limitations=[
                "Analysis based on summary data only",
                "Limited to available evidence",
                "Statistical tests assume data independence",
            ],
        )

    def _parse_verdict(self, stdout: str) -> VerdictType:
        """Map execution stdout to a verdict.

        "SUPPORTED" wins only when no negated form ("NOT SUPPORTED",
        "UNSUPPORTED") appears; "REFUTED" is checked next; anything else
        is INCONCLUSIVE.
        """
        text = stdout.upper()
        if self._SUPPORTED_RE.search(text) and not self._NEGATED_SUPPORTED_RE.search(
            text
        ):
            return "SUPPORTED"
        if self._REFUTED_RE.search(text):
            return "REFUTED"
        return "INCONCLUSIVE"

    def _extract_key_findings(self, stdout: str) -> list[str]:
        """Collect stdout lines that mention statistical keywords."""
        findings: list[str] = []
        for line in stdout.split("\n"):
            lowered = line.lower()
            if any(kw in lowered for kw in self._FINDING_KEYWORDS):
                findings.append(line.strip())
        return findings

    def _calculate_confidence(self, output: str) -> float:
        """Map the smallest reported p-value to a heuristic confidence.

        Tiers: p<0.001 -> 0.95, p<0.01 -> 0.90, p<0.05 -> 0.80, else 0.60.
        Returns 0.70 when no parseable p-value is found.
        """
        p_values = self._P_VALUE_RE.findall(output.lower())

        if p_values:
            try:
                min_p = min(float(p) for p in p_values)
            except ValueError:
                # Regex captured something float() rejects; fall through to
                # the no-p-value default, matching the original behavior.
                return 0.70
            if min_p < 0.001:
                return 0.95
            if min_p < 0.01:
                return 0.90
            if min_p < 0.05:
                return 0.80
            return 0.60

        return 0.70
|
|
|
|
|
|
|
|
@lru_cache(maxsize=1)
def get_statistical_analyzer() -> StatisticalAnalyzer:
    """Return the process-wide StatisticalAnalyzer singleton.

    ``lru_cache(maxsize=1)`` on a zero-argument factory yields a lazily
    constructed singleton; ``lru_cache`` itself guards concurrent first
    calls, so no module-level state or explicit lock is needed.
    """
    analyzer = StatisticalAnalyzer()
    return analyzer
|
|
|