| import gradio as gr |
| import time |
| from datetime import datetime |
| from collections import deque |
| import html |
|
|
| |
class ChatServer:
    """In-memory state for the chat app: channels, DM threads and presence.

    Everything lives in plain dicts and bounded deques on the instance;
    nothing is persisted.

    Attributes (all public, read and written directly by the UI handlers):
        channels: room name -> deque of message dicts.
        active_users: username -> {"last_seen": epoch seconds, "pic": url}.
        dms: DM key -> deque of message dicts.
        user_rooms: username -> name of the room the user is currently in.
    """

    # The channel that always exists and is never pruned.
    DEFAULT_CHANNEL = "Main"

    def __init__(self, history_size=50, user_timeout=30):
        """Create a server that holds only the default channel.

        Args:
            history_size: Maximum number of messages kept per channel or DM
                thread; the oldest message is dropped when the limit is hit.
            user_timeout: Seconds since ``last_seen`` after which
                ``prune_server`` removes a user.
        """
        self.history_size = history_size
        self.user_timeout = user_timeout

        self.channels = {self.DEFAULT_CHANNEL: self._new_queue()}
        self.active_users = {}
        self.dms = {}

        self.user_rooms = {}

    def _new_queue(self):
        """Return an empty, bounded message history."""
        return deque(maxlen=self.history_size)

    def prune_server(self):
        """Remove inactive users and empty rooms."""
        now = time.time()

        # Iterate over snapshots: the dicts are shared module-wide state, and
        # an insert from another handler while a live view is being iterated
        # would raise "dictionary changed size during iteration".
        expired_users = [
            name
            for name, data in list(self.active_users.items())
            if now - data['last_seen'] > self.user_timeout
        ]
        for name in expired_users:
            self.active_users.pop(name, None)
            self.user_rooms.pop(name, None)

        # A room survives only while at least one remaining user is in it.
        occupied = set(self.user_rooms.values())
        for room_name in list(self.channels):
            if room_name != self.DEFAULT_CHANNEL and room_name not in occupied:
                self.channels.pop(room_name, None)

    def _sanitize(self, text):
        """Prevents HTML injection by escaping user input.

        Newlines become ``<br>`` after escaping so multi-line messages keep
        their line breaks. ``None`` yields an empty string and non-string
        values are converted with ``str`` instead of raising.
        """
        if text is None:
            return ""
        escaped = html.escape(str(text))
        return escaped.replace("\n", "<br>")

    def _add_message_to_queue(self, queue, user, message, profile_pic):
        """Append a message to ``queue``, collapsing immediate repeats.

        If the newest entry in ``queue`` has the same user and the same
        (escaped) text, its ``count`` is incremented and its ``time``
        refreshed instead of adding a new entry.

        Note that only the message text is escaped here; ``user`` and
        ``profile_pic`` are stored exactly as given.
        """
        safe_text = self._sanitize(message)
        stamp = datetime.now().strftime("%H:%M")

        last = queue[-1] if queue else None
        if last is not None and last['user'] == user and last['text'] == safe_text:
            last['count'] = last.get('count', 1) + 1
            last['time'] = stamp
            return

        queue.append({
            "user": user,
            "text": safe_text,
            "pic": profile_pic,
            "time": stamp,
            "count": 1,
        })

    def broadcast(self, channel, user, message, profile_pic):
        """Post ``message`` to ``channel``, creating the channel if needed."""
        if channel not in self.channels:
            self.channels[channel] = self._new_queue()
        self._add_message_to_queue(self.channels[channel], user, message, profile_pic)

    def send_dm(self, sender, recipient, message, profile_pic):
        """Post ``message`` to the DM thread shared by the two users.

        The thread key is the two names sorted and joined with "-", so both
        directions of a conversation land in the same deque.
        """
        # NOTE(review): names that themselves contain "-" can collide
        # ("a-b" + "c" and "a" + "b-c" both give "a-b-c"). The format is
        # kept because code outside this class rebuilds the same key to read
        # the thread; change both together.
        dm_key = "-".join(sorted([sender, recipient]))
        if dm_key not in self.dms:
            self.dms[dm_key] = self._new_queue()
        self._add_message_to_queue(self.dms[dm_key], sender, message, profile_pic)
|
|
# Single shared ChatServer instance; the rendering helpers and UI event
# handlers below all read and mutate this one object.
server = ChatServer()
|
|
| |
|
|
def get_messages_html(messages):
    """Render stored chat messages as a single HTML fragment.

    Args:
        messages: Iterable of message dicts with keys ``user``, ``text``,
            ``time`` and optionally ``pic`` and ``count``. ``text`` is
            inserted as-is, so it must already be HTML-escaped; every other
            field is escaped here.

    Returns:
        An HTML string: a placeholder when ``messages`` is empty, otherwise
        one row per message followed by a small auto-scroll script.
    """
    if not messages:
        return "<div style='color: gray; text-align: center; margin-top: 20px;'>No messages yet.</div>"

    default_pic = "https://huggingface.co/front/assets/huggingface_logo-noborder.svg"
    parts = [
        "<div id='chat-flow' style='display: flex; flex-direction: column; gap: 12px; font-family: sans-serif;'>"
    ]
    for m in messages:
        # The avatar URL lands inside a double-quoted attribute and the name
        # and time inside element content; escape all of them so a crafted
        # value cannot break out into markup.
        pic_url = html.escape(str(m.get('pic') or default_pic), quote=True)
        user = html.escape(str(m['user']))
        stamp = html.escape(str(m['time']))

        display_text = m['text']
        if m.get('count', 1) > 1:
            display_text += f" <span style='color: #888; font-size: 0.8em; font-weight: normal;'>(x{m['count']})</span>"

        parts.append(f"""
        <div class="msg-row" style="display: flex; align-items: flex-start; gap: 10px; margin-bottom: 4px;">
            <img src="{pic_url}" style="width: 36px; height: 36px; border-radius: 50%; border: 1px solid rgba(255,255,255,0.1); flex-shrink: 0;">
            <div style="background: rgba(255, 255, 255, 0.07); padding: 10px 14px; border-radius: 0 16px 16px 16px; max-width: 85%;">
                <div style="font-weight: bold; font-size: 0.85em; margin-bottom: 4px; color: #ff9d00; display: flex; align-items: center; gap: 8px;">
                    {user} <span style="font-weight: normal; color: #888; font-size: 0.85em;">{stamp}</span>
                </div>
                <div style="font-size: 0.95em; line-height: 1.4; color: #eee; word-break: break-word;">
                    {display_text}
                </div>
            </div>
        </div>
        """)

    # NOTE(review): browsers generally do not execute <script> elements that
    # are inserted via innerHTML, so this auto-scroll may be inert depending
    # on how the HTML component mounts its content -- confirm in the running
    # app.
    parts.append("""
    </div>
    <script>
        (function() {
            const container = document.getElementById('chat-container');
            if (container) {
                container.scrollTo({ top: container.scrollHeight, behavior: 'smooth' });
            }
        })();
    </script>
    """)
    return "".join(parts)
|
|
def get_active_users_html(current_user):
    """Render the list of currently active users as an HTML fragment.

    Prunes the shared ``server`` first, then emits one row per remaining
    entry in ``server.active_users``. The row whose name equals
    ``current_user`` is suffixed with " (You)".

    Args:
        current_user: Username of the viewer.

    Returns:
        An HTML string containing one flex row per active user.
    """
    server.prune_server()

    default_pic = "https://huggingface.co/front/assets/huggingface_logo-noborder.svg"
    rows = ["<div style='display: flex; flex-direction: column; gap: 8px; font-family: sans-serif;'>"]

    # Snapshot the dict so an insert from another handler during rendering
    # cannot invalidate the iteration.
    for user, data in list(server.active_users.items()):
        # Name and avatar URL are interpolated into markup; escape both so
        # they cannot inject tags or break out of the src attribute.
        pic = html.escape(str(data.get('pic') or default_pic), quote=True)
        label = html.escape(str(user)) + (" (You)" if user == current_user else "")
        rows.append(f"""
        <div style="display: flex; align-items: center; gap: 10px; font-size: 0.85em; padding: 4px;">
            <img src="{pic}" style="width: 22px; height: 22px; border-radius: 50%;">
            <span style="color: #ccc;">{label}</span>
            <div style="width: 8px; height: 8px; background: #4ade80; border-radius: 50%; margin-left: auto;"></div>
        </div>
        """)

    rows.append("</div>")
    return "".join(rows)
|
|
| |
# Stylesheet handed to gr.Blocks(css=...). The selectors target the elem_id /
# elem_classes values assigned in the layout: #chat-container, #side-panel,
# #msg-input, .login-button and .main-title.
custom_css = """
@import url('https://fonts.googleapis.com/css2?family=Inter:wght@400;600;700&display=swap');

body, .gradio-container, .main-title, #chat-container, #side-panel, textarea, button {
font-family: 'Inter', -apple-system, BlinkMacSystemFont, "Segoe UI", Helvetica, Arial, sans-serif !important;
}

#chat-container {
height: 60vh !important;
overflow-y: auto !important;
border: 1px solid rgba(255,255,255,0.1) !important;
padding: 15px;
border-radius: 12px;
background: #0f1117 !important;
}

#side-panel {
border-right: 1px solid rgba(255,255,255,0.05);
padding-right: 15px !important;
}

.login-button {
padding: 6px 16px !important;
font-size: 0.85em !important;
border-radius: 30px !important;
background: #252a33 !important;
border: 1px solid #3f444d !important;
}

#msg-input textarea {
background: #161b22 !important;
border: 1px solid #30363d !important;
border-radius: 10px !important;
padding: 12px !important;
}

.main-title h3 {
font-size: 1.8em !important;
font-weight: 700 !important;
letter-spacing: -0.02em;
}
"""
|
|
# Build the Gradio UI and wire its events. All chat data lives in the shared
# `server` object; the gr.State components below hold only per-session
# selections (who is logged in, which room or DM is open).
with gr.Blocks(css=custom_css) as demo:
    user_session = gr.State(None)     # {"name": ..., "pic": ...} once logged in
    current_room = gr.State("Main")   # room whose history is shown and posted to
    dm_target = gr.State(None)        # username of the open DM, or None

    with gr.Row(elem_classes="main-title"):
        gr.Markdown("### 🤗 HF Community Chat")
        login_btn = gr.LoginButton(scale=0, elem_classes="login-button")

    # Hidden until on_load sees an authenticated profile.
    with gr.Row(visible=False) as chat_ui:
        with gr.Column(scale=3, elem_id="side-panel"):
            gr.Markdown("🏠 **Rooms**")
            room_list = gr.Radio(["Main"], value="Main", label=None)

            with gr.Row():
                new_room_name = gr.Textbox(placeholder="New room...", label=None, show_label=False, scale=2)
                create_room_btn = gr.Button("Join", scale=1)

            gr.Markdown("✉️ **Direct Messages**")
            dm_user_input = gr.Textbox(placeholder="Username...", label=None, show_label=False)
            start_dm_btn = gr.Button("Open DM", size="sm")

            gr.Markdown("👥 **Online Now**")
            active_users_box = gr.HTML()

        with gr.Column(scale=9):
            room_title = gr.Markdown("## # Main")
            chat_display = gr.HTML(elem_id="chat-container")

            with gr.Row():
                msg_input = gr.Textbox(
                    placeholder="Type a message...",
                    show_label=False,
                    scale=10,
                    elem_id="msg-input",
                )
                send_btn = gr.Button("Send", scale=2, variant="primary")

    def on_load(profile: gr.OAuthProfile | None):
        """Register the logged-in user and reveal the chat UI.

        Returns a visibility update for ``chat_ui`` and the session dict
        (``None`` when nobody is logged in).
        """
        if profile is None:
            return gr.update(visible=False), None
        username = profile.username
        pic = getattr(profile, 'picture', None) or "https://huggingface.co/front/assets/huggingface_logo-noborder.svg"
        server.active_users[username] = {"last_seen": time.time(), "pic": pic}
        server.user_rooms[username] = "Main"
        return gr.update(visible=True), {"name": username, "pic": pic}

    demo.load(on_load, None, [chat_ui, user_session])

    def refresh_chat(session, room, dm_user):
        """Heartbeat and re-render, run on every timer tick.

        Returns the chat HTML, the online-users HTML, the title markdown, an
        update for the room radio, and the room name to store back into
        ``current_room``.
        """
        if not session:
            return "", "", "", gr.update(), room

        name = session['name']

        # Write the whole presence entry instead of indexing into it: if this
        # user was pruned (for example the tab was throttled for longer than
        # the timeout) the old entry no longer exists and an in-place update
        # would raise KeyError on every subsequent tick.
        server.active_users[name] = {"last_seen": time.time(), "pic": session.get('pic')}
        server.user_rooms[name] = room if not dm_user else "DM"

        server.prune_server()

        current_rooms = list(server.channels.keys())
        # Fall back to Main if the remembered room no longer exists.
        active_room = room if room in current_rooms else "Main"

        if dm_user:
            dm_key = "-".join(sorted([name, dm_user]))
            msgs = list(server.dms.get(dm_key, []))
            title = f"## 💬 DM: {dm_user}"
        else:
            msgs = list(server.channels.get(active_room, []))
            title = f"## # {active_room}"

        # While a DM is open no room is shown as selected, so clicking any
        # room (including the one the user came from) registers as user input
        # and leaves the DM.
        selected = None if dm_user else active_room

        return (
            get_messages_html(msgs),
            get_active_users_html(name),
            title,
            gr.update(choices=current_rooms, value=selected),
            active_room,
        )

    timer = gr.Timer(1)
    # current_room is also an output so that the fallback to "Main" above is
    # written back to the session state, not just shown in the radio.
    timer.tick(
        refresh_chat,
        [user_session, current_room, dm_target],
        [chat_display, active_users_box, room_title, room_list, current_room],
    )

    def send_msg(text, session, room, dm_user):
        """Post the typed message to the open DM or room; always clears the box."""
        # Whitespace-only input would render as an empty bubble; skip it.
        if not text or not text.strip() or not session:
            return ""
        if dm_user:
            server.send_dm(session['name'], dm_user, text, session['pic'])
        else:
            server.broadcast(room, session['name'], text, session['pic'])
        return ""

    send_btn.click(send_msg, [msg_input, user_session, current_room, dm_target], [msg_input])
    msg_input.submit(send_msg, [msg_input, user_session, current_room, dm_target], [msg_input])

    def create_and_join(name, session):
        """Create the named room if needed and move the user into it.

        Spaces in the name are replaced with "-". A missing session or a
        blank name falls back to the Main room, as before.
        """
        # Strip before the emptiness check so "   " does not create a room
        # whose name is the empty string.
        clean_name = (name or "").strip().replace(" ", "-")
        if not clean_name or not session:
            return gr.update(), "Main", None

        # NOTE(review): "DM" doubles as the presence marker written to
        # server.user_rooms for users inside a DM, so a room literally named
        # "DM" is kept alive by them -- consider reserving that name.
        if clean_name not in server.channels:
            server.channels[clean_name] = deque(maxlen=50)

        server.user_rooms[session['name']] = clean_name
        return gr.update(choices=list(server.channels.keys()), value=clean_name), clean_name, None

    create_room_btn.click(create_and_join, [new_room_name, user_session], [room_list, current_room, dm_target])

    def on_room_change(name, session):
        """Switch to the room the user picked and close any open DM."""
        if session:
            server.user_rooms[session['name']] = name
        return name, None

    # Use .input rather than .change: .change also fires when the timer
    # updates the radio programmatically, which closed a freshly opened DM
    # whenever the radio's value moved (e.g. a DM opened from a non-Main room).
    room_list.input(on_room_change, [room_list, user_session], [current_room, dm_target])

    def start_dm(target, session):
        """Open a DM with ``target``; a blank target just shows Main."""
        # Strip so that "bob " and "bob" map to the same DM thread key.
        target = (target or "").strip()
        if not session or not target:
            return "Main", None
        server.user_rooms[session['name']] = "DM"
        return "Main", target

    start_dm_btn.click(start_dm, [dm_user_input, user_session], [current_room, dm_target])


demo.launch()