"""
Secure Environment Variable Management

This module provides secure loading and validation of environment variables
for the Wanderlust AI travel planner.
"""

import logging
import os
import secrets
from pathlib import Path
from typing import Any, Dict, Optional

from dotenv import load_dotenv
from pydantic import BaseModel, Field, ValidationError, field_validator

logger = logging.getLogger(__name__)

# String spellings that pydantic's lax bool parsing accepts as True.
_TRUTHY_STRINGS = frozenset({"1", "true", "t", "yes", "y", "on"})

# Field names whose absence is reported as a missing API key.
_REQUIRED_API_KEY_FIELDS = ("anthropic_api_key", "tavily_api_key")


class EnvironmentConfig(BaseModel):
    """Pydantic model for environment configuration validation.

    A plain ``BaseModel`` does not read environment variables on its own, so
    instances are populated explicitly from ``os.environ`` by the loader
    functions in this module.
    """

    # AI API Keys (Required)
    anthropic_api_key: str = Field(..., description="Anthropic Claude API key")
    tavily_api_key: str = Field(..., description="Tavily Search API key")
    openai_api_key: Optional[str] = Field(None, description="OpenAI API key (optional)")

    # Application Configuration
    environment: str = Field(default="development", description="Application environment")
    debug: bool = Field(default=True, description="Debug mode")
    port: int = Field(default=7860, description="Application port")

    # Logging Configuration
    log_level: str = Field(default="INFO", description="Log level")
    log_file: Optional[str] = Field(None, description="Log file path")

    # Rate Limiting
    rate_limit_per_minute: int = Field(default=60, description="Rate limit per minute")
    rate_limit_per_hour: int = Field(default=1000, description="Rate limit per hour")

    # Security
    secret_key: str = Field(..., description="Secret key for session management")
    session_timeout_minutes: int = Field(default=30, description="Session timeout in minutes")

    @field_validator('environment')
    @classmethod
    def validate_environment(cls, v: str) -> str:
        """Validate environment value."""
        allowed = {'development', 'testing', 'production'}
        if v not in allowed:
            raise ValueError(f'Environment must be one of {allowed}')
        return v

    @field_validator('log_level')
    @classmethod
    def validate_log_level(cls, v: str) -> str:
        """Validate log level and normalize it to upper case."""
        allowed = {'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'}
        if v.upper() not in allowed:
            raise ValueError(f'Log level must be one of {allowed}')
        return v.upper()

    @field_validator('port')
    @classmethod
    def validate_port(cls, v: int) -> int:
        """Validate port number."""
        if not 1 <= v <= 65535:
            raise ValueError('Port must be between 1 and 65535')
        return v

    @field_validator('anthropic_api_key', 'tavily_api_key')
    @classmethod
    def validate_api_key(cls, v: str) -> str:
        """Validate API key format."""
        if not v or v == "your_api_key_here":
            raise ValueError("API key cannot be empty or placeholder")
        if len(v) < 10:
            raise ValueError("API key appears to be too short")
        return v


def _read_config_environment() -> Dict[str, str]:
    """Collect the process environment values that map to config fields.

    Variable names are matched case-insensitively; an exact upper-case name
    (e.g. ``PORT``) wins over a differently-cased duplicate.

    Returns:
        Dict[str, str]: Mapping of model field name to raw string value, for
        every field that is present in ``os.environ``.
    """
    lowered = {key.lower(): value for key, value in os.environ.items()}
    values: Dict[str, str] = {}
    for field_name in EnvironmentConfig.model_fields:
        value = os.environ.get(field_name.upper(), lowered.get(field_name))
        if value is not None:
            values[field_name] = value
    return values


def _load_env_file() -> Path:
    """Locate the project's .env file and load it into ``os.environ``.

    Returns:
        Path: The .env file that was loaded

    Raises:
        FileNotFoundError: If .env file is not found
    """
    # Find the project root directory
    current_dir = Path(__file__).parent
    project_root = current_dir.parent.parent.parent

    # Look for .env file in project root
    env_file = project_root / ".env"
    if not env_file.exists():
        logger.warning(".env file not found at %s", env_file)
        logger.info("Please copy .env.example to .env and fill in your API keys")

        # Check if .env.example exists
        env_example = project_root / ".env.example"
        if env_example.exists():
            logger.info("Found .env.example at %s", env_example)
        else:
            logger.error("No .env.example file found. Please create one with required variables.")

        raise FileNotFoundError(f".env file not found at {env_file}")

    # Load environment variables from .env file
    load_dotenv(env_file)
    logger.info("Loaded environment variables from %s", env_file)
    return env_file


def _build_config() -> EnvironmentConfig:
    """Validate the current process environment into an EnvironmentConfig.

    Raises:
        ValueError: If required environment variables are missing or invalid
    """
    try:
        config = EnvironmentConfig(**_read_config_environment())
    except ValidationError as e:
        # Summarize using only field location and message: the default string
        # form of a ValidationError embeds the offending input values, which
        # here may be API keys or the secret key.
        summary = "; ".join(
            f"{'.'.join(str(part) for part in err['loc'])}: {err['msg']}"
            for err in e.errors()
        )
        logger.error("Environment validation failed: %s", summary)
        raise ValueError(f"Invalid environment configuration: {summary}") from e

    logger.info("Environment configuration validated successfully")
    return config


def load_environment() -> EnvironmentConfig:
    """
    Load and validate environment variables.

    Returns:
        EnvironmentConfig: Validated environment configuration

    Raises:
        ValueError: If required environment variables are missing or invalid
        FileNotFoundError: If .env file is not found
    """
    _load_env_file()
    return _build_config()


def generate_secret_key() -> str:
    """
    Generate a secure secret key for session management.

    Returns:
        str: A secure random secret key
    """
    return secrets.token_urlsafe(32)


def check_environment_security() -> Dict[str, Any]:
    """
    Check environment security and return security status.

    Only the variables that back ``EnvironmentConfig`` fields are inspected,
    so unrelated process variables cannot trigger findings.

    Returns:
        Dict[str, Any]: Security status information with the keys
        "secure", "warnings", "errors" and "recommendations"
    """
    security_status: Dict[str, Any] = {
        "secure": True,
        "warnings": [],
        "errors": [],
        "recommendations": []
    }

    # Check for common security issues
    env_vars = _read_config_environment()

    # Check for placeholder values
    placeholder_vars = [
        name.upper()
        for name, value in env_vars.items()
        if "your_" in value.lower() or "placeholder" in value.lower()
    ]
    if placeholder_vars:
        security_status["warnings"].append(f"Placeholder values found in: {', '.join(placeholder_vars)}")
        security_status["secure"] = False

    # Check for weak secret keys
    secret_key = env_vars.get("secret_key", "")
    if secret_key and len(secret_key) < 32:
        security_status["warnings"].append("SECRET_KEY appears to be too short")
        security_status["recommendations"].append("Generate a new secret key with: python -c \"import secrets; print(secrets.token_urlsafe(32))\"")

    # Check for debug mode in production. An unset DEBUG counts as enabled
    # because the config model defaults ``debug`` to True.
    if env_vars.get("environment") == "production":
        debug_raw = env_vars.get("debug")
        if debug_raw is None or debug_raw.lower() in _TRUTHY_STRINGS:
            security_status["errors"].append("DEBUG mode is enabled in production environment")
            security_status["secure"] = False

    # Check for missing required keys
    missing_keys = [
        name.upper() for name in _REQUIRED_API_KEY_FIELDS if not env_vars.get(name)
    ]
    if missing_keys:
        security_status["errors"].append(f"Missing required API keys: {', '.join(missing_keys)}")
        security_status["secure"] = False

    return security_status


def setup_environment() -> EnvironmentConfig:
    """
    Setup environment with security checks.

    Returns:
        EnvironmentConfig: Validated environment configuration

    Raises:
        ValueError: If the security check reports errors or the
            configuration is invalid
        FileNotFoundError: If .env file is not found
    """
    logger.info("Setting up environment configuration...")

    # Load .env before the security check so that values defined only in the
    # file are visible to it.
    _load_env_file()

    security_status = check_environment_security()

    if security_status["errors"]:
        logger.error("Security errors found:")
        for error in security_status["errors"]:
            logger.error(" - %s", error)
        raise ValueError("Environment security check failed")

    if security_status["warnings"]:
        logger.warning("Security warnings:")
        for warning in security_status["warnings"]:
            logger.warning(" - %s", warning)

    if security_status["recommendations"]:
        logger.info("Security recommendations:")
        for rec in security_status["recommendations"]:
            logger.info(" - %s", rec)

    # Validate environment
    config = _build_config()

    logger.info("Environment setup completed successfully")
    return config


def main() -> int:
    """Run the example/self-test and return a process exit code."""
    # Setup logging
    logging.basicConfig(level=logging.INFO)

    try:
        # Load environment
        config = setup_environment()
        print("✅ Environment configuration loaded successfully!")
        print(f"Environment: {config.environment}")
        print(f"Debug mode: {config.debug}")
        print(f"Port: {config.port}")
        print(f"Log level: {config.log_level}")

        # Check security
        security = check_environment_security()
        if security["secure"]:
            print("✅ Environment security check passed")
        else:
            print("⚠️ Environment security issues found")
    except Exception as e:  # top-level boundary: report and exit non-zero
        print(f"❌ Environment setup failed: {e}")
        print("\nTo fix this:")
        print("1. Copy .env.example to .env")
        print("2. Fill in your actual API keys")
        print("3. Run this script again")
        return 1

    return 0


# Example usage and testing
if __name__ == "__main__":
    raise SystemExit(main())