File size: 8,743 Bytes
3f9f85b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
"""
Secure Environment Variable Management

This module provides secure loading and validation of environment variables
for the Wanderlust AI travel planner.
"""

import os
import secrets
from pathlib import Path
from typing import Optional, Dict, Any
from dotenv import load_dotenv
from pydantic import BaseModel, Field, field_validator
import logging

logger = logging.getLogger(__name__)


class EnvironmentConfig(BaseModel):
    """Pydantic model for environment configuration validation."""
    
    # AI API Keys (Required)
    anthropic_api_key: str = Field(..., description="Anthropic Claude API key")
    tavily_api_key: str = Field(..., description="Tavily Search API key")
    openai_api_key: Optional[str] = Field(None, description="OpenAI API key (optional)")
    
    # Application Configuration
    environment: str = Field(default="development", description="Application environment")
    debug: bool = Field(default=True, description="Debug mode")
    port: int = Field(default=7860, description="Application port")
    
    # Logging Configuration
    log_level: str = Field(default="INFO", description="Log level")
    log_file: Optional[str] = Field(None, description="Log file path")
    
    # Rate Limiting
    rate_limit_per_minute: int = Field(default=60, description="Rate limit per minute")
    rate_limit_per_hour: int = Field(default=1000, description="Rate limit per hour")
    
    # Security
    secret_key: str = Field(..., description="Secret key for session management")
    session_timeout_minutes: int = Field(default=30, description="Session timeout in minutes")
    
    @field_validator('environment')
    @classmethod
    def validate_environment(cls, v: str) -> str:
        """Validate environment value."""
        allowed = {'development', 'testing', 'production'}
        if v not in allowed:
            raise ValueError(f'Environment must be one of {allowed}')
        return v
    
    @field_validator('log_level')
    @classmethod
    def validate_log_level(cls, v: str) -> str:
        """Validate log level."""
        allowed = {'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'}
        if v.upper() not in allowed:
            raise ValueError(f'Log level must be one of {allowed}')
        return v.upper()
    
    @field_validator('port')
    @classmethod
    def validate_port(cls, v: int) -> int:
        """Validate port number."""
        if not 1 <= v <= 65535:
            raise ValueError('Port must be between 1 and 65535')
        return v
    
    @field_validator('anthropic_api_key', 'tavily_api_key')
    @classmethod
    def validate_api_key(cls, v: str) -> str:
        """Validate API key format."""
        if not v or v == "your_api_key_here":
            raise ValueError("API key cannot be empty or placeholder")
        if len(v) < 10:
            raise ValueError("API key appears to be too short")
        return v
    
    class Config:
        """Pydantic configuration."""
        env_file = ".env"
        env_file_encoding = "utf-8"
        case_sensitive = False


def load_environment() -> EnvironmentConfig:
    """
    Load and validate environment variables.
    
    Returns:
        EnvironmentConfig: Validated environment configuration
        
    Raises:
        ValueError: If required environment variables are missing or invalid
        FileNotFoundError: If .env file is not found
    """
    # Find the project root directory
    current_dir = Path(__file__).parent
    project_root = current_dir.parent.parent.parent
    
    # Look for .env file in project root
    env_file = project_root / ".env"
    
    if not env_file.exists():
        logger.warning(f".env file not found at {env_file}")
        logger.info("Please copy .env.example to .env and fill in your API keys")
        
        # Check if .env.example exists
        env_example = project_root / ".env.example"
        if env_example.exists():
            logger.info(f"Found .env.example at {env_example}")
        else:
            logger.error("No .env.example file found. Please create one with required variables.")
        
        raise FileNotFoundError(f".env file not found at {env_file}")
    
    # Load environment variables from .env file
    load_dotenv(env_file)
    logger.info(f"Loaded environment variables from {env_file}")
    
    # Validate and return configuration
    try:
        config = EnvironmentConfig()
        logger.info("Environment configuration validated successfully")
        return config
    except Exception as e:
        logger.error(f"Environment validation failed: {e}")
        raise ValueError(f"Invalid environment configuration: {e}")


def generate_secret_key() -> str:
    """
    Generate a secure secret key for session management.
    
    Returns:
        str: A secure random secret key
    """
    return secrets.token_urlsafe(32)


def check_environment_security() -> Dict[str, Any]:
    """
    Check environment security and return security status.
    
    Returns:
        Dict[str, Any]: Security status information
    """
    security_status = {
        "secure": True,
        "warnings": [],
        "errors": [],
        "recommendations": []
    }
    
    # Check for common security issues
    env_vars = os.environ
    
    # Check for placeholder values
    placeholder_vars = []
    for key, value in env_vars.items():
        if "your_" in value.lower() or "placeholder" in value.lower():
            placeholder_vars.append(key)
    
    if placeholder_vars:
        security_status["warnings"].append(f"Placeholder values found in: {', '.join(placeholder_vars)}")
        security_status["secure"] = False
    
    # Check for weak secret keys
    secret_key = env_vars.get("SECRET_KEY", "")
    if secret_key and len(secret_key) < 32:
        security_status["warnings"].append("SECRET_KEY appears to be too short")
        security_status["recommendations"].append("Generate a new secret key with: python -c \"import secrets; print(secrets.token_urlsafe(32))\"")
    
    # Check for debug mode in production
    if env_vars.get("ENVIRONMENT") == "production" and env_vars.get("DEBUG", "").lower() == "true":
        security_status["errors"].append("DEBUG mode is enabled in production environment")
        security_status["secure"] = False
    
    # Check for missing required keys
    required_keys = ["ANTHROPIC_API_KEY", "TAVILY_API_KEY"]
    missing_keys = [key for key in required_keys if not env_vars.get(key)]
    
    if missing_keys:
        security_status["errors"].append(f"Missing required API keys: {', '.join(missing_keys)}")
        security_status["secure"] = False
    
    return security_status


def setup_environment() -> EnvironmentConfig:
    """
    Setup environment with security checks.
    
    Returns:
        EnvironmentConfig: Validated environment configuration
    """
    logger.info("Setting up environment configuration...")
    
    # Check security first
    security_status = check_environment_security()
    
    if security_status["errors"]:
        logger.error("Security errors found:")
        for error in security_status["errors"]:
            logger.error(f"  - {error}")
        raise ValueError("Environment security check failed")
    
    if security_status["warnings"]:
        logger.warning("Security warnings:")
        for warning in security_status["warnings"]:
            logger.warning(f"  - {warning}")
    
    if security_status["recommendations"]:
        logger.info("Security recommendations:")
        for rec in security_status["recommendations"]:
            logger.info(f"  - {rec}")
    
    # Load and validate environment
    config = load_environment()
    
    logger.info("Environment setup completed successfully")
    return config


# Example usage and testing
if __name__ == "__main__":
    try:
        # Setup logging
        logging.basicConfig(level=logging.INFO)
        
        # Load environment
        config = setup_environment()
        
        print("✅ Environment configuration loaded successfully!")
        print(f"Environment: {config.environment}")
        print(f"Debug mode: {config.debug}")
        print(f"Port: {config.port}")
        print(f"Log level: {config.log_level}")
        
        # Check security
        security = check_environment_security()
        if security["secure"]:
            print("✅ Environment security check passed")
        else:
            print("⚠️ Environment security issues found")
            
    except Exception as e:
        print(f"❌ Environment setup failed: {e}")
        print("\nTo fix this:")
        print("1. Copy .env.example to .env")
        print("2. Fill in your actual API keys")
        print("3. Run this script again")