telegram-Bot / main.py
Ahmad3g's picture
dhjfr
78d7683
import asyncio
import logging
import sys
from datetime import datetime
from aiohttp import web
from aiogram import Bot, Dispatcher
from aiogram.client.default import DefaultBotProperties
from aiogram.enums import ParseMode
from aiogram.fsm.storage.memory import MemoryStorage
from aiogram.webhook.aiohttp_server import (
SimpleRequestHandler,
setup_application,
)
from config import config
from database.db import init_db
from handlers import admin, owner, user
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ LOGGING โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)-8s | %(name)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
stream=sys.stdout,
)
logger = logging.getLogger(__name__)
BOT_STATUS = {"running": False, "started_at": None, "error": None}
# Webhook path โ€” Telegram will POST updates here
WEBHOOK_PATH = "/webhook"
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ HEALTH ENDPOINTS โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
async def handle_root(request: web.Request) -> web.Response:
status = "โœ… running" if BOT_STATUS["running"] else "โณ starting"
error = f"\nโŒ Last error: {BOT_STATUS['error']}" if BOT_STATUS["error"] else ""
# ู…ู„ุงุญุธุฉ ู„ู„ู…ุณุชุฎุฏู… ููŠ ุงู„ุตูุญุฉ ุงู„ุฑุฆูŠุณูŠุฉ
manual_notice = "\n\n๐Ÿ’ก Note: Webhook is set manually to bypass HF outbound block."
body = (
f"๐Ÿค– Knowledge Base Bot โ€” {status}\n"
f"๐Ÿ•’ Server time : {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')} UTC\n"
f"๐Ÿš€ Bot started : {BOT_STATUS['started_at'] or 'not yet'}"
f"{error}{manual_notice}"
)
return web.Response(text=body, content_type="text/plain", status=200)
async def handle_health(request: web.Request) -> web.Response:
import json
payload = {
"status" : "ok" if BOT_STATUS["running"] else "starting",
"bot_ok" : BOT_STATUS["running"],
"started": BOT_STATUS["started_at"],
"error" : BOT_STATUS["error"],
}
return web.Response(
text=json.dumps(payload),
content_type="application/json",
status=200,
)
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ DISPATCHER โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
def build_dispatcher() -> Dispatcher:
dp = Dispatcher(storage=MemoryStorage())
dp.include_router(owner.router)
dp.include_router(admin.router)
dp.include_router(user.router)
return dp
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ MAIN โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
async def main() -> None:
print(f"\n{'='*50}")
print(f" Startup at {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')} UTC")
print(f"{'='*50}\n")
# โ”€โ”€ 1. Validate config โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
try:
config.validate()
except EnvironmentError as e:
logger.critical(str(e))
BOT_STATUS["error"] = str(e)
app = web.Application()
app.router.add_get("/", handle_root)
runner = web.AppRunner(app)
await runner.setup()
await web.TCPSite(runner, config.WEB_HOST, config.WEB_PORT).start()
while True:
await asyncio.sleep(3600)
# โ”€โ”€ 2. Database with retry โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
for attempt in range(1, 6):
try:
logger.info(f"๐Ÿ”Œ DB connect attempt {attempt}/5โ€ฆ")
await init_db()
logger.info("โœ… Database ready.")
break
except Exception as exc:
BOT_STATUS["error"] = str(exc)
logger.error(f"โŒ DB failed: {exc}")
if attempt < 5:
await asyncio.sleep(5)
else:
logger.critical("๐Ÿ’ฅ DB unavailable.")
break
# โ”€โ”€ 3. Bot Setup โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
bot = Bot(
token=config.BOT_TOKEN,
default=DefaultBotProperties(parse_mode=ParseMode.HTML),
)
dp = build_dispatcher()
# โš ๏ธ ุชู… ุชุนุทูŠู„ ุงู„ุญุฐู ูˆุงู„ุชุณุฌูŠู„ ุงู„ุชู„ู‚ุงุฆูŠ ู„ู„ูˆูŠุจ ู‡ูˆูƒ ู‡ู†ุง ู„ุชุฌู†ุจ ุญุฌุจ HF
# ุงู„ุชุณุฌูŠู„ ูŠุชู… ูŠุฏูˆูŠุงู‹ ู…ู† ุงู„ู…ุชุตูุญ ูƒู…ุง ูุนู„ุช ุณุงุจู‚ุงู‹
logger.info("โ„น๏ธ Skipping automatic webhook registration (HF compatibility mode).")
BOT_STATUS["running"] = True
BOT_STATUS["started_at"] = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC")
BOT_STATUS["error"] = None
# โ”€โ”€ 4. aiohttp app with webhook handler โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
app = web.Application()
# Health endpoints
app.router.add_get("/", handle_root)
app.router.add_get("/health", handle_health)
# ุชุณุฌูŠู„ ู…ุนุงู„ุฌ ุงู„ูˆูŠุจ ู‡ูˆูƒ (ู‡ุฐุง ูŠุฌุนู„ ุงู„ุจูˆุช "ูŠุณู…ุน" ู„ู„ุฑุณุงุฆู„ ุงู„ู‚ุงุฏู…ุฉ)
SimpleRequestHandler(dispatcher=dp, bot=bot).register(app, path=WEBHOOK_PATH)
setup_application(app, dp, bot=bot)
# โ”€โ”€ 5. Start server โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, host=config.WEB_HOST, port=config.WEB_PORT)
await site.start()
logger.info(f"๐Ÿš€ Server is UP on http://{config.WEB_HOST}:{config.WEB_PORT}/")
logger.info(f"๐Ÿ“ก Waiting for Telegram updates via: {config.WEBHOOK_URL}{WEBHOOK_PATH}")
# Keep running forever
while True:
await asyncio.sleep(3600)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("๐Ÿ‘‹ Shutdown by user.")