OmidSakaki committed on
Commit
cd19213
·
verified ·
1 Parent(s): 144fc70

Update src/environments/advanced_trading_env.py

Browse files
Files changed (1) hide show
  1. src/environments/advanced_trading_env.py +259 -103
src/environments/advanced_trading_env.py CHANGED
@@ -1,137 +1,293 @@
1
  import numpy as np
 
 
2
  from .visual_trading_env import VisualTradingEnvironment
3
  from src.sentiment.twitter_analyzer import AdvancedSentimentAnalyzer
4
- from typing import Dict, Any
 
 
 
5
 
6
  class AdvancedTradingEnvironment(VisualTradingEnvironment):
7
- def __init__(self, initial_balance=10000, risk_level="Medium", asset_type="Crypto",
8
- use_sentiment=True, sentiment_influence=0.3):
9
  super().__init__(initial_balance, risk_level, asset_type)
10
-
 
 
 
 
 
 
11
  self.use_sentiment = use_sentiment
12
- self.sentiment_influence = sentiment_influence # How much sentiment affects decisions
13
- self.sentiment_history = []
14
- self.sentiment_window = 20
 
 
 
 
 
 
15
 
16
  if use_sentiment:
17
- self.sentiment_analyzer = AdvancedSentimentAnalyzer()
18
- self.sentiment_analyzer.initialize_models()
19
- self.current_sentiment = 0.5
20
- self.sentiment_confidence = 0.0
21
-
22
- def step(self, action):
 
 
 
23
  """Execute trading step with sentiment influence"""
24
- # Get market sentiment before executing action
25
- if self.use_sentiment and self.current_step % 5 == 0: # Update sentiment every 5 steps
 
 
 
 
26
  self._update_sentiment()
27
 
28
- # Execute the original step
29
- observation, reward, done, info = super().step(action)
30
 
31
- # Enhance reward with sentiment analysis
 
 
 
 
 
 
 
 
 
 
 
32
  if self.use_sentiment:
33
- reward = self._apply_sentiment_to_reward(reward, action, info)
 
 
 
34
 
35
- # Add sentiment info to the observation
36
  enhanced_observation = self._enhance_observation(observation)
37
 
38
- # Add sentiment data to info
39
- info['sentiment'] = self.current_sentiment
40
- info['sentiment_confidence'] = self.sentiment_confidence
41
- info['sentiment_influence'] = self.sentiment_influence
 
 
 
42
 
43
- return enhanced_observation, reward, done, info
44
-
45
  def _update_sentiment(self):
46
- """Update current market sentiment"""
 
 
 
47
  try:
48
  sentiment_data = self.sentiment_analyzer.get_influencer_sentiment()
49
- self.current_sentiment = sentiment_data['market_sentiment']
50
- self.sentiment_confidence = sentiment_data['confidence']
51
 
52
- # Update sentiment history
53
- self.sentiment_history.append(self.current_sentiment)
54
- if len(self.sentiment_history) > self.sentiment_window:
55
- self.sentiment_history.pop(0)
56
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
57
  except Exception as e:
58
- print(f"Error updating sentiment: {e}")
 
59
  self.current_sentiment = 0.5
60
  self.sentiment_confidence = 0.0
61
-
62
- def _apply_sentiment_to_reward(self, original_reward: float, action: int, info: Dict) -> float:
63
- """Modify reward based on sentiment analysis"""
64
- if self.sentiment_confidence < 0.3: # Low confidence, minimal influence
 
 
 
 
 
 
65
  return original_reward
66
 
67
- sentiment_multiplier = 1.0
68
-
69
- # Bullish sentiment should reward buying actions
70
- if self.current_sentiment > 0.6: # Bullish
71
- if action == 1: # Buy
72
- sentiment_multiplier += self.sentiment_influence * self.sentiment_confidence
73
- elif action == 2: # Sell (increase position)
74
- sentiment_multiplier += self.sentiment_influence * 0.5 * self.sentiment_confidence
75
- elif action == 3: # Close (might miss opportunity)
76
- sentiment_multiplier -= self.sentiment_influence * 0.3 * self.sentiment_confidence
77
-
78
- # Bearish sentiment should reward selling/closing actions
79
- elif self.current_sentiment < 0.4: # Bearish
80
- if action == 3: # Close position
81
- sentiment_multiplier += self.sentiment_influence * self.sentiment_confidence
82
- elif action == 1: # Buy (might be risky)
83
- sentiment_multiplier -= self.sentiment_influence * 0.5 * self.sentiment_confidence
84
-
85
- # Apply sentiment trend momentum
86
- if len(self.sentiment_history) > 5:
87
- recent_trend = np.mean(self.sentiment_history[-5:]) - np.mean(self.sentiment_history[-10:-5])
88
- trend_influence = recent_trend * self.sentiment_influence * 0.5
89
- sentiment_multiplier += trend_influence
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
90
 
91
- enhanced_reward = original_reward * sentiment_multiplier
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
92
 
93
- # Ensure reward doesn't become too extreme
94
- max_reward = abs(original_reward) * 3
95
- return np.clip(enhanced_reward, -max_reward, max_reward)
96
-
97
- def _enhance_observation(self, original_observation):
98
- """Add sentiment data to observation"""
99
- # For simplicity, we'll keep the original visual observation
100
- # In a more advanced implementation, we could encode sentiment in the image
101
- return original_observation
102
-
103
- def get_sentiment_analysis(self) -> Dict:
104
- """Get detailed sentiment analysis"""
 
 
 
 
 
 
 
 
 
105
  if not self.use_sentiment:
106
- return {"error": "Sentiment analysis disabled"}
107
-
108
- return {
109
- "current_sentiment": self.current_sentiment,
110
- "sentiment_confidence": self.sentiment_confidence,
111
- "sentiment_trend": self._calculate_sentiment_trend(),
112
- "influence_level": self.sentiment_influence,
113
- "history_length": len(self.sentiment_history)
114
- }
115
-
116
- def _calculate_sentiment_trend(self) -> str:
 
 
 
 
 
 
 
 
 
 
 
 
117
  """Calculate sentiment trend direction"""
118
  if len(self.sentiment_history) < 5:
119
- return "neutral"
120
-
121
- recent = np.mean(self.sentiment_history[-5:])
122
- previous = np.mean(self.sentiment_history[-10:-5]) if len(self.sentiment_history) >= 10 else recent
123
 
124
- if recent > previous + 0.1:
125
- return "improving"
126
- elif recent < previous - 0.1:
127
- return "deteriorating"
128
- else:
129
- return "stable"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
130
 
131
- def reset(self):
132
- """Reset environment including sentiment data"""
133
- observation = super().reset()
134
- self.sentiment_history = []
135
- self.current_sentiment = 0.5
136
- self.sentiment_confidence = 0.0
137
- return observation
 
import logging
# FIX: `deque` is used for `sentiment_history` below but was never imported,
# which made AdvancedTradingEnvironment raise NameError on construction.
from collections import deque
from typing import Dict, Any, Optional, Tuple

import numpy as np

from .visual_trading_env import VisualTradingEnvironment
from src.sentiment.twitter_analyzer import AdvancedSentimentAnalyzer

# Setup logging.
# NOTE(review): basicConfig() at import time reconfigures the process-wide
# root logger; fine for a standalone app, but an embedding application may
# prefer to own logging configuration itself.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
 
11
class AdvancedTradingEnvironment(VisualTradingEnvironment):
    """Visual trading environment enriched with Twitter influencer sentiment.

    Periodically polls an ``AdvancedSentimentAnalyzer`` and uses the result to
    scale rewards. If the analyzer cannot be set up, sentiment is disabled and
    the environment behaves like the plain visual one.
    """

    def __init__(self, initial_balance=10000, risk_level="Medium", asset_type="Crypto",
                 use_sentiment=True, sentiment_influence=0.3, sentiment_update_freq=5):
        super().__init__(initial_balance, risk_level, asset_type)

        # Fail fast on nonsensical configuration.
        if not 0.0 <= sentiment_influence <= 1.0:
            raise ValueError("sentiment_influence must be between 0.0 and 1.0")
        if sentiment_update_freq < 1:
            raise ValueError("sentiment_update_freq must be at least 1")

        self.use_sentiment = use_sentiment
        self.sentiment_influence = sentiment_influence
        self.sentiment_update_freq = sentiment_update_freq
        self.sentiment_history = deque(maxlen=100)  # bounded sample history
        self.current_step = 0

        # Neutral defaults so these attributes always exist, sentiment or not.
        self.sentiment_analyzer = None
        self.current_sentiment = 0.5
        self.sentiment_confidence = 0.0

        if use_sentiment:
            try:
                self.sentiment_analyzer = AdvancedSentimentAnalyzer()
                self.sentiment_analyzer.initialize_models()
                logger.info("Sentiment analyzer initialized successfully")
            except Exception as e:
                # Degrade gracefully instead of failing construction.
                logger.warning(f"Failed to initialize sentiment analyzer: {e}. Disabling sentiment.")
                self.use_sentiment = False
42
+ def step(self, action: int) -> Tuple[np.ndarray, float, bool, Dict[str, Any]]:
43
  """Execute trading step with sentiment influence"""
44
+ if not isinstance(action, int) or action < 0:
45
+ logger.warning(f"Invalid action {action}, defaulting to hold")
46
+ action = 0 # Hold action as default
47
+
48
+ # Update sentiment periodically
49
+ if self.use_sentiment and self.current_step % self.sentiment_update_freq == 0:
50
  self._update_sentiment()
51
 
52
+ self.current_step += 1
 
53
 
54
+ # Execute base environment step
55
+ try:
56
+ observation, reward, done, info = super().step(action)
57
+ except Exception as e:
58
+ logger.error(f"Error in base environment step: {e}")
59
+ # Return safe defaults
60
+ observation = self._get_safe_observation()
61
+ reward = 0.0
62
+ done = False
63
+ info = {}
64
+
65
+ # Apply sentiment modification to reward
66
  if self.use_sentiment:
67
+ try:
68
+ reward = self._apply_sentiment_to_reward(reward, action, info)
69
+ except Exception as e:
70
+ logger.warning(f"Error applying sentiment to reward: {e}")
71
 
72
+ # Enhance observation with sentiment (optional)
73
  enhanced_observation = self._enhance_observation(observation)
74
 
75
+ # Add sentiment info to info dict
76
+ info.update({
77
+ 'sentiment': float(self.current_sentiment),
78
+ 'sentiment_confidence': float(self.sentiment_confidence),
79
+ 'sentiment_influence': float(self.sentiment_influence),
80
+ 'step': self.current_step
81
+ })
82
 
83
+ return enhanced_observation, float(reward), bool(done), info
84
+
85
  def _update_sentiment(self):
86
+ """Update current market sentiment with robust error handling"""
87
+ if not self.sentiment_analyzer:
88
+ return
89
+
90
  try:
91
  sentiment_data = self.sentiment_analyzer.get_influencer_sentiment()
 
 
92
 
93
+ # Validate sentiment data
94
+ if not isinstance(sentiment_data, dict):
95
+ raise ValueError("Invalid sentiment data format")
96
+
97
+ market_sentiment = sentiment_data.get('market_sentiment')
98
+ confidence = sentiment_data.get('confidence')
99
+
100
+ if market_sentiment is None or not (-1.0 <= market_sentiment <= 1.0):
101
+ raise ValueError("Invalid market_sentiment value")
102
+ if confidence is None or not (0.0 <= confidence <= 1.0):
103
+ raise ValueError("Invalid confidence value")
104
+
105
+ self.current_sentiment = float(market_sentiment)
106
+ self.sentiment_confidence = float(confidence)
107
+
108
+ # Normalize sentiment to 0-1 range for consistency
109
+ self.current_sentiment = (self.current_sentiment + 1.0) / 2.0
110
+
111
+ # Update history
112
+ self.sentiment_history.append({
113
+ 'sentiment': self.current_sentiment,
114
+ 'confidence': self.sentiment_confidence,
115
+ 'timestamp': self.current_step
116
+ })
117
+
118
+ logger.debug(f"Updated sentiment: {self.current_sentiment:.3f} (conf: {self.sentiment_confidence:.3f})")
119
+
120
  except Exception as e:
121
+ logger.warning(f"Error updating sentiment: {e}")
122
+ # Fallback to neutral sentiment
123
  self.current_sentiment = 0.5
124
  self.sentiment_confidence = 0.0
125
+ self.sentiment_history.append({
126
+ 'sentiment': 0.5,
127
+ 'confidence': 0.0,
128
+ 'timestamp': self.current_step
129
+ })
130
+
131
+ def _apply_sentiment_to_reward(self, original_reward: float, action: int,
132
+ info: Dict[str, Any]) -> float:
133
+ """Modify reward based on sentiment analysis with bounds checking"""
134
+ if self.sentiment_confidence < 0.3:
135
  return original_reward
136
 
137
+ try:
138
+ sentiment_multiplier = 1.0
139
+ sentiment_score = self.current_sentiment # 0-1 normalized
140
+
141
+ # Define action mappings (adjust based on your action space)
142
+ # Assuming: 0=hold, 1=buy, 2=sell, 3=close
143
+ bullish_threshold = 0.6
144
+ bearish_threshold = 0.4
145
+
146
+ if sentiment_score > bullish_threshold: # Bullish
147
+ if action == 1: # Buy
148
+ sentiment_multiplier += self.sentiment_influence * self.sentiment_confidence
149
+ elif action == 2: # Sell short
150
+ sentiment_multiplier -= self.sentiment_influence * 0.3 * self.sentiment_confidence
151
+ elif action == 3: # Close
152
+ sentiment_multiplier -= self.sentiment_influence * 0.2 * self.sentiment_confidence
153
+
154
+ elif sentiment_score < bearish_threshold: # Bearish
155
+ if action == 2: # Sell short
156
+ sentiment_multiplier += self.sentiment_influence * self.sentiment_confidence
157
+ elif action == 1: # Buy
158
+ sentiment_multiplier -= self.sentiment_influence * 0.5 * self.sentiment_confidence
159
+ elif action == 3: # Close
160
+ sentiment_multiplier += self.sentiment_influence * 0.3 * self.sentiment_confidence
161
+
162
+ # Apply trend momentum if enough history
163
+ trend_multiplier = self._calculate_sentiment_trend_multiplier()
164
+ sentiment_multiplier += trend_multiplier
165
+
166
+ # Clamp multiplier to reasonable bounds
167
+ sentiment_multiplier = np.clip(sentiment_multiplier, 0.5, 2.0)
168
+
169
+ enhanced_reward = original_reward * sentiment_multiplier
170
+
171
+ # Ensure reward doesn't become extreme
172
+ max_reward = abs(original_reward) * 2.5 if original_reward != 0 else 10.0
173
+ return np.clip(enhanced_reward, -max_reward, max_reward)
174
+
175
+ except Exception as e:
176
+ logger.error(f"Error in sentiment reward calculation: {e}")
177
+ return original_reward
178
+
179
+ def _calculate_sentiment_trend_multiplier(self) -> float:
180
+ """Calculate trend-based multiplier from sentiment history"""
181
+ if len(self.sentiment_history) < 10:
182
+ return 0.0
183
 
184
+ try:
185
+ # Get recent and previous sentiment values
186
+ recent_sentiments = [h['sentiment'] for h in list(self.sentiment_history)[-5:]]
187
+ prev_sentiments = [h['sentiment'] for h in list(self.sentiment_history)[-10:-5]]
188
+
189
+ recent_avg = np.mean(recent_sentiments)
190
+ prev_avg = np.mean(prev_sentiments)
191
+
192
+ trend = recent_avg - prev_avg
193
+ # Scale trend influence
194
+ trend_multiplier = np.tanh(trend * 5) * self.sentiment_influence * 0.3
195
+ return float(trend_multiplier)
196
+
197
+ except Exception as e:
198
+ logger.warning(f"Error calculating trend multiplier: {e}")
199
+ return 0.0
200
+
201
+ def _enhance_observation(self, original_observation: np.ndarray) -> np.ndarray:
202
+ """Enhance observation with sentiment information"""
203
+ if not self.use_sentiment or original_observation is None:
204
+ return original_observation
205
 
206
+ try:
207
+ # For now, return original observation
208
+ # Future: could concatenate sentiment as additional channels or metadata
209
+ return original_observation.copy()
210
+ except Exception as e:
211
+ logger.warning(f"Error enhancing observation: {e}")
212
+ return original_observation
213
+
214
+ def _get_safe_observation(self) -> np.ndarray:
215
+ """Get a safe default observation"""
216
+ try:
217
+ # Try to get current observation from base env
218
+ if hasattr(self, 'current_observation'):
219
+ return self.current_observation.copy()
220
+ # Return zero observation of expected shape
221
+ return np.zeros((84, 84, 4), dtype=np.float32)
222
+ except:
223
+ return np.zeros((84, 84, 4), dtype=np.float32)
224
+
225
+ def get_sentiment_analysis(self) -> Dict[str, Any]:
226
+ """Get detailed sentiment analysis with safety checks"""
227
  if not self.use_sentiment:
228
+ return {"error": "Sentiment analysis disabled", "sentiment": 0.5, "confidence": 0.0}
229
+
230
+ try:
231
+ trend_direction = self._calculate_sentiment_trend_direction()
232
+ return {
233
+ "current_sentiment": float(self.current_sentiment),
234
+ "sentiment_confidence": float(self.sentiment_confidence),
235
+ "sentiment_trend": trend_direction,
236
+ "influence_level": float(self.sentiment_influence),
237
+ "history_length": len(self.sentiment_history),
238
+ "update_freq": self.sentiment_update_freq,
239
+ "last_update_step": self.current_step
240
+ }
241
+ except Exception as e:
242
+ logger.error(f"Error in get_sentiment_analysis: {e}")
243
+ return {
244
+ "error": str(e),
245
+ "sentiment": 0.5,
246
+ "confidence": 0.0,
247
+ "trend": "unknown"
248
+ }
249
+
250
+ def _calculate_sentiment_trend_direction(self) -> str:
251
  """Calculate sentiment trend direction"""
252
  if len(self.sentiment_history) < 5:
253
+ return "insufficient_data"
 
 
 
254
 
255
+ try:
256
+ recent_avg = np.mean([h['sentiment'] for h in list(self.sentiment_history)[-5:]])
257
+ prev_avg = np.mean([h['sentiment'] for h in list(self.sentiment_history)[-10:-5]]) if len(self.sentiment_history) >= 10 else recent_avg
258
+
259
+ diff = recent_avg - prev_avg
260
+ if diff > 0.05:
261
+ return "bullish"
262
+ elif diff < -0.05:
263
+ return "bearish"
264
+ else:
265
+ return "neutral"
266
+ except:
267
+ return "error"
268
+
269
+ def reset(self) -> np.ndarray:
270
+ """Reset environment with sentiment state"""
271
+ try:
272
+ observation = super().reset()
273
+ self.current_step = 0
274
+ self.sentiment_history.clear()
275
+ self.current_sentiment = 0.5
276
+ self.sentiment_confidence = 0.0
277
+ logger.info("Environment reset with sentiment state")
278
+ return observation
279
+ except Exception as e:
280
+ logger.error(f"Error in reset: {e}")
281
+ # Force safe reset
282
+ super().reset()
283
+ self.current_step = 0
284
+ self.sentiment_history.clear()
285
+ return np.zeros((84, 84, 4), dtype=np.float32)
286
 
287
+ @property
288
+ def action_space_size(self) -> int:
289
+ """Get action space size from base environment"""
290
+ try:
291
+ return super().action_space.n if hasattr(super(), 'action_space') else 4
292
+ except:
293
+ return 4 # Default assumption