Spaces:
Running
Running
| """Pipeline orchestration: runs the courtroom and streams UI updates.""" | |
| import logging | |
| import tempfile | |
| import gradio as gr | |
| from code_tribunal.config import TribunalConfig | |
| from code_tribunal.courtroom import Courtroom | |
| from code_tribunal.evidence import safe_extract_zip | |
| from code_tribunal.pipeline import Phase | |
| from code_tribunal.ui.helpers import escape_html, evidence_html | |
| from code_tribunal.ui.styles import PHASE_LABELS | |
| log = logging.getLogger("code_tribunal") | |
| def run_courtroom(code_input, progress=gr.Progress()): | |
| """Run the full pipeline, yielding updates to the UI.""" | |
| chat = [] | |
| ev_html = "" | |
| verdict_text = "" | |
| report_text = "" | |
| config = TribunalConfig() | |
| if code_input is None or not (hasattr(code_input, "name") and code_input.name.endswith(".zip")): | |
| yield ( | |
| gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), | |
| "### Please upload a .zip file.", [], | |
| gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), {}, | |
| ) | |
| return | |
| yield ( | |
| gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), | |
| "### Extracting files...", [], | |
| gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), {}, | |
| ) | |
| tmpdir = tempfile.mkdtemp() | |
| try: | |
| safe_extract_zip(code_input.name, tmpdir) | |
| except ValueError as e: | |
| yield ( | |
| gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), | |
| f"### Error: {escape_html(str(e))}", [], | |
| gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), {}, | |
| ) | |
| return | |
| if not config.is_configured: | |
| yield ( | |
| gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), | |
| "### ZAI_API_KEY not set.", [], | |
| gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), {}, | |
| ) | |
| return | |
| courtroom = Courtroom(config) | |
| courtroom.pipeline.create_run(code_input.name) | |
| current_phase = Phase.IDLE | |
| for event in courtroom.run(tmpdir): | |
| if event.phase != current_phase: | |
| current_phase = event.phase | |
| log.debug("[UI] Phase -> %s", current_phase) | |
| if event.data and "report" in event.data and event.data["report"] is not None: | |
| robj = event.data["report"] | |
| if hasattr(robj, "findings"): | |
| ev_html = evidence_html(robj) | |
| chat.append({ | |
| "role": "user", | |
| "content": f"**Case Filed**: {robj.file_count} files, **{len(robj.findings)}** findings.", | |
| }) | |
| if event.data and "reports" in event.data: | |
| for domain, text in event.data["reports"].items(): | |
| icon = {"security": "π‘οΈ", "quality": "π", "architecture": "ποΈ"}.get(domain, "π") | |
| chat.append({ | |
| "role": "assistant", | |
| "content": f"**{icon} {domain.title()} Investigation**\n\n{text[:3000]}{'...' if len(text) > 3000 else ''}", | |
| }) | |
| if event.data and "transcript" in event.data: | |
| for section in event.data["transcript"].split("\n\n"): | |
| for rnd in ["PROSECUTION", "DEFENSE", "REBUTTAL"]: | |
| if section.startswith(f"=== {rnd}"): | |
| content = section.replace(f"=== {rnd} ===", "").strip() | |
| icon = {"PROSECUTION": "βοΈ", "DEFENSE": "π‘οΈ", "REBUTTAL": "βοΈ"}.get(rnd, "π") | |
| chat.append({ | |
| "role": "assistant", | |
| "content": f"**{icon} {rnd.title()}**\n\n{content[:3000]}{'...' if len(content) > 3000 else ''}", | |
| }) | |
| break | |
| if event.data and "verdict" in event.data: | |
| verdict_text = event.data["verdict"] | |
| chat.append({ | |
| "role": "assistant", | |
| "content": f"**π¨ Judge's Verdict**\n\n{verdict_text[:3000]}{'...' if len(verdict_text) > 3000 else ''}", | |
| }) | |
| if event.data and "report" in event.data and isinstance(event.data["report"], str): | |
| report_text = event.data["report"] | |
| chat.append({ | |
| "role": "assistant", | |
| "content": f"**π Final Report**\n\n{report_text[:4000]}{'...' if len(report_text) > 4000 else ''}", | |
| }) | |
| show_export = event.phase == Phase.COMPLETE | |
| ctx = {} | |
| if show_export: | |
| log.debug("[UI] COMPLETE β showing export") | |
| ctx = {"evidence": ev_html, "verdict": verdict_text, "report": report_text} | |
| label = PHASE_LABELS.get(current_phase, current_phase.value) | |
| if current_phase not in (Phase.COMPLETE, Phase.FAILED): | |
| status = f'<h3 class="phase-active">{label}</h3>\n{event.status}' | |
| else: | |
| status = f"### {label}\n{event.status}" | |
| log.debug("[UI] yield β chat msgs: %d, status: %s, export: %s", len(chat), label, show_export) | |
| yield ( | |
| gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), | |
| status, chat, | |
| gr.update(visible=show_export), gr.update(visible=show_export), | |
| gr.update(visible=show_export), ctx, | |
| ) | |
| def handle_question(question: str, history: list, ctx: dict) -> tuple: | |
| """Handle a follow-up question from the Q&A interface.""" | |
| if not question.strip(): | |
| return history, "" | |
| config = TribunalConfig() | |
| courtroom = Courtroom(config) | |
| answer = courtroom.ask_question(question, ctx) | |
| history.append({"role": "user", "content": question}) | |
| history.append({ | |
| "role": "assistant", | |
| "content": f"**π΅οΈ Expert Witness**\n\n{answer[:3000]}{'...' if len(answer) > 3000 else ''}", | |
| }) | |
| return history, "" | |