Spaces:
Sleeping
Sleeping
| """Courtroom orchestrator — thin coordinator that delegates to phase runners.""" | |
| import logging | |
| from typing import Generator | |
| from crewai import Crew, Task | |
| from code_tribunal.agents import expert_witness | |
| from code_tribunal.code_graph import CodeGraph | |
| from code_tribunal.config import TribunalConfig | |
| from code_tribunal.evidence import gather_evidence_streaming, EvidenceReport | |
| from code_tribunal.pipeline import Phase, PipelineEvent, Pipeline | |
| from code_tribunal.phases import run_investigation, run_trial, run_verdict, run_report | |
| from code_tribunal.tools import ( | |
| configure_tools, | |
| FileReaderTool, | |
| FindingContextTool, | |
| PatternSearchTool, | |
| CodeGraphQueryTool, | |
| ) | |
| log = logging.getLogger("code_tribunal") | |
| logging.getLogger("crewai").setLevel(logging.WARNING) | |
| class Courtroom: | |
| """Orchestrates the full courtroom pipeline.""" | |
| def __init__(self, config: TribunalConfig) -> None: | |
| self.config = config | |
| self.code_graph = CodeGraph() | |
| self.pipeline = Pipeline() | |
| def run(self, target_dir: str) -> Generator[PipelineEvent, None, None]: | |
| """Run the full pipeline, yielding events for UI consumption.""" | |
| try: | |
| report = yield from self._run_evidence(target_dir) | |
| if report is None: | |
| return | |
| yield from self._run_graph(target_dir) | |
| inv_tools, trial_tools = self._setup_tools(target_dir) | |
| yield from run_investigation( | |
| self.config, self.pipeline, report, inv_tools, target_dir, | |
| ) | |
| yield from run_trial( | |
| self.config, self.pipeline, report, trial_tools, target_dir, | |
| ) | |
| yield from run_verdict(self.config, self.pipeline, report, target_dir) | |
| yield from run_report(self.config, self.pipeline, report, target_dir) | |
| except Exception as e: | |
| log.debug("[PIPELINE ERROR] %s", e, exc_info=True) | |
| self.pipeline.fail(str(e)) | |
| yield PipelineEvent(Phase.FAILED, f"Pipeline error: {e}") | |
| return | |
| log.debug("[COMPLETE] Yielding Phase.COMPLETE") | |
| yield PipelineEvent(Phase.COMPLETE, "Trial complete. You may now ask questions.") | |
| def ask_question(self, question: str, context: dict) -> str: | |
| """Interactive Q&A after verdict.""" | |
| log.debug("[Q&A] Question: %s", question[:100]) | |
| tools = [FileReaderTool(), FindingContextTool(), CodeGraphQueryTool()] | |
| qa_agent = expert_witness(self.config, tools=tools) | |
| context_text = "\n\n".join( | |
| f"## {k.upper()}\n{v}" for k, v in context.items() if v | |
| ) | |
| qa_task = Task( | |
| description=( | |
| "Answer this follow-up question about the trial:\n\n" | |
| f"**Question**: {question}\n\n" | |
| f"TRIAL CONTEXT:\n{context_text}\n\n" | |
| "Provide a detailed, specific answer. Cite file paths, line numbers, and findings " | |
| "where relevant. Use your tools to look up specific code if needed." | |
| ), | |
| agent=qa_agent, | |
| expected_output="A detailed answer citing specific evidence, file paths, and line numbers.", | |
| ) | |
| crew = Crew(agents=[qa_agent], tasks=[qa_task], verbose=False) | |
| result = crew.kickoff() | |
| return result.raw if hasattr(result, "raw") else str(result) | |
| def _run_evidence(self, target_dir: str) -> Generator: | |
| """Phase 1: Gather forensic evidence with GritQL.""" | |
| log.debug("[PHASE 1] Starting evidence scan...") | |
| yield PipelineEvent(Phase.EVIDENCE, "Scanning with GritQL forensic patterns...") | |
| findings_count = 0 | |
| report = None | |
| for update in gather_evidence_streaming(target_dir): | |
| if isinstance(update, str): | |
| yield PipelineEvent(Phase.EVIDENCE, update) | |
| elif isinstance(update, EvidenceReport): | |
| report = update | |
| findings_count = len(report.findings) | |
| if report is None or findings_count == 0: | |
| log.debug("[PHASE 1] No findings — case dismissed.") | |
| yield PipelineEvent( | |
| Phase.COMPLETE, | |
| "No findings detected. Case dismissed - code appears clean.", | |
| {"report": None}, | |
| ) | |
| return None | |
| log.debug("[PHASE 1] Evidence complete: %d findings across %d files.", findings_count, report.file_count) | |
| self.pipeline.update( | |
| evidence_report={"findings": findings_count, "files": report.file_count}, | |
| ) | |
| yield PipelineEvent( | |
| Phase.EVIDENCE, | |
| f"Evidence complete: **{findings_count}** findings across **{report.file_count}** files.", | |
| {"report": report}, | |
| ) | |
| return report | |
| def _run_graph(self, target_dir: str) -> Generator: | |
| """Phase 2: Build code dependency graph.""" | |
| log.debug("[PHASE 2] Building code dependency graph...") | |
| yield PipelineEvent(Phase.GRAPH, "Building code dependency graph...") | |
| self.code_graph.build_from_directory(target_dir) | |
| stats = self.code_graph.get_statistics() | |
| nodes = sum(stats["nodes"].values()) | |
| edges = sum(stats["edges"].values()) | |
| log.debug("[PHASE 2] Code graph: %d nodes, %d edges.", nodes, edges) | |
| yield PipelineEvent( | |
| Phase.GRAPH, | |
| f"Code graph: **{nodes}** nodes, **{edges}** edges.", | |
| ) | |
| def _setup_tools(self, target_dir: str) -> tuple: | |
| """Phase 3: Configure tools for investigators and trial agents.""" | |
| log.debug("[PHASE 3] Configuring tools...") | |
| configure_tools(target_dir, self.code_graph) | |
| inv_tools = [FileReaderTool(), PatternSearchTool(), CodeGraphQueryTool(), FindingContextTool()] | |
| trial_tools = [FindingContextTool()] | |
| return inv_tools, trial_tools | |