Spaces:
Running
Running
| import os | |
| import sys | |
| import pandas as pd | |
| import ast | |
| from dotenv import load_dotenv | |
| from typing import TypedDict, List, Dict, Any | |
| from langgraph.graph import StateGraph, END | |
| sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| from agents.tool_calling_agents import WebResearchAgent, MarketDataAgent, InternalPortfolioAgent | |
| from agents.data_analysis_agent import DataAnalysisAgent | |
| from features.utils import call_gemini | |
| # --- Configuration --- | |
| load_dotenv() | |
# --- Initialize workers (Stateless) ---
# Module-level singletons shared by every compiled graph. Each wraps an
# external capability: web search, market data API, and the internal
# portfolio service. They hold no per-request state, so sharing is safe.
web_agent = WebResearchAgent()
market_agent = MarketDataAgent()
portfolio_agent = InternalPortfolioAgent()
| # --- Define the Enhanced State --- | |
class AgentState(TypedDict):
    """Shared state flowing through the LangGraph orchestration pipeline."""
    task: str                      # original user request text
    symbol: str                    # extracted ticker (e.g. "TSLA"), or None
    web_research_results: str
    market_data_results: str
    portfolio_data_results: str
    scan_intent: str               # "DOWNWARD", "UPWARD", "ALL", or None
    # BUG FIX: extract_symbol_step returns "time_range" and market_data_step
    # reads it, but the key was never declared on the state schema, so the
    # value was not a recognized state channel. Declare it explicitly.
    time_range: str                # "INTRADAY" | "1D" | "3D" | "1W" | "1M" | "3M" | "1Y", or None
    # --- Fields produced by the analysis pipeline ---
    analysis_dataframe: pd.DataFrame
    analysis_results: Dict[str, Any]
    final_report: str
    # Debug fields (raw payloads surfaced for troubleshooting)
    debug_market_data_raw: Any
    debug_dataframe_head: Any
    debug_analysis_results_full: Any
def get_orchestrator(llm_provider="gemini", api_key=None):
    """
    Factory function to create the orchestrator graph with a specific LLM.

    NOTE(review): llm_provider and api_key are currently unused — every node
    below calls the module-level `call_gemini` helper directly. Confirm
    whether these parameters should be wired through to the nodes.
    """
    # Initialize the data analyzer per-graph; it is captured by the node
    # closures defined below (uses the global call_gemini fallback).
    data_analyzer = DataAnalysisAgent()
    # Define Nodes (closures capture 'data_analyzer')
def extract_symbol_step(state: AgentState):
    """Parse the user's task into symbol / scan intent / time range via the LLM.

    Asks the LLM for a small JSON object, parses it defensively, and falls
    back to keyword heuristics when no JSON is found.

    Returns a partial state update: {"symbol", "scan_intent", "time_range"}.
    """
    # Hoisted out of the try block: an ImportError here should not be
    # silently reported as a "parsing" failure.
    import json
    import re
    print("--- π¬ Symbol & Time Range Extraction ---")
    prompt = f"""
Analyze the user's request: "{state['task']}"
Extract TWO things:
1. Stock symbol or scan intent
2. Time range (if mentioned)
RULES:
- If request mentions a SPECIFIC company β Extract symbol
- If request mentions time period β Extract time range
- ONLY set scan_intent for "top gainers", "losers", "scan market"
Response Format: JSON ONLY.
{{
"symbol": "TICKER" or null,
"scan_intent": "DOWNWARD" | "UPWARD" | "ALL" or null,
"time_range": "INTRADAY" | "1D" | "3D" | "1W" | "1M" | "3M" | "1Y" or null
}}
Time Range Examples:
- "today", "now", "current", "recent" β "INTRADAY"
- "yesterday", "1 day back" β "1D"
- "3 days back", "last 3 days" β "3D"
- "last week", "1 week", "7 days" β "1W"
- "last month", "1 month", "30 days" β "1M"
- "3 months", "quarter" β "3M"
- "1 year", "12 months" β "1Y"
Full Examples:
- "Analyze Tesla" β {{"symbol": "TSLA", "scan_intent": null, "time_range": null}}
- "3 days back stocks of Tesla" β {{"symbol": "TSLA", "scan_intent": null, "time_range": "3D"}}
- "Last week AAPL performance" β {{"symbol": "AAPL", "scan_intent": null, "time_range": "1W"}}
- "1 month trend for NVDA" β {{"symbol": "NVDA", "scan_intent": null, "time_range": "1M"}}
- "Recent analysis of Tesla" β {{"symbol": "TSLA", "scan_intent": null, "time_range": "INTRADAY"}}
- "Show me top gainers" β {{"symbol": null, "scan_intent": "UPWARD", "time_range": null}}
CRITICAL: Default to null for time_range if not explicitly mentioned!
"""
    raw_response = call_gemini(prompt).strip()
    symbol = None
    scan_intent = None
    time_range = None
    try:
        # Pull the first JSON object out of the (possibly chatty) response.
        json_match = re.search(r'\{.*\}', raw_response, re.DOTALL)
        if json_match:
            data = json.loads(json_match.group(0))
            symbol = data.get("symbol")
            scan_intent = data.get("scan_intent")
            time_range = data.get("time_range")
        else:
            print(f" WARNING: No JSON found in extraction response: {raw_response}")
            # Fallback to simple keyword heuristics on the raw text.
            clean_resp = raw_response.strip().upper()
            if "SCAN" in clean_resp or "GAINERS" in clean_resp or "LOSERS" in clean_resp:
                scan_intent = "ALL"
            elif len(clean_resp) <= 5 and clean_resp.isalpha():
                symbol = clean_resp
    except Exception as e:
        print(f" Error parsing symbol extraction: {e}")
    # BUG FIX: the normalization below runs OUTSIDE the try/except, so a
    # non-string symbol (e.g. the LLM returning a number) previously crashed
    # the node. Guard on type and coerce junk to None.
    if isinstance(symbol, str) and symbol:
        symbol = symbol.upper().replace("$", "").strip()
    else:
        symbol = None
    # Default time_range to 1M if null (INTRADAY is premium-only, 1M uses free DAILY endpoint)
    if time_range is None:
        time_range = "1M"
    print(f" Raw LLM Response: {raw_response}")
    print(f" Extracted Symbol: {symbol}")
    print(f" Scan Intent: {scan_intent}")
    print(f" Time Range: {time_range}")
    return {"symbol": symbol, "scan_intent": scan_intent, "time_range": time_range}
| def web_research_step(state: AgentState): | |
| print("--- π Web Research ---") | |
| if state.get("scan_intent"): | |
| return {"web_research_results": "Market Scan initiated. Web research skipped for individual stock."} | |
| results = web_agent.research(queries=[state['task']]) | |
| return {"web_research_results": results} | |
| def market_data_step(state: AgentState): | |
| print("--- π Market Data Retrieval ---") | |
| # Handle scan intent | |
| if state.get("scan_intent"): | |
| print(f" Scan Intent Detected: {state['scan_intent']}") | |
| # Load watchlist | |
| import json | |
| watchlist_path = "watchlist.json" | |
| if not os.path.exists(watchlist_path): | |
| return {"market_data_results": {"error": "Watchlist not found. Please add symbols to your watchlist."}} | |
| with open(watchlist_path, 'r') as f: | |
| watchlist = json.load(f) | |
| scan_results = [] | |
| scan_intent = state['scan_intent'] | |
| for sym in watchlist: | |
| try: | |
| # Use GLOBAL_QUOTE for real-time price (free tier) | |
| quote = market_agent.get_global_quote(symbol=sym) | |
| quote_data = quote.get("data", {}) | |
| price = float(quote_data.get("price", 0)) | |
| change_pct_str = quote_data.get("change_percent", "0%").replace("%", "") | |
| pct_change = float(change_pct_str) if change_pct_str else 0 | |
| if scan_intent == "UPWARD" and pct_change > 0: | |
| scan_results.append({"symbol": sym, "price": price, "change": pct_change}) | |
| elif scan_intent == "DOWNWARD" and pct_change < 0: | |
| scan_results.append({"symbol": sym, "price": price, "change": pct_change}) | |
| elif scan_intent == "ALL": | |
| scan_results.append({"symbol": sym, "price": price, "change": pct_change}) | |
| except Exception as e: | |
| print(f" β οΈ Error scanning {sym}: {e}") | |
| # Sort by change | |
| scan_results.sort(key=lambda x: x['change'], reverse=True) | |
| return {"market_data_results": {"scan_results": scan_results}} | |
| # Single symbol analysis | |
| if not state.get("symbol"): | |
| return {"market_data_results": "Skipped."} | |
| symbol = state["symbol"] | |
| combined_data = {"symbol": symbol} | |
| # 1. Get REAL current price via GLOBAL_QUOTE (free tier) | |
| try: | |
| import time | |
| quote = market_agent.get_global_quote(symbol=symbol) | |
| combined_data["quote"] = quote.get("data", {}) | |
| combined_data["quote_source"] = quote.get("source", "Unknown") | |
| print(f" β Real-time quote: ${combined_data['quote'].get('price', 'N/A')}") | |
| time.sleep(1) # Respect rate limit (1 req/sec) | |
| except Exception as e: | |
| print(f" β οΈ Quote fetch failed: {e}") | |
| combined_data["quote"] = {} | |
| # 2. Get REAL fundamentals via OVERVIEW (free tier) | |
| try: | |
| overview = market_agent.get_company_overview(symbol=symbol) | |
| combined_data["overview"] = overview.get("data", {}) | |
| combined_data["overview_source"] = overview.get("source", "Unknown") | |
| print(f" β Company: {combined_data['overview'].get('Name', symbol)}, P/E: {combined_data['overview'].get('PERatio', 'N/A')}") | |
| import time | |
| time.sleep(1) # Respect rate limit | |
| except Exception as e: | |
| print(f" β οΈ Overview fetch failed: {e}") | |
| combined_data["overview"] = {} | |
| # 3. Get historical data via DAILY (free tier) for trend analysis | |
| try: | |
| time_range = state.get("time_range", "1M") | |
| # Map INTRADAY to 1M for free tier compatibility | |
| if time_range == "INTRADAY": | |
| time_range = "1M" | |
| print(f" Fetching DAILY data for {symbol} (time_range={time_range})") | |
| results = market_agent.get_market_data(symbol=symbol, time_range=time_range) | |
| combined_data["daily_data"] = results | |
| source = results.get("meta_data", {}).get("Source", "Unknown") | |
| data_points = len(results.get("data", {})) | |
| print(f" β Daily data: {data_points} data points (Source: {source})") | |
| except Exception as e: | |
| print(f" β οΈ Daily data fetch failed: {e}") | |
| combined_data["daily_data"] = {} | |
| return {"market_data_results": combined_data, "debug_market_data_raw": combined_data} | |
| def portfolio_data_step(state: AgentState): | |
| print("--- πΌ Internal Portfolio Data ---") | |
| if state.get("scan_intent"): | |
| return {"portfolio_data_results": "Market Scan initiated. Portfolio context skipped."} | |
| if not state.get("symbol"): | |
| return {"portfolio_data_results": "Skipped: No symbol provided."} | |
| try: | |
| results = portfolio_agent.query_portfolio(question=f"What is the current exposure to {state['symbol']}?") | |
| return {"portfolio_data_results": results} | |
| except Exception as e: | |
| print(f" β οΈ Portfolio data fetch failed (Private MCP may be down): {e}") | |
| return {"portfolio_data_results": f"Portfolio data unavailable (service error). Analysis continues without internal portfolio context."} | |
| def transform_data_step(state: AgentState): | |
| print("--- π Transforming Data for Analysis ---") | |
| if state.get("scan_intent"): | |
| return {"analysis_dataframe": pd.DataFrame()} # Skip transformation for scan | |
| market_data = state.get("market_data_results") | |
| if not isinstance(market_data, dict): | |
| print(" Skipping transformation: No valid market data received.") | |
| return {"analysis_dataframe": pd.DataFrame()} | |
| # Extract daily_data from the new combined format | |
| daily_data = market_data.get('daily_data', {}) | |
| time_series_data = daily_data.get('data', {}) if isinstance(daily_data, dict) else {} | |
| if not time_series_data: | |
| print(" Skipping transformation: No daily time series data available.") | |
| return {"analysis_dataframe": pd.DataFrame()} | |
| try: | |
| df = pd.DataFrame.from_dict(time_series_data, orient='index') | |
| df.index = pd.to_datetime(df.index) | |
| df.index.name = "timestamp" | |
| df.rename(columns={ | |
| '1. open': 'open', '2. high': 'high', '3. low': 'low', | |
| '4. close': 'close', '5. volume': 'volume' | |
| }, inplace=True) | |
| df = df.apply(pd.to_numeric).sort_index() | |
| print(f" Successfully created DataFrame with shape {df.shape}") | |
| return {"analysis_dataframe": df, "debug_dataframe_head": df.head().to_dict()} | |
| except Exception as e: | |
| print(f" CRITICAL ERROR during data transformation: {e}") | |
| return {"analysis_dataframe": pd.DataFrame()} | |
| def run_data_analysis_step(state: AgentState): | |
| print("--- π¬ Running Deep-Dive Data Analysis ---") | |
| if state.get("scan_intent"): | |
| return {"analysis_results": {}} # Skip analysis for scan | |
| df = state.get("analysis_dataframe") | |
| if df is not None and not df.empty: | |
| analysis_results = data_analyzer.run_analysis(df) | |
| return {"analysis_results": analysis_results, "debug_analysis_results_full": analysis_results} | |
| else: | |
| print(" Skipping analysis: No data to analyze.") | |
| return {"analysis_results": {}} | |
def synthesize_report_step(state: AgentState):
    """Assemble the final user-facing report via the LLM.

    Two paths: (1) scan results present -> short "Market Scan Report";
    (2) single-symbol analysis -> full "Alpha Report" combining web research,
    quote/fundamentals, deep-dive insights and internal portfolio context.
    Returns a partial state update: {"final_report": str}.
    """
    print("--- π Synthesizing Final Report ---")
    # Helper to truncate text to avoid Rate Limits
    def truncate(text, max_chars=3000):
        s = str(text)
        if len(s) > max_chars:
            return s[:max_chars] + "... (truncated)"
        return s
    # Check for Scan Results
    market_data_res = state.get("market_data_results", {})
    if isinstance(market_data_res, dict) and "scan_results" in market_data_res:
        scan_results = market_data_res["scan_results"]
        # Truncate scan results if necessary (though usually small)
        scan_results_str = truncate(scan_results, 4000)
        report_prompt = f"""
You are a senior financial analyst. The user requested a market scan: "{state['task']}".
Scan Results (from Watchlist):
{scan_results_str}
Generate a "Market Scan Report".
1. Summary: Briefly explain the criteria and the overall market status based on these results.
2. Results Table: Create a markdown table with columns: Symbol | Price | % Change.
3. Conclusion: Highlight the most significant movers.
"""
        final_report = call_gemini(report_prompt)
        return {"final_report": final_report}
    analysis_insights = state.get("analysis_results", {}).get("insights", "Not available.")
    # Truncate inputs for the main report
    web_data = truncate(state.get('web_research_results', 'Not available.'), 3000)
    portfolio_data = truncate(state.get('portfolio_data_results', 'Not available.'), 2000)
    # Extract rich data from combined market results
    market_data_raw = state.get("market_data_results", {})
    data_sources = []
    # Build rich market context from the new format
    quote_data = {}
    overview_data = {}
    if isinstance(market_data_raw, dict):
        quote_data = market_data_raw.get("quote", {})
        overview_data = market_data_raw.get("overview", {})
        if market_data_raw.get("quote_source"):
            data_sources.append(f"Price: {market_data_raw['quote_source']}")
        if market_data_raw.get("overview_source"):
            data_sources.append(f"Fundamentals: {market_data_raw['overview_source']}")
        daily = market_data_raw.get("daily_data", {})
        if isinstance(daily, dict):
            src = daily.get("meta_data", {}).get("Source", "")
            if src:
                data_sources.append(f"Historical: {src}")
    data_source = " | ".join(data_sources) if data_sources else "Unknown"
    # Build a structured market data section
    market_context = f"""
--- REAL-TIME PRICE (GLOBAL_QUOTE) ---
Current Price: ${quote_data.get('price', 'N/A')}
Change: {quote_data.get('change', 'N/A')} ({quote_data.get('change_percent', 'N/A')})
Open: ${quote_data.get('open', 'N/A')}
High: ${quote_data.get('high', 'N/A')}
Low: ${quote_data.get('low', 'N/A')}
Volume: {quote_data.get('volume', 'N/A')}
Previous Close: ${quote_data.get('previous_close', 'N/A')}
--- COMPANY FUNDAMENTALS (OVERVIEW) ---
Company: {overview_data.get('Name', 'N/A')}
Sector: {overview_data.get('Sector', 'N/A')} | Industry: {overview_data.get('Industry', 'N/A')}
Market Cap: ${overview_data.get('MarketCapitalization', 'N/A')}
Revenue (TTM): ${overview_data.get('RevenueTTM', 'N/A')}
EPS: ${overview_data.get('EPS', 'N/A')}
P/E Ratio: {overview_data.get('PERatio', 'N/A')}
Forward P/E: {overview_data.get('ForwardPE', 'N/A')}
Profit Margin: {overview_data.get('ProfitMargin', 'N/A')}
Operating Margin: {overview_data.get('OperatingMarginTTM', 'N/A')}
Return on Equity: {overview_data.get('ReturnOnEquityTTM', 'N/A')}
Beta: {overview_data.get('Beta', 'N/A')}
52-Week High: ${overview_data.get('52WeekHigh', 'N/A')}
52-Week Low: ${overview_data.get('52WeekLow', 'N/A')}
Dividend Yield: {overview_data.get('DividendYield', 'N/A')}
Analyst Target: ${overview_data.get('AnalystTargetPrice', 'N/A')}
Quarterly Earnings Growth: {overview_data.get('QuarterlyEarningsGrowthYOY', 'N/A')}
Quarterly Revenue Growth: {overview_data.get('QuarterlyRevenueGrowthYOY', 'N/A')}
"""
    report_prompt = f"""
You are a senior financial analyst writing a comprehensive "Alpha Report".
Your task is to synthesize all available information into a structured, cited report.
USE THE REAL FINANCIAL NUMBERS PROVIDED β do NOT say data is unavailable if numbers are given.
Original User Task: {state['task']}
Target Symbol: {state.get('symbol', 'Unknown')}
Data Source: {data_source}
---
Available Information:
- Web Intelligence: {web_data}
- Market Data & Fundamentals: {market_context}
- Deep-Dive Data Analysis Insights: {analysis_insights}
- Internal Portfolio Context: {portfolio_data}
---
CRITICAL INSTRUCTIONS:
1. First, evaluate the "Available Information".
- If the Target Symbol is 'Unknown' OR if the Web Intelligence and Market Data contain no meaningful information:
You MUST respond with: "I am not sure about this company as I could not find sufficient data."
Do NOT generate the rest of the report.
2. Otherwise, generate the "Alpha Report" with the following sections:
> [!NOTE]
> **Data Source**: {data_source}
## 1. Executive Summary
A 2-3 sentence overview of the key findings and current situation.
## 2. Internal Context
Detail the firm's current exposure:
- IF the firm has shares > 0: Present as a markdown table:
| Symbol | Shares | Avg Cost | Current Value |
|--------|--------|----------|---------------|
- IF the firm has 0 shares: State: "The firm has no current exposure to {state.get('symbol')}."
## 3. Market Data
ALWAYS present as a markdown table:
| Metric | Value | Implication |
|--------|-------|-------------|
| Current Price | $XXX.XX | +/-X.X% vs. open |
| 5-Day Trend | Upward/Downward/Flat | Brief note |
| Volume | X.XXM | Above/Below average |
## 4. Real-Time Intelligence
### News
- **[Headline]** - [Brief summary] `[Source: URL]`
- **[Headline]** - [Brief summary] `[Source: URL]`
### Filings (if any)
- **[Filing Type]** - [Brief description] `[Source: URL]`
## 5. Sentiment Analysis
**Overall Sentiment:** Bullish / Bearish / Neutral
**Evidence:**
- [Specific fact from news/data supporting this sentiment]
- [Another supporting fact]
## 6. Synthesis & Recommendations
Combine all information to provide actionable insights. Focus on:
- Key risks and opportunities
- Recommended actions (if any)
- Items to monitor
FORMATTING RULES:
- Use markdown headers (##, ###)
- Include URLs in backticks: `[Source: example.com]`
- Use tables for structured data
- Be concise but comprehensive
"""
    final_report = call_gemini(report_prompt)
    return {"final_report": final_report}
# 4. Build the Graph: a strictly linear pipeline. Every node runs on every
# invocation; the scan / no-symbol short-circuits happen INSIDE the nodes
# rather than via conditional edges.
workflow = StateGraph(AgentState)
workflow.add_node("extract_symbol", extract_symbol_step)
workflow.add_node("web_researcher", web_research_step)
workflow.add_node("market_data_analyst", market_data_step)
workflow.add_node("portfolio_data_fetcher", portfolio_data_step)
workflow.add_node("transform_data", transform_data_step)
workflow.add_node("data_analyzer", run_data_analysis_step)
workflow.add_node("report_synthesizer", synthesize_report_step)
# Linear wiring: extract -> web -> market -> portfolio -> transform ->
# analyze -> synthesize -> END.
workflow.set_entry_point("extract_symbol")
workflow.add_edge("extract_symbol", "web_researcher")
workflow.add_edge("web_researcher", "market_data_analyst")
workflow.add_edge("market_data_analyst", "portfolio_data_fetcher")
workflow.add_edge("portfolio_data_fetcher", "transform_data")
workflow.add_edge("transform_data", "data_analyzer")
workflow.add_edge("data_analyzer", "report_synthesizer")
workflow.add_edge("report_synthesizer", END)
# Compile into a runnable graph; callers invoke it with an AgentState dict.
return workflow.compile()