| """
|
| AI Agent for text chat mode.
|
| Handles routing, GPT responses, Mistral streaming,
|
| voice transcription, and image analysis.
|
| """
|
|
|
| import json
|
| import requests
|
| from datetime import datetime, timezone
|
| from config import GPT_URL, MISTRAL_COOKIE, TRANSCRIPT_URL, CLOUDINARY_URL, CLOUDINARY_PRESET
|
| from chat.history import ChatHistory
|
| from subjects.loader import subject_loader
|
| from database.chat_history import save_user_chat_to_db, load_user_chat_from_db
|
|
|
|
|
| class AIAgent:
|
| def __init__(self):
|
| self.gpt_url = GPT_URL
|
| self.mistral_cookie = MISTRAL_COOKIE
|
| self.transcript_base_url = TRANSCRIPT_URL
|
| self.cloudinary_url = CLOUDINARY_URL
|
| self.cloudinary_preset = CLOUDINARY_PRESET
|
|
|
| self.chat_history = ChatHistory()
|
|
|
|
|
| self.exam_sessions = {}
|
| self.exam_lock = __import__('threading').Lock()
|
|
|
| print("✅ AIAgent initialized")
|
|
|
|
|
|
|
| def _call_gpt5(self, user_message, system_prompt, temperature=0.7):
|
| payload = {
|
| "user_input": user_message,
|
| "chat_history": [{"role": "system", "content": system_prompt}],
|
| "temperature": temperature,
|
| "top_p": 0.95,
|
| "max_completion_tokens": 4000
|
| }
|
| try:
|
| response = requests.post(self.gpt_url, json=payload, timeout=120)
|
| response.raise_for_status()
|
| return response.json().get("assistant_response", "")
|
| except Exception as e:
|
| print(f"❌ GPT-5 Error: {e}")
|
| return None
|
|
|
| def _call_gpt5_multipart(self, image_url, text_prompt, system_prompt, temperature=0.7):
|
| payload = {
|
| "user_input": None,
|
| "chat_history": [
|
| {"role": "system", "content": system_prompt},
|
| {
|
| "role": "user",
|
| "type": "multipart",
|
| "content": [
|
| {"type": "image", "url": image_url},
|
| {"type": "text", "text": text_prompt}
|
| ]
|
| }
|
| ],
|
| "temperature": temperature,
|
| "top_p": 0.95,
|
| "max_completion_tokens": 4000
|
| }
|
| try:
|
| response = requests.post(self.gpt_url, json=payload, timeout=120)
|
| response.raise_for_status()
|
| return response.json().get("assistant_response", "")
|
| except Exception as e:
|
| print(f"❌ GPT-5 Multipart Error: {e}")
|
| return None
|
|
|
|
|
|
|
| def _call_mistral_stream(self, user_message, file_content, session_id):
|
| chat_context = self.chat_history.get_full_context(session_id)
|
|
|
| full_prompt = f"""{file_content}
|
|
|
| === سياق المحادثة ===
|
| {chat_context if chat_context else "هذه بداية المحادثة مع الطالب."}
|
| === نهاية السياق ===
|
|
|
| سؤال الطالب الحالي: {user_message}
|
|
|
| إجابتك:"""
|
|
|
| headers = {
|
| "Content-Type": "application/json",
|
| "Cookie": self.mistral_cookie,
|
| "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
|
| }
|
|
|
| payload = {
|
| "inputs": [{
|
| "object": "entry",
|
| "type": "message.input",
|
| "created_at": datetime.now(timezone.utc).isoformat(),
|
| "role": "user",
|
| "content": full_prompt,
|
| "prefix": False
|
| }],
|
| "stream": True,
|
| "instructions": "",
|
| "tools": [],
|
| "completion_args": {
|
| "temperature": 0.7,
|
| "top_p": 0.95,
|
| "max_tokens": 4096
|
| },
|
| "model": "mistral-medium-latest"
|
| }
|
|
|
| try:
|
| response = requests.post(
|
| "https://console.mistral.ai/api-ui/bora/v1/conversations",
|
| headers=headers,
|
| json=payload,
|
| stream=True,
|
| timeout=120
|
| )
|
|
|
| if response.status_code not in [200, 201]:
|
| error_msg = f"Mistral Error: {response.status_code} - {response.text[:500]}"
|
| yield json.dumps({"error": error_msg}) + "\n"
|
| return
|
|
|
| full_response = ""
|
| for line in response.iter_lines():
|
| if line:
|
| line = line.decode('utf-8')
|
| if line.startswith('data: '):
|
| try:
|
| data = json.loads(line[6:])
|
| if data.get('type') == 'message.output.delta':
|
| content = data.get('content', '')
|
| if content:
|
| full_response += content
|
| yield json.dumps({"chunk": content}) + "\n"
|
| except json.JSONDecodeError:
|
| continue
|
|
|
| if full_response:
|
| self.chat_history.add_message(session_id, "assistant", full_response)
|
|
|
| yield json.dumps({"done": True, "full_response": full_response}) + "\n"
|
|
|
| except requests.exceptions.Timeout:
|
| yield json.dumps({"error": "انتهت مهلة الاتصال بالخادم. حاول مرة أخرى."}) + "\n"
|
| except requests.exceptions.ConnectionError:
|
| yield json.dumps({"error": "خطأ في الاتصال بالخادم."}) + "\n"
|
| except Exception as e:
|
| yield json.dumps({"error": str(e)}) + "\n"
|
|
|
|
|
|
|
| def route_message(self, user_message, session_id):
|
| subject_id = self.chat_history.get_subject(session_id)
|
| subject_data = subject_loader.load(subject_id) if subject_id else None
|
| if not subject_data:
|
| return "main.txt"
|
|
|
| structure_content = subject_data.get("structure.txt", "")
|
| chat_context = self.chat_history.get_full_context(session_id)
|
| last_file = self.chat_history.get_last_file(session_id)
|
| p_files = subject_data.get("_p_files", [])
|
| p_files_list = "\n".join([f"- {f}" for f in p_files])
|
|
|
| routing_prompt = f"""{structure_content}
|
|
|
| **الملفات المتاحة:**
|
| - main.txt: للتحيات والأسئلة العامة
|
| {p_files_list}
|
|
|
| **سياق المحادثة:**
|
| {chat_context if chat_context else "لا يوجد سياق سابق"}
|
|
|
| **الملف الأخير المستخدم:** {last_file if last_file else "لا يوجد"}
|
|
|
| **تعليمات:**
|
| - إذا كانت الرسالة متابعة لموضوع سابق → استخدم نفس الملف الأخير: {last_file if last_file else "main.txt"}
|
| - إذا قال الطالب "استمر" أو "أكمل" → استخدم نفس الملف الأخير
|
| - أجب **فقط** باسم الملف (مثال: p1.txt أو main.txt)
|
|
|
| رسالة الطالب: {user_message}
|
|
|
| الملف المناسب:"""
|
|
|
| chosen_file = self._call_gpt5(user_message, routing_prompt, temperature=0.2)
|
|
|
| if not chosen_file:
|
| return last_file if last_file else "main.txt"
|
|
|
| chosen_file = chosen_file.strip().lower()
|
| valid_files = ["main.txt"] + p_files
|
|
|
| for vf in valid_files:
|
| if vf in chosen_file:
|
| return vf
|
|
|
| return last_file if last_file else "main.txt"
|
|
|
|
|
|
|
| def respond_gpt(self, user_message, chosen_file, session_id):
|
| subject_id = self.chat_history.get_subject(session_id)
|
| subject_data = subject_loader.load(subject_id) if subject_id else None
|
| if not subject_data:
|
| return "عذراً، لم يتم تحديد المادة. يرجى اختيار المادة أولاً."
|
|
|
| main_content = subject_data.get("main.txt", "")
|
| chat_context = self.chat_history.get_full_context(session_id)
|
|
|
| system_prompt = f"""{main_content}
|
|
|
| === سياق المحادثة السابقة ===
|
| {chat_context if chat_context else "هذه بداية المحادثة."}
|
| === نهاية السياق ==="""
|
|
|
| self.chat_history.add_message(session_id, "user", user_message)
|
| response = self._call_gpt5(user_message, system_prompt, temperature=0.8)
|
|
|
| if response:
|
| self.chat_history.add_message(session_id, "assistant", response)
|
| return response
|
| else:
|
| err = "عذراً، حدث خطأ في النظام. حاول مرة أخرى."
|
| self.chat_history.add_message(session_id, "assistant", err)
|
| return err
|
|
|
|
|
|
|
| def respond_mistral_stream(self, user_message, chosen_file, session_id):
|
| subject_id = self.chat_history.get_subject(session_id)
|
| subject_data = subject_loader.load(subject_id) if subject_id else None
|
| if not subject_data:
|
| def error_gen():
|
| yield json.dumps({"error": "عذراً، لم يتم تحديد المادة."}) + "\n"
|
| return error_gen()
|
|
|
| file_content = subject_data.get(chosen_file, "")
|
| self.chat_history.add_message(session_id, "user", user_message)
|
| return self._call_mistral_stream(user_message, file_content, session_id)
|
|
|
|
|
|
|
| def process_message(self, user_message, session_id):
|
| subject = self.chat_history.get_subject(session_id)
|
| if not subject:
|
| return None, "no_subject"
|
|
|
| subject_data = subject_loader.load(subject)
|
| if not subject_data:
|
| return None, "subject_not_found"
|
|
|
| chosen_file = self.route_message(user_message, session_id)
|
| self.chat_history.set_last_file(session_id, chosen_file)
|
|
|
| print(f"📀 Subject: {subject} | Routed to: {chosen_file} | Session: {session_id}")
|
| return chosen_file, "ok"
|
|
|
|
|
|
|
| def select_subject(self, username, session_id, subject_id):
|
| """Select a subject and load/restore chat history."""
|
| current_subject = self.chat_history.get_subject(session_id)
|
|
|
|
|
| if current_subject and current_subject != subject_id:
|
| raw = self.chat_history.get_raw_session(session_id)
|
| save_user_chat_to_db(username, current_subject, raw)
|
|
|
|
|
| self.chat_history.clear_session(session_id)
|
| self.chat_history.set_subject(session_id, subject_id)
|
|
|
|
|
| saved = load_user_chat_from_db(username, subject_id)
|
| if saved:
|
| self.chat_history.restore_session(session_id, saved)
|
|
|
|
|
| raw = self.chat_history.get_raw_session(session_id)
|
| save_user_chat_to_db(username, subject_id, raw)
|
|
|
| def save_current_chat(self, username, session_id):
|
| """Save current session to DB."""
|
| subject_id = self.chat_history.get_subject(session_id)
|
| if subject_id:
|
| raw = self.chat_history.get_raw_session(session_id)
|
| save_user_chat_to_db(username, subject_id, raw)
|
|
|
|
|
|
|
| def upload_audio(self, audio_file_path, original_filename):
|
| try:
|
| with open(audio_file_path, 'rb') as f:
|
| files = {'audio': (original_filename, f, 'audio/webm')}
|
| response = requests.post(
|
| f"{self.transcript_base_url}/upload",
|
| files=files,
|
| timeout=60
|
| )
|
| response.raise_for_status()
|
| data = response.json()
|
| if 'file_url' in data:
|
| return data['file_url'], None
|
| else:
|
| return None, data.get('error', 'Upload failed')
|
| except Exception as e:
|
| print(f"❌ Audio upload error: {e}")
|
| return None, str(e)
|
|
|
| def transcribe_audio(self, file_url):
|
| try:
|
| payload = {
|
| "file_url": file_url,
|
| "prompt": "هذا تسجيل صوتي لطالب يسأل سؤالاً دراسياً"
|
| }
|
| response = requests.post(
|
| f"{self.transcript_base_url}/transcribe",
|
| json=payload,
|
| timeout=120
|
| )
|
| response.raise_for_status()
|
| data = response.json()
|
| if 'transcription' in data:
|
| return data['transcription'], None
|
| else:
|
| return None, data.get('error', 'Transcription failed')
|
| except Exception as e:
|
| print(f"❌ Transcription error: {e}")
|
| return None, str(e)
|
|
|
|
|
|
|
| def upload_image_to_cloudinary(self, image_path):
|
| try:
|
| with open(image_path, 'rb') as image_file:
|
| files = {'file': image_file}
|
| data = {'upload_preset': self.cloudinary_preset}
|
| response = requests.post(
|
| self.cloudinary_url,
|
| files=files,
|
| data=data,
|
| timeout=60
|
| )
|
| if response.status_code == 200:
|
| result = response.json()
|
| image_url = result.get('url') or result.get('secure_url')
|
| return image_url, None
|
| else:
|
| return None, f"Cloudinary error: {response.status_code}"
|
| except Exception as e:
|
| return None, str(e)
|
|
|
| def analyze_image(self, image_url, session_id):
|
| system_prompt = """أنت مساعد تعليمي ذكي متخصص في تحليل الصور الدراسية.
|
|
|
| مهمتك:
|
| 1. إذا كانت الصورة تحتوي على محتوى دراسي (معادلات، نصوص، مسائل، رسوم بيانية، جداول، شروحات، كتب، أوراق امتحانات):
|
| - استخرج كل المحتوى من الصورة بدقة تامة
|
| - اكتب المعادلات بصيغة LaTeX
|
| - اكتب النصوص كما هي
|
| - اشرح الرسوم البيانية إن وجدت
|
| - قدم المحتوى بشكل منظم ومفيد
|
|
|
| 2. إذا كانت الصورة لا تحتوي على محتوى دراسي:
|
| - أجب فقط بالكلمة: <unsupported>
|
|
|
| لا تضف أي تعليق إضافي."""
|
|
|
| result = self._call_gpt5_multipart(
|
| image_url=image_url,
|
| text_prompt="حلل هذه الصورة واستخرج محتواها الدراسي",
|
| system_prompt=system_prompt,
|
| temperature=0.3
|
| )
|
| return result
|
|
|
|
|
|
|
| def create_exam_session(self, username, subject_id, door_file, question_count, difficulty):
|
| with self.exam_lock:
|
| self.exam_sessions[username] = {
|
| "subject_id": subject_id,
|
| "door_file": door_file,
|
| "question_count": question_count,
|
| "difficulty": difficulty,
|
| "questions": [],
|
| "current_index": 0,
|
| "score": 0,
|
| "answers": [],
|
| "started_at": datetime.now().isoformat(),
|
| "status": "generating",
|
| }
|
| return self.exam_sessions[username]
|
|
|
| def get_exam_session(self, username):
|
| with self.exam_lock:
|
| return self.exam_sessions.get(username, None)
|
|
|
| def clear_exam_session(self, username):
|
| with self.exam_lock:
|
| if username in self.exam_sessions:
|
| del self.exam_sessions[username]
|
| print(f"🗑️ Cleared exam session for {username}") |