File size: 5,949 Bytes
ca2b985
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
"""Pipeline orchestration: runs the courtroom and streams UI updates."""

import logging
import tempfile

import gradio as gr

from code_tribunal.config import TribunalConfig
from code_tribunal.courtroom import Courtroom
from code_tribunal.evidence import safe_extract_zip
from code_tribunal.pipeline import Phase
from code_tribunal.ui.helpers import escape_html, evidence_html
from code_tribunal.ui.styles import PHASE_LABELS

log = logging.getLogger("code_tribunal")


def run_courtroom(code_input, progress=gr.Progress()):
    """Run the full pipeline, yielding updates to the UI."""
    chat = []
    ev_html = ""
    verdict_text = ""
    report_text = ""
    config = TribunalConfig()

    if code_input is None or not (hasattr(code_input, "name") and code_input.name.endswith(".zip")):
        yield (
            gr.update(visible=False), gr.update(visible=False), gr.update(visible=True),
            "### Please upload a .zip file.", [],
            gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), {},
        )
        return

    yield (
        gr.update(visible=False), gr.update(visible=False), gr.update(visible=True),
        "### Extracting files...", [],
        gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), {},
    )

    tmpdir = tempfile.mkdtemp()
    try:
        safe_extract_zip(code_input.name, tmpdir)
    except ValueError as e:
        yield (
            gr.update(visible=False), gr.update(visible=False), gr.update(visible=True),
            f"### Error: {escape_html(str(e))}", [],
            gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), {},
        )
        return

    if not config.is_configured:
        yield (
            gr.update(visible=False), gr.update(visible=False), gr.update(visible=True),
            "### ZAI_API_KEY not set.", [],
            gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), {},
        )
        return

    courtroom = Courtroom(config)
    courtroom.pipeline.create_run(code_input.name)
    current_phase = Phase.IDLE

    for event in courtroom.run(tmpdir):
        if event.phase != current_phase:
            current_phase = event.phase
            log.debug("[UI] Phase -> %s", current_phase)

        if event.data and "report" in event.data and event.data["report"] is not None:
            robj = event.data["report"]
            if hasattr(robj, "findings"):
                ev_html = evidence_html(robj)
                chat.append({
                    "role": "user",
                    "content": f"**Case Filed**: {robj.file_count} files, **{len(robj.findings)}** findings.",
                })

        if event.data and "reports" in event.data:
            for domain, text in event.data["reports"].items():
                icon = {"security": "πŸ›‘οΈ", "quality": "πŸ“‹", "architecture": "πŸ—οΈ"}.get(domain, "πŸ“")
                chat.append({
                    "role": "assistant",
                    "content": f"**{icon} {domain.title()} Investigation**\n\n{text[:3000]}{'...' if len(text) > 3000 else ''}",
                })

        if event.data and "transcript" in event.data:
            for section in event.data["transcript"].split("\n\n"):
                for rnd in ["PROSECUTION", "DEFENSE", "REBUTTAL"]:
                    if section.startswith(f"=== {rnd}"):
                        content = section.replace(f"=== {rnd} ===", "").strip()
                        icon = {"PROSECUTION": "βš–οΈ", "DEFENSE": "πŸ›‘οΈ", "REBUTTAL": "βš–οΈ"}.get(rnd, "πŸ“")
                        chat.append({
                            "role": "assistant",
                            "content": f"**{icon} {rnd.title()}**\n\n{content[:3000]}{'...' if len(content) > 3000 else ''}",
                        })
                        break

        if event.data and "verdict" in event.data:
            verdict_text = event.data["verdict"]
            chat.append({
                "role": "assistant",
                "content": f"**πŸ”¨ Judge's Verdict**\n\n{verdict_text[:3000]}{'...' if len(verdict_text) > 3000 else ''}",
            })

        if event.data and "report" in event.data and isinstance(event.data["report"], str):
            report_text = event.data["report"]
            chat.append({
                "role": "assistant",
                "content": f"**πŸ“„ Final Report**\n\n{report_text[:4000]}{'...' if len(report_text) > 4000 else ''}",
            })

        show_export = event.phase == Phase.COMPLETE
        ctx = {}
        if show_export:
            log.debug("[UI] COMPLETE β€” showing export")
            ctx = {"evidence": ev_html, "verdict": verdict_text, "report": report_text}

        label = PHASE_LABELS.get(current_phase, current_phase.value)
        if current_phase not in (Phase.COMPLETE, Phase.FAILED):
            status = f'<h3 class="phase-active">{label}</h3>\n{event.status}'
        else:
            status = f"### {label}\n{event.status}"

        log.debug("[UI] yield β€” chat msgs: %d, status: %s, export: %s", len(chat), label, show_export)

        yield (
            gr.update(visible=False), gr.update(visible=False), gr.update(visible=True),
            status, chat,
            gr.update(visible=show_export), gr.update(visible=show_export),
            gr.update(visible=show_export), ctx,
        )


def handle_question(question: str, history: list, ctx: dict) -> tuple:
    """Handle a follow-up question from the Q&A interface."""
    if not question.strip():
        return history, ""
    config = TribunalConfig()
    courtroom = Courtroom(config)
    answer = courtroom.ask_question(question, ctx)
    history.append({"role": "user", "content": question})
    history.append({
        "role": "assistant",
        "content": f"**πŸ•΅οΈ Expert Witness**\n\n{answer[:3000]}{'...' if len(answer) > 3000 else ''}",
    })
    return history, ""