triflix commited on
Commit
6afa3c4
·
verified ·
1 Parent(s): 38dfb36

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +114 -114
main.py CHANGED
@@ -1,114 +1,114 @@
1
- from fastapi import FastAPI, Request, BackgroundTasks
2
- from fastapi.templating import Jinja2Templates
3
- from fastapi.staticfiles import StaticFiles
4
- from fastapi.responses import HTMLResponse, JSONResponse
5
- import speedtest
6
- import time
7
- import uuid
8
- import os
9
- from pydantic import BaseModel
10
-
11
- app = FastAPI(title="Internet Speed Test")
12
-
13
- # Create templates directory if it doesn't exist
14
- os.makedirs("templates", exist_ok=True)
15
-
16
- # Mount static files
17
- app.mount("/static", StaticFiles(directory="static"), name="static")
18
-
19
- # Templates
20
- templates = Jinja2Templates(directory="templates")
21
-
22
- # Store test results
23
- test_results = {}
24
-
25
- class SpeedTestResult(BaseModel):
26
- download_mbps: float
27
- upload_mbps: float
28
- ping: float
29
- download_MBps: float
30
- upload_MBps: float
31
- status: str
32
- progress: int = 0
33
-
34
- @app.get("/", response_class=HTMLResponse)
35
- async def home(request: Request):
36
- return templates.TemplateResponse("index.html", {"request": request})
37
-
38
- def run_speed_test(test_id: str):
39
- try:
40
- # Update progress
41
- test_results[test_id].progress = 10
42
- test_results[test_id].status = "Initializing..."
43
-
44
- # Initialize speedtest
45
- st = speedtest.Speedtest()
46
-
47
- # Get best server
48
- test_results[test_id].status = "Finding best server..."
49
- test_results[test_id].progress = 20
50
- st.get_best_server()
51
-
52
- # Measure download speed
53
- test_results[test_id].status = "Testing download speed..."
54
- test_results[test_id].progress = 30
55
- start_time = time.time()
56
- download_speed = st.download()
57
- end_time = time.time()
58
-
59
- # Calculate download speed in MB/s
60
- download_speed_MBps = (download_speed / 8) / 1_000_000
61
- test_results[test_id].download_MBps = round(download_speed_MBps, 2)
62
- test_results[test_id].download_mbps = round(download_speed / 1_000_000, 2)
63
- test_results[test_id].progress = 60
64
-
65
- # Measure upload speed
66
- test_results[test_id].status = "Testing upload speed..."
67
- start_time = time.time()
68
- upload_speed = st.upload()
69
- end_time = time.time()
70
-
71
- # Calculate upload speed in MB/s
72
- upload_speed_MBps = (upload_speed / 8) / 1_000_000
73
- test_results[test_id].upload_MBps = round(upload_speed_MBps, 2)
74
- test_results[test_id].upload_mbps = round(upload_speed / 1_000_000, 2)
75
- test_results[test_id].progress = 90
76
-
77
- # Get ping
78
- ping = st.results.ping
79
- test_results[test_id].ping = round(ping, 2)
80
-
81
- # Update status
82
- test_results[test_id].status = "Complete"
83
- test_results[test_id].progress = 100
84
-
85
- except Exception as e:
86
- test_results[test_id].status = f"Error: {str(e)}"
87
- test_results[test_id].progress = -1
88
-
89
- @app.post("/start-test")
90
- async def start_test(background_tasks: BackgroundTasks):
91
- test_id = str(uuid.uuid4())
92
- test_results[test_id] = SpeedTestResult(
93
- download_mbps=0,
94
- upload_mbps=0,
95
- ping=0,
96
- download_MBps=0,
97
- upload_MBps=0,
98
- status="Starting...",
99
- progress=0
100
- )
101
-
102
- background_tasks.add_task(run_speed_test, test_id)
103
- return {"test_id": test_id}
104
-
105
- @app.get("/test-status/{test_id}")
106
- async def test_status(test_id: str):
107
- if test_id not in test_results:
108
- return JSONResponse(status_code=404, content={"error": "Test not found"})
109
-
110
- return test_results[test_id]
111
-
112
- if __name__ == "__main__":
113
- import uvicorn
114
- uvicorn.run(app, host="0.0.0.0", port=8000)
 
1
+ from fastapi import FastAPI, Request, BackgroundTasks
2
+ from fastapi.templating import Jinja2Templates
3
+ from fastapi.staticfiles import StaticFiles
4
+ from fastapi.responses import HTMLResponse, JSONResponse
5
+ import speedtest
6
+ import time
7
+ import uuid
8
+ import os
9
+ from pydantic import BaseModel
10
+
11
+ app = FastAPI(title="Internet Speed Test")
12
+
13
+ # Create templates directory if it doesn't exist
14
+ os.makedirs("templates", exist_ok=True)
15
+
16
+ # Mount static files
17
+ app.mount("/static", StaticFiles(directory="static"), name="static")
18
+
19
+ # Templates
20
+ templates = Jinja2Templates(directory="templates")
21
+
22
+ # Store test results
23
+ test_results = {}
24
+
25
+ class SpeedTestResult(BaseModel):
26
+ download_mbps: float
27
+ upload_mbps: float
28
+ ping: float
29
+ download_MBps: float
30
+ upload_MBps: float
31
+ status: str
32
+ progress: int = 0
33
+
34
+ @app.get("/", response_class=HTMLResponse)
35
+ async def home(request: Request):
36
+ return templates.TemplateResponse("index.html", {"request": request})
37
+
38
+ def run_speed_test(test_id: str):
39
+ try:
40
+ # Update progress
41
+ test_results[test_id].progress = 10
42
+ test_results[test_id].status = "Initializing..."
43
+
44
+ # Initialize speedtest
45
+ st = speedtest.Speedtest()
46
+
47
+ # Get best server
48
+ test_results[test_id].status = "Finding best server..."
49
+ test_results[test_id].progress = 20
50
+ st.get_best_server()
51
+
52
+ # Measure download speed
53
+ test_results[test_id].status = "Testing download speed..."
54
+ test_results[test_id].progress = 30
55
+ start_time = time.time()
56
+ download_speed = st.download()
57
+ end_time = time.time()
58
+
59
+ # Calculate download speed in MB/s
60
+ download_speed_MBps = (download_speed / 8) / 1_000_000
61
+ test_results[test_id].download_MBps = round(download_speed_MBps, 2)
62
+ test_results[test_id].download_mbps = round(download_speed / 1_000_000, 2)
63
+ test_results[test_id].progress = 60
64
+
65
+ # Measure upload speed
66
+ test_results[test_id].status = "Testing upload speed..."
67
+ start_time = time.time()
68
+ upload_speed = st.upload()
69
+ end_time = time.time()
70
+
71
+ # Calculate upload speed in MB/s
72
+ upload_speed_MBps = (upload_speed / 8) / 1_000_000
73
+ test_results[test_id].upload_MBps = round(upload_speed_MBps, 2)
74
+ test_results[test_id].upload_mbps = round(upload_speed / 1_000_000, 2)
75
+ test_results[test_id].progress = 90
76
+
77
+ # Get ping
78
+ ping = st.results.ping
79
+ test_results[test_id].ping = round(ping, 2)
80
+
81
+ # Update status
82
+ test_results[test_id].status = "Complete"
83
+ test_results[test_id].progress = 100
84
+
85
+ except Exception as e:
86
+ test_results[test_id].status = f"Error: {str(e)}"
87
+ test_results[test_id].progress = -1
88
+
89
+ @app.post("/start-test")
90
+ async def start_test(background_tasks: BackgroundTasks):
91
+ test_id = str(uuid.uuid4())
92
+ test_results[test_id] = SpeedTestResult(
93
+ download_mbps=0,
94
+ upload_mbps=0,
95
+ ping=0,
96
+ download_MBps=0,
97
+ upload_MBps=0,
98
+ status="Starting...",
99
+ progress=0
100
+ )
101
+
102
+ background_tasks.add_task(run_speed_test, test_id)
103
+ return {"test_id": test_id}
104
+
105
+ @app.get("/test-status/{test_id}")
106
+ async def test_status(test_id: str):
107
+ if test_id not in test_results:
108
+ return JSONResponse(status_code=404, content={"error": "Test not found"})
109
+
110
+ return test_results[test_id]
111
+
112
+ if __name__ == "__main__":
113
+ import uvicorn
114
+ uvicorn.run(app, host="0.0.0.0", port=7860)