# dashboard/test/dashboard.py
# Commit 177875a (hasannyr): "Added rl-based test dashboard"
import solara
import pandas as pd
import os
import zipfile
from ray.tune.registry import register_env
from env import Teastore
from ray.rllib.algorithms import ppo, sac, dqn
from solara.components.file_drop import FileInfo
import time
# ---------------------------------------------------------------------------
# Shared reactive state read and written by the components below.
# ---------------------------------------------------------------------------

# One list per plotted series; start_test() replaces the whole dict per step.
test_plot_data = solara.reactive(
    {
        series: []
        for series in (
            "step",
            "replica",
            "cpu",
            "load",
            "num_request",
            "response_time",
        )
    }
)

# Restored algorithm returned by load_agent(), or None until one is loaded.
uploaded_algo = solara.reactive(None)

# Message shown in the sidebar error banner; None means no error.
error_state = solara.reactive(None)

# Number of environment steps start_test() runs (bound to the sidebar slider).
number_of_steps = solara.reactive(10)

# Checkpoint folder names offered in the "Choose checkpoint" select.
available_checkpoint_names = solara.reactive([])

# Folder name currently picked in the select, or None.
selected_checkpoint_name = solara.reactive(None)

# True once load_test() has stored an algorithm in uploaded_algo.
uploaded_algo_status = solara.reactive(False)
def _line_chart_options(title, steps, series):
    """Build the ECharts option dict for one line chart.

    Args:
        title: Text shown centered above the chart.
        steps: Category values for the x axis.
        series: List of ``(name, values)`` pairs, one per plotted line.

    Returns:
        A dict suitable for ``solara.FigureEcharts(option=...)``.
    """
    return {
        "xAxis": {
            "type": "category",
            "data": steps,
        },
        "yAxis": {
            "type": "value",
        },
        "series": [
            {"name": name, "data": values, "type": 'line'}
            for name, values in series
        ],
        "title": {
            "text": title,
            "left": "center",
        },
        "legend": {
            "orient": 'vertical',
            "right": 0,
            "data": [name for name, _ in series],
        },
    }


@solara.component
def status_plot(data):
    """Render the four test charts stacked in a single column.

    Args:
        data: Mapping with the keys ``step``, ``replica``, ``cpu``,
            ``num_request``, ``load`` and ``response_time``; ``step``
            supplies the x axis of every chart.
    """
    steps = data["step"]
    # (chart title, [(series name, series values), ...]) in display order.
    charts = [
        ('Replica Number', [("Replica", data['replica'])]),
        ('CPU Limit', [("CPU", data['cpu'])]),
        (
            'Number of Request (Tps) and Load (Tps)',
            [("Processed req", data['num_request']), ("Load", data['load'])],
        ),
        ('Response time (ms)', [("Response time", data['response_time'])]),
    ]
    with solara.GridFixed(columns=1):
        for title, series in charts:
            solara.FigureEcharts(
                option=_line_chart_options(title, steps, series)
            )
@solara.component
def CheckpointDrop():
    """Drop zone that accepts a zipped checkpoint and extracts it.

    The archive is unpacked into ``extracted_files/<archive name without
    extension>`` and ``available_checkpoint_names`` is refreshed from the
    sub-folders of ``extracted_files``.
    """
    # Metadata about the most recent upload, kept in component state.
    zip_content, set_zip_content = solara.use_state("")
    filename, set_filename = solara.use_state("")
    size, set_size = solara.use_state(0)
    extract_path, set_extract_path = solara.use_state("")

    def on_file(f: FileInfo):
        """Extract the uploaded archive and refresh the checkpoint list."""
        import tempfile

        set_filename(f["name"])
        set_size(f["size"])

        # Use only the last path component of the client-supplied name so the
        # extraction folder always stays below "extracted_files".
        archive_name = os.path.basename(f["name"])
        extracted_folder = os.path.join(
            "extracted_files", os.path.splitext(archive_name)[0]
        )

        # Spool the upload into an anonymous temporary file rather than a file
        # named after the upload in the working directory: the latter could
        # overwrite (and then delete) an existing file, and was left behind
        # when extraction failed. The temporary file is removed on close even
        # if ZipFile raises.
        with tempfile.TemporaryFile() as temp_file:
            temp_file.write(f["file_obj"].read())
            temp_file.seek(0)
            with zipfile.ZipFile(temp_file, 'r') as zip_ref:
                zip_ref.extractall(extracted_folder)

        set_extract_path(extracted_folder)
        set_zip_content("\n".join(os.listdir(extracted_folder)))

        # Publish the checkpoint folders that actually exist on disk
        # (previously a hard-coded placeholder name was stored here).
        with os.scandir("extracted_files") as entries:
            available_checkpoint_names.set(
                sorted(entry.name for entry in entries if entry.is_dir())
            )

    solara.FileDrop(
        label="Drag and drop a file here",
        on_file=on_file,
        lazy=True,  # contents are not preloaded; on_file reads them from file_obj
    )
@solara.component
def ListAvailableCheckpoints():
    """Refresh ``available_checkpoint_names`` from the ``extracted_files`` folder.

    Every sub-directory of ``extracted_files`` is treated as one checkpoint.
    The component renders nothing itself; it only updates the reactive list.
    """
    try:
        # The context manager closes the scandir iterator deterministically.
        with os.scandir("extracted_files") as entries:
            names = sorted(entry.name for entry in entries if entry.is_dir())
    except FileNotFoundError:
        # Nothing has been extracted yet: show an empty list instead of
        # crashing the page while it renders.
        names = []
    available_checkpoint_names.set(names)
def load_agent():
    """Build a DQN algorithm and restore it from the selected checkpoint.

    Returns:
        None when no checkpoint is selected; otherwise the algorithm restored
        from ``extracted_files/<selected checkpoint name>``.
    """
    if selected_checkpoint_name.value is None:
        return None

    def make_env(config):
        # The env config passed by RLlib is not used by Teastore here.
        return Teastore()

    register_env("teastore", make_env)

    dqn_config = dqn.DQNConfig()
    dqn_config = dqn_config.environment(env="teastore")
    dqn_config = dqn_config.rollouts(
        num_rollout_workers=1,
        enable_connectors=False,
        num_envs_per_worker=1,
    )
    dqn_config = dqn_config.resources(num_gpus=0, num_cpus_per_worker=1)
    dqn_config = dqn_config.training(
        train_batch_size=256,
        model={"fcnet_hiddens": [32, 32]},
    )
    agent = dqn_config.build()

    # Checkpoints live in sub-folders of "extracted_files".
    agent.restore(os.path.join("extracted_files", selected_checkpoint_name.value))
    return agent
def start_test():
    """Run the loaded agent on a fresh Teastore env and publish the metrics.

    Runs ``number_of_steps.value`` steps. Before each action the current
    observation and env metrics are recorded: ``obs[0]`` as replica,
    ``obs[1]`` as cpu, plus ``env.load``, ``env.response_time`` and
    ``env.num_request``. After every step a snapshot of all series is
    written to ``test_plot_data``.
    """
    env = Teastore()
    obs, _info = env.reset()

    # Key order matches the dict published to test_plot_data.
    history = {
        'step': [],
        'replica': [],
        'cpu': [],
        "load": [],
        "response_time": [],
        "num_request": [],
    }

    for step_index in range(number_of_steps.value):
        history['step'].append(step_index)
        history['replica'].append(obs[0])
        history['cpu'].append(obs[1])
        history["load"].append(env.load)
        history["response_time"].append(env.response_time)
        history["num_request"].append(env.num_request)

        action = uploaded_algo.value.compute_single_action(obs, explore=False)
        # NOTE(review): the reward and the two end-of-episode flags returned
        # by env.step are discarded, so stepping continues for the full step
        # count regardless of them -- confirm this is intended.
        obs, _reward, _, _, _ = env.step(action)

        # Publish copies so later appends do not mutate the stored snapshot.
        test_plot_data.set({name: list(values) for name, values in history.items()})
def load_test():
    """Load the selected checkpoint and make the agent available for testing.

    On success the agent is stored in ``uploaded_algo``,
    ``uploaded_algo_status`` is set to True and any earlier error message is
    cleared. If ``load_agent()`` returns None, an error message is stored in
    ``error_state`` instead.
    """
    algo = load_agent()
    if algo is None:
        error_state.set('Couldnt load checkpoint')
        return

    # Clear an error left over from a previous failed attempt; otherwise the
    # banner would stay visible even though an agent is now loaded.
    error_state.set(None)
    uploaded_algo.set(algo)
    uploaded_algo_status.set(True)
@solara.component
def Page():
    """Top-level page: controls in the sidebar, charts in the main area."""
    no_checkpoint_selected = selected_checkpoint_name.value is None
    no_agent_loaded = uploaded_algo.value is None

    with solara.Sidebar():
        if error_state.value is not None:
            solara.Error(label=f'{error_state.value}')

        # CheckpointDrop()
        ListAvailableCheckpoints()
        solara.Select(
            label="Choose checkpoint",
            values=available_checkpoint_names.value,
            value=selected_checkpoint_name.value,
            on_value=selected_checkpoint_name.set,
        )

        # "Run test" needs a loaded agent; "Load agent" needs a selection.
        solara.Button(label="Run test", on_click=start_test, disabled=no_agent_loaded)
        solara.Button(label="Load agent", on_click=load_test, disabled=no_checkpoint_selected)

        if uploaded_algo_status.value:
            solara.Info("Agent is ready for test")
        else:
            solara.Info("Agent is not uploaded")

        solara.SliderInt(label="choose number of steps", min=1, max=500, value=number_of_steps)

    status_plot(test_plot_data.value)