File size: 13,071 Bytes
399b80c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
"""Conversation log formatting for execution analysis.

Converts ``conversations.jsonl`` entries into a priority-based text block
suitable for LLM analysis prompts.  All functions are pure (stateless).

Priority levels (lower = more important):
  0 — CRITICAL : User instruction (never truncated)
  1 — CRITICAL : Final iteration assistant response (never truncated)
  2 — HIGH     : Tool calls (name + args) AND tool errors — kept together
  3 — HIGH     : Non-final assistant reasoning; tool results with embedded summary
  4 — MEDIUM   : Tool success results (try to preserve)
  5 — LOW      : System guidance messages between iterations
  SKIP         : Skill injection text, verbose system prompts (not included;
                 skill & tool info are provided separately in the prompt)
"""

from __future__ import annotations

import re
from typing import Any, Dict, List, Optional

# Per-section truncation limits (kept in sync with analyzer constants)
# Max characters of a tool result kept when it is classified as an error.
TOOL_ERROR_MAX_CHARS = 1000
# Max characters of a successful tool result that has no embedded summary.
TOOL_SUCCESS_MAX_CHARS = 800
# Max characters of a tool call's string arguments before "..." is appended.
TOOL_ARGS_MAX_CHARS = 500
# Max characters of an extracted "Execution Summary" block.
TOOL_SUMMARY_MAX_CHARS = 1500


def format_conversations(
    conversations: List[Dict[str, Any]],
    budget: int,
) -> str:
    """Render ``conversations.jsonl`` entries as one text block.

    Each entry is turned into prioritized segments, kept in their original
    (chronological) order, and the segments are then assembled under a
    character budget by ``_assemble_with_budget``.

    Args:
        conversations: Parsed log entries.  Only entries whose ``type`` is
            ``"setup"`` or ``"iteration"`` contribute segments; any other
            type is ignored.
        budget: Character budget handed to the assembly step.

    Returns:
        The assembled text.
    """
    # The number of iteration entries is what the collector compares each
    # entry's ``iteration`` value against to spot the final iteration.
    iteration_count = sum(
        entry.get("type") == "iteration" for entry in conversations
    )

    collected: List[Dict[str, Any]] = []
    for entry in conversations:
        kind = entry.get("type", "")
        if kind == "iteration":
            _collect_iteration_segments(entry, iteration_count, collected)
        elif kind == "setup":
            _collect_setup_segments(entry, collected)

    return _assemble_with_budget(collected, budget)

def _collect_setup_segments(
    conv: Dict[str, Any],
    segments: List[Dict[str, Any]],
) -> None:
    """Extract segments from a ``type: "setup"`` conversation entry.

    Only the user instruction is extracted.  System prompts (including skill
    injection text and tool descriptions) are skipped β€” they are provided in
    dedicated sections of the analysis prompt.
    """
    for msg in conv.get("messages", []):
        role = msg.get("role", "")
        content = msg.get("content", "")
        if not isinstance(content, str):
            content = str(content)

        if role == "user":
            segments.append({
                "priority": 0,  # CRITICAL β€” always keep
                "text": f"[USER INSTRUCTION]\n{content}",
                "iteration": 0,
                "role": "user",
                "truncatable_to": None,
            })

def _collect_iteration_segments(
    conv: Dict[str, Any],
    total_iters: int,
    segments: List[Dict[str, Any]],
) -> None:
    """Extract segments from a ``type: "iteration"`` conversation entry.

    Key design decisions:
      - Tool calls and tool errors share the SAME high priority (2)
      - Tool success results get MEDIUM priority (4)
      - Shell agent results with embedded "Execution Summary" get HIGH (3).
    """
    iteration = conv.get("iteration", "?")
    is_last = (iteration == total_iters) if isinstance(iteration, int) else False

    # Process delta_messages in order
    for msg in conv.get("delta_messages", []):
        role = msg.get("role", "")
        content = msg.get("content", "")
        if not isinstance(content, str):
            content = str(content)

        if role == "assistant":
            # Assistant reasoning
            if content:
                priority = 1 if is_last else 3
                segments.append({
                    "priority": priority,
                    "text": f"[Iter {iteration}] ASSISTANT: {content}",
                    "iteration": iteration,
                    "role": "assistant",
                    "truncatable_to": None,
                })

            # Tool calls
            for tc in msg.get("tool_calls", []):
                fn = tc.get("function", {})
                fn_name = fn.get("name", "?")
                fn_args = fn.get("arguments", "")
                if isinstance(fn_args, str) and len(fn_args) > TOOL_ARGS_MAX_CHARS:
                    fn_args = fn_args[:TOOL_ARGS_MAX_CHARS] + "..."
                segments.append({
                    "priority": 2,  # HIGH β€” paired with tool results/errors
                    "text": f"[Iter {iteration}] TOOL_CALL: {fn_name}({fn_args})",
                    "iteration": iteration,
                    "role": "tool_call",
                    "truncatable_to": None,
                })

        elif role == "tool":
            # Tool result
            is_error = _is_error_result(content)

            if is_error:
                truncated = content[:TOOL_ERROR_MAX_CHARS]
                if len(content) > TOOL_ERROR_MAX_CHARS:
                    truncated += f"... [truncated, total {len(content)} chars]"
                segments.append({
                    "priority": 2,  # HIGH β€” errors are critical, same tier as tool calls
                    "text": f"[Iter {iteration}] TOOL_ERROR: {truncated}",
                    "iteration": iteration,
                    "role": "tool_error",
                    "truncatable_to": None,
                })
            else:
                # Check if result contains a self-generated summary
                # (e.g. shell_agent produces "Execution Summary (N steps):")
                summary = _extract_embedded_summary(content)
                if summary:
                    # Show the embedded summary (high value, compact)
                    segments.append({
                        "priority": 3,  # HIGH β€” self-generated summaries are informative
                        "text": f"[Iter {iteration}] TOOL_RESULT (with summary):\n{summary}",
                        "iteration": iteration,
                        "role": "tool_result",
                        "truncatable_to": 500,
                    })
                else:
                    truncated = content[:TOOL_SUCCESS_MAX_CHARS]
                    if len(content) > TOOL_SUCCESS_MAX_CHARS:
                        truncated += f"... [truncated, total {len(content)} chars]"
                    segments.append({
                        "priority": 4,  # MEDIUM β€” try to preserve success results
                        "text": f"[Iter {iteration}] TOOL_RESULT: {truncated}",
                        "iteration": iteration,
                        "role": "tool_result",
                        "truncatable_to": 300,
                    })

        elif role == "system":
            # System guidance between iterations (e.g. "Iteration N complete...")
            if content:
                segments.append({
                    "priority": 5,  # LOW β€” guidance messages
                    "text": f"[Iter {iteration}] SYSTEM: {content}",
                    "iteration": iteration,
                    "role": "system",
                    "truncatable_to": 150,
                })

def _assemble_with_budget(
    segments: List[Dict[str, Any]],
    budget: int,
) -> str:
    """Assemble segments into final text respecting the character budget.

    Strategy:
      1. Include all segments with priority ≀ 3 (CRITICAL + HIGH) in full.
      2. Add MEDIUM + LOW segments in chronological order until budget is hit.
      3. If even HIGH-priority content exceeds budget, progressively truncate
         older iterations while preserving user instruction and final iteration.
    """
    # Calculate essential (priority ≀ 3) size
    essential = [s for s in segments if s["priority"] <= 3]
    essential_chars = sum(len(s["text"]) for s in essential)

    remaining_budget = budget - essential_chars

    if remaining_budget < 0:
        # Essential content alone exceeds budget β€” need to reduce
        # Keep priority 0-1 (user instruction + final iteration) in full
        # Truncate priority 2-3 (tool calls/errors + older assistant content)
        return _assemble_essential_only(segments, budget)

    # Build output in chronological order
    output_parts: List[str] = []
    used_chars = 0
    skipped_count = 0

    for seg in segments:
        text = seg["text"]
        priority = seg["priority"]

        if priority <= 3:
            # Essential β€” always include
            output_parts.append(text)
            used_chars += len(text) + 1
        elif used_chars + len(text) + 1 <= budget:
            # Within budget β€” include
            output_parts.append(text)
            used_chars += len(text) + 1
        else:
            # Over budget β€” try truncation
            truncatable_to = seg.get("truncatable_to")
            if truncatable_to and len(text) > truncatable_to:
                truncated = text[:truncatable_to] + "... [budget-truncated]"
                if used_chars + len(truncated) + 1 <= budget:
                    output_parts.append(truncated)
                    used_chars += len(truncated) + 1
                    continue
            skipped_count += 1

    if skipped_count > 0:
        output_parts.append(
            f"\n[... {skipped_count} lower-priority segment(s) omitted due to length ...]"
        )

    return "\n\n".join(output_parts)


def _assemble_essential_only(
    segments: List[Dict[str, Any]],
    budget: int,
) -> str:
    """Fallback: even essential content exceeds budget.

    Keep:
      - User instruction (priority 0) β€” never truncated
      - Final iteration (priority 1) β€” never truncated
      - Tool calls + tool errors (priority 2) β€” budget-allocated, truncated if needed
      - Non-final assistant reasoning (priority 3) β€” heavily summarized
    """
    output_parts: List[str] = []
    used_chars = 0

    # Pass 1: priority 0 and 1 (user instruction + final iteration)
    for seg in segments:
        if seg["priority"] <= 1:
            output_parts.append(seg["text"])
            used_chars += len(seg["text"]) + 1

    remaining = budget - used_chars

    # Pass 2: priority 2 (tool calls + tool errors) β€” budget-allocated
    tool_segments = [s for s in segments if s["priority"] == 2]
    if tool_segments:
        per_segment_budget = max(400, remaining // (len(tool_segments) + 1))
        for seg in tool_segments:
            text = seg["text"]
            if len(text) > per_segment_budget:
                text = text[:per_segment_budget] + "... [budget-truncated]"
            if used_chars + len(text) + 1 <= budget:
                output_parts.append(text)
                used_chars += len(text) + 1

    # Pass 3: priority 3 (non-final assistant reasoning) β€” one-line summaries
    assistants = [s for s in segments if s["priority"] == 3]
    if assistants and used_chars < budget:
        output_parts.append("\n--- Older iteration summaries ---")
        for seg in assistants:
            first_line = seg["text"].split("\n", 1)[0][:200]
            if used_chars + len(first_line) + 1 > budget:
                output_parts.append("[... remaining iterations omitted ...]")
                break
            output_parts.append(first_line)
            used_chars += len(first_line) + 1

    return "\n\n".join(output_parts)

def _is_error_result(content: str) -> bool:
    """Detect if a tool result represents an error."""
    if not content:
        return False
    # Check common error patterns in the first 200 chars
    head = content[:200].lower()
    return (
        content.startswith("[ERROR]")
        or content.startswith("ERROR")
        or "error" in head[:50]
        or "task failed" in head
        or "connection refused" in head
        or "timed out" in head
        or "traceback" in head
    )


def _extract_embedded_summary(content: str) -> Optional[str]:
    """Extract self-generated summary from tool result content.

    Shell agent results often contain an ``Execution Summary (N steps):``
    block that provides a compact view of what happened internally.
    This is more informative than the raw output.
    """
    # Look for "Execution Summary (N steps):" pattern
    match = re.search(
        r"(Execution Summary \(\d+ steps?\):.*?)(?:={10,}|$)",
        content,
        re.DOTALL,
    )
    if match:
        summary = match.group(1).strip()
        # Also capture any "Summary:" line after the steps
        summary_match = re.search(r"\nSummary:\s*(.+)", content)
        if summary_match:
            summary += f"\nConclusion: {summary_match.group(1).strip()}"
        return summary[:TOOL_SUMMARY_MAX_CHARS]

    return None