DeepBoner / docs /implementation /03_phase_judge.md
VibecoderMcSwaggins's picture
docs: expand Phase 3 Judge implementation specifications
e35d6b1
|
raw
history blame
23.6 kB

Phase 3 Implementation Spec: Judge Vertical Slice

Goal: Implement the "Brain" of the agent β€” evaluating evidence quality and deciding next steps. Philosophy: "Structured Output or Bust." Estimated Effort: 3-4 hours Prerequisite: Phase 2 complete (Search slice working)


1. The Slice Definition

This slice covers:

  1. Input: A user question + a list of Evidence (from Phase 2).
  2. Process:
    • Construct a prompt with the evidence.
    • Call LLM via PydanticAI (enforces structured output).
    • Parse response into typed assessment.
  3. Output: A JudgeAssessment object with decision + next queries.

Directory: src/features/judge/


2. Why PydanticAI for the Judge?

We use PydanticAI because:

  • βœ… Structured Output: Forces LLM to return valid JSON matching our Pydantic model
  • βœ… Retry Logic: Built-in retry with exponential backoff
  • βœ… Multi-Provider: Works with OpenAI, Anthropic, Gemini
  • βœ… Type Safety: Full typing support
# PydanticAI forces the LLM to return EXACTLY this structure
class JudgeAssessment(BaseModel):
    sufficient: bool
    recommendation: Literal["continue", "synthesize"]
    next_search_queries: list[str]

3. Models (src/features/judge/models.py)

"""Data models for the Judge feature."""
from pydantic import BaseModel, Field
from typing import Literal


class EvidenceQuality(BaseModel):
    """Quality assessment of a single piece of evidence."""

    relevance_score: int = Field(
        ...,
        ge=0,
        le=10,
        description="How relevant is this evidence to the query (0-10)"
    )
    credibility_score: int = Field(
        ...,
        ge=0,
        le=10,
        description="How credible is the source (0-10)"
    )
    key_finding: str = Field(
        ...,
        max_length=200,
        description="One-sentence summary of the key finding"
    )


class DrugCandidate(BaseModel):
    """A potential drug repurposing candidate identified in the evidence."""

    drug_name: str = Field(..., description="Name of the drug")
    original_indication: str = Field(..., description="What the drug was originally approved for")
    proposed_indication: str = Field(..., description="The new proposed use")
    mechanism: str = Field(..., description="Proposed mechanism of action")
    evidence_strength: Literal["weak", "moderate", "strong"] = Field(
        ...,
        description="Strength of supporting evidence"
    )


class JudgeAssessment(BaseModel):
    """The judge's assessment of the collected evidence."""

    # Core Decision
    sufficient: bool = Field(
        ...,
        description="Is there enough evidence to write a report?"
    )
    recommendation: Literal["continue", "synthesize"] = Field(
        ...,
        description="Should we search more or synthesize a report?"
    )

    # Reasoning
    reasoning: str = Field(
        ...,
        max_length=500,
        description="Explanation of the assessment"
    )

    # Scores
    overall_quality_score: int = Field(
        ...,
        ge=0,
        le=10,
        description="Overall quality of evidence (0-10)"
    )
    coverage_score: int = Field(
        ...,
        ge=0,
        le=10,
        description="How well does evidence cover the query (0-10)"
    )

    # Extracted Information
    candidates: list[DrugCandidate] = Field(
        default_factory=list,
        description="Drug candidates identified in the evidence"
    )

    # Next Steps (only if recommendation == "continue")
    next_search_queries: list[str] = Field(
        default_factory=list,
        max_length=5,
        description="Suggested follow-up queries if more evidence needed"
    )

    # Gaps Identified
    gaps: list[str] = Field(
        default_factory=list,
        description="Information gaps identified in current evidence"
    )

4. Prompts (src/features/judge/prompts.py)

Prompts are code. They are versioned, tested, and parameterized.

"""Prompt templates for the Judge feature."""
from typing import List
from src.features.search.models import Evidence


# System prompt - defines the judge's role and constraints
JUDGE_SYSTEM_PROMPT = """You are a biomedical research quality assessor specializing in drug repurposing.

Your job is to evaluate evidence retrieved from PubMed and web searches, and decide if:
1. There is SUFFICIENT evidence to write a research report
2. More searching is needed to fill gaps

## Evaluation Criteria

### For "sufficient" = True (ready to synthesize):
- At least 3 relevant pieces of evidence
- At least one peer-reviewed source (PubMed)
- Clear mechanism of action identified
- Drug candidates with at least "moderate" evidence strength

### For "sufficient" = False (continue searching):
- Fewer than 3 relevant pieces
- No clear drug candidates identified
- Major gaps in mechanism understanding
- All evidence is low quality

## Output Requirements
- Be STRICT. Only mark sufficient=True if evidence is genuinely adequate
- Always provide reasoning for your decision
- If continuing, suggest SPECIFIC, ACTIONABLE search queries
- Identify concrete gaps, not vague statements

## Important
- You are assessing DRUG REPURPOSING potential
- Focus on: mechanism of action, existing clinical data, safety profile
- Ignore marketing content or non-scientific sources"""


def format_evidence_for_prompt(evidence_list: List[Evidence]) -> str:
    """Format evidence list into a string for the prompt."""
    if not evidence_list:
        return "NO EVIDENCE COLLECTED YET"

    formatted = []
    for i, ev in enumerate(evidence_list, 1):
        formatted.append(f"""
--- Evidence #{i} ---
Source: {ev.citation.source.upper()}
Title: {ev.citation.title}
Date: {ev.citation.date}
URL: {ev.citation.url}

Content:
{ev.content[:1500]}
---""")

    return "\n".join(formatted)


def build_judge_user_prompt(question: str, evidence: List[Evidence]) -> str:
    """Build the user prompt for the judge."""
    evidence_text = format_evidence_for_prompt(evidence)

    return f"""## Research Question
{question}

## Collected Evidence ({len(evidence)} pieces)
{evidence_text}

## Your Task
Assess the evidence above and provide your structured assessment.
If evidence is insufficient, suggest 2-3 specific follow-up search queries."""


# For testing: a simplified prompt that's easier to mock
JUDGE_TEST_PROMPT = "Assess the following evidence and return a JudgeAssessment."

5. Handler (src/features/judge/handlers.py)

The handler uses PydanticAI for structured LLM output.

"""Judge handler - evaluates evidence quality using LLM."""
from typing import List
import structlog
from pydantic_ai import Agent
from pydantic_ai.models.openai import OpenAIModel
from pydantic_ai.models.anthropic import AnthropicModel
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

from src.shared.config import settings
from src.shared.exceptions import JudgeError
from src.features.search.models import Evidence
from .models import JudgeAssessment
from .prompts import JUDGE_SYSTEM_PROMPT, build_judge_user_prompt

logger = structlog.get_logger()


def get_llm_model():
    """Get the configured LLM model for PydanticAI."""
    if settings.llm_provider == "openai":
        return OpenAIModel(
            settings.llm_model,
            api_key=settings.get_api_key(),
        )
    elif settings.llm_provider == "anthropic":
        return AnthropicModel(
            settings.llm_model,
            api_key=settings.get_api_key(),
        )
    else:
        raise JudgeError(f"Unknown LLM provider: {settings.llm_provider}")


# Create the PydanticAI agent with structured output
judge_agent = Agent(
    model=get_llm_model(),
    result_type=JudgeAssessment,  # Forces structured output!
    system_prompt=JUDGE_SYSTEM_PROMPT,
)


class JudgeHandler:
    """Handles evidence assessment using LLM."""

    def __init__(self, agent: Agent | None = None):
        """
        Initialize the judge handler.

        Args:
            agent: Optional PydanticAI agent (for testing injection)
        """
        self.agent = agent or judge_agent
        self._call_count = 0

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception_type((TimeoutError, ConnectionError)),
        reraise=True,
    )
    async def assess(
        self,
        question: str,
        evidence: List[Evidence],
    ) -> JudgeAssessment:
        """
        Assess the quality and sufficiency of evidence.

        Args:
            question: The original research question
            evidence: List of Evidence objects to assess

        Returns:
            JudgeAssessment with decision and recommendations

        Raises:
            JudgeError: If assessment fails after retries
        """
        logger.info(
            "Starting evidence assessment",
            question=question[:100],
            evidence_count=len(evidence),
        )

        self._call_count += 1

        # Build the prompt
        user_prompt = build_judge_user_prompt(question, evidence)

        try:
            # Run the agent - PydanticAI handles structured output
            result = await self.agent.run(user_prompt)

            # result.data is already a JudgeAssessment (typed!)
            assessment = result.data

            logger.info(
                "Assessment complete",
                sufficient=assessment.sufficient,
                recommendation=assessment.recommendation,
                quality_score=assessment.overall_quality_score,
                candidates_found=len(assessment.candidates),
            )

            return assessment

        except Exception as e:
            logger.error("Judge assessment failed", error=str(e))
            raise JudgeError(f"Failed to assess evidence: {e}") from e

    @property
    def call_count(self) -> int:
        """Number of LLM calls made (for budget tracking)."""
        return self._call_count


# Alternative: Direct OpenAI client (if PydanticAI doesn't work)
class FallbackJudgeHandler:
    """Fallback handler using direct OpenAI client with JSON mode."""

    def __init__(self):
        import openai
        self.client = openai.AsyncOpenAI(api_key=settings.get_api_key())

    async def assess(
        self,
        question: str,
        evidence: List[Evidence],
    ) -> JudgeAssessment:
        """Assess using direct OpenAI API with JSON mode."""
        from .prompts import build_judge_user_prompt

        user_prompt = build_judge_user_prompt(question, evidence)

        response = await self.client.chat.completions.create(
            model=settings.llm_model,
            messages=[
                {"role": "system", "content": JUDGE_SYSTEM_PROMPT},
                {"role": "user", "content": user_prompt},
            ],
            response_format={"type": "json_object"},
            temperature=0.3,  # Lower temperature for more consistent assessments
        )

        # Parse the JSON response
        import json
        content = response.choices[0].message.content
        data = json.loads(content)

        return JudgeAssessment.model_validate(data)

6. TDD Workflow

Test File: tests/unit/features/judge/test_handler.py

"""Unit tests for the Judge handler."""
import pytest
from unittest.mock import AsyncMock, MagicMock, patch


class TestJudgeModels:
    """Tests for Judge data models."""

    def test_judge_assessment_valid(self):
        """JudgeAssessment should accept valid data."""
        from src.features.judge.models import JudgeAssessment

        assessment = JudgeAssessment(
            sufficient=True,
            recommendation="synthesize",
            reasoning="Strong evidence from multiple PubMed sources.",
            overall_quality_score=8,
            coverage_score=7,
            candidates=[],
            next_search_queries=[],
            gaps=[],
        )

        assert assessment.sufficient is True
        assert assessment.recommendation == "synthesize"

    def test_judge_assessment_score_bounds(self):
        """JudgeAssessment should reject invalid scores."""
        from src.features.judge.models import JudgeAssessment
        from pydantic import ValidationError

        with pytest.raises(ValidationError):
            JudgeAssessment(
                sufficient=True,
                recommendation="synthesize",
                reasoning="Test",
                overall_quality_score=15,  # Invalid: > 10
                coverage_score=5,
            )

    def test_drug_candidate_model(self):
        """DrugCandidate should validate properly."""
        from src.features.judge.models import DrugCandidate

        candidate = DrugCandidate(
            drug_name="Metformin",
            original_indication="Type 2 Diabetes",
            proposed_indication="Alzheimer's Disease",
            mechanism="Reduces neuroinflammation via AMPK activation",
            evidence_strength="moderate",
        )

        assert candidate.drug_name == "Metformin"
        assert candidate.evidence_strength == "moderate"


class TestJudgePrompts:
    """Tests for prompt formatting."""

    def test_format_evidence_empty(self):
        """format_evidence_for_prompt should handle empty list."""
        from src.features.judge.prompts import format_evidence_for_prompt

        result = format_evidence_for_prompt([])
        assert "NO EVIDENCE" in result

    def test_format_evidence_with_items(self):
        """format_evidence_for_prompt should format evidence correctly."""
        from src.features.judge.prompts import format_evidence_for_prompt
        from src.features.search.models import Evidence, Citation

        evidence = [
            Evidence(
                content="Test content about metformin",
                citation=Citation(
                    source="pubmed",
                    title="Test Article",
                    url="https://pubmed.ncbi.nlm.nih.gov/123/",
                    date="2024-01-15",
                ),
            )
        ]

        result = format_evidence_for_prompt(evidence)

        assert "Evidence #1" in result
        assert "PUBMED" in result
        assert "Test Article" in result
        assert "metformin" in result

    def test_build_judge_user_prompt(self):
        """build_judge_user_prompt should include question and evidence."""
        from src.features.judge.prompts import build_judge_user_prompt
        from src.features.search.models import Evidence, Citation

        evidence = [
            Evidence(
                content="Sample content",
                citation=Citation(
                    source="pubmed",
                    title="Sample",
                    url="https://example.com",
                    date="2024",
                ),
            )
        ]

        result = build_judge_user_prompt(
            "What drugs could treat Alzheimer's?",
            evidence,
        )

        assert "Alzheimer" in result
        assert "1 pieces" in result


class TestJudgeHandler:
    """Tests for JudgeHandler."""

    @pytest.mark.asyncio
    async def test_assess_returns_assessment(self, mocker):
        """JudgeHandler.assess should return JudgeAssessment."""
        from src.features.judge.handlers import JudgeHandler
        from src.features.judge.models import JudgeAssessment
        from src.features.search.models import Evidence, Citation

        # Create a mock agent
        mock_result = MagicMock()
        mock_result.data = JudgeAssessment(
            sufficient=True,
            recommendation="synthesize",
            reasoning="Good evidence",
            overall_quality_score=8,
            coverage_score=7,
        )

        mock_agent = AsyncMock()
        mock_agent.run = AsyncMock(return_value=mock_result)

        # Create handler with mock agent
        handler = JudgeHandler(agent=mock_agent)

        evidence = [
            Evidence(
                content="Test content",
                citation=Citation(
                    source="pubmed",
                    title="Test",
                    url="https://example.com",
                    date="2024",
                ),
            )
        ]

        # Act
        result = await handler.assess("Test question", evidence)

        # Assert
        assert isinstance(result, JudgeAssessment)
        assert result.sufficient is True
        assert result.recommendation == "synthesize"
        mock_agent.run.assert_called_once()

    @pytest.mark.asyncio
    async def test_assess_increments_call_count(self, mocker):
        """JudgeHandler should track LLM call count."""
        from src.features.judge.handlers import JudgeHandler
        from src.features.judge.models import JudgeAssessment

        mock_result = MagicMock()
        mock_result.data = JudgeAssessment(
            sufficient=False,
            recommendation="continue",
            reasoning="Need more evidence",
            overall_quality_score=4,
            coverage_score=3,
            next_search_queries=["metformin mechanism"],
        )

        mock_agent = AsyncMock()
        mock_agent.run = AsyncMock(return_value=mock_result)

        handler = JudgeHandler(agent=mock_agent)

        assert handler.call_count == 0

        await handler.assess("Q1", [])
        assert handler.call_count == 1

        await handler.assess("Q2", [])
        assert handler.call_count == 2

    @pytest.mark.asyncio
    async def test_assess_raises_judge_error_on_failure(self, mocker):
        """JudgeHandler should raise JudgeError on failure."""
        from src.features.judge.handlers import JudgeHandler
        from src.shared.exceptions import JudgeError

        mock_agent = AsyncMock()
        mock_agent.run = AsyncMock(side_effect=Exception("LLM API error"))

        handler = JudgeHandler(agent=mock_agent)

        with pytest.raises(JudgeError, match="Failed to assess"):
            await handler.assess("Test", [])

    @pytest.mark.asyncio
    async def test_assess_continues_when_insufficient(self, mocker):
        """JudgeHandler should return next_search_queries when insufficient."""
        from src.features.judge.handlers import JudgeHandler
        from src.features.judge.models import JudgeAssessment

        mock_result = MagicMock()
        mock_result.data = JudgeAssessment(
            sufficient=False,
            recommendation="continue",
            reasoning="Not enough peer-reviewed sources",
            overall_quality_score=3,
            coverage_score=2,
            next_search_queries=[
                "metformin alzheimer clinical trial",
                "AMPK neuroprotection mechanism",
            ],
            gaps=["No clinical trial data", "Mechanism unclear"],
        )

        mock_agent = AsyncMock()
        mock_agent.run = AsyncMock(return_value=mock_result)

        handler = JudgeHandler(agent=mock_agent)
        result = await handler.assess("Test", [])

        assert result.sufficient is False
        assert result.recommendation == "continue"
        assert len(result.next_search_queries) == 2
        assert len(result.gaps) == 2

7. Integration Test (Optional, Real LLM)

# tests/integration/test_judge_live.py
"""Integration tests that hit real LLM APIs (run manually)."""
import pytest
import os


@pytest.mark.integration
@pytest.mark.slow
@pytest.mark.skipif(
    not os.getenv("OPENAI_API_KEY"),
    reason="OPENAI_API_KEY not set"
)
@pytest.mark.asyncio
async def test_judge_live_assessment():
    """Test real LLM assessment (requires API key)."""
    from src.features.judge.handlers import JudgeHandler
    from src.features.search.models import Evidence, Citation

    handler = JudgeHandler()

    evidence = [
        Evidence(
            content="""Metformin, a first-line antidiabetic drug, has shown
            neuroprotective properties in preclinical studies. The drug activates
            AMPK, which may reduce neuroinflammation and improve mitochondrial
            function in neurons.""",
            citation=Citation(
                source="pubmed",
                title="Metformin and Neuroprotection: A Review",
                url="https://pubmed.ncbi.nlm.nih.gov/12345/",
                date="2024-01-15",
            ),
        ),
        Evidence(
            content="""A retrospective cohort study found that diabetic patients
            taking metformin had a 30% lower risk of developing dementia compared
            to those on other antidiabetic medications.""",
            citation=Citation(
                source="pubmed",
                title="Metformin Use and Dementia Risk",
                url="https://pubmed.ncbi.nlm.nih.gov/67890/",
                date="2023-11-20",
            ),
        ),
    ]

    result = await handler.assess(
        "What is the potential of metformin for treating Alzheimer's disease?",
        evidence,
    )

    # Basic sanity checks
    assert result.sufficient in [True, False]
    assert result.recommendation in ["continue", "synthesize"]
    assert 0 <= result.overall_quality_score <= 10
    assert len(result.reasoning) > 0


# Run with: uv run pytest tests/integration -m integration

8. Module Exports (src/features/judge/__init__.py)

"""Judge feature - evidence quality assessment."""
from .models import JudgeAssessment, DrugCandidate, EvidenceQuality
from .handlers import JudgeHandler
from .prompts import JUDGE_SYSTEM_PROMPT, build_judge_user_prompt

__all__ = [
    "JudgeAssessment",
    "DrugCandidate",
    "EvidenceQuality",
    "JudgeHandler",
    "JUDGE_SYSTEM_PROMPT",
    "build_judge_user_prompt",
]

9. Implementation Checklist

  • Create src/features/judge/models.py with all Pydantic models
  • Create src/features/judge/prompts.py with prompt templates
  • Create src/features/judge/handlers.py with JudgeHandler
  • Create src/features/judge/__init__.py with exports
  • Write tests in tests/unit/features/judge/test_handler.py
  • Run uv run pytest tests/unit/features/judge/ -v β€” ALL TESTS MUST PASS
  • (Optional) Run integration test with real API key
  • Commit: git commit -m "feat: phase 3 judge slice complete"

10. Definition of Done

Phase 3 is COMPLETE when:

  1. βœ… All unit tests pass
  2. βœ… JudgeHandler returns valid JudgeAssessment objects
  3. βœ… Structured output is enforced (no raw JSON strings)
  4. βœ… Retry logic works (test by mocking transient failures)
  5. βœ… Can run this in Python REPL (with API key):
import asyncio
from src.features.judge.handlers import JudgeHandler
from src.features.search.models import Evidence, Citation

async def test():
    handler = JudgeHandler()
    evidence = [
        Evidence(
            content="Metformin shows neuroprotective properties...",
            citation=Citation(
                source="pubmed",
                title="Metformin Review",
                url="https://pubmed.ncbi.nlm.nih.gov/123/",
                date="2024",
            ),
        )
    ]
    result = await handler.assess("Can metformin treat Alzheimer's?", evidence)
    print(f"Sufficient: {result.sufficient}")
    print(f"Recommendation: {result.recommendation}")
    print(f"Reasoning: {result.reasoning}")

asyncio.run(test())

Proceed to Phase 4 ONLY after all checkboxes are complete.