| |
|
|
| from ray.rllib.env.policy_client import PolicyClient |
| import pandas as pd |
| from prometheus_api_client import PrometheusConnect |
| from kubernetes import client, config |
| import time |
| import numpy as np |
| from collections import OrderedDict |
| from gymnasium.spaces import Discrete, Dict, MultiDiscrete, Tuple, Box |
| import ssl |
| import random |
| import logging |
| ssl._create_default_https_context = ssl._create_unverified_context |
| from itertools import product |
| import time |
| import gymnasium as gym |
| import math |
|
|
|
|
|
|
| class Teastore(gym.Env): |
| DATA_PATH = "./all_load_mpa_cpu_and_performance_without_average.csv" |
| MAX_STEPS = 500 |
|
|
|
|
| def __init__(self) -> None: |
| self.data = pd.read_csv(self.DATA_PATH) |
| |
| |
| self.action_space = Discrete(5) |
| self.observation_space = Box(low=np.array([1, 4, 0, 0]), high=np.array([3, 9, 1000,1000]), dtype=np.float32) |
| self.count = 0 |
| self.info = {} |
| self.previous_tps = 0 |
| self.idx = 0 |
| self.up = None |
| self.load = 0 |
| self.response_time = 0 |
| self.num_request = 0 |
|
|
|
|
|
|
|
|
| def find_next_state(self, target, expected_tps): |
| if expected_tps == 144: |
| self.up = False |
| elif expected_tps == 24: |
| self.up = True |
| |
| if self.up == True: |
| new_expected_tps = expected_tps + 24 |
| elif self.up == False: |
| new_expected_tps = expected_tps - 24 |
|
|
| new_previous_tps = expected_tps |
| |
| |
| next = np.concatenate([target, [new_previous_tps, new_expected_tps]]) |
| equal_rows = np.all(self.data.loc[:, ["replica", "cpu", "previous_tps", "expected_tps"]].values == next, axis=1) |
| matched_indexes = np.where(equal_rows)[0] |
| return matched_indexes.tolist(), new_previous_tps, new_expected_tps |
| |
|
|
| |
| def reset(self, *, seed=None, options=None): |
| self.idx = random.randint(0, len(self.data)-1) |
| self.state = np.array(self.data.loc[self.idx, ["replica", "cpu", 'previous_tps', "expected_tps"]]) |
| |
| self.previous_tps = self.state[2] |
| self.truncated = False |
| self.terminated = False |
| self.reward = 0 |
| self.count = 0 |
| self.info = {} |
| self.up = True if self.state[3] - self.state[2] > 0 else False |
| self.load = self.state[-1] |
| self.response_time = self.data.loc[self.idx, "response_time"] |
| self.num_request = self.data.loc[self.idx, "num_request"] |
| return self.state, self.info |
| |
| def step(self, action): |
| selected_row_idx = 0 |
| self.count += 1 |
|
|
| if action == 0: |
| temp_state = self.state[0:2] + np.array([0, 0]) |
| elif action == 1: |
| temp_state = self.state[0:2] + np.array([1, 0]) |
| elif action == 2: |
| temp_state = self.state[0:2] + np.array([-1, 0]) |
| elif action == 3: |
| temp_state = self.state[0:2] + np.array([0, 1]) |
| else: |
| temp_state = self.state[0:2] + np.array([0 , -1]) |
| |
|
|
|
|
| idx, new_previous_tps, new_expected_tps = self.find_next_state(temp_state, self.state[3]) |
|
|
| if idx: |
| selected_row_idx = random.choice(idx) |
| selected_data = self.data.iloc[selected_row_idx] |
| self.state = np.array(selected_data[["replica", "cpu", 'previous_tps',"expected_tps"]]) |
| self.reward = selected_data['reward'] |
| |
| |
| self.previous_tps = selected_data["expected_tps"] |
| self.num_request = self.data.loc[selected_row_idx, "num_request"] |
| self.response_time = self.data.loc[selected_row_idx, "response_time"] |
| |
| else: |
| self.state[2] = new_previous_tps |
| self.state[3] = new_expected_tps |
| self.previous_tps = new_expected_tps |
| self.reward = -5 |
| self.num_request = 0 |
| self.response_time = 200 |
|
|
| self.load = self.state[-1] |
| |
| |
|
|
| self.terminated = (self.count >= self.MAX_STEPS) |
| self.truncated = self.terminated |
| return self.state, self.reward, self.terminated, self.truncated, self.info |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|