MultiCountryRAG / core /human_approval_node.py
SAAHMATHWORKS
ready for hugging face space
f37bf1d
# core/human_approval_node.py
import logging
from typing import Literal
from langchain_core.runnables import RunnableConfig
from langgraph.types import interrupt, Command
from models.state_models import MultiCountryLegalState
from core.assistance.email_service import AssistanceEmailService
from datetime import datetime
logger = logging.getLogger(__name__)
class HumanApprovalNode:
def __init__(self):
self.email_service = AssistanceEmailService()
async def process_approval(
self,
state: MultiCountryLegalState,
config: RunnableConfig
) -> Command[Literal["process_assistance", "response"]]:
"""Process human approval with interrupt - uses Command(goto=...) pattern"""
# Validate required fields BEFORE interrupt
if not state.user_email or not state.assistance_description:
logger.warning("Missing required fields for approval")
return Command(
goto="response",
update={
"approval_status": "error",
"approval_reason": "Données incomplètes",
"messages": [{
"role": "assistant",
"content": "❌ Données incomplètes pour l'approbation.",
"meta": {}
}]
}
)
logger.info(f"🔒 Human approval node triggered for {state.user_email}")
# Prepare interrupt message
interrupt_message = self._format_approval_request(state)
# 🔥 CRITICAL: DO NOT wrap interrupt() in try-except!
# Let GraphInterrupt propagate naturally to the graph executor
moderator_input = interrupt({
"type": "human_approval",
"user_email": state.user_email,
"country": self._get_country_display(state),
"description": state.assistance_description,
"message": interrupt_message
})
# 🎯 Code below ONLY executes after graph resumes from interrupt
logger.info(f"🔥 Received moderator input: {moderator_input}")
# Parse moderator decision
decision = self._parse_decision(moderator_input)
# Handle approval or rejection
if decision["approved"]:
return await self._handle_approval(state, decision)
else:
return await self._handle_rejection(state, decision)
async def _handle_approval(
self,
state: MultiCountryLegalState,
decision: dict
) -> Command[Literal["process_assistance"]]:
"""Handle approved request - routes to process_assistance"""
logger.info(f"✅ Request APPROVED for {state.user_email}")
# Send email
email_result = self.email_service.send_assistance_request(
user_email=state.user_email,
user_query=state.last_search_query or "Demande d'assistance",
assistance_description=state.assistance_description,
country=self._get_country_display(state)
)
logger.info(f"✅ Emails envoyés avec succès pour {state.user_email}")
return Command(
goto="process_assistance",
update={
"approval_status": "approved",
"approval_reason": decision["reason"],
"approved_by": decision["moderator_id"],
"approval_timestamp": datetime.now().isoformat(),
"email_status": "sent" if email_result.get("success") else "error",
"email_result": email_result
}
)
async def _handle_rejection(
self,
state: MultiCountryLegalState,
decision: dict
) -> Command[Literal["response"]]:
"""Handle rejected request - routes directly to response"""
logger.info(f"❌ Request REJECTED for {state.user_email}")
message_content = f"""❌ **DEMANDE REFUSÉE**
Votre demande d'assistance n'a pas été approuvée.
**Raison:** {decision['reason']}
Si vous pensez qu'il s'agit d'une erreur, veuillez reformuler votre demande avec plus de détails.
"""
return Command(
goto="response",
update={
"approval_status": "rejected",
"approval_reason": decision["reason"],
"approved_by": decision["moderator_id"],
"approval_timestamp": datetime.now().isoformat(),
"assistance_step": "completed",
"messages": [{
"role": "assistant",
"content": message_content,
"meta": {"approval": "rejected"}
}]
}
)
def _format_approval_request(self, state: MultiCountryLegalState) -> str:
"""Format the approval request message"""
return f"""
🔒 **APPROBATION HUMAINE REQUISE**
📧 **Email:** {state.user_email}
🌍 **Pays:** {self._get_country_display(state)}
📝 **Description:** {state.assistance_description}
🔍 **Requête initiale:** {state.last_search_query or 'Non spécifiée'}
**Instructions:**
- Tapez "approve [raison]" pour approuver
- Tapez "reject [raison]" pour rejeter
**Exemples:**
- "approve Demande légitime"
- "reject Email invalide"
"""
def _parse_decision(self, user_input: str) -> dict:
"""Parse moderator decision from input"""
if not user_input or not isinstance(user_input, str):
return {
"approved": False,
"reason": "Input invalide",
"moderator_id": "system"
}
input_lower = user_input.lower().strip()
# Check for approval keywords
approve_keywords = ["approve", "approuver", "oui", "yes", "ok", "accept"]
is_approved = any(kw in input_lower for kw in approve_keywords)
# Extract reason (text after the decision keyword)
reason = user_input.strip()
for keyword in approve_keywords + ["reject", "rejeter", "non", "no"]:
if keyword in input_lower:
parts = user_input.split(keyword, 1)
if len(parts) > 1 and parts[1].strip():
reason = parts[1].strip()
break
if not reason or reason == user_input:
reason = "Approuvé par modérateur" if is_approved else "Refusé par modérateur"
return {
"approved": is_approved,
"reason": reason,
"moderator_id": "human_moderator"
}
def _get_country_display(self, state: MultiCountryLegalState) -> str:
"""Get country display name"""
country = state.country or state.legal_context.get("detected_country", "unknown")
if country == "unknown" and state.assistance_description:
country = MultiCountryLegalState.detect_country(state.assistance_description)
country_map = {
"benin": "Bénin",
"madagascar": "Madagascar"
}
logger.debug(f"Country from state: {state.country}, legal_context: {state.legal_context.get('detected_country')}, description: {country}")
return country_map.get(country, "Non spécifié")