"""Telegram bot — runs as a daemon thread sharing the same ledger instance.""" import os import asyncio import logging import threading from telegram import Update from telegram.ext import ( ApplicationBuilder, CommandHandler, MessageHandler, ContextTypes, filters, ) from ledger import Ledger from agent import batch_response, execute logger = logging.getLogger(__name__) MAX_HISTORY = 20 # messages (10 turns) kept per user # ── handlers ────────────────────────────────────────────────────────────────── async def on_start(update: Update, context: ContextTypes.DEFAULT_TYPE): await update.message.reply_text( "👋 *Finance Manager*\n\n" "Just tell me about your expenses naturally:\n" "• _Spent $12 on lunch_\n" "• _Paid $1200 rent yesterday_\n" "• _Undo_ — removes the last entry\n\n" "Commands:\n" "/summary — spending by category\n" "/clear — reset conversation history", parse_mode="Markdown", ) async def on_summary(update: Update, context: ContextTypes.DEFAULT_TYPE): ledger: Ledger = context.bot_data["ledger"] by_cat = ledger.by_category() total = ledger.total() if not by_cat: await update.message.reply_text("No entries yet. Start logging expenses!") return lines = [f"💰 *Total: ${total:.2f}*\n"] lines += [ f"• {cat}: ${amt:.2f}" for cat, amt in sorted(by_cat.items(), key=lambda x: -x[1]) ] await update.message.reply_text("\n".join(lines), parse_mode="Markdown") async def on_clear(update: Update, context: ContextTypes.DEFAULT_TYPE): context.user_data["history"] = [] await update.message.reply_text("Conversation history cleared.") async def on_message(update: Update, context: ContextTypes.DEFAULT_TYPE): ledger: Ledger = context.bot_data["ledger"] token = os.getenv("HF_TOKEN", "") if not token: await update.message.reply_text("HF_TOKEN is not configured.") return history: list[dict] = context.user_data.get("history", []) text = update.message.text reply, action = await asyncio.to_thread(batch_response, text, history, ledger, token) if action: confirmation = execute(action, ledger, text) if confirmation: reply += f"\n\n{confirmation}" # Persist last N messages for context context.user_data["history"] = ( history + [ {"role": "user", "content": text}, {"role": "assistant", "content": reply}, ] )[-MAX_HISTORY:] await update.message.reply_text(reply, parse_mode="Markdown") # ── entry point ─────────────────────────────────────────────────────────────── def start(ledger: Ledger): """Start the Telegram bot in a daemon thread. No-op if token not set.""" bot_token = os.getenv("TELEGRAM_BOT_TOKEN") if not bot_token: logger.info("TELEGRAM_BOT_TOKEN not set — Telegram bot disabled.") return async def _run(): app = ApplicationBuilder().token(bot_token).build() app.bot_data["ledger"] = ledger app.add_handler(CommandHandler("start", on_start)) app.add_handler(CommandHandler("summary", on_summary)) app.add_handler(CommandHandler("clear", on_clear)) app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, on_message)) logger.info("Telegram bot polling started.") async with app: await app.start() await app.updater.start_polling() await asyncio.Event().wait() # run until process exits threading.Thread(target=lambda: asyncio.run(_run()), daemon=True).start()