Arviano's picture
chore: hide analytics panel
628b871 verified
from datetime import datetime, timezone
from pathlib import Path

import gradio as gr
from langchain_chroma import Chroma
from sqlalchemy import func, and_

from src.db import SessionLocal, init_db, User, Conversation, Message
from src.rag import RAG
from src.util import prepare_chroma_dir, debug_list_collections, print_tree
# Prepare Chroma DB
# prepare_chroma_dir() supplies the directory that is handed to RAG as db_dir below.
chroma_dir = prepare_chroma_dir()
# Start-up diagnostics from src.util; presumably they print the available
# collections and a file tree — TODO confirm against src.util.
debug_list_collections(chroma_dir)
print_tree()
# Single module-level RAG instance; send_message() calls rag.respond() on it.
rag = RAG(db_dir=chroma_dir)
# DB for chat history
init_db()
# Register "img" as a Gradio static path; the chatbot avatar images live under img/.
gr.set_static_paths(paths=["img"])
def _trim(txt, n=200):
if txt is None: return None
return txt if len(txt) <= n else txt[:n] + "…"
def analytics_snapshot(selected_email=None, limit=100):
    """Collect the stored chat messages of one user, grouped by conversation.

    Args:
        selected_email: Email of the user to inspect. When falsy, only the
            list of known emails is returned.
        limit: Maximum number of messages fetched (oldest first).

    Returns:
        A dict that always contains ``available_emails``. Without a selected
        email, ``messages`` is an empty list. With one, the dict also carries
        ``selected_email``, ``conversation_count`` and ``messages`` as a
        mapping of conversation id (as ``str``) to a list of
        ``{"role", "content", "created_at"}`` entries.
    """
    with SessionLocal() as db:
        # Every known email, used to populate the dropdown.
        email_rows = db.query(User.email).all()
        known_emails = [row.email for row in email_rows]

        if not selected_email:
            return {"available_emails": known_emails, "messages": []}

        # Messages of the selected user, oldest first, capped at `limit`.
        message_query = (
            db.query(
                Conversation.id.label("conversation_id"),
                Message.role,
                Message.content,
                Message.created_at,
            )
            .join(Conversation, Conversation.id == Message.conversation_id)
            .join(User, User.id == Conversation.user_id)
            .filter(User.email == selected_email)
            .order_by(Message.created_at.asc())
            .limit(limit)
        )
        rows = message_query.all()

        # Bucket the rows by the conversation they belong to.
        by_conversation = {}
        for row in rows:
            stamp = row.created_at.isoformat() if row.created_at else None
            entry = {
                "role": row.role,
                "content": row.content,
                "created_at": stamp,
            }
            if row.conversation_id not in by_conversation:
                by_conversation[row.conversation_id] = []
            by_conversation[row.conversation_id].append(entry)

        serialised = {}
        for conversation_id, entries in by_conversation.items():
            serialised[str(conversation_id)] = entries

        return {
            "available_emails": known_emails,
            "selected_email": selected_email,
            "conversation_count": len(by_conversation),
            "messages": serialised,
        }
# Gradio components
# JSON viewer for the sources returned by the RAG call. It is created outside
# the Blocks context and used as an event output further down.
# NOTE(review): nothing in this file renders it (the only render() call is
# commented out) — confirm Gradio accepts an unrendered component as an output.
sources_box = gr.JSON(label="Sources")
def get_or_create_user(name: str, email: str, role: str) -> int:
    """Return the id of the user with *email*, creating the row if needed.

    For an existing user, a non-empty *name* or *role* that differs from the
    stored value overwrites it.

    Args:
        name: Display name to store.
        email: Email used to look the user up.
        role: Role label to store.

    Returns:
        The id of the found or newly created user.
    """
    with SessionLocal() as db:
        existing = db.query(User).filter_by(email=email).first()

        if existing:
            dirty = False
            if name and existing.name != name:
                existing.name = name
                dirty = True
            if role and existing.role != role:
                existing.role = role
                dirty = True
            # Only hit the database when something actually changed.
            if dirty:
                db.commit()
            return existing.id

        created = User(email=email, name=name, role=role)
        db.add(created)
        db.commit()
        db.refresh(created)
        return created.id
def start_conversation(user_id: int, title: str | None = None) -> int:
    """Create a new conversation for *user_id* and return its id.

    Args:
        user_id: Owner of the conversation.
        title: Conversation title. When empty or ``None`` a default of the
            form ``"Session YYYY-MM-DD HH:MM"`` (UTC) is used.

    Returns:
        The id of the newly stored conversation.
    """
    with SessionLocal() as session:
        if not title:
            # datetime.utcnow() is deprecated since Python 3.12; an aware UTC
            # timestamp formats to exactly the same text.
            title = f"Session {datetime.now(timezone.utc):%Y-%m-%d %H:%M}"
        conv = Conversation(user_id=user_id, title=title)
        session.add(conv)
        session.commit()
        # Read the id while the session is still open.
        return conv.id
def build_greeting(name, role):
    """Build the assistant's opening chat message.

    Args:
        name: Name inserted into the greeting.
        role: Role label; ``"pelatih"`` (any casing) selects the coach
            greeting, anything else (including ``None``) the default one.

    Returns:
        A chat message dict ``{"role": "assistant", "content": <greeting>}``.
    """
    # `role` can be None when the session state was never filled by a login;
    # treat that like any non-coach role instead of raising AttributeError.
    is_coach = (role or "").lower() == "pelatih"

    if is_coach:
        content = (
            f"Halo Coach {name}! Aku CoachUp, asisten pelatih mental.\n"
            "Siap bantu para atlet hari ini?"
        )
    else:
        content = (
            f"Halo {name}! 👋 Senang bertemu denganmu.\n"
            "Aku CoachUp, asisten pelatih mental yang siap mendukungmu hari ini.\n"
            "Apa yang ingin kamu bicarakan atau latih hari ini?"
        )
    return {"role": "assistant", "content": content}
def save_message(conversation_id: int, role: str, content: str):
    """Store one chat message in the given conversation.

    Args:
        conversation_id: Conversation the message belongs to.
        role: Speaker of the message (the callers pass ``"user"`` or
            ``"assistant"``).
        content: Message text.
    """
    record = Message(conversation_id=conversation_id, role=role, content=content)
    with SessionLocal() as db:
        db.add(record)
        db.commit()
def get_latest_conversation(user_id):
    """Return the id of the user's most recently created conversation.

    Args:
        user_id: Owner whose conversations are searched.

    Returns:
        The conversation id, or ``None`` when the user has no conversation.
    """
    with SessionLocal() as db:
        newest_first = (
            db.query(Conversation)
            .filter_by(user_id=user_id)
            .order_by(Conversation.created_at.desc())
        )
        latest = newest_first.first()
        if not latest:
            return None
        return latest.id
def load_history(conversation_id: int):
    """Load a conversation's messages in chronological order.

    Args:
        conversation_id: Conversation whose messages are fetched.

    Returns:
        A list of ``{"role": ..., "content": ...}`` dicts, the shape expected
        by ``gr.Chatbot(type="messages")``.
    """
    with SessionLocal() as db:
        ordered = (
            db.query(Message)
            .filter_by(conversation_id=conversation_id)
            .order_by(Message.created_at.asc())
        )
        history = []
        for message in ordered.all():
            history.append({"role": message.role, "content": message.content})
        return history
def login_user(name, email, age, sport_category, role, session_mode_value, state):
    """Handle the login form: resolve the user, pick a conversation, open the chat.

    Args:
        name: Name typed into the form.
        email: Email typed into the form; used to look up or create the user.
        age: Age from the form (stored in the session state only).
        sport_category: Selected sport category (session state only).
        role: Selected role; stored on the user and used for the greeting.
        session_mode_value: ``"Lanjutkan perbincangan terakhir"`` resumes the
            latest conversation if one exists; any other value starts a new one.
        state: Current session-state dict; returned unchanged on validation
            failure and replaced by a fresh dict on success.

    Returns:
        A 6-tuple in the order the login button's outputs are wired:
        login message update, login-section update, chat-section update,
        chatbot update, chat history list, session state.
    """
    # Whitespace-only input must not count as a filled-in field.
    name = (name or "").strip()
    email = (email or "").strip()
    if not name or not email:
        return (
            gr.update(value="⚠ Tolong isi data secara lengkap!", visible=True),
            gr.update(visible=True),
            gr.update(visible=False),
            gr.update(),
            [],
            state,
        )

    user_id = get_or_create_user(name, email, role)

    conv_id = None
    prev_messages = []
    if session_mode_value == "Lanjutkan perbincangan terakhir":
        conv_id = get_latest_conversation(user_id)
        if conv_id is not None:
            prev_messages = load_history(conv_id)

    # Either a new chat was requested or there is nothing to resume.
    if conv_id is None:
        # A space separates name and timestamp (they were previously fused);
        # the aware UTC timestamp replaces the deprecated datetime.utcnow().
        title = f"{name} {datetime.now(timezone.utc):%Y-%m-%d %H:%M}"
        conv_id = start_conversation(user_id, title=title)

    state = {
        "user_id": user_id,
        "conversation_id": conv_id,
        "name": name,
        "age": age,
        "sport_category": sport_category,
        "role": role,
    }

    # An empty conversation (new, or resumed without messages) opens with a
    # stored greeting so the chat is never blank.
    if not prev_messages:
        greeting_message = build_greeting(name, role)
        prev_messages = [greeting_message]
        save_message(conv_id, "assistant", greeting_message["content"])

    return (
        gr.update(value=f"✅ Welcome, {name}!", visible=True),
        gr.update(visible=False),
        gr.update(visible=True),
        gr.update(value=prev_messages),
        prev_messages,
        state,
    )
# =========== CHAT UI ===========
def send_message(user_text, msgs, state):
    """Manual chat handler: store the user's message, ask the RAG, update the UI.

    Args:
        user_text: Text from the input box.
        msgs: Current chat history as a list of ``{"role", "content"}`` dicts
            (may be ``None``/empty).
        state: Session-state dict; ``user_id``/``conversation_id`` are filled
            in here when no conversation exists yet.

    Returns:
        A 5-tuple in the order the send outputs are wired: chatbot update,
        chat history, sources, textbox update (cleared), session state.
    """
    # Ignore empty or whitespace-only submissions; just clear the textbox.
    if not (user_text and user_text.strip()):
        return gr.update(), msgs, None, gr.update(value=""), state

    conv_id = state.get("conversation_id")
    if not conv_id:
        # No conversation yet: attach the chat to the shared anonymous user.
        # The existing helpers do what the former inline session code did,
        # and no database session is opened when a conversation exists.
        anon_id = get_or_create_user("Anonymous", "anon@example.com", "Other")
        conv_id = start_conversation(anon_id)
        state["user_id"] = anon_id
        state["conversation_id"] = conv_id

    # User's query
    save_message(conv_id, "user", user_text)
    msgs = (msgs or []) + [{"role": "user", "content": user_text}]

    user_profile = {
        "nama": state.get("name"),
        "umur": state.get("age"),
        "cabang olahraga": state.get("sport_category"),
        "role": state.get("role"),
    }

    # Response; the history passed along already contains the new user message.
    reply_text, unique_sources = rag.respond(user_text, msgs, user_profile)
    save_message(conv_id, "assistant", reply_text)
    msgs = msgs + [{"role": "assistant", "content": reply_text}]

    # Update chat, history state and sources, and clear the textbox.
    return gr.update(value=msgs), msgs, unique_sources, gr.update(value=""), state
def new_chat(state):
    """Start a fresh conversation for the current user and show a greeting.

    Args:
        state: Session-state dict; its ``conversation_id`` is replaced with
            the id of the new conversation.

    Returns:
        A 4-tuple in the order the button's outputs are wired: chatbot
        update, chat history, sources update, session state. Without a
        ``user_id`` in *state* nothing is created and the history is reset
        to an empty list.
    """
    user_id = state.get("user_id")
    if not user_id:
        return gr.update(), [], gr.update(value=[]), state

    # start_conversation()'s default title is the same "Session <UTC time>"
    # text that used to be built here, and it opens its own session, so the
    # previously unused SessionLocal() context is gone.
    conv_id = start_conversation(user_id)
    state["conversation_id"] = conv_id

    # The role may be None for an anonymous session; pass "" so the greeting
    # builder falls back to its default branch.
    greet = build_greeting(state.get("name"), state.get("role") or "")
    save_message(conv_id, "assistant", greet["content"])

    start_msgs = [greet]
    return gr.update(value=start_msgs), start_msgs, gr.update(value=[]), state
# Top-level Gradio layout: a login form that login_user() swaps for the chat
# view, plus the event wiring for the login, send and new-chat actions.
with gr.Blocks(fill_height=True, fill_width=True,
               css="""
/* optional fine-tuning */
#chat-wrap { max-width: 1100px; margin: 0 auto; } /* center & max width */
#chat { height: 640px; } /* explicit height if you prefer CSS */
/* hide Hugging Face overlay header */
#huggingface-space-header { display: none !important; }
/* Remove unnecessary padding and grey background */
.gradio-container .wrap.svelte-1ipelgc {
padding: 0 !important;
}
/* Slimmer textbox */
#chat input, #chat textarea {
font-size: 16px !important;
padding: 8px 10px !important;
border-radius: 10px !important;
}
/* Hide textbox border frame and lighten background */
#chat textarea {
background-color: #1e1e1e !important;
border: 1px solid #333 !important;
}
/* Make Send button match overall style */
button.primary {
font-weight: bold;
border-radius: 10px !important;
padding: 8px 16px !important;
}
""") as demo:
    gr.Markdown("## Athlete AI Assistant 🏆", elem_id="chat-wrap")

    # User state handed to and returned from the handlers; all fields start
    # as None and are filled in by login_user().
    session_state = gr.State({
        "user_id": None,
        "conversation_id": None,
        "name": None,
        "age": None,
        "sport_category": None,
        "role": None,
    })
    history_state = gr.State([])  # list of {"role": "...", "content": "..."}

    # Login Section
    with gr.Group(visible=True) as login_section:
        gr.Markdown("### Harap log-in sebelum berbincang")
        name = gr.Textbox(label="Nama")
        age = gr.Number(label="Umur")
        email = gr.Textbox(label="Email")
        sport_category = gr.Dropdown(
            ["Sepak Bola", "Bulu Tangkis", "Basket", "Renang",
             "Atletik", "Tenis", "Voli", "Senam", "Triatlon",
             "Panahan", "Lainnya"],
            label="Kategori Olahraga",
            value="Sepak Bola",
            info="Pilih kategori olahraga utama Anda"
        )
        role = gr.Dropdown(
            ["Atlet", "Pelatih", "Lainnya"],
            label="Role",
            value="Atlet"
        )
        # Session mode: resume the latest conversation or start a new one.
        # login_user() compares against the first option's exact text.
        session_mode = gr.Radio(
            ["Lanjutkan perbincangan terakhir", "Perbincangan baru"],
            value="Lanjutkan perbincangan terakhir",
            label="Mode sesi"
        )
        login_btn = gr.Button("Lanjut")
        # Hidden until login_user() writes a warning or welcome text into it.
        login_msg = gr.Markdown(visible=False)

    # Chatbot Section (hidden at start)
    with gr.Group(visible=False) as chat_section:
        with gr.Row():
            # LEFT: chat window, input row and the new-conversation button.
            with gr.Column(scale=3):
                bot_chat = gr.Chatbot(
                    type="messages",
                    avatar_images=("img/user.jpg", "img/bot_1.jpg"),
                    height=600,
                    resizable=True,
                    elem_id="chat",
                )
                with gr.Row():
                    user_box = gr.Textbox(
                        placeholder="Tanya apa saja…",
                        lines=1,
                        max_lines=3,
                        scale=5,
                        show_label=False,
                        container=False,
                    )
                    send_btn = gr.Button("Kirim", variant="primary", scale=1)
                new_btn = gr.Button("Perbincangan baru")

                # Disabled analytics panel (chat history browser per email),
                # kept for reference.
                # with gr.Accordion("📊 Chat History Browser", open=True):
                # email_dropdown = gr.Dropdown(
                # choices=[],
                # label="Pilih email pengguna",
                # interactive=True,
                # info="Pilih alamat email untuk melihat riwayat percakapan",
                # )
                # refresh_btn = gr.Button("🔄 Refresh daftar email")
                # load_btn = gr.Button("📂 Tampilkan percakapan")
                # result_json = gr.JSON(label="Riwayat Chat")
                # def get_emails():
                # with SessionLocal() as s:
                # return gr.update(choices=[u.email for u in s.query(User.email).all()])
                # refresh_btn.click(
                # get_emails,
                # inputs=[],
                # outputs=[email_dropdown]
                # )
                # load_btn.click(
                # lambda selected_email: analytics_snapshot(selected_email),
                # inputs=[email_dropdown],
                # outputs=result_json
                # )

            # # RIGHT: Sources panel (could be used to show the sources, but not implemented so far)
            # with gr.Column(scale=1):
            # sources_box.render()

    # Button actions
    # Login: outputs follow the order of login_user()'s returned tuple.
    login_btn.click(
        login_user,
        inputs=[name, email, age, sport_category, role, session_mode, session_state],
        outputs=[login_msg, login_section, chat_section, bot_chat, history_state, session_state]
    )
    # Sending works through both the button and Enter in the textbox.
    # NOTE(review): sources_box is an output here and below although its
    # render() call above is commented out — confirm Gradio accepts an
    # unrendered component as an event output.
    send_btn.click(
        send_message,
        inputs=[user_box, history_state, session_state],
        outputs=[bot_chat, history_state, sources_box, user_box, session_state]
    )
    user_box.submit(
        send_message,
        inputs=[user_box, history_state, session_state],
        outputs=[bot_chat, history_state, sources_box, user_box, session_state]
    )
    # New conversation: resets chat, history and sources for the current user.
    new_btn.click(
        new_chat,
        inputs= [session_state],
        outputs=[bot_chat, history_state, sources_box, session_state]
    )
if __name__ == "__main__":
    # Configure the request queue (max_size=20) first, then start the server
    # on the object that queue() returned.
    queued_app = demo.queue(max_size=20)
    queued_app.launch(share=True, ssr_mode=False)