File size: 12,870 Bytes
f87de7a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 |
import numpy as np
from concurrent.futures import ThreadPoolExecutor
import time
import json
import tritonclient.grpc as grpcclient
from tritonclient.utils import *
import queue
from functools import partial
import random
run_multiple_tests = False
resp_list = []
scode_breakup = {}
def np_to_server_dtype(np_dtype):
if np_dtype == bool:
return "BOOL"
elif np_dtype == np.int8:
return "INT8"
elif np_dtype == np.int16:
return "INT16"
elif np_dtype == np.int32:
return "INT32"
elif np_dtype == np.int64:
return "INT64"
elif np_dtype == np.uint8:
return "UINT8"
elif np_dtype == np.uint16:
return "UINT16"
elif np_dtype == np.uint32:
return "UINT32"
elif np_dtype == np.uint64:
return "UINT64"
elif np_dtype == np.float16:
return "FP16"
elif np_dtype == np.float32:
return "FP32"
elif np_dtype == np.float64:
return "FP64"
elif np_dtype == np.object_ or np_dtype.type == np.bytes_:
return "BYTES"
return None
class UserData:
def __init__(self):
self._completed_requests = queue.Queue()
def callback(user_data, result, error):
if error:
user_data._completed_requests.put(error)
else:
user_data._completed_requests.put(result)
def prepare_tensor(name: str, data: np.ndarray):
server_input = grpcclient.InferInput(name=name, shape=data.shape,
datatype=np_to_server_dtype(data.dtype))
server_input.set_data_from_numpy(data)
return server_input
def process_and_send_request(sample_request):
prompt = sample_request['prompt']
negative_prompt = sample_request['negative_prompt'] if 'negative_prompt' in sample_request else None
height = sample_request['height'] if 'height' in sample_request else None
width = sample_request['width'] if 'width' in sample_request else None
num_images_per_prompt = sample_request['num_images_per_prompt'] if 'num_images_per_prompt' in sample_request else 1
num_inference_steps = sample_request['num_inference_steps'] if 'num_inference_steps' in sample_request else 20
image = sample_request['image'] if 'image' in sample_request else None
mask_image = sample_request['mask_image'] if 'mask_image' in sample_request else None
control_images = sample_request['control_images'] if 'control_images' in sample_request else None
control_weightages = sample_request['control_weightages'] if 'control_weightages' in sample_request else None
control_modes = sample_request['control_modes'] if 'control_modes' in sample_request else None
seed = sample_request['seed'] if 'seed' in sample_request else -1
guidance_scale = sample_request['guidance_scale'] if 'guidance_scale' in sample_request else 7.5
strength = sample_request['strength'] if 'strength' in sample_request else 1
scheduler = sample_request['scheduler'] if 'scheduler' in sample_request else "EULER-A"
model_type = sample_request['model_type'] if 'model_type' in sample_request else None
lora_weights = sample_request['lora_weights'] if 'lora_weights' in sample_request else None
control_guidance_start = sample_request['control_guidance_start'] if 'control_guidance_start' in sample_request else None
control_guidance_end = sample_request['control_guidance_end'] if 'control_guidance_end' in sample_request else None
inputs = []
inputs.append(prepare_tensor("prompt", np.array([prompt], dtype=np.object_)))
if negative_prompt is not None:
inputs.append(prepare_tensor("negative_prompt", np.array([negative_prompt], dtype=np.object_)))
if height is not None:
inputs.append(prepare_tensor("height", np.array([height], dtype=np.int32)))
if width is not None:
inputs.append(prepare_tensor("width", np.array([width], dtype=np.int32)))
if num_images_per_prompt is not None:
inputs.append(prepare_tensor("num_images_per_prompt", np.array([num_images_per_prompt], dtype=np.int32)))
if num_inference_steps is not None:
inputs.append(prepare_tensor("num_inference_steps", np.array([num_inference_steps], dtype=np.int32)))
if image is not None:
inputs.append(prepare_tensor("image", np.array([image], dtype=np.object_)))
if mask_image is not None:
inputs.append(prepare_tensor("mask_image", np.array([mask_image], dtype=np.object_)))
if seed is not None:
inputs.append(prepare_tensor("seed", np.array([seed], dtype=np.int64)))
if guidance_scale is not None:
inputs.append(prepare_tensor("guidance_scale", np.array([guidance_scale], dtype=np.float32)))
if model_type is not None:
inputs.append(prepare_tensor("model_type", np.array([model_type], dtype=np.object_)))
if strength is not None:
inputs.append(prepare_tensor("strength", np.array([strength], dtype=np.float32)))
if scheduler is not None:
inputs.append(prepare_tensor("scheduler", np.array([scheduler], dtype=np.object_)))
if control_images is not None:
inputs.append(prepare_tensor("control_images", np.array([control_images], dtype=np.object_)))
if control_weightages is not None:
inputs.append(prepare_tensor("control_weightages", np.array([control_weightages], dtype=np.float32)))
if control_modes is not None:
inputs.append(prepare_tensor("control_modes", np.array([control_modes], dtype=np.int32)))
if lora_weights is not None:
inputs.append(prepare_tensor("lora_weights", np.array([lora_weights], dtype=np.object_)))
if control_guidance_start is not None:
inputs.append(prepare_tensor("control_guidance_start", np.array([control_guidance_start], dtype=np.float32)))
if control_guidance_end is not None:
inputs.append(prepare_tensor("control_guidance_end", np.array([control_guidance_end], dtype=np.float32)))
outputs = [
grpcclient.InferRequestedOutput("response_id"),
grpcclient.InferRequestedOutput("time_taken"),
grpcclient.InferRequestedOutput("load_lora"),
grpcclient.InferRequestedOutput("output_image_urls"),
grpcclient.InferRequestedOutput("error"),
# grpcclient.InferRequestedOutput("mega_pixel")
]
user_data = UserData()
st = time.time()
mega_pixel = 0
url = "localhost:8002"
with grpcclient.InferenceServerClient(url=url, ssl=False) as triton_client:
triton_client.start_stream(callback=partial(callback, user_data))
triton_client.async_stream_infer(
model_name="flux",
inputs=inputs,
outputs=outputs,
)
et = time.time()
response = user_data._completed_requests.get()
print(response)
# Check if response is an error (InferenceServerException)
if hasattr(response, 'message'):
# This is an error response
print(f"Server error: {response}")
output_image_urls = []
inference_time = 0
lora_time = 0
response_id = None
mega_pixel = 0
error = str(response)
sCode = 500
else:
# This is a successful response
try:
inference_time = 0
lora_time = 0
response_id = None
inference_time = response.as_numpy("time_taken").item()
lora_time = response.as_numpy("load_lora").item()
response_id = response.as_numpy("response_id").item().decode() if response.as_numpy("response_id").item() else None
output_image_urls = response.as_numpy("output_image_urls").tolist() if response.as_numpy("output_image_urls") is not None else []
mega_pixel = response.as_numpy("mega_pixel").item().decode() if response.as_numpy("mega_pixel") is not None else "0"
error_tensor = response.as_numpy("error")
error = error_tensor.item().decode() if error_tensor is not None and error_tensor.item() else None
sCode = 200
except Exception as e:
print(f"Error processing response: {e}")
output_image_urls = []
inference_time = 0
lora_time = 0
response_id = None
mega_pixel = 0
error = str(e)
sCode = 500
results = {
"response_id": response_id,
"total_time_taken": et-st,
"inference_time_taken": inference_time,
"loading_lora_time": lora_time,
"output_image_urls": output_image_urls,
"error": error,
"mega_pixel": 0 if mega_pixel is None else mega_pixel
}
print(results)
if output_image_urls == []:
print("No images generated")
results["error"] = "No images generated"
return results
def warmup_and_load_lora(warmup_json_path):
if warmup_json_path is None:
return False
with open(warmup_json_path, 'r') as f:
warmup_data = json.load(f)
st = time.time()
for request in warmup_data:
process_and_send_request(request)
resp_time = time.time()-st
print(f"Warmup and load lora done in {resp_time:.3f} seconds")
return True
def generate_jitter_window():
percent_bifer = random.randint(1,100)
if percent_bifer >= 1 and percent_bifer <= 50:
jitter_window = [1, 5]
elif percent_bifer >= 51 and percent_bifer <= 75:
jitter_window = [10, 15]
else:
jitter_window = [20,30]
time.sleep(random.randint(jitter_window[0],jitter_window[1]))
return True
def predict(requests_data,percent_bifer):
random_request = random.choice(requests_data)
sample_request = random_request['payload']
generate_jitter_window()
return process_and_send_request(sample_request)
def run_single_test(requests_data,id = 0):
return predict(requests_data,id)
number_of_users = 1 #change here for concurrent users
duration_minutes = 2
def run_concurrent_tests_cont(number_of_users, duration_minutes):
start_time = time.time()
end_time = start_time + duration_minutes * 60
results = []
with ThreadPoolExecutor(max_workers=number_of_users) as executor:
future_to_start_time = {}
while time.time() < end_time:
# Submit new tasks continuously
percent_bifer = random.randint(1,10)
if len(future_to_start_time) < number_of_users:
future = executor.submit(run_single_test, requests_data)
future_to_start_time[future] = time.time()
# Process completed tasks and replace them
done_futures = [f for f in future_to_start_time if f.done()]
for future in done_futures:
response_time = future.result()
results.append(response_time)
del future_to_start_time[future]
# Wait for any remaining tasks to finish
for future in future_to_start_time:
results.append(future.result())
p25 = np.percentile(results, 25)
p50 = np.percentile(results, 50)
p90 = np.percentile(results, 90)
p99 = np.percentile(results, 99)
avg = sum(results) / len(results)
with open(f"result_dump_{number_of_users}_{duration_minutes}.json", "w") as f:
f.write(
json.dumps(resp_list, indent=4)
)
return p25, p50, p90, p99, avg
if run_multiple_tests:
p25_result , p50_results, p90_resutls, p99_results, avg = run_concurrent_tests_cont(number_of_users,duration_minutes)
load_lora_time = warmup_and_load_lora(requests_data)
print(f"25th Percentile: {p25_result:.3f} seconds")
print(f"50th Percentile: {p50_results:.3f} seconds")
print(f"90th Percentile: {p90_resutls:.3f} seconds")
print(f"99th Percentile: {p99_results:.3f} seconds")
print(f"Average Response Time: {avg:.3f} seconds")
with open(f"test_results.json", "w") as f:
f.write(
json.dumps({
"p25": p25_result,
"p50": p50_results,
"p90": p90_resutls,
"p99": p99_results,
"avg": avg,
"sCode_breakup": scode_breakup
}, indent=4)
)
else:
payload = {
"prompt": "A girl in city, 25 years old, cool, futuristic <lora:https://huggingface.co/XLabs-AI/flux-lora-collection/resolve/main/art_lora.safetensors:0.5>",
"negative_prompt": "blurry, low quality, distorted",
"height": 1024,
"width": 1024,
"num_images_per_prompt": 1,
"num_inference_steps": 20,
"seed": 42424243,
"guidance_scale": 7.0,
"model_type": "txt2img"
}
result = process_and_send_request(payload)
print(result) |