| import gym |
| from gym import spaces |
| from gym.utils import seeding |
| import numpy as np |
| from enum import Enum |
| import matplotlib.pyplot as plt |
|
|
|
|
class Actions(Enum):
    """Discrete actions the agent can submit to the trading environment.

    The integer values are the raw action ids: the environment compares the
    incoming action against ``Actions.<member>.value``.
    """

    Sell = 0        # open a short when flat, or close an open long
    Buy = 1         # open a long when flat, or close an open short
    Do_nothing = 2  # keep the current position unchanged
|
|
|
|
|
|
class TradingEnv(gym.Env):
    """Single-asset trading environment with long, short and flat positions.

    The agent submits one of the ``Actions`` values per step. The position is
    tracked as a signed integer in ``self._position``: ``1`` = long,
    ``-1`` = short, ``0`` = flat. Trades are priced at the tick *before* the
    current one (``self._current_tick - 1``), which is also the tick that is
    recorded in ``self._position_history`` for rendering.

    Observations are the last ``window_size`` rows of ``signal_features``
    (columns: price, price difference to the previous tick).
    """

    metadata = {'render.modes': ['human']}

    # Codes stored in ``_position_history`` (one entry per step) and decoded
    # again by ``render``.
    _HIST_IGNORED = -1     # action had no effect (e.g. Buy while already long)
    _HIST_HOLD = 0         # Do_nothing
    _HIST_OPEN_LONG = 1    # Buy while flat
    _HIST_CLOSE_LONG = 2   # Sell while long
    _HIST_OPEN_SHORT = 3   # Sell while flat
    _HIST_CLOSE_SHORT = 4  # Buy while short

    def __init__(self, df, window_size, frame_bound):
        """Build the environment from a price table.

        Args:
            df: 2-D table exposing ``.ndim`` and ``.loc[:, 'Close']`` with a
                ``to_numpy()`` method (presumably a pandas DataFrame).
            window_size: number of ticks contained in every observation.
            frame_bound: ``(start, end)`` pair; prices are taken from
                ``start - window_size`` up to (excluding) ``end``.

        Raises:
            AssertionError: if ``df`` is not 2-D or ``frame_bound`` does not
                have exactly two entries.
            IndexError: if ``frame_bound[0] - window_size`` is not a valid
                row position of ``df``.
        """
        assert df.ndim == 2

        assert len(frame_bound) == 2
        self.frame_bound = frame_bound

        self.seed()
        self.df = df
        self.window_size = window_size
        self.prices, self.signal_features = self._process_data()
        self.shape = (window_size, self.signal_features.shape[1])

        # spaces
        self.action_space = spaces.Discrete(len(Actions))
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=self.shape, dtype=np.float64
        )

        # episode state; everything set to None here is initialised by reset()
        self._start_tick = self.window_size
        self._end_tick = len(self.prices) - 1
        self._done = None
        self._current_tick = None
        self._last_trade_tick = None
        self._position = None
        self._position_history = None
        self._total_reward = None
        self._total_profit = None
        self._first_rendering = None
        self.history = None

        # fee rates; only read by _update_profit
        self.trade_fee_bid_percent = 0.0005
        self.trade_fee_ask_percent = 0.0005

    def seed(self, seed=None):
        """Create the environment's random generator and return ``[seed]``."""
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def reset(self):
        """Start a new episode and return the first observation.

        The position is reset to flat, reward and profit totals to ``0.0`` and
        the position history to ``window_size`` placeholder ``None`` entries
        (one for each tick that precedes the first possible trade).
        """
        self._done = False
        self._current_tick = self._start_tick
        self._last_trade_tick = self._current_tick - 1
        self._position = 0
        self._position_history = (self.window_size * [None])

        self._total_reward = 0.
        self._total_profit = 0.
        self.history = {}
        return self._get_observation()

    def _calculate_reward(self, action):
        """Apply ``action`` to the position and return the step reward.

        Side effects: may change ``_position``, ``_last_trade_tick`` and
        ``_total_profit``; always appends exactly one code to
        ``_position_history``.

        Reward rules (``price_diff`` = current price - previous price):
            * opening or holding a long:  ``+price_diff``
            * opening or holding a short: ``-price_diff``
            * closing a position: the price move between the entry tick and
              the previous tick, signed by the position's direction; the same
              amount is added to ``_total_profit``
            * doing nothing while flat: ``-abs(price_diff)``
            * an action that cannot be applied (Buy while long, Sell while
              short, unknown action id): ``0``
        """
        step_reward = 0

        prev_tick = self._current_tick - 1
        current_price = self.prices[self._current_tick]
        last_price = self.prices[prev_tick]
        price_diff = current_price - last_price

        # Default marker also covers unknown action ids, so the history stays
        # aligned with the ticks (one entry per step) in every case.
        marker = self._HIST_IGNORED

        if action == Actions.Buy.value:
            if self._position == 0:
                # open long
                self._position = 1
                step_reward += price_diff
                self._last_trade_tick = prev_tick
                marker = self._HIST_OPEN_LONG
            elif self._position > 0:
                # already long: nothing to do
                step_reward += 0
            elif self._position < 0:
                # close short
                self._position = 0
                step_reward += -1 * (self.prices[prev_tick] - self.prices[self._last_trade_tick])
                self._total_profit += step_reward
                marker = self._HIST_CLOSE_SHORT

        elif action == Actions.Sell.value:
            if self._position == 0:
                # open short
                self._position = -1
                step_reward += -1 * price_diff
                self._last_trade_tick = prev_tick
                marker = self._HIST_OPEN_SHORT
            elif self._position > 0:
                # close long
                self._position = 0
                step_reward += self.prices[prev_tick] - self.prices[self._last_trade_tick]
                self._total_profit += step_reward
                marker = self._HIST_CLOSE_LONG
            elif self._position < 0:
                # already short: nothing to do
                step_reward += 0

        elif action == Actions.Do_nothing.value:
            if self._position > 0:
                step_reward += price_diff
            elif self._position < 0:
                step_reward += -1 * price_diff
            elif self._position == 0:
                # staying flat is penalised by the size of the missed move
                step_reward += -1 * abs(price_diff)
            marker = self._HIST_HOLD

        self._position_history.append(marker)
        return step_reward

    def step(self, action):
        """Advance one tick.

        Returns:
            ``(observation, step_reward, done, info)`` where ``done`` becomes
            True once the current tick reaches the last price and ``info``
            holds ``total_reward``, ``total_profit`` and ``position``.
        """
        self._done = False
        self._current_tick += 1

        if self._current_tick == self._end_tick:
            self._done = True

        step_reward = self._calculate_reward(action)
        self._total_reward += step_reward

        observation = self._get_observation()
        info = dict(
            total_reward=self._total_reward,
            total_profit=self._total_profit,
            position=self._position,
        )
        self._update_history(info)

        return observation, step_reward, self._done, info

    def _get_observation(self):
        """Return the ``window_size`` feature rows ending at the current tick."""
        first = self._current_tick - self.window_size + 1
        return self.signal_features[first:self._current_tick + 1]

    def _update_history(self, info):
        """Append every value of ``info`` to the per-key lists in ``history``."""
        if not self.history:
            self.history = {key: [] for key in info.keys()}

        for key, value in info.items():
            self.history[key].append(value)

    def render(self, mode='human'):
        """Plot the price series and mark every recorded action on it.

        Green up/down triangles mark opening/closing a long, red down/up
        triangles mark opening/closing a short, yellow dots mark Do_nothing
        steps. Placeholder (``None``) and ignored entries are not drawn.
        """
        plt.plot(self.prices)

        # (history code, matplotlib fmt) in drawing order
        styles = (
            (self._HIST_OPEN_LONG, 'g^'),
            (self._HIST_CLOSE_LONG, 'gv'),
            (self._HIST_OPEN_SHORT, 'rv'),
            (self._HIST_CLOSE_SHORT, 'r^'),
            (self._HIST_HOLD, 'yo'),
        )
        ticks_by_code = {code: [] for code, _ in styles}

        # The list index of a history entry is the tick it belongs to.
        for tick, code in enumerate(self._position_history):
            if code is None:
                continue
            if code in ticks_by_code:
                ticks_by_code[code].append(tick)

        for code, fmt in styles:
            # explicit int dtype so an empty list is still a valid index array
            ticks = np.asarray(ticks_by_code[code], dtype=int)
            plt.plot(ticks, self.prices[ticks], fmt)

        plt.suptitle(
            "Total Reward: %.6f" % self._total_reward + ' ~ ' +
            "Total Profit: %.6f" % self._total_profit
        )

    def close(self):
        """Close the current matplotlib figure."""
        plt.close()

    def save_rendering(self, filepath):
        """Save the current matplotlib figure to ``filepath``."""
        plt.savefig(filepath)

    def pause_rendering(self):
        """Show the current matplotlib figure (``plt.show()``)."""
        plt.show()

    def _process_data(self):
        """Extract the price window and the derived features from ``self.df``.

        Returns:
            ``(prices, signal_features)``: ``prices`` is the ``'Close'``
            column from ``frame_bound[0] - window_size`` up to (excluding)
            ``frame_bound[1]``; ``signal_features`` stacks those prices with
            their first difference (the first difference is defined as 0).

        Raises:
            IndexError: if the start position is negative or beyond the data.
        """
        prices = self.df.loc[:, 'Close'].to_numpy()

        start = self.frame_bound[0] - self.window_size
        stop = self.frame_bound[1]
        # A negative start would silently wrap around to the end of the
        # array, so reject it explicitly together with too-large values.
        if not 0 <= start < len(prices):
            raise IndexError(
                "frame_bound[0] - window_size = %d is outside the %d available prices"
                % (start, len(prices))
            )
        prices = prices[start:stop]

        diff = np.insert(np.diff(prices), 0, 0)
        signal_features = np.column_stack((prices, diff))

        return prices, signal_features

    def _update_profit(self, action):
        """Compound ``_total_profit`` over a closed long, net of fees.

        NOTE(review): nothing in this class calls this method; profit is
        accumulated additively inside ``_calculate_reward`` instead. Because
        ``reset`` starts ``_total_profit`` at 0.0, this multiplicative update
        would keep it at 0 - confirm the intended profit model before wiring
        it in.
        """
        closes_short = action == Actions.Buy.value and self._position < 0
        closes_long = action == Actions.Sell.value and self._position > 0
        trade = bool(closes_short or closes_long)

        if trade or self._done:
            current_price = self.prices[self._current_tick]
            last_trade_price = self.prices[self._last_trade_tick]

            if self._position > 0:
                shares = (self._total_profit * (1 - self.trade_fee_ask_percent)) / last_trade_price
                self._total_profit = (shares * (1 - self.trade_fee_bid_percent)) * current_price

    def max_possible_profit(self):
        """Return the profit multiple of a perfect long-only strategy.

        The price series is split, from ``_start_tick`` to ``_end_tick``, into
        consecutive runs of strictly falling and of non-falling prices. For
        every non-falling run the running profit (starting at 1.0) is scaled
        by ``price at run end / price at end of the previous run``; falling
        runs leave it unchanged. No fees are applied.
        """
        current_tick = self._start_tick
        last_trade_tick = current_tick - 1
        profit = 1.

        while current_tick <= self._end_tick:
            rising_run = not (self.prices[current_tick] < self.prices[current_tick - 1])

            if rising_run:
                while (current_tick <= self._end_tick and
                       self.prices[current_tick] >= self.prices[current_tick - 1]):
                    current_tick += 1
            else:
                while (current_tick <= self._end_tick and
                       self.prices[current_tick] < self.prices[current_tick - 1]):
                    current_tick += 1

            if rising_run:
                current_price = self.prices[current_tick - 1]
                last_trade_price = self.prices[last_trade_tick]
                shares = profit / last_trade_price
                profit = shares * current_price
            # Every run, rising or falling, ends at the next entry point.
            last_trade_tick = current_tick - 1

        return profit
|
|