""" AI Agent for text chat mode. Handles routing, GPT responses, Mistral streaming, voice transcription, and image analysis. """ import json import requests from datetime import datetime, timezone from config import GPT_URL, MISTRAL_COOKIE, TRANSCRIPT_URL, CLOUDINARY_URL, CLOUDINARY_PRESET from chat.history import ChatHistory from subjects.loader import subject_loader from database.chat_history import save_user_chat_to_db, load_user_chat_from_db class AIAgent: def __init__(self): self.gpt_url = GPT_URL self.mistral_cookie = MISTRAL_COOKIE self.transcript_base_url = TRANSCRIPT_URL self.cloudinary_url = CLOUDINARY_URL self.cloudinary_preset = CLOUDINARY_PRESET self.chat_history = ChatHistory() # Exam sessions (in-memory) self.exam_sessions = {} self.exam_lock = __import__('threading').Lock() print("✅ AIAgent initialized") # ─── GPT call ─── def _call_gpt5(self, user_message, system_prompt, temperature=0.7): payload = { "user_input": user_message, "chat_history": [{"role": "system", "content": system_prompt}], "temperature": temperature, "top_p": 0.95, "max_completion_tokens": 4000 } try: response = requests.post(self.gpt_url, json=payload, timeout=120) response.raise_for_status() return response.json().get("assistant_response", "") except Exception as e: print(f"❌ GPT-5 Error: {e}") return None def _call_gpt5_multipart(self, image_url, text_prompt, system_prompt, temperature=0.7): payload = { "user_input": None, "chat_history": [ {"role": "system", "content": system_prompt}, { "role": "user", "type": "multipart", "content": [ {"type": "image", "url": image_url}, {"type": "text", "text": text_prompt} ] } ], "temperature": temperature, "top_p": 0.95, "max_completion_tokens": 4000 } try: response = requests.post(self.gpt_url, json=payload, timeout=120) response.raise_for_status() return response.json().get("assistant_response", "") except Exception as e: print(f"❌ GPT-5 Multipart Error: {e}") return None # ─── Mistral streaming ─── def _call_mistral_stream(self, user_message, file_content, session_id): chat_context = self.chat_history.get_full_context(session_id) full_prompt = f"""{file_content} === سياق المحادثة === {chat_context if chat_context else "هذه بداية المحادثة مع الطالب."} === نهاية السياق === سؤال الطالب الحالي: {user_message} إجابتك:""" headers = { "Content-Type": "application/json", "Cookie": self.mistral_cookie, "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" } payload = { "inputs": [{ "object": "entry", "type": "message.input", "created_at": datetime.now(timezone.utc).isoformat(), "role": "user", "content": full_prompt, "prefix": False }], "stream": True, "instructions": "", "tools": [], "completion_args": { "temperature": 0.7, "top_p": 0.95, "max_tokens": 4096 }, "model": "mistral-medium-latest" } try: response = requests.post( "https://console.mistral.ai/api-ui/bora/v1/conversations", headers=headers, json=payload, stream=True, timeout=120 ) if response.status_code not in [200, 201]: error_msg = f"Mistral Error: {response.status_code} - {response.text[:500]}" yield json.dumps({"error": error_msg}) + "\n" return full_response = "" for line in response.iter_lines(): if line: line = line.decode('utf-8') if line.startswith('data: '): try: data = json.loads(line[6:]) if data.get('type') == 'message.output.delta': content = data.get('content', '') if content: full_response += content yield json.dumps({"chunk": content}) + "\n" except json.JSONDecodeError: continue if full_response: self.chat_history.add_message(session_id, "assistant", full_response) yield json.dumps({"done": True, "full_response": full_response}) + "\n" except requests.exceptions.Timeout: yield json.dumps({"error": "انتهت مهلة الاتصال بالخادم. حاول مرة أخرى."}) + "\n" except requests.exceptions.ConnectionError: yield json.dumps({"error": "خطأ في الاتصال بالخادم."}) + "\n" except Exception as e: yield json.dumps({"error": str(e)}) + "\n" # ─── Routing ─── def route_message(self, user_message, session_id): subject_id = self.chat_history.get_subject(session_id) subject_data = subject_loader.load(subject_id) if subject_id else None if not subject_data: return "main.txt" structure_content = subject_data.get("structure.txt", "") chat_context = self.chat_history.get_full_context(session_id) last_file = self.chat_history.get_last_file(session_id) p_files = subject_data.get("_p_files", []) p_files_list = "\n".join([f"- {f}" for f in p_files]) routing_prompt = f"""{structure_content} **الملفات المتاحة:** - main.txt: للتحيات والأسئلة العامة {p_files_list} **سياق المحادثة:** {chat_context if chat_context else "لا يوجد سياق سابق"} **الملف الأخير المستخدم:** {last_file if last_file else "لا يوجد"} **تعليمات:** - إذا كانت الرسالة متابعة لموضوع سابق → استخدم نفس الملف الأخير: {last_file if last_file else "main.txt"} - إذا قال الطالب "استمر" أو "أكمل" → استخدم نفس الملف الأخير - أجب **فقط** باسم الملف (مثال: p1.txt أو main.txt) رسالة الطالب: {user_message} الملف المناسب:""" chosen_file = self._call_gpt5(user_message, routing_prompt, temperature=0.2) if not chosen_file: return last_file if last_file else "main.txt" chosen_file = chosen_file.strip().lower() valid_files = ["main.txt"] + p_files for vf in valid_files: if vf in chosen_file: return vf return last_file if last_file else "main.txt" # ─── GPT response ─── def respond_gpt(self, user_message, chosen_file, session_id): subject_id = self.chat_history.get_subject(session_id) subject_data = subject_loader.load(subject_id) if subject_id else None if not subject_data: return "عذراً، لم يتم تحديد المادة. يرجى اختيار المادة أولاً." main_content = subject_data.get("main.txt", "") chat_context = self.chat_history.get_full_context(session_id) system_prompt = f"""{main_content} === سياق المحادثة السابقة === {chat_context if chat_context else "هذه بداية المحادثة."} === نهاية السياق ===""" self.chat_history.add_message(session_id, "user", user_message) response = self._call_gpt5(user_message, system_prompt, temperature=0.8) if response: self.chat_history.add_message(session_id, "assistant", response) return response else: err = "عذراً، حدث خطأ في النظام. حاول مرة أخرى." self.chat_history.add_message(session_id, "assistant", err) return err # ─── Mistral response ─── def respond_mistral_stream(self, user_message, chosen_file, session_id): subject_id = self.chat_history.get_subject(session_id) subject_data = subject_loader.load(subject_id) if subject_id else None if not subject_data: def error_gen(): yield json.dumps({"error": "عذراً، لم يتم تحديد المادة."}) + "\n" return error_gen() file_content = subject_data.get(chosen_file, "") self.chat_history.add_message(session_id, "user", user_message) return self._call_mistral_stream(user_message, file_content, session_id) # ─── Process message (route + decide) ─── def process_message(self, user_message, session_id): subject = self.chat_history.get_subject(session_id) if not subject: return None, "no_subject" subject_data = subject_loader.load(subject) if not subject_data: return None, "subject_not_found" chosen_file = self.route_message(user_message, session_id) self.chat_history.set_last_file(session_id, chosen_file) print(f"📀 Subject: {subject} | Routed to: {chosen_file} | Session: {session_id}") return chosen_file, "ok" # ─── Subject session management ─── def select_subject(self, username, session_id, subject_id): """Select a subject and load/restore chat history.""" current_subject = self.chat_history.get_subject(session_id) # Save current subject chat before switching if current_subject and current_subject != subject_id: raw = self.chat_history.get_raw_session(session_id) save_user_chat_to_db(username, current_subject, raw) # Clear and set new subject self.chat_history.clear_session(session_id) self.chat_history.set_subject(session_id, subject_id) # Try to restore saved chat saved = load_user_chat_from_db(username, subject_id) if saved: self.chat_history.restore_session(session_id, saved) # Save immediately raw = self.chat_history.get_raw_session(session_id) save_user_chat_to_db(username, subject_id, raw) def save_current_chat(self, username, session_id): """Save current session to DB.""" subject_id = self.chat_history.get_subject(session_id) if subject_id: raw = self.chat_history.get_raw_session(session_id) save_user_chat_to_db(username, subject_id, raw) # ─── Audio ─── def upload_audio(self, audio_file_path, original_filename): try: with open(audio_file_path, 'rb') as f: files = {'audio': (original_filename, f, 'audio/webm')} response = requests.post( f"{self.transcript_base_url}/upload", files=files, timeout=60 ) response.raise_for_status() data = response.json() if 'file_url' in data: return data['file_url'], None else: return None, data.get('error', 'Upload failed') except Exception as e: print(f"❌ Audio upload error: {e}") return None, str(e) def transcribe_audio(self, file_url): try: payload = { "file_url": file_url, "prompt": "هذا تسجيل صوتي لطالب يسأل سؤالاً دراسياً" } response = requests.post( f"{self.transcript_base_url}/transcribe", json=payload, timeout=120 ) response.raise_for_status() data = response.json() if 'transcription' in data: return data['transcription'], None else: return None, data.get('error', 'Transcription failed') except Exception as e: print(f"❌ Transcription error: {e}") return None, str(e) # ─── Image ─── def upload_image_to_cloudinary(self, image_path): try: with open(image_path, 'rb') as image_file: files = {'file': image_file} data = {'upload_preset': self.cloudinary_preset} response = requests.post( self.cloudinary_url, files=files, data=data, timeout=60 ) if response.status_code == 200: result = response.json() image_url = result.get('url') or result.get('secure_url') return image_url, None else: return None, f"Cloudinary error: {response.status_code}" except Exception as e: return None, str(e) def analyze_image(self, image_url, session_id): system_prompt = """أنت مساعد تعليمي ذكي متخصص في تحليل الصور الدراسية. مهمتك: 1. إذا كانت الصورة تحتوي على محتوى دراسي (معادلات، نصوص، مسائل، رسوم بيانية، جداول، شروحات، كتب، أوراق امتحانات): - استخرج كل المحتوى من الصورة بدقة تامة - اكتب المعادلات بصيغة LaTeX - اكتب النصوص كما هي - اشرح الرسوم البيانية إن وجدت - قدم المحتوى بشكل منظم ومفيد 2. إذا كانت الصورة لا تحتوي على محتوى دراسي: - أجب فقط بالكلمة: لا تضف أي تعليق إضافي.""" result = self._call_gpt5_multipart( image_url=image_url, text_prompt="حلل هذه الصورة واستخرج محتواها الدراسي", system_prompt=system_prompt, temperature=0.3 ) return result # ─── Exam session management ─── def create_exam_session(self, username, subject_id, door_file, question_count, difficulty): with self.exam_lock: self.exam_sessions[username] = { "subject_id": subject_id, "door_file": door_file, "question_count": question_count, "difficulty": difficulty, "questions": [], "current_index": 0, "score": 0, "answers": [], "started_at": datetime.now().isoformat(), "status": "generating", } return self.exam_sessions[username] def get_exam_session(self, username): with self.exam_lock: return self.exam_sessions.get(username, None) def clear_exam_session(self, username): with self.exam_lock: if username in self.exam_sessions: del self.exam_sessions[username] print(f"🗑️ Cleared exam session for {username}")