speedtest / main.py
triflix's picture
Update main.py
6afa3c4 verified
from fastapi import FastAPI, Request, BackgroundTasks
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse, JSONResponse
import speedtest
import time
import uuid
import os
from pydantic import BaseModel
app = FastAPI(title="Internet Speed Test")

# Make sure both directories exist before they are handed to Starlette.
# StaticFiles checks its directory at construction time and raises
# RuntimeError when it is missing, so a checkout without "static/" would
# otherwise fail to start even though "templates/" is created defensively.
os.makedirs("templates", exist_ok=True)
os.makedirs("static", exist_ok=True)

# Serve files from ./static under the /static URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")

# Jinja2 templates are loaded from ./templates.
templates = Jinja2Templates(directory="templates")

# In-memory store of speed test records, keyed by test id (a UUID string).
# It lives only as long as the process does.
test_results = {}
class SpeedTestResult(BaseModel):
    """State and measurements of a single speed test run.

    One instance is stored per test id and mutated in place while the test
    runs, so a client polling the status endpoint sees partial results.
    """

    # Throughput in megabits per second (bits/s divided by 1,000,000).
    download_mbps: float
    upload_mbps: float
    # Latency reported by the speedtest client, rounded to two decimals
    # (presumably milliseconds -- confirm against the speedtest library).
    ping: float
    # Throughput in megabytes per second (bits/s divided by 8, then 1,000,000).
    download_MBps: float
    upload_MBps: float
    # Human-readable phase label, e.g. "Starting...", "Complete", "Error: ...".
    status: str
    # Progress indicator: 0-100 while running/finished, -1 after a failure.
    progress: int = 0
@app.get("/", response_class=HTMLResponse)
async def home(request: Request):
    """Render the landing page from the ``index.html`` template."""
    # The incoming request is passed to the template under the "request" key.
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
def run_speed_test(test_id: str):
    """Run a complete speed test and record progress in ``test_results``.

    The record stored under ``test_id`` is updated in place as each phase
    (server selection, download, upload, ping) finishes, so a client polling
    the status endpoint can observe intermediate progress.

    Args:
        test_id: Key of an existing entry in ``test_results``.

    Raises:
        KeyError: If ``test_id`` has no entry in ``test_results``.

    Any exception raised by the speed test itself is not propagated; it is
    reported through the record as ``status="Error: ..."`` and
    ``progress=-1``.
    """
    # Look the record up once, before the try block. Doing the lookup inside
    # the error handler (as well as in the body) would raise a second KeyError
    # while handling the first one when the id is unknown.
    result = test_results[test_id]

    try:
        result.progress = 10
        result.status = "Initializing..."
        st = speedtest.Speedtest()

        result.status = "Finding best server..."
        result.progress = 20
        st.get_best_server()

        result.status = "Testing download speed..."
        result.progress = 30
        # The measured rate is treated as bits per second: /8 gives bytes,
        # /1_000_000 scales to mega-units.
        download_speed = st.download()
        result.download_MBps = round((download_speed / 8) / 1_000_000, 2)
        result.download_mbps = round(download_speed / 1_000_000, 2)
        result.progress = 60

        result.status = "Testing upload speed..."
        upload_speed = st.upload()
        result.upload_MBps = round((upload_speed / 8) / 1_000_000, 2)
        result.upload_mbps = round(upload_speed / 1_000_000, 2)
        result.progress = 90

        # Ping is read from the results object the client has populated.
        result.ping = round(st.results.ping, 2)

        result.status = "Complete"
        result.progress = 100
    except Exception as e:
        # This function runs as a background task, so there is no caller to
        # receive the exception; surface the failure through the record.
        result.status = f"Error: {str(e)}"
        result.progress = -1
@app.post("/start-test")
async def start_test(background_tasks: BackgroundTasks):
    """Register a new speed test and schedule it as a background task.

    Returns:
        A dict ``{"test_id": <uuid string>}`` that the client uses to poll
        the status endpoint.
    """
    new_id = str(uuid.uuid4())

    # Seed the record with zeroed measurements so the status endpoint has
    # something to return before the background task starts updating it.
    initial_state = SpeedTestResult(
        status="Starting...",
        progress=0,
        ping=0,
        download_mbps=0,
        download_MBps=0,
        upload_mbps=0,
        upload_MBps=0,
    )
    test_results[new_id] = initial_state

    background_tasks.add_task(run_speed_test, new_id)
    return {"test_id": new_id}
@app.get("/test-status/{test_id}")
async def test_status(test_id: str):
    """Return the current record for ``test_id``, or a 404 JSON error.

    Unknown ids produce ``{"error": "Test not found"}`` with status 404.
    """
    try:
        return test_results[test_id]
    except KeyError:
        return JSONResponse(status_code=404, content={"error": "Test not found"})
if __name__ == "__main__":
    # uvicorn is only needed when this module is executed directly, so it is
    # imported here rather than at the top of the file.
    from uvicorn import run as serve

    # Listen on all interfaces, port 7860.
    serve(app, host="0.0.0.0", port=7860)