File size: 6,484 Bytes
bd00c06
 
 
 
 
 
 
 
 
 
 
71f1fe0
bd00c06
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
71f1fe0
 
 
 
 
 
 
 
bd00c06
 
 
 
71f1fe0
 
 
 
bd00c06
71f1fe0
bd00c06
71f1fe0
 
 
 
bd00c06
71f1fe0
 
 
bd00c06
71f1fe0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bd00c06
71f1fe0
 
 
 
 
 
bd00c06
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
"""Text prompt / completion adapter for the merchant policy.

Serialize an observation into a compact prompt the model can condition
on, and parse a JSON completion back into a typed
``ChargebackOpsAction``. Both helpers are pure — no provider calls, no
side effects — so they are cheap to unit-test.
"""

from __future__ import annotations

import json
import re
from typing import Any

try:
    from ..core.models import ChargebackOpsAction
except ImportError:  # pragma: no cover
    from core.models import ChargebackOpsAction


_SYSTEM_INSTRUCTION = (
    "You play the merchant-side agent in a chargeback dispute. "
    "Look at the observation and choose the single best next action. "
    "Return JSON only: "
    '{"action_type": "...", "case_id": "...", "strategy": "...", '
    '"evidence_ids": [...], "note": "..."} '
    "Use only action_types listed in available_actions. Omit fields you "
    "do not need."
)


_ALLOWED_ACTION_FIELDS: frozenset[str] = frozenset(
    {
        "action_type",
        "case_id",
        "system_name",
        "evidence_ids",
        "compelling_evidence_ids",
        "strategy",
        "note",
    }
)


def _compact_observation(observation: dict[str, Any]) -> dict[str, Any]:
    """Drop fields that add tokens without signal for the merchant policy."""

    visible_case = observation.get("visible_case")
    compact_case: dict[str, Any] | None = None
    if visible_case is not None:
        compact_case = {
            "case_id": visible_case["case_id"],
            "status": visible_case["status"],
            "reason_code": visible_case["reason_code"],
            "amount": visible_case["amount"],
            "currency": visible_case["currency"],
            "current_strategy": visible_case.get("current_strategy"),
            "systems_revealed": visible_case.get("systems_revealed", []),
            "retrieved_evidence": [
                {
                    "evidence_id": item["evidence_id"],
                    "source_system": item["source_system"],
                    "title": item["title"],
                }
                for item in visible_case.get("retrieved_evidence", [])
            ],
            "attached_evidence": [
                item["evidence_id"]
                for item in visible_case.get("attached_evidence", [])
            ],
            "policy": visible_case.get("policy"),
        }

    return {
        "objective": observation.get("objective", ""),
        "selected_case_id": observation.get("selected_case_id"),
        "available_actions": observation.get("available_actions", []),
        "steps_remaining": observation.get("steps_remaining", 0),
        "queue": [
            {
                "case_id": item["case_id"],
                "status": item["status"],
                "reason_code": item["reason_code"],
                "amount": item["amount"],
                "steps_until_deadline": item["steps_until_deadline"],
            }
            for item in observation.get("queue", [])
        ],
        "visible_case": compact_case,
        "last_action_result": observation.get("last_action_result", ""),
    }


def build_prompt(observation: dict[str, Any]) -> str:
    """Return a deterministic prompt for the merchant policy."""

    compact = _compact_observation(observation)
    body = json.dumps(compact, separators=(",", ":"), sort_keys=True)
    return f"{_SYSTEM_INSTRUCTION}\nOBSERVATION:\n{body}\nACTION:"


def parse_completion(text: str) -> dict[str, Any] | None:
    """Parse a model completion into a raw action dict, or return None.

    Tolerates: code fences, leading prose / `<think>` blocks, prefix words
    naming the action_type before the JSON, and JSON truncated mid-string
    (auto-closes at the last balanced field). Required because untrained
    Qwen-style chat models often emit valid JSON head + truncated tail —
    a strict parser would zero out the entire training signal.
    """

    if not text:
        return None
    cleaned = text.strip()
    cleaned = re.sub(r"<think>.*?</think>", "", cleaned, flags=re.DOTALL).strip()
    cleaned = re.sub(r"^```(?:json)?\s*", "", cleaned)
    cleaned = re.sub(r"```\s*$", "", cleaned).strip()

    start = cleaned.find("{")
    if start == -1:
        return None
    prefix = cleaned[:start].strip()
    body = cleaned[start:]

    data: dict[str, Any] | None = None
    try:
        candidate = json.loads(body)
        if isinstance(candidate, dict):
            data = candidate
    except json.JSONDecodeError:
        pass

    if data is None:
        depth = 0
        in_str = False
        esc = False
        last_safe = -1
        for i, ch in enumerate(body):
            if esc:
                esc = False
                continue
            if ch == "\\":
                esc = True
                continue
            if ch == '"':
                in_str = not in_str
                continue
            if in_str:
                continue
            if ch == "{":
                depth += 1
            elif ch == "}":
                depth -= 1
                if depth == 0:
                    try:
                        candidate = json.loads(body[: i + 1])
                        if isinstance(candidate, dict):
                            data = candidate
                            break
                    except json.JSONDecodeError:
                        pass
            elif ch == "," and depth == 1:
                last_safe = i
        if data is None and last_safe != -1:
            try:
                candidate = json.loads(body[:last_safe] + "}")
                if isinstance(candidate, dict):
                    data = candidate
            except json.JSONDecodeError:
                pass

    if data is None:
        return None

    if "action_type" not in data and prefix:
        m = re.match(r"[a-z_][a-z0-9_]*", prefix.lower())
        if m:
            data["action_type"] = m.group(0)

    return {k: v for k, v in data.items() if k in _ALLOWED_ACTION_FIELDS}


def action_from_completion(text: str) -> ChargebackOpsAction | None:
    """Parse a completion and build a validated :class:`ChargebackOpsAction`."""

    parsed = parse_completion(text)
    if parsed is None or "action_type" not in parsed:
        return None
    try:
        return ChargebackOpsAction(**parsed)
    except Exception:
        return None


__all__ = [
    "action_from_completion",
    "build_prompt",
    "parse_completion",
]