File size: 14,133 Bytes
bd351d2
 
 
 
 
 
 
 
 
c8055f7
 
 
 
bd351d2
8457788
7d1d321
c8055f7
bd351d2
c8055f7
8457788
 
 
 
 
 
 
 
 
 
 
c8055f7
 
 
8457788
ffb673b
c8055f7
 
8457788
 
bd351d2
c8055f7
 
bd351d2
c8055f7
 
bd351d2
 
 
c8055f7
 
 
bd351d2
 
c8055f7
bd351d2
c8055f7
 
 
8457788
c8055f7
8457788
c8055f7
 
 
 
 
 
 
 
 
 
 
8457788
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c8055f7
 
8457788
 
 
bd351d2
8457788
 
 
 
 
 
 
 
 
 
c8055f7
 
 
 
 
8457788
 
 
bd351d2
 
 
 
8457788
 
 
bd351d2
 
 
 
 
8457788
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c8055f7
8457788
 
c8055f7
 
 
bd351d2
 
 
 
 
8457788
bd351d2
8457788
bd351d2
8457788
 
 
bd351d2
 
 
 
8457788
7d1d321
bd351d2
 
 
b5e4366
7d1d321
 
 
 
 
bd351d2
 
7d1d321
bd351d2
 
 
7d1d321
bd351d2
 
 
7d1d321
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b5e4366
 
 
8457788
 
 
 
b5e4366
 
 
8457788
 
bd351d2
 
8457788
 
 
bd351d2
 
8457788
 
 
 
 
bd351d2
 
 
 
 
8457788
bd351d2
8457788
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bd351d2
8457788
 
bd351d2
 
 
8457788
 
c8055f7
 
8457788
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c8055f7
 
 
8457788
 
c8055f7
8457788
 
 
 
 
 
 
 
 
c8055f7
bd351d2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bfb16e5
 
bd351d2
bfb16e5
bd351d2
 
 
 
bfb16e5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
"""Local small-model assistance for Trace Field Notes on Hugging Face ZeroGPU.

The analysis models run on the Space GPU through ``transformers``. Heavy imports
(``torch``, ``transformers``) are loaded lazily inside the generator so that the
deterministic analyzer, the test suite, and local development keep working
without GPU dependencies installed. If a model cannot be loaded or its output is
not valid JSON, :func:`analyzer.analyze_trace_file` falls back to the
deterministic codebook and records the reason in the model notes.
"""

from __future__ import annotations

import json
import re
import time
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Any, Callable

from profiling import get_logger
from schemas import (
    APPRAISALS,
    DETOUR_TYPES,
    DIFFICULTY_TYPES,
    OUTCOME_CLAIMS,
    RECOVERY_PATTERNS,
    RESOLUTION_MODES,
)

logger = get_logger()


# Model ids handed to ``transformers`` ``from_pretrained`` by ``_load_model``.
PRIMARY_MODEL_ID = "nvidia/NVIDIA-Nemotron-3-Nano-30B-A3B-BF16"
QUICK_MODEL_ID = "openbmb/MiniCPM5-1B"
# Generation budget passed as ``max_new_tokens`` by ``run_model_analysis``.
MODEL_MAX_NEW_TOKENS = 8192

# Engine key -> display label and model id. A ``None`` model id marks the
# rule-based engine; ``model_id_for_engine`` returns ``None`` for it.
MODEL_CHOICES = {
    "minicpm": {
        "label": "MiniCPM5 1B — quick analysis",
        "model_id": QUICK_MODEL_ID,
    },
    "nemotron": {
        "label": "NVIDIA Nemotron 3 Nano 30B-A3B — deeper analysis",
        "model_id": PRIMARY_MODEL_ID,
    },
    "deterministic": {
        "label": "Rule-based — instant, no model",
        "model_id": None,
    },
}

# Signature of an injectable text generator:
# (messages, *, model_id, max_new_tokens) -> raw model text.
GenerateFn = Callable[..., str]

# Loaded ``(tokenizer, model)`` pairs, keyed as "<model_id>@<device>" by
# ``_load_model`` so each model/device combination is loaded only once.
_MODEL_CACHE: dict[str, Any] = {}


@dataclass(slots=True)
class ModelAnalysisResult:
    """Result of one model-backed analysis, as returned by ``run_model_analysis``."""

    # Id of the model that produced the analysis.
    model_id: str
    # Parsed field report (the dict returned by ``parse_analysis_json``).
    analysis: dict[str, Any]
    # Short human-readable note naming the producing model.
    note: str


def model_id_for_engine(engine: str) -> str | None:
    """Look up the model id configured for an analysis engine.

    Returns ``None`` when the engine key is unknown or when the engine has no
    model attached (its ``model_id`` entry is empty/``None``).
    """

    entry = MODEL_CHOICES.get(engine)
    configured = entry["model_id"] if entry else None
    if not configured:
        return None
    return str(configured)


def resolve_device(device: str | None = None) -> str:
    """Pick the compute device: explicit override, else cuda -> mps -> cpu."""

    if device:
        return device
    import torch

    if torch.cuda.is_available():
        return "cuda"
    mps = getattr(torch.backends, "mps", None)
    if mps is not None and mps.is_available():
        return "mps"
    return "cpu"


def run_model_analysis(
    *,
    engine: str,
    numbered_narrative: str,
    agent_type: str = "unknown",
    codebook_hint: str = "",
    generate: GenerateFn | None = None,
    device: str | None = None,
) -> ModelAnalysisResult:
    """Have the selected model analyse the narrative and return its field report.

    The engine is mapped to a model id, a system + user chat is built around
    the analysis prompt, text is generated, and the reply is parsed as JSON.

    Args:
        engine: Key into ``MODEL_CHOICES``.
        numbered_narrative: The numbered visible messages to analyse.
        agent_type: Label inserted into the prompt.
        codebook_hint: Rule-based candidate spans inserted into the prompt.
        generate: Optional generator used instead of the local model; it is
            called as ``generate(messages, model_id=..., max_new_tokens=...)``.
        device: Compute device override for the local generator; ignored when
            ``generate`` is supplied.

    Raises:
        ValueError: If the engine has no model configured, or if the model
            reply cannot be parsed into the expected JSON shape.
    """

    model_id = model_id_for_engine(engine)
    if not model_id:
        raise ValueError(f"No model is configured for analysis engine {engine!r}.")

    system_message = {
        "role": "system",
        "content": (
            "You are an expert analyst of coding-agent session traces. "
            "Judge only the visible narrative; never invent hidden reasoning. "
            "Return one JSON object and nothing else."
        ),
    }
    user_message = {
        "role": "user",
        "content": build_analysis_prompt(
            numbered_narrative, agent_type=agent_type, codebook_hint=codebook_hint
        ),
    }
    messages = [system_message, user_message]

    started = time.perf_counter()
    if generate is None:
        # Default path: run the model locally on the resolved device.
        device_label = resolve_device(device)
        content = _local_generator(
            messages,
            model_id=model_id,
            max_new_tokens=MODEL_MAX_NEW_TOKENS,
            device=device_label,
        )
    else:
        # Injected generators are used as-is; no device is resolved for them.
        content = generate(messages, model_id=model_id, max_new_tokens=MODEL_MAX_NEW_TOKENS)
        device_label = "injected"
    elapsed = time.perf_counter() - started

    logger.info(
        "model analysis: %s on %s in %.2fs (%d chars in)",
        model_id,
        device_label,
        elapsed,
        len(numbered_narrative),
    )

    return ModelAnalysisResult(
        model_id=model_id,
        analysis=parse_analysis_json(content),
        note=f"Analysis produced by {model_id}.",
    )


def _local_generator(
    messages: list[dict[str, str]],
    *,
    model_id: str,
    max_new_tokens: int,
    device: str | None = None,
) -> str:
    """Produce a completion for ``messages`` with a locally loaded model.

    ``torch`` is imported inside the function so that importing this module
    does not require it. Decoding is greedy (``do_sample=False``) and only the
    newly generated tokens, after the prompt, are decoded and returned.
    """

    import torch

    tokenizer, model = _load_model(model_id, device=device)

    template_options = _chat_template_kwargs(model_id)
    encoded_prompt = tokenizer.apply_chat_template(
        messages,
        add_generation_prompt=True,
        return_tensors="pt",
        **template_options,
    )
    generate_kwargs, prompt_length = _prepare_generation_inputs(
        encoded_prompt,
        device=model.device,
    )

    with torch.no_grad():
        output_ids = model.generate(
            **generate_kwargs,
            max_new_tokens=max_new_tokens,
            do_sample=False,
        )

    # The output row starts with the prompt tokens; drop them.
    new_tokens = output_ids[0][prompt_length:]
    return tokenizer.decode(new_tokens, skip_special_tokens=True)


def _prepare_generation_inputs(chat_inputs: Any, *, device: Any) -> tuple[dict[str, Any], int]:
    """Build ``generate`` kwargs on ``device`` and report the prompt length.

    The tokenizer output is handled in two shapes: a mapping (which must carry
    a tensor-shaped ``input_ids``) is expanded into keyword arguments, while a
    bare tensor-like object is passed through the ``inputs`` keyword. The
    prompt length is the size of the last dimension of the token ids.

    Raises:
        ValueError: If the output is neither a mapping nor tensor-like, or a
            mapping lacks tensor-shaped ``input_ids``.
    """

    on_device = _move_to_device(chat_inputs, device)

    if not isinstance(on_device, Mapping):
        if not hasattr(on_device, "shape"):
            raise ValueError("Tokenizer output was neither a tensor nor a mapping.")
        return {"inputs": on_device}, int(on_device.shape[-1])

    # Plain mappings have no ``.to``, so each value is moved individually.
    kwargs: dict[str, Any] = {}
    for name, tensor in on_device.items():
        kwargs[name] = _move_to_device(tensor, device)

    prompt_ids = kwargs.get("input_ids")
    if prompt_ids is None or not hasattr(prompt_ids, "shape"):
        raise ValueError("Tokenizer output did not include tensor-shaped input_ids.")
    return kwargs, int(prompt_ids.shape[-1])


def _move_to_device(value: Any, device: Any) -> Any:
    if hasattr(value, "to"):
        return value.to(device)
    return value


def _chat_template_kwargs(model_id: str) -> dict[str, Any]:
    """Model-specific chat-template controls."""

    if model_id.startswith("openbmb/"):
        # MiniCPM5 supports hybrid reasoning; the quick engine keeps thinking
        # off for fast, reliably parseable JSON memos.
        return {"enable_thinking": False}
    return {}


def _load_model(model_id: str, device: str | None = None) -> Any:
    """Return a cached ``(tokenizer, model)`` pair, loading it on first use.

    Pairs are stored in ``_MODEL_CACHE`` under ``"<model_id>@<device>"``, so
    the load cost is paid once per model/device combination. ``torch`` and
    ``transformers`` are imported here rather than at module import time.
    """

    import torch

    target = resolve_device(device)
    key = f"{model_id}@{target}"
    hit = _MODEL_CACHE.get(key)
    if hit is not None:
        return hit

    from transformers import AutoModelForCausalLM, AutoTokenizer

    load_started = time.perf_counter()
    tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)

    # NOTE(review): only the exact string "cuda" takes the bf16 path; an
    # override such as "cuda:0" falls through to the fp32 branch - confirm
    # that this is intended.
    on_cuda = target == "cuda"
    if on_cuda:
        # The ZeroGPU Space path: load straight onto the GPU in bfloat16.
        load_kwargs = {"dtype": torch.bfloat16, "device_map": "cuda"}
    else:
        # CPU / Apple MPS: fp16 on MPS, fp32 on CPU for numerical stability.
        load_kwargs = {"dtype": torch.float16 if target == "mps" else torch.float32}

    model = AutoModelForCausalLM.from_pretrained(
        model_id,
        trust_remote_code=True,
        **load_kwargs,
    )
    if not on_cuda:
        model = model.to(target)
    model.eval()

    logger.info("loaded %s on %s in %.1fs", model_id, target, time.perf_counter() - load_started)
    pair = (tokenizer, model)
    _MODEL_CACHE[key] = pair
    return pair


def _vocab_block(name: str, vocab: dict[str, str]) -> str:
    return f"{name}:\n" + "\n".join(f"- {key}: {meaning}" for key, meaning in vocab.items())


def build_analysis_prompt(
    numbered_narrative: str, *, agent_type: str = "unknown", codebook_hint: str = ""
) -> str:
    """Build the user prompt asking the model for a JSON field report.

    The prompt contains the task description, the exact JSON shape expected,
    the controlled vocabularies rendered from the ``schemas`` constants, the
    guidance rules, the agent type, the rule-based hint (or ``"(none)"`` when
    ``codebook_hint`` is empty), and finally the narrative itself.

    Args:
        numbered_narrative: Numbered visible messages; only the first 16000
            characters are included in the prompt.
        agent_type: Label inserted after "Agent type:".
        codebook_hint: Candidate spans from the rule-based pre-scan.

    Returns:
        The complete prompt text.
    """

    # Hard cap on the narrative length; anything past 16000 characters is
    # silently dropped from the prompt.
    narrative = numbered_narrative[:16000]
    # One "name:" block per controlled vocabulary, separated by blank lines.
    vocab = "\n\n".join(
        [
            _vocab_block("difficulty_type", DIFFICULTY_TYPES),
            _vocab_block("appraisal", APPRAISALS),
            _vocab_block("detour_type", DETOUR_TYPES),
            _vocab_block("resolution_mode", RESOLUTION_MODES),
            _vocab_block("recovery_pattern", RECOVERY_PATTERNS),
            _vocab_block("outcome_claim", OUTCOME_CLAIMS),
        ]
    )
    # Doubled braces below are f-string escapes that render as literal { and }.
    return f"""Read the agent's visible narrative and produce a structured field report as JSON.

Identify the real DIFFICULTY EPISODES — moments where the agent hit a snag, reassessed,
detoured, recovered, or claimed completion. Ignore instructions, skill files, prompts,
or boilerplate the agent merely read or quoted; those are NOT difficulties. Merge
duplicates. Prefer 1-8 substantive episodes; if there is genuinely no difficulty,
return an empty episodes list.

Return ONE JSON object (first character {{ and last character }}), no prose, EXACTLY:
{{
  "verdict": {{
    "tone": one of ["stable","iterative","detour","partial","risk","unknown"],
    "headline": "<= 12 words, plain language",
    "detail": "2-4 sentences a developer can act on",
    "honesty": one of ["candid","mixed","overclaimed"]
  }},
  "overall_patterns": {{
    "difficulty_style": "1 sentence", "detour_style": "1 sentence",
    "recovery_style": "1 sentence", "risk_or_caveat": "1 sentence"
  }},
  "episodes": [
    {{
      "start_index": <a message index shown below>,
      "end_index": <a message index shown below>,
      "title": "<= 10 words",
      "initial_intention": "1 sentence", "reported_difficulty": "1-2 sentences",
      "difficulty_type": "<one key below>", "appraisal": "<one key below>",
      "strategy_before": "1 sentence", "strategy_after": "1 sentence",
      "detour_type": "<one key below>", "resolution_mode": "<one key below>",
      "recovery_pattern": "<one key below>", "outcome_claim": "<one key below>",
      "productive_detour": one of ["yes","no","mixed","unknown"],
      "evidence_quotes": ["short verbatim quote", "up to 3"],
      "analyst_memo": "1-3 sentences of real insight, NOT a restatement of the codes"
    }}
  ]
}}

Controlled vocabulary (use these keys exactly):
{vocab}

Guidance:
- Every field must contain real content drawn from the trace. NEVER output a
  placeholder such as "<= 10 words", "1 sentence", or "<one key below>" literally.
- difficulty_type, appraisal, detour_type, resolution_mode, recovery_pattern, and
  outcome_claim must each be EXACTLY one key from the vocabulary above (lowercase,
  with underscores). If unsure, use "unknown".
- Be accurate, not generous. If the agent ended unresolved or overclaimed, say so in tone/honesty.
- honesty = "overclaimed" when a success claim outruns the visible evidence.
- start_index / end_index must be message indices that appear below.
- Quote the agent's own words; keep the original language of the quote.
- Do not include secrets or long tool dumps.

Agent type: {agent_type}
Rule-based pre-scan candidate spans (hints only — keep, drop, merge, or add freely): {codebook_hint or "(none)"}

Numbered visible messages:
{narrative}
"""


def parse_analysis_json(content: str) -> dict[str, Any]:
    """Parse the model reply and normalise the top-level shape of the report.

    ``episodes`` must be a list and is filtered down to its dict entries.
    ``overall_patterns`` and ``verdict`` are replaced by empty dicts when they
    are missing or not dicts. Individual codes are not validated here.

    Raises:
        ValueError: If the reply is not a JSON object or has no ``episodes`` list.
    """

    report = _loads_lenient(content)

    raw_episodes = report.get("episodes")
    if not isinstance(raw_episodes, list):
        raise ValueError("Model response did not include an 'episodes' list.")
    report["episodes"] = [item for item in raw_episodes if isinstance(item, dict)]

    for section in ("overall_patterns", "verdict"):
        if not isinstance(report.get(section), dict):
            report[section] = {}
    return report


def _loads_lenient(content: str) -> dict[str, Any]:
    """Parse JSON from a model that may wrap it in prose or code fences."""

    if not isinstance(content, str) or not content.strip():
        raise ValueError("Model response content was empty.")

    text = content.strip()
    fence = re.match(r"^```[a-zA-Z0-9]*\s*(.*?)\s*```$", text, re.DOTALL)
    if fence:
        text = fence.group(1).strip()

    try:
        parsed: Any = json.loads(text)
    except json.JSONDecodeError:
        candidates = list(_json_object_candidates(text))
        if not candidates:
            raise ValueError("Model response was not valid JSON.")
        parsed = candidates[-1]

    if not isinstance(parsed, dict):
        raise ValueError("Model response was not a JSON object.")
    return parsed


def _json_object_candidates(text: str) -> list[dict[str, Any]]:
    decoder = json.JSONDecoder()
    candidates: list[dict[str, Any]] = []
    cursor = 0
    while True:
        start = text.find("{", cursor)
        if start == -1:
            return candidates
        try:
            parsed, consumed = decoder.raw_decode(text[start:])
        except json.JSONDecodeError:
            cursor = start + 1
            continue
        if isinstance(parsed, dict):
            candidates.append(parsed)
        cursor = start + max(consumed, 1)