Spaces:
Running
Running
File size: 2,397 Bytes
7b4f5dd | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 | """
Agent Orchestrator — runs the Security agent and then the Performance agent,
then feeds the fixable security findings to the Fix Agent. Yields SSE events.
"""
import asyncio
from typing import AsyncGenerator
from .security_agent import SecurityAgent
from .performance_agent import PerformanceAgent
from .fix_agent import FixAgent
async def run_scan_pipeline(request) -> AsyncGenerator[tuple[str, dict], None]:
    """Run the scan stages in order and stream their events.

    The security agent runs first, then the performance agent. If any
    security finding is marked ``fixAvailable``, the fix agent runs on those
    findings. Every event an agent produces is forwarded unchanged as an
    ``(event_type, payload)`` tuple. An ``"agent_start"`` event precedes each
    agent, and one ``"complete"`` event with summary counts ends the stream.

    Args:
        request: Object exposing ``code``, ``language``, ``type`` and ``url``
            attributes, as read below.

    Yields:
        ``(event_type, payload)`` tuples.
    """
    source_text = request.code or ""
    lang = request.language or "python"

    # GitHub requests are not cloned yet: a placeholder snippet is scanned
    # instead and the language is forced to Python.
    if request.type == "github" and request.url:
        source_text = f"# GitHub URL: {request.url}\n# Clone & scan would happen here\n"
        lang = "python"

    collected = []

    # Analysis stages run strictly one after the other, in this order.
    stages = (
        ("security", "Security Agent initializing...", SecurityAgent),
        ("performance", "Performance Agent initializing...", PerformanceAgent),
    )
    for agent_name, start_message, agent_cls in stages:
        yield "agent_start", {"agent": agent_name, "message": start_message}
        await asyncio.sleep(0.3)
        agent = agent_cls()
        async for kind, payload in agent.analyze(source_text, lang):
            # Only "finding" events are kept for the later stages; every
            # event is still passed on to the consumer.
            if kind == "finding":
                collected.append(payload)
            yield kind, payload

    # Fix stage: only security findings flagged as fixable are handed over.
    fixable = [
        item
        for item in collected
        if item.get("agent") == "security" and item.get("fixAvailable")
    ]
    if fixable:
        yield "agent_start", {"agent": "fix", "message": "Fix Agent generating patches..."}
        await asyncio.sleep(0.3)
        fixer = FixAgent()
        async for kind, payload in fixer.generate_fixes(fixable, source_text):
            yield kind, payload

    # Summary: count findings per known severity. A missing severity counts
    # as "low"; unrecognised severities are not counted in any bucket.
    tally = dict.fromkeys(("critical", "high", "medium", "low"), 0)
    for item in collected:
        level = item.get("severity", "low")
        if level in tally:
            tally[level] += 1

    summary = {"totalFindings": len(collected)}
    summary.update(tally)
    # Reported as the number of findings passed to the fix stage.
    summary["fixesGenerated"] = len(fixable)
    summary["filesAnalyzed"] = 1
    yield "complete", summary
|