Bot_telegram / app /main.py
nssuwan186's picture
Chore: Remove diagnostic code
dcf6765 verified
from __future__ import annotations
import asyncio
import json
import logging
import os
import sqlite3
from dataclasses import dataclass
from datetime import date, datetime
from decimal import Decimal
from pathlib import Path
from typing import Optional
import google.generativeai as genai
import pytesseract
import uvicorn
from fastapi import FastAPI
from telegram import KeyboardButton, ReplyKeyboardMarkup, Update, WebAppInfo
from telegram.ext import (
Application,
CommandHandler,
ContextTypes,
MessageHandler,
filters,
)
from .utils import (
Settings,
SlipExtractionResult,
extract_slip_information,
verify_booking_amount,
save_base64_image,
chunk_for_telegram,
)
# FastAPI app
app = FastAPI()
bot_app: Optional[Application] = None
# Logging
logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
level=logging.INFO,
)
logger = logging.getLogger(__name__)
def load_settings() -> Settings:
"""Load settings from environment variables."""
def req_env(name: str) -> str:
val = os.getenv(name)
if not val:
raise RuntimeError(f"Missing env var: {name}")
return val
return Settings(
telegram_bot_token=req_env("TELEGRAM_BOT_TOKEN"),
authorized_user_id=int(req_env("TELEGRAM_USER_ID")),
gemini_api_key=req_env("GEMINI_API_KEY"),
gemini_model=os.getenv("GEMINI_MODEL", "gemini-pro"),
pms_blueprint_path=Path("app/resources/pms_blueprint.txt"),
database_path=Path("./data/hotel_os_bot.db"),
slip_storage_dir=Path("./data/slips"),
webapp_base_url=os.getenv("WEBAPP_BASE_URL", "http://localhost:8080/index.html"),
webapp_static_dir=Path("./webapp"),
webapp_host="0.0.0.0",
webapp_port=int(os.getenv("PORT", "8080")),
ocr_language=os.getenv("OCR_LANGUAGE", "eng+tha"),
tesseract_cmd=os.getenv("TESSERACT_CMD"),
amount_tolerance=Decimal("1.00"),
)
def init_db(settings: Settings):
"""Initialize SQLite database."""
settings.database_path.parent.mkdir(parents=True, exist_ok=True)
settings.slip_storage_dir.mkdir(parents=True, exist_ok=True)
with sqlite3.connect(settings.database_path) as conn:
conn.executescript("""
CREATE TABLE IF NOT EXISTS bookings (
id TEXT PRIMARY KEY,
guest_name TEXT,
total_due REAL,
currency TEXT,
status TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS payment_slips (
id INTEGER PRIMARY KEY AUTOINCREMENT,
booking_id TEXT,
reference_code TEXT,
slip_image_path TEXT,
extracted_amount REAL,
extracted_date TEXT,
extracted_time TEXT,
status TEXT,
ocr_text TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS expenses (
id INTEGER PRIMARY KEY AUTOINCREMENT,
amount REAL NOT NULL,
category TEXT NOT NULL,
note TEXT,
created_by TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
);
""")
def is_authorized(update: Update, context: ContextTypes.DEFAULT_TYPE) -> bool:
"""Check if user is authorized."""
user = update.effective_user
if not user:
return False
auth_id = context.application.bot_data.get("authorized_user_id")
return auth_id is not None and user.id == auth_id
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle /start command."""
if not is_authorized(update, context):
await update.message.reply_text("🔐 คุณไม่มีสิทธิ์ใช้งานบอทนี้")
return
stg: Settings = context.application.bot_data["settings"]
keyboard = ReplyKeyboardMarkup(
[[KeyboardButton("🛎️ เปิดระบบบริหารโรงแรม", web_app=WebAppInfo(url=stg.webapp_base_url))]],
resize_keyboard=True
)
await update.message.reply_text("ยินดีต้อนรับสู่ Hotel OS Bot!", reply_markup=keyboard)
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle /help command."""
if not is_authorized(update, context):
return
await update.message.reply_text(
"/start – เปิดระบบ\n"
"/daily_report – รายงานประจำวัน\n"
"/receipt <ID> – ใบเสร็จ\n"
"พิมพ์ข้อความ → AI ตอบ"
)
async def daily_report(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle /daily_report command."""
if not is_authorized(update, context):
return
stg: Settings = context.application.bot_data["settings"]
today = date.today().isoformat()
with sqlite3.connect(stg.database_path) as conn:
bookings = conn.execute("SELECT COUNT(*) FROM bookings WHERE DATE(created_at) = ?", (today,)).fetchone()[0]
paid = conn.execute("SELECT SUM(extracted_amount) FROM payment_slips WHERE status = 'verified' AND DATE(created_at) = ?", (today,)).fetchone()[0] or 0
expense = conn.execute("SELECT SUM(amount) FROM expenses WHERE DATE(created_at) = ?", (today,)).fetchone()[0] or 0
await update.message.reply_text(
f"📊 รายงานวันที่ {today}:\n"
f"• การจอง: {bookings} ราย\n"
f"• ชำระ: {paid:,.2f} บาท\n"
f"• ค่าใช้จ่าย: {expense:,.2f} บาท\n"
f"• กำไร: {paid - expense:,.2f} บาท"
)
async def echo(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle text messages with AI."""
if not is_authorized(update, context) or not update.message.text:
return
model = context.application.bot_data.get("gemini_model")
if not model:
await update.message.reply_text("AI ยังไม่พร้อม")
return
try:
resp = await model.generate_content_async(update.message.text)
await update.message.reply_text(resp.text or "ขอโทษครับ ผมตอบไม่ได้")
except Exception as e:
logger.error(f"Gemini error: {e}")
await update.message.reply_text("เกิดข้อผิดพลาดกับ AI")
async def web_app_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle data from Web App."""
if not is_authorized(update, context) or not update.message.web_app_data:
return
try:
payload = json.loads(update.message.web_app_data.data)
except json.JSONDecodeError:
await update.message.reply_text("ข้อมูลผิดพลาด")
return
if payload.get("type") == "payment_slip":
await process_slip(update, context, payload)
elif payload.get("type") == "expense_entry":
await process_expense(update, context, payload)
async def process_slip(update: Update, context: ContextTypes.DEFAULT_TYPE, payload: dict):
"""Process payment slip from Web App."""
stg: Settings = context.application.bot_data["settings"]
booking_id = payload.get("booking_id")
file_b64 = payload.get("file_base64")
if not booking_id or not file_b64:
await update.message.reply_text("ข้อมูลไม่ครบ")
return
dest = stg.slip_storage_dir / f"{booking_id}_{int(datetime.utcnow().timestamp())}.jpg"
try:
await asyncio.to_thread(save_base64_image, file_b64, dest)
extraction = await asyncio.to_thread(extract_slip_information, dest, stg)
verification = await asyncio.to_thread(verify_booking_amount, stg, booking_id, extraction.amount)
except Exception as e:
logger.error(f"Slip processing error: {e}")
await update.message.reply_text(f"เกิดข้อผิดพลาด: {e}")
return
# Save to DB
with sqlite3.connect(stg.database_path) as conn:
conn.execute(
"INSERT INTO payment_slips (booking_id, slip_image_path, extracted_amount, status, ocr_text) VALUES (?, ?, ?, ?, ?)",
(booking_id, str(dest), float(extraction.amount) if extraction.amount else None, verification["status"], extraction.raw_text)
)
if verification["status"] == "verified":
await update.message.reply_text(f"✅ ตรวจสอบสลิปสำเร็จ ({booking_id})")
else:
await update.message.reply_text(f"⚠️ สลิปไม่ตรงกับข้อมูล: {verification['status']}")
async def process_expense(update: Update, context: ContextTypes.DEFAULT_TYPE, payload: dict):
"""Process expense entry from Web App."""
stg: Settings = context.application.bot_data["settings"]
try:
amount = Decimal(str(payload["amount"]))
except:
await update.message.reply_text("จำนวนเงินไม่ถูกต้อง")
return
with sqlite3.connect(stg.database_path) as conn:
conn.execute(
"INSERT INTO expenses (amount, category, note) VALUES (?, ?, ?)",
(float(amount), payload.get("category", "อื่นๆ"), payload.get("note", ""))
)
await update.message.reply_text(f"✅ บันทึกค่าใช้จ่าย {amount} บาท")
@app.on_event("startup")
async def startup():
"""Initialize bot on startup."""
global bot_app
settings = load_settings()
init_db(settings)
if settings.tesseract_cmd:
pytesseract.pytesseract.tesseract_cmd = settings.tesseract_cmd
genai.configure(api_key=settings.gemini_api_key)
gemini_model = genai.GenerativeModel(settings.gemini_model)
bot_app = Application.builder().token(settings.telegram_bot_token).build()
bot_app.bot_data["settings"] = settings
bot_app.bot_data["authorized_user_id"] = settings.authorized_user_id
bot_app.bot_data["gemini_model"] = gemini_model
# Add handlers
bot_app.add_handler(CommandHandler("start", start))
bot_app.add_handler(CommandHandler("help", help_command))
bot_app.add_handler(CommandHandler("daily_report", daily_report))
bot_app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, echo))
bot_app.add_handler(MessageHandler(filters.StatusUpdate.WEB_APP_DATA, web_app_handler))
await bot_app.initialize()
await bot_app.start()
asyncio.create_task(bot_app.updater.start_polling())
logger.info("✅ Bot started in polling mode")
@app.get("/health")
async def health():
return {"status": "ok"}
if __name__ == "__main__":
port = int(os.getenv("PORT", "8080"))
uvicorn.run(app, host="0.0.0.0", port=port)