|
|
from fastapi import FastAPI, Request, BackgroundTasks |
|
|
from fastapi.templating import Jinja2Templates |
|
|
from fastapi.staticfiles import StaticFiles |
|
|
from fastapi.responses import HTMLResponse, JSONResponse |
|
|
import speedtest |
|
|
import time |
|
|
import uuid |
|
|
import os |
|
|
from pydantic import BaseModel |
|
|
|
|
|
app = FastAPI(title="Internet Speed Test") |
|
|
|
|
|
|
|
|
os.makedirs("templates", exist_ok=True) |
|
|
|
|
|
|
|
|
app.mount("/static", StaticFiles(directory="static"), name="static") |
|
|
|
|
|
|
|
|
templates = Jinja2Templates(directory="templates") |
|
|
|
|
|
|
|
|
test_results = {} |
|
|
|
|
|
class SpeedTestResult(BaseModel): |
|
|
download_mbps: float |
|
|
upload_mbps: float |
|
|
ping: float |
|
|
download_MBps: float |
|
|
upload_MBps: float |
|
|
status: str |
|
|
progress: int = 0 |
|
|
|
|
|
@app.get("/", response_class=HTMLResponse) |
|
|
async def home(request: Request): |
|
|
return templates.TemplateResponse("index.html", {"request": request}) |
|
|
|
|
|
def run_speed_test(test_id: str): |
|
|
try: |
|
|
|
|
|
test_results[test_id].progress = 10 |
|
|
test_results[test_id].status = "Initializing..." |
|
|
|
|
|
|
|
|
st = speedtest.Speedtest() |
|
|
|
|
|
|
|
|
test_results[test_id].status = "Finding best server..." |
|
|
test_results[test_id].progress = 20 |
|
|
st.get_best_server() |
|
|
|
|
|
|
|
|
test_results[test_id].status = "Testing download speed..." |
|
|
test_results[test_id].progress = 30 |
|
|
start_time = time.time() |
|
|
download_speed = st.download() |
|
|
end_time = time.time() |
|
|
|
|
|
|
|
|
download_speed_MBps = (download_speed / 8) / 1_000_000 |
|
|
test_results[test_id].download_MBps = round(download_speed_MBps, 2) |
|
|
test_results[test_id].download_mbps = round(download_speed / 1_000_000, 2) |
|
|
test_results[test_id].progress = 60 |
|
|
|
|
|
|
|
|
test_results[test_id].status = "Testing upload speed..." |
|
|
start_time = time.time() |
|
|
upload_speed = st.upload() |
|
|
end_time = time.time() |
|
|
|
|
|
|
|
|
upload_speed_MBps = (upload_speed / 8) / 1_000_000 |
|
|
test_results[test_id].upload_MBps = round(upload_speed_MBps, 2) |
|
|
test_results[test_id].upload_mbps = round(upload_speed / 1_000_000, 2) |
|
|
test_results[test_id].progress = 90 |
|
|
|
|
|
|
|
|
ping = st.results.ping |
|
|
test_results[test_id].ping = round(ping, 2) |
|
|
|
|
|
|
|
|
test_results[test_id].status = "Complete" |
|
|
test_results[test_id].progress = 100 |
|
|
|
|
|
except Exception as e: |
|
|
test_results[test_id].status = f"Error: {str(e)}" |
|
|
test_results[test_id].progress = -1 |
|
|
|
|
|
@app.post("/start-test") |
|
|
async def start_test(background_tasks: BackgroundTasks): |
|
|
test_id = str(uuid.uuid4()) |
|
|
test_results[test_id] = SpeedTestResult( |
|
|
download_mbps=0, |
|
|
upload_mbps=0, |
|
|
ping=0, |
|
|
download_MBps=0, |
|
|
upload_MBps=0, |
|
|
status="Starting...", |
|
|
progress=0 |
|
|
) |
|
|
|
|
|
background_tasks.add_task(run_speed_test, test_id) |
|
|
return {"test_id": test_id} |
|
|
|
|
|
@app.get("/test-status/{test_id}") |
|
|
async def test_status(test_id: str): |
|
|
if test_id not in test_results: |
|
|
return JSONResponse(status_code=404, content={"error": "Test not found"}) |
|
|
|
|
|
return test_results[test_id] |
|
|
|
|
|
if __name__ == "__main__": |
|
|
import uvicorn |
|
|
uvicorn.run(app, host="0.0.0.0", port=7860) |