AgentIC / src /agentic /core /cdc_analyzer.py
vxkyyy's picture
feat: add Supabase auth + 5 new pipeline stages
1d4d3e9
"""
CDC Analyzer β€” Phase 4 of the Spec Pipeline
=============================================
Receives a feasibility-checked hardware specification and identifies every
signal that crosses a clock domain boundary. For each crossing it assigns
an exact synchronization strategy and generates CDC sub-module specifications
that the RTL generator must instantiate.
A missed CDC is the hardest class of silicon bug to debug β€” it may pass
simulation and fail only on real silicon under specific timing conditions.
Pipeline Steps:
1. IDENTIFY CLOCK DOMAINS β€” Extract all distinct clock domains
2. IDENTIFY CROSSING SIGNALS β€” Enumerate every cross-domain signal
3. ASSIGN SYNCHRONIZATION STRATEGY β€” One strategy per crossing
4. GENERATE CDC SUBMODULES β€” Submodule specs for the RTL generator
5. OUTPUT β€” Enriched spec with CDC annotations
"""
import json
import logging
import re
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List, Optional, Set, Tuple
logger = logging.getLogger(__name__)
# ─── Synchronization Strategy Constants ──────────────────────────────
SYNC_SINGLE_BIT = "SINGLE_BIT_SYNC"
SYNC_PULSE = "PULSE_SYNC"
SYNC_ASYNC_FIFO = "ASYNC_FIFO"
SYNC_HANDSHAKE = "HANDSHAKE"
SYNC_RESET = "RESET_SYNC"
SYNC_QUASI_STATIC = "QUASI_STATIC"
SYNC_UNRESOLVED = "CDC_UNRESOLVED"
# Minimum FIFO depth for async FIFOs
MIN_FIFO_DEPTH = 4
DEFAULT_FIFO_DEPTH = 8
# Synchronizer depth thresholds
DEFAULT_SYNC_DEPTH = 2
HIGH_FREQ_RATIO_THRESHOLD = 4 # use 3-flop sync when ratio > 4:1
# ─── Signal Type Detection Patterns ─────────────────────────────────
_RESET_PATTERNS = re.compile(
r"\brst|reset|rstn|rst_n|arst|areset\b", re.IGNORECASE
)
_HANDSHAKE_PATTERNS = re.compile(
r"\breq|ack|valid|ready|grant|request|acknowledge\b", re.IGNORECASE
)
_BUS_WIDTH_PATTERN = re.compile(
r"\[(\d+)\s*:\s*(\d+)\]"
)
_DATA_BUS_PATTERNS = re.compile(
r"\bdata|wdata|rdata|din|dout|payload|fifo_data|wr_data|rd_data\b",
re.IGNORECASE,
)
_CLOCK_PATTERNS = re.compile(
r"\bclk|clock|pclk|hclk|fclk|aclk|sclk|mclk|bclk|clk_\w+\b",
re.IGNORECASE,
)
_ENABLE_FLAG_PATTERNS = re.compile(
r"\ben|enable|flag|strobe|pulse|irq|interrupt|trigger|start|done|busy|"
r"empty|full|overflow|underflow|valid|error\b",
re.IGNORECASE,
)
_CONFIG_PATTERNS = re.compile(
r"\bcfg|config|mode|ctrl|control|param|setting|threshold\b",
re.IGNORECASE,
)
# Clock source keywords for domain identification
_CLK_DIVIDER_PATTERNS = re.compile(
r"divid|prescal|div_by|divided|half_clk|clk_div", re.IGNORECASE
)
_CLK_GATE_PATTERNS = re.compile(
r"gate|gated|clock_gate|clk_gate|cg_|icg", re.IGNORECASE
)
# ─── Output Dataclasses ─────────────────────────────────────────────
@dataclass
class ClockDomain:
"""A distinct clock domain in the design."""
domain_name: str
source_clock_signal: str
nominal_frequency_mhz: float = 0.0
is_derived: bool = False # True if divided/gated from another
parent_domain: str = "" # Name of parent domain if derived
derivation_type: str = "" # "divided" | "gated" | ""
submodules: List[str] = field(default_factory=list)
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
@dataclass
class CrossingSignal:
"""A signal that crosses between two clock domains."""
signal_name: str
source_domain: str
destination_domain: str
signal_type: str # "single_bit_control" | "multi_bit_data" | "bus" |
# "handshake" | "reset"
bit_width: int = 1
direction: str = "unidirectional" # "unidirectional" | "bidirectional"
sync_strategy: str = ""
sync_details: Dict[str, Any] = field(default_factory=dict)
unresolved_reason: str = ""
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
@dataclass
class CDCSubmodule:
"""A synchronization submodule that must be instantiated in the RTL."""
module_name: str
strategy: str
ports: List[Dict[str, str]] = field(default_factory=list)
parameters: Dict[str, Any] = field(default_factory=dict)
behavior: str = ""
source_domain: str = ""
destination_domain: str = ""
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
@dataclass
class CDCResult:
"""Output of the CDCAnalyzer."""
cdc_status: str # "SINGLE_DOMAIN" | "MULTI_DOMAIN" | "UNRESOLVED"
clock_domains: List[ClockDomain] = field(default_factory=list)
crossing_signals: List[CrossingSignal] = field(default_factory=list)
cdc_submodules_added: List[CDCSubmodule] = field(default_factory=list)
cdc_warnings: List[str] = field(default_factory=list)
cdc_unresolved: List[str] = field(default_factory=list)
domain_count: int = 0
def to_dict(self) -> Dict[str, Any]:
return {
"cdc_status": self.cdc_status,
"clock_domains": [d.to_dict() for d in self.clock_domains],
"crossing_signals": [c.to_dict() for c in self.crossing_signals],
"cdc_submodules_added": [s.to_dict() for s in self.cdc_submodules_added],
"cdc_warnings": list(self.cdc_warnings),
"cdc_unresolved": list(self.cdc_unresolved),
"domain_count": self.domain_count,
}
def to_json(self) -> str:
return json.dumps(self.to_dict(), indent=2)
# ─── Main Class ──────────────────────────────────────────────────────
class CDCAnalyzer:
"""
Identifies every clock domain crossing in a hardware specification and
assigns synchronization strategies before RTL generation.
Input: HardwareSpec dict (+ optional hierarchy/feasibility data)
Output: CDCResult with domains, crossings, sync submodules, and warnings
"""
def analyze(
self,
hw_spec_dict: Dict[str, Any],
hierarchy_result_dict: Optional[Dict[str, Any]] = None,
) -> CDCResult:
"""
Run full CDC analysis on the spec.
Args:
hw_spec_dict: HardwareSpec.to_dict() output.
hierarchy_result_dict: Optional HierarchyResult.to_dict() for
expanded submodule clock information.
Returns:
CDCResult with full CDC analysis.
"""
# Step 1: Identify all clock domains
domains = self._identify_clock_domains(hw_spec_dict, hierarchy_result_dict)
if len(domains) <= 1:
return CDCResult(
cdc_status="SINGLE_DOMAIN",
clock_domains=domains,
domain_count=len(domains),
cdc_warnings=["No CDC analysis required β€” single clock domain."]
if domains else [],
)
# Build domain relationship map
domain_map = {d.domain_name: d for d in domains}
async_pairs = self._find_async_domain_pairs(domains)
# Step 2: Identify all crossing signals
crossings = self._identify_crossing_signals(
hw_spec_dict, hierarchy_result_dict, domains, async_pairs
)
if not crossings:
return CDCResult(
cdc_status="MULTI_DOMAIN",
clock_domains=domains,
domain_count=len(domains),
cdc_warnings=[
f"Design has {len(domains)} clock domains but no "
f"cross-domain signals detected. Verify domain isolation."
],
)
# Step 3: Assign synchronization strategy to each crossing
warnings: List[str] = []
unresolved: List[str] = []
for crossing in crossings:
self._assign_sync_strategy(crossing, domain_map, warnings, unresolved)
# Step 4: Generate CDC submodules
submodules = self._generate_cdc_submodules(crossings)
# Determine overall status
if unresolved:
status = "UNRESOLVED"
warnings.append(
f"CDC analysis has {len(unresolved)} unresolved crossing(s). "
f"RTL generation should not proceed until these are resolved."
)
else:
status = "MULTI_DOMAIN"
return CDCResult(
cdc_status=status,
clock_domains=domains,
crossing_signals=crossings,
cdc_submodules_added=submodules,
cdc_warnings=warnings,
cdc_unresolved=unresolved,
domain_count=len(domains),
)
# ── Step 1: Identify Clock Domains ───────────────────────────────
def _identify_clock_domains(
self,
hw_spec_dict: Dict[str, Any],
hierarchy_result_dict: Optional[Dict[str, Any]],
) -> List[ClockDomain]:
"""Extract every distinct clock domain from the spec."""
domains: List[ClockDomain] = []
seen_clocks: Set[str] = set()
target_freq = hw_spec_dict.get("target_frequency_mhz", 50) or 50
# 1a. Scan top-level ports for clock signals
top_ports = hw_spec_dict.get("ports", [])
for port in top_ports:
pname = port.get("name", "")
if _CLOCK_PATTERNS.search(pname) and pname.lower() not in seen_clocks:
seen_clocks.add(pname.lower())
domains.append(ClockDomain(
domain_name=self._clock_to_domain_name(pname),
source_clock_signal=pname,
nominal_frequency_mhz=target_freq,
))
# 1b. Scan submodule ports for additional clock signals
all_submodules = self._collect_all_submodules(
hw_spec_dict, hierarchy_result_dict
)
for sm in all_submodules:
sm_ports = sm.get("ports", [])
sm_name = sm.get("name", "")
sm_desc = f"{sm_name} {sm.get('description', '')}".lower()
for port in sm_ports:
pname = port.get("name", "")
if _CLOCK_PATTERNS.search(pname) and pname.lower() not in seen_clocks:
seen_clocks.add(pname.lower())
# Try to determine if it's a derived clock
is_derived = False
parent = ""
derivation = ""
freq = target_freq
if _CLK_DIVIDER_PATTERNS.search(pname) or \
_CLK_DIVIDER_PATTERNS.search(sm_desc):
is_derived = True
derivation = "divided"
# Try to extract division factor
div_match = re.search(r"div(?:ide)?(?:_by)?_?(\d+)",
pname + " " + sm_desc, re.IGNORECASE)
if div_match:
divisor = int(div_match.group(1))
freq = target_freq / max(divisor, 1)
else:
freq = target_freq / 2 # assume /2 if not specified
parent = domains[0].domain_name if domains else ""
elif _CLK_GATE_PATTERNS.search(pname) or \
_CLK_GATE_PATTERNS.search(sm_desc):
is_derived = True
derivation = "gated"
parent = domains[0].domain_name if domains else ""
domains.append(ClockDomain(
domain_name=self._clock_to_domain_name(pname),
source_clock_signal=pname,
nominal_frequency_mhz=freq,
is_derived=is_derived,
parent_domain=parent,
derivation_type=derivation,
))
# 1c. Check spec-level clock_domains field (if the spec generator
# explicitly listed them)
spec_clock_domains = hw_spec_dict.get("clock_domains", [])
if isinstance(spec_clock_domains, list):
for cd in spec_clock_domains:
if isinstance(cd, dict):
cd_name = cd.get("name", cd.get("domain", ""))
cd_clk = cd.get("clock", cd.get("signal", cd_name))
cd_freq = cd.get("frequency_mhz", cd.get("freq", target_freq))
elif isinstance(cd, str):
cd_name = cd
cd_clk = cd
cd_freq = target_freq
else:
continue
if cd_clk.lower() not in seen_clocks:
seen_clocks.add(cd_clk.lower())
domains.append(ClockDomain(
domain_name=self._clock_to_domain_name(cd_name),
source_clock_signal=cd_clk,
nominal_frequency_mhz=cd_freq,
))
# 1d. Check mandatory_fields_status for clock_domains info
mfs = hw_spec_dict.get("mandatory_fields_status", {})
cd_info = mfs.get("clock_domains", {})
if isinstance(cd_info, dict):
val = cd_info.get("value", cd_info.get("inferred_value", ""))
if isinstance(val, str) and val:
# Try parsing "single domain" or "dual clock" etc.
multi_match = re.search(r"(\d+)\s*(?:clock|domain)", val, re.IGNORECASE)
if multi_match and int(multi_match.group(1)) > 1 and len(domains) < 2:
# The spec says multiple clocks but we only found one in ports
# Add a placeholder second domain
domains.append(ClockDomain(
domain_name="domain_secondary",
source_clock_signal="clk_secondary",
nominal_frequency_mhz=target_freq,
))
# 1e. Assign submodules to domains
self._assign_submodules_to_domains(domains, all_submodules)
# If no clock was found at all, assume single domain
if not domains:
domains.append(ClockDomain(
domain_name="domain_clk",
source_clock_signal="clk",
nominal_frequency_mhz=target_freq,
))
return domains
def _clock_to_domain_name(self, clock_signal: str) -> str:
"""Convert a clock signal name to a domain name."""
name = clock_signal.lower().strip()
name = re.sub(r"[^a-z0-9_]", "_", name)
if not name.startswith("domain_"):
name = f"domain_{name}"
return name
def _assign_submodules_to_domains(
self,
domains: List[ClockDomain],
submodules: List[Dict[str, Any]],
) -> None:
"""Assign each submodule to its clock domain based on port connections."""
if not domains:
return
primary_domain = domains[0]
domain_clk_map: Dict[str, ClockDomain] = {}
for d in domains:
domain_clk_map[d.source_clock_signal.lower()] = d
for sm in submodules:
sm_name = sm.get("name", "")
sm_ports = sm.get("ports", [])
assigned = False
for port in sm_ports:
pname = port.get("name", "").lower()
if pname in domain_clk_map:
domain_clk_map[pname].submodules.append(sm_name)
assigned = True
break
if not assigned:
# Default: assign to primary domain
primary_domain.submodules.append(sm_name)
def _find_async_domain_pairs(
self, domains: List[ClockDomain]
) -> Set[Tuple[str, str]]:
"""Find all pairs of clock domains that are asynchronous to each other."""
async_pairs: Set[Tuple[str, str]] = set()
for i, d1 in enumerate(domains):
for d2 in domains[i + 1:]:
# Same source = synchronous
if d1.source_clock_signal == d2.source_clock_signal:
continue
# Divided from same parent = synchronous (but flag gated clocks)
if (d1.parent_domain and d1.parent_domain == d2.domain_name and
d1.derivation_type == "divided"):
continue
if (d2.parent_domain and d2.parent_domain == d1.domain_name and
d2.derivation_type == "divided"):
continue
# Gated clock from same source: potentially same domain but
# may have enable timing issues β€” flag it
if (d1.parent_domain == d2.domain_name and
d1.derivation_type == "gated"):
# Not truly async, but needs care
continue
if (d2.parent_domain == d1.domain_name and
d2.derivation_type == "gated"):
continue
# All other pairs are asynchronous
async_pairs.add((d1.domain_name, d2.domain_name))
async_pairs.add((d2.domain_name, d1.domain_name))
return async_pairs
# ── Step 2: Identify Crossing Signals ────────────────────────────
def _identify_crossing_signals(
self,
hw_spec_dict: Dict[str, Any],
hierarchy_result_dict: Optional[Dict[str, Any]],
domains: List[ClockDomain],
async_pairs: Set[Tuple[str, str]],
) -> List[CrossingSignal]:
"""Find all signals that cross between asynchronous clock domains."""
crossings: List[CrossingSignal] = []
seen: Set[str] = set()
if not async_pairs:
return crossings
# Build submodule β†’ domain map
sm_domain_map: Dict[str, str] = {}
for domain in domains:
for sm_name in domain.submodules:
sm_domain_map[sm_name] = domain.domain_name
all_submodules = self._collect_all_submodules(
hw_spec_dict, hierarchy_result_dict
)
# For each submodule, check if any of its ports connect to a
# submodule in a different (async) domain
sm_by_name: Dict[str, Dict[str, Any]] = {}
for sm in all_submodules:
sm_by_name[sm.get("name", "")] = sm
# Strategy: Look for signals that appear in ports of submodules
# belonging to different async domains. Also check top-level
# ports that fan out to multiple domains.
port_consumers: Dict[str, List[Tuple[str, str]]] = {}
# port_consumers[signal_name] = [(submodule, domain), ...]
for sm in all_submodules:
sm_name = sm.get("name", "")
sm_domain = sm_domain_map.get(sm_name, domains[0].domain_name)
for port in sm.get("ports", []):
pname = port.get("name", "")
if _CLOCK_PATTERNS.search(pname):
continue # Skip clock signals themselves
if pname not in port_consumers:
port_consumers[pname] = []
port_consumers[pname].append((sm_name, sm_domain))
# Find signals consumed by submodules in different async domains
for sig_name, consumers in port_consumers.items():
consumer_domains = {dom for _, dom in consumers}
if len(consumer_domains) < 2:
continue
# Check each pair of consuming domains
domain_list = sorted(consumer_domains)
for i, d1 in enumerate(domain_list):
for d2 in domain_list[i + 1:]:
if (d1, d2) in async_pairs:
key = f"{sig_name}:{d1}->{d2}"
if key in seen:
continue
seen.add(key)
seen.add(f"{sig_name}:{d2}->{d1}")
# Determine signal type and width
sig_type, bit_width = self._classify_signal(
sig_name, port_consumers[sig_name], sm_by_name
)
crossings.append(CrossingSignal(
signal_name=sig_name,
source_domain=d1,
destination_domain=d2,
signal_type=sig_type,
bit_width=bit_width,
direction="unidirectional",
))
# Also check for reset signals crossing domains
self._check_reset_crossings(
domains, async_pairs, hw_spec_dict, crossings, seen
)
# Check for inter-domain handshake pairs
self._check_handshake_crossings(
domains, async_pairs, all_submodules, sm_domain_map, crossings, seen
)
return crossings
def _classify_signal(
self,
signal_name: str,
consumers: List[Tuple[str, str]],
sm_by_name: Dict[str, Dict[str, Any]],
) -> Tuple[str, int]:
"""Classify a signal as single_bit_control, multi_bit_data, bus, handshake, or reset."""
name_lower = signal_name.lower()
# Reset?
if _RESET_PATTERNS.search(name_lower):
return "reset", 1
# Check port data type for width
bit_width = 1
for sm_name, _ in consumers:
sm = sm_by_name.get(sm_name, {})
for port in sm.get("ports", []):
if port.get("name", "") == signal_name:
dtype = port.get("data_type", "")
width_match = _BUS_WIDTH_PATTERN.search(dtype)
if width_match:
hi = int(width_match.group(1))
lo = int(width_match.group(2))
bit_width = abs(hi - lo) + 1
break
if bit_width > 1:
break
# Handshake?
if _HANDSHAKE_PATTERNS.search(name_lower):
if bit_width == 1:
return "handshake", 1
return "handshake", bit_width
# Multi-bit data or bus?
if bit_width > 1:
if _DATA_BUS_PATTERNS.search(name_lower):
return "multi_bit_data", bit_width
return "bus", bit_width
# Single-bit control/flag/enable
if _ENABLE_FLAG_PATTERNS.search(name_lower):
return "single_bit_control", 1
# Default: single-bit control
return "single_bit_control", 1
def _check_reset_crossings(
self,
domains: List[ClockDomain],
async_pairs: Set[Tuple[str, str]],
hw_spec_dict: Dict[str, Any],
crossings: List[CrossingSignal],
seen: Set[str],
) -> None:
"""Ensure every reset signal crossing a domain boundary is captured."""
top_ports = hw_spec_dict.get("ports", [])
reset_signals = [
p.get("name", "") for p in top_ports
if _RESET_PATTERNS.search(p.get("name", ""))
]
if not reset_signals:
return
# Every reset that enters the chip must be synchronized to each
# async domain it serves
primary_domain = domains[0].domain_name if domains else ""
for rst_sig in reset_signals:
for domain in domains:
if domain.domain_name == primary_domain:
continue
if (primary_domain, domain.domain_name) not in async_pairs:
continue
key = f"{rst_sig}:{primary_domain}->{domain.domain_name}"
if key in seen:
continue
seen.add(key)
crossings.append(CrossingSignal(
signal_name=rst_sig,
source_domain=primary_domain,
destination_domain=domain.domain_name,
signal_type="reset",
bit_width=1,
direction="unidirectional",
))
def _check_handshake_crossings(
self,
domains: List[ClockDomain],
async_pairs: Set[Tuple[str, str]],
submodules: List[Dict[str, Any]],
sm_domain_map: Dict[str, str],
crossings: List[CrossingSignal],
seen: Set[str],
) -> None:
"""Detect req/ack handshake pairs that span domains."""
# Look for paired req/ack or valid/ready signals
req_ack_pairs = []
for sm in submodules:
sm_name = sm.get("name", "")
ports = sm.get("ports", [])
port_names = [p.get("name", "") for p in ports]
for pn in port_names:
pn_lower = pn.lower()
# Find req→ack pairs
if "req" in pn_lower:
ack_name = pn_lower.replace("req", "ack")
for pn2 in port_names:
if pn2.lower() == ack_name:
req_ack_pairs.append((pn, pn2, sm_name))
# Find valid→ready pairs
if "valid" in pn_lower:
ready_name = pn_lower.replace("valid", "ready")
for pn2 in port_names:
if pn2.lower() == ready_name:
req_ack_pairs.append((pn, pn2, sm_name))
# Mark bidirectional if req/ack span domains
for req_sig, ack_sig, sm_name in req_ack_pairs:
sm_dom = sm_domain_map.get(sm_name, "")
for domain in domains:
if domain.domain_name == sm_dom:
continue
if (sm_dom, domain.domain_name) in async_pairs:
key = f"{req_sig}:{sm_dom}->{domain.domain_name}"
if key not in seen:
seen.add(key)
crossings.append(CrossingSignal(
signal_name=f"{req_sig}/{ack_sig}",
source_domain=sm_dom,
destination_domain=domain.domain_name,
signal_type="handshake",
bit_width=1,
direction="bidirectional",
))
# ── Step 3: Assign Synchronization Strategy ──────────────────────
def _assign_sync_strategy(
self,
crossing: CrossingSignal,
domain_map: Dict[str, ClockDomain],
warnings: List[str],
unresolved: List[str],
) -> None:
"""Assign the correct synchronization strategy to a crossing signal."""
src_domain = domain_map.get(crossing.source_domain)
dst_domain = domain_map.get(crossing.destination_domain)
# Calculate frequency ratio for synchronizer depth decisions
src_freq = src_domain.nominal_frequency_mhz if src_domain else 50
dst_freq = dst_domain.nominal_frequency_mhz if dst_domain else 50
freq_ratio = max(src_freq, dst_freq) / max(min(src_freq, dst_freq), 1)
sync_depth = 3 if freq_ratio > HIGH_FREQ_RATIO_THRESHOLD else 2
sig_type = crossing.signal_type
bit_width = crossing.bit_width
sig_name = crossing.signal_name
# ── RESET signals: ALWAYS use RESET_SYNC ────────────────────
if sig_type == "reset":
crossing.sync_strategy = SYNC_RESET
crossing.sync_details = {
"origin_domain": crossing.source_domain,
"target_domain": crossing.destination_domain,
"behavior": "Asynchronous assert, synchronous deassert",
"sync_depth": sync_depth,
}
return
# ── MULTI-BIT data/bus: NEVER use 2-flop sync ───────────────
if bit_width > 1 and sig_type in ("multi_bit_data", "bus"):
crossing.sync_strategy = SYNC_ASYNC_FIFO
fifo_depth = DEFAULT_FIFO_DEPTH
if freq_ratio > 4:
fifo_depth = 16 # deeper FIFO for large frequency ratios
ptr_width = self._gray_pointer_width(fifo_depth)
crossing.sync_details = {
"data_width": bit_width,
"fifo_depth": fifo_depth,
"gray_pointer_width": ptr_width,
"behavior": (
"Gray-coded read/write pointers, pointer sync via "
f"{sync_depth}-flop synchronizers, no combinational "
"paths between clock domains"
),
}
warnings.append(
f"CDC: Signal '{sig_name}' ({bit_width}-bit) crosses from "
f"{crossing.source_domain} to {crossing.destination_domain}. "
f"Assigned ASYNC_FIFO (depth={fifo_depth})."
)
return
# ── HANDSHAKE signals ────────────────────────────────────────
if sig_type == "handshake":
if bit_width > 1:
# Multi-bit handshake: use HANDSHAKE protocol
crossing.sync_strategy = SYNC_HANDSHAKE
crossing.sync_details = {
"req_signal": f"{sig_name}_req",
"ack_signal": f"{sig_name}_ack",
"data_width": bit_width,
"behavior": (
"4-phase handshake: req asserted with stable data, "
"ack confirms receipt, req deasserted, ack deasserted"
),
}
return
else:
# Single-bit handshake control: 2-flop sync is sufficient
# if it's a level signal (req/ack/valid/ready)
crossing.sync_strategy = SYNC_SINGLE_BIT
crossing.sync_details = {
"sync_depth": sync_depth,
"dont_touch": True,
"behavior": (
f"{sync_depth}-stage synchronizer, both flops have "
"dont_touch attribute for synthesis"
),
}
return
# ── Single-bit control/enable/flag ───────────────────────────
if bit_width == 1 and sig_type == "single_bit_control":
# Check if this might be a pulse shorter than dst clock period
is_likely_pulse = any(
kw in sig_name.lower()
for kw in ["pulse", "strobe", "trigger", "irq", "interrupt"]
)
if is_likely_pulse and src_freq > dst_freq:
# Source is faster: pulse may be shorter than dst period
crossing.sync_strategy = SYNC_PULSE
crossing.sync_details = {
"behavior": (
"Toggle flip-flop in source domain, "
f"{sync_depth}-flop synchronizer in destination domain, "
"edge detector to regenerate pulse"
),
"sync_depth": sync_depth,
}
warnings.append(
f"CDC: Signal '{sig_name}' is a likely pulse crossing from "
f"fast ({src_freq} MHz) to slow ({dst_freq} MHz) domain. "
f"Using PULSE_SYNC to avoid missing it."
)
return
else:
crossing.sync_strategy = SYNC_SINGLE_BIT
crossing.sync_details = {
"sync_depth": sync_depth,
"dont_touch": True,
"behavior": (
f"{sync_depth}-stage synchronizer, both flops have "
"dont_touch attribute for synthesis"
),
}
return
# ── Config/quasi-static signals ──────────────────────────────
if _CONFIG_PATTERNS.search(sig_name):
if bit_width == 1:
crossing.sync_strategy = SYNC_SINGLE_BIT
crossing.sync_details = {
"sync_depth": sync_depth,
"dont_touch": True,
"behavior": (
f"{sync_depth}-stage synchronizer with dont_touch"
),
}
warnings.append(
f"CDC: Config signal '{sig_name}' crosses domains. "
f"If it only changes during configuration/reset, consider "
f"QUASI_STATIC with documented guarantee."
)
return
else:
# Multi-bit config: use handshake since it's low bandwidth
crossing.sync_strategy = SYNC_HANDSHAKE
crossing.sync_details = {
"req_signal": f"{sig_name}_cfg_req",
"ack_signal": f"{sig_name}_cfg_ack",
"data_width": bit_width,
"behavior": (
"4-phase handshake for configuration register update"
),
}
warnings.append(
f"CDC: Multi-bit config signal '{sig_name}' ({bit_width}-bit) "
f"crosses domains. Using HANDSHAKE. If only set during reset, "
f"consider QUASI_STATIC."
)
return
# ── Fallback: unresolved ─────────────────────────────────────
crossing.sync_strategy = SYNC_UNRESOLVED
crossing.unresolved_reason = (
f"Cannot confidently classify signal '{sig_name}' "
f"(type={sig_type}, width={bit_width}) for automatic sync assignment."
)
unresolved.append(
f"CDC_UNRESOLVED: {sig_name} crossing from "
f"{crossing.source_domain} to {crossing.destination_domain}. "
f"Reason: {crossing.unresolved_reason}"
)
def _gray_pointer_width(self, fifo_depth: int) -> int:
"""Calculate Gray code pointer width for a given FIFO depth."""
import math
return max(2, int(math.ceil(math.log2(max(fifo_depth, 2)))) + 1)
# ── Step 4: Generate CDC Submodules ──────────────────────────────
def _generate_cdc_submodules(
self, crossings: List[CrossingSignal]
) -> List[CDCSubmodule]:
"""Generate synchronization submodule specs for the RTL generator."""
submodules: List[CDCSubmodule] = []
seen_modules: Set[str] = set()
for crossing in crossings:
if crossing.sync_strategy == SYNC_UNRESOLVED:
continue
strategy = crossing.sync_strategy
sig_name = self._sanitize_name(crossing.signal_name)
if strategy == SYNC_SINGLE_BIT:
mod_name = f"cdc_sync_{sig_name}"
if mod_name in seen_modules:
continue
seen_modules.add(mod_name)
depth = crossing.sync_details.get("sync_depth", 2)
submodules.append(CDCSubmodule(
module_name=mod_name,
strategy=strategy,
ports=[
{"name": "clk_dst", "direction": "input",
"data_type": "logic", "description": "Destination clock"},
{"name": "rst_n_dst", "direction": "input",
"data_type": "logic", "description": "Destination reset (active-low)"},
{"name": "sig_src", "direction": "input",
"data_type": "logic", "description": "Source domain signal"},
{"name": "sig_dst_synced", "direction": "output",
"data_type": "logic", "description": "Synchronized signal in destination domain"},
],
parameters={"SYNC_DEPTH": depth},
behavior=(
f"{depth}-stage synchronizer chain. All flops have "
"(* dont_touch = \"true\" *) attribute to prevent "
"synthesis optimization. Reset clears all stages."
),
source_domain=crossing.source_domain,
destination_domain=crossing.destination_domain,
))
elif strategy == SYNC_PULSE:
mod_name = f"cdc_pulse_sync_{sig_name}"
if mod_name in seen_modules:
continue
seen_modules.add(mod_name)
depth = crossing.sync_details.get("sync_depth", 2)
submodules.append(CDCSubmodule(
module_name=mod_name,
strategy=strategy,
ports=[
{"name": "clk_src", "direction": "input",
"data_type": "logic", "description": "Source clock"},
{"name": "rst_n_src", "direction": "input",
"data_type": "logic", "description": "Source reset (active-low)"},
{"name": "clk_dst", "direction": "input",
"data_type": "logic", "description": "Destination clock"},
{"name": "rst_n_dst", "direction": "input",
"data_type": "logic", "description": "Destination reset (active-low)"},
{"name": "pulse_src", "direction": "input",
"data_type": "logic", "description": "Single-cycle pulse in source domain"},
{"name": "pulse_dst", "direction": "output",
"data_type": "logic", "description": "Regenerated pulse in destination domain"},
],
parameters={"SYNC_DEPTH": depth},
behavior=(
"Toggle flip-flop captures pulse in source domain. "
f"{depth}-flop synchronizer transfers toggle to "
"destination domain. XOR edge detector regenerates "
"single-cycle pulse in destination domain."
),
source_domain=crossing.source_domain,
destination_domain=crossing.destination_domain,
))
elif strategy == SYNC_ASYNC_FIFO:
data_width = crossing.sync_details.get("data_width", 8)
fifo_depth = crossing.sync_details.get("fifo_depth", DEFAULT_FIFO_DEPTH)
ptr_width = crossing.sync_details.get("gray_pointer_width", 4)
mod_name = f"cdc_fifo_{sig_name}"
if mod_name in seen_modules:
continue
seen_modules.add(mod_name)
submodules.append(CDCSubmodule(
module_name=mod_name,
strategy=strategy,
ports=[
{"name": "wr_clk", "direction": "input",
"data_type": "logic", "description": "Write clock"},
{"name": "wr_rst_n", "direction": "input",
"data_type": "logic", "description": "Write reset (active-low)"},
{"name": "wr_en", "direction": "input",
"data_type": "logic", "description": "Write enable"},
{"name": "wr_data", "direction": "input",
"data_type": f"logic [{data_width - 1}:0]",
"description": "Write data"},
{"name": "wr_full", "direction": "output",
"data_type": "logic", "description": "FIFO full flag"},
{"name": "rd_clk", "direction": "input",
"data_type": "logic", "description": "Read clock"},
{"name": "rd_rst_n", "direction": "input",
"data_type": "logic", "description": "Read reset (active-low)"},
{"name": "rd_en", "direction": "input",
"data_type": "logic", "description": "Read enable"},
{"name": "rd_data", "direction": "output",
"data_type": f"logic [{data_width - 1}:0]",
"description": "Read data"},
{"name": "rd_empty", "direction": "output",
"data_type": "logic", "description": "FIFO empty flag"},
],
parameters={
"DATA_WIDTH": data_width,
"FIFO_DEPTH": fifo_depth,
"PTR_WIDTH": ptr_width,
},
behavior=(
f"Asynchronous FIFO with {fifo_depth}-deep buffer. "
f"Gray-coded {ptr_width}-bit read and write pointers. "
"Pointer synchronization via 2-flop synchronizers. "
"No combinational paths between write and read clock "
"domains. Full/empty generation from synchronized "
"Gray pointers."
),
source_domain=crossing.source_domain,
destination_domain=crossing.destination_domain,
))
elif strategy == SYNC_HANDSHAKE:
data_width = crossing.sync_details.get("data_width", 1)
req_sig = crossing.sync_details.get("req_signal", f"{sig_name}_req")
ack_sig = crossing.sync_details.get("ack_signal", f"{sig_name}_ack")
mod_name = f"cdc_handshake_{sig_name}"
if mod_name in seen_modules:
continue
seen_modules.add(mod_name)
ports = [
{"name": "clk_src", "direction": "input",
"data_type": "logic", "description": "Source clock"},
{"name": "rst_n_src", "direction": "input",
"data_type": "logic", "description": "Source reset"},
{"name": "clk_dst", "direction": "input",
"data_type": "logic", "description": "Destination clock"},
{"name": "rst_n_dst", "direction": "input",
"data_type": "logic", "description": "Destination reset"},
{"name": req_sig, "direction": "output",
"data_type": "logic", "description": "Request signal"},
{"name": ack_sig, "direction": "input",
"data_type": "logic", "description": "Acknowledge signal"},
]
if data_width > 1:
ports.extend([
{"name": "data_src", "direction": "input",
"data_type": f"logic [{data_width - 1}:0]",
"description": "Source data"},
{"name": "data_dst", "direction": "output",
"data_type": f"logic [{data_width - 1}:0]",
"description": "Destination data (valid when ack)"},
])
submodules.append(CDCSubmodule(
module_name=mod_name,
strategy=strategy,
ports=ports,
parameters={"DATA_WIDTH": data_width},
behavior=(
"4-phase handshake protocol: (1) source asserts req "
"with stable data, (2) destination synchronizes req "
"and asserts ack, (3) source deasserts req, "
"(4) destination deasserts ack. Data must remain "
"stable from req assert to ack assert."
),
source_domain=crossing.source_domain,
destination_domain=crossing.destination_domain,
))
elif strategy == SYNC_RESET:
mod_name = f"cdc_reset_sync_{sig_name}"
if mod_name in seen_modules:
continue
seen_modules.add(mod_name)
depth = crossing.sync_details.get("sync_depth", 2)
submodules.append(CDCSubmodule(
module_name=mod_name,
strategy=strategy,
ports=[
{"name": "clk_dst", "direction": "input",
"data_type": "logic", "description": "Destination clock"},
{"name": "rst_async_n", "direction": "input",
"data_type": "logic",
"description": "Asynchronous reset input (active-low)"},
{"name": "rst_sync_n", "direction": "output",
"data_type": "logic",
"description": "Synchronized reset output (active-low)"},
],
parameters={"SYNC_DEPTH": depth},
behavior=(
"Asynchronous assert, synchronous deassert reset "
f"synchronizer. {depth}-flop chain clocked by "
"destination clock. Reset assertion is immediate "
"(async), deassertion is synchronized to destination "
"clock to prevent metastability."
),
source_domain=crossing.source_domain,
destination_domain=crossing.destination_domain,
))
return submodules
# ── Utility Methods ──────────────────────────────────────────────
def _collect_all_submodules(
self,
hw_spec_dict: Dict[str, Any],
hierarchy_result_dict: Optional[Dict[str, Any]],
) -> List[Dict[str, Any]]:
"""Flatten all submodules from both spec and hierarchy result."""
all_subs: List[Dict[str, Any]] = []
# From spec
for sm in hw_spec_dict.get("submodules", []):
if isinstance(sm, dict):
all_subs.append(sm)
# From hierarchy result (may have nested specs)
if hierarchy_result_dict:
for sm in hierarchy_result_dict.get("submodules", []):
if isinstance(sm, dict):
all_subs.append(sm)
# Recurse into nested_spec
nested = sm.get("nested_spec")
if isinstance(nested, dict):
for nsm in nested.get("submodules", []):
if isinstance(nsm, dict):
all_subs.append(nsm)
return all_subs
def _sanitize_name(self, name: str) -> str:
"""Convert a signal name to a valid Verilog identifier fragment."""
# Remove slash-separated compound names
name = name.replace("/", "_")
name = re.sub(r"[^a-zA-Z0-9_]", "_", name)
name = re.sub(r"_+", "_", name).strip("_")
return name.lower()