| import os |
| import json |
| import numpy as np |
| from openai import OpenAI |
| from typing import Dict, Any |
| from dotenv import load_dotenv |
|
|
| load_dotenv() |
|
|
|
|
| def _algorithmic_score( |
| action: Dict[str, Any], |
| agent_reasoning: Dict[str, str], |
| outcome: Dict[str, Any], |
| state_brief: str, |
| ) -> float: |
| """ |
| Deterministic scoring function that replaces the LLM judge when the |
| remote API is unavailable or rate-limited. Scores on four axes: |
| |
| 1. Direction matches TA signal sentiment (0.3) |
| 2. Position size respects risk limit (0.2) |
| 3. SL/TP are set for non-hold trades (0.2) |
| 4. Reasoning quality (length + keyword check) (0.3) |
| |
| Returns a score in [0, 1]. |
| """ |
| score = 0.0 |
|
|
| |
| direction = action.get("direction", 0) |
| if hasattr(direction, 'item'): |
| direction = int(direction) |
| pnl_pct = outcome.get("pnl_pct", 0.0) |
|
|
| if direction == 1 and pnl_pct >= 0: |
| score += 0.30 |
| elif direction == 2 and pnl_pct <= 0: |
| score += 0.30 |
| elif direction == 0: |
| score += 0.15 |
|
|
| |
| size_raw = action.get("size", 0.0) |
| size = float(size_raw[0]) if hasattr(size_raw, '__len__') else float(size_raw) |
| max_dd = outcome.get("max_drawdown", 0.0) |
|
|
| if 0.0 <= size <= 1.0: |
| score += 0.10 |
| if size <= 0.5 or max_dd < 0.10: |
| score += 0.10 |
|
|
| |
| sl_raw = action.get("sl", 0.0) |
| tp_raw = action.get("tp", 0.0) |
| sl = float(sl_raw[0]) if hasattr(sl_raw, '__len__') else float(sl_raw) |
| tp = float(tp_raw[0]) if hasattr(tp_raw, '__len__') else float(tp_raw) |
|
|
| if direction != 0: |
| if sl > 0: |
| score += 0.10 |
| if tp > 0: |
| score += 0.10 |
| else: |
| score += 0.20 |
|
|
| |
| all_reasoning = " ".join(str(v) for v in agent_reasoning.values()).lower() |
| word_count = len(all_reasoning.split()) |
|
|
| if word_count > 20: |
| score += 0.10 |
| if word_count > 50: |
| score += 0.05 |
|
|
| quality_keywords = [ |
| "rsi", "ema", "macd", "volatility", "drawdown", |
| "risk", "trend", "bullish", "bearish", "momentum", |
| "support", "resistance", "limit", "exposure", |
| ] |
| hits = sum(1 for kw in quality_keywords if kw in all_reasoning) |
| score += min(hits * 0.03, 0.15) |
|
|
| return float(np.clip(score, 0.0, 1.0)) |
|
|
|
|
| class LLMJudge: |
| """ |
| Evaluates agent interactions and provides a normalized reward. |
| |
| Primary: Llama 3.3 70B (or compatible) via OpenAI-compatible API. |
| Fallback: Deterministic algorithmic scorer (no API calls, no rate limits). |
| """ |
|
|
| def __init__(self, api_key: str | None = None, base_url: str | None = None): |
| self.base_url = base_url or os.getenv("OPENAI_BASE_URL", "") |
| remote_enabled = os.getenv("ENABLE_REMOTE_JUDGE", "false").lower() == "true" |
| resolved_key = api_key or os.getenv("OPENAI_API_KEY", "") |
| if not resolved_key and self.base_url and "groq.com" in self.base_url: |
| resolved_key = os.getenv("GROQ_API_KEY", "") |
|
|
| self.enabled = remote_enabled and bool(resolved_key) |
| self.client = None |
| if self.enabled: |
| self.client = OpenAI( |
| api_key=resolved_key, |
| base_url=self.base_url if self.base_url else None |
| ) |
| self.model = os.getenv("JUDGE_MODEL", "llama-3.3-70b-versatile") |
| self._warned = False |
| self._rate_limit_hits = 0 |
| self._max_rate_limit_hits = 3 |
|
|
| def evaluate_step(self, |
| state_brief: str, |
| agent_reasoning: Dict[str, str], |
| action: Dict[str, Any], |
| outcome: Dict[str, Any]) -> float: |
| """ |
| Evaluate a single step and return a reward [0, 1]. |
| |
| Tries the remote LLM judge first; on failure or rate-limit, |
| falls back to the algorithmic scorer automatically. |
| """ |
| |
| if not self.enabled or self._rate_limit_hits >= self._max_rate_limit_hits: |
| return _algorithmic_score(action, agent_reasoning, outcome, state_brief) |
|
|
| |
| serializable_action = { |
| k: (v.tolist() if hasattr(v, "tolist") else v) |
| for k, v in action.items() |
| } |
| serializable_outcome = { |
| k: (v.tolist() if hasattr(v, "tolist") else v) |
| for k, v in outcome.items() |
| if k not in ["positions"] |
| } |
| serializable_outcome["positions"] = outcome.get("positions", {}) |
|
|
| prompt = f""" |
| Analyze this trade execution for a professional quant firm. |
| |
| MARKET STATE: |
| {state_brief} |
| |
| AGENT REASONING: |
| {json.dumps(agent_reasoning, indent=2)} |
| |
| ACTION TAKEN: |
| {json.dumps(serializable_action, indent=2)} |
| |
| OUTCOME: |
| {json.dumps(serializable_outcome, indent=2)} |
| |
| CRITERIA: |
| 1. Professionalism: Did they follow the 1% risk rule and SL/TP constraints? |
| 2. Alignment: Does the action match the agents' reasoning? |
| 3. Logic: Was the trade direction sound given the indicators? |
| |
| Respond with ONLY a JSON object: {{"score": float, "reason": str}}. |
| The score MUST be between 0.0 and 1.0. |
| """ |
|
|
| try: |
| if not self.client: |
| return _algorithmic_score(action, agent_reasoning, outcome, state_brief) |
|
|
| response = self.client.chat.completions.create( |
| model=self.model, |
| messages=[{"role": "user", "content": prompt}], |
| temperature=0.1, |
| response_format={"type": "json_object"} |
| ) |
| content = response.choices[0].message.content |
| if not content: |
| return _algorithmic_score(action, agent_reasoning, outcome, state_brief) |
|
|
| data = json.loads(content) |
| self._rate_limit_hits = 0 |
| return float(np.clip(data.get("score", 0.5), 0.0, 1.0)) |
|
|
| except Exception as e: |
| err_str = str(e).lower() |
| if "rate" in err_str or "429" in err_str or "limit" in err_str: |
| self._rate_limit_hits += 1 |
| if self._rate_limit_hits >= self._max_rate_limit_hits: |
| print(f"Judge: rate-limited {self._rate_limit_hits}× — switching to algorithmic fallback permanently.") |
| elif not self._warned: |
| print(f"Judge error: {e} — using algorithmic fallback.") |
| self._warned = True |
|
|
| return _algorithmic_score(action, agent_reasoning, outcome, state_brief) |
|
|
| def get_episode_reward(self, metrics: Dict[str, Any]) -> float: |
| """Evaluate overall episode performance.""" |
| return 0.0 |
|
|