# Source: IRIS-AI_DEMO / generate_llm_reports.py
# (Hugging Face Space page metadata — author "Brajmovech", commit f21508a,
# "Sync localhost changes after aa6da6b into demo" — preserved as comments so
# the module remains valid Python.)
"""
Generate prediction data for tickers in watchlist.txt using three LLMs:
- ChatGPT 5.2 (OpenAI-compatible)
- DeepSeek V3
- Gemini V3 Pro
Results are appended to the existing JSON report files in data/LLM reports/,
using the same schema as gemini_v3_pro.json.
To run the script, type in the terminal:
python generate_llm_reports.py
"""
from __future__ import annotations
import json
import math
import os
import re
import time as _time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List
from iris_mvp import IRIS_System
# Absolute path of the directory containing this script; all data paths are
# resolved relative to it so the script works regardless of the CWD.
PROJECT_ROOT = Path(__file__).resolve().parent
# Output directory holding one JSON report file per LLM provider.
LLM_REPORTS_DIR = PROJECT_ROOT / "data" / "LLM reports"
# Plain-text ticker list; supports '#' comments and comma/space separators.
WATCHLIST_PATH = PROJECT_ROOT / "watchlist.txt"
# Alternate ticker symbols folded into a canonical one (GOOGL is treated
# as GOOG so both Alphabet tickers land in the same report entries).
_TICKER_ALIASES = {
    "GOOGL": "GOOG",
}
def _load_env():
"""
Load environment variables from .env at project root.
Tries python-dotenv if available; otherwise falls back to a simple parser.
"""
env_path = PROJECT_ROOT / ".env"
if not env_path.exists():
return
# First try python-dotenv if installed.
try:
from dotenv import load_dotenv # type: ignore
load_dotenv(env_path)
return
except Exception:
# Fall back to manual parsing below.
pass
try:
for raw_line in env_path.read_text(encoding="utf-8").splitlines():
line = raw_line.strip()
if not line or line.startswith("#"):
continue
if "=" not in line:
continue
key, value = line.split("=", 1)
key = key.strip()
value = value.strip().strip('"').strip("'")
if key:
os.environ[key] = value
except OSError:
# If we can't read .env, just skip; callers will see missing env vars.
return
# Load .env at import time so API keys are available to every provider helper.
_load_env()
def _retry_llm_call(fn, max_retries=2, base_delay=1.0):
    """Invoke an LLM API callable, retrying transient failures with exponential backoff.

    Failures whose message looks like an auth/config problem abort immediately
    (retrying cannot help). When every attempt fails, an error dict is
    returned instead of raising so callers can merge it into their results.
    """
    fatal_markers = (
        "api_key",
        "authentication",
        "unauthorized",
        "invalid key",
        "not installed",
    )
    failure = None
    attempt = 0
    while attempt <= max_retries:
        try:
            return fn()
        except Exception as exc:
            failure = exc
            message = str(exc).lower()
            if any(marker in message for marker in fatal_markers):
                break
            if attempt < max_retries:
                _time.sleep(base_delay * (2 ** attempt))
        attempt += 1
    return {"error": str(failure), "status": "unavailable"}
def _canonical_ticker(symbol: str) -> str:
    """Return the canonical upper-case ticker for *symbol*, resolving aliases."""
    cleaned = str(symbol or "").strip().upper()
    return _TICKER_ALIASES.get(cleaned, cleaned) if cleaned else cleaned
def _normalize_ticker_list(symbols: Iterable[str]) -> List[str]:
    """Canonicalize tickers, dropping blanks and duplicates, preserving order."""
    ordered: List[str] = []
    known = set()
    for raw in symbols or []:
        ticker = _canonical_ticker(raw)
        if ticker and ticker not in known:
            known.add(ticker)
            ordered.append(ticker)
    return ordered
def load_watchlist_tickers() -> List[str]:
    """Read watchlist.txt and return canonical, de-duplicated tickers.

    Supports '#' comments and tickers separated by commas and/or whitespace.
    Returns an empty list when the watchlist file does not exist.
    """
    if not WATCHLIST_PATH.exists():
        return []
    collected: List[str] = []
    for raw_line in WATCHLIST_PATH.read_text(encoding="utf-8").splitlines():
        payload = raw_line.split("#", 1)[0].strip()
        if payload:
            collected.extend(token for token in re.split(r"[\s,]+", payload) if token)
    return _normalize_ticker_list(collected)
def _now_utc_iso() -> str:
    """Current UTC time as ISO-8601 with trailing 'Z', second precision."""
    now = datetime.now(timezone.utc)
    return f"{now:%Y-%m-%dT%H:%M:%S}Z"
def _load_json_array(path: Path) -> List[Dict[str, Any]]:
    """Load a JSON report file as a list; a bare object is wrapped in a list.

    Missing or empty files yield an empty list. Unparseable content, or a
    root that is neither a list nor an object, raises RuntimeError.
    """
    if not path.exists():
        return []
    raw = path.read_text(encoding="utf-8").strip()
    if not raw:
        return []
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise RuntimeError(f"Unable to parse existing JSON file as array: {path}") from exc
    if isinstance(parsed, list):
        return parsed
    if isinstance(parsed, dict):
        # Fallback: wrap single object
        return [parsed]
    raise RuntimeError(f"Unexpected JSON root type in {path}: {type(parsed).__name__}")
def _parse_llm_json(raw_content: str) -> Dict[str, Any]:
    """
    Parse JSON returned by an LLM, being tolerant of common wrappers.

    Handles Markdown code fences (```json ... ```) including fences on the
    same line as the payload, and — as a last resort — extracts the first
    '{' ... last '}' span when the model wrapped the JSON in commentary.
    Raises json.JSONDecodeError when no parseable JSON can be found, same
    as the previous implementation.
    """
    text = (raw_content or "").strip()
    # Strip an optional leading ```lang fence and optional trailing ```;
    # DOTALL lets the payload span multiple lines.
    fence_match = re.match(r"```[\w-]*\s*(.*?)\s*(?:```)?\s*$", text, re.DOTALL)
    if fence_match:
        text = fence_match.group(1).strip()
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        # Fallback: the model wrapped the object in prose. Extract the
        # outermost brace span and retry; re-raise if there is none.
        start = text.find("{")
        end = text.rfind("}")
        if start != -1 and end > start:
            return json.loads(text[start:end + 1])
        raise
def _ensure_meta_fields(obj: Dict[str, Any], symbol: str, mode: str) -> Dict[str, Any]:
    """Stamp obj['meta'] with symbol, generation timestamp, and mode (in place)."""
    existing_meta = obj.get("meta")
    meta = existing_meta if isinstance(existing_meta, dict) else {}
    meta.update(symbol=symbol, generated_at=_now_utc_iso(), mode=mode)
    obj["meta"] = meta
    return obj
def _safe_float(value: Any, fallback: float = 0.0) -> float:
    """Coerce value to a finite float, returning fallback on failure or NaN/inf."""
    try:
        result = float(value)
    except (TypeError, ValueError):
        return fallback
    return result if math.isfinite(result) else fallback
def _build_forecast_prompt(
    symbol: str,
    mode: str,
    current_price: float,
    sma_5: float,
    sentiment_score: float,
) -> str:
    """Build the next-session forecast prompt shared by all three providers.

    Embeds the IRIS metrics as factual context and spells out the exact JSON
    schema the model must emit (matches the gemini_v3_pro.json report schema).
    The template text is returned verbatim, so its layout is intentional.
    """
    return f"""You are a financial forecasting assistant.
Given the stock ticker "{symbol}", produce a concise next-session forecast.
Current IRIS metrics (use these as factual context):
- current_price_usd: {current_price:.4f}
- sma_5_usd: {sma_5:.4f}
- sentiment_score: {sentiment_score:.4f}
Respond with a single JSON object with this exact structure and field names:
{{
"meta": {{
"symbol": "{symbol}",
"generated_at": "<ISO8601-UTC timestamp>",
"mode": "{mode}"
}},
"market": {{
"current_price": <float>,
"predicted_price_next_session": <float>
}},
"signals": {{
"trend_label": "<exactly one of: STRONG UPTREND, WEAK UPTREND, WEAK DOWNTREND, STRONG DOWNTREND>",
"sentiment_score": <float between -1 and 1>,
"check_engine_light": "<string description like ' RED (..)' or ' YELLOW (..)' or ' GREEN (..)'>"
}},
"evidence": {{
"headlines_used": [
{{"title": "<short headline 1>", "url": ""}},
{{"title": "<short headline 2>", "url": ""}}
]
}}
}}
Rules:
- Only output raw JSON (no markdown, no code fences, no commentary).
- Use realistic but approximate prices in USD.
- Set market.current_price to current_price_usd exactly.
- Set signals.sentiment_score to sentiment_score exactly.
- Use sma_5_usd relative to current_price_usd and sentiment_score for trend reasoning.
- headlines_used items must be JSON objects with "title" (string) and "url" (empty string ""). Never output raw strings in that array."""
def _build_horizon_forecast_prompt(
    symbol: str,
    mode: str,
    current_price: float,
    sma_5: float,
    rsi_14: float,
    sentiment_score: float,
    horizon_label: str,
    horizon_days: int,
    headlines_summary: str,
) -> str:
    """Build the multi-horizon forecast prompt used by predict_with_llms.

    Unlike _build_forecast_prompt, this variant also embeds RSI, a headlines
    summary, a trading-day horizon, and asks for an investment_signal plus a
    short reasoning string. The template is returned verbatim.
    """
    return f"""You are a quantitative financial analyst.
Given the stock ticker "{symbol}", produce a forecast for the {horizon_label} horizon ({horizon_days} trading days).
Current metrics:
- current_price_usd: {current_price:.4f}
- sma_5_usd: {sma_5:.4f}
- rsi_14: {rsi_14:.2f}
- sentiment_score: {sentiment_score:.4f}
- horizon: {horizon_label} ({horizon_days} trading days)
Recent relevant headlines:
{headlines_summary}
Respond with ONLY a single JSON object (no markdown, no code fences):
{{
"meta": {{
"symbol": "{symbol}",
"generated_at": "<ISO8601-UTC timestamp>",
"mode": "{mode}",
"horizon": "{horizon_label}",
"horizon_days": {horizon_days}
}},
"market": {{
"current_price": {current_price:.4f},
"predicted_price_horizon": <float>,
"predicted_price_next_session": <float>
}},
"signals": {{
"trend_label": "<STRONG UPTREND|WEAK UPTREND|WEAK DOWNTREND|STRONG DOWNTREND>",
"sentiment_score": {sentiment_score:.4f},
"check_engine_light": "<GREEN (..)|YELLOW (..)|RED (..)>",
"investment_signal": "<STRONG BUY|BUY|HOLD|SELL|STRONG SELL>"
}},
"evidence": {{
"headlines_used": [
{{"title": "<headline 1>", "url": ""}},
{{"title": "<headline 2>", "url": ""}}
]
}},
"reasoning": "<2-3 sentence explanation of why you made this prediction>"
}}
Rules:
- investment_signal MUST be exactly one of: STRONG BUY, BUY, HOLD, SELL, STRONG SELL
- trend_label must be plain ASCII text with no emoji, no Unicode symbols, no special characters
- predicted_price_horizon is the price at END of the {horizon_label} period
- reasoning should reference the metrics and headlines provided
- Only output raw JSON"""
def get_chatgpt52_forecast(
    symbol: str,
    current_price: float,
    sma_5: float,
    sentiment_score: float,
    *,
    mode: str = "live_forecast",
) -> Dict[str, Any]:
    """
    Call ChatGPT 5.2 (or configured OpenAI model) to get a forecast JSON.

    Args:
        symbol: Canonical ticker symbol.
        current_price: Latest price in USD.
        sma_5: 5-session simple moving average in USD.
        sentiment_score: News sentiment score.
        mode: Recorded in the report's meta.mode field.

    Returns:
        Parsed forecast dict with meta fields stamped.

    Raises:
        RuntimeError: If OPENAI_API_KEY is missing or openai is not installed.

    Requires:
    - OPENAI_API_KEY in environment
    - Optional OPENAI_MODEL_CHATGPT52 for model override (default: gpt-4o)
    """
    # Fail fast with the same explicit error style as the DeepSeek/Gemini
    # helpers instead of relying on the client library's auth error.
    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        raise RuntimeError("OPENAI_API_KEY environment variable is required.")
    try:
        from openai import OpenAI  # type: ignore
    except ImportError as exc:
        raise RuntimeError("openai package is not installed. Install with 'pip install openai'.") from exc
    client = OpenAI(api_key=api_key)
    model_name = os.environ.get("OPENAI_MODEL_CHATGPT52", "gpt-4o")
    prompt = _build_forecast_prompt(
        symbol,
        mode,
        current_price,
        sma_5,
        sentiment_score,
    )
    response = client.chat.completions.create(
        model=model_name,
        messages=[
            {"role": "system", "content": "You produce structured JSON forecasts for US equities."},
            {"role": "user", "content": prompt},
        ],
        temperature=0.4,
    )
    content = response.choices[0].message.content or ""
    data = _parse_llm_json(content)
    return _ensure_meta_fields(data, symbol, mode)
def get_deepseek_v3_forecast(
    symbol: str,
    current_price: float,
    sma_5: float,
    sentiment_score: float,
    *,
    mode: str = "live_forecast",
) -> Dict[str, Any]:
    """
    Call DeepSeek V3 API (OpenAI-compatible HTTP) to get a forecast JSON.
    Requires:
    - DEEPSEEK_API_KEY in environment
    - Optional DEEPSEEK_BASE_URL (default: https://api.deepseek.com)
    - Optional DEEPSEEK_MODEL (default: deepseek-chat)
    """
    try:
        import requests  # type: ignore
    except ImportError as exc:
        raise RuntimeError("requests package is not installed. Install with 'pip install requests'.") from exc
    api_key = os.environ.get("DEEPSEEK_API_KEY")
    if not api_key:
        raise RuntimeError("DEEPSEEK_API_KEY environment variable is required.")
    # Endpoint and model are both env-configurable.
    base_url = os.environ.get("DEEPSEEK_BASE_URL", "https://api.deepseek.com")
    model_name = os.environ.get("DEEPSEEK_MODEL", "deepseek-chat")
    endpoint = f"{base_url.rstrip('/')}/v1/chat/completions"
    forecast_prompt = _build_forecast_prompt(
        symbol,
        mode,
        current_price,
        sma_5,
        sentiment_score,
    )
    request_body = {
        "model": model_name,
        "messages": [
            {"role": "system", "content": "You produce structured JSON forecasts for US equities."},
            {"role": "user", "content": forecast_prompt},
        ],
        "temperature": 0.4,
    }
    auth_headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    response = requests.post(endpoint, headers=auth_headers, json=request_body, timeout=60)
    response.raise_for_status()
    reply = response.json()["choices"][0]["message"]["content"]
    return _ensure_meta_fields(_parse_llm_json(reply), symbol, mode)
def get_geminiv3_forecast(
    symbol: str,
    current_price: float,
    sma_5: float,
    sentiment_score: float,
    *,
    mode: str = "live_forecast",
) -> Dict[str, Any]:
    """
    Call Gemini V3 Pro via google-genai client to get a forecast JSON.
    Requires:
    - GEMINI_API_KEY in environment
    - Optional GEMINI_MODEL (default: gemini-3-flash-preview or similar)
    """
    try:
        from google import genai  # type: ignore
    except ImportError as exc:
        raise RuntimeError("google-genai package is not installed. Install with 'pip install google-genai'.") from exc
    api_key = os.environ.get("GEMINI_API_KEY")
    if not api_key:
        raise RuntimeError("GEMINI_API_KEY environment variable is required.")
    gemini_client = genai.Client(api_key=api_key)
    model_name = os.environ.get("GEMINI_MODEL", "gemini-3-flash-preview")
    forecast_prompt = _build_forecast_prompt(
        symbol,
        mode,
        current_price,
        sma_5,
        sentiment_score,
    )
    reply = gemini_client.models.generate_content(model=model_name, contents=forecast_prompt)
    return _ensure_meta_fields(_parse_llm_json(reply.text or ""), symbol, mode)
def predict_with_llms(
    symbol: str,
    current_price: float,
    sma_5: float,
    rsi_14: float,
    sentiment_score: float,
    horizon: str,
    horizon_days: int,
    horizon_label: str,
    headlines_summary: str,
    mode: str = "live_forecast",
) -> dict:
    """Call all three LLM providers in parallel with retries and return all 3 keys.

    Each value is either a parsed forecast dict (meta fields stamped) or an
    {"error": ..., "status": "unavailable"} dict; this function never raises.
    Providers that do not finish within the 45s budget are reported as
    "Request timed out".
    """
    del horizon  # Kept for compatibility with endpoint call signature.
    prompt = _build_horizon_forecast_prompt(
        symbol=symbol,
        mode=mode,
        current_price=current_price,
        sma_5=sma_5,
        rsi_14=rsi_14,
        sentiment_score=sentiment_score,
        horizon_label=horizon_label,
        horizon_days=horizon_days,
        headlines_summary=headlines_summary,
    )

    def _call_chatgpt():
        # Missing key returns an error dict (not an exception) so
        # _retry_llm_call does not burn retries on a config problem.
        api_key = os.environ.get("OPENAI_API_KEY", "").strip()
        if not api_key:
            return {"error": "OPENAI_API_KEY not configured", "status": "unavailable"}
        from openai import OpenAI  # type: ignore
        client = OpenAI(api_key=api_key)
        model_name = os.environ.get("OPENAI_MODEL_CHATGPT52", "gpt-4o")
        resp = client.chat.completions.create(
            model=model_name,
            messages=[
                {"role": "system", "content": "You produce structured JSON forecasts for US equities."},
                {"role": "user", "content": prompt},
            ],
            temperature=0.4,
            max_tokens=800,
        )
        data = _parse_llm_json(resp.choices[0].message.content or "")
        return _ensure_meta_fields(data, symbol, mode)

    def _call_deepseek():
        api_key = os.environ.get("DEEPSEEK_API_KEY", "").strip()
        if not api_key:
            return {"error": "DEEPSEEK_API_KEY not configured", "status": "unavailable"}
        import requests as req  # type: ignore
        base_url = os.environ.get("DEEPSEEK_BASE_URL", "https://api.deepseek.com")
        model_name = os.environ.get("DEEPSEEK_MODEL", "deepseek-chat")
        resp = req.post(
            f"{base_url.rstrip('/')}/v1/chat/completions",
            headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
            json={
                "model": model_name,
                "messages": [
                    {"role": "system", "content": "You produce structured JSON forecasts for US equities."},
                    {"role": "user", "content": prompt},
                ],
                "temperature": 0.4,
                "max_tokens": 800,
            },
            timeout=20,
        )
        resp.raise_for_status()
        data = _parse_llm_json(resp.json()["choices"][0]["message"]["content"])
        return _ensure_meta_fields(data, symbol, mode)

    def _call_gemini():
        api_key = os.environ.get("GEMINI_API_KEY", "").strip()
        if not api_key:
            return {"error": "GEMINI_API_KEY not configured", "status": "unavailable"}
        from google import genai  # type: ignore
        # NOTE(review): default model here ("gemini-2.5-flash") differs from
        # get_geminiv3_forecast's "gemini-3-flash-preview" — confirm intentional.
        model_name = os.environ.get("GEMINI_MODEL", "gemini-2.5-flash")
        client = genai.Client(api_key=api_key)
        resp = client.models.generate_content(model=model_name, contents=prompt)
        data = _parse_llm_json(resp.text or "")
        return _ensure_meta_fields(data, symbol, mode)

    results: dict = {}
    # Fix: a `with ThreadPoolExecutor(...)` block defeated the 45s budget —
    # Executor.__exit__ calls shutdown(wait=True) and blocks until every
    # worker thread finishes, even after as_completed() times out. Shut down
    # explicitly without waiting and cancel not-yet-started calls instead.
    executor = ThreadPoolExecutor(max_workers=3)
    try:
        futures = {
            executor.submit(_retry_llm_call, _call_chatgpt): "chatgpt52",
            executor.submit(_retry_llm_call, _call_deepseek): "deepseek_v3",
            executor.submit(_retry_llm_call, _call_gemini): "gemini_v3_pro",
        }
        try:
            for future in as_completed(futures, timeout=45):
                model_key = futures[future]
                try:
                    results[model_key] = future.result()
                except Exception as e:
                    results[model_key] = {"error": str(e), "status": "unavailable"}
        except Exception:
            # Timeout: preserve completed results; stragglers are marked below.
            pass
    finally:
        executor.shutdown(wait=False, cancel_futures=True)
    for model_key in ("chatgpt52", "deepseek_v3", "gemini_v3_pro"):
        if model_key not in results:
            results[model_key] = {"error": "Request timed out", "status": "unavailable"}
    return results
# Closed set of investment signals accepted from LLM output; anything else is
# coerced to "HOLD" by _normalize_llm_result.
_VALID_SIGNALS = {"STRONG BUY", "BUY", "HOLD", "SELL", "STRONG SELL"}
def _normalize_llm_result(result: dict) -> dict:
    """Normalize an LLM forecast dict in place: valid signal + usable reasoning.

    - Coerces signals.investment_signal into _VALID_SIGNALS (default "HOLD").
    - Synthesizes a reasoning sentence when missing or too short, and
      truncates overly long reasoning at a sentence/word boundary (~500 chars).
    - Error results ({"error": ...}) pass through untouched.

    Fix: a non-dict "signals"/"market" payload (e.g. the model emitted a bare
    string) previously raised AttributeError on .get(); such payloads are now
    replaced with empty dicts.
    """
    if "error" in result:
        return result
    signals = result.get("signals", {})
    if not isinstance(signals, dict):
        # Defensive: LLMs occasionally emit a string here instead of an object.
        signals = {}
    raw_signal = str(signals.get("investment_signal", "")).strip().upper()
    if raw_signal not in _VALID_SIGNALS:
        signals["investment_signal"] = "HOLD"
    else:
        signals["investment_signal"] = raw_signal
    result["signals"] = signals
    reasoning = str(result.get("reasoning", "")).strip()
    if not reasoning or len(reasoning) < 10:
        trend = str(signals.get("trend_label", "neutral")).lower().strip()
        market = result.get("market", {})
        if not isinstance(market, dict):
            market = {}
        price = market.get("predicted_price_horizon", "N/A")
        reasoning = f"Model predicts {trend} trend to ${price}."
    # Truncate at sentence boundary (defensive), avoiding mid-word cuts.
    max_len = 500
    if len(reasoning) > max_len:
        truncated = reasoning[:max_len]
        last_period = truncated.rfind(". ")
        if last_period > 100:
            reasoning = truncated[:last_period + 1]
        else:
            last_space = truncated.rfind(" ")
            if last_space > 100:
                reasoning = truncated[:last_space] + "\u2026"
            else:
                reasoning = truncated + "\u2026"
    result["reasoning"] = reasoning
    return result
def _normalize_llm_signal(result: dict) -> dict:
    """Backward-compatible alias for legacy callers; see _normalize_llm_result."""
    return _normalize_llm_result(result)
def generate_reports_for_watchlist(*, mode: str = "live_forecast") -> None:
    """Fetch one forecast per watchlist ticker from each LLM provider and
    append the results to the per-provider JSON report files.

    Existing report entries are preserved; new ones are appended. A provider
    failure for one ticker is logged and skipped without aborting the run.

    Args:
        mode: Recorded in each report's meta.mode field.
    """
    tickers = load_watchlist_tickers()
    if not tickers:
        print("No tickers found in watchlist.txt; nothing to do.")
        return
    LLM_REPORTS_DIR.mkdir(parents=True, exist_ok=True)
    files = {
        "chatgpt52": LLM_REPORTS_DIR / "chatgpt_5.2.json",
        "deepseek_v3": LLM_REPORTS_DIR / "deepseek_v3.json",
        "gemini_v3_pro": LLM_REPORTS_DIR / "gemini_v3_pro.json",
    }
    existing: Dict[str, List[Dict[str, Any]]] = {
        key: _load_json_array(path) for key, path in files.items()
    }
    # One (label, callable) pair per provider; all three share the same
    # signature, so a single loop replaces three copy-pasted try/except blocks.
    providers = {
        "chatgpt52": ("ChatGPT 5.2", get_chatgpt52_forecast),
        "deepseek_v3": ("DeepSeek V3", get_deepseek_v3_forecast),
        "gemini_v3_pro": ("Gemini V3 Pro", get_geminiv3_forecast),
    }
    app = IRIS_System()
    for symbol in tickers:
        print(f"Fetching LLM forecasts for {symbol}...")
        market_data = app.get_market_data(symbol) or {}
        sentiment_raw, _headlines = app.analyze_news(symbol)
        current_price = _safe_float(market_data.get("current_price"), 0.0)
        # Fall back to the current price when no 5-session SMA is available.
        sma_5 = current_price
        history_df = market_data.get("history_df")
        if history_df is not None:
            try:
                sma_5 = _safe_float(history_df["sma_5"].iloc[-1], current_price)
            except Exception:
                sma_5 = current_price
        sentiment_score = _safe_float(sentiment_raw, 0.0)
        for key, (label, fetch_forecast) in providers.items():
            try:
                forecast = fetch_forecast(
                    symbol,
                    current_price,
                    sma_5,
                    sentiment_score,
                    mode=mode,
                )
            except Exception as exc:
                # Best-effort: log and continue with the remaining providers.
                print(f" {label} error for {symbol}: {exc}")
            else:
                existing[key].append(forecast)
    for key, path in files.items():
        path.write_text(json.dumps(existing[key], indent=2), encoding="utf-8")
        print(f"Wrote {len(existing[key])} entries to {path}")
def main() -> int:
    """CLI entry point; returns a process exit code (0 on success)."""
    # For now, we just run once in "live_forecast" mode for all watchlist tickers.
    generate_reports_for_watchlist(mode="live_forecast")
    return 0
# Allow running as a script; the process exits with main()'s return code.
if __name__ == "__main__":
    raise SystemExit(main())