Janus-backend / backend /app /agents /finance_node.py
DevodG's picture
deploy: Janus full system stabilization
24f95f0
"""
Finance data node — Alpha Vantage integration.
Fetches market data, fundamentals, sentiment, and economic indicators.
No chart rendering — raw structured data only.
"""
import asyncio
from typing import List, Dict, Any, Optional
import json
import os, re, logging
from app.agents._model import call_model, safe_parse
from app.config import load_prompt
from app.domain_packs.finance.entity_resolver import extract_entities
from app.domain_packs.finance.market_data import get_company_overview, get_quote
from app.domain_packs.finance.news import get_company_news, get_top_headlines
from app.domain_packs.finance.ticker_resolver import extract_tickers, resolve_company_to_ticker
logger = logging.getLogger(__name__)
AV_BASE = "https://www.alphavantage.co/query"
AV_KEY = os.getenv("ALPHA_VANTAGE_API_KEY", os.getenv("ALPHAVANTAGE_API_KEY", "demo"))
async def av_get(function: str, **params) -> dict:
"""Single Alpha Vantage GET call (Async)."""
try:
async with httpx.AsyncClient(timeout=20) as client:
r = await client.get(
AV_BASE,
params={"function": function, "apikey": AV_KEY, **params}
)
r.raise_for_status()
data = r.json()
if "Information" in data or "Note" in data:
return {"error": data.get("Information") or data.get("Note")}
return data
except Exception as e:
return {"error": str(e)}
def extract_tickers_list(intent: str) -> List[str]:
"""Extract list of ticker symbols from intent."""
return extract_tickers(intent)
async def resolve_all_tickers(intent: str) -> List[str]:
"""Resolve multiple company names to tickers (Async)."""
tickers = set()
# Try direct resolution of the whole intent
res = await resolve_company_to_ticker(intent)
if res:
tickers.add(res)
try:
entities = extract_entities(intent)
for entity in entities:
if entity.get("type") != "company":
continue
res = await resolve_company_to_ticker(entity.get("text", ""))
if res:
tickers.add(res)
except Exception as e:
logger.debug("Multi-entity ticker resolution failed: %s", e)
# If still empty and short, try Alpha Vantage search fallback
if not tickers:
cleaned = re.sub(r"[^A-Za-z0-9 .&-]", " ", intent).strip()
if len(cleaned.split()) <= 6:
result = await av_get("SYMBOL_SEARCH", keywords=cleaned)
matches = result.get("bestMatches", [])
for m in matches[:2]:
tickers.add(m.get("1. symbol"))
return list(tickers)
async def _gather_single_ticker(ticker: str) -> Dict[str, Any]:
"""Gather all data for one ticker in parallel."""
try:
# Fetching quote, overview, and news concurrently for this ticker
quote_task = get_quote(ticker)
overview_task = get_company_overview(ticker)
quote, overview = await asyncio.gather(quote_task, overview_task)
company_name = overview.get("Name") or ticker
news = await get_company_news(company_name, days_back=7, symbol=ticker)
# Strip chart-ish fields
for drop_key in ["52WeekHigh", "52WeekLow", "50DayMovingAverage", "200DayMovingAverage", "AnalystTargetPrice"]:
overview.pop(drop_key, None)
return {
"quote": quote,
"fundamentals": overview,
"news_sentiment": news[:3]
}
except Exception as e:
logger.warning(f"Failed to gather data for {ticker}: {e}")
return {"error": str(e)}
async def run(state: dict) -> dict:
"""Async finance node — parallel data gathering with strict timeouts."""
route = state.get("route", {})
intent = route.get("intent", "")
# Step 1: resolve tickers (multiple supported for comparisons)
found_tickers = extract_tickers_list(intent)
resolved_tickers = await resolve_all_tickers(intent)
all_tickers = list(dict.fromkeys(found_tickers + resolved_tickers))[:3]
logger.info(f"[finance_node] Parallel processing tickers: {all_tickers}")
gathered = {"tickers": {}}
if all_tickers:
try:
# Step 2: Parallelize data fetching across all tickers
# Total timeout of 35s for the entire gathering phase
tasks = [_gather_single_ticker(ticker) for ticker in all_tickers]
results = await asyncio.wait_for(asyncio.gather(*tasks), timeout=35.0)
for ticker, result in zip(all_tickers, results):
if result and "error" not in result:
gathered["tickers"][ticker] = result
except asyncio.TimeoutError:
logger.warning("[finance_node] Data gathering timed out — proceeding with partial data")
else:
# No specific tickers — fetch macro / market-wide data
macro_tasks = [
av_get("TOP_GAINERS_LOSERS"),
get_top_headlines(category="business", page_size=5)
]
results = await asyncio.gather(*macro_tasks)
gathered["top_movers"] = results[0]
gathered["news_general"] = results[1][:5]
# Step 3: if macro / economic query, add indicators
macro_keywords = ["gdp", "inflation", "cpi", "interest rate", "federal", "economy", "recession", "growth", "unemployment"]
if any(kw in intent.lower() for kw in macro_keywords):
macro_keys = ["REAL_GDP", "CPI", "INFLATION"]
macro_results = await asyncio.gather(*[av_get(k) for k in macro_keys])
gathered["macro"] = {
"gdp": macro_results[0],
"cpi": macro_results[1],
"inflation": macro_results[2]
}
# Step 4: LLM interprets the gathered data
prompt = load_prompt("finance")
messages = [
{"role": "system", "content": prompt},
{
"role": "user",
"content": (
f"User intent: {intent}\n\n"
f"Market Data:\n{json.dumps(gathered, indent=2, default=str)}\n\n"
"Analyse this financial data and return ONLY valid JSON:\n"
"{\n"
' "ticker": "<main symbol or null>",\n'
' "signals": ["<signal 1>", "<signal 2>"],\n'
' "risks": ["<risk 1>"],\n'
' "sentiment": "bullish | bearish | neutral",\n'
' "key_metrics": {"<metric>": "<value>"},\n'
' "data_quality": "good | partial | limited",\n'
' "summary": "<2-3 sentence plain English summary comparison>"\n'
"}\n"
),
},
]
try:
raw_response = await asyncio.to_thread(call_model, messages)
result = safe_parse(raw_response)
except Exception as e:
logger.error(f"[AGENT ERROR] finance_node: {e}")
# Deterministic fallback
fallback_summary = []
for t, data in gathered.get("tickers", {}).items():
price = data.get("quote", {}).get("05. price")
fallback_summary.append(f"{t}: Price {price}.")
result = {
"ticker": all_tickers[0] if all_tickers else None,
"signals": ["Data retrieved"],
"risks": ["LLM synthesis failed"],
"sentiment": "neutral",
"key_metrics": {},
"data_quality": "partial",
"summary": " ".join(fallback_summary) or "No market data retrieved.",
"mode": "deterministic_fallback",
}
return {**state, "finance": result}