load-test-queue / app.py
omerXfaruq's picture
Update app.py
4cf74c3
import asyncio
import json
import random
import time
import websockets
import sys
import gradio as gr
host = "wss://spaces.huggingface.tech/farukozderim/test-new-queue/queue/join"
#duration_list = []
success = 0
async def startup(client_count, interval_duration):
begin = time.time()
await asyncio.gather(*[client(i, interval_duration) for i in range(client_count)])
end = time.time()
return f"Total-duration: {round(end-begin,3)}, success: {success} out of {client_count}"
async def client(rank, interval_duration):
await asyncio.sleep(rank*interval_duration) # Server cannot handle a lot of instantanous conns
async with websockets.connect(host, timeout=10000) as websocket:
start = time.time()
while True:
raw_msg = await websocket.recv()
jso = json.loads(raw_msg)
msg = jso["msg"]
if msg == "send_data":
await websocket.send('{"data": ["text2"]}')
elif msg == "estimation":
#print(jso)
pass
elif msg == "process_starts":
pass
elif msg == "process_completed":
end = time.time()
duration = end - start
if jso["success"]:
global success
success+=1
return
describe = f'Sends x number of events(requests) with y second intervals to another space, https://huggingface.co/spaces/farukozderim/test-new-queue'
gr.Interface(startup, [gr.Number(precision=0), gr.Number(precision=0)], "text", description=describe).launch()