Spaces:
Sleeping
Sleeping
File size: 5,611 Bytes
"""Inline (online) evaluators — run after each analyst verdict during cron.
Only runs the *cheap* evaluators (no LLM calls):
- format_score: structural checks (headers, duplicates, Kelly section)
- verdict_validity_score: valid verdict keyword present
Results are logged as LangSmith feedback on the current run.
Falls back silently if LangSmith is not configured.
"""
import os
import re
from src.core.logger import get_logger
# Module-level logger obtained from the project's logging helper.
logger = get_logger(__name__)
# Verdict labels that _verdict_validity_score accepts as valid.
VALID_VERDICTS = {"STRONG BUY", "BUY", "WATCH", "AVOID"}
# Markdown section headers that _format_score looks for in a report; each one
# that is missing or appears more than once is recorded as a format issue.
REQUIRED_HEADERS = [
"### THE QUANTITATIVE BASE",
"### THE LYNCH PITCH",
"### THE MUNGER INVERT",
"### FINAL VERDICT",
]
def _format_score(verdict_text: str) -> dict:
    """Score the structural correctness of a verdict report.

    Checks performed (each one counts once towards the score):

    - every header in ``REQUIRED_HEADERS`` appears exactly once;
    - for BUY-type reports, a position-sizing / Kelly section is present.

    Verdict keywords are matched case-insensitively and only when they are
    not embedded in a longer word, so text such as "buyback" or
    "unavoidable" is not mistaken for BUY or AVOID.

    Args:
        verdict_text: Full text of the analyst report; may be empty or None.

    Returns:
        A dict with ``key`` ("format_score"), ``score`` (fraction of checks
        passed, between 0.0 and 1.0) and ``comment`` (the issues found, or a
        pass note). Empty reports and reports containing "REJECTED" are
        treated as not applicable and score 1.0.
    """
    if not verdict_text or "REJECTED" in verdict_text.upper():
        return {"key": "format_score", "score": 1.0, "comment": "Rejected/empty, N/A"}

    issues = []
    total_checks = len(REQUIRED_HEADERS)
    for header in REQUIRED_HEADERS:
        count = verdict_text.count(header)
        if count == 0:
            issues.append(f"Missing: {header}")
        elif count > 1:
            issues.append(f"Duplicated ({count}x): {header}")

    upper = verdict_text.upper()

    def has_keyword(word: str) -> bool:
        # Reject matches glued to other letters ("BUYBACK", "UNAVOIDABLE")
        # while still accepting forms like "STRONG_BUY" or "**BUY**".
        pattern = rf"(?<![A-Z]){re.escape(word)}(?![A-Z])"
        return re.search(pattern, upper) is not None

    is_buy = has_keyword("STRONG BUY") or (
        has_keyword("BUY") and not has_keyword("AVOID")
    )
    if is_buy:
        total_checks += 1
        # Case-insensitive, consistent with the other keyword checks.
        if "POSITION SIZING" not in upper and "KELLY" not in upper:
            issues.append("Missing Kelly section for BUY verdict")

    passed = total_checks - len(issues)
    score = passed / total_checks if total_checks > 0 else 1.0
    return {
        "key": "format_score",
        "score": score,
        "comment": "; ".join(issues) if issues else "All format checks passed",
    }
def _verdict_validity_score(verdict_text: str) -> dict:
    """Check that the report contains one of the valid verdict keywords.

    The whole report text is searched case-insensitively (not only the final
    verdict section). A keyword only counts when it is not embedded in a
    longer word, so "buyback", "watchlist" or "unavoidable" do not qualify.

    Args:
        verdict_text: Full text of the analyst report; may be empty or None.

    Returns:
        A dict with ``key`` ("verdict_validity"), ``score`` (1.0 when a valid
        verdict keyword is found, otherwise 0.0) and ``comment``. Empty
        reports and reports containing "REJECTED" are treated as not
        applicable and score 1.0.
    """
    if not verdict_text or "REJECTED" in verdict_text.upper():
        return {"key": "verdict_validity", "score": 1.0, "comment": "Rejected, N/A"}

    upper = verdict_text.upper()
    # Most specific first so "STRONG BUY" is not reported as plain "BUY".
    candidates = ("STRONG BUY", "BUY", "WATCH", "AVOID")
    found = next(
        (
            verdict
            for verdict in candidates
            if re.search(rf"(?<![A-Z]){re.escape(verdict)}(?![A-Z])", upper)
        ),
        None,
    )
    if found and found in VALID_VERDICTS:
        return {"key": "verdict_validity", "score": 1.0, "comment": f"Valid: {found}"}
    return {"key": "verdict_validity", "score": 0.0, "comment": "Invalid/missing verdict"}
def log_online_feedback(
verdict_text: str,
ticker: str,
*,
run_id: str | None = None,
is_fallback: bool = False,
) -> None:
"""Run cheap evaluators and post results as LangSmith feedback.
Requires LANGCHAIN_API_KEY and LANGCHAIN_TRACING_V2=true in env.
Fails silently if LangSmith is unavailable.
"""
api_key = os.getenv("LANGCHAIN_API_KEY", "")
tracing = os.getenv("LANGCHAIN_TRACING_V2", "").lower() == "true"
if not api_key or not tracing:
return
evals = [
_format_score(verdict_text),
_verdict_validity_score(verdict_text),
]
try:
from langsmith import Client
client = Client()
for ev in evals:
client.create_feedback(
run_id=run_id,
key=ev["key"],
score=ev["score"],
comment=f"[{ticker}] {ev['comment']}",
source_info={"type": "online_eval", "ticker": ticker},
) if run_id else None
logger.info(
"Online eval [%s] %s: %.2f — %s",
ticker, ev["key"], ev["score"], ev["comment"],
)
except Exception as exc:
logger.debug("LangSmith feedback skipped: %s", exc)
def tag_for_review(
verdict_text: str,
ticker: str,
*,
run_id: str | None = None,
is_fallback: bool = False,
) -> None:
"""Tag LangSmith runs that need human review.
Criteria:
- WATCH or AVOID verdicts (edge cases worth reviewing)
- Fallback-path verdicts (structured output failed)
"""
api_key = os.getenv("LANGCHAIN_API_KEY", "")
tracing = os.getenv("LANGCHAIN_TRACING_V2", "").lower() == "true"
if not api_key or not tracing or not run_id:
return
upper = (verdict_text or "").upper()
needs_review = is_fallback or "WATCH" in upper or "AVOID" in upper
if not needs_review:
return
reasons = []
if is_fallback:
reasons.append("fallback_path")
if "WATCH" in upper:
reasons.append("WATCH_verdict")
if "AVOID" in upper:
reasons.append("AVOID_verdict")
try:
from langsmith import Client
client = Client()
client.update_run(
run_id,
extra={
"metadata": {
"needs_review": True,
"review_reasons": reasons,
"ticker": ticker,
}
},
tags=["needs_review"] + reasons,
)
logger.info(
"Tagged run %s for review: %s (%s)",
run_id[:8] if run_id else "?", ticker, ", ".join(reasons),
)
except Exception as exc:
logger.debug("LangSmith annotation skipped: %s", exc)
def get_current_run_id() -> str | None:
"""Attempt to retrieve the current LangSmith run ID from callback context."""
try:
from langsmith import get_current_run_tree
rt = get_current_run_tree()
return str(rt.id) if rt else None
except Exception:
return None
|