#!/usr/bin/env python3
"""
Integration tests for the autonomous retry and self-correction system.

This script tests the retry functionality against a running backend and
verifies that retry steps appear in reasoning traces and analytics.

Usage:
    python test_retry_integration.py

Prerequisites:
    - FastAPI backend running on http://localhost:8000
    - MCP server running
    - Optional: LLM service available
"""

import json
import sys
import time

import requests

BASE_URL = "http://localhost:8000"
TENANT_ID = "retry_test_tenant"
TIMEOUT = 120  # Increased timeout for LLM calls (model loading can take time)
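
# Optional override (an addition, not part of the original script): let an
# environment variable point the tests at a non-default backend, e.g.
#   RETRY_TEST_BASE_URL=http://localhost:9000 python test_retry_integration.py
import os

BASE_URL = os.environ.get("RETRY_TEST_BASE_URL", BASE_URL)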


def print_section(title, char="=", width=70):
    """Print a formatted section header."""
    print("\n" + char * width)
    print(f" {title}")
    print(char * width)


def print_success(msg):
    """Print a success message."""
    print(f"✅ {msg}")


def print_warning(msg):
    """Print a warning message."""
    print(f"⚠️ {msg}")


def print_error(msg):
    """Print an error message."""
    print(f"❌ {msg}")


def print_info(msg):
    """Print an info message."""
    print(f"ℹ️ {msg}")


def check_backend():
    """Check whether the backend is running."""
    try:
        response = requests.get(f"{BASE_URL}/health", timeout=5)
        return response.status_code == 200
    except requests.exceptions.RequestException:
        return False
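

def find_retry_steps(reasoning_trace, keywords, require_all=False):
    """Collect reasoning-trace steps that mention the given keywords.

    Shared helper factored out of the inline scanning loops that Tests 1 and 2
    below originally duplicated; behavior is unchanged. With require_all=True a
    step must mention every keyword; otherwise any single match qualifies.
    """
    combine = all if require_all else any
    matches = []
    for step in reasoning_trace:
        step_str = json.dumps(step).lower()
        if combine(keyword in step_str for keyword in keywords):
            matches.append(step)
    return matches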


def test_rag_retry_scenario():
    """Test RAG retry when retrieval scores are low."""
    print_section("Test 1: RAG Retry with Low Scores")

    # First, ingest a document that is unlikely to be highly relevant to the test query.
    print_info("Ingesting test document...")
    try:
        ingest_response = requests.post(
            f"{BASE_URL}/rag/ingest",
            json={
                "tenant_id": TENANT_ID,
                "content": "This is a general document about various topics. It mentions computers, technology, and general information.",
            },
            timeout=TIMEOUT,
        )
        print(f"   Ingest status: {ingest_response.status_code}")
    except requests.exceptions.Timeout:
        print_warning(f"Ingest request timed out after {TIMEOUT} seconds")
    except Exception as e:
        print_warning(f"Could not ingest document: {e}")

    # Send a query that will likely have low relevance initially.
    print_info("Sending query that should trigger RAG retry...")
    try:
        debug_response = requests.post(
            f"{BASE_URL}/agent/debug",
            json={
                "tenant_id": TENANT_ID,
                "message": "What is quantum computing and how does quantum entanglement work?",
            },
            timeout=TIMEOUT,
        )
        if debug_response.status_code == 200:
            debug_data = debug_response.json()
            reasoning_trace = debug_data.get("reasoning_trace", [])

            # "retry" also matches "rag_retry", so two keywords cover the
            # original three-way check.
            retry_steps = find_retry_steps(reasoning_trace, ("retry", "threshold"))

            print(f"\n   Found {len(retry_steps)} retry-related steps:")
            for step in retry_steps[:5]:  # Show the first 5
                step_name = step.get("step", "unknown")
                print(f"   - {step_name}")

            if retry_steps:
                print_success("RAG retry system is working!")
                return True
            else:
                print_warning("No retry steps found (may not have triggered - scores might be good)")
                return True  # Not a failure, just didn't need a retry
        else:
            print_error(f"Request failed: {debug_response.status_code}")
            print_error(f"Response: {debug_response.text[:200]}")
            return False
    except requests.exceptions.Timeout:
        print_error(f"Request timed out after {TIMEOUT} seconds")
        print_error("  Possible causes:")
        print_error("   - Ollama is not running or the model is not loaded")
        print_error("   - MCP server is not running")
        print_error("   - The LLM call is taking too long")
        print_error("\n  To fix:")
        print_error("   1. Check if Ollama is running: ollama serve")
        print_error("   2. Check if the model is available: ollama list")
        print_error("   3. Pull the model if needed: ollama pull llama3.1:latest")
        return False
    except requests.exceptions.ConnectionError:
        print_error("Cannot connect to backend. Is it running on port 8000?")
        return False
    except Exception as e:
        print_error(f"Error: {e}")
        import traceback

        traceback.print_exc()
        return False


def test_web_retry_scenario():
    """Test web search retry when results are empty."""
    print_section("Test 2: Web Search Retry with Empty Results")

    # Send a query with an obscure term that might return empty results.
    print_info("Sending obscure query to trigger web retry...")
    try:
        debug_response = requests.post(
            f"{BASE_URL}/agent/debug",
            json={
                "tenant_id": TENANT_ID,
                "message": "Explain the concept of zyxwvutsrqp in detail",
            },
            timeout=TIMEOUT,
        )
        if debug_response.status_code == 200:
            debug_data = debug_response.json()
            reasoning_trace = debug_data.get("reasoning_trace", [])

            # A step qualifies if it mentions both "web" and "retry";
            # "web_retry" satisfies both, matching the original check.
            retry_steps = find_retry_steps(reasoning_trace, ("web", "retry"), require_all=True)

            print(f"\n   Found {len(retry_steps)} web retry steps:")
            for step in retry_steps[:5]:
                step_name = step.get("step", "unknown")
                print(f"   - {step_name}")
                if "rewritten_query" in step:
                    print(f"     Rewritten: {step['rewritten_query'][:60]}...")

            if retry_steps:
                print_success("Web retry system is working!")
                return True
            else:
                print_warning("No web retry steps found (results might have been found on first try)")
                return True  # Not a failure
        else:
            print_error(f"Request failed: {debug_response.status_code}")
            return False
    except requests.exceptions.Timeout:
        print_error(f"Request timed out after {TIMEOUT} seconds")
        print_warning("  This may happen if Ollama is loading the model")
        return False
    except requests.exceptions.ConnectionError:
        print_error("Cannot connect to backend")
        return False
    except Exception as e:
        print_error(f"Error: {e}")
        return False


def test_reasoning_trace_contains_retry_info():
    """Verify that retry steps appear in reasoning traces."""
    print_section("Test 3: Verify Reasoning Trace Contains Retry Info")
    try:
        debug_response = requests.post(
            f"{BASE_URL}/agent/debug",
            json={
                "tenant_id": TENANT_ID,
                "message": "What is artificial intelligence and machine learning?",
            },
            timeout=TIMEOUT,
        )
        if debug_response.status_code == 200:
            debug_data = debug_response.json()
            reasoning_trace = debug_data.get("reasoning_trace", [])
            print(f"\n   Reasoning trace has {len(reasoning_trace)} steps")
            print("\n   Step breakdown:")
            retry_related_count = 0
            for i, step in enumerate(reasoning_trace[:10]):  # Show the first 10
                step_name = step.get("step", "unknown")
                step_str = str(step).lower()
                is_retry_related = (
                    "retry" in step_str or "repair" in step_str or "threshold" in step_str
                )
                if is_retry_related:
                    retry_related_count += 1
                    marker = "🔁"
                else:
                    marker = " "
                print(f"   {marker} {i + 1}. {step_name}")
            if retry_related_count > 0:
                print_success(f"Found {retry_related_count} retry-related steps in reasoning trace")
                return True
            else:
                print_warning("No retry-related steps found (may not have been needed)")
                return True
        else:
            print_error(f"Request failed: {debug_response.status_code}")
            return False
    except requests.exceptions.Timeout:
        print_error(f"Request timed out after {TIMEOUT} seconds")
        print_warning("  This may happen if Ollama is loading the model")
        return False
    except Exception as e:
        print_error(f"Error: {e}")
        return False


def test_analytics_logging():
    """Test that retry attempts are logged to analytics."""
    print_section("Test 4: Analytics Logging for Retries")
    try:
        # Send a query that might trigger retries.
        print_info("Sending query to generate activity...")
        requests.post(
            f"{BASE_URL}/agent/message",
            json={
                "tenant_id": TENANT_ID,
                "message": "Explain quantum mechanics",
            },
            timeout=TIMEOUT,
        )

        # Wait a moment for analytics to be logged.
        time.sleep(1)

        # Check analytics.
        print_info("Checking analytics for retry tool calls...")
        analytics_response = requests.get(
            f"{BASE_URL}/analytics/tool-usage?days=1",
            headers={"x-tenant-id": TENANT_ID},
            timeout=TIMEOUT,
        )
        if analytics_response.status_code == 200:
            data = analytics_response.json()
            tool_logs = data.get("logs", [])
            print(f"   Found {len(tool_logs)} tool usage logs")

            # Look for retry-related tool names.
            retry_tools = []
            for log in tool_logs:
                tool_name = log.get("tool_name", "").lower()
                if "retry" in tool_name:
                    retry_tools.append(log)

            print(f"   Found {len(retry_tools)} retry-related tool calls:")
            for tool in retry_tools[:5]:
                tool_name = tool.get("tool_name")
                timestamp = tool.get("timestamp", "unknown")
                success = tool.get("success", False)
                status = "✅" if success else "❌"
                print(f"   {status} {tool_name} at {timestamp}")

            if len(retry_tools) > 0:
                print_success("Retry attempts are being logged to analytics!")
                return True
            else:
                print_warning("No retry tool calls found (may not have triggered retries)")
                return True
        else:
            print_warning(f"Could not fetch analytics: {analytics_response.status_code}")
            return True  # Don't fail on analytics endpoint issues
    except requests.exceptions.Timeout:
        print_warning(f"Analytics check timed out after {TIMEOUT} seconds")
        return True  # Don't fail the whole test on analytics issues
    except Exception as e:
        print_warning(f"Analytics check failed: {e}")
        return True  # Don't fail the whole test on analytics issues


def test_full_agent_flow():
    """Test the full agent flow with the retry system integrated."""
    print_section("Test 5: Full Agent Flow with Retry Integration")
    try:
        print_info("Sending complete agent request...")
        response = requests.post(
            f"{BASE_URL}/agent/message",
            json={
                "tenant_id": TENANT_ID,
                "message": "What is machine learning and how does it differ from deep learning?",
                "temperature": 0.0,
            },
            timeout=TIMEOUT,
        )
        if response.status_code == 200:
            data = response.json()
            has_text = "text" in data and data["text"]
            has_decision = "decision" in data
            has_tool_traces = "tool_traces" in data

            print("\n   Response components:")
            print(f"   - Has text: {'✅' if has_text else '❌'}")
            print(f"   - Has decision: {'✅' if has_decision else '❌'}")
            print(f"   - Has tool traces: {'✅' if has_tool_traces else '❌'}")

            if has_text:
                text_preview = data["text"][:100] + "..." if len(data["text"]) > 100 else data["text"]
                print(f"\n   Response preview: {text_preview}")

            if has_tool_traces:
                tool_traces = data["tool_traces"]
                print(f"\n   Tool traces: {len(tool_traces)} steps")
                for trace in tool_traces[:3]:
                    tool = trace.get("tool", "unknown")
                    print(f"   - {tool}")

            if has_text and has_decision:
                print_success("Full agent flow completed successfully!")
                return True
            else:
                print_error("Agent flow incomplete")
                return False
        else:
            print_error(f"Request failed: {response.status_code}")
            print_error(f"Response: {response.text[:200]}")
            return False
    except requests.exceptions.Timeout:
        print_error(f"Request timed out after {TIMEOUT} seconds")
        print_warning("  This may happen if Ollama is loading the model")
        return False
    except Exception as e:
        print_error(f"Error: {e}")
        return False


def test_agent_plan_endpoint():
    """Test that the agent plan endpoint shows retry considerations."""
    print_section("Test 6: Agent Plan Endpoint")
    try:
        print_info("Checking agent plan for query...")
        response = requests.post(
            f"{BASE_URL}/agent/plan",
            json={
                "tenant_id": TENANT_ID,
                "message": "Explain neural networks",
            },
            timeout=TIMEOUT,
        )
        if response.status_code == 200:
            data = response.json()
            has_plan = "plan" in data
            has_intent = "intent" in data
            has_reason = "reason" in data

            print("\n   Plan components:")
            print(f"   - Has plan: {'✅' if has_plan else '❌'}")
            print(f"   - Has intent: {'✅' if has_intent else '❌'}")
            print(f"   - Has reason: {'✅' if has_reason else '❌'}")

            if has_plan:
                plan = data["plan"]
                print(f"\n   Plan action: {plan.get('action', 'unknown')}")
                print(f"   Plan tool: {plan.get('tool', 'none')}")

            if has_reason:
                print(f"   Reason: {data['reason'][:100]}...")

            print_success("Agent plan endpoint working!")
            return True
        else:
            print_warning(f"Plan endpoint returned: {response.status_code}")
            return True  # Don't fail on the plan endpoint
    except requests.exceptions.Timeout:
        print_warning(f"Plan endpoint request timed out after {TIMEOUT} seconds")
        return True  # Don't fail on this
    except Exception as e:
        print_warning(f"Plan endpoint check failed: {e}")
        return True  # Don't fail on this


def main():
    """Run all integration tests."""
    print("\n" + "🔄" * 35)
    print("  Retry & Self-Correction System Integration Tests")
    print("🔄" * 35)

    # Check that the backend is up.
    print_section("Prerequisites Check")
    if not check_backend():
        print_error("Backend is not running on http://localhost:8000")
        print_error("Please start the backend before running tests:")
        print_error("  uvicorn backend.api.main:app --port 8000")
        print_error("\nOr run: start.bat")
        sys.exit(1)
    else:
        print_success("Backend is running!")
    print("\n" + "=" * 70)
    print("  Starting Integration Tests")
    print("=" * 70)
    print(f"\n⏱️ Timeout: {TIMEOUT} seconds per request")
    print("   (First request may take longer if Ollama needs to load the model)")
    print("\n⚠️ Note: Some tests may not trigger retries if:")
    print("   - RAG scores are already high (no retry needed)")
    print("   - Web search finds results immediately")
    print("   - The system is working perfectly (which is good!)")
    print("\nPress Enter to continue or Ctrl+C to cancel...")
    try:
        input()
    except KeyboardInterrupt:
        print("\n\nTests cancelled.")
        sys.exit(0)
    results = []

    # Run the tests.
    results.append(("RAG Retry Scenario", test_rag_retry_scenario()))
    time.sleep(0.5)
    results.append(("Web Retry Scenario", test_web_retry_scenario()))
    time.sleep(0.5)
    results.append(("Reasoning Trace Verification", test_reasoning_trace_contains_retry_info()))
    time.sleep(0.5)
    results.append(("Analytics Logging", test_analytics_logging()))
    time.sleep(0.5)
    results.append(("Full Agent Flow", test_full_agent_flow()))
    time.sleep(0.5)
    results.append(("Agent Plan Endpoint", test_agent_plan_endpoint()))
    # Summary
    print_section("Test Summary", "=", 70)
    passed = 0
    for test_name, result in results:
        status = "✅ PASS" if result else "❌ FAIL"
        print(f"{status} - {test_name}")
        if result:
            passed += 1

    print(f"\n📊 Results: {passed}/{len(results)} tests passed")
    if passed == len(results):
        print_success("All tests passed!")
    elif passed >= len(results) * 0.8:
        print_warning("Most tests passed (some may not have triggered retries, which is fine)")
    else:
        print_error("Some tests failed. Check the errors above.")

    print("\n💡 Tips:")
    print("   - Use the /agent/debug endpoint to see detailed reasoning traces")
    print("   - Check /analytics/tool-usage for retry attempt logs")
    print("   - The retry system works automatically - no configuration needed")

    print("\n📚 Next steps:")
    print("   - Run unit tests: pytest backend/tests/test_retry_system.py -v")
    print("   - Check TESTING_GUIDE.md for more testing options")


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\n\nTests interrupted by user.")
        sys.exit(0)
    except Exception as e:
        print_error(f"Unexpected error: {e}")
        import traceback

        traceback.print_exc()
        sys.exit(1)