telebot / bot.py
Esmaill1
Feat: Remove quiz mode - generate regular questions without correct answers
aa8169c
import os
import json
import asyncio
import logging
import socket
import time
from io import BytesIO
from datetime import datetime
import psycopg2
from dotenv import load_dotenv
from telegram import Update
from telegram.ext import Application, MessageHandler, filters, ContextTypes
from telegram.request import HTTPXRequest
import PyPDF2
import base64
import httpx
# Load environment variables (python-dotenv) before any os.getenv() call below
load_dotenv()
# Configure logging: timestamp, logger name, level and message, at INFO and above
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
# Module-level logger shared by every function in this file
logger = logging.getLogger(__name__)
# Configuration: every value is read once, at import time, from the environment
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://localhost:11434")
OLLAMA_API_KEY = os.getenv("OLLAMA_API_KEY", "") # Optional: sent as a Bearer token when non-empty
VISION_MODEL = os.getenv("VISION_MODEL", "llava") # Model used to transcribe text from images
CHAT_MODEL = os.getenv("CHAT_MODEL", "mistral") # Model used for question generation and chat replies
DATABASE_URL = os.getenv("DATABASE_URL") # Connection string passed to psycopg2; chat memory is disabled when unset
def init_db():
    """Create the ``chat_history`` table if it does not already exist.

    When ``DATABASE_URL`` is unset a warning is logged and nothing else
    happens. Database errors are logged and swallowed so that the caller can
    continue without chat memory.
    """
    if not DATABASE_URL:
        logger.warning("DATABASE_URL not set. Chat memory will be disabled.")
        return
    conn = None
    try:
        conn = psycopg2.connect(DATABASE_URL)
        # The cursor context manager closes the cursor even if execute() fails.
        with conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS chat_history (
                    id SERIAL PRIMARY KEY,
                    chat_id BIGINT NOT NULL,
                    role VARCHAR(10) NOT NULL,
                    content TEXT NOT NULL,
                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
            """)
        conn.commit()
        logger.info("Database initialized successfully.")
    except Exception as e:
        logger.error(f"Error initializing database: {e}")
    finally:
        # Always release the connection; previously it leaked whenever
        # execute() or commit() raised.
        if conn is not None:
            conn.close()
def save_chat_message(chat_id: int, role: str, content: str):
    """Insert one chat message row into ``chat_history``.

    Args:
        chat_id: Chat the message belongs to.
        role: Speaker role stored with the message (the callers in this file
            pass "user" or "assistant").
        content: Message text.

    Does nothing when ``DATABASE_URL`` is unset. Database errors are logged
    and swallowed (best-effort persistence).
    """
    if not DATABASE_URL:
        return
    conn = None
    try:
        conn = psycopg2.connect(DATABASE_URL)
        # The cursor context manager closes the cursor even if execute() fails.
        with conn.cursor() as cur:
            cur.execute(
                "INSERT INTO chat_history (chat_id, role, content) VALUES (%s, %s, %s)",
                (chat_id, role, content)
            )
        conn.commit()
    except Exception as e:
        logger.error(f"Error saving chat message: {e}")
    finally:
        # Always release the connection; previously it leaked on any error.
        if conn is not None:
            conn.close()
def get_chat_history(chat_id: int, limit: int = 20):
    """Return the most recent messages of a chat, oldest first.

    Args:
        chat_id: Chat whose history is requested.
        limit: Maximum number of rows to fetch.

    Returns:
        A list of ``{"role": ..., "content": ...}`` dicts ordered from oldest
        to newest. An empty list is returned when ``DATABASE_URL`` is unset or
        when any database error occurs (the error is logged).
    """
    if not DATABASE_URL:
        return []
    conn = None
    try:
        conn = psycopg2.connect(DATABASE_URL)
        # The cursor context manager closes the cursor even if the query fails.
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT role, content FROM chat_history
                WHERE chat_id = %s
                ORDER BY timestamp DESC
                LIMIT %s
                """,
                (chat_id, limit)
            )
            rows = cur.fetchall()
        # Rows arrive newest-first; reverse them so callers get oldest-to-newest.
        return [{"role": role, "content": content} for role, content in reversed(rows)]
    except Exception as e:
        logger.error(f"Error getting chat history: {e}")
        return []
    finally:
        # Always release the connection; previously it leaked on any error.
        if conn is not None:
            conn.close()
def clear_chat_history(chat_id: int):
    """Delete every stored message of the given chat.

    Does nothing when ``DATABASE_URL`` is unset. Database errors are logged
    and swallowed.
    """
    if not DATABASE_URL:
        return
    conn = None
    try:
        conn = psycopg2.connect(DATABASE_URL)
        # The cursor context manager closes the cursor even if execute() fails.
        with conn.cursor() as cur:
            cur.execute("DELETE FROM chat_history WHERE chat_id = %s", (chat_id,))
        conn.commit()
        logger.info(f"Memory cleared for chat {chat_id}")
    except Exception as e:
        logger.error(f"Error clearing chat history: {e}")
    finally:
        # Always release the connection; previously it leaked on any error.
        if conn is not None:
            conn.close()
async def extract_text_from_image(image_bytes: bytes) -> str:
    """Transcribe the text of an image by asking the vision model.

    The image is base64-encoded and posted to ``{OLLAMA_HOST}/api/chat`` using
    ``VISION_MODEL``. Returns the ``message.content`` field of the reply.
    Any failure is logged and re-raised to the caller.
    """
    try:
        encoded_image = base64.b64encode(image_bytes).decode('utf-8')
        user_message = {
            "role": "user",
            "content": "Just transcribe all the text in the image in details. Output only the text, nothing else.",
            "images": [encoded_image],
        }
        request_body = {
            "model": VISION_MODEL,
            "messages": [user_message],
            "stream": False,
        }
        # The Authorization header is only sent when an API key is configured.
        auth_headers = {"Authorization": f"Bearer {OLLAMA_API_KEY}"} if OLLAMA_API_KEY else {}
        # Generous read timeout (10 minutes) with a 10 second connect timeout.
        request_timeout = httpx.Timeout(10.0, read=600.0)
        async with httpx.AsyncClient(timeout=request_timeout) as http:
            reply = await http.post(
                f"{OLLAMA_HOST}/api/chat",
                json=request_body,
                headers=auth_headers,
            )
            reply.raise_for_status()
            return reply.json()['message']['content']
    except Exception as e:
        logger.error(f"Error extracting text from image: {e}")
        raise
def extract_text_from_pdf(pdf_bytes: bytes) -> str:
    """Extract the text of every page of a PDF.

    Args:
        pdf_bytes: Raw content of the PDF file.

    Returns:
        The page texts joined by newlines, with leading and trailing
        whitespace stripped.

    Raises:
        Exception: Whatever the PDF reader raises; it is logged and re-raised.
    """
    try:
        pdf_reader = PyPDF2.PdfReader(BytesIO(pdf_bytes))
        # ``or ""`` guards against a page yielding None instead of a string,
        # which previously raised TypeError on concatenation. Joining once
        # also avoids repeated string ``+=`` in the loop.
        page_texts = [(page.extract_text() or "") for page in pdf_reader.pages]
        return "\n".join(page_texts).strip()
    except Exception as e:
        logger.error(f"Error extracting text from PDF: {e}")
        raise
def _coerce_question_list(parsed) -> list:
    """Return the well-formed question dicts contained in parsed model output."""
    if isinstance(parsed, dict):
        if "question" in parsed and "options" in parsed:
            # The model answered with a single question object.
            parsed = [parsed]
        else:
            # The model may wrap the array in an object (e.g. {"questions": [...]});
            # take the first list value found.
            parsed = next((value for value in parsed.values() if isinstance(value, list)), [])
    if not isinstance(parsed, list):
        return []
    # Keep only entries that callers can turn into a poll.
    return [
        item for item in parsed
        if isinstance(item, dict)
        and isinstance(item.get("question"), str)
        and isinstance(item.get("options"), list)
    ]


async def generate_quiz_questions(text: str, num_questions: int = 5) -> list:
    """Generate multiple-choice questions from the given text using Ollama API.

    Args:
        text: Source material the questions must be based on.
        num_questions: Number of questions requested in the prompt.

    Returns:
        A list of dicts, each with a ``"question"`` string and an ``"options"``
        list. Malformed entries are dropped, so the list may be empty.

    Raises:
        ValueError: If the HTTP body or the model output is not valid JSON.
        Exception: Any other failure (HTTP/network errors, unexpected reply
            shape) is logged and re-raised unchanged.
    """
    system_prompt = """You are a High-Performance Question Generation Engine.
Rules:
1. You MUST generate accurate multiple-choice questions based ONLY on the provided content.
2. Output format: Strictly valid JSON array of objects.
3. ABSOLUTELY NO conversational text, headers, footers, intros, outros, or explanations outside the JSON.
4. These are survey-style questions; do NOT identify or provide a "correct" answer.
5. The language of the questions must match the language of the source text."""
    user_prompt = f"""Based on the text I provide, generate {num_questions} multiple-choice questions.
You must output valid JSON only. Do not wrap it in markdown blocks (like ```json).
The output format must be a strictly valid JSON array of objects, like this:
[
{{
"question": "What is the primary topic discussed in the text?",
"options": ["Option A", "Option B", "Option C", "Option D"]
}},
{{
"question": "Which of the following best describes the author's tone?",
"options": ["Option A", "Option B", "Option C", "Option D"]
}}
]
Text to analyze:
{text}"""
    # Bound before the try block: response.json() can itself raise
    # JSONDecodeError, and the handler below logs this variable.
    output = ""
    try:
        payload = {
            "model": CHAT_MODEL,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ],
            "stream": False,
            "format": "json"
        }
        headers = {}
        if OLLAMA_API_KEY:
            headers["Authorization"] = f"Bearer {OLLAMA_API_KEY}"
        # Generous read timeout (10 minutes) with a 10 second connect timeout.
        timeout = httpx.Timeout(10.0, read=600.0)
        async with httpx.AsyncClient(timeout=timeout) as client:
            response = await client.post(
                f"{OLLAMA_HOST}/api/chat",
                json=payload,
                headers=headers
            )
            response.raise_for_status()
            result = response.json()
        output = result['message']['content']
        # Strip markdown code fences in case the model added them anyway.
        clean_json = output.replace('```json', '').replace('```', '').strip()
        return _coerce_question_list(json.loads(clean_json))
    except json.JSONDecodeError as e:
        logger.error(f"JSON Parse failed: {e}")
        logger.error(f"Raw output: {output}")
        raise ValueError(f"Failed to parse quiz questions: {e}") from e
    except Exception as e:
        logger.error(f"Error generating quiz: {e}")
        raise
async def send_question_poll(context: ContextTypes.DEFAULT_TYPE, chat_id: int, question_data: dict):
    """Send one question as a regular, non-anonymous Telegram poll.

    Args:
        context: Handler context whose ``bot`` is used to send the poll.
        chat_id: Target chat.
        question_data: Dict with a ``"question"`` text and an ``"options"`` list.

    Raises:
        ValueError: If fewer than two non-empty options remain.
        Exception: Any other failure (missing keys, Telegram API errors) is
            logged and re-raised.
    """
    try:
        # Telegram limits: question up to 300 chars, 2-10 options, each option
        # up to 100 chars. Coerce to str so numeric options from the model work.
        question = str(question_data['question'])[:300]
        options = [
            str(option).strip()[:100]
            for option in question_data['options'][:10]
            if str(option).strip()
        ]
        if len(options) < 2:
            raise ValueError("A poll needs at least 2 non-empty options")
        await context.bot.send_poll(
            chat_id=chat_id,
            question=question,
            options=options,
            type='regular',
            is_anonymous=False
        )
    except Exception as e:
        logger.error(f"Error sending poll: {e}")
        raise
def parse_num_questions(caption: str) -> int:
    """Return the question count requested in a caption.

    The first run of digits found in ``caption`` is used and clamped to the
    range 1..20. An empty caption, or one without digits, yields the default
    of 5.
    """
    import re

    default_count = 5
    if not caption:
        return default_count
    first_number = re.search(r'\d+', caption)
    if first_number is None:
        return default_count
    requested = int(first_number.group())
    # Clamp the request to the supported range.
    return max(1, min(requested, 20))
async def _download_telegram_file(context: ContextTypes.DEFAULT_TYPE, file_id: str) -> bytes:
    """Download a Telegram file through the proxy host and return its bytes."""
    file_info = await context.bot.get_file(file_id)
    file_path = file_info.file_path
    if not file_path:
        raise RuntimeError("Telegram did not return a file path")
    # file_path may be a full URL such as
    # https://api.telegram.org/file/botTOKEN/photos/file_0.jpg; keep only the
    # relative part (photos/file_0.jpg).
    if file_path.startswith("http"):
        token_part = f"/file/bot{TELEGRAM_BOT_TOKEN}/"
        if token_part in file_path:
            file_path = file_path.split(token_part)[-1]
    custom_file_url = f"https://telegram.esmail.app/file/bot{TELEGRAM_BOT_TOKEN}/{file_path}"
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(custom_file_url)
            response.raise_for_status()
            return response.content
    except httpx.HTTPStatusError as e:
        # The httpx message embeds the request URL, which contains the bot
        # token. Raise a sanitized error (and drop the chained context) so the
        # token never reaches logs or the user-facing error message.
        raise RuntimeError(f"File download failed (HTTP {e.response.status_code})") from None
    except httpx.HTTPError as e:
        raise RuntimeError(f"File download failed ({type(e).__name__})") from None


async def handle_file(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle incoming files (photos and image/PDF documents).

    The file is downloaded, its text is extracted (vision model for images,
    PyPDF2 for PDFs), questions are generated from that text and each one is
    sent as a poll. The optional caption may carry the number of questions.
    Progress and errors are reported by editing a status message.
    """
    message = update.message
    if message is None:
        # Edited messages and channel posts also reach this handler but carry
        # no ``update.message``; ignore them instead of crashing.
        return
    chat_id = message.chat_id
    caption = message.caption or ""
    # Send "processing" message
    processing_msg = await message.reply_text("📝 جاري معالجة الملف... اصبر اصبر 😮‍💨")
    try:
        # Work out what kind of file was sent and download it.
        if message.photo:
            # The last entry is the largest photo size.
            file_bytes = await _download_telegram_file(context, message.photo[-1].file_id)
            file_type = 'image'
        elif message.document:
            doc = message.document
            file_name = doc.file_name.lower() if doc.file_name else ""
            if file_name.endswith(('.jpg', '.jpeg', '.png', '.webp')):
                file_bytes = await _download_telegram_file(context, doc.file_id)
                file_type = 'image'
            elif file_name.endswith('.pdf'):
                file_bytes = await _download_telegram_file(context, doc.file_id)
                file_type = 'pdf'
            else:
                await processing_msg.edit_text("❌ نوع ملف مش مدعوم! 🤔\n\n✅ الأنواع المدعومة:\n• صور (JPG, PNG, WebP)\n• ملفات PDF\n• نصوص مباشرة")
                return
        else:
            await processing_msg.edit_text("❌ نوع ملف مش مدعوم!")
            return
        # Extract text based on file type
        if file_type == 'image':
            await processing_msg.edit_text("📸 جاري تحليل الصورة...")
            text = await extract_text_from_image(file_bytes)
        else:
            await processing_msg.edit_text("📄 جاري استخراج النص من الـ PDF...")
            text = extract_text_from_pdf(file_bytes)
        if not text or len(text.strip()) < 10:
            await processing_msg.edit_text("❌ مش قادر استخرج نص كافي من الملف\n\n💡 تأكد أن الملف يحتوي على نص واضح")
            return
        # Parse number of questions from caption
        num_questions = parse_num_questions(caption)
        # Generate quiz questions
        await processing_msg.edit_text(f"🧠 جاري توليد {num_questions} اسئلة...")
        questions = await generate_quiz_questions(text, num_questions)
        if not questions:
            await processing_msg.edit_text("❌ مش قادر اعمل اسئلة من النص ده")
            return
        # Delete processing message
        await processing_msg.delete()
        # Send each question as a poll; one failing poll must not stop the rest.
        for i, q in enumerate(questions):
            try:
                await send_question_poll(context, chat_id, q)
                # Small delay between polls to avoid rate limiting
                if i < len(questions) - 1:
                    await asyncio.sleep(1)
            except Exception as e:
                logger.error(f"Error sending question {i+1}: {e}")
                continue
        # Send completion message after a delay
        await asyncio.sleep(2)
        await context.bot.send_message(
            chat_id=chat_id,
            text="✅ انتهيت! \n\nيلا اي خدمه، افتكرنا بعد الامتحانات بقا 😑\nانسان مصلحجي!\n\n🔄 بتبعتلي ملف تاني؟",
            reply_to_message_id=message.message_id
        )
    except Exception as e:
        logger.error(f"Error processing file: {e}")
        try:
            await processing_msg.edit_text(f"❌ حصل مشكلة: {str(e)[:100]}")
        except Exception as report_error:
            # The status message may already have been deleted above.
            logger.error(f"Could not report error to user: {report_error}")
async def generate_chat_response(chat_id: int, text: str, user_name: str = "User") -> str:
    """Generate a conversational reply using the chat model and stored history.

    Args:
        chat_id: Chat whose stored history is used as context.
        text: The user's current message.
        user_name: Name inserted into the system prompt.

    Returns:
        The model's reply text, or a fixed fallback greeting when the request
        fails for any reason (the error is logged).
    """
    system_prompt = f"""You are the Official AI Question Bot. 🧠
Your strictly defined role is to assist {user_name} in preparing for exams and studying by converting content (Images, PDFs, or Text) into interactive Telegram polls.
- You are professional, authoritative, and helpful.
- If the user chats with you, your primary objective is to guide them towards their study goals.
- If they ask unrelated questions, politely redirect them to your core functionality: "My purpose is to help you study. Please send me content (Image/PDF/Text) to generate questions."
- Maintain a strictly academic yet encouraging tone.
- Do not engage in roleplay or off-topic conversation.
- You are NOT generating questions in this chat mode, just providing guidance or greetings."""
    # Get recent history (limit 20 messages)
    history = get_chat_history(chat_id, limit=20)
    # handle_text stores the incoming message before calling this function, so
    # the newest history row is normally the very message appended below.
    # Drop it to avoid sending the same user turn to the model twice.
    if history and history[-1].get("role") == "user" and history[-1].get("content") == text:
        history = history[:-1]
    # Context pruning: walk from newest to oldest and keep messages while the
    # running total stays under ~3000 characters.
    max_chars = 3000
    total_chars = 0
    newest_first = []
    for msg in reversed(history):
        msg_len = len(msg['content'])
        if total_chars + msg_len >= max_chars:
            break
        newest_first.append(msg)
        total_chars += msg_len
    newest_first.reverse()  # back to oldest-to-newest order
    messages = [
        {"role": "system", "content": system_prompt},
        *newest_first,
        {"role": "user", "content": text},
    ]
    try:
        payload = {
            "model": CHAT_MODEL,
            "messages": messages,
            "stream": False
        }
        headers = {}
        if OLLAMA_API_KEY:
            headers["Authorization"] = f"Bearer {OLLAMA_API_KEY}"
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(
                f"{OLLAMA_HOST}/api/chat",
                json=payload,
                headers=headers
            )
            response.raise_for_status()
            result = response.json()
        return result['message']['content']
    except Exception as e:
        logger.error(f"Error generating chat response: {e}")
        return "اهلا! 👋 ابعتلي صورة او ملف عشان اعملك كويز."
async def handle_text(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle text messages.

    Short messages (fewer than 50 characters after stripping) are treated as
    conversation: the message and the generated reply are stored in chat
    memory. Longer messages are used as source material for question polls;
    if the first line is a number, it sets the question count (clamped to
    1..20) and is removed from the material.
    """
    message = update.message
    if message is None:
        # Edited messages and channel posts also reach this handler but carry
        # no ``update.message``; ignore them instead of crashing.
        return
    chat_id = message.chat_id
    text = message.text
    sender = message.from_user
    user_name = (sender.first_name if sender else None) or "User"
    if not text:
        return
    # If text is short (< 50 chars), treat it as conversation
    if len(text.strip()) < 50:
        # Save user message to memory
        save_chat_message(chat_id, "user", text)
        # Show typing indicator
        await context.bot.send_chat_action(chat_id=chat_id, action="typing")
        # Generate response with history
        response = await generate_chat_response(chat_id, text, user_name)
        # Save bot response to memory
        save_chat_message(chat_id, "assistant", response)
        await message.reply_text(response)
        return
    # Check if the first line is a number for question count
    lines = text.strip().split('\n')
    first_line = lines[0].strip()
    num_questions = 5
    # isdecimal() rather than isdigit(): isdigit() also accepts characters such
    # as "²" that int() rejects with ValueError.
    if first_line.isdecimal():
        num_questions = min(max(int(first_line), 1), 20)
        text = '\n'.join(lines[1:])  # Remove the number line
    processing_msg = await message.reply_text("📝 جاري معالجة النص... اصبر اصبر 😮‍💨")
    try:
        await processing_msg.edit_text(f"🧠 جاري توليد {num_questions} اسئلة...")
        questions = await generate_quiz_questions(text, num_questions)
        if not questions:
            await processing_msg.edit_text("❌ مش قادر اعمل اسئلة من النص ده")
            return
        await processing_msg.delete()
        # Send each question as a poll; one failing poll must not stop the rest.
        for i, q in enumerate(questions):
            try:
                await send_question_poll(context, chat_id, q)
                # Small delay between polls to avoid rate limiting
                if i < len(questions) - 1:
                    await asyncio.sleep(1)
            except Exception as e:
                logger.error(f"Error sending question {i+1}: {e}")
                continue
        await asyncio.sleep(2)
        await context.bot.send_message(
            chat_id=chat_id,
            text="✅ انتهيت! \n\nيلا اي خدمه، افتكرنا بعد الامتحانات بقا 😑\nانسان مصلحجي!\n\n🔄 بتبعتلي نص تاني؟",
            reply_to_message_id=message.message_id
        )
    except Exception as e:
        logger.error(f"Error processing text: {e}")
        try:
            await processing_msg.edit_text(f"❌ حصل مشكلة: {str(e)[:100]}")
        except Exception as report_error:
            # The status message may already have been deleted above.
            logger.error(f"Could not report error to user: {report_error}")
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /start: clear the chat's stored memory and send the welcome text."""
    message = update.message
    if message is None:
        # An edited command arrives without ``update.message``; ignore it
        # instead of crashing.
        return
    # Clear memory on /start
    clear_chat_history(message.chat_id)
    # The emphasis markers were previously sent without a parse mode and so
    # showed up as literal asterisks; use HTML tags and an explicit parse mode.
    welcome_message = """📚✨ <i>(Just share your content, and I’ll handle the rest.)</i>
I can generate multiple questions from your material (Images, PDFs, or Text).
<b>How to use:</b>
1. Send an image, PDF, or text.
2. (Optional) Add a number in the caption to specify how many questions (Default: 5).
🚀 Let's start!"""
    await message.reply_text(welcome_message, parse_mode="HTML")
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /help: reply with the frequently-asked-questions text."""
    message = update.message
    if message is None:
        # An edited command arrives without ``update.message``; ignore it
        # instead of crashing.
        return
    help_message = """❓ الأسئلة الشائعة
• كم عدد الأسئلة؟
من 1 إلى 20 سؤال
• هل لازم النص عربي؟
لا، يدعم العربي والإنجليزي وعدة لغات
• ما ظهرت الإجابة الصحيحة؟
تأكد أن النص واضح ومقروء
• البوت بطيء؟
يعتمد على حجم الملف وسرعة الإنترنت ⏳
• كيف أبدأ؟
اضغط /start"""
    await message.reply_text(help_message)
def wait_for_network():
    """Block until the hosts the bot depends on can be resolved via DNS.

    The Telegram proxy host and the hostname taken from ``OLLAMA_HOST`` are
    each tried up to 12 times with a 5 second pause after every failure
    (about 60 seconds per host). A host that never resolves is logged as an
    error, and startup continues regardless.
    """
    from urllib.parse import urlparse

    # The Telegram proxy is always checked.
    hostnames = ["telegram.esmail.app"]
    # Add the Ollama host when its URL yields a hostname.
    ollama_host = os.getenv("OLLAMA_HOST", "http://localhost:11434")
    try:
        ollama_hostname = urlparse(ollama_host).hostname
        if ollama_hostname:
            hostnames.append(ollama_hostname)
    except Exception as e:
        logger.warning(f"Could not parse OLLAMA_HOST: {e}")
    logger.info(f"OLLAMA_HOST is set to: {ollama_host}")
    logger.info(f"Checking network connectivity to: {', '.join(hostnames)}...")

    max_retries = 12  # 12 attempts x 5 seconds = up to 60 seconds per host
    for target in hostnames:
        for attempt in range(1, max_retries + 1):
            try:
                socket.gethostbyname(target)
            except socket.gaierror:
                logger.warning(f"DNS resolution failed for {target}. Retrying in 5 seconds... ({attempt}/{max_retries})")
                time.sleep(5)
            else:
                logger.info(f"DNS resolution successful for {target}.")
                break
        else:
            # Every attempt failed; carry on anyway, but this is a bad sign.
            logger.error(f"Could not resolve {target} after multiple attempts.")
def main():
    """Initialize dependencies, build the Telegram application and start polling.

    Raises:
        ValueError: If ``TELEGRAM_BOT_TOKEN`` is not set.
    """
    # Set up the chat-memory table first.
    init_db()
    # When running in a background thread there may be no event loop yet;
    # create and install one in that case.
    try:
        asyncio.get_event_loop()
    except RuntimeError:
        asyncio.set_event_loop(asyncio.new_event_loop())
    if not TELEGRAM_BOT_TOKEN:
        raise ValueError("TELEGRAM_BOT_TOKEN environment variable is not set!")
    # Hold off until DNS resolution works for the hosts we talk to.
    wait_for_network()
    # More forgiving connection settings than the library defaults.
    http_request = HTTPXRequest(
        connection_pool_size=8,
        connect_timeout=30.0,
        read_timeout=30.0,
    )
    # Route Bot API calls through the proxy via a custom base_url.
    builder = Application.builder().token(TELEGRAM_BOT_TOKEN)
    builder = builder.base_url("https://telegram.esmail.app/bot")
    application = builder.request(http_request).build()
    # Register handlers in the same order as before: commands, files, text.
    from telegram.ext import CommandHandler
    handlers = (
        CommandHandler("start", start_command),
        CommandHandler("help", help_command),
        MessageHandler(filters.PHOTO | filters.Document.ALL, handle_file),
        MessageHandler(filters.TEXT & ~filters.COMMAND, handle_text),
    )
    for handler in handlers:
        application.add_handler(handler)
    logger.info("Starting bot...")
    # stop_signals=None disables signal handling because the bot is run in a
    # thread by app.py.
    application.run_polling(allowed_updates=Update.ALL_TYPES, stop_signals=None)


if __name__ == "__main__":
    main()