# Page header captured together with the file (not part of the program):
# itgs226 / app.py
# Dooratre's picture
# Update app.py
# 81dfe44 verified
from flask import Flask, render_template, request, jsonify, session, Response
from flask_socketio import SocketIO, emit, disconnect
import os
import requests
from datetime import datetime, timedelta
import secrets
import re
import html
import json
import unicodedata
from ai_forward import AIForwarder
# Flask application object.
app = Flask(__name__)
# Signing key for the session: taken from the SECRET_KEY environment variable
# when present, otherwise a fresh random key is generated for this process.
app.secret_key = os.environ.get("SECRET_KEY", secrets.token_hex(32))
# Session-cookie settings, assigned one key at a time.
app.config["SESSION_COOKIE_HTTPONLY"] = True
app.config["SESSION_COOKIE_SAMESITE"] = "Lax"
app.config["SESSION_COOKIE_SECURE"] = False  # set True behind HTTPS
app.config["PERMANENT_SESSION_LIFETIME"] = timedelta(days=365)  # 1 year persistent

# Socket.IO server bound to the Flask app; any origin is allowed.
socketio = SocketIO(app, manage_session=False, cors_allowed_origins="*")

# -------------------------
# In-memory chat store
# -------------------------
# Maps session["sid"] -> list of message dicts for that session.
# Each message: {"role": "...", "content": "...", "ts": "...", "sheet": "..."}
# (user messages additionally carry "user_name").
CHAT_STORE: dict[str, list[dict]] = dict()

# -------------------------
# Online users tracking
# -------------------------
# Maps socket_id -> {"sid": session_id, "name": user_name, "connected_at": timestamp}
ONLINE_USERS: dict[str, dict] = dict()
# -------------------------
# Helpers: name + bissan detection
# -------------------------
def normalize_ar_name(s: str) -> str:
    """
    Normalize an Arabic/Latin name so equivalent spellings compare equal.

    Steps, in order:
    - strip surrounding whitespace
    - apply Unicode NFKC normalization
    - lowercase
    - remove Arabic diacritics (tashkeel) and the tatweel/kashida elongation
      character
    - collapse runs of whitespace into a single space

    Args:
        s: Raw name. ``None`` or an empty string is accepted.

    Returns:
        The normalized name, or ``""`` when the input is falsy.
    """
    if not s:
        return ""
    s = unicodedata.normalize("NFKC", s.strip()).lower()
    # Harakat range + some marks, plus U+0640 (tatweel): tatweel only stretches
    # a word visually, so an elongated spelling must match the plain one.
    s = re.sub(r"[\u0610-\u061A\u0640\u064B-\u065F\u0670\u06D6-\u06ED]", "", s)
    # Collapse internal whitespace; strip again in case removals exposed edges.
    return re.sub(r"\s+", " ", s).strip()
def is_bissan_name(name: str) -> bool:
    """
    Tell whether *name* is the special user "Bissan".

    The name is first passed through normalize_ar_name(), so case,
    surrounding whitespace and Arabic diacritics do not matter.

    Args:
        name: Raw display name.

    Returns:
        True when the normalized name is "بيسان" or "bissan".
    """
    # Only diacritic-free spellings are listed: normalize_ar_name() strips
    # diacritics, so a vocalised literal here could never be matched.
    return normalize_ar_name(name) in {"بيسان", "bissan"}
def get_user_profile():
    """
    Build the profile dict the frontend uses to render the current user.

    Keys of the returned dict:
    - display_name: session "user_name", or "أنت" when it is unset/empty
    - is_bissan: result of is_bissan_name() for that name
    - avatar_type: "flower" for Bissan, otherwise "emoji"
    - avatar_bg: "pink" for Bissan, otherwise "blue"
    - emoji: None for Bissan, otherwise "USER"
    """
    display_name = session.get("user_name") or "أنت"
    bissan = is_bissan_name(display_name)
    profile = {"display_name": display_name, "is_bissan": bissan}
    if bissan:
        profile.update(avatar_type="flower", avatar_bg="pink", emoji=None)
    else:
        profile.update(avatar_type="emoji", avatar_bg="blue", emoji="USER")
    return profile
def get_online_users_list():
    """
    Snapshot ONLINE_USERS as a list of plain dicts.

    Each entry has "socket_id", "name", "sid" and "connected_at"; missing
    fields fall back to "أنت" for the name and "" for the others.
    """
    return [
        {
            "socket_id": sock_id,
            "name": info.get("name", "أنت"),
            "sid": info.get("sid", ""),
            "connected_at": info.get("connected_at", ""),
        }
        for sock_id, info in ONLINE_USERS.items()
    ]
def broadcast_online_count():
    """Emit 'online_update' (user count + user list) on the '/admin' namespace."""
    payload = {
        'count': len(ONLINE_USERS),
        'users': get_online_users_list(),
    }
    socketio.emit('online_update', payload, namespace='/admin')
# -------------------------
# AI App
# -------------------------
class AIStudyApp:
    """
    Builds prompts from text files in a data folder and talks to the AI endpoint.

    Attributes:
        api_url: URL that chat requests are POSTed to.
        data_folder: Directory holding the prompt files (system.txt,
            bissan.txt and one "<sheet>.txt" per sheet).
        forwarder: AIForwarder used to pick the sheet for a conversation.
    """

    def __init__(self, api_url="https://dooratre-xx-gpt-52.hf.space/chat", data_folder="data"):
        self.api_url = api_url
        self.data_folder = data_folder
        self.forwarder = AIForwarder(api_url)

    def load_txt_file(self, filename):
        """
        Read a UTF-8 text file that lives inside ``data_folder``.

        Args:
            filename: Path relative to ``data_folder``.

        Returns:
            The file content, or ``""`` (after printing a warning) when the
            path points outside ``data_folder``, the file is missing, or it
            cannot be read/decoded.
        """
        # The sheet name comes from an external classifier, so refuse any path
        # ("../x", absolute paths, ...) that leaves the data folder.
        try:
            base_dir = os.path.abspath(self.data_folder)
            filepath = os.path.abspath(os.path.join(base_dir, str(filename)))
            inside = os.path.commonpath([base_dir, filepath]) == base_dir
        except ValueError:
            # e.g. paths on different drives, or an embedded NUL byte
            inside = False
        if not inside:
            print(f"Warning: {filename} is outside the data folder")
            return ""
        try:
            with open(filepath, "r", encoding="utf-8") as f:
                return f.read()
        except FileNotFoundError:
            print(f"Warning: {filename} not found")
            return ""
        except (OSError, ValueError) as exc:
            # Directory, permission problem, undecodable bytes, ...: treat it
            # like a missing file instead of failing the whole chat request.
            print(f"Warning: could not read {filename}: {exc}")
            return ""

    def build_system_prompt(self, sheet_number: str, is_bissan: bool):
        """
        Assemble the system prompt for one request.

        Normal users: system.txt + {sheet}.txt
        Bissan user: bissan.txt + {sheet}.txt (NO system.txt)

        Returns:
            Base prompt and sheet text joined by a blank line; a part that
            could not be loaded contributes an empty string.
        """
        base_file = "bissan.txt" if is_bissan else "system.txt"
        system_content = self.load_txt_file(base_file)
        sheet_content = self.load_txt_file(f"{sheet_number}.txt")
        return system_content + "\n\n" + sheet_content

    def send_to_main_ai(self, user_message, system_prompt, chat_history):
        """
        POST the conversation to ``api_url`` and return the assistant text.

        The payload carries the system prompt, the prior history and the new
        user message in "chat_history", and the user message again in
        "user_input".

        Returns:
            The "assistant_response" field of the JSON reply ("" when the
            field is absent), or None when the request or decoding fails.
        """
        full_chat_history = [{"role": "system", "content": system_prompt}]
        full_chat_history.extend(chat_history)
        full_chat_history.append({"role": "user", "content": user_message})
        payload = {
            "user_input": user_message,
            "chat_history": full_chat_history,
            "temperature": 0.9,
            "top_p": 0.95,
        }
        try:
            response = requests.post(
                self.api_url,
                json=payload,
                headers={"Content-Type": "application/json"},
                timeout=45,
            )
            response.raise_for_status()
            result = response.json()
            return result.get("assistant_response", "")
        except Exception as e:
            # Deliberately broad: any failure is reported to the caller as None.
            print(f"Error calling main AI: {e}")
            return None

    def normalize_text(self, text: str) -> str:
        """
        Clean up model output.

        Removes zero-width and bidi control characters, converts CRLF/CR to LF,
        collapses three or more newlines into two, strips trailing whitespace
        on every line and trims the whole text. Falsy input yields "".
        """
        if not text:
            return ""
        text = re.sub(r"[\u200B-\u200F\u202A-\u202E\u2066-\u2069]", "", text)
        text = text.replace("\r\n", "\n").replace("\r", "\n")
        text = re.sub(r"\n{3,}", "\n\n", text)
        text = "\n".join([ln.rstrip() for ln in text.split("\n")]).strip()
        return text

    def format_code_blocks(self, text: str) -> str:
        """
        Rewrite fenced Markdown code blocks as <code_LANG> ... </code_LANG> tags.

        A fence without a language tag becomes <code_code>. The captured code is
        wrapped in an extra newline on each side.
        """
        # NOTE(review): the language tag must consist of word characters only,
        # so a fence such as "c++" is not matched by this pattern - confirm
        # whether the frontend needs such tags before widening it.
        pattern = r"```(\w+)?\n(.*?)```"

        def replace_code_block(match):
            language = match.group(1) if match.group(1) else "code"
            code_content = match.group(2)
            return f"<code_{language}>\n{code_content}\n</code_{language}>"

        return re.sub(pattern, replace_code_block, text, flags=re.DOTALL)

    def unescape_html_inside_mcq(self, text: str) -> str:
        """
        HTML-unescape everything inside <mcq>...</mcq> blocks (case-insensitive).

        Text outside those blocks is left untouched. Falsy input yields "".
        """
        if not text:
            return ""
        mcq_pattern = r"(<mcq>.*?</mcq>)"

        def _fix_block(match):
            block = match.group(1)
            return html.unescape(block)

        return re.sub(mcq_pattern, _fix_block, text, flags=re.DOTALL | re.IGNORECASE)

    def process_message(self, user_message, chat_history, is_bissan: bool):
        """
        Run one chat turn: choose the sheet, query the AI, post-process the reply.

        Args:
            user_message: Message text sent to the AI.
            chat_history: Prior messages as {"role", "content"} dicts.
            is_bissan: Selects bissan.txt instead of system.txt as base prompt.

        Returns:
            {"response": <formatted text or None on AI failure>,
             "sheet_number": <sheet reported by the forwarder, default "1">}
        """
        # The forwarder sees the history including the new user message.
        temp_history = chat_history + [{"role": "user", "content": user_message}]
        forward_result = self.forwarder.process_chat_history(temp_history)
        sheet_number = forward_result.get("sheet_number", "1")
        system_prompt = self.build_system_prompt(sheet_number, is_bissan=is_bissan)
        ai_response = self.send_to_main_ai(user_message, system_prompt, chat_history)
        if ai_response is None:
            return {"response": None, "sheet_number": sheet_number}
        ai_response = self.normalize_text(ai_response)
        ai_response = self.unescape_html_inside_mcq(ai_response)
        formatted_response = self.format_code_blocks(ai_response)
        return {"response": formatted_response, "sheet_number": sheet_number}
# Single shared AIStudyApp instance, built with the default api_url and
# data_folder; the 'send_message' socket handler calls it.
ai_app = AIStudyApp()
# -------------------------
# Session init + name from URL
# -------------------------
@app.before_request
def ensure_session():
    """
    Prepare the session before every HTTP request.

    - marks the session permanent (lifetime set by PERMANENT_SESSION_LIFETIME)
    - assigns a random "sid" on first visit
    - makes sure CHAT_STORE has a list for that sid
    - stores a non-empty ``?name=`` query parameter as session["user_name"],
      trimmed and capped at 64 characters
    """
    max_name_len = 64

    # Make session permanent (saved in browser cookie for 1 year)
    session.permanent = True
    if "sid" not in session:
        session["sid"] = secrets.token_hex(12)
    CHAT_STORE.setdefault(session["sid"], [])

    # Capture name from URL like: /?name=Ahmed
    qname = (request.args.get("name", type=str) or "").strip()
    if qname:
        # The name goes into the session, which Flask keeps in a cookie by
        # default; an unbounded value could exceed browser cookie limits and
        # make the whole session (including "sid") get dropped.
        session["user_name"] = qname[:max_name_len].strip()
# -------------------------
# Routes
# -------------------------
@app.route("/")
def index():
    """Serve the main chat page (templates/index.html)."""
    page = render_template("index.html")
    return page
@app.route("/admin")
def admin_panel():
    """
    Serve the admin page that lists online users.

    NOTE(review): no access check is performed here, so anyone who knows the
    URL can open the page - confirm whether that is intended.
    """
    page = render_template("admin.html")
    return page
@app.route("/session_info", methods=["GET"])
def session_info():
    """
    Report the current session to the frontend.

    The JSON body holds "sid" (session id, may be null) and "user" (the
    profile from get_user_profile(): display name and avatar styling).
    """
    info = {"sid": session.get("sid"), "user": get_user_profile()}
    return jsonify(info)
@app.route("/clear_history", methods=["POST"])
def clear_history():
    """Replace the current session's chat list with an empty one and confirm."""
    CHAT_STORE[session["sid"]] = []
    body = {"success": True, "message": "تم مسح المحادثة"}
    return jsonify(body)
@app.route("/get_history", methods=["GET"])
def get_history():
    """Return the current session's stored messages as {"history": [...]}."""
    messages = CHAT_STORE.get(session["sid"], [])
    return jsonify({"history": messages})
# -------------------------
# Bissan routes (view + download)
# -------------------------
@app.route("/bissan", methods=["GET"])
def bissan_history():
    """
    Return the current session's chat as JSON.

    The body holds "sid", "user" (profile from get_user_profile()) and
    "history" (stored messages, empty list when none exist).
    """
    current_sid = session["sid"]
    body = {
        "sid": current_sid,
        "user": get_user_profile(),
        "history": CHAT_STORE.get(current_sid, []),
    }
    return jsonify(body)
@app.route("/bissan/download", methods=["GET"])
def bissan_download():
    """
    Download the current session's chat as a JSON file (UTF-8, Arabic safe).

    The file contains "sid", "exported_at" (UTC timestamp in ISO format with a
    trailing "Z"), "user" (profile) and "history" (stored messages). It is sent
    as an attachment named chat_<sid>.json.
    """
    # Local import keeps this block self-contained; the file's top-level
    # import only brings in datetime and timedelta.
    from datetime import timezone

    sid = session["sid"]
    profile = get_user_profile()
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop tzinfo so the text stays "<iso>Z" rather than "<iso>+00:00Z".
    exported_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
    payload = {
        "sid": sid,
        "exported_at": exported_at,
        "user": profile,
        "history": CHAT_STORE.get(sid, []),
    }
    # ensure_ascii=False keeps Arabic text readable in the exported file.
    data = json.dumps(payload, ensure_ascii=False, indent=2)
    filename = f"chat_{sid}.json"
    return Response(
        data,
        mimetype="application/json; charset=utf-8",
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )
# -------------------------
# SocketIO Events - Main Chat
# -------------------------
@socketio.on('connect')
def handle_connect():
    """
    Register a newly connected chat socket.

    Records the socket in ONLINE_USERS, notifies the admin namespace, and
    replies to the client with a 'connected' event carrying its profile and
    session id.
    """
    session_id = session.get("sid")
    profile = get_user_profile()
    user_name = profile["display_name"]
    socket_id = request.sid

    # Track online user
    ONLINE_USERS[socket_id] = {
        "sid": session_id,
        "name": user_name,
        "connected_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }
    print(f"✅ User connected: {user_name} (socket: {socket_id})")

    # Let the admin panel know, then acknowledge to the client
    broadcast_online_count()
    ack = {'status': 'success', 'user': profile, 'sid': session_id}
    emit('connected', ack)
@socketio.on('disconnect')
def handle_disconnect():
    """
    Forget a chat socket that went away.

    When the socket was tracked in ONLINE_USERS it is removed, the event is
    logged, and the admin namespace receives the updated user list.
    """
    socket_id = request.sid
    if socket_id not in ONLINE_USERS:
        return
    departed = ONLINE_USERS[socket_id]
    print(f"❌ User disconnected: {departed.get('name')} (socket: {socket_id})")
    ONLINE_USERS.pop(socket_id)
    # Broadcast to admin
    broadcast_online_count()
@socketio.on('send_message')
def handle_send_message(data):
    """
    Handle one chat message sent over the socket.

    Flow:
    1. Validate the payload; an empty, missing or non-string "message" is
       answered with an 'error' event.
    2. Build the AI history from the stored messages (role/content only).
    3. Send "<display name>: <message>" to the AI while emitting
       'assistant_thinking'.
    4. On success store the ORIGINAL message (without the name prefix) and the
       assistant reply, then emit 'message_response'; otherwise emit 'error'.

    Any unexpected exception is logged and reported as a generic 'error'.

    Args:
        data: Event payload, expected to be a dict with a "message" string.
    """
    try:
        # ORIGINAL message from frontend (WITHOUT name prefix)
        raw_message = data.get("message") if isinstance(data, dict) else None
        original_user_message = raw_message.strip() if isinstance(raw_message, str) else ""
        if not original_user_message:
            emit('error', {"error": "الرسالة فارغة"})
            return

        sid = session["sid"]
        profile = get_user_profile()
        is_bissan = profile["is_bissan"]

        # Build chat_history for AI from stored messages (role/content only)
        stored = CHAT_STORE.get(sid, [])
        chat_history = [{"role": m["role"], "content": m["content"]} for m in stored]

        # Build message WITH name prefix for AI ONLY
        user_name = profile["display_name"]
        message_for_ai = f"{user_name}: {original_user_message}"

        # Emit "thinking" status
        emit('assistant_thinking', {'status': 'thinking'})

        # Send message WITH name to AI
        result = ai_app.process_message(message_for_ai, chat_history, is_bissan=is_bissan)
        if not result["response"]:
            emit('error', {"error": "فشل الحصول على رد من الذكاء الاصطناعي"})
            return

        ts = datetime.now().strftime("%H:%M")
        # Look the list up only now, creating it when missing: the store may
        # have no entry for this sid (socket events do not pass through
        # ensure_session) or may have been reset during the AI call. Indexing
        # CHAT_STORE[sid] directly would raise KeyError and lose the reply.
        history = CHAT_STORE.setdefault(sid, [])
        # Save ORIGINAL message (WITHOUT name prefix)
        history.append(
            {
                "role": "user",
                "content": original_user_message,
                "ts": ts,
                "sheet": result["sheet_number"],
                "user_name": profile["display_name"],
            }
        )
        # Save assistant message
        history.append(
            {
                "role": "assistant",
                "content": result["response"],
                "ts": ts,
                "sheet": result["sheet_number"],
            }
        )
        # Emit response back to user
        emit('message_response', {
            "success": True,
            "response": result["response"],
            "sheet_number": result["sheet_number"],
            "timestamp": ts,
            "user": profile,
        })
    except Exception as e:
        # Top-level boundary of the socket handler: log and tell the client.
        print(f"Error in handle_send_message: {e}")
        emit('error', {"error": "حدث خطأ غير متوقع"})
@socketio.on('clear_chat')
def handle_clear_chat():
    """Reset the current session's chat list and emit 'chat_cleared'."""
    CHAT_STORE[session["sid"]] = []
    confirmation = {"success": True, "message": "تم مسح المحادثة"}
    emit('chat_cleared', confirmation)
@socketio.on('get_history')
def handle_get_history():
    """Emit 'history_data' with the current session's stored messages."""
    messages = CHAT_STORE.get(session["sid"], [])
    emit('history_data', {"history": messages})
# -------------------------
# SocketIO Events - Admin Panel
# -------------------------
@socketio.on('connect', namespace='/admin')
def handle_admin_connect():
    """Log an admin connection and send it the current online-user snapshot."""
    print(f"👑 Admin connected: {request.sid}")
    # Send current online users immediately
    snapshot = {
        'count': len(ONLINE_USERS),
        'users': get_online_users_list(),
    }
    emit('online_update', snapshot)
@socketio.on('disconnect', namespace='/admin')
def handle_admin_disconnect():
    """Log that an admin socket disconnected; no state is changed."""
    admin_socket = request.sid
    print(f"👑 Admin disconnected: {admin_socket}")
@socketio.on('request_update', namespace='/admin')
def handle_admin_request_update():
    """Answer a manual refresh request with the current online-user snapshot."""
    snapshot = {
        'count': len(ONLINE_USERS),
        'users': get_online_users_list(),
    }
    emit('online_update', snapshot)
if __name__ == "__main__":
    # The server binds to all interfaces, so debug mode (interactive debugger
    # and reloader) must not be on by default. Opt in explicitly for local
    # development with FLASK_DEBUG=1.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    # allow_unsafe_werkzeug lets the development server start when no
    # production-grade async server is installed.
    socketio.run(app, debug=debug_enabled, host="0.0.0.0", port=7860, allow_unsafe_werkzeug=True)
    # For production (after installing eventlet) the flag can be dropped:
    # socketio.run(app, debug=False, host="0.0.0.0", port=7860)