# BlakeL's picture
# Upload 115 files
# 3f9f85b verified
"""
Secure Environment Variable Management
This module provides secure loading and validation of environment variables
for the Wanderlust AI travel planner.
"""
import os
import secrets
from pathlib import Path
from typing import Optional, Dict, Any
from dotenv import load_dotenv
from pydantic import BaseModel, Field, field_validator
import logging
logger = logging.getLogger(__name__)
class EnvironmentConfig(BaseModel):
    """Pydantic model for environment configuration validation.

    This is a plain ``BaseModel``: it validates the values passed to its
    constructor and does not read ``os.environ`` or a ``.env`` file on its
    own.

    Raises:
        pydantic.ValidationError: If a required field is missing or a value
            is rejected by one of the validators below.
    """

    # The former nested ``Config`` class only carried settings-loader options
    # (``env_file``, ``env_file_encoding``, ``case_sensitive``) that a
    # BaseModel ignores, and class-based config is deprecated in pydantic v2.
    # Inputs are hidden from validation errors because several fields hold
    # secrets that would otherwise be echoed into error messages and logs.
    model_config = {"hide_input_in_errors": True}

    # AI API Keys (Required)
    anthropic_api_key: str = Field(..., description="Anthropic Claude API key")
    tavily_api_key: str = Field(..., description="Tavily Search API key")
    openai_api_key: Optional[str] = Field(None, description="OpenAI API key (optional)")

    # Application Configuration
    environment: str = Field(default="development", description="Application environment")
    debug: bool = Field(default=True, description="Debug mode")
    port: int = Field(default=7860, description="Application port")

    # Logging Configuration
    log_level: str = Field(default="INFO", description="Log level")
    log_file: Optional[str] = Field(None, description="Log file path")

    # Rate Limiting
    rate_limit_per_minute: int = Field(default=60, description="Rate limit per minute")
    rate_limit_per_hour: int = Field(default=1000, description="Rate limit per hour")

    # Security
    secret_key: str = Field(..., description="Secret key for session management")
    session_timeout_minutes: int = Field(default=30, description="Session timeout in minutes")

    @field_validator('environment')
    @classmethod
    def validate_environment(cls, v: str) -> str:
        """Accept only the known environment names (exact, lower-case match)."""
        allowed = {'development', 'testing', 'production'}
        if v not in allowed:
            raise ValueError(f'Environment must be one of {allowed}')
        return v

    @field_validator('log_level')
    @classmethod
    def validate_log_level(cls, v: str) -> str:
        """Validate the log level case-insensitively and return it upper-cased."""
        allowed = {'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'}
        level = v.upper()
        if level not in allowed:
            raise ValueError(f'Log level must be one of {allowed}')
        return level

    @field_validator('port')
    @classmethod
    def validate_port(cls, v: int) -> int:
        """Require a port in the range 1-65535."""
        if not 1 <= v <= 65535:
            raise ValueError('Port must be between 1 and 65535')
        return v

    @field_validator('anthropic_api_key', 'tavily_api_key')
    @classmethod
    def validate_api_key(cls, v: str) -> str:
        """Reject empty, placeholder, or implausibly short API keys."""
        if not v or v == "your_api_key_here":
            raise ValueError("API key cannot be empty or placeholder")
        if len(v) < 10:
            raise ValueError("API key appears to be too short")
        return v
def load_environment() -> EnvironmentConfig:
    """
    Load the project's .env file and build a validated configuration.

    The .env file is looked up three directories above the directory that
    contains this module. Its values are loaded into the process environment
    and then passed to ``EnvironmentConfig``, one value per model field,
    matching variable names case-insensitively (``PORT`` and ``port`` both
    populate the ``port`` field).

    Returns:
        EnvironmentConfig: Validated environment configuration

    Raises:
        ValueError: If required environment variables are missing or invalid
        FileNotFoundError: If .env file is not found
    """
    # Local import: only this function needs the exception type.
    from pydantic import ValidationError

    # Find the project root directory
    current_dir = Path(__file__).parent
    project_root = current_dir.parent.parent.parent

    # Look for .env file in project root
    env_file = project_root / ".env"
    if not env_file.exists():
        logger.warning(f".env file not found at {env_file}")
        logger.info("Please copy .env.example to .env and fill in your API keys")

        # Tell the user whether a template is available to copy from
        env_example = project_root / ".env.example"
        if env_example.exists():
            logger.info(f"Found .env.example at {env_example}")
        else:
            logger.error("No .env.example file found. Please create one with required variables.")

        raise FileNotFoundError(f".env file not found at {env_file}")

    # Load environment variables from .env file
    load_dotenv(env_file)
    logger.info(f"Loaded environment variables from {env_file}")

    # The model does not read the process environment by itself, so the
    # values have to be handed to its constructor explicitly. Variables that
    # are not set are omitted so the model's defaults apply.
    env_by_name = {key.lower(): value for key, value in os.environ.items()}
    field_values = {
        name: env_by_name[name]
        for name in EnvironmentConfig.model_fields
        if name in env_by_name
    }

    try:
        config = EnvironmentConfig(**field_values)
    except ValidationError as e:
        # Report only field names and messages: the default string form of a
        # validation error can include the rejected input, which here may be
        # an API key or the secret key.
        details = "; ".join(
            f"{'.'.join(str(part) for part in err['loc'])}: {err['msg']}"
            for err in e.errors()
        )
        logger.error(f"Environment validation failed: {details}")
        # ``from None`` keeps the original exception (and its inputs) out of
        # the traceback that callers may log.
        raise ValueError(f"Invalid environment configuration: {details}") from None

    logger.info("Environment configuration validated successfully")
    return config
def generate_secret_key() -> str:
    """
    Create a fresh random key suitable for session management.

    The key is produced by ``secrets.token_urlsafe`` from 32 random bytes.

    Returns:
        str: URL-safe text encoding of the random bytes
    """
    random_byte_count = 32
    key = secrets.token_urlsafe(random_byte_count)
    return key
def check_environment_security() -> Dict[str, Any]:
    """
    Inspect the process environment for common security problems.

    Checks performed on ``os.environ``:
      * any variable whose value contains "your_" or "placeholder"
        (warning, marks the environment insecure);
      * a non-empty SECRET_KEY shorter than 32 characters (warning plus a
        recommendation; does not change the ``secure`` flag);
      * debug mode being on while ENVIRONMENT is "production" (error);
      * ANTHROPIC_API_KEY or TAVILY_API_KEY missing or empty (error).

    Returns:
        Dict[str, Any]: Security status with the keys ``secure`` (bool),
        ``warnings``, ``errors`` and ``recommendations`` (lists of str)
    """
    security_status: Dict[str, Any] = {
        "secure": True,
        "warnings": [],
        "errors": [],
        "recommendations": []
    }

    env_vars = os.environ

    # Check for placeholder values
    placeholder_vars = [
        key
        for key, value in env_vars.items()
        if "your_" in value.lower() or "placeholder" in value.lower()
    ]
    if placeholder_vars:
        security_status["warnings"].append(f"Placeholder values found in: {', '.join(placeholder_vars)}")
        security_status["secure"] = False

    # Check for weak secret keys
    secret_key = env_vars.get("SECRET_KEY", "")
    if secret_key and len(secret_key) < 32:
        security_status["warnings"].append("SECRET_KEY appears to be too short")
        security_status["recommendations"].append("Generate a new secret key with: python -c \"import secrets; print(secrets.token_urlsafe(32))\"")

    # Check for debug mode in production. Debug counts as enabled for every
    # spelling that pydantic parses as True, and also when DEBUG is unset,
    # because the EnvironmentConfig model in this module defaults ``debug``
    # to True. Comparing against the literal "true" alone would let
    # DEBUG=1 (or no DEBUG at all) slip through.
    truthy_values = {"1", "on", "t", "true", "y", "yes"}
    debug_raw = env_vars.get("DEBUG")
    debug_enabled = debug_raw is None or debug_raw.lower() in truthy_values
    if env_vars.get("ENVIRONMENT") == "production" and debug_enabled:
        security_status["errors"].append("DEBUG mode is enabled in production environment")
        security_status["secure"] = False

    # Check for missing required keys (unset and empty are both "missing")
    required_keys = ["ANTHROPIC_API_KEY", "TAVILY_API_KEY"]
    missing_keys = [key for key in required_keys if not env_vars.get(key)]
    if missing_keys:
        security_status["errors"].append(f"Missing required API keys: {', '.join(missing_keys)}")
        security_status["secure"] = False

    return security_status
def setup_environment() -> EnvironmentConfig:
    """
    Load the environment configuration and run the security checks on it.

    The .env file is loaded before the security check runs: the check
    inspects ``os.environ``, and ``load_environment()`` is what copies the
    .env values into it. Checking first would report keys that live only in
    .env as missing.

    Returns:
        EnvironmentConfig: Validated environment configuration

    Raises:
        ValueError: If the configuration is invalid or the security check
            reports errors
        FileNotFoundError: If .env file is not found
    """
    logger.info("Setting up environment configuration...")

    # Load and validate environment (also populates os.environ from .env)
    config = load_environment()

    # Security check against the now fully populated environment
    security_status = check_environment_security()

    if security_status["errors"]:
        logger.error("Security errors found:")
        for error in security_status["errors"]:
            logger.error(f" - {error}")
        raise ValueError("Environment security check failed")

    if security_status["warnings"]:
        logger.warning("Security warnings:")
        for warning in security_status["warnings"]:
            logger.warning(f" - {warning}")

    if security_status["recommendations"]:
        logger.info("Security recommendations:")
        for rec in security_status["recommendations"]:
            logger.info(f" - {rec}")

    logger.info("Environment setup completed successfully")
    return config
# Example usage and testing: run this module directly to validate the local
# environment. Exits with status 1 when setup fails so that shells and CI
# can detect the failure.
if __name__ == "__main__":
    try:
        # Setup logging
        logging.basicConfig(level=logging.INFO)

        # Load environment
        config = setup_environment()
        print("✅ Environment configuration loaded successfully!")
        print(f"Environment: {config.environment}")
        print(f"Debug mode: {config.debug}")
        print(f"Port: {config.port}")
        print(f"Log level: {config.log_level}")

        # Check security (warnings are reported but are not fatal here)
        security = check_environment_security()
        if security["secure"]:
            print("✅ Environment security check passed")
        else:
            print("⚠️ Environment security issues found")
    except Exception as e:
        print(f"❌ Environment setup failed: {e}")
        print("\nTo fix this:")
        print("1. Copy .env.example to .env")
        print("2. Fill in your actual API keys")
        print("3. Run this script again")
        # Signal the failure through the exit status instead of exiting 0.
        raise SystemExit(1)