# core/human_approval_node.py
"""Human-in-the-loop approval node for assistance requests.

The node pauses the graph with ``interrupt()``, waits for a moderator's
textual decision, and then routes either to ``process_assistance`` (approved)
or directly to ``response`` (rejected or invalid data).
"""

import logging
from typing import Literal
from langchain_core.runnables import RunnableConfig
from langgraph.types import interrupt, Command
from models.state_models import MultiCountryLegalState
from core.assistance.email_service import AssistanceEmailService
from datetime import datetime

logger = logging.getLogger(__name__)

# Leading tokens (compared case-folded) that count as an explicit approval.
# Inflected forms are listed explicitly because the decision is matched on the
# whole leading token, never on substrings.
_APPROVE_KEYWORDS = frozenset({
    "approve", "approved", "approuver", "approuve", "approuvé",
    "oui", "yes", "ok", "okay",
    "accept", "accepted", "accepter", "accepte", "accepté",
})

# Leading tokens (compared case-folded) that count as an explicit rejection.
_REJECT_KEYWORDS = frozenset({
    "reject", "rejected", "rejeter", "rejette", "rejeté",
    "non", "no",
})

# Punctuation tolerated directly around the decision keyword ("approve:", "ok.").
_KEYWORD_PUNCTUATION = ".,:;!?-\"'"

# Separators tolerated between the keyword and the free-text reason.
_REASON_SEPARATORS = ":;,-–— "


class HumanApprovalNode:
    """Graph node that gates assistance requests behind a human moderator."""

    def __init__(self):
        self.email_service = AssistanceEmailService()

    async def process_approval(
        self,
        state: MultiCountryLegalState,
        config: RunnableConfig
    ) -> Command[Literal["process_assistance", "response"]]:
        """Process human approval with interrupt - uses Command(goto=...) pattern.

        Args:
            state: Current graph state; ``user_email`` and
                ``assistance_description`` must be set for the request to be
                submitted to a moderator.
            config: Runnable configuration passed by the graph (unused here).

        Returns:
            A ``Command`` routing to ``process_assistance`` when the moderator
            approves, or to ``response`` when the request is rejected or the
            required fields are missing.
        """
        # Validate required fields BEFORE interrupt
        if not state.user_email or not state.assistance_description:
            logger.warning("Missing required fields for approval")
            return Command(
                goto="response",
                update={
                    "approval_status": "error",
                    "approval_reason": "Données incomplètes",
                    "messages": [{
                        "role": "assistant",
                        "content": "❌ Données incomplètes pour l'approbation.",
                        "meta": {}
                    }]
                }
            )

        logger.info(f"🔒 Human approval node triggered for {state.user_email}")

        # Prepare interrupt message
        interrupt_message = self._format_approval_request(state)

        # 🔥 CRITICAL: DO NOT wrap interrupt() in try-except!
        # Let GraphInterrupt propagate naturally to the graph executor
        moderator_input = interrupt({
            "type": "human_approval",
            "user_email": state.user_email,
            "country": self._get_country_display(state),
            "description": state.assistance_description,
            "message": interrupt_message
        })

        # 🎯 Code below ONLY executes after graph resumes from interrupt
        logger.info(f"🔥 Received moderator input: {moderator_input}")

        # Parse moderator decision (anything that is not an explicit approval
        # is treated as a rejection).
        decision = self._parse_decision(moderator_input)

        # Handle approval or rejection
        if decision["approved"]:
            return await self._handle_approval(state, decision)
        return await self._handle_rejection(state, decision)

    async def _handle_approval(
        self,
        state: MultiCountryLegalState,
        decision: dict
    ) -> Command[Literal["process_assistance"]]:
        """Handle approved request - routes to process_assistance.

        Sends the assistance e-mail and records the approval metadata plus the
        e-mail outcome in the state update.

        Args:
            state: Current graph state.
            decision: Parsed moderator decision with ``reason`` and
                ``moderator_id`` keys.

        Returns:
            A ``Command`` routing to ``process_assistance``.
        """
        logger.info(f"✅ Request APPROVED for {state.user_email}")

        # Send email
        # NOTE(review): this call is synchronous inside an async method and any
        # exception it raises propagates to the graph executor — confirm that
        # this is the intended behaviour of AssistanceEmailService.
        email_result = self.email_service.send_assistance_request(
            user_email=state.user_email,
            user_query=state.last_search_query or "Demande d'assistance",
            assistance_description=state.assistance_description,
            country=self._get_country_display(state)
        )

        # Only report success when the service actually reported success; the
        # log line used to claim success unconditionally.
        email_sent = bool(email_result) and bool(email_result.get("success"))
        if email_sent:
            logger.info(f"✅ Emails envoyés avec succès pour {state.user_email}")
        else:
            logger.warning(
                f"⚠️ Échec de l'envoi des emails pour {state.user_email}: {email_result}"
            )

        return Command(
            goto="process_assistance",
            update={
                "approval_status": "approved",
                "approval_reason": decision["reason"],
                "approved_by": decision["moderator_id"],
                "approval_timestamp": datetime.now().isoformat(),
                "email_status": "sent" if email_sent else "error",
                "email_result": email_result
            }
        )

    async def _handle_rejection(
        self,
        state: MultiCountryLegalState,
        decision: dict
    ) -> Command[Literal["response"]]:
        """Handle rejected request - routes directly to response.

        Args:
            state: Current graph state.
            decision: Parsed moderator decision with ``reason`` and
                ``moderator_id`` keys.

        Returns:
            A ``Command`` routing to ``response`` with a user-facing refusal
            message appended to ``messages``.
        """
        logger.info(f"❌ Request REJECTED for {state.user_email}")

        message_content = f"""❌ **DEMANDE REFUSÉE**

Votre demande d'assistance n'a pas été approuvée.

**Raison:** {decision['reason']}

Si vous pensez qu'il s'agit d'une erreur, veuillez reformuler votre demande avec plus de détails.
"""

        return Command(
            goto="response",
            update={
                "approval_status": "rejected",
                "approval_reason": decision["reason"],
                "approved_by": decision["moderator_id"],
                "approval_timestamp": datetime.now().isoformat(),
                "assistance_step": "completed",
                "messages": [{
                    "role": "assistant",
                    "content": message_content,
                    "meta": {"approval": "rejected"}
                }]
            }
        )

    def _format_approval_request(self, state: MultiCountryLegalState) -> str:
        """Format the approval request message shown to the moderator."""
        return f"""
🔒 **APPROBATION HUMAINE REQUISE**

📧 **Email:** {state.user_email}
🌍 **Pays:** {self._get_country_display(state)}
📝 **Description:** {state.assistance_description}
🔍 **Requête initiale:** {state.last_search_query or 'Non spécifiée'}

**Instructions:**
- Tapez "approve [raison]" pour approuver
- Tapez "reject [raison]" pour rejeter

**Exemples:**
- "approve Demande légitime"
- "reject Email invalide"
"""

    def _parse_decision(self, user_input: str) -> dict:
        """Parse moderator decision from input.

        The decision is taken from the FIRST token only, matched as a whole
        word and case-insensitively. Substring matching is deliberately
        avoided: with it, a rejection such as "reject broken link" (contains
        "ok") or "reject not acceptable" (contains "accept") was approved.
        Input that does not start with a recognised approval keyword is
        rejected, so the gate fails closed.

        Args:
            user_input: Raw text supplied by the moderator on resume.

        Returns:
            A dict with ``approved`` (bool), ``reason`` (str) and
            ``moderator_id`` (str). Empty or non-string input yields a
            rejection attributed to ``"system"``.
        """
        if not user_input or not isinstance(user_input, str):
            return {
                "approved": False,
                "reason": "Input invalide",
                "moderator_id": "system"
            }

        # Split into "<keyword> <free-text reason>"; whitespace-only input
        # produces no tokens and falls through to a default rejection.
        tokens = user_input.strip().split(None, 1)
        keyword = tokens[0].strip(_KEYWORD_PUNCTUATION).casefold() if tokens else ""

        is_approved = keyword in _APPROVE_KEYWORDS
        is_recognized = is_approved or keyword in _REJECT_KEYWORDS

        # The reason is whatever follows a recognised keyword. Taking it from
        # the token split (not from a case-sensitive str.split on the keyword)
        # keeps the reason for inputs such as "Approve Demande légitime".
        reason = ""
        if is_recognized and len(tokens) > 1:
            reason = tokens[1].lstrip(_REASON_SEPARATORS).strip()

        if not reason:
            reason = "Approuvé par modérateur" if is_approved else "Refusé par modérateur"

        return {
            "approved": is_approved,
            "reason": reason,
            "moderator_id": "human_moderator"
        }

    def _get_country_display(self, state: MultiCountryLegalState) -> str:
        """Get country display name.

        Resolution order: ``state.country``, then
        ``state.legal_context["detected_country"]``, then detection from the
        assistance description. Codes without a display name map to
        "Non spécifié".
        """
        # Tolerate a missing/None legal_context instead of raising.
        legal_context = state.legal_context or {}
        country = state.country or legal_context.get("detected_country", "unknown")

        if country == "unknown" and state.assistance_description:
            country = MultiCountryLegalState.detect_country(state.assistance_description)

        country_map = {
            "benin": "Bénin",
            "madagascar": "Madagascar"
        }

        logger.debug(f"Country from state: {state.country}, legal_context: {legal_context.get('detected_country')}, description: {country}")
        return country_map.get(country, "Non spécifié")