Sentinel_V2 / agents /orchestrator_v3.py
Asish Karthikeya Gogineni
Deploy Sentinel AI 2026-02-26_17:09:25
5d2eba0
import os
import sys
import pandas as pd
import ast
from dotenv import load_dotenv
from typing import TypedDict, List, Dict, Any
from langgraph.graph import StateGraph, END
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from agents.tool_calling_agents import WebResearchAgent, MarketDataAgent, InternalPortfolioAgent
from agents.data_analysis_agent import DataAnalysisAgent
from features.utils import call_gemini
# --- Configuration ---
# Load variables from a .env file into the environment before the workers are
# constructed (presumably they read API keys from the environment — confirm).
load_dotenv()
# --- Initialize workers (Stateless) ---
# Module-level instances: the node functions defined inside get_orchestrator()
# reference these names as globals rather than receiving them as arguments.
web_agent = WebResearchAgent()
market_agent = MarketDataAgent()
portfolio_agent = InternalPortfolioAgent()
# --- Define the Enhanced State ---
class AgentState(TypedDict):
    """State schema shared by the orchestrator graph's nodes.

    Each node returns a partial dict that is merged into this state, so every
    key a node writes or reads needs to be declared here.
    """
    task: str
    symbol: str
    web_research_results: str
    # Holds a dict (scan results or combined quote/overview/daily data) or the
    # string "Skipped." when no symbol was extracted, hence Any rather than str.
    market_data_results: Any
    portfolio_data_results: str
    scan_intent: str  # "DOWNWARD", "UPWARD", "ALL", or None
    # Returned by extract_symbol_step and read by market_data_step.
    # NOTE(review): LangGraph appears to discard node output keys that are not
    # declared in the schema; without this field market_data_step would always
    # see its "1M" default — confirm against the installed LangGraph version.
    time_range: str  # "INTRADAY", "1D", "3D", "1W", "1M", "3M", "1Y"
    # --- NEW FIELDS FOR ANALYSIS ---
    analysis_dataframe: pd.DataFrame
    analysis_results: Dict[str, Any]
    final_report: str
    # Debug fields
    debug_market_data_raw: Any
    debug_dataframe_head: Any
    debug_analysis_results_full: Any
def get_orchestrator(llm_provider="gemini", api_key=None):
    """
    Factory function to create the orchestrator graph.

    NOTE(review): `llm_provider` and `api_key` are accepted but never referenced
    in this function; the node functions call the module-level `call_gemini`
    directly. Confirm whether provider selection is still meant to be supported.
    """
    # 2. Initialize Data Analyzer (Now uses global call_gemini fallback)
    data_analyzer = DataAnalysisAgent()
    # 3. Define Nodes (closures capture 'data_analyzer'; the worker agents are
    #    module-level globals)
def extract_symbol_step(state: AgentState):
    """Ask the LLM for the ticker, scan intent and time range in the user's task.

    Reads ``state['task']`` and returns a partial state update with the keys
    ``symbol``, ``scan_intent`` and ``time_range``. Values coming back from the
    LLM are normalised (trimmed, upper-cased, "null"-like strings dropped) and
    ``scan_intent`` / ``time_range`` are checked against the sets the prompt
    allows, because any truthy ``scan_intent`` switches the downstream nodes
    into scan mode. ``time_range`` falls back to "1M" when absent or invalid.
    """
    import json
    import re

    valid_scan_intents = {"DOWNWARD", "UPWARD", "ALL"}
    valid_time_ranges = {"INTRADAY", "1D", "3D", "1W", "1M", "3M", "1Y"}

    def normalize(value):
        """Return value trimmed and upper-cased, or None if it is not a usable string."""
        if not isinstance(value, str):
            return None
        text = value.strip().upper()
        return None if text in ("", "NULL", "NONE") else text

    print("--- πŸ”¬ Symbol & Time Range Extraction ---")
    prompt = f"""
Analyze the user's request: "{state['task']}"
Extract TWO things:
1. Stock symbol or scan intent
2. Time range (if mentioned)
RULES:
- If request mentions a SPECIFIC company β†’ Extract symbol
- If request mentions time period β†’ Extract time range
- ONLY set scan_intent for "top gainers", "losers", "scan market"
Response Format: JSON ONLY.
{{
"symbol": "TICKER" or null,
"scan_intent": "DOWNWARD" | "UPWARD" | "ALL" or null,
"time_range": "INTRADAY" | "1D" | "3D" | "1W" | "1M" | "3M" | "1Y" or null
}}
Time Range Examples:
- "today", "now", "current", "recent" β†’ "INTRADAY"
- "yesterday", "1 day back" β†’ "1D"
- "3 days back", "last 3 days" β†’ "3D"
- "last week", "1 week", "7 days" β†’ "1W"
- "last month", "1 month", "30 days" β†’ "1M"
- "3 months", "quarter" β†’ "3M"
- "1 year", "12 months" β†’ "1Y"
Full Examples:
- "Analyze Tesla" β†’ {{"symbol": "TSLA", "scan_intent": null, "time_range": null}}
- "3 days back stocks of Tesla" β†’ {{"symbol": "TSLA", "scan_intent": null, "time_range": "3D"}}
- "Last week AAPL performance" β†’ {{"symbol": "AAPL", "scan_intent": null, "time_range": "1W"}}
- "1 month trend for NVDA" β†’ {{"symbol": "NVDA", "scan_intent": null, "time_range": "1M"}}
- "Recent analysis of Tesla" β†’ {{"symbol": "TSLA", "scan_intent": null, "time_range": "INTRADAY"}}
- "Show me top gainers" β†’ {{"symbol": null, "scan_intent": "UPWARD", "time_range": null}}
CRITICAL: Default to null for time_range if not explicitly mentioned!
"""
    # Guard against a missing response so .strip() cannot fail on None.
    raw_response = (call_gemini(prompt) or "").strip()

    symbol = None
    scan_intent = None
    time_range = None

    # Find JSON in response (the model may wrap it in prose or code fences).
    json_match = re.search(r'\{.*\}', raw_response, re.DOTALL)
    if json_match:
        try:
            data = json.loads(json_match.group(0))
        except json.JSONDecodeError as e:
            print(f" Error parsing symbol extraction: {e}")
        else:
            if isinstance(data, dict):
                symbol = data.get("symbol")
                scan_intent = data.get("scan_intent")
                time_range = data.get("time_range")
    else:
        print(f" WARNING: No JSON found in extraction response: {raw_response}")
        # Fallback to simple cleaning
        clean_resp = raw_response.upper()
        if "SCAN" in clean_resp or "GAINERS" in clean_resp or "LOSERS" in clean_resp:
            scan_intent = "ALL"
        elif len(clean_resp) <= 5 and clean_resp.isalpha():
            symbol = clean_resp

    symbol = normalize(symbol)
    if symbol:
        # "$TSLA" -> "TSLA"; a bare "$" collapses to None.
        symbol = symbol.replace("$", "") or None

    scan_intent = normalize(scan_intent)
    if scan_intent not in valid_scan_intents:
        if scan_intent is not None:
            print(f" WARNING: Ignoring unrecognised scan_intent: {scan_intent}")
        scan_intent = None

    time_range = normalize(time_range)
    if time_range not in valid_time_ranges:
        if time_range is not None:
            print(f" WARNING: Ignoring unrecognised time_range: {time_range}")
        # Default time_range to 1M if null (INTRADAY is premium-only, 1M uses free DAILY endpoint)
        time_range = "1M"

    print(f" Raw LLM Response: {raw_response}")
    print(f" Extracted Symbol: {symbol}")
    print(f" Scan Intent: {scan_intent}")
    print(f" Time Range: {time_range}")
    return {"symbol": symbol, "scan_intent": scan_intent, "time_range": time_range}
def web_research_step(state: AgentState):
    """Run web research for the user's task unless a market scan was requested.

    Returns a partial state update with ``web_research_results``: either the
    value returned by ``web_agent.research`` or a fixed notice for scans.
    """
    print("--- πŸ”Ž Web Research ---")
    is_scan = state.get("scan_intent")
    if not is_scan:
        findings = web_agent.research(queries=[state['task']])
        return {"web_research_results": findings}
    # Scans cover the whole watchlist, so per-company research is skipped.
    return {"web_research_results": "Market Scan initiated. Web research skipped for individual stock."}
def market_data_step(state: AgentState):
    """Fetch market data for either a watchlist scan or a single symbol.

    Scan mode (``scan_intent`` set): loads ``watchlist.json`` from the current
    working directory, fetches a quote per symbol and returns
    ``{"market_data_results": {"scan_results": [...]}}`` sorted by percentage
    change (largest first). A missing or unreadable watchlist yields
    ``{"market_data_results": {"error": ...}}`` instead of raising.

    Single-symbol mode: returns the combined quote, company overview and daily
    series under ``market_data_results`` and ``debug_market_data_raw``; each of
    the three fetches is best-effort and degrades to an empty dict on failure.
    Without a symbol, ``market_data_results`` is the string "Skipped.".
    """
    import json
    import time

    print("--- πŸ“Š Market Data Retrieval ---")

    # Handle scan intent
    scan_intent = state.get("scan_intent")
    if scan_intent:
        print(f" Scan Intent Detected: {scan_intent}")
        # Load watchlist
        watchlist_path = "watchlist.json"
        try:
            with open(watchlist_path, 'r', encoding="utf-8") as f:
                watchlist = json.load(f)
        except FileNotFoundError:
            return {"market_data_results": {"error": "Watchlist not found. Please add symbols to your watchlist."}}
        except (OSError, ValueError) as e:
            # A corrupt or unreadable file is reported the same way as a
            # missing one rather than aborting the whole graph run.
            print(f" WARNING: Could not read watchlist: {e}")
            return {"market_data_results": {"error": f"Watchlist could not be read: {e}"}}

        scan_results = []
        for position, sym in enumerate(watchlist):
            if position:
                # Same 1 req/sec pacing the single-symbol path applies below.
                time.sleep(1)
            try:
                # Use GLOBAL_QUOTE for real-time price (free tier)
                quote = market_agent.get_global_quote(symbol=sym)
                quote_data = quote.get("data", {})
                price = float(quote_data.get("price", 0))
                change_pct_str = quote_data.get("change_percent", "0%").replace("%", "")
                pct_change = float(change_pct_str) if change_pct_str else 0
                matches_intent = (
                    scan_intent == "ALL"
                    or (scan_intent == "UPWARD" and pct_change > 0)
                    or (scan_intent == "DOWNWARD" and pct_change < 0)
                )
                if matches_intent:
                    scan_results.append({"symbol": sym, "price": price, "change": pct_change})
            except Exception as e:
                # Best-effort: one bad symbol must not abort the scan.
                print(f" ⚠️ Error scanning {sym}: {e}")
        # Sort by change
        scan_results.sort(key=lambda x: x['change'], reverse=True)
        return {"market_data_results": {"scan_results": scan_results}}

    # Single symbol analysis
    symbol = state.get("symbol")
    if not symbol:
        return {"market_data_results": "Skipped."}
    combined_data = {"symbol": symbol}

    # 1. Get REAL current price via GLOBAL_QUOTE (free tier)
    try:
        quote = market_agent.get_global_quote(symbol=symbol)
        combined_data["quote"] = quote.get("data", {}) or {}
        combined_data["quote_source"] = quote.get("source", "Unknown")
        print(f" βœ… Real-time quote: ${combined_data['quote'].get('price', 'N/A')}")
        time.sleep(1)  # Respect rate limit (1 req/sec)
    except Exception as e:
        print(f" ⚠️ Quote fetch failed: {e}")
        combined_data["quote"] = {}

    # 2. Get REAL fundamentals via OVERVIEW (free tier)
    try:
        overview = market_agent.get_company_overview(symbol=symbol)
        combined_data["overview"] = overview.get("data", {}) or {}
        combined_data["overview_source"] = overview.get("source", "Unknown")
        print(f" βœ… Company: {combined_data['overview'].get('Name', symbol)}, P/E: {combined_data['overview'].get('PERatio', 'N/A')}")
        time.sleep(1)  # Respect rate limit
    except Exception as e:
        print(f" ⚠️ Overview fetch failed: {e}")
        combined_data["overview"] = {}

    # 3. Get historical data via DAILY (free tier) for trend analysis
    try:
        # "or" also covers a key that is present but None.
        time_range = state.get("time_range") or "1M"
        # Map INTRADAY to 1M for free tier compatibility
        if time_range == "INTRADAY":
            time_range = "1M"
        print(f" Fetching DAILY data for {symbol} (time_range={time_range})")
        results = market_agent.get_market_data(symbol=symbol, time_range=time_range)
        combined_data["daily_data"] = results
        source = results.get("meta_data", {}).get("Source", "Unknown")
        data_points = len(results.get("data", {}))
        print(f" βœ… Daily data: {data_points} data points (Source: {source})")
    except Exception as e:
        print(f" ⚠️ Daily data fetch failed: {e}")
        combined_data["daily_data"] = {}

    return {"market_data_results": combined_data, "debug_market_data_raw": combined_data}
def portfolio_data_step(state: AgentState):
    """Ask the internal portfolio agent about exposure to the extracted symbol.

    Returns a partial state update with ``portfolio_data_results``. Scans and
    requests without a symbol get a fixed notice; a failing portfolio service
    is reported as unavailable rather than raised.
    """
    print("--- πŸ’Ό Internal Portfolio Data ---")
    if state.get("scan_intent"):
        return {"portfolio_data_results": "Market Scan initiated. Portfolio context skipped."}

    ticker = state.get("symbol")
    if not ticker:
        return {"portfolio_data_results": "Skipped: No symbol provided."}

    question = f"What is the current exposure to {ticker}?"
    try:
        answer = portfolio_agent.query_portfolio(question=question)
    except Exception as e:
        # The report is still produced without internal portfolio context.
        print(f" ⚠️ Portfolio data fetch failed (Private MCP may be down): {e}")
        return {"portfolio_data_results": "Portfolio data unavailable (service error). Analysis continues without internal portfolio context."}
    return {"portfolio_data_results": answer}
def transform_data_step(state: AgentState):
    """Turn the daily time series in ``market_data_results`` into a DataFrame.

    Returns a partial state update with ``analysis_dataframe`` (empty when the
    request is a scan, when no usable daily data exists, or when conversion
    fails) and, on success, ``debug_dataframe_head``.
    """
    print("--- πŸ”€ Transforming Data for Analysis ---")
    if state.get("scan_intent"):
        # Skip transformation for scan
        return {"analysis_dataframe": pd.DataFrame()}

    payload = state.get("market_data_results")
    if not isinstance(payload, dict):
        print(" Skipping transformation: No valid market data received.")
        return {"analysis_dataframe": pd.DataFrame()}

    # Extract daily_data from the new combined format
    daily = payload.get('daily_data', {})
    series = daily.get('data', {}) if isinstance(daily, dict) else {}
    if not series:
        print(" Skipping transformation: No daily time series data available.")
        return {"analysis_dataframe": pd.DataFrame()}

    column_names = {
        '1. open': 'open',
        '2. high': 'high',
        '3. low': 'low',
        '4. close': 'close',
        '5. volume': 'volume',
    }
    try:
        frame = pd.DataFrame.from_dict(series, orient='index')
        frame.index = pd.to_datetime(frame.index)
        frame.index.name = "timestamp"
        frame = frame.rename(columns=column_names)
        # Convert every column to numeric, then order rows by timestamp.
        frame = frame.apply(pd.to_numeric).sort_index()
        print(f" Successfully created DataFrame with shape {frame.shape}")
        return {"analysis_dataframe": frame, "debug_dataframe_head": frame.head().to_dict()}
    except Exception as e:
        print(f" CRITICAL ERROR during data transformation: {e}")
        return {"analysis_dataframe": pd.DataFrame()}
def run_data_analysis_step(state: AgentState):
    """Run the data analyzer over ``analysis_dataframe`` when there is data.

    Returns a partial state update with ``analysis_results`` (an empty dict for
    scans or when the DataFrame is missing/empty) and, when analysis ran,
    ``debug_analysis_results_full``.
    """
    print("--- πŸ”¬ Running Deep-Dive Data Analysis ---")
    if state.get("scan_intent"):
        # Skip analysis for scan
        return {"analysis_results": {}}

    frame = state.get("analysis_dataframe")
    if frame is None or frame.empty:
        print(" Skipping analysis: No data to analyze.")
        return {"analysis_results": {}}

    findings = data_analyzer.run_analysis(frame)
    return {"analysis_results": findings, "debug_analysis_results_full": findings}
def synthesize_report_step(state: AgentState):
    """Build the final LLM prompt from the collected state and return the report.

    When ``market_data_results`` carries ``scan_results`` a "Market Scan Report"
    prompt is used; otherwise an "Alpha Report" prompt is assembled from web
    research, quote/overview data, analysis insights and portfolio context.
    Returns a partial state update with ``final_report`` (the value returned by
    ``call_gemini``).

    A missing symbol is rendered as 'Unknown' so that the prompt's
    "Target Symbol is 'Unknown'" guard can actually match; the state key is
    always present (possibly None), so a ``.get`` default alone never applies.
    """
    print("--- πŸ“ Synthesizing Final Report ---")

    # Helper to truncate text to avoid Rate Limits
    def truncate(text, max_chars=3000):
        s = str(text)
        if len(s) > max_chars:
            return s[:max_chars] + "... (truncated)"
        return s

    # Check for Scan Results
    market_data_raw = state.get("market_data_results", {})
    if isinstance(market_data_raw, dict) and "scan_results" in market_data_raw:
        scan_results = market_data_raw["scan_results"]
        # Truncate scan results if necessary (though usually small)
        scan_results_str = truncate(scan_results, 4000)
        report_prompt = f"""
You are a senior financial analyst. The user requested a market scan: "{state['task']}".
Scan Results (from Watchlist):
{scan_results_str}
Generate a "Market Scan Report".
1. Summary: Briefly explain the criteria and the overall market status based on these results.
2. Results Table: Create a markdown table with columns: Symbol | Price | % Change.
3. Conclusion: Highlight the most significant movers.
"""
        final_report = call_gemini(report_prompt)
        return {"final_report": final_report}

    symbol = state.get('symbol') or 'Unknown'

    # Tolerate a stored None (or any non-dict) instead of failing on .get().
    analysis_results = state.get("analysis_results")
    if not isinstance(analysis_results, dict):
        analysis_results = {}
    analysis_insights = analysis_results.get("insights", "Not available.")

    # Truncate inputs for the main report
    web_data = truncate(state.get('web_research_results', 'Not available.'), 3000)
    portfolio_data = truncate(state.get('portfolio_data_results', 'Not available.'), 2000)

    # Extract rich data from combined market results
    data_sources = []
    # Build rich market context from the new format
    quote_data = {}
    overview_data = {}
    if isinstance(market_data_raw, dict):
        quote_data = market_data_raw.get("quote") or {}
        overview_data = market_data_raw.get("overview") or {}
        if market_data_raw.get("quote_source"):
            data_sources.append(f"Price: {market_data_raw['quote_source']}")
        if market_data_raw.get("overview_source"):
            data_sources.append(f"Fundamentals: {market_data_raw['overview_source']}")
        daily = market_data_raw.get("daily_data", {})
        if isinstance(daily, dict):
            src = (daily.get("meta_data") or {}).get("Source", "")
            if src:
                data_sources.append(f"Historical: {src}")
    data_source = " | ".join(data_sources) if data_sources else "Unknown"

    # Build a structured market data section
    market_context = f"""
--- REAL-TIME PRICE (GLOBAL_QUOTE) ---
Current Price: ${quote_data.get('price', 'N/A')}
Change: {quote_data.get('change', 'N/A')} ({quote_data.get('change_percent', 'N/A')})
Open: ${quote_data.get('open', 'N/A')}
High: ${quote_data.get('high', 'N/A')}
Low: ${quote_data.get('low', 'N/A')}
Volume: {quote_data.get('volume', 'N/A')}
Previous Close: ${quote_data.get('previous_close', 'N/A')}
--- COMPANY FUNDAMENTALS (OVERVIEW) ---
Company: {overview_data.get('Name', 'N/A')}
Sector: {overview_data.get('Sector', 'N/A')} | Industry: {overview_data.get('Industry', 'N/A')}
Market Cap: ${overview_data.get('MarketCapitalization', 'N/A')}
Revenue (TTM): ${overview_data.get('RevenueTTM', 'N/A')}
EPS: ${overview_data.get('EPS', 'N/A')}
P/E Ratio: {overview_data.get('PERatio', 'N/A')}
Forward P/E: {overview_data.get('ForwardPE', 'N/A')}
Profit Margin: {overview_data.get('ProfitMargin', 'N/A')}
Operating Margin: {overview_data.get('OperatingMarginTTM', 'N/A')}
Return on Equity: {overview_data.get('ReturnOnEquityTTM', 'N/A')}
Beta: {overview_data.get('Beta', 'N/A')}
52-Week High: ${overview_data.get('52WeekHigh', 'N/A')}
52-Week Low: ${overview_data.get('52WeekLow', 'N/A')}
Dividend Yield: {overview_data.get('DividendYield', 'N/A')}
Analyst Target: ${overview_data.get('AnalystTargetPrice', 'N/A')}
Quarterly Earnings Growth: {overview_data.get('QuarterlyEarningsGrowthYOY', 'N/A')}
Quarterly Revenue Growth: {overview_data.get('QuarterlyRevenueGrowthYOY', 'N/A')}
"""
    report_prompt = f"""
You are a senior financial analyst writing a comprehensive "Alpha Report".
Your task is to synthesize all available information into a structured, cited report.
USE THE REAL FINANCIAL NUMBERS PROVIDED β€” do NOT say data is unavailable if numbers are given.
Original User Task: {state['task']}
Target Symbol: {symbol}
Data Source: {data_source}
---
Available Information:
- Web Intelligence: {web_data}
- Market Data & Fundamentals: {market_context}
- Deep-Dive Data Analysis Insights: {analysis_insights}
- Internal Portfolio Context: {portfolio_data}
---
CRITICAL INSTRUCTIONS:
1. First, evaluate the "Available Information".
- If the Target Symbol is 'Unknown' OR if the Web Intelligence and Market Data contain no meaningful information:
You MUST respond with: "I am not sure about this company as I could not find sufficient data."
Do NOT generate the rest of the report.
2. Otherwise, generate the "Alpha Report" with the following sections:
> [!NOTE]
> **Data Source**: {data_source}
## 1. Executive Summary
A 2-3 sentence overview of the key findings and current situation.
## 2. Internal Context
Detail the firm's current exposure:
- IF the firm has shares > 0: Present as a markdown table:
| Symbol | Shares | Avg Cost | Current Value |
|--------|--------|----------|---------------|
- IF the firm has 0 shares: State: "The firm has no current exposure to {symbol}."
## 3. Market Data
ALWAYS present as a markdown table:
| Metric | Value | Implication |
|--------|-------|-------------|
| Current Price | $XXX.XX | +/-X.X% vs. open |
| 5-Day Trend | Upward/Downward/Flat | Brief note |
| Volume | X.XXM | Above/Below average |
## 4. Real-Time Intelligence
### News
- **[Headline]** - [Brief summary] `[Source: URL]`
- **[Headline]** - [Brief summary] `[Source: URL]`
### Filings (if any)
- **[Filing Type]** - [Brief description] `[Source: URL]`
## 5. Sentiment Analysis
**Overall Sentiment:** Bullish / Bearish / Neutral
**Evidence:**
- [Specific fact from news/data supporting this sentiment]
- [Another supporting fact]
## 6. Synthesis & Recommendations
Combine all information to provide actionable insights. Focus on:
- Key risks and opportunities
- Recommended actions (if any)
- Items to monitor
FORMATTING RULES:
- Use markdown headers (##, ###)
- Include URLs in backticks: `[Source: example.com]`
- Use tables for structured data
- Be concise but comprehensive
"""
    final_report = call_gemini(report_prompt)
    return {"final_report": final_report}
# 4. Build the Graph
workflow = StateGraph(AgentState)
workflow.add_node("extract_symbol", extract_symbol_step)
workflow.add_node("web_researcher", web_research_step)
workflow.add_node("market_data_analyst", market_data_step)
workflow.add_node("portfolio_data_fetcher", portfolio_data_step)
workflow.add_node("transform_data", transform_data_step)
workflow.add_node("data_analyzer", run_data_analysis_step)
workflow.add_node("report_synthesizer", synthesize_report_step)
workflow.set_entry_point("extract_symbol")
workflow.add_edge("extract_symbol", "web_researcher")
workflow.add_edge("web_researcher", "market_data_analyst")
workflow.add_edge("market_data_analyst", "portfolio_data_fetcher")
workflow.add_edge("portfolio_data_fetcher", "transform_data")
workflow.add_edge("transform_data", "data_analyzer")
workflow.add_edge("data_analyzer", "report_synthesizer")
workflow.add_edge("report_synthesizer", END)
return workflow.compile()