from fastapi import FastAPI, Request, BackgroundTasks from fastapi.templating import Jinja2Templates from fastapi.staticfiles import StaticFiles from fastapi.responses import HTMLResponse, JSONResponse import speedtest import time import uuid import os from pydantic import BaseModel app = FastAPI(title="Internet Speed Test") # Create templates directory if it doesn't exist os.makedirs("templates", exist_ok=True) # Mount static files app.mount("/static", StaticFiles(directory="static"), name="static") # Templates templates = Jinja2Templates(directory="templates") # Store test results test_results = {} class SpeedTestResult(BaseModel): download_mbps: float upload_mbps: float ping: float download_MBps: float upload_MBps: float status: str progress: int = 0 @app.get("/", response_class=HTMLResponse) async def home(request: Request): return templates.TemplateResponse("index.html", {"request": request}) def run_speed_test(test_id: str): try: # Update progress test_results[test_id].progress = 10 test_results[test_id].status = "Initializing..." # Initialize speedtest st = speedtest.Speedtest() # Get best server test_results[test_id].status = "Finding best server..." test_results[test_id].progress = 20 st.get_best_server() # Measure download speed test_results[test_id].status = "Testing download speed..." test_results[test_id].progress = 30 start_time = time.time() download_speed = st.download() end_time = time.time() # Calculate download speed in MB/s download_speed_MBps = (download_speed / 8) / 1_000_000 test_results[test_id].download_MBps = round(download_speed_MBps, 2) test_results[test_id].download_mbps = round(download_speed / 1_000_000, 2) test_results[test_id].progress = 60 # Measure upload speed test_results[test_id].status = "Testing upload speed..." start_time = time.time() upload_speed = st.upload() end_time = time.time() # Calculate upload speed in MB/s upload_speed_MBps = (upload_speed / 8) / 1_000_000 test_results[test_id].upload_MBps = round(upload_speed_MBps, 2) test_results[test_id].upload_mbps = round(upload_speed / 1_000_000, 2) test_results[test_id].progress = 90 # Get ping ping = st.results.ping test_results[test_id].ping = round(ping, 2) # Update status test_results[test_id].status = "Complete" test_results[test_id].progress = 100 except Exception as e: test_results[test_id].status = f"Error: {str(e)}" test_results[test_id].progress = -1 @app.post("/start-test") async def start_test(background_tasks: BackgroundTasks): test_id = str(uuid.uuid4()) test_results[test_id] = SpeedTestResult( download_mbps=0, upload_mbps=0, ping=0, download_MBps=0, upload_MBps=0, status="Starting...", progress=0 ) background_tasks.add_task(run_speed_test, test_id) return {"test_id": test_id} @app.get("/test-status/{test_id}") async def test_status(test_id: str): if test_id not in test_results: return JSONResponse(status_code=404, content={"error": "Test not found"}) return test_results[test_id] if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860)