Spaces:
Runtime error
Runtime error
| # telegram_bot.py | |
| import os | |
| import requests | |
| import json | |
| from dotenv import load_dotenv | |
| import datetime | |
| import logging | |
| # Telegram imports (PTB v20+) | |
| from telegram import Update, ReplyKeyboardMarkup, ReplyKeyboardRemove | |
| from telegram.request import HTTPXRequest | |
| from telegram.constants import ChatAction | |
| from telegram.ext import ( | |
| Application, CommandHandler, MessageHandler, filters, | |
| ContextTypes, ConversationHandler | |
| ) | |
| # LangChain embeddings + Chroma. Provide a fallback import for the new package name. | |
| try: | |
| # older usage | |
| from langchain_community.vectorstores import Chroma | |
| except Exception: | |
| try: | |
| # attempt newer package import if available | |
| from langchain_chroma import Chroma | |
| except Exception: | |
| Chroma = None | |
| from langchain_huggingface import HuggingFaceEmbeddings | |
# Pull configuration from a local .env file when one is present (local dev).
load_dotenv()

# Process-wide logging configuration; every module logger inherits it.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

# ConversationHandler state: the user is currently describing symptoms.
DESCRIBING_SYMPTOMS = 1
class TelegramHomeopathyBot:
    """Retrieval-augmented homeopathy assistant used by the Telegram handlers.

    The instance owns three things:
      * an optional Chroma retriever used to fetch book excerpts,
      * per-user in-memory sessions (chat history and counters),
      * per-user chat log files under ``logs_dir``.
    """

    # Sentence-transformers model used to embed queries (runs on CPU).
    EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
    # Directory holding the persisted Chroma collection.
    VECTOR_DB_DIR = "./vector_db"
    # Number of documents fetched per retrieval.
    RETRIEVER_TOP_K = 4
    # Chat-completions endpoint and model queried by ``query_ai``.
    LLM_API_URL = "https://llm.chutes.ai/v1/chat/completions"
    LLM_MODEL = "meituan-longcat/LongCat-Flash-Chat-FP8"
    # System prompt sent as the first message of every LLM request.
    SYSTEM_PROMPT = (
        "You are an expert Homeopathy Doctor. Your goal is to find the best possible remedy from the provided Context.\n"
        "**DIAGNOSTIC PROCESS & CONSTRAINTS:**\n"
        "1. **Focus:** Analyze the patient's full symptom set, including the **Chat History**. Only analyze the symptoms explicitly mentioned by the patient. IGNORE any symptoms found *only* in the Context that the patient has not mentioned.\n"
        "2. **Clarification:** You MUST conclude the diagnosis and suggest a remedy within the first **two or three turns** of the conversation. Ask a MAXIMUM of 3 concise clarifying questions in the first turn only, if needed. In subsequent turns, prioritize diagnosing based on the accumulated history.\n"
        "3. **Prescription Rule:** **MUST** prescribe the single best-matching remedy when:\n"
        "a) You have clear matching symptoms from the Context.\n"
        "b) The patient explicitly asks for the medicine, or indicates they cannot answer more questions. In this case, use the best available information from the Chat History to prescribe.\n"
        "4. **Safety:** If unsure or the context doesn't contain a relevant remedy, admit it honestly.\n"
        "5. **Tone:** Always be professional, caring, and responsible.\n"
    )

    def __init__(self, use_vector_db: bool = True):
        """Create the log directory and, if requested, the vector retriever.

        Args:
            use_vector_db: When True and Chroma could be imported, build the
                embeddings, vector store and retriever. Any failure is logged
                and leaves the bot running without retrieval.
        """
        self.logs_dir = "UserChatLogs"
        os.makedirs(self.logs_dir, exist_ok=True)
        self.embeddings = None
        self.vector_store = None
        self.retriever = None
        # User sessions live in memory only and are lost on restart.
        self.user_sessions = {}

        if use_vector_db and Chroma is not None:
            # NOTE: this runs eagerly at construction time, not lazily.
            try:
                self.embeddings = HuggingFaceEmbeddings(
                    model_name=self.EMBEDDING_MODEL,
                    model_kwargs={'device': 'cpu'}
                )
                self.vector_store = Chroma(
                    persist_directory=self.VECTOR_DB_DIR,
                    embedding_function=self.embeddings
                )
                self.retriever = self.vector_store.as_retriever(
                    search_kwargs={"k": self.RETRIEVER_TOP_K}
                )
                logger.info("Vector DB and retriever initialized.")
            except Exception as e:
                # Retrieval is optional: log the failure and keep running.
                logger.error(f"Failed to initialize vector DB or embeddings: {e}")
                self.embeddings = None
                self.vector_store = None
                self.retriever = None
        elif Chroma is None:
            logger.warning("Chroma import is not available. Skipping vector DB initialization.")
        else:
            logger.info("Vector DB initialization disabled by flag.")

    def log_chat(self, user_id: int, username: str, message: str, is_bot: bool = False):
        """Append one chat line to ``<logs_dir>/user_<user_id>.log``.

        Logging is best-effort: I/O and encoding failures are reported through
        the module logger and never propagate to the caller.
        """
        try:
            log_file = os.path.join(self.logs_dir, f"user_{user_id}.log")
            timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            direction = "BOT" if is_bot else "USER"
            with open(log_file, "a", encoding="utf-8") as f:
                f.write(f"[{timestamp}] {direction} ({username}): {message}\n")
        except (OSError, ValueError) as e:
            # ValueError covers UnicodeEncodeError raised while writing.
            logger.error(f"Failed to log chat: {e}")

    def get_user_session(self, user_id):
        """Return the session dict for ``user_id``, creating it on first use."""
        if user_id not in self.user_sessions:
            self.user_sessions[user_id] = {
                'chat_history': [],
                'consultation_count': 0,
                'last_query': None
            }
        return self.user_sessions[user_id]

    def get_relevant_context(self, query: str, history: list):
        """Retrieve context for ``query`` from the vector database.

        Earlier user messages in ``history`` are prepended to the query so the
        retrieval sees the whole case, not only the latest message.

        Returns:
            The retrieved documents' ``page_content`` joined with newlines, or
            an empty string when no retriever is configured or retrieval fails.
        """
        if not self.retriever:
            return ""

        user_history = [
            msg.get('content') or ""
            for msg in history
            if msg.get('role') == 'user'
        ]
        history_summary = " ".join(user_history)
        composite_query = (
            f"Patient's case summary: {history_summary} {query}"
            if history_summary else query
        )

        try:
            # ``invoke`` is the current retriever entry point; fall back to the
            # deprecated ``get_relevant_documents`` for older LangChain versions.
            if hasattr(self.retriever, "invoke"):
                docs = self.retriever.invoke(composite_query)
            else:
                docs = self.retriever.get_relevant_documents(composite_query)
            return "\n".join(getattr(d, "page_content", str(d)) for d in docs)
        except Exception as e:
            logger.error(f"Retrieval error: {e}")
            return ""

    def query_ai(self, user_message: str, context: str, history: list):
        """Query the AI model via Chute.ai (or configured LLM endpoint).

        Args:
            user_message: Latest patient message.
            context: Retrieved book excerpts (may be empty).
            history: Prior chat messages as ``{"role", "content"}`` dicts.

        Returns:
            The model's reply, or a user-facing error notice. Every error
            notice starts with "β"; callers use that prefix to detect failure.
        """
        api_key = os.getenv("CHUTEAI_API_KEY")
        if not api_key:
            logger.error("CRITICAL: CHUTEAI_API_KEY is missing.")
            return "β Configuration Error: The AI service API key (CHUTEAI_API_KEY) is missing. Please check your setup."

        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        messages = [
            {"role": "system", "content": self.SYSTEM_PROMPT},
            *history,
            {"role": "user", "content": f"Context from homeopathy book:\n{context}\n\nPatient Complaint: {user_message}"}
        ]
        data = {
            "model": self.LLM_MODEL,
            "messages": messages,
            "temperature": 0.2,
            "max_tokens": 500
        }

        try:
            response = requests.post(
                self.LLM_API_URL,
                headers=headers,
                json=data,
                timeout=(5, 30)  # (connect, read) seconds
            )
        except requests.exceptions.Timeout:
            logger.error("Chute.ai request timed out.")
            return "β Connection error: The AI service took too long to respond. Please try again."
        except Exception as e:
            logger.error(f"Connection error while querying AI: {e}")
            return f"β Connection error. Please try again. Error: {str(e)}"

        if response.status_code == 200:
            # A 200 with an unexpected body is a service fault, not a
            # connection error; report it as such and never return None.
            try:
                content = response.json()["choices"][0]["message"]["content"]
            except (ValueError, KeyError, IndexError, TypeError) as e:
                logger.error(f"Chute.ai returned a malformed response: {e!r}")
                return "β I'm having technical difficulties. Please try again later. (Error: malformed response)"
            if not isinstance(content, str) or not content.strip():
                logger.error("Chute.ai returned an empty completion.")
                return "β I'm having technical difficulties. Please try again later. (Error: empty response)"
            return content

        try:
            error_data = response.json()
            error_message = error_data.get("error", {}).get("message", "No detailed error message.")
        except (ValueError, AttributeError):
            # Body is not JSON, or not shaped like {"error": {"message": ...}}.
            error_message = response.text
        if response.status_code == 401:
            logger.error("CHUTEAI_API_KEY is likely invalid or unauthorized (401).")
        logger.error(f"Chute.ai Error {response.status_code}: {error_message}")
        return f"β I'm having technical difficulties. Please try again later. (Error: {response.status_code})"
# Build the shared bot instance at import time. If construction fails the
# handlers see ``None`` and answer with an "unavailable" notice instead.
homeopathy_bot = None
try:
    homeopathy_bot = TelegramHomeopathyBot(use_vector_db=True)
except Exception as init_error:
    logger.error(f"Failed to initialize TelegramHomeopathyBot: {init_error}")
| # Telegram handlers (async) | |
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Send the welcome message and reset the user's session on /start.

    Returns:
        ``DESCRIBING_SYMPTOMS`` to enter the conversation, or
        ``ConversationHandler.END`` when the bot failed to initialize or the
        update carries no message/user to answer.
    """
    import html  # function-scope import keeps this handler self-contained

    # ``update.message`` is None for edited messages, which CommandHandler
    # also matches; ``effective_message`` covers both cases.
    message = update.effective_message
    user = update.effective_user
    if message is None or user is None:
        return ConversationHandler.END

    if not homeopathy_bot:
        await message.reply_text("β Initialization Error: The AI service failed to start due to resource constraints. Please check the logs.")
        return ConversationHandler.END

    # The first name is user-controlled. Send the greeting as HTML with the
    # name escaped, so characters such as '*' or '_' cannot break Telegram's
    # entity parsing the way they do in legacy Markdown.
    safe_first_name = html.escape(user.first_name or "")
    welcome_text = f"""π Hello <b>{safe_first_name}</b>! I'm your AI Homeopathy Doctor π€
π‘ <b>How to use:</b>
1. Describe your symptoms in detail
2. I'll ask clarifying questions (max 3 in the first turn)
3. I'll suggest potential homeopathic remedies within 3 turns
β οΈ <b>Disclaimer:</b> This is for educational purposes only. Always consult a qualified homeopath or medical professional.
Type your symptoms below to begin...
"""

    # Reset session so a new consultation starts with empty history.
    homeopathy_bot.user_sessions[user.id] = {'chat_history': [], 'consultation_count': 0, 'last_query': None}

    await message.reply_text(welcome_text, parse_mode='HTML')

    display_name = user.username or f"{user.first_name} {user.last_name or ''}".strip()
    homeopathy_bot.log_chat(user.id, display_name, "/start command received", is_bot=False)
    return DESCRIBING_SYMPTOMS
async def handle_symptoms(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle the user's symptom description and continue the diagnosis.

    Retrieves book context, queries the LLM, records the exchange in the
    user's session and chat log, and replies with the answer plus quick-action
    buttons.

    Returns:
        ``DESCRIBING_SYMPTOMS`` to stay in the conversation, or
        ``ConversationHandler.END`` when the bot failed to initialize.
    """
    # Function-scope imports keep this handler self-contained.
    import asyncio
    from telegram.error import BadRequest

    # ``update.message`` is None for edited messages, which the text filter
    # also matches; ``effective_message`` covers both cases.
    message = update.effective_message
    user = update.effective_user
    if message is None or user is None:
        return DESCRIBING_SYMPTOMS

    if not homeopathy_bot:
        await message.reply_text("β Service is unavailable due to an earlier initialization failure. Please try later.")
        return ConversationHandler.END

    user_input = message.text or ""
    display_name = user.username or f"{user.first_name} {user.last_name or ''}".strip()
    session = homeopathy_bot.get_user_session(user.id)
    session['consultation_count'] += 1

    # The typing indicator is cosmetic; failing to send it must not abort.
    try:
        await context.bot.send_chat_action(chat_id=update.effective_chat.id, action=ChatAction.TYPING)
    except Exception as exc:
        logger.debug("Could not send typing action: %s", exc)

    processing_msg = await message.reply_text("π Analyzing your symptoms...")

    try:
        # Retrieval and the LLM request are blocking calls; run them in the
        # default executor so the asyncio event loop stays responsive.
        loop = asyncio.get_running_loop()
        history_snapshot = list(session['chat_history'])
        context_text = await loop.run_in_executor(
            None, homeopathy_bot.get_relevant_context, user_input, history_snapshot
        )
        response = await loop.run_in_executor(
            None, homeopathy_bot.query_ai, user_input, context_text, history_snapshot
        )
        if not response:
            # Guard against an empty/None completion so the checks below hold.
            response = "β I'm having technical difficulties. Please try again later."

        # Update chat history only if the response isn't an error notice.
        if not response.startswith("β"):
            session['chat_history'].append({"role": "user", "content": user_input})
            session['chat_history'].append({"role": "assistant", "content": response})
            # Keep only the last 6 messages to prevent context overflow.
            if len(session['chat_history']) > 6:
                session['chat_history'] = session['chat_history'][-6:]

        homeopathy_bot.log_chat(user.id, display_name, user_input, is_bot=False)
        homeopathy_bot.log_chat(user.id, display_name, response, is_bot=True)

        # Removing the placeholder is best-effort.
        try:
            await processing_msg.delete()
        except Exception as exc:
            logger.debug("Could not delete processing message: %s", exc)

        reply_text = f"π©Ί *Homeopathy Doctor:*\n\n{response}"
        try:
            await message.reply_text(reply_text, parse_mode='Markdown')
        except BadRequest as exc:
            # Model output can contain unbalanced '*', '_' or '`', which
            # Telegram rejects; resend the same text without formatting.
            logger.warning("Markdown rendering failed, sending plain text: %s", exc)
            await message.reply_text(reply_text)

        # Quick action buttons
        quick_actions = [["π Describe more symptoms or answer questions"], ["π Start a new consultation"]]
        reply_markup = ReplyKeyboardMarkup(quick_actions, one_time_keyboard=True, resize_keyboard=True)
        await message.reply_text("What would you like to do next?", reply_markup=reply_markup)
    except Exception as e:
        logger.exception("Error processing message: %s", e)
        try:
            await processing_msg.delete()
        except Exception as exc:
            logger.debug("Could not delete processing message: %s", exc)
        await message.reply_text("β Sorry, I encountered an error. Please try again.")

    return DESCRIBING_SYMPTOMS
async def handle_quick_actions(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle the quick-action keyboard buttons.

    * "Describe more symptoms…" prompts the user for more details.
    * "Start a new consultation" (or the text "Reset please") clears the
      user's session and removes the keyboard.
    * Any other text routed here is treated as a symptom description.

    Returns:
        ``DESCRIBING_SYMPTOMS`` to stay in the conversation, or
        ``ConversationHandler.END`` when the bot failed to initialize.
    """
    # ``update.message`` is None for edited messages; ``effective_message``
    # covers both new and edited messages.
    message = update.effective_message
    if message is None:
        return DESCRIBING_SYMPTOMS

    if not homeopathy_bot:
        await message.reply_text("β Service is unavailable due to an earlier initialization failure. Please try later.")
        return ConversationHandler.END

    user_input = message.text or ""

    if user_input == "π Describe more symptoms or answer questions":
        await message.reply_text("Please provide any additional details or clarify the Doctor's previous questions...")
    elif user_input in ("π Start a new consultation", "Reset please"):
        # "Reset please" belongs to this branch only; listing it in the
        # branch above made the reset unreachable for that phrase.
        user_id = update.effective_user.id
        homeopathy_bot.user_sessions[user_id] = {'chat_history': [], 'consultation_count': 0, 'last_query': None}
        if homeopathy_bot.vector_store:
            # Rebuild the retriever from the existing vector store.
            try:
                homeopathy_bot.retriever = homeopathy_bot.vector_store.as_retriever(search_kwargs={"k": 4})
            except Exception as exc:
                logger.warning("Could not rebuild retriever: %s", exc)
        await message.reply_text("π Starting new consultation. Please describe your symptoms...", reply_markup=ReplyKeyboardRemove())
    else:
        # The routing regex only checks the first character, so free text can
        # land here; answer it as a symptom description rather than staying
        # silent.
        return await handle_symptoms(update, context)

    return DESCRIBING_SYMPTOMS
async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Cancel the conversation and drop the user's in-memory session.

    Returns:
        ``ConversationHandler.END`` in every case.
    """
    user = update.effective_user
    # The bot instance is None when initialization failed; /cancel must still
    # answer instead of raising AttributeError.
    if homeopathy_bot and user is not None:
        homeopathy_bot.user_sessions.pop(user.id, None)

    # ``update.message`` is None for edited messages; ``effective_message``
    # covers both new and edited messages.
    message = update.effective_message
    if message is not None:
        await message.reply_text(
            "π Consultation ended. Thank you for using Homeopathy AI Doctor!\n\nRemember to consult a qualified homeopath for proper treatment. πΏ",
            reply_markup=ReplyKeyboardRemove()
        )
    return ConversationHandler.END
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Send the help message listing commands and usage tips."""
    help_text = """
π€ *Homeopathy AI Doctor Bot Help*
*Available Commands:*
/start - Begin a new consultation (resets history)
/help - Show this help message
/cancel - End current consultation
*How to get the best results:*
β’ Describe symptoms in detail
β’ Mention location, intensity, and timing
β’ Be specific about associated feelings
*Disclaimer:* This bot provides educational information only. Always consult qualified medical professionals.
"""
    # ``update.message`` is None for edited messages, which CommandHandler
    # also matches; ``effective_message`` covers both cases.
    message = update.effective_message
    if message is None:
        return
    await message.reply_text(help_text, parse_mode='Markdown')
async def generic_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Answer plain text received outside the ConversationHandler.

    Tells the user to begin with /start.
    """
    # ``update.message`` is None for edited messages, which the text filter
    # also matches; ``effective_message`` covers both cases.
    message = update.effective_message
    if message is None:
        return
    await message.reply_text(
        "π Welcome! To begin a new consultation and describe your symptoms, please use the */start* command.",
        parse_mode='Markdown'
    )
async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE):
    """Log an unhandled handler error and send the user a friendly notice.

    ``update`` is typed ``object`` because it is not guaranteed to be a
    Telegram ``Update``; attributes are therefore read with ``getattr``.
    """
    # Pass the exception as exc_info so the traceback lands in the log.
    logger.error(f"Update {update} caused error {context.error}", exc_info=context.error)

    # ``effective_message`` also covers edited messages, for which
    # ``update.message`` is None.
    message = getattr(update, "effective_message", None)
    if message is None:
        return
    try:
        await message.reply_text(
            "β Sorry, I encountered an unexpected error. Please try again or use /start to begin a new consultation."
        )
    except Exception as exc:
        # Notifying the user is best-effort; never raise from the error handler.
        logger.warning("Could not send error notice to user: %s", exc)
def check_dependencies():
    """Print a warning for each optional dependency that is missing.

    Looks for the ``./vector_db`` path and the ``CHUTEAI_API_KEY`` environment
    variable. Neither is required for startup, so the result is always True.
    """
    findings = (
        (
            not os.path.exists("./vector_db"),
            "β Vector database not found. If you rely on it, please upload 'vector_db' folder or disable use_vector_db.",
        ),
        (
            not os.getenv("CHUTEAI_API_KEY"),
            "β CHUTEAI_API_KEY not found in environment. AI queries will fail until it's set.",
        ),
    )
    for is_missing, notice in findings:
        if is_missing:
            print(notice)
    # Missing pieces only produce warnings; startup is never blocked.
    return True
def main():
    """Configure the Telegram application and run it with long polling.

    Reads ``TELEGRAM_BOT_TOKEN`` from the environment and returns early with
    a printed message when it is missing.
    """
    TELEGRAM_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
    if not TELEGRAM_TOKEN:
        print("FATAL ERROR: TELEGRAM_BOT_TOKEN is missing from environment secrets. Cannot initialize the Telegram client.")
        return

    # Sanity check token length
    if len(TELEGRAM_TOKEN) < 40:
        print(f"WARNING: TELEGRAM_BOT_TOKEN has suspicious length ({len(TELEGRAM_TOKEN)}). Double-check the secret value.")

    if not check_dependencies():
        return

    print("π Starting Telegram Homeopathy Bot...")

    # Custom HTTPX request object with a small pool and explicit timeouts.
    request = HTTPXRequest(
        connection_pool_size=4,
        connect_timeout=5.0,
        read_timeout=15.0,
    )
    application = (
        Application.builder()
        .token(TELEGRAM_TOKEN)
        .request(request)
        .build()
    )

    # Conversation handler
    conv_handler = ConversationHandler(
        entry_points=[CommandHandler('start', start)],
        states={
            DESCRIBING_SYMPTOMS: [
                # Quick-action buttons are matched first, by leading emoji.
                MessageHandler(filters.Regex(r'^(π|π)'), handle_quick_actions),
                MessageHandler(filters.TEXT & ~filters.COMMAND, handle_symptoms),
            ],
        },
        fallbacks=[CommandHandler('cancel', cancel)],
        # Without re-entry, /start is ignored while a conversation is active,
        # although the help text promises it resets the consultation.
        allow_reentry=True,
    )

    # Register handlers
    application.add_handler(conv_handler)
    application.add_handler(CommandHandler('help', help_command))
    application.add_handler(CommandHandler('cancel', cancel))
    # Generic text handler goes last so it never overrides the conversation.
    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, generic_message))

    # Error handler
    application.add_error_handler(error_handler)

    print("β Bot is running... Press Ctrl+C to stop")
    application.run_polling(allowed_updates=Update.ALL_TYPES, bootstrap_retries=5)


if __name__ == '__main__':
    main()