from __future__ import annotations import asyncio import json import logging import os import sqlite3 from dataclasses import dataclass from datetime import date, datetime from decimal import Decimal from pathlib import Path from typing import Optional import google.generativeai as genai import pytesseract import uvicorn from fastapi import FastAPI from telegram import KeyboardButton, ReplyKeyboardMarkup, Update, WebAppInfo from telegram.ext import ( Application, CommandHandler, ContextTypes, MessageHandler, filters, ) from .utils import ( Settings, SlipExtractionResult, extract_slip_information, verify_booking_amount, save_base64_image, chunk_for_telegram, ) # FastAPI app app = FastAPI() bot_app: Optional[Application] = None # Logging logging.basicConfig( format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO, ) logger = logging.getLogger(__name__) def load_settings() -> Settings: """Load settings from environment variables.""" def req_env(name: str) -> str: val = os.getenv(name) if not val: raise RuntimeError(f"Missing env var: {name}") return val return Settings( telegram_bot_token=req_env("TELEGRAM_BOT_TOKEN"), authorized_user_id=int(req_env("TELEGRAM_USER_ID")), gemini_api_key=req_env("GEMINI_API_KEY"), gemini_model=os.getenv("GEMINI_MODEL", "gemini-pro"), pms_blueprint_path=Path("app/resources/pms_blueprint.txt"), database_path=Path("./data/hotel_os_bot.db"), slip_storage_dir=Path("./data/slips"), webapp_base_url=os.getenv("WEBAPP_BASE_URL", "http://localhost:8080/index.html"), webapp_static_dir=Path("./webapp"), webapp_host="0.0.0.0", webapp_port=int(os.getenv("PORT", "8080")), ocr_language=os.getenv("OCR_LANGUAGE", "eng+tha"), tesseract_cmd=os.getenv("TESSERACT_CMD"), amount_tolerance=Decimal("1.00"), ) def init_db(settings: Settings): """Initialize SQLite database.""" settings.database_path.parent.mkdir(parents=True, exist_ok=True) settings.slip_storage_dir.mkdir(parents=True, exist_ok=True) with sqlite3.connect(settings.database_path) as conn: conn.executescript(""" CREATE TABLE IF NOT EXISTS bookings ( id TEXT PRIMARY KEY, guest_name TEXT, total_due REAL, currency TEXT, status TEXT, created_at TEXT DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS payment_slips ( id INTEGER PRIMARY KEY AUTOINCREMENT, booking_id TEXT, reference_code TEXT, slip_image_path TEXT, extracted_amount REAL, extracted_date TEXT, extracted_time TEXT, status TEXT, ocr_text TEXT, created_at TEXT DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS expenses ( id INTEGER PRIMARY KEY AUTOINCREMENT, amount REAL NOT NULL, category TEXT NOT NULL, note TEXT, created_by TEXT, created_at TEXT DEFAULT CURRENT_TIMESTAMP ); """) def is_authorized(update: Update, context: ContextTypes.DEFAULT_TYPE) -> bool: """Check if user is authorized.""" user = update.effective_user if not user: return False auth_id = context.application.bot_data.get("authorized_user_id") return auth_id is not None and user.id == auth_id async def start(update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle /start command.""" if not is_authorized(update, context): await update.message.reply_text("🔐 คุณไม่มีสิทธิ์ใช้งานบอทนี้") return stg: Settings = context.application.bot_data["settings"] keyboard = ReplyKeyboardMarkup( [[KeyboardButton("🛎️ เปิดระบบบริหารโรงแรม", web_app=WebAppInfo(url=stg.webapp_base_url))]], resize_keyboard=True ) await update.message.reply_text("ยินดีต้อนรับสู่ Hotel OS Bot!", reply_markup=keyboard) async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle /help command.""" if not is_authorized(update, context): return await update.message.reply_text( "/start – เปิดระบบ\n" "/daily_report – รายงานประจำวัน\n" "/receipt – ใบเสร็จ\n" "พิมพ์ข้อความ → AI ตอบ" ) async def daily_report(update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle /daily_report command.""" if not is_authorized(update, context): return stg: Settings = context.application.bot_data["settings"] today = date.today().isoformat() with sqlite3.connect(stg.database_path) as conn: bookings = conn.execute("SELECT COUNT(*) FROM bookings WHERE DATE(created_at) = ?", (today,)).fetchone()[0] paid = conn.execute("SELECT SUM(extracted_amount) FROM payment_slips WHERE status = 'verified' AND DATE(created_at) = ?", (today,)).fetchone()[0] or 0 expense = conn.execute("SELECT SUM(amount) FROM expenses WHERE DATE(created_at) = ?", (today,)).fetchone()[0] or 0 await update.message.reply_text( f"📊 รายงานวันที่ {today}:\n" f"• การจอง: {bookings} ราย\n" f"• ชำระ: {paid:,.2f} บาท\n" f"• ค่าใช้จ่าย: {expense:,.2f} บาท\n" f"• กำไร: {paid - expense:,.2f} บาท" ) async def echo(update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle text messages with AI.""" if not is_authorized(update, context) or not update.message.text: return model = context.application.bot_data.get("gemini_model") if not model: await update.message.reply_text("AI ยังไม่พร้อม") return try: resp = await model.generate_content_async(update.message.text) await update.message.reply_text(resp.text or "ขอโทษครับ ผมตอบไม่ได้") except Exception as e: logger.error(f"Gemini error: {e}") await update.message.reply_text("เกิดข้อผิดพลาดกับ AI") async def web_app_handler(update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle data from Web App.""" if not is_authorized(update, context) or not update.message.web_app_data: return try: payload = json.loads(update.message.web_app_data.data) except json.JSONDecodeError: await update.message.reply_text("ข้อมูลผิดพลาด") return if payload.get("type") == "payment_slip": await process_slip(update, context, payload) elif payload.get("type") == "expense_entry": await process_expense(update, context, payload) async def process_slip(update: Update, context: ContextTypes.DEFAULT_TYPE, payload: dict): """Process payment slip from Web App.""" stg: Settings = context.application.bot_data["settings"] booking_id = payload.get("booking_id") file_b64 = payload.get("file_base64") if not booking_id or not file_b64: await update.message.reply_text("ข้อมูลไม่ครบ") return dest = stg.slip_storage_dir / f"{booking_id}_{int(datetime.utcnow().timestamp())}.jpg" try: await asyncio.to_thread(save_base64_image, file_b64, dest) extraction = await asyncio.to_thread(extract_slip_information, dest, stg) verification = await asyncio.to_thread(verify_booking_amount, stg, booking_id, extraction.amount) except Exception as e: logger.error(f"Slip processing error: {e}") await update.message.reply_text(f"เกิดข้อผิดพลาด: {e}") return # Save to DB with sqlite3.connect(stg.database_path) as conn: conn.execute( "INSERT INTO payment_slips (booking_id, slip_image_path, extracted_amount, status, ocr_text) VALUES (?, ?, ?, ?, ?)", (booking_id, str(dest), float(extraction.amount) if extraction.amount else None, verification["status"], extraction.raw_text) ) if verification["status"] == "verified": await update.message.reply_text(f"✅ ตรวจสอบสลิปสำเร็จ ({booking_id})") else: await update.message.reply_text(f"⚠️ สลิปไม่ตรงกับข้อมูล: {verification['status']}") async def process_expense(update: Update, context: ContextTypes.DEFAULT_TYPE, payload: dict): """Process expense entry from Web App.""" stg: Settings = context.application.bot_data["settings"] try: amount = Decimal(str(payload["amount"])) except: await update.message.reply_text("จำนวนเงินไม่ถูกต้อง") return with sqlite3.connect(stg.database_path) as conn: conn.execute( "INSERT INTO expenses (amount, category, note) VALUES (?, ?, ?)", (float(amount), payload.get("category", "อื่นๆ"), payload.get("note", "")) ) await update.message.reply_text(f"✅ บันทึกค่าใช้จ่าย {amount} บาท") @app.on_event("startup") async def startup(): """Initialize bot on startup.""" global bot_app settings = load_settings() init_db(settings) if settings.tesseract_cmd: pytesseract.pytesseract.tesseract_cmd = settings.tesseract_cmd genai.configure(api_key=settings.gemini_api_key) gemini_model = genai.GenerativeModel(settings.gemini_model) bot_app = Application.builder().token(settings.telegram_bot_token).build() bot_app.bot_data["settings"] = settings bot_app.bot_data["authorized_user_id"] = settings.authorized_user_id bot_app.bot_data["gemini_model"] = gemini_model # Add handlers bot_app.add_handler(CommandHandler("start", start)) bot_app.add_handler(CommandHandler("help", help_command)) bot_app.add_handler(CommandHandler("daily_report", daily_report)) bot_app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, echo)) bot_app.add_handler(MessageHandler(filters.StatusUpdate.WEB_APP_DATA, web_app_handler)) await bot_app.initialize() await bot_app.start() asyncio.create_task(bot_app.updater.start_polling()) logger.info("✅ Bot started in polling mode") @app.get("/health") async def health(): return {"status": "ok"} if __name__ == "__main__": port = int(os.getenv("PORT", "8080")) uvicorn.run(app, host="0.0.0.0", port=port)