Spaces:
Sleeping
Sleeping
File size: 8,904 Bytes
c602f24 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 | """Re-evaluate US tickers from VPS portfolio through the full agent pipeline.
For each US ticker with a previous BUY/STRONG BUY verdict stored on the VPS:
1. Run the full LangGraph pipeline (gatekeeper β analyst with debate/Kelly).
2. Compare the new verdict against the old one.
3. If the verdict changed, update the VPS DuckDB via the data API.
4. If the new verdict is still BUY/STRONG BUY, route through Alpaca paper trading.
Usage:
python -m scripts.test_reeval_us_tickers # dry-run (no Alpaca orders)
ALPACA_ENABLED=true python -m scripts.test_reeval_us_tickers # live paper orders
"""
import os
import sys
import json
import time
import warnings
import requests
from datetime import datetime
from pathlib import Path
# Ensure project root on path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from dotenv import load_dotenv
load_dotenv()
warnings.filterwarnings("ignore", category=UserWarning, module="pydantic")
from src.core.logger import get_logger
from src.agent import app as agent_app # The compiled LangGraph
logger = get_logger("reeval_test")
# VPS config
VPS_API_URL = os.getenv("VPS_API_URL", "").rstrip("/")
VPS_API_KEY = os.getenv("VPS_API_KEY", "")
def _vps_headers():
return {"X-API-Key": VPS_API_KEY, "Content-Type": "application/json"}
def fetch_us_tickers_from_vps() -> list[dict]:
"""Get all US tickers from VPS portfolio (no dot in ticker = US)."""
resp = requests.get(f"{VPS_API_URL}/portfolio", headers=_vps_headers(), timeout=10)
resp.raise_for_status()
portfolio = resp.json()
# Filter US-only tickers (no dot suffix), deduplicate by keeping latest
# Exclude test entries and large-cap tickers used for testing
EXCLUDE = {
"TEST1", "PYTEST1", "PYTH", "GHACT", "TEST.VPS",
# Large-cap tickers used for VPS smoke tests, not real micro-cap finds
"AAPL", "TSLA", "MSFT", "GOOG", "NVDA", "AMZN", "META",
}
import re
_TEST_RE = re.compile(r"^(TEST|PYTEST|TST|T\d{3,})", re.IGNORECASE)
seen = {}
for trade in portfolio:
ticker = trade["ticker"]
if ("." not in ticker
and ticker not in EXCLUDE
and not _TEST_RE.match(ticker)):
# Keep the most recent entry for each ticker
if ticker not in seen or trade["date"] > seen[ticker]["date"]:
seen[ticker] = trade
return list(seen.values())
def extract_verdict_from_report(report: str) -> str:
"""Extract the verdict keyword from a full report text."""
upper = report.upper()
if "STRONG BUY" in upper:
return "STRONG BUY"
if "BUY" in upper:
return "BUY"
if "WATCH" in upper:
return "WATCH"
return "AVOID"
def run_agent_for_ticker(ticker: str) -> dict:
"""Run the full agent pipeline for a single ticker.
Uses the same LangGraph app as the Chainlit UI.
Returns the final state dict.
"""
config = {
"configurable": {"thread_id": f"reeval-{ticker.lower()}-{int(time.time())}"},
"recursion_limit": 30,
}
# Set the ticker as manual search and region as USA
initial = {"ticker": ticker, "region": "USA"}
result = agent_app.invoke(initial, config)
return result
def update_vps_portfolio(ticker: str, entry_price: float, verdict: str,
position_size: float = 0.0,
order_id: str = None,
fill_price: float = None,
broker_status: str = "none") -> bool:
"""Record the re-evaluated trade to VPS."""
today = datetime.now().strftime("%Y-%m-%d")
payload = {
"ticker": ticker,
"entry_price": entry_price,
"date": today,
"verdict": verdict,
"source": "reeval_test",
"position_size": position_size,
"order_id": order_id,
"fill_price": fill_price,
"broker_status": broker_status,
}
try:
resp = requests.post(
f"{VPS_API_URL}/portfolio",
headers=_vps_headers(),
json=payload,
timeout=10,
)
resp.raise_for_status()
result = resp.json()
if result.get("status") == "duplicate":
logger.info("[VPS] Duplicate skipped for %s on %s", ticker, today)
return False
logger.info("[VPS] Recorded: %s %s @ $%.2f", verdict, ticker, entry_price)
return True
except Exception as exc:
logger.error("[VPS] Failed to record %s: %s", ticker, exc)
return False
def main():
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--limit", type=int, default=3,
help="Max tickers to evaluate (default 3)")
args = parser.parse_args()
print("=" * 70)
print("PrimoGreedy β US Ticker Re-Evaluation Test")
print(f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"Alpaca Enabled: {os.getenv('ALPACA_ENABLED', 'false')}")
print(f"VPS API: {VPS_API_URL}")
print(f"Ticker Limit: {args.limit}")
print("=" * 70)
# 1. Fetch US tickers from VPS
print("\nπ‘ Fetching US tickers from VPS portfolio...")
us_trades = fetch_us_tickers_from_vps()
if not us_trades:
print("β No US tickers found in VPS portfolio.")
return
# Apply limit
if args.limit and len(us_trades) > args.limit:
print(f"\nβ οΈ Limiting to {args.limit} of {len(us_trades)} tickers (use --limit N to change)")
us_trades = us_trades[:args.limit]
print(f"\nβ
Re-evaluating {len(us_trades)} US tickers:\n")
print(f"{'Ticker':<10} {'Old Verdict':<15} {'Entry Price':<12} {'Date':<12}")
print("-" * 50)
for t in us_trades:
print(f"{t['ticker']:<10} {t['verdict']:<15} ${t['entry_price']:<11.2f} {t['date']:<12}")
# 2. Run each ticker through the agent
results = []
for i, trade in enumerate(us_trades, 1):
ticker = trade["ticker"]
old_verdict = trade["verdict"]
print(f"\n{'='*70}")
print(f"[{i}/{len(us_trades)}] Re-evaluating {ticker} (old verdict: {old_verdict})")
print(f"{'='*70}")
try:
state = run_agent_for_ticker(ticker)
report = state.get("final_verdict", state.get("final_report", ""))
new_verdict = extract_verdict_from_report(report)
changed = new_verdict != old_verdict
result = {
"ticker": ticker,
"old_verdict": old_verdict,
"new_verdict": new_verdict,
"changed": changed,
"report_preview": report[:300] if report else "No report",
}
results.append(result)
status_icon = "π" if changed else "β
"
print(f"\n{status_icon} {ticker}: {old_verdict} β {new_verdict}"
f"{' [CHANGED]' if changed else ' [SAME]'}")
# 3. If verdict changed, update VPS
if changed:
print(f" π Updating VPS with new verdict...")
# Get current price from state
fin_data = state.get("financial_data", {})
current_price = fin_data.get("currentPrice", 0) or fin_data.get("regularMarketPrice", 0) or trade["entry_price"]
update_vps_portfolio(ticker, current_price, new_verdict)
# Print report preview
print(f"\n π Report Preview:")
for line in report[:500].split("\n"):
print(f" {line}")
if len(report) > 500:
print(f" ... ({len(report) - 500} more chars)")
except Exception as exc:
logger.error("Failed to evaluate %s: %s", ticker, exc, exc_info=True)
results.append({
"ticker": ticker,
"old_verdict": old_verdict,
"new_verdict": "ERROR",
"changed": True,
"report_preview": str(exc),
})
print(f"\nβ {ticker}: ERROR β {exc}")
# Small pause between tickers to avoid rate limits
if i < len(us_trades):
time.sleep(3)
# 4. Summary
print(f"\n{'='*70}")
print("π RE-EVALUATION SUMMARY")
print(f"{'='*70}")
print(f"\n{'Ticker':<10} {'Old':<15} {'New':<15} {'Status':<10}")
print("-" * 50)
for r in results:
status = "CHANGED" if r["changed"] else "SAME"
icon = "π" if r["changed"] else "β
"
print(f"{r['ticker']:<10} {r['old_verdict']:<15} {r['new_verdict']:<15} {icon} {status}")
changed_count = sum(1 for r in results if r["changed"])
print(f"\nπ Total: {len(results)} tickers | {changed_count} changed | {len(results) - changed_count} same")
print(f"β±οΈ Completed: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
if __name__ == "__main__":
main()
|