Spaces:
Sleeping
Sleeping
Create src/environments/advanced_trading_env.py
Browse files
src/environments/advanced_trading_env.py
ADDED
|
@@ -0,0 +1,137 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
import numpy as np
|
| 2 |
+
from .visual_trading_env import VisualTradingEnvironment
|
| 3 |
+
from src.sentiment.twitter_analyzer import AdvancedSentimentAnalyzer
|
| 4 |
+
from typing import Dict, Any
|
| 5 |
+
|
| 6 |
+
class AdvancedTradingEnvironment(VisualTradingEnvironment):
    """Visual trading environment augmented with market-sentiment signals.

    Extends VisualTradingEnvironment: when sentiment is enabled, it
    periodically polls an AdvancedSentimentAnalyzer, shapes the step reward
    by how well the chosen action agrees with the prevailing sentiment, and
    annotates the step info dict with the sentiment readings.
    """

    def __init__(self, initial_balance=10000, risk_level="Medium", asset_type="Crypto",
                 use_sentiment=True, sentiment_influence=0.3):
        """Create the environment.

        Args:
            initial_balance: Starting account balance (forwarded to parent).
            risk_level: Risk profile label (forwarded to parent).
            asset_type: Traded asset class label (forwarded to parent).
            use_sentiment: Whether to enable sentiment polling and reward shaping.
            sentiment_influence: Scale factor [0..] for how strongly sentiment
                adjusts the reward multiplier.
        """
        super().__init__(initial_balance, risk_level, asset_type)

        self.use_sentiment = use_sentiment
        self.sentiment_influence = sentiment_influence  # How much sentiment affects decisions
        self.sentiment_history = []      # rolling window of recent sentiment readings
        self.sentiment_window = 20       # max readings retained in the window

        # BUG FIX: always initialize the sentiment state. The original set
        # these only when use_sentiment was True, but step() reads
        # self.current_sentiment / self.sentiment_confidence unconditionally
        # when filling the info dict, raising AttributeError for
        # use_sentiment=False until the first reset() happened to set them.
        self.current_sentiment = 0.5     # neutral baseline; presumably in [0, 1] — TODO confirm analyzer range
        self.sentiment_confidence = 0.0

        if use_sentiment:
            self.sentiment_analyzer = AdvancedSentimentAnalyzer()
            self.sentiment_analyzer.initialize_models()
| 22 |
+
def step(self, action):
    """Execute one trading step, optionally shaped by market sentiment.

    Args:
        action: Discrete action forwarded to the parent environment
            (1=buy, 2=sell, 3=close per the reward-shaping branches).

    Returns:
        Tuple of (observation, reward, done, info); info gains the keys
        'sentiment', 'sentiment_confidence' and 'sentiment_influence'.
    """
    # Refresh sentiment only every 5 steps to limit analyzer calls.
    if self.use_sentiment and self.current_step % 5 == 0:
        self._update_sentiment()

    # Delegate the actual trade execution to the parent environment.
    observation, reward, done, info = super().step(action)

    # Shape the reward with sentiment only when the feature is enabled.
    if self.use_sentiment:
        reward = self._apply_sentiment_to_reward(reward, action, info)

    # Add sentiment info to the observation.
    enhanced_observation = self._enhance_observation(observation)

    # BUG FIX: the original read self.current_sentiment unconditionally,
    # but __init__ only created that attribute when use_sentiment was True,
    # so use_sentiment=False raised AttributeError here. Fall back to
    # neutral defaults when the attributes are absent.
    info['sentiment'] = getattr(self, 'current_sentiment', 0.5)
    info['sentiment_confidence'] = getattr(self, 'sentiment_confidence', 0.0)
    info['sentiment_influence'] = self.sentiment_influence

    return enhanced_observation, reward, done, info
| 44 |
+
|
| 45 |
+
def _update_sentiment(self):
|
| 46 |
+
"""Update current market sentiment"""
|
| 47 |
+
try:
|
| 48 |
+
sentiment_data = self.sentiment_analyzer.get_influencer_sentiment()
|
| 49 |
+
self.current_sentiment = sentiment_data['market_sentiment']
|
| 50 |
+
self.sentiment_confidence = sentiment_data['confidence']
|
| 51 |
+
|
| 52 |
+
# Update sentiment history
|
| 53 |
+
self.sentiment_history.append(self.current_sentiment)
|
| 54 |
+
if len(self.sentiment_history) > self.sentiment_window:
|
| 55 |
+
self.sentiment_history.pop(0)
|
| 56 |
+
|
| 57 |
+
except Exception as e:
|
| 58 |
+
print(f"Error updating sentiment: {e}")
|
| 59 |
+
self.current_sentiment = 0.5
|
| 60 |
+
self.sentiment_confidence = 0.0
|
| 61 |
+
|
| 62 |
+
def _apply_sentiment_to_reward(self, original_reward: float, action: int, info: Dict) -> float:
|
| 63 |
+
"""Modify reward based on sentiment analysis"""
|
| 64 |
+
if self.sentiment_confidence < 0.3: # Low confidence, minimal influence
|
| 65 |
+
return original_reward
|
| 66 |
+
|
| 67 |
+
sentiment_multiplier = 1.0
|
| 68 |
+
|
| 69 |
+
# Bullish sentiment should reward buying actions
|
| 70 |
+
if self.current_sentiment > 0.6: # Bullish
|
| 71 |
+
if action == 1: # Buy
|
| 72 |
+
sentiment_multiplier += self.sentiment_influence * self.sentiment_confidence
|
| 73 |
+
elif action == 2: # Sell (increase position)
|
| 74 |
+
sentiment_multiplier += self.sentiment_influence * 0.5 * self.sentiment_confidence
|
| 75 |
+
elif action == 3: # Close (might miss opportunity)
|
| 76 |
+
sentiment_multiplier -= self.sentiment_influence * 0.3 * self.sentiment_confidence
|
| 77 |
+
|
| 78 |
+
# Bearish sentiment should reward selling/closing actions
|
| 79 |
+
elif self.current_sentiment < 0.4: # Bearish
|
| 80 |
+
if action == 3: # Close position
|
| 81 |
+
sentiment_multiplier += self.sentiment_influence * self.sentiment_confidence
|
| 82 |
+
elif action == 1: # Buy (might be risky)
|
| 83 |
+
sentiment_multiplier -= self.sentiment_influence * 0.5 * self.sentiment_confidence
|
| 84 |
+
|
| 85 |
+
# Apply sentiment trend momentum
|
| 86 |
+
if len(self.sentiment_history) > 5:
|
| 87 |
+
recent_trend = np.mean(self.sentiment_history[-5:]) - np.mean(self.sentiment_history[-10:-5])
|
| 88 |
+
trend_influence = recent_trend * self.sentiment_influence * 0.5
|
| 89 |
+
sentiment_multiplier += trend_influence
|
| 90 |
+
|
| 91 |
+
enhanced_reward = original_reward * sentiment_multiplier
|
| 92 |
+
|
| 93 |
+
# Ensure reward doesn't become too extreme
|
| 94 |
+
max_reward = abs(original_reward) * 3
|
| 95 |
+
return np.clip(enhanced_reward, -max_reward, max_reward)
|
| 96 |
+
|
| 97 |
+
def _enhance_observation(self, original_observation):
|
| 98 |
+
"""Add sentiment data to observation"""
|
| 99 |
+
# For simplicity, we'll keep the original visual observation
|
| 100 |
+
# In a more advanced implementation, we could encode sentiment in the image
|
| 101 |
+
return original_observation
|
| 102 |
+
|
| 103 |
+
def get_sentiment_analysis(self) -> Dict:
    """Summarize the current sentiment state for callers.

    Returns an error payload when sentiment is disabled; otherwise a dict
    with the latest reading, confidence, trend label, influence level and
    the number of readings held in the rolling window.
    """
    if not self.use_sentiment:
        return {"error": "Sentiment analysis disabled"}

    summary = {
        "current_sentiment": self.current_sentiment,
        "sentiment_confidence": self.sentiment_confidence,
        "sentiment_trend": self._calculate_sentiment_trend(),
        "influence_level": self.sentiment_influence,
        "history_length": len(self.sentiment_history),
    }
    return summary
| 115 |
+
|
| 116 |
+
def _calculate_sentiment_trend(self) -> str:
|
| 117 |
+
"""Calculate sentiment trend direction"""
|
| 118 |
+
if len(self.sentiment_history) < 5:
|
| 119 |
+
return "neutral"
|
| 120 |
+
|
| 121 |
+
recent = np.mean(self.sentiment_history[-5:])
|
| 122 |
+
previous = np.mean(self.sentiment_history[-10:-5]) if len(self.sentiment_history) >= 10 else recent
|
| 123 |
+
|
| 124 |
+
if recent > previous + 0.1:
|
| 125 |
+
return "improving"
|
| 126 |
+
elif recent < previous - 0.1:
|
| 127 |
+
return "deteriorating"
|
| 128 |
+
else:
|
| 129 |
+
return "stable"
|
| 130 |
+
|
| 131 |
+
def reset(self):
    """Reset the environment and clear the accumulated sentiment state.

    Returns the parent environment's initial observation.
    """
    initial_observation = super().reset()
    # Every episode starts from a neutral, zero-confidence sentiment.
    self.sentiment_history = []
    self.current_sentiment = 0.5
    self.sentiment_confidence = 0.0
    return initial_observation
|