# Medium-MCP / src/shared_config.py
# Author: Nikhil Pravin Pise
# feat: Switch to Groq as primary LLM provider (commit 40cdc42)
"""
Shared Configuration Module for Medium Agent Ecosystem
This module provides a centralized configuration system that both
Medium-Scraper and medium-mcp-server can import and extend.
Maintains backward compatibility while providing a single source of truth.
"""
import os
from dataclasses import dataclass, field
from typing import Optional, Dict, Any
from pathlib import Path
@dataclass
class SharedConfig:
    """
    Shared configuration for the Medium Agent ecosystem.
    Both Medium-Scraper and medium-mcp-server extend this base config.
    All settings can be overridden via environment variables.
    """
    # ========================================================================
    # Scraper Settings
    # ========================================================================
    max_workers: int = 5          # worker pool size for scraping jobs
    max_batch_size: int = 20      # max items processed per batch
    default_timeout: int = 30     # seconds
    max_concurrency: int = 5      # max simultaneous in-flight operations
    # ========================================================================
    # API Keys (never hard-coded; populated from the environment)
    # ========================================================================
    groq_api_key: Optional[str] = None
    gemini_api_key: Optional[str] = None
    openai_api_key: Optional[str] = None
    elevenlabs_api_key: Optional[str] = None
    # ========================================================================
    # HTTP Settings (Connection Pooling)
    # ========================================================================
    max_connections: int = 100
    max_keepalive_connections: int = 20
    keepalive_expiry: float = 5.0  # seconds before an idle connection closes
    http_timeout: int = 30         # seconds
    enable_http2: bool = True
    # ========================================================================
    # Rate Limiting
    # ========================================================================
    requests_per_minute: int = 60
    enable_rate_limiting: bool = True
    # ========================================================================
    # Database Settings
    # ========================================================================
    db_pool_size: int = 5
    db_timeout: int = 30           # seconds
    enable_async_db: bool = True
    db_wal_mode: bool = True       # Write-Ahead Logging for better concurrency
    # ========================================================================
    # Resilience Settings
    # ========================================================================
    circuit_breaker_threshold: int = 5    # failures before the breaker opens
    circuit_breaker_timeout: int = 300    # seconds the breaker stays open
    max_retries: int = 3
    retry_backoff_base: float = 1.0       # Initial delay in seconds
    retry_backoff_multiplier: float = 2.0  # Exponential multiplier
    # ========================================================================
    # Logging Settings
    # ========================================================================
    log_level: str = "INFO"
    enable_structured_logging: bool = True
    log_format: str = "json"  # "json" or "console"
    # ========================================================================
    # Cache Settings
    # ========================================================================
    enable_caching: bool = True
    cache_ttl: int = 3600      # 1 hour default
    cache_max_size: int = 1000  # Max items in memory cache
    # ========================================================================
    # Paths
    # ========================================================================
    base_dir: Path = field(default_factory=lambda: Path(__file__).parent)
    db_path: Optional[Path] = None
    output_dir: Optional[Path] = None

    @classmethod
    def from_env(cls, env_prefix: str = "") -> "SharedConfig":
        """
        Load configuration from environment variables.

        Prefixed variables (e.g. ``MCP_MAX_WORKERS``) take precedence over
        their unprefixed counterparts; malformed numeric values fall back to
        the documented default instead of raising.

        Args:
            env_prefix: Optional prefix for environment variables
                        (e.g., "MCP_" or "SCRAPER_")

        Returns:
            SharedConfig instance with values from environment
        """
        def get_env(key: str, default: Any = None, cast_type=str) -> Any:
            """Get environment variable with optional prefix and type casting."""
            env_key = f"{env_prefix}{key}" if env_prefix else key
            # Prefixed variable wins; fall back to the bare name, then default.
            value = os.getenv(env_key, os.getenv(key, default))
            if value is None:
                return default
            if cast_type == bool:
                return str(value).lower() in ('true', '1', 'yes', 'on')
            if cast_type in (int, float):
                # A malformed value (e.g. CACHE_TTL="abc") must not crash
                # startup — use the documented default instead.
                try:
                    return cast_type(value)
                except (TypeError, ValueError):
                    return default
            return value

        # Optional path overrides; previously these fields were never loaded
        # from the environment even though they are part of the config.
        db_path_env = get_env("DB_PATH")
        output_dir_env = get_env("OUTPUT_DIR")

        return cls(
            # Scraper settings
            max_workers=get_env("MAX_WORKERS", 5, int),
            max_batch_size=get_env("MAX_BATCH_SIZE", 20, int),
            default_timeout=get_env("DEFAULT_TIMEOUT", 30, int),
            max_concurrency=get_env("MAX_CONCURRENCY", 5, int),
            # API Keys
            groq_api_key=get_env("GROQ_API_KEY"),
            gemini_api_key=get_env("GEMINI_API_KEY"),
            openai_api_key=get_env("OPENAI_API_KEY"),
            elevenlabs_api_key=get_env("ELEVENLABS_API_KEY"),
            # HTTP Settings
            max_connections=get_env("HTTP_MAX_CONNECTIONS", 100, int),
            max_keepalive_connections=get_env("HTTP_MAX_KEEPALIVE", 20, int),
            keepalive_expiry=get_env("HTTP_KEEPALIVE_EXPIRY", 5.0, float),
            http_timeout=get_env("HTTP_TIMEOUT", 30, int),
            enable_http2=get_env("ENABLE_HTTP2", True, bool),
            # Rate Limiting
            requests_per_minute=get_env("RATE_LIMIT_RPM", 60, int),
            enable_rate_limiting=get_env("ENABLE_RATE_LIMITING", True, bool),
            # Database
            db_pool_size=get_env("DB_POOL_SIZE", 5, int),
            db_timeout=get_env("DB_TIMEOUT", 30, int),
            enable_async_db=get_env("ENABLE_ASYNC_DB", True, bool),
            db_wal_mode=get_env("DB_WAL_MODE", True, bool),
            # Resilience
            circuit_breaker_threshold=get_env("CIRCUIT_BREAKER_THRESHOLD", 5, int),
            circuit_breaker_timeout=get_env("CIRCUIT_BREAKER_TIMEOUT", 300, int),
            max_retries=get_env("MAX_RETRIES", 3, int),
            retry_backoff_base=get_env("RETRY_BACKOFF_BASE", 1.0, float),
            retry_backoff_multiplier=get_env("RETRY_BACKOFF_MULTIPLIER", 2.0, float),
            # Logging
            log_level=get_env("LOG_LEVEL", "INFO"),
            enable_structured_logging=get_env("ENABLE_STRUCTURED_LOGGING", True, bool),
            log_format=get_env("LOG_FORMAT", "json"),
            # Cache
            enable_caching=get_env("ENABLE_CACHING", True, bool),
            cache_ttl=get_env("CACHE_TTL", 3600, int),
            cache_max_size=get_env("CACHE_MAX_SIZE", 1000, int),
            # Paths (NOTE(review): env names DB_PATH / OUTPUT_DIR — confirm
            # against deployment docs; defaults remain None as before)
            db_path=Path(db_path_env) if db_path_env else None,
            output_dir=Path(output_dir_env) if output_dir_env else None,
        )

    def to_dict(self) -> Dict[str, Any]:
        """Convert configuration to a dictionary (Paths become strings)."""
        return {
            k: str(v) if isinstance(v, Path) else v
            for k, v in self.__dict__.items()
        }

    def __repr__(self) -> str:
        """String representation with API keys truncated to their first 8 chars."""
        safe_dict = self.to_dict()
        # Mask sensitive keys; skip ones that are unset (None / empty).
        sensitive_keys = ['groq_api_key', 'gemini_api_key', 'openai_api_key', 'elevenlabs_api_key']
        for key in sensitive_keys:
            if safe_dict.get(key):
                safe_dict[key] = safe_dict[key][:8] + "..."
        return f"SharedConfig({safe_dict})"
# Singleton instance for global access (optional)
# Lazily created by get_config(); replaceable via set_config().
_global_config: Optional[SharedConfig] = None
def get_config(reload: bool = False) -> SharedConfig:
    """Return the process-wide SharedConfig, building it on first use.

    Args:
        reload: When True, discard any cached instance and rebuild the
            configuration from the current environment.

    Returns:
        The shared SharedConfig singleton.
    """
    global _global_config
    needs_build = reload or _global_config is None
    if needs_build:
        _global_config = SharedConfig.from_env()
    return _global_config
def set_config(config: SharedConfig) -> None:
    """Install *config* as the process-wide configuration instance.

    Args:
        config: SharedConfig object that subsequent get_config() calls
            should return.
    """
    global _global_config
    _global_config = config