AI_Trade_Analyzer / agents.py
akankshar639's picture
Update agents.py
c4b9008 verified
import yfinance as yf
import requests
from bs4 import BeautifulSoup
import pandas as pd
import numpy as np
from langchain_openai import ChatOpenAI
from langchain_groq import ChatGroq
import os
import pytz
from typing import Dict, Any
from state import TraderState
from datetime import datetime, timedelta
# Previous OpenRouter/OpenAI configuration, kept for reference (disabled).
# os.environ["OPENAI_API_KEY"] = os.getenv("OPENROUTER_API_KEY", "")
# llm = ChatOpenAI(model="openai/gpt-3.5-turbo", openai_api_base="https://openrouter.ai/api/v1", temperature=0.2)
# Shared chat model; skeptic_agent calls llm.invoke(...) to produce its counter-argument.
# The Groq key is read from the GROQ_API_KEY environment variable.
llm = ChatGroq(model = "moonshotai/kimi-k2-instruct-0905",api_key=os.getenv("GROQ_API_KEY"), temperature=0.0)
# API keys read from the environment; both default to an empty string when unset.
# NOTE(review): ALPHA_VANTAGE_KEY is not referenced in the visible part of this file.
ALPHA_VANTAGE_KEY = os.getenv("ALPHA_VANTAGE_API_KEY", "")
# Used by fetch_news_sentiment for NewsAPI requests.
NEWSAPI_KEY = os.getenv("NEWSAPI_KEY", "")
def fetch_data(symbol: str, horizon: str) -> pd.DataFrame:
    """Download price history for ``symbol`` sized to the trading horizon.

    Args:
        symbol: Ticker passed straight to ``yf.Ticker``.
        horizon: One of "intraday", "scalping", "swing", "momentum"; any
            other value is treated as long-term.

    Returns:
        The history frame with its index converted to Asia/Kolkata time, or
        an empty DataFrame when the download fails or yields no rows.
    """
    # (period, interval) requested from yfinance for each known horizon.
    horizon_windows = {
        "intraday": ("1d", "1m"),
        "scalping": ("1d", "1m"),
        "swing": ("3mo", "1d"),
        "momentum": ("6mo", "1d"),
    }
    period, interval = horizon_windows.get(horizon, ("2y", "1d"))  # long_term default
    try:
        stock = yf.Ticker(symbol)
        hist = stock.history(period=period, interval=interval)
        if hist.empty or len(hist) < 50:
            # Too few bars for the 50-row minimum compute_indicators needs:
            # fall back to one year of daily bars, and remember the interval
            # actually used so the log line below stays truthful.
            period, interval = "1y", "1d"
            hist = stock.history(period=period, interval=interval)
        if hist.empty:
            raise ValueError(f"No data from yfinance for {symbol}")
        ist = pytz.timezone('Asia/Kolkata')
        if hist.index.tz is None:
            # tz_convert raises on a tz-naive index, which previously discarded
            # otherwise valid data.
            # NOTE(review): naive timestamps are assumed to be US/Eastern, the
            # zone the original conversion chain referenced - confirm.
            hist.index = hist.index.tz_localize(pytz.timezone('US/Eastern')).tz_convert(ist)
        else:
            # Converting an aware index to IST needs no intermediate zone.
            hist.index = hist.index.tz_convert(ist)
        print(f"Data fetched for {symbol} ({horizon}): {len(hist)} rows ({interval} intervals)")
        return hist
    except Exception as e:
        # Best effort: callers treat an empty frame as "no data".
        print(f"Yfinance failed for {symbol}: {e}")
        return pd.DataFrame()
def compute_indicators(hist: pd.DataFrame, horizon: str) -> Dict[str, Any]:
    """Compute the latest technical indicators from a price history frame.

    Args:
        hist: Frame with a ``Close`` column and optionally ``Volume``.
        horizon: Accepted for interface compatibility; not read here.

    Returns:
        Dict with ``current_price``, ``rsi`` (14-period, simple rolling
        averages), ``macd`` (EMA12 - EMA26), ``sma_20``, ``ema_50``,
        ``bb_upper``/``bb_lower`` (SMA20 +/- 2 standard deviations) and
        ``volume_avg`` (20-period mean, 0 without a Volume column).
        An empty frame returns neutral defaults plus an ``error`` key; fewer
        than 50 rows or any computation failure returns only ``error``.
    """
    if hist.empty:
        return {"error": "No data available", "current_price": 0, "rsi": 50, "macd": 0, "sma_20": 0, "ema_50": 0, "bb_upper": 0, "bb_lower": 0, "volume_avg": 0}
    try:
        hist = hist.sort_index()
        close = hist['Close']
        if len(close) < 50:
            return {"error": "Insufficient data"}

        # RSI from 14-period simple averages of gains and losses.
        delta = close.diff()
        gain = delta.where(delta > 0, 0.0).rolling(14).mean()
        loss = (-delta).where(delta < 0, 0.0).rolling(14).mean()
        rs = gain / loss
        rsi = 100 - (100 / (1 + rs)).iloc[-1]
        if gain.iloc[-1] == 0 and loss.iloc[-1] == 0:
            # A completely flat window gives 0/0 (NaN); report neutral instead.
            rsi = 50.0

        # The first positional argument of ewm() is `com`, not `span`, so the
        # period must be passed by keyword to get true 12/26/50-period EMAs.
        # adjust=False selects the recursive EMA form.
        ema12 = close.ewm(span=12, adjust=False).mean()
        ema26 = close.ewm(span=26, adjust=False).mean()
        macd_value = (ema12 - ema26).iloc[-1]
        ema_50 = close.ewm(span=50, adjust=False).mean().iloc[-1]

        # Bollinger bands; the 50-row guard above guarantees a full window.
        sma_20 = close.rolling(20).mean().iloc[-1]
        std_20 = close.rolling(20).std().iloc[-1]
        bb_upper = sma_20 + 2 * std_20
        bb_lower = sma_20 - 2 * std_20

        volume_avg = hist['Volume'].rolling(20).mean().iloc[-1] if 'Volume' in hist else 0
        return {
            "current_price": close.iloc[-1], "rsi": rsi, "macd": macd_value,
            "sma_20": sma_20, "ema_50": ema_50, "bb_upper": bb_upper, "bb_lower": bb_lower,
            "volume_avg": volume_avg
        }
    except Exception as e:
        return {"error": str(e)}
def fetch_news_sentiment(symbol: str) -> dict:
    """Fetch recent NewsAPI articles for ``symbol`` and score them by keyword.

    The ".NS"/".BO" suffixes are stripped from the symbol before searching.
    Up to five articles are scored: each headline/description contributes one
    hit per lexicon word it contains, and the larger total decides the label.

    Returns:
        Dict with ``sentiment`` ("positive", "negative" or "neutral"),
        ``headlines`` (list of strings) and ``summary``. A neutral
        placeholder result is returned when no articles are found or the
        request fails.
    """
    import re  # function-scope import, as calculate_holding_metrics does

    positive_words = ['gain', 'rise', 'profit', 'growth', 'earnings', 'bullish', 'up', 'positive', 'strong']
    negative_words = ['loss', 'fall', 'decline', 'drop', 'bearish', 'down', 'negative', 'weak', 'crash']

    def count_hits(texts, lexicon):
        # Match whole words (plus simple s/es/d/ed/ing inflections) so that
        # "up" no longer fires on "group"/"supply" or "down" on "download".
        pattern = re.compile(r"\b(" + "|".join(lexicon) + r")(?:s|es|d|ed|ing)?\b")
        return sum(len(set(pattern.findall(text.lower()))) for text in texts)

    try:
        base_symbol = symbol.replace(".NS", "").replace(".BO", "")
        # Let requests build and encode the query string; interpolating the
        # symbol into the URL breaks for tickers containing '&' or similar.
        params = {
            "q": f'"{base_symbol}" OR "{base_symbol} stock"',
            "sortBy": "publishedAt",
            "apiKey": NEWSAPI_KEY,
            "pageSize": 10,
            "language": "en",
        }
        response = requests.get("https://newsapi.org/v2/everything", params=params, timeout=10)
        data = response.json()
        articles = (data.get('articles') or [])[:5]
        if articles:
            # NewsAPI may return null titles/descriptions; drop or blank them
            # so one missing field does not discard the whole result.
            headlines = [art['title'] for art in articles if art.get('title')]
            descriptions = [art.get('description') or '' for art in articles]
            texts = headlines + descriptions
            pos_count = count_hits(texts, positive_words)
            neg_count = count_hits(texts, negative_words)
            sentiment = 'positive' if pos_count > neg_count else 'negative' if neg_count > pos_count else 'neutral'
            summary = f"Recent news sentiment: {sentiment}. Key headlines: {'; '.join(headlines[:3])}."
            return {'sentiment': sentiment, 'headlines': headlines, 'summary': summary}
        else:
            return {'sentiment': 'neutral', 'headlines': ["No recent news found."], 'summary': "No recent news available; sentiment neutral."}
    except Exception as e:
        # Best effort: news is optional input, so degrade to neutral.
        print(f"NewsAPI error: {e}")
        return {'sentiment': 'neutral', 'headlines': ["News fetch failed."], 'summary': "News unavailable; assuming neutral sentiment."}
def summarize_features(indicators: Dict[str, Any], hist: pd.DataFrame, news: list, action: str, symbol: str, news_sentiment: str = 'neutral') -> str:
    """Build a one-line, semicolon-separated summary of the trading signals.

    Args:
        indicators: Output of compute_indicators; an ``error`` key switches
            to the fallback path based on ``yf.Ticker(symbol).info``.
        hist: Price history; the last 10 closes define the trend.
        news: Headlines; only the first one is quoted.
        action: "buy" or "sell", used for the overall bias statement.
        symbol: Ticker used only by the fallback path.
        news_sentiment: Sentiment label shown capitalised in the summary.

    Returns:
        The summary string, or a fixed caution message when the fallback
        lookup fails.
    """
    if indicators.get("error"):
        try:
            info = yf.Ticker(symbol).info
            # The key may be present with a None value; treat that as 0.
            change = info.get('regularMarketChangePercent') or 0
            signals = [f"Recent change: {change:.1f}%"]
            signals.append("Positive momentum" if change > 0 else "Negative momentum")
            return f"Fallback Signals: {', '.join(signals)}; News: {news[0] if news else 'Neutral'}"
        except Exception:
            return "No reliable data; default to caution."

    rsi = indicators.get('rsi', 50)
    macd = indicators.get('macd', 0)
    price = indicators.get('current_price', 0)
    # A key that is present but None would otherwise break the comparisons
    # below, so None falls back the same way a missing key does.
    sma_20 = indicators.get('sma_20')
    if sma_20 is None:
        sma_20 = price
    bb_upper = indicators.get('bb_upper')
    if bb_upper is None:
        bb_upper = price * 1.1
    bb_lower = indicators.get('bb_lower')
    if bb_lower is None:
        bb_lower = price * 0.9

    recent_prices = hist['Close'].tail(10) if not hist.empty else pd.Series([price])
    first_price, last_price = recent_prices.iloc[0], recent_prices.iloc[-1]
    trend = "upward" if last_price > first_price else "downward"
    if len(recent_prices) > 1 and first_price != 0:
        change_pct = ((last_price - first_price) / first_price) * 100
    else:
        # Single data point, or a zero base that would divide by zero.
        change_pct = 0

    signals = []
    if rsi < 30:
        signals.append("Oversold RSI (potential buy reversal)")
    elif rsi > 70:
        signals.append("Overbought RSI (sell risk)")
    signals.append("Bullish MACD" if macd > 0 else "Bearish MACD")
    if price < bb_lower:
        signals.append("Price near support (buy opportunity)")
    elif price > bb_upper:
        signals.append("Price near resistance (sell signal)")
    signals.append("Above SMA20 (momentum)" if price > sma_20 else "Below SMA20 (weakness)")

    news_sentiment_str = f"News Sentiment: {news_sentiment.capitalize()} (e.g., {news[0] if news else 'No updates'})"
    if action == "buy" and trend == "upward":
        action_bias = "favorable for buying"
    elif action == "sell" and trend == "downward":
        action_bias = "favorable for selling"
    else:
        action_bias = "mixed"
    features = [
        f"Trend: {trend} ({change_pct:.1f}% change)",
        f"Key Signals: {', '.join(signals)}",
        news_sentiment_str,
        f"Overall Bias for {action}: {action_bias}"
    ]
    return "; ".join(features)
def calculate_holding_metrics(hist: pd.DataFrame, holding_period: str, current_price: float) -> Dict[str, Any]:
    """Estimate the return of a position held for ``holding_period``.

    Args:
        hist: Price history with a ``Close`` column, oldest row first.
        holding_period: Free text containing "<number> year/month/week/day",
            e.g. "3 months" or "held for 10 days".
        current_price: Price the position is valued at today.

    Returns:
        Dict with ``purchase_price`` (last close before the purchase date, or
        the oldest close when the history does not reach back that far),
        ``holding_return`` in percent, ``days_held`` and ``pnl``
        ("profit"/"loss"). Empty dict when the period is missing or cannot
        be parsed, or when ``hist`` is empty.
    """
    import re

    if not holding_period or hist.empty:
        return {}
    match = re.search(r'(\d+)\s*(year|month|week|day)', holding_period.lower())
    if not match:
        return {}
    days_per_unit = {'year': 365, 'month': 30, 'week': 7, 'day': 1}
    days = int(match.group(1)) * days_per_unit[match.group(2)]

    closes = hist['Close']
    try:
        # Build the cutoff in the index's own timezone (or naive when the
        # index is naive) so the comparison never mixes aware and naive
        # timestamps.
        index_tz = getattr(hist.index, 'tz', None)
        purchase_date = pd.Timestamp.now(tz=index_tz) - pd.Timedelta(days=days)
        closes_before = closes[hist.index < purchase_date]
    except (TypeError, ValueError, OverflowError):
        # Non-datetime index or a period beyond the representable range:
        # treat the purchase as predating all available data.
        closes_before = closes.iloc[0:0]
    # Use the oldest price in the data if the purchase date is not covered.
    purchase_price = closes_before.iloc[-1] if not closes_before.empty else closes.iloc[0]

    # The previous "live price" lookup queried yfinance with the index label
    # (or AAPL) instead of the analysed symbol, so it could only ever return
    # another ticker's price. The caller-supplied current price is used.
    if purchase_price > 0:
        holding_return = ((current_price - purchase_price) / purchase_price) * 100
    else:
        holding_return = 0
    return {
        "purchase_price": purchase_price,
        "holding_return": holding_return,
        "days_held": days,
        "pnl": "profit" if holding_return > 0 else "loss"
    }
def predict(horizon: str, indicators: Dict[str, Any], hist: pd.DataFrame, news: list, news_sentiment: str = 'neutral') -> str:
    """Produce a heuristic, human-readable price forecast for the horizon.

    Args:
        horizon: "intraday", "scalping", "swing", "momentum"; anything else
            is treated as long-term.
        indicators: Output of compute_indicators (``rsi`` and ``macd`` used).
        hist: Price history with a ``Close`` column.
        news: Headlines; accepted for interface compatibility, not read.
        news_sentiment: Scales the probability by 1.15 (positive) or 0.85
            (negative) for the intraday and long-term horizons.

    Returns:
        A sentence stating a probability, magnitude and direction, or a fixed
        message when there is no data. The numbers are rule-of-thumb
        heuristics, not statistical estimates.
    """
    if hist.empty or indicators.get("error"):
        return "Unable to predict due to lack of data."
    rsi = indicators.get('rsi', 50)
    macd = indicators.get('macd', 0)
    close = hist['Close']
    sentiment_boost = 1.15 if news_sentiment == 'positive' else 0.85 if news_sentiment == 'negative' else 1.0
    # Approximate trading days per period, used to scale mean daily returns.
    days_per_week, days_per_month = 5, 21

    if horizon == "intraday":
        # Change over the last 10 bars (minutes) drives the same-day call.
        recent_change = ((close.iloc[-1] - close.iloc[-10]) / close.iloc[-10]) * 100 if len(close) >= 10 else 0
        prob = 70 if (rsi < 40 and macd > 0) else 50 if rsi > 60 else 30
        direction = "rise" if recent_change > 0 else "fall"
        pct = abs(recent_change) * 0.8
        prob = min(95, int(prob * sentiment_boost))
        return f"{prob}% chance of {pct:.1f}% {direction} in the next 1-2 hours (same-day intraday) based on RSI {rsi:.1f}, MACD {macd:.2f}, recent {recent_change:.1f}% change, and {news_sentiment} news."
    elif horizon == "scalping":
        # Ultra-short call based on RSI alone.
        prob = 80 if rsi < 35 else 40
        direction = "quick rise" if rsi < 35 else "quick fall"
        pct = 1.5
        return f"{prob}% chance of {pct:.1f}% {direction} in the next 5-10 minutes (ultra-short scalping) based on RSI ({rsi:.1f})."
    elif horizon == "swing":
        # Mean daily return over the last 20 closes, scaled to a weekly rate
        # (simple scaling, not compounded) so it matches the "weeks" framing.
        avg_weekly = close.tail(20).pct_change().mean() * 100 * days_per_week if len(close) >= 20 else 0
        prob = 65 if avg_weekly > 0 and macd > 0 else 45 if avg_weekly < 0 else 50
        direction = "up" if avg_weekly > 0 else "down"
        pct = abs(avg_weekly) * 3
        return f"{prob}% chance of {pct:.1f}% {direction} over the next 1-4 weeks (swing trading)."
    elif horizon == "momentum":
        prob = 70 if macd > 0 and rsi < 60 else 45
        direction = "continued rise" if macd > 0 else "fall"
        pct = 4.0
        return f"{prob}% chance of {pct:.1f}% {direction} over the next 2-4 weeks (momentum trading) based on MACD ({macd:.2f})."
    else:  # long_term
        # Mean daily return over the last 60 closes, scaled to a monthly rate;
        # unscaled, the "> 1" threshold below was effectively unreachable and
        # the message reported a daily figure as monthly.
        avg_monthly = close.tail(60).pct_change().mean() * 100 * days_per_month if len(close) >= 60 else 0
        prob = 70 if avg_monthly > 1 and macd > 0 else 40 if avg_monthly < 0 else 50
        direction = "growth" if avg_monthly > 0 else "decline"
        pct = abs(avg_monthly) * 2
        prob = min(95, int(prob * sentiment_boost))
        return f"{prob}% chance of {pct:.1f}% {direction} over months (position trading) based on {avg_monthly:.1f}% avg monthly change and {news_sentiment} news."
def hunter_agent(state: TraderState) -> TraderState:
    """Gather data for the requested trade and state an initial claim.

    Reads ``input_symbol``, ``action``, ``horizon`` and optional
    ``holding_period`` from ``state``. Reuses ``state['raw_data']['hist']``
    when present, otherwise fetches it. Stores indicators, news headlines,
    the feature summary, prediction, holding metrics and news sentiment under
    ``state['raw_data']`` and writes an RSI/MACD-based ``state['claim']``.

    Returns:
        The same ``state`` object; on any error it is returned as-is after
        logging, so downstream nodes always receive a state.
    """
    try:
        symbol = state['input_symbol']
        action = state['action']
        horizon = state['horizon']
        if state.get('raw_data') and state['raw_data'].get('hist') is not None:
            hist = state['raw_data']['hist']
        else:
            hist = fetch_data(symbol, horizon)
            if state.get('raw_data') is None:
                state['raw_data'] = {}
            state['raw_data']['hist'] = hist
        indicators = compute_indicators(hist, horizon)
        news_data = fetch_news_sentiment(symbol)
        headlines = news_data['headlines']
        news_sentiment = news_data['sentiment']
        features_summary = summarize_features(indicators, hist, headlines, action, symbol, news_sentiment)
        prediction = predict(horizon, indicators, hist, headlines, news_sentiment)
        holding_metrics = calculate_holding_metrics(hist, state.get('holding_period'), indicators.get('current_price', 0))
        if holding_metrics:
            # Append before caching: previously the suffix was added to the
            # local variable after it had been stored, so skeptic_agent (which
            # reads raw_data['features']) never saw the holding information.
            features_summary += f"; Holding: {holding_metrics['days_held']} days, {holding_metrics['holding_return']:.1f}% {holding_metrics['pnl']}"
        state['raw_data']['indicators'] = indicators
        state['raw_data']['news'] = headlines
        state['raw_data']['features'] = features_summary
        state['raw_data']['prediction'] = prediction
        state['raw_data']['holding'] = holding_metrics
        state['raw_data']['news_sentiment'] = news_sentiment
        rsi = indicators.get('rsi', 50)
        macd = indicators.get('macd', 0)
        if action == "buy":
            if rsi < 30 and macd > 0:
                state['claim'] = f"Strong buy signal: Oversold RSI ({rsi:.1f}) with bullish MACD ({macd:.2f}) indicates reversal."
            elif rsi < 40 or macd > 5:
                state['claim'] = f"Moderate buy signal: RSI at {rsi:.1f}, MACD at {macd:.2f} shows positive momentum."
            else:
                state['claim'] = f"Weak buy signal: RSI ({rsi:.1f}) and MACD ({macd:.2f}) don't strongly support buying now."
        else:
            # Any action other than "buy" is evaluated as a sell.
            if rsi > 70 or macd < -5:
                state['claim'] = f"Strong sell signal: Overbought RSI ({rsi:.1f}) or weak MACD ({macd:.2f}) suggests taking profits."
            elif rsi > 60 or macd < 0:
                state['claim'] = f"Moderate sell signal: RSI at {rsi:.1f}, MACD at {macd:.2f} shows weakening momentum."
            else:
                state['claim'] = f"Weak sell signal: RSI ({rsi:.1f}) and MACD ({macd:.2f}) don't strongly support selling now."
        return state
    except Exception as e:
        print(f"Error in hunter_agent: {e}")
        return state  # Always return state
def skeptic_agent(state: TraderState) -> TraderState:
    """Attach one counter-argument to the current claim.

    Prompts the shared ``llm`` with the claim, the action and the cached
    feature summary, and stores the reply in ``state['skepticism']``. When
    the LLM call fails, a canned RSI-based risk statement is stored instead.

    Returns:
        The same ``state`` object, also when an unexpected error occurs.
    """
    try:
        # Use cached raw_data
        raw_data = state.get('raw_data', {}) or {}
        features_summary = raw_data.get("features", "No features available.")
        action = state['action']
        prompt = f"Counter claim '{state.get('claim', '')}' for {action}: {features_summary}. Provide 1 specific risk."
        try:
            state['skepticism'] = llm.invoke(prompt).content.strip()
        except Exception as llm_error:
            # Narrowed from a bare except so KeyboardInterrupt/SystemExit are
            # not swallowed; log the reason the fallback was used.
            print(f"LLM call failed in skeptic_agent: {llm_error}")
            rsi = raw_data.get("indicators", {}).get('rsi', 50)
            state['skepticism'] = f"Risk: RSI at {rsi} indicates potential reversal."
        return state
    except Exception as e:
        print(f"Error in skeptic_agent: {e}")
        return state  # Always return state
def calibrator_agent(state: TraderState) -> TraderState:
    """Score confidence in the requested action and decide whether to stop.

    Starting from 50, points are added or removed according to the RSI zone,
    the MACD zone and the news sentiment, oriented by the action ("buy", or
    anything else treated as sell). For a "sell" with holding metrics, the
    holding return adds further points. The result is clamped to 5..95 and
    written to ``state['confidence']``; ``state['iterations']`` is
    incremented and ``state['stop']`` is set once 5 iterations are reached or
    confidence is at least 85 or at most 15.

    Returns:
        The same ``state`` object, also when an error occurs.
    """
    try:
        cached = state.get('raw_data', {}) or {}
        indicators = cached.get("indicators", {})
        action = state['action']
        news_sentiment = cached.get('news_sentiment', 'neutral')
        holding = cached.get('holding', {})
        state['iterations'] = state.get('iterations', 0) + 1
        rsi = indicators.get('rsi', 50)
        macd = indicators.get('macd', 0)

        # Classify each input into a signed zone: negative = bearish side of
        # the scale for MACD/news, oversold side for RSI.
        if rsi < 30:
            rsi_zone = -2
        elif rsi < 40:
            rsi_zone = -1
        elif rsi > 70:
            rsi_zone = 2
        elif rsi > 60:
            rsi_zone = 1
        else:
            rsi_zone = 0

        if macd > 5:
            macd_zone = 2
        elif macd > 0:
            macd_zone = 1
        elif macd < -5:
            macd_zone = -2
        elif macd < 0:
            macd_zone = -1
        else:
            macd_zone = 0

        if news_sentiment == 'positive':
            news_zone = 1
        elif news_sentiment == 'negative':
            news_zone = -1
        else:
            news_zone = 0

        # Orient the zones so that positive always means "supports the action":
        # a buy likes oversold RSI, bullish MACD and positive news; a sell
        # likes the opposite of each.
        side = 1 if action == "buy" else -1
        rsi_support = -side * rsi_zone
        macd_support = side * macd_zone
        news_support = side * news_zone

        # RSI rewards are asymmetric (+30/+15 vs -25/-10); MACD and news are linear.
        rsi_points = {2: 30, 1: 15, 0: 0, -1: -10, -2: -25}
        score = 50 + rsi_points[rsi_support] + 10 * macd_support + 15 * news_support

        if holding:
            held_return = holding.get('holding_return', 0)
            if action == "sell":
                # Both sizeable gains (take profit) and sizeable losses (cut
                # losses) raise confidence in selling.
                if held_return > 10:
                    score += 15
                elif held_return > 5:
                    score += 10
                elif held_return < -10:
                    score += 20
                elif held_return < -5:
                    score += 10

        confidence = min(95, max(5, score))
        state['confidence'] = confidence
        state['stop'] = state['iterations'] >= 5 or confidence >= 85 or confidence <= 15
        return state
    except Exception as e:
        print(f"Error in calibrator_agent: {e}")
        return state