| from flask import Flask, render_template, request, jsonify, session, Response |
| from flask_socketio import SocketIO, emit, disconnect |
| import os |
| import requests |
| from datetime import datetime, timedelta |
| import secrets |
| import re |
| import html |
| import json |
| import unicodedata |
|
|
| from ai_forward import AIForwarder |
|
|
# Flask application object; the routes and Socket.IO handlers below attach to it.
app = Flask(__name__)

# Session-signing key: read from the SECRET_KEY environment variable when set,
# otherwise a fresh random value is generated each time the process starts.
app.secret_key = os.environ.get("SECRET_KEY", secrets.token_hex(32))

# Session cookie policy: not readable from JavaScript, SameSite=Lax, allowed
# over plain HTTP, and valid for one year when the session is permanent.
app.config["SESSION_COOKIE_HTTPONLY"] = True
app.config["SESSION_COOKIE_SAMESITE"] = "Lax"
app.config["SESSION_COOKIE_SECURE"] = False
app.config["PERMANENT_SESSION_LIFETIME"] = timedelta(days=365)

# Socket.IO server bound to the Flask app. Any origin may connect, and
# manage_session=False is passed so the extension does not keep its own
# session copy (NOTE(review): per Flask-SocketIO docs, handlers then use
# Flask's own session — confirm this is the intended behavior).
socketio = SocketIO(app, cors_allowed_origins="*", manage_session=False)
|
|
| |
| |
| |
| |
| |
# In-memory chat transcripts, keyed by the "sid" value stored in the Flask
# session; each value is that session's ordered list of message dicts.
CHAT_STORE: dict[str, list[dict]] = {}

# Currently connected chat sockets, keyed by the Socket.IO connection id
# (request.sid); each value holds the session sid, display name and the
# connection timestamp.
ONLINE_USERS: dict[str, dict] = {}
|
|
|
|
| |
| |
| |
def normalize_ar_name(s: str) -> str:
    """Normalize an Arabic or Latin name so equivalent spellings compare equal.

    Steps, in order:
      1. trim surrounding whitespace;
      2. apply Unicode NFKC normalization;
      3. lowercase;
      4. remove Arabic diacritics (tashkeel and Quranic annotation marks)
         and the tatweel/kashida elongation character (U+0640);
      5. collapse every whitespace run to a single space and trim again.

    Args:
        s: Raw name. ``None`` and the empty string are accepted.

    Returns:
        The normalized name, or ``""`` when the input is falsy.
    """
    if not s:
        return ""

    folded = unicodedata.normalize("NFKC", s.strip()).lower()

    # Tatweel (U+0640) only stretches a word visually; keeping it would make
    # an elongated spelling of a name fail to match its plain spelling.
    folded = re.sub(
        r"[\u0610-\u061A\u0640\u064B-\u065F\u0670\u06D6-\u06ED]", "", folded
    )

    # Removing marks can leave doubled or trailing spaces behind.
    return re.sub(r"\s+", " ", folded).strip()
|
|
|
|
def is_bissan_name(name: str) -> bool:
    """Return True when *name* normalizes to a recognized "Bissan" spelling."""
    # The first entry carries a kasra, which normalize_ar_name() strips from
    # its output, so only the last two entries can actually match.
    recognized = ("بِيسان", "بيسان", "bissan")
    return normalize_ar_name(name) in recognized
|
|
|
|
def get_user_profile():
    """Describe how the frontend should render the current session's user.

    Returns a dict with the keys:
      - display_name: the session's "user_name", or "أنت" when unset/empty
      - is_bissan: whether the name is a recognized Bissan spelling
      - avatar_type: "flower" for Bissan, otherwise "emoji"
      - avatar_bg: "pink" for Bissan, otherwise "blue"
      - emoji: None for Bissan, otherwise "USER"
    """
    display_name = session.get("user_name") or "أنت"
    bissan = is_bissan_name(display_name)

    if bissan:
        avatar_type, avatar_bg, emoji = "flower", "pink", None
    else:
        avatar_type, avatar_bg, emoji = "emoji", "blue", "USER"

    return {
        "display_name": display_name,
        "is_bissan": bissan,
        "avatar_type": avatar_type,
        "avatar_bg": avatar_bg,
        "emoji": emoji,
    }
|
|
|
|
def get_online_users_list():
    """Return one summary dict per entry in ONLINE_USERS.

    Each dict holds the socket id (the ONLINE_USERS key) plus the stored
    name, session sid and connection timestamp, with defaults of "أنت",
    "" and "" when a field is missing.
    """
    return [
        {
            "socket_id": socket_id,
            "name": info.get("name", "أنت"),
            "sid": info.get("sid", ""),
            "connected_at": info.get("connected_at", ""),
        }
        for socket_id, info in ONLINE_USERS.items()
    ]
|
|
|
|
def broadcast_online_count():
    """Emit the current online-user count and list on the /admin namespace."""
    snapshot = {
        'count': len(ONLINE_USERS),
        'users': get_online_users_list(),
    }
    socketio.emit('online_update', snapshot, namespace='/admin')
|
|
|
|
| |
| |
| |
class AIStudyApp:
    """Chat pipeline for the study assistant.

    For each user message it asks the forwarder which study sheet applies,
    builds a system prompt from text files in ``data_folder``, calls the
    upstream chat API and post-processes the reply for the frontend.
    """

    def __init__(self, api_url="https://dooratre-xx-gpt-52.hf.space/chat", data_folder="data"):
        """Store the API endpoint and data folder and create the forwarder.

        Args:
            api_url: Chat endpoint that receives the POSTed payload.
            data_folder: Directory holding the prompt/sheet ``.txt`` files.
        """
        self.api_url = api_url
        self.data_folder = data_folder
        self.forwarder = AIForwarder(api_url)

    def load_txt_file(self, filename):
        """Read a UTF-8 text file located inside ``data_folder``.

        Args:
            filename: Path relative to ``data_folder``.

        Returns:
            The file content, or ``""`` (after printing a warning) when the
            file does not exist or the path resolves outside ``data_folder``.
        """
        base_dir = os.path.abspath(self.data_folder)
        filepath = os.path.abspath(os.path.join(base_dir, filename))

        # The sheet filename is built from the forwarder's output, so refuse
        # anything ("../x", absolute paths) that escapes the data folder.
        try:
            inside = os.path.commonpath([base_dir, filepath]) == base_dir
        except ValueError:
            # Raised e.g. for paths on different drives: treat as outside.
            inside = False
        if not inside:
            print(f"Warning: {filename} is outside the data folder")
            return ""

        try:
            with open(filepath, "r", encoding="utf-8") as f:
                return f.read()
        except FileNotFoundError:
            print(f"Warning: {filename} not found")
            return ""

    def build_system_prompt(self, sheet_number: str, is_bissan: bool):
        """Concatenate the base prompt and the sheet text.

        Normal users: system.txt + {sheet}.txt
        Bissan user:  bissan.txt + {sheet}.txt (system.txt is not used)

        Returns:
            ``base + "\\n\\n" + sheet``; a missing file contributes ``""``.
        """
        base_file = "bissan.txt" if is_bissan else "system.txt"
        base_text = self.load_txt_file(base_file)
        sheet_text = self.load_txt_file(f"{sheet_number}.txt")
        return base_text + "\n\n" + sheet_text

    def send_to_main_ai(self, user_message, system_prompt, chat_history):
        """POST the conversation to the chat API and return its reply text.

        The history sent is: the system prompt, then ``chat_history``, then
        the new user message.

        Returns:
            The ``assistant_response`` field of the JSON reply (``""`` when
            the field is absent), or ``None`` if the request or the decoding
            of the reply raises.
        """
        full_chat_history = [
            {"role": "system", "content": system_prompt},
            *chat_history,
            {"role": "user", "content": user_message},
        ]
        payload = {
            "user_input": user_message,
            "chat_history": full_chat_history,
            "temperature": 0.9,
            "top_p": 0.95,
        }

        try:
            response = requests.post(
                self.api_url,
                json=payload,
                headers={"Content-Type": "application/json"},
                timeout=45,
            )
            response.raise_for_status()
            return response.json().get("assistant_response", "")
        except Exception as e:
            # Best-effort boundary: callers treat None as "no reply".
            print(f"Error calling main AI: {e}")
            return None

    def normalize_text(self, text: str) -> str:
        """Clean up reply text.

        Removes zero-width and bidi control characters, converts CRLF/CR to
        LF, limits blank-line runs to one blank line, strips trailing
        whitespace on each line and trims the whole text.
        Returns ``""`` for falsy input.
        """
        if not text:
            return ""
        cleaned = re.sub(r"[\u200B-\u200F\u202A-\u202E\u2066-\u2069]", "", text)
        cleaned = cleaned.replace("\r\n", "\n").replace("\r", "\n")
        cleaned = re.sub(r"\n{3,}", "\n\n", cleaned)
        return "\n".join(line.rstrip() for line in cleaned.split("\n")).strip()

    def format_code_blocks(self, text: str) -> str:
        """Rewrite fenced code blocks as ``<code_LANG>`` ... ``</code_LANG>``.

        ``LANG`` is the word following the opening fence, or ``code`` when
        the fence has none.
        """
        def _as_tag(match):
            language = match.group(1) or "code"
            return f"<code_{language}>\n{match.group(2)}\n</code_{language}>"

        return re.sub(r"```(\w+)?\n(.*?)```", _as_tag, text, flags=re.DOTALL)

    def unescape_html_inside_mcq(self, text: str) -> str:
        """HTML-unescape the content of every ``<mcq>...</mcq>`` block.

        Text outside those blocks is left untouched; the tag match is
        case-insensitive. Returns ``""`` for falsy input.
        """
        if not text:
            return ""
        return re.sub(
            r"(<mcq>.*?</mcq>)",
            lambda match: html.unescape(match.group(1)),
            text,
            flags=re.DOTALL | re.IGNORECASE,
        )

    def process_message(self, user_message, chat_history, is_bissan: bool):
        """Run the full pipeline for one user message.

        Args:
            user_message: The new message text.
            chat_history: Earlier messages as ``{"role", "content"}`` dicts.
            is_bissan: Selects bissan.txt instead of system.txt as the base.

        Returns:
            ``{"response": <formatted text or None>, "sheet_number": ...}``;
            ``response`` is ``None`` when the chat API call failed.
        """
        # The forwarder sees the history including the new message.
        routing_history = chat_history + [{"role": "user", "content": user_message}]
        forward_result = self.forwarder.process_chat_history(routing_history)
        sheet_number = forward_result.get("sheet_number", "1")

        system_prompt = self.build_system_prompt(sheet_number, is_bissan=is_bissan)
        ai_response = self.send_to_main_ai(user_message, system_prompt, chat_history)
        if ai_response is None:
            return {"response": None, "sheet_number": sheet_number}

        cleaned = self.unescape_html_inside_mcq(self.normalize_text(ai_response))
        return {
            "response": self.format_code_blocks(cleaned),
            "sheet_number": sheet_number,
        }
|
|
|
|
# Single shared pipeline instance (default API URL and data folder), used by
# the send_message socket handler.
ai_app = AIStudyApp()
|
|
|
|
| |
| |
| |
@app.before_request
def ensure_session():
    """Prepare the session before every HTTP request.

    Marks the session permanent, assigns a random "sid" if there is none,
    makes sure CHAT_STORE has a list for that sid, and stores a non-blank
    ``?name=`` query parameter as the session's "user_name".
    """
    session.permanent = True

    if "sid" not in session:
        session["sid"] = secrets.token_hex(12)

    # Create the transcript list on first sight of this sid.
    CHAT_STORE.setdefault(session["sid"], [])

    # A missing or whitespace-only name leaves the stored name untouched.
    requested_name = (request.args.get("name", type=str) or "").strip()
    if requested_name:
        session["user_name"] = requested_name
|
|
|
|
| |
| |
| |
@app.route("/")
def index():
    """Serve the main page by rendering the index.html template."""
    page = render_template("index.html")
    return page
|
|
|
|
@app.route("/admin")
def admin_panel():
    """Serve the admin panel (online-users view) from admin.html."""
    # NOTE(review): no authentication check is visible on this route —
    # confirm access is restricted elsewhere.
    page = render_template("admin.html")
    return page
|
|
|
|
@app.route("/session_info", methods=["GET"])
def session_info():
    """Return the session sid and the user's display profile as JSON.

    The frontend uses the profile to show the user's name instead of "أنت"
    and to pick the avatar style.
    """
    body = {
        "sid": session.get("sid"),
        "user": get_user_profile(),
    }
    return jsonify(body)
|
|
|
|
@app.route("/clear_history", methods=["POST"])
def clear_history():
    """Reset the current session's transcript to an empty list."""
    CHAT_STORE[session["sid"]] = []
    return jsonify(success=True, message="تم مسح المحادثة")
|
|
|
|
@app.route("/get_history", methods=["GET"])
def get_history():
    """Return the current session's transcript (empty list if none) as JSON."""
    transcript = CHAT_STORE.get(session["sid"], [])
    return jsonify(history=transcript)
|
|
|
|
| |
| |
| |
@app.route("/bissan", methods=["GET"])
def bissan_history():
    """Return the current session's sid, user profile and transcript as JSON."""
    sid = session["sid"]
    body = {
        "sid": sid,
        "user": get_user_profile(),
        "history": CHAT_STORE.get(sid, []),
    }
    return jsonify(body)
|
|
|
|
@app.route("/bissan/download", methods=["GET"])
def bissan_download():
    """Send the current session's chat as a downloadable JSON file.

    The JSON is serialized with ``ensure_ascii=False`` so Arabic text stays
    readable, and carries the sid, an ``exported_at`` UTC timestamp, the
    user profile and the transcript. The attachment is named
    ``chat_<sid>.json``.
    """
    # Imported here because the module-level import only provides
    # datetime and timedelta.
    from datetime import timezone

    sid = session["sid"]
    profile = get_user_profile()

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop tzinfo so the "<naive ISO timestamp>Z" format the
    # export has always used stays the same.
    exported_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"

    payload = {
        "sid": sid,
        "exported_at": exported_at,
        "user": profile,
        "history": CHAT_STORE.get(sid, []),
    }
    filename = f"chat_{sid}.json"

    return Response(
        json.dumps(payload, ensure_ascii=False, indent=2),
        mimetype="application/json; charset=utf-8",
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )
|
|
|
|
| |
| |
| |
@socketio.on('connect')
def handle_connect():
    """Register a newly connected chat socket and acknowledge it.

    Records the socket in ONLINE_USERS, pushes the updated online list to
    the /admin namespace and replies to the client with a 'connected' event
    carrying its profile and session sid.
    """
    profile = get_user_profile()
    user_name = profile["display_name"]
    sid = session.get("sid")
    connected_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Keyed by the Socket.IO connection id, not the session sid.
    ONLINE_USERS[request.sid] = {
        "sid": sid,
        "name": user_name,
        "connected_at": connected_at,
    }
    print(f"✅ User connected: {user_name} (socket: {request.sid})")

    broadcast_online_count()

    ack = {'status': 'success', 'user': profile, 'sid': sid}
    emit('connected', ack)
|
|
|
|
@socketio.on('disconnect')
def handle_disconnect():
    """Drop the disconnected socket from ONLINE_USERS and notify admins."""
    departed = ONLINE_USERS.pop(request.sid, None)
    if departed is not None:
        print(f"❌ User disconnected: {departed.get('name')} (socket: {request.sid})")

    # Push the refreshed online list to the /admin namespace.
    broadcast_online_count()
|
|
|
|
@socketio.on('send_message')
def handle_send_message(data):
    """Handle one chat message received over the socket.

    Flow: validate the text, emit 'assistant_thinking', run the AI pipeline
    with the session's stored history, then on success append the user and
    assistant messages to CHAT_STORE and emit 'message_response'.

    Args:
        data: Event payload; its "message" entry is the user's text.

    Emits 'error' when the message is blank, when the pipeline returns no
    response, or when any exception is raised while handling the event.
    """
    try:
        original_user_message = (data.get("message") or "").strip()
        if not original_user_message:
            emit('error', {"error": "الرسالة فارغة"})
            return

        sid = session["sid"]
        profile = get_user_profile()
        user_name = profile["display_name"]

        # Only role/content are passed on; stored entries carry extra
        # bookkeeping fields (timestamp, sheet, user name).
        chat_history = [
            {"role": m["role"], "content": m["content"]}
            for m in CHAT_STORE.get(sid, [])
        ]

        # The model sees the message prefixed with the user's display name;
        # the transcript keeps the text as typed.
        message_for_ai = f"{user_name}: {original_user_message}"

        emit('assistant_thinking', {'status': 'thinking'})

        result = ai_app.process_message(
            message_for_ai, chat_history, is_bissan=profile["is_bissan"]
        )

        if not result["response"]:
            emit('error', {"error": "فشل الحصول على رد من الذكاء الاصطناعي"})
            return

        ts = datetime.now().strftime("%H:%M")
        sheet_number = result["sheet_number"]

        # setdefault rather than CHAT_STORE[sid]: the store may hold no
        # entry for this sid (the history read above already tolerates
        # that), and a KeyError here would discard a reply that was already
        # obtained. Looked up only now so a clear that happened while the
        # AI call was running is respected.
        CHAT_STORE.setdefault(sid, []).extend(
            [
                {
                    "role": "user",
                    "content": original_user_message,
                    "ts": ts,
                    "sheet": sheet_number,
                    "user_name": user_name,
                },
                {
                    "role": "assistant",
                    "content": result["response"],
                    "ts": ts,
                    "sheet": sheet_number,
                },
            ]
        )

        emit('message_response', {
            "success": True,
            "response": result["response"],
            "sheet_number": sheet_number,
            "timestamp": ts,
            "user": profile,
        })
    except Exception as e:
        # Event boundary: report a generic error to the client instead of
        # letting the handler raise.
        print(f"Error in handle_send_message: {e}")
        emit('error', {"error": "حدث خطأ غير متوقع"})
|
|
|
|
@socketio.on('clear_chat')
def handle_clear_chat():
    """Reset the session's transcript and confirm with 'chat_cleared'."""
    CHAT_STORE[session["sid"]] = []
    confirmation = {"success": True, "message": "تم مسح المحادثة"}
    emit('chat_cleared', confirmation)
|
|
|
|
@socketio.on('get_history')
def handle_get_history():
    """Send the session's transcript (empty list if none) as 'history_data'."""
    transcript = CHAT_STORE.get(session["sid"], [])
    emit('history_data', {"history": transcript})
|
|
|
|
| |
| |
| |
@socketio.on('connect', namespace='/admin')
def handle_admin_connect():
    """Log the admin connection and send it the current online snapshot."""
    print(f"👑 Admin connected: {request.sid}")

    snapshot = {
        'count': len(ONLINE_USERS),
        'users': get_online_users_list(),
    }
    emit('online_update', snapshot)
|
|
|
|
@socketio.on('disconnect', namespace='/admin')
def handle_admin_disconnect():
    """Log that an admin socket disconnected; no state is changed."""
    admin_socket_id = request.sid
    print(f"👑 Admin disconnected: {admin_socket_id}")
|
|
|
|
@socketio.on('request_update', namespace='/admin')
def handle_admin_request_update():
    """Reply to an admin's manual refresh with the current online snapshot."""
    snapshot = {
        'count': len(ONLINE_USERS),
        'users': get_online_users_list(),
    }
    emit('online_update', snapshot)
|
|
|
|
if __name__ == "__main__":
    # NOTE(review): debug=True combined with host="0.0.0.0" serves the
    # debug-mode app on every interface — confirm this entry point is never
    # used for a publicly reachable deployment.
    socketio.run(
        app,
        debug=True,
        host="0.0.0.0",
        port=7860,
        allow_unsafe_werkzeug=True,
    )
|
|
| |
| |