Sai Kumar Taraka
feat: Add actual AI/ML capabilities with LLM, semantic embeddings, and reinforcement learning
9e8e9e2 | # src/config.py — Central configuration with Pydantic validation | |
| from __future__ import annotations | |
| import os | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Any | |
| from pydantic import BaseModel, Field | |
| import yaml | |
| # ── Data Models ───────────────────────────────────────────────────────────── | |
class SignalDef(BaseModel):
    """One named signal belonging to an interface.

    ``direction`` must be exactly ``input``, ``output`` or ``inout``.
    ``width`` is optional and defaults to 1.
    """

    name: str
    direction: str = Field(..., pattern=r"^(input|output|inout)$")
    width: Optional[int] = Field(default=1)
class InterfaceDef(BaseModel):
    """A named group of signals; validation requires at least one signal."""

    name: str
    signals: List[SignalDef] = Field(..., min_length=1)
class FieldDef(BaseModel):
    """A named register field with a ``bits`` string and optional description."""

    name: str
    bits: str
    description: Optional[str] = Field(default=None)
class RegisterDef(BaseModel):
    """A register entry of a design spec.

    Only ``name`` and ``address`` are required. ``fields`` defaults to an
    empty list, ``volatile`` to ``False`` and every other attribute to
    ``None``.
    """

    # Required identification.
    name: str
    address: str

    # Optional details.
    fields: List[FieldDef] = Field(default=[])
    description: Optional[str] = Field(default=None)
    access: Optional[str] = Field(default=None)
    size: Optional[int] = Field(default=None)
    reset_value: Optional[str] = Field(default=None)
    volatile: bool = Field(default=False)
class ClockResetDef(BaseModel):
    """Clock and reset names for a design.

    Defaults are ``clk`` and ``rst_n``. ``reset_active`` is constrained to
    0 or 1 and defaults to 0.
    """

    clock: str = Field(default="clk")
    reset: str = Field(default="rst_n")
    reset_active: int = Field(default=0, ge=0, le=1)
class DesignSpec(BaseModel):
    """Top-level design specification.

    ``design_name`` must be a non-empty identifier (letters, digits and
    underscores, not starting with a digit). At least one interface is
    required. ``protocol`` is either empty or one of the names listed in
    its pattern.
    """

    design_name: str = Field(..., min_length=1, pattern=r"^[a-zA-Z_][a-zA-Z0-9_]*$")
    clock_reset: ClockResetDef = Field(default=ClockResetDef())
    interfaces: List[InterfaceDef] = Field(..., min_length=1)
    registers: List[RegisterDef] = Field(default=[])
    protocol: str = Field(default="", pattern=r"^(uart|spi|i2c|axi4lite|apb|wishbone|)$")
| # ── Pipeline / Engine Config ──────────────────────────────────────────────── | |
class LoggingConfig(BaseModel):
    """Logging settings for the pipeline.

    ``level`` must be one of the standard ``logging`` level names and
    defaults to ``INFO``. ``file`` is optional and defaults to ``None``.
    ``format`` is a %-style ``logging`` format string.
    """

    # CRITICAL is a standard logging level name; the previous pattern rejected
    # it. Adding it only widens what is accepted.
    level: str = Field(default="INFO", pattern=r"^(DEBUG|INFO|WARNING|ERROR|CRITICAL)$")
    file: Optional[str] = None
    format: str = "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s"
class EvaluationConfig(BaseModel):
    """Evaluation settings: on/off switch, metric names and a 0..1 threshold."""

    enabled: bool = Field(default=True)
    metrics: List[str] = Field(
        default=["completeness", "syntax_validity", "coverage_readiness"]
    )
    threshold: float = Field(default=0.7, ge=0.0, le=1.0)
class TrackingConfig(BaseModel):
    """Experiment-tracking settings.

    Disabled by default. ``backend`` must be ``local`` or ``mlflow``; the
    experiment name and tracking URI are optional.
    """

    enabled: bool = Field(default=False)
    backend: str = Field(default="local", pattern=r"^(local|mlflow)$")
    experiment_name: Optional[str] = Field(default=None)
    tracking_uri: Optional[str] = Field(default=None)
class GenerationConfig(BaseModel):
    """Generation settings: template and output locations plus behaviour flags.

    ``iteration`` must be non-negative and defaults to 0.
    """

    # Locations.
    templates_dir: str = Field(default="src/generation/templates")
    output_dir: str = Field(default="output")

    # Behaviour flags.
    overwrite: bool = Field(default=False)
    strict_validation: bool = Field(default=True)

    iteration: int = Field(default=0, ge=0)
class AutoTrainConfig(BaseModel):
    """Auto-train loop settings; disabled by default.

    Every numeric field is range-checked by its ``Field`` constraints, and
    ``simulator`` must be one of ``stub``, ``icarus``, ``vcs`` or ``questa``.
    """

    enabled: bool = Field(default=False)

    # Iteration and coverage limits.
    max_iterations: int = Field(default=5, ge=1, le=50)
    coverage_target: float = Field(default=90.0, ge=0.0, le=100.0)
    coverage_gain_min: float = Field(
        default=2.0,
        ge=0.0,
        description="Min % gain per iteration to continue",
    )

    # Simulator selection and run parameters.
    simulator: str = Field(default="stub", pattern=r"^(stub|icarus|vcs|questa)$")
    sim_timeout: int = Field(default=300, ge=10)
    num_seeds: int = Field(
        default=3,
        ge=1,
        le=20,
        description="Number of regression seeds per iteration",
    )
    generate_regression_test: bool = Field(default=True)
class MLConfig(BaseModel):
    """Settings for AI/ML-augmented generation; disabled by default.

    ``model_type`` must be one of ``template``, ``ml``, ``hybrid``, ``llm``
    or ``semantic``. Numeric fields are range-checked by their ``Field``
    constraints.
    """

    # General switches and retrieval settings.
    enabled: bool = Field(default=False)
    model_type: str = Field(default="template", pattern=r"^(template|ml|hybrid|llm|semantic)$")
    similarity_threshold: float = Field(default=0.75, ge=0.0, le=1.0)
    auto_learn: bool = Field(default=True)
    index_path: Optional[str] = Field(default=None)
    top_k_retrieval: int = Field(default=3, ge=1, le=10)
    fallback_to_templates: bool = Field(default=True)

    # LLM-prefixed settings.
    use_llm: bool = Field(default=True)
    llm_model_name: Optional[str] = Field(default=None)
    llm_max_tokens: int = Field(default=1024, ge=64, le=4096)
    llm_temperature: float = Field(default=0.2, ge=0.0, le=1.0)
    llm_use_few_shot: bool = Field(default=True)

    # Semantic encoder settings.
    use_semantic_encoder: bool = Field(default=True)
    semantic_model_name: str = Field(default="microsoft/codebert-base")

    # Learning settings.
    use_learning: bool = Field(default=True)
    learning_storage_path: Optional[str] = Field(default=None)
    learning_rate: float = Field(default=0.1, ge=0.001, le=1.0)
    reinforcement_discount: float = Field(default=0.9, ge=0.0, le=1.0)
    exploration_epsilon: float = Field(default=0.05, ge=0.0, le=0.5)
class PipelineConfig(BaseModel):
    """Aggregate pipeline configuration; each section defaults to its own defaults."""

    generation: GenerationConfig = Field(default=GenerationConfig())
    evaluation: EvaluationConfig = Field(default=EvaluationConfig())
    tracking: TrackingConfig = Field(default=TrackingConfig())
    logging: LoggingConfig = Field(default=LoggingConfig())
    auto_train: AutoTrainConfig = Field(default=AutoTrainConfig())
    ml: MLConfig = Field(default=MLConfig())
| # ── Config Loader ─────────────────────────────────────────────────────────── | |
class ConfigLoader:
    """Hierarchical config loader with env override support.

    Load order for the pipeline configuration (later overrides earlier):
      1. Base defaults declared on ``PipelineConfig``
      2. The pipeline YAML file passed to :meth:`load`
         (e.g. configs/production.yaml), when it exists
      3. Environment variables (``UVMGEN_*`` prefix)

    Environment variables address a field as
    ``UVMGEN_<SECTION>__<FIELD>``, e.g. ``UVMGEN_LOGGING__LEVEL=DEBUG``.
    Names are matched case-insensitively and ``__`` separates nesting levels.
    """

    ENV_PREFIX = "UVMGEN_"

    def __init__(self, root: Optional[str] = None):
        """Remember the project root, defaulting to the current directory."""
        self.root = Path(root or os.getcwd())

    def load(self, spec_path: str, pipeline_path: Optional[str] = None) -> tuple[DesignSpec, PipelineConfig]:
        """Load the design spec and the pipeline configuration.

        Args:
            spec_path: Path to the design spec (``.core`` file or YAML).
            pipeline_path: Optional path to a pipeline YAML file.

        Returns:
            ``(design_spec, pipeline_config)`` with environment overrides
            already applied to the pipeline configuration.
        """
        design_spec = self._load_design_spec(spec_path)
        pipeline_cfg = self._load_pipeline(pipeline_path)
        self._apply_env_overrides(pipeline_cfg)
        return design_spec, pipeline_cfg

    def _load_design_spec(self, path: str) -> DesignSpec:
        """Parse, preprocess and validate the design spec at ``path``.

        ``.core`` files go through ``CoreParser``; anything else is read as
        YAML. The result is passed through ``SpecPreprocessor`` before being
        validated as a ``DesignSpec``.
        """
        # Imported at call time, as in the original code.
        from src.data.preprocessor import SpecPreprocessor
        from src.data.core_parser import CoreParser

        spec_file = Path(path)
        if spec_file.suffix.lower() == ".core":
            raw = CoreParser().parse(spec_file.read_text(encoding="utf-8"))
        else:
            raw = self._read_yaml(path)
        raw = SpecPreprocessor().preprocess(raw)
        return DesignSpec(**raw)

    def _load_pipeline(self, path: Optional[str] = None) -> PipelineConfig:
        """Return the default pipeline config, merged with ``path`` if present.

        A missing or unset ``path`` is not an error: the defaults are
        returned unchanged.
        """
        base = PipelineConfig()
        if path and Path(path).exists():
            base = self._deep_merge(base, self._read_yaml(path))
        return base

    def _apply_env_overrides(self, cfg: PipelineConfig) -> None:
        """Apply ``UVMGEN_*`` environment variables to ``cfg`` in place.

        Variables that do not resolve to a declared scalar or list field are
        ignored. Values are coerced to the type of the field's current value.

        Raises:
            ValueError: If a value cannot be converted to the field's type.
        """
        # NOTE(review): values are assigned with plain setattr, so Field
        # constraints are only enforced if the models enable
        # validate_assignment. Confirm whether re-validation is wanted.
        prefix = self.ENV_PREFIX
        for key, val in os.environ.items():
            if not key.startswith(prefix):
                continue
            *section_path, field_name = key[len(prefix):].lower().split("__")

            # Walk down through nested sub-configs only; anything else means
            # the variable does not address a config field.
            target: Any = cfg
            for part in section_path:
                target = getattr(target, part, None)
                if not isinstance(target, BaseModel):
                    target = None
                    break
            if target is None or field_name not in type(target).model_fields:
                continue

            current = getattr(target, field_name)
            if isinstance(current, BaseModel):
                # Never replace a whole sub-config with a raw string.
                continue
            try:
                new_value = self._coerce(val, type(current))
            except ValueError as err:
                raise ValueError(
                    f"Invalid value {val!r} for environment variable {key}: "
                    f"expected {type(current).__name__}"
                ) from err
            setattr(target, field_name, new_value)

    @staticmethod
    def _read_yaml(path: str) -> dict:
        """Read a YAML mapping from ``path``.

        Returns:
            The parsed mapping, or an empty dict for an empty document.

        Raises:
            ValueError: If the top-level YAML value is not a mapping.
        """
        with open(path, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f)
        if data is None:
            # safe_load yields None for an empty file.
            return {}
        if not isinstance(data, dict):
            raise ValueError(
                f"Expected a YAML mapping at the top level of {path}, "
                f"got {type(data).__name__}"
            )
        return data

    @staticmethod
    def _coerce(val: str, typ: type) -> Any:
        """Convert the string ``val`` to ``typ``.

        Booleans accept ``1``/``true``/``yes`` (case-insensitive) as true and
        treat everything else as false. Lists are parsed as comma-separated
        items with blanks dropped. Unknown types return ``val`` unchanged.

        Raises:
            ValueError: If ``val`` is not a valid ``int`` or ``float`` literal.
        """
        if typ is bool:
            return val.lower() in ("1", "true", "yes")
        if typ is int:
            return int(val)
        if typ is float:
            return float(val)
        if typ is list:
            return [item.strip() for item in val.split(",") if item.strip()]
        return val

    @staticmethod
    def _deep_merge(base: PipelineConfig, overrides: dict) -> PipelineConfig:
        """Return a new ``PipelineConfig`` with ``overrides`` merged over ``base``.

        Nested mappings are merged key by key at every depth; any other value
        replaces the base value. The merged data is re-validated by
        ``PipelineConfig``.
        """

        def merge(dst: dict, src: dict) -> dict:
            for k, v in src.items():
                if isinstance(dst.get(k), dict) and isinstance(v, dict):
                    merge(dst[k], v)
                else:
                    dst[k] = v
            return dst

        merged = merge(base.model_dump(mode="json"), overrides or {})
        return PipelineConfig(**merged)