# File header (scraped commit metadata, kept as a comment so the module parses):
# khoa-tran-hcmut — "add README + user session + log UI" (commit 4e17efc)
import gradio as gr
import json
import os
import logging
from datetime import datetime
import uuid
from pipelines.session_memory_pipeline import SessionMemoryPipeline
from session_memory.store import SessionMemoryStore
from pipelines.query_understanding_pipeline import QueryUnderstandingPipeline
from llm.gemini_client import GeminiClient
from schemas.conversation import Message
from config.config import RECENT_MESSAGE_COUNT
from utils.logger import setup_logger, reset_logger
from demo.helpers import *
# ---------- Session management ----------
# On-disk layout created at import time:
#   logs/              per-session .log files (written by get_session_components)
#   test_data/         created here but not referenced elsewhere in this file
#   storage/summaries  persisted per-session summary JSON files
LOG_DIR = "logs/"
TEST_DIR = "test_data/"
SUMMARY_DIR = "storage/summaries"
os.makedirs(LOG_DIR, exist_ok=True)
os.makedirs(TEST_DIR, exist_ok=True)
os.makedirs(SUMMARY_DIR, exist_ok=True)
# Global LLM instances (shared across users for efficiency)
# NOTE(review): several functions below hard-code "logs/{id}.log" instead of
# joining with LOG_DIR — confirm the two never diverge.
llm = GeminiClient()
query_pipeline = QueryUnderstandingPipeline(llm)
def initialize_user_session():
    """Create the state dict for a brand-new user session.

    The dict holds only JSON-serializable values so Gradio can store it in
    ``gr.State``; live components (logger, pipeline, store) are rebuilt on
    demand by ``get_session_components``.
    """
    sid = f"user_{uuid.uuid4().hex[:8]}"
    return {
        "session_id": sid,
        "history": [],
        "previous_summary": None,
        # Where this session's summary JSON will be persisted.
        "summary_path": os.path.join(SUMMARY_DIR, f"{sid}.json"),
        "initialized": False,
    }
def get_session_components(user_state):
    """Return the (logger, memory pipeline, store) triple for a session.

    Pipeline and store objects are rebuilt on every call because the state
    dict may only hold serializable data; loggers configured on the first
    call are reused afterwards via the logging module's global registry.
    """
    sid = user_state["session_id"]

    if user_state.get("initialized", False):
        # Already bootstrapped: fetch the registered loggers and wrap them
        # in fresh pipeline/store instances.
        root = logging.getLogger(sid)
        pipeline = SessionMemoryPipeline(logger=logging.getLogger(f"{sid}.memory_pipeline"))
        memory_store = SessionMemoryStore(session_id=sid, logger=logging.getLogger(f"{sid}.memory_store"))
        return root, pipeline, memory_store

    # First call for this session: configure the file logger and its children.
    reset_logger(sid)
    root = setup_logger(sid, f"logs/{sid}.log")
    root.info(f"New user session '{sid}' started.")
    pipeline_logger = logging.getLogger(f"{sid}.memory_pipeline")
    pipeline_logger.setLevel(logging.INFO)
    pipeline_logger.propagate = True  # bubble records up to the session file handler
    storage_logger = logging.getLogger(f"{sid}.memory_store")
    storage_logger.setLevel(logging.INFO)
    storage_logger.propagate = True
    pipeline = SessionMemoryPipeline(logger=pipeline_logger)
    memory_store = SessionMemoryStore(session_id=sid, logger=storage_logger)
    user_state["initialized"] = True
    return root, pipeline, memory_store
def chat(user_input, gr_history, user_state):
    """Process one user turn: query understanding, LLM reply, memory update.

    Returns the assistant's reply and the (mutated) per-user state dict so
    Gradio can persist it between turns. ``gr_history`` is supplied by the
    ChatInterface but the authoritative history lives in ``user_state``.
    """
    logger, memory_pipeline, store = get_session_components(user_state)
    conversation = user_state["history"]
    prior_summary = user_state["previous_summary"]
    logger.info(f"CHAT CALLED for session {user_state['session_id']}")

    conversation.append(Message(role="user", content=user_input))
    # Feed only a sliding window of the most recent turns downstream.
    if len(conversation) > RECENT_MESSAGE_COUNT:
        window = conversation[-RECENT_MESSAGE_COUNT:]
    else:
        window = conversation

    # ---- Query Understanding ----
    qu_result = query_pipeline.run(
        query=user_input,
        recent_messages=window,
        session_summary=prior_summary,
    )
    logger.info(f"[Query Understanding] Transformed query: {qu_result.model_dump_json(indent=2)}")

    response = llm.chat_completion(query=qu_result)
    conversation.append(Message(role="assistant", content=response))

    # ---- Session Memory ----
    summary, updated_history = memory_pipeline.maybe_summarize(conversation, prior_summary)
    if summary:
        store.save(summary)
        user_state["previous_summary"] = summary
    user_state["history"] = updated_history

    # Flush handlers so the Log Viewer tab sees this turn immediately.
    logger.info("Chat response completed")
    for h in logger.handlers:
        if hasattr(h, 'flush'):
            h.flush()
    return response, user_state
# ---------- Log viewer functions ----------
def get_log_content(user_state):
    """Read and return the contents of the current user's log file.

    Missing files and read failures are reported as message strings (the
    caller displays whatever is returned in a read-only textbox).
    """
    log_path = f"logs/{user_state['session_id']}.log"
    try:
        if not os.path.exists(log_path):
            return f"Log file '{log_path}' not found."
        with open(log_path, 'r', encoding='utf-8') as fh:
            return fh.read()
    except Exception as exc:
        return f"Error reading log file: {str(exc)}"
def refresh_logs(user_state):
    """Return the latest log text for this session (UI refresh/load handler)."""
    return get_log_content(user_state)
def clear_logs(user_state):
    """Truncate the current session's log file.

    Returns a human-readable status string for the UI; errors are reported
    through the return value rather than raised, because the caller is a
    Gradio button handler.
    """
    session_id = user_state["session_id"]
    logger, _, _ = get_session_components(user_state)
    log_file = f"logs/{session_id}.log"
    try:
        # Opening in 'w' mode already truncates the file; no write needed.
        with open(log_file, 'w', encoding='utf-8'):
            pass
        logger.info("Log file cleared manually")
        return "Log file cleared successfully!"
    except Exception as e:
        return f"Error clearing log file: {str(e)}"
def clear_chat(user_state):
    """Reset the session: clear in-memory history/summary, truncate the log
    file, and delete the persisted summary file.

    Returns a (status_message, user_state) pair for the Gradio handler; all
    failures are reported via the status string instead of raising.
    """
    session_id = user_state["session_id"]
    logger, _, _ = get_session_components(user_state)
    summary_path = user_state["summary_path"]
    try:
        # Reset in-memory conversation state.
        user_state["history"] = []
        user_state["previous_summary"] = None
        # Truncate the log file ('w' mode empties it; no explicit write needed).
        log_file = f"logs/{session_id}.log"
        with open(log_file, 'w', encoding='utf-8'):
            pass
        # Remove the persisted summary, if one was ever written.
        if os.path.exists(summary_path):
            os.remove(summary_path)
        logger.info(f"Chat cleared for session '{session_id}' - New conversation started")
        return "Chat history, logs, and summary cleared successfully! Ready for new conversation.", user_state
    except Exception as e:
        # logger.exception records the traceback, unlike logger.error.
        logger.exception(f"Error clearing chat: {str(e)}")
        return f"Error clearing chat: {str(e)}", user_state
# ---------- UI ----------
def create_ui():
    """Build the Gradio Blocks app: a chat tab plus a per-session log viewer.

    Returns the (un-launched) gr.Blocks instance; the caller launches it.
    """
    with gr.Blocks(title="Chat Assistant with Log Viewer") as demo:
        gr.Markdown("# Chat Assistant with Session Memory")
        gr.Markdown("This demo showcases a chat assistant with session memory and a log viewer.")
        gr.Markdown("- Chat Tab: Interact with the chat assistant. Use 'Clear Chat' to start fresh conversation.")
        gr.Markdown("- Log Viewer Tab: View and manage your session logs, query understanding, and session memory.")
        gr.Markdown("_Each user gets their own private session automatically._")
        gr.Markdown("_Use the tabs above to switch between chatting and viewing logs._")
        gr.Markdown("_The system will preprocess your query for better understanding before generating a response based on chat history and session summary._")
        gr.Markdown("_After each response, the system will update the session summary when the conversation context is longer than a certain threshold._")
        # Initialize user session state
        # NOTE(review): initialize_user_session() is evaluated ONCE at build
        # time, so every browser session starts from a copy of the SAME dict
        # (same session_id). Confirm this is acceptable, or move the
        # initialization into demo.load so each visitor gets a unique id.
        user_state = gr.State(initialize_user_session())
        with gr.Tab("Chat"):
            # Chat controls
            with gr.Row():
                clear_chat_btn = gr.Button("🗑️ Clear Chat", variant="stop")
            # Hidden until the first clear; revealed by the .then() below.
            clear_chat_message = gr.Textbox(label="Status", interactive=False, visible=False)
            # chat() receives/returns user_state via additional_inputs/outputs.
            chat_interface = gr.ChatInterface(
                fn=chat,
                title="Chat Assistant",
                description="Simple chat demo using GeminiClient - Your personal session",
                additional_inputs=[user_state],
                additional_outputs=[user_state]
            )
        with gr.Tab("Log Viewer"):
            gr.Markdown("## Your Session Logs")
            gr.Markdown("_Use the Refresh button to update the log content._")
            gr.Markdown("_Use the Clear button to clear the log file content._")
            with gr.Row():
                refresh_btn = gr.Button("🔄 Refresh Logs", variant="secondary")
                clear_logs_btn = gr.Button("🗑️ Clear Logs", variant="stop")
            log_display = gr.Textbox(
                label="Log Content",
                lines=10,
                max_lines=15,
                interactive=False
            )
        # Event handlers
        clear_chat_btn.click(
            fn=clear_chat,
            inputs=[user_state],
            outputs=[clear_chat_message, user_state]
        ).then(
            fn=lambda: gr.update(visible=True),
            outputs=[clear_chat_message]
        )
        refresh_btn.click(
            fn=refresh_logs,
            inputs=[user_state],
            outputs=[log_display]
        )
        clear_logs_btn.click(
            fn=clear_logs,
            inputs=[user_state],
            # NOTE(review): this constructs a new Textbox outside the layout,
            # so the status message from clear_logs is never shown — verify
            # this is intended (e.g. route it to an existing component).
            outputs=[gr.Textbox(label="Status")]
        ).then(
            fn=refresh_logs,
            inputs=[user_state],
            outputs=[log_display]
        )
        # Load initial log content when a browser session starts.
        demo.load(
            fn=refresh_logs,
            inputs=[user_state],
            outputs=[log_display]
        )
    return demo
# Launch the application
if __name__ == "__main__":
    # Build the Blocks app and start the Gradio server.
    create_ui().launch()