Spaces:
Sleeping
Sleeping
| import re | |
| import time | |
| import chainlit as cl | |
| import pandas as pd | |
| import httpx | |
| import asyncio | |
| from typing import Dict, List, Any, Optional, Callable | |
| from dataclasses import dataclass, field | |
| import os | |
| import uuid | |
| from datetime import datetime, timedelta | |
# Base URL of the backend API, read from the environment.
# os.getenv returns None when the variable is unset; callers in this file
# test the value for truthiness before issuing requests.
API_BASE_URL = os.getenv("API_BASE_URL")
@dataclass
class ConversationState:
    """Mutable per-session conversation state.

    The class must be a dataclass: several fields are declared with
    ``field(default_factory=...)``, which only yields real per-instance
    values (a fresh dict / list / timestamp) when ``@dataclass`` processes
    the class body.
    """

    session_id: Optional[str] = None
    specs_advantages: Dict[str, Any] = field(default_factory=dict)
    solution_packages: List[str] = field(default_factory=list)
    raw_documents: Optional[Dict[str, Any]] = None
    outputs: Optional[Dict[str, Any]] = None
    selected_model: str = "Gemini 2.0 Flash"
    product_model_search: bool = False
    method: str = "dense"  # one of "dense", "sparse", "hybrid"
    is_enhance_query: bool = False  # query enhancement toggle
    # Delayed-cleanup bookkeeping (asyncio based).
    pending_cleanup: bool = False
    cleanup_task: Optional[asyncio.Task] = None
    last_activity: datetime = field(default_factory=datetime.now)

    def reset(self):
        """Restore every conversation field to its initial value.

        ``cleanup_task`` is deliberately left untouched; use
        ``cancel_cleanup_task`` to stop a scheduled cleanup.
        """
        self.session_id = None
        self.specs_advantages = {}
        self.solution_packages = []
        self.raw_documents = None
        self.outputs = None
        self.selected_model = "Gemini 2.0 Flash"
        self.product_model_search = False
        self.method = "dense"
        self.is_enhance_query = False
        # Reset cleanup flags but do not touch the task itself.
        self.pending_cleanup = False
        self.last_activity = datetime.now()

    def cancel_cleanup_task(self):
        """Cancel the pending cleanup task, if one exists and is still running."""
        task = self.cleanup_task
        if task and not task.done():
            task.cancel()
            self.cleanup_task = None
            print(f"🚫 Cancelled cleanup task for session: {self.session_id}")
class StateManager:
    """Registry of per-session conversation state with delayed cleanup.

    Every operation is a static method. State objects live in the
    class-level ``_session_states`` dict keyed by session id, and access to
    that dict is serialized through the class-level asyncio lock.
    """

    # Class-level storage so that different browser sessions stay isolated.
    _session_states: Dict[str, "ConversationState"] = {}
    _lock = asyncio.Lock()

    @staticmethod
    async def get_or_create_session_state(session_id: str) -> "ConversationState":
        """Return the state for ``session_id``, creating it on first use.

        If the session was waiting for a delayed cleanup, that cleanup is
        cancelled because the caller is evidently active again. The
        activity timestamp is refreshed on every call.
        """
        async with StateManager._lock:
            state = StateManager._session_states.get(session_id)
            if state is None:
                state = ConversationState()
                state.session_id = session_id
                StateManager._session_states[session_id] = state
                print(f"🆕 Created new session state for: {session_id}")
            else:
                print(f"🔄 Retrieved existing session state for: {session_id}")
                if state.pending_cleanup:
                    state.cancel_cleanup_task()
                    state.pending_cleanup = False
                    print(f"♻️ User activity detected! Cancelled pending cleanup for: {session_id}")
            state.last_activity = datetime.now()
            return state

    @staticmethod
    async def schedule_delayed_cleanup(session_id: str, delay_seconds: int = 3600):
        """Schedule cleanup of ``session_id`` after ``delay_seconds`` (default 1 hour).

        Any previously scheduled cleanup task for the session is cancelled
        and replaced. Unknown sessions are ignored with a warning.
        """
        async with StateManager._lock:
            state = StateManager._session_states.get(session_id)
            if state is None:
                print(f"⚠️ Cannot schedule cleanup for non-existent session: {session_id}")
                return
            state.cancel_cleanup_task()
            state.pending_cleanup = True

            async def delayed_cleanup():
                try:
                    await asyncio.sleep(delay_seconds)
                    print(f"⏰ Executing delayed cleanup for session: {session_id}")
                    await StateManager._perform_actual_cleanup(session_id)
                except asyncio.CancelledError:
                    print(f"🚫 Cleanup task cancelled for session: {session_id}")
                    raise
                except Exception as e:
                    print(f"❌ Error in delayed cleanup for {session_id}: {e}")

            state.cleanup_task = asyncio.create_task(delayed_cleanup())
            print(f"⏱️ Scheduled cleanup in {delay_seconds}s for session: {session_id} (likely disconnect)")

    @staticmethod
    async def _post_clear_memory(session_id: str) -> int:
        """POST a cache reset for ``session_id`` to the API; return the HTTP status code."""
        payload = {
            "reset_cache": True,
            "reset_model": False,
            "session_id": session_id,
        }
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(f"{API_BASE_URL}/clear_memory", json=payload)
        return response.status_code

    @staticmethod
    async def _perform_actual_cleanup(session_id: str, force: bool = False):
        """Clear backend memory for the session and drop its local state.

        Args:
            session_id: Session to clean up.
            force: When False (default) the cleanup is skipped unless the
                session is still flagged ``pending_cleanup``; when True the
                flag is ignored and the session is always removed.
        """
        # NOTE: the lock stays held while the HTTP request is in flight,
        # exactly as before; other sessions wait for up to the 30 s timeout.
        async with StateManager._lock:
            state = StateManager._session_states.get(session_id)
            if state is None:
                print(f"⚠️ Session already cleaned or doesn't exist: {session_id}")
                return
            # The user might have sent a message since the cleanup was scheduled.
            if not force and not state.pending_cleanup:
                print(f"🚫 Cleanup cancelled - user activity detected for: {session_id}")
                return
            try:
                if API_BASE_URL:
                    status_code = await StateManager._post_clear_memory(session_id)
                    print(f"Clear memory response for {session_id}: {status_code}")
            except Exception as e:
                # Best effort: a failed API call must not keep the state alive.
                print(f"Warning: clear_memory failed for {session_id}: {e}")
            del StateManager._session_states[session_id]
            print(f"🗑️ Successfully cleaned up session: {session_id}")

    @staticmethod
    async def cleanup_session_immediate(session_id: str):
        """Clean up a session right away (for testing or forced cleanup).

        The cleanup is forced, so it also removes sessions that are not
        flagged ``pending_cleanup``; previously such calls were silent no-ops.
        """
        async with StateManager._lock:
            state = StateManager._session_states.get(session_id)
            if state is not None:
                state.cancel_cleanup_task()
        await StateManager._perform_actual_cleanup(session_id, force=True)

    @staticmethod
    async def clear_chat_state(state: "ConversationState"):
        """Clear backend conversation memory and reset ``state``, keeping its session id."""
        if state.session_id is not None and API_BASE_URL:
            try:
                status_code = await StateManager._post_clear_memory(state.session_id)
                print(f"Clear memory response: {status_code}")
            except Exception as e:
                print(f"Warning: clear_memory failed: {e}")
        # Reset everything except the session id; no cleanup is triggered.
        session_id = state.session_id
        state.reset()
        state.session_id = session_id

    @staticmethod
    async def change_model(state: "ConversationState", model_name: str):
        """Set the selected model and refresh the activity timestamp."""
        state.selected_model = model_name
        state.last_activity = datetime.now()

    @staticmethod
    async def toggle_product_model_search(state: "ConversationState"):
        """Flip product-model search mode and refresh the activity timestamp."""
        state.product_model_search = not state.product_model_search
        state.last_activity = datetime.now()

    @staticmethod
    async def toggle_enhance_query(state: "ConversationState"):
        """Flip query-enhancement mode and refresh the activity timestamp."""
        state.is_enhance_query = not state.is_enhance_query
        state.last_activity = datetime.now()

    @staticmethod
    async def cycle_search_method(state: "ConversationState"):
        """Advance the search method: dense -> sparse -> hybrid -> dense.

        Any unrecognized current value falls back to "dense".
        """
        successor = {"dense": "sparse", "sparse": "hybrid"}
        state.method = successor.get(state.method, "dense")
        state.last_activity = datetime.now()

    @staticmethod
    async def get_session_status() -> Dict[str, Dict[str, Any]]:
        """Return a debugging snapshot of every tracked session, keyed by session id."""
        async with StateManager._lock:
            return {
                session_id: {
                    "pending_cleanup": state.pending_cleanup,
                    "has_task": state.cleanup_task is not None and not state.cleanup_task.done(),
                    "last_activity": state.last_activity.isoformat(),
                    "selected_model": state.selected_model,
                    "product_model_search": state.product_model_search,
                    "method": state.method,
                    "is_enhance_query": state.is_enhance_query,
                }
                for session_id, state in StateManager._session_states.items()
            }
class ChatService:
    """Chat operations backed by async HTTP calls to the API."""

    @staticmethod
    async def respond_to_chat(
        state: "ConversationState",
        message: str,
        image_path: Optional[str] = None
    ) -> str:
        """Send ``message`` (optionally with an image) to the API and return the reply.

        Non-null ``specs_advantages``, ``solution_packages``,
        ``raw_documents`` and ``outputs`` values in the API response are
        copied onto ``state``. API and transport failures are reported as
        an "Error ..." string instead of being raised.

        Args:
            state: Conversation state of the calling session.
            message: User message text.
            image_path: Optional path of an image file to upload.

        Returns:
            The reply text, with product listings reformatted by
            ``format_product_grid``.
        """
        print(f"🔄 === DEBUG STATE ===\nChat request with model: {state.selected_model}, Product Model Search: {state.product_model_search}, Method: {state.method}, Session ID: {state.session_id}")
        # Refreshing the timestamp marks the session as active.
        state.last_activity = datetime.now()
        if not API_BASE_URL:
            return "Error: API_BASE_URL not configured"
        if not state.session_id:
            return "Error: Session ID not initialized"

        # Both endpoints take the same form fields.
        data = {
            "message": message,
            "product_model_search": str(state.product_model_search).lower(),
            "method": state.method,
            "session_id": state.session_id,
            "llm_model": state.selected_model,
            "debug": "Normal",
            "is_enhance_query": str(state.is_enhance_query).lower(),
        }
        body = {}
        try:
            async with httpx.AsyncClient(timeout=600.0) as client:
                if image_path:
                    with open(image_path, 'rb') as f:
                        image_bytes = f.read()
                    # Multipart upload; the file part is always sent as image.jpg / image/jpeg.
                    resp = await client.post(
                        f"{API_BASE_URL}/chat_with_image",
                        files={"image": ("image.jpg", image_bytes, "image/jpeg")},
                        data=data,
                    )
                else:
                    resp = await client.post(f"{API_BASE_URL}/chat", data=data)
            if resp.status_code == 200:
                body = resp.json()
                # A null "response" would otherwise crash the regex formatting below.
                response = body.get("response") or ""
            else:
                print(f"API Error: {resp.status_code} - {resp.text}")
                response = f"Error: API status {resp.status_code}"
        except Exception as e:
            print(f"Exception calling API: {e}")
            response = f"Error calling API: {e}"
            body = {}

        # Only overwrite state fields the API actually returned.
        for attr in ("specs_advantages", "solution_packages", "raw_documents", "outputs"):
            value = body.get(attr)
            if value is not None:
                setattr(state, attr, value)

        # Narrow the stored products to the models mentioned in the message.
        if state.specs_advantages is not None:
            await ChatService.get_specific_product_from_query(message, state)
        return ChatService.format_product_grid(response)

    @staticmethod
    def format_product_grid(response_text: str) -> str:
        """Rewrite a markdown product list as two-column markdown tables.

        A product entry is a bullet with a bold link followed, on a later
        line, by an image. Text before the first entry and after the last
        entry is kept (stripped). NOTE: text located between entries is not
        carried over, because the whole first-to-last span is replaced.
        When no entry is found the input is returned unchanged.
        """
        # Matches: * **[Name](url)** <newline(s)> ![alt](img_url)
        pattern = r'\*\s+\*\*\[(.*?)\]\((.*?)\)\*\*\s*\n\s*!\[(.*?)\]\((.*?)\)'
        matches = list(re.finditer(pattern, response_text))
        if not matches:
            return response_text

        intro_text = response_text[:matches[0].start()].strip()
        rest_text = response_text[matches[-1].end():].strip()
        products = [
            {'name': m.group(1), 'url': m.group(2), 'alt': m.group(3), 'img': m.group(4)}
            for m in matches
        ]

        parts = ["\n\n"]
        for i in range(0, len(products), 2):
            pair = products[i:i + 2]  # the last pair holds one item when the count is odd
            titles = " | ".join(f"**[{p['name']}]({p['url']})**" for p in pair)
            images = " | ".join(f"![{p['alt']}]({p['img']})" for p in pair)
            parts.append(f"| {titles} |\n")
            parts.append("|" + ":---:|" * len(pair) + "\n")
            parts.append(f"| {images} |\n\n")
        return intro_text + "".join(parts) + rest_text

    @staticmethod
    async def get_specific_product_from_query(query, state):
        """Keep only the products whose model is mentioned in ``query``.

        Matching is a case-insensitive literal substring search. Empty,
        blank or non-string model values are ignored; an empty model would
        otherwise match every query. When no model is mentioned,
        ``state.specs_advantages`` is left unchanged.
        """
        specs_map = state.specs_advantages or {}
        found_models = []
        for data in specs_map.values():
            model = data.get("model")
            if not isinstance(model, str) or not model.strip():
                continue
            if re.search(re.escape(model), query, re.IGNORECASE):
                found_models.append(model)
        if not found_models:
            return
        state.specs_advantages = {
            prod_id: data
            for prod_id, data in specs_map.items()
            if data.get("model") in found_models
        }
class DisplayService:
    """Builds the markdown shown for specs, advantages, packages and product tables."""

    @staticmethod
    def _display_name(data: Dict[str, Any]) -> str:
        """Return the bold (and, when a url is present, linked) product name."""
        url = data.get("url", "")
        if url:
            return f"**[{data['name']}]({url})**"
        return f"**{data['name']}**"

    @staticmethod
    def _upsert_cell(rows, index, columns, key, column, value):
        """Set ``value`` for (row ``key``, ``column``), creating the row if needed."""
        row = index.get(key)
        if row is None:
            row = {"Thông số": key}
            for col in columns[1:]:
                row[col] = ""
            index[key] = row
            rows.append(row)
        row[column] = value

    @staticmethod
    def _iter_package_params(spec: str):
        """Yield (header, cell) pairs from the markdown tables inside ``spec``.

        The first pipe-delimited row of a table is the header row; each
        later row is paired cell-by-cell with those headers. Separator rows
        (containing ``---``) are ignored.
        """
        headers = []
        in_table = False
        for raw_line in spec.split('\n'):
            line = raw_line.strip()
            if line.startswith('|') and line.endswith('|') and '---' not in line:
                cells = [cell.strip() for cell in line.split('|')[1:-1]]
                if not in_table:
                    headers = cells
                    in_table = True
                    continue
                if len(cells) >= len(headers):
                    yield from zip(headers, cells)
            elif in_table and not line.startswith('|'):
                # A non-table line ends the current table.
                in_table = False

    @staticmethod
    def _iter_product_params(spec: str):
        """Yield (name, value) pairs from ``name: value`` items split on ';' or newlines."""
        for item in re.split(r';|\n', spec):
            if ":" not in item:
                continue
            key, value = item.split(':', 1)
            spec_key = key.strip().capitalize()
            if spec_key == "Vậtl iệu":
                # Normalize a known misspelling found in the data.
                spec_key = "Vật liệu"
            if "|" in spec_key:
                spec_key = spec_key.strip().replace("|", "").capitalize()
            yield spec_key, value.strip()

    @staticmethod
    def _build_specs_frame(specs_map: Dict[str, Any]) -> "pd.DataFrame":
        """Build the comparison table: one row per parameter, one column per product."""
        columns = ["Thông số"]
        rows = []
        index = {}  # parameter name -> row dict, replaces a linear scan of rows
        for data in specs_map.values():
            spec = data.get("specification")
            if spec in (None, "", "None"):
                spec = "Hiện tại trong dữ liệu chưa có thông tin về thông số kĩ thuật của sản phẩm này!"
            column = DisplayService._display_name(data)
            if column not in columns:
                columns.append(column)
            # Solution packages are recognized by this heading marker.
            if "### 📦" in spec:
                params = DisplayService._iter_package_params(spec)
            else:
                params = DisplayService._iter_product_params(spec)
            for key, value in params:
                DisplayService._upsert_cell(rows, index, columns, key, column, value)

        if rows:
            frame = pd.DataFrame(rows, columns=columns)
            return frame.fillna("").replace("None", "").replace("nan", "")
        # The placeholder row must have exactly one cell per column; a
        # fixed three-cell row raised ValueError unless there were two products.
        placeholder = ["Không có thông số kỹ thuật"] + [""] * (len(columns) - 1)
        return pd.DataFrame([placeholder], columns=columns)

    @staticmethod
    async def show_specs(state: "ConversationState") -> str:
        """Return the specifications of the stored products as a markdown table."""
        specs_map = state.specs_advantages
        if not specs_map:
            return "📄 **Thông số kỹ thuật**\n\nKhông có thông số kỹ thuật nào."
        markdown_table = DisplayService._build_specs_frame(specs_map).to_markdown(index=False)
        return f"📄 **Thông số kỹ thuật**\n\n{markdown_table}"

    @staticmethod
    async def show_advantages(state: "ConversationState") -> str:
        """Return the advantages of the stored products as markdown bullet lists."""
        specs_map = state.specs_advantages
        if not specs_map:
            return "💡 **Ưu điểm nổi trội**\n\nKhông có ưu điểm nào."
        parts = ["💡 **Ưu điểm nổi trội**\n\n"]
        for data in specs_map.values():
            adv = data.get("advantages")
            if adv in (None, "", "None"):
                adv = "Hiện tại trong dữ liệu chưa có thông tin về ưu điểm nổi trội của sản phẩm này!"
            parts.append(f"### {DisplayService._display_name(data)}\n")
            # One bullet per non-blank line of the advantages text.
            parts.extend(f"- {line.strip()}\n" for line in adv.split('\n') if line.strip())
            parts.append("\n")
        return "".join(parts)

    @staticmethod
    async def show_solution_packages(state: "ConversationState") -> str:
        """Return the stored solution packages joined by blank lines."""
        packages = state.solution_packages
        if not packages:
            return "📦 **Gói sản phẩm**\n\nKhông có gói sản phẩm nào"
        return "\n\n".join(packages)

    @staticmethod
    async def show_all_products_table(state: "ConversationState"):
        """Fetch the all-products markdown table from the API for ``state.outputs``.

        Returns a fallback message when there is nothing to send or the API
        answers with a non-200 status, and an "Error: ..." string when the
        API is not configured or the request raises.
        """
        outputs = state.outputs or {}
        if not outputs:
            return "Không có dữ liệu sản phẩm"
        if not API_BASE_URL:
            # Same message as the chat path uses for a missing base URL.
            return "Error: API_BASE_URL not configured"
        try:
            payload = {"outputs": outputs}
            async with httpx.AsyncClient(timeout=60.0) as client:
                resp = await client.post(f"{API_BASE_URL}/products_by_category", json=payload)
            if resp.status_code == 200:
                return resp.json().get("markdown_table", "Không có dữ liệu sản phẩm")
            print(f"All products API error: {resp.status_code} - {resp.text}")
            return "Không có dữ liệu sản phẩm"
        except Exception as e:
            print(f"Exception in show_all_products_table: {e}")
            return f"Error: {e}"
class UIService:
    """Builds Chainlit action buttons and sends messages / typing animations."""

    # Spinner frames shared by both typing animations.
    _TYPING_FRAMES = ("⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏")

    @staticmethod
    def _config_actions(state: "ConversationState"):
        """Return the four configuration buttons, labelled with the current toggle states."""
        search_status = "🔍 Tìm theo mã sản phẩm (Đang tắt)" if not state.product_model_search else "🔍 Tìm theo mã sản phẩm (Đang bật)"
        enhance_status = "🧠 Tăng cường truy vấn (Đang tắt)" if not state.is_enhance_query else "🧠 Tăng cường truy vấn (Đang bật)"
        return [
            cl.Action(name="toggle_product_search", value="toggle_search", label=search_status, payload={"action": "toggle_search"}),
            cl.Action(name="change_search_method", value="change_method", label="🔎 Đổi phương thức tìm kiếm", payload={"action": "change_method"}),
            cl.Action(name="toggle_enhance_query", value="toggle_enhance", label=enhance_status, payload={"action": "toggle_enhance"}),
            cl.Action(name="change_model", value="model", label="🔄 Đổi model", payload={"action": "model"}),
        ]

    @staticmethod
    def create_action_buttons(state: "ConversationState"):
        """Return the full button row: four display buttons followed by the configuration buttons."""
        display_actions = [
            cl.Action(name="show_specs", value="specs", label="📄 Thông số kỹ thuật", payload={"action": "specs"}),
            cl.Action(name="show_advantages", value="advantages", label="💡 Ưu điểm nổi trội", payload={"action": "advantages"}),
            cl.Action(name="show_packages", value="packages", label="📦 Gói sản phẩm", payload={"action": "packages"}),
            cl.Action(name="show_all_products", value="all_products", label="🛒 Tất cả sản phẩm", payload={"action": "all_products"}),
        ]
        return display_actions + UIService._config_actions(state)

    @staticmethod
    def create_start_buttons(state: "ConversationState"):
        """Return the configuration-only buttons shown when a chat starts."""
        return UIService._config_actions(state)

    @staticmethod
    async def send_message_with_buttons(content: str, state: "ConversationState", actions=None, author="assistant"):
        """Send ``content`` with ``actions``; defaults to the full button row when ``actions`` is None."""
        if actions is None:
            actions = UIService.create_action_buttons(state)
        await cl.Message(
            content=content,
            actions=actions,
            author=author
        ).send()

    @staticmethod
    async def create_typing_animation():
        """Send a message, animate it with a fixed-length spinner, and return it.

        Legacy method kept for compatibility. Runs 27 frames at 0.25 s
        each, i.e. about 6.75 seconds.
        """
        msg = cl.Message(content="", author="assistant")
        await msg.send()
        frames = UIService._TYPING_FRAMES
        for i in range(27):
            msg.content = f"{frames[i % len(frames)]} Đang suy nghĩ..."
            await msg.update()
            await asyncio.sleep(0.25)
        return msg

    @staticmethod
    async def run_typing_animation(msg: "cl.Message"):
        """Animate ``msg`` with a spinner until the surrounding task is cancelled.

        Raises:
            asyncio.CancelledError: re-raised after logging when the
                animation is cancelled, which is the expected way to stop it.
        """
        frames = UIService._TYPING_FRAMES
        frame_index = 0
        try:
            while True:
                msg.content = f"{frames[frame_index % len(frames)]} Đang suy nghĩ..."
                await msg.update()
                await asyncio.sleep(0.25)
                frame_index += 1
        except asyncio.CancelledError:
            print("🎬 Animation cancelled - API response received")
            raise
| # HELPER FUNCTIONS: Session management with proper async error handling | |
async def ensure_session_state() -> Optional[ConversationState]:
    """Return the state for the current Chainlit session, creating it if needed.

    Returns None when the user session holds no ``session_id`` or when any
    exception is raised along the way; both cases are logged.
    """
    try:
        sid = cl.user_session.get("session_id")
        if sid:
            return await StateManager.get_or_create_session_state(sid)
        print("Lỗi: Không lấy được session id ở ensure_session_state")
    except Exception as exc:
        print(f"⚠️ Error ensuring session state: {exc}")
    return None
async def get_current_session_state() -> Optional[ConversationState]:
    """Look up the current session's state through Chainlit's user session.

    Returns None, after logging, when no ``session_id`` is stored or when
    an exception occurs.
    """
    try:
        # The id stored in Chainlit's user session identifies this session.
        current_id = cl.user_session.get("session_id")
        if not current_id:
            print("⚠️ No Chainlit session ID found")
            return None
        return await StateManager.get_or_create_session_state(current_id)
    except Exception as exc:
        print(f"⚠️ Error getting session state: {exc}")
        return None
@cl.on_chat_start
async def on_chat_start():
    """Initialize the chat session and greet the user.

    Registered as Chainlit's chat-start hook. Reuses the ``session_id``
    stored in the user session or generates and stores a new UUID, makes
    sure a state object exists for it, then sends the welcome message and
    the configuration buttons.
    """
    session_id = cl.user_session.get("session_id")
    if not session_id:
        session_id = str(uuid.uuid4())
        cl.user_session.set("session_id", session_id)
        print(f"🆕 Generated new session_id: {session_id}")
    else:
        print(f"🔄 Reusing existing session_id: {session_id}")

    app_state = await StateManager.get_or_create_session_state(session_id)

    await cl.Message(
        content=f"🛍️ **RangDong Sales Agent** (Session: {session_id[:8]}...)\n\n"
                "Xin chào! Tôi có thể giúp bạn tìm kiếm và tư vấn sản phẩm RangDong. Hãy thử các câu hỏi mẫu:\n\n"
                "- Tìm sản phẩm bình giữ nhiệt dung tích dưới 2 lít\n"
                "- Tìm sản phẩm ổ cắm thông minh\n"
                "- Tư vấn cho tôi đèn học chống cận cho con gái của tôi học lớp 6",
        author="assistant"
    ).send()

    await cl.Message(
        content="Sử dụng nút bên dưới để cấu hình:",
        actions=UIService.create_start_buttons(app_state),
        author="assistant"
    ).send()
@cl.on_chat_end
async def on_chat_end():
    """Schedule delayed cleanup when the chat session ends.

    Registered as Chainlit's chat-end hook. Cleanup is delayed by one hour
    (3600 s) rather than run immediately, so a user who reconnects within
    that window keeps the session state. Errors are logged, never raised.
    """
    try:
        session_id = cl.user_session.get("session_id")
        print(f"📤 on_chat_end triggered for session {session_id}")
        if not session_id:
            print("⚠️ No session_id found in on_chat_end")
            return
        # Delay instead of cleaning up now: this may be a temporary disconnect.
        await StateManager.schedule_delayed_cleanup(session_id, delay_seconds=3600)
        print(f"⏳ Scheduled delayed cleanup for session {session_id} (1h delay for disconnect tolerance)")
    except Exception as e:
        print(f"⚠️ Error during on_chat_end: {e}")
| # ACTION CALLBACKS - All use ensure_session_state() for better reliability | |
@cl.action_callback("show_specs")
async def on_show_specs(action):
    """Handle the "show_specs" button: send the specifications table.

    Sends an error message instead when no session state is available.
    """
    app_state = await ensure_session_state()
    if app_state is None:
        await cl.Message(content="Error: Session state not found", author="assistant").send()
        return
    specs_content = await DisplayService.show_specs(app_state)
    await UIService.send_message_with_buttons(specs_content, app_state, author="assistant")
@cl.action_callback("show_advantages")
async def on_show_advantages(action):
    """Handle the "show_advantages" button: send the advantages list.

    Sends an error message instead when no session state is available.
    """
    app_state = await ensure_session_state()
    if app_state is None:
        await cl.Message(content="Error: Session state not found", author="assistant").send()
        return
    adv_content = await DisplayService.show_advantages(app_state)
    await UIService.send_message_with_buttons(adv_content, app_state, author="assistant")
@cl.action_callback("show_packages")
async def on_show_packages(action):
    """Handle the "show_packages" button: send the solution packages.

    Sends an error message instead when no session state is available.
    """
    app_state = await ensure_session_state()
    if app_state is None:
        await cl.Message(content="Error: Session state not found", author="assistant").send()
        return
    pkg_content = await DisplayService.show_solution_packages(app_state)
    await UIService.send_message_with_buttons(pkg_content, app_state, author="assistant")
@cl.action_callback("show_all_products")
async def on_show_all_products(action):
    """Handle the "show_all_products" button: send the all-products table.

    Sends an error message instead when no session state is available.
    """
    app_state = await ensure_session_state()
    if app_state is None:
        await cl.Message(content="Error: Session state not found", author="assistant").send()
        return
    all_products_content = await DisplayService.show_all_products_table(app_state)
    await UIService.send_message_with_buttons(all_products_content, app_state, author="assistant")
@cl.action_callback("toggle_product_search")
async def on_toggle_product_search(action):
    """Handle the "toggle_product_search" button.

    Flips product-model search on the session state and confirms the new
    setting to the user. Sends an error message instead when no session
    state is available.
    """
    app_state = await ensure_session_state()
    if app_state is None:
        await cl.Message(content="Error: Session state not found", author="assistant").send()
        return
    await StateManager.toggle_product_model_search(app_state)
    # The message describes the state after the toggle.
    if app_state.product_model_search:
        status_message = (
            "✅ **Đã bật tìm kiếm theo mã sản phẩm**\n\n"
            "Khi bạn nhắc đến mã/model cụ thể trong câu hỏi, hệ thống sẽ tìm kiếm chính xác theo mã đó."
        )
    else:
        status_message = (
            "✅ **Đã tắt tìm kiếm theo mã sản phẩm**\n\n"
            "Hệ thống sẽ tìm kiếm sản phẩm theo cách thông thường."
        )
    await UIService.send_message_with_buttons(status_message, app_state, author="assistant")
@cl.action_callback("toggle_enhance_query")
async def on_toggle_enhance_query(action):
    """Handle the "toggle_enhance_query" button.

    Flips query enhancement on the session state and confirms the new
    setting to the user. Sends an error message instead when no session
    state is available.
    """
    app_state = await ensure_session_state()
    if app_state is None:
        await cl.Message(content="Error: Session state not found", author="assistant").send()
        return
    await StateManager.toggle_enhance_query(app_state)
    # The message describes the state after the toggle.
    if app_state.is_enhance_query:
        status_message = (
            "✅ **Đã bật tăng cường truy vấn**\n\n"
            "Hệ thống sẽ tự động cải thiện và mở rộng câu hỏi của bạn để tìm kiếm chính xác hơn."
        )
    else:
        status_message = (
            "✅ **Đã tắt tăng cường truy vấn**\n\n"
            "Hệ thống sẽ sử dụng câu hỏi gốc của bạn mà không cải thiện."
        )
    await UIService.send_message_with_buttons(status_message, app_state, author="assistant")
@cl.action_callback("change_search_method")
async def on_change_search_method(action):
    """Handle the "change_search_method" button.

    Shows the current settings together with one button per search method
    and a back button. Sends an error message instead when no session
    state is available.
    """
    app_state = await ensure_session_state()
    if app_state is None:
        await cl.Message(content="Error: Session state not found", author="assistant").send()
        return

    method_actions = [
        cl.Action(name="select_method_dense", value="dense", label="🔎 Dense (Mặc định)", payload={"method": "dense"}),
        cl.Action(name="select_method_sparse", value="sparse", label="🔎 Sparse (BM25)", payload={"method": "sparse"}),
        cl.Action(name="select_method_hybrid", value="hybrid", label="🔎 Hybrid", payload={"method": "hybrid"}),
        cl.Action(name="back_to_main", value="back", label="🔙 Quay lại", payload={"action": "back"}),
    ]
    current_method_labels = {
        "dense": "Dense",
        "sparse": "Sparse (BM25)",
        "hybrid": "Hybrid",
    }
    # Unknown method values are displayed as "Dense".
    current = current_method_labels.get(app_state.method, "Dense")
    search_flag = 'Đang bật' if app_state.product_model_search else 'Đang tắt'
    enhance_flag = 'Đang bật' if app_state.is_enhance_query else 'Đang tắt'
    await cl.Message(
        content=(
            f"**Model hiện tại**: {app_state.selected_model}\n"
            f"**Tìm kiếm theo mã**: {search_flag}\n"
            f"**Phương thức tìm kiếm**: {current}\n"
            f"**Tăng cường truy vấn**: {enhance_flag}\n\n"
            "Chọn phương thức tìm kiếm mới:"
        ),
        actions=method_actions,
        author="assistant"
    ).send()
async def on_select_method_dense(action):
    """Set the session's search method to ``"dense"`` and confirm it to the user.

    Args:
        action: The triggering action object; this handler does not read it.
    """
    state = await ensure_session_state()
    if state is None:
        missing = cl.Message(content="Error: Session state not found", author="assistant")
        await missing.send()
        return

    state.method = "dense"
    state.last_activity = datetime.now()  # stamp the time of this interaction

    confirmation = (
        "✅ Đã chuyển sang **Dense Search**\n\n"
        "Hệ thống sẽ sử dụng tìm kiếm semantic vector thông thường."
    )
    await UIService.send_message_with_buttons(confirmation, state, author="assistant")
async def on_select_method_sparse(action):
    """Set the session's search method to ``"sparse"`` and confirm it to the user.

    Args:
        action: The triggering action object; this handler does not read it.
    """
    state = await ensure_session_state()
    if state is None:
        missing = cl.Message(content="Error: Session state not found", author="assistant")
        await missing.send()
        return

    state.method = "sparse"
    state.last_activity = datetime.now()  # stamp the time of this interaction

    confirmation = (
        "✅ Đã chuyển sang **Sparse Search (BM25)**\n\n"
        "Hệ thống sẽ sử dụng tìm kiếm từ khóa BM25."
    )
    await UIService.send_message_with_buttons(confirmation, state, author="assistant")
async def on_select_method_hybrid(action):
    """Set the session's search method to ``"hybrid"`` and confirm it to the user.

    Args:
        action: The triggering action object; this handler does not read it.
    """
    state = await ensure_session_state()
    if state is None:
        missing = cl.Message(content="Error: Session state not found", author="assistant")
        await missing.send()
        return

    state.method = "hybrid"
    state.last_activity = datetime.now()  # stamp the time of this interaction

    confirmation = (
        "✅ Đã chuyển sang **Hybrid Search**\n\n"
        "Hệ thống sẽ kết hợp cả Dense và Sparse vector."
    )
    await UIService.send_message_with_buttons(confirmation, state, author="assistant")
async def on_change_model(action):
    """Show the current settings and offer one button per selectable model.

    Args:
        action: The triggering action object; this handler does not read it.
    """
    state = await ensure_session_state()
    if state is None:
        missing = cl.Message(content="Error: Session state not found", author="assistant")
        await missing.send()
        return

    # The position in this sequence becomes the numeric suffix of the action name.
    model_names = ("Gemini 2.0 Flash", "Gemini 2.5 Flash Lite", "Gemini 2.0 Flash Lite")
    model_actions = []
    for index, model_name in enumerate(model_names):
        model_actions.append(
            cl.Action(
                name=f"select_model_{index}",
                value=model_name,
                label=model_name,
                payload={"model": model_name},
            )
        )
    model_actions.append(
        cl.Action(name="back_to_main", value="back", label="🔙 Quay lại", payload={"action": "back"})
    )

    code_search_status = "Đang bật" if state.product_model_search else "Đang tắt"
    enhance_status = "Đang bật" if state.is_enhance_query else "Đang tắt"
    content = (
        f"**Model hiện tại**: {state.selected_model}\n"
        f"**Tìm kiếm theo mã**: {code_search_status}\n"
        f"**Phương thức tìm kiếm**: {state.method}\n"
        f"**Tăng cường truy vấn**: {enhance_status}\n\n"
        "Chọn model mới:"
    )

    menu = cl.Message(content=content, actions=model_actions, author="assistant")
    await menu.send()
async def on_back_to_main(action):
    """Send the main-menu message with the standard action buttons.

    Args:
        action: The triggering action object; this handler does not read it.
    """
    state = await ensure_session_state()
    if state is None:
        missing = cl.Message(content="Error: Session state not found", author="assistant")
        await missing.send()
        return

    menu = cl.Message(
        content="📋 **Menu chính**\n\nSử dụng các nút bên dưới để:",
        actions=UIService.create_action_buttons(state),
        author="assistant",
    )
    await menu.send()
async def on_select_model_0(action):
    """Switch the session to "Gemini 2.0 Flash" via StateManager and confirm it.

    Args:
        action: The triggering action object; this handler does not read it.
    """
    state = await ensure_session_state()
    if state is None:
        missing = cl.Message(content="Error: Session state not found", author="assistant")
        await missing.send()
        return

    model_name = "Gemini 2.0 Flash"
    await StateManager.change_model(state, model_name)
    await UIService.send_message_with_buttons(
        f"✅ Đã chuyển sang **{model_name}**", state, author="assistant"
    )
async def on_select_model_1(action):
    """Switch the session to "Gemini 2.5 Flash Lite" via StateManager and confirm it.

    Args:
        action: The triggering action object; this handler does not read it.
    """
    state = await ensure_session_state()
    if state is None:
        missing = cl.Message(content="Error: Session state not found", author="assistant")
        await missing.send()
        return

    model_name = "Gemini 2.5 Flash Lite"
    await StateManager.change_model(state, model_name)
    await UIService.send_message_with_buttons(
        f"✅ Đã chuyển sang **{model_name}**", state, author="assistant"
    )
async def on_select_model_2(action):
    """Switch the session to "Gemini 2.0 Flash Lite" via StateManager and confirm it.

    Args:
        action: The triggering action object; this handler does not read it.
    """
    state = await ensure_session_state()
    if state is None:
        missing = cl.Message(content="Error: Session state not found", author="assistant")
        await missing.send()
        return

    model_name = "Gemini 2.0 Flash Lite"
    await StateManager.change_model(state, model_name)
    await UIService.send_message_with_buttons(
        f"✅ Đã chuyển sang **{model_name}**", state, author="assistant"
    )
| # DEBUG ENDPOINTS (optional - for monitoring session status) | |
async def on_debug_sessions(action):
    """Send a message summarising what ``StateManager.get_session_status`` returns.

    Any exception raised while fetching or formatting the status is reported to
    the chat as ``"Debug error: ..."`` instead of propagating.

    Args:
        action: The triggering action object; this handler does not read it.
    """
    try:
        sessions = await StateManager.get_session_status()

        # Collect the report pieces and join them once at the end.
        parts = ["🔍 **Debug: Session Status**\n\n"]
        if sessions:
            for sid, details in sessions.items():
                parts.append(
                    f"**Session: {sid[:8]}...**\n"
                    f"- Pending cleanup: {details['pending_cleanup']}\n"
                    f"- Has task: {details['has_task']}\n"
                    f"- Last activity: {details['last_activity']}\n"
                    f"- Model: {details['selected_model']}\n"
                    f"- Product search: {details['product_model_search']}\n"
                    f"- Method: {details.get('method', 'dense')}\n\n"
                )
        else:
            parts.append("No active sessions.")

        report = cl.Message(content="".join(parts), author="assistant")
        await report.send()
    except Exception as e:
        await cl.Message(content=f"Debug error: {e}", author="assistant").send()
async def main(message: cl.Message):
    """Answer an incoming chat message while a typing animation plays.

    A placeholder message is sent first. ``run_typing_animation`` then runs on it
    as a background task while ``ChatService.respond_to_chat`` produces the reply.
    Finally the placeholder is overwritten with the reply (or ``"Error: ..."`` if
    the chat call raised) and the standard action buttons are attached.

    Args:
        message: The incoming message. Its ``content`` and the path of its first
            ``cl.Image`` element (if any) are passed to the chat service.
    """
    app_state = await ensure_session_state()
    if app_state is None:
        await cl.Message(content="Error: Session state not found", author="assistant").send()
        return

    # Only the first attached image is forwarded to the chat service.
    image_path = next(
        (el.path for el in (message.elements or []) if isinstance(el, cl.Image)),
        None,
    )

    # Placeholder message that the animation edits and the reply later replaces.
    msg = cl.Message(content="", author="assistant")
    await msg.send()

    # Run the animation and the API call concurrently.
    animation_task = asyncio.create_task(run_typing_animation(msg))
    api_task = asyncio.create_task(
        ChatService.respond_to_chat(app_state, message.content, image_path)
    )

    try:
        response = await api_task
    except Exception as e:
        # Only a failure of the chat call itself is reported to the user.
        response = f"Error: {e}"
    finally:
        # Always stop the animation: on success, on API failure, and when this
        # handler is cancelled (CancelledError is not caught by `except Exception`).
        animation_task.cancel()
        try:
            # Give the animation a brief moment to finish its cancellation.
            await asyncio.wait_for(animation_task, timeout=0.1)
        except (asyncio.CancelledError, asyncio.TimeoutError):
            pass
        except Exception as animation_error:
            # The animation had already crashed on its own. That must not replace
            # a reply that was produced successfully, so it is only logged.
            print(f"⚠️ Typing animation failed: {animation_error}")

    # Update the placeholder with the final response and the buttons.
    msg.content = response
    msg.actions = UIService.create_action_buttons(app_state)
    await msg.update()