ninja-code-guard / app /agents /synthesizer.py
NinjainPJs's picture
Fix all ruff lint issues — 0 errors, 92 tests passing
b9da50c
"""
Synthesizer Agent
==================
The Synthesizer is the "senior engineering manager" of Ninja Code Guard.
It takes findings from all three domain agents (Security, Performance, Style)
and produces a unified, non-redundant review.
Responsibilities:
1. **Deduplicate** — If Security and Performance flag the same line for
different reasons, merge them into one finding with both perspectives.
2. **Resolve conflicts** — If agents disagree, keep the finding from the
highest-precedence agent (Security > Performance > Style) and assign it the
highest severity reported by any of the agents.
3. **Re-rank** — Sort findings by severity first, then by confidence within
each severity level (both descending).
4. **Compute Health Score** — 0-100 based on weighted finding density.
5. **Generate executive summary** — 3-5 sentences summarizing the review.
6. **Determine recommendation** — approve / request_changes / block.
Why a Synthesizer instead of just concatenating findings?
- Without dedup: the same SQL injection might be flagged by both Security
(as CWE-89) and Performance (as "unbounded query") — confusing for devs.
- Without conflict resolution: Security says "critical", Style says "medium"
for the same issue — which severity should the comment show?
- Without re-ranking: findings appear in arbitrary order — devs should see
the most important issues first.
"""
from __future__ import annotations
import time
from collections import defaultdict
import structlog
from app.models.findings import Finding, SynthesizedReview
from app.services.health_score import calculate_health_score, determine_recommendation
# Module-level structlog logger; the functions below use it to report
# deduplication and synthesis results as structured key/value events.
logger = structlog.get_logger()
# Priority of each agent when several of them report the same location:
# the agent with the larger number supplies the primary finding.
AGENT_PRECEDENCE: dict[str, int] = {
    "security": 3,
    "performance": 2,
    "style": 1,
}

# Numeric weight of each severity label (larger = more severe), used for
# comparing and sorting findings.
SEVERITY_RANK: dict[str, int] = {
    "critical": 4,
    "high": 3,
    "medium": 2,
    "low": 1,
}
def _finding_key(f: Finding) -> str:
"""
Generate a deduplication key for a finding.
Two findings are considered duplicates if they reference the same
file and overlapping line ranges. We use a simplified key based on
file_path and line_start — findings on the same line from different
agents are candidates for merging.
"""
return f"{f.file_path}:{f.line_start}:{f.category}"
def deduplicate_findings(findings: list[Finding]) -> list[Finding]:
    """
    Collapse findings that share a deduplication key into a single finding.

    Findings are grouped by ``_finding_key``. A group with one member is
    passed through untouched. For a larger group:

    * the finding from the highest-precedence agent (per ``AGENT_PRECEDENCE``,
      unknown agents rank 0) becomes the primary and supplies the location,
      category, title, suggested fix and CWE id;
    * the severity is the highest one in the group (per ``SEVERITY_RANK``);
    * the confidence is the highest one in the group;
    * the primary's description gets an "Also flagged by" note naming each
      *other* agent once. The note is omitted when every duplicate came from
      the primary's own agent.

    Example:
        Security flags app.py:5 as "critical", Performance flags the same
        key as "high" → Security's finding is kept with "critical" severity
        and a note that the performance agent flagged it too.

    Args:
        findings: Findings from any mix of agents, in any order.

    Returns:
        A new list with one finding per key, in first-seen key order.
    """
    # Group findings by deduplication key (dict keeps first-seen order).
    groups: dict[str, list[Finding]] = defaultdict(list)
    for finding in findings:
        groups[_finding_key(finding)].append(finding)

    deduped: list[Finding] = []
    duplicates_removed = 0

    for group in groups.values():
        if len(group) == 1:
            deduped.append(group[0])
            continue

        # Highest-precedence agent first; the sort is stable, so findings
        # from equally ranked agents keep their input order.
        by_precedence = sorted(
            group,
            key=lambda f: AGENT_PRECEDENCE.get(f.agent, 0),
            reverse=True,
        )
        primary, secondary = by_precedence[0], by_precedence[1:]

        # The group member carrying the highest severity (first one on ties).
        most_severe = max(
            by_precedence, key=lambda f: SEVERITY_RANK.get(f.severity, 0)
        )

        # Name each other agent once and never the primary's own agent, so a
        # single agent reporting the same key twice does not "also flag" itself.
        other_agents = list(
            dict.fromkeys(f.agent for f in secondary if f.agent != primary.agent)
        )
        merged_description = primary.description
        if other_agents:
            merged_description += (
                f"\n\n*Also flagged by: {', '.join(other_agents)} agent(s).*"
            )

        deduped.append(
            Finding(
                agent=primary.agent,
                file_path=primary.file_path,
                line_start=primary.line_start,
                line_end=primary.line_end,
                severity=most_severe.severity,
                category=primary.category,
                title=primary.title,
                description=merged_description,
                suggested_fix=primary.suggested_fix,
                cwe_id=primary.cwe_id,
                confidence=max(f.confidence for f in group),
            )
        )
        duplicates_removed += len(group) - 1

    if duplicates_removed > 0:
        logger.info(
            "Deduplicated findings",
            removed=duplicates_removed,
            before=len(findings),
            after=len(deduped),
        )
    return deduped
def rank_findings(findings: list[Finding]) -> list[Finding]:
    """
    Return a new list of findings ordered from most to least important.

    Ordering is by severity rank (per ``SEVERITY_RANK``, unknown labels rank
    0) and then by confidence, both descending, so blocking issues lead the
    review and nice-to-haves trail it. The input list is not modified.
    """

    def _priority(item: Finding) -> tuple[int, float]:
        # Tuple comparison gives "severity first, confidence as tie-breaker".
        return SEVERITY_RANK.get(item.severity, 0), item.confidence

    ordered = list(findings)
    ordered.sort(key=_priority, reverse=True)
    return ordered
def generate_executive_summary(
    findings: list[Finding],
    health_score: int,
    recommendation: str,
) -> str:
    """
    Build the short prose overview shown at the top of the PR comment.

    With no findings a fixed "all clear" message is returned. Otherwise the
    summary is assembled from these sentences, joined by single spaces:

    1. the total number of issues;
    2. a breakdown by severity (critical, high, medium, low), when any
       finding carries one of those labels;
    3. a breakdown by agent (security, performance, style), when any
       finding comes from one of those agents;
    4. a highlight of the first critical finding in ``findings`` order or,
       if there is none, the first high one.

    ``health_score`` and ``recommendation`` are accepted as part of the
    signature but are not used when composing the text.
    """
    if not findings:
        return (
            "No issues were found in this pull request. "
            "The code changes look clean across security, performance, and style dimensions. "
            "Safe to merge."
        )

    # Tally per-agent and per-severity counts in one pass.
    per_agent = defaultdict(int)
    per_severity = defaultdict(int)
    for item in findings:
        per_agent[item.agent] += 1
        per_severity[item.severity] += 1

    # Opening sentence with the overall count.
    total = len(findings)
    plural = "" if total == 1 else "s"
    sentences = [
        "Multi-agent review analyzed this PR across security, performance, and style dimensions, "
        f"finding {total} issue{plural}."
    ]

    # Severity breakdown, most severe first; empty levels are left out.
    severity_bits = [
        f"{per_severity.get(level, 0)} {level}"
        for level in ("critical", "high", "medium", "low")
        if per_severity.get(level, 0) > 0
    ]
    if severity_bits:
        sentences.append(f"Breakdown: {', '.join(severity_bits)}.")

    # Agent breakdown in fixed order; agents with no findings are left out.
    agent_bits = [
        f"{name.capitalize()}: {per_agent.get(name, 0)}"
        for name in ("security", "performance", "style")
        if per_agent.get(name, 0) > 0
    ]
    if agent_bits:
        sentences.append(f"By domain: {', '.join(agent_bits)}.")

    # Highlight one finding: critical takes priority over high.
    for level, lead_in in (("critical", "Most urgent"), ("high", "Top priority")):
        if per_severity.get(level, 0) > 0:
            headline = next(f for f in findings if f.severity == level)
            sentences.append(
                f"{lead_in}: {headline.title} in `{headline.file_path}`."
            )
            break

    return " ".join(sentences)
def synthesize(
    security_findings: list[Finding],
    performance_findings: list[Finding],
    style_findings: list[Finding],
) -> SynthesizedReview:
    """
    Main entry point: turn the three agents' findings into one unified review.

    Pipeline:
    1. Combine all findings (security, then performance, then style)
    2. Deduplicate via ``deduplicate_findings``
    3. Rank via ``rank_findings``
    4. Calculate the health score from the ranked findings
    5. Determine the recommendation from the ranked findings and the score
    6. Generate the executive summary

    Args:
        security_findings: Findings produced by the security agent.
        performance_findings: Findings produced by the performance agent.
        style_findings: Findings produced by the style agent.

    Returns:
        A ``SynthesizedReview`` holding the ranked findings, health score,
        recommendation, executive summary, per-severity counts and the time
        the synthesis took in milliseconds.
    """
    # perf_counter() is monotonic, so the measured duration cannot be skewed
    # (or go negative) if the system clock is adjusted mid-run.
    start = time.perf_counter()

    # Steps 1-3: combine, deduplicate, rank.
    all_findings = security_findings + performance_findings + style_findings
    deduped = deduplicate_findings(all_findings)
    ranked = rank_findings(deduped)

    # Steps 4-6: score, recommendation, summary.
    health_score = calculate_health_score(ranked)
    recommendation = determine_recommendation(ranked, health_score)
    summary = generate_executive_summary(ranked, health_score, recommendation)

    # Count findings per known severity in a single pass; findings with any
    # other severity label are not counted.
    severity_totals = dict.fromkeys(("critical", "high", "medium", "low"), 0)
    for finding in ranked:
        if finding.severity in severity_totals:
            severity_totals[finding.severity] += 1

    elapsed_ms = int((time.perf_counter() - start) * 1000)
    logger.info(
        "Synthesis complete",
        input_findings=len(all_findings),
        after_dedup=len(ranked),
        health_score=health_score,
        recommendation=recommendation,
        elapsed_ms=elapsed_ms,
    )

    return SynthesizedReview(
        health_score=health_score,
        executive_summary=summary,
        recommendation=recommendation,
        findings=ranked,
        critical_count=severity_totals["critical"],
        high_count=severity_totals["high"],
        medium_count=severity_totals["medium"],
        low_count=severity_totals["low"],
        duration_ms=elapsed_ms,
    )