import gradio as gr import time from datetime import datetime from collections import deque import html # --- GLOBAL SERVER STATE (In-Memory Only) --- class ChatServer: def __init__(self): # Default channel self.channels = {"Main": deque(maxlen=50)} self.active_users = {} self.dms = {} # Tracks which room each user is currently viewing self.user_rooms = {} def prune_server(self): """Remove inactive users and empty rooms.""" now = time.time() # 1. Prune Users (30 second timeout) expired_users = [u for u, data in self.active_users.items() if now - data['last_seen'] > 30] for u in expired_users: if u in self.active_users: del self.active_users[u] if u in self.user_rooms: del self.user_rooms[u] # 2. Prune Rooms (Delete rooms with 0 active users, except Main) active_room_names = set(self.user_rooms.values()) rooms_to_delete = [] for room_name in self.channels.keys(): if room_name != "Main" and room_name not in active_room_names: rooms_to_delete.append(room_name) for room in rooms_to_delete: del self.channels[room] def _sanitize(self, text): """Prevents HTML injection by escaping user input.""" escaped = html.escape(text) return escaped.replace("\n", "
") def _add_message_to_queue(self, queue, user, message, profile_pic): safe_text = self._sanitize(message) # Stacking Logic: Check if last message was same user and same text if queue and queue[-1]['user'] == user and queue[-1]['text'] == safe_text: queue[-1]['count'] = queue[-1].get('count', 1) + 1 queue[-1]['time'] = datetime.now().strftime("%H:%M") else: msg_data = { "user": user, "text": safe_text, "pic": profile_pic, "time": datetime.now().strftime("%H:%M"), "count": 1 } queue.append(msg_data) def broadcast(self, channel, user, message, profile_pic): if channel not in self.channels: self.channels[channel] = deque(maxlen=50) self._add_message_to_queue(self.channels[channel], user, message, profile_pic) def send_dm(self, sender, recipient, message, profile_pic): dm_key = "-".join(sorted([sender, recipient])) if dm_key not in self.dms: self.dms[dm_key] = deque(maxlen=50) self._add_message_to_queue(self.dms[dm_key], sender, message, profile_pic) server = ChatServer() # --- UTILS --- def get_messages_html(messages): if not messages: return "
No messages yet.
" html_output = "
" for m in messages: pic_url = m.get('pic') or "https://huggingface.co/front/assets/huggingface_logo-noborder.svg" display_text = m['text'] if m.get('count', 1) > 1: display_text += f" (x{m['count']})" html_output += f"""
{m['user']} {m['time']}
{display_text}
""" html_output += """
""" return html_output def get_active_users_html(current_user): server.prune_server() html_list = "
" for user, data in server.active_users.items(): pic = data.get('pic') or "https://huggingface.co/front/assets/huggingface_logo-noborder.svg" label = f"{user}" + (" (You)" if user == current_user else "") html_list += f"""
{label}
""" return html_list + "
" # --- CSS --- custom_css = """ @import url('https://fonts.googleapis.com/css2?family=Inter:wght@400;600;700&display=swap'); body, .gradio-container, .main-title, #chat-container, #side-panel, textarea, button { font-family: 'Inter', -apple-system, BlinkMacSystemFont, "Segoe UI", Helvetica, Arial, sans-serif !important; } #chat-container { height: 60vh !important; overflow-y: auto !important; border: 1px solid rgba(255,255,255,0.1) !important; padding: 15px; border-radius: 12px; background: #0f1117 !important; } #side-panel { border-right: 1px solid rgba(255,255,255,0.05); padding-right: 15px !important; } .login-button { padding: 6px 16px !important; font-size: 0.85em !important; border-radius: 30px !important; background: #252a33 !important; border: 1px solid #3f444d !important; } #msg-input textarea { background: #161b22 !important; border: 1px solid #30363d !important; border-radius: 10px !important; padding: 12px !important; } .main-title h3 { font-size: 1.8em !important; font-weight: 700 !important; letter-spacing: -0.02em; } """ with gr.Blocks(css=custom_css) as demo: user_session = gr.State(None) current_room = gr.State("Main") dm_target = gr.State(None) with gr.Row(elem_classes="main-title"): gr.Markdown("### 🤗 HF Community Chat") login_btn = gr.LoginButton(scale=0, elem_classes="login-button") with gr.Row(visible=False) as chat_ui: with gr.Column(scale=3, elem_id="side-panel"): gr.Markdown("🏠 **Rooms**") room_list = gr.Radio(["Main"], value="Main", label=None) with gr.Row(): new_room_name = gr.Textbox(placeholder="New room...", label=None, show_label=False, scale=2) create_room_btn = gr.Button("Join", scale=1) gr.Markdown("✉️ **Direct Messages**") dm_user_input = gr.Textbox(placeholder="Username...", label=None, show_label=False) start_dm_btn = gr.Button("Open DM", size="sm") gr.Markdown("👥 **Online Now**") active_users_box = gr.HTML() with gr.Column(scale=9): room_title = gr.Markdown("## # Main") chat_display = gr.HTML(elem_id="chat-container") with gr.Row(): msg_input = gr.Textbox( placeholder="Type a message...", show_label=False, scale=10, elem_id="msg-input" ) send_btn = gr.Button("Send", scale=2, variant="primary") def on_load(profile: gr.OAuthProfile | None): if profile is None: return gr.update(visible=False), None username = profile.username pic = getattr(profile, 'picture', None) or "https://huggingface.co/front/assets/huggingface_logo-noborder.svg" server.active_users[username] = {"last_seen": time.time(), "pic": pic} server.user_rooms[username] = "Main" return gr.update(visible=True), {"name": username, "pic": pic} demo.load(on_load, None, [chat_ui, user_session]) def refresh_chat(session, room, dm_user): if not session: return "", "", "", gr.update() # Update user's last seen and current room server.active_users[session['name']]['last_seen'] = time.time() server.user_rooms[session['name']] = room if not dm_user else "DM" # Cleanup inactive users and empty rooms server.prune_server() # Prepare Room List (Radio) choices # If the current room was deleted while we were in it (rare), fallback to Main current_rooms = list(server.channels.keys()) active_room = room if room in current_rooms else "Main" if dm_user: dm_key = "-".join(sorted([session['name'], dm_user])) msgs = list(server.dms.get(dm_key, [])) title = f"## 💬 DM: {dm_user}" else: msgs = list(server.channels.get(active_room, [])) title = f"## # {active_room}" return ( get_messages_html(msgs), get_active_users_html(session['name']), title, gr.update(choices=current_rooms, value=active_room) ) timer = gr.Timer(1) timer.tick(refresh_chat, [user_session, current_room, dm_target], [chat_display, active_users_box, room_title, room_list]) def send_msg(text, session, room, dm_user): if not text or not session: return "" if dm_user: server.send_dm(session['name'], dm_user, text, session['pic']) else: server.broadcast(room, session['name'], text, session['pic']) return "" send_btn.click(send_msg, [msg_input, user_session, current_room, dm_target], [msg_input]) msg_input.submit(send_msg, [msg_input, user_session, current_room, dm_target], [msg_input]) def create_and_join(name, session): if not name or not session: return gr.update(), "Main", None clean_name = name.strip().replace(" ", "-") if clean_name not in server.channels: server.channels[clean_name] = deque(maxlen=50) # Update the user's room tracking immediately server.user_rooms[session['name']] = clean_name return gr.update(choices=list(server.channels.keys()), value=clean_name), clean_name, None create_room_btn.click(create_and_join, [new_room_name, user_session], [room_list, current_room, dm_target]) def on_room_change(name, session): if session: server.user_rooms[session['name']] = name return name, None room_list.change(on_room_change, [room_list, user_session], [current_room, dm_target]) def start_dm(target, session): if session: server.user_rooms[session['name']] = "DM" return "Main", target start_dm_btn.click(start_dm, [dm_user_input, user_session], [current_room, dm_target]) demo.launch()