File size: 7,605 Bytes
24f95f0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
"""
Finance data node — Alpha Vantage integration.
Fetches market data, fundamentals, sentiment, and economic indicators.
No chart rendering — raw structured data only.
"""

import asyncio
import json
import os, re, logging
from typing import List, Dict, Any, Optional

import httpx

from app.agents._model import call_model, safe_parse
from app.config import load_prompt
from app.domain_packs.finance.entity_resolver import extract_entities
from app.domain_packs.finance.market_data import get_company_overview, get_quote
from app.domain_packs.finance.news import get_company_news, get_top_headlines
from app.domain_packs.finance.ticker_resolver import extract_tickers, resolve_company_to_ticker

# Module-scoped logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Alpha Vantage query endpoint used for every request made by this module.
AV_BASE = "https://www.alphavantage.co/query"
# API key lookup order: ALPHA_VANTAGE_API_KEY, then ALPHAVANTAGE_API_KEY,
# and finally the literal "demo" when neither variable is set.
AV_KEY = os.environ.get(
    "ALPHA_VANTAGE_API_KEY",
    os.environ.get("ALPHAVANTAGE_API_KEY", "demo"),
)


async def av_get(function: str, **params) -> dict:
    """Perform a single asynchronous Alpha Vantage GET request.

    Args:
        function: Value sent as the ``function`` query parameter
            (for example ``"SYMBOL_SEARCH"``).
        **params: Extra query parameters merged into the request. They are
            applied last, so they may override ``apikey``.

    Returns:
        The decoded JSON object on success. On any failure (transport error,
        non-2xx status, undecodable or non-object body, or an API notice under
        ``"Error Message"``, ``"Information"`` or ``"Note"``) a dict of the
        form ``{"error": <message>}`` is returned instead.

    This function does not raise ``Exception`` subclasses; callers are expected
    to inspect the ``"error"`` key.
    """
    query = {"function": function, "apikey": AV_KEY, **params}
    try:
        async with httpx.AsyncClient(timeout=20) as client:
            response = await client.get(AV_BASE, params=query)
        response.raise_for_status()
        payload = response.json()
    except Exception as exc:
        # Deliberate catch-all: this is the network boundary and callers rely
        # on an error dict rather than an exception.
        message = str(exc)
        # Exception text may embed the request URL (query string included);
        # redact the key so it cannot leak into logs or downstream prompts.
        secret = str(query.get("apikey") or "")
        if secret:
            message = message.replace(secret, "***")
        logger.warning("Alpha Vantage %s request failed: %s", function, message)
        return {"error": message}

    if not isinstance(payload, dict):
        logger.warning("Alpha Vantage %s returned a non-object payload", function)
        return {"error": "Unexpected Alpha Vantage response format"}

    # The API reports invalid calls, rate limits and key notices inside a
    # 200 response under one of these keys.
    notice_keys = ("Error Message", "Information", "Note")
    if any(key in payload for key in notice_keys):
        notice = next((payload[key] for key in notice_keys if payload.get(key)), None)
        notice = notice or "Alpha Vantage returned an empty notice"
        logger.warning("Alpha Vantage %s returned a notice: %s", function, notice)
        return {"error": notice}

    return payload


def extract_tickers_list(intent: str) -> List[str]:
    """Return the ticker symbols that ``extract_tickers`` finds in *intent*."""
    symbols = extract_tickers(intent)
    return symbols


async def resolve_all_tickers(intent: str) -> List[str]:
    """Resolve the company names mentioned in *intent* to ticker symbols.

    Resolution is attempted in three stages:

    1. the whole intent is passed to ``resolve_company_to_ticker``;
    2. every entity of type ``"company"`` returned by ``extract_entities`` is
       resolved individually (failures here are logged at debug level and
       do not abort the function);
    3. only if nothing was found and the cleaned intent has at most six
       words, the Alpha Vantage ``SYMBOL_SEARCH`` endpoint is queried and up
       to the first two matches are used.

    Args:
        intent: Free-text user intent.

    Returns:
        Unique, non-empty ticker symbols in discovery order (stage 1 first).
        The list is empty when nothing could be resolved.
    """
    # A dict is used as an insertion-ordered set so the result is
    # deterministic; callers truncate the list and a plain set would make the
    # surviving tickers arbitrary.
    tickers: Dict[str, None] = {}

    # Try direct resolution of the whole intent
    direct = await resolve_company_to_ticker(intent)
    if direct:
        tickers[direct] = None

    try:
        for entity in extract_entities(intent):
            if entity.get("type") != "company":
                continue
            name = entity.get("text", "")
            if not name:
                # Nothing to look up; avoid a pointless resolver call.
                continue
            symbol = await resolve_company_to_ticker(name)
            if symbol:
                tickers[symbol] = None
    except Exception as e:
        # Best effort: entity extraction is optional enrichment.
        logger.debug("Multi-entity ticker resolution failed: %s", e)

    # If still empty and short, try Alpha Vantage search fallback
    if not tickers:
        cleaned = re.sub(r"[^A-Za-z0-9 .&-]", " ", intent).strip()
        if cleaned and len(cleaned.split()) <= 6:
            result = await av_get("SYMBOL_SEARCH", keywords=cleaned)
            # ``or []`` also covers an explicit null value for the key.
            matches = result.get("bestMatches") or []
            for match in matches[:2]:
                symbol = match.get("1. symbol")
                if symbol:  # skip matches that carry no symbol
                    tickers[symbol] = None

    return list(tickers)


async def _gather_single_ticker(ticker: str) -> Dict[str, Any]:
    """Collect quote, fundamentals and recent news for one ticker.

    The quote and the company overview are fetched concurrently. The news
    lookup runs afterwards because it is keyed on the company name taken
    from the overview (falling back to the ticker itself).

    Args:
        ticker: Symbol passed to the quote, overview and news helpers.

    Returns:
        ``{"quote": ..., "fundamentals": ..., "news_sentiment": ...}`` where
        ``fundamentals`` is the overview without chart-oriented fields and
        ``news_sentiment`` holds at most three news items. If any step
        raises, ``{"error": <message>}`` is returned instead.
    """
    # Chart-ish fields that are deliberately excluded from the fundamentals.
    chart_fields = {
        "52WeekHigh",
        "52WeekLow",
        "50DayMovingAverage",
        "200DayMovingAverage",
        "AnalystTargetPrice",
    }
    try:
        quote, overview = await asyncio.gather(
            get_quote(ticker),
            get_company_overview(ticker),
        )
        # A missing overview should not discard an otherwise usable quote.
        overview = overview or {}

        company_name = overview.get("Name") or ticker
        news = await get_company_news(company_name, days_back=7, symbol=ticker)

        # Build a filtered copy instead of popping keys, so the object handed
        # back by get_company_overview is never mutated.
        fundamentals = {
            key: value for key, value in overview.items() if key not in chart_fields
        }

        return {
            "quote": quote,
            "fundamentals": fundamentals,
            "news_sentiment": (news or [])[:3],
        }
    except Exception as e:
        # Per-ticker boundary: one failing ticker must not break the others.
        logger.warning("Failed to gather data for %s: %s", ticker, e)
        return {"error": str(e)}


# Substrings that mark an intent as a macro-economic query.
_MACRO_KEYWORDS = (
    "gdp",
    "inflation",
    "cpi",
    "interest rate",
    "federal",
    "economy",
    "recession",
    "growth",
    "unemployment",
)

# Overall deadline, in seconds, for the per-ticker gathering phase.
_TICKER_GATHER_TIMEOUT = 35.0


async def _gather_tickers(tickers: List[str], timeout: float) -> Dict[str, Any]:
    """Fetch data for every ticker concurrently, keeping what finishes in time.

    Tickers whose fetch is still running after *timeout* seconds are cancelled
    and omitted; tickers whose result is empty or contains an ``"error"`` key
    are omitted as well. *tickers* must be non-empty.
    """
    tasks = {
        ticker: asyncio.create_task(_gather_single_ticker(ticker))
        for ticker in tickers
    }
    try:
        done, pending = await asyncio.wait(tasks.values(), timeout=timeout)
    except asyncio.CancelledError:
        # asyncio.wait does not cancel its children when the waiter is
        # cancelled, so do it explicitly to avoid orphaned requests.
        for task in tasks.values():
            task.cancel()
        raise

    if pending:
        logger.warning("[finance_node] Data gathering timed out — proceeding with partial data")
        for task in pending:
            task.cancel()

    collected: Dict[str, Any] = {}
    for ticker, task in tasks.items():
        if task not in done or task.cancelled():
            continue
        if task.exception() is not None:
            logger.warning("[finance_node] Gathering %s failed: %s", ticker, task.exception())
            continue
        result = task.result()
        if result and "error" not in result:
            collected[ticker] = result
    return collected


async def _gather_market_overview() -> Dict[str, Any]:
    """Fetch market-wide movers and business headlines for ticker-less intents.

    A failure in either source is logged and replaced by an error dict
    (movers) or an empty list (headlines) instead of aborting the node.
    """
    movers, headlines = await asyncio.gather(
        av_get("TOP_GAINERS_LOSERS"),
        get_top_headlines(category="business", page_size=5),
        return_exceptions=True,
    )
    if isinstance(movers, BaseException):
        logger.warning("[finance_node] Top movers fetch failed: %s", movers)
        movers = {"error": str(movers)}
    if isinstance(headlines, BaseException):
        logger.warning("[finance_node] Headline fetch failed: %s", headlines)
        headlines = []
    return {"top_movers": movers, "news_general": (headlines or [])[:5]}


def _fallback_result(all_tickers: List[str], gathered: Dict[str, Any]) -> Dict[str, Any]:
    """Build the deterministic result used when LLM synthesis fails."""
    lines = []
    for symbol, data in gathered.get("tickers", {}).items():
        # Guard every level: this runs inside an error handler and must not
        # raise because a quote is missing or malformed.
        quote = data.get("quote") if isinstance(data, dict) else None
        price = quote.get("05. price") if isinstance(quote, dict) else None
        lines.append(f"{symbol}: Price {price}.")

    return {
        "ticker": all_tickers[0] if all_tickers else None,
        "signals": ["Data retrieved"],
        "risks": ["LLM synthesis failed"],
        "sentiment": "neutral",
        "key_metrics": {},
        "data_quality": "partial",
        "summary": " ".join(lines) or "No market data retrieved.",
        "mode": "deterministic_fallback",
    }


async def run(state: dict) -> dict:
    """Async finance node: gather market data and have the LLM interpret it.

    Steps:
        1. Resolve up to three tickers from ``state["route"]["intent"]``.
        2. With tickers: fetch per-ticker data concurrently under an overall
           deadline, keeping whatever completed in time. Without tickers:
           fetch market-wide movers and business headlines.
        3. If the intent contains a macro-economic keyword, add GDP, CPI and
           inflation series.
        4. Ask the model for a JSON analysis; if that call or the parsing
           raises, fall back to a deterministic summary.

    Args:
        state: Graph state; only ``state["route"]["intent"]`` is read.

    Returns:
        A shallow copy of *state* with the analysis stored under ``"finance"``.
    """
    route = state.get("route") or {}
    intent = route.get("intent") or ""

    # Step 1: resolve tickers (multiple supported for comparisons)
    found_tickers = extract_tickers_list(intent)
    resolved_tickers = await resolve_all_tickers(intent)

    # De-duplicate in discovery order, drop empty symbols, cap at three.
    candidates = [t for t in (*found_tickers, *resolved_tickers) if t]
    all_tickers = list(dict.fromkeys(candidates))[:3]
    logger.info("[finance_node] Parallel processing tickers: %s", all_tickers)

    gathered: Dict[str, Any] = {"tickers": {}}

    if all_tickers:
        # Step 2: parallel per-ticker fetch with a total deadline
        gathered["tickers"] = await _gather_tickers(all_tickers, _TICKER_GATHER_TIMEOUT)
    else:
        # No specific tickers — fetch macro / market-wide data
        gathered.update(await _gather_market_overview())

    # Step 3: if macro / economic query, add indicators
    lowered = intent.lower()
    if any(keyword in lowered for keyword in _MACRO_KEYWORDS):
        gdp, cpi, inflation = await asyncio.gather(
            av_get("REAL_GDP"),
            av_get("CPI"),
            av_get("INFLATION"),
        )
        gathered["macro"] = {"gdp": gdp, "cpi": cpi, "inflation": inflation}

    # Step 4: LLM interprets the gathered data
    prompt = load_prompt("finance")
    messages = [
        {"role": "system", "content": prompt},
        {
            "role": "user",
            "content": (
                f"User intent: {intent}\n\n"
                f"Market Data:\n{json.dumps(gathered, indent=2, default=str)}\n\n"
                "Analyse this financial data and return ONLY valid JSON:\n"
                "{\n"
                '  "ticker": "<main symbol or null>",\n'
                '  "signals": ["<signal 1>", "<signal 2>"],\n'
                '  "risks": ["<risk 1>"],\n'
                '  "sentiment": "bullish | bearish | neutral",\n'
                '  "key_metrics": {"<metric>": "<value>"},\n'
                '  "data_quality": "good | partial | limited",\n'
                '  "summary": "<2-3 sentence plain English summary comparison>"\n'
                "}\n"
            ),
        },
    ]

    try:
        # call_model is invoked in a worker thread so it does not block the loop.
        raw_response = await asyncio.to_thread(call_model, messages)
        result = safe_parse(raw_response)
    except Exception as e:
        logger.error("[AGENT ERROR] finance_node: %s", e)
        # Deterministic fallback
        result = _fallback_result(all_tickers, gathered)

    return {**state, "finance": result}