import gradio as gr
import time
from datetime import datetime
from collections import deque
import html
# --- GLOBAL SERVER STATE (In-Memory Only) ---
class ChatServer:
    """In-memory chat state shared by all handlers in this process.

    Nothing is persisted: channels, direct-message threads and presence
    data live in plain dictionaries on the instance.

    Attributes:
        channels: room name -> bounded deque of message dicts.
        active_users: username -> {"last_seen": epoch seconds, "pic": url}.
        dms: "<name>-<name>" (the two names sorted) -> bounded deque.
        user_rooms: username -> the room that user is currently viewing.
    """

    # Seconds without a heartbeat before a user is dropped from presence.
    USER_TIMEOUT_SECONDS = 30
    # Maximum number of messages retained per channel / DM thread.
    HISTORY_LIMIT = 50
    # The channel that always exists and is never pruned.
    DEFAULT_CHANNEL = "Main"

    def __init__(self):
        self.channels = {self.DEFAULT_CHANNEL: deque(maxlen=self.HISTORY_LIMIT)}
        self.active_users = {}
        self.dms = {}
        # Tracks which room each user is currently viewing
        self.user_rooms = {}

    def prune_server(self):
        """Remove users that timed out and rooms nobody is viewing."""
        now = time.time()

        # 1. Prune users. Snapshot the items so the dict can be mutated
        #    while we walk it.
        expired_users = [
            user
            for user, data in list(self.active_users.items())
            if now - data['last_seen'] > self.USER_TIMEOUT_SECONDS
        ]
        for user in expired_users:
            # pop(..., None) tolerates the entry having already been removed.
            self.active_users.pop(user, None)
            self.user_rooms.pop(user, None)

        # 2. Prune rooms that no remaining user is viewing (never the
        #    default channel).
        occupied = set(self.user_rooms.values())
        for room_name in list(self.channels):
            if room_name != self.DEFAULT_CHANNEL and room_name not in occupied:
                self.channels.pop(room_name, None)

    def _sanitize(self, text):
        """Escape *text* for safe embedding in HTML.

        HTML metacharacters are escaped first, then newlines are turned
        into ``<br>`` so multi-line messages keep their line breaks when
        rendered.
        """
        return html.escape(text).replace("\n", "<br>")

    def _add_message_to_queue(self, queue, user, message, profile_pic):
        """Append a message to *queue*, stacking immediate repeats.

        If the newest entry in the queue has the same user and the same
        (sanitized) text, its ``count`` is incremented and its timestamp
        refreshed instead of appending a duplicate entry.
        """
        safe_text = self._sanitize(message)
        timestamp = datetime.now().strftime("%H:%M")

        latest = queue[-1] if queue else None
        if latest is not None and latest['user'] == user and latest['text'] == safe_text:
            latest['count'] = latest.get('count', 1) + 1
            latest['time'] = timestamp
            return

        queue.append({
            "user": user,
            "text": safe_text,
            "pic": profile_pic,
            "time": timestamp,
            "count": 1,
        })

    def broadcast(self, channel, user, message, profile_pic):
        """Post *message* to *channel*, creating the channel if needed."""
        if channel not in self.channels:
            self.channels[channel] = deque(maxlen=self.HISTORY_LIMIT)
        self._add_message_to_queue(self.channels[channel], user, message, profile_pic)

    def send_dm(self, sender, recipient, message, profile_pic):
        """Post *message* to the DM thread shared by *sender* and *recipient*.

        The thread key is the two usernames sorted and joined with "-", so
        both participants resolve to the same thread.
        """
        # NOTE(review): usernames that contain "-" can make two different
        # pairs share a key ("a-b"+"c" vs "a"+"b-c"). The UI's refresh
        # handler builds the key the same way, so both must change together.
        dm_key = "-".join(sorted([sender, recipient]))
        if dm_key not in self.dms:
            self.dms[dm_key] = deque(maxlen=self.HISTORY_LIMIT)
        self._add_message_to_queue(self.dms[dm_key], sender, message, profile_pic)
# Single module-level instance: every handler below reads and mutates this
# same in-memory object.
server = ChatServer()
# --- UTILS ---
def get_messages_html(messages):
    """Render a sequence of message dicts as an HTML fragment.

    Each message is expected to carry ``user``, ``text`` and ``time`` keys,
    plus optional ``pic`` (avatar URL) and ``count`` (repeat counter).
    ``text`` is inserted as-is because it is escaped when the message is
    stored; the username and avatar URL are escaped here.

    The wrapper markup is deliberately minimal: only the visible text
    content ("No messages yet.", "<user> <time>", the message body and the
    "(xN)" repeat suffix) is fixed by the rest of the application.

    Args:
        messages: iterable of message dicts; may be empty or ``None``.

    Returns:
        An HTML string, or a placeholder block when there are no messages.
    """
    if not messages:
        return '<div style="opacity:0.6; text-align:center; padding:20px;">No messages yet.</div>'

    default_pic = "https://huggingface.co/front/assets/huggingface_logo-noborder.svg"
    rows = []
    for m in messages:
        pic_url = html.escape(m.get('pic') or default_pic, quote=True)
        author = html.escape(str(m['user']))
        sent_at = m['time']
        display_text = m['text']
        count = m.get('count', 1)
        if count > 1:
            # Repeat counter for stacked identical messages.
            display_text += f' <span style="opacity:0.6;">(x{count})</span>'
        rows.append(
            '<div style="display:flex; gap:10px; margin-bottom:12px;">'
            f'<img src="{pic_url}" alt="" style="width:32px; height:32px; border-radius:50%;">'
            '<div>'
            f'<div><b>{author}</b> <span style="opacity:0.6; font-size:0.8em;">{sent_at}</span></div>'
            f'<div>{display_text}</div>'
            '</div>'
            '</div>'
        )
    # Join once instead of growing a string inside the loop.
    return "<div>" + "\n".join(rows) + "</div>"
def get_active_users_html(current_user):
    """Render the list of currently active users as an HTML fragment.

    Prunes the shared server state first so that timed-out users are not
    listed. The entry whose name equals *current_user* gets a " (You)"
    suffix. Usernames and avatar URLs are HTML-escaped before being
    embedded; the wrapper markup itself is deliberately minimal.

    Args:
        current_user: username of the viewer, used only for the suffix.

    Returns:
        An HTML string with one row per active user.
    """
    server.prune_server()

    default_pic = "https://huggingface.co/front/assets/huggingface_logo-noborder.svg"
    rows = []
    # Snapshot the items so a concurrent change to the presence table
    # cannot invalidate the iteration.
    for user, data in list(server.active_users.items()):
        pic = html.escape(data.get('pic') or default_pic, quote=True)
        label = html.escape(f"{user}") + (" (You)" if user == current_user else "")
        rows.append(
            '<div style="display:flex; align-items:center; gap:8px; margin-bottom:6px;">'
            f'<img src="{pic}" alt="" style="width:24px; height:24px; border-radius:50%;">'
            f'<span>{label}</span>'
            '</div>'
        )
    return "<div>" + "\n".join(rows) + "</div>"
# --- CSS ---
# Stylesheet passed to gr.Blocks(css=...). The #chat-container, #side-panel
# and #msg-input selectors match elem_id values, and .main-title /
# .login-button match elem_classes values, assigned in the layout below.
custom_css = """
@import url('https://fonts.googleapis.com/css2?family=Inter:wght@400;600;700&display=swap');
body, .gradio-container, .main-title, #chat-container, #side-panel, textarea, button {
font-family: 'Inter', -apple-system, BlinkMacSystemFont, "Segoe UI", Helvetica, Arial, sans-serif !important;
}
#chat-container {
height: 60vh !important;
overflow-y: auto !important;
border: 1px solid rgba(255,255,255,0.1) !important;
padding: 15px;
border-radius: 12px;
background: #0f1117 !important;
}
#side-panel {
border-right: 1px solid rgba(255,255,255,0.05);
padding-right: 15px !important;
}
.login-button {
padding: 6px 16px !important;
font-size: 0.85em !important;
border-radius: 30px !important;
background: #252a33 !important;
border: 1px solid #3f444d !important;
}
#msg-input textarea {
background: #161b22 !important;
border: 1px solid #30363d !important;
border-radius: 10px !important;
padding: 12px !important;
}
.main-title h3 {
font-size: 1.8em !important;
font-weight: 700 !important;
letter-spacing: -0.02em;
}
"""
with gr.Blocks(css=custom_css) as demo:
    # --- Per-session state ---
    user_session = gr.State(None)    # {"name": ..., "pic": ...} once logged in
    current_room = gr.State("Main")  # room whose messages are shown
    dm_target = gr.State(None)       # username of the open DM, or None

    # --- Layout ---
    with gr.Row(elem_classes="main-title"):
        gr.Markdown("### 🤗 HF Community Chat")
        login_btn = gr.LoginButton(scale=0, elem_classes="login-button")

    # Hidden until on_load confirms an authenticated profile.
    with gr.Row(visible=False) as chat_ui:
        with gr.Column(scale=3, elem_id="side-panel"):
            gr.Markdown("🏠 **Rooms**")
            room_list = gr.Radio(["Main"], value="Main", label=None)
            with gr.Row():
                new_room_name = gr.Textbox(placeholder="New room...", label=None, show_label=False, scale=2)
                create_room_btn = gr.Button("Join", scale=1)
            gr.Markdown("✉️ **Direct Messages**")
            dm_user_input = gr.Textbox(placeholder="Username...", label=None, show_label=False)
            start_dm_btn = gr.Button("Open DM", size="sm")
            gr.Markdown("👥 **Online Now**")
            active_users_box = gr.HTML()
        with gr.Column(scale=9):
            room_title = gr.Markdown("## # Main")
            chat_display = gr.HTML(elem_id="chat-container")
            with gr.Row():
                msg_input = gr.Textbox(
                    placeholder="Type a message...",
                    show_label=False,
                    scale=10,
                    elem_id="msg-input"
                )
                send_btn = gr.Button("Send", scale=2, variant="primary")

    # --- Handlers ---
    def on_load(profile: gr.OAuthProfile | None):
        """Register the logged-in user and reveal the chat UI.

        Returns a visibility update for the chat row and the session dict
        (or ``None`` when nobody is logged in).
        """
        if profile is None:
            return gr.update(visible=False), None
        username = profile.username
        pic = getattr(profile, 'picture', None) or "https://huggingface.co/front/assets/huggingface_logo-noborder.svg"
        server.active_users[username] = {"last_seen": time.time(), "pic": pic}
        server.user_rooms[username] = "Main"
        return gr.update(visible=True), {"name": username, "pic": pic}

    demo.load(on_load, None, [chat_ui, user_session])

    def refresh_chat(session, room, dm_user):
        """Heartbeat + redraw, run on every timer tick.

        Returns the message pane HTML, the online-users HTML, the title
        markdown, an update for the room radio, and the room name that
        should be stored in ``current_room``.
        """
        if not session:
            return "", "", "", gr.update(), room

        name = session['name']
        # Write the whole presence record rather than indexing into it: if
        # this user was pruned (e.g. the tab stopped ticking for more than
        # the timeout) the entry no longer exists, and indexing would raise
        # KeyError on every subsequent tick.
        server.active_users[name] = {"last_seen": time.time(), "pic": session['pic']}
        # None marks "viewing a DM". It cannot collide with a room name,
        # unlike a string marker such as "DM".
        server.user_rooms[name] = None if dm_user else room

        # Cleanup inactive users and empty rooms
        server.prune_server()

        # If the current room no longer exists, fall back to Main.
        current_rooms = list(server.channels.keys())
        active_room = room if room in current_rooms else "Main"

        if dm_user:
            dm_key = "-".join(sorted([name, dm_user]))
            msgs = list(server.dms.get(dm_key, []))
            title = f"## 💬 DM: {dm_user}"
        else:
            msgs = list(server.channels.get(active_room, []))
            title = f"## # {active_room}"

        return (
            get_messages_html(msgs),
            get_active_users_html(name),
            title,
            gr.update(choices=current_rooms, value=active_room),
            # Keep the state in step with the radio when we fell back to Main.
            active_room,
        )

    timer = gr.Timer(1)
    timer.tick(
        refresh_chat,
        [user_session, current_room, dm_target],
        [chat_display, active_users_box, room_title, room_list, current_room],
    )

    def send_msg(text, session, room, dm_user):
        """Send *text* to the open DM or the current room; clears the input."""
        # Ignore empty and whitespace-only submissions.
        if not session or not text or not text.strip():
            return ""
        if dm_user:
            server.send_dm(session['name'], dm_user, text, session['pic'])
        else:
            server.broadcast(room, session['name'], text, session['pic'])
        return ""

    send_btn.click(send_msg, [msg_input, user_session, current_room, dm_target], [msg_input])
    msg_input.submit(send_msg, [msg_input, user_session, current_room, dm_target], [msg_input])

    def create_and_join(name, session, room, dm_user):
        """Create the named room if needed and switch the user into it.

        Spaces inside the name become "-". A blank name (or a missing
        session) is a no-op that leaves the current room and DM untouched.
        """
        clean_name = (name or "").strip().replace(" ", "-")
        if not clean_name or not session:
            return gr.update(), room, dm_user
        if clean_name not in server.channels:
            server.channels[clean_name] = deque(maxlen=50)
        # Update the user's room tracking immediately so the new room is
        # not pruned before the next heartbeat.
        server.user_rooms[session['name']] = clean_name
        return gr.update(choices=list(server.channels.keys()), value=clean_name), clean_name, None

    create_room_btn.click(
        create_and_join,
        [new_room_name, user_session, current_room, dm_target],
        [room_list, current_room, dm_target],
    )

    def on_room_change(name, session):
        """Switch to the room the user picked and close any open DM."""
        if session:
            server.user_rooms[session['name']] = name
        return name, None

    # Use .input (user interaction only) rather than .change: the timer
    # rewrites the radio value every tick, and .change also fires on those
    # programmatic updates, which reset dm_target and closed a DM that had
    # been opened from a non-Main room.
    # NOTE(review): while a DM is open the radio still shows a selected
    # room; re-clicking that same option may not emit an event — confirm.
    room_list.input(on_room_change, [room_list, user_session], [current_room, dm_target])

    def start_dm(target, session, room, dm_user):
        """Open a DM with *target*; a blank target or no session is a no-op."""
        recipient = (target or "").strip()
        if not session or not recipient:
            return room, dm_user
        server.user_rooms[session['name']] = None
        return "Main", recipient

    start_dm_btn.click(
        start_dm,
        [dm_user_input, user_session, current_room, dm_target],
        [current_room, dm_target],
    )

demo.launch()