""" Board AI Engine. Handles the interactive whiteboard: XML generation, parsing, icon/page resolution, TTS, and board state management. Now supports MULTIPLE SUBJECTS dynamically. Each subject loads its own folder, pages_base_url, etc. """ import re import json import requests from config import GPT_URL, MAX_CHAT_HISTORY from board.tts import TTSEngine from board.icons import IconResolver from board.pages import resolve_page_tags from subjects.loader import subject_loader from json_processor import BoardProcessor class BoardEngine: """ Board AI Engine for a specific user session. Supports any subject dynamically. """ def __init__(self): self.gpt_url = GPT_URL self.board_processor = BoardProcessor() self.tts_engine = TTSEngine() self.icon_resolver = IconResolver() # Per-user state: { username: { subject_id, conversation_history, last_sequence } } self._user_sessions = {} print("✅ BoardEngine initialized (multi-subject)") # ─── User session management ─── def _get_user_session(self, username): """Get or create a user's board session.""" if username not in self._user_sessions: self._user_sessions[username] = { "subject_id": None, "conversation_history": [], "last_sequence": [], } return self._user_sessions[username] def set_subject(self, username, subject_id): """Set the active subject for a user's board session.""" us = self._get_user_session(username) # If switching subjects, clear history if us["subject_id"] and us["subject_id"] != subject_id: us["conversation_history"] = [] us["last_sequence"] = [] print(f" 🔄 Board subject switched: {us['subject_id']} → {subject_id} for {username}") us["subject_id"] = subject_id print(f" 📋 Board subject set: {subject_id} for {username}") def get_subject(self, username): """Get the active subject for a user.""" us = self._get_user_session(username) return us.get("subject_id") # ─── GPT call ─── def _call_gpt5(self, user_message, system_prompt, temperature=0.7, max_tokens=4000): payload = { "user_input": user_message, "chat_history": [ {"role": "system", "content": system_prompt} ], "temperature": temperature, "top_p": 0.95, "max_completion_tokens": max_tokens } try: response = requests.post(self.gpt_url, json=payload, timeout=120) response.raise_for_status() return response.json().get("assistant_response", "") except requests.exceptions.Timeout: print(" ❌ GPT timeout") return None except requests.exceptions.ConnectionError: print(" ❌ GPT connection error") return None except Exception as e: print(f" ❌ GPT error: {e}") return None # ─── Chat history formatting ─── def _format_chat_history(self, username): us = self._get_user_session(username) history = us.get("conversation_history", []) if not history: return "" recent = history[-10:] parts = [] for msg in recent: if msg["role"] == "user": parts.append(f"الطالب: {msg['content']}") elif msg["role"] == "assistant": parts.append(f"المدرس: {msg['content']}") return ( "\n\n══ سجل المحادثة السابقة ══\n" + "\n".join(parts) + "\n══ نهاية السجل ══" ) # ─── Step 1: Route message ─── def _route_message(self, user_message, username): us = self._get_user_session(username) subject_id = us["subject_id"] if not subject_id: return "main.txt" subject_data = subject_loader.load(subject_id) if not subject_data: return "main.txt" structure = subject_data.get("structure.txt", "") p_files = subject_data.get("_p_files", []) chat_history_text = self._format_chat_history(username) p_files_desc = "\n".join([f"- {f}: الفصل {i+1}" for i, f in enumerate(p_files)]) routing_prompt = f"""أنت نظام توجيه ذكي لمساعد تعليمي. مهمتك: تحليل رسالة الطالب واختيار الملف المناسب للرد. الملفات المتاحة: - main.txt: للتحيات، الأسئلة العامة، أي شيء لا يتعلق بفصل محدد {p_files_desc} فهرس الكتاب (للمساعدة في التوجيه): {structure} {chat_history_text} تعليمات: 1. إذا كانت الرسالة تحية أو سؤال عام → main.txt 2. إذا كانت تسأل عن موضوع في فصل محدد → الملف المناسب 3. استخدم الفهرس لتحديد الفصل الصحيح 4. مهم جداً: إذا قال الطالب "اشرح أكثر" أو "وضح" أو أي طلب متابعة، ارجع لسجل المحادثة لتعرف الموضوع الحالي واختر نفس الملف 5. أجب فقط باسم الملف (مثال: p1.txt أو main.txt) بدون أي كلام إضافي رسالة الطالب: {user_message} الملف المناسب:""" chosen = self._call_gpt5( user_message, routing_prompt, temperature=0.2, max_tokens=50 ) if not chosen: return "main.txt" chosen = chosen.strip().lower() valid_files = ["main.txt"] + p_files for v in valid_files: if v in chosen: return v return "main.txt" # ─── Step 2: Generate XML response ─── def _generate_xml_response(self, user_message, chosen_file, username): us = self._get_user_session(username) subject_id = us["subject_id"] if not subject_id: return None subject_data = subject_loader.load(subject_id) if not subject_data: return None file_content = subject_data.get(chosen_file, "") chat_history_text = self._format_chat_history(username) system_prompt = f"""انتي مدرسة خبيرة ومحترفة في هذه المادة. تشرح على سبورة تفاعلية رقمية. ═══ صيغة الرد ═══ يجب أن تردي بصيغة XML خاصة تحتوي على: 1. عناصر السبورة - ما سيُرسم/يُضاف على السبورة أولاً 2. نص الكلام - النص الذي سيُقرأ بصوت عالٍ للطالب بعد رسم العناصر (عربي طبيعي) ═══ عناصر السبورة المتاحة (داخل ) ═══ • محتوى الملاحظةنص مباشر على السبورة بدون خلفية الأنواع: circle, triangle, star, arrow-right, arrow-left, arrow-up, arrow-down, rectangle, diamond, hexagon, square, oval, arrow-double-h, checkmark, cross, heart, cloud, lightning, speech, process, decision • كلمة_بحث_بالإنجليزية سيتم البحث عن أيقونة مرسومة يدوياً (مثل: ball, car, force, spring, weight, rope, pulley) • رقم_الصفحة لعرض صفحة محددة من الكتاب كصورة على السبورة مثال: 12 لعرض الصفحة 12 من الكتاب استخدمها عندما تحتاج تعرض للطالب صفحة معينة من الكتاب ═══ قواعد مهمة جداً ═══ 1. اشرح خطوة بخطوة: ابدأ بـ ثم ثم ثم وهكذا 2. اجعل الشرح متدرجاً كأنك تشرح على سبورة حقيقية أمام الطلاب 3. = ما يظهر على السبورة أولاً (ملاحظات، نصوص، أشكال، صور، صفحات الكتاب) 4. = الكلام المسموع بعد رسم العناصر (طبيعي، ودود، واضح، يشرح ما تم رسمه) 5. لا تضع كل شيء دفعة واحدة - اجعله تسلسلياً 7. فقط بكلمات إنجليزية بسيطة ومعبرة 8. السبورة تعمل بنظام الإضافة - العناصر السابقة تبقى 9. استخدم للعناوين والمعادلات المهمة (بدون خلفية) 10. استخدم للتوضيحات والملاحظات (مع خلفية ملونة) 11. لا تستخدم أكثر من 3-4 عناصر في كل 12. اجعل النص في طبيعياً كأنك تتحدث مع طالب ويشرح ما تم رسمه على السبورة 13. ارسم أولاً ثم تكلم - هذا مهم جداً! 14. راجع سجل المحادثة السابقة لتعرف ما تم شرحه وتكمل من حيث توقفت - لا تكرر ما قلته سابقاً 15. استخدم عندما تريد عرض صفحة من الكتاب - مثلاً إذا الطالب سأل عن تمرين أو شكل في صفحة معينة when user talk about something not about the subject or something funny etc... you can actually answer without the board just VOICE and be funny smart perfect girl also: when you explain something dont make all your explain on the NOTE make the note for important point use the TEXT direct on the board and the ICONS/SHAPES ═══ محتوى المادة ═══ {file_content} {chat_history_text} ═══ الآن أجب على سؤال الطالب ═══ رسالة الطالب: {user_message}""" response = self._call_gpt5( user_message, system_prompt, temperature=0.8, max_tokens=4000 ) return response # ─── Resolve tags ─── def _resolve_svg_tags(self, xml_response): if not xml_response: return xml_response return self.icon_resolver.resolve_all_in_xml(xml_response) def _resolve_page_tags(self, xml_response, username): if not xml_response: return xml_response us = self._get_user_session(username) subject_id = us.get("subject_id") if not subject_id: return xml_response base_url = subject_loader.get_pages_base_url(subject_id) if not base_url: print(f" ⚠️ No pages_base_url for subject {subject_id}") return xml_response return resolve_page_tags(xml_response, base_url) # ─── Build sequence from XML ─── def _build_sequence_from_xml(self, xml_response, frontend_board_state): sequence = [] if not xml_response: return sequence, frontend_board_state pattern = r'<(voice|board)>(.*?)' matches = list(re.finditer(pattern, xml_response, re.DOTALL)) if not matches: cleaned = re.sub(r'<[^>]+>', '', xml_response).strip() if cleaned: sequence.append({ "type": "voice", "text": cleaned, "audio_url": None }) return sequence, frontend_board_state current_board_state = list(frontend_board_state) for match in matches: tag_type = match.group(1) content = match.group(2).strip() if tag_type == "voice": cleaned = re.sub(r'<[^>]+>', '', content).strip() if cleaned: sequence.append({ "type": "voice", "text": cleaned, "audio_url": None }) elif tag_type == "board": existing_json_str = json.dumps( current_board_state, ensure_ascii=False, indent=2 ) processor_input = ( f"BOARD NOW (make sure no X Y error):\n" f"{existing_json_str}\n\n" f"new board need to add :\n" f"{content}" ) print(f" 🔧 Sending to json_processor...") print(f" Current board items: {len(current_board_state)}") try: json_text = self.board_processor.convert_xml_to_json( processor_input ) if json_text: new_items = json.loads(json_text) if isinstance(new_items, list) and new_items: added_items = [] existing_ids = set() for item in current_board_state: item_key = json.dumps( item, sort_keys=True, ensure_ascii=False ) existing_ids.add(item_key) for item in new_items: item_key = json.dumps( item, sort_keys=True, ensure_ascii=False ) if item_key not in existing_ids: added_items.append(item) if added_items: sequence.append({ "type": "board_update", "action": "add", "items": added_items }) current_board_state.extend(added_items) print( f" ✅ json_processor: " f"{len(new_items)} total, " f"{len(added_items)} new" ) elif isinstance(new_items, dict): added_items = [new_items] current_board_state.append(new_items) sequence.append({ "type": "board_update", "action": "add", "items": added_items }) print(f" ✅ json_processor: 1 item") else: print(f" ⚠️ json_processor: unexpected format") except json.JSONDecodeError as e: print(f" ❌ json_processor invalid JSON: {e}") raw_preview = json_text[:200] if json_text else 'None' print(f" Raw: {raw_preview}") except Exception as e: print(f" ❌ json_processor error: {e}") return sequence, current_board_state # ─── MAIN PIPELINE ─── def process_message(self, user_message, username, frontend_board_state=None): """ Main board pipeline for any subject. Args: user_message: Student's text username: User identifier frontend_board_state: Current board items from frontend (source of truth) Returns: dict with success, sequence, board_state, chosen_file """ us = self._get_user_session(username) subject_id = us.get("subject_id") print(f"\n{'═' * 60}") print(f" 👤 Student ({username}): {user_message}") print(f" 📚 Subject: {subject_id}") print(f"{'═' * 60}") if not subject_id: return { "success": False, "error": "لم يتم تحديد المادة للسبورة", "sequence": [{ "type": "voice", "text": "يرجى اختيار المادة أولاً.", "audio_url": None }], "board_state": frontend_board_state or [] } if frontend_board_state is None: frontend_board_state = [] print(f" 📋 Board state from frontend: {len(frontend_board_state)} items") print(f" 💬 Chat history: {len(us.get('conversation_history', []))} messages") # Step 1: Route print("\n 📝 Step 1: Routing message...") chosen_file = self._route_message(user_message, username) print(f" 📂 Chosen file: {chosen_file}") # Step 2: Generate XML print(f" 🤖 Step 2: Generating XML response...") xml_response = self._generate_xml_response(user_message, chosen_file, username) if not xml_response: print(" ❌ Failed to generate response") error_seq = [{ "type": "voice", "text": "عذراً، حدث خطأ في النظام. حاول مرة أخرى.", "audio_url": None }] return { "success": False, "error": "Failed to generate response", "sequence": error_seq, "board_state": frontend_board_state } print(f" 📝 XML response: {len(xml_response)} chars") # Step 3: Resolve tags print(" 📄 Step 3: Resolving tags...") xml_response = self._resolve_page_tags(xml_response, username) # Step 4: Resolve tags print(" 🎨 Step 4: Resolving tags...") xml_response = self._resolve_svg_tags(xml_response) # Step 5: Parse XML + convert boards print(" 🔧 Step 5: Parsing XML & converting boards...") sequence, updated_board_state = self._build_sequence_from_xml( xml_response, frontend_board_state ) voice_count = sum(1 for s in sequence if s['type'] == 'voice') board_count = sum(1 for s in sequence if s['type'] == 'board_update') print(f" 📊 {len(sequence)} segments ({voice_count} voice, {board_count} board)") # Step 6: Convert voice → audio print(" 🔊 Step 6: Converting voice to audio...") for item in sequence: if item["type"] == "voice" and item.get("text"): text_preview = item['text'][:50] print(f" 🎙️ Converting: '{text_preview}...'") audio_file = self.tts_engine.convert(item["text"]) if audio_file: item["audio_url"] = f"/static/{audio_file}" else: item["audio_url"] = None print(f" ⚠️ TTS failed for this segment") # Step 7: Save chat history us["conversation_history"].append({ "role": "user", "content": user_message }) voice_texts = [ s["text"] for s in sequence if s["type"] == "voice" and s.get("text") ] assistant_text = " ".join(voice_texts) if assistant_text: us["conversation_history"].append({ "role": "assistant", "content": assistant_text }) # Keep history manageable if len(us["conversation_history"]) > MAX_CHAT_HISTORY: us["conversation_history"] = us["conversation_history"][-MAX_CHAT_HISTORY:] # Step 8: Store sequence for replay us["last_sequence"] = sequence # Cleanup old audio self.tts_engine.cleanup_old_files() print(f"\n ✅ Response ready!") print(f" Sequence: {voice_count} voice + {board_count} board") print(f" Board items: {len(updated_board_state)}") print(f"{'═' * 60}\n") return { "success": True, "chosen_file": chosen_file, "sequence": sequence, "board_state": updated_board_state } # ─── Replay ─── def get_replay_sequence(self, username): us = self._get_user_session(username) last_seq = us.get("last_sequence", []) if not last_seq: return { "success": False, "error": "No previous response to replay", "sequence": [] } voice_only = [] for item in last_seq: if item["type"] == "voice": voice_only.append({ "type": "voice", "text": item.get("text", ""), "audio_url": item.get("audio_url") }) print(f" 🔄 Replay: {len(voice_only)} voice segments (no board items)") return { "success": True, "sequence": voice_only } # ─── Clear ─── def clear_board(self, username): us = self._get_user_session(username) us["last_sequence"] = [] print(f" 🗑️ Board cleared for {username}") return {"success": True, "board_state": []} def clear_chat_history(self, username): us = self._get_user_session(username) us["conversation_history"] = [] us["last_sequence"] = [] print(f" 🗑️ Board chat history cleared for {username}") return {"success": True} def clear_user_session(self, username): """Fully clear a user's board session.""" if username in self._user_sessions: del self._user_sessions[username] print(f" 🗑️ Full board session cleared for {username}") return {"success": True}