Spaces:
Sleeping
Sleeping
File size: 5,852 Bytes
6a2abaa d5341cc 1cdb3e3 d5341cc 6a2abaa d5341cc 6a2abaa 1cdb3e3 6a2abaa 1cdb3e3 6a2abaa 1cdb3e3 6a2abaa 1cdb3e3 6a2abaa 1cdb3e3 6a2abaa 1cdb3e3 6a2abaa 1cdb3e3 6a2abaa 1cdb3e3 6a2abaa 1cdb3e3 6a2abaa 1cdb3e3 d5341cc 1cdb3e3 d5341cc 1cdb3e3 6a2abaa 1cdb3e3 6a2abaa 1cdb3e3 6a2abaa 1cdb3e3 6a2abaa 1cdb3e3 6a2abaa 1cdb3e3 6a2abaa d5341cc 1cdb3e3 6a2abaa 1cdb3e3 6a2abaa d5341cc 6a2abaa | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 | """Courtroom orchestrator — thin coordinator that delegates to phase runners."""
import logging
from typing import Generator
from crewai import Crew, Task
from code_tribunal.agents import expert_witness
from code_tribunal.code_graph import CodeGraph
from code_tribunal.config import TribunalConfig
from code_tribunal.evidence import gather_evidence_streaming, EvidenceReport
from code_tribunal.pipeline import Phase, PipelineEvent, Pipeline
from code_tribunal.phases import run_investigation, run_trial, run_verdict, run_report
from code_tribunal.tools import (
configure_tools,
FileReaderTool,
FindingContextTool,
PatternSearchTool,
CodeGraphQueryTool,
)
log = logging.getLogger("code_tribunal")
logging.getLogger("crewai").setLevel(logging.WARNING)
class Courtroom:
"""Orchestrates the full courtroom pipeline."""
def __init__(self, config: TribunalConfig) -> None:
self.config = config
self.code_graph = CodeGraph()
self.pipeline = Pipeline()
def run(self, target_dir: str) -> Generator[PipelineEvent, None, None]:
"""Run the full pipeline, yielding events for UI consumption."""
try:
report = yield from self._run_evidence(target_dir)
if report is None:
return
yield from self._run_graph(target_dir)
inv_tools, trial_tools = self._setup_tools(target_dir)
yield from run_investigation(
self.config, self.pipeline, report, inv_tools, target_dir,
)
yield from run_trial(
self.config, self.pipeline, report, trial_tools, target_dir,
)
yield from run_verdict(self.config, self.pipeline, report, target_dir)
yield from run_report(self.config, self.pipeline, report, target_dir)
except Exception as e:
log.debug("[PIPELINE ERROR] %s", e, exc_info=True)
self.pipeline.fail(str(e))
yield PipelineEvent(Phase.FAILED, f"Pipeline error: {e}")
return
log.debug("[COMPLETE] Yielding Phase.COMPLETE")
yield PipelineEvent(Phase.COMPLETE, "Trial complete. You may now ask questions.")
def ask_question(self, question: str, context: dict) -> str:
"""Interactive Q&A after verdict."""
log.debug("[Q&A] Question: %s", question[:100])
tools = [FileReaderTool(), FindingContextTool(), CodeGraphQueryTool()]
qa_agent = expert_witness(self.config, tools=tools)
context_text = "\n\n".join(
f"## {k.upper()}\n{v}" for k, v in context.items() if v
)
qa_task = Task(
description=(
"Answer this follow-up question about the trial:\n\n"
f"**Question**: {question}\n\n"
f"TRIAL CONTEXT:\n{context_text}\n\n"
"Provide a detailed, specific answer. Cite file paths, line numbers, and findings "
"where relevant. Use your tools to look up specific code if needed."
),
agent=qa_agent,
expected_output="A detailed answer citing specific evidence, file paths, and line numbers.",
)
crew = Crew(agents=[qa_agent], tasks=[qa_task], verbose=False)
result = crew.kickoff()
return result.raw if hasattr(result, "raw") else str(result)
def _run_evidence(self, target_dir: str) -> Generator:
"""Phase 1: Gather forensic evidence with GritQL."""
log.debug("[PHASE 1] Starting evidence scan...")
yield PipelineEvent(Phase.EVIDENCE, "Scanning with GritQL forensic patterns...")
findings_count = 0
report = None
for update in gather_evidence_streaming(target_dir):
if isinstance(update, str):
yield PipelineEvent(Phase.EVIDENCE, update)
elif isinstance(update, EvidenceReport):
report = update
findings_count = len(report.findings)
if report is None or findings_count == 0:
log.debug("[PHASE 1] No findings — case dismissed.")
yield PipelineEvent(
Phase.COMPLETE,
"No findings detected. Case dismissed - code appears clean.",
{"report": None},
)
return None
log.debug("[PHASE 1] Evidence complete: %d findings across %d files.", findings_count, report.file_count)
self.pipeline.update(
evidence_report={"findings": findings_count, "files": report.file_count},
)
yield PipelineEvent(
Phase.EVIDENCE,
f"Evidence complete: **{findings_count}** findings across **{report.file_count}** files.",
{"report": report},
)
return report
def _run_graph(self, target_dir: str) -> Generator:
"""Phase 2: Build code dependency graph."""
log.debug("[PHASE 2] Building code dependency graph...")
yield PipelineEvent(Phase.GRAPH, "Building code dependency graph...")
self.code_graph.build_from_directory(target_dir)
stats = self.code_graph.get_statistics()
nodes = sum(stats["nodes"].values())
edges = sum(stats["edges"].values())
log.debug("[PHASE 2] Code graph: %d nodes, %d edges.", nodes, edges)
yield PipelineEvent(
Phase.GRAPH,
f"Code graph: **{nodes}** nodes, **{edges}** edges.",
)
def _setup_tools(self, target_dir: str) -> tuple:
"""Phase 3: Configure tools for investigators and trial agents."""
log.debug("[PHASE 3] Configuring tools...")
configure_tools(target_dir, self.code_graph)
inv_tools = [FileReaderTool(), PatternSearchTool(), CodeGraphQueryTool(), FindingContextTool()]
trial_tools = [FindingContextTool()]
return inv_tools, trial_tools
|