| import numpy as np |
| import pandas as pd |
| from typing import Dict, Any |
| import requests |
| import os |
| from datetime import datetime, timedelta |
|
|
| class MarketAnalyzer: |
| def __init__(self): |
| self.risk_levels = { |
| 'LOW': 'Conservative investment suitable for long-term holding', |
| 'MEDIUM': 'Moderate risk with potential for both gains and losses', |
| 'HIGH': 'High volatility, suitable for aggressive trading' |
| } |
| self.deepseek_api_key = os.environ.get("DEEPSEEK_API_KEY") |
| self.deepseek_api_url = "https://api.deepseek.com/v1/chat/completions" |
|
|
| def calculate_technical_indicators(self, data: pd.DataFrame) -> Dict[str, Any]: |
| """Calculate comprehensive technical indicators""" |
| try: |
| |
| data['SMA_20'] = data['Close'].rolling(window=20).mean() |
| data['SMA_50'] = data['Close'].rolling(window=50).mean() |
| data['SMA_200'] = data['Close'].rolling(window=200).mean() |
|
|
| |
| delta = data['Close'].diff() |
| gain = (delta.where(delta > 0, 0)).rolling(window=14).mean() |
| loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean() |
| rs = gain / loss |
| data['RSI'] = 100 - (100 / (1 + rs)) |
|
|
| |
| exp1 = data['Close'].ewm(span=12, adjust=False).mean() |
| exp2 = data['Close'].ewm(span=26, adjust=False).mean() |
| data['MACD'] = exp1 - exp2 |
| data['Signal_Line'] = data['MACD'].ewm(span=9, adjust=False).mean() |
|
|
| |
| data['BB_middle'] = data['Close'].rolling(window=20).mean() |
| bb_std = data['Close'].rolling(window=20).std() |
| data['BB_upper'] = data['BB_middle'] + (bb_std * 2) |
| data['BB_lower'] = data['BB_middle'] - (bb_std * 2) |
|
|
| |
| high_low = data['High'] - data['Low'] |
| high_close = abs(data['High'] - data['Close'].shift()) |
| low_close = abs(data['Low'] - data['Close'].shift()) |
| ranges = pd.concat([high_low, high_close, low_close], axis=1) |
| true_range = ranges.max(axis=1) |
| data['ATR'] = true_range.rolling(window=14).mean() |
|
|
| |
| data['Volume_SMA'] = data['Volume'].rolling(window=20).mean() |
| data['Volume_Rate'] = data['Volume'] / data['Volume_SMA'] |
|
|
| return data |
| except Exception as e: |
| raise Exception(f"Error calculating technical indicators: {str(e)}") |
|
|
| def analyze_price_action(self, data: pd.DataFrame) -> Dict[str, Any]: |
| """Analyze recent price action and trends""" |
| try: |
| current_price = data['Close'].iloc[-1] |
| sma_20 = data['SMA_20'].iloc[-1] |
| sma_50 = data['SMA_50'].iloc[-1] |
| sma_200 = data['SMA_200'].iloc[-1] |
| rsi = data['RSI'].iloc[-1] |
| macd = data['MACD'].iloc[-1] |
| signal = data['Signal_Line'].iloc[-1] |
| volume_rate = data['Volume_Rate'].iloc[-1] |
|
|
| |
| short_term_trend = "BULLISH" if current_price > sma_20 else "BEARISH" |
| medium_term_trend = "BULLISH" if sma_20 > sma_50 else "BEARISH" |
| long_term_trend = "BULLISH" if sma_50 > sma_200 else "BEARISH" |
|
|
| |
| momentum = { |
| 'RSI': { |
| 'value': rsi, |
| 'signal': 'OVERBOUGHT' if rsi > 70 else 'OVERSOLD' if rsi < 30 else 'NEUTRAL' |
| }, |
| 'MACD': { |
| 'histogram': macd - signal, |
| 'signal': 'BULLISH' if macd > signal else 'BEARISH' |
| } |
| } |
|
|
| |
| volume_trend = "HIGH" if volume_rate > 1.5 else "LOW" if volume_rate < 0.5 else "NORMAL" |
|
|
| return { |
| 'trends': { |
| 'short_term': short_term_trend, |
| 'medium_term': medium_term_trend, |
| 'long_term': long_term_trend |
| }, |
| 'momentum': momentum, |
| 'volume_analysis': { |
| 'trend': volume_trend, |
| 'rate': volume_rate |
| } |
| } |
| except Exception as e: |
| raise Exception(f"Error analyzing price action: {str(e)}") |
|
|
| def generate_trade_signals(self, data: pd.DataFrame) -> Dict[str, Any]: |
| """Generate trading signals based on technical analysis""" |
| try: |
| analysis = self.analyze_price_action(data) |
| current_price = data['Close'].iloc[-1] |
|
|
| |
| bullish_signals = 0 |
| bearish_signals = 0 |
|
|
| |
| for trend in analysis['trends'].values(): |
| if trend == "BULLISH": |
| bullish_signals += 1 |
| else: |
| bearish_signals += 1 |
|
|
| |
| if analysis['momentum']['RSI']['signal'] == 'OVERSOLD': |
| bullish_signals += 1 |
| elif analysis['momentum']['RSI']['signal'] == 'OVERBOUGHT': |
| bearish_signals += 1 |
|
|
| if analysis['momentum']['MACD']['signal'] == 'BULLISH': |
| bullish_signals += 1 |
| else: |
| bearish_signals += 1 |
|
|
| |
| if analysis['volume_analysis']['trend'] == 'HIGH': |
| if bullish_signals > bearish_signals: |
| bullish_signals += 1 |
| else: |
| bearish_signals += 1 |
|
|
| |
| volatility = data['ATR'].iloc[-1] / current_price * 100 |
| risk_level = 'HIGH' if volatility > 3 else 'MEDIUM' if volatility > 1.5 else 'LOW' |
|
|
| |
| signal_strength = (bullish_signals - bearish_signals) / (bullish_signals + bearish_signals) |
|
|
| if abs(signal_strength) < 0.2: |
| action = "HOLD" |
| confidence = "LOW" |
| else: |
| action = "BUY" if signal_strength > 0 else "SELL" |
| confidence = "HIGH" if abs(signal_strength) > 0.6 else "MEDIUM" |
|
|
| return { |
| 'action': action, |
| 'confidence': confidence, |
| 'risk_level': risk_level, |
| 'support_resistance': { |
| 'support': data['BB_lower'].iloc[-1], |
| 'resistance': data['BB_upper'].iloc[-1] |
| }, |
| 'metrics': { |
| 'bullish_signals': bullish_signals, |
| 'bearish_signals': bearish_signals, |
| 'volatility': volatility |
| } |
| } |
| except Exception as e: |
| raise Exception(f"Error generating trade signals: {str(e)}") |
|
|
| async def get_ai_sentiment(self, symbol: str, price_data: pd.DataFrame) -> str: |
| """Get AI-powered market sentiment analysis using Deepseek API""" |
| try: |
| if not self.deepseek_api_key: |
| return "Error: Deepseek API key not found" |
|
|
| |
| recent_data = price_data.tail(5) |
| price_change = (recent_data['Close'].iloc[-1] - recent_data['Close'].iloc[0]) / recent_data['Close'].iloc[0] * 100 |
| volume_change = (recent_data['Volume'].iloc[-1] - recent_data['Volume'].iloc[0]) / recent_data['Volume'].iloc[0] * 100 |
| rsi = self.calculate_rsi(price_data['Close'])[-1] if not price_data.empty else None |
|
|
| |
| prompt = f"""Analyze the following market data for {symbol}: |
| - Recent price change: {price_change:.2f}% |
| - Volume change: {volume_change:.2f}% |
| - RSI: {rsi:.2f if rsi is not None else 'N/A'} |
| |
| Please provide a JSON response with the following structure: |
| {{ |
| "sentiment": "bullish/bearish/neutral", |
| "confidence": <float between 0 and 1>, |
| "recommendation": "buy/sell/hold", |
| "risk_level": "low/medium/high", |
| "key_factors": [<array of reasons>] |
| }}""" |
|
|
| |
| headers = { |
| "Authorization": f"Bearer {self.deepseek_api_key}", |
| "Content-Type": "application/json" |
| } |
|
|
| payload = { |
| "model": "deepseek-chat", |
| "messages": [ |
| { |
| "role": "system", |
| "content": "You are an expert financial analyst. Provide market analysis in the exact JSON format requested." |
| }, |
| { |
| "role": "user", |
| "content": prompt |
| } |
| ], |
| "temperature": 0.7, |
| "max_tokens": 500 |
| } |
|
|
| response = requests.post( |
| self.deepseek_api_url, |
| headers=headers, |
| json=payload, |
| timeout=10 |
| ) |
|
|
| if response.status_code != 200: |
| return f"API Error: {response.status_code} - {response.text}" |
|
|
| |
| try: |
| result = response.json() |
| if 'choices' in result and len(result['choices']) > 0: |
| return result['choices'][0]['message']['content'] |
| else: |
| return "Error: Invalid API response format" |
| except Exception as e: |
| return f"Error parsing API response: {str(e)}" |
|
|
| except Exception as e: |
| return f"Error in sentiment analysis: {str(e)}" |
|
|
| def calculate_rsi(self, prices: pd.Series, period: int = 14) -> pd.Series: |
| """Calculate Relative Strength Index""" |
| delta = prices.diff() |
| gain = (delta.where(delta > 0, 0)).rolling(window=period).mean() |
| loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean() |
| rs = gain / loss |
| return 100 - (100 / (1 + rs)) |
|
|
| def generate_detailed_report(self, symbol: str, data: pd.DataFrame, ai_sentiment: str = None) -> Dict[str, Any]: |
| """Generate comprehensive analysis report""" |
| try: |
| |
| data = self.calculate_technical_indicators(data) |
|
|
| |
| signals = self.generate_trade_signals(data) |
|
|
| |
| current_price = data['Close'].iloc[-1] |
| price_change_1d = (current_price - data['Close'].iloc[-2]) / data['Close'].iloc[-2] * 100 |
| price_change_1w = (current_price - data['Close'].iloc[-6]) / data['Close'].iloc[-6] * 100 |
|
|
| report = { |
| 'symbol': symbol, |
| 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), |
| 'price_analysis': { |
| 'current_price': current_price, |
| 'changes': { |
| '1d': price_change_1d, |
| '1w': price_change_1w |
| } |
| }, |
| 'technical_analysis': self.analyze_price_action(data), |
| 'trade_signals': signals, |
| 'risk_assessment': { |
| 'level': signals['risk_level'], |
| 'description': self.risk_levels[signals['risk_level']], |
| 'volatility': signals['metrics']['volatility'] |
| } |
| } |
|
|
| if ai_sentiment: |
| report['ai_sentiment'] = ai_sentiment |
|
|
| return report |
| except Exception as e: |
| raise Exception(f"Error generating detailed report: {str(e)}") |