"""
Real-time Market Intelligence Dashboard for NAVADA
Provides comprehensive market data, trends, and competitive intelligence
"""
import asyncio
import json
import logging
import os
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
import yfinance as yf
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import requests
from openai import AsyncOpenAI
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class MarketIntelligenceEngine:
    """Core engine for gathering and analyzing market intelligence data.

    Price and fundamentals data are read through ``yfinance``; narrative
    summaries are requested from the OpenAI chat-completions API. Results of
    the public ``get_*`` coroutines are kept in an in-process cache
    (``data_cache`` / ``cache_expiry``) keyed by the request parameters.

    The public coroutines catch ``Exception`` and report failures as
    ``{"status": "error", "message": <str>}`` instead of raising.
    """

    # Model and sampling temperature used for every chat-completion request.
    _CHAT_MODEL = "gpt-4"
    _CHAT_TEMPERATURE = 0.3

    # Sector used when the requested sector is not in ``_SECTOR_SYMBOLS``.
    _DEFAULT_SECTOR = "technology"

    # Sector name (lower case) -> {ticker symbol: display name}.
    _SECTOR_SYMBOLS: Dict[str, Dict[str, str]] = {
        "technology": {
            "QQQ": "NASDAQ-100 Technology",
            "XLK": "Technology Select Sector SPDR",
            "VGT": "Vanguard Information Technology ETF",
            "SOXX": "iShares Semiconductor ETF",
            "ARKK": "ARK Innovation ETF",
        },
        "healthcare": {
            "XLV": "Health Care Select Sector SPDR",
            "VHT": "Vanguard Health Care ETF",
            "IBB": "iShares Biotechnology ETF",
            "XBI": "SPDR Biotech ETF",
        },
        "finance": {
            "XLF": "Financial Select Sector SPDR",
            "VFH": "Vanguard Financials ETF",
            "KBE": "SPDR Banking ETF",
            "KRE": "SPDR Regional Banking ETF",
        },
        "energy": {
            "XLE": "Energy Select Sector SPDR",
            "VDE": "Vanguard Energy ETF",
            "XOP": "SPDR Oil & Gas Exploration ETF",
            "ICLN": "iShares Clean Energy ETF",
        },
        "retail": {
            "XRT": "SPDR Retail ETF",
            "RTH": "VanEck Retail ETF",
            "ONLN": "ProShares Online Retail ETF",
            "XLY": "Consumer Discretionary SPDR",
        },
    }

    # Keyword (lower case) -> ticker symbols used as a proxy for that theme.
    # This is a simplified static mapping, not a real symbol search.
    _KEYWORD_SYMBOLS: Dict[str, List[str]] = {
        "ai": ["NVDA", "GOOGL", "MSFT", "AMD", "INTC"],
        "cloud": ["AMZN", "MSFT", "GOOGL", "CRM", "SNOW"],
        "fintech": ["SQ", "PYPL", "V", "MA", "COIN"],
        "biotech": ["GILD", "AMGN", "BIIB", "REGN", "VRTX"],
        "ev": ["TSLA", "F", "GM", "NIO", "RIVN"],
        "crypto": ["COIN", "MSTR", "RIOT", "MARA", "HOOD"],
    }

    # Symbols returned for a keyword that has no entry in ``_KEYWORD_SYMBOLS``.
    _DEFAULT_KEYWORD_SYMBOLS = ["SPY"]

    def __init__(self):
        """Create the OpenAI client and the empty result caches.

        Reads ``OPENAI_API_KEY`` and ``SEARCH_API_KEY`` from the environment.
        """
        self.client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
        self.search_api_key = os.getenv("SEARCH_API_KEY")
        # cache key -> cached result
        self.data_cache = {}
        # cache key -> datetime after which the cached result is stale
        self.cache_expiry = {}

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _load_ticker_data(symbol: str, period: str, include_info: bool):
        """Synchronously fetch ``(history, info)`` for *symbol* via yfinance.

        ``info`` is an empty dict when *include_info* is false, which skips
        the extra ``Ticker.info`` lookup for callers that only need prices.
        """
        ticker = yf.Ticker(symbol)
        history = ticker.history(period=period)
        info = ticker.info if include_info else {}
        return history, info

    async def _fetch_ticker_data(self, symbol: str, period: str, include_info: bool = True):
        """Run :meth:`_load_ticker_data` in the default executor.

        The yfinance calls are synchronous; running them on a worker thread
        keeps the event loop free while the request is in flight.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, self._load_ticker_data, symbol, period, include_info
        )

    @staticmethod
    def _percent_change(closes) -> float:
        """Return the percentage change from the first to the last value of *closes*."""
        first = closes.iloc[0]
        last = closes.iloc[-1]
        return float(((last - first) / first) * 100)

    async def _chat_completion(self, system_prompt: str, user_prompt: str, max_tokens: int) -> Optional[str]:
        """Send one system+user exchange to the chat model and return the reply text.

        Exceptions from the client are not handled here; callers decide on
        the fallback. The returned content may be ``None``.
        """
        response = await self.client.chat.completions.create(
            model=self._CHAT_MODEL,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
            max_tokens=max_tokens,
            temperature=self._CHAT_TEMPERATURE,
        )
        return response.choices[0].message.content

    # ------------------------------------------------------------------
    # Market overview
    # ------------------------------------------------------------------

    async def get_market_overview(self, sector: str = "technology") -> Dict[str, Any]:
        """Get comprehensive market overview for a specific sector.

        Args:
            sector: Sector name; matched case-insensitively, unknown names
                fall back to the technology symbols.

        Returns:
            A dict with ``sector``, ``timestamp``, ``market_data`` (per-symbol
            metrics), ``insights`` and ``total_symbols``; or
            ``{"status": "error", "message": ...}`` on failure. Successful
            results are cached for one hour.
        """
        try:
            cache_key = f"market_overview_{sector}"
            if self._is_cache_valid(cache_key):
                return self.data_cache[cache_key]

            market_data = {}
            for symbol, name in self._get_sector_symbols(sector).items():
                try:
                    hist, info = await self._fetch_ticker_data(symbol, "1y")
                    if hist.empty:
                        continue
                    closes = hist['Close']
                    market_data[symbol] = {
                        "name": name,
                        "current_price": float(closes.iloc[-1]),
                        # NOTE: despite the key name this is the change over
                        # the fetched 1y window (first vs. last close), not
                        # since January 1. The key is kept for consumers.
                        "ytd_change": self._percent_change(closes),
                        "volume": float(hist['Volume'].iloc[-1]),
                        "market_cap": info.get('totalAssets', 0),
                        "52_week_high": float(hist['High'].max()),
                        "52_week_low": float(hist['Low'].min()),
                    }
                except Exception as e:
                    # Best effort: one failing symbol must not sink the overview.
                    logger.warning("Error fetching data for %s: %s", symbol, e)
                    continue

            # Generate market insights using AI
            market_insights = await self._generate_market_insights(market_data, sector)
            result = {
                "sector": sector,
                "timestamp": datetime.now().isoformat(),
                "market_data": market_data,
                "insights": market_insights,
                "total_symbols": len(market_data),
            }
            self._cache_data(cache_key, result, hours=1)
            return result
        except Exception as e:
            logger.error("Error getting market overview: %s", e)
            return {"status": "error", "message": str(e)}

    def _get_sector_symbols(self, sector: str) -> Dict[str, str]:
        """Get relevant ETF and index symbols for a sector.

        The lookup ignores case and surrounding whitespace so that e.g.
        ``"Healthcare"`` resolves to the healthcare symbols instead of
        silently falling back. Unknown sectors still fall back to the
        technology symbols. A fresh dict is returned on every call.
        """
        key = sector.strip().lower() if isinstance(sector, str) else ""
        symbols = self._SECTOR_SYMBOLS.get(key)
        if symbols is None:
            logger.warning(
                "Unknown sector %r; falling back to %s symbols",
                sector,
                self._DEFAULT_SECTOR,
            )
            symbols = self._SECTOR_SYMBOLS[self._DEFAULT_SECTOR]
        return dict(symbols)

    async def _generate_market_insights(self, market_data: Dict, sector: str) -> str:
        """Generate AI-powered insights from market data.

        Returns a fixed fallback sentence when the request fails or the
        model returns no content.
        """
        fallback = "Market insights temporarily unavailable."
        try:
            system_prompt = (
                "\n"
                f"You are a senior market analyst providing insights on the {sector} sector.\n"
                "Analyze the provided market data and generate 3-4 key insights covering:\n"
                "1. Overall sector performance and trends\n"
                "2. Notable winners and losers\n"
                "3. Market sentiment and outlook\n"
                "4. Potential opportunities or risks\n"
                "Keep insights concise, data-driven, and actionable for startup founders.\n"
            )
            user_prompt = (
                "\n"
                f"Sector: {sector}\n"
                "Market Data Summary:\n"
                f"{json.dumps(market_data, indent=2)}\n"
                "Provide professional market analysis with specific insights for startup founders in this sector.\n"
            )
            content = await self._chat_completion(system_prompt, user_prompt, max_tokens=400)
        except Exception as e:
            logger.error("Error generating market insights: %s", e)
            return fallback
        return content or fallback

    # ------------------------------------------------------------------
    # Competitive landscape
    # ------------------------------------------------------------------

    async def get_competitive_landscape(self, company_name: str, industry: str) -> Dict[str, Any]:
        """Analyze competitive landscape for a specific company/industry.

        Returns:
            A dict with ``company``, ``industry``, ``timestamp``,
            ``competitors`` (per-symbol metrics), ``analysis`` and
            ``market_leaders``; or ``{"status": "error", "message": ...}``
            on failure. Successful results are cached for six hours.
        """
        try:
            cache_key = f"competitive_{company_name}_{industry}"
            if self._is_cache_valid(cache_key):
                return self.data_cache[cache_key]

            # Get industry leaders and competitors
            competitors = await self._identify_competitors(company_name, industry)
            competitive_data = {}
            for competitor in competitors[:10]:  # Limit to top 10
                try:
                    symbol = competitor["symbol"]
                    hist, info = await self._fetch_ticker_data(symbol, "1y")
                    if hist.empty:
                        continue
                    closes = hist['Close']
                    competitive_data[symbol] = {
                        "name": competitor["name"],
                        "market_cap": info.get('marketCap', 0),
                        "revenue": info.get('totalRevenue', 0),
                        "employees": info.get('fullTimeEmployees', 0),
                        "pe_ratio": info.get('trailingPE', 0),
                        "profit_margin": info.get('profitMargins', 0),
                        "revenue_growth": info.get('revenueGrowth', 0),
                        "current_price": float(closes.iloc[-1]),
                        # Change over the fetched 1y window; cast to a plain
                        # float like the other price-derived fields.
                        "ytd_performance": self._percent_change(closes),
                    }
                except Exception as e:
                    logger.warning("Error fetching competitor data for %s: %s", competitor, e)
                    continue

            # Generate competitive analysis
            analysis = await self._generate_competitive_analysis(competitive_data, company_name, industry)
            result = {
                "company": company_name,
                "industry": industry,
                "timestamp": datetime.now().isoformat(),
                "competitors": competitive_data,
                "analysis": analysis,
                "market_leaders": self._identify_market_leaders(competitive_data),
            }
            self._cache_data(cache_key, result, hours=6)
            return result
        except Exception as e:
            logger.error("Error getting competitive landscape: %s", e)
            return {"status": "error", "message": str(e)}

    @staticmethod
    def _parse_competitors(raw: Optional[str]) -> List[Dict[str, str]]:
        """Extract ``[{"symbol": ..., "name": ...}, ...]`` from a model reply.

        The reply may wrap the JSON array in Markdown fences or surrounding
        prose, so the outermost ``[...]`` span is isolated before decoding.
        Entries that are not objects with a non-empty string ``symbol`` are
        dropped; a missing ``name`` defaults to the symbol. Returns ``[]``
        when nothing usable is found.
        """
        if not raw:
            return []
        start = raw.find("[")
        end = raw.rfind("]")
        if start == -1 or end <= start:
            return []
        try:
            parsed = json.loads(raw[start:end + 1])
        except json.JSONDecodeError:
            logger.warning("Failed to parse competitors JSON")
            return []
        if not isinstance(parsed, list):
            return []

        competitors = []
        for entry in parsed:
            if not isinstance(entry, dict):
                continue
            symbol = entry.get("symbol")
            if not isinstance(symbol, str) or not symbol.strip():
                continue
            symbol = symbol.strip()
            name = entry.get("name")
            if not isinstance(name, str) or not name.strip():
                name = symbol
            competitors.append({"symbol": symbol, "name": name})
        return competitors

    async def _identify_competitors(self, company_name: str, industry: str) -> List[Dict[str, str]]:
        """Identify key competitors using AI and market data.

        Returns a list of ``{"symbol", "name"}`` dicts, or ``[]`` when the
        request fails or the reply cannot be parsed.
        """
        try:
            # Use AI to identify competitors
            system_prompt = (
                "\n"
                "You are a market research analyst. Identify the top public competitors for the given company and industry.\n"
                "Return a JSON list of competitors with their stock symbols and full company names.\n"
                'Format: [{"symbol": "AAPL", "name": "Apple Inc."}, ...]\n'
                "Focus on direct competitors that are publicly traded.\n"
            )
            user_prompt = (
                "\n"
                f"Company: {company_name}\n"
                f"Industry: {industry}\n"
                "Identify 8-12 key public competitors with their stock symbols.\n"
            )
            content = await self._chat_completion(system_prompt, user_prompt, max_tokens=300)
            return self._parse_competitors(content)
        except Exception as e:
            logger.error("Error identifying competitors: %s", e)
            return []

    async def _generate_competitive_analysis(self, competitive_data: Dict, company_name: str, industry: str) -> str:
        """Generate AI-powered competitive analysis.

        Returns a fixed fallback sentence when the request fails or the
        model returns no content.
        """
        fallback = "Competitive analysis temporarily unavailable."
        try:
            system_prompt = (
                "\n"
                f"You are a senior business analyst providing competitive intelligence for {company_name} in the {industry} industry.\n"
                "Analyze the competitive landscape and provide insights on:\n"
                "1. Market positioning and differentiation opportunities\n"
                "2. Financial performance benchmarks\n"
                "3. Strategic threats and opportunities\n"
                "4. Market dynamics and trends\n"
                "Be specific and actionable for startup strategy.\n"
            )
            user_prompt = (
                "\n"
                f"Company: {company_name}\n"
                f"Industry: {industry}\n"
                "Competitive Data:\n"
                f"{json.dumps(competitive_data, indent=2)}\n"
                "Provide strategic competitive analysis with actionable insights.\n"
            )
            content = await self._chat_completion(system_prompt, user_prompt, max_tokens=500)
        except Exception as e:
            logger.error("Error generating competitive analysis: %s", e)
            return fallback
        return content or fallback

    def _identify_market_leaders(self, competitive_data: Dict) -> List[Dict[str, Any]]:
        """Identify market leaders based on key metrics.

        Ranks competitors by ``market_cap`` (descending) and returns at most
        five entries; the first is labelled ``"Leader"``, the rest
        ``"Major Player"``. A missing or ``None`` market cap ranks as 0 so a
        single incomplete record cannot break the ordering.
        """
        if not competitive_data:
            return []
        try:
            ranked = sorted(
                competitive_data.items(),
                key=lambda item: item[1].get('market_cap') or 0,
                reverse=True,
            )
            return [
                {
                    "symbol": symbol,
                    "name": data.get('name', symbol),
                    "market_cap": data.get('market_cap', 0),
                    "revenue": data.get('revenue', 0),
                    "market_position": "Leader" if rank == 0 else "Major Player",
                }
                for rank, (symbol, data) in enumerate(ranked[:5])
            ]
        except Exception as e:
            logger.error("Error identifying market leaders: %s", e)
            return []

    # ------------------------------------------------------------------
    # Trend analysis
    # ------------------------------------------------------------------

    async def get_trend_analysis(self, keywords: List[str], timeframe: str = "1y") -> Dict[str, Any]:
        """Analyze market trends for specific keywords/topics.

        Args:
            keywords: Topics to analyze; each is mapped to ticker symbols by
                :meth:`_find_related_symbols`.
            timeframe: Passed to yfinance as the history ``period``.

        Returns:
            A dict with ``keywords``, ``timeframe``, ``timestamp``,
            ``trend_data`` (keyword -> symbol -> metrics) and ``insights``;
            or ``{"status": "error", "message": ...}`` on failure.
            Successful results are cached for two hours.
        """
        try:
            cache_key = f"trends_{'_'.join(keywords)}_{timeframe}"
            if self._is_cache_valid(cache_key):
                return self.data_cache[cache_key]

            # Analyze related stocks and ETFs
            trend_data = {}
            for keyword in keywords:
                related_symbols = await self._find_related_symbols(keyword)
                keyword_performance = {}
                for symbol in related_symbols[:5]:  # Top 5 related symbols
                    try:
                        hist, _ = await self._fetch_ticker_data(symbol, timeframe, include_info=False)
                        if hist.empty:
                            continue
                        closes = hist['Close']
                        keyword_performance[symbol] = {
                            "performance": self._percent_change(closes),
                            # Standard deviation of closing prices (price
                            # units), not of returns.
                            "volatility": float(closes.std()),
                            "volume_avg": float(hist['Volume'].mean()),
                        }
                    except Exception as e:
                        logger.warning("Error analyzing %s: %s", symbol, e)
                        continue
                trend_data[keyword] = keyword_performance

            # Generate trend insights
            insights = await self._generate_trend_insights(trend_data, keywords, timeframe)
            result = {
                "keywords": keywords,
                "timeframe": timeframe,
                "timestamp": datetime.now().isoformat(),
                "trend_data": trend_data,
                "insights": insights,
            }
            self._cache_data(cache_key, result, hours=2)
            return result
        except Exception as e:
            logger.error("Error getting trend analysis: %s", e)
            return {"status": "error", "message": str(e)}

    async def _find_related_symbols(self, keyword: str) -> List[str]:
        """Find stock symbols related to a keyword.

        Uses the static ``_KEYWORD_SYMBOLS`` table (case-insensitive);
        unknown keywords map to ``["SPY"]``. A fresh list is returned.
        """
        symbols = self._KEYWORD_SYMBOLS.get(keyword.strip().lower(), self._DEFAULT_KEYWORD_SYMBOLS)
        return list(symbols)

    async def _generate_trend_insights(self, trend_data: Dict, keywords: List[str], timeframe: str) -> str:
        """Generate AI insights about market trends.

        Returns a fixed fallback sentence when the request fails or the
        model returns no content.
        """
        fallback = "Trend insights temporarily unavailable."
        try:
            system_prompt = (
                "\n"
                "You are a market trend analyst providing insights on emerging trends and market dynamics.\n"
                f"Analyze the trend data for the keywords: {', '.join(keywords)} over {timeframe}.\n"
                "Provide insights on:\n"
                "1. Overall trend momentum and direction\n"
                "2. Investment themes and opportunities\n"
                "3. Risk factors and market dynamics\n"
                "4. Implications for startups in these sectors\n"
                "Keep insights concise and actionable.\n"
            )
            user_prompt = (
                "\n"
                f"Keywords: {keywords}\n"
                f"Timeframe: {timeframe}\n"
                "Trend Performance Data:\n"
                f"{json.dumps(trend_data, indent=2)}\n"
                "Provide trend analysis with startup implications.\n"
            )
            content = await self._chat_completion(system_prompt, user_prompt, max_tokens=400)
        except Exception as e:
            logger.error("Error generating trend insights: %s", e)
            return fallback
        return content or fallback

    # ------------------------------------------------------------------
    # Cache
    # ------------------------------------------------------------------

    def _evict(self, cache_key: str) -> None:
        """Remove *cache_key* from both cache dicts if present."""
        self.cache_expiry.pop(cache_key, None)
        self.data_cache.pop(cache_key, None)

    def _is_cache_valid(self, cache_key: str) -> bool:
        """Check if cached data is still valid.

        An entry found to be expired is evicted so stale results do not
        accumulate. Returns ``True`` only when an unexpired entry with
        stored data exists.
        """
        expiry = self.cache_expiry.get(cache_key)
        if expiry is None:
            return False
        if datetime.now() < expiry:
            return cache_key in self.data_cache
        self._evict(cache_key)
        return False

    def _cache_data(self, cache_key: str, data: Any, hours: int = 1):
        """Cache data with expiry.

        Before storing, every entry whose expiry has passed is dropped;
        cache keys are built from caller-supplied strings, so without this
        sweep the cache would only ever grow.
        """
        now = datetime.now()
        stale_keys = [key for key, expiry in self.cache_expiry.items() if expiry <= now]
        for key in stale_keys:
            self._evict(key)
        self.data_cache[cache_key] = data
        self.cache_expiry[cache_key] = now + timedelta(hours=hours)
class DashboardVisualizer:
"""Creates interactive visualizations for market intelligence dashboard"""
def __init__(self):
self.theme_colors = {
"primary": "#667eea",
"secondary": "#764ba2",
"success": "#4CAF50",
"danger": "#f44336",
"warning": "#ff9800",
"info": "#2196F3"
}
def create_market_overview_chart(self, market_data: Dict[str, Any]) -> str:
"""Create market overview visualization"""
try:
symbols = list(market_data["market_data"].keys())
names = [data["name"] for data in market_data["market_data"].values()]
ytd_changes = [data["ytd_change"] for data in market_data["market_data"].values()]
# Create bar chart
fig = go.Figure()
colors = [self.theme_colors["success"] if change >= 0 else self.theme_colors["danger"]
for change in ytd_changes]
fig.add_trace(go.Bar(
x=symbols,
y=ytd_changes,
text=[f"{change:.1f}%" for change in ytd_changes],
textposition='auto',
marker_color=colors,
hovertemplate='%{x}
YTD Change: %{y:.1f}%
Real-time market data, competitive intelligence, and trend analysis