| """ |
| ML utilities for UVM testbench generation. |
| Lightweight implementation with optional numpy/scikit-learn acceleration. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import hashlib |
| import json |
| import math |
| from collections import Counter, defaultdict |
| from dataclasses import dataclass, field |
| from typing import Any, Dict, List, Optional, Tuple |
|
|
| |
| try: |
| import numpy as np |
|
|
| HAS_NUMPY = True |
| except ImportError: |
| HAS_NUMPY = False |
|
|
| try: |
| from sklearn.feature_extraction.text import TfidfVectorizer |
| from sklearn.metrics.pairwise import cosine_similarity as skl_cosine_similarity |
|
|
| HAS_SKLEARN = True |
| except ImportError: |
| HAS_SKLEARN = False |
|
|
|
|
@dataclass
class RichFeatureVector:
    """Rich feature vector for ML-based similarity and generation.

    Bundles scalar counts, per-signal and per-register metadata, and the
    interface/design names of one design, and offers dict, numerical, text
    and fingerprint views of that data.
    """

    # Scalar summary features.
    interface_count: int = 0
    total_signals: int = 0
    register_count: int = 0
    total_fields: int = 0
    complexity_score: float = 0.0
    protocol_type: Optional[str] = None

    # Signal metadata; directions and widths are looked up by signal name.
    signal_names: List[str] = field(default_factory=list)
    signal_directions: Dict[str, str] = field(default_factory=dict)
    signal_widths: Dict[str, int] = field(default_factory=dict)

    # Register metadata; access modes and field lists are looked up by
    # register name.
    register_names: List[str] = field(default_factory=list)
    register_addresses: Dict[str, str] = field(default_factory=dict)
    register_fields: Dict[str, List[str]] = field(default_factory=dict)
    register_access: Dict[str, str] = field(default_factory=dict)

    interface_names: List[str] = field(default_factory=list)

    design_name: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Return every attribute in a plain dict keyed by attribute name.

        The container values are the instance's own objects, not copies.
        """
        return {
            "interface_count": self.interface_count,
            "total_signals": self.total_signals,
            "register_count": self.register_count,
            "total_fields": self.total_fields,
            "complexity_score": self.complexity_score,
            "protocol_type": self.protocol_type,
            "signal_names": self.signal_names,
            "signal_directions": self.signal_directions,
            "signal_widths": self.signal_widths,
            "register_names": self.register_names,
            "register_addresses": self.register_addresses,
            "register_fields": self.register_fields,
            "register_access": self.register_access,
            "interface_names": self.interface_names,
            "design_name": self.design_name,
        }

    def to_numerical(self) -> List[float]:
        """Convert to numerical vector for similarity computation.

        Returns:
            Nine values: the four counts and the complexity score, followed
            by hashes of the protocol, the sorted signal names, the sorted
            register names and the sorted interface names.
        """
        vec = [
            float(self.interface_count),
            float(self.total_signals),
            float(self.register_count),
            float(self.total_fields),
            self.complexity_score,
        ]

        # Names are sorted before hashing so list order does not matter.
        hashes = [
            hash_str(self.protocol_type or "none"),
            hash_str(",".join(sorted(self.signal_names))),
            hash_str(",".join(sorted(self.register_names))),
            hash_str(",".join(sorted(self.interface_names))),
        ]
        # hash_str yields a 32-bit value, so this scales it into [0, 1).
        vec.extend([h / (2**32) for h in hashes])

        return vec

    def to_text_repr(self) -> str:
        """Convert to a text representation for TF-IDF encoding.

        Returns:
            Space-separated ``kind:value`` tokens for the protocol, design,
            each signal (with direction and width), each register (with
            access mode and its fields) and each interface, in list order.
        """
        parts = [
            f"protocol:{self.protocol_type or 'generic'}",
            f"design:{self.design_name}",
        ]

        for name in self.signal_names:
            direction = self.signal_directions.get(name, "unknown")
            width = self.signal_widths.get(name, 1)
            parts.append(f"signal:{name}:{direction}:{width}")

        for name in self.register_names:
            access = self.register_access.get(name, "rw")
            parts.append(f"reg:{name}:{access}")
            parts.extend(
                f"field:{name}.{field_name}"
                for field_name in self.register_fields.get(name, [])
            )

        parts.extend(f"interface:{name}" for name in self.interface_names)

        return " ".join(parts)

    def fingerprint(self) -> str:
        """Generate a stable fingerprint for this spec.

        Returns:
            The first 16 hex characters of the SHA-256 digest of
            ``to_text_repr()``; only data present in that text contributes.
        """
        text = self.to_text_repr()
        return hashlib.sha256(text.encode("utf-8")).hexdigest()[:16]
|
|
|
|
def hash_str(s: str) -> int:
    """Hash a string to a 32-bit integer.

    Args:
        s: Text to hash; it is encoded as UTF-8 first.

    Returns:
        The first eight hex digits of the MD5 digest as an int, i.e. a
        value in ``[0, 2**32)``.
    """
    data = s.encode("utf-8")
    try:
        # MD5 only serves as a stable mixing function here, not for
        # security; saying so lets FIPS-restricted builds permit it.
        digest = hashlib.md5(data, usedforsecurity=False)
    except TypeError:
        # Python < 3.9 does not accept the keyword.
        digest = hashlib.md5(data)
    return int(digest.hexdigest()[:8], 16)
|
|
|
|
def cosine_similarity_py(v1: List[float], v2: List[float]) -> float:
    """Pure Python cosine similarity.

    Args:
        v1: First vector.
        v2: Second vector.

    Returns:
        The cosine of the angle between the vectors, limited to
        ``[-1.0, 1.0]``. Returns 0.0 when the lengths differ or when either
        vector has zero norm.
    """
    if len(v1) != len(v2):
        return 0.0

    dot = sum(a * b for a, b in zip(v1, v2))
    norm1 = math.sqrt(sum(a * a for a in v1))
    norm2 = math.sqrt(sum(b * b for b in v2))

    if norm1 == 0 or norm2 == 0:
        return 0.0

    similarity = dot / (norm1 * norm2)
    # Rounding can land a hair outside the valid range; explicit comparisons
    # (rather than min/max) leave a NaN result untouched.
    if similarity > 1.0:
        return 1.0
    if similarity < -1.0:
        return -1.0
    return similarity
|
|
|
|
def jaccard_similarity(set1: set, set2: set) -> float:
    """Jaccard similarity between two sets.

    Args:
        set1: First set.
        set2: Second set.

    Returns:
        ``|set1 & set2| / |set1 | set2|``, or 1.0 when both sets are empty.
    """
    if not set1 and not set2:
        return 1.0
    # At least one set is non-empty here, so the union cannot be empty.
    return len(set1 & set2) / len(set1 | set2)
|
|
|
|
def weighted_signal_similarity(fv1: RichFeatureVector, fv2: RichFeatureVector) -> float:
    """Signal-based similarity with direction/width awareness.

    Each shared signal name earns 0.4 for a matching direction (0.2
    otherwise) plus 0.4 for a matching width (0.2 otherwise; a missing width
    counts as 1). The per-signal average is blended 70/30 with the share of
    the larger signal set that is common to both.

    Args:
        fv1: First feature vector.
        fv2: Second feature vector.

    Returns:
        0.0 when no signal name is shared, otherwise the blended score.
    """
    signals1 = set(fv1.signal_names)
    signals2 = set(fv2.signal_names)

    # An empty side also yields an empty intersection, so one check suffices.
    common = signals1 & signals2
    if not common:
        return 0.0

    score = 0.0
    for sig in common:
        same_dir = fv1.signal_directions.get(sig) == fv2.signal_directions.get(sig)
        same_width = fv1.signal_widths.get(sig, 1) == fv2.signal_widths.get(sig, 1)
        score += 0.4 if same_dir else 0.2
        score += 0.4 if same_width else 0.2

    # NOTE(review): a signal can earn at most 0.8 but is normalised by 1.0,
    # so identical inputs score 0.86 rather than 1.0 - confirm this is
    # intended before changing it.
    base_score = score / len(common)
    coverage = len(common) / max(len(signals1), len(signals2))

    return base_score * 0.7 + coverage * 0.3
|
|
|
|
# Protocol families whose members count as related to one another.
_PROTOCOL_FAMILIES = (
    frozenset({"uart", "spi", "i2c"}),  # serial
    frozenset({"axi4lite", "apb", "wishbone"}),  # bus
)


def protocol_similarity(fv1: RichFeatureVector, fv2: RichFeatureVector) -> float:
    """Protocol-based similarity.

    Args:
        fv1: First feature vector.
        fv2: Second feature vector.

    Returns:
        1.0 for the same non-empty protocol, 0.5 when both protocols are
        ``None``, 0.7 when both belong to the same family (serial or bus),
        and 0.1 otherwise. The comparison is case-sensitive.
    """
    p1 = fv1.protocol_type
    p2 = fv2.protocol_type

    if p1 and p2 and p1 == p2:
        return 1.0
    if p1 is None and p2 is None:
        return 0.5

    if any(p1 in family and p2 in family for family in _PROTOCOL_FAMILIES):
        return 0.7

    return 0.1
|
|
|
|
def register_similarity(fv1: RichFeatureVector, fv2: RichFeatureVector) -> float:
    """Register structure similarity.

    Combines the Jaccard similarity of the register-name sets (weight 0.6)
    with the fraction of shared registers whose access modes agree (weight
    0.4; a missing access mode counts as ``"rw"``).

    Args:
        fv1: First feature vector.
        fv2: Second feature vector.

    Returns:
        0.5 when neither vector has registers, otherwise the weighted blend.
    """
    regs1 = set(fv1.register_names)
    regs2 = set(fv2.register_names)

    if not regs1 and not regs2:
        return 0.5

    jaccard = jaccard_similarity(regs1, regs2)

    common_regs = regs1 & regs2
    if common_regs:
        matching = sum(
            1
            for reg in common_regs
            if fv1.register_access.get(reg, "rw") == fv2.register_access.get(reg, "rw")
        )
        access_score = matching / len(common_regs)
    else:
        access_score = 0.0

    return jaccard * 0.6 + access_score * 0.4
|
|
|
|
def combined_similarity(fv1: RichFeatureVector, fv2: RichFeatureVector) -> float:
    """Combined similarity score across all dimensions.

    Blends protocol (0.35), signal (0.30), register (0.20) and numerical
    (0.15) similarity. The numerical part is the cosine similarity of the
    two ``to_numerical()`` vectors, computed with scikit-learn when both it
    and numpy were importable and with the pure-Python fallback otherwise.

    Args:
        fv1: First feature vector.
        fv2: Second feature vector.

    Returns:
        The weighted sum, limited to ``[0.0, 1.0]``.
    """
    proto_sim = protocol_similarity(fv1, fv2)
    signal_sim = weighted_signal_similarity(fv1, fv2)
    reg_sim = register_similarity(fv1, fv2)

    num1 = fv1.to_numerical()
    num2 = fv2.to_numerical()
    if HAS_SKLEARN and HAS_NUMPY:
        # scikit-learn expects 2-D input: one row per sample.
        row1 = np.array(num1).reshape(1, -1)
        row2 = np.array(num2).reshape(1, -1)
        num_sim = float(skl_cosine_similarity(row1, row2)[0][0])
    else:
        num_sim = cosine_similarity_py(num1, num2)

    weights = {
        "protocol": 0.35,
        "signal": 0.30,
        "register": 0.20,
        "numerical": 0.15,
    }

    total = (
        proto_sim * weights["protocol"]
        + signal_sim * weights["signal"]
        + reg_sim * weights["register"]
        + num_sim * weights["numerical"]
    )

    # The numerical cosine can be negative, so keep the result in range.
    return max(0.0, min(1.0, total))
|
|
|
|
@dataclass
class SearchResult:
    """Result from a similarity search."""

    # Identification of the matched entry.
    fingerprint: str
    design_name: str
    protocol_type: Optional[str]

    # Similarity score of the match and the matched spec as a dict.
    similarity: float
    spec_dict: Dict[str, Any]

    # Optional extras; every instance gets its own dict.
    generated_files: Dict[str, str] = field(default_factory=dict)
    rank: int = 0
|
|
|
|
class LightweightTFIDF:
    """Pure Python lightweight TF-IDF for text-based similarity.

    Attributes:
        idf: Inverse document frequency for each token seen by ``fit``.
        vocab: Column index for each token, assigned in sorted token order.
        doc_count: Number of documents passed to the last ``fit``.
    """

    def __init__(self) -> None:
        self.idf: Dict[str, float] = {}
        self.vocab: Dict[str, int] = {}
        self.doc_count = 0

    def fit(self, documents: List[str]) -> "LightweightTFIDF":
        """Fit on documents.

        Rebuilds both the vocabulary and the IDF table, so nothing from an
        earlier fit survives. IDF is ``log(N / (df + 1)) + 1``, with ``N``
        the document count and ``df`` the number of documents containing
        the token.

        Args:
            documents: Texts to learn from.

        Returns:
            This instance, to allow chaining.
        """
        self.doc_count = len(documents)

        # Count each token once per document.
        doc_freq: Counter = Counter()
        for doc in documents:
            doc_freq.update(set(self._tokenize(doc)))

        self.vocab = {tok: idx for idx, tok in enumerate(sorted(doc_freq))}
        self.idf = {
            tok: math.log(self.doc_count / (df + 1)) + 1
            for tok, df in doc_freq.items()
        }
        return self

    def transform(self, documents: List[str]) -> List[Dict[int, float]]:
        """Transform documents to TF-IDF vectors (sparse dict format).

        Args:
            documents: Texts to encode.

        Returns:
            One ``{column index: tf-idf weight}`` dict per document. Term
            frequency is the token count divided by the document's total
            token count; tokens unknown to the fitted model are skipped.
        """
        results = []
        for doc in documents:
            tokens = self._tokenize(doc)
            total = len(tokens) or 1

            vec: Dict[int, float] = {}
            for token, count in Counter(tokens).items():
                idx = self.vocab.get(token)
                weight = self.idf.get(token)
                if idx is None or weight is None:
                    continue
                vec[idx] = (count / total) * weight
            results.append(vec)
        return results

    def fit_transform(self, documents: List[str]) -> List[Dict[int, float]]:
        """Fit on ``documents`` and return their TF-IDF vectors."""
        return self.fit(documents).transform(documents)

    @staticmethod
    def _tokenize(text: str) -> List[str]:
        """Lower-case ``text`` and split it on whitespace, ``:`` and ``.``."""
        return text.lower().replace(":", " ").replace(".", " ").split()

    @staticmethod
    def cosine_sparse(v1: Dict[int, float], v2: Dict[int, float]) -> float:
        """Cosine similarity between two sparse vectors.

        Args:
            v1: First ``{index: weight}`` vector.
            v2: Second ``{index: weight}`` vector.

        Returns:
            The cosine similarity limited to ``[-1.0, 1.0]``, or 0.0 when
            the vectors share no index or either has zero norm.
        """
        common_keys = v1.keys() & v2.keys()
        if not common_keys:
            return 0.0

        dot = sum(v1[k] * v2[k] for k in common_keys)
        norm1 = math.sqrt(sum(v * v for v in v1.values()))
        norm2 = math.sqrt(sum(v * v for v in v2.values()))

        if norm1 == 0 or norm2 == 0:
            return 0.0

        similarity = dot / (norm1 * norm2)
        # Guard against rounding drift; comparisons leave NaN untouched.
        if similarity > 1.0:
            return 1.0
        if similarity < -1.0:
            return -1.0
        return similarity
|
|
|
|
class HybridVectorizer:
    """Hybrid vectorizer that prefers sklearn but falls back to pure Python.

    The backend is chosen at construction time from ``HAS_SKLEARN``.
    """

    def __init__(self) -> None:
        self._skl_vectorizer: Optional[Any] = None
        self._py_vectorizer: Optional[LightweightTFIDF] = None
        self._use_sklearn = HAS_SKLEARN

    def fit(self, documents: List[str]) -> "HybridVectorizer":
        """Fit the active backend on ``documents`` and return this instance.

        The sklearn backend is a word-level ``TfidfVectorizer`` using
        unigrams and bigrams, capped at 5000 features.
        """
        if self._use_sklearn:
            self._skl_vectorizer = TfidfVectorizer(
                analyzer="word",
                ngram_range=(1, 2),
                max_features=5000,
            )
            self._skl_vectorizer.fit(documents)
        else:
            self._py_vectorizer = LightweightTFIDF()
            self._py_vectorizer.fit(documents)
        return self

    def transform(self, documents: List[str]) -> Any:
        """Encode ``documents`` with the fitted backend.

        Returns:
            Whatever the active backend's ``transform`` returns, or an empty
            list when no backend has been fitted yet.
        """
        # Compare against None explicitly so the check does not depend on
        # the truthiness of the vectorizer object.
        if self._use_sklearn and self._skl_vectorizer is not None:
            return self._skl_vectorizer.transform(documents)
        if self._py_vectorizer is not None:
            return self._py_vectorizer.transform(documents)
        return []

    def fit_transform(self, documents: List[str]) -> Any:
        """Fit on ``documents`` and return their encoded vectors."""
        return self.fit(documents).transform(documents)

    def similarity_matrix(self, query_vec: Any, index_vecs: Any) -> List[float]:
        """Compute similarity between query and all index vectors.

        Args:
            query_vec: Encoded query as returned by ``transform``; only its
                first row is used.
            index_vecs: Encoded index documents as returned by ``transform``.

        Returns:
            One cosine similarity per index vector. The pure-Python path
            returns an empty list when either argument is empty.
        """
        if self._use_sklearn and HAS_NUMPY:
            sims = skl_cosine_similarity(query_vec, index_vecs)[0]
            return [float(s) for s in sims]

        if not query_vec or not index_vecs:
            return []
        # transform() returns a list of sparse dicts; a bare dict is also
        # accepted as the query.
        q = query_vec[0] if isinstance(query_vec, list) else query_vec
        return [LightweightTFIDF.cosine_sparse(q, iv) for iv in index_vecs]
|
|