#!/usr/bin/env python3 """ Integration tests for autonomous retry and self-correction system. This script tests the retry functionality with a running backend. It verifies that retry steps appear in reasoning traces and analytics. Usage: python test_retry_integration.py Prerequisites: - FastAPI backend running on http://localhost:8000 - MCP server running - Optional: LLM service available """ import requests import json import time import sys from pathlib import Path BASE_URL = "http://localhost:8000" TENANT_ID = "retry_test_tenant" TIMEOUT = 120 # Increased timeout for LLM calls (model loading can take time) def print_section(title, char="=", width=70): """Print a formatted section header.""" print("\n" + char * width) print(f" {title}") print(char * width) def print_success(msg): """Print success message.""" print(f"✅ {msg}") def print_warning(msg): """Print warning message.""" print(f"⚠️ {msg}") def print_error(msg): """Print error message.""" print(f"❌ {msg}") def print_info(msg): """Print info message.""" print(f"ℹ️ {msg}") def check_backend(): """Check if backend is running.""" try: response = requests.get(f"{BASE_URL}/health", timeout=5) return response.status_code == 200 except: return False def test_rag_retry_scenario(): """Test RAG retry when scores are low.""" print_section("Test 1: RAG Retry with Low Scores") # First, ingest a document that might not be highly relevant to test query print_info("Ingesting test document...") try: ingest_response = requests.post( f"{BASE_URL}/rag/ingest", json={ "tenant_id": TENANT_ID, "content": "This is a general document about various topics. It mentions computers, technology, and general information." }, timeout=TIMEOUT ) print(f" Ingest status: {ingest_response.status_code}") except requests.exceptions.Timeout: print_warning(f"Ingest request timed out after {TIMEOUT} seconds") except Exception as e: print_warning(f"Could not ingest document: {e}") # Send a query that will likely have low relevance initially print_info("Sending query that should trigger RAG retry...") try: debug_response = requests.post( f"{BASE_URL}/agent/debug", json={ "tenant_id": TENANT_ID, "message": "What is quantum computing and how does quantum entanglement work?" }, timeout=TIMEOUT ) if debug_response.status_code == 200: debug_data = debug_response.json() reasoning_trace = debug_data.get("reasoning_trace", []) # Look for retry steps in reasoning trace retry_steps = [] for step in reasoning_trace: step_str = json.dumps(step).lower() if "retry" in step_str or "rag_retry" in step_str or "threshold" in step_str: retry_steps.append(step) print(f"\n Found {len(retry_steps)} retry-related steps:") for step in retry_steps[:5]: # Show first 5 step_name = step.get("step", "unknown") print(f" - {step_name}") if retry_steps: print_success("RAG retry system is working!") return True else: print_warning("No retry steps found (may not have triggered - scores might be good)") return True # Not a failure, just didn't need retry else: print_error(f"Request failed: {debug_response.status_code}") print_error(f"Response: {debug_response.text[:200]}") return False except requests.exceptions.Timeout: print_error(f"Request timed out after {TIMEOUT} seconds") print_error(" Possible causes:") print_error(" - Ollama is not running or model is not loaded") print_error(" - MCP server is not running") print_error(" - LLM call is taking too long") print_error("\n To fix:") print_error(" 1. Check if Ollama is running: ollama serve") print_error(" 2. Check if model is available: ollama list") print_error(" 3. Pull the model if needed: ollama pull llama3.1:latest") return False except requests.exceptions.ConnectionError: print_error("Cannot connect to backend. Is it running on port 8000?") return False except Exception as e: print_error(f"Error: {e}") import traceback traceback.print_exc() return False def test_web_retry_scenario(): """Test web search retry when results are empty.""" print_section("Test 2: Web Search Retry with Empty Results") # Send a query with an obscure term that might return empty results print_info("Sending obscure query to trigger web retry...") try: debug_response = requests.post( f"{BASE_URL}/agent/debug", json={ "tenant_id": TENANT_ID, "message": "Explain the concept of zyxwvutsrqp in detail" }, timeout=TIMEOUT ) if debug_response.status_code == 200: debug_data = debug_response.json() reasoning_trace = debug_data.get("reasoning_trace", []) # Look for web retry steps retry_steps = [] for step in reasoning_trace: step_str = json.dumps(step).lower() if "web_retry" in step_str or ("web" in step_str and "retry" in step_str): retry_steps.append(step) print(f"\n Found {len(retry_steps)} web retry steps:") for step in retry_steps[:5]: step_name = step.get("step", "unknown") print(f" - {step_name}") if 'rewritten_query' in step: print(f" Rewritten: {step['rewritten_query'][:60]}...") if retry_steps: print_success("Web retry system is working!") return True else: print_warning("No web retry steps found (results might have been found on first try)") return True # Not a failure else: print_error(f"Request failed: {debug_response.status_code}") return False except requests.exceptions.Timeout: print_error(f"Request timed out after {TIMEOUT} seconds") print_warning(" This may happen if Ollama is loading the model") return False except requests.exceptions.ConnectionError: print_error("Cannot connect to backend") return False except requests.exceptions.Timeout: print_error(f"Request timed out after {TIMEOUT} seconds") print_warning(" This may happen if Ollama is loading the model") return False except Exception as e: print_error(f"Error: {e}") return False def test_reasoning_trace_contains_retry_info(): """Verify retry steps appear in reasoning traces.""" print_section("Test 3: Verify Reasoning Trace Contains Retry Info") try: debug_response = requests.post( f"{BASE_URL}/agent/debug", json={ "tenant_id": TENANT_ID, "message": "What is artificial intelligence and machine learning?" }, timeout=TIMEOUT ) if debug_response.status_code == 200: debug_data = debug_response.json() reasoning_trace = debug_data.get("reasoning_trace", []) print(f"\n Reasoning trace has {len(reasoning_trace)} steps") print("\n Step breakdown:") retry_related_count = 0 for i, step in enumerate(reasoning_trace[:10]): # Show first 10 step_name = step.get("step", "unknown") step_str = str(step).lower() is_retry_related = "retry" in step_str or "repair" in step_str or "threshold" in step_str if is_retry_related: retry_related_count += 1 marker = "⚡" else: marker = " " print(f" {marker} {i+1}. {step_name}") if retry_related_count > 0: print_success(f"Found {retry_related_count} retry-related steps in reasoning trace") return True else: print_warning("No retry-related steps found (may not have been needed)") return True else: print_error(f"Request failed: {debug_response.status_code}") return False except requests.exceptions.Timeout: print_error(f"Request timed out after {TIMEOUT} seconds") print_warning(" This may happen if Ollama is loading the model") return False except Exception as e: print_error(f"Error: {e}") return False def test_analytics_logging(): """Test that retry attempts are logged to analytics.""" print_section("Test 4: Analytics Logging for Retries") try: # Send a query that might trigger retries print_info("Sending query to generate activity...") requests.post( f"{BASE_URL}/agent/message", json={ "tenant_id": TENANT_ID, "message": "Explain quantum mechanics" }, timeout=TIMEOUT ) # Wait a moment for analytics to be logged time.sleep(1) # Check analytics print_info("Checking analytics for retry tool calls...") analytics_response = requests.get( f"{BASE_URL}/analytics/tool-usage?days=1", headers={"x-tenant-id": TENANT_ID}, timeout=TIMEOUT ) if analytics_response.status_code == 200: data = analytics_response.json() tool_logs = data.get("logs", []) print(f" Found {len(tool_logs)} tool usage logs") # Look for retry-related tool names retry_tools = [] for log in tool_logs: tool_name = log.get("tool_name", "").lower() if "retry" in tool_name: retry_tools.append(log) print(f" Found {len(retry_tools)} retry-related tool calls:") for tool in retry_tools[:5]: tool_name = tool.get("tool_name") timestamp = tool.get("timestamp", "unknown") success = tool.get("success", False) status = "✅" if success else "❌" print(f" {status} {tool_name} at {timestamp}") if len(retry_tools) > 0: print_success("Retry attempts are being logged to analytics!") return True else: print_warning("No retry tool calls found (may not have triggered retries)") return True else: print_warning(f"Could not fetch analytics: {analytics_response.status_code}") return True # Don't fail on analytics endpoint issues except requests.exceptions.Timeout: print_warning(f"Analytics check timed out after {TIMEOUT} seconds") return True # Don't fail the whole test on analytics issues except Exception as e: print_warning(f"Analytics check failed: {e}") return True # Don't fail the whole test on analytics issues def test_full_agent_flow(): """Test full agent flow with retry system integrated.""" print_section("Test 5: Full Agent Flow with Retry Integration") try: print_info("Sending complete agent request...") response = requests.post( f"{BASE_URL}/agent/message", json={ "tenant_id": TENANT_ID, "message": "What is machine learning and how does it differ from deep learning?", "temperature": 0.0 }, timeout=TIMEOUT ) if response.status_code == 200: data = response.json() has_text = "text" in data and data["text"] has_decision = "decision" in data has_tool_traces = "tool_traces" in data print(f"\n Response components:") print(f" - Has text: {'✅' if has_text else '❌'}") print(f" - Has decision: {'✅' if has_decision else '❌'}") print(f" - Has tool traces: {'✅' if has_tool_traces else '❌'}") if has_text: text_preview = data["text"][:100] + "..." if len(data["text"]) > 100 else data["text"] print(f"\n Response preview: {text_preview}") if has_tool_traces: tool_traces = data["tool_traces"] print(f"\n Tool traces: {len(tool_traces)} steps") for trace in tool_traces[:3]: tool = trace.get("tool", "unknown") print(f" - {tool}") if has_text and has_decision: print_success("Full agent flow completed successfully!") return True else: print_error("Agent flow incomplete") return False else: print_error(f"Request failed: {response.status_code}") print_error(f"Response: {response.text[:200]}") return False except requests.exceptions.Timeout: print_error(f"Request timed out after {TIMEOUT} seconds") print_warning(" This may happen if Ollama is loading the model") return False except requests.exceptions.Timeout: print_error(f"Request timed out after {TIMEOUT} seconds") print_warning(" This may happen if Ollama is loading the model") return False except Exception as e: print_error(f"Error: {e}") return False def test_agent_plan_endpoint(): """Test agent plan endpoint shows retry considerations.""" print_section("Test 6: Agent Plan Endpoint") try: print_info("Checking agent plan for query...") response = requests.post( f"{BASE_URL}/agent/plan", json={ "tenant_id": TENANT_ID, "message": "Explain neural networks" }, timeout=TIMEOUT ) if response.status_code == 200: data = response.json() has_plan = "plan" in data has_intent = "intent" in data has_reason = "reason" in data print(f"\n Plan components:") print(f" - Has plan: {'✅' if has_plan else '❌'}") print(f" - Has intent: {'✅' if has_intent else '❌'}") print(f" - Has reason: {'✅' if has_reason else '❌'}") if has_plan: plan = data["plan"] print(f"\n Plan action: {plan.get('action', 'unknown')}") print(f" Plan tool: {plan.get('tool', 'none')}") if has_reason: print(f" Reason: {data['reason'][:100]}...") print_success("Agent plan endpoint working!") return True else: print_warning(f"Plan endpoint returned: {response.status_code}") return True # Don't fail on plan endpoint except requests.exceptions.Timeout: print_warning(f"Plan endpoint request timed out after {TIMEOUT} seconds") return True # Don't fail on this except Exception as e: print_warning(f"Plan endpoint check failed: {e}") return True # Don't fail on this def main(): """Run all integration tests.""" print("\n" + "🚀" * 35) print(" Retry & Self-Correction System Integration Tests") print("🚀" * 35) # Check backend print_section("Prerequisites Check") if not check_backend(): print_error("Backend is not running on http://localhost:8000") print_error("Please start the backend before running tests:") print_error(" uvicorn backend.api.main:app --port 8000") print_error("\nOr run: python start.bat") sys.exit(1) else: print_success("Backend is running!") print("\n" + "=" * 70) print(" Starting Integration Tests") print("=" * 70) print(f"\n⏱️ Timeout: {TIMEOUT} seconds per request") print(" (First request may take longer if Ollama needs to load the model)") print("\n⚠️ Note: Some tests may not trigger retries if:") print(" - RAG scores are already high (no retry needed)") print(" - Web search finds results immediately") print(" - System is working perfectly (which is good!)") print("\nPress Enter to continue or Ctrl+C to cancel...") try: input() except KeyboardInterrupt: print("\n\nTests cancelled.") sys.exit(0) results = [] # Run tests results.append(("RAG Retry Scenario", test_rag_retry_scenario())) time.sleep(0.5) results.append(("Web Retry Scenario", test_web_retry_scenario())) time.sleep(0.5) results.append(("Reasoning Trace Verification", test_reasoning_trace_contains_retry_info())) time.sleep(0.5) results.append(("Analytics Logging", test_analytics_logging())) time.sleep(0.5) results.append(("Full Agent Flow", test_full_agent_flow())) time.sleep(0.5) results.append(("Agent Plan Endpoint", test_agent_plan_endpoint())) # Summary print_section("Test Summary", "=", 70) passed = 0 for test_name, result in results: status = "✅ PASS" if result else "❌ FAIL" print(f"{status} - {test_name}") if result: passed += 1 print(f"\n📊 Results: {passed}/{len(results)} tests passed") if passed == len(results): print_success("All tests passed!") elif passed >= len(results) * 0.8: print_warning("Most tests passed (some may not have triggered retries, which is fine)") else: print_error("Some tests failed. Check errors above.") print("\n💡 Tips:") print(" - Use /agent/debug endpoint to see detailed reasoning traces") print(" - Check /analytics/tool-usage for retry attempt logs") print(" - Retry system works automatically - no configuration needed") print("\n📝 Next steps:") print(" - Run unit tests: pytest backend/tests/test_retry_system.py -v") print(" - Check TESTING_GUIDE.md for more testing options") if __name__ == "__main__": try: main() except KeyboardInterrupt: print("\n\nTests interrupted by user.") sys.exit(0) except Exception as e: print_error(f"Unexpected error: {e}") import traceback traceback.print_exc() sys.exit(1)