| import logging |
| import threading |
| from typing import Callable, Optional |
|
|
| import telebot |
|
|
|
|
class TelegramChatService:
    """Run a Telegram bot on a background thread and answer chat messages.

    Every non-empty text message is first offered to the optional Jira
    connector via ``maybe_handle``. When there is no connector, or it
    returns ``None``, the text is passed to ``invoke`` on the object
    produced by ``llm_factory`` (created lazily on first use and reused).
    """

    # Telegram's Bot API rejects sendMessage texts longer than this.
    _MAX_MESSAGE_LENGTH = 4096
    # Sent when the backend yields only whitespace; Telegram rejects empty text.
    _EMPTY_ANSWER_TEXT = "(empty response)"

    def __init__(
        self,
        llm_factory: Callable,
        token: str,
        logger: Optional[logging.Logger] = None,
        jira_connector=None,
    ):
        """Store the collaborators; nothing is contacted until ``start()``.

        Args:
            llm_factory: Zero-argument callable returning an object with an
                ``invoke(text)`` method.
            token: Telegram bot token. An empty value makes ``start()`` raise.
            logger: Logger to use; defaults to this module's logger.
            jira_connector: Optional object exposing ``maybe_handle(text)``,
                which returns an answer or ``None`` to fall through to the LLM.
        """
        self._llm_factory = llm_factory
        self._token = token
        self._logger = logger or logging.getLogger(__name__)
        self._jira_connector = jira_connector
        self._bot: Optional["telebot.TeleBot"] = None
        self._llm = None
        self._thread: Optional[threading.Thread] = None
        # Guards _running / _thread.
        self._lock = threading.Lock()
        # Separate lock so a slow llm_factory() never blocks start/stop/is_running.
        self._llm_lock = threading.Lock()
        self._running = False

    @staticmethod
    def _split_message(text: str, limit: int = 4096) -> list:
        """Cut *text* into consecutive pieces of at most *limit* characters.

        Returns an empty list for empty text.
        """
        return [text[pos:pos + limit] for pos in range(0, len(text), limit)]

    def _get_llm(self):
        """Return the shared LLM, creating it on first use.

        pyTelegramBotAPI dispatches handlers on worker threads by default,
        so the check-and-create is done under a lock to avoid building the
        LLM twice when the first messages arrive together.
        """
        with self._llm_lock:
            if self._llm is None:
                self._llm = self._llm_factory()
            return self._llm

    def _answer_for(self, text: str):
        """Produce the raw answer for *text*: Jira connector first, then LLM."""
        if self._jira_connector is not None:
            answer = self._jira_connector.maybe_handle(text)
            if answer is not None:
                return answer
        return self._get_llm().invoke(text)

    def _ensure_bot(self) -> "telebot.TeleBot":
        """Create the bot and register its handlers once; reuse it afterwards."""
        if self._bot is not None:
            return self._bot

        bot = telebot.TeleBot(self._token, parse_mode=None)

        @bot.message_handler(commands=["start"])
        def on_start(message):
            bot.reply_to(message, "Telegram chat is ready. Send me a message.")

        @bot.message_handler(func=lambda message: True)
        def on_text(message):
            text = (message.text or "").strip()
            if not text:
                bot.reply_to(message, "Please send a non-empty message.")
                return

            try:
                reply = str(self._answer_for(text)).strip() or self._EMPTY_ANSWER_TEXT
                # Long answers would be rejected as a single message, so send
                # them as several replies to the same incoming message.
                for chunk in self._split_message(reply, self._MAX_MESSAGE_LENGTH):
                    bot.reply_to(message, chunk)
            except Exception as exc:
                self._logger.exception("Failed to handle Telegram message")
                # NOTE(review): this echoes the exception text to the chat
                # user; consider a generic message if the bot is public.
                try:
                    bot.reply_to(message, f"Error: {exc}"[: self._MAX_MESSAGE_LENGTH])
                except Exception:
                    # The failure being reported may be the send itself
                    # (e.g. network down); do not let it escape the handler.
                    self._logger.exception("Failed to send error reply")

        self._bot = bot
        return bot

    def _poll(self) -> None:
        """Thread target: poll Telegram until stopped, then clear ``_running``."""
        try:
            # Built inside the try so that a construction failure is logged
            # and still resets _running; otherwise start() could never
            # succeed again after one bad attempt.
            bot = self._ensure_bot()
            bot.delete_webhook(drop_pending_updates=False)
            bot.infinity_polling(timeout=20, long_polling_timeout=20, skip_pending=True)
        except Exception:
            self._logger.exception("Telegram polling crashed")
        finally:
            with self._lock:
                self._running = False

    def start(self) -> bool:
        """Start polling on a daemon thread.

        Returns:
            ``True`` if a polling thread was started, ``False`` if the
            service was already marked as running.

        Raises:
            ValueError: If the token is empty.
        """
        with self._lock:
            if self._running:
                return False

            if not self._token:
                raise ValueError("Missing TELEGRAM_TOKEN")

            self._running = True
            self._thread = threading.Thread(target=self._poll, daemon=True)
            self._thread.start()
            return True

    def stop(self) -> bool:
        """Ask the bot to stop polling.

        Returns:
            ``True`` if the service was running and a stop was requested,
            ``False`` if it was not running.
        """
        with self._lock:
            if not self._running:
                return False
            self._running = False

        if self._bot is not None:
            self._bot.stop_polling()
        return True

    @property
    def is_running(self) -> bool:
        """Whether the service is currently marked as running."""
        with self._lock:
            return self._running
|
|