File size: 6,904 Bytes
461adca | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 | """
Configuration validator.
"""
import os
from pathlib import Path
from typing import List, Optional
from config.settings import Settings
class ConfigValidator:
"""Validates application configuration."""
def __init__(self, settings: Settings):
"""
Initialize the validator.
Args:
settings: Settings object to validate
"""
self.settings = settings
self.errors: List[str] = []
self.warnings: List[str] = []
def validate_all(self) -> bool:
"""
Run all validation checks.
Returns:
True if all validations pass, False otherwise
"""
self.errors = []
self.warnings = []
self.validate_api_keys()
self.validate_paths()
self.validate_models()
self.validate_session_config()
self.validate_redis_config()
return len(self.errors) == 0
def validate_api_keys(self) -> None:
"""Validate that required API keys are present."""
required_keys = {
'openai': 'OPENAI_API_KEY',
'anthropic': 'ANTHROPIC_API_KEY',
'gemini': 'GEMINI_API_KEY',
'deepseek': 'DEEPSEEK_API_KEY'
}
for provider in self.settings.models.providers:
key_name = required_keys.get(provider)
if key_name and not os.getenv(key_name):
self.errors.append(
f"Missing API key for provider '{provider}': {key_name} not found in environment"
)
# AWS credentials are optional
if not os.getenv('AWS_ACCESS_KEY_ID') or not os.getenv('AWS_SECRET_ACCESS_KEY'):
self.warnings.append(
"AWS credentials not found. S3 functionality will be disabled. "
"Will use local files only."
)
def validate_paths(self) -> None:
"""Validate file paths and directories."""
# Validate local directory
local_dir = Path(self.settings.aws.local_dir)
if not local_dir.exists():
self.warnings.append(
f"Local directory does not exist: {local_dir}. "
f"It will be created on initialization."
)
# Validate required files
for filename in self.settings.required_files:
filepath = local_dir / filename
if not filepath.exists():
self.warnings.append(
f"Required file not found: {filepath}. "
f"Will attempt to download from S3 if available."
)
# Validate logging directory
if self.settings.logging.file:
log_file = Path(self.settings.logging.file)
log_dir = log_file.parent
if not log_dir.exists():
self.warnings.append(
f"Log directory does not exist: {log_dir}. "
f"It will be created on initialization."
)
def validate_models(self) -> None:
"""Validate model configurations."""
# Check that each provider has at least one model
for provider in self.settings.models.providers:
gen_models = getattr(self.settings.models.generation, provider, [])
analysis_models = getattr(self.settings.models.analysis, provider, [])
if not gen_models:
self.warnings.append(
f"No generation models configured for provider '{provider}'"
)
if not analysis_models:
self.warnings.append(
f"No analysis models configured for provider '{provider}'"
)
# Check that at least one model is marked as default
if gen_models and not any(m.default for m in gen_models):
self.warnings.append(
f"No default generation model set for provider '{provider}'"
)
if analysis_models and not any(m.default for m in analysis_models):
self.warnings.append(
f"No default analysis model set for provider '{provider}'"
)
def validate_session_config(self) -> None:
"""Validate session configuration."""
if self.settings.session.timeout_minutes <= 0:
self.errors.append("Session timeout must be positive")
if self.settings.session.cleanup_interval_minutes <= 0:
self.errors.append("Session cleanup interval must be positive")
if self.settings.session.max_sessions <= 0:
self.errors.append("Max sessions must be positive")
if self.settings.session.cleanup_interval_minutes >= self.settings.session.timeout_minutes:
self.warnings.append(
"Session cleanup interval should be less than timeout for efficiency"
)
def validate_redis_config(self) -> None:
"""Validate Redis configuration if Redis storage is used."""
if self.settings.session.storage_type == "redis":
if not self.settings.redis.host:
self.errors.append("Redis host is required when using Redis storage")
if self.settings.redis.port <= 0 or self.settings.redis.port > 65535:
self.errors.append("Redis port must be between 1 and 65535")
if self.settings.redis.db < 0:
self.errors.append("Redis database number must be non-negative")
def get_errors(self) -> List[str]:
"""Get list of validation errors."""
return self.errors
def get_warnings(self) -> List[str]:
"""Get list of validation warnings."""
return self.warnings
def print_report(self) -> None:
"""Print validation report."""
if self.errors:
print("\n❌ Configuration Errors:")
for error in self.errors:
print(f" - {error}")
if self.warnings:
print("\n⚠️ Configuration Warnings:")
for warning in self.warnings:
print(f" - {warning}")
if not self.errors and not self.warnings:
print("\n✅ Configuration is valid!")
def validate_configuration(settings: Settings, print_report: bool = True) -> bool:
"""
Validate configuration settings.
Args:
settings: Settings object to validate
print_report: Whether to print validation report
Returns:
True if configuration is valid, False otherwise
"""
validator = ConfigValidator(settings)
is_valid = validator.validate_all()
if print_report:
validator.print_report()
return is_valid
|