# ROCmPort-AI / backend/main.py
# Last commit 0b5416e (tazwarrrr): fix: priority 1-4 debug pass — retry loop,
# SSE timeout, rocprof CSV parser, silent failures
# pylint: disable=broad-exception-caught
from backend.agents.analyzer import AnalyzerResult, WorkloadType
from backend.agents.tester import run as run_tester
from backend.graph.pipeline import pipeline as migration_pipeline
from backend.models import PortRequest, ColdStartRequest, AggregateMetricsRequest
from fastapi.staticfiles import StaticFiles
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi import FastAPI, HTTPException
import json
import asyncio
import zipfile
import io
import os
import difflib
from dotenv import load_dotenv
# Populate os.environ from a local .env file (if any) before the app is built,
# so settings such as ROCM_AVAILABLE (read by /health) are visible.
load_dotenv()

app = FastAPI(
    title="ROCmPort AI",
    description="CUDA-to-ROCm migration assistant with iterative testing and optimization.",
    version="1.0.0",
    contact={
        "name": "Tazwar Ahnaf Enan",
        "url": "https://github.com/tazwaryayyyy",
        "email": "tazwardevp@gmail.com",
    },
    license_info={"name": "MIT"},
)

# Fully permissive CORS: any origin, HTTP method and request header is allowed.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_origins=["*"],
)
@app.get("/health")
async def health():
    """Liveness probe.

    Returns the service name, the LLM provider info reported by the analyzer's
    ``llm_client``, and whether the ROCM_AVAILABLE environment variable is set
    to "true" (case-insensitive; defaults to false when unset).
    """
    # Imported inside the handler, as in the rest of this module's endpoints.
    from backend.agents.analyzer import llm_client

    payload = {"status": "ok", "service": "ROCmPort AI"}
    payload["llm_provider"] = llm_client.get_model_info()
    rocm_flag = os.environ.get("ROCM_AVAILABLE", "false")
    payload["rocm_available"] = rocm_flag.lower() == "true"
    return payload
@app.get("/benchmark-report")
async def benchmark_report():
    """
    Returns a fully auditable benchmark report with:
    - Per-kernel deterministic performance data (data_source labelled)
    - Static risk scan results for each demo kernel
    - Hardware context and reproducibility instructions
    - LLM provider information
    Judges can use this endpoint to audit every metric shown in the UI.

    The dict returned by ``get_benchmark_summary()`` is extended with:
      * ``static_risk_scans``: kernel name (``.cu`` file stem) -> severity
        counts, scan duration and the serialized risk items.
      * ``llm_provider``: ``llm_client.get_model_info()``.
    """
    from backend.tools.demo_artifacts import get_benchmark_summary
    from backend.tools import static_analyzer
    from backend.agents.analyzer import llm_client

    kernels_dir = os.path.join(os.path.dirname(__file__), "demo_kernels")
    summary = get_benchmark_summary()

    # Attach a static risk scan for each demo kernel. Sorted so the report
    # is reproducible (os.listdir order is arbitrary).
    kernel_risk_scans = {}
    for fname in sorted(os.listdir(kernels_dir)):
        if not fname.endswith(".cu"):
            continue
        with open(os.path.join(kernels_dir, fname), encoding="utf-8") as f:
            cuda_code = f.read()
        report = static_analyzer.scan(cuda_code)
        # removesuffix only strips the extension; str.replace would also
        # delete any ".cu" inside the stem (e.g. "my.cudnn.cu").
        kernel_risk_scans[fname.removesuffix(".cu")] = {
            "critical_count": report.critical_count,
            "high_count": report.high_count,
            "medium_count": report.medium_count,
            "scan_duration_ms": report.scan_duration_ms,
            "items": [item.model_dump() for item in report.items],
        }

    summary["static_risk_scans"] = kernel_risk_scans
    summary["llm_provider"] = llm_client.get_model_info()
    return summary
@app.post("/port")
async def port_cuda_code(req: PortRequest):
    """
    Main endpoint. Streams SSE events as the LangGraph pipeline runs.
    Each event is a JSON object matching the AgentEvent schema.

    The stream always terminates with ``data: [DONE]``. If the pipeline
    raises, or no event arrives within the idle timeout, a coordinator event
    with ``status == "failed"`` is sent before ``[DONE]`` so the client can
    tell an aborted run from a completed one.

    Raises:
        HTTPException(400): when ``cuda_code`` is missing or has fewer than
            10 characters once surrounding whitespace is stripped.
    """
    if not req.cuda_code or len(req.cuda_code.strip()) < 10:
        raise HTTPException(status_code=400, detail="No CUDA code provided")

    # Maximum seconds to wait for the next pipeline event before giving up.
    idle_timeout_s = 120.0
    queue: asyncio.Queue = asyncio.Queue()

    async def _run_graph():
        """Run the pipeline, pushing every node event onto ``queue``.

        A ``None`` sentinel is always enqueued last to mark the end of the run.
        """
        initial_state = {
            "cuda_code": req.cuda_code,
            "kernel_name": req.kernel_name or "custom",
            "simple_mode": req.simple_mode or False,
            "analyzer_result": None,
            "translator_result": None,
            "optimizer_result": None,
            "tester_result": None,
            "iteration": 0,
            "max_iterations": 3,
            "should_retry": False,
            "migration_success": False,
            "final_report": {},
            "events": [],
        }
        try:
            async for chunk in migration_pipeline.astream(
                initial_state, stream_mode="updates"
            ):
                for _node_name, node_output in chunk.items():
                    # A node update may be None (e.g. a node that returned no
                    # state change); don't let that abort the whole stream.
                    for event in (node_output or {}).get("events", []):
                        await queue.put(event)
                        await asyncio.sleep(0.05)  # let client breathe
        except Exception as exc:
            await queue.put(
                {
                    "agent": "coordinator",
                    "status": "failed",
                    "message": "Pipeline error",
                    "detail": str(exc),
                }
            )
        finally:
            await queue.put(None)  # sentinel

    async def event_stream():
        """Drain ``queue`` into SSE frames; cancel the pipeline on exit."""
        task = asyncio.create_task(_run_graph())
        try:
            while True:
                try:
                    event = await asyncio.wait_for(
                        queue.get(), timeout=idle_timeout_s
                    )
                except asyncio.TimeoutError:
                    # Previously the stream just ended with [DONE], which is
                    # indistinguishable from success on the client side.
                    timeout_event = {
                        "agent": "coordinator",
                        "status": "failed",
                        "message": "Pipeline timed out",
                        "detail": (
                            f"No pipeline event received for "
                            f"{idle_timeout_s:.0f} seconds"
                        ),
                    }
                    yield f"data: {json.dumps(timeout_event)}\n\n"
                    yield "data: [DONE]\n\n"
                    break
                if event is None:
                    yield "data: [DONE]\n\n"
                    break
                yield f"data: {json.dumps(event)}\n\n"
        finally:
            # Stop the pipeline on timeout, normal end or client disconnect.
            task.cancel()

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
        },
    )
async def _collect_pipeline_events(
    cuda_code: str, kernel_name: str, simple_mode: bool = False
) -> tuple[list[dict], dict | None]:
    """Collect all pipeline events via LangGraph and extract the final report.

    Args:
        cuda_code: CUDA source to migrate.
        kernel_name: Name recorded in the pipeline state.
        simple_mode: Forwarded into the initial pipeline state.

    Returns:
        ``(events, final_report)``. ``events`` holds every event emitted by
        any node, in order. ``final_report`` is the dict carried in the
        ``detail`` of the last coordinator ``done`` event, given either as a
        dict or as a JSON-object string. It is ``None`` when there was no such
        event or its detail could not be decoded into a dict.
    """
    events: list[dict] = []
    final_report: dict | None = None
    initial_state = {
        "cuda_code": cuda_code,
        "kernel_name": kernel_name,
        "simple_mode": simple_mode,
        "analyzer_result": None,
        "translator_result": None,
        "optimizer_result": None,
        "tester_result": None,
        "iteration": 0,
        "max_iterations": 3,
        "should_retry": False,
        "migration_success": False,
        "final_report": {},
        "events": [],
    }
    async for chunk in migration_pipeline.astream(initial_state, stream_mode="updates"):
        for _node_name, node_output in chunk.items():
            # Guard against a node update that is None (no state change).
            for event in (node_output or {}).get("events", []):
                events.append(event)
                if (
                    event.get("agent") == "coordinator"
                    and event.get("status") == "done"
                    and event.get("detail")
                ):
                    detail = event["detail"]
                    if isinstance(detail, dict):
                        final_report = detail
                        continue
                    try:
                        parsed = json.loads(detail)
                    except (json.JSONDecodeError, TypeError):
                        parsed = None
                    # Callers use report.get(...), so only a JSON object counts.
                    final_report = parsed if isinstance(parsed, dict) else None
    return events, final_report
def _has_adaptation_loop(events: list[dict]) -> bool:
"""Return True when the run shows retry-based adaptation behavior."""
saw_regression = any(
e.get("agent") == "tester" and e.get(
"status") == "failed" and "regression" in str(e.get("message", "")).lower()
for e in events
)
saw_retry = any(
e.get("agent") == "optimizer" and e.get("status") == "retrying"
for e in events
)
return saw_regression and saw_retry
@app.post("/cold-start")
async def cold_start_run(req: ColdStartRequest):
    """
    Single-run endpoint for unknown pasted CUDA input.
    Returns full trace plus summary trust signals.

    Raises:
        HTTPException(400): when ``cuda_code`` is missing or has fewer than
            10 characters once surrounding whitespace is stripped.
        HTTPException(500): when the pipeline finishes without a final report.
    """
    source = req.cuda_code
    if not source or len(source.strip()) < 10:
        raise HTTPException(status_code=400, detail="No CUDA code provided")

    label = req.kernel_name or "unknown_input"
    trace, report = await _collect_pipeline_events(source, label, simple_mode=False)
    if report is None:
        raise HTTPException(
            status_code=500, detail="Pipeline completed without final report"
        )

    return {
        "success": True,
        "kernel_name": label,
        "adaptation_loop_observed": _has_adaptation_loop(trace),
        "event_count": len(trace),
        "report": report,
        "events": trace,
    }
@app.post("/aggregate-metric")
async def aggregate_metric(req: AggregateMetricsRequest):
    """
    Evaluate multiple kernels and return one aggregate metric:
    average speedup vs baseline HIP.

    Kernels come from the ``demo_kernels`` directory (one ``.cu`` file each).
    When ``req.kernel_names`` is empty, all demo kernels are run in sorted
    order; otherwise the requested names are run once each, in request order.
    Unknown names are listed in ``skipped_kernels``. Runs without a final
    report or with a non-numeric speedup are listed in ``failed_kernels``
    and excluded from the average.

    Raises:
        HTTPException(400): none of the selected names is a known demo kernel.
        HTTPException(500): no selected kernel produced a usable report.
    """
    kernels_dir = os.path.join(os.path.dirname(__file__), "demo_kernels")
    available: dict[str, str] = {}
    for fname in sorted(os.listdir(kernels_dir)):
        if not fname.endswith(".cu"):
            continue
        with open(os.path.join(kernels_dir, fname), encoding="utf-8") as f:
            # removesuffix strips only the extension (replace would also hit
            # a ".cu" inside the stem).
            available[fname.removesuffix(".cu")] = f.read()

    requested = req.kernel_names or []
    # dict.fromkeys de-duplicates while keeping request order, so a kernel
    # listed twice is not run (and weighted) twice.
    candidates = list(dict.fromkeys(requested)) if requested else sorted(available)
    selected_names = [name for name in candidates if name in available]
    skipped_kernels = [name for name in candidates if name not in available]
    if not selected_names:
        raise HTTPException(
            status_code=400, detail="No valid kernels selected for aggregation")

    runs = []
    speedups = []
    failed_kernels = []
    for name in selected_names:
        events, report = await _collect_pipeline_events(available[name], name, False)
        if report is None:
            failed_kernels.append(name)
            continue
        try:
            # A missing/None/0 speedup counts as 0.0, as before.
            speedup = float(report.get("speedup", 0.0) or 0.0)
        except (TypeError, ValueError):
            # e.g. a string like "1.5x"; don't turn one bad run into a 500.
            failed_kernels.append(name)
            continue
        speedups.append(speedup)
        runs.append({
            "kernel": name,
            "speedup": speedup,
            "adaptation_loop_observed": _has_adaptation_loop(events),
            "iterations": report.get("iterations", 1),
        })

    if not speedups:
        raise HTTPException(
            status_code=500, detail="Unable to produce aggregate metric from selected kernels")

    mean_speedup = sum(speedups) / len(speedups)
    # Derive the percentage from the unrounded mean to avoid double rounding.
    avg_speedup = round(mean_speedup, 3)
    avg_improvement_pct = round((mean_speedup - 1.0) * 100.0, 2)
    return {
        "success": True,
        "baseline": "straight hipify output with minimal compile edits",
        "kernel_count": len(speedups),
        "aggregate_metric": {
            "average_speedup_vs_baseline": avg_speedup,
            "average_improvement_percent": avg_improvement_pct,
        },
        "runs": runs,
        "skipped_kernels": skipped_kernels,
        "failed_kernels": failed_kernels,
    }
@app.post("/recompile")
async def recompile_edited_code(req: dict):
    """
    Recompile endpoint for human override feature.
    Accepts edited HIP code and re-runs tester.

    Request body keys:
        edited_code: HIP source to test (required string, at least 10
            characters once surrounding whitespace is stripped).
        kernel_name: optional; "custom" when missing, null or empty.

    Returns:
        ``{"success": True, "result": <tester result as dict>}``.

    Raises:
        HTTPException(400): ``edited_code`` is missing, not a string or too short.
        HTTPException(500): the tester run failed.
    """
    edited_code = req.get("edited_code")
    kernel_name = req.get("kernel_name") or "custom"

    # Validate outside the try block: the broad handler below used to catch
    # this 400 and re-raise it as a 500 "Recompilation failed".
    if not isinstance(edited_code, str) or len(edited_code.strip()) < 10:
        raise HTTPException(status_code=400, detail="No HIP code provided")

    try:
        # Placeholder analysis: the edited HIP code is not re-analyzed; the
        # tester receives these fixed values.
        analyzer_result = AnalyzerResult(
            kernels_found=["test_kernel"],
            cuda_apis=["hipMalloc", "hipMemcpy"],
            warp_size_issue=False,
            warp_size_detail=None,
            workload_type=WorkloadType.MEMORY_BOUND,
            sharding_detected=False,
            difficulty="Easy",
            difficulty_reason="Simple test kernel"
        )
        # run_tester is synchronous, so it runs in a worker thread to avoid
        # blocking the event loop. The literal 2 is passed through unchanged
        # (its meaning is defined by tester.run; NOTE(review): confirm).
        tester_result = await asyncio.to_thread(
            run_tester, edited_code, analyzer_result, 2, kernel_name
        )
        result_payload = tester_result.model_dump()
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Recompilation failed: {str(e)}") from e

    return {
        "success": True,
        "result": result_payload
    }
def _unified_diff_text(original: str, updated: str, fromfile: str, tofile: str) -> str:
    """Return the unified diff of *original* -> *updated* as one string.

    When a text has no trailing newline, difflib yields its last line without
    a terminator. Joined as-is, that line runs into the next diff line and the
    patch is corrupt. Such lines are terminated here and followed by the
    standard "No newline at end of file" marker.
    """
    out: list[str] = []
    for line in difflib.unified_diff(
        original.splitlines(keepends=True),
        updated.splitlines(keepends=True),
        fromfile=fromfile,
        tofile=tofile,
    ):
        # splitlines() drops any line terminator, so an unchanged
        # single-element result means the line had none.
        if line.splitlines() != [line]:
            out.append(line)
        else:
            out.append(line + "\n\\ No newline at end of file\n")
    return "".join(out)


def _report_field(report: dict, key: str, suffix: str = "") -> str:
    """Format ``report[key]`` for the markdown summary.

    Returns "N/A" when the key is missing or None. The unit *suffix* is only
    appended to real values, which avoids output such as "N/Ax" or "Nonex".
    """
    value = report.get(key)
    if value is None:
        return "N/A"
    return f"{value}{suffix}"


@app.post("/export")
async def export_migration_package(req: dict):
    """
    Export endpoint for GitHub PR simulation.
    Returns a zip file with diff and migration report.

    Request body keys:
        migration_report: dict of report fields (non-dicts are ignored).
        original_cuda: original CUDA source (may be empty).
        final_rocm: ported code. Falls back to
            ``migration_report["optimized_code"]``.

    The zip contains ``migration.diff``, ``original.cu``, ``optimized.hip``
    and ``migration_report.md``.

    Raises:
        HTTPException(400): no ROCm code was supplied.
        HTTPException(500): any other failure while building the archive.
    """
    try:
        migration_report = req.get("migration_report", {})
        if not isinstance(migration_report, dict):
            migration_report = {}
        original_cuda = str(req.get("original_cuda") or "")
        # Fallback to report content when frontend omits final_rocm.
        final_rocm = str(req.get("final_rocm")
                         or migration_report.get("optimized_code") or "")
        if not final_rocm.strip():
            raise HTTPException(
                status_code=400, detail="No ROCm code provided for export")

        diff_text = _unified_diff_text(
            original_cuda, final_rocm, "original.cu", "optimized.hip"
        )
        md_report = "\n".join(
            [
                "# ROCmPort AI Migration Report",
                "",
                "## Performance Results",
                f"- Speedup: {_report_field(migration_report, 'speedup', 'x')}",
                "- Bandwidth Utilization: "
                f"{_report_field(migration_report, 'bandwidth_utilized', '%')}",
                f"- Total Changes: {_report_field(migration_report, 'total_changes')}",
                "",
                "## AMD Advantage Explanation",
                _report_field(migration_report, "amd_advantage_explanation"),
                "",
                "## Cost Impact",
                _report_field(migration_report, "cost_estimate"),
                "",
                "Generated by ROCmPort AI.",
                "",
            ]
        )

        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
            zf.writestr("migration.diff", diff_text)
            # Include source snapshots for easier review in PRs.
            zf.writestr("original.cu", original_cuda)
            zf.writestr("optimized.hip", final_rocm)
            zf.writestr("migration_report.md", md_report)
        zip_content = zip_buffer.getvalue()

        from fastapi.responses import Response
        return Response(
            content=zip_content,
            media_type="application/zip",
            headers={
                "Content-Disposition": "attachment; filename=rocmport_migration.zip"}
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Export failed: {str(e)}") from e
@app.get("/demo-kernels")
async def list_demo_kernels():
    """Return every demo kernel as ``{name: cuda_source}``.

    ``name`` is the ``.cu`` file name with only the extension removed. Files
    are read in sorted order, so the response key order is stable.
    """
    kernels_dir = os.path.join(os.path.dirname(__file__), "demo_kernels")
    kernels = {}
    for fname in sorted(os.listdir(kernels_dir)):
        if not fname.endswith(".cu"):
            continue
        with open(os.path.join(kernels_dir, fname), encoding="utf-8") as f:
            # removesuffix (not replace) so a ".cu" inside the stem survives.
            kernels[fname.removesuffix(".cu")] = f.read()
    return kernels
# Static frontend: prefer the production build in frontend/dist and fall back
# to the raw frontend folder during development. Nothing is mounted when
# neither exists. The mount is registered after all API routes above.
frontend_root = os.path.join(os.path.dirname(__file__), "..", "frontend")
frontend_dist = os.path.join(frontend_root, "dist")
if os.path.exists(frontend_dist):
    frontend_path = frontend_dist
else:
    frontend_path = frontend_root

if os.path.exists(frontend_path):
    app.mount(
        "/",
        StaticFiles(directory=frontend_path, html=True),
        name="frontend",
    )