| import asyncio |
| import json |
| from typing import AsyncGenerator |
|
|
| |
|
|
| from . import analyzer, optimizer, tester, translator |
| from ..models import ( |
| AgentEvent, |
| AgentStatus, |
| AnalyzerResult, |
| CostEstimate, |
| FinalReport, |
| OptimizerResult, |
| TesterResult, |
| TranslatorResult, |
| WorkloadType, |
| ) |
|
|
|
|
def calculate_cost_estimate(analyzer_result: AnalyzerResult) -> CostEstimate:
    """Derive a rough cost-impact estimate from the analyzer's complexity score.

    Args:
        analyzer_result: Analyzer output; only ``complexity_score`` is read.
            A falsy score (``None`` or ``0``) is treated as ``5``.

    Returns:
        A ``CostEstimate`` whose manual-porting window and complexity factor
        come from the score tier (<= 3 low, <= 7 medium, otherwise high) and
        whose savings text is expressed as 5x-10x the score in eng-days.
    """
    score = analyzer_result.complexity_score or 5

    # (inclusive upper bound, manual porting window, complexity label),
    # checked in ascending order; anything above the last bound is "High".
    tiers = (
        (3, "1-2 weeks", "Low"),
        (7, "3-6 weeks", "Medium"),
    )
    porting_window, tier_label = "6-10 weeks", "High"
    for upper_bound, window, label in tiers:
        if score <= upper_bound:
            porting_window, tier_label = window, label
            break

    # The savings wording is the same for every tier; only the numbers vary.
    savings_text = (
        f"~{score * 5}-{score * 10} eng-days × team rate (complexity {score}/10)"
    )

    return CostEstimate(
        manual_porting_weeks=porting_window,
        rocmport_minutes="Varies by kernel",
        estimated_savings=savings_text,
        complexity_factor=tier_label,
    )
|
|
|
|
def simplify_explanation(report: FinalReport) -> str:
    """Rewrite the report's AMD-advantage explanation in plainer wording.

    Args:
        report: Report whose ``amd_advantage_explanation`` text is simplified.
            No other attribute is read and the report is not modified.

    Returns:
        The explanation with each technical phrase below replaced by its
        plain-language counterpart. Matching is a case-sensitive substring
        replacement.
    """
    # Applied strictly in this order: later entries see the output of earlier
    # ones. NOTE: by the time "The optimization" is reached, every
    # "optimization" has already become "improvement", so that entry cannot
    # match; it is kept so the substitution list mirrors the original sequence.
    substitutions = (
        # Technical terms.
        ("5.3 TB/s memory bandwidth", "much faster memory access"),
        ("3.35 TB/s", "slower memory access"),
        ("memory-bound", "needs to move a lot of data"),
        ("compute-bound", "does a lot of calculations"),
        ("wavefront", "group of threads working together"),
        ("shared memory tiling", "shares data between threads efficiently"),
        ("coalescing", "accesses memory in order"),
        ("optimization", "improvement"),
        ("performance", "speed"),
        ("benchmark", "test"),
        ("iteration", "try"),
        # Sentence-level phrasing.
        ("This kernel is", "This code is"),
        ("The optimization", "The improvement"),
        ("achieves", "gets"),
        ("demonstrates", "shows"),
    )

    plain = report.amd_advantage_explanation
    for technical, everyday in substitutions:
        plain = plain.replace(technical, everyday)
    return plain
|
|
|
|
| |
| |
| |
def _assemble_final_report(
    analyzer_result: AnalyzerResult,
    translator_result: TranslatorResult,
    final_optimizer: OptimizerResult,
    tester_result_final: TesterResult,
) -> FinalReport:
    """Build the FinalReport that is serialized into the closing coordinator event."""
    amd_explanation = _build_amd_explanation(analyzer_result, tester_result_final)

    try:
        cost_estimate = calculate_cost_estimate(analyzer_result)
    except Exception:
        # Best-effort: a cost-estimate failure must not abort an otherwise
        # successful migration, so fall back to a fixed mid-range estimate.
        cost_estimate = CostEstimate(
            manual_porting_weeks="3-6 weeks",
            rocmport_minutes="Varies by kernel",
            estimated_savings="$20,000-$50,000",
            complexity_factor="Medium",
        )

    # Fields shared by the provisional and the final report; kept in one place
    # so the two constructions cannot drift apart.
    shared_fields = dict(
        migration_success=True,
        speedup=tester_result_final.speedup,
        bandwidth_utilized=tester_result_final.bandwidth_utilized,
        total_changes=translator_result.total_changes + len(final_optimizer.changes),
        bottleneck=tester_result_final.bottleneck,
        amd_advantage_explanation=amd_explanation,
        iterations=tester_result_final.iteration,
        hip_code=translator_result.hip_code,
        optimized_code=final_optimizer.optimized_code,
        verification=tester_result_final.verification,
        static_risk_report=analyzer_result.static_risk_report,
        data_source=tester_result_final.data_source or "simulated",
    )

    # simplify_explanation() takes a report object, so a provisional report
    # (without cost estimate / simplified text) is built just to feed it.
    simplified_explanation = simplify_explanation(FinalReport(**shared_fields))

    return FinalReport(
        **shared_fields,
        cost_estimate=cost_estimate,
        simplified_explanation=simplified_explanation,
    )


async def run_pipeline(
    cuda_code: str,
    kernel_name: str = "custom",
    simple_mode: bool = False,
) -> AsyncGenerator[AgentEvent, None]:
    """Run the full pipeline and stream AgentEvent objects.

    Stages run in order: analyzer -> translator -> optimizer -> tester. Each
    stage's synchronous ``run`` function is executed via ``asyncio.to_thread``.
    If the first tested build has a speedup below 1.0, the optimizer and
    tester are run once more (iteration 2) with the tester's notes passed to
    the optimizer as feedback; no further retries are attempted.

    Args:
        cuda_code: CUDA source text handed to the analyzer and translator.
        kernel_name: Passed through to ``tester.run``.
        simple_mode: Accepted for interface compatibility; this function does
            not read it (the simplified explanation is always included in the
            final report).

    Yields:
        ``AgentEvent`` progress updates per stage. When a stage raises, or a
        tester run reports ``success`` as false, a FAILED event is yielded and
        the generator stops. On success the last event is a coordinator DONE
        event whose ``detail`` is the JSON-serialized ``FinalReport``.
    """
    # ---- Stage 1: analyzer -------------------------------------------------
    yield AgentEvent(
        agent="analyzer",
        status=AgentStatus.RUNNING,
        message="Scanning CUDA code for kernels, APIs, and hardware-specific issues...",
    )

    try:
        analyzer_result: AnalyzerResult = await asyncio.to_thread(analyzer.run, cuda_code)
    except Exception as e:
        yield AgentEvent(agent="analyzer", status=AgentStatus.FAILED, message="Analysis failed", detail=str(e))
        return

    detail_parts = [
        f"Found {len(analyzer_result.kernels_found)} kernel(s): {', '.join(analyzer_result.kernels_found)}",
        f"Workload: {analyzer_result.workload_type.value}",
        f"Difficulty: {analyzer_result.difficulty} - {analyzer_result.difficulty_reason}",
    ]
    if analyzer_result.warp_size_issue:
        detail_parts.append(f"WARP SIZE ISSUE: {analyzer_result.warp_size_detail}")
    if analyzer_result.sharding_detected:
        detail_parts.append(
            "Multi-GPU sharding detected; review if needed on MI300X memory capacity.")
    if analyzer_result.prediction:
        detail_parts.append(analyzer_result.prediction)

    yield AgentEvent(
        agent="analyzer",
        status=AgentStatus.DONE,
        message=(
            f"Found {len(analyzer_result.kernels_found)} kernel(s) | "
            f"{analyzer_result.workload_type.value} workload | Difficulty: {analyzer_result.difficulty}"
        ),
        detail="\n".join(detail_parts),
    )

    # ---- Stage 2: translator -----------------------------------------------
    yield AgentEvent(
        agent="translator",
        status=AgentStatus.RUNNING,
        message="Running hipify-clang (pass 1) then LLM correction (pass 2)...",
    )

    try:
        translator_result: TranslatorResult = await asyncio.to_thread(
            translator.run, cuda_code, analyzer_result)
    except Exception as e:
        yield AgentEvent(agent="translator", status=AgentStatus.FAILED, message="Translation failed", detail=str(e))
        return

    yield AgentEvent(
        agent="translator",
        status=AgentStatus.DONE,
        message=(
            f"{translator_result.total_changes} changes "
            f"({translator_result.hipify_changes} hipify + {translator_result.llm_changes} LLM)"
        ),
        detail=(
            f"Total changes: {translator_result.total_changes} "
            f"({translator_result.hipify_changes} hipify, {translator_result.llm_changes} LLM)\n"
            f"Warp size corrected: {analyzer_result.warp_size_issue}\n"
            "Kernel launch syntax updated"
        ),
    )

    # ---- Stage 3: optimizer (iteration 1) ----------------------------------
    yield AgentEvent(
        agent="optimizer",
        status=AgentStatus.RUNNING,
        message="Applying AMD MI300X-specific optimizations (iteration 1)...",
    )

    try:
        optimizer_result: OptimizerResult = await asyncio.to_thread(
            optimizer.run,
            translator_result.hip_code,
            analyzer_result,
            1,
        )
    except Exception as e:
        yield AgentEvent(agent="optimizer", status=AgentStatus.FAILED, message="Optimization failed", detail=str(e))
        return

    yield AgentEvent(
        agent="optimizer",
        status=AgentStatus.DONE,
        message=f"{len(optimizer_result.changes)} optimization(s) applied",
        detail="\n".join(
            f"- {c['description']}" for c in optimizer_result.changes),
    )

    # ---- Stage 4: tester (iteration 1) -------------------------------------
    yield AgentEvent(
        agent="tester",
        status=AgentStatus.RUNNING,
        message="Compiling with hipcc and profiling with rocprof (iteration 1)...",
    )

    try:
        tester_result_1: TesterResult = await asyncio.to_thread(
            tester.run,
            optimizer_result.optimized_code,
            analyzer_result,
            1,
            kernel_name,
        )
    except Exception as e:
        yield AgentEvent(agent="tester", status=AgentStatus.FAILED, message="Testing failed", detail=str(e))
        return

    if not tester_result_1.success:
        yield AgentEvent(
            agent="tester",
            status=AgentStatus.FAILED,
            message="Compilation or profiling failed",
            detail=tester_result_1.notes,
        )
        return

    if tester_result_1.speedup < 1.0:
        # ---- Regression: one retry with profiler feedback ------------------
        yield AgentEvent(
            agent="tester",
            status=AgentStatus.FAILED,
            message=f"Iteration 1: {tester_result_1.speedup}x vs baseline HIP (regression)",
            detail=(
                f"Bandwidth utilized: {tester_result_1.bandwidth_utilized}%\n"
                f"{tester_result_1.notes}"
            ),
        )

        yield AgentEvent(
            agent="coordinator",
            status=AgentStatus.RUNNING,
            message="Performance regressed, retrying optimizer with profiler feedback...",
            detail=f"Profiler feedback: {tester_result_1.notes}",
        )

        yield AgentEvent(
            agent="optimizer",
            status=AgentStatus.RETRYING,
            message="Trying alternative optimization strategy (iteration 2)...",
            detail=f"Previous strategy regressed. Feedback: {tester_result_1.notes}",
        )

        try:
            # The retry restarts from the translated HIP code, not from the
            # regressed iteration-1 output.
            optimizer_result_2: OptimizerResult = await asyncio.to_thread(
                optimizer.run,
                translator_result.hip_code,
                analyzer_result,
                2,
                tester_result_1.notes,
            )
        except Exception as e:
            yield AgentEvent(agent="optimizer", status=AgentStatus.FAILED, message="Re-optimization failed", detail=str(e))
            return

        yield AgentEvent(
            agent="optimizer",
            status=AgentStatus.DONE,
            message=f"Alternative strategy: {len(optimizer_result_2.changes)} change(s) applied",
            detail="\n".join(
                f"- {c['description']}" for c in optimizer_result_2.changes),
        )

        yield AgentEvent(
            agent="tester",
            status=AgentStatus.RUNNING,
            message="Re-profiling with alternative optimization (iteration 2)...",
        )

        try:
            tester_result_final: TesterResult = await asyncio.to_thread(
                tester.run,
                optimizer_result_2.optimized_code,
                analyzer_result,
                2,
                kernel_name,
            )
        except Exception as e:
            yield AgentEvent(agent="tester", status=AgentStatus.FAILED, message="Re-testing failed", detail=str(e))
            return

        # Same guard as iteration 1: a build that failed to compile or profile
        # must not be reported as a completed, successful migration.
        if not tester_result_final.success:
            yield AgentEvent(
                agent="tester",
                status=AgentStatus.FAILED,
                message="Compilation or profiling failed (iteration 2)",
                detail=tester_result_final.notes,
            )
            return

        final_optimizer = optimizer_result_2
    else:
        tester_result_final = tester_result_1
        final_optimizer = optimizer_result

    yield AgentEvent(
        agent="tester",
        status=AgentStatus.DONE,
        message=f"Iteration {tester_result_final.iteration}: {tester_result_final.speedup}x vs baseline HIP",
        detail=(
            f"Execution time: {tester_result_final.execution_ms:.1f}ms\n"
            f"Memory bandwidth: {tester_result_final.bandwidth_utilized:.1f}% utilized\n"
            f"Bottleneck type: {tester_result_final.bottleneck}\n"
            f"{tester_result_final.notes}"
        ),
    )

    # ---- Final report ------------------------------------------------------
    yield AgentEvent(agent="coordinator", status=AgentStatus.RUNNING, message="Generating migration report...")

    report = _assemble_final_report(
        analyzer_result,
        translator_result,
        final_optimizer,
        tester_result_final,
    )

    yield AgentEvent(
        agent="coordinator",
        status=AgentStatus.DONE,
        message="Migration complete",
        detail=json.dumps(report.model_dump()),
    )
|
|
|
|
def _build_amd_explanation(analyzer_result: AnalyzerResult, tester_result: TesterResult) -> str:
    """Return the hardware-advantage paragraph for the final report.

    Memory-bound workloads get a bandwidth-focused message that embeds the
    tester's ``bandwidth_utilized`` value formatted to a whole percent. Every
    other workload type gets the fixed compute-bound message, which does not
    read ``tester_result`` at all.
    """
    memory_bound = analyzer_result.workload_type == WorkloadType.MEMORY_BOUND

    if not memory_bound:
        compute_sentences = (
            "This is a compute-bound kernel; launch geometry and wavefront-aware tuning are key drivers.",
            "After optimization, compute utilization and execution characteristics improved.",
        )
        return " ".join(compute_sentences)

    utilization_pct = f"{tester_result.bandwidth_utilized:.0f}"
    memory_fragments = (
        "This is a memory-bound kernel; performance scales with memory bandwidth.",
        "MI300X provides higher memory bandwidth than H100-class hardware, and this workload",
        f"reached {utilization_pct}% utilization after optimization.",
    )
    return " ".join(memory_fragments)
|
|