from speedtest import Speedtest, ConfigRetrievalError
from .. import LOGGER
from ..helper.telegram_helper.message_utils import (
send_message,
edit_message,
delete_message,
)
from ..helper.ext_utils.bot_utils import new_task, sync_to_async
from ..helper.ext_utils.status_utils import get_readable_file_size
@new_task
async def speedtest(_, message):
speed = await send_message(message, "Initiating Speedtest...")
try:
speed_results = await sync_to_async(Speedtest)
await sync_to_async(speed_results.get_best_server)
await sync_to_async(speed_results.download)
await sync_to_async(speed_results.upload)
except ConfigRetrievalError:
await edit_message(
speed,
"ERROR: Can't connect to Server at the Moment, Try Again Later !",
)
return
speed_results.results.share()
result = speed_results.results.dict()
string_speed = f"""
➲ SPEEDTEST INFO
┠ Upload: {get_readable_file_size(result['upload'] / 8)}/s
┠ Download: {get_readable_file_size(result['download'] / 8)}/s
┠ Ping: {result['ping']} ms
┠ Time: {result['timestamp']}
┠ Data Sent: {get_readable_file_size(int(result['bytes_sent']))}
┖ Data Received: {get_readable_file_size(int(result['bytes_received']))}
➲ SPEEDTEST SERVER
┠ Name: {result['server']['name']}
┠ Country: {result['server']['country']}, {result['server']['cc']}
┠ Sponsor: {result['server']['sponsor']}
┠ Latency: {result['server']['latency']}
┠ Latitude: {result['server']['lat']}
┖ Longitude: {result['server']['lon']}
"""
try:
await send_message(message, string_speed, photo=result["share"])
await delete_message(speed)
except Exception as e:
LOGGER.error(str(e))
await edit_message(speed, string_speed)