# Commit 2e03441 (amine-yagoub) — refactor: extract phase runners from knowledge module
"""Phase runners: investigation, trial, verdict, and report generators.
Each function is a standalone generator that yields PipelineEvent objects,
keeping courtroom.py thin as an orchestrator.
"""
import logging
import time
import litellm
from crewai import Crew, Process, Task
from code_tribunal.agents import (
architecture_investigator,
defense_attorney,
judge as judge_agent,
prosecutor,
verdict_report_agent,
)
from code_tribunal.config import TribunalConfig
from code_tribunal.pipeline import Phase, PipelineEvent, Pipeline
from code_tribunal.react import react_loop_stream
# Module-wide logger shared by every phase runner below.
log = logging.getLogger("code_tribunal")
# Retry policy for crew.kickoff() rate-limit errors (see _crew_kickoff_with_retry):
_MAX_RETRIES = 5  # total kickoff attempts before the RateLimitError is re-raised
_BASE_DELAY = 4.0  # seconds; doubled after each rate-limited attempt (exponential backoff)
# Minimum seconds between non-tool streaming events yielded during investigation,
# to avoid flooding consumers with per-token progress events.
_MIN_YIELD_INTERVAL = 0.05
def _crew_kickoff_with_retry(crew: Crew) -> object:
    """Invoke ``crew.kickoff()``, retrying with exponential backoff when rate-limited.

    Attempts the kickoff up to ``_MAX_RETRIES`` times. Between failed
    attempts it sleeps ``_BASE_DELAY * 2**(attempt - 1)`` seconds; the
    final failure re-raises ``litellm.RateLimitError`` to the caller.
    """
    for attempt in range(1, _MAX_RETRIES + 1):
        try:
            return crew.kickoff()
        except litellm.RateLimitError:
            # Out of retries: surface the rate-limit error unchanged.
            if attempt == _MAX_RETRIES:
                raise
            wait = _BASE_DELAY * 2 ** (attempt - 1)
            log.warning(
                "[RETRY] Rate limited (attempt %d/%d), waiting %.0fs...",
                attempt,
                _MAX_RETRIES,
                wait,
            )
            time.sleep(wait)
def _domain_text(report, domain: str) -> str:
    """Return the newline-joined findings for *domain* from an EvidenceReport.

    Falls back to a human-readable placeholder when the domain is absent
    from ``report.findings_by_domain`` or has an empty findings list.
    """
    domain_findings = report.findings_by_domain.get(domain, [])
    if domain_findings:
        return "\n".join(map(str, domain_findings))
    return f"No {domain} findings detected."
# One (label, role, goal) triple per forensic investigator spawned by
# run_investigation(). `label` is also the key into the per-domain evidence
# dict and into the resulting investigation_reports dict; `role` and `goal`
# are passed verbatim to the ReACT loop's agent.
INVESTIGATION_AGENT_CONFIGS = [
    (
        "security",
        "Security Forensic Investigator",
        "You are a security forensic investigator. You find vulnerabilities, assess attack vectors, "
        "and determine severity. You cite specific file paths and line numbers.",
    ),
    (
        "quality",
        "Code Quality Forensic Investigator",
        "You are a code quality forensic investigator. You identify technical debt, dead code, "
        "missing error handling, and signs of rushed development.",
    ),
    (
        "architecture",
        "Architecture Forensic Investigator",
        "You are an architecture forensic investigator. You identify structural problems, "
        "tight coupling, missing abstractions, and scalability concerns.",
    ),
]
def run_investigation(config, pipeline: Pipeline, report, tools, target_dir):
    """Phase 4: Run 3 investigators using ReACT loop with real tool calling.

    Streams PipelineEvents while each domain investigator works, accumulates
    each investigator's complete output, and stores the results on the
    pipeline as ``investigation_reports``.

    Args:
        config: Tribunal configuration, passed through to the ReACT loop.
        pipeline: Pipeline whose ``is_cancelled`` flag aborts the phase early
            and whose state receives ``investigation_reports``.
        report: Evidence report providing ``to_text()`` and ``findings_by_domain``.
        tools: Tools the investigators may call (read files, search, trace).
        target_dir: Unused here; kept for signature parity with the other
            phase runners.

    Yields:
        PipelineEvent objects for tool calls (unthrottled) and throttled
        progress updates, plus a final completion event carrying all reports.
    """
    evidence_text = report.to_text()
    domain_evidence = {
        domain: _domain_text(report, domain)
        for domain in ("security", "quality", "architecture")
    }
    investigation_reports = {}
    for label, role, goal in INVESTIGATION_AGENT_CONFIGS:
        log.debug("[INVESTIGATION] Starting %s investigator...", label)
        domain_ev = domain_evidence.get(label, "No specific findings.")
        task_desc = (
            f"Investigate this codebase for {label} issues.\n\n"
            f"{label.upper()} EVIDENCE:\n{domain_ev}\n\n"
            f"FULL EVIDENCE REPORT:\n{evidence_text}\n\n"
            "Use your tools to read files, search for patterns, and trace call chains. "
            "Produce a detailed investigation report with specific findings, severities, and remediation."
        )
        last_yield = 0.0
        full_output = ""
        for agent_role, delta, is_tool in react_loop_stream(
            config=config,
            task_description=task_desc,
            agent_role=role,
            agent_goal=goal,
            tools=tools,
            max_iterations=8,
        ):
            if pipeline.is_cancelled:
                return
            # BUG FIX: always accumulate the delta. Previously, deltas that
            # arrived within _MIN_YIELD_INTERVAL of the last yield fell into
            # a silent else-branch and were dropped from full_output,
            # truncating the stored investigation report. Throttling should
            # only limit the yielded events, never the accumulated text.
            full_output += delta
            now = time.time()
            if is_tool:
                # Tool calls are always surfaced immediately, unthrottled.
                yield PipelineEvent(
                    Phase.INVESTIGATION,
                    f"{label} investigator: {delta.strip()}",
                    agent_role=agent_role,
                    delta=delta,
                )
                last_yield = now
            elif now - last_yield >= _MIN_YIELD_INTERVAL:
                # Plain-text progress is throttled to avoid flooding the UI.
                yield PipelineEvent(
                    Phase.INVESTIGATION,
                    f"{label} investigator analyzing...",
                    agent_role=agent_role,
                    delta=delta,
                )
                last_yield = now
        log.debug("[INVESTIGATION] %s investigator done. Output length: %d", label, len(full_output))
        investigation_reports[label] = full_output
    pipeline.update(investigation_reports=investigation_reports)
    yield PipelineEvent(
        Phase.INVESTIGATION,
        f"Investigation complete: {len(investigation_reports)} reports generated.",
        data={"reports": investigation_reports},
    )
def run_trial(config: TribunalConfig, pipeline: Pipeline, report, tools, target_dir):
    """Phase 5: Prosecutor vs Defense — non-streaming.

    Assembles a sequential three-task crew (prosecution case, defense,
    prosecution rebuttal), runs it with rate-limit retries, joins the task
    outputs into a transcript, and stores it as ``trial_transcript``.
    """
    evidence_text = report.to_text()
    snapshot = pipeline.state
    inv_reports = (snapshot.investigation_reports if snapshot else {}) or {}
    investigation_text = "\n\n".join(
        f"=== {domain.upper()} INVESTIGATION ===\n{body}"
        for domain, body in inv_reports.items()
    )
    prosecution = prosecutor(config, tools=tools)
    defense = defense_attorney(config, tools=tools)
    # A second prosecutor instance handles the rebuttal round.
    rebutter = prosecutor(config, tools=tools)
    prosecution_task = Task(
        description=(
            f"PRESENT THE PROSECUTION'S CASE\n\n"
            f"You are presenting evidence against a freelance developer who delivered this code.\n\n"
            f"RAW EVIDENCE:\n{evidence_text}\n\n"
            f"INVESTIGATION REPORTS:\n{investigation_text}\n\n"
            f"Build your case. Be specific - cite file paths, line numbers, and vulnerability types. "
            f"Argue that this code represents negligence, not mere imperfection."
        ),
        agent=prosecution,
        expected_output="A compelling prosecution argument citing specific evidence.",
    )
    defense_task = Task(
        description=(
            f"PRESENT THE DEFENSE\n\n"
            f"The prosecution has presented their case. Mount your defense.\n\n"
            f"RAW EVIDENCE:\n{evidence_text}\n\n"
            f"INVESTIGATION REPORTS:\n{investigation_text}\n\n"
            f"Challenge specific claims. Argue proportionality - not every issue is negligence. Be honest but vigorous."
        ),
        agent=defense,
        context=[prosecution_task],
        expected_output="A vigorous defense argument.",
    )
    rebuttal_task = Task(
        description=(
            "REBUTTAL\n\n"
            "The defense has responded. Address their strongest points. "
            "End with a closing argument for the judge."
        ),
        agent=rebutter,
        context=[prosecution_task, defense_task],
        expected_output="A sharp rebuttal and closing argument.",
    )
    crew = Crew(
        agents=[prosecution, defense, rebutter],
        tasks=[prosecution_task, defense_task, rebuttal_task],
        process=Process.sequential,
        verbose=False,
    )
    trial_transcript = ""
    try:
        log.debug("[TRIAL] Running crew.kickoff()...")
        outcome = _crew_kickoff_with_retry(crew)
        outputs = getattr(outcome, "tasks_output", [])
        sections = []
        # zip stops at the shorter sequence, so missing task outputs are
        # simply omitted from the transcript, as before.
        for round_name, output in zip(("Prosecution", "Defense", "Rebuttal"), outputs):
            text = output.raw if hasattr(output, "raw") else str(output)
            sections.append(f"=== {round_name} ===\n{text}")
        trial_transcript = "\n\n".join(sections)
        log.debug("[TRIAL] crew.kickoff() done. Transcript length: %d", len(trial_transcript))
    except Exception as e:
        log.debug("[TRIAL] FAILED: %s", e, exc_info=True)
        trial_transcript = f"Trial fallback: {e}"
    pipeline.update(trial_transcript=trial_transcript)
    yield PipelineEvent(
        Phase.TRIAL,
        "Trial complete. The Judge is preparing to deliberate.",
        data={"transcript": trial_transcript},
    )
def run_verdict(config: TribunalConfig, pipeline: Pipeline, report, target_dir):
    """Phase 6: Judge delivers verdict — non-streaming.

    Feeds the raw evidence, investigation reports, and trial transcript to
    the judge agent, runs a single-task crew with rate-limit retries, and
    stores the verdict text on the pipeline as ``verdict``.

    Args:
        config: Tribunal configuration used to build the judge agent.
        pipeline: Pipeline supplying prior-phase state and receiving ``verdict``.
        report: Evidence report providing ``to_text()``.
        target_dir: Unused here; kept for signature parity with the other
            phase runners.
    """
    evidence_text = report.to_text()
    state = pipeline.state
    # CONSISTENCY/ROBUSTNESS FIX: run_trial guards against pipeline.state
    # being None; do the same here instead of dereferencing it unchecked.
    inv_reports = (state.investigation_reports if state else {}) or {}
    inv_text = "\n\n".join(
        f"=== {k.upper()} INVESTIGATION ===\n{v}"
        for k, v in inv_reports.items()
    )
    transcript = (state.trial_transcript if state else "") or ""
    judge = judge_agent(config)
    verdict_task = Task(
        description=(
            "DELIVER YOUR VERDICT\n\n"
            "RAW EVIDENCE:\n" + evidence_text + "\n\n"
            "INVESTIGATION REPORTS:\n" + inv_text + "\n\n"
            "TRIAL TRANSCRIPT:\n" + transcript + "\n\n"
            "Deliver a structured verdict:\n"
            "## VERDICT\nOverall: [GUILTY / MIXED / NOT GUILTY]\n"
            "Reputational Risk Score: [0-100]\n\n"
            "## FINDINGS SUMMARY\n"
            "For each finding: severity, impact, remediation\n\n"
            "## SENTENCE\n"
            "Your final assessment and recommendations."
        ),
        agent=judge,
        expected_output="A structured verdict with overall ruling, risk score, findings summary, and sentence.",
    )
    crew = Crew(agents=[judge], tasks=[verdict_task], verbose=False)
    verdict_text = ""
    try:
        log.debug("[VERDICT] Running crew.kickoff()...")
        result = _crew_kickoff_with_retry(crew)
        verdict_text = result.raw if hasattr(result, "raw") else str(result)
        log.debug("[VERDICT] crew.kickoff() done. Verdict length: %d", len(verdict_text))
    except Exception as e:
        # Degrade gracefully: record the failure text as the verdict so the
        # downstream report phase still has something to work with.
        log.debug("[VERDICT] FAILED: %s", e, exc_info=True)
        verdict_text = f"Verdict fallback: {e}"
    pipeline.update(verdict=verdict_text)
    yield PipelineEvent(Phase.VERDICT, "Verdict delivered.", data={"verdict": verdict_text})
def run_report(config: TribunalConfig, pipeline: Pipeline, report, target_dir):
    """Phase 7: Verdict Report Agent — non-streaming.

    Compiles the evidence, investigation reports, trial transcript, and
    verdict into a final client-facing report, stores it on the pipeline as
    ``report``, and marks the pipeline complete.

    Args:
        config: Tribunal configuration used to build the report agent.
        pipeline: Pipeline supplying prior-phase state; receives ``report``
            and is marked complete at the end of this phase.
        report: Evidence report providing ``to_text()``.
        target_dir: Unused here; kept for signature parity with the other
            phase runners.
    """
    state = pipeline.state
    report_ag = verdict_report_agent(config)
    # CONSISTENCY/ROBUSTNESS FIX: run_trial guards against pipeline.state
    # being None; do the same here instead of dereferencing it unchecked.
    inv_reports = (state.investigation_reports if state else {}) or {}
    inv_text = "\n\n".join(
        f"=== {k.upper()} INVESTIGATION ===\n{v}"
        for k, v in inv_reports.items()
    )
    trial_text = (state.trial_transcript if state else "") or ""
    verdict_body = (state.verdict if state else "") or ""
    report_task = Task(
        description=(
            "Generate the FINAL STRUCTURED REPORT from this trial.\n\n"
            "The Judge has delivered a verdict. Now compile everything into a clear, "
            "professional report for the client.\n\n"
            "EVIDENCE:\n" + report.to_text() + "\n\n"
            "INVESTIGATION REPORTS:\n" + inv_text + "\n\n"
            "TRIAL TRANSCRIPT:\n" + trial_text + "\n\n"
            "VERDICT:\n" + verdict_body + "\n\n"
            "The report MUST include:\n"
            "1. **Executive Summary** - one paragraph overview\n"
            "2. **Findings Table** - all findings sorted by severity (CRITICAL first)\n"
            "3. **Per-Finding Analysis** - impact, remediation, estimated fix effort\n"
            "4. **Sentencing Recommendations** - what the client should do next\n"
            "5. **Reputational Risk Score** - breakdown of how the score was calculated"
        ),
        agent=report_ag,
        expected_output="A comprehensive structured report.",
    )
    crew = Crew(agents=[report_ag], tasks=[report_task], verbose=False)
    report_text = ""
    try:
        log.debug("[REPORT] Running crew.kickoff()...")
        result = _crew_kickoff_with_retry(crew)
        report_text = result.raw if hasattr(result, "raw") else str(result)
        log.debug("[REPORT] crew.kickoff() done. Report length: %d", len(report_text))
    except Exception as e:
        # Degrade gracefully: record the failure text as the report body.
        log.debug("[REPORT] FAILED: %s", e, exc_info=True)
        report_text = f"Report generation fallback: {e}"
    pipeline.update(report=report_text)
    pipeline.complete()
    yield PipelineEvent(Phase.REPORT, "Report generated.", data={"report": report_text})