# ROCmPort-AI — backend/tools/static_analyzer.py
# (provenance: uploaded by tazwarrrr; commit e7a1a69, "docs fix")
"""
static_analyzer.py — Pure-Python wavefront correctness scanner.
Runs BEFORE the LLM sees any code. Zero external dependencies. Typical run time < 5ms.
Detects the six most common categories of CUDA→AMD correctness hazards caused by the
NVIDIA warpSize=32 vs AMD wavefront=64 mismatch. Results are fed as structured pre-analysis
context into the LLM analyzer prompt, making the LLM's job more targeted and auditable.
"""
import re
import time
from typing import List
from ..models import RiskItem, StaticRiskReport
# ---------------------------------------------------------------------------
# Risk pattern definitions
# Each entry: (pattern_name, regex, risk_level, description, amd_fix_hint)
#
# scan() iterates this list in order, so findings in the report are grouped
# by pattern (in this order), not sorted by source line. risk_level must be
# one of "CRITICAL" / "HIGH" / "MEDIUM" — scan() tallies exactly those three.
# ---------------------------------------------------------------------------
_PATTERNS: List[tuple] = [
    # '< 32' guards in thread conditionals (tid / threadIdx.x / i).
    (
        "warp_size_hardcoded_32_conditional",
        re.compile(r'\btid\s*<\s*32\b|\bthreadIdx\.x\s*<\s*32\b|\bi\s*<\s*32\b', re.MULTILINE),
        "CRITICAL",
        "Hardcoded '<32' in thread conditional — assumes NVIDIA warpSize=32. "
        "On AMD wavefront=64 this silently skips lanes 32–63 in final reduction stages, "
        "producing incorrect results.",
        "Expand final stage: check 'tid < 64' first, then 'tid < 32'. "
        "See AMD wavefront reduction pattern in docs/JUDGE_MODE.md."
    ),
    # Hardcoded WARP_SIZE preprocessor constant.
    (
        "warp_size_define_32",
        re.compile(r'#\s*define\s+WARP_SIZE\s+32\b', re.MULTILINE),
        "CRITICAL",
        "#define WARP_SIZE 32 — this constant will produce wrong kernel geometry on AMD. "
        "Wavefront size is 64 on all GCN/CDNA architectures including MI300X.",
        "Change to #define WARP_SIZE 64 or use the runtime constant wavefrontSize "
        "from hipDeviceGetAttribute(HIP_DEVICE_ATTRIBUTE_WAVEFRONT_SIZE)."
    ),
    # CUDA *_sync shuffle intrinsics (the mask-taking variants).
    (
        "shfl_sync_warp_primitive",
        re.compile(r'\b__shfl_sync\b|\b__shfl_up_sync\b|\b__shfl_down_sync\b|\b__shfl_xor_sync\b', re.MULTILINE),
        "CRITICAL",
        "__shfl_sync family requires the 0xffffffff mask to be reinterpreted for 64-lane wavefronts. "
        "hipify replaces the function name but not the mask — lanes 32–63 are excluded.",
        "Replace with __shfl, __shfl_up, __shfl_down, __shfl_xor (no mask arg in HIP). "
        "Verify lane shuffle ranges cover the full 64-lane wavefront."
    ),
    # __ballot_sync called with an all-F hex literal as the first argument.
    (
        "ballot_sync_mask",
        re.compile(r'\b__ballot_sync\s*\(\s*0x[Ff]+\s*,', re.MULTILINE),
        "CRITICAL",
        "__ballot_sync(0xffffffff, ...) uses a 32-bit full mask. On AMD this is __ballot() "
        "with no mask argument — the 32-bit mask is semantically wrong for a 64-lane wavefront.",
        "Replace __ballot_sync(0xffffffff, cond) with __ballot(cond). "
        "The return type changes from uint32_t to uint64_t — update downstream bitmask logic."
    ),
    # Shuffle calls whose (second or later) argument is the literal offset 16.
    (
        "shfl_wavefront_offset_16",
        re.compile(r'\b__shfl(?:_down|_up|_xor)?\s*\([^;]*,\s*16\s*(?:,|\))', re.MULTILINE),
        "HIGH",
        "__shfl* with offset=16 often encodes a 32-lane warp reduction tail. "
        "On AMD wavefront=64 the reduction should include an offset=32 step first.",
        "Audit the shuffle reduction and add a wavefront-64 step, e.g. offset=32 "
        "before offset=16 where the algorithm reduces a full wavefront."
    ),
    # Any __activemask() call (truncation hazard when stored in 32 bits).
    (
        "activemask_warp",
        re.compile(r'\b__activemask\s*\(\s*\)', re.MULTILINE),
        "HIGH",
        "__activemask() returns a 32-bit value on NVIDIA. On AMD __activemask() "
        "or __ballot(1) returns a 64-bit value. Storing in uint32_t will truncate lanes 32–63.",
        "Declare the result as uint64_t. Audit all bitmask operations for 64-bit correctness."
    ),
    # Lane-id computation via modulo 32 or modulo warpSize.
    # NOTE(review): this also flags 'threadIdx.x % warpSize', which is the
    # portable HIP form (warpSize is 64 on AMD) — presumably a deliberately
    # conservative choice to force an audit; confirm with the pattern author.
    (
        "threadidx_modulo_warpsize",
        re.compile(r'threadIdx\.x\s*%\s*(?:32|warpSize)\b', re.MULTILINE),
        "HIGH",
        "threadIdx.x % 32 assumes 32-lane warps. On AMD wavefront=64 the lane ID "
        "within a wavefront requires modulo 64.",
        "Use threadIdx.x % 64 or threadIdx.x & 63 for the lane ID within a wavefront."
    ),
    # for-loops whose condition stops the stride at s > 32.
    (
        "reduction_loop_stops_at_32",
        re.compile(r'for\s*\([^)]*\bs\s*>\s*32\b', re.MULTILINE),
        "HIGH",
        "Reduction loop terminates at s>32 before manually unrolling the final 32 lanes. "
        "On AMD the loop should terminate at s>64 to correctly handle the 64-lane warp tail.",
        "Change loop bound from s>32 to s>64. Expand the manual unroll below the loop "
        "to cover tid<64 before the tid<32 block."
    ),
    # 'asm volatile(' — inline PTX assembly blocks.
    (
        "inline_ptx_block",
        re.compile(r'asm\s+volatile\s*\(', re.MULTILINE),
        "CRITICAL",
        "Inline PTX assembly is NVIDIA-specific ISA. hipify cannot translate PTX semantics. "
        "The kernel may compile under hipcc but will have undefined or incorrect behaviour.",
        "Replace inline PTX with portable HIP intrinsics or CDNA ISA equivalents. "
        "Common cases: lane_id → __lane_id(), __clz → __clz() (same name in HIP)."
    ),
    # Direct #include of the CUDA runtime headers.
    (
        "cuda_runtime_include",
        re.compile(r'#\s*include\s*[<\"]cuda_runtime(?:_api)?\.h[>\"]', re.MULTILINE),
        "MEDIUM",
        "cuda_runtime.h / cuda_runtime_api.h must be replaced with hip/hip_runtime.h. "
        "hipify handles this mechanically but the check confirms it was applied.",
        "Replace with #include <hip/hip_runtime.h>. "
        "hipify-clang does this automatically in its first pass."
    ),
    # CUB / Thrust / cuDNN header includes or namespace-qualified uses.
    (
        "cuda_library_dependency",
        re.compile(r'#\s*include\s*[<"][^>"]*(?:cub|thrust|cudnn)[^>"]*[>"]|\b(?:cub|thrust|cudnn)::', re.MULTILINE),
        "HIGH",
        "CUDA library dependency detected. hipify can rename some CUB/Thrust/cuDNN symbols, "
        "but API coverage and performance behavior are not guaranteed to match rocPRIM/hipCUB/MIOpen.",
        "Manually review the translated library call, compare against rocPRIM/hipCUB/MIOpen, "
        "and add correctness/performance tests for the specific primitive."
    ),
    # Fixed-size one-dimensional __shared__ array declarations.
    (
        "shared_memory_no_padding",
        re.compile(r'__shared__\s+\w+\s+\w+\s*\[\s*\d+\s*\]', re.MULTILINE),
        "MEDIUM",
        "Fixed-size shared memory array detected without padding. AMD LDS has 32 banks of 4B. "
        "Arrays whose inner dimension is a power-of-2 may cause systematic bank conflicts.",
        "Add +1 padding to the inner dimension, e.g., __shared__ float tile[32][33]. "
        "This staggers accesses across banks and eliminates the conflict."
    ),
]
def _find_line_number(code: str, match_start: int) -> int:
"""Convert a character offset into a 1-indexed line number."""
return code[:match_start].count('\n') + 1
def scan(cuda_code: str) -> StaticRiskReport:
    """
    Scan CUDA source for AMD compatibility hazards.

    Runs every compiled regex in _PATTERNS over *cuda_code* and records one
    RiskItem per hit, so findings come out grouped by pattern in list order.
    Returns a StaticRiskReport with the findings, per-severity counts, and
    the wall-clock scan duration (milliseconds) for transparency.
    """
    started = time.perf_counter()
    findings: List[RiskItem] = []
    for name, compiled, severity, issue, fix_hint in _PATTERNS:
        findings.extend(
            RiskItem(
                line=_find_line_number(cuda_code, hit.start()),
                pattern=name,
                risk_level=severity,
                description=issue,
                amd_fix_hint=fix_hint,
            )
            for hit in compiled.finditer(cuda_code)
        )
    duration_ms = (time.perf_counter() - started) * 1000.0
    # Tally severities with a single pass over the collected risk levels.
    levels = [finding.risk_level for finding in findings]
    return StaticRiskReport(
        items=findings,
        critical_count=levels.count("CRITICAL"),
        high_count=levels.count("HIGH"),
        medium_count=levels.count("MEDIUM"),
        scan_duration_ms=round(duration_ms, 3),
    )
def format_for_llm_prompt(report: StaticRiskReport) -> str:
    """
    Render the static report as a compact context block for LLM prompts.

    Emits a severity-count header, two framing instructions, then one
    entry per finding (location, issue, fix hint) separated by blank
    lines. A report with no findings collapses to a single sentence to
    keep token usage minimal.
    """
    if not report.items:
        return "Static pre-scan: No known AMD compatibility hazards detected."
    header = (
        f"=== STATIC PRE-SCAN ({report.critical_count} CRITICAL, "
        f"{report.high_count} HIGH, {report.medium_count} MEDIUM) ==="
    )
    rendered = [
        header,
        "The following hazards were detected by deterministic pattern matching BEFORE LLM analysis.",
        "Confirm and expand on these findings — do NOT contradict them without strong evidence.",
        "",
    ]
    for finding in report.items:
        # line=0 (or falsy) means the scanner could not locate the hazard.
        where = f"line {finding.line}" if finding.line else "location unknown"
        rendered.extend([
            f"[{finding.risk_level}] {finding.pattern} @ {where}",
            f" Issue: {finding.description}",
            f" Fix: {finding.amd_fix_hint}",
            "",
        ])
    return "\n".join(rendered)