Spaces:
Sleeping
Sleeping
File size: 5,223 Bytes
from typing import Dict, Any, Optional
import pandas as pd
from datetime import datetime, timedelta
from ..workflows.state import AgentState
from ..tools.technical_indicators_tool import calculate_technical_indicators
import yfinance as yf
# Calendar days of history requested before ``analysis_date``.
_LOOKBACK_DAYS = 60
# Fewest rows of history accepted before indicators are calculated.
_MIN_HISTORY_ROWS = 20


def _technical_failure(symbol: str, message: str) -> Dict[str, Any]:
    """Build the failure payload returned by ``analyze_technical``."""
    return {
        'symbol': symbol,
        'success': False,
        'error': message
    }


async def analyze_technical(symbol: str, analysis_date: Optional[str] = None, market_data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """
    Calculate technical indicators for a symbol up to a specific date.

    Historical OHLCV data is fetched through yfinance, trimmed so that no row
    is later than ``analysis_date``, and passed to
    ``calculate_technical_indicators``. The resulting indicator mapping gets an
    extra ``current_price`` entry.

    Args:
        symbol: Stock symbol (e.g., 'AAPL'); upper-cased before use.
        analysis_date: Date for analysis in YYYY-MM-DD format (optional).
            When omitted (or empty) the last six months of data are used.
        market_data: Optional market data from previous agent. If it is a dict
            holding a non-None 'current_price', that value is reported as the
            current price; otherwise the last close of the history is used.

    Returns:
        On success: {'symbol', 'indicators', 'success': True}.
        On failure: {'symbol', 'success': False, 'error'}. Exceptions raised
        while fetching or calculating (including an unparsable
        ``analysis_date``) are caught and reported through 'error'.
    """
    try:
        symbol = symbol.upper()
        ticker = yf.Ticker(symbol)

        # Parsed cut-off date; stays None when no analysis_date was supplied.
        cutoff = None
        # Used in messages so they never read "... for None".
        date_label = analysis_date or 'the latest available date'

        # NOTE(review): ticker.history is called synchronously inside this
        # coroutine, so the event loop is presumably blocked while the data
        # is downloaded - consider offloading it to a worker thread.
        if analysis_date:
            target_date = datetime.strptime(analysis_date, "%Y-%m-%d")
            cutoff = target_date.date()
            start_date = target_date - timedelta(days=_LOOKBACK_DAYS)
            hist_data = ticker.history(
                start=start_date.strftime("%Y-%m-%d"),
                # Request one day past the target so the target day itself is
                # covered (assumes `end` is exclusive - TODO confirm).
                end=(target_date + timedelta(days=1)).strftime("%Y-%m-%d")
            )
        else:
            # Default: get last 6 months of data
            hist_data = ticker.history(period="6mo")

        if hist_data.empty:
            return _technical_failure(symbol, 'No historical data available')

        if cutoff is not None:
            # Compare real date objects rather than strings: strptime accepts
            # non-zero-padded input such as "2024-1-5", for which a string
            # comparison would keep rows that are later than the cut-off.
            mask = [stamp.date() <= cutoff for stamp in hist_data.index]
            hist_data = hist_data[mask]
            if hist_data.empty:
                return _technical_failure(
                    symbol,
                    f'No historical data available up to {analysis_date}'
                )

        # Check if we have enough data for technical indicators
        if len(hist_data) < _MIN_HISTORY_ROWS:
            return _technical_failure(
                symbol,
                f'Insufficient historical data for {date_label}. '
                f'Need at least {_MIN_HISTORY_ROWS} days, got {len(hist_data)} days'
            )

        print(f"Historical data for {symbol} up to {date_label}: {len(hist_data)} days")

        # Calculate technical indicators using dedicated tool
        indicators = ['SMA', 'EMA', 'RSI', 'MACD', 'BBANDS', 'ADX', 'CCI']
        # Ensure hist_data is DataFrame type for linter
        hist_data_df = hist_data if isinstance(hist_data, pd.DataFrame) else pd.DataFrame(hist_data)
        result = await calculate_technical_indicators(hist_data_df, indicators, symbol, analysis_date)
        if not result.success:
            return _technical_failure(symbol, result.error or 'Technical analysis failed')

        # Prefer the price handed over by the previous agent, but fall back to
        # the last close when it is missing or None so the reported price is
        # always numeric.
        supplied_price = market_data.get('current_price') if isinstance(market_data, dict) else None
        if supplied_price is not None:
            current_price = supplied_price
        else:
            current_price = float(hist_data_df['Close'].iloc[-1])

        # Add current price to indicators
        indicators_data = result.data if result.data else {}
        indicators_data['current_price'] = current_price
        return {
            'symbol': symbol,
            'indicators': indicators_data,
            'success': True
        }
    except Exception as e:
        # Boundary handler: callers expect a result dict, never an exception.
        print(f"Error in technical analysis for {symbol}: {e}")
        return _technical_failure(symbol, str(e))
async def technical_analysis_agent_node(state: AgentState) -> dict:
    """
    LangGraph node for technical analysis. Returns partial state updates.

    Reads 'symbols', 'analysis_date' and (optionally)
    'data_collection_results' from the state, runs ``analyze_technical`` for
    the first symbol ('AAPL' when the list is empty) and returns the keys to
    merge back into the state. A failed analysis additionally sets 'error';
    any exception yields {'error', 'current_step': 'error'}.
    """
    try:
        tickers = state['symbols']
        first_symbol = tickers[0] if tickers else 'AAPL'
        as_of = state['analysis_date']

        # Market data is only present when the data-collection step produced
        # a non-empty result.
        upstream_market_data = None
        collected = state.get('data_collection_results')
        if collected:
            upstream_market_data = collected.get('market_data')

        outcome = await analyze_technical(first_symbol, as_of, upstream_market_data)

        if outcome['success']:
            return {
                "technical_analysis_results": outcome,
                "current_step": "technical_analysis_complete",
            }

        # Failed analysis: still report the step as finished, plus the error.
        return {
            "technical_analysis_results": outcome,
            "current_step": "technical_analysis_complete",
            "error": outcome.get('error', 'Technical analysis failed'),
        }
    except Exception as exc:
        print(f"Technical analysis node error: {exc}")
        return {"error": str(exc), "current_step": "error"}