File size: 3,569 Bytes
6afa3c4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
from fastapi import FastAPI, Request, BackgroundTasks
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse, JSONResponse
import speedtest
import time
import uuid
import os
from pydantic import BaseModel

app = FastAPI(title="Internet Speed Test")

# Create templates directory if it doesn't exist
os.makedirs("templates", exist_ok=True)

# Mount static files
app.mount("/static", StaticFiles(directory="static"), name="static")

# Templates
templates = Jinja2Templates(directory="templates")

# Store test results
test_results = {}

class SpeedTestResult(BaseModel):
    download_mbps: float
    upload_mbps: float
    ping: float
    download_MBps: float
    upload_MBps: float
    status: str
    progress: int = 0

@app.get("/", response_class=HTMLResponse)
async def home(request: Request):
    return templates.TemplateResponse("index.html", {"request": request})

def run_speed_test(test_id: str):
    try:
        # Update progress
        test_results[test_id].progress = 10
        test_results[test_id].status = "Initializing..."
        
        # Initialize speedtest
        st = speedtest.Speedtest()
        
        # Get best server
        test_results[test_id].status = "Finding best server..."
        test_results[test_id].progress = 20
        st.get_best_server()
        
        # Measure download speed
        test_results[test_id].status = "Testing download speed..."
        test_results[test_id].progress = 30
        start_time = time.time()
        download_speed = st.download()
        end_time = time.time()
        
        # Calculate download speed in MB/s
        download_speed_MBps = (download_speed / 8) / 1_000_000
        test_results[test_id].download_MBps = round(download_speed_MBps, 2)
        test_results[test_id].download_mbps = round(download_speed / 1_000_000, 2)
        test_results[test_id].progress = 60
        
        # Measure upload speed
        test_results[test_id].status = "Testing upload speed..."
        start_time = time.time()
        upload_speed = st.upload()
        end_time = time.time()
        
        # Calculate upload speed in MB/s
        upload_speed_MBps = (upload_speed / 8) / 1_000_000
        test_results[test_id].upload_MBps = round(upload_speed_MBps, 2)
        test_results[test_id].upload_mbps = round(upload_speed / 1_000_000, 2)
        test_results[test_id].progress = 90
        
        # Get ping
        ping = st.results.ping
        test_results[test_id].ping = round(ping, 2)
        
        # Update status
        test_results[test_id].status = "Complete"
        test_results[test_id].progress = 100
        
    except Exception as e:
        test_results[test_id].status = f"Error: {str(e)}"
        test_results[test_id].progress = -1

@app.post("/start-test")
async def start_test(background_tasks: BackgroundTasks):
    test_id = str(uuid.uuid4())
    test_results[test_id] = SpeedTestResult(
        download_mbps=0,
        upload_mbps=0,
        ping=0,
        download_MBps=0,
        upload_MBps=0,
        status="Starting...",
        progress=0
    )
    
    background_tasks.add_task(run_speed_test, test_id)
    return {"test_id": test_id}

@app.get("/test-status/{test_id}")
async def test_status(test_id: str):
    if test_id not in test_results:
        return JSONResponse(status_code=404, content={"error": "Test not found"})
    
    return test_results[test_id]

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)