import asyncio
import base64
import json
import logging
import os
import re
import socket
import time
from datetime import datetime
from io import BytesIO

import httpx
import psycopg2
import PyPDF2
from dotenv import load_dotenv
from telegram import Update
from telegram.ext import Application, MessageHandler, filters, ContextTypes
from telegram.request import HTTPXRequest
|
|
| |
| load_dotenv() |
|
|
| |
| logging.basicConfig( |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', |
| level=logging.INFO |
| ) |
| logger = logging.getLogger(__name__) |
|
|
| |
| TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN") |
| OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://localhost:11434") |
| OLLAMA_API_KEY = os.getenv("OLLAMA_API_KEY", "") |
| VISION_MODEL = os.getenv("VISION_MODEL", "llava") |
| CHAT_MODEL = os.getenv("CHAT_MODEL", "mistral") |
| DATABASE_URL = os.getenv("DATABASE_URL") |
|
|
|
|
def init_db():
    """Create the chat_history table if it does not already exist.

    No-op (with a warning) when DATABASE_URL is unset. Database errors
    are logged rather than raised so bot startup can continue without
    persistent memory.
    """
    if not DATABASE_URL:
        logger.warning("DATABASE_URL not set. Chat memory will be disabled.")
        return

    try:
        conn = psycopg2.connect(DATABASE_URL)
        try:
            # `with conn` commits on success / rolls back on error; the cursor
            # context manager closes the cursor. The original closed neither
            # when execute() raised, leaking the connection.
            with conn, conn.cursor() as cur:
                cur.execute("""
                    CREATE TABLE IF NOT EXISTS chat_history (
                        id SERIAL PRIMARY KEY,
                        chat_id BIGINT NOT NULL,
                        role VARCHAR(10) NOT NULL,
                        content TEXT NOT NULL,
                        timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    );
                """)
        finally:
            # psycopg2's `with conn` does NOT close the connection.
            conn.close()
        logger.info("Database initialized successfully.")
    except Exception as e:
        logger.error(f"Error initializing database: {e}")
|
|
|
|
def save_chat_message(chat_id: int, role: str, content: str):
    """Persist one chat message (role is 'user' or 'assistant').

    Silently no-ops when DATABASE_URL is unset; database errors are
    logged and swallowed so messaging is never blocked by storage.
    """
    if not DATABASE_URL:
        return

    try:
        conn = psycopg2.connect(DATABASE_URL)
        try:
            # Context managers guarantee commit/rollback and cursor cleanup;
            # the original leaked the connection if execute() raised.
            with conn, conn.cursor() as cur:
                cur.execute(
                    "INSERT INTO chat_history (chat_id, role, content) VALUES (%s, %s, %s)",
                    (chat_id, role, content)
                )
        finally:
            conn.close()
    except Exception as e:
        logger.error(f"Error saving chat message: {e}")
|
|
|
|
def get_chat_history(chat_id: int, limit: int = 20):
    """Return up to *limit* most recent messages for *chat_id*.

    Messages come back oldest-first as {"role", "content"} dicts.
    Returns [] when memory is disabled or on any database error.
    """
    if not DATABASE_URL:
        return []

    try:
        conn = psycopg2.connect(DATABASE_URL)
        try:
            # Context managers ensure the cursor/transaction are cleaned up;
            # the original leaked the connection on query errors.
            with conn, conn.cursor() as cur:
                cur.execute(
                    """
                    SELECT role, content FROM chat_history
                    WHERE chat_id = %s
                    ORDER BY timestamp DESC
                    LIMIT %s
                    """,
                    (chat_id, limit)
                )
                rows = cur.fetchall()
        finally:
            conn.close()

        # The query selects newest-first (for LIMIT); reverse so the
        # caller receives chronological order.
        return [{"role": role, "content": content} for role, content in rows][::-1]
    except Exception as e:
        logger.error(f"Error getting chat history: {e}")
        return []
|
|
|
|
def clear_chat_history(chat_id: int):
    """Delete all stored messages for *chat_id* (no-op without a DB).

    Errors are logged and swallowed, consistent with the other
    best-effort storage helpers in this module.
    """
    if not DATABASE_URL:
        return

    try:
        conn = psycopg2.connect(DATABASE_URL)
        try:
            # `with conn` commits the delete; the original leaked the
            # connection if execute() raised.
            with conn, conn.cursor() as cur:
                cur.execute("DELETE FROM chat_history WHERE chat_id = %s", (chat_id,))
        finally:
            conn.close()
        logger.info(f"Memory cleared for chat {chat_id}")
    except Exception as e:
        logger.error(f"Error clearing chat history: {e}")
|
|
|
|
async def extract_text_from_image(image_bytes: bytes) -> str:
    """Transcribe all text visible in an image via the Ollama vision model.

    Logs and re-raises any HTTP or decoding failure.
    """
    try:
        encoded = base64.b64encode(image_bytes).decode('utf-8')

        request_body = {
            "model": VISION_MODEL,
            "messages": [{
                "role": "user",
                "content": "Just transcribe all the text in the image in details. Output only the text, nothing else.",
                "images": [encoded],
            }],
            "stream": False,
        }

        auth_headers = {"Authorization": f"Bearer {OLLAMA_API_KEY}"} if OLLAMA_API_KEY else {}

        # Generous read timeout: vision inference can take several minutes.
        async with httpx.AsyncClient(timeout=httpx.Timeout(10.0, read=600.0)) as client:
            resp = await client.post(
                f"{OLLAMA_HOST}/api/chat",
                json=request_body,
                headers=auth_headers,
            )
            resp.raise_for_status()
            return resp.json()['message']['content']

    except Exception as e:
        logger.error(f"Error extracting text from image: {e}")
        raise
|
|
|
|
def extract_text_from_pdf(pdf_bytes: bytes) -> str:
    """Return the concatenated text of every page in a PDF, stripped.

    Logs and re-raises any parsing failure.
    """
    try:
        reader = PyPDF2.PdfReader(BytesIO(pdf_bytes))
        # One newline-terminated chunk per page, joined in a single pass.
        chunks = [page.extract_text() + "\n" for page in reader.pages]
        return "".join(chunks).strip()
    except Exception as e:
        logger.error(f"Error extracting text from PDF: {e}")
        raise
|
|
|
|
async def generate_quiz_questions(text: str, num_questions: int = 5) -> list:
    """Generate multiple-choice questions from the given text using Ollama API.

    Returns the parsed list of {"question", "options"} dicts.
    Raises ValueError when the model output cannot be parsed as JSON;
    re-raises transport/HTTP errors unchanged.
    """
    system_prompt = """You are a High-Performance Question Generation Engine.
Rules:
1. You MUST generate accurate multiple-choice questions based ONLY on the provided content.
2. Output format: Strictly valid JSON array of objects.
3. ABSOLUTELY NO conversational text, headers, footers, intros, outros, or explanations outside the JSON.
4. These are survey-style questions; do NOT identify or provide a "correct" answer.
5. The language of the questions must match the language of the source text."""

    user_prompt = f"""Based on the text I provide, generate {num_questions} multiple-choice questions.

You must output valid JSON only. Do not wrap it in markdown blocks (like ```json).
The output format must be a strictly valid JSON array of objects, like this:

[
{{
"question": "What is the primary topic discussed in the text?",
"options": ["Option A", "Option B", "Option C", "Option D"]
}},
{{
"question": "Which of the following best describes the author's tone?",
"options": ["Option A", "Option B", "Option C", "Option D"]
}}
]

Text to analyze:
{text}"""

    # Pre-bound so the JSONDecodeError handler below can log it safely even
    # when response.json() itself raises (httpx decodes with json.loads, so a
    # non-JSON body would otherwise leave `output` unbound -> NameError).
    output = ""

    try:
        payload = {
            "model": CHAT_MODEL,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ],
            "stream": False,
            "format": "json"  # ask Ollama to constrain the reply to JSON
        }

        headers = {}
        if OLLAMA_API_KEY:
            headers["Authorization"] = f"Bearer {OLLAMA_API_KEY}"

        # Long read timeout: generation can take minutes on large inputs.
        timeout = httpx.Timeout(10.0, read=600.0)
        async with httpx.AsyncClient(timeout=timeout) as client:
            response = await client.post(
                f"{OLLAMA_HOST}/api/chat",
                json=payload,
                headers=headers
            )
            response.raise_for_status()
            result = response.json()
            output = result['message']['content']

        # Defensive: strip markdown fences the model may add despite instructions.
        clean_json = output.replace('```json', '').replace('```', '').strip()

        questions = json.loads(clean_json)
        return questions

    except json.JSONDecodeError as e:
        logger.error(f"JSON Parse failed: {e}")
        logger.error(f"Raw output: {output}")
        # Chain the cause so the original parse position is preserved.
        raise ValueError(f"Failed to parse quiz questions: {e}") from e
    except Exception as e:
        logger.error(f"Error generating quiz: {e}")
        raise
|
|
|
|
async def send_question_poll(context: ContextTypes.DEFAULT_TYPE, chat_id: int, question_data: dict):
    """Post one quiz question as a regular, non-anonymous Telegram poll.

    Logs and re-raises any send failure so the caller can decide
    whether to continue with remaining questions.
    """
    try:
        # Truncate to Telegram's limits: 300 chars per question, 10 options max.
        poll_question = question_data['question'][:300]
        poll_options = question_data['options'][:10]
        await context.bot.send_poll(
            chat_id=chat_id,
            question=poll_question,
            options=poll_options,
            type='regular',
            is_anonymous=False,
        )
    except Exception as e:
        logger.error(f"Error sending poll: {e}")
        raise
|
|
|
|
def parse_num_questions(caption: str) -> int:
    """Return the question count requested in a message caption.

    The first run of digits found in *caption* is used, clamped to the
    supported range [1, 20]. Defaults to 5 when the caption is empty,
    None, or contains no digits.
    """
    if not caption:
        return 5

    # `re` is imported at module level (moved out of the function body).
    match = re.search(r'\d+', caption)
    if match:
        return min(max(int(match.group()), 1), 20)
    return 5
|
|
|
|
async def _download_telegram_file(context: ContextTypes.DEFAULT_TYPE, file_id: str) -> bytes:
    """Download a Telegram file's bytes through the custom Bot API mirror.

    get_file() may return either an absolute URL (containing the bot
    token) or a relative path; both are normalized to a bare path and
    fetched from the mirror host. The original inline code only handled
    the absolute-URL case and silently produced no bytes otherwise.
    """
    file_info = await context.bot.get_file(file_id)
    file_path = file_info.file_path

    # Strip the "<host>/file/bot<token>/" prefix if an absolute URL came back.
    token_part = f"/file/bot{TELEGRAM_BOT_TOKEN}/"
    if token_part in file_path:
        file_path = file_path.split(token_part)[-1]

    custom_file_url = f"https://telegram.esmail.app/file/bot{TELEGRAM_BOT_TOKEN}/{file_path}"
    async with httpx.AsyncClient() as client:
        response = await client.get(custom_file_url)
        response.raise_for_status()
        return response.content


async def handle_file(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle incoming files (photos and documents).

    Downloads the file, extracts its text (vision OCR for images, PyPDF2
    for PDFs), generates quiz questions, and sends them as polls. All
    user-facing status messages are in Arabic, matching the bot's voice.
    """
    message = update.message
    chat_id = message.chat_id
    caption = message.caption or ""

    processing_msg = await message.reply_text("📝 جاري معالجة الملف... اصبر اصبر 😮💨")

    try:
        file_bytes = None
        file_type = None

        if message.photo:
            # The last PhotoSize is the highest-resolution variant.
            file_bytes = await _download_telegram_file(context, message.photo[-1].file_id)
            file_type = 'image'

        elif message.document:
            doc = message.document
            file_name = doc.file_name.lower() if doc.file_name else ""

            if file_name.endswith(('.jpg', '.jpeg', '.png', '.webp')):
                file_bytes = await _download_telegram_file(context, doc.file_id)
                file_type = 'image'
            elif file_name.endswith('.pdf'):
                file_bytes = await _download_telegram_file(context, doc.file_id)
                file_type = 'pdf'
            else:
                await processing_msg.edit_text("❌ نوع ملف مش مدعوم! 🤔\n\n✅ الأنواع المدعومة:\n• صور (JPG, PNG, WebP)\n• ملفات PDF\n• نصوص مباشرة")
                return
        else:
            await processing_msg.edit_text("❌ نوع ملف مش مدعوم!")
            return

        if file_type == 'image':
            await processing_msg.edit_text("📸 جاري تحليل الصورة...")
            text = await extract_text_from_image(bytes(file_bytes))
        elif file_type == 'pdf':
            await processing_msg.edit_text("📄 جاري استخراج النص من الـ PDF...")
            text = extract_text_from_pdf(bytes(file_bytes))

        # Require a minimum amount of extracted text before generating.
        if not text or len(text.strip()) < 10:
            await processing_msg.edit_text("❌ مش قادر استخرج نص كافي من الملف\n\n💡 تأكد أن الملف يحتوي على نص واضح")
            return

        num_questions = parse_num_questions(caption)

        await processing_msg.edit_text(f"🧠 جاري توليد {num_questions} اسئلة...")
        questions = await generate_quiz_questions(text, num_questions)

        if not questions:
            await processing_msg.edit_text("❌ مش قادر اعمل اسئلة من النص ده")
            return

        await processing_msg.delete()

        # Send polls one by one; a failed poll does not abort the rest.
        for i, q in enumerate(questions):
            try:
                await send_question_poll(context, chat_id, q)
                if i < len(questions) - 1:
                    await asyncio.sleep(1)  # pace sends to respect rate limits
            except Exception as e:
                logger.error(f"Error sending question {i+1}: {e}")
                continue

        await asyncio.sleep(2)
        await context.bot.send_message(
            chat_id=chat_id,
            text="✅ انتهيت! \n\nيلا اي خدمه، افتكرنا بعد الامتحانات بقا 😑\nانسان مصلحجي!\n\n🔄 بتبعتلي ملف تاني؟",
            reply_to_message_id=message.message_id
        )

    except Exception as e:
        logger.error(f"Error processing file: {e}")
        await processing_msg.edit_text(f"❌ حصل مشكلة: {str(e)[:100]}")
|
|
|
|
async def generate_chat_response(chat_id: int, text: str, user_name: str = "User") -> str:
    """Produce a short conversational reply via the Ollama chat model.

    Recent chat history (character-budgeted) is replayed as context.
    Falls back to a canned Arabic greeting on any failure.
    """
    system_prompt = f"""You are the Official AI Question Bot. 🧠
Your strictly defined role is to assist {user_name} in preparing for exams and studying by converting content (Images, PDFs, or Text) into interactive Telegram polls.
- You are professional, authoritative, and helpful.
- If the user chats with you, your primary objective is to guide them towards their study goals.
- If they ask unrelated questions, politely redirect them to your core functionality: "My purpose is to help you study. Please send me content (Image/PDF/Text) to generate questions."
- Maintain a strictly academic yet encouraging tone.
- Do not engage in roleplay or off-topic conversation.
- You are NOT generating questions in this chat mode, just providing guidance or greetings."""

    full_history = get_chat_history(chat_id, limit=20)

    # Keep the most recent messages that fit in the character budget,
    # walking newest-first and restoring chronological order as we go.
    char_budget = 3000
    used = 0
    context_window = []
    for entry in reversed(full_history):
        size = len(entry['content'])
        if used + size >= char_budget:
            break
        context_window.insert(0, entry)
        used += size

    messages = [{"role": "system", "content": system_prompt}]
    messages.extend(context_window)
    messages.append({"role": "user", "content": text})

    try:
        payload = {
            "model": CHAT_MODEL,
            "messages": messages,
            "stream": False,
        }

        auth_headers = {"Authorization": f"Bearer {OLLAMA_API_KEY}"} if OLLAMA_API_KEY else {}

        async with httpx.AsyncClient(timeout=30.0) as client:
            resp = await client.post(
                f"{OLLAMA_HOST}/api/chat",
                json=payload,
                headers=auth_headers,
            )
            resp.raise_for_status()
            return resp.json()['message']['content']

    except Exception as e:
        logger.error(f"Error generating chat response: {e}")
        return "اهلا! 👋 ابعتلي صورة او ملف عشان اعملك كويز."
|
|
|
|
async def handle_text(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle plain text: short messages get a chat reply, longer ones become a quiz."""
    message = update.message
    chat_id = message.chat_id
    text = message.text
    user_name = message.from_user.first_name or "User"

    if not text:
        return

    # Short messages are treated as chit-chat rather than quiz material.
    if len(text.strip()) < 50:
        save_chat_message(chat_id, "user", text)
        await context.bot.send_chat_action(chat_id=chat_id, action="typing")
        reply = await generate_chat_response(chat_id, text, user_name)
        save_chat_message(chat_id, "assistant", reply)
        await message.reply_text(reply)
        return

    # A digits-only first line overrides the default question count.
    lines = text.strip().split('\n')
    first_line = lines[0].strip()
    num_questions = 5
    if first_line.isdigit():
        num_questions = min(max(int(first_line), 1), 20)
        text = '\n'.join(lines[1:])

    processing_msg = await message.reply_text("📝 جاري معالجة النص... اصبر اصبر 😮💨")

    try:
        await processing_msg.edit_text(f"🧠 جاري توليد {num_questions} اسئلة...")
        questions = await generate_quiz_questions(text, num_questions)

        if not questions:
            await processing_msg.edit_text("❌ مش قادر اعمل اسئلة من النص ده")
            return

        await processing_msg.delete()

        # One poll per question; a failed send skips to the next question.
        last_index = len(questions) - 1
        for idx, question in enumerate(questions):
            try:
                await send_question_poll(context, chat_id, question)
                if idx < last_index:
                    await asyncio.sleep(1)
            except Exception as e:
                logger.error(f"Error sending question {idx+1}: {e}")
                continue

        await asyncio.sleep(2)
        await context.bot.send_message(
            chat_id=chat_id,
            text="✅ انتهيت! \n\nيلا اي خدمه، افتكرنا بعد الامتحانات بقا 😑\nانسان مصلحجي!\n\n🔄 بتبعتلي نص تاني؟",
            reply_to_message_id=message.message_id
        )

    except Exception as e:
        logger.error(f"Error processing text: {e}")
        await processing_msg.edit_text(f"❌ حصل مشكلة: {str(e)[:100]}")
|
|
|
|
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reply to /start: wipe the chat's stored memory, then greet the user."""
    chat_id = update.message.chat_id

    # A fresh /start resets the conversational context.
    clear_chat_history(chat_id)

    greeting = """📚✨ *(Just share your content, and I’ll handle the rest.)*

I can generate multiple questions from your material (Images, PDFs, or Text).

**How to use:**
1. Send an image, PDF, or text.
2. (Optional) Add a number in the caption to specify how many questions (Default: 5).

🚀 Let's start!"""

    # NOTE(review): the asterisks look like Markdown, but no parse_mode is
    # passed, so Telegram renders them literally — confirm this is intended.
    await update.message.reply_text(greeting)
|
|
|
|
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reply to /help with the static FAQ text (in Arabic)."""
    faq_text = """❓ الأسئلة الشائعة

• كم عدد الأسئلة؟
من 1 إلى 20 سؤال

• هل لازم النص عربي؟
لا، يدعم العربي والإنجليزي وعدة لغات

• ما ظهرت الإجابة الصحيحة؟
تأكد أن النص واضح ومقروء

• البوت بطيء؟
يعتمد على حجم الملف وسرعة الإنترنت ⏳

• كيف أبدأ؟
اضغط /start"""

    await update.message.reply_text(faq_text)
|
|
|
|
def wait_for_network():
    """Block until DNS resolves for the bot's external hosts (best effort).

    Retries each hostname up to 12 times with 5-second pauses. A host
    that never resolves is logged as an error but does NOT abort
    startup, preserving the original best-effort behavior.
    """
    # The custom Telegram Bot API mirror used throughout this module.
    targets = ["telegram.esmail.app"]

    # Use the module-level OLLAMA_HOST constant instead of re-reading the
    # environment (the original duplicated the os.getenv call, which could
    # drift from the value the rest of the module uses).
    from urllib.parse import urlparse
    try:
        ollama_hostname = urlparse(OLLAMA_HOST).hostname
        if ollama_hostname and ollama_hostname not in targets:
            targets.append(ollama_hostname)
    except Exception as e:
        logger.warning(f"Could not parse OLLAMA_HOST: {e}")

    logger.info(f"OLLAMA_HOST is set to: {OLLAMA_HOST}")
    logger.info(f"Checking network connectivity to: {', '.join(targets)}...")

    max_retries = 12
    for target in targets:
        for attempt in range(1, max_retries + 1):
            try:
                socket.gethostbyname(target)
                logger.info(f"DNS resolution successful for {target}.")
                break
            except socket.gaierror:
                logger.warning(f"DNS resolution failed for {target}. Retrying in 5 seconds... ({attempt}/{max_retries})")
                time.sleep(5)
        else:
            # for/else: runs only when the retry loop exhausted without `break`.
            logger.error(f"Could not resolve {target} after multiple attempts.")
| |
|
|
|
|
def main():
    """Start the bot: validate config, init storage, wire handlers, poll.

    Raises ValueError immediately when TELEGRAM_BOT_TOKEN is missing,
    before any database or network work is attempted (fail fast; the
    original only checked after init_db and event-loop setup).
    """
    if not TELEGRAM_BOT_TOKEN:
        raise ValueError("TELEGRAM_BOT_TOKEN environment variable is not set!")

    init_db()

    # Ensure a current event loop exists before run_polling; some
    # deployment environments start threads without one.
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    wait_for_network()

    # Generous timeouts: the custom Bot API mirror can be slow.
    request = HTTPXRequest(
        connection_pool_size=8,
        connect_timeout=30.0,
        read_timeout=30.0
    )

    # All Bot API traffic is routed through the custom mirror host.
    application = (
        Application.builder()
        .token(TELEGRAM_BOT_TOKEN)
        .base_url("https://telegram.esmail.app/bot")
        .request(request)
        .build()
    )

    from telegram.ext import CommandHandler

    application.add_handler(CommandHandler("start", start_command))
    application.add_handler(CommandHandler("help", help_command))
    application.add_handler(MessageHandler(filters.PHOTO | filters.Document.ALL, handle_file))
    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_text))

    logger.info("Starting bot...")
    # stop_signals=None: PTB installs no signal handlers here — presumably
    # shutdown is managed by the hosting environment; confirm before changing.
    application.run_polling(allowed_updates=Update.ALL_TYPES, stop_signals=None)
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|