Spaces:
Sleeping
Sleeping
| import os | |
| from pymongo import MongoClient | |
| from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError | |
| from dotenv import load_dotenv | |
| from datetime import datetime, timedelta | |
| import pytz | |
| import logging | |
# Module-level logger; handlers and levels are left to the application.
logger = logging.getLogger(__name__)

# Copy variables from a .env file (if present) into os.environ before any
# setting below is read.
load_dotenv()

# MongoDB location. MONGODB_URL has no fallback here, so it is None when unset.
MONGODB_URL = os.environ.get("MONGODB_URL")
DB_NAME = os.environ.get("DB_NAME", "Telegram")
COLLECTION_NAME = os.environ.get("COLLECTION_NAME", "session_chat")

# Server-selection timeout in milliseconds (5000 ms = 5 s unless overridden).
MONGODB_TIMEOUT = int(os.environ.get("MONGODB_TIMEOUT", "5000"))

# Legacy cache settings; kept only so existing configuration keeps working.
HISTORY_CACHE_TTL = int(os.environ.get("HISTORY_CACHE_TTL", "3600"))  # seconds (1 hour)
HISTORY_QUEUE_SIZE = int(os.environ.get("HISTORY_QUEUE_SIZE", "10"))  # item count
# Module-level MongoDB handles. They are pre-bound to None so the names always
# exist as module attributes, even when initialisation below fails. Without
# this, `from <module> import session_collection` would raise ImportError, and
# the helpers below would hit NameError instead of a clear failure.
client = None
db = None
session_collection = None

if not MONGODB_URL:
    # MongoClient(None) falls back to its default host (localhost) instead of
    # failing, which easily hides a missing .env entry.
    logger.warning("MONGODB_URL is not set; MongoClient will use its default host")

try:
    # serverSelectionTimeoutMS bounds how long operations wait to find a
    # reachable server, so a dead database fails fast instead of hanging.
    client = MongoClient(MONGODB_URL, serverSelectionTimeoutMS=MONGODB_TIMEOUT)
    db = client[DB_NAME]
    # Collection that stores one document per chat interaction.
    session_collection = db[COLLECTION_NAME]
    logger.info(f"MongoDB connection initialized to {DB_NAME}.{COLLECTION_NAME}")
except Exception as e:
    # Deliberately not re-raised so that startup does not crash; the functions
    # below catch and report errors when they use the collection.
    logger.error(f"Failed to initialize MongoDB connection: {e}")
def check_db_connection():
    """Ping the MongoDB server through the module-level ``client``.

    Returns:
        True when the ``ping`` admin command succeeds. Returns False on any
        failure; the error is logged and never propagated.
    """
    try:
        client.admin.command('ping')
    except (ConnectionFailure, ServerSelectionTimeoutError) as err:
        # The server could not be reached or selected in time.
        logger.error(f"MongoDB connection failed: {err}")
        return False
    except Exception as err:
        # Anything else, e.g. the client was never initialised.
        logger.error(f"Unknown error when checking MongoDB connection: {err}")
        return False
    else:
        logger.info("MongoDB connection is working")
        return True
# Timezone applied to every timestamp this module produces.
asia_tz = pytz.timezone('Asia/Ho_Chi_Minh')


def get_local_time():
    """Return the current Asia/Ho_Chi_Minh time as a 'YYYY-MM-DD HH:MM:SS' string."""
    moment = datetime.now(asia_tz)
    return moment.strftime("%Y-%m-%d %H:%M:%S")


def get_local_datetime():
    """Return the current time as a timezone-aware datetime in Asia/Ho_Chi_Minh."""
    moment = datetime.now(asia_tz)
    return moment


# Aliases kept so that older callers using the Vietnam-specific names still work.
get_vietnam_time = get_local_time
get_vietnam_datetime = get_local_datetime
# Utility functions
def save_session(session_id, factor, action, first_name, last_name, message, user_id, username, response=None):
    """Insert one chat-session document into the session collection.

    Args:
        session_id: Identifier later used by ``update_session_response``.
        factor: Who produced the entry; ``get_chat_history`` only renders
            entries whose factor is "user".
        action: Kind of interaction, e.g. "start", "clear" or "asking_freely"
            (the values ``get_chat_history`` looks for).
        first_name, last_name, username: User profile fields, stored as given.
        message: The user's message text.
        user_id: Stored as given. NOTE(review): ``get_chat_history`` queries
            with ``str(user_id)``, so callers presumably pass strings; confirm.
        response: Bot reply. Defaults to None and can be filled in later via
            ``update_session_response``.

    Returns:
        dict with ``acknowledged``, ``inserted_id`` (stringified) and the
        inserted ``session_data`` document. ``insert_one`` adds an ``_id``
        key to that document in place.

    Raises:
        Any exception raised while building or inserting the document; it is
        logged first, then re-raised.
    """
    try:
        # Read the clock once so the string and datetime timestamps describe
        # the same instant. Two separate now() calls can straddle a second
        # boundary. The string format matches get_local_time().
        now = get_local_datetime()
        session_data = {
            "session_id": session_id,
            "factor": factor,
            "action": action,
            "created_at": now.strftime("%Y-%m-%d %H:%M:%S"),
            # Timestamp used for sorting and range queries elsewhere.
            "created_at_datetime": now,
            "first_name": first_name,
            "last_name": last_name,
            "message": message,
            "user_id": user_id,
            "username": username,
            "response": response
        }
        result = session_collection.insert_one(session_data)
        logger.info(f"Session saved with ID: {result.inserted_id}")
        return {
            "acknowledged": result.acknowledged,
            "inserted_id": str(result.inserted_id),
            "session_data": session_data
        }
    except Exception as e:
        logger.error(f"Error saving session: {e}")
        raise
def update_session_response(session_id, response):
    """Set the ``response`` field of the session whose ``session_id`` matches.

    Args:
        session_id: Value of the ``session_id`` field to look up.
        response: Value to store in the ``response`` field.

    Returns:
        True if a matching session exists (even if the stored value was
        already equal). False if no session has this ``session_id``.

    Raises:
        Any database exception; it is logged first, then re-raised.
    """
    try:
        # A single update_one both locates and updates the document, and its
        # matched_count reports whether the session existed. This replaces a
        # separate find_one pre-check, which cost an extra round-trip and was
        # racy (the document could change between the two calls).
        # NOTE(review): matched_count requires an acknowledged write concern
        # (the pymongo default).
        result = session_collection.update_one(
            {"session_id": session_id},
            {"$set": {"response": response}}
        )
        if result.matched_count == 0:
            logger.warning(f"No session found with ID: {session_id}")
            return False
        logger.info(f"Session {session_id} updated with response")
        return True
    except Exception as e:
        logger.error(f"Error updating session response: {e}")
        raise
def get_recent_sessions(user_id, action, n=3):
    """Fetch up to ``n`` newest sessions for ``user_id`` with the given ``action``.

    Each returned dict has only the ``message`` and ``response`` fields
    (``_id`` is excluded). Results are ordered newest first by
    ``created_at_datetime``. On any error the error is logged and an empty
    list is returned.

    NOTE(review): unlike get_chat_history, ``user_id`` is used as-is rather
    than ``str(user_id)``; callers must pass the same type that was stored.
    """
    query = {"user_id": user_id, "action": action}
    projection = {"_id": 0, "message": 1, "response": 1}
    try:
        cursor = (
            session_collection
            .find(query, projection)
            .sort("created_at_datetime", -1)
            .limit(n)
        )
        sessions = list(cursor)
        logger.debug(f"Retrieved {len(sessions)} recent sessions for user {user_id}, action {action}")
        return sessions
    except Exception as e:
        logger.error(f"Error getting recent sessions: {e}")
        return []
def _format_conversation(docs):
    """Render session documents as alternating "User: ..." / "Bot: ..." lines.

    Only documents whose factor is "user" and whose action is "asking_freely"
    (case-insensitive) are rendered. That also skips "start"/"clear"
    documents. Fields that are missing or None render as "".
    """
    lines = []
    for doc in docs:
        # `or ""` also covers fields stored explicitly as None. save_session
        # defaults `response` to None until update_session_response runs, and
        # calling .lower() on None would abort the whole transcript.
        factor = (doc.get("factor") or "").lower()
        action = (doc.get("action") or "").lower()
        if factor != "user" or action != "asking_freely":
            continue
        lines.append(f"User: {doc.get('message') or ''}")
        lines.append(f"Bot: {doc.get('response') or ''}")
    return "\n".join(lines)


def get_chat_history(user_id, n=5) -> str:
    """Build the chat transcript for ``user_id`` as a newline-joined string:

        User: ...
        Bot: ...
        User: ...
        Bot: ...

    If the user has a "start" or "clear" session, only sessions created
    strictly after the most recent such reset are used, and ``n`` is not
    applied to them. Otherwise the ``n`` most recent sessions are used. In
    both cases the transcript runs oldest to newest.

    Returns:
        The transcript, or "" when there is no data or an error occurs (the
        error is logged).
    """
    try:
        # Sessions are looked up with a string user_id.
        uid = str(user_id)

        # Most recent /start or /clear session, if any.
        reset_session = session_collection.find_one(
            {"user_id": uid, "action": {"$in": ["start", "clear"]}},
            sort=[("created_at_datetime", -1)]
        )

        if reset_session:
            reset_time = reset_session["created_at_datetime"]
            # Every session after the reset, oldest first.
            docs = list(
                session_collection.find({
                    "user_id": uid,
                    "created_at_datetime": {"$gt": reset_time}
                }).sort("created_at_datetime", 1)
            )
            logger.info(f"Lấy {len(docs)} session sau lệnh {reset_session['action']} lúc {reset_time}")
        else:
            # No reset: take the n newest sessions, then flip them to
            # chronological order. Sort on the datetime field, as the reset
            # branch and get_recent_sessions do; it also orders entries
            # within the same second, which the string created_at cannot.
            docs = list(
                session_collection.find({"user_id": uid})
                .sort("created_at_datetime", -1)
                .limit(n)
            )
            docs.reverse()
            logger.info(f"Không tìm thấy session reset, lấy {len(docs)} session gần nhất")

        if not docs:
            logger.info(f"Không tìm thấy dữ liệu cho user_id: {user_id}")
            return ""

        return _format_conversation(docs)
    except Exception as e:
        logger.error(f"Lỗi khi lấy lịch sử chat cho user_id {user_id}: {e}")
        return ""
def get_request_history(user_id, n=3):
    """Return the user's recent questions joined by single spaces.

    The questions are taken from ``get_chat_history(user_id, n)``: the text
    after "User: " on every line that starts with that prefix. The result is
    meant as retrieval context. Returns "" on any error (the error is logged).

    NOTE(review): the transcript is split on newlines, so only the first line
    of a multi-line user message is kept. A bot reply line that happens to
    start with "User: " would be picked up as a question.
    """
    user_prefix = "User: "
    try:
        transcript = get_chat_history(user_id, n)
        questions = [
            line[len(user_prefix):]
            for line in transcript.split('\n')
            if line.startswith(user_prefix)
        ]
        return " ".join(questions)
    except Exception as e:
        logger.error(f"Error getting request history: {e}")
        return ""