import asyncio import aiohttp import time from fastapi import FastAPI, Request from fastapi.templating import Jinja2Templates from fastapi.responses import HTMLResponse from datetime import datetime, timedelta import firebase_admin from firebase_admin import credentials, firestore app = FastAPI() templates = Jinja2Templates(directory="templates") # Инициализация Firebase (убедись, что файл в корне) try: cred = credentials.Certificate("serviceAccountKey.json") firebase_admin.initialize_app(cred) db = firestore.client() except Exception as e: print(f"Ошибка БД: {e}") db = None # Полный список сайтов и сервисов Google URLS = [ "https://programm-services.firebaseapp.com", "https://programm-services.web.app", "https://dashbord-ps-rid3.web.app", "https://programm-viewer.web.app", "https://www.rid3usercontent.run.place", "https://dash.rid3usercontent.run.place", "https://ai.rid3.linkpc.net", "https://ai-azure.web.app", "https://web-hook.web.app", "https://web.rid3usercontent.run.place", "https://view.rid3usercontent.run.place", # Google Services & APIs "https://www.googleapis.com", "https://apis.google.com", "https://www.google.com", "https://accounts.google.com", "https://firestore.googleapis.com" ] async def check_and_log(): async with aiohttp.ClientSession() as session: while True: for url in URLS: start = time.time() try: async with session.get(url, timeout=10) as resp: latency = round((time.time() - start) * 1000) status_code = resp.status level = "success" if status_code < 400 and latency < 600 else "warning" if status_code < 500 else "error" except: latency, status_code, level = 0, 0, "error" if db: db.collection('status_logs').add({ 'url': url, 'latency': latency, 'code': status_code, 'level': level, 'timestamp': firestore.SERVER_TIMESTAMP }) await asyncio.sleep(300) # Проверка раз в 5 минут @app.on_event("startup") async def startup(): asyncio.create_task(check_and_log()) @app.get("/", response_class=HTMLResponse) async def home(request: Request): return templates.TemplateResponse( request=request, name="index.html", context={} # если нужно передать еще данные, пишите их сюда ) @app.get("/api/history") async def get_history(): if not db: return {"error": "No DB"} thirty_days_ago = datetime.now() - timedelta(days=30) docs = db.collection('status_logs')\ .where('timestamp', '>=', thirty_days_ago)\ .order_by('timestamp', direction=firestore.Query.ASCENDING).stream() data = {} for doc in docs: d = doc.to_dict() url = d['url'] if url not in data: data[url] = [] data[url].append({'t': d['timestamp'].strftime("%d.%m %H:%M"), 'l': d['latency'], 'lv': d['level']}) return data