Spaces:
No application file
No application file
| from __future__ import annotations | |
| import asyncio | |
| import json | |
| import logging | |
| import os | |
| import sqlite3 | |
| from dataclasses import dataclass | |
| from datetime import date, datetime | |
| from decimal import Decimal | |
| from pathlib import Path | |
| from typing import Optional | |
| import google.generativeai as genai | |
| import pytesseract | |
| import uvicorn | |
| from fastapi import FastAPI | |
| from telegram import KeyboardButton, ReplyKeyboardMarkup, Update, WebAppInfo | |
| from telegram.ext import ( | |
| Application, | |
| CommandHandler, | |
| ContextTypes, | |
| MessageHandler, | |
| filters, | |
| ) | |
| from .utils import ( | |
| Settings, | |
| SlipExtractionResult, | |
| extract_slip_information, | |
| verify_booking_amount, | |
| save_base64_image, | |
| chunk_for_telegram, | |
| ) | |
# Configure the root logger once at import time; the format carries the
# timestamp, logger name and level for every record.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

# ASGI application object handed to uvicorn in the __main__ guard.
app = FastAPI()

# Telegram Application instance; None until startup() has built it.
bot_app: Optional[Application] = None
def load_settings() -> Settings:
    """Build the application ``Settings`` from environment variables.

    ``TELEGRAM_BOT_TOKEN``, ``TELEGRAM_USER_ID`` and ``GEMINI_API_KEY`` are
    mandatory. ``GEMINI_MODEL``, ``WEBAPP_BASE_URL``, ``PORT``,
    ``OCR_LANGUAGE`` and ``TESSERACT_CMD`` are optional overrides; every
    other field is a fixed value.

    Raises:
        RuntimeError: If a mandatory variable is unset or empty.
        ValueError: If ``TELEGRAM_USER_ID`` or ``PORT`` is not an integer.
    """

    def _required(key: str) -> str:
        # An empty string counts as missing, same as an unset variable.
        value = os.getenv(key)
        if value:
            return value
        raise RuntimeError(f"Missing env var: {key}")

    bot_token = _required("TELEGRAM_BOT_TOKEN")
    user_id = int(_required("TELEGRAM_USER_ID"))
    api_key = _required("GEMINI_API_KEY")
    model_name = os.getenv("GEMINI_MODEL", "gemini-pro")
    base_url = os.getenv("WEBAPP_BASE_URL", "http://localhost:8080/index.html")
    port = int(os.getenv("PORT", "8080"))
    ocr_lang = os.getenv("OCR_LANGUAGE", "eng+tha")
    tesseract = os.getenv("TESSERACT_CMD")

    return Settings(
        telegram_bot_token=bot_token,
        authorized_user_id=user_id,
        gemini_api_key=api_key,
        gemini_model=model_name,
        pms_blueprint_path=Path("app/resources/pms_blueprint.txt"),
        database_path=Path("./data/hotel_os_bot.db"),
        slip_storage_dir=Path("./data/slips"),
        webapp_base_url=base_url,
        webapp_static_dir=Path("./webapp"),
        webapp_host="0.0.0.0",
        webapp_port=port,
        ocr_language=ocr_lang,
        tesseract_cmd=tesseract,
        amount_tolerance=Decimal("1.00"),
    )
def init_db(settings: Settings) -> None:
    """Create the data directories and the SQLite schema if they are missing.

    Creates the parent directory of ``settings.database_path`` and
    ``settings.slip_storage_dir``, then ensures the ``bookings``,
    ``payment_slips`` and ``expenses`` tables exist. Safe to call repeatedly
    because every statement uses ``IF NOT EXISTS``.

    Args:
        settings: Provides ``database_path`` and ``slip_storage_dir``.
    """
    settings.database_path.parent.mkdir(parents=True, exist_ok=True)
    settings.slip_storage_dir.mkdir(parents=True, exist_ok=True)

    # sqlite3's connection context manager only commits or rolls back; it
    # does not close the handle, so close it explicitly.
    conn = sqlite3.connect(settings.database_path)
    try:
        with conn:
            conn.executescript(
                """
                CREATE TABLE IF NOT EXISTS bookings (
                    id TEXT PRIMARY KEY,
                    guest_name TEXT,
                    total_due REAL,
                    currency TEXT,
                    status TEXT,
                    created_at TEXT DEFAULT CURRENT_TIMESTAMP
                );
                CREATE TABLE IF NOT EXISTS payment_slips (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    booking_id TEXT,
                    reference_code TEXT,
                    slip_image_path TEXT,
                    extracted_amount REAL,
                    extracted_date TEXT,
                    extracted_time TEXT,
                    status TEXT,
                    ocr_text TEXT,
                    created_at TEXT DEFAULT CURRENT_TIMESTAMP
                );
                CREATE TABLE IF NOT EXISTS expenses (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    amount REAL NOT NULL,
                    category TEXT NOT NULL,
                    note TEXT,
                    created_by TEXT,
                    created_at TEXT DEFAULT CURRENT_TIMESTAMP
                );
                """
            )
    finally:
        conn.close()
def is_authorized(update: Update, context: ContextTypes.DEFAULT_TYPE) -> bool:
    """Tell whether the update was sent by the configured authorized user.

    Returns False when the update has no effective user or when
    ``bot_data`` holds no ``"authorized_user_id"`` entry; otherwise compares
    the sender's id with that entry.
    """
    sender = update.effective_user
    if not sender:
        return False

    allowed_id = context.application.bot_data.get("authorized_user_id")
    if allowed_id is None:
        return False
    return sender.id == allowed_id
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle the /start command.

    Unauthorized senders receive a refusal message. The authorized user
    receives a reply keyboard whose single button opens the web app at
    ``settings.webapp_base_url``.
    """
    # CommandHandler also dispatches edited messages, for which
    # ``update.message`` is None; ``effective_message`` covers both cases.
    message = update.effective_message
    if message is None:
        return

    if not is_authorized(update, context):
        await message.reply_text("🔐 คุณไม่มีสิทธิ์ใช้งานบอทนี้")
        return

    stg: Settings = context.application.bot_data["settings"]
    open_button = KeyboardButton(
        "🛎️ เปิดระบบบริหารโรงแรม",
        web_app=WebAppInfo(url=stg.webapp_base_url),
    )
    keyboard = ReplyKeyboardMarkup([[open_button]], resize_keyboard=True)
    await message.reply_text("ยินดีต้อนรับสู่ Hotel OS Bot!", reply_markup=keyboard)
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle the /help command by listing the available commands.

    Unauthorized senders are ignored silently.
    """
    # CommandHandler also dispatches edited messages, for which
    # ``update.message`` is None; ``effective_message`` covers both cases.
    message = update.effective_message
    if message is None or not is_authorized(update, context):
        return

    # NOTE(review): /receipt is advertised below, but no handler for it is
    # registered in the visible startup() — confirm it exists elsewhere.
    await message.reply_text(
        "/start – เปิดระบบ\n"
        "/daily_report – รายงานประจำวัน\n"
        "/receipt <ID> – ใบเสร็จ\n"
        "พิมพ์ข้อความ → AI ตอบ"
    )
async def daily_report(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /daily_report: summarise today's bookings, payments and expenses.

    Counts bookings created today, sums the ``extracted_amount`` of slips
    with status ``'verified'`` created today, sums today's expenses, and
    replies with those figures plus their difference. Unauthorized senders
    are ignored silently.
    """
    # CommandHandler also dispatches edited messages, for which
    # ``update.message`` is None; ``effective_message`` covers both cases.
    message = update.effective_message
    if message is None or not is_authorized(update, context):
        return

    stg: Settings = context.application.bot_data["settings"]
    today = date.today().isoformat()

    # ``created_at`` defaults to CURRENT_TIMESTAMP, which SQLite stores in
    # UTC, while ``date.today()`` is the local date. The 'localtime' modifier
    # converts the stored value so both sides use the same calendar day.
    conn = sqlite3.connect(stg.database_path)
    try:
        bookings = conn.execute(
            "SELECT COUNT(*) FROM bookings WHERE DATE(created_at, 'localtime') = ?",
            (today,),
        ).fetchone()[0]
        # SUM() yields NULL when no rows match, hence the ``or 0`` fallbacks.
        paid = conn.execute(
            "SELECT SUM(extracted_amount) FROM payment_slips "
            "WHERE status = 'verified' AND DATE(created_at, 'localtime') = ?",
            (today,),
        ).fetchone()[0] or 0
        expense = conn.execute(
            "SELECT SUM(amount) FROM expenses WHERE DATE(created_at, 'localtime') = ?",
            (today,),
        ).fetchone()[0] or 0
    finally:
        # The sqlite3 context manager would not close the handle for us.
        conn.close()

    await message.reply_text(
        f"📊 รายงานวันที่ {today}:\n"
        f"• การจอง: {bookings} ราย\n"
        f"• ชำระ: {paid:,.2f} บาท\n"
        f"• ค่าใช้จ่าย: {expense:,.2f} บาท\n"
        f"• กำไร: {paid - expense:,.2f} บาท"
    )
async def echo(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Answer a plain text message with the Gemini model stored in ``bot_data``.

    Unauthorized senders and messages without text are ignored. If no model
    is configured, or the model call fails, a short notice is sent instead.
    """
    # The TEXT filter also matches edited messages, for which
    # ``update.message`` is None; ``effective_message`` covers both cases.
    message = update.effective_message
    if message is None or not is_authorized(update, context) or not message.text:
        return

    model = context.application.bot_data.get("gemini_model")
    if not model:
        await message.reply_text("AI ยังไม่พร้อม")
        return

    try:
        resp = await model.generate_content_async(message.text)
        answer = resp.text or "ขอโทษครับ ผมตอบไม่ได้"
    except Exception:
        # Boundary handler: keep the traceback in the log, tell the user briefly.
        logger.exception("Gemini error")
        await message.reply_text("เกิดข้อผิดพลาดกับ AI")
        return

    # Telegram caps a text message at 4096 characters, so longer answers are
    # sent as consecutive messages instead of failing as one.
    # NOTE(review): chunk_for_telegram is imported by this module but unused;
    # presumably it was meant for this — its signature is not visible here.
    max_len = 4096
    for offset in range(0, len(answer), max_len):
        await message.reply_text(answer[offset:offset + max_len])
async def web_app_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Dispatch JSON data sent by the web app to the matching processor.

    The payload must be a JSON object. Its ``"type"`` selects
    ``process_slip`` (``"payment_slip"``) or ``process_expense``
    (``"expense_entry"``). Invalid JSON or a non-object payload gets an
    error reply; an unknown type is logged and otherwise ignored.
    Unauthorized senders are ignored silently.
    """
    message = update.effective_message
    if message is None or not is_authorized(update, context) or not message.web_app_data:
        return

    try:
        payload = json.loads(message.web_app_data.data)
    except json.JSONDecodeError:
        payload = None

    # Valid JSON can still be a list, string, number or null; ``.get`` below
    # needs a dict, so treat anything else like malformed input.
    if not isinstance(payload, dict):
        await message.reply_text("ข้อมูลผิดพลาด")
        return

    kind = payload.get("type")
    if kind == "payment_slip":
        await process_slip(update, context, payload)
    elif kind == "expense_entry":
        await process_expense(update, context, payload)
    else:
        logger.warning("Ignoring web app payload with unknown type: %r", kind)
async def process_slip(update: Update, context: ContextTypes.DEFAULT_TYPE, payload: dict) -> None:
    """Store, OCR and verify a payment slip submitted through the web app.

    Reads ``booking_id`` and ``file_base64`` from ``payload``, writes the
    image under ``settings.slip_storage_dir``, runs
    ``extract_slip_information`` and ``verify_booking_amount`` in worker
    threads, records the result in ``payment_slips`` and replies with the
    outcome.

    Args:
        update: Incoming update; replies go to ``update.message``.
        context: Handler context; ``bot_data["settings"]`` supplies paths.
        payload: Decoded web app JSON object.
    """
    stg: Settings = context.application.bot_data["settings"]
    booking_id = payload.get("booking_id")
    file_b64 = payload.get("file_base64")
    if not booking_id or not file_b64:
        await update.message.reply_text("ข้อมูลไม่ครบ")
        return

    # NOTE(review): Telegram's WebApp.sendData payload is capped at 4096
    # bytes, so ``file_base64`` can only carry a very small image — confirm
    # this transport is viable for real slips.

    # ``booking_id`` comes from client-supplied JSON; strip path separators
    # and dots so it cannot steer the file outside the slip directory.
    safe_id = "".join(
        ch if ch.isalnum() or ch in "-_" else "_" for ch in str(booking_id)
    )
    # ``datetime.now().timestamp()`` is the true epoch; calling
    # ``.timestamp()`` on a naive ``utcnow()`` would be read as local time.
    stamp = int(datetime.now().timestamp())
    dest = stg.slip_storage_dir / f"{safe_id}_{stamp}.jpg"

    try:
        await asyncio.to_thread(save_base64_image, file_b64, dest)
        extraction = await asyncio.to_thread(extract_slip_information, dest, stg)
        verification = await asyncio.to_thread(
            verify_booking_amount, stg, booking_id, extraction.amount
        )
    except Exception as e:
        logger.exception("Slip processing error")
        await update.message.reply_text(f"เกิดข้อผิดพลาด: {e}")
        return

    # ``is not None`` keeps a legitimately extracted amount of zero instead
    # of storing it as NULL.
    amount = float(extraction.amount) if extraction.amount is not None else None
    status = verification["status"]

    # The sqlite3 context manager commits but does not close the handle.
    conn = sqlite3.connect(stg.database_path)
    try:
        with conn:
            conn.execute(
                "INSERT INTO payment_slips "
                "(booking_id, slip_image_path, extracted_amount, status, ocr_text) "
                "VALUES (?, ?, ?, ?, ?)",
                (booking_id, str(dest), amount, status, extraction.raw_text),
            )
    finally:
        conn.close()

    if status == "verified":
        await update.message.reply_text(f"✅ ตรวจสอบสลิปสำเร็จ ({booking_id})")
    else:
        await update.message.reply_text(f"⚠️ สลิปไม่ตรงกับข้อมูล: {status}")
async def process_expense(update: Update, context: ContextTypes.DEFAULT_TYPE, payload: dict) -> None:
    """Record an expense entry submitted through the web app.

    ``payload["amount"]`` must parse as a finite decimal; otherwise an error
    reply is sent and nothing is stored. ``category`` falls back to "อื่นๆ"
    and ``note`` to an empty string when missing or null. The sender's
    Telegram id is stored in ``created_by``.

    Args:
        update: Incoming update; replies go to ``update.message``.
        context: Handler context; ``bot_data["settings"]`` supplies the
            database path.
        payload: Decoded web app JSON object.
    """
    # Only ``Decimal`` is imported at module level.
    from decimal import InvalidOperation

    stg: Settings = context.application.bot_data["settings"]

    try:
        amount = Decimal(str(payload["amount"]))
    except (KeyError, TypeError, InvalidOperation):
        amount = None
    # Decimal happily parses "NaN" and "Infinity"; neither is a usable amount.
    if amount is None or not amount.is_finite():
        await update.message.reply_text("จำนวนเงินไม่ถูกต้อง")
        return

    # ``or`` also covers an explicit JSON null, which would otherwise violate
    # the NOT NULL constraint on ``category``.
    category = str(payload.get("category") or "อื่นๆ")
    raw_note = payload.get("note")
    note = "" if raw_note is None else str(raw_note)
    sender = update.effective_user
    created_by = str(sender.id) if sender else None

    # The sqlite3 context manager commits but does not close the handle.
    conn = sqlite3.connect(stg.database_path)
    try:
        with conn:
            conn.execute(
                "INSERT INTO expenses (amount, category, note, created_by) VALUES (?, ?, ?, ?)",
                (float(amount), category, note, created_by),
            )
    finally:
        conn.close()

    await update.message.reply_text(f"✅ บันทึกค่าใช้จ่าย {amount} บาท")
@app.on_event("startup")
async def startup() -> None:
    """Build and start the Telegram bot when the FastAPI app starts.

    Loads settings, initialises the database, configures pytesseract and
    Gemini, builds the Telegram ``Application``, registers the handlers,
    starts long polling and publishes the instance as the module-level
    ``bot_app``.

    Registered as a FastAPI startup hook; without the registration nothing
    in this module would ever call it.
    """
    global bot_app

    settings = load_settings()
    init_db(settings)

    if settings.tesseract_cmd:
        pytesseract.pytesseract.tesseract_cmd = settings.tesseract_cmd

    genai.configure(api_key=settings.gemini_api_key)
    gemini_model = genai.GenerativeModel(settings.gemini_model)

    application = Application.builder().token(settings.telegram_bot_token).build()
    application.bot_data["settings"] = settings
    application.bot_data["authorized_user_id"] = settings.authorized_user_id
    application.bot_data["gemini_model"] = gemini_model

    # Add handlers
    application.add_handler(CommandHandler("start", start))
    application.add_handler(CommandHandler("help", help_command))
    application.add_handler(CommandHandler("daily_report", daily_report))
    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, echo))
    application.add_handler(
        MessageHandler(filters.StatusUpdate.WEB_APP_DATA, web_app_handler)
    )

    await application.initialize()
    await application.start()
    # start_polling() returns once polling has begun, so it is awaited
    # directly. Wrapping it in an unreferenced create_task() would hide
    # start-up errors and leave a task that can be garbage-collected.
    await application.updater.start_polling()

    # Publish only a fully started application.
    bot_app = application
    logger.info("✅ Bot started in polling mode")


@app.on_event("shutdown")
async def _shutdown_bot() -> None:
    """Stop polling and shut the Telegram application down with the server."""
    global bot_app

    if bot_app is None:
        return
    application, bot_app = bot_app, None

    if application.updater is not None and application.updater.running:
        await application.updater.stop()
    await application.stop()
    await application.shutdown()
# NOTE(review): the original function was not attached to any route, so the
# app served nothing. "/health" is an assumed path — confirm what the
# deployment's health probe actually requests.
@app.get("/health")
async def health() -> dict[str, str]:
    """Liveness probe: always returns ``{"status": "ok"}``."""
    return {"status": "ok"}
if __name__ == "__main__":
    # Serve the FastAPI app on all interfaces; PORT overrides the default 8080.
    uvicorn.run(app, host="0.0.0.0", port=int(os.getenv("PORT", "8080")))