jtlevine's picture
Add AI trigger explanations behind EXPLANATION_MODE flag
57d708b
"""
Claude-powered heat alert explanation generator.
When a heat risk trigger fires, enrolled workers need a plain-language
explanation of WHY, what it means for their payout, and what protective
actions to take. This module generates bilingual (English + Swahili)
explanations using Claude with RAG context, with a template-based
fallback if Claude is unavailable.
"""
from __future__ import annotations
import logging
import os
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, Optional
from config import HEAT_THRESHOLDS, PAYOUT_PER_EVENT_USD, ZONE_MAP, UrbanZone
from src.explanation.knowledge_base import (
INSURANCE_PRODUCT_INFO,
SWAHILI_TERMS,
get_emergency_contacts,
get_protective_actions,
get_zone_context,
)
log = logging.getLogger(__name__)
# -- Data containers -------------------------------------------------------
@dataclass
class TriggerEvent:
"""A triggered insurance event to be explained."""
zone_id: str
trigger_level: str # critical, warning, watch
triggered_at: str # ISO timestamp
max_temp_c: float = 0.0
max_wbgt_c: float = 0.0
consecutive_days: int = 0
heat_risk_score: float = 0.0
contributing_factors: list[str] = field(default_factory=list)
@dataclass
class ExplanationResult:
"""Generated explanation for a trigger event."""
zone_id: str
trigger_level: str
english_text: str
swahili_text: str
payout_estimate: Dict[str, Any]
protective_actions: list[str]
emergency_contacts: Dict[str, str]
generated_at: str = field(
default_factory=lambda: datetime.now(timezone.utc).isoformat()
)
provider: str = "template" # claude or template
zone_name: str = ""
city: str = ""
zone_context: str = ""
tokens_used: int = 0
# -- Claude-based explainer ------------------------------------------------
class TriggerExplainer:
"""Generates bilingual heat alert explanations using Claude with RAG."""
def __init__(self, api_key: Optional[str] = None, model: str = "claude-haiku-4-5-20251001"):
self.api_key = api_key or os.environ.get("ANTHROPIC_API_KEY", "")
self.model = model
self._client = None
# Try to load hybrid RAG retriever; falls back to full context injection
try:
from src.explanation.rag_provider import HybridRetriever
self._retriever = HybridRetriever()
log.info("Hybrid RAG retriever loaded successfully")
except Exception as exc:
log.info("RAG retriever not available, using full context injection: %s", exc)
self._retriever = None
def _get_client(self):
if self._client is None:
try:
import anthropic
self._client = anthropic.AsyncAnthropic(api_key=self.api_key)
except Exception as exc:
log.warning("Could not init Anthropic client: %s", exc)
return self._client
async def explain(self, event, zone=None, basis_risk=None) -> ExplanationResult:
"""Generate a full explanation for a heat trigger event."""
# Support both TriggerEvent and HeatTriggerEvent dataclass inputs
zone_id = getattr(event, 'zone_id', '')
trigger_level = getattr(event, 'trigger_level', 'watch')
zone_obj = zone or ZONE_MAP.get(zone_id)
if zone_obj is None:
raise ValueError(f"Unknown zone: {zone_id}")
payout = _compute_payout(event, zone_obj)
actions = get_protective_actions(trigger_level)
contacts = get_emergency_contacts(zone_obj.city)
if self.api_key:
try:
english, swahili = await self._generate_claude(event, zone_obj, payout)
return ExplanationResult(
zone_id=zone_id,
trigger_level=trigger_level,
english_text=english,
swahili_text=swahili,
payout_estimate=payout,
protective_actions=actions,
emergency_contacts=contacts,
provider="claude",
zone_name=zone_obj.name,
city=zone_obj.city,
zone_context=get_zone_context(zone_id),
)
except Exception as exc:
log.warning("Claude explanation failed, using template: %s", exc)
english = _template_english(event, zone_obj, payout)
swahili = _template_swahili(event, zone_obj, payout)
return ExplanationResult(
zone_id=zone_id,
trigger_level=trigger_level,
english_text=english,
swahili_text=swahili,
payout_estimate=payout,
protective_actions=actions,
emergency_contacts=contacts,
provider="template",
zone_name=zone_obj.name,
city=zone_obj.city,
zone_context=get_zone_context(zone_id),
)
async def _generate_claude(
self, event, zone: UrbanZone, payout: Dict[str, Any]
) -> tuple[str, str]:
# Use hybrid RAG retrieval if available, otherwise fall back to full injection
if self._retriever:
query = (
f"{event.trigger_level} heat alert {zone.name} {zone.city} "
f"{zone.settlement_type} outdoor workers"
)
context_docs = self._retriever.retrieve(query, zone_id=event.zone_id)
context = "\n---\n".join(context_docs)
else:
context = get_zone_context(event.zone_id)
system = (
"You are a heat safety notification system for outdoor workers in East Africa. "
"Your audience is workers in markets, construction sites, and informal "
"settlements — many with limited formal education. Write clearly, simply, "
"and with empathy. Do not use jargon or technical abbreviations. "
"Explain what happened, what their payout is, and what they should do to "
"stay safe. Keep the explanation to 4-6 sentences."
)
max_temp = getattr(event, 'max_temp_c', 0)
max_wbgt = getattr(event, 'max_wbgt_c', 0)
consec = getattr(event, 'consecutive_days', getattr(event, 'consecutive_days_above', 0))
thresholds = HEAT_THRESHOLDS.get(event.trigger_level, {})
user = (
f"A {event.trigger_level.upper()} heat alert has been triggered for "
f"outdoor workers in {zone.name}, {zone.city}.\n\n"
f"What happened:\n"
f"- Maximum temperature: {max_temp:.1f}C "
f"(threshold: {thresholds.get('temp_c', 'N/A')}C)\n"
f"- Maximum WBGT (feels-like for workers): {max_wbgt:.1f}C "
f"(threshold: {thresholds.get('wbgt_c', 'N/A')}C)\n"
f"- Consecutive days above threshold: {consec} "
f"(required: {thresholds.get('consecutive_days', 'N/A')})\n"
f"- Settlement type: {zone.settlement_type}\n"
f"- Outdoor worker exposure: {zone.outdoor_exposure_pct:.0%}\n"
f"- Estimated workers affected: {zone.worker_population_est:,}\n\n"
f"Payout: {payout['currency_symbol']}{payout['amount']} per worker\n\n"
f"Knowledge base:\n{context}\n\n"
f"Write a 4-6 sentence explanation for the worker. Tell them what is "
f"happening with the heat, their payout amount, and the most important "
f"thing they should do right now to stay safe."
)
client = self._get_client()
if client is None:
raise RuntimeError("Anthropic client not available")
msg = await client.messages.create(
model=self.model,
max_tokens=500,
system=system,
messages=[{"role": "user", "content": user}],
)
english = msg.content[0].text.strip()
swahili = await self._translate_to_swahili(english, zone.name)
return english, swahili
async def _translate_to_swahili(self, english_text: str, zone_name: str) -> str:
system = (
"You are a professional translator specializing in Swahili (Kiswahili). "
"Translate the given English heat safety notification to simple, clear "
"Swahili that a non-technical person can understand. Keep numbers, "
"currency amounts, and proper nouns unchanged. Return only the translated text."
)
user = (
f"Translate this heat alert notification for workers in {zone_name} "
f"to Swahili:\n\n{english_text}"
)
client = self._get_client()
if client is None:
raise RuntimeError("Anthropic client not available")
msg = await client.messages.create(
model=self.model,
max_tokens=600,
system=system,
messages=[{"role": "user", "content": user}],
)
return msg.content[0].text.strip()
# -- Template-based fallback explainer (same interface) --------------------
class TemplateExplainer:
"""Template-based fallback when Claude is unavailable."""
async def explain(self, event, zone=None, basis_risk=None) -> ExplanationResult:
zone_id = getattr(event, 'zone_id', '')
trigger_level = getattr(event, 'trigger_level', 'watch')
zone_obj = zone or ZONE_MAP.get(zone_id)
if zone_obj is None:
raise ValueError(f"Unknown zone: {zone_id}")
payout = _compute_payout(event, zone_obj)
actions = get_protective_actions(trigger_level)
contacts = get_emergency_contacts(zone_obj.city)
english = _template_english(event, zone_obj, payout)
swahili = _template_swahili(event, zone_obj, payout)
return ExplanationResult(
zone_id=zone_id,
trigger_level=trigger_level,
english_text=english,
swahili_text=swahili,
payout_estimate=payout,
protective_actions=actions,
emergency_contacts=contacts,
provider="template",
zone_name=zone_obj.name,
city=zone_obj.city,
zone_context=get_zone_context(zone_id),
)
# -- Payout computation ---------------------------------------------------
def _compute_payout(event, zone: UrbanZone) -> Dict[str, Any]:
trigger_level = getattr(event, 'trigger_level', 'watch')
amount = PAYOUT_PER_EVENT_USD.get(trigger_level, 0)
return {
"amount": amount,
"currency": "USD",
"currency_symbol": "$",
"settlement_type": zone.settlement_type,
"trigger_level": trigger_level,
"delivery_method": "M-Pesa" if zone.settlement_type in ("informal", "mixed") else "Mobile money",
"expected_delivery": "Within 48 hours of trigger verification",
"is_payout": amount > 0,
"workers_covered": zone.worker_population_est,
"total_payout": amount * zone.worker_population_est,
}
# -- Template-based fallback explanations ----------------------------------
_LEVEL_DESCRIPTIONS = {
"critical": (
"A CRITICAL heat alert has been triggered",
"Onyo la HATARI la joto kali limetolewa",
),
"warning": (
"A WARNING heat alert has been issued",
"Onyo la joto limetolewa",
),
"watch": (
"A WATCH heat advisory has been issued",
"Tahadhari ya joto imetolewa",
),
}
def _template_english(event, zone: UrbanZone, payout: Dict[str, Any]) -> str:
trigger_level = getattr(event, 'trigger_level', 'watch')
level_en, _ = _LEVEL_DESCRIPTIONS.get(trigger_level, ("A heat alert has been issued", ""))
max_temp = getattr(event, 'max_temp_c', 0)
max_wbgt = getattr(event, 'max_wbgt_c', 0)
consec = getattr(event, 'consecutive_days', getattr(event, 'consecutive_days_above', 0))
lines = [f"{level_en} for outdoor workers in {zone.name}, {zone.city}."]
thresholds = HEAT_THRESHOLDS.get(trigger_level, {})
reasons: list[str] = []
if max_temp >= thresholds.get("temp_c", 999):
reasons.append(
f"temperatures reached {max_temp:.0f}C, above the "
f"{thresholds['temp_c']}C safety threshold"
)
if max_wbgt >= thresholds.get("wbgt_c", 999):
reasons.append(
f"the heat-humidity index (WBGT) reached {max_wbgt:.0f}C, "
f"above the {thresholds['wbgt_c']}C danger level"
)
if consec >= thresholds.get("consecutive_days", 999):
reasons.append(
f"these dangerous conditions lasted {consec} consecutive days"
)
if reasons:
lines.append("This was triggered because " + " and ".join(reasons) + ".")
else:
lines.append("Heat conditions have exceeded the safety threshold for your area.")
if payout["is_payout"]:
lines.append(
f"Your payout is {payout['currency_symbol']}{payout['amount']} per worker, "
f"which will be sent via {payout['delivery_method']} within 48 hours."
)
actions = get_protective_actions(trigger_level)
if actions:
lines.append(f"Most important: {actions[0]}")
return " ".join(lines)
def _template_swahili(event, zone: UrbanZone, payout: Dict[str, Any]) -> str:
trigger_level = getattr(event, 'trigger_level', 'watch')
_, level_sw = _LEVEL_DESCRIPTIONS.get(trigger_level, ("", "Onyo la joto limetolewa"))
max_temp = getattr(event, 'max_temp_c', 0)
consec = getattr(event, 'consecutive_days', getattr(event, 'consecutive_days_above', 0))
lines = [f"{level_sw} kwa wafanyakazi wa nje katika {zone.name}, {zone.city}."]
lines.append(
f"Joto limefika {max_temp:.0f}C kwa siku {consec} mfululizo."
)
if payout["is_payout"]:
lines.append(
f"Malipo yako ni {payout['currency_symbol']}{payout['amount']} kwa kila mfanyakazi. "
f"Utapokea kupitia {payout['delivery_method']} ndani ya masaa 48."
)
if trigger_level == "critical":
lines.append(
"Acha kazi za nje sasa hivi. Nenda kivulini au ndani ya nyumba. "
"Kunywa maji mengi."
)
elif trigger_level == "warning":
lines.append(
"Punguza kazi za nje. Fanya kazi nzito asubuhi mapema au jioni. "
"Pumzika kivulini kila saa."
)
else:
lines.append(
"Kuwa makini. Kunywa maji zaidi. Panga kazi nzito kwa masaa ya baridi."
)
return " ".join(lines)