data-str / chat /agent.py
CORVO-AI's picture
Upload 135 files
b83571a verified
"""
AI Agent for text chat mode.
Handles routing, GPT responses, Mistral streaming,
voice transcription, and image analysis.
"""
import json
import requests
from datetime import datetime, timezone
from config import GPT_URL, MISTRAL_COOKIE, TRANSCRIPT_URL, CLOUDINARY_URL, CLOUDINARY_PRESET
from chat.history import ChatHistory
from subjects.loader import subject_loader
from database.chat_history import save_user_chat_to_db, load_user_chat_from_db
class AIAgent:
def __init__(self):
self.gpt_url = GPT_URL
self.mistral_cookie = MISTRAL_COOKIE
self.transcript_base_url = TRANSCRIPT_URL
self.cloudinary_url = CLOUDINARY_URL
self.cloudinary_preset = CLOUDINARY_PRESET
self.chat_history = ChatHistory()
# Exam sessions (in-memory)
self.exam_sessions = {}
self.exam_lock = __import__('threading').Lock()
print("✅ AIAgent initialized")
# ─── GPT call ───
def _call_gpt5(self, user_message, system_prompt, temperature=0.7):
payload = {
"user_input": user_message,
"chat_history": [{"role": "system", "content": system_prompt}],
"temperature": temperature,
"top_p": 0.95,
"max_completion_tokens": 4000
}
try:
response = requests.post(self.gpt_url, json=payload, timeout=120)
response.raise_for_status()
return response.json().get("assistant_response", "")
except Exception as e:
print(f"❌ GPT-5 Error: {e}")
return None
def _call_gpt5_multipart(self, image_url, text_prompt, system_prompt, temperature=0.7):
payload = {
"user_input": None,
"chat_history": [
{"role": "system", "content": system_prompt},
{
"role": "user",
"type": "multipart",
"content": [
{"type": "image", "url": image_url},
{"type": "text", "text": text_prompt}
]
}
],
"temperature": temperature,
"top_p": 0.95,
"max_completion_tokens": 4000
}
try:
response = requests.post(self.gpt_url, json=payload, timeout=120)
response.raise_for_status()
return response.json().get("assistant_response", "")
except Exception as e:
print(f"❌ GPT-5 Multipart Error: {e}")
return None
# ─── Mistral streaming ───
def _call_mistral_stream(self, user_message, file_content, session_id):
chat_context = self.chat_history.get_full_context(session_id)
full_prompt = f"""{file_content}
=== سياق المحادثة ===
{chat_context if chat_context else "هذه بداية المحادثة مع الطالب."}
=== نهاية السياق ===
سؤال الطالب الحالي: {user_message}
إجابتك:"""
headers = {
"Content-Type": "application/json",
"Cookie": self.mistral_cookie,
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
}
payload = {
"inputs": [{
"object": "entry",
"type": "message.input",
"created_at": datetime.now(timezone.utc).isoformat(),
"role": "user",
"content": full_prompt,
"prefix": False
}],
"stream": True,
"instructions": "",
"tools": [],
"completion_args": {
"temperature": 0.7,
"top_p": 0.95,
"max_tokens": 4096
},
"model": "mistral-medium-latest"
}
try:
response = requests.post(
"https://console.mistral.ai/api-ui/bora/v1/conversations",
headers=headers,
json=payload,
stream=True,
timeout=120
)
if response.status_code not in [200, 201]:
error_msg = f"Mistral Error: {response.status_code} - {response.text[:500]}"
yield json.dumps({"error": error_msg}) + "\n"
return
full_response = ""
for line in response.iter_lines():
if line:
line = line.decode('utf-8')
if line.startswith('data: '):
try:
data = json.loads(line[6:])
if data.get('type') == 'message.output.delta':
content = data.get('content', '')
if content:
full_response += content
yield json.dumps({"chunk": content}) + "\n"
except json.JSONDecodeError:
continue
if full_response:
self.chat_history.add_message(session_id, "assistant", full_response)
yield json.dumps({"done": True, "full_response": full_response}) + "\n"
except requests.exceptions.Timeout:
yield json.dumps({"error": "انتهت مهلة الاتصال بالخادم. حاول مرة أخرى."}) + "\n"
except requests.exceptions.ConnectionError:
yield json.dumps({"error": "خطأ في الاتصال بالخادم."}) + "\n"
except Exception as e:
yield json.dumps({"error": str(e)}) + "\n"
# ─── Routing ───
def route_message(self, user_message, session_id):
subject_id = self.chat_history.get_subject(session_id)
subject_data = subject_loader.load(subject_id) if subject_id else None
if not subject_data:
return "main.txt"
structure_content = subject_data.get("structure.txt", "")
chat_context = self.chat_history.get_full_context(session_id)
last_file = self.chat_history.get_last_file(session_id)
p_files = subject_data.get("_p_files", [])
p_files_list = "\n".join([f"- {f}" for f in p_files])
routing_prompt = f"""{structure_content}
**الملفات المتاحة:**
- main.txt: للتحيات والأسئلة العامة
{p_files_list}
**سياق المحادثة:**
{chat_context if chat_context else "لا يوجد سياق سابق"}
**الملف الأخير المستخدم:** {last_file if last_file else "لا يوجد"}
**تعليمات:**
- إذا كانت الرسالة متابعة لموضوع سابق → استخدم نفس الملف الأخير: {last_file if last_file else "main.txt"}
- إذا قال الطالب "استمر" أو "أكمل" → استخدم نفس الملف الأخير
- أجب **فقط** باسم الملف (مثال: p1.txt أو main.txt)
رسالة الطالب: {user_message}
الملف المناسب:"""
chosen_file = self._call_gpt5(user_message, routing_prompt, temperature=0.2)
if not chosen_file:
return last_file if last_file else "main.txt"
chosen_file = chosen_file.strip().lower()
valid_files = ["main.txt"] + p_files
for vf in valid_files:
if vf in chosen_file:
return vf
return last_file if last_file else "main.txt"
# ─── GPT response ───
def respond_gpt(self, user_message, chosen_file, session_id):
subject_id = self.chat_history.get_subject(session_id)
subject_data = subject_loader.load(subject_id) if subject_id else None
if not subject_data:
return "عذراً، لم يتم تحديد المادة. يرجى اختيار المادة أولاً."
main_content = subject_data.get("main.txt", "")
chat_context = self.chat_history.get_full_context(session_id)
system_prompt = f"""{main_content}
=== سياق المحادثة السابقة ===
{chat_context if chat_context else "هذه بداية المحادثة."}
=== نهاية السياق ==="""
self.chat_history.add_message(session_id, "user", user_message)
response = self._call_gpt5(user_message, system_prompt, temperature=0.8)
if response:
self.chat_history.add_message(session_id, "assistant", response)
return response
else:
err = "عذراً، حدث خطأ في النظام. حاول مرة أخرى."
self.chat_history.add_message(session_id, "assistant", err)
return err
# ─── Mistral response ───
def respond_mistral_stream(self, user_message, chosen_file, session_id):
subject_id = self.chat_history.get_subject(session_id)
subject_data = subject_loader.load(subject_id) if subject_id else None
if not subject_data:
def error_gen():
yield json.dumps({"error": "عذراً، لم يتم تحديد المادة."}) + "\n"
return error_gen()
file_content = subject_data.get(chosen_file, "")
self.chat_history.add_message(session_id, "user", user_message)
return self._call_mistral_stream(user_message, file_content, session_id)
# ─── Process message (route + decide) ───
def process_message(self, user_message, session_id):
subject = self.chat_history.get_subject(session_id)
if not subject:
return None, "no_subject"
subject_data = subject_loader.load(subject)
if not subject_data:
return None, "subject_not_found"
chosen_file = self.route_message(user_message, session_id)
self.chat_history.set_last_file(session_id, chosen_file)
print(f"📀 Subject: {subject} | Routed to: {chosen_file} | Session: {session_id}")
return chosen_file, "ok"
# ─── Subject session management ───
def select_subject(self, username, session_id, subject_id):
"""Select a subject and load/restore chat history."""
current_subject = self.chat_history.get_subject(session_id)
# Save current subject chat before switching
if current_subject and current_subject != subject_id:
raw = self.chat_history.get_raw_session(session_id)
save_user_chat_to_db(username, current_subject, raw)
# Clear and set new subject
self.chat_history.clear_session(session_id)
self.chat_history.set_subject(session_id, subject_id)
# Try to restore saved chat
saved = load_user_chat_from_db(username, subject_id)
if saved:
self.chat_history.restore_session(session_id, saved)
# Save immediately
raw = self.chat_history.get_raw_session(session_id)
save_user_chat_to_db(username, subject_id, raw)
def save_current_chat(self, username, session_id):
"""Save current session to DB."""
subject_id = self.chat_history.get_subject(session_id)
if subject_id:
raw = self.chat_history.get_raw_session(session_id)
save_user_chat_to_db(username, subject_id, raw)
# ─── Audio ───
def upload_audio(self, audio_file_path, original_filename):
try:
with open(audio_file_path, 'rb') as f:
files = {'audio': (original_filename, f, 'audio/webm')}
response = requests.post(
f"{self.transcript_base_url}/upload",
files=files,
timeout=60
)
response.raise_for_status()
data = response.json()
if 'file_url' in data:
return data['file_url'], None
else:
return None, data.get('error', 'Upload failed')
except Exception as e:
print(f"❌ Audio upload error: {e}")
return None, str(e)
def transcribe_audio(self, file_url):
try:
payload = {
"file_url": file_url,
"prompt": "هذا تسجيل صوتي لطالب يسأل سؤالاً دراسياً"
}
response = requests.post(
f"{self.transcript_base_url}/transcribe",
json=payload,
timeout=120
)
response.raise_for_status()
data = response.json()
if 'transcription' in data:
return data['transcription'], None
else:
return None, data.get('error', 'Transcription failed')
except Exception as e:
print(f"❌ Transcription error: {e}")
return None, str(e)
# ─── Image ───
def upload_image_to_cloudinary(self, image_path):
try:
with open(image_path, 'rb') as image_file:
files = {'file': image_file}
data = {'upload_preset': self.cloudinary_preset}
response = requests.post(
self.cloudinary_url,
files=files,
data=data,
timeout=60
)
if response.status_code == 200:
result = response.json()
image_url = result.get('url') or result.get('secure_url')
return image_url, None
else:
return None, f"Cloudinary error: {response.status_code}"
except Exception as e:
return None, str(e)
def analyze_image(self, image_url, session_id):
system_prompt = """أنت مساعد تعليمي ذكي متخصص في تحليل الصور الدراسية.
مهمتك:
1. إذا كانت الصورة تحتوي على محتوى دراسي (معادلات، نصوص، مسائل، رسوم بيانية، جداول، شروحات، كتب، أوراق امتحانات):
- استخرج كل المحتوى من الصورة بدقة تامة
- اكتب المعادلات بصيغة LaTeX
- اكتب النصوص كما هي
- اشرح الرسوم البيانية إن وجدت
- قدم المحتوى بشكل منظم ومفيد
2. إذا كانت الصورة لا تحتوي على محتوى دراسي:
- أجب فقط بالكلمة: <unsupported>
لا تضف أي تعليق إضافي."""
result = self._call_gpt5_multipart(
image_url=image_url,
text_prompt="حلل هذه الصورة واستخرج محتواها الدراسي",
system_prompt=system_prompt,
temperature=0.3
)
return result
# ─── Exam session management ───
def create_exam_session(self, username, subject_id, door_file, question_count, difficulty):
with self.exam_lock:
self.exam_sessions[username] = {
"subject_id": subject_id,
"door_file": door_file,
"question_count": question_count,
"difficulty": difficulty,
"questions": [],
"current_index": 0,
"score": 0,
"answers": [],
"started_at": datetime.now().isoformat(),
"status": "generating",
}
return self.exam_sessions[username]
def get_exam_session(self, username):
with self.exam_lock:
return self.exam_sessions.get(username, None)
def clear_exam_session(self, username):
with self.exam_lock:
if username in self.exam_sessions:
del self.exam_sessions[username]
print(f"🗑️ Cleared exam session for {username}")