Spaces:
Sleeping
Sleeping
| import asyncio | |
| import aiohttp | |
| import time | |
| import uuid | |
| from fastapi import FastAPI, Request | |
| from fastapi.templating import Jinja2Templates | |
| from fastapi.staticfiles import StaticFiles | |
| import socketio | |
| from typing import Dict, List | |
# Socket.IO server (ASGI mode) and the FastAPI application it will wrap.
sio = socketio.AsyncServer(async_mode="asgi")
fastapi_app = FastAPI()

# HTML templates are loaded from ./templates; static assets are served
# from ./static under the /static URL prefix.
templates = Jinja2Templates(directory="templates")
fastapi_app.mount("/static", StaticFiles(directory="static"), name="static")

# ASGI entry point: the Socket.IO server wrapped around the FastAPI app.
app = socketio.ASGIApp(sio, fastapi_app)
# Latest availability result per monitored URL (URL -> True when available).
website_statuses: Dict[str, bool] = {}

# Epoch-seconds timestamp of the next scheduled check.
# Stays 0 until the checker loop assigns a real value.
next_check_timestamp: float = 0
# URLs probed on every check cycle; results are reported in this order.
websites = [
    "https://www.vegetablesking.in/",
    "https://triflix-checkerss.hf.space/status",
    "https://medium.com/@devarshia5/linear-regression-predicting-the-future-with-math-magic-461c5e2a0fe5",
    "https://triflix-testingproxy123.hf.space",
    "https://triflix-goodwill.hf.space",
    "https://triflix-speedtest.hf.space",
    "https://www.google.com/search?q=curseofwitcher+instagram",
    "https://www.google.com/search?q=curseofwitcher",
    "https://in.linkedin.com/in/aditya-devarshi",
    "https://www.google.com/search?q=Aditya+devarshi+Pune+Maharashtra+%2C+India",
    "https://www.google.com/search?q=Aditya+Devarshi+ai+engineer",
]
# Persistent client bookkeeping, keyed by the persistent client_id.

# Every persistent client_id that has ever connected.
persistent_clients = set()

# persistent client_id -> current Socket.IO session id.
client_sessions: Dict[str, str] = {}

# persistent client_id -> notifications waiting to be delivered.
cached_notifications: Dict[str, List[str]] = {}
async def check_website(session: aiohttp.ClientSession, url: str) -> bool:
    """
    Check whether a single website answers an HTTP GET request successfully.

    Args:
        session: aiohttp client session used to send the request.
        url: Address to probe.

    Returns:
        True when a response arrives with a status code below 400.
        False when the status code is 400 or higher, or when the request
        raises (connection failure, timeout, ...).
    """
    # Passing a bare number as ``timeout`` is deprecated in aiohttp; the
    # supported form is an explicit ClientTimeout (10 seconds in total).
    request_timeout = aiohttp.ClientTimeout(total=10)
    try:
        async with session.get(url, timeout=request_timeout) as response:
            return response.status < 400
    except Exception as e:
        # Deliberately broad: any failure while probing is reported as
        # "not available" rather than propagated to the caller.
        print(f"Error checking {url}: {e}")
        return False
async def check_websites(websites: List[str]) -> List[bool]:
    """
    Probe every URL in ``websites`` concurrently.

    One aiohttp session is opened for the whole batch and closed once all
    probes have finished. The result holds one availability flag per URL,
    in the same order as the input.
    """
    async with aiohttp.ClientSession() as http:
        probes = (check_website(http, address) for address in websites)
        availability = await asyncio.gather(*probes)
    return availability
async def website_checker_loop():
    """
    Run the website availability check forever, one cycle every 15 minutes.

    Each cycle:
      * stores the scheduled time of the next check in the global
        ``next_check_timestamp``;
      * checks every URL in ``websites`` and records the results in
        ``website_statuses``;
      * emits a ``status_update`` Socket.IO event carrying the statuses and
        the next check timestamp;
      * appends a text summary to ``cached_notifications`` for every known
        persistent client that has no entry in ``client_sessions``.

    The function never returns.
    """
    global next_check_timestamp
    interval_seconds = 15 * 60  # 15 minutes
    while True:
        # Schedule the next check relative to the start of this cycle.
        next_check_timestamp = time.time() + interval_seconds
        try:
            print("Checking websites...")
            results = await check_websites(websites)
            message_lines = []
            for url, is_available in zip(websites, results):
                website_statuses[url] = is_available
                status_text = "Available" if is_available else "Not Available"
                message_lines.append(f"{url}: {status_text}")
                print(f"{url} is {status_text}.")
            message = "\n".join(message_lines)
            # Data to be sent includes statuses and the next check timestamp.
            data = {"statuses": website_statuses, "next_check": next_check_timestamp}
            await sio.emit("status_update", data)
            # For each persistent client that is offline, cache the notification.
            # NOTE(review): these lists only shrink when the client reconnects,
            # so a client that never returns accumulates entries indefinitely.
            for client_id in persistent_clients:
                if client_id not in client_sessions:
                    cached_notifications.setdefault(client_id, []).append(message)
        except Exception as exc:
            # An exception escaping this coroutine would end the background
            # task and silently stop all future checks; log it and carry on.
            # Task cancellation is not an Exception subclass and still
            # propagates.
            print(f"Website check cycle failed: {exc!r}")
        # Sleep only for the time left until the advertised timestamp, so the
        # time spent checking does not push the real next check past the value
        # already sent to clients.
        await asyncio.sleep(max(0.0, next_check_timestamp - time.time()))
async def index(request: Request):
    """
    Render ``index.html`` with the current website statuses and the
    timestamp of the next scheduled check.
    """
    # NOTE(review): no route registration for this handler is visible in this
    # view of the file - confirm it is attached to fastapi_app.
    context = {
        "request": request,
        "statuses": website_statuses,
        "next_check": next_check_timestamp,
    }
    return templates.TemplateResponse("index.html", context)
async def get_status():
    """
    Return the latest website statuses together with the next check timestamp.
    """
    # NOTE(review): no route registration for this handler is visible in this
    # view of the file - confirm it is attached to fastapi_app.
    payload = dict(statuses=website_statuses, next_check=next_check_timestamp)
    return payload
async def startup_event():
    """
    Launch the background task that checks the websites continuously.

    NOTE(review): no startup-hook registration for this function is visible
    in this view of the file - confirm it is attached to the application.
    """
    # The event loop keeps only weak references to tasks, so a task whose
    # result is discarded can be garbage-collected before it finishes. Keep a
    # module-level reference for the lifetime of the process.
    global _checker_task
    _checker_task = asyncio.create_task(website_checker_loop())
| # Socket.IO event handlers. | |
async def connect(sid, environ):
    """
    Handle a new Socket.IO connection.

    Reads the persistent ``client_id`` from the connection's query string,
    generating a new one (and sending it back in a ``set_client_id`` event)
    when none is supplied. The client is then registered as online, sent the
    current statuses, and sent any notifications cached for it.

    Args:
        sid: Socket.IO session id of the new connection.
        environ: Request environment; only ``QUERY_STRING`` is read.

    NOTE(review): no Socket.IO handler registration for this function is
    visible in this view of the file - confirm it is attached to ``sio``.
    """
    from urllib.parse import parse_qs

    # Retrieve the persistent client_id from the query string. parse_qs
    # URL-decodes the value and keeps any '=' characters inside it, which the
    # previous manual split("=")[1] did not.
    query = environ.get("QUERY_STRING", "")
    supplied_ids = parse_qs(query).get("client_id")
    client_id = supplied_ids[0] if supplied_ids else None
    # If no client_id is provided, generate one.
    if not client_id:
        client_id = str(uuid.uuid4())
        # Tell the client its generated id.
        await sio.emit("set_client_id", {"client_id": client_id}, room=sid)
    # Register the persistent client.
    persistent_clients.add(client_id)
    client_sessions[client_id] = sid
    print(f"Client connected: {sid} with persistent id {client_id}")
    # Send current statuses and next check timestamp upon connection.
    await sio.emit("status_update", {"statuses": website_statuses, "next_check": next_check_timestamp}, room=sid)
    # If there are any cached notifications for this client, send them and clear the cache.
    if client_id in cached_notifications and cached_notifications[client_id]:
        for note in cached_notifications[client_id]:
            await sio.emit("notification", {"message": note}, room=sid)
        cached_notifications[client_id] = []
async def disconnect(sid):
    """
    Handle a Socket.IO disconnection.

    Finds the persistent client whose current session is ``sid`` and removes
    its entry from ``client_sessions``. Nothing happens when no client is
    mapped to ``sid``.
    """
    # Reverse lookup: first persistent id whose active session matches sid.
    owner = next(
        (cid for cid, active_sid in client_sessions.items() if active_sid == sid),
        None,
    )
    if not owner:
        return
    print(f"Client disconnected: {sid} with persistent id {owner}")
    client_sessions.pop(owner)
if __name__ == "__main__":
    # Script entry point: serve ``app`` from module ``main`` with uvicorn on
    # all interfaces, port 7860, with auto-reload enabled.
    import uvicorn

    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=7860,
        reload=True,
    )