| import gradio as gr |
| import json |
| import os |
| import logging |
| from datetime import datetime |
| import uuid |
|
|
| from pipelines.session_memory_pipeline import SessionMemoryPipeline |
| from session_memory.store import SessionMemoryStore |
| from pipelines.query_understanding_pipeline import QueryUnderstandingPipeline |
| from llm.gemini_client import GeminiClient |
| from schemas.conversation import Message |
| from config.config import RECENT_MESSAGE_COUNT |
| from utils.logger import setup_logger, reset_logger |
|
|
| from demo.helpers import * |
|
|
|
|
|
|
| |
|
|
# Working directories: session log files, test data, and stored summaries.
LOG_DIR = "logs/"
TEST_DIR = "test_data/"
SUMMARY_DIR = "storage/summaries"

# Create every working directory up front; existing directories are left as-is.
for _required_dir in (LOG_DIR, TEST_DIR, SUMMARY_DIR):
    os.makedirs(_required_dir, exist_ok=True)

# A single LLM client and query-understanding pipeline, created once at import
# time and used by every chat call in this module.
llm = GeminiClient()
query_pipeline = QueryUnderstandingPipeline(llm)
|
|
def initialize_user_session():
    """Build the initial state dictionary for a brand-new user session.

    The session ID is ``"user_"`` followed by the first eight hex characters
    of a random UUID4. The summary path points at ``<session_id>.json``
    inside ``SUMMARY_DIR``. No files are created here.

    Returns:
        dict: Keys ``session_id``, ``history`` (empty list),
        ``previous_summary`` (``None``), ``summary_path`` and
        ``initialized`` (``False``).
    """
    short_token = uuid.uuid4().hex[:8]
    new_session_id = f"user_{short_token}"

    state = {
        "session_id": new_session_id,
        "history": [],
        "previous_summary": None,
    }
    state["summary_path"] = os.path.join(SUMMARY_DIR, f"{new_session_id}.json")
    # Flipped to True by get_session_components once loggers are configured.
    state["initialized"] = False
    return state
|
|
def get_session_components(user_state):
    """Return the logger, memory pipeline and memory store for a session.

    On the first call for a session (``user_state["initialized"]`` is falsy)
    the session logger is reset and set up again, the two child loggers are
    configured, and ``user_state["initialized"]`` is set to ``True``. Later
    calls look the already-configured loggers up by name. A new
    ``SessionMemoryPipeline`` and ``SessionMemoryStore`` are constructed on
    every call.

    Args:
        user_state: Session dict (see ``initialize_user_session``); must
            contain ``"session_id"``.

    Returns:
        tuple: ``(logger, memory_pipeline, store)``.
    """
    session_id = user_state["session_id"]
    first_use = not user_state.get("initialized", False)

    if first_use:
        # Reset before setup so a previous configuration under the same name
        # is discarded. NOTE(review): presumably setup_logger attaches a file
        # handler for the given path -- confirm in utils.logger.
        reset_logger(session_id)
        logger = setup_logger(session_id, os.path.join(LOG_DIR, f"{session_id}.log"))
        logger.info(f"New user session '{session_id}' started.")
    else:
        logger = logging.getLogger(session_id)

    # Child loggers are named "<session_id>.<component>", so their records
    # propagate up to the session logger.
    memory_logger = logging.getLogger(f"{session_id}.memory_pipeline")
    store_logger = logging.getLogger(f"{session_id}.memory_store")

    if first_use:
        for child_logger in (memory_logger, store_logger):
            child_logger.setLevel(logging.INFO)
            child_logger.propagate = True

    memory_pipeline = SessionMemoryPipeline(logger=memory_logger)
    store = SessionMemoryStore(session_id=session_id, logger=store_logger)

    if first_use:
        user_state["initialized"] = True

    return logger, memory_pipeline, store
|
|
|
|
def chat(user_input, gr_history, user_state):
    """Handle one chat turn using the caller's per-session state.

    Args:
        user_input: Text the user just submitted.
        gr_history: Chat history passed in by Gradio. It is not used here;
            the conversation is tracked in ``user_state["history"]``.
        user_state: Session dict. Its ``history`` and ``previous_summary``
            entries are read and updated.

    Returns:
        tuple: ``(response, user_state)`` -- the assistant reply and the
        updated session state.
    """
    logger, memory_pipeline, store = get_session_components(user_state)
    conversation = user_state["history"]
    prior_summary = user_state["previous_summary"]

    logger.info(f"CHAT CALLED for session {user_state['session_id']}")

    # Record the user's turn; this appends to the list held in user_state.
    incoming = Message(role="user", content=user_input)
    conversation.append(incoming)

    # Query understanding only sees the tail of the conversation (which
    # already includes the message appended above).
    if len(conversation) > RECENT_MESSAGE_COUNT:
        recent_messages = conversation[-RECENT_MESSAGE_COUNT:]
    else:
        recent_messages = conversation

    understood = query_pipeline.run(
        query=user_input,
        recent_messages=recent_messages,
        session_summary=prior_summary,
    )
    logger.info(f"[Query Understanding] Transformed query: {understood.model_dump_json(indent=2)}")

    # The query-understanding result, not the raw text, is sent to the LLM.
    response = llm.chat_completion(query=understood)
    conversation.append(Message(role="assistant", content=response))

    # maybe_summarize returns a (summary, history) pair; a falsy summary
    # means nothing new needs to be saved for this turn.
    new_summary, next_history = memory_pipeline.maybe_summarize(
        conversation, prior_summary
    )
    if new_summary:
        store.save(new_summary)
        user_state["previous_summary"] = new_summary
    user_state["history"] = next_history

    logger.info("Chat response completed")
    # Flush handlers now -- presumably so the log viewer sees this turn
    # immediately when it re-reads the file.
    flushable_handlers = [h for h in logger.handlers if hasattr(h, 'flush')]
    for handler in flushable_handlers:
        handler.flush()

    return response, user_state
|
|
|
|
|
|
| |
|
|
def get_log_content(user_state):
    """Read and return the contents of the current user's log file.

    Args:
        user_state: Session dict containing ``"session_id"``.

    Returns:
        str: The full log text, or a human-readable message when the file
        does not exist or cannot be read.
    """
    session_id = user_state["session_id"]
    log_file = os.path.join(LOG_DIR, f"{session_id}.log")
    try:
        # Open directly instead of checking existence first: the file could
        # disappear between the check and the open.
        with open(log_file, "r", encoding="utf-8") as f:
            return f.read()
    except FileNotFoundError:
        return f"Log file '{log_file}' not found."
    except (OSError, UnicodeError) as e:
        # Permission problems, the path being a directory, undecodable bytes, etc.
        return f"Error reading log file: {str(e)}"
|
|
def refresh_logs(user_state):
    """Return the current log text for the session in ``user_state``.

    Thin wrapper around ``get_log_content``, used as a Gradio callback for
    updating the log display.
    """
    latest_log_text = get_log_content(user_state)
    return latest_log_text
|
|
def clear_logs(user_state):
    """Empty the current session's log file and report the outcome.

    Args:
        user_state: Session dict containing ``"session_id"``.

    Returns:
        str: A success message, or an error message if the file could not
        be truncated.
    """
    session_id = user_state["session_id"]
    logger, _, _ = get_session_components(user_state)
    log_file = os.path.join(LOG_DIR, f"{session_id}.log")
    try:
        # Opening in write mode truncates the file; nothing needs writing.
        with open(log_file, "w", encoding="utf-8"):
            pass
    except OSError as e:
        return f"Error clearing log file: {str(e)}"

    # Logged after truncation, so the cleared file records that it was
    # cleared on purpose (assuming the session logger writes to this file).
    logger.info("Log file cleared manually")
    return "Log file cleared successfully!"
|
|
def clear_chat(user_state):
    """Reset this user's conversation: history, summary and log file.

    Args:
        user_state: Session dict; its ``history`` and ``previous_summary``
            entries are reset in place.

    Returns:
        tuple: ``(status_message, user_state)``. The status message reports
        either success or the filesystem error that occurred.
    """
    session_id = user_state["session_id"]
    logger, _, _ = get_session_components(user_state)
    summary_path = user_state["summary_path"]
    log_file = os.path.join(LOG_DIR, f"{session_id}.log")

    # In-memory state is reset first and cannot fail.
    user_state["history"] = []
    user_state["previous_summary"] = None

    try:
        # Opening in write mode truncates the log file.
        with open(log_file, "w", encoding="utf-8"):
            pass

        # Remove the summary file directly; a missing file just means there
        # was nothing to delete. NOTE(review): assumes SessionMemoryStore
        # saves to this path -- confirm against session_memory.store.
        try:
            os.remove(summary_path)
        except FileNotFoundError:
            pass
    except OSError as e:
        logger.error(f"Error clearing chat: {str(e)}")
        return f"Error clearing chat: {str(e)}", user_state

    logger.info(f"Chat cleared for session '{session_id}' - New conversation started")
    return "Chat history, logs, and summary cleared successfully! Ready for new conversation.", user_state
|
|
|
|
| |
|
|
def create_ui():
    """Build the Gradio app: a chat tab plus a per-session log viewer tab.

    Returns:
        The assembled ``gr.Blocks`` instance (not yet launched).
    """
    with gr.Blocks(title="Chat Assistant with Log Viewer") as demo:
        gr.Markdown("# Chat Assistant with Session Memory")
        gr.Markdown("This demo showcases a chat assistant with session memory and a log viewer.")
        gr.Markdown("- Chat Tab: Interact with the chat assistant. Use 'Clear Chat' to start fresh conversation.")
        gr.Markdown("- Log Viewer Tab: View and manage your session logs, query understanding, and session memory.")
        gr.Markdown("_Each user gets their own private session automatically._")
        gr.Markdown("_Use the tabs above to switch between chatting and viewing logs._")
        gr.Markdown("_The system will preprocess your query for better understanding before generating a response based on chat history and session summary._")
        gr.Markdown("_After each response, the system will update the session summary when the conversation context is longer than a certain threshold._")

        # gr.State deep-copies its initial value for each browser session, so
        # a dict built once here would give every visitor the SAME session_id
        # (and therefore the same log and summary files). This value is only
        # a placeholder; _start_session below replaces it on every page load.
        user_state = gr.State(initialize_user_session())

        with gr.Tab("Chat"):
            with gr.Row():
                clear_chat_btn = gr.Button("🗑️ Clear Chat", variant="stop")

            # Hidden until the first "Clear Chat" click produces a status.
            clear_chat_message = gr.Textbox(label="Status", interactive=False, visible=False)

            chat_interface = gr.ChatInterface(
                fn=chat,
                title="Chat Assistant",
                description="Simple chat demo using GeminiClient - Your personal session",
                additional_inputs=[user_state],
                additional_outputs=[user_state],
            )

        with gr.Tab("Log Viewer"):
            gr.Markdown("## Your Session Logs")
            gr.Markdown("_Use the Refresh button to update the log content._")
            gr.Markdown("_Use the Clear button to clear the log file content._")

            with gr.Row():
                refresh_btn = gr.Button("🔄 Refresh Logs", variant="secondary")
                clear_logs_btn = gr.Button("🗑️ Clear Logs", variant="stop")

            # Declared inside the tab so the status appears next to the
            # buttons rather than below all tabs.
            clear_logs_message = gr.Textbox(label="Status", interactive=False)

            log_display = gr.Textbox(
                label="Log Content",
                lines=10,
                max_lines=15,
                interactive=False,
            )

        # Clear the chat state, then reveal the status box with the result.
        clear_chat_btn.click(
            fn=clear_chat,
            inputs=[user_state],
            outputs=[clear_chat_message, user_state],
        ).then(
            fn=lambda: gr.update(visible=True),
            outputs=[clear_chat_message],
        )

        refresh_btn.click(
            fn=refresh_logs,
            inputs=[user_state],
            outputs=[log_display],
        )

        # Clear the log file, then re-read it so the display matches.
        clear_logs_btn.click(
            fn=clear_logs,
            inputs=[user_state],
            outputs=[clear_logs_message],
        ).then(
            fn=refresh_logs,
            inputs=[user_state],
            outputs=[log_display],
        )

        def _start_session():
            """Create a fresh session for this page load and return it with its log text."""
            fresh_state = initialize_user_session()
            return fresh_state, refresh_logs(fresh_state)

        # Runs once per page load: gives each visitor a unique session and
        # fills the log viewer from that same session in one step.
        demo.load(
            fn=_start_session,
            outputs=[user_state, log_display],
        )

    return demo
|
|
| |
# Script entry point: only runs when this file is executed directly.
if __name__ == "__main__":
    # Build the interface first, then start the Gradio server.
    app = create_ui()
    app.launch()