VibecoderMcSwaggins's picture
fix(coderabbit): Address all CodeRabbit review findings
4bfa475
raw
history blame
27.9 kB
"""Judge handler for evidence assessment using PydanticAI."""
import asyncio
import json
import os
from functools import partial
from typing import Any, ClassVar
import structlog
from huggingface_hub import InferenceClient
from pydantic_ai import Agent
from pydantic_ai.models.huggingface import HuggingFaceModel
from pydantic_ai.models.openai import OpenAIChatModel
from pydantic_ai.providers.huggingface import HuggingFaceProvider
from pydantic_ai.providers.openai import OpenAIProvider
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential
from src.config.domain import ResearchDomain
from src.prompts.judge import (
format_empty_evidence_prompt,
format_user_prompt,
get_system_prompt,
select_evidence_for_judge,
)
from src.utils.config import settings
from src.utils.models import AssessmentDetails, Evidence, JudgeAssessment
logger = structlog.get_logger()
def _extract_titles_from_evidence(
evidence: list[Evidence], max_items: int = 5, fallback_message: str | None = None
) -> list[str]:
"""Extract truncated titles from evidence for fallback display.
Args:
evidence: List of evidence items
max_items: Maximum number of titles to extract
fallback_message: Message to return if no evidence provided
Returns:
List of truncated titles (max 150 chars each)
"""
findings = []
for e in evidence[:max_items]:
title = e.citation.title
if len(title) > 150:
title = title[:147] + "..."
findings.append(title)
if not findings and fallback_message:
return [fallback_message]
return findings
def get_model(api_key: str | None = None) -> Any:
"""Get the LLM model based on available API keys.
Priority order:
1. BYOK api_key parameter (auto-detects provider from prefix)
2. OpenAI (if OPENAI_API_KEY set in env)
3. HuggingFace (free fallback)
Args:
api_key: Optional BYOK key. Auto-detects provider from prefix:
- "sk-ant-..." β†’ Anthropic (NOT SUPPORTED - raises error)
- "sk-..." β†’ OpenAI
- Other β†’ Falls through to env vars
Raises:
NotImplementedError: If Anthropic key detected (no embeddings support).
Note: Anthropic is NOT supported because it lacks embeddings API.
See P3_REMOVE_ANTHROPIC_PARTIAL_WIRING.md.
"""
# Priority 1: BYOK - Auto-detect provider from key prefix
if api_key:
if api_key.startswith("sk-ant-"):
# Anthropic not supported - no embeddings API
raise NotImplementedError(
"Anthropic is not supported (no embeddings API). "
"Use OpenAI key (sk-...) or leave empty for free HuggingFace tier."
)
if api_key.startswith("sk-"):
# OpenAI BYOK
openai_provider = OpenAIProvider(api_key=api_key)
return OpenAIChatModel(settings.openai_model, provider=openai_provider)
# Priority 2: OpenAI from env (most common, best tool calling)
if settings.has_openai_key:
openai_provider = OpenAIProvider(api_key=settings.openai_api_key)
return OpenAIChatModel(settings.openai_model, provider=openai_provider)
# Priority 3: HuggingFace (free fallback)
# Use 7B model to stay on HuggingFace native infrastructure (avoid Novita 500s)
model_name = settings.huggingface_model or "Qwen/Qwen2.5-7B-Instruct"
# Try settings.hf_token first, then fall back to HF_TOKEN env var
# HuggingFaceProvider requires a token - it won't work without one
hf_token = settings.hf_token or os.environ.get("HF_TOKEN")
if hf_token:
hf_provider = HuggingFaceProvider(api_key=hf_token)
return HuggingFaceModel(model_name, provider=hf_provider)
# No HF token available - raise clear error
raise RuntimeError(
"No LLM API key available. Either:\n"
" 1. Set OPENAI_API_KEY for premium tier, or\n"
" 2. Set HF_TOKEN for free HuggingFace tier\n"
"Get a free HF token at: https://huggingface.co/settings/tokens"
)
class JudgeHandler:
"""
Handles evidence assessment using an LLM with structured output.
Uses PydanticAI to ensure responses match the JudgeAssessment schema.
"""
def __init__(
self,
model: Any = None,
domain: ResearchDomain | str | None = None,
api_key: str | None = None,
) -> None:
"""
Initialize the JudgeHandler.
Args:
model: Optional PydanticAI model. If None, uses config default.
domain: Research domain for prompt customization.
api_key: Optional BYOK key (auto-detects provider from prefix).
"""
self.model = model or get_model(api_key=api_key)
self.domain = domain
self.agent = Agent(
model=self.model,
output_type=JudgeAssessment,
system_prompt=get_system_prompt(domain),
retries=3,
)
async def assess(
self,
question: str,
evidence: list[Evidence],
iteration: int = 0,
max_iterations: int = 10,
) -> JudgeAssessment:
"""
Assess evidence and determine if it's sufficient.
Args:
question: The user's research question
evidence: List of Evidence objects from search
iteration: Current iteration number
max_iterations: Maximum allowed iterations
Returns:
JudgeAssessment with evaluation results
Raises:
JudgeError: If assessment fails after retries
"""
logger.info(
"Starting evidence assessment",
question=question[:100],
evidence_count=len(evidence),
iteration=iteration,
domain=self.domain,
)
# Format the prompt based on whether we have evidence
if evidence:
# Select diverse evidence using embeddings (if available)
selected_evidence = await select_evidence_for_judge(evidence, question)
user_prompt = format_user_prompt(
question,
selected_evidence,
iteration,
max_iterations,
total_evidence_count=len(evidence),
domain=self.domain,
)
else:
user_prompt = format_empty_evidence_prompt(question)
try:
# Run the agent with structured output
result = await self.agent.run(user_prompt)
assessment = result.output
logger.info(
"Assessment complete",
sufficient=assessment.sufficient,
recommendation=assessment.recommendation,
confidence=assessment.confidence,
)
return assessment
except Exception as e:
# Log with context for debugging
logger.error(
"Assessment failed",
error=str(e),
exc_type=type(e).__name__,
evidence_count=len(evidence),
)
# Return a safe default assessment on failure
return self._create_fallback_assessment(question, str(e))
def _create_fallback_assessment(
self,
question: str,
error: str,
) -> JudgeAssessment:
"""
Create a fallback assessment when LLM fails.
Args:
question: The original question
error: The error message
Returns:
Safe fallback JudgeAssessment
"""
return JudgeAssessment(
details=AssessmentDetails(
mechanism_score=0,
mechanism_reasoning="Assessment failed due to LLM error",
clinical_evidence_score=0,
clinical_reasoning="Assessment failed due to LLM error",
drug_candidates=[],
key_findings=[],
),
sufficient=False,
confidence=0.0,
recommendation="continue",
next_search_queries=[
f"{question} mechanism",
f"{question} clinical trials",
f"{question} drug candidates",
],
reasoning=f"Assessment failed: {error}. Recommend retrying with refined queries.",
)
class HFInferenceJudgeHandler:
"""
JudgeHandler using HuggingFace Inference API for FREE LLM calls.
Defaults to Llama-3.1-8B-Instruct (requires HF_TOKEN) or falls back to public models.
Important: Handler Instance Lifecycle
-------------------------------------
This handler maintains query-scoped state (consecutive_failures, last_question).
Create a NEW instance per research query to avoid state leakage between users.
In the current architecture (app.py), a new handler is created per Gradio request,
so this is safe. However, if refactoring to share handlers across requests (e.g.,
connection pooling), the state management would need to be redesigned.
See CodeRabbit review PR #104 for details on this architectural consideration.
"""
FALLBACK_MODELS: ClassVar[list[str]] = [
"meta-llama/Llama-3.1-8B-Instruct", # Primary (Gated)
"mistralai/Mistral-7B-Instruct-v0.3", # Secondary
"HuggingFaceH4/zephyr-7b-beta", # Fallback (Ungated)
]
# Force synthesis after N consecutive failures to prevent infinite loops
# Rationale: 3 models x 3 retries each = 9 total API attempts before circuit break
MAX_CONSECUTIVE_FAILURES: ClassVar[int] = 3
def __init__(
self,
model_id: str | None = None,
domain: ResearchDomain | str | None = None,
) -> None:
"""
Initialize with HF Inference client.
Args:
model_id: Optional specific model ID. If None, uses FALLBACK_MODELS chain.
domain: Research domain for prompt customization.
"""
self.model_id = model_id
self.domain = domain
# Will automatically use HF_TOKEN from env if available
self.client = InferenceClient()
self.call_count = 0
self.consecutive_failures = 0 # Track failures to prevent infinite loops
self.last_question: str | None = None
self.last_evidence: list[Evidence] | None = None
async def assess(
self,
question: str,
evidence: list[Evidence],
iteration: int = 0,
max_iterations: int = 10,
) -> JudgeAssessment:
"""
Assess evidence using HuggingFace Inference API.
Attempts models in order until one succeeds.
After MAX_CONSECUTIVE_FAILURES, forces synthesis to prevent infinite loops.
"""
self.call_count += 1
# Session-based reset: new question = new research session = reset failures
# Prevents failure state from leaking across different user queries
if question != self.last_question and self.last_question is not None:
self.consecutive_failures = 0
self.last_question = question
self.last_evidence = evidence
# BUG FIX: After N consecutive failures, force synthesis to break infinite loop
if self.consecutive_failures >= self.MAX_CONSECUTIVE_FAILURES:
logger.warning(
"Max consecutive failures reached, forcing synthesis",
failures=self.consecutive_failures,
evidence_count=len(evidence),
)
return self._create_forced_synthesis_assessment(question, evidence)
# Format the user prompt
if evidence:
selected_evidence = await select_evidence_for_judge(evidence, question)
user_prompt = format_user_prompt(
question,
selected_evidence,
iteration,
max_iterations,
total_evidence_count=len(evidence),
domain=self.domain,
)
else:
user_prompt = format_empty_evidence_prompt(question)
models_to_try: list[str] = [self.model_id] if self.model_id else self.FALLBACK_MODELS
last_error: Exception | None = None
for model in models_to_try:
try:
result = await self._call_with_retry(model, user_prompt, question)
self.consecutive_failures = 0 # Reset on success
return result
except Exception as e:
# Check for 402/Quota AND 429/Rate-limit errors to fail fast
# (CodeRabbit review: added 429 handling)
error_str = str(e)
if any(
indicator in error_str.lower()
for indicator in [
"402",
"quota",
"payment required",
"429",
"rate limit",
"too many requests",
]
):
logger.error("HF API limit reached", error=error_str)
return self._create_quota_exhausted_assessment(question, evidence)
logger.warning("Model failed", model=model, error=str(e))
last_error = e
continue
# All models failed - increment failure counter
self.consecutive_failures += 1
logger.error(
"All HF models failed",
error=str(last_error),
consecutive_failures=self.consecutive_failures,
)
return self._create_fallback_assessment(question, str(last_error))
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=4),
retry=retry_if_exception_type(Exception),
reraise=True,
)
async def _call_with_retry(self, model: str, prompt: str, question: str) -> JudgeAssessment:
"""Make API call with retry logic using chat_completion."""
loop = asyncio.get_running_loop()
system_prompt = get_system_prompt(self.domain)
# Build messages for chat_completion (model-agnostic)
messages = [
{
"role": "system",
"content": f"""{system_prompt}
IMPORTANT: Respond with ONLY valid JSON matching this schema:
{{
"details": {{
"mechanism_score": <int 0-10>,
"mechanism_reasoning": "<string>",
"clinical_evidence_score": <int 0-10>,
"clinical_reasoning": "<string>",
"drug_candidates": ["<string>", ...],
"key_findings": ["<string>", ...]
}},
"sufficient": <bool>,
"confidence": <float 0-1>,
"recommendation": "continue" | "synthesize",
"next_search_queries": ["<string>", ...],
"reasoning": "<string>"
}}""",
},
{"role": "user", "content": prompt},
]
# Use chat_completion (conversational task - supported by all models)
response = await loop.run_in_executor(
None,
lambda: self.client.chat_completion(
messages=messages,
model=model,
max_tokens=1024,
temperature=0.1,
),
)
# Extract content from response
content = response.choices[0].message.content
if not content:
raise ValueError("Empty response from model")
# Extract and parse JSON
json_data = self._extract_json(content)
if not json_data:
raise ValueError("No valid JSON found in response")
return JudgeAssessment(**json_data)
def _extract_json(self, text: str) -> dict[str, Any] | None:
"""
Robust JSON extraction that handles markdown blocks and nested braces.
"""
text = text.strip()
# Remove markdown code blocks if present (with bounds checking)
if "```json" in text:
parts = text.split("```json", 1)
if len(parts) > 1:
inner_parts = parts[1].split("```", 1)
text = inner_parts[0]
elif "```" in text:
parts = text.split("```", 1)
if len(parts) > 1:
inner_parts = parts[1].split("```", 1)
text = inner_parts[0]
text = text.strip()
# Find first '{'
start_idx = text.find("{")
if start_idx == -1:
return None
# Stack-based parsing ignoring chars in strings
count = 0
in_string = False
escape = False
for i, char in enumerate(text[start_idx:], start=start_idx):
if in_string:
if escape:
escape = False
elif char == "\\":
escape = True
elif char == '"':
in_string = False
elif char == '"':
in_string = True
elif char == "{":
count += 1
elif char == "}":
count -= 1
if count == 0:
try:
result = json.loads(text[start_idx : i + 1])
if isinstance(result, dict):
return result
return None
except json.JSONDecodeError:
return None
return None
def _create_quota_exhausted_assessment(
self,
question: str,
evidence: list[Evidence],
) -> JudgeAssessment:
"""Create an assessment that stops the loop when quota is exhausted."""
findings = _extract_titles_from_evidence(
evidence,
max_items=5,
fallback_message="No findings available (Quota exceeded and no search results).",
)
return JudgeAssessment(
details=AssessmentDetails(
mechanism_score=0,
mechanism_reasoning="Free tier quota exhausted. Unable to analyze mechanism.",
clinical_evidence_score=0,
clinical_reasoning=(
"Free tier quota exhausted. Unable to analyze clinical evidence."
),
drug_candidates=["Upgrade to paid API for drug extraction."],
key_findings=findings,
),
sufficient=True, # STOP THE LOOP
confidence=0.0,
recommendation="synthesize",
next_search_queries=[],
reasoning=(
"⚠️ **Free Tier Quota Exceeded** ⚠️\n\n"
"The HuggingFace Inference API free tier limit has been reached. "
"The search results listed below were retrieved but could not be "
"analyzed by the AI. "
"Please try again later, or add an OpenAI API key above "
"for unlimited access."
),
)
def _create_forced_synthesis_assessment(
self,
question: str,
evidence: list[Evidence],
) -> JudgeAssessment:
"""Force synthesis after repeated failures to prevent infinite loops."""
findings = _extract_titles_from_evidence(
evidence,
max_items=5,
fallback_message="No findings available (API failures prevented analysis).",
)
return JudgeAssessment(
details=AssessmentDetails(
mechanism_score=0,
mechanism_reasoning="AI analysis unavailable after repeated API failures.",
clinical_evidence_score=0,
clinical_reasoning="AI analysis unavailable after repeated API failures.",
drug_candidates=["AI analysis required for drug identification."],
key_findings=findings,
),
sufficient=True, # FORCE STOP
confidence=0.1,
recommendation="synthesize",
next_search_queries=[],
reasoning=(
f"⚠️ **HF Inference Unavailable** ⚠️\n\n"
f"The free tier AI service failed {self.MAX_CONSECUTIVE_FAILURES} times. "
f"Search found {len(evidence)} sources (listed below) but they could not "
"be analyzed by AI.\n\n"
"**Options:**\n"
"- Add an OpenAI API key for reliable analysis\n"
"- Try again later when HF Inference is available\n"
"- Review the raw search results below"
),
)
def _create_fallback_assessment(
self,
question: str,
error: str,
) -> JudgeAssessment:
"""Create a fallback assessment when inference fails."""
return JudgeAssessment(
details=AssessmentDetails(
mechanism_score=0,
mechanism_reasoning=f"Assessment failed: {error}",
clinical_evidence_score=0,
clinical_reasoning=f"Assessment failed: {error}",
drug_candidates=[],
key_findings=[],
),
sufficient=False,
confidence=0.0,
recommendation="continue",
next_search_queries=[
f"{question} mechanism",
f"{question} clinical trials",
f"{question} drug candidates",
],
reasoning=f"HF Inference failed: {error}. Recommend configuring OpenAI API key.",
)
async def synthesize(self, system_prompt: str, user_prompt: str) -> str:
"""
Synthesize a research report using free HuggingFace Inference.
Uses the same chat_completion API as judging, so Free Tier gets
consistent behavior across judge AND synthesis.
Returns:
Narrative text if successful.
Raises:
SynthesisError: If all models fail, with context about what was tried.
"""
from src.utils.exceptions import SynthesisError
loop = asyncio.get_running_loop()
models_to_try = [self.model_id] if self.model_id else self.FALLBACK_MODELS
errors: list[str] = []
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
]
for model in models_to_try:
try:
logger.info("HF synthesis attempt", model=model)
response = await loop.run_in_executor(
None,
partial(
self.client.chat_completion,
messages=messages,
model=model,
max_tokens=2048, # Longer for synthesis
temperature=0.7, # More creative for narrative
),
)
content = response.choices[0].message.content
if content and len(content.strip()) > 50:
logger.info("HF synthesis success", model=model, chars=len(content))
return content.strip()
# Response too short - log and try next model
length = len(content.strip()) if content else 0
errors.append(f"{model}: Response too short ({length} chars)")
logger.warning("HF synthesis response too short", model=model, length=length)
except Exception as e:
errors.append(f"{model}: {e!s}")
logger.warning("HF synthesis model failed", model=model, error=str(e))
continue
logger.error("All HF synthesis models failed", models=models_to_try, errors=errors)
raise SynthesisError(
"All HuggingFace synthesis models failed",
attempted_models=models_to_try,
errors=errors,
)
class MockJudgeHandler:
"""
Mock JudgeHandler for demo mode without LLM calls.
Extracts meaningful information from real search results
to provide a useful demo experience without requiring API keys.
"""
def __init__(
self,
mock_response: JudgeAssessment | None = None,
domain: ResearchDomain | str | None = None,
) -> None:
"""
Initialize with optional mock response.
Args:
mock_response: The assessment to return. If None, extracts from evidence.
domain: Research domain (ignored in mock but kept for interface compatibility).
"""
self.mock_response = mock_response
self.domain = domain
self.call_count = 0
self.last_question: str | None = None
self.last_evidence: list[Evidence] | None = None
def _extract_key_findings(self, evidence: list[Evidence], max_findings: int = 5) -> list[str]:
"""Extract key findings from evidence titles."""
# Helper guarantees non-empty list when fallback_message is provided
return _extract_titles_from_evidence(
evidence,
max_items=max_findings,
fallback_message="No specific findings extracted (demo mode)",
)
def _extract_drug_candidates(self, question: str, evidence: list[Evidence]) -> list[str]:
"""Extract drug candidates - demo mode returns honest message."""
# Don't attempt heuristic extraction - it produces garbage like "Oral", "Kidney"
# Real drug extraction requires LLM analysis
return [
"Drug identification requires AI analysis",
"Enter API key above for full results",
]
async def assess(
self,
question: str,
evidence: list[Evidence],
iteration: int = 0,
max_iterations: int = 10,
) -> JudgeAssessment:
"""Return assessment based on actual evidence (demo mode)."""
self.call_count += 1
self.last_question = question
self.last_evidence = evidence
if self.mock_response:
return self.mock_response
min_evidence = 3
evidence_count = len(evidence)
# Extract meaningful data from actual evidence
drug_candidates = self._extract_drug_candidates(question, evidence)
key_findings = self._extract_key_findings(evidence)
# Calculate scores based on evidence quantity
mechanism_score = min(10, evidence_count * 2) if evidence_count > 0 else 0
clinical_score = min(10, evidence_count) if evidence_count > 0 else 0
return JudgeAssessment(
details=AssessmentDetails(
mechanism_score=mechanism_score,
mechanism_reasoning=(
f"Demo mode: Found {evidence_count} sources. "
"Configure LLM API key for detailed mechanism analysis."
),
clinical_evidence_score=clinical_score,
clinical_reasoning=(
f"Demo mode: {evidence_count} sources retrieved from PubMed, "
"ClinicalTrials.gov, and Europe PMC. Full analysis requires LLM API key."
),
drug_candidates=drug_candidates,
key_findings=key_findings,
),
sufficient=evidence_count >= min_evidence,
confidence=min(0.5, evidence_count * 0.1) if evidence_count > 0 else 0.0,
recommendation="synthesize" if evidence_count >= min_evidence else "continue",
next_search_queries=(
[f"{question} mechanism", f"{question} clinical trials"]
if evidence_count < min_evidence
else []
),
reasoning=(
f"Demo mode assessment based on {evidence_count} real search results. "
"For AI-powered analysis with drug candidate identification and "
"evidence synthesis, configure OPENAI_API_KEY."
),
)