#!/usr/bin/env python3
"""
Multi-Country Legal RAG System - Interactive Testing Mode
with Human-in-the-Loop Support
"""

import sys
from pathlib import Path

# Put this script's directory first on sys.path before the project import
# below (presumably so the `core` package resolves when run as a script).
sys.path.insert(0, str(Path(__file__).parent))

import asyncio
import logging
import re
import time
from datetime import datetime
from typing import Optional

from core.system_initializer import setup_system

# Setup comprehensive logging. The file handler is pinned to UTF-8 because
# logged user messages contain accented (French) text, which the platform
# default encoding cannot always represent.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('legal_rag_system.log', mode='a', encoding='utf-8'),
    ],
)
logger = logging.getLogger(__name__)

# Horizontal rule used by every banner printed in this module.
_RULE = "=" * 70

# Decision keywords (English and French) recognised in moderator input.
_APPROVE_KEYWORDS = ("approve", "approuver", "accept")
_REJECT_KEYWORDS = ("reject", "rejeter", "refuse")

# Runs of letters only, so "j'accepte" yields the words "j" and "accepte".
_WORD_RE = re.compile(r"[^\W\d_]+")


def _classify_decision(text: str) -> Optional[str]:
    """
    Classify moderator input as an approval or a rejection.

    A decision is recognised only when exactly one keyword family appears
    in the text AND some whole word starts with a keyword of that family.
    Consequently "disapprove" or "unacceptable" are not approvals, and
    input naming both decisions is treated as ambiguous.

    Args:
        text: Raw moderator input

    Returns:
        "approve", "reject", or None when the input is invalid or ambiguous
    """
    lowered = text.lower()
    mentions_approve = any(keyword in lowered for keyword in _APPROVE_KEYWORDS)
    mentions_reject = any(keyword in lowered for keyword in _REJECT_KEYWORDS)

    # Neither family present, or both: no usable decision.
    if mentions_approve == mentions_reject:
        return None

    keywords = _APPROVE_KEYWORDS if mentions_approve else _REJECT_KEYWORDS
    words = _WORD_RE.findall(lowered)
    if not any(word.startswith(keywords) for word in words):
        # The keyword only occurs inside another word (e.g. "disapprove").
        return None

    return "approve" if mentions_approve else "reject"


class LegalRAGTester:
    """
    Interactive tester for the Legal RAG system with support for:
    - Human-in-the-loop interrupts
    - Session management
    - Statistics tracking
    - Assistance workflow testing
    """

    def __init__(self):
        self.system = None
        self.chat_manager = None
        self.session_id = f"test_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        self.message_count = 0

    async def initialize(self) -> bool:
        """
        Initialize the Legal RAG system.

        Awaits setup_system(), keeps its "chat_manager" entry and prints a
        short summary of the checkpointer and session.

        Returns:
            True on success, False if any step raised (the error is logged)
        """
        print("🚀 Initializing Legal RAG System...")

        try:
            self.system = await setup_system()
            self.chat_manager = self.system["chat_manager"]

            print("✅ System initialized successfully!")

            # Print system info
            info = self.chat_manager.get_checkpointer_info()

            print(f"📊 Checkpointer: {info['type']} ({info['description']})")
            print(f"💾 Persistent: {info['persistent']}")
            print(f"🎯 Session ID: {self.session_id}")
            print("🌍 Available countries: Bénin, Madagascar")

            return True

        except Exception as e:
            logger.exception("Failed to initialize system")
            print(f"❌ Initialization failed: {e}")
            return False

    def _handle_interrupt(self, interrupt_value: dict) -> str:
        """
        Handle human-in-the-loop interrupts synchronously.

        Passed to the chat manager as its interrupt handler (see chat()).
        Displays the approval request and blocks on stdin until the
        moderator types a recognisable decision.

        Args:
            interrupt_value: Interrupt data containing message and context

        Returns:
            The moderator's raw input (decision keyword plus any reason)
        """
        print("\n" + _RULE)
        print("🔒 HUMAN APPROVAL REQUIRED")
        print(_RULE)

        # Display the formatted approval request. If the interrupt carries
        # no message, fall back to the individual context fields so the
        # moderator is never asked to decide blind.
        message = interrupt_value.get("message", "")
        if message:
            print(message)
        else:
            print(f"Email: {interrupt_value.get('user_email', 'N/A')}")
            print(f"Country: {interrupt_value.get('country', 'N/A')}")
            print(f"Description: {interrupt_value.get('description', 'N/A')}")
        print()

        # Get moderator decision with validation
        while True:
            moderator_input = input("🔐 Moderator Decision: ").strip()

            if not moderator_input:
                print("⚠️ Please provide a decision (approve/reject)")
                continue

            decision = _classify_decision(moderator_input)
            if decision == "approve":
                print("✅ Request APPROVED")
                break
            if decision == "reject":
                print("❌ Request REJECTED")
                break

            print("⚠️ Invalid decision. Use 'approve [reason]' or 'reject [reason]'")

        print(_RULE + "\n")
        return moderator_input

    async def chat(self, message: str) -> Optional[str]:
        """
        Send a chat message and get response with interrupt handling.

        Args:
            message: User message

        Returns:
            Assistant response or None on error
        """
        if not self.chat_manager:
            print("❌ System not initialized. Please restart.")
            return None

        try:
            start_time = time.time()
            self.message_count += 1

            # Send message with interrupt handler
            response = await self.chat_manager.chat(
                message=message,
                session_id=self.session_id,
                interrupt_handler=self._handle_interrupt,  # Enable synchronous interrupt handling
            )

            response_time = time.time() - start_time

            # Display response
            print(f"\n🤖 Assistant ({response_time:.2f}s):")

            # Format multi-line responses. A missing (None) response is
            # shown as empty so the pending-interrupt notice below still runs.
            for line in (response or "").split('\n'):
                if line.strip():
                    print(f" {line}")
                else:
                    print()

            # Check for pending interrupts (async mode fallback)
            if self.chat_manager.has_pending_interrupt(self.session_id):
                print("\n⏸️ 💡 System paused - waiting for moderator decision")
                print(" Your next message will be treated as approval/rejection")

            return response

        # KeyboardInterrupt is not an Exception subclass, so it propagates
        # to the caller untouched and allows a graceful shutdown.
        except Exception as e:
            logger.exception("Error processing message: %s", message)
            print(f"❌ Error: {str(e)}")
            return None

    def display_stats(self):
        """Display current system statistics"""
        if not self.chat_manager:
            print("❌ System not initialized")
            return

        stats = self.chat_manager.get_global_stats()
        routing = stats.get('routing_stats', {})

        print("\n" + _RULE)
        print("📊 SYSTEM STATISTICS")
        print(_RULE)
        print(f"💬 Session Messages: {self.message_count}")
        print(f"🔢 Total System Queries: {stats.get('total_queries', 0)}")
        print(f"👥 Active Sessions: {stats.get('active_sessions', 0)}")
        print(f"⏸️ Pending Interrupts: {stats.get('pending_interrupts', 0)}")
        print()
        print("🔀 Routing Statistics:")
        print(f" 📍 Bénin queries: {routing.get('benin', 0)}")
        print(f" 📍 Madagascar queries: {routing.get('madagascar', 0)}")
        print(f" ❓ Unclear queries: {routing.get('unclear', 0)}")
        print()
        # NOTE(review): initialize() reads 'type'/'description' from
        # get_checkpointer_info(); confirm get_global_stats() also exposes
        # them, otherwise this line always shows the defaults.
        print(f"💾 Storage: {stats.get('type', 'unknown')} - {stats.get('description', '')}")
        print(_RULE + "\n")

    def display_help(self):
        """Display help information"""
        print("\n" + _RULE)
        print("📋 AVAILABLE COMMANDS")
        print(_RULE)
        print(" quit, exit, q - Exit the program")
        print(" stats, s - Show system statistics")
        print(" clear, cls - Clear the screen")
        print(" help, h, ? - Show this help message")
        print(" history - Show conversation history")
        print(" reset - Reset current session")
        print()
        print("💡 TESTING TIPS:")
        print(" - Legal queries: Ask about laws in Bénin or Madagascar")
        print(" - Assistance: Say 'je veux parler à un avocat'")
        print(" - Follow the prompts for email and description")
        print(" - You'll be prompted for approval when needed")
        print(_RULE + "\n")

    async def display_history(self):
        """Display conversation history, truncating each message to 100 characters"""
        if not self.chat_manager:
            print("❌ System not initialized")
            return

        try:
            history = await self.chat_manager.get_conversation_history(self.session_id)

            if not history:
                print("\n💬 No conversation history yet\n")
                return

            print("\n" + _RULE)
            print("💬 CONVERSATION HISTORY")
            print(_RULE)

            for i, msg in enumerate(history, 1):
                role = "👤 User" if msg.type == "human" else "🤖 Assistant"
                content = msg.content[:100] + "..." if len(msg.content) > 100 else msg.content
                print(f"{i}. {role}: {content}")

            print(_RULE + "\n")

        except Exception as e:
            logger.exception("Error displaying history")
            print(f"❌ Error: {e}\n")

    def reset_session(self):
        """Reset the current session: new timestamped session ID, message count zeroed"""
        old_session = self.session_id
        self.session_id = f"test_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        self.message_count = 0

        print("\n🔄 Session reset")
        print(f" Old: {old_session}")
        print(f" New: {self.session_id}\n")

    def clear_screen(self):
        """Clear the terminal screen"""
        # ANSI: move cursor home, then erase from cursor to end of screen.
        print("\033[H\033[J", end="")


async def interactive_mode():
    """
    Run the system in interactive mode with full interrupt support.

    Reads commands and chat messages from stdin until the user quits or
    stdin is closed.
    """
    tester = LegalRAGTester()

    # Initialize system
    if not await tester.initialize():
        print("❌ Failed to initialize. Exiting.")
        return

    # Display welcome message
    print("\n" + _RULE)
    print("🎯 LEGAL RAG INTERACTIVE TESTING MODE")
    print(_RULE)
    print("Type 'help' for available commands")
    print(_RULE)

    # Quick test questions
    test_questions = [
        "Bonjour, comment ça va?",
        "Quelle est la procédure de divorce au Bénin?",
        "Je veux parler à un avocat spécialisé",
        "Quels sont les droits des enfants à Madagascar?",
        "Quelles sont les conditions pour se marier au Bénin?",
    ]

    print("\n💡 Quick test questions:")
    for i, question in enumerate(test_questions, 1):
        print(f" {i}. {question}")
    print()

    # Main interaction loop
    while True:
        try:
            # Get user input
            user_input = input("👤 You: ").strip()

            if not user_input:
                continue

            # Handle commands
            cmd_lower = user_input.lower()

            if cmd_lower in ('quit', 'exit', 'q'):
                print("\n👋 Goodbye! Thank you for testing.")
                break

            if cmd_lower in ('stats', 's'):
                tester.display_stats()
            elif cmd_lower in ('clear', 'cls'):
                tester.clear_screen()
            elif cmd_lower in ('help', 'h', '?'):
                tester.display_help()
            elif cmd_lower == 'history':
                await tester.display_history()
            elif cmd_lower == 'reset':
                tester.reset_session()
            else:
                # Regular chat message
                await tester.chat(user_input)

        except EOFError:
            # stdin was closed (Ctrl-D or end of piped input). Every later
            # input() call would raise again, so leave the loop instead of
            # logging the same error forever.
            print("\n\n👋 Input closed. Goodbye!")
            break

        except KeyboardInterrupt:
            print("\n\n⚠️ Interrupted by user")
            print("Type 'quit' to exit or continue chatting\n")
            continue

        except Exception as e:
            logger.exception("Unexpected error in main loop")
            print(f"\n❌ Unexpected error: {e}\n")


async def demo_mode():
    """Run automated demo of the system capabilities"""
    tester = LegalRAGTester()

    print("🚀 Running Automated Demo...")

    if not await tester.initialize():
        print("❌ Failed to initialize. Exiting demo.")
        return

    demo_scenarios = [
        {
            "name": "Greeting",
            "messages": ["Bonjour, je m'appelle Test"],
        },
        {
            "name": "Legal Query - Bénin",
            "messages": ["Quelle est la procédure de divorce au Bénin?"],
        },
        {
            "name": "Legal Query - Madagascar",
            "messages": ["Quels sont les droits de succession à Madagascar?"],
        },
        {
            "name": "Conversation Repair",
            "messages": ["Peux-tu répéter plus simplement?"],
        },
        {
            "name": "Summary Request",
            "messages": ["Résume notre conversation"],
        },
    ]

    print("\n" + _RULE)
    print("🧪 DEMO SCENARIOS")
    print(_RULE + "\n")

    for i, scenario in enumerate(demo_scenarios, 1):
        print("\n" + _RULE)
        print(f"📋 Scenario {i}/{len(demo_scenarios)}: {scenario['name']}")
        print(_RULE)

        for message in scenario['messages']:
            print(f"\n🧪 Testing: '{message}'")
            await tester.chat(message)
            await asyncio.sleep(1.5)  # Pause between messages

    print("\n" + _RULE)
    print("✅ DEMO COMPLETED")
    print(_RULE)

    tester.display_stats()


async def test_assistance_workflow():
    """Test the complete assistance workflow with interrupts"""
    tester = LegalRAGTester()

    print("🧪 Testing Assistance Workflow with Human Approval...")

    if not await tester.initialize():
        print("❌ Failed to initialize. Exiting test.")
        return

    print("\n" + _RULE)
    print("🔬 ASSISTANCE WORKFLOW TEST")
    print(_RULE + "\n")

    # Simulated assistance workflow
    workflow_steps = [
        "Je veux parler à un avocat",
        "test@example.com",
        "J'ai besoin d'aide pour un problème de divorce au Bénin",
        "oui",
    ]

    step_names = [
        "1. Initiate assistance request",
        "2. Provide email",
        "3. Describe situation",
        "4. Confirm request",
    ]

    for step_name, message in zip(step_names, workflow_steps):
        print(f"\n📍 {step_name}")
        print(f" Message: '{message}'")
        await tester.chat(message)
        await asyncio.sleep(1)

    print("\n" + _RULE)
    print("✅ WORKFLOW TEST COMPLETED")
    print(_RULE)

    tester.display_stats()


def main():
    """
    Main entry point with argument parsing.

    Parses --mode / --debug and runs the selected coroutine with
    asyncio.run(). Exits with status 1 on an unhandled error.
    """
    import argparse

    # The epilog is laid out line by line because
    # RawDescriptionHelpFormatter prints it verbatim.
    parser = argparse.ArgumentParser(
        description="Legal RAG System - Interactive Testing with Human-in-the-Loop Support",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python main.py               # Interactive mode (default)
  python main.py --mode demo   # Automated demo
  python main.py --mode test   # Test assistance workflow
  python main.py --debug       # Enable debug logging
""",
    )

    parser.add_argument(
        "--mode",
        choices=["interactive", "demo", "test"],
        default="interactive",
        help="Run mode: interactive (default), demo, or test",
    )

    parser.add_argument(
        "--debug",
        action="store_true",
        help="Enable debug logging",
    )

    args = parser.parse_args()

    # Adjust logging level if debug
    if args.debug:
        logging.getLogger().setLevel(logging.DEBUG)
        print("🐛 Debug logging enabled\n")

    # Run selected mode
    try:
        if args.mode == "interactive":
            asyncio.run(interactive_mode())
        elif args.mode == "demo":
            asyncio.run(demo_mode())
        elif args.mode == "test":
            asyncio.run(test_assistance_workflow())

    except KeyboardInterrupt:
        print("\n\n👋 Program interrupted by user. Goodbye!")
    except Exception as e:
        logger.exception("Fatal error occurred")
        print(f"\n❌ Fatal error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()