import os
import re
import json
import asyncio
import base64
import logging
import socket
import time
from contextlib import contextmanager
from io import BytesIO
from datetime import datetime

import psycopg2
from dotenv import load_dotenv
from telegram import Update
from telegram.ext import Application, MessageHandler, filters, ContextTypes
from telegram.request import HTTPXRequest
import PyPDF2
import httpx

# Load environment variables
load_dotenv()

# Configure logging
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO
)
logger = logging.getLogger(__name__)

# Configuration
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://localhost:11434")
OLLAMA_API_KEY = os.getenv("OLLAMA_API_KEY", "")  # Optional: for cloud/authenticated services
VISION_MODEL = os.getenv("VISION_MODEL", "llava")  # Model for image analysis
CHAT_MODEL = os.getenv("CHAT_MODEL", "mistral")  # Model for quiz generation
DATABASE_URL = os.getenv("DATABASE_URL")  # Neon connection string

# Timeout for the slow Ollama generation calls (Ollama Cloud can be slow):
# 10 s to connect, up to 10 minutes waiting for the response.
_OLLAMA_LONG_TIMEOUT = httpx.Timeout(10.0, read=600.0)

# Timeout for downloading user files through the Telegram proxy.
_DOWNLOAD_TIMEOUT = httpx.Timeout(10.0, read=60.0)

# Document file-name suffixes that are treated as images.
_IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.webp')


@contextmanager
def _db_cursor():
    """Open a short-lived connection to DATABASE_URL and yield a cursor.

    The transaction is committed if the ``with`` body succeeds and rolled
    back if it raises. The connection is closed in every case, because
    psycopg2's ``with conn`` only ends the transaction and does not close it.
    """
    conn = psycopg2.connect(DATABASE_URL)
    try:
        with conn:
            with conn.cursor() as cur:
                yield cur
    finally:
        conn.close()


def init_db():
    """Create the ``chat_history`` table if it does not exist yet.

    When DATABASE_URL is unset, only logs a warning (chat memory disabled).
    Database errors are logged, not raised, so the bot can start without memory.
    """
    if not DATABASE_URL:
        logger.warning("DATABASE_URL not set. Chat memory will be disabled.")
        return
    try:
        with _db_cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS chat_history (
                    id SERIAL PRIMARY KEY,
                    chat_id BIGINT NOT NULL,
                    role VARCHAR(10) NOT NULL,
                    content TEXT NOT NULL,
                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
            """)
        logger.info("Database initialized successfully.")
    except Exception as e:
        logger.error(f"Error initializing database: {e}")


def save_chat_message(chat_id: int, role: str, content: str):
    """Insert one chat message into ``chat_history``.

    Args:
        chat_id: Telegram chat the message belongs to.
        role: Message role; callers in this file use "user" or "assistant".
        content: Message text.

    No-op when DATABASE_URL is unset. Errors are logged, not raised.
    """
    if not DATABASE_URL:
        return
    try:
        with _db_cursor() as cur:
            cur.execute(
                "INSERT INTO chat_history (chat_id, role, content) VALUES (%s, %s, %s)",
                (chat_id, role, content)
            )
    except Exception as e:
        logger.error(f"Error saving chat message: {e}")


def get_chat_history(chat_id: int, limit: int = 20):
    """Return up to *limit* most recent messages for *chat_id*, oldest first.

    Each item is a ``{"role": ..., "content": ...}`` dict. Returns ``[]`` when
    DATABASE_URL is unset or on any database error (which is logged).
    """
    if not DATABASE_URL:
        return []
    try:
        with _db_cursor() as cur:
            # id breaks ties between rows that share the same timestamp value.
            cur.execute(
                """
                SELECT role, content FROM chat_history
                WHERE chat_id = %s
                ORDER BY timestamp DESC, id DESC
                LIMIT %s
                """,
                (chat_id, limit)
            )
            rows = cur.fetchall()
        # The query fetched newest-first; flip to oldest-to-newest.
        return [{"role": role, "content": content} for role, content in reversed(rows)]
    except Exception as e:
        logger.error(f"Error getting chat history: {e}")
        return []


def clear_chat_history(chat_id: int):
    """Delete all stored messages for *chat_id*.

    No-op when DATABASE_URL is unset. Errors are logged, not raised.
    """
    if not DATABASE_URL:
        return
    try:
        with _db_cursor() as cur:
            cur.execute("DELETE FROM chat_history WHERE chat_id = %s", (chat_id,))
        logger.info(f"Memory cleared for chat {chat_id}")
    except Exception as e:
        logger.error(f"Error clearing chat history: {e}")


def _ollama_headers() -> dict:
    """Build Ollama request headers, adding a Bearer token when OLLAMA_API_KEY is set."""
    headers = {}
    if OLLAMA_API_KEY:
        headers["Authorization"] = f"Bearer {OLLAMA_API_KEY}"
    return headers


async def extract_text_from_image(image_bytes: bytes) -> str:
    """Transcribe the text in an image using the Ollama vision model.

    The image is sent base64-encoded to ``{OLLAMA_HOST}/api/chat`` with
    VISION_MODEL. The model's reply content is returned as-is.

    Raises:
        Exception: any HTTP/parsing error is logged and re-raised.
    """
    try:
        image_base64 = base64.b64encode(image_bytes).decode('utf-8')
        payload = {
            "model": VISION_MODEL,
            "messages": [{
                "role": "user",
                "content": "Just transcribe all the text in the image in details. Output only the text, nothing else.",
                "images": [image_base64]
            }],
            "stream": False
        }
        async with httpx.AsyncClient(timeout=_OLLAMA_LONG_TIMEOUT) as client:
            response = await client.post(
                f"{OLLAMA_HOST}/api/chat",
                json=payload,
                headers=_ollama_headers()
            )
            response.raise_for_status()
            result = response.json()
        return result['message']['content']
    except Exception as e:
        logger.error(f"Error extracting text from image: {e}")
        raise


def extract_text_from_pdf(pdf_bytes: bytes) -> str:
    """Extract the text of every page of a PDF, one page per line block.

    Raises:
        Exception: any PyPDF2 error is logged and re-raised.
    """
    try:
        pdf_reader = PyPDF2.PdfReader(BytesIO(pdf_bytes))
        # Defensive: treat a falsy extract_text() result (e.g. image-only
        # page) as an empty page instead of failing on `None + "\n"`.
        text = "".join((page.extract_text() or "") + "\n" for page in pdf_reader.pages)
        return text.strip()
    except Exception as e:
        logger.error(f"Error extracting text from PDF: {e}")
        raise


def _normalize_questions(parsed) -> list:
    """Coerce the model's parsed JSON into a list of question dicts.

    The prompt asks for a JSON array. With Ollama's ``"format": "json"`` mode,
    however, the model may wrap it in an object (e.g. ``{"questions": [...]}``)
    or return a single question object. Only entries with a string
    ``question`` and an ``options`` list of at least two items are kept.
    """
    if isinstance(parsed, dict):
        if "question" in parsed:
            parsed = [parsed]
        else:
            # Take the first list-valued field as the question array.
            parsed = next((value for value in parsed.values() if isinstance(value, list)), [])
    if not isinstance(parsed, list):
        return []
    return [
        item for item in parsed
        if isinstance(item, dict)
        and isinstance(item.get("question"), str)
        and isinstance(item.get("options"), list)
        and len(item["options"]) >= 2
    ]


async def generate_quiz_questions(text: str, num_questions: int = 5) -> list:
    """Generate *num_questions* multiple-choice questions from *text* via Ollama.

    Returns:
        A list of ``{"question": str, "options": list}`` dicts. It may be
        shorter than requested, or empty, if the model's output had no usable
        entries.

    Raises:
        ValueError: the model's output (or the HTTP body) was not valid JSON.
        Exception: any other HTTP error is logged and re-raised.
    """
    system_prompt = """You are a High-Performance Question Generation Engine.
Rules:
1. You MUST generate accurate multiple-choice questions based ONLY on the provided content.
2. Output format: Strictly valid JSON array of objects.
3. ABSOLUTELY NO conversational text, headers, footers, intros, outros, or explanations outside the JSON.
4. These are survey-style questions; do NOT identify or provide a "correct" answer.
5. The language of the questions must match the language of the source text."""

    user_prompt = f"""Based on the text I provide, generate {num_questions} multiple-choice questions.
You must output valid JSON only. Do not wrap it in markdown blocks (like ```json).
The output format must be a strictly valid JSON array of objects, like this:
[
    {{
        "question": "What is the primary topic discussed in the text?",
        "options": ["Option A", "Option B", "Option C", "Option D"]
    }},
    {{
        "question": "Which of the following best describes the author's tone?",
        "options": ["Option A", "Option B", "Option C", "Option D"]
    }}
]

Text to analyze:
{text}"""

    payload = {
        "model": CHAT_MODEL,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ],
        "stream": False,
        "format": "json"
    }

    # Bound up-front: response.json() can itself raise JSONDecodeError, and
    # the handler below must still be able to log `output`.
    output = ""
    try:
        async with httpx.AsyncClient(timeout=_OLLAMA_LONG_TIMEOUT) as client:
            response = await client.post(
                f"{OLLAMA_HOST}/api/chat",
                json=payload,
                headers=_ollama_headers()
            )
            response.raise_for_status()
            result = response.json()

        output = result['message']['content']
        # Clean up markdown code fences if the model added them anyway
        clean_json = output.replace('```json', '').replace('```', '').strip()
        return _normalize_questions(json.loads(clean_json))
    except json.JSONDecodeError as e:
        logger.error(f"JSON Parse failed: {e}")
        logger.error(f"Raw output: {output}")
        raise ValueError(f"Failed to parse quiz questions: {e}") from e
    except Exception as e:
        logger.error(f"Error generating quiz: {e}")
        raise


async def send_question_poll(context: ContextTypes.DEFAULT_TYPE, chat_id: int, question_data: dict):
    """Send one question as a regular, non-anonymous Telegram poll.

    Text is clipped to the Bot API limits: question 300 chars, each option
    100 chars, at most 10 options. Blank options are dropped.

    Raises:
        ValueError: fewer than two usable options remain.
        Exception: any error from ``send_poll`` is logged and re-raised.
    """
    try:
        options = [
            str(option)[:100]
            for option in question_data['options']
            if str(option).strip()
        ][:10]
        if len(options) < 2:
            raise ValueError("A poll needs at least 2 non-empty options")
        await context.bot.send_poll(
            chat_id=chat_id,
            question=str(question_data['question'])[:300],
            options=options,
            type='regular',
            is_anonymous=False
        )
    except Exception as e:
        logger.error(f"Error sending poll: {e}")
        raise


def parse_num_questions(caption: str) -> int:
    """Return the question count requested in *caption*, clamped to 1..20.

    Uses the first run of digits in the caption. Returns 5 when the caption
    is empty or contains no digits.
    """
    if not caption:
        return 5  # Default
    match = re.search(r'\d+', caption)
    if match:
        return min(max(int(match.group()), 1), 20)
    return 5


async def _download_telegram_file(context: ContextTypes.DEFAULT_TYPE, file_id: str) -> bytes:
    """Download a Telegram file's bytes through the telegram.esmail.app proxy.

    ``get_file`` may return an absolute URL of the form
    ``https://<host>/file/bot<TOKEN>/<path>``. It is reduced to ``<path>``
    and re-rooted at the proxy.
    """
    file_info = await context.bot.get_file(file_id)
    file_path = file_info.file_path
    if file_path.startswith("http"):
        token_part = f"/file/bot{TELEGRAM_BOT_TOKEN}/"
        if token_part in file_path:
            file_path = file_path.split(token_part)[-1]

    custom_file_url = f"https://telegram.esmail.app/file/bot{TELEGRAM_BOT_TOKEN}/{file_path}"
    async with httpx.AsyncClient(timeout=_DOWNLOAD_TIMEOUT) as client:
        response = await client.get(custom_file_url)
        response.raise_for_status()
        return response.content


async def handle_file(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Turn an incoming photo, image document or PDF into quiz polls.

    Steps:
    1. Download the file through the proxy.
    2. Extract text: the vision model for images, PyPDF2 for PDFs.
    3. Generate questions and send each one as a poll.
    4. Send a completion message.

    A number in the caption sets the question count (see
    ``parse_num_questions``). Progress and errors are reported by editing a
    single status message.
    """
    message = update.message
    # The handler's filters can also match updates where update.message is
    # None (e.g. edited messages); there is nothing to process for those.
    if message is None:
        return
    chat_id = message.chat_id
    caption = message.caption or ""

    # Send "processing" message
    processing_msg = await message.reply_text("📝 جاري معالجة الملف... اصبر اصبر 😮‍💨")
    status_deleted = False

    try:
        # Classify what was sent before downloading anything.
        if message.photo:
            # Get the largest photo (last size entry)
            file_id = message.photo[-1].file_id
            file_type = 'image'
        elif message.document:
            doc = message.document
            file_name = doc.file_name.lower() if doc.file_name else ""
            if file_name.endswith(_IMAGE_EXTENSIONS):
                file_type = 'image'
            elif file_name.endswith('.pdf'):
                file_type = 'pdf'
            else:
                await processing_msg.edit_text("❌ نوع ملف مش مدعوم! 🤔\n\n✅ الأنواع المدعومة:\n• صور (JPG, PNG, WebP)\n• ملفات PDF\n• نصوص مباشرة")
                return
            file_id = doc.file_id
        else:
            await processing_msg.edit_text("❌ نوع ملف مش مدعوم!")
            return

        file_bytes = await _download_telegram_file(context, file_id)

        # Extract text based on file type
        if file_type == 'image':
            await processing_msg.edit_text("📸 جاري تحليل الصورة...")
            text = await extract_text_from_image(file_bytes)
        else:
            await processing_msg.edit_text("📄 جاري استخراج النص من الـ PDF...")
            # PDF parsing is synchronous and CPU-bound; keep it off the event loop.
            text = await asyncio.get_running_loop().run_in_executor(
                None, extract_text_from_pdf, file_bytes
            )

        if not text or len(text.strip()) < 10:
            await processing_msg.edit_text("❌ مش قادر استخرج نص كافي من الملف\n\n💡 تأكد أن الملف يحتوي على نص واضح")
            return

        num_questions = parse_num_questions(caption)

        await processing_msg.edit_text(f"🧠 جاري توليد {num_questions} اسئلة...")
        questions = await generate_quiz_questions(text, num_questions)

        if not questions:
            await processing_msg.edit_text("❌ مش قادر اعمل اسئلة من النص ده")
            return

        await processing_msg.delete()
        status_deleted = True

        # Send each question as a poll; one bad question doesn't abort the rest.
        for i, q in enumerate(questions):
            try:
                await send_question_poll(context, chat_id, q)
                # Small delay between polls to avoid rate limiting
                if i < len(questions) - 1:
                    await asyncio.sleep(1)
            except Exception as e:
                logger.error(f"Error sending question {i+1}: {e}")
                continue

        # Send completion message after a delay
        await asyncio.sleep(2)
        await context.bot.send_message(
            chat_id=chat_id,
            text="✅ انتهيت! \n\nيلا اي خدمه، افتكرنا بعد الامتحانات بقا 😑\nانسان مصلحجي!\n\n🔄 بتبعتلي ملف تاني؟",
            reply_to_message_id=message.message_id
        )

    except Exception as e:
        logger.error(f"Error processing file: {e}")
        error_text = f"❌ حصل مشكلة: {str(e)[:100]}"
        try:
            # The status message no longer exists once polls started going out.
            if status_deleted:
                await message.reply_text(error_text)
            else:
                await processing_msg.edit_text(error_text)
        except Exception as notify_error:
            logger.error(f"Could not report error to user: {notify_error}")


async def generate_chat_response(chat_id: int, text: str, user_name: str = "User") -> str:
    """Generate a conversational (non-quiz) reply via the Ollama chat API.

    Up to 20 stored messages for *chat_id* are replayed as context. They are
    pruned from newest to oldest so the total stays under ~3000 characters.
    On any Ollama API error, a fixed Arabic greeting is returned instead of
    raising.
    """
    system_prompt = f"""You are the Official AI Question Bot. 🧠
Your strictly defined role is to assist {user_name} in preparing for exams and studying by converting content (Images, PDFs, or Text) into interactive Telegram polls.

- You are professional, authoritative, and helpful.
- If the user chats with you, your primary objective is to guide them towards their study goals.
- If they ask unrelated questions, politely redirect them to your core functionality: "My purpose is to help you study. Please send me content (Image/PDF/Text) to generate questions."
- Maintain a strictly academic yet encouraging tone.
- Do not engage in roleplay or off-topic conversation.
- You are NOT generating questions in this chat mode, just providing guidance or greetings."""

    # get_chat_history uses blocking psycopg2 calls; run it in a worker thread.
    history = await asyncio.get_running_loop().run_in_executor(
        None, get_chat_history, chat_id, 20
    )

    # The caller may already have stored the current message; don't send it twice.
    if history and history[-1] == {"role": "user", "content": text}:
        history = history[:-1]

    # Smart context pruning: walk newest -> oldest, keep what fits in max_chars.
    pruned_history = []
    total_chars = 0
    max_chars = 3000
    for msg in reversed(history):
        msg_len = len(msg['content'])
        if total_chars + msg_len >= max_chars:
            break
        pruned_history.append(msg)
        total_chars += msg_len
    pruned_history.reverse()  # restore chronological order

    messages = [
        {"role": "system", "content": system_prompt},
        *pruned_history,
        {"role": "user", "content": text},
    ]

    try:
        payload = {
            "model": CHAT_MODEL,
            "messages": messages,
            "stream": False
        }
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(
                f"{OLLAMA_HOST}/api/chat",
                json=payload,
                headers=_ollama_headers()
            )
            response.raise_for_status()
            result = response.json()
        return result['message']['content']
    except Exception as e:
        logger.error(f"Error generating chat response: {e}")
        return "اهلا! 👋 ابعتلي صورة او ملف عشان اعملك كويز."
async def handle_text(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle plain-text messages: short ones are chat, long ones become a quiz.

    Messages shorter than 50 characters (after stripping) are answered through
    ``generate_chat_response``, and both turns are saved to chat memory.
    Longer messages are turned into quiz polls. If their first line is a bare
    number, it sets the question count (clamped to 1..20, default 5) and is
    removed from the text.
    """
    message = update.message
    # The TEXT filter can also match updates where update.message is None
    # (e.g. edited messages / channel posts); ignore those.
    if message is None:
        return
    chat_id = message.chat_id
    text = message.text
    user_name = (message.from_user.first_name if message.from_user else None) or "User"

    if not text:
        return

    loop = asyncio.get_running_loop()

    # If text is short (< 50 chars), treat it as conversation
    if len(text.strip()) < 50:
        # psycopg2 calls are blocking; run them in a worker thread.
        await loop.run_in_executor(None, save_chat_message, chat_id, "user", text)
        await context.bot.send_chat_action(chat_id=chat_id, action="typing")
        response = await generate_chat_response(chat_id, text, user_name)
        await loop.run_in_executor(None, save_chat_message, chat_id, "assistant", response)
        await message.reply_text(response)
        return

    # An optional first line holding only a number sets the question count.
    lines = text.strip().split('\n')
    first_line = lines[0].strip()
    num_questions = 5
    # isdecimal(), not isdigit(): e.g. "²" passes isdigit() but int() rejects it.
    if first_line.isdecimal():
        num_questions = min(max(int(first_line), 1), 20)
        text = '\n'.join(lines[1:])  # Remove the number line

    processing_msg = await message.reply_text("📝 جاري معالجة النص... اصبر اصبر 😮‍💨")
    status_deleted = False

    try:
        # Nothing left to quiz on once the count line was removed.
        if not text.strip():
            await processing_msg.edit_text("❌ مش قادر اعمل اسئلة من النص ده")
            return

        await processing_msg.edit_text(f"🧠 جاري توليد {num_questions} اسئلة...")
        questions = await generate_quiz_questions(text, num_questions)

        if not questions:
            await processing_msg.edit_text("❌ مش قادر اعمل اسئلة من النص ده")
            return

        await processing_msg.delete()
        status_deleted = True

        # One failing poll is logged and skipped; the rest are still sent.
        for i, q in enumerate(questions):
            try:
                await send_question_poll(context, chat_id, q)
                # Small delay between polls to avoid rate limiting
                if i < len(questions) - 1:
                    await asyncio.sleep(1)
            except Exception as e:
                logger.error(f"Error sending question {i+1}: {e}")
                continue

        await asyncio.sleep(2)
        await context.bot.send_message(
            chat_id=chat_id,
            text="✅ انتهيت! \n\nيلا اي خدمه، افتكرنا بعد الامتحانات بقا 😑\nانسان مصلحجي!\n\n🔄 بتبعتلي نص تاني؟",
            reply_to_message_id=message.message_id
        )

    except Exception as e:
        logger.error(f"Error processing text: {e}")
        error_text = f"❌ حصل مشكلة: {str(e)[:100]}"
        try:
            # After deletion the status message can no longer be edited.
            if status_deleted:
                await message.reply_text(error_text)
            else:
                await processing_msg.edit_text(error_text)
        except Exception as notify_error:
            logger.error(f"Could not report error to user: {notify_error}")


async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /start: wipe this chat's stored memory and send the welcome text."""
    if update.message is None:
        return
    chat_id = update.message.chat_id

    # Clear memory on /start (blocking DB call, so run it in a worker thread)
    await asyncio.get_running_loop().run_in_executor(None, clear_chat_history, chat_id)

    welcome_message = """📚✨ *(Just share your content, and I’ll handle the rest.)*

I can generate multiple questions from your material (Images, PDFs, or Text).

**How to use:**
1. Send an image, PDF, or text.
2. (Optional) Add a number in the caption to specify how many questions (Default: 5).

🚀 Let's start!"""

    await update.message.reply_text(welcome_message)


async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /help: reply with a short FAQ (in Arabic)."""
    if update.message is None:
        return
    help_message = """❓ الأسئلة الشائعة

• كم عدد الأسئلة؟
من 1 إلى 20 سؤال

• هل لازم النص عربي؟
لا، يدعم العربي والإنجليزي وعدة لغات

• ما ظهرت الإجابة الصحيحة؟
تأكد أن النص واضح ومقروء

• البوت بطيء؟
يعتمد على حجم الملف وسرعة الإنترنت ⏳

• كيف أبدأ؟
اضغط /start"""

    await update.message.reply_text(help_message)


def wait_for_network():
    """Wait, best effort, until the proxy and Ollama hostnames resolve via DNS.

    Each hostname gets up to 12 attempts, 5 seconds apart (~60 s). A host that
    never resolves is logged as an error, but startup continues.
    """
    from urllib.parse import urlparse

    # Check Telegram Proxy
    targets = ["telegram.esmail.app"]

    # Check Ollama Host
    try:
        ollama_hostname = urlparse(OLLAMA_HOST).hostname
        if ollama_hostname:
            targets.append(ollama_hostname)
    except Exception as e:
        logger.warning(f"Could not parse OLLAMA_HOST: {e}")

    logger.info(f"OLLAMA_HOST is set to: {OLLAMA_HOST}")
    logger.info(f"Checking network connectivity to: {', '.join(targets)}...")

    max_retries = 12  # Wait up to 60 seconds per host
    for target in targets:
        for attempt in range(1, max_retries + 1):
            try:
                socket.gethostbyname(target)
            except socket.gaierror:
                logger.warning(f"DNS resolution failed for {target}. Retrying in 5 seconds... ({attempt}/{max_retries})")
                time.sleep(5)
            else:
                logger.info(f"DNS resolution successful for {target}.")
                break
        else:
            # We continue anyway, but this is a bad sign.
            logger.error(f"Could not resolve {target} after multiple attempts.")


def main():
    """Configure the bot (DB, event loop, proxy-backed client, handlers) and start polling.

    Raises:
        ValueError: TELEGRAM_BOT_TOKEN is not set.
    """
    # Fail fast on a missing token before touching the database or network.
    if not TELEGRAM_BOT_TOKEN:
        raise ValueError("TELEGRAM_BOT_TOKEN environment variable is not set!")

    init_db()

    # Fix for running in background thread: ensure an event loop exists
    try:
        asyncio.get_event_loop()
    except RuntimeError:
        asyncio.set_event_loop(asyncio.new_event_loop())

    wait_for_network()

    # Configure connection options to be more resilient
    request = HTTPXRequest(
        connection_pool_size=8,
        connect_timeout=30.0,
        read_timeout=30.0
    )

    # Create application with custom base_url for the proxy
    application = (
        Application.builder()
        .token(TELEGRAM_BOT_TOKEN)
        .base_url("https://telegram.esmail.app/bot")
        .request(request)
        .build()
    )

    from telegram.ext import CommandHandler
    application.add_handler(CommandHandler("start", start_command))
    application.add_handler(CommandHandler("help", help_command))
    application.add_handler(MessageHandler(filters.PHOTO | filters.Document.ALL, handle_file))
    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_text))

    logger.info("Starting bot...")
    # Disable signal handling because we run in a thread in app.py
    application.run_polling(allowed_updates=Update.ALL_TYPES, stop_signals=None)


if __name__ == "__main__":
    main()