================================================================================ PYTHON CODE COLLECTION Project Folder: main_project Total .py files found: 38 ================================================================================ ================================================================================ FILE 1: app.py FULL PATH: main_project\app.py ================================================================================ """ ═══════════════════════════════════════════════════════════════════ CORVO AI - Complete Teaching Platform Main Application Entry Point (High-Performance Edition) Run with gunicorn: gunicorn -c gunicorn_config.py app:app Or dev mode: python app.py Open: http://localhost:5000 ═══════════════════════════════════════════════════════════════════ """ from gevent import monkey monkey.patch_all() import os import atexit from flask import Flask, redirect, url_for, session, flash, render_template, send_from_directory, jsonify, request from flask_socketio import SocketIO from config import SECRET_KEY, DEBUG, HOST, PORT, REQUIRED_DIRS # Check if running in Docker IN_DOCKER = os.path.exists('/.dockerenv') or os.environ.get('DOCKER', 'false') == 'true' if IN_DOCKER: print(" 🐳 Running inside Docker container") # Check if data volume is mounted for f in ['users.json', 'users_db.json', 'cards.json', 'chat_history_db.json']: if os.path.exists(f): size = os.path.getsize(f) print(f" ✅ {f} found ({size} bytes)") else: print(f" ⚠️ {f} not found - creating empty") with open(f, 'w') as fh: fh.write('{}') # ─── Create Flask app ─── app = Flask(__name__) app.secret_key = SECRET_KEY socketio = SocketIO(app, cors_allowed_origins="*", async_mode='threading') # ─── Create required directories (audio only, no JSON) ─── for d in REQUIRED_DIRS: os.makedirs(d, exist_ok=True) # ─── Initialize In-Memory Database (loads from GitHub) ─── from memory_db import get_db db = get_db() # ─── Print DB status (no file init needed) ─── from database.users import init_users_db from database.cards import init_cards_db from database.chat_history import init_chat_history_db init_users_db() init_cards_db() init_chat_history_db() # ─── Initialize AI engines ─── from chat.agent import AIAgent from board.engine import BoardEngine agent = AIAgent() board_engine = BoardEngine() # ─── Register blueprints ─── # Auth from auth.routes import auth_bp, init_auth_socketio init_auth_socketio(socketio) app.register_blueprint(auth_bp) # Chat from chat.routes import chat_bp, init_chat_agent init_chat_agent(agent) app.register_blueprint(chat_bp) # Board from board.routes import board_bp, init_board_engine init_board_engine(board_engine) app.register_blueprint(board_bp) # Exam from exam.routes import exam_bp app.register_blueprint(exam_bp) # Media from media.routes import media_bp, init_media_agent init_media_agent(agent) app.register_blueprint(media_bp) # Market from market.routes import market_bp app.register_blueprint(market_bp) # WebSocket from websocket.events import register_socketio_events register_socketio_events(socketio) # ═══════════════════════════════════════════════════════════════════ # ROOT ROUTES # ═══════════════════════════════════════════════════════════════════ from auth.helpers import is_session_valid from database.users import load_users_db, get_user from subjects.definitions import SUBJECTS, is_board_enabled @app.route('/') def index(): if is_session_valid(): return redirect(url_for('dashboard')) return redirect(url_for('auth.signup')) @app.route('/dashboard') def dashboard(): if not is_session_valid(): session.clear() flash('انتهت الجلسة، يرجى تسجيل الدخول مرة أخرى.', 'error') return redirect(url_for('auth.login')) username = session['username'] user_data = get_user(username) if not user_data: session.clear() flash('المستخدم غير موجود.', 'error') return redirect(url_for('auth.login')) student_type = user_data.get('student_type', 'علمي') all_subjects = SUBJECTS.get(student_type, []) purchased_subjects = user_data.get('purchased_subjects', ['islamic']) my_subjects = [] for s in all_subjects: if s['id'] in purchased_subjects: subject_info = dict(s) subject_info['board_enabled'] = is_board_enabled(s['id']) my_subjects.append(subject_info) available_subjects = [s for s in all_subjects if s['id'] not in purchased_subjects] return render_template('dashboard.html', username=username, user_data=user_data, my_subjects=my_subjects, available_subjects=available_subjects, balance=user_data.get('balance', 0)) @app.route('/static/') def serve_static(filename): return send_from_directory('static', filename) # ═══════════════════════════════════════════════════════════════════ # DATABASE BACKUP & ADMIN ROUTES (GitHub-only) # ═══════════════════════════════════════════════════════════════════ ADMIN_SECRET = os.environ.get("ADMIN_SECRET", "corvo_admin_2024") def _check_admin_auth(): secret = request.headers.get('X-Admin-Secret') or request.args.get('secret', '') return secret == ADMIN_SECRET @app.route('/backup_db', methods=['GET', 'POST']) def backup_db(): """Push all in-memory data to GitHub.""" if not _check_admin_auth(): return jsonify({"error": "Unauthorized", "hint": "Provide ?secret= or X-Admin-Secret header"}), 401 success, errors = db.push_to_github() if success: return jsonify({ "success": True, "message": "All databases pushed to GitHub successfully", "stats": db.get_stats() }) else: return jsonify({ "success": False, "message": "Some files failed to push", "errors": errors, "stats": db.get_stats() }), 500 @app.route('/backup_db_single', methods=['GET', 'POST']) def backup_db_single(): """Push a single database to GitHub.""" if not _check_admin_auth(): return jsonify({"error": "Unauthorized"}), 401 store_name = request.args.get('store', '') or (request.get_json() or {}).get('store', '') if not store_name: return jsonify({ "error": "Specify store name", "available": list(db.STORE_FILES.keys()) }), 400 if store_name not in db.STORE_FILES: return jsonify({ "error": f"Unknown store: {store_name}", "available": list(db.STORE_FILES.keys()) }), 400 success, error = db.push_single_to_github(store_name) if success: return jsonify({ "success": True, "message": f"'{store_name}' pushed to GitHub", "records": db.count(store_name) }) else: return jsonify({ "success": False, "error": error }), 500 @app.route('/restore_db', methods=['GET', 'POST']) def restore_db(): """Pull all data from GitHub into memory.""" if not _check_admin_auth(): return jsonify({"error": "Unauthorized"}), 401 success, error = db.pull_from_github() if success: return jsonify({ "success": True, "message": "All databases restored from GitHub into memory", "stats": db.get_stats() }) else: return jsonify({ "success": False, "message": "Failed to restore from GitHub", "error": error, "stats": db.get_stats() }), 500 @app.route('/db_stats', methods=['GET']) def db_stats(): """Get database statistics.""" if not _check_admin_auth(): return jsonify({"error": "Unauthorized"}), 401 stats = db.get_stats() try: from github_storage import get_github_storage gh = get_github_storage() stats["github"] = gh.get_status() except Exception as e: stats["github"] = {"error": str(e)} return jsonify(stats) @app.route('/github_status', methods=['GET']) def github_status(): """Check GitHub storage status and API rate limits.""" if not _check_admin_auth(): return jsonify({"error": "Unauthorized"}), 401 try: from github_storage import get_github_storage gh = get_github_storage() return jsonify(gh.get_status()) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/health', methods=['GET']) def health_check(): """Health check endpoint. No auth required.""" stats = {} for store_name in db.STORES: stats[store_name] = db.count(store_name) return jsonify({ "status": "healthy", "records": stats, "version": "2.0-github-only", "persistence": "github", "local_files": False }) @app.route('/backup_status', methods=['GET']) def backup_status(): """Detailed backup status. No auth required.""" files_status = {} for store_name in db.STORES: files_status[store_name] = { "github_file": db.STORE_FILES.get(store_name, ""), "records_in_memory": db.count(store_name) } github_configured = False github_rate = None try: from github_storage import get_github_storage gh = get_github_storage() github_configured = gh._configured if github_configured: status = gh.get_status() github_rate = status.get("rate_limit") except Exception: pass return jsonify({ "status": "ok", "github_configured": github_configured, "github_rate_limit": github_rate, "stores": files_status, "stats": db.get_stats() }) # ─── Graceful shutdown ─── def _on_shutdown(): print("\n🛑 Application shutting down...") db.shutdown() print("⚠️ Data is in memory only! Push to GitHub: /backup_db") atexit.register(_on_shutdown) # ═══════════════════════════════════════════════════════════════════ # MAIN # ═══════════════════════════════════════════════════════════════════ if __name__ == '__main__': print("\n" + "═" * 60) print(" 🎓 CORVO AI - High Performance Teaching Platform") print("═" * 60) print() print(" 📋 Architecture:") print(" ├── memory_db → Pure RAM database") print(" ├── github_storage → GitHub persistence (manual)") print(" ├── http_pool → Connection pooling for APIs") print(" ├── auth/ → Login, signup, sessions") print(" ├── chat/ → Text AI chat") print(" ├── board/ → Interactive whiteboard") print(" ├── exam/ → MCQ exam generation") print(" ├── media/ → Voice & image processing") print(" ├── market/ → Subject purchasing & cards") print(" ├── websocket/ → Real-time sessions") print(" └── subjects/ → Subject definitions") print() print(" 💾 STORAGE:") print(" ✅ All data in RAM (zero disk I/O)") print(" ✅ GitHub backup (manual via /backup_db)") print(" ✅ NO local JSON files needed") print(" ✅ Loads from GitHub on startup") print() print(" 🔒 ADMIN ENDPOINTS:") print(f" GET /health → Health check") print(f" GET /backup_status → Backup info") print(f" GET /backup_db → Push ALL to GitHub") print(f" GET /backup_db_single → Push ONE store") print(f" GET /restore_db → Pull ALL from GitHub") print(f" GET /db_stats → Database stats") print(f" GET /github_status → GitHub API info") print(f" (admin routes need ?secret={ADMIN_SECRET})") print() print(f" 🌐 Open: http://localhost:{PORT}") print("═" * 60 + "\n") socketio.run(app, debug=DEBUG, host=HOST, port=PORT) ================================================================================ FILE 2: auth\__init__.py FULL PATH: main_project\auth\__init__.py ================================================================================ """ Authentication package. """ ================================================================================ FILE 3: auth\helpers.py FULL PATH: main_project\auth\helpers.py ================================================================================ """ Session and authentication helpers. Updated to use MemoryDB for fast lookups. """ import uuid from flask import session from memory_db import get_db # Active WebSocket connections per user active_connections = {} def generate_session_id(): return str(uuid.uuid4()) def is_session_valid(): if 'username' not in session or 'session_id' not in session: return False username = session['username'] session_id = session['session_id'] db = get_db() user_data = db.read_key('users', username) if not user_data: return False if user_data.get('session_id') != session_id: return False return True def notify_logout(username, socketio, exclude_sid=None): """Notify all connections for a user to logout.""" if username in active_connections: for sid in active_connections[username]: if sid != exclude_sid: socketio.emit('force_logout', { 'message': 'لقد تم تسجيل خروجك لأن شخصاً آخر قام بتسجيل الدخول.' }, room=sid) ================================================================================ FILE 4: auth\routes.py FULL PATH: main_project\auth\routes.py ================================================================================ """ Authentication routes: signup, login, logout, verification. Updated to use MemoryDB for atomic operations. """ from flask import Blueprint, render_template, request, redirect, url_for, session, flash from datetime import datetime from auth.helpers import is_session_valid, generate_session_id, notify_logout, active_connections from memory_db import get_db from database.telegram import load_telegram_db, save_telegram_db auth_bp = Blueprint('auth', __name__) _socketio = None def init_auth_socketio(socketio): global _socketio _socketio = socketio @auth_bp.route('/signup', methods=['GET', 'POST']) def signup(): if request.method == 'POST': username = request.form.get('username', '').strip() password = request.form.get('password', '') confirm_password = request.form.get('confirm_password', '') student_type = request.form.get('student_type', '').strip() if not username or not password or not confirm_password: flash('جميع الحقول مطلوبة!', 'error') return render_template('signup.html') if not student_type or student_type not in ['أدبي', 'علمي']: flash('يرجى اختيار نوع الدراسة!', 'error') return render_template('signup.html') if len(username) < 3: flash('اسم المستخدم قصير جداً!', 'error') return render_template('signup.html') if password != confirm_password: flash('كلمات المرور غير متطابقة!', 'error') return render_template('signup.html') if len(password) < 6: flash('كلمة المرور قصيرة جداً (6 أحرف على الأقل)!', 'error') return render_template('signup.html') db = get_db() if db.has_key('users', username): flash('اسم المستخدم موجود مسبقاً!', 'error') return render_template('signup.html') session['temp_username'] = username session['temp_password'] = password session['temp_student_type'] = student_type return redirect(url_for('auth.verification')) return render_template('signup.html') @auth_bp.route('/verification', methods=['GET', 'POST']) def verification(): if 'temp_username' not in session: flash('يرجى بدء عملية التسجيل من البداية.', 'error') return redirect(url_for('auth.signup')) if request.method == 'POST': code = request.form.get('code', '').strip() if not code: flash('يرجى إدخال رمز التحقق!', 'error') return render_template('verification.html') db = get_db() telegram_db = db.read('telegram') user_found = False user_id = None for uid, user_data in telegram_db.items(): if user_data.get('code') == code and user_data.get('code') != "DONE": user_found = True user_id = uid break if not user_found: flash('رمز التحقق غير صحيح أو تم استخدامه!', 'error') return render_template('verification.html') # Update telegram record atomically db.update_key('telegram', user_id, lambda t: { **(t or {}), 'code': "DONE", 'status': "verified", 'verified_at': datetime.now().isoformat() }) new_session_id = generate_session_id() username = session['temp_username'] password = session['temp_password'] student_type = session['temp_student_type'] # Create user atomically db.write('users', username, { "username": username, "password": password, "student_type": student_type, "telegram_user_id": user_id, "session_id": new_session_id, "created_at": datetime.now().isoformat(), "verified": True, "last_login": datetime.now().isoformat(), "balance": 0, "purchased_subjects": ['islamic'] }) session.pop('temp_username', None) session.pop('temp_password', None) session.pop('temp_student_type', None) session['username'] = username session['user_id'] = user_id session['session_id'] = new_session_id flash('تم إنشاء الحساب وتسجيل الدخول بنجاح!', 'success') return redirect(url_for('dashboard')) return render_template('verification.html') @auth_bp.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'POST': username = request.form.get('username', '').strip() password = request.form.get('password', '') db = get_db() user_data = db.read_key('users', username) if user_data and user_data['password'] == password: new_session_id = generate_session_id() if _socketio: notify_logout(username, _socketio) # Atomic update db.update_key('users', username, lambda u: { **(u or {}), 'session_id': new_session_id, 'last_login': datetime.now().isoformat() }) session['username'] = username session['user_id'] = user_data.get('telegram_user_id') session['session_id'] = new_session_id flash('تم تسجيل الدخول بنجاح!', 'success') return redirect(url_for('dashboard')) else: flash('اسم المستخدم أو كلمة المرور خطأ!', 'error') return render_template('login.html') @auth_bp.route('/logout') def logout(): if 'username' in session: username = session['username'] if username in active_connections: active_connections[username] = [] db = get_db() db.update_key('users', username, lambda u: { **(u or {}), 'session_id': None }) session.clear() flash('تم تسجيل الخروج.', 'success') return redirect(url_for('auth.login')) @auth_bp.route('/forgot-password') def forgot_password(): return render_template('forgot_password.html') ================================================================================ FILE 5: board\__init__.py FULL PATH: main_project\board\__init__.py ================================================================================ """ Board package. Interactive whiteboard with AI, TTS, icons, and page images. """ ================================================================================ FILE 6: board\engine.py FULL PATH: main_project\board\engine.py ================================================================================ """ Board AI Engine - STREAMING VERSION. Updated to use HTTP connection pools for 200 concurrent users. """ import re import json from config import GPT_URL, MAX_CHAT_HISTORY, GPT_TIMEOUT from http_pool import gpt_session from board.tts import TTSEngine from board.icons import IconResolver from board.pages import resolve_page_tags from subjects.loader import subject_loader from json_processor import BoardProcessor class BoardEngine: """ Board AI Engine with streaming segment delivery. Each board+voice pair is processed and yielded independently. """ def __init__(self): self.gpt_url = GPT_URL self.board_processor = BoardProcessor() self.tts_engine = TTSEngine() self.icon_resolver = IconResolver() # Per-user state self._user_sessions = {} self._sessions_lock = __import__('threading').Lock() print("✅ BoardEngine initialized (streaming mode + connection pooling)") # ─── User session management (thread-safe) ─── def _get_user_session(self, username): with self._sessions_lock: if username not in self._user_sessions: self._user_sessions[username] = { "subject_id": None, "conversation_history": [], "last_sequence": [], } return self._user_sessions[username] def set_subject(self, username, subject_id): with self._sessions_lock: if username not in self._user_sessions: self._user_sessions[username] = { "subject_id": None, "conversation_history": [], "last_sequence": [], } us = self._user_sessions[username] if us["subject_id"] and us["subject_id"] != subject_id: us["conversation_history"] = [] us["last_sequence"] = [] print(f" 🔄 Board subject switched: {us['subject_id']} → {subject_id} for {username}") us["subject_id"] = subject_id print(f" 📋 Board subject set: {subject_id} for {username}") def get_subject(self, username): with self._sessions_lock: us = self._user_sessions.get(username, {}) return us.get("subject_id") # ─── GPT call (uses connection pool) ─── def _call_gpt5(self, user_message, system_prompt, temperature=0.7, max_tokens=4000): payload = { "user_input": user_message, "chat_history": [ {"role": "system", "content": system_prompt} ], "temperature": temperature, "top_p": 0.95, "max_completion_tokens": max_tokens } try: response = gpt_session.post(self.gpt_url, json=payload, timeout=GPT_TIMEOUT) response.raise_for_status() return response.json().get("assistant_response", "") except Exception as e: print(f" ❌ GPT error: {e}") return None # ─── Chat history formatting ─── def _format_chat_history(self, username): us = self._get_user_session(username) history = us.get("conversation_history", []) if not history: return "" recent = history[-10:] parts = [] for msg in recent: if msg["role"] == "user": parts.append(f"الطالب: {msg['content']}") elif msg["role"] == "assistant": parts.append(f"المدرس: {msg['content']}") return ( "\n\n═══ سجل المحادثة السابقة ═══\n" + "\n".join(parts) + "\n═══ نهاية السجل ═══" ) # ─── Step 1: Route message ─── def _route_message(self, user_message, username): us = self._get_user_session(username) subject_id = us["subject_id"] if not subject_id: return "main.txt" subject_data = subject_loader.load(subject_id) if not subject_data: return "main.txt" structure = subject_data.get("structure.txt", "") p_files = subject_data.get("_p_files", []) chat_history_text = self._format_chat_history(username) p_files_desc = "\n".join([f"- {f}: الفصل {i+1}" for i, f in enumerate(p_files)]) routing_prompt = f"""أنت نظام توجيه ذكي لمساعد تعليمي. مهمتك: تحليل رسالة الطالب واختيار الملف المناسب للرد. الملفات المتاحة: - main.txt: للتحيات، الأسئلة العامة، أي شيء لا يتعلق بفصل محدد {p_files_desc} فهرس الكتاب (للمساعدة في التوجيه): {structure} {chat_history_text} تعليمات: 1. إذا كانت الرسالة تحية أو سؤال عام → main.txt 2. إذا كانت تسأل عن موضوع في فصل محدد → الملف المناسب 3. استخدم الفهرس لتحديد الفصل الصحيح 4. مهم جداً: إذا قال الطالب "اشرح أكثر" أو "وضح" أو أي طلب متابعة، ارجع لسجل المحادثة لتعرف الموضوع الحالي واختر نفس الملف 5. أجب فقط باسم الملف (مثال: p1.txt أو main.txt) بدون أي كلام إضافي رسالة الطالب: {user_message} الملف المناسب:""" chosen = self._call_gpt5( user_message, routing_prompt, temperature=0.2, max_tokens=50 ) if not chosen: return "main.txt" chosen = chosen.strip().lower() valid_files = ["main.txt"] + p_files for v in valid_files: if v in chosen: return v return "main.txt" # ─── Step 2: Generate XML response ─── def _generate_xml_response(self, user_message, chosen_file, username): us = self._get_user_session(username) subject_id = us["subject_id"] if not subject_id: return None subject_data = subject_loader.load(subject_id) if not subject_data: return None file_content = subject_data.get(chosen_file, "") chat_history_text = self._format_chat_history(username) system_prompt = f"""انتي مدرسة خبيرة ومحترفة في هذه المادة. تشرح على سبورة تفاعلية رقمية. ═══ صيغة الرد ═══ يجب أن تردي بصيغة XML خاصة تحتوي على: 1. عناصر السبورة - ما سيُرسم/يُضاف على السبورة أولاً 2. نص الكلام - النص الذي سيُقرأ بصوت عالٍ للطالب بعد رسم العناصر (عربي طبيعي) ═══ عناصر السبورة المتاحة (داخل ) ═══ • محتوى الملاحظةنص مباشر على السبورة بدون خلفية الأنواع: circle, triangle, star, arrow-right, arrow-left, arrow-up, arrow-down, rectangle, diamond, hexagon, square, oval, arrow-double-h, checkmark, cross, heart, cloud, lightning, speech, process, decision • كلمة_بحث_بالإنجليزية سيتم البحث عن أيقونة مرسومة يدوياً (مثل: ball, car, force, spring, weight, rope, pulley) • رقم_الصفحة لعرض صفحة محددة من الكتاب كصورة على السبورة مثال: 12 لعرض الصفحة 12 من الكتاب استخدمها عندما تحتاج تعرض للطالب صفحة معينة من الكتاب ═══ قواعد مهمة جداً ═══ 1. اشرح خطوة بخطوة: ابدأ بـ ثم ثم ثم وهكذا 2. اجعل الشرح متدرجاً كأنك تشرح على سبورة حقيقية أمام الطلاب 3. = ما يظهر على السبورة أولاً (ملاحظات، نصوص، أشكال، صور، صفحات الكتاب) 4. = الكلام المسموع بعد رسم العناصر (طبيعي، ودود، واضح، يشرح ما تم رسمه) 5. لا تضع كل شيء دفعة واحدة - اجعله تسلسلياً 7. فقط بكلمات إنجليزية بسيطة ومعبرة 8. السبورة تعمل بنظام الإضافة - العناصر السابقة تبقى 9. استخدم للعناوين والمعادلات المهمة (بدون خلفية) 10. استخدم للتوضيحات والملاحظات (مع خلفية ملونة) 11. لا تستخدم أكثر من 3-4 عناصر في كل 12. اجعل النص في طبيعياً كأنك تتحدث مع طالب ويشرح ما تم رسمه على السبورة 13. ارسم أولاً ثم تكلم - هذا مهم جداً! 14. راجع سجل المحادثة السابقة لتعرف ما تم شرحه وتكمل من حيث توقفت - لا تكرر ما قلته سابقاً 15. استخدم عندما تريد عرض صفحة من الكتاب - مثلاً إذا الطالب سأل عن تمرين أو شكل في صفحة معينة when user talk about something not about the subject or something funny etc... you can actually answer without the board just VOICE and be funny smart perfect girl also: when you explain something dont make all your explain on the NOTE make the note for important point use the TEXT direct on the board and the ICONS/SHAPES ═══ محتوى المادة ═══ {file_content} {chat_history_text} ═══ الآن أجب على سؤال الطالب ═══ رسالة الطالب: {user_message}""" response = self._call_gpt5( user_message, system_prompt, temperature=0.8, max_tokens=4000 ) return response # ─── Resolve tags ─── def _resolve_svg_tags(self, xml_text): if not xml_text: return xml_text return self.icon_resolver.resolve_all_in_xml(xml_text) def _resolve_page_tags(self, xml_text, username): if not xml_text: return xml_text us = self._get_user_session(username) subject_id = us.get("subject_id") if not subject_id: return xml_text base_url = subject_loader.get_pages_base_url(subject_id) if not base_url: print(f" ⚠️ No pages_base_url for subject {subject_id}") return xml_text return resolve_page_tags(xml_text, base_url) # ─── Parse XML into raw segments ─── def _parse_xml_to_raw_segments(self, xml_response): if not xml_response: return [] pattern = r'<(voice|board)>(.*?)' matches = list(re.finditer(pattern, xml_response, re.DOTALL)) if not matches: cleaned = re.sub(r'<[^>]+>', '', xml_response).strip() if cleaned: return [{"boards": [], "voice": cleaned}] return [] groups = [] current_boards = [] for match in matches: tag_type = match.group(1) content = match.group(2).strip() if tag_type == "board": current_boards.append(content) elif tag_type == "voice": cleaned_voice = re.sub(r'<[^>]+>', '', content).strip() groups.append({ "boards": list(current_boards), "voice": cleaned_voice }) current_boards = [] if current_boards: groups.append({ "boards": list(current_boards), "voice": "" }) return groups # ─── Process a single board content into items ─── def _process_single_board(self, board_content, current_board_state): existing_json_str = json.dumps( current_board_state, ensure_ascii=False, indent=2 ) processor_input = ( f"BOARD NOW (make sure no X Y error):\n" f"{existing_json_str}\n\n" f"new board need to add :\n" f"{board_content}" ) print(f" 🔧 Sending to json_processor...") print(f" Current board items: {len(current_board_state)}") try: json_text = self.board_processor.convert_xml_to_json(processor_input) if json_text: new_items = json.loads(json_text) if isinstance(new_items, list) and new_items: added_items = [] existing_ids = set() for item in current_board_state: item_key = json.dumps(item, sort_keys=True, ensure_ascii=False) existing_ids.add(item_key) for item in new_items: item_key = json.dumps(item, sort_keys=True, ensure_ascii=False) if item_key not in existing_ids: added_items.append(item) current_board_state.extend(added_items) print(f" ✅ json_processor: {len(new_items)} total, {len(added_items)} new") return added_items, current_board_state elif isinstance(new_items, dict): current_board_state.append(new_items) print(f" ✅ json_processor: 1 item") return [new_items], current_board_state else: print(f" ⚠️ json_processor: unexpected format") return [], current_board_state except json.JSONDecodeError as e: print(f" ❌ json_processor invalid JSON: {e}") return [], current_board_state except Exception as e: print(f" ❌ json_processor error: {e}") return [], current_board_state return [], current_board_state # ─── STREAMING PIPELINE (generator) ─── def process_message_stream(self, user_message, username, frontend_board_state=None): us = self._get_user_session(username) subject_id = us.get("subject_id") print(f"\n{'═' * 60}") print(f" 👤 Student ({username}): {user_message}") print(f" 📚 Subject: {subject_id}") print(f"{'═' * 60}") if not subject_id: yield json.dumps({ "type": "error", "message": "لم يتم تحديد المادة للسبورة" }, ensure_ascii=False) return if frontend_board_state is None: frontend_board_state = [] current_board_state = list(frontend_board_state) print(f" 📋 Board state from frontend: {len(current_board_state)} items") # Step 1: Route print("\n 📍 Step 1: Routing message...") chosen_file = self._route_message(user_message, username) print(f" 📂 Chosen file: {chosen_file}") # Step 2: Generate XML print(f" 🤖 Step 2: Generating XML response...") xml_response = self._generate_xml_response(user_message, chosen_file, username) if not xml_response: print(" ❌ Failed to generate response") yield json.dumps({ "type": "error", "message": "عذراً، حدث خطأ في النظام. حاول مرة أخرى." }, ensure_ascii=False) return print(f" 📝 XML response: {len(xml_response)} chars") # Step 3: Resolve tags print(" 📄 Step 3: Resolving tags...") xml_response = self._resolve_page_tags(xml_response, username) # Step 4: Resolve tags print(" 🎨 Step 4: Resolving tags...") xml_response = self._resolve_svg_tags(xml_response) # Step 5: Parse XML into segment groups print(" 🔧 Step 5: Parsing XML into segment groups...") raw_groups = self._parse_xml_to_raw_segments(xml_response) total_groups = len(raw_groups) print(f" 📊 Found {total_groups} segment groups to stream") if total_groups == 0: yield json.dumps({ "type": "error", "message": "لم يتم توليد محتوى للسبورة." }, ensure_ascii=False) return # Step 6: Process and yield each group one by one all_voice_texts = [] all_segments_for_replay = [] for idx, group in enumerate(raw_groups): print(f"\n ── Segment {idx + 1}/{total_groups} ──") segment_board_items = [] for board_content in group["boards"]: new_items, current_board_state = self._process_single_board( board_content, current_board_state ) segment_board_items.extend(new_items) voice_text = group.get("voice", "") audio_url = None if voice_text: all_voice_texts.append(voice_text) text_preview = voice_text[:50] print(f" 🎙️ Converting TTS: '{text_preview}...'") audio_file = self.tts_engine.convert(voice_text) if audio_file: audio_url = f"/static/{audio_file}" else: print(f" ⚠️ TTS failed for this segment") segment_data = { "type": "segment", "index": idx, "total_estimate": total_groups, "board_items": segment_board_items, "voice_text": voice_text, "audio_url": audio_url } all_segments_for_replay.append(segment_data) print(f" ✅ Segment {idx + 1} ready: {len(segment_board_items)} board items, voice={'yes' if voice_text else 'no'}") yield json.dumps(segment_data, ensure_ascii=False) # Step 7: Save chat history (thread-safe) with self._sessions_lock: us = self._user_sessions.get(username, {}) if "conversation_history" not in us: us["conversation_history"] = [] us["conversation_history"].append({ "role": "user", "content": user_message }) assistant_text = " ".join(all_voice_texts) if assistant_text: us["conversation_history"].append({ "role": "assistant", "content": assistant_text }) if len(us["conversation_history"]) > MAX_CHAT_HISTORY: us["conversation_history"] = us["conversation_history"][-MAX_CHAT_HISTORY:] us["last_sequence"] = all_segments_for_replay # Cleanup old audio self.tts_engine.cleanup_old_files() yield json.dumps({ "type": "done", "board_state": current_board_state, "chosen_file": chosen_file, "total_segments": total_groups }, ensure_ascii=False) print(f"\n ✅ All {total_groups} segments streamed!") print(f" Board items: {len(current_board_state)}") print(f"{'═' * 60}\n") # ─── NON-STREAMING (kept for compatibility) ─── def process_message(self, user_message, username, frontend_board_state=None): all_segments = [] final_board_state = frontend_board_state or [] chosen_file = None for chunk_str in self.process_message_stream(user_message, username, frontend_board_state): chunk = json.loads(chunk_str) if chunk["type"] == "segment": if chunk.get("board_items"): all_segments.append({ "type": "board_update", "action": "add", "items": chunk["board_items"] }) if chunk.get("voice_text"): all_segments.append({ "type": "voice", "text": chunk["voice_text"], "audio_url": chunk.get("audio_url") }) elif chunk["type"] == "done": final_board_state = chunk.get("board_state", []) chosen_file = chunk.get("chosen_file") elif chunk["type"] == "error": return { "success": False, "error": chunk["message"], "sequence": [{ "type": "voice", "text": chunk["message"], "audio_url": None }], "board_state": frontend_board_state or [] } return { "success": True, "chosen_file": chosen_file, "sequence": all_segments, "board_state": final_board_state } # ─── Replay ─── def get_replay_sequence(self, username): with self._sessions_lock: us = self._user_sessions.get(username, {}) last_seq = us.get("last_sequence", []) if not last_seq: return { "success": False, "error": "No previous response to replay", "sequence": [] } voice_only = [] for item in last_seq: if item.get("voice_text"): voice_only.append({ "type": "voice", "text": item.get("voice_text", ""), "audio_url": item.get("audio_url") }) print(f" 🔄 Replay: {len(voice_only)} voice segments") return { "success": True, "sequence": voice_only } # ─── Clear ─── def clear_board(self, username): with self._sessions_lock: if username in self._user_sessions: self._user_sessions[username]["last_sequence"] = [] print(f" 🗑️ Board cleared for {username}") return {"success": True, "board_state": []} def clear_chat_history(self, username): with self._sessions_lock: if username in self._user_sessions: self._user_sessions[username]["conversation_history"] = [] self._user_sessions[username]["last_sequence"] = [] print(f" 🗑️ Board chat history cleared for {username}") return {"success": True} def clear_user_session(self, username): with self._sessions_lock: if username in self._user_sessions: del self._user_sessions[username] print(f" 🗑️ Full board session cleared for {username}") return {"success": True} ================================================================================ FILE 7: board\icons.py FULL PATH: main_project\board\icons.py ================================================================================ """ Icon resolver using Icons8 API. Updated to use HTTP connection pool + thread-safe cache. """ import re import threading import requests from config import ICONS8_SEARCH_URL, ICONS8_IMAGE_URL from http_pool import icons8_session class IconResolver: """Resolves keyword tags to Icons8 hand-drawn icon URLs.""" def __init__(self): self._cache = {} self._cache_lock = threading.Lock() print(" ✓ IconResolver initialized (pooled + thread-safe cache)") def resolve(self, search_term): if not search_term or not search_term.strip(): return None term = search_term.strip().lower() # Thread-safe cache read with self._cache_lock: if term in self._cache: cached = self._cache[term] print(f" 🎨 Icon cache hit: '{term}' → {cached}") return cached url = ICONS8_SEARCH_URL.format(term=requests.utils.quote(term)) try: resp = icons8_session.get(url, timeout=15) resp.raise_for_status() data = resp.json() if data.get("success") and data.get("icons"): first_icon = data["icons"][0] icon_id = first_icon["id"] icon_name = first_icon.get("name", term) image_url = ICONS8_IMAGE_URL.format(icon_id=icon_id) with self._cache_lock: self._cache[term] = image_url print(f" 🎨 Icon found: '{term}' → '{icon_name}' (id={icon_id})") return image_url else: print(f" ⚠️ No icon found for: '{term}'") with self._cache_lock: self._cache[term] = None return None except Exception as e: print(f" ❌ Icon search error for '{term}': {e}") return None def resolve_all_in_xml(self, xml_text): """Replace all keyword tags with tags.""" if not xml_text: return xml_text pattern = r'(.*?)' matches = list(re.finditer(pattern, xml_text, re.DOTALL)) if not matches: return xml_text print(f" 📝 Found {len(matches)} tag(s) to resolve...") result = xml_text for match in reversed(matches): keyword = match.group(1).strip() start = match.start() end = match.end() image_url = self.resolve(keyword) if image_url: replacement = ( f'{keyword}' ) else: replacement = ( f'' f'🖼️ {keyword}' ) result = result[:start] + replacement + result[end:] return result ================================================================================ FILE 8: board\pages.py FULL PATH: main_project\board\pages.py ================================================================================ """ Page URL builder. Converts NUMBER tags to real image URLs. Each subject can have its own pages_base_url. """ import re def build_page_url(page_number, base_url): """ Convert a page number to the full image URL using the subject's base URL. base_url must contain {page_num} placeholder. Example base_url: https://huggingface.co/.../page_{page_num}.png """ if not base_url: print(f" ❌ No pages_base_url configured") return None try: num = int(str(page_number).strip()) page_str = str(num).zfill(4) url = base_url.format(page_num=page_str) print(f" 📄 Page {num} → {url}") return url except (ValueError, TypeError): print(f" ❌ Invalid page number: {page_number}") return None def resolve_page_tags(xml_text, base_url): """ Find all NUMBER tags in the XML and replace them with tags using the subject's base URL. """ if not xml_text: return xml_text pattern = r'\s*(.*?)\s*' matches = list(re.finditer(pattern, xml_text, re.DOTALL)) if not matches: return xml_text print(f" 📖 Found {len(matches)} tag(s) to resolve...") result = xml_text for match in reversed(matches): page_num_raw = match.group(1).strip() start = match.start() end = match.end() url = build_page_url(page_num_raw, base_url) if url: replacement = ( f'' ) else: replacement = ( f'' f'📄 صفحة {page_num_raw}' ) result = result[:start] + replacement + result[end:] return result ================================================================================ FILE 9: board\routes.py FULL PATH: main_project\board\routes.py ================================================================================ """ Board API routes - STREAMING VERSION. Updated to use MemoryDB for fast user lookups. """ from flask import ( Blueprint, request, jsonify, session, render_template, redirect, url_for, flash, Response, stream_with_context ) import json from auth.helpers import is_session_valid from memory_db import get_db from subjects.access import validate_user_subject_access from subjects.definitions import get_subject_name, is_board_enabled, SUBJECTS from subjects.loader import subject_loader board_bp = Blueprint('board', __name__) _board_engine = None def init_board_engine(engine): global _board_engine _board_engine = engine @board_bp.route('/board/') def board_page(subject_id): """Render the board page for a specific subject.""" if not is_session_valid(): session.clear() flash('انتهت الجلسة، يرجى تسجيل الدخول مرة أخرى.', 'error') return redirect(url_for('auth.login')) username = session['username'] db = get_db() user_data = db.read_key('users', username) if not user_data or not user_data.get('verified', False): flash('حسابك غير مفعّل. تواصل مع الإدارة.', 'error') return redirect(url_for('dashboard')) has_access, access_error = validate_user_subject_access(username, subject_id) if not has_access: flash(access_error, 'error') return redirect(url_for('dashboard')) if not is_board_enabled(subject_id): flash('هذه المادة لا تدعم السبورة التفاعلية حالياً.', 'error') return redirect(url_for('dashboard')) subject_data = subject_loader.load(subject_id) if not subject_data: flash(f"مجلد المادة '{subject_id}' غير موجود على الخادم.", 'error') return redirect(url_for('dashboard')) _board_engine.set_subject(username, subject_id) student_type = user_data.get('student_type', 'علمي') subject_name = get_subject_name(student_type, subject_id) return render_template('board.html', username=username, subject_id=subject_id, subject_name=subject_name) @board_bp.route('/api/board/message', methods=['POST']) def handle_board_message(): """STREAMING board message endpoint using SSE.""" if not is_session_valid(): def auth_error(): yield f"data: {json.dumps({'type': 'error', 'message': 'غير مصرح'}, ensure_ascii=False)}\n\n" return Response(auth_error(), mimetype='text/event-stream') data = request.get_json() if not data: def no_data(): yield f"data: {json.dumps({'type': 'error', 'message': 'No JSON data'}, ensure_ascii=False)}\n\n" return Response(no_data(), mimetype='text/event-stream') username = session['username'] user_message = (data.get('message', '') or '').strip() if not user_message: def empty_msg(): yield f"data: {json.dumps({'type': 'error', 'message': 'Empty message'}, ensure_ascii=False)}\n\n" return Response(empty_msg(), mimetype='text/event-stream') db = get_db() user = db.read_key('users', username) if not user or not user.get('verified', False): def unverified(): yield f"data: {json.dumps({'type': 'error', 'message': 'غير مصرح'}, ensure_ascii=False)}\n\n" return Response(unverified(), mimetype='text/event-stream') subject_id = data.get('subject_id', '').strip() if subject_id: has_access, access_error = validate_user_subject_access(username, subject_id) if not has_access: def no_access(): yield f"data: {json.dumps({'type': 'error', 'message': access_error}, ensure_ascii=False)}\n\n" return Response(no_access(), mimetype='text/event-stream') if not is_board_enabled(subject_id): def no_board(): yield f"data: {json.dumps({'type': 'error', 'message': 'هذه المادة لا تدعم السبورة التفاعلية'}, ensure_ascii=False)}\n\n" return Response(no_board(), mimetype='text/event-stream') _board_engine.set_subject(username, subject_id) else: subject_id = _board_engine.get_subject(username) if not subject_id: def need_subject(): yield f"data: {json.dumps({'type': 'error', 'message': 'لم يتم تحديد المادة للسبورة'}, ensure_ascii=False)}\n\n" return Response(need_subject(), mimetype='text/event-stream') frontend_board_state = data.get('board_state', []) if not isinstance(frontend_board_state, list): frontend_board_state = [] print(f"\n 📥 Board STREAM message from {username}:") print(f" Subject: {subject_id}") print(f" Message: {user_message[:80]}...") print(f" Board state: {len(frontend_board_state)} items") def generate(): try: for chunk_json in _board_engine.process_message_stream( user_message, username, frontend_board_state ): yield f"data: {chunk_json}\n\n" except Exception as e: print(f"❌ Error in board stream: {e}") import traceback traceback.print_exc() error_data = json.dumps({ "type": "error", "message": "عذراً، حدث خطأ غير متوقع." }, ensure_ascii=False) yield f"data: {error_data}\n\n" return Response( stream_with_context(generate()), mimetype='text/event-stream', headers={ 'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no', 'Connection': 'keep-alive' } ) @board_bp.route('/api/board/message-sync', methods=['POST']) def handle_board_message_sync(): """NON-STREAMING fallback endpoint.""" if not is_session_valid(): return jsonify({"error": "غير مصرح", "code": "unauthorized"}), 401 data = request.get_json() if not data: return jsonify({"error": "No JSON data"}), 400 username = session['username'] user_message = (data.get('message', '') or '').strip() if not user_message: return jsonify({"error": "Empty message"}), 400 db = get_db() user = db.read_key('users', username) if not user or not user.get('verified', False): return jsonify({"error": "غير مصرح"}), 403 subject_id = data.get('subject_id', '').strip() if subject_id: has_access, access_error = validate_user_subject_access(username, subject_id) if not has_access: return jsonify({"error": access_error, "code": "no_access"}), 403 if not is_board_enabled(subject_id): return jsonify({"error": "هذه المادة لا تدعم السبورة التفاعلية"}), 400 _board_engine.set_subject(username, subject_id) else: subject_id = _board_engine.get_subject(username) if not subject_id: return jsonify({"error": "لم يتم تحديد المادة للسبورة", "need_subject": True}), 400 frontend_board_state = data.get('board_state', []) if not isinstance(frontend_board_state, list): frontend_board_state = [] try: result = _board_engine.process_message( user_message, username, frontend_board_state ) return jsonify(result) except Exception as e: print(f"❌ Error processing board message: {e}") import traceback traceback.print_exc() return jsonify({ "success": False, "error": str(e), "sequence": [{ "type": "voice", "text": "عذراً، حدث خطأ غير متوقع.", "audio_url": None }], "board_state": frontend_board_state }), 500 @board_bp.route('/api/board/replay', methods=['POST']) def replay_last(): """Replay the last response.""" if not is_session_valid(): return jsonify({"error": "غير مصرح"}), 401 username = session['username'] result = _board_engine.get_replay_sequence(username) return jsonify(result) @board_bp.route('/api/board/state', methods=['GET']) def get_board_state(): """Get board state (frontend is source of truth).""" if not is_session_valid(): return jsonify({"error": "غير مصرح"}), 401 return jsonify({"board_state": []}) @board_bp.route('/api/board/clear', methods=['POST']) def clear_board(): """Clear the board completely.""" if not is_session_valid(): return jsonify({"error": "غير مصرح"}), 401 username = session['username'] result = _board_engine.clear_board(username) return jsonify(result) @board_bp.route('/api/board/chat/clear', methods=['POST']) def clear_board_chat(): """Clear board conversation history.""" if not is_session_valid(): return jsonify({"error": "غير مصرح"}), 401 username = session['username'] result = _board_engine.clear_chat_history(username) return jsonify(result) @board_bp.route('/api/board/sync', methods=['POST']) def sync_board_state(): """Sync board state from frontend.""" if not is_session_valid(): return jsonify({"error": "غير مصرح"}), 401 return jsonify({"success": True}) @board_bp.route('/api/board/set-subject', methods=['POST']) def set_board_subject(): """Set/switch the board subject.""" if not is_session_valid(): return jsonify({"error": "غير مصرح"}), 401 data = request.get_json() if not data: return jsonify({"error": "No JSON data"}), 400 username = session['username'] subject_id = data.get('subject_id', '').strip() if not subject_id: return jsonify({"error": "يرجى تحديد المادة"}), 400 has_access, access_error = validate_user_subject_access(username, subject_id) if not has_access: return jsonify({"error": access_error, "code": "no_access"}), 403 if not is_board_enabled(subject_id): return jsonify({"error": "هذه المادة لا تدعم السبورة التفاعلية"}), 400 subject_data = subject_loader.load(subject_id) if not subject_data: return jsonify({"error": f"مجلد المادة '{subject_id}' غير موجود"}), 404 _board_engine.set_subject(username, subject_id) db = get_db() user = db.read_key('users', username) student_type = user.get('student_type', 'علمي') if user else 'علمي' subject_name = get_subject_name(student_type, subject_id) return jsonify({ "success": True, "subject_id": subject_id, "subject_name": subject_name, "message": f"تم تحديد مادة '{subject_name}' للسبورة" }) ================================================================================ FILE 10: board\tts.py FULL PATH: main_project\board\tts.py ================================================================================ """ Text-to-Speech engine. Updated to use HTTP connection pool. """ import os import uuid from pathlib import Path from config import TTS_URL, AUDIO_OUTPUT_DIR, MAX_AUDIO_FILES, TTS_TIMEOUT from http_pool import tts_session class TTSEngine: def __init__(self): self.synthesize_endpoint = f"{TTS_URL}/synthesize" self.output_dir = AUDIO_OUTPUT_DIR Path(self.output_dir).mkdir(parents=True, exist_ok=True) print(" ✓ TTSEngine initialized (pooled connections)") def convert(self, text, voice_id=None): if not text or not text.strip(): return None clean_text = text.strip() if len(clean_text) < 2: return None file_id = uuid.uuid4().hex[:12] filename = f"audio/voice_{file_id}.mp3" filepath = os.path.join("static", filename) payload = {"text": clean_text} if voice_id: payload["voice_id"] = voice_id headers = {"Content-Type": "application/json"} try: response = tts_session.post( self.synthesize_endpoint, headers=headers, json=payload, timeout=TTS_TIMEOUT ) if response.status_code == 200 and len(response.content) > 500: with open(filepath, 'wb') as f: f.write(response.content) file_size = os.path.getsize(filepath) print(f" 📊 TTS saved: {filename} ({file_size} bytes)") return filename else: print( f" ❌ TTS error: status={response.status_code}, " f"size={len(response.content)}" ) return None except Exception as e: print(f" ❌ TTS error: {e}") return None def cleanup_old_files(self): audio_dir = Path(self.output_dir) files = sorted( audio_dir.glob("voice_*.mp3"), key=lambda f: f.stat().st_mtime ) if len(files) > MAX_AUDIO_FILES: for f in files[:len(files) - MAX_AUDIO_FILES]: try: f.unlink() print(f" 🗑️ Cleaned: {f.name}") except Exception: pass ================================================================================ FILE 11: chat\__init__.py FULL PATH: main_project\chat\__init__.py ================================================================================ """ Chat package. Handles text chat with AI (non-board mode). """ ================================================================================ FILE 12: chat\agent.py FULL PATH: main_project\chat\agent.py ================================================================================ """ AI Agent for text chat mode. Updated to use HTTP connection pools for 200 concurrent users. """ import json import threading from datetime import datetime, timezone from config import ( GPT_URL, MISTRAL_COOKIE, TRANSCRIPT_URL, CLOUDINARY_URL, CLOUDINARY_PRESET, GPT_TIMEOUT, MISTRAL_TIMEOUT, TRANSCRIPT_TIMEOUT ) from http_pool import gpt_session, mistral_session, transcript_session, cloudinary_session from chat.history import ChatHistory from subjects.loader import subject_loader from database.chat_history import save_user_chat_to_db, load_user_chat_from_db class AIAgent: def __init__(self): self.gpt_url = GPT_URL self.mistral_cookie = MISTRAL_COOKIE self.transcript_base_url = TRANSCRIPT_URL self.cloudinary_url = CLOUDINARY_URL self.cloudinary_preset = CLOUDINARY_PRESET self.chat_history = ChatHistory() # Exam sessions (in-memory) self.exam_sessions = {} self.exam_lock = threading.Lock() print("✅ AIAgent initialized (pooled connections)") # ─── GPT call (pooled) ─── def _call_gpt5(self, user_message, system_prompt, temperature=0.7): payload = { "user_input": user_message, "chat_history": [{"role": "system", "content": system_prompt}], "temperature": temperature, "top_p": 0.95, "max_completion_tokens": 4000 } try: response = gpt_session.post(self.gpt_url, json=payload, timeout=GPT_TIMEOUT) response.raise_for_status() return response.json().get("assistant_response", "") except Exception as e: print(f"❌ GPT-5 Error: {e}") return None def _call_gpt5_multipart(self, image_url, text_prompt, system_prompt, temperature=0.7): payload = { "user_input": None, "chat_history": [ {"role": "system", "content": system_prompt}, { "role": "user", "type": "multipart", "content": [ {"type": "image", "url": image_url}, {"type": "text", "text": text_prompt} ] } ], "temperature": temperature, "top_p": 0.95, "max_completion_tokens": 4000 } try: response = gpt_session.post(self.gpt_url, json=payload, timeout=GPT_TIMEOUT) response.raise_for_status() return response.json().get("assistant_response", "") except Exception as e: print(f"❌ GPT-5 Multipart Error: {e}") return None # ─── Mistral streaming (pooled) ─── def _call_mistral_stream(self, user_message, file_content, session_id): chat_context = self.chat_history.get_full_context(session_id) full_prompt = f"""{file_content} === سياق المحادثة === {chat_context if chat_context else "هذه بداية المحادثة مع الطالب."} === نهاية السياق === سؤال الطالب الحالي: {user_message} إجابتك:""" headers = { "Content-Type": "application/json", "Cookie": self.mistral_cookie, "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" } payload = { "inputs": [{ "object": "entry", "type": "message.input", "created_at": datetime.now(timezone.utc).isoformat(), "role": "user", "content": full_prompt, "prefix": False }], "stream": True, "instructions": "", "tools": [], "completion_args": { "temperature": 0.7, "top_p": 0.95, "max_tokens": 4096 }, "model": "mistral-medium-latest" } try: response = mistral_session.post( "https://console.mistral.ai/api-ui/bora/v1/conversations", headers=headers, json=payload, stream=True, timeout=MISTRAL_TIMEOUT ) if response.status_code not in [200, 201]: error_msg = f"Mistral Error: {response.status_code} - {response.text[:500]}" yield json.dumps({"error": error_msg}) + "\n" return full_response = "" for line in response.iter_lines(): if line: line = line.decode('utf-8') if line.startswith('data: '): try: data = json.loads(line[6:]) if data.get('type') == 'message.output.delta': content = data.get('content', '') if content: full_response += content yield json.dumps({"chunk": content}) + "\n" except json.JSONDecodeError: continue if full_response: self.chat_history.add_message(session_id, "assistant", full_response) yield json.dumps({"done": True, "full_response": full_response}) + "\n" except Exception as e: yield json.dumps({"error": str(e)}) + "\n" # ─── Routing ─── def route_message(self, user_message, session_id): subject_id = self.chat_history.get_subject(session_id) subject_data = subject_loader.load(subject_id) if subject_id else None if not subject_data: return "main.txt" structure_content = subject_data.get("structure.txt", "") chat_context = self.chat_history.get_full_context(session_id) last_file = self.chat_history.get_last_file(session_id) p_files = subject_data.get("_p_files", []) p_files_list = "\n".join([f"- {f}" for f in p_files]) routing_prompt = f"""{structure_content} **الملفات المتاحة:** - main.txt: للتحيات والأسئلة العامة {p_files_list} **سياق المحادثة:** {chat_context if chat_context else "لا يوجد سياق سابق"} **الملف الأخير المستخدم:** {last_file if last_file else "لا يوجد"} **تعليمات:** - إذا كانت الرسالة متابعة لموضوع سابق → استخدم نفس الملف الأخير: {last_file if last_file else "main.txt"} - إذا قال الطالب "استمر" أو "أكمل" → استخدم نفس الملف الأخير - أجب **فقط** باسم الملف (مثال: p1.txt أو main.txt) رسالة الطالب: {user_message} الملف المناسب:""" chosen_file = self._call_gpt5(user_message, routing_prompt, temperature=0.2) if not chosen_file: return last_file if last_file else "main.txt" chosen_file = chosen_file.strip().lower() valid_files = ["main.txt"] + p_files for vf in valid_files: if vf in chosen_file: return vf return last_file if last_file else "main.txt" # ─── GPT response ─── def respond_gpt(self, user_message, chosen_file, session_id): subject_id = self.chat_history.get_subject(session_id) subject_data = subject_loader.load(subject_id) if subject_id else None if not subject_data: return "عذراً، لم يتم تحديد المادة. يرجى اختيار المادة أولاً." main_content = subject_data.get("main.txt", "") chat_context = self.chat_history.get_full_context(session_id) system_prompt = f"""{main_content} === سياق المحادثة السابقة === {chat_context if chat_context else "هذه بداية المحادثة."} === نهاية السياق ===""" self.chat_history.add_message(session_id, "user", user_message) response = self._call_gpt5(user_message, system_prompt, temperature=0.8) if response: self.chat_history.add_message(session_id, "assistant", response) return response else: err = "عذراً، حدث خطأ في النظام. حاول مرة أخرى." self.chat_history.add_message(session_id, "assistant", err) return err # ─── Mistral response ─── def respond_mistral_stream(self, user_message, chosen_file, session_id): subject_id = self.chat_history.get_subject(session_id) subject_data = subject_loader.load(subject_id) if subject_id else None if not subject_data: def error_gen(): yield json.dumps({"error": "عذراً، لم يتم تحديد المادة."}) + "\n" return error_gen() file_content = subject_data.get(chosen_file, "") self.chat_history.add_message(session_id, "user", user_message) return self._call_mistral_stream(user_message, file_content, session_id) # ─── Process message (route + decide) ─── def process_message(self, user_message, session_id): subject = self.chat_history.get_subject(session_id) if not subject: return None, "no_subject" subject_data = subject_loader.load(subject) if not subject_data: return None, "subject_not_found" chosen_file = self.route_message(user_message, session_id) self.chat_history.set_last_file(session_id, chosen_file) print(f"📀 Subject: {subject} | Routed to: {chosen_file} | Session: {session_id}") return chosen_file, "ok" # ─── Subject session management ─── def select_subject(self, username, session_id, subject_id): """Select a subject and load/restore chat history.""" current_subject = self.chat_history.get_subject(session_id) if current_subject and current_subject != subject_id: raw = self.chat_history.get_raw_session(session_id) save_user_chat_to_db(username, current_subject, raw) self.chat_history.clear_session(session_id) self.chat_history.set_subject(session_id, subject_id) saved = load_user_chat_from_db(username, subject_id) if saved: self.chat_history.restore_session(session_id, saved) raw = self.chat_history.get_raw_session(session_id) save_user_chat_to_db(username, subject_id, raw) def save_current_chat(self, username, session_id): """Save current session to DB.""" subject_id = self.chat_history.get_subject(session_id) if subject_id: raw = self.chat_history.get_raw_session(session_id) save_user_chat_to_db(username, subject_id, raw) # ─── Audio (pooled) ─── def upload_audio(self, audio_file_path, original_filename): try: with open(audio_file_path, 'rb') as f: files = {'audio': (original_filename, f, 'audio/webm')} response = transcript_session.post( f"{self.transcript_base_url}/upload", files=files, timeout=TRANSCRIPT_TIMEOUT ) response.raise_for_status() data = response.json() if 'file_url' in data: return data['file_url'], None else: return None, data.get('error', 'Upload failed') except Exception as e: print(f"❌ Audio upload error: {e}") return None, str(e) def transcribe_audio(self, file_url): try: payload = { "file_url": file_url, "prompt": "هذا تسجيل صوتي لطالب يسأل سؤالاً دراسياً" } response = transcript_session.post( f"{self.transcript_base_url}/transcribe", json=payload, timeout=TRANSCRIPT_TIMEOUT ) response.raise_for_status() data = response.json() if 'transcription' in data: return data['transcription'], None else: return None, data.get('error', 'Transcription failed') except Exception as e: print(f"❌ Transcription error: {e}") return None, str(e) # ─── Image (pooled) ─── def upload_image_to_cloudinary(self, image_path): try: with open(image_path, 'rb') as image_file: files = {'file': image_file} data = {'upload_preset': self.cloudinary_preset} response = cloudinary_session.post( self.cloudinary_url, files=files, data=data, timeout=60 ) if response.status_code == 200: result = response.json() image_url = result.get('url') or result.get('secure_url') return image_url, None else: return None, f"Cloudinary error: {response.status_code}" except Exception as e: return None, str(e) def analyze_image(self, image_url, session_id): system_prompt = """أنت مساعد تعليمي ذكي متخصص في تحليل الصور الدراسية. مهمتك: 1. إذا كانت الصورة تحتوي على محتوى دراسي (معادلات، نصوص، مسائل، رسوم بيانية، جداول، شروحات، كتب، أوراق امتحانات): - استخرج كل المحتوى من الصورة بدقة تامة - اكتب المعادلات بصيغة LaTeX - اكتب النصوص كما هي - اشرح الرسوم البيانية إن وجدت - قدم المحتوى بشكل منظم ومفيد 2. إذا كانت الصورة لا تحتوي على محتوى دراسي: - أجب فقط بالكلمة: لا تضف أي تعليق إضافي.""" result = self._call_gpt5_multipart( image_url=image_url, text_prompt="حلل هذه الصورة واستخرج محتواها الدراسي", system_prompt=system_prompt, temperature=0.3 ) return result # ─── Exam session management ─── def create_exam_session(self, username, subject_id, door_file, question_count, difficulty): with self.exam_lock: self.exam_sessions[username] = { "subject_id": subject_id, "door_file": door_file, "question_count": question_count, "difficulty": difficulty, "questions": [], "current_index": 0, "score": 0, "answers": [], "started_at": datetime.now().isoformat(), "status": "generating", } return self.exam_sessions[username] def get_exam_session(self, username): with self.exam_lock: return self.exam_sessions.get(username, None) def clear_exam_session(self, username): with self.exam_lock: if username in self.exam_sessions: del self.exam_sessions[username] print(f"🗑️ Cleared exam session for {username}") ================================================================================ FILE 13: chat\history.py FULL PATH: main_project\chat\history.py ================================================================================ """ In-memory chat history manager with auto-summarization. Per-session, thread-safe. Updated to use pooled connections. """ import threading from datetime import datetime, timezone from config import GPT_URL, SUMMARY_TRIGGER_COUNT, GPT_TIMEOUT from http_pool import gpt_session class ChatHistory: def __init__(self): self.gpt_url = GPT_URL self.sessions = {} self.lock = threading.Lock() print(" ✓ ChatHistory initialized (pooled connections)") def get_or_create_session(self, session_id): with self.lock: if session_id not in self.sessions: self.sessions[session_id] = { "messages": [], "full_messages": [], "summary": "", "message_count": 0, "last_file": None, "subject": None, } return self.sessions[session_id] def set_subject(self, session_id, subject): s = self.get_or_create_session(session_id) with self.lock: s["subject"] = subject def get_subject(self, session_id): s = self.get_or_create_session(session_id) return s.get("subject", None) def add_message(self, session_id, role, content): s = self.get_or_create_session(session_id) msg = { "role": role, "content": content, "timestamp": datetime.now(timezone.utc).isoformat() } with self.lock: s["messages"].append(msg) s["full_messages"].append(msg) s["message_count"] += 1 if len(s["messages"]) >= SUMMARY_TRIGGER_COUNT: # Run summarization in background thread to not block threading.Thread( target=self._trigger_summary, args=(session_id,), daemon=True ).start() def _trigger_summary(self, session_id): s = self.sessions.get(session_id) if not s: return with self.lock: msgs = s["messages"].copy() s["messages"] = [] if not msgs: return conv_text = "" for m in msgs: if m["role"] == "user": conv_text += f"الطالب: {m['content']}\n\n" elif m["role"] == "assistant": conv_text += f"المدرس: {m['content']}\n\n" prev_summary = s.get("summary", "") summary_prompt = f"""أنت نظام تلخيص ذكي لمحادثات تعليمية. مهمتك: تلخيص المحادثة التالية بشكل دقيق ومفيد. **التعليمات:** 1. لخّص النقاط الرئيسية التي ناقشها الطالب والمدرس 2. حدد المواضيع التي سأل عنها الطالب 3. قيّم مستوى فهم الطالب (مبتدئ / متوسط / متقدم) 4. اذكر أي نقاط ضعف أو قوة لاحظتها 5. اذكر آخر موضوع كان يُناقش 6. اجعل التلخيص مختصراً لكن شاملاً {"**التلخيص السابق:**" + chr(10) + prev_summary + chr(10) if prev_summary else ""} **المحادثة الجديدة:** {conv_text} **التلخيص الشامل:**""" try: payload = { "user_input": "قم بتلخيص المحادثة", "chat_history": [{"role": "system", "content": summary_prompt}], "temperature": 0.3, "top_p": 0.95, "max_completion_tokens": 1500 } resp = gpt_session.post(self.gpt_url, json=payload, timeout=GPT_TIMEOUT) resp.raise_for_status() new_summary = resp.json().get("assistant_response", "") if new_summary: with self.lock: s["summary"] = new_summary print(f" ✅ Summary generated for session {session_id[:20]}...") else: with self.lock: s["messages"] = msgs + s["messages"] except Exception as e: print(f"❌ Summary error: {e}") with self.lock: s["messages"] = msgs + s["messages"] def get_full_context(self, session_id): s = self.get_or_create_session(session_id) parts = [] if s["summary"]: parts.append(f"**ملخص المحادثة السابقة:**\n{s['summary']}") if s["messages"]: recent = "" for m in s["messages"]: if m["role"] == "user": recent += f"الطالب: {m['content']}\n\n" elif m["role"] == "assistant": recent += f"المدرس: {m['content']}\n\n" if recent: parts.append(f"**المحادثة الحالية:**\n{recent}") return "\n\n".join(parts) def get_messages_for_display(self, session_id): s = self.get_or_create_session(session_id) with self.lock: return s["full_messages"].copy() def get_summary(self, session_id): s = self.get_or_create_session(session_id) return s.get("summary", "") def set_last_file(self, session_id, filename): s = self.get_or_create_session(session_id) with self.lock: s["last_file"] = filename def get_last_file(self, session_id): s = self.get_or_create_session(session_id) return s.get("last_file", None) def clear_session(self, session_id): with self.lock: if session_id in self.sessions: current_subject = self.sessions[session_id].get("subject", None) self.sessions[session_id] = { "messages": [], "full_messages": [], "summary": "", "message_count": 0, "last_file": None, "subject": current_subject, } def restore_session(self, session_id, saved_data): """Restore a session from saved DB data.""" s = self.get_or_create_session(session_id) with self.lock: s["messages"] = saved_data.get("messages", []) s["full_messages"] = saved_data.get("full_messages", saved_data.get("messages", [])) s["summary"] = saved_data.get("summary", "") s["message_count"] = saved_data.get("message_count", 0) s["last_file"] = saved_data.get("last_file", None) s["subject"] = saved_data.get("subject", s.get("subject")) def get_session_info(self, session_id): s = self.get_or_create_session(session_id) return { "total_messages": s["message_count"], "current_messages": len(s["messages"]), "has_summary": bool(s["summary"]), "last_file": s["last_file"], "subject": s["subject"], "summary_preview": ( s["summary"][:200] + "..." if len(s.get("summary", "")) > 200 else s.get("summary", "") ) } def get_raw_session(self, session_id): """Get raw session dict for saving to DB.""" return self.get_or_create_session(session_id) ================================================================================ FILE 14: chat\routes.py FULL PATH: main_project\chat\routes.py ================================================================================ ```python """ Chat API routes for text-based AI chat. Updated to use MemoryDB. """ import json import os import tempfile from datetime import datetime from flask import Blueprint, request, jsonify, Response, stream_with_context, session, render_template, redirect, url_for, flash import requests as http_requests from auth.helpers import is_session_valid from memory_db import get_db from database.chat_history import ( save_user_chat_to_db, clear_user_subject_chat_from_db, get_all_user_chats_from_db ) from subjects.access import validate_user_subject_access, get_user_accessible_subjects from subjects.definitions import SUBJECTS, get_subject_name from subjects.loader import subject_loader chat_bp = Blueprint('chat', __name__) _agent = None def init_chat_agent(agent): global _agent _agent = agent @chat_bp.route('/chat_page') def chat_page(): if not is_session_valid(): session.clear() flash('انتهت الجلسة، يرجى تسجيل الدخول مرة أخرى.', 'error') return redirect(url_for('auth.login')) username = session['username'] db = get_db() user_data = db.read_key('users', username) if not user_data or not user_data.get('verified', False): flash('حسابك غير مفعّل. تواصل مع الإدارة.', 'error') return redirect(url_for('dashboard')) return render_template('chat.html', username=username) @chat_bp.route('/me', methods=['GET']) def get_me(): if not is_session_valid(): return jsonify({"error": "غير مصرح", "code": "unauthorized"}), 401 username = session['username'] db = get_db() user = db.read_key('users', username) if not user: return jsonify({"error": "المستخدم غير موجود", "code": "user_not_found"}), 404 if not user.get('verified', False): return jsonify({"error": "الحساب غير مفعّل", "code": "not_verified"}), 403 subjects, error = get_user_accessible_subjects(username) if error: return jsonify({"error": error}), 400 return jsonify({ "username": username, "student_type": user.get('student_type', ''), "balance": user.get('balance', 0), "session_id": user.get('session_id', username), "subjects": subjects }) @chat_bp.route('/subjects', methods=['GET']) def get_subjects(): if not is_session_valid(): return jsonify({"error": "غير مصرح", "code": "unauthorized"}), 401 username = session['username'] subjects, error = get_user_accessible_subjects(username) if error: return jsonify({"error": error}), 400 return jsonify({ "subjects": subjects, "count": len(subjects), "accessible_count": len([s for s in subjects if s['accessible']]) }) @chat_bp.route('/select-subject', methods=['POST']) def select_subject(): if not is_session_valid(): return jsonify({"error": "غير مصرح", "code": "unauthorized"}), 401 data = request.json username = session['username'] subject_id = data.get('subject', '').strip() if not subject_id: return jsonify({"error": "يرجى تحديد المادة", "code": "no_subject"}), 400 has_access, access_error = validate_user_subject_access(username, subject_id) if not has_access: return jsonify({"error": access_error, "code": "no_access"}), 403 subject_data = subject_loader.load(subject_id) if not subject_data: return jsonify({"error": f"مجلد المادة '{subject_id}' غير موجود على الخادم"}), 404 session_id = username _agent.select_subject(username, session_id, subject_id) db = get_db() user = db.read_key('users', username) student_type = user.get('student_type', '') if user else '' subject_name = get_subject_name(student_type, subject_id) p_files = subject_data.get("_p_files", []) s = _agent.chat_history.get_or_create_session(session_id) full_messages = s.get("full_messages", []) return jsonify({ "success": True, "subject_id": subject_id, "subject_name": subject_name, "chapters_count": len(p_files), "files": ["main.txt"] + p_files, "message": f"تم تحميل مادة '{subject_name}' بنجاح! ({len(p_files)} فصول)", "chat_history": full_messages, "history_count": len(full_messages) }) @chat_bp.route('/switch-subject', methods=['POST']) def switch_subject(): if not is_session_valid(): return jsonify({"error": "غير مصرح", "code": "unauthorized"}), 401 data = request.json username = session['username'] new_subject_id = data.get('subject', '').strip() if not new_subject_id: return jsonify({"error": "يرجى تحديد اسم المادة الجديدة"}), 400 has_access, access_error = validate_user_subject_access(username, new_subject_id) if not has_access: return jsonify({"error": access_error, "code": "no_access"}), 403 subject_data = subject_loader.load(new_subject_id) if not subject_data: return jsonify({"error": f"مجلد المادة '{new_subject_id}' غير موجود على الخادم"}), 404 session_id = username _agent.select_subject(username, session_id, new_subject_id) db = get_db() user = db.read_key('users', username) student_type = user.get('student_type', '') if user else '' subject_name = get_subject_name(student_type, new_subject_id) p_files = subject_data.get("_p_files", []) s = _agent.chat_history.get_or_create_session(session_id) full_messages = s.get("full_messages", []) return jsonify({ "success": True, "subject_id": new_subject_id, "subject_name": subject_name, "chapters_count": len(p_files), "files": ["main.txt"] + p_files, "message": f"تم التبديل إلى مادة '{subject_name}' بنجاح!", "chat_history": full_messages, "history_count": len(full_messages) }) @chat_bp.route('/chat', methods=['POST']) def chat(): if not is_session_valid(): return jsonify({"error": "غير مصرح", "code": "unauthorized"}), 401 data = request.json user_message = data.get('message', '').strip() username = session['username'] if not user_message: return jsonify({"error": "رسالة فارغة"}), 400 session_id = username db = get_db() user = db.read_key('users', username) if not user: return jsonify({"error": "المستخدم غير موجود", "code": "user_not_found"}), 404 if not user.get('verified', False): return jsonify({"error": "الحساب غير مفعّل", "code": "not_verified"}), 403 current_subject = _agent.chat_history.get_subject(session_id) if not current_subject: return jsonify({"error": "لم يتم اختيار المادة بعد", "need_subject": True}), 400 has_access, access_error = validate_user_subject_access(username, current_subject) if not has_access: return jsonify({"error": access_error, "code": "no_access"}), 403 try: chosen_file, status = _agent.process_message(user_message, session_id) if status == "no_subject": return jsonify({"error": "يرجى اختيار المادة أولاً", "need_subject": True}), 400 if status == "subject_not_found": return jsonify({"error": f"مجلد المادة '{current_subject}' غير موجود"}), 404 if chosen_file == "main.txt": response = _agent.respond_gpt(user_message, chosen_file, session_id) _agent.save_current_chat(username, session_id) session_info = _agent.chat_history.get_session_info(session_id) return jsonify({ "type": "gpt", "subject": current_subject, "chosen_file": chosen_file, "response": response, "session_info": session_info }) else: session_info = _agent.chat_history.get_session_info(session_id) return jsonify({ "type": "mistral", "subject": current_subject, "chosen_file": chosen_file, "stream_url": ( f"/stream?file={chosen_file}" f"&session={http_requests.utils.quote(session_id)}" f"&message={http_requests.utils.quote(user_message)}" ), "session_info": session_info }) except Exception as e: print(f"❌ Chat error: {e}") return jsonify({"error": str(e)}), 500 @chat_bp.route('/stream') def stream(): if not is_session_valid(): def auth_error(): yield f"data: {json.dumps({'error': 'غير مصرح'})}\n\n" return Response(auth_error(), mimetype='text/event-stream') username = session['username'] user_message = request.args.get('message', '') chosen_file = request.args.get('file', 'main.txt') session_id = request.args.get('session', username) db = get_db() user = db.read_key('users', username) if not user or not user.get('verified', False): def auth_error(): yield f"data: {json.dumps({'error': 'غير مصرح'})}\n\n" return Response(auth_error(), mimetype='text/event-stream') current_subject = _agent.chat_history.get_subject(session_id) if current_subject: has_access, _ = validate_user_subject_access(username, current_subject) if not has_access: def access_error(): yield f"data: {json.dumps({'error': 'ليس لديك صلاحية الوصول لهذه المادة'})}\n\n" return Response(access_error(), mimetype='text/event-stream') def generate(): try: for chunk in _agent.respond_mistral_stream(user_message, chosen_file, session_id): try: parsed = json.loads(chunk.strip()) if parsed.get("done"): _agent.save_current_chat(username, session_id) except Exception: pass yield f"data: {chunk}\n\n" except Exception as e: yield f"data: {json.dumps({'error': str(e)})}\n\n" return Response( stream_with_context(generate()), mimetype='text/event-stream', headers={ 'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no', 'Connection': 'keep-alive' } ) @chat_bp.route('/clear', methods=['POST']) def clear_history(): if not is_session_valid(): return jsonify({"error": "غير مصرح", "code": "unauthorized"}), 401 username = session['username'] session_id = username current_subject = _agent.chat_history.get_subject(session_id) _agent.chat_history.clear_session(session_id) if current_subject: clear_user_subject_chat_from_db(username, current_subject) return jsonify({ "success": True, "message": "تم مسح سجل المحادثة للمادة الحالية", "subject_kept": current_subject }) @chat_bp.route('/session-info', methods=['POST']) def session_info(): if not is_session_valid(): return jsonify({"error": "غير مصرح", "code": "unauthorized"}), 401 username = session['username'] session_id = username info = _agent.chat_history.get_session_info(session_id) return jsonify(info) @chat_bp.route('/summary', methods=['POST']) def get_summary(): if not is_session_valid(): return jsonify({"error": "غير مصرح", "code": "unauthorized"}), 401 username = session['username'] session_id = username summary = _agent.chat_history.get_summary(session_id) subject = _agent.chat_history.get_subject(session_id) return jsonify({ "username": username, "subject": subject, "summary": summary if summary else "لا يوجد ملخص بعد. سيتم إنشاء ملخص تلقائياً بعد 10 رسائل." }) @chat_bp.route('/reload-subject', methods=['POST']) def reload_subject(): if not is_session_valid(): return jsonify({"error": "غير مصرح", "code": "unauthorized"}), 401 data = request.json username = session['username'] subject_id = data.get('subject', '').strip() if not subject_id: return jsonify({"error": "يرجى تحديد اسم المادة"}), 400 has_access, access_error = validate_user_subject_access(username, subject_id) if not has_access: return jsonify({"error": access_error, "code": "no_access"}), 403 subject_data = subject_loader.reload(subject_id) if not subject_data: return jsonify({"error": f"مجلد المادة '{subject_id}' غير موجود على الخادم"}), 404 p_files = subject_data.get("_p_files", []) return jsonify({ "success": True, "subject_id": subject_id, "chapters_count": len(p_files), "message": f"تم إعادة تحميل المادة '{subject_id}' بنجاح!" }) @chat_bp.route('/get-chat-history', methods=['GET']) def get_chat_history(): if not is_session_valid(): return jsonify({"error": "غير مصرح", "code": "unauthorized"}), 401 username = session['username'] session_id = username s = _agent.chat_history.get_or_create_session(session_id) full_messages = s.get("full_messages", s.get("messages", [])) subject = s.get("subject", None) subject_name = None if subject: db = get_db() user = db.read_key('users', username) student_type = user.get('student_type', '') if user else '' subject_name = get_subject_name(student_type, subject) return jsonify({ "messages": full_messages, "subject": subject, "subject_name": subject_name, "count": len(full_messages) }) @chat_bp.route('/get-all-chats', methods=['GET']) def get_all_chats(): if not is_session_valid(): return jsonify({"error": "غير مصرح", "code": "unauthorized"}), 401 username = session['username'] all_chats = get_all_user_chats_from_db(username) db = get_db() user = db.read_key('users', username) student_type = user.get('student_type', '') if user else '' result = [] for subject_id, chat_data in all_chats.items(): subject_name = get_subject_name(student_type, subject_id) full_msgs = chat_data.get("full_messages", []) result.append({ "subject_id": subject_id, "subject_name": subject_name, "message_count": len(full_msgs), "last_saved": chat_data.get("saved_at", ""), "has_summary": bool(chat_data.get("summary", "")) }) return jsonify({ "chats": result, "total": len(result) }) ================================================================================ FILE 15: config.py FULL PATH: main_project\config.py ================================================================================ """ Global configuration and constants. All settings in one place. """ import os import secrets # ─── Flask ─── SECRET_KEY = os.environ.get("SECRET_KEY", "corvo-ai-fixed-secret-key-change-in-production-2024") DEBUG = os.environ.get("DEBUG", "false").lower() == "true" HOST = os.environ.get("HOST", "0.0.0.0") PORT = int(os.environ.get("PORT", 7860)) # ─── AI endpoints ─── GPT_URL = "https://corvo-ai-gpt-5-4.hf.space/chat" TTS_URL = "https://corvo-ai-tts.hf.space" TRANSCRIPT_URL = "https://corvo-ai-transcript.hf.space" # ─── Connection Pool Settings ─── GPT_POOL_SIZE = 40 TTS_POOL_SIZE = 30 MISTRAL_POOL_SIZE = 30 TRANSCRIPT_POOL_SIZE = 10 CLOUDINARY_POOL_SIZE = 10 GPT_TIMEOUT = 120 TTS_TIMEOUT = 60 MISTRAL_TIMEOUT = 120 TRANSCRIPT_TIMEOUT = 120 # ─── Cloudinary ─── CLOUDINARY_URL = "https://api.cloudinary.com/v1_1/dwsoob1wh/image/upload" CLOUDINARY_PRESET = "Cloud-storage" # ─── Icons8 ─── ICONS8_SEARCH_URL = ( "https://search-app.icons8.com/api/iconsets/v7/search" "?isAnimated=false&language=en&analytics=true" "&spellcheck=false&saveAnalytics=true" "&amount=10&isOuch=true" "&replaceNameWithSynonyms=true" "&offset=0&term={term}" ) ICONS8_IMAGE_URL = "https://img.icons8.com/?size=100&id={icon_id}&format=png" # ─── Audio ─── AUDIO_OUTPUT_DIR = "static/audio" MAX_AUDIO_FILES = 500 # ─── Chat history ─── SUMMARY_TRIGGER_COUNT = 10 MAX_CHAT_HISTORY = 20 # ─── Directories to create on startup ─── REQUIRED_DIRS = [ "static/audio", "templates", "FINAL", ] # ─── Mistral cookie ─── MISTRAL_COOKIE = os.environ.get("MISTRAL_COOKIE", ( '''__cf_bm=JrqUGrlLa1VNznB_gcRKdDY.AyPLIvyRVUuKMV0avmc-1771531995-1.0.1.1-gJPuKUhLOcJ6hccPtwfCq6jJOAkQ8xABZJi_RwovhSQrsUTbUv1MCh1IPLvZ3ldPzqHLrPmcWUwLE_q_iVrP.W9PCYLnRq4HMMbVeyuDwlE; _cfuvid=oL4ochUsTJrLY6FlPpKTfocKdvOyum8GRuUBbqS9l8U-1771531995497-0.0.1.1-604800000; csrf_token_1d61ec8f0158ec4868343239ec73dbe1bfebad9908ad860e62f470c767573d0d=6ybeYU0W24tff63a5O/h0wbG6pYuc4w4iKv6SOKnYFc=; ory_session_coolcurranf83m3srkfl=MTc3MTUzMjA2OXxZU1dnRmd5SzNZM3o5VERSMGN5OFBWaHZ4bmdzd2RLOXNTUHNqMVVQbUdqUVFyZXpFN1ZCa0stYmJOR1lmbFNHVWphdWE4UWZ4S2FESjVROVJVYkw3a0MxN01TVVRZSUFGcWdZRHBCSjA2WEFJcFowcERmTWdrTmxuZ01zLWtubWxtd3lELVBBSVhOSzRkS1RRTWlxUHo1WmJwRWZzNlBXOFZWbUQyY2xkRVlOdXczb19kMlA4enc2ZGdVbkhaOUtXNDhEcEtxTVVBUTFYVk5YUXUtbDhjNVBOaG04WDRncmVNeXV4WWZXNC05TTlZRml3Y2hmR1F3QUR4Wk1aS0wwVUFfWGFyb2YxUlUtZXJkMGVhc3B8ITaivuTKOyk2pkoOhT6SPvP7zk-_NWF074pMm1HUl-k=; intercom-device-id-xel0jpx9=16cbae05-b305-4216-9967-e1622600c85c; csrftoken=KevqOi51yhx99LPY2i96wpMb2BxPlbpA; __cflb=0H28vBt3Asif1pksrBB47e5ijRcsvN4rdm4JmqYW3k8; intercom-session-xel0jpx9=K0lUK3E1QkhQemZuN256UXRtUVIxQWRoTVNQVXA2OVhXbDd1UHlGb3hxVmFVN0tJTG5LdFEvelpJck5aR0FjV3VWY0NjR1piMUxUdkZUc09kLzVDMTIrRGZiU3BtRElIQ0FvZUVZdHZyNU5sZU9EU0E3cHFCK1FzdE9kUTFKaG5PclZJTGRxN1c2WUt1NEVLNXBqZUwySStMQ2JHWHo3Z3Vyc2xLSWh2YkVDbWJBOWJWeHpHNFJIUzltZWUvQXVXLS1jdGVHOFVGRTJNS3I2Qm1tWlk3UEVBPT0=--d3764aa89dd7c5400eb27bf4e34ad8c85402eb72''' )) ================================================================================ FILE 16: database\__init__.py FULL PATH: main_project\database\__init__.py ================================================================================ """ Database package. Handles all JSON file persistence. """ ================================================================================ FILE 17: database\cards.py FULL PATH: main_project\database\cards.py ================================================================================ """ Cards database operations. Pure in-memory via MemoryDB. No local files. """ from memory_db import get_db def init_cards_db(): db = get_db() count = db.count('cards') print(f" ✅ Cards DB ready: {count} cards in memory") def load_cards_db(): return get_db().read('cards') def save_cards_db(data): get_db().write_full('cards', data) def get_card(code): return get_db().read_key('cards', code) def set_card(code, card_data): get_db().write('cards', code, card_data) def card_exists(code): return get_db().has_key('cards', code) ================================================================================ FILE 18: database\chat_history.py FULL PATH: main_project\database\chat_history.py ================================================================================ """ Chat history database. Pure in-memory via MemoryDB. No local files. """ from datetime import datetime from memory_db import get_db def init_chat_history_db(): db = get_db() count = db.count('chat_history') print(f" ✅ Chat History DB ready: {count} records in memory") def make_chat_db_key(username, subject_id): return f"{username}__{subject_id}" def save_user_chat_to_db(username, subject_id, session_data): try: db = get_db() db_key = make_chat_db_key(username, subject_id) chat_record = { "username": username, "subject_id": subject_id, "messages": session_data.get("messages", []), "full_messages": session_data.get("full_messages", []), "summary": session_data.get("summary", ""), "message_count": session_data.get("message_count", 0), "last_file": session_data.get("last_file", None), "subject": session_data.get("subject", subject_id), "saved_at": datetime.now().isoformat() } db.write('chat_history', db_key, chat_record) msg_count = len(session_data.get('full_messages', [])) print(f"✅ Saved chat for {username} / subject={subject_id} ({msg_count} messages)") except Exception as e: print(f"❌ Error saving chat to memory: {e}") def load_user_chat_from_db(username, subject_id): try: db = get_db() db_key = make_chat_db_key(username, subject_id) saved = db.read_key('chat_history', db_key) if saved is None: return None return saved except Exception as e: print(f"❌ Error loading chat from memory: {e}") return None def clear_user_subject_chat_from_db(username, subject_id): try: db = get_db() db_key = make_chat_db_key(username, subject_id) db.delete('chat_history', db_key) print(f"🗑️ Cleared chat for {username} / subject={subject_id}") except Exception as e: print(f"❌ Error clearing chat: {e}") def get_all_user_chats_from_db(username): try: db = get_db() prefix = f"{username}__" records = db.find_keys_by_prefix('chat_history', prefix) result = {} for key, val in records.items(): subject_id = key[len(prefix):] result[subject_id] = val return result except Exception as e: print(f"❌ Error getting all user chats: {e}") return {} ================================================================================ FILE 19: database\telegram.py FULL PATH: main_project\database\telegram.py ================================================================================ """ Telegram verification database. Pure in-memory via MemoryDB. No local files. """ from memory_db import get_db def load_telegram_db(): return get_db().read('telegram') def save_telegram_db(data): get_db().write_full('telegram', data) def get_telegram_user(user_id): return get_db().read_key('telegram', str(user_id)) def set_telegram_user(user_id, data): get_db().write('telegram', str(user_id), data) ================================================================================ FILE 20: database\users.py FULL PATH: main_project\database\users.py ================================================================================ """ Users database operations. Pure in-memory via MemoryDB. No local files. """ from memory_db import get_db def init_users_db(): db = get_db() count = db.count('users') print(f" ✅ Users DB ready: {count} users in memory") def load_users_db(): return get_db().read('users') def save_users_db(data): get_db().write_full('users', data) def get_user(username): return get_db().read_key('users', username) def update_user(username, update_fn): return get_db().update_key('users', username, update_fn) def set_user(username, user_data): get_db().write('users', username, user_data) def user_exists(username): return get_db().has_key('users', username) ================================================================================ FILE 21: exam\__init__.py FULL PATH: main_project\exam\__init__.py ================================================================================ """ Exam package. MCQ exam generation and scoring. """ ================================================================================ FILE 22: exam\generator.py FULL PATH: main_project\exam\generator.py ================================================================================ """ Exam question generator. Updated to use HTTP connection pools. """ import re import json import random import os from config import GPT_URL, GPT_TIMEOUT from http_pool import gpt_session from subjects.loader import subject_loader def load_final_mcq(subject_id): """Load the MCQ reference file from FINAL folder.""" final_path = os.path.join("FINAL", f"{subject_id}.txt") try: with open(final_path, 'r', encoding='utf-8') as f: content = f.read() print(f" ✓ Loaded FINAL/{subject_id}.txt ({len(content)} chars)") return content except FileNotFoundError: print(f" ⚠ FINAL/{subject_id}.txt not found") return "" except Exception as e: print(f" ❌ Error loading FINAL/{subject_id}.txt: {e}") return "" def generate_exam_questions(subject_id, door_file, question_count, difficulty, username): """Generate MCQ questions using GPT with pooled connection.""" subject_data = subject_loader.load(subject_id) if not subject_data: return None, f"مجلد المادة '{subject_id}' غير موجود" door_content = subject_data.get(door_file, "") if not door_content: return None, f"ملف الباب '{door_file}' غير موجود أو فارغ" final_mcq_content = load_final_mcq(subject_id) main_content = subject_data.get("main.txt", "") difficulty_map = { "easy": "سهلة - أسئلة مباشرة تعتمد على الحفظ والفهم الأساسي", "medium": "متوسطة - أسئلة تعتمد على الفهم والتطبيق", "hard": "صعبة - أسئلة تحليلية وتطبيقية عميقة تتطلب فهماً متقدماً" } difficulty_text = difficulty_map.get(difficulty, "متوسطة") exam_prompt = f"""{main_content} === محتوى الباب الدراسي === {door_content} === أسئلة السنوات السابقة (للاستئناس فقط) === {final_mcq_content if final_mcq_content else "لا توجد أسئلة مرجعية متاحة"} === تعليمات توليد الأسئلة === أنت مدرس خبير متخصص في إعداد أسئلة الاختيار من متعدد (MCQ). مهمتك: توليد {question_count} سؤال اختيار من متعدد بمستوى {difficulty_text}. **قواعد مهمة جداً:** 1. اعتمد بشكل رئيسي على محتوى الباب الدراسي أعلاه لصياغة الأسئلة 2. يمكنك الاستئناس بأسئلة السنوات السابقة لفهم نمط الأسئلة وأسلوب المُعِدّين، لكن لا تنسخها حرفياً 3. كل سؤال يجب أن يحتوي على 4 خيارات: 3 خاطئة و1 صحيحة 4. اجعل الخيارات الخاطئة منطقية وقريبة من الصحيحة 5. تنوّع في أنواع الأسئلة: تعريف، تطبيق، مقارنة، استنتاج **صيغة الإجابة المطلوبة (JSON فقط - لا تضف أي نص خارج JSON):** {{ "questions": [ {{ "question": "نص السؤال هنا", "correct": "الإجابة الصحيحة", "wrong1": "خيار خاطئ أول", "wrong2": "خيار خاطئ ثاني", "wrong3": "خيار خاطئ ثالث" }} ] }} الآن أنشئ {question_count} سؤال بمستوى {difficulty_text} بصيغة JSON فقط:""" payload = { "user_input": exam_prompt, "chat_history": [{"role": "system", "content": ""}], "temperature": 0.7, "top_p": 0.95, "max_completion_tokens": 4000 } try: response = gpt_session.post(GPT_URL, json=payload, timeout=GPT_TIMEOUT) response.raise_for_status() result = response.json().get("assistant_response", "") if not result: return None, "لم يتم الحصول على استجابة من الخادم" print(f"✅ Exam generation complete: {len(result)} chars") return result, None except Exception as e: print(f"❌ Exam generation error: {e}") return None, str(e) def parse_mcq_json(json_string): """Parse the JSON MCQ response into a list of question dicts.""" questions = [] seen = set() try: json_string = re.sub(r'```(?:json)?\s*', '', json_string) json_string = re.sub(r'```', '', json_string).strip() match = re.search(r'\{.*\}', json_string, re.DOTALL) if not match: print("❌ No JSON object found in response") return [] data = json.loads(match.group()) raw_questions = data.get("questions", []) for item in raw_questions: question_text = item.get("question", "").strip() correct_text = item.get("correct", "").strip() wrong1 = item.get("wrong1", "").strip() wrong2 = item.get("wrong2", "").strip() wrong3 = item.get("wrong3", "").strip() if not question_text or not correct_text: continue dedup_key = re.sub(r'\s+', ' ', question_text).strip() if dedup_key in seen: continue seen.add(dedup_key) wrong_texts = [w for w in [wrong1, wrong2, wrong3] if w] if not wrong_texts: continue while len(wrong_texts) < 3: wrong_texts.append(wrong_texts[-1]) choices = [{"text": correct_text, "is_correct": True}] for w in wrong_texts[:3]: choices.append({"text": w, "is_correct": False}) random.shuffle(choices) questions.append({ "question": question_text, "choices": choices, "correct_answer": correct_text }) print(f"✅ Parsed {len(questions)} MCQ questions from JSON") return questions except json.JSONDecodeError as e: print(f"❌ JSON parse error: {e}") return [] except Exception as e: print(f"❌ Parse error: {e}") return [] ================================================================================ FILE 23: exam\routes.py FULL PATH: main_project\exam\routes.py ================================================================================ """ Exam API routes. Updated to use MemoryDB for user lookups. """ import threading from datetime import datetime from flask import Blueprint, request, jsonify, session, render_template, redirect, url_for, flash from auth.helpers import is_session_valid from memory_db import get_db from subjects.access import validate_user_subject_access from subjects.definitions import SUBJECTS, get_subject_name from subjects.loader import subject_loader from exam.generator import generate_exam_questions, parse_mcq_json exam_bp = Blueprint('exam', __name__) _exam_sessions = {} _exam_lock = threading.Lock() def _create_exam_session(username, subject_id, door_file, question_count, difficulty): with _exam_lock: _exam_sessions[username] = { "subject_id": subject_id, "door_file": door_file, "question_count": question_count, "difficulty": difficulty, "questions": [], "current_index": 0, "score": 0, "answers": [], "started_at": datetime.now().isoformat(), "status": "generating", } return _exam_sessions[username] def _get_exam_session(username): with _exam_lock: return _exam_sessions.get(username, None) def _clear_exam_session(username): with _exam_lock: if username in _exam_sessions: del _exam_sessions[username] print(f"🗑️ Cleared exam session for {username}") @exam_bp.route('/exam/') def exam_page(subject_id): if not is_session_valid(): session.clear() flash('انتهت الجلسة، يرجى تسجيل الدخول مرة أخرى.', 'error') return redirect(url_for('auth.login')) username = session['username'] db = get_db() user_data = db.read_key('users', username) if not user_data or not user_data.get('verified', False): flash('حسابك غير مفعّل. تواصل مع الإدارة.', 'error') return redirect(url_for('dashboard')) has_access, access_error = validate_user_subject_access(username, subject_id) if not has_access: flash(access_error, 'error') return redirect(url_for('dashboard')) subject_data = subject_loader.load(subject_id) if not subject_data: flash(f"مجلد المادة '{subject_id}' غير موجود على الخادم.", 'error') return redirect(url_for('dashboard')) student_type = user_data.get('student_type', 'علمي') subject_name = get_subject_name(student_type, subject_id) p_files = subject_data.get("_p_files", []) doors = [] for i, pf in enumerate(p_files, 1): doors.append({ "file": pf, "label": f"الباب {i}", "index": i }) return render_template('exam.html', username=username, subject_id=subject_id, subject_name=subject_name, doors=doors, doors_count=len(doors)) @exam_bp.route('/exam/generate', methods=['POST']) def exam_generate(): """Generate MCQ questions for the exam.""" if not is_session_valid(): return jsonify({"error": "غير مصرح", "code": "unauthorized"}), 401 data = request.json username = session['username'] subject_id = data.get('subject_id', '').strip() door_file = data.get('door_file', '').strip() question_count = int(data.get('question_count', 10)) difficulty = data.get('difficulty', 'medium').strip() if not subject_id: return jsonify({"error": "يرجى تحديد المادة"}), 400 if not door_file: return jsonify({"error": "يرجى اختيار الباب"}), 400 if question_count not in [5, 10, 15, 30]: return jsonify({"error": "عدد الأسئلة غير صحيح"}), 400 if difficulty not in ['easy', 'medium', 'hard']: return jsonify({"error": "مستوى الصعوبة غير صحيح"}), 400 has_access, access_error = validate_user_subject_access(username, subject_id) if not has_access: return jsonify({"error": access_error, "code": "no_access"}), 403 subject_data = subject_loader.load(subject_id) if not subject_data: return jsonify({"error": f"مجلد المادة '{subject_id}' غير موجود"}), 404 p_files = subject_data.get("_p_files", []) if door_file not in p_files: return jsonify({"error": f"الباب '{door_file}' غير موجود في هذه المادة"}), 400 print(f"📝 Generating exam: user={username}, subject={subject_id}, " f"door={door_file}, count={question_count}, difficulty={difficulty}") _create_exam_session(username, subject_id, door_file, question_count, difficulty) raw_json, error = generate_exam_questions( subject_id=subject_id, door_file=door_file, question_count=question_count, difficulty=difficulty, username=username ) if error: _clear_exam_session(username) return jsonify({"error": f"فشل توليد الأسئلة: {error}"}), 500 if not raw_json: _clear_exam_session(username) return jsonify({"error": "لم يتم الحصول على أسئلة من الخادم"}), 500 questions = parse_mcq_json(raw_json) if not questions: _clear_exam_session(username) return jsonify({"error": "فشل تحليل الأسئلة. حاول مرة أخرى."}), 500 questions = questions[:question_count] if len(questions) == 0: _clear_exam_session(username) return jsonify({"error": "لم يتم توليد أي أسئلة صحيحة. حاول مرة أخرى."}), 500 with _exam_lock: if username in _exam_sessions: _exam_sessions[username]["questions"] = questions _exam_sessions[username]["status"] = "active" _exam_sessions[username]["total"] = len(questions) print(f"✅ Exam ready: {len(questions)} questions for {username}") return jsonify({ "success": True, "questions": questions, "total": len(questions), "subject_id": subject_id, "door_file": door_file, "difficulty": difficulty, "question_count": len(questions) }) @exam_bp.route('/exam/submit', methods=['POST']) def exam_submit(): """Submit exam answers and get score.""" if not is_session_valid(): return jsonify({"error": "غير مصرح", "code": "unauthorized"}), 401 data = request.json username = session['username'] answers = data.get('answers', []) exam_session = _get_exam_session(username) if not exam_session: return jsonify({"error": "لا توجد جلسة امتحان نشطة"}), 400 if exam_session.get('status') != 'active': return jsonify({"error": "جلسة الامتحان غير نشطة"}), 400 questions = exam_session.get('questions', []) if not questions: return jsonify({"error": "لا توجد أسئلة في جلسة الامتحان"}), 400 score = 0 results = [] for i, question in enumerate(questions): correct_answer = question['correct_answer'] selected_answer = None for ans in answers: if ans.get('question_index') == i: selected_answer = ans.get('selected_answer', '') break is_correct = (selected_answer == correct_answer) if is_correct: score += 1 results.append({ "question_index": i, "question": question['question'], "correct_answer": correct_answer, "selected_answer": selected_answer, "is_correct": is_correct, "choices": question['choices'] }) total = len(questions) percentage = round((score / total) * 100, 1) if total > 0 else 0 if percentage >= 90: grade_msg = "ممتاز! 🌟" grade_color = "success" elif percentage >= 75: grade_msg = "جيد جداً! 👍" grade_color = "info" elif percentage >= 60: grade_msg = "جيد 👌" grade_color = "warning" elif percentage >= 50: grade_msg = "مقبول" grade_color = "warning" else: grade_msg = "يحتاج إلى مراجعة 📚" grade_color = "danger" with _exam_lock: if username in _exam_sessions: _exam_sessions[username]['status'] = 'done' _exam_sessions[username]['score'] = score _exam_sessions[username]['answers'] = answers _exam_sessions[username]['finished_at'] = datetime.now().isoformat() print(f"✅ Exam submitted: {username} scored {score}/{total} ({percentage}%)") return jsonify({ "success": True, "score": score, "total": total, "percentage": percentage, "grade_msg": grade_msg, "grade_color": grade_color, "results": results }) @exam_bp.route('/exam/clear', methods=['POST']) def exam_clear(): if not is_session_valid(): return jsonify({"error": "غير مصرح", "code": "unauthorized"}), 401 username = session['username'] _clear_exam_session(username) return jsonify({"success": True, "message": "تم مسح جلسة الامتحان"}) @exam_bp.route('/exam/status', methods=['GET']) def exam_status(): if not is_session_valid(): return jsonify({"error": "غير مصرح", "code": "unauthorized"}), 401 username = session['username'] exam_session = _get_exam_session(username) if not exam_session: return jsonify({"has_exam": False}) return jsonify({ "has_exam": True, "status": exam_session.get('status'), "subject_id": exam_session.get('subject_id'), "door_file": exam_session.get('door_file'), "question_count": exam_session.get('question_count'), "difficulty": exam_session.get('difficulty'), "total": exam_session.get('total', 0), "started_at": exam_session.get('started_at') }) ================================================================================ FILE 24: github_storage.py FULL PATH: main_project\github_storage.py ================================================================================ """ GitHub-based persistent storage. Saves/loads JSON database files to/from a GitHub repository. All data lives in RAM during runtime. Manual backup/restore via GitHub API. Usage: storage = GitHubStorage.get_instance() # Save all DBs to GitHub storage.push_all() # Load all DBs from GitHub storage.pull_all() # Save single DB storage.push_file('users.json', data_dict) # Load single DB data = storage.pull_file('users.json') """ import os import json import base64 import threading from datetime import datetime try: from http_pool import gpt_session as _http_session except ImportError: import requests as _http_session class GitHubStorage: """ GitHub API storage backend. Pushes/pulls JSON files to a private GitHub repo. """ _instance = None _init_lock = threading.Lock() # ─── Configuration (override via environment variables) ─── GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN", "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") GITHUB_REPO = os.environ.get("GITHUB_REPO", "serversclass-dev/db") GITHUB_BRANCH = os.environ.get("GITHUB_BRANCH", "main") GITHUB_DATA_DIR = os.environ.get("GITHUB_DATA_DIR", "db") # folder inside repo GITHUB_API_BASE = "https://api.github.com" # Files to sync DB_FILES = { 'users': 'users.json', 'telegram': 'users_db.json', 'cards': 'cards.json', 'chat_history': 'chat_history_db.json', } @classmethod def get_instance(cls): if cls._instance is None: with cls._init_lock: if cls._instance is None: cls._instance = cls() return cls._instance def __init__(self): self._lock = threading.Lock() self._file_shas = {} # Track SHA for each file (needed for GitHub updates) # Validate config - just check they're not empty and not the exact placeholder self._configured = bool( self.GITHUB_TOKEN and len(self.GITHUB_TOKEN) > 10 and self.GITHUB_REPO and "/" in self.GITHUB_REPO ) if self._configured: print(f" ✅ GitHubStorage initialized") print(f" Repo: {self.GITHUB_REPO}") print(f" Branch: {self.GITHUB_BRANCH}") print(f" Data dir: {self.GITHUB_DATA_DIR}/") else: print(f" ⚠️ GitHubStorage NOT configured - set GITHUB_TOKEN and GITHUB_REPO") print(f" Current token: {self.GITHUB_TOKEN[:10]}...") print(f" Current repo: {self.GITHUB_REPO}") def _headers(self): return { "Authorization": f"token {self.GITHUB_TOKEN}", "Accept": "application/vnd.github.v3+json", "Content-Type": "application/json", } def _file_url(self, filename): """Build GitHub API URL for a file.""" path = f"{self.GITHUB_DATA_DIR}/{filename}" if self.GITHUB_DATA_DIR else filename return ( f"{self.GITHUB_API_BASE}/repos/{self.GITHUB_REPO}" f"/contents/{path}?ref={self.GITHUB_BRANCH}" ) def _check_configured(self): if not self._configured: return False, "GitHubStorage not configured. Set GITHUB_TOKEN and GITHUB_REPO environment variables." return True, None # ─── PULL (GitHub → Memory) ─── def pull_file(self, filename): """ Download a single JSON file from GitHub. Returns dict or empty dict if not found (first time). NEVER fails on 404 - just returns empty. """ ok, err = self._check_configured() if not ok: print(f" ❌ {err}") return {} url = self._file_url(filename) try: response = _http_session.get(url, headers=self._headers(), timeout=30) if response.status_code == 404: # File doesn't exist on GitHub yet - totally normal for first run print(f" ℹ️ {filename} not on GitHub yet (will be created on first backup)") return {} if response.status_code == 403: remaining = response.headers.get('X-RateLimit-Remaining', '?') print(f" ❌ GitHub API rate limited. Remaining: {remaining}") return {} response.raise_for_status() data = response.json() # Save SHA for future updates self._file_shas[filename] = data.get('sha', '') # Decode content (base64) content_b64 = data.get('content', '') if not content_b64: print(f" ⚠️ {filename} is empty on GitHub") return {} content_bytes = base64.b64decode(content_b64) content_str = content_bytes.decode('utf-8') parsed = json.loads(content_str) if not isinstance(parsed, dict): print(f" ⚠️ {filename} on GitHub is not a dict, ignoring") return {} record_count = len(parsed) print(f" ✅ Pulled {filename} from GitHub ({record_count} records)") return parsed except json.JSONDecodeError as e: print(f" ❌ {filename} on GitHub has invalid JSON: {e}") return {} except Exception as e: print(f" ❌ Error pulling {filename} from GitHub: {e}") return {} def pull_all(self): """ Download ALL database files from GitHub. Returns (dict_of_stores, None) always. Missing files return empty dict (first run is fine). """ ok, err = self._check_configured() if not ok: return {s: {} for s in self.DB_FILES}, err print(f"\n 📥 Pulling all databases from GitHub...") results = {} for store_name, filename in self.DB_FILES.items(): data = self.pull_file(filename) results[store_name] = data if data else {} total_records = sum(len(v) for v in results.values()) if total_records == 0: print(f" ℹ️ All databases empty (first run - files will be created on first /backup_db)") else: print(f" ✅ Pull complete: {total_records} total records across {len(results)} stores") return results, None def push_file(self, filename, data_dict, message=None): """ Upload a single JSON file to GitHub. CREATES the file if it doesn't exist. UPDATES the file if it already exists. Handles the db/ directory automatically. """ ok, err = self._check_configured() if not ok: return False, err url = self._file_url(filename) if message is None: timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") record_count = len(data_dict) if isinstance(data_dict, dict) else 0 message = f"Backup {filename} - {record_count} records - {timestamp}" # Encode content to base64 content_str = json.dumps(data_dict, indent=2, ensure_ascii=False) content_b64 = base64.b64encode(content_str.encode('utf-8')).decode('utf-8') payload = { "message": message, "content": content_b64, "branch": self.GITHUB_BRANCH, } # Get current SHA if file exists (needed for update, not for create) current_sha = self._file_shas.get(filename) if not current_sha: # Try to fetch SHA from GitHub (file might exist but we don't have SHA cached) try: check_url = self._file_url(filename) check_resp = _http_session.get(check_url, headers=self._headers(), timeout=15) if check_resp.status_code == 200: current_sha = check_resp.json().get('sha', '') self._file_shas[filename] = current_sha elif check_resp.status_code == 404: # File doesn't exist yet - will be CREATED current_sha = None print(f" 📝 {filename} will be created on GitHub") except Exception: current_sha = None if current_sha: payload["sha"] = current_sha try: response = _http_session.put(url, headers=self._headers(), json=payload, timeout=30) if response.status_code == 403: remaining = response.headers.get('X-RateLimit-Remaining', '?') reset_time = response.headers.get('X-RateLimit-Reset', '?') err_msg = f"GitHub API rate limited. Remaining: {remaining}, Reset: {reset_time}" print(f" ❌ {err_msg}") return False, err_msg if response.status_code == 409: # SHA conflict - pull fresh SHA and retry print(f" ⚠️ SHA conflict for {filename}, retrying...") try: check_resp = _http_session.get( self._file_url(filename), headers=self._headers(), timeout=15 ) if check_resp.status_code == 200: fresh_sha = check_resp.json().get('sha', '') payload["sha"] = fresh_sha response = _http_session.put( url, headers=self._headers(), json=payload, timeout=30 ) elif check_resp.status_code == 404: # File was deleted? Create fresh if "sha" in payload: del payload["sha"] response = _http_session.put( url, headers=self._headers(), json=payload, timeout=30 ) except Exception as retry_err: return False, f"Retry failed: {retry_err}" if response.status_code == 422: # Unprocessable - might need to remove SHA for fresh create if "sha" in payload: del payload["sha"] response = _http_session.put( url, headers=self._headers(), json=payload, timeout=30 ) if response.status_code in [200, 201]: resp_data = response.json() new_sha = resp_data.get('content', {}).get('sha', '') if new_sha: self._file_shas[filename] = new_sha action = "Created" if response.status_code == 201 else "Updated" record_count = len(data_dict) if isinstance(data_dict, dict) else 0 print(f" ✅ {action} {filename} on GitHub ({record_count} records)") return True, None else: err_text = "" try: err_text = response.json().get('message', response.text[:200]) except Exception: err_text = response.text[:200] err_msg = f"GitHub API error {response.status_code}: {err_text}" print(f" ❌ {err_msg}") return False, err_msg except Exception as e: err_msg = f"Error pushing {filename} to GitHub: {e}" print(f" ❌ {err_msg}") return False, err_msg def push_all(self, data_dict_map=None): """ Upload ALL database files to GitHub. No local file operations. """ ok, err = self._check_configured() if not ok: return False, [err] # If no data provided, read from MemoryDB if data_dict_map is None: try: from memory_db import get_db db = get_db() data_dict_map = {} for store_name in db.STORES: data_dict_map[store_name] = db.read(store_name) except Exception as e: return False, [f"Failed to read from MemoryDB: {e}"] print(f"\n 📤 Pushing all databases to GitHub...") timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") errors = [] success_count = 0 for store_name, filename in self.DB_FILES.items(): data = data_dict_map.get(store_name, {}) record_count = len(data) if isinstance(data, dict) else 0 message = f"Backup {filename} - {record_count} records - {timestamp}" success, error = self.push_file(filename, data, message=message) if success: success_count += 1 else: errors.append(f"{filename}: {error}") total = len(self.DB_FILES) print(f" {'✅' if not errors else '⚠️'} Push complete: {success_count}/{total} files") if errors: for e in errors: print(f" ❌ {e}") return len(errors) == 0, errors # ─── STATUS ─── def get_status(self): """Get GitHub storage status and rate limit info.""" ok, err = self._check_configured() if not ok: return { "configured": False, "error": err } status = { "configured": True, "repo": self.GITHUB_REPO, "branch": self.GITHUB_BRANCH, "data_dir": self.GITHUB_DATA_DIR, "files": {}, "rate_limit": None, } # Check rate limit try: resp = _http_session.get( f"{self.GITHUB_API_BASE}/rate_limit", headers=self._headers(), timeout=10 ) if resp.status_code == 200: rl = resp.json().get('resources', {}).get('core', {}) status["rate_limit"] = { "limit": rl.get('limit', 0), "remaining": rl.get('remaining', 0), "reset_at": datetime.fromtimestamp( rl.get('reset', 0) ).isoformat() if rl.get('reset') else None, "used": rl.get('used', 0), } except Exception as e: status["rate_limit"] = {"error": str(e)} # Check each file for store_name, filename in self.DB_FILES.items(): status["files"][store_name] = { "filename": filename, "has_sha": filename in self._file_shas, } return status # ─── Singleton access ─── def get_github_storage(): return GitHubStorage.get_instance() ================================================================================ FILE 25: gunicorn_config.py FULL PATH: main_project\gunicorn_config.py ================================================================================ """ Gunicorn configuration for Docker deployment. SINGLE WORKER - because MemoryDB is in-process. Gevent handles 200+ concurrent connections in 1 worker. """ import os # SINGLE WORKER with gevent handles 200+ concurrent via greenlets # Multiple workers = separate memory = session problems worker_class = "gevent" workers = 1 worker_connections = 500 bind = f"0.0.0.0:{os.environ.get('PORT', 7860)}" timeout = 300 graceful_timeout = 120 keepalive = 65 max_requests = 0 # Don't restart worker (would lose memory) max_requests_jitter = 0 preload_app = True accesslog = "-" errorlog = "-" loglevel = os.environ.get("LOG_LEVEL", "info") limit_request_line = 0 limit_request_fields = 200 limit_request_field_size = 0 def on_starting(server): print("\n" + "═" * 60) print(" 🎓 CORVO AI - Starting Production Server") print(" ⚡ 1 Worker + 500 Gevent Connections = 500 concurrent users") print("═" * 60 + "\n") def on_exit(server): try: from memory_db import get_db db = get_db() db.shutdown() except Exception as e: print(f" ⚠️ Shutdown error: {e}") ================================================================================ FILE 26: http_pool.py FULL PATH: main_project\http_pool.py ================================================================================ """ HTTP Connection Pool Manager. Reuses TCP connections to external APIs instead of creating new ones per request. This is THE key optimization for 200 concurrent users hitting external APIs. Without this: each request opens a new TCP connection + TLS handshake = ~200ms overhead With this: connections are reused, overhead drops to ~5ms """ import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry from config import ( GPT_URL, TTS_URL, TRANSCRIPT_URL, CLOUDINARY_URL, GPT_POOL_SIZE, TTS_POOL_SIZE, MISTRAL_POOL_SIZE, TRANSCRIPT_POOL_SIZE, CLOUDINARY_POOL_SIZE, GPT_TIMEOUT, TTS_TIMEOUT, MISTRAL_TIMEOUT, TRANSCRIPT_TIMEOUT ) def _create_session(pool_size, pool_block=True, retries=2): """ Create a requests.Session with connection pooling. pool_size: max concurrent connections to same host pool_block: if True, blocks when pool is full instead of creating new connection This prevents overwhelming the external API retries: auto-retry on connection errors """ session = requests.Session() retry_strategy = Retry( total=retries, backoff_factor=0.5, status_forcelist=[502, 503, 504], allowed_methods=["POST", "GET"], ) adapter = HTTPAdapter( pool_connections=pool_size, pool_maxsize=pool_size, pool_block=pool_block, max_retries=retry_strategy, ) session.mount("https://", adapter) session.mount("http://", adapter) return session # ─── Pre-built sessions for each external service ─── # GPT API pool - highest traffic gpt_session = _create_session(GPT_POOL_SIZE) # TTS pool - high traffic from board tts_session = _create_session(TTS_POOL_SIZE) # Mistral pool - streaming chat mistral_session = _create_session(MISTRAL_POOL_SIZE) # Transcript pool - voice uploads transcript_session = _create_session(TRANSCRIPT_POOL_SIZE) # Cloudinary pool - image uploads cloudinary_session = _create_session(CLOUDINARY_POOL_SIZE) # Icons8 pool - icon lookups (cached, low traffic) icons8_session = _create_session(5) # Claude/Board processor pool board_processor_session = _create_session(GPT_POOL_SIZE) print("✅ HTTP Connection Pools initialized:") print(f" GPT: {GPT_POOL_SIZE} connections") print(f" TTS: {TTS_POOL_SIZE} connections") print(f" Mistral: {MISTRAL_POOL_SIZE} connections") print(f" Transcript: {TRANSCRIPT_POOL_SIZE} connections") print(f" Cloudinary: {CLOUDINARY_POOL_SIZE} connections") ================================================================================ FILE 27: json_processor.py FULL PATH: main_project\json_processor.py ================================================================================ import json import re from typing import Optional, Dict, Any, List from http_pool import board_processor_session class BoardProcessor: def __init__(self, api_url: str = "https://corvo-ai-claude-4-6-opus.hf.space/chat"): self.api_url = api_url self.system_prompt = self._load_system_prompt() def _load_system_prompt(self) -> str: """Load system prompt from system.txt file""" try: with open('system.txt', 'r', encoding='utf-8') as f: return f.read() except FileNotFoundError: return """You are a converter that transforms XML board data into JSON format. Convert the given XML to JSON following these rules: 1. Extract all elements from the XML 2. Convert them to proper JSON structure 3. Wrap your JSON output in tags Example: Input: Hello Output: {"board": {"notes": [{"color": "yellow", "x": 100, "y": 200, "text": "Hello"}]}} Always wrap your final JSON output in tags.""" def _call_ai_api( self, user_input: Optional[str] = None, chat_history: Optional[List[Dict[str, Any]]] = None, temperature: float = 0.9, top_p: float = 0.95, max_tokens: Optional[int] = None ) -> str: """Call the AI API using pooled connection.""" payload = { "user_input": user_input, "chat_history": chat_history or [], "temperature": temperature, "top_p": top_p, "max_tokens": max_tokens } try: response = board_processor_session.post( self.api_url, json=payload, timeout=120 ) response.raise_for_status() result = response.json() return result.get("assistant_response", "") except Exception as e: raise Exception(f"API request failed: {str(e)}") def _extract_json_from_response(self, response: str) -> str: """Extract JSON from tags in the AI response.""" print("\n📋 DEBUG - Raw AI Response:") print("=" * 80) print(response[:500] + ("..." if len(response) > 500 else "")) print("=" * 80) patterns = [ r'(.*?)', r'```json\s*(.*?)\s*```', r'```\s*(.*?)\s*```', ] for pattern in patterns: json_match = re.search(pattern, response, re.DOTALL) if json_match: json_str = json_match.group(1).strip() try: json.loads(json_str) return json_str except json.JSONDecodeError: continue try: json_match = re.search(r'\{.*\}', response, re.DOTALL) if json_match: json_str = json_match.group(0).strip() json.loads(json_str) return json_str except json.JSONDecodeError: pass # Try array try: json_match = re.search(r'\[.*\]', response, re.DOTALL) if json_match: json_str = json_match.group(0).strip() json.loads(json_str) return json_str except json.JSONDecodeError: pass raise ValueError("No valid JSON found in AI response.") def convert_xml_to_json( self, xml_text: str, temperature: float = 0.9, top_p: float = 0.95, max_tokens: Optional[int] = None ) -> str: """Convert XML text to Board JSON text using pooled connection.""" chat_history = [ {"role": "system", "content": self.system_prompt} ] ai_response = self._call_ai_api( user_input=xml_text, chat_history=chat_history, temperature=temperature, top_p=top_p, max_tokens=max_tokens ) print(xml_text[:200] + ("..." if len(xml_text) > 200 else "")) json_text = self._extract_json_from_response(ai_response) return json_text ================================================================================ FILE 28: market\__init__.py FULL PATH: main_project\market\__init__.py ================================================================================ """ Market package. Subject purchasing, balance management, card redemption. """ ================================================================================ FILE 29: market\routes.py FULL PATH: main_project\market\routes.py ================================================================================ """ Market and balance routes. Updated to use MemoryDB with atomic operations for thread safety. """ import random from datetime import datetime from flask import Blueprint, request, jsonify, session, render_template, redirect, url_for, flash from auth.helpers import is_session_valid from memory_db import get_db from subjects.definitions import SUBJECTS market_bp = Blueprint('market', __name__) @market_bp.route('/market') def market(): if not is_session_valid(): session.clear() flash('انتهت الجلسة، يرجى تسجيل الدخول مرة أخرى.', 'error') return redirect(url_for('auth.login')) username = session['username'] db = get_db() user_data = db.read_key('users', username) or {} student_type = user_data.get('student_type', 'علمي') all_subjects = SUBJECTS.get(student_type, []) purchased_subjects = user_data.get('purchased_subjects', ['islamic']) available_subjects = [s for s in all_subjects if s['id'] not in purchased_subjects] return render_template('market.html', username=username, available_subjects=available_subjects, balance=user_data.get('balance', 0)) @market_bp.route('/buy_subject', methods=['POST']) def buy_subject(): if not is_session_valid(): return jsonify({'success': False, 'message': 'جلسة غير صالحة'}) username = session['username'] subject_id = request.json.get('subject_id') db = get_db() user_data = db.read_key('users', username) if not user_data: return jsonify({'success': False, 'message': 'المستخدم غير موجود'}) student_type = user_data.get('student_type', 'علمي') all_subjects = SUBJECTS.get(student_type, []) subject = next((s for s in all_subjects if s['id'] == subject_id), None) if not subject: return jsonify({'success': False, 'message': 'المادة غير موجودة'}) purchased_subjects = user_data.get('purchased_subjects', ['islamic']) if subject_id in purchased_subjects: return jsonify({'success': False, 'message': 'لديك هذه المادة بالفعل'}) balance = user_data.get('balance', 0) if balance < subject['price']: return jsonify({'success': False, 'message': 'رصيدك غير كافٍ'}) # Atomic purchase operation def do_purchase(u): if not u: return u current_balance = u.get('balance', 0) current_purchased = u.get('purchased_subjects', ['islamic']) # Double-check inside atomic op if subject_id in current_purchased: return u if current_balance < subject['price']: return u u['balance'] = current_balance - subject['price'] if subject_id not in u['purchased_subjects']: u['purchased_subjects'].append(subject_id) return u updated = db.update_key('users', username, do_purchase) if updated: return jsonify({ 'success': True, 'message': f'تم شراء {subject["name"]} بنجاح!', 'new_balance': updated.get('balance', 0) }) else: return jsonify({'success': False, 'message': 'فشل عملية الشراء'}) @market_bp.route('/add_balance') def add_balance(): if not is_session_valid(): session.clear() flash('انتهت الجلسة، يرجى تسجيل الدخول مرة أخرى.', 'error') return redirect(url_for('auth.login')) username = session['username'] db = get_db() user_data = db.read_key('users', username) or {} return render_template('add_balance.html', username=username, balance=user_data.get('balance', 0)) @market_bp.route('/redeem_card', methods=['POST']) def redeem_card(): if not is_session_valid(): return jsonify({'success': False, 'message': 'يجب تسجيل الدخول أولاً'}) code = request.json.get('code', '').strip() if not code: return jsonify({'success': False, 'message': 'يرجى إدخال كود الكرت'}) username = session['username'] db = get_db() # Atomic card redemption card = db.read_key('cards', code) if not card: return jsonify({'success': False, 'message': 'كود الكرت غير صحيح'}) if card.get('used'): return jsonify({'success': False, 'message': 'هذا الكرت مستخدم مسبقاً'}) # Mark card as used atomically card_updated = db.update_key('cards', code, lambda c: { **(c or {}), 'used': True, 'used_by': username, 'used_at': datetime.now().isoformat() }) if not card_updated or not card_updated.get('used'): return jsonify({'success': False, 'message': 'فشل استبدال الكرت'}) card_value = int(card['class']) message = f"تم استبدال كرت فئة {card_value} بنجاح!" # Update user balance and subjects atomically def apply_card(u): if not u: return u u['balance'] = u.get('balance', 0) + card_value current_purchased = set(u.get('purchased_subjects', [])) if card['class'] == '35': sci_subjects_ids = [s['id'] for s in SUBJECTS['علمي']] current_purchased.update(sci_subjects_ids) elif card['class'] == '45': lit_subjects_ids = [s['id'] for s in SUBJECTS['أدبي']] current_purchased.update(lit_subjects_ids) u['purchased_subjects'] = list(current_purchased) return u updated_user = db.update_key('users', username, apply_card) extra_msg = "" if card['class'] == '35': extra_msg = " وتم فتح جميع مواد القسم العلمي." elif card['class'] == '45': extra_msg = " وتم فتح جميع مواد القسم الأدبي." return jsonify({ 'success': True, 'message': message + extra_msg, 'new_balance': updated_user.get('balance', 0) if updated_user else 0, 'unlocked_count': len(updated_user.get('purchased_subjects', [])) if updated_user else 0 }) @market_bp.route('/admin/generate_cards', methods=['GET', 'POST']) def admin_generate_cards(): if request.method == 'POST': card_class = request.form.get('class', '10') quantity = int(request.form.get('quantity', 1)) db = get_db() generated_list = [] for _ in range(quantity): code = ''.join([str(random.randint(0, 9)) for _ in range(16)]) serial = f"{random.randint(1000,9999)}-{random.randint(1000,9999)}-{random.randint(1000,9999)}" card_data = { "code": code, "serial": serial, "class": card_class, "used": False, "used_by": None, "used_at": None, "created_at": datetime.now().isoformat() } db.write('cards', code, card_data) generated_list.append(card_data) return jsonify({'success': True, 'cards': generated_list}) return render_template('admin_generate.html') ================================================================================ FILE 30: media\__init__.py FULL PATH: main_project\media\__init__.py ================================================================================ """ Media package. Voice upload/transcription and image upload/analysis. """ ================================================================================ FILE 31: media\routes.py FULL PATH: main_project\media\routes.py ================================================================================ """ Media API routes. Voice recording upload, transcription, image upload and analysis. Updated to use MemoryDB. """ import os import tempfile from datetime import datetime from flask import Blueprint, request, jsonify, session from auth.helpers import is_session_valid from memory_db import get_db from subjects.access import validate_user_subject_access media_bp = Blueprint('media', __name__) _agent = None def init_media_agent(agent): global _agent _agent = agent @media_bp.route('/voice-upload', methods=['POST']) def voice_upload(): if not is_session_valid(): return jsonify({"error": "غير مصرح", "code": "unauthorized"}), 401 username = session['username'] db = get_db() user = db.read_key('users', username) if not user or not user.get('verified', False): return jsonify({"error": "غير مصرح"}), 403 if 'audio' not in request.files: return jsonify({"error": "لم يتم إرسال ملف صوتي"}), 400 audio_file = request.files['audio'] suffix = os.path.splitext(audio_file.filename)[1] if audio_file.filename else '.webm' if not suffix: suffix = '.webm' timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f') unique_filename = f"{username}_{timestamp}{suffix}" with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp: tmp_path = tmp.name audio_file.save(tmp_path) try: file_url, error = _agent.upload_audio(tmp_path, unique_filename) finally: try: os.remove(tmp_path) except Exception: pass if error: return jsonify({"error": f"فشل رفع الملف الصوتي: {error}"}), 500 return jsonify({"file_url": file_url}) @media_bp.route('/voice-transcribe', methods=['POST']) def voice_transcribe(): if not is_session_valid(): return jsonify({"error": "غير مصرح", "code": "unauthorized"}), 401 data = request.json username = session['username'] file_url = data.get('file_url', '').strip() db = get_db() user = db.read_key('users', username) if not user or not user.get('verified', False): return jsonify({"error": "غير مصرح"}), 403 if not file_url: return jsonify({"error": "لم يتم تحديد رابط الملف الصوتي"}), 400 transcription, error = _agent.transcribe_audio(file_url) if error: return jsonify({"error": f"فشل تحويل الصوت إلى نص: {error}"}), 500 if not transcription or not transcription.strip(): return jsonify({"error": "لم يتم التعرف على أي كلام في التسجيل"}), 400 return jsonify({"transcription": transcription.strip()}) @media_bp.route('/image-upload-analyze', methods=['POST']) def image_upload_analyze(): if not is_session_valid(): return jsonify({"error": "غير مصرح", "code": "unauthorized"}), 401 username = session['username'] db = get_db() user = db.read_key('users', username) if not user or not user.get('verified', False): return jsonify({"error": "غير مصرح"}), 403 session_id = username current_subject = _agent.chat_history.get_subject(session_id) if not current_subject: return jsonify({"error": "يرجى اختيار المادة أولاً", "need_subject": True}), 400 has_access, access_error = validate_user_subject_access(username, current_subject) if not has_access: return jsonify({"error": access_error, "code": "no_access"}), 403 if 'image' not in request.files: return jsonify({"error": "لم يتم إرسال صورة"}), 400 image_file = request.files['image'] if image_file.filename == '': return jsonify({"error": "لم يتم اختيار صورة"}), 400 suffix = os.path.splitext(image_file.filename)[1] or '.jpg' with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp: tmp_path = tmp.name image_file.save(tmp_path) try: print(f"📤 Uploading image to Cloudinary...") image_url, upload_error = _agent.upload_image_to_cloudinary(tmp_path) finally: try: os.remove(tmp_path) except Exception: pass if upload_error: return jsonify({"error": f"فشل رفع الصورة: {upload_error}"}), 500 if not image_url: return jsonify({"error": "فشل الحصول على رابط الصورة"}), 500 print(f"✅ Image uploaded: {image_url}") print(f"📝 Analyzing image with GPT...") analysis = _agent.analyze_image(image_url, session_id) if not analysis: return jsonify({"error": "فشل تحليل الصورة. حاول مرة أخرى."}), 500 analysis = analysis.strip() if '' in analysis.lower(): return jsonify({ "supported": False, "message": "⚠️ هذه الصورة لا تحتوي على محتوى دراسي. يرجى إرسال صور تتعلق بالدراسة فقط." }) print(f"✅ Image analyzed successfully, extracted {len(analysis)} chars") return jsonify({ "supported": True, "image_url": image_url, "extracted_text": analysis }) ================================================================================ FILE 32: memory_db.py FULL PATH: main_project\memory_db.py ================================================================================ """ In-Memory Database with GitHub-only backup. All data lives in RAM. NO local file reads or writes. On startup: loads from GitHub. Backup: manual push to GitHub via API endpoints. """ import time import threading from datetime import datetime class RWLock: def __init__(self): self._read_ready = threading.Condition(threading.Lock()) self._readers = 0 def read_acquire(self): self._read_ready.acquire() self._readers += 1 self._read_ready.release() def read_release(self): self._read_ready.acquire() self._readers -= 1 if self._readers == 0: self._read_ready.notify_all() self._read_ready.release() def write_acquire(self): self._read_ready.acquire() while self._readers > 0: self._read_ready.wait() def write_release(self): self._read_ready.release() class MemoryDB: """ Pure in-memory database. GitHub is the ONLY persistence layer. Zero local file I/O. """ _instance = None _init_lock = threading.Lock() # Store names (no file paths needed) STORES = ['users', 'telegram', 'cards', 'chat_history'] # Map store names to GitHub filenames STORE_FILES = { 'users': 'users.json', 'telegram': 'users_db.json', 'cards': 'cards.json', 'chat_history': 'chat_history_db.json', } @classmethod def get_instance(cls): if cls._instance is None: with cls._init_lock: if cls._instance is None: cls._instance = cls() return cls._instance def __init__(self): self._data = {} self._locks = {} self._backup_lock = threading.Lock() self._last_backup = time.time() for store_name in self.STORES: self._locks[store_name] = RWLock() self._data[store_name] = {} # Load from GitHub on startup self._initial_load() print(f" ✅ MemoryDB initialized (GitHub-only, zero local files)") def _initial_load(self): """ Load all data from GitHub on startup. If GitHub is not configured or empty: start with empty databases. This is SAFE for first run - no files needed anywhere. """ try: from github_storage import get_github_storage gh = get_github_storage() if not gh._configured: print(f"\n ⚠️ GitHub not configured - starting with EMPTY databases") print(f" Set GITHUB_TOKEN and GITHUB_REPO environment variables") print(f" First /backup_db call will CREATE files on GitHub automatically") return print(f"\n 📥 Loading databases from GitHub...") results, error = gh.pull_all() # pull_all ALWAYS returns results dict (never None) if results: loaded_any = False for store_name, data in results.items(): if data and isinstance(data, dict) and len(data) > 0: self._data[store_name] = data loaded_any = True if not loaded_any: print(f"\n ℹ️ All databases empty on GitHub (first run)") print(f" Users will be created as they sign up") print(f" Call /backup_db to push data to GitHub anytime") total = sum(len(self._data[s]) for s in self.STORES) if total > 0: print(f"\n 📊 Total records loaded: {total}") else: print(f"\n 📊 Starting fresh with 0 records") except ImportError: print(f" ❌ github_storage module not found") print(f" ⚠️ Starting with empty databases") except Exception as e: print(f" ❌ GitHub load error: {e}") print(f" ⚠️ Starting with empty databases (app will work, just no saved data)") # ─── READ OPERATIONS ─── def read(self, store_name): lock = self._locks.get(store_name) if not lock: return {} lock.read_acquire() try: return dict(self._data.get(store_name, {})) finally: lock.read_release() def read_key(self, store_name, key, default=None): lock = self._locks.get(store_name) if not lock: return default lock.read_acquire() try: return self._data.get(store_name, {}).get(key, default) finally: lock.read_release() def read_keys(self, store_name, keys): lock = self._locks.get(store_name) if not lock: return {} lock.read_acquire() try: store = self._data.get(store_name, {}) return {k: store[k] for k in keys if k in store} finally: lock.read_release() def has_key(self, store_name, key): lock = self._locks.get(store_name) if not lock: return False lock.read_acquire() try: return key in self._data.get(store_name, {}) finally: lock.read_release() def count(self, store_name): lock = self._locks.get(store_name) if not lock: return 0 lock.read_acquire() try: return len(self._data.get(store_name, {})) finally: lock.read_release() # ─── WRITE OPERATIONS ─── def write(self, store_name, key, value): lock = self._locks.get(store_name) if not lock: return lock.write_acquire() try: if store_name not in self._data: self._data[store_name] = {} self._data[store_name][key] = value finally: lock.write_release() def write_many(self, store_name, updates): lock = self._locks.get(store_name) if not lock: return lock.write_acquire() try: if store_name not in self._data: self._data[store_name] = {} self._data[store_name].update(updates) finally: lock.write_release() def write_full(self, store_name, data): lock = self._locks.get(store_name) if not lock: return lock.write_acquire() try: self._data[store_name] = data finally: lock.write_release() def update_key(self, store_name, key, update_fn): lock = self._locks.get(store_name) if not lock: return None lock.write_acquire() try: current = self._data.get(store_name, {}).get(key, None) new_value = update_fn(current) if new_value is not None: if store_name not in self._data: self._data[store_name] = {} self._data[store_name][key] = new_value return new_value finally: lock.write_release() def delete(self, store_name, key): lock = self._locks.get(store_name) if not lock: return False lock.write_acquire() try: if key in self._data.get(store_name, {}): del self._data[store_name][key] return True return False finally: lock.write_release() def delete_many(self, store_name, keys): lock = self._locks.get(store_name) if not lock: return 0 lock.write_acquire() try: count = 0 store = self._data.get(store_name, {}) for key in keys: if key in store: del store[key] count += 1 return count finally: lock.write_release() # ─── QUERY OPERATIONS ─── def find(self, store_name, predicate): lock = self._locks.get(store_name) if not lock: return [] lock.read_acquire() try: return [(k, v) for k, v in self._data.get(store_name, {}).items() if predicate(k, v)] finally: lock.read_release() def find_keys_by_prefix(self, store_name, prefix): lock = self._locks.get(store_name) if not lock: return {} lock.read_acquire() try: store = self._data.get(store_name, {}) return {k: v for k, v in store.items() if k.startswith(prefix)} finally: lock.read_release() # ─── GITHUB BACKUP OPERATIONS ─── def push_to_github(self): """Push all in-memory data to GitHub.""" try: from github_storage import get_github_storage gh = get_github_storage() data_map = {} for store_name in self.STORES: lock = self._locks[store_name] lock.read_acquire() try: data_map[store_name] = dict(self._data.get(store_name, {})) finally: lock.read_release() success, errors = gh.push_all(data_map) if success: self._last_backup = time.time() return success, errors except Exception as e: err = f"GitHub push error: {e}" print(f" ❌ {err}") return False, [err] def pull_from_github(self): """Pull all data from GitHub and replace in-memory data.""" try: from github_storage import get_github_storage gh = get_github_storage() results, error = gh.pull_all() if error: return False, error if not results: return False, "No data returned from GitHub" for store_name, data in results.items(): if data and isinstance(data, dict): lock = self._locks.get(store_name) if lock: lock.write_acquire() try: self._data[store_name] = data finally: lock.write_release() total = sum(len(v) for v in results.values()) print(f" ✅ Pulled {total} records from GitHub into memory") return True, None except Exception as e: err = f"GitHub pull error: {e}" print(f" ❌ {err}") return False, err def push_single_to_github(self, store_name): """Push a single store to GitHub.""" if store_name not in self.STORE_FILES: return False, f"Unknown store: {store_name}" try: from github_storage import get_github_storage gh = get_github_storage() filename = self.STORE_FILES[store_name] lock = self._locks[store_name] lock.read_acquire() try: data = dict(self._data.get(store_name, {})) finally: lock.read_release() success, error = gh.push_file(filename, data) return success, error except Exception as e: return False, f"Error pushing {store_name}: {e}" # ─── STATS ─── def get_stats(self): stats = {} for store_name in self.STORES: lock = self._locks[store_name] lock.read_acquire() try: stats[store_name] = { "records": len(self._data.get(store_name, {})), "github_file": self.STORE_FILES.get(store_name, ""), } finally: lock.read_release() stats["_meta"] = { "last_backup": datetime.fromtimestamp(self._last_backup).isoformat(), "mode": "github_only", "local_files": False, } return stats def shutdown(self): """Graceful shutdown - just log, no local save.""" print(" 🛑 MemoryDB shutting down...") total = sum(len(self._data.get(s, {})) for s in self.STORES) print(f" ⚠️ {total} records in memory. Push to GitHub if needed: /backup_db") print(" ✅ MemoryDB shutdown complete") def get_db(): return MemoryDB.get_instance() ================================================================================ FILE 33: subjects\__init__.py FULL PATH: main_project\subjects\__init__.py ================================================================================ """ Subjects package. Handles subject definitions, access control, and file loading. """ ================================================================================ FILE 34: subjects\access.py FULL PATH: main_project\subjects\access.py ================================================================================ """ Subject access validation. Updated to use MemoryDB for fast lookups. """ from subjects.definitions import SUBJECTS from memory_db import get_db def get_user_accessible_subjects(username): """Get all subjects with access status for a user.""" db = get_db() user = db.read_key('users', username) if not user: return None, "المستخدم غير موجود" if not user.get('verified', False): return None, "الحساب غير مفعّل" student_type = user.get('student_type', '') purchased_subjects = user.get('purchased_subjects', []) if student_type not in SUBJECTS: return None, f"نوع الطالب '{student_type}' غير معروف" result = [] for subject in SUBJECTS[student_type]: is_free = subject.get('free', False) or subject.get('price', 0) == 0 is_purchased = subject['id'] in purchased_subjects accessible = is_free or is_purchased result.append({ 'id': subject['id'], 'name': subject['name'], 'price': subject['price'], 'icon': subject['icon'], 'free': is_free, 'accessible': accessible }) return result, None def validate_user_subject_access(username, subject_id): """Check if user has access to a specific subject.""" db = get_db() user = db.read_key('users', username) if not user: return False, "المستخدم غير موجود" if not user.get('verified', False): return False, "الحساب غير مفعّل" student_type = user.get('student_type', '') purchased_subjects = user.get('purchased_subjects', []) if student_type not in SUBJECTS: return False, "نوع الطالب غير معروف" track_subjects = SUBJECTS[student_type] subject_in_track = next((s for s in track_subjects if s['id'] == subject_id), None) if not subject_in_track: return False, f"المادة '{subject_id}' غير متاحة لشعبتك" is_free = subject_in_track.get('free', False) or subject_in_track.get('price', 0) == 0 is_purchased = subject_id in purchased_subjects if is_free or is_purchased: return True, None return False, f"يجب شراء مادة '{subject_in_track['name']}' للوصول إليها" ================================================================================ FILE 35: subjects\definitions.py FULL PATH: main_project\subjects\definitions.py ================================================================================ """ Subject definitions for all tracks. Single source of truth for available subjects. """ SUBJECTS = { 'علمي': [ {'id': 'islamic', 'name': 'التربية الاسلامية', 'price': 0, 'icon': 'book-quran', 'free': True}, {'id': 'english', 'name': 'اللغة الانجليزية', 'price': 5, 'icon': 'language'}, {'id': 'math', 'name': 'الرياضيات', 'price': 5, 'icon': 'calculator'}, {'id': 'it', 'name': 'تقنية المعلومات', 'price': 5, 'icon': 'laptop'}, {'id': 'statistics', 'name': 'اساس الاحصاء', 'price': 5, 'icon': 'chart-bar'}, {'id': 'chemistry', 'name': 'الكيمياء', 'price': 5, 'icon': 'flask'}, {'id': 'physics_electric', 'name': 'الفيزياء الكهربائية', 'price': 5, 'icon': 'bolt'}, {'id': 'physics_mechanic', 'name': 'الفيزياء الميكانيكية', 'price': 5, 'icon': 'gear'}, {'id': 'biology', 'name': 'الاحياء', 'price': 5, 'icon': 'dna'}, {'id': 'literary_studies', 'name': 'الدراسات الادبية', 'price': 5, 'icon': 'book'}, {'id': 'linguistic_studies','name': 'الدراسات اللغوية', 'price': 5, 'icon': 'pen'} ], 'أدبي': [ {'id': 'islamic', 'name': 'التربية الاسلامية', 'price': 0, 'icon': 'book-quran', 'free': True}, {'id': 'english', 'name': 'اللغة الانجليزية', 'price': 5, 'icon': 'language'}, {'id': 'statisticss', 'name': 'الاحصاء', 'price': 5, 'icon': 'chart-bar'}, {'id': 'literature', 'name': 'الادب والنصوص', 'price': 5, 'icon': 'book-open'}, {'id': 'philosophy', 'name': 'الفلسفة', 'price': 5, 'icon': 'brain'}, {'id': 'reading_writing', 'name': 'المطالعة والإنشاء', 'price': 5, 'icon': 'pen-fancy'}, {'id': 'grammar', 'name': 'النحو والصرف والاملاء', 'price': 5, 'icon': 'spell-check'}, {'id': 'criticism', 'name': 'النقد الادبي', 'price': 5, 'icon': 'comments'}, {'id': 'history', 'name': 'تاريخ الوطن العربي ', 'price': 5, 'icon': 'landmark'}, {'id': 'itt', 'name': 'تقنية المعلومات', 'price': 5, 'icon': 'laptop'}, {'id': 'geography', 'name': 'جغرافيا البيئة', 'price': 5, 'icon': 'globe'}, {'id': 'psychology', 'name': 'علم النفس', 'price': 5, 'icon': 'head-side-virus'}, {'id': 'sociology','name': 'علم الاجتماع', 'price': 5, 'icon': 'flag'} ] } # ─── Board-enabled subjects ─── # Subjects that have the interactive board feature. # Each must have a folder with main.txt, structure.txt, p*.txt, and pages_base_url.txt BOARD_ENABLED_SUBJECTS = [ 'physics_mechanic', 'physics_electric', 'chemistry', 'math', 'biology', 'statistics', 'data_structure', ] def get_subject_name(student_type, subject_id): """Get display name for a subject.""" track = SUBJECTS.get(student_type, []) info = next((s for s in track if s['id'] == subject_id), None) return info['name'] if info else subject_id def is_board_enabled(subject_id): """Check if a subject supports the interactive board.""" return subject_id in BOARD_ENABLED_SUBJECTS ================================================================================ FILE 36: subjects\loader.py FULL PATH: main_project\subjects\loader.py ================================================================================ """ Subject file loader. Loads knowledge files from subject folders. Handles caching. """ import os import threading class SubjectLoader: """Loads and caches subject data from folders.""" def __init__(self): self._cache = {} self._lock = threading.Lock() print(" ✔ SubjectLoader initialized") def load(self, subject_id): """ Load a subject's files from its folder. Returns dict with all file contents, or None if folder missing. Cached after first load. """ with self._lock: if subject_id in self._cache: return self._cache[subject_id] folder_path = subject_id if not os.path.exists(folder_path): print(f" ❌ Subject folder not found: {folder_path}") return None subject_data = {} # Load main.txt main_path = os.path.join(folder_path, "main.txt") try: with open(main_path, 'r', encoding='utf-8') as f: subject_data["main.txt"] = f.read() print(f" ✔ {subject_id}/main.txt ({len(subject_data['main.txt'])} chars)") except FileNotFoundError: subject_data["main.txt"] = "" print(f" ⚠ {subject_id}/main.txt not found") # Load structure.txt structure_path = os.path.join(folder_path, "structure.txt") try: with open(structure_path, 'r', encoding='utf-8') as f: subject_data["structure.txt"] = f.read() print(f" ✔ {subject_id}/structure.txt ({len(subject_data['structure.txt'])} chars)") except FileNotFoundError: subject_data["structure.txt"] = "" print(f" ⚠ {subject_id}/structure.txt not found") # Load pages_base_url.txt (for board page images) pages_url_path = os.path.join(folder_path, "pages_base_url.txt") try: with open(pages_url_path, 'r', encoding='utf-8') as f: subject_data["pages_base_url"] = f.read().strip() print(f" ✔ {subject_id}/pages_base_url.txt loaded") except FileNotFoundError: subject_data["pages_base_url"] = "" print(f" ⚠ {subject_id}/pages_base_url.txt not found") # Load p*.txt files p_files = [] i = 1 while True: p_filename = f"p{i}.txt" p_path = os.path.join(folder_path, p_filename) if os.path.exists(p_path): try: with open(p_path, 'r', encoding='utf-8') as f: content = f.read() subject_data[p_filename] = content p_files.append(p_filename) print(f" ✔ {subject_id}/{p_filename} ({len(content)} chars)") except Exception as e: print(f" ⚠ Error loading {p_filename}: {e}") i += 1 else: break subject_data["_p_files"] = p_files print(f" 📊 Subject '{subject_id}' loaded: main + structure + {len(p_files)} chapters") with self._lock: self._cache[subject_id] = subject_data return subject_data def reload(self, subject_id): """Force reload a subject by clearing cache first.""" with self._lock: if subject_id in self._cache: del self._cache[subject_id] return self.load(subject_id) def get_p_files(self, subject_id): """Get list of chapter files for a subject.""" data = self.load(subject_id) if not data: return [] return data.get("_p_files", []) def get_pages_base_url(self, subject_id): """Get the base URL for book page images.""" data = self.load(subject_id) if not data: return "" return data.get("pages_base_url", "") # Singleton instance subject_loader = SubjectLoader() ================================================================================ FILE 37: websocket\__init__.py FULL PATH: main_project\websocket\__init__.py ================================================================================ """ WebSocket package. Real-time session management via SocketIO. """ ================================================================================ FILE 38: websocket\events.py FULL PATH: main_project\websocket\events.py ================================================================================ """ SocketIO event handlers. Updated to use MemoryDB for fast session checks. """ from flask import request from auth.helpers import active_connections from memory_db import get_db def register_socketio_events(socketio): """Register all SocketIO event handlers.""" @socketio.on('connect') def handle_connect(): pass @socketio.on('disconnect') def handle_disconnect(): for username in list(active_connections.keys()): if request.sid in active_connections[username]: active_connections[username].remove(request.sid) if not active_connections[username]: del active_connections[username] @socketio.on('register_session') def handle_register_session(data): username = data.get('username') session_id = data.get('session_id') if username and session_id: if username not in active_connections: active_connections[username] = [] if request.sid not in active_connections[username]: active_connections[username].append(request.sid) @socketio.on('check_session') def handle_check_session(data): username = data.get('username') session_id = data.get('session_id') db = get_db() user_data = db.read_key('users', username) if user_data: if user_data.get('session_id') != session_id: socketio.emit('force_logout', { 'message': 'تم تسجيل الدخول من جهاز آخر.' }, room=request.sid) else: socketio.emit('session_valid', { 'status': 'valid' }, room=request.sid)