Spaces:
Sleeping
Sleeping
| """ | |
| API Testing Module for AI Travel Planner | |
| This module demonstrates: | |
| - Async API testing patterns | |
| - Comprehensive error handling | |
| - Response validation | |
| - Retry logic implementation | |
| - Step-by-step debugging techniques | |
| """ | |
| import asyncio | |
| import json | |
| import time | |
| from contextlib import asynccontextmanager | |
| from datetime import datetime, timedelta | |
| from typing import Any, Dict, List, Optional, Union, Tuple | |
| from dataclasses import dataclass | |
| from enum import Enum | |
| import httpx | |
| from pydantic import BaseModel, Field, ValidationError | |
| from ..utils.logging import get_logger | |
| from ..utils.security import ErrorResponse, ErrorType | |
class APITestResult(BaseModel):
    """Uniform record of one API test call, whichever API was exercised.

    The first four fields are required; the rest default to "nothing
    recorded" (``None`` / ``0``) or, for ``timestamp``, to the moment the
    record is created.
    """

    # --- what was called and how it went (required) ---
    api_name: str = Field(..., description="Name of the API being tested")
    endpoint: str = Field(..., description="API endpoint tested")
    success: bool = Field(..., description="Whether the test passed")
    response_time_ms: float = Field(..., description="Response time in milliseconds")

    # --- optional details about the response or the failure ---
    status_code: Optional[int] = Field(default=None, description="HTTP status code")
    error_type: Optional[str] = Field(default=None, description="Type of error if any")
    error_message: Optional[str] = Field(default=None, description="Error message if any")
    response_data: Optional[Dict[str, Any]] = Field(
        default=None, description="Response data if successful"
    )

    # --- bookkeeping ---
    # datetime.now is called per instance, so each record gets its own timestamp.
    timestamp: datetime = Field(default_factory=datetime.now, description="Test timestamp")
    retry_count: int = Field(default=0, description="Number of retries attempted")
class APIErrorType(Enum):
    """Closed set of failure categories; each value is the string stored on results."""

    # Transport-level failures.
    NETWORK_ERROR = "network_error"

    # Credential / permission failures.
    AUTHENTICATION_ERROR = "authentication_error"
    AUTHORIZATION_ERROR = "authorization_error"

    # Throttling and slowness.
    RATE_LIMIT_ERROR = "rate_limit_error"
    TIMEOUT_ERROR = "timeout_error"

    # Payload did not have the expected shape.
    VALIDATION_ERROR = "validation_error"

    # Remote side failed, or nothing more specific applies.
    SERVER_ERROR = "server_error"
    UNKNOWN_ERROR = "unknown_error"
@dataclass
class APIConfig:
    """Configuration for API testing.

    Declared as a dataclass so that it can be built with keyword arguments,
    e.g. ``APIConfig(name="demo", base_url="https://...", api_key="...")``.
    Without the decorator the annotations below would not produce an
    ``__init__`` and the three required fields could never be set at
    construction time.
    """

    # Required settings (no defaults).
    name: str
    base_url: str
    api_key: str

    # Optional tuning knobs.
    timeout: int = 30                # passed to httpx.Timeout by the tester
    max_retries: int = 3             # upper bound on retry attempts
    retry_delay: float = 1.0         # base delay used for exponential backoff
    rate_limit_per_minute: int = 60  # token budget per one-minute window
| class APITester: | |
| """ | |
| Comprehensive API testing class with debugging capabilities. | |
| This class demonstrates: | |
| - Async API testing patterns | |
| - Proper error handling and categorization | |
| - Response validation | |
| - Retry logic with exponential backoff | |
| - Step-by-step debugging techniques | |
| """ | |
def __init__(self, config: APIConfig):
    """Store the configuration and reset all per-tester bookkeeping.

    No HTTP client is created here; ``_client`` starts as ``None``.
    """
    self.config = config
    self.logger = get_logger(f"api_tester.{config.name}")

    # HTTP state: client not yet built, no requests counted.
    self._client: Optional[httpx.AsyncClient] = None
    self._request_count = 0

    # Rate limiting: a full token budget, with the window ending one minute from now.
    one_minute = timedelta(minutes=1)
    self._rate_limit_tokens = config.rate_limit_per_minute
    self._rate_limit_reset = datetime.now() + one_minute

    self.logger.info(f"Initialized API tester for {config.name}")
async def __aenter__(self):
    """Enter ``async with``: make sure the HTTP client is set up, then return this tester."""
    tester = self
    await tester._setup_client()
    return tester
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Leave ``async with``: release the HTTP client, then log how the block ended.

    Returns ``None``, so an exception raised inside the block keeps propagating.
    """
    await self._cleanup_client()

    # Clean exit: nothing was raised inside the block.
    if not exc_type:
        self.logger.info(f"API tester completed successfully")
        return

    self.logger.error(f"API tester exited with error: {exc_val}")
async def _setup_client(self):
    """Create the shared ``httpx.AsyncClient`` unless one already exists.

    The client is bound to the configured base URL and timeout and sends
    the bearer token, user agent and JSON content type on every request.
    """
    if self._client:
        # Already set up; keep the existing client.
        return

    cfg = self.config
    default_headers = {
        "Authorization": f"Bearer {cfg.api_key}",
        "User-Agent": f"WanderlustAI-APITester/1.0",
        "Content-Type": "application/json"
    }
    self._client = httpx.AsyncClient(
        base_url=cfg.base_url,
        timeout=httpx.Timeout(cfg.timeout),
        headers=default_headers,
    )
    self.logger.info(f"HTTP client setup for {cfg.name}")
async def _cleanup_client(self):
    """Close the HTTP client, if there is one, and forget it."""
    if not self._client:
        # Nothing was ever opened (or it was already cleaned up).
        return

    await self._client.aclose()
    self._client = None
    self.logger.info(f"HTTP client cleaned up for {self.config.name}")
| async def _check_rate_limit(self): | |
| """Check and enforce rate limiting.""" | |
| now = datetime.now() | |
| # Reset tokens if minute has passed | |
| if now >= self._rate_limit_reset: | |
| self._rate_limit_tokens = self.config.rate_limit_per_minute | |
| self._rate_limit_reset = now + timedelta(minutes=1) | |
| self.logger.debug(f"Rate limit reset for {self.config.name}") | |
| # Check if we have tokens available | |
| if self._rate_limit_tokens <= 0: | |
| wait_time = (self._rate_limit_reset - now).total_seconds() | |
| self.logger.warning(f"Rate limit exceeded for {self.config.name}, waiting {wait_time:.1f}s") | |
| await asyncio.sleep(wait_time) | |
| await self._check_rate_limit() | |
| # Consume a token | |
| self._rate_limit_tokens -= 1 | |
| self.logger.debug(f"Rate limit: {self._rate_limit_tokens} tokens remaining for {self.config.name}") | |
def _categorize_error(self, error: Exception, status_code: Optional[int] = None) -> APIErrorType:
    """Map an exception (and optional HTTP status) to an ``APIErrorType``.

    Args:
        error: The exception raised while performing the request.
        status_code: HTTP status associated with the failure, if known.
            For ``httpx.HTTPStatusError`` it is looked up on
            ``error.response`` when not supplied.

    Returns:
        The matching category; ``UNKNOWN_ERROR`` when nothing more specific
        applies, including an ``HTTPStatusError`` whose status cannot be
        determined.
    """
    # The timeout and connect checks come before the generic RequestError
    # check below so that they are reported under their own category.
    if isinstance(error, httpx.TimeoutException):
        return APIErrorType.TIMEOUT_ERROR
    if isinstance(error, httpx.ConnectError):
        return APIErrorType.NETWORK_ERROR

    if isinstance(error, httpx.HTTPStatusError):
        if status_code is None:
            # Fall back to the status carried by the exception's response.
            status_code = getattr(getattr(error, "response", None), "status_code", None)
        if status_code == 401:
            return APIErrorType.AUTHENTICATION_ERROR
        if status_code == 403:
            return APIErrorType.AUTHORIZATION_ERROR
        if status_code == 429:
            return APIErrorType.RATE_LIMIT_ERROR
        # Guard against None: comparing None with ints raises TypeError.
        if status_code is not None and 500 <= status_code < 600:
            return APIErrorType.SERVER_ERROR
        return APIErrorType.UNKNOWN_ERROR

    if isinstance(error, httpx.RequestError):
        return APIErrorType.NETWORK_ERROR
    return APIErrorType.UNKNOWN_ERROR
def _should_retry(self, error_type: APIErrorType, retry_count: int) -> bool:
    """Decide whether a failed request deserves another attempt.

    A retry is allowed only while ``retry_count`` is below
    ``config.max_retries`` and the failure is one of the transient kinds:
    network, timeout, server or rate-limit errors. Everything else
    (authentication, authorization, validation, unknown) is not retried.
    """
    budget_exhausted = retry_count >= self.config.max_retries
    if budget_exhausted:
        return False

    transient = frozenset((
        APIErrorType.NETWORK_ERROR,
        APIErrorType.TIMEOUT_ERROR,
        APIErrorType.SERVER_ERROR,
        APIErrorType.RATE_LIMIT_ERROR,
    ))
    return error_type in transient
def _calculate_retry_delay(self, error_type: APIErrorType, retry_count: int) -> float:
    """Return the backoff delay before the next attempt.

    The delay is ``config.retry_delay * 2 ** retry_count``; rate-limit
    errors get twice that.
    """
    backoff = self.config.retry_delay * (2 ** retry_count)
    if error_type == APIErrorType.RATE_LIMIT_ERROR:
        # Rate limiting calls for a longer pause than other failures.
        return backoff * 2
    return backoff
def _categorize_status_code(self, status_code: int) -> APIErrorType:
    """Map an HTTP error status (>= 400) to an ``APIErrorType``.

    Uses the same status mapping as ``_categorize_error``: 401, 403, 429
    and 5xx get their own category, anything else is ``UNKNOWN_ERROR``.
    """
    if status_code == 401:
        return APIErrorType.AUTHENTICATION_ERROR
    if status_code == 403:
        return APIErrorType.AUTHORIZATION_ERROR
    if status_code == 429:
        return APIErrorType.RATE_LIMIT_ERROR
    if 500 <= status_code < 600:
        return APIErrorType.SERVER_ERROR
    return APIErrorType.UNKNOWN_ERROR

async def test_api_call(
    self,
    method: str,
    endpoint: str,
    data: Optional[Dict[str, Any]] = None,
    params: Optional[Dict[str, Any]] = None,
    headers: Optional[Dict[str, str]] = None
) -> APITestResult:
    """Perform one API call and report the outcome as an ``APITestResult``.

    Args:
        method: HTTP method, e.g. ``"GET"`` or ``"POST"``.
        endpoint: URL passed to the client's ``request`` call.
        data: Optional JSON body.
        params: Optional query parameters.
        headers: Optional per-request headers.

    Returns:
        A result whose ``success`` is true only when the status is below
        400 and the body parses as JSON. Failures carry ``error_type`` and
        ``error_message``. ``response_time_ms`` is measured from the start
        of this call, so it includes any backoff sleeps.

    Both raised exceptions and HTTP error responses are categorised and,
    when ``_should_retry`` allows it, retried after the delay given by
    ``_calculate_retry_delay``. Exceptions raised by the request are caught
    and reported in the result rather than propagated.
    """
    start_time = time.time()
    retry_count = 0

    # Rate limiting check
    await self._check_rate_limit()

    # Copy so the caller's dict is never shared with the request.
    request_headers = dict(headers) if headers else {}

    self.logger.info(f"Testing {method} {endpoint} for {self.config.name}")

    while retry_count <= self.config.max_retries:
        try:
            # Use the shared client directly, creating it on first use.
            if not self._client:
                await self._setup_client()
            client = self._client

            self.logger.debug(f"{self.config.name}: {method} {endpoint} (attempt {retry_count + 1})")
            response = await client.request(
                method=method,
                url=endpoint,
                json=data,
                params=params,
                headers=request_headers
            )
        except Exception as e:
            # Only some exceptions carry a response; read the status defensively.
            status_code = getattr(getattr(e, "response", None), "status_code", None)
            error_type = self._categorize_error(e, status_code)
            response_time = (time.time() - start_time) * 1000
            self.logger.warning(f"❌ {self.config.name}: {method} {endpoint} - {error_type.value} (attempt {retry_count + 1})")

            if self._should_retry(error_type, retry_count):
                delay = self._calculate_retry_delay(error_type, retry_count)
                retry_count += 1
                self.logger.info(f"🔄 Retrying {self.config.name} in {delay:.1f}s (attempt {retry_count + 1})")
                await asyncio.sleep(delay)
                continue

            # Don't retry, return error result
            return APITestResult(
                api_name=self.config.name,
                endpoint=endpoint,
                success=False,
                response_time_ms=response_time,
                status_code=status_code,
                error_type=error_type.value,
                error_message=str(e),
                retry_count=retry_count
            )

        response_time = (time.time() - start_time) * 1000
        self._request_count += 1

        result = APITestResult(
            api_name=self.config.name,
            endpoint=endpoint,
            success=response.status_code < 400,
            response_time_ms=response_time,
            status_code=response.status_code,
            retry_count=retry_count
        )

        if result.success:
            try:
                result.response_data = response.json()
                self.logger.info(f"✅ {self.config.name}: {method} {endpoint} - {response.status_code} ({response_time:.1f}ms)")
            except ValueError as e:
                # json.JSONDecodeError is a ValueError; so is a body that
                # cannot be decoded as text.
                result.success = False
                result.error_type = APIErrorType.VALIDATION_ERROR.value
                result.error_message = f"Invalid JSON response: {e}"
                self.logger.warning(f"❌ {self.config.name}: Invalid JSON response")
            return result

        # HTTP error response: categorise by status instead of labelling
        # everything a server error.
        error_type = self._categorize_status_code(response.status_code)
        result.error_type = error_type.value
        result.error_message = response.text
        self.logger.warning(f"❌ {self.config.name}: {method} {endpoint} - {response.status_code}")

        if self._should_retry(error_type, retry_count):
            delay = self._calculate_retry_delay(error_type, retry_count)
            retry_count += 1
            self.logger.info(f"🔄 Retrying {self.config.name} in {delay:.1f}s (attempt {retry_count + 1})")
            await asyncio.sleep(delay)
            continue

        return result

    # Defensive fallback: _should_retry stops retries at max_retries, so
    # the loop is not expected to fall through.
    return APITestResult(
        api_name=self.config.name,
        endpoint=endpoint,
        success=False,
        response_time_ms=(time.time() - start_time) * 1000,
        error_type=APIErrorType.UNKNOWN_ERROR.value,
        error_message="Max retries exceeded",
        retry_count=retry_count
    )
@asynccontextmanager
async def get_client(self):
    """Async context manager yielding the shared HTTP client.

    The client is created on first use via ``_setup_client``. It is NOT
    closed when the ``async with`` block ends; closing is left to
    ``_cleanup_client``.

    The ``asynccontextmanager`` decorator is required: a bare async
    generator does not implement the async context manager protocol, so
    ``async with self.get_client()`` would fail without it.
    """
    if not self._client:
        await self._setup_client()
    yield self._client
| # ========================================================================= | |
| # SPECIFIC API TESTING METHODS | |
| # ========================================================================= | |
async def test_anthropic_api(
    self,
    test_message: str = "Hello, how are you?",
    model: str = "claude-3-sonnet-20240229",
    max_tokens: int = 100
) -> APITestResult:
    """Send a one-message request to the Anthropic Messages endpoint.

    Args:
        test_message: Text sent as the single user message.
        model: Model identifier placed in the request body. The default
            keeps the previously hard-coded value.
        max_tokens: ``max_tokens`` value placed in the request body.

    Returns:
        The result of the call; when the call succeeded and returned data,
        the result after Anthropic-specific structure validation.
    """
    self.logger.info(f"Testing Anthropic API with message: '{test_message}'")

    # Anthropic API request structure
    request_data = {
        "model": model,
        "max_tokens": max_tokens,
        "messages": [
            {
                "role": "user",
                "content": test_message
            }
        ]
    }

    # The Messages API authenticates with an ``x-api-key`` header and
    # requires an ``anthropic-version`` header.
    # NOTE(review): any default headers configured on the shared client are
    # still sent alongside these - confirm the API tolerates that.
    anthropic_headers = {
        "x-api-key": self.config.api_key,
        "anthropic-version": "2023-06-01"
    }

    result = await self.test_api_call(
        method="POST",
        endpoint="/v1/messages",
        data=request_data,
        headers=anthropic_headers
    )

    # Validate Anthropic-specific response structure
    if result.success and result.response_data:
        result = self._validate_anthropic_response(result)
    return result
async def test_tavily_api(self, query: str = "travel destinations") -> APITestResult:
    """Run a basic Tavily search and validate the shape of the response.

    Posts ``query`` to ``/search`` asking for at most five results with an
    answer and without raw content. Structure validation runs only when the
    call succeeded and returned data; otherwise the raw result is returned.
    """
    self.logger.info(f"Testing Tavily API with query: '{query}'")

    search_payload = {
        "query": query,
        "search_depth": "basic",
        "include_answer": True,
        "include_raw_content": False,
        "max_results": 5
    }
    outcome = await self.test_api_call(
        method="POST",
        endpoint="/search",
        data=search_payload
    )

    # Nothing to validate for a failed call or an empty payload.
    if not (outcome.success and outcome.response_data):
        return outcome
    return self._validate_tavily_response(outcome)
def _validate_anthropic_response(self, result: APITestResult) -> APITestResult:
    """Check that ``result.response_data`` looks like an Anthropic message.

    Checks, in order: the fields ``id``, ``type``, ``role`` and ``content``
    are present; ``content`` is a list; and, if ``usage`` is present, it
    contains ``input_tokens`` and ``output_tokens``. The first failed check
    marks ``result`` as a validation error and stops. Any exception raised
    while checking is also recorded as a validation error.

    The same ``result`` object is mutated and returned.
    """

    def reject(message):
        # Record a validation failure on the result and hand it back.
        result.success = False
        result.error_type = APIErrorType.VALIDATION_ERROR.value
        result.error_message = message
        return result

    try:
        payload = result.response_data

        # Required top-level fields; report the first one that is absent.
        missing = next(
            (name for name in ("id", "type", "role", "content") if name not in payload),
            None,
        )
        if missing is not None:
            return reject(f"Missing required field: {missing}")

        if not isinstance(payload.get("content"), list):
            return reject("Content field must be a list")

        # Usage is optional, but must be complete when present.
        if "usage" in payload:
            usage = payload["usage"]
            if not ("input_tokens" in usage and "output_tokens" in usage):
                return reject("Usage field missing required token information")

        self.logger.info(f"✅ Anthropic response validation passed")
    except Exception as e:
        reject(f"Response validation error: {e}")
        self.logger.error(f"❌ Anthropic response validation failed: {e}")
    return result
def _validate_tavily_response(self, result: APITestResult) -> APITestResult:
    """Check that ``result.response_data`` looks like a Tavily search response.

    Checks, in order: the fields ``query``, ``follow_up_questions``,
    ``images`` and ``results`` are present; ``results`` is a list; and every
    entry is a dict containing ``title``, ``url`` and ``content``. The first
    failed check marks ``result`` as a validation error and stops. Any
    exception raised while checking is also recorded as a validation error.

    The same ``result`` object is mutated and returned.
    """

    def reject(message):
        # Record a validation failure on the result and hand it back.
        result.success = False
        result.error_type = APIErrorType.VALIDATION_ERROR.value
        result.error_message = message
        return result

    try:
        payload = result.response_data

        # Required top-level fields; report the first one that is absent.
        missing = next(
            (
                name
                for name in ("query", "follow_up_questions", "images", "results")
                if name not in payload
            ),
            None,
        )
        if missing is not None:
            return reject(f"Missing required field: {missing}")

        hits = payload.get("results", [])
        if not isinstance(hits, list):
            return reject("Results field must be a list")

        # Each hit must be a dict carrying the three core keys.
        for index, hit in enumerate(hits):
            if not isinstance(hit, dict):
                return reject(f"Result {index} must be a dictionary")
            if any(key not in hit for key in ("title", "url", "content")):
                return reject(f"Result {index} missing required fields")

        self.logger.info(f"✅ Tavily response validation passed")
    except Exception as e:
        reject(f"Response validation error: {e}")
        self.logger.error(f"❌ Tavily response validation failed: {e}")
    return result
| # ========================================================================= | |
| # DEBUGGING UTILITIES | |
| # ========================================================================= | |
def debug_api_issue(self, result: APITestResult) -> Dict[str, Any]:
    """Build a debugging report for a test result.

    The report always contains the basic facts of the call. For a failed
    result it additionally contains ``error_analysis``,
    ``troubleshooting_steps`` and ``recommendations``.
    """
    basic_fields = (
        "api_name",
        "endpoint",
        "success",
        "response_time_ms",
        "timestamp",
        "retry_count",
    )
    report = {name: getattr(result, name) for name in basic_fields}

    # Successful calls need no further diagnosis.
    if result.success:
        return report

    report["error_analysis"] = self._analyze_error(result)
    report["troubleshooting_steps"] = self._get_troubleshooting_steps(result)
    report["recommendations"] = self._get_recommendations(result)
    return report
def _analyze_error(self, result: APITestResult) -> Dict[str, Any]:
    """Summarise a failure and, for known error types, list likely causes.

    The summary always has ``error_type``, ``error_message`` and
    ``status_code``. ``likely_causes`` is added only for authentication,
    rate-limit, network and timeout errors.
    """
    # Likely causes per error-type value; rebuilt on each call so callers
    # receive fresh lists.
    causes_by_type = {
        APIErrorType.AUTHENTICATION_ERROR.value: [
            "Invalid API key",
            "Expired API key",
            "Incorrect authentication header format",
            "API key not activated"
        ],
        APIErrorType.RATE_LIMIT_ERROR.value: [
            "Too many requests per minute",
            "Exceeded daily quota",
            "Rate limit not properly handled"
        ],
        APIErrorType.NETWORK_ERROR.value: [
            "Internet connection issues",
            "DNS resolution problems",
            "Firewall blocking requests",
            "API server down"
        ],
        APIErrorType.TIMEOUT_ERROR.value: [
            "Request timeout too short",
            "API server overloaded",
            "Network latency issues"
        ],
    }

    analysis = {
        "error_type": result.error_type,
        "error_message": result.error_message,
        "status_code": result.status_code
    }

    causes = causes_by_type.get(result.error_type)
    if causes is not None:
        analysis["likely_causes"] = causes
    return analysis
def _get_troubleshooting_steps(self, result: APITestResult) -> List[str]:
    """Return numbered troubleshooting steps for the result's error type.

    Steps exist for authentication, rate-limit, network and timeout errors;
    any other error type yields an empty list.
    """
    # Steps per error-type value; rebuilt on each call so callers receive
    # fresh lists.
    steps_by_type = {
        APIErrorType.AUTHENTICATION_ERROR.value: [
            "1. Verify your API key is correct",
            "2. Check if the API key is activated",
            "3. Ensure the authentication header format is correct",
            "4. Test with a simple curl command",
            "5. Check API documentation for authentication requirements"
        ],
        APIErrorType.RATE_LIMIT_ERROR.value: [
            "1. Check your current rate limit usage",
            "2. Implement proper rate limiting in your code",
            "3. Consider upgrading your API plan",
            "4. Add delays between requests",
            "5. Use exponential backoff for retries"
        ],
        APIErrorType.NETWORK_ERROR.value: [
            "1. Check your internet connection",
            "2. Test with a different network",
            "3. Verify the API endpoint URL",
            "4. Check if there are firewall restrictions",
            "5. Test with a simple ping to the API server"
        ],
        APIErrorType.TIMEOUT_ERROR.value: [
            "1. Increase the timeout value",
            "2. Check if the API server is responding",
            "3. Test with a smaller request",
            "4. Check network latency",
            "5. Consider using a different API endpoint"
        ],
    }
    return steps_by_type.get(result.error_type, [])
def _get_recommendations(self, result: APITestResult) -> List[str]:
    """Return fix recommendations for the result's error type.

    Recommendations exist for authentication, rate-limit, network and
    timeout errors; any other error type yields an empty list.
    """
    # Advice per error-type value; rebuilt on each call so callers receive
    # fresh lists.
    advice_by_type = {
        APIErrorType.AUTHENTICATION_ERROR.value: [
            "Double-check your API key configuration",
            "Review the API documentation for authentication examples",
            "Test with a minimal request first"
        ],
        APIErrorType.RATE_LIMIT_ERROR.value: [
            "Implement proper rate limiting",
            "Consider using a queue system for requests",
            "Monitor your API usage regularly"
        ],
        APIErrorType.NETWORK_ERROR.value: [
            "Check your network configuration",
            "Consider using a VPN if there are regional restrictions",
            "Test from a different location"
        ],
        APIErrorType.TIMEOUT_ERROR.value: [
            "Optimize your request size",
            "Implement proper timeout handling",
            "Consider using async requests for better performance"
        ],
    }
    return advice_by_type.get(result.error_type, [])
| # ========================================================================= | |
| # UTILITY METHODS | |
| # ========================================================================= | |
def get_stats(self) -> Dict[str, Any]:
    """Return a snapshot of this tester's counters and rate-limit state."""
    snapshot: Dict[str, Any] = {}
    snapshot["api_name"] = self.config.name
    snapshot["request_count"] = self._request_count
    snapshot["rate_limit_tokens"] = self._rate_limit_tokens
    snapshot["rate_limit_reset"] = self._rate_limit_reset
    return snapshot
async def health_check(self) -> bool:
    """Probe ``GET /health`` and report whether the call succeeded.

    Returns the ``success`` flag of the probe result. Any exception raised
    along the way is logged and reported as ``False``.
    """
    try:
        # A single lightweight request is enough to check connectivity.
        probe = await self.test_api_call("GET", "/health")
        return probe.success
    except Exception as e:
        self.logger.error(f"Health check failed for {self.config.name}: {e}")
    return False