Spaces:
Runtime error
Runtime error
"""Telegram bot — runs as a daemon thread sharing the same ledger instance."""
| import os | |
| import asyncio | |
| import logging | |
| import threading | |
| from telegram import Update | |
| from telegram.ext import ( | |
| ApplicationBuilder, CommandHandler, MessageHandler, | |
| ContextTypes, filters, | |
| ) | |
| from ledger import Ledger | |
| from agent import batch_response, execute | |
logger = logging.getLogger(__name__)

# Upper bound on the chat messages kept per user as conversation context.
# Each turn stores two messages (user + assistant), so 20 messages = 10 turns.
MAX_HISTORY = 20
# ── handlers ──────────────────────────────────────────────────────────────────
async def on_start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /start: reply with a short usage guide.

    The guide shows example expense phrasings and lists the /summary and
    /clear commands. It is sent with Telegram's "Markdown" parse mode.
    """
    # NOTE(review): the non-ASCII glyphs below look like mis-decoded UTF-8
    # (probably an emoji, bullet points and dashes) — confirm against the
    # original file before changing them; they are kept verbatim here.
    welcome_text = (
        "π *Finance Manager*\n\n"
        "Just tell me about your expenses naturally:\n"
        "β’ _Spent $12 on lunch_\n"
        "β’ _Paid $1200 rent yesterday_\n"
        "β’ _Undo_ β removes the last entry\n\n"
        "Commands:\n"
        "/summary β spending by category\n"
        "/clear β reset conversation history"
    )
    incoming = update.message
    await incoming.reply_text(welcome_text, parse_mode="Markdown")
async def on_summary(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /summary: reply with the total and a per-category breakdown.

    Reads the shared ledger from ``context.bot_data["ledger"]``. When the
    ledger reports no categories, a short hint is sent instead. Otherwise
    the reply lists the overall total followed by one line per category,
    ordered from the largest amount to the smallest.
    """
    # Function-scope import: telegram is already a dependency of this module,
    # and this keeps the handler self-contained.
    from telegram.helpers import escape_markdown

    ledger: Ledger = context.bot_data["ledger"]
    by_cat = ledger.by_category()
    if not by_cat:
        await update.message.reply_text("No entries yet. Start logging expenses!")
        return

    total = ledger.total()
    ranked = sorted(by_cat.items(), key=lambda item: -item[1])

    # NOTE(review): the non-ASCII glyphs in these strings look like
    # mis-decoded UTF-8 (an emoji and a bullet) — confirm against the original
    # file; they are kept verbatim here.
    lines = [f"π° *Total: ${total:.2f}*\n"]
    # Category names are data, not markup: escape Markdown control characters
    # so a name such as "eating_out" cannot make Telegram reject the message
    # with an entity-parsing error.
    lines += [
        f"β’ {escape_markdown(str(cat))}: ${amt:.2f}"
        for cat, amt in ranked
    ]
    await update.message.reply_text("\n".join(lines), parse_mode="Markdown")
async def on_clear(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /clear: reset the caller's stored conversation history.

    The ``"history"`` entry in ``context.user_data`` is replaced with a
    fresh empty list before the confirmation message is sent.
    """
    user_state = context.user_data
    user_state["history"] = []

    confirmation = "Conversation history cleared."
    await update.message.reply_text(confirmation)
async def on_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle a free-text message: ask the agent, apply its action, reply.

    Steps:
      1. Read the shared ledger from ``context.bot_data["ledger"]`` and the
         model token from the ``HF_TOKEN`` environment variable; if the token
         is missing, tell the user and stop.
      2. Call ``batch_response`` in a worker thread (it is a synchronous call)
         with the message text, this user's stored history, the ledger and
         the token. It returns ``(reply, action)``.
      3. If an action was returned, run ``execute`` and append any
         confirmation it returns to the reply.
      4. Store the new user/assistant pair in ``context.user_data["history"]``,
         keeping only the last ``MAX_HISTORY`` messages.
      5. Send the reply as Markdown, falling back to plain text if Telegram
         rejects the markup.
    """
    # Function-scope import: telegram is already a dependency of this module.
    from telegram.error import BadRequest

    message = update.message
    # A text filter can also match edited messages and channel posts, for
    # which ``update.message`` is None. Ignore those instead of crashing on
    # attribute access (and avoid re-processing an edited expense).
    if message is None or message.text is None:
        return

    ledger: Ledger = context.bot_data["ledger"]
    token = os.getenv("HF_TOKEN", "")
    if not token:
        await message.reply_text("HF_TOKEN is not configured.")
        return

    history: list[dict] = context.user_data.get("history", [])
    text = message.text

    # batch_response is blocking; run it off the event loop so other updates
    # keep being served while it works.
    reply, action = await asyncio.to_thread(batch_response, text, history, ledger, token)

    if action:
        confirmation = execute(action, ledger, text)
        if confirmation:
            reply += f"\n\n{confirmation}"

    # Persist last N messages for context
    context.user_data["history"] = (
        history + [
            {"role": "user", "content": text},
            {"role": "assistant", "content": reply},
        ]
    )[-MAX_HISTORY:]

    try:
        await message.reply_text(reply, parse_mode="Markdown")
    except BadRequest:
        # The reply text is generated, so it may contain unbalanced Markdown
        # that Telegram refuses to parse. Deliver it unformatted rather than
        # leaving the user without an answer.
        logger.warning(
            "Telegram rejected the Markdown reply; resending as plain text.",
            exc_info=True,
        )
        await message.reply_text(reply)
# ── entry point ───────────────────────────────────────────────────────────────
def start(ledger: Ledger) -> None:
    """Start the Telegram bot in a daemon thread. No-op if token not set.

    The bot token is read from the ``TELEGRAM_BOT_TOKEN`` environment
    variable. The given ledger is stored in the application's ``bot_data``
    under ``"ledger"`` so every handler works on the same instance.

    Args:
        ledger: Ledger instance shared with the handlers.
    """
    bot_token = os.getenv("TELEGRAM_BOT_TOKEN")
    if not bot_token:
        # NOTE(review): the non-ASCII glyph in this message looks like a
        # mis-decoded dash — confirm against the original file; kept verbatim.
        logger.info("TELEGRAM_BOT_TOKEN not set β Telegram bot disabled.")
        return

    async def _run() -> None:
        """Build the application, register handlers and poll indefinitely."""
        app = ApplicationBuilder().token(bot_token).build()
        app.bot_data["ledger"] = ledger

        app.add_handler(CommandHandler("start", on_start))
        app.add_handler(CommandHandler("summary", on_summary))
        app.add_handler(CommandHandler("clear", on_clear))
        app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, on_message))

        async with app:
            await app.start()
            await app.updater.start_polling()
            # Log only once polling is actually running, so the message is
            # not emitted when initialisation or startup fails.
            logger.info("Telegram bot polling started.")
            await asyncio.Event().wait()  # run until process exits

    def _thread_main() -> None:
        """Run the bot's event loop and log any failure that ends it."""
        try:
            asyncio.run(_run())
        except Exception:
            # Without this, a startup or polling failure would end the
            # background thread without reaching the module logger.
            logger.exception("Telegram bot thread stopped unexpectedly.")

    threading.Thread(target=_thread_main, name="telegram-bot", daemon=True).start()