| from contextlib import asynccontextmanager, suppress |
|
|
| from fastapi import FastAPI, Response |
|
|
| from bot import build_application |
|
|
|
|
# Module-level handle to the Telegram application. It stays ``None`` until
# ``lifespan`` assigns the object returned by ``build_application()``.
telegram_application = None
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the Telegram application for the lifetime of the FastAPI app.

    On entry the application returned by ``build_application()`` is stored
    in the module-level ``telegram_application``, initialized and started;
    if it has an updater, polling is started as well. On exit the updater
    (if any) is stopped, then the application is stopped and shut down.

    Teardown runs in a ``finally`` block so that it also happens when
    startup fails part-way or when an exception is thrown in at ``yield``.
    Each teardown step is best-effort: a failure is logged and the
    remaining steps still run.

    Args:
        app: The FastAPI instance this lifespan is attached to (unused).
    """
    # Function-scope import keeps this block self-contained.
    import logging

    global telegram_application

    log = logging.getLogger(__name__)
    bot = build_application()
    telegram_application = bot

    try:
        await bot.initialize()
        await bot.start()
        if bot.updater:
            await bot.updater.start_polling()

        yield
    finally:
        # Same order as startup in reverse: updater first, then the
        # application itself, then final shutdown.
        teardown_steps = []
        if bot.updater:
            teardown_steps.append(("updater.stop", bot.updater.stop))
        teardown_steps.append(("stop", bot.stop))
        teardown_steps.append(("shutdown", bot.shutdown))

        for label, step in teardown_steps:
            try:
                await step()
            except Exception:
                # Best-effort: record the failure but keep tearing down.
                log.warning(
                    "Telegram application %s failed during teardown",
                    label,
                    exc_info=True,
                )
|
|
|
|
# ASGI application object; ``lifespan`` is passed as its lifespan handler.
app = FastAPI(
    title="Telegram SOCKS5 Proxy Bot",
    lifespan=lifespan,
)
|
|
|
|
@app.get("/")
async def root() -> dict[str, str]:
    """Return a fixed JSON-serializable status payload for ``GET /``."""
    payload = {"status": "ok"}
    return payload
|
|
|
|
@app.get("/healthz")
async def health() -> Response:
    """Answer ``GET /healthz`` with the plain-text body ``ok``."""
    body = "ok"
    return Response(media_type="text/plain", content=body)
|
|