""" QAgents-Workflows: Configuration Central configuration for the multi-agent quantum circuit optimization system. Path: QAgents-workflos/config.py Related: agents/llm_adapter.py (uses GEMINI_MODELS for fallback cascade) run_evaluation.py (uses config for evaluation settings) workflows/workflow_definitions.py (references rate limits) """ from pathlib import Path from dataclasses import dataclass, field from typing import Optional, List, Dict import os # Load environment variables from .env file try: from dotenv import load_dotenv load_dotenv() except ImportError: # If python-dotenv is not installed, continue without loading .env # (it will use system environment variables only) pass # Paths PROJECT_ROOT = Path(__file__).parent QUANTUM_MCP_ROOT = PROJECT_ROOT.parent / "QuantumArchitect-MCP" # ============================================================================= # GEMINI MODEL CASCADE (sorted by RPD - highest to lowest for optimal fallback) # ============================================================================= # When a model hits rate limits (RPM/RPD), fallback to next model in list. # Free tier limits (as of 2025): # - Gemma 3: 30 RPM, 15K TPM, 14,400 RPD (HIGHEST availability) # - Flash-Lite: 15 RPM, 250K TPM, 1,000 RPD # - Flash 2.5: 10 RPM, 250K TPM, 250 RPD # - Flash 2.0: 15 RPM, 1M TPM, 200 RPD # - Flash 2.0 Lite: 30 RPM, 1M TPM, 200 RPD # - Pro 2.5: 2 RPM, 125K TPM, 50 RPD (LOWEST availability) # # EXPECTED REQUESTS PER EVALUATION (9 problems): # - Naked mode: 0 LLM calls (direct MCP only) # - Guided mode: ~36 LLM calls (4 per problem) # - Blackboard: ~72-108 LLM calls (8-12 per problem) # ============================================================================= GEMINI_MODELS: List[Dict] = [ # Highest RPD - most available (14,400/day = 10/min continuously) { "name": "gemma-3-27b-it", "rpm": 30, "tpm": 15_000, "rpd": 14_400, "priority": 1, "notes": "Best for high-volume, may have lower quality than Flash" }, # Good balance - default model (1,000/day) { "name": "gemini-2.5-flash-lite", "rpm": 15, "tpm": 250_000, "rpd": 1_000, "priority": 2, "notes": "Good balance of quality and availability - DEFAULT" }, # Higher quality - moderate availability (250/day) { "name": "gemini-2.5-flash", "rpm": 10, "tpm": 250_000, "rpd": 250, "priority": 3, "notes": "Better quality, lower availability" }, # High TPM for long contexts (200/day) { "name": "gemini-2.0-flash", "rpm": 15, "tpm": 1_000_000, "rpd": 200, "priority": 4, "notes": "Good for long contexts, moderate availability" }, # Fast variant (200/day) { "name": "gemini-2.0-flash-lite", "rpm": 30, "tpm": 1_000_000, "rpd": 200, "priority": 5, "notes": "Fast responses, lower availability" }, # Lowest RPD - highest quality, use sparingly (50/day) { "name": "gemini-2.5-pro", "rpm": 2, "tpm": 125_000, "rpd": 50, "priority": 6, "notes": "Highest quality, use sparingly - LAST RESORT" }, ] def get_model_by_priority(priority: int = 1) -> Optional[Dict]: """Get model config by priority (1=highest RPD).""" for model in GEMINI_MODELS: if model["priority"] == priority: return model return None def get_next_model(current_name: str) -> Optional[Dict]: """Get next model in fallback chain.""" for i, model in enumerate(GEMINI_MODELS): if model["name"] == current_name: if i + 1 < len(GEMINI_MODELS): return GEMINI_MODELS[i + 1] return None def get_model_config(model_name: str) -> Optional[Dict]: """Get model config by name.""" for model in GEMINI_MODELS: if model["name"] == model_name: return model return None @dataclass class MCPConfig: """MCP 
Server configuration.""" host: str = "127.0.0.1" port: int = 7861 base_url: str = field(init=False) def __post_init__(self): self.base_url = f"http://{self.host}:{self.port}" @dataclass class RateLimitConfig: """Rate limiting based on Gemini API free tier limits.""" # Default to gemini-2.5-flash-lite limits rpm_limit: int = 15 # Requests per minute tpm_limit: int = 250_000 # Tokens per minute rpd_limit: int = 1_000 # Requests per day # Conservative buffer (80% of limit = 12 RPM effective) rpm_buffer: float = 0.8 @property def min_request_interval(self) -> float: """Minimum seconds between requests: 60 / (15 * 0.8) = 5 seconds.""" return 60.0 / (self.rpm_limit * self.rpm_buffer) @dataclass class LLMConfig: """LLM configuration for agents - model agnostic via Gemini and LiteLLM. Environment Variables (HuggingFace Space compatible): - LLM_PROVIDER: Provider name (gemini, openai, anthropic, groq, ollama). Default: "gemini" - LLM_MODEL: Model identifier. Default: "gemini-2.5-flash-lite" - GOOGLE_API_KEY: Gemini API key (Gemini provider) - GENAI_API_KEY: Alternative Gemini API key (fallback) - OPENAI_API_KEY: OpenAI API key (OpenAI provider) - ANTHROPIC_API_KEY: Anthropic API key (Anthropic provider) - GROQ_API_KEY: Groq API key (Groq provider) """ # Provider options: gemini, openai, anthropic, groq, ollama, etc. # Reads from LLM_PROVIDER env var, falls back to "gemini" provider: str = field(default_factory=lambda: os.getenv("LLM_PROVIDER", "gemini")) # Model identifier - reads from LLM_MODEL env var, falls back to "gemini-2.5-flash-lite" model: str = field(default_factory=lambda: os.getenv("LLM_MODEL", "gemini-2.5-flash-lite")) # API key - tries GOOGLE_API_KEY first (Gemini), then GENAI_API_KEY as fallback # Use None as default and fetch dynamically to support HuggingFace Spaces api_key: Optional[str] = None temperature: float = 0.2 max_tokens: int = 2000 # Rate limiting rate_limit: RateLimitConfig = field(default_factory=RateLimitConfig) enable_rate_limiting: bool = True # Set to False to disable # Multi-model fallback enable_fallback: bool = True # Enable automatic model switching on rate limit fallback_on_error: bool = True # Also fallback on API errors def __post_init__(self): """Initialize API key from environment if not set.""" if self.api_key is None: # Try GOOGLE_API_KEY first, then GENAI_API_KEY self.api_key = os.getenv("GOOGLE_API_KEY") or os.getenv("GENAI_API_KEY") def get_api_key(self) -> Optional[str]: """Get current API key, checking environment on each call for HF Spaces.""" # Always check environment first to support dynamic Secrets in HF Spaces return os.getenv("GOOGLE_API_KEY") or os.getenv("GENAI_API_KEY") or self.api_key @property def model_string(self) -> str: """Get full model string for API calls.""" if self.provider in ["gemini"]: return self.model else: # LiteLLM format: provider/model return f"{self.provider}/{self.model}" @dataclass class DatabaseConfig: """Database/storage configuration.""" db_path: Path = field(default_factory=lambda: PROJECT_ROOT / "database" / "data") log_path: Path = field(default_factory=lambda: PROJECT_ROOT / "database" / "logs") memory_path: Path = field(default_factory=lambda: PROJECT_ROOT / "database" / "memory") def __post_init__(self): # Ensure directories exist for path in [self.db_path, self.log_path, self.memory_path]: path.mkdir(parents=True, exist_ok=True) @dataclass class CostTrackingConfig: """Cost and usage tracking configuration.""" enabled: bool = True track_requests: bool = True track_tokens: bool = True track_time: bool = True 
@dataclass
class CostTrackingConfig:
    """Cost and usage tracking configuration."""
    enabled: bool = True
    track_requests: bool = True
    track_tokens: bool = True
    track_time: bool = True

    # Usage counters (reset daily in production)
    total_requests: int = 0
    total_tokens: int = 0
    total_time_ms: float = 0.0

    # Per-model tracking
    model_usage: Dict[str, Dict] = field(default_factory=dict)

    def record_request(self, model: str, tokens: int, time_ms: float):
        """Record a request for cost tracking."""
        if not self.enabled:
            return
        self.total_requests += 1
        self.total_tokens += tokens
        self.total_time_ms += time_ms
        if model not in self.model_usage:
            self.model_usage[model] = {"requests": 0, "tokens": 0, "time_ms": 0.0}
        self.model_usage[model]["requests"] += 1
        self.model_usage[model]["tokens"] += tokens
        self.model_usage[model]["time_ms"] += time_ms

    def get_summary(self) -> Dict:
        """Get cost tracking summary."""
        return {
            "total_requests": self.total_requests,
            "total_tokens": self.total_tokens,
            "total_time_ms": self.total_time_ms,
            "avg_time_per_request": self.total_time_ms / max(1, self.total_requests),
            "model_breakdown": self.model_usage.copy()
        }

    def reset(self):
        """Reset all counters."""
        self.total_requests = 0
        self.total_tokens = 0
        self.total_time_ms = 0.0
        self.model_usage = {}


@dataclass
class EvaluationConfig:
    """Evaluation settings."""
    num_runs: int = 5               # Number of runs per problem for reliability
    timeout_seconds: float = 120.0  # Max time per problem
    save_results: bool = True

    # Cost tracking for evaluation
    cost_tracking: CostTrackingConfig = field(default_factory=CostTrackingConfig)


@dataclass
class SystemConfig:
    """Master configuration."""
    mcp: MCPConfig = field(default_factory=MCPConfig)
    llm: LLMConfig = field(default_factory=LLMConfig)
    database: DatabaseConfig = field(default_factory=DatabaseConfig)
    evaluation: EvaluationConfig = field(default_factory=EvaluationConfig)

    # System mode: "blackboard", "guided", or "naked"
    active_mode: str = "guided"

    # Debug settings
    verbose: bool = True
    log_level: str = "INFO"


# Global config instance
config = SystemConfig()


def set_mode(mode: str):
    """Switch between blackboard, guided, and naked modes."""
    if mode not in ("blackboard", "guided", "naked"):
        raise ValueError(f"Invalid mode: {mode}. Use 'blackboard', 'guided', or 'naked'")
    config.active_mode = mode


def get_mode() -> str:
    """Get current system mode."""
    return config.active_mode


def set_api_key(api_key: str):
    """Set the API key for LLM calls."""
    config.llm.api_key = api_key


def get_cost_summary() -> Dict:
    """Get the current cost tracking summary."""
    return config.evaluation.cost_tracking.get_summary()


def reset_cost_tracking():
    """Reset cost tracking counters."""
    config.evaluation.cost_tracking.reset()
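# -----------------------------------------------------------------------------
# Minimal smoke-test sketch (an assumption, not part of the original module):
# runs only when this file is executed directly (`python config.py`), touches
# no network, and exercises the module-level helpers defined above.
# -----------------------------------------------------------------------------
if __name__ == "__main__":
    set_mode("blackboard")
    print("Active mode:", get_mode())

    # Record a fake request against the default model and print the summary
    config.evaluation.cost_tracking.record_request(
        model=config.llm.model, tokens=1200, time_ms=850.0
    )
    print("Cost summary:", get_cost_summary())

    reset_cost_tracking()
    print("After reset:", get_cost_summary())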