File size: 11,927 Bytes
aec0295
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
"""
Reward computation and normalization for the trading environment.
Grades and verifier rewards lie in [0, 1]; normalize_reward maps raw rewards to [-1, 1].
"""

import numpy as np
from typing import Dict
import json
import re


# Default reward component weights, keyed by the component names that
# compute_raw_reward reads from its weight mapping.
DEFAULT_WEIGHTS = {
    "profit": 1.0,
    "drawdown": 0.5,
    "volatility": 0.3,
    "sharpe": 0.5,
    "overtrading": 0.1,
    "hold_penalty": 0.01,
    "directional_bonus": 0.3,
}

# Normalization: divisor applied to the raw reward before tanh in
# normalize_reward (higher = gentler slope near zero, later saturation)
DEFAULT_NORM_SCALE = 5.0


def compute_raw_reward(
    profit: float,
    drawdown: float,
    volatility: float,
    sharpe: float,
    trade_count: int,
    weights: Dict[str, float] | None = None,
    direction: int = 0,
    price_trend: float = 0.0,
) -> float:
    """
    Compute the raw (un-normalized) reward signal.

    The profit signal is amplified (Γ—1000) so single-step PnL fractions
    produce meaningful gradient.  A small hold-penalty discourages the
    model from always choosing direction=0, and a directional bonus
    rewards matching the market trend.

    Args:
        profit: Change in portfolio value (as fraction of initial).
        drawdown: Current max drawdown [0, 1].
        volatility: Return standard deviation.
        sharpe: Sharpe ratio of returns.
        trade_count: Number of trades executed this step.
        weights: Component weights (uses defaults if None).
        direction: Action direction (0=Hold, 1=Buy, 2=Sell).
        price_trend: Signed price change fraction for the step.

    Returns:
        Raw reward (float, unbounded).
    """
    w = weights or DEFAULT_WEIGHTS

    # Amplify per-step profit so it's not buried in noise
    profit_signal = w["profit"] * profit * 1000.0

    # Penalties
    dd_penalty = w["drawdown"] * drawdown
    vol_penalty = w["volatility"] * volatility
    overtrade_penalty = w["overtrading"] * (trade_count / 10.0)

    # Bonuses
    sharpe_bonus = w["sharpe"] * np.tanh(sharpe)

    # Hold penalty: small cost for doing nothing
    hold_pen = w.get("hold_penalty", 0.01) if direction == 0 else 0.0

    # Directional correctness: reward matching the trend
    dir_bonus = 0.0
    w_dir = w.get("directional_bonus", 0.3)
    if direction == 1 and price_trend > 0:       # Bought into uptrend
        dir_bonus = w_dir * min(abs(price_trend) * 100, 1.0)
    elif direction == 2 and price_trend < 0:     # Sold into downtrend
        dir_bonus = w_dir * min(abs(price_trend) * 100, 1.0)
    elif direction != 0:                         # Wrong direction
        dir_bonus = -w_dir * 0.5

    reward = (
        profit_signal
        - dd_penalty
        - vol_penalty
        + sharpe_bonus
        - overtrade_penalty
        - hold_pen
        + dir_bonus
    )
    return float(reward)


def normalize_reward(
    raw: float,
    scale: float | None = None,
) -> float:
    """
    Normalize reward to [-1, 1] using tanh scaling.

    This preserves the sign (positive = good, negative = bad) and
    provides smooth gradient everywhere, unlike the old min-max clip
    which collapsed everything to ~0.5.
    """
    s = float(scale if scale is not None else DEFAULT_NORM_SCALE)
    return float(np.tanh(raw / s))


def compute_grade(metrics: Dict[str, float]) -> float:
    """
    Blend four evaluation metrics into a final grade in [0, 1].

    grade = 0.4 * profit
          + 0.3 * sharpe
          + 0.2 * (1 - drawdown)
          + 0.1 * consistency

    Each metric is read from ``metrics`` (a missing key counts as 0.0)
    and clipped to [0, 1] before weighting; the weighted sum is clipped
    once more before being returned.
    """

    def unit(key):
        # Missing metrics count as 0.0; out-of-range values are clamped.
        return np.clip(metrics.get(key, 0.0), 0.0, 1.0)

    profit_part = 0.4 * unit("profit")
    sharpe_part = 0.3 * unit("sharpe")
    drawdown_part = 0.2 * (1.0 - unit("drawdown"))
    consistency_part = 0.1 * unit("consistency")

    total = profit_part + sharpe_part + drawdown_part + consistency_part
    return float(np.clip(total, 0.0, 1.0))


def _extract_json_action(completion: str):
    match = re.search(r"<action>\s*({.*?})\s*</action>", completion, re.DOTALL)
    if not match:
        return None
    return json.loads(match.group(1))


def _extract_prompt_state(prompt: str):
    """
    Pull the numeric state vector out of a prompt.

    Two spellings are tried in order: a JSON-style ``"state": [...]``
    field, then a plain-text ``State: [...]`` line.  The first one found
    is split on commas and each non-blank item is converted with float().

    Returns:
        A list of floats (possibly empty), or None when neither spelling
        appears in the prompt.
    """
    state_patterns = (r'"state"\s*:\s*\[(.*?)\]', r"State:\s*\[(.*?)\]")
    for pattern in state_patterns:
        found = re.search(pattern, prompt, re.DOTALL)
        if not found:
            continue
        values = []
        for item in found.group(1).split(","):
            text = item.strip()
            if text:
                values.append(float(text))
        return values
    return None


def _extract_signal_value(prompt: str, key: str):
    json_match = re.search(rf'"{key}"\s*:\s*(-?[\d\.]+)', prompt)
    if json_match:
        return float(json_match.group(1))

    plain_match = re.search(rf"{key}\s*[:=]\s*(-?[\d\.]+)", prompt)
    if plain_match:
        return float(plain_match.group(1))

    return None


# ──────────────────────────────────────────────
# GRPO Verifier Functions (Expert Optimized)
# ──────────────────────────────────────────────

def format_reward_func(prompts, completions, **kwargs) -> list[float]:
    """
    Score each completion on output format and reasoning length.

    Scores:
      0.0  a <thought>/</thought>/<action>/</action> tag is missing, or
           scoring raised an exception
      0.2  all tags present but the thought is under 150 characters
      0.4  thought is long enough but no JSON action could be extracted
      1.0  thought is long enough and a JSON action was extracted

    ``prompts`` and ``kwargs`` are accepted but not used.
    """
    required_tags = ("<thought>", "</thought>", "<action>", "</action>")

    def grade(text):
        if any(tag not in text for tag in required_tags):
            return 0.0

        after_open = text.split("<thought>")[1]
        reasoning = after_open.split("</thought>")[0].strip()
        if len(reasoning) < 150:
            return 0.2

        return 1.0 if _extract_json_action(text) is not None else 0.4

    scores = []
    for text in completions:
        try:
            scores.append(grade(text))
        except Exception:
            # Any failure while scoring counts as a format miss.
            scores.append(0.0)
    return scores

def alignment_reward_func(prompts, completions, **kwargs) -> list[float]:
    """
    Check that the <thought> agrees with the "ta" signal in the prompt.

    The signal counts as bullish above 0.2 and bearish below -0.2.

    Scores:
      1.0  bullish signal and the thought mentions "bullish", "upward"
           or "buy"; or bearish signal and the thought mentions
           "bearish", "downward" or "sell"
      0.5  baseline: signal missing or neutral, or no matching word
      0.0  the thought could not be extracted or scoring raised
    """
    bullish_terms = ("bullish", "upward", "buy")
    bearish_terms = ("bearish", "downward", "sell")

    def grade(prompt, completion):
        signal = _extract_signal_value(prompt, "ta")
        after_open = completion.split("<thought>")[1]
        reasoning = after_open.split("</thought>")[0].lower()

        if signal is None:
            return 0.5
        if signal > 0.2:
            expected_terms = bullish_terms
        elif signal < -0.2:
            expected_terms = bearish_terms
        else:
            return 0.5

        agrees = any(term in reasoning for term in expected_terms)
        return 1.0 if agrees else 0.5

    scores = []
    for prompt, completion in zip(prompts, completions):
        try:
            scores.append(grade(prompt, completion))
        except Exception:
            scores.append(0.0)
    return scores

def risk_reward_func(prompts, completions, **kwargs) -> list[float]:
    """
    Safety check: position size against the prompt's limit, plus risk
    awareness in the reasoning.

    The limit is read from the prompt's "position_limit" value, then its
    "risk" value, and defaults to 1.0 when neither is present.

    Scores:
      +0.7  the action's "size" (0.0 if absent) is at or below the limit
      +0.3  the thought mentions "risk", "limit" or "constraint"
      0.0   no JSON action, no <thought> block, or scoring raised
    """
    awareness_terms = ("risk", "limit", "constraint")

    def grade(prompt, completion):
        cap = _extract_signal_value(prompt, "position_limit")
        if cap is None:
            cap = _extract_signal_value(prompt, "risk")
        if cap is None:
            cap = 1.0

        action = _extract_json_action(completion)
        if action is None:
            return 0.0

        size = float(action.get("size", 0.0))
        total = 0.7 if size <= cap else 0.0

        after_open = completion.split("<thought>")[1]
        reasoning = after_open.split("</thought>")[0].lower()
        if any(term in reasoning for term in awareness_terms):
            total += 0.3
        return total

    scores = []
    for prompt, completion in zip(prompts, completions):
        try:
            scores.append(grade(prompt, completion))
        except Exception:
            scores.append(0.0)
    return scores

def profit_reward_func(prompts, completions, **kwargs) -> list[float]:
    """
    Simulated PnL: compare the action's direction with the price trend
    of the state vector embedded in the prompt.

    The trend is "up" when the last state value is greater than the
    first; anything else (including a flat series) is treated as not up.

    Scores:
      1.0  Buy (1) in an uptrend, or Sell (2) when the trend is not up
      0.5  Hold (0)
      0.0  wrong direction, no JSON action, fewer than two state values,
           or scoring raised
    """

    def grade(prompt, completion):
        action = _extract_json_action(completion)
        if action is None:
            return 0.0
        direction = int(action.get("direction", 0))

        series = _extract_prompt_state(prompt)
        if not series or len(series) < 2:
            return 0.0

        rising = series[-1] > series[0]
        if (direction == 1 and rising) or (direction == 2 and not rising):
            return 1.0
        return 0.5 if direction == 0 else 0.0

    scores = []
    for prompt, completion in zip(prompts, completions):
        try:
            scores.append(grade(prompt, completion))
        except Exception:
            scores.append(0.0)
    return scores


def governance_reward_func(prompts, completions, **kwargs) -> list[float]:
    """Self-regulation verifier: rewards actions that would pass governance
    without intervention.

    The governance limit is the prompt's "position_limit" value, or 1.0
    when the prompt does not carry one.

    Scoring rubric (summed, then clipped to 0-1):
      +0.40  Action is valid JSON with size <= governance limit.
      +0.20  Size is positive and <= 80 % of the limit (conservative).
      +0.20  <thought> mentions a governance keyword (risk, limit,
             constraint, compliance, conservative, governance, restrict,
             drawdown, cap, position limit).
      +0.20  Direction is non-zero (agent is actively trading, not idle).
      -0.50  Size EXCEEDS governance limit (would trigger intervention).

    A completion with no JSON action, or one whose scoring raises,
    scores 0.0.  A missing <thought> block only forfeits the keyword
    bonus.
    """
    governance_keywords = (
        "risk", "limit", "constraint", "compliance",
        "conservative", "governance", "restrict",
        "drawdown", "cap", "position limit",
    )

    def grade(prompt, completion):
        action = _extract_json_action(completion)
        if action is None:
            return 0.0

        size = float(action.get("size", 0.0))
        direction = int(action.get("direction", 0))
        cap = _extract_signal_value(prompt, "position_limit")
        if cap is None:
            cap = 1.0

        total = 0.0

        # Compliance: inside the limit earns credit, with an extra bonus
        # for a positive size that leaves at least 20 % headroom.
        if size <= cap:
            total += 0.40
            if 0 < size <= cap * 0.8:
                total += 0.20
        else:
            # Governance would have to intervene - penalise.
            total -= 0.50

        # Reasoning quality: does the thought show governance awareness?
        try:
            after_open = completion.split("<thought>")[1]
            reasoning = after_open.split("</thought>")[0].lower()
            if any(word in reasoning for word in governance_keywords):
                total += 0.20
        except (IndexError, AttributeError):
            pass

        # Activity bonus: any non-hold action.
        if direction != 0:
            total += 0.20

        return float(np.clip(total, 0.0, 1.0))

    scores = []
    for prompt, completion in zip(prompts, completions):
        try:
            scores.append(grade(prompt, completion))
        except Exception:
            scores.append(0.0)
    return scores