File size: 10,037 Bytes
aceb1b2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
"""
Signal-based triage scoring for the annotation queue.

Captures a per-item quality signal — an agent error, a production thumbs-down, a
low automated score, or any custom field — and turns it into a numeric
*triage priority* so the assignment queue can surface the worst / most-suspect
traces first instead of annotating in arrival (FIFO) order.

The same scorer runs over statically loaded data and over traces ingested at
runtime (webhook / Langfuse), because both funnel through
``ItemStateManager.add_item``. The priority is stored on the item's metadata
(``triage_priority`` / ``triage_reason`` / ``triage_rule``); the ``priority``
assignment strategy reads it; the inline badge and the admin queue page surface it.

Config (all optional):

    triage:
      enabled: true
      order: desc            # high priority first (default); 'asc' = low first
      default_priority: 0    # items matching no rule
      show_badge: true       # show a "why prioritized" banner during annotation
      signal_field: null     # fallback when no rule matches: numeric priority read from this field
      invert_signal: false   # if true, a LOWER field value => HIGHER priority
      rules:                 # evaluated in order; highest matching priority wins
        - name: "Agent errored"
          when: {field: "status", equals: "error"}
          priority: 100
          badge: "Agent errored"
        - name: "Negative feedback"
          when: {field: "feedback", in: ["thumbs_down", "negative"]}
          priority: 80
        - name: "Low score"
          when: {field: "score", lt: 0.5}
          priority: 60

When ``enabled`` with no ``rules`` and no ``signal_field``, a turnkey set of
built-in defaults is used (error status, negative feedback, low score) so
ingested traces are triaged out of the box.
"""

from __future__ import annotations

import logging
from dataclasses import dataclass

logger = logging.getLogger(__name__)


# Built-in fallback rules. TriageScorer uses these when triage is enabled but
# the config supplies neither ``rules`` nor ``signal_field``. Each entry has the
# same shape as a user-written rule: name, badge, priority and a ``when`` clause.
DEFAULT_RULES = [
    {
        "name": "Agent errored",
        "badge": "Agent errored",
        "priority": 100,
        "when": {"field": "status", "in": ["error", "failed", "failure"]},
    },
    {
        "name": "Negative feedback",
        "badge": "Negative feedback",
        "priority": 80,
        "when": {
            "field": "feedback",
            "in": ["thumbs_down", "negative", "down", "👎"],
        },
    },
    {
        "name": "Low score",
        "badge": "Low score",
        "priority": 60,
        "when": {"field": "score", "lt": 0.5},
    },
]


@dataclass
class TriageScore:
    """Result of scoring a single item: a priority plus why it was assigned."""
    # Numeric triage priority for the item.
    priority: float
    # Badge text shown to humans; None when the item was not flagged by a rule.
    reason: str | None = None
    # Name of the rule that matched; None for default / signal-field scores.
    rule: str | None = None

    def to_metadata(self) -> dict:
        """Return the score as a dict keyed ``triage_priority`` / ``triage_reason`` / ``triage_rule``."""
        attribute_names = ("priority", "reason", "rule")
        return {f"triage_{attr}": getattr(self, attr) for attr in attribute_names}


def _lookup(data: dict, field: str):
    """Resolve a possibly dotted field path against an item dict.

    Supports nested dicts (``metadata.score``). Returns None if any segment is
    missing or a non-dict is traversed.
    """
    cur = data
    for part in str(field).split("."):
        if not isinstance(cur, dict) or part not in cur:
            return None
        cur = cur[part]
    return cur


def _as_number(value):
    """Coerce a value to float, or None if it isn't numeric."""
    if isinstance(value, bool):
        return None
    if isinstance(value, (int, float)):
        return float(value)
    if isinstance(value, str):
        try:
            return float(value.strip())
        except (ValueError, AttributeError):
            return None
    return None


def _matches(condition: dict, data: dict) -> bool:
    """Evaluate a single rule's ``when`` condition against item data.

    ``condition`` must name a ``field`` (dotted paths allowed) plus one
    operator. Operators are checked in this fixed order and the first one
    present decides the result: ``exists``, ``equals``, ``in``, ``contains``,
    then ``lt`` / ``lte`` / ``gt`` / ``gte``.

    * ``exists``: true/false test on whether the field resolves to a non-None
      value.
    * ``equals`` / ``in``: string comparisons ignore case and surrounding
      whitespace; other types use ``==``. A scalar given to ``in`` is treated
      as a one-element list.
    * ``contains``: case-insensitive membership in a list/tuple/set field, or
      a case-insensitive substring test when both sides are strings.
    * ``lt`` / ``lte`` / ``gt`` / ``gte``: both sides are coerced to numbers;
      if either side is not numeric the condition does not match.

    Returns False when ``field`` is missing from the condition, when the field
    is absent from ``data`` (except for ``exists``), or when no known operator
    is present.
    """
    field = condition.get("field")
    if field is None:
        return False
    value = _lookup(data, field)

    if "exists" in condition:
        return (value is not None) == bool(condition["exists"])

    # Absent fields never match value-based operators.
    if value is None:
        return False

    def _norm(text):
        # Shared normalisation so ``equals`` and ``in`` agree on what counts
        # as the same string.
        return text.strip().lower() if isinstance(text, str) else text

    if "equals" in condition:
        target = condition["equals"]
        if isinstance(value, str) and isinstance(target, str):
            return _norm(value) == _norm(target)
        return value == target

    if "in" in condition:
        options = condition["in"] or []
        # ``in: "error"`` would otherwise be iterated character by character,
        # and a bare number would not be iterable at all.
        if isinstance(options, str) or not hasattr(options, "__iter__"):
            options = [options]
        return _norm(value) in [_norm(option) for option in options]

    if "contains" in condition:
        target = condition["contains"]
        if isinstance(value, (list, tuple, set)):
            tnorm = target.lower() if isinstance(target, str) else target
            return any(
                (item.lower() if isinstance(item, str) else item) == tnorm
                for item in value
            )
        if isinstance(value, str) and isinstance(target, str):
            return target.lower() in value.lower()
        return False

    comparators = (
        ("lt", lambda lhs, rhs: lhs < rhs),
        ("lte", lambda lhs, rhs: lhs <= rhs),
        ("gt", lambda lhs, rhs: lhs > rhs),
        ("gte", lambda lhs, rhs: lhs >= rhs),
    )
    for op, compare in comparators:
        if op not in condition:
            continue
        lhs, rhs = _as_number(value), _as_number(condition[op])
        if lhs is None or rhs is None:
            return False
        # Only the first comparator present is evaluated.
        return compare(lhs, rhs)

    return False


class TriageScorer:
    """Scores items into a triage priority from the ``triage`` config block.

    Settings read from the config (with their defaults): ``enabled`` (False),
    ``order`` ("desc", lower-cased), ``default_priority`` (0), ``show_badge``
    (True), ``signal_field`` (None), ``invert_signal`` (False) and ``rules``.
    """

    def __init__(self, triage_config: dict):
        """Read scorer settings from a ``triage`` config mapping.

        A falsy ``triage_config`` is treated as an empty mapping. When neither
        ``rules`` nor ``signal_field`` is configured, ``DEFAULT_RULES`` is used.
        """
        cfg = triage_config or {}
        self.enabled = bool(cfg.get("enabled", False))
        self.order = str(cfg.get("order", "desc")).lower()
        self.default_priority = float(cfg.get("default_priority", 0) or 0)
        self.show_badge = bool(cfg.get("show_badge", True))
        self.signal_field = cfg.get("signal_field")
        self.invert_signal = bool(cfg.get("invert_signal", False))

        rules = cfg.get("rules")
        if not rules and not self.signal_field:
            rules = DEFAULT_RULES
        self.rules = rules or []

    def score(self, item_data: dict) -> TriageScore:
        """Return the TriageScore for one item.

        Resolution order:

        1. If the scorer is disabled, the default priority is returned.
        2. Every rule is evaluated; the matching rule with the highest
           priority wins (the earliest rule wins a tie). Its ``badge``, or its
           ``name`` when no badge is set, becomes the reason.
        3. If no rule matched and ``signal_field`` is set, a numeric value in
           that field is used as the priority (negated when
           ``invert_signal`` is true).
        4. Otherwise the default priority is returned.

        A rule that cannot be evaluated (not a mapping, bad ``when``, a
        non-numeric ``priority``) is logged as a warning and skipped.
        """
        if not self.enabled:
            return TriageScore(priority=self.default_priority)

        best: TriageScore | None = None
        for rule in self.rules:
            # Every access to the rule stays inside the try: a malformed rule
            # must never break loading, including one that is not a dict.
            try:
                if not _matches(rule.get("when") or {}, item_data):
                    continue
                pr = float(rule.get("priority", 0) or 0)
                name = rule.get("name")
                badge = rule.get("badge") or name
            except Exception as e:
                # The handler must not assume the rule is a dict either.
                label = rule.get("name") if isinstance(rule, dict) else rule
                logger.warning("Triage rule %r failed: %s", label, e)
                continue
            # Strict ``>`` keeps the earliest rule when priorities tie.
            if best is None or pr > best.priority:
                best = TriageScore(priority=pr, reason=badge, rule=name)

        if best is not None:
            return best

        # No rule matched: optionally read a direct numeric signal.
        if self.signal_field is not None:
            raw = _as_number(_lookup(item_data, self.signal_field))
            if raw is not None:
                pr = -raw if self.invert_signal else raw
                return TriageScore(priority=pr, reason=None, rule=None)

        return TriageScore(priority=self.default_priority)


def build_scorer(config: dict) -> TriageScorer | None:
    """Return a TriageScorer for ``config['triage']``, or None if triage is not enabled.

    A missing/empty config, a missing/empty ``triage`` block, or a falsy
    ``enabled`` flag all yield None.
    """
    settings = config.get("triage") if config else None
    if settings and settings.get("enabled"):
        return TriageScorer(settings)
    return None


def compute_triage_queue(config: dict) -> dict:
    """Build the admin triage-queue report from the live ItemStateManager.

    Walks every id in the manager's ``instance_id_ordering``, skips ids with no
    item and items the manager reports as saturated, and ranks the rest by the
    ``triage_priority`` stored in their metadata. Used by the
    ``/admin/triage-queue`` page.

    Args:
        config: Server config; its ``triage`` block decides whether a scorer
            exists and which sort order is used.

    Returns:
        A dict with:

        * ``enabled``: whether a scorer could be built from ``config``.
        * ``order``: the scorer's order, or ``"desc"`` without a scorer.
          Anything other than ``"asc"`` sorts high priority first.
        * ``n_items``: number of rows.
        * ``n_flagged``: number of rows with a truthy ``reason``.
        * ``items``: rows of ``id``, ``priority``, ``reason``, ``rule``,
          ``annotations`` (annotator count) and ``assigned`` (true when the
          annotator count is above zero).
    """
    from potato.item_state_management import get_item_state_manager

    scorer = build_scorer(config)
    order = (scorer.order if scorer else "desc")
    reverse = order != "asc"

    ism = get_item_state_manager()
    rows = []
    # Preserve the configured/global ordering as the deterministic tie-break.
    ordering = {iid: i for i, iid in enumerate(ism.instance_id_ordering)}
    for iid in ism.instance_id_ordering:
        item = ism.get_item(iid)
        if item is None:
            continue
        # Skip items that have reached their annotation cap.
        try:
            if ism._item_is_saturated(iid):
                continue
        except Exception:
            # Best effort: if saturation cannot be determined the item stays
            # in the report, but the failure is left traceable in the logs.
            logger.debug(
                "Could not determine saturation for item %r", iid, exc_info=True
            )
        priority = item.get_metadata("triage_priority")
        if priority is None:
            priority = scorer.default_priority if scorer else 0
        n_ann = len(ism.instance_annotators.get(iid, set()))
        rows.append({
            "id": iid,
            "priority": priority,
            "reason": item.get_metadata("triage_reason"),
            "rule": item.get_metadata("triage_rule"),
            "annotations": n_ann,
            "assigned": n_ann > 0,
            "_order": ordering.get(iid, 0),
        })

    # ``reverse`` flips the whole key tuple, so the tie-break component is
    # negated only for descending sorts. Equal-priority items then keep their
    # original ordering in both directions, not just for ``desc``.
    tie_sign = -1 if reverse else 1
    rows.sort(
        key=lambda r: (r["priority"], tie_sign * r["_order"]),
        reverse=reverse,
    )
    for r in rows:
        r.pop("_order", None)

    return {
        "enabled": bool(scorer),
        "order": order,
        "n_items": len(rows),
        "n_flagged": sum(1 for r in rows if r["reason"]),
        "items": rows,
    }