""" Shared Configuration Module for Medium Agent Ecosystem This module provides a centralized configuration system that both Medium-Scraper and medium-mcp-server can import and extend. Maintains backward compatibility while providing a single source of truth. """ import os from dataclasses import dataclass, field from typing import Optional, Dict, Any from pathlib import Path @dataclass class SharedConfig: """ Shared configuration for the Medium Agent ecosystem. Both Medium-Scraper and medium-mcp-server extend this base config. All settings can be overridden via environment variables. """ # ======================================================================== # Scraper Settings # ======================================================================== max_workers: int = 5 max_batch_size: int = 20 default_timeout: int = 30 max_concurrency: int = 5 # ======================================================================== # API Keys # ======================================================================== groq_api_key: Optional[str] = None gemini_api_key: Optional[str] = None openai_api_key: Optional[str] = None elevenlabs_api_key: Optional[str] = None # ======================================================================== # HTTP Settings (Connection Pooling) # ======================================================================== max_connections: int = 100 max_keepalive_connections: int = 20 keepalive_expiry: float = 5.0 http_timeout: int = 30 enable_http2: bool = True # ======================================================================== # Rate Limiting # ======================================================================== requests_per_minute: int = 60 enable_rate_limiting: bool = True # ======================================================================== # Database Settings # ======================================================================== db_pool_size: int = 5 db_timeout: int = 30 enable_async_db: bool = True db_wal_mode: bool = True # Write-Ahead Logging for better concurrency # ======================================================================== # Resilience Settings # ======================================================================== circuit_breaker_threshold: int = 5 circuit_breaker_timeout: int = 300 max_retries: int = 3 retry_backoff_base: float = 1.0 # Initial delay in seconds retry_backoff_multiplier: float = 2.0 # Exponential multiplier # ======================================================================== # Logging Settings # ======================================================================== log_level: str = "INFO" enable_structured_logging: bool = True log_format: str = "json" # "json" or "console" # ======================================================================== # Cache Settings # ======================================================================== enable_caching: bool = True cache_ttl: int = 3600 # 1 hour default cache_max_size: int = 1000 # Max items in memory cache # ======================================================================== # Paths # ======================================================================== base_dir: Path = field(default_factory=lambda: Path(__file__).parent) db_path: Optional[Path] = None output_dir: Optional[Path] = None @classmethod def from_env(cls, env_prefix: str = "") -> "SharedConfig": """ Load configuration from environment variables. Args: env_prefix: Optional prefix for environment variables (e.g., "MCP_" or "SCRAPER_") Returns: SharedConfig instance with values from environment """ def get_env(key: str, default: Any = None, cast_type=str) -> Any: """Get environment variable with optional prefix and type casting.""" env_key = f"{env_prefix}{key}" if env_prefix else key value = os.getenv(env_key, os.getenv(key, default)) if value is None: return default # Type casting if cast_type == bool: return str(value).lower() in ('true', '1', 'yes', 'on') elif cast_type == int: return int(value) elif cast_type == float: return float(value) else: return value return cls( # Scraper settings max_workers=get_env("MAX_WORKERS", 5, int), max_batch_size=get_env("MAX_BATCH_SIZE", 20, int), default_timeout=get_env("DEFAULT_TIMEOUT", 30, int), max_concurrency=get_env("MAX_CONCURRENCY", 5, int), # API Keys groq_api_key=get_env("GROQ_API_KEY"), gemini_api_key=get_env("GEMINI_API_KEY"), openai_api_key=get_env("OPENAI_API_KEY"), elevenlabs_api_key=get_env("ELEVENLABS_API_KEY"), # HTTP Settings max_connections=get_env("HTTP_MAX_CONNECTIONS", 100, int), max_keepalive_connections=get_env("HTTP_MAX_KEEPALIVE", 20, int), keepalive_expiry=get_env("HTTP_KEEPALIVE_EXPIRY", 5.0, float), http_timeout=get_env("HTTP_TIMEOUT", 30, int), enable_http2=get_env("ENABLE_HTTP2", True, bool), # Rate Limiting requests_per_minute=get_env("RATE_LIMIT_RPM", 60, int), enable_rate_limiting=get_env("ENABLE_RATE_LIMITING", True, bool), # Database db_pool_size=get_env("DB_POOL_SIZE", 5, int), db_timeout=get_env("DB_TIMEOUT", 30, int), enable_async_db=get_env("ENABLE_ASYNC_DB", True, bool), db_wal_mode=get_env("DB_WAL_MODE", True, bool), # Resilience circuit_breaker_threshold=get_env("CIRCUIT_BREAKER_THRESHOLD", 5, int), circuit_breaker_timeout=get_env("CIRCUIT_BREAKER_TIMEOUT", 300, int), max_retries=get_env("MAX_RETRIES", 3, int), retry_backoff_base=get_env("RETRY_BACKOFF_BASE", 1.0, float), retry_backoff_multiplier=get_env("RETRY_BACKOFF_MULTIPLIER", 2.0, float), # Logging log_level=get_env("LOG_LEVEL", "INFO"), enable_structured_logging=get_env("ENABLE_STRUCTURED_LOGGING", True, bool), log_format=get_env("LOG_FORMAT", "json"), # Cache enable_caching=get_env("ENABLE_CACHING", True, bool), cache_ttl=get_env("CACHE_TTL", 3600, int), cache_max_size=get_env("CACHE_MAX_SIZE", 1000, int), ) def to_dict(self) -> Dict[str, Any]: """Convert configuration to dictionary.""" return { k: str(v) if isinstance(v, Path) else v for k, v in self.__dict__.items() } def __repr__(self) -> str: """String representation with sensitive data masked.""" safe_dict = self.to_dict() # Mask sensitive keys sensitive_keys = ['groq_api_key', 'gemini_api_key', 'openai_api_key', 'elevenlabs_api_key'] for key in sensitive_keys: if safe_dict.get(key): safe_dict[key] = safe_dict[key][:8] + "..." if safe_dict[key] else None return f"SharedConfig({safe_dict})" # Singleton instance for global access (optional) _global_config: Optional[SharedConfig] = None def get_config(reload: bool = False) -> SharedConfig: """ Get global configuration instance. Args: reload: If True, reload config from environment Returns: SharedConfig instance """ global _global_config if _global_config is None or reload: _global_config = SharedConfig.from_env() return _global_config def set_config(config: SharedConfig) -> None: """ Set global configuration instance. Args: config: SharedConfig instance to use globally """ global _global_config _global_config = config