| | import os |
| | import sys |
| | import pandas as pd |
| | import ast |
| | from dotenv import load_dotenv |
| | from typing import TypedDict, List, Dict, Any |
| |
|
| | from langgraph.graph import StateGraph, END |
| |
|
| | sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
| |
|
| | from agents.tool_calling_agents import WebResearchAgent, MarketDataAgent, InternalPortfolioAgent |
| | from agents.data_analysis_agent import DataAnalysisAgent |
| |
|
| | from langchain_google_genai import ChatGoogleGenerativeAI |
| |
|
| | |
# Load settings from a local .env file first; get_orchestrator() later reads
# GOOGLE_API_KEY via os.getenv when no key is passed explicitly.
load_dotenv()

# Tool agents shared by every graph built in this module. They are created
# once, at import time, and referenced by the node functions as globals.
web_agent, market_agent, portfolio_agent = (
    WebResearchAgent(),
    MarketDataAgent(),
    InternalPortfolioAgent(),
)
| |
|
| | |
class AgentState(TypedDict):
    """State dictionary shared by all nodes of the orchestrator graph.

    Each node returns a partial dict of updates. LangGraph builds its state
    channels from the annotations below, so every key a node returns must be
    declared here; otherwise the update is not carried to later nodes.
    """

    # --- Request and routing ---
    task: str  # the user's original request text
    symbol: str  # extracted ticker; the extraction node may store None
    scan_intent: str  # "UPWARD" | "DOWNWARD" | "ALL" for watchlist scans, else None
    # Written by the extraction node and read by the market-data node; it must
    # be declared or the extracted range never reaches market_data_step.
    time_range: str  # e.g. "INTRADAY", "1D", "1W", "1M"

    # --- Gathered data ---
    web_research_results: str
    # Holds the market agent's response or a {"scan_results": [...]} dict,
    # or a short status string such as "Skipped.".
    market_data_results: Any
    portfolio_data_results: str

    # --- Analysis and output ---
    analysis_dataframe: pd.DataFrame
    analysis_results: Dict[str, Any]
    final_report: str

    # --- Debug captures (raw intermediate values for inspection) ---
    debug_market_data_raw: Any
    debug_dataframe_head: Any
    debug_analysis_results_full: Any
| |
|
def get_orchestrator(llm_provider="gemini", api_key=None):
    """Build and compile the LangGraph workflow that produces an "Alpha Report".

    Nodes run in a fixed linear order:
    extract_symbol -> web_researcher -> market_data_analyst ->
    portfolio_data_fetcher -> transform_data -> data_analyzer ->
    report_synthesizer -> END.

    When the extraction node sets ``scan_intent``, the per-stock nodes
    short-circuit. The final node then writes a watchlist "Market Scan
    Report" instead of a single-stock report.

    Args:
        llm_provider: Name of the LLM backend. Only Gemini is implemented;
            this value is currently not consulted.
        api_key: Google Gemini API key. Falls back to the ``GOOGLE_API_KEY``
            environment variable when not given.

    Returns:
        The compiled graph produced by ``StateGraph.compile()``.

    Raises:
        ValueError: If no Gemini API key is supplied or found in the
            environment.
    """
    import json
    import re

    if not api_key:
        api_key = os.getenv("GOOGLE_API_KEY")
    if not api_key:
        raise ValueError("Google Gemini API Key is missing.")
    llm = ChatGoogleGenerativeAI(
        model="gemini-flash-lite-latest",
        google_api_key=api_key,
        temperature=0,
        max_retries=5,
    )

    data_analyzer = DataAnalysisAgent(llm=llm)

    # Vocabulary the downstream nodes understand; LLM output is mapped onto it.
    valid_scan_intents = {"UPWARD", "DOWNWARD", "ALL"}
    valid_time_ranges = {"INTRADAY", "1D", "3D", "1W", "1M", "3M", "1Y"}
    # Greedy + DOTALL: grab the outermost {...} even if the model wraps it in
    # prose or a code fence.
    json_object_re = re.compile(r"\{.*\}", re.DOTALL)

    def _normalize_llm_field(value):
        """Return *value* stripped and upper-cased.

        Returns None for non-strings, blanks and the literal strings
        "null"/"none" that models sometimes emit instead of JSON null.
        """
        if not isinstance(value, str):
            return None
        cleaned = value.strip().upper()
        return None if cleaned in ("", "NULL", "NONE") else cleaned

    def _truncate(text, max_chars=3000):
        """Stringify *text* and cap it at *max_chars*, marking any cut."""
        s = str(text)
        if len(s) > max_chars:
            return s[:max_chars] + "... (truncated)"
        return s

    def extract_symbol_step(state: AgentState):
        """Ask the LLM for the ticker, scan intent and time range in the task."""
        print("--- Symbol & Time Range Extraction ---")
        prompt = f"""
        Analyze the user's request: "{state['task']}"

        Extract TWO things:
        1. Stock symbol or scan intent
        2. Time range (if mentioned)

        RULES:
        - If request mentions a SPECIFIC company -> Extract symbol
        - If request mentions time period -> Extract time range
        - ONLY set scan_intent for "top gainers", "losers", "scan market"

        Response Format: JSON ONLY.
        {{
          "symbol": "TICKER" or null,
          "scan_intent": "DOWNWARD" | "UPWARD" | "ALL" or null,
          "time_range": "INTRADAY" | "1D" | "3D" | "1W" | "1M" | "3M" | "1Y" or null
        }}

        Time Range Examples:
        - "today", "now", "current", "recent" -> "INTRADAY"
        - "yesterday", "1 day back" -> "1D"
        - "3 days back", "last 3 days" -> "3D"
        - "last week", "1 week", "7 days" -> "1W"
        - "last month", "1 month", "30 days" -> "1M"
        - "3 months", "quarter" -> "3M"
        - "1 year", "12 months" -> "1Y"

        Full Examples:
        - "Analyze Tesla" -> {{"symbol": "TSLA", "scan_intent": null, "time_range": null}}
        - "3 days back stocks of Tesla" -> {{"symbol": "TSLA", "scan_intent": null, "time_range": "3D"}}
        - "Last week AAPL performance" -> {{"symbol": "AAPL", "scan_intent": null, "time_range": "1W"}}
        - "1 month trend for NVDA" -> {{"symbol": "NVDA", "scan_intent": null, "time_range": "1M"}}
        - "Recent analysis of Tesla" -> {{"symbol": "TSLA", "scan_intent": null, "time_range": "INTRADAY"}}
        - "Show me top gainers" -> {{"symbol": null, "scan_intent": "UPWARD", "time_range": null}}

        CRITICAL: Default to null for time_range if not explicitly mentioned!
        """
        raw_response = llm.invoke(prompt).content.strip()

        symbol = None
        scan_intent = None
        time_range = None

        try:
            json_match = json_object_re.search(raw_response)
            if json_match:
                data = json.loads(json_match.group(0))
                symbol = data.get("symbol")
                scan_intent = data.get("scan_intent")
                time_range = data.get("time_range")
            else:
                print(f" WARNING: No JSON found in extraction response: {raw_response}")
                # Fallback heuristics for a bare, non-JSON answer.
                clean_resp = raw_response.upper()
                if "SCAN" in clean_resp or "GAINERS" in clean_resp or "LOSERS" in clean_resp:
                    scan_intent = "ALL"
                elif len(clean_resp) <= 5 and clean_resp.isalpha():
                    symbol = clean_resp
        except ValueError as e:  # json.JSONDecodeError subclasses ValueError
            print(f" Error parsing symbol extraction: {e}")

        # Coerce model output into the exact values later nodes compare against.
        symbol = _normalize_llm_field(symbol)
        if symbol is not None:
            symbol = symbol.replace("$", "") or None  # "$TSLA" -> "TSLA"

        scan_intent = _normalize_llm_field(scan_intent)
        if scan_intent is not None and scan_intent not in valid_scan_intents:
            # Any scan request is still a scan; "ALL" is the inclusive reading.
            scan_intent = "ALL"

        time_range = _normalize_llm_field(time_range)
        if time_range not in valid_time_ranges:
            time_range = "INTRADAY"

        print(f" Raw LLM Response: {raw_response}")
        print(f" Extracted Symbol: {symbol}")
        print(f" Scan Intent: {scan_intent}")
        print(f" Time Range: {time_range}")

        return {"symbol": symbol, "scan_intent": scan_intent, "time_range": time_range}

    def web_research_step(state: AgentState):
        """Run web research on the raw task text (skipped for market scans)."""
        print("--- Web Research ---")
        if state.get("scan_intent"):
            return {"web_research_results": "Market Scan initiated. Web research skipped for individual stock."}
        results = web_agent.research(queries=[state['task']])
        return {"web_research_results": results}

    def _scan_watchlist(scan_intent):
        """Compute intraday % change for each watchlist symbol.

        Filters the symbols by *scan_intent* and returns
        ``{"scan_results": [...]}``, or ``{"error": ...}`` when the watchlist
        cannot be read. Symbols with unusable data are skipped, not fatal.
        """
        watchlist_path = "watchlist.json"  # resolved relative to the CWD
        if not os.path.exists(watchlist_path):
            return {"error": "Watchlist not found. Please add symbols to your watchlist."}

        try:
            with open(watchlist_path, "r", encoding="utf-8") as f:
                watchlist = json.load(f)
        except (OSError, ValueError) as e:
            return {"error": f"Could not read watchlist: {e}"}

        scan_results = []
        for sym in watchlist:
            data = market_agent.get_market_data(symbol=sym, time_range="INTRADAY")
            ts = data.get("data") if isinstance(data, dict) else None
            if not isinstance(ts, dict) or not ts:
                continue

            sorted_times = sorted(ts)
            try:
                latest_close = float(ts[sorted_times[-1]]["4. close"])
                earliest_open = float(ts[sorted_times[0]]["1. open"])
            except (KeyError, TypeError, ValueError) as e:
                print(f" Skipping {sym}: malformed market data ({e})")
                continue
            if earliest_open == 0:
                # % change is undefined against a zero open; avoid ZeroDivisionError.
                print(f" Skipping {sym}: opening price is zero")
                continue
            pct_change = ((latest_close - earliest_open) / earliest_open) * 100

            if (
                scan_intent == "ALL"
                or (scan_intent == "UPWARD" and pct_change > 0)
                or (scan_intent == "DOWNWARD" and pct_change < 0)
            ):
                scan_results.append({"symbol": sym, "price": latest_close, "change": pct_change})

        # Put the most significant movers first so report truncation never
        # cuts them: biggest gains first, or biggest losses first for DOWNWARD.
        scan_results.sort(key=lambda r: r["change"], reverse=scan_intent != "DOWNWARD")
        return {"scan_results": scan_results}

    def market_data_step(state: AgentState):
        """Fetch market data for the symbol, or scan the watchlist."""
        print("--- Market Data Retrieval ---")

        if state.get("scan_intent"):
            print(f" Scan Intent Detected: {state['scan_intent']}")
            return {"market_data_results": _scan_watchlist(state["scan_intent"])}

        if not state.get("symbol"):
            return {"market_data_results": "Skipped."}

        time_range = state.get("time_range") or "INTRADAY"
        print(f" Fetching market data for {state['symbol']} (time_range={time_range})")
        results = market_agent.get_market_data(symbol=state["symbol"], time_range=time_range)
        return {"market_data_results": results, "debug_market_data_raw": results}

    def portfolio_data_step(state: AgentState):
        """Query the internal portfolio agent for exposure to the symbol."""
        print("--- Internal Portfolio Data ---")
        if state.get("scan_intent"):
            return {"portfolio_data_results": "Market Scan initiated. Portfolio context skipped."}

        if not state.get("symbol"):
            return {"portfolio_data_results": "Skipped: No symbol provided."}

        results = portfolio_agent.query_portfolio(question=f"What is the current exposure to {state['symbol']}?")
        return {"portfolio_data_results": results}

    def transform_data_step(state: AgentState):
        """Turn the market agent's ``data`` mapping into a numeric, time-indexed DataFrame."""
        print("--- Transforming Data for Analysis ---")
        if state.get("scan_intent"):
            return {"analysis_dataframe": pd.DataFrame()}

        market_data = state.get("market_data_results")
        if not isinstance(market_data, dict) or not market_data.get("data"):
            print(" Skipping transformation: No valid market data received.")
            return {"analysis_dataframe": pd.DataFrame()}

        try:
            # One row per key of "data" (presumably timestamp strings); the
            # provider's numbered column names are mapped to plain OHLCV names.
            df = pd.DataFrame.from_dict(market_data["data"], orient="index")
            df.index = pd.to_datetime(df.index)
            df.index.name = "timestamp"
            df = df.rename(columns={
                '1. open': 'open', '2. high': 'high', '3. low': 'low',
                '4. close': 'close', '5. volume': 'volume'
            })
            df = df.apply(pd.to_numeric).sort_index()

            print(f" Successfully created DataFrame with shape {df.shape}")
            return {"analysis_dataframe": df, "debug_dataframe_head": df.head().to_dict()}
        except Exception as e:
            # Deliberately best-effort: bad provider data degrades to "no
            # analysis" rather than aborting the whole graph.
            print(f" CRITICAL ERROR during data transformation: {e}")
            return {"analysis_dataframe": pd.DataFrame()}

    def run_data_analysis_step(state: AgentState):
        """Run the LLM-backed DataAnalysisAgent over the prepared DataFrame."""
        print("--- Running Deep-Dive Data Analysis ---")
        if state.get("scan_intent"):
            return {"analysis_results": {}}

        df = state.get("analysis_dataframe")
        if df is not None and not df.empty:
            analysis_results = data_analyzer.run_analysis(df)
            return {"analysis_results": analysis_results, "debug_analysis_results_full": analysis_results}
        print(" Skipping analysis: No data to analyze.")
        return {"analysis_results": {}}

    def synthesize_report_step(state: AgentState):
        """Ask the LLM to write the final report (scan report or Alpha Report)."""
        print("--- Synthesizing Final Report ---")

        market_data_res = state.get("market_data_results", {})
        if isinstance(market_data_res, dict) and "scan_results" in market_data_res:
            scan_results_str = _truncate(market_data_res["scan_results"], 4000)

            report_prompt = f"""
            You are a senior financial analyst. The user requested a market scan: "{state['task']}".

            Scan Results (from Watchlist):
            {scan_results_str}

            Generate a "Market Scan Report".
            1. Summary: Briefly explain the criteria and the overall market status based on these results.
            2. Results Table: Create a markdown table with columns: Symbol | Price | % Change.
            3. Conclusion: Highlight the most significant movers.
            """
            final_report = llm.invoke(report_prompt).content
            return {"final_report": final_report}

        analysis_results = state.get("analysis_results")
        if isinstance(analysis_results, dict):
            analysis_insights = analysis_results.get("insights", "Not available.")
        else:
            analysis_insights = "Not available."

        # extract_symbol_step always writes "symbol" (possibly None), so a
        # .get() default would never apply; use "or" so the prompt's
        # 'Unknown' guard actually fires when no ticker was found.
        symbol = state.get("symbol") or "Unknown"

        web_data = _truncate(state.get('web_research_results', 'Not available.'), 3000)
        market_summary = _truncate(state.get('market_data_results', 'Not available'), 2000)
        portfolio_data = _truncate(state.get('portfolio_data_results', 'Not available.'), 2000)

        # A "Source" entry in meta_data (if present) overrides the default label.
        data_source = "Unknown"
        if isinstance(market_data_res, dict):
            meta = market_data_res.get("meta_data", {})
            if isinstance(meta, dict):
                data_source = meta.get("Source", "Real API (Alpha Vantage)")

        report_prompt = f"""
        You are a senior financial analyst writing a comprehensive "Alpha Report".
        Your task is to synthesize all available information into a structured, cited report.

        Original User Task: {state['task']}
        Target Symbol: {symbol}
        Data Source: {data_source}
        ---
        Available Information:
        - Web Intelligence: {web_data}
        - Market Data Summary: {market_summary}
        - Deep-Dive Data Analysis Insights: {analysis_insights}
        - Internal Portfolio Context: {portfolio_data}
        ---

        CRITICAL INSTRUCTIONS:
        1. First, evaluate the "Available Information".
        - If the Target Symbol is 'Unknown' OR if the Web Intelligence and Market Data contain no meaningful information:
        You MUST respond with: "I am not sure about this company as I could not find sufficient data."
        Do NOT generate the rest of the report.

        2. Otherwise, generate the "Alpha Report" with the following sections:

        > [!NOTE]
        > **Data Source**: {data_source}

        ## 1. Executive Summary
        A 2-3 sentence overview of the key findings and current situation.

        ## 2. Internal Context
        Detail the firm's current exposure:
        - IF the firm has shares > 0: Present as a markdown table:
        | Symbol | Shares | Avg Cost | Current Value |
        |--------|--------|----------|---------------|
        - IF the firm has 0 shares: State: "The firm has no current exposure to {symbol}."

        ## 3. Market Data
        ALWAYS present as a markdown table:
        | Metric | Value | Implication |
        |--------|-------|-------------|
        | Current Price | $XXX.XX | +/-X.X% vs. open |
        | 5-Day Trend | Upward/Downward/Flat | Brief note |
        | Volume | X.XXM | Above/Below average |

        ## 4. Real-Time Intelligence
        ### News
        - **[Headline]** - [Brief summary] `[Source: URL]`
        - **[Headline]** - [Brief summary] `[Source: URL]`

        ### Filings (if any)
        - **[Filing Type]** - [Brief description] `[Source: URL]`

        ## 5. Sentiment Analysis
        **Overall Sentiment:** Bullish / Bearish / Neutral

        **Evidence:**
        - [Specific fact from news/data supporting this sentiment]
        - [Another supporting fact]

        ## 6. Synthesis & Recommendations
        Combine all information to provide actionable insights. Focus on:
        - Key risks and opportunities
        - Recommended actions (if any)
        - Items to monitor

        FORMATTING RULES:
        - Use markdown headers (##, ###)
        - Include URLs in backticks: `[Source: example.com]`
        - Use tables for structured data
        - Be concise but comprehensive
        """
        final_report = llm.invoke(report_prompt).content
        return {"final_report": final_report}

    workflow = StateGraph(AgentState)

    workflow.add_node("extract_symbol", extract_symbol_step)
    workflow.add_node("web_researcher", web_research_step)
    workflow.add_node("market_data_analyst", market_data_step)
    workflow.add_node("portfolio_data_fetcher", portfolio_data_step)
    workflow.add_node("transform_data", transform_data_step)
    workflow.add_node("data_analyzer", run_data_analysis_step)
    workflow.add_node("report_synthesizer", synthesize_report_step)

    # Strictly linear pipeline; scan-mode short-circuits live inside the nodes.
    workflow.set_entry_point("extract_symbol")
    workflow.add_edge("extract_symbol", "web_researcher")
    workflow.add_edge("web_researcher", "market_data_analyst")
    workflow.add_edge("market_data_analyst", "portfolio_data_fetcher")
    workflow.add_edge("portfolio_data_fetcher", "transform_data")
    workflow.add_edge("transform_data", "data_analyzer")
    workflow.add_edge("data_analyzer", "report_synthesizer")
    workflow.add_edge("report_synthesizer", END)

    return workflow.compile()