Spaces:
Sleeping
Sleeping
import re
from collections import defaultdict
from datetime import datetime, timezone, timedelta
from typing import Optional

from bson import ObjectId
from pymongo import DESCENDING

from app.database.db import db
class DatabaseQuery:
    """MongoDB query helpers built on the module-level ``db`` handle.

    Every method wraps its driver calls in ``try``/``except`` and re-raises
    any failure as a plain ``Exception`` whose message is prefixed with a
    description of the operation; the original error is attached as the
    cause.
    """

    # Boolean feature flags stored per user in the ``preferences`` collection.
    _PREFERENCE_KEYS = (
        'keywords',
        'references',
        'websearch',
        'personalized_recommendations',
        'environmental_recommendations',
    )

    # ------------------------------------------------------------------
    # Chat sessions and chats
    # ------------------------------------------------------------------

    def create_chat_session(self, chat_session):
        """Insert ``chat_session`` into the ``chat_sessions`` collection."""
        try:
            db.chat_sessions.insert_one(chat_session)
        except Exception as e:
            raise Exception(f"Error creating chat session: {e}") from e

    def get_user_chat_sessions(self, user_id) -> list:
        """Return the user's chat sessions, most recently accessed first.

        The ``_id`` field is excluded from every returned document.
        """
        try:
            cursor = db.chat_sessions.find({"user_id": user_id}, {"_id": 0})
            return list(cursor.sort("last_accessed", -1))
        except Exception as e:
            raise Exception(f"Error retrieving user chat sessions: {e}") from e

    def create_chat(self, chat_data) -> bool:
        """Insert ``chat_data`` into the ``chats`` collection; return True."""
        try:
            db.chats.insert_one(chat_data)
            return True
        except Exception as e:
            raise Exception(f"Error creating chat: {e}") from e

    def update_last_accessed_time(self, session_id):
        """Set ``last_accessed`` of the given session to the current UTC time."""
        try:
            db.chat_sessions.update_one(
                {"session_id": session_id},
                {"$set": {"last_accessed": datetime.now(timezone.utc)}},
            )
        except Exception as e:
            raise Exception(f"Error updating last accessed time: {e}") from e

    def get_session_chats(self, session_id, user_id) -> list:
        """Return the chats of one session owned by ``user_id``, oldest first.

        The ``_id`` field is excluded from every returned document.
        """
        try:
            cursor = db.chats.find(
                {"session_id": session_id, "user_id": user_id},
                {"_id": 0},
            )
            return list(cursor.sort("timestamp", 1))
        except Exception as e:
            raise Exception(f"Error retrieving session chats: {e}") from e

    def create_indexes(self):
        """Create the indexes used by the chat session and chat queries."""
        try:
            db.chat_sessions.create_index([("user_id", 1), ("last_accessed", -1)])
            db.chat_sessions.create_index([("session_id", 1)])
            db.chats.create_index([("session_id", 1), ("timestamp", 1)])
            db.chats.create_index([("user_id", 1)])
        except Exception as e:
            raise Exception(f"Error creating indexes: {e}") from e

    def check_chat_session(self, session_id) -> bool:
        """Return True if a chat session with ``session_id`` exists."""
        try:
            return db.chat_sessions.find_one({'session_id': session_id}) is not None
        except Exception as e:
            raise Exception(f"Error checking chat session: {e}") from e

    def verify_session(self, session_id, user_id) -> bool:
        """Return True if the session exists and belongs to ``user_id``."""
        try:
            session = db.chat_sessions.find_one(
                {"session_id": session_id, "user_id": user_id}
            )
            return session is not None
        except Exception as e:
            raise Exception(f"Error verifying session: {e}") from e

    def update_chat_session_title(self, session_id, new_title) -> bool:
        """Set the title of a chat session.

        Returns True when the stored title actually changed. Raises when no
        session matches ``session_id``; that error is wrapped like any other
        failure, so its message reads
        ``"Error updating chat session title: Chat session not found"``.
        """
        try:
            result = db.chat_sessions.update_one(
                {"session_id": session_id},
                {"$set": {"title": new_title}},
            )
            if result.matched_count == 0:
                raise Exception("Chat session not found")
            return result.modified_count > 0
        except Exception as e:
            raise Exception(f"Error updating chat session title: {e}") from e

    def delete_chat_session(self, session_id, user_id) -> dict:
        """Delete one session of ``user_id`` together with its chats.

        Returns ``{"session_deleted": bool, "chats_deleted": int}``.
        """
        try:
            owned = {"session_id": session_id, "user_id": user_id}
            # Remove the chats first (same order as
            # delete_all_user_sessions_and_chats): if the second delete fails,
            # the session document is still there and the call can be retried
            # instead of leaving chats without a session.
            chats_result = db.chats.delete_many(dict(owned))
            session_result = db.chat_sessions.delete_one(dict(owned))
            return {
                "session_deleted": session_result.deleted_count > 0,
                "chats_deleted": chats_result.deleted_count,
            }
        except Exception as e:
            raise Exception(f"Error deleting chat session and chats: {e}") from e

    def delete_all_user_sessions_and_chats(self, user_id) -> dict:
        """Delete every chat and chat session stored for ``user_id``.

        Returns ``{"deleted_chats": int, "deleted_sessions": int}``.
        """
        try:
            chats_result = db.chats.delete_many({"user_id": user_id})
            sessions_result = db.chat_sessions.delete_many({"user_id": user_id})
            return {
                "deleted_chats": chats_result.deleted_count,
                "deleted_sessions": sessions_result.deleted_count,
            }
        except Exception as e:
            raise Exception(f"Error deleting user sessions and chats: {e}") from e

    def get_all_user_chats(self, user_id) -> list:
        """Return every session of ``user_id`` with its chats attached.

        Sessions are ordered by ``last_accessed`` descending; the chats of
        each session are ordered by ``timestamp`` ascending. Each entry has
        the keys ``session_id``, ``title`` (``"New Chat"`` when the session
        has none), ``created_at``, ``last_accessed`` and ``chats``.
        """
        try:
            sessions = list(
                db.chat_sessions.find({"user_id": user_id}, {"_id": 0})
                .sort("last_accessed", -1)
            )
            if not sessions:
                return []

            # Fetch the user's chats once and group them by session instead
            # of issuing one query per session. Iterating in timestamp order
            # keeps each per-session list sorted.
            chats_by_session = defaultdict(list)
            user_chats = db.chats.find({"user_id": user_id}, {"_id": 0})
            for chat in user_chats.sort("timestamp", 1):
                chats_by_session[chat.get("session_id")].append(chat)

            return [
                {
                    "session_id": session["session_id"],
                    "title": session.get("title", "New Chat"),
                    "created_at": session.get("created_at"),
                    "last_accessed": session.get("last_accessed"),
                    "chats": chats_by_session.get(session["session_id"], []),
                }
                for session in sessions
            ]
        except Exception as e:
            raise Exception(f"Error retrieving all user chats: {e}") from e

    # ------------------------------------------------------------------
    # Users, registration and authentication
    # ------------------------------------------------------------------

    def get_user_by_identifier(self, identifier):
        """Return the user whose username or email equals ``identifier``.

        Returns ``None`` when no user matches.
        """
        try:
            return db.users.find_one(
                {'$or': [{'username': identifier}, {'email': identifier}]}
            )
        except Exception as e:
            raise Exception(f"Error retrieving user by identifier: {e}") from e

    def add_token_to_blacklist(self, jti):
        """Record the token identifier ``jti`` in the ``blacklist`` collection."""
        try:
            db.blacklist.insert_one({'jti': jti})
        except Exception as e:
            raise Exception(f"Error adding token to blacklist: {e}") from e

    def get_user_profile(self, username):
        """Return the user document for ``username`` without its password.

        Returns ``None`` when no user matches.
        """
        try:
            return db.users.find_one({'username': username}, {'password': 0})
        except Exception as e:
            raise Exception(f"Error getting user profile: {e}") from e

    def update_user_profile(self, username, update_fields) -> bool:
        """Apply ``update_fields`` via ``$set``; return True if data changed."""
        try:
            result = db.users.update_one(
                {'username': username},
                {'$set': update_fields},
            )
            return result.modified_count > 0
        except Exception as e:
            raise Exception(f"Error updating user profile: {e}") from e

    def delete_user_account(self, username) -> bool:
        """Delete the user document only; return True if one was removed."""
        try:
            result = db.users.delete_one({'username': username})
            return result.deleted_count > 0
        except Exception as e:
            raise Exception(f"Error deleting user account: {e}") from e

    def is_username_or_email_exists(self, username, email) -> bool:
        """Return True if any user has this username or this email."""
        try:
            user = db.users.find_one(
                {'$or': [{'username': username}, {'email': email}]}
            )
            return user is not None
        except Exception as e:
            raise Exception(f"Error checking if username or email exists: {e}") from e

    def create_or_update_temp_user(self, username, email, temp_user_data):
        """Upsert a ``temp_users`` document matched by username or email.

        ``temp_user_data`` is written with ``$set``.
        """
        try:
            db.temp_users.update_one(
                {'$or': [{'username': username}, {'email': email}]},
                {'$set': temp_user_data},
                upsert=True,
            )
        except Exception as e:
            raise Exception(f"Error creating/updating temp user: {e}") from e

    def get_temp_user_by_username(self, username):
        """Return the ``temp_users`` document for ``username`` or ``None``."""
        try:
            return db.temp_users.find_one({'username': username})
        except Exception as e:
            raise Exception(f"Error retrieving temp user by username: {e}") from e

    def delete_temp_user(self, username):
        """Delete the ``temp_users`` document for ``username``, if any."""
        try:
            db.temp_users.delete_one({'username': username})
        except Exception as e:
            raise Exception(f"Error deleting temp user: {e}") from e

    def create_user_from_data(self, user_data):
        """Insert ``user_data`` into ``users`` and return that same object."""
        try:
            db.users.insert_one(user_data)
            return user_data
        except Exception as e:
            raise Exception(f"Error creating user from data: {e}") from e

    def create_user(self, username, email, hashed_password, name, age, created_at,
                    is_verified=False, verification_code=None, code_expiration=None):
        """Build a user document, insert it into ``users`` and return it.

        ``verification_code`` and ``code_expiration`` are stored only when
        both are truthy.
        """
        try:
            new_user = {
                'username': username,
                'email': email,
                'password': hashed_password,
                'name': name,
                'age': age,
                'created_at': created_at,
                'is_verified': is_verified,
            }
            if verification_code and code_expiration:
                new_user['verification_code'] = verification_code
                new_user['code_expiration'] = code_expiration
            db.users.insert_one(new_user)
            return new_user
        except Exception as e:
            raise Exception(f"Error creating user: {e}") from e

    def get_user_by_username(self, username):
        """Return the full user document for ``username`` or ``None``."""
        try:
            return db.users.find_one({'username': username})
        except Exception as e:
            raise Exception(f"Error retrieving user by username: {e}") from e

    def verify_user_email(self, username) -> bool:
        """Mark the user as verified and drop the stored verification code.

        Returns True if the document was modified.
        """
        try:
            result = db.users.update_one(
                {'username': username},
                {
                    '$set': {'is_verified': True},
                    '$unset': {'verification_code': '', 'code_expiration': ''},
                },
            )
            return result.modified_count > 0
        except Exception as e:
            raise Exception(f"Error verifying user email: {e}") from e

    def update_verification_code(self, username, verification_code, code_expiration) -> bool:
        """Store a new verification code and expiry; True if data changed."""
        try:
            result = db.users.update_one(
                {'username': username},
                {'$set': {
                    'verification_code': verification_code,
                    'code_expiration': code_expiration,
                }},
            )
            return result.modified_count > 0
        except Exception as e:
            raise Exception(f"Error updating verification code: {e}") from e

    def is_valid_email(self, email) -> bool:
        """Return True if ``email`` looks like ``local@domain.tld``.

        The whole string must match: trailing text after the address is
        rejected, and the top-level domain may be any run of two or more
        letters. A non-string argument raises (wrapped like other errors).
        """
        try:
            # The leading \b keeps the rule that the address must start with
            # a word character; fullmatch anchors both ends.
            email_regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}'
            return re.fullmatch(email_regex, email) is not None
        except Exception as e:
            raise Exception(f"Error validating email: {e}") from e

    # ------------------------------------------------------------------
    # Password reset
    # ------------------------------------------------------------------

    def store_reset_token(self, email, token, expiration):
        """Upsert the password-reset token and expiry stored for ``email``."""
        try:
            db.password_resets.update_one(
                {'email': email},
                {'$set': {'token': token, 'expiration': expiration}},
                upsert=True,
            )
        except Exception as e:
            raise Exception(f"Error storing reset token: {e}") from e

    def verify_reset_token(self, token):
        """Return the reset record for ``token`` if it has not expired.

        A record counts as unexpired when its ``expiration`` is later than
        the current UTC time. Returns ``None`` otherwise.
        """
        try:
            return db.password_resets.find_one({
                'token': token,
                'expiration': {'$gt': datetime.now(timezone.utc)},
            })
        except Exception as e:
            raise Exception(f"Error verifying reset token: {e}") from e

    def update_password(self, email, new_password):
        """Store ``new_password`` as the ``password`` of the user with ``email``.

        The value is written as given.
        NOTE(review): presumably callers pass an already-hashed password —
        confirm against the call sites.
        """
        try:
            db.users.update_one(
                {'email': email},
                {'$set': {'password': new_password}},
            )
        except Exception as e:
            raise Exception(f"Error updating password: {e}") from e

    def delete_reset_token(self, token):
        """Delete the password-reset record holding ``token``, if any."""
        try:
            db.password_resets.delete_one({'token': token})
        except Exception as e:
            raise Exception(f"Error deleting reset token: {e}") from e

    # ------------------------------------------------------------------
    # Location
    # ------------------------------------------------------------------

    def add_or_update_location(self, username, location):
        """Upsert the user's location and stamp ``updated_at`` with UTC now."""
        try:
            db.locations.update_one(
                {'username': username},
                {'$set': {
                    'location': location,
                    'updated_at': datetime.now(timezone.utc),
                }},
                upsert=True,
            )
        except Exception as e:
            raise Exception(f"Error adding/updating location: {e}") from e

    def get_location(self, username):
        """Return the ``locations`` document for ``username`` or ``None``."""
        try:
            return db.locations.find_one({'username': username})
        except Exception as e:
            raise Exception(f"Error retrieving location: {e}") from e

    # ------------------------------------------------------------------
    # Questionnaires
    # ------------------------------------------------------------------

    def submit_questionnaire(self, user_id, answers) -> str:
        """Insert a questionnaire and return its new id as a string."""
        try:
            now = datetime.now(timezone.utc)
            result = db.questionnaires.insert_one({
                'user_id': user_id,
                'answers': answers,
                'created_at': now,
                'updated_at': now,
            })
            return str(result.inserted_id)
        except Exception as e:
            raise Exception(f"Error submitting questionnaire: {e}") from e

    def get_latest_questionnaire(self, user_id):
        """Return the user's newest questionnaire (by ``created_at``).

        ``_id`` is converted to a string. Returns ``None`` when the user has
        no questionnaire.
        """
        try:
            questionnaire = db.questionnaires.find_one(
                {'user_id': user_id},
                sort=[('created_at', -1)],
            )
            if questionnaire:
                questionnaire['_id'] = str(questionnaire['_id'])
            return questionnaire
        except Exception as e:
            raise Exception(f"Error getting latest questionnaire: {e}") from e

    def update_questionnaire(self, questionnaire_id, user_id, answers) -> bool:
        """Replace the answers of a questionnaire owned by ``user_id``.

        ``questionnaire_id`` is converted with ``ObjectId``; an invalid id
        therefore raises. Returns True if the document was modified.
        """
        try:
            result = db.questionnaires.update_one(
                {'_id': ObjectId(questionnaire_id), 'user_id': user_id},
                {'$set': {
                    'answers': answers,
                    'updated_at': datetime.now(timezone.utc),
                }},
            )
            return result.modified_count > 0
        except Exception as e:
            raise Exception(f"Error updating questionnaire: {e}") from e

    def delete_questionnaire(self, questionnaire_id, user_id) -> bool:
        """Delete a questionnaire owned by ``user_id``; True if one was removed."""
        try:
            result = db.questionnaires.delete_one(
                {'_id': ObjectId(questionnaire_id), 'user_id': user_id}
            )
            return result.deleted_count > 0
        except Exception as e:
            raise Exception(f"Error deleting questionnaire: {e}") from e

    def count_answered_questions(self, username) -> int:
        """Count ``questions`` documents of ``username`` whose answer is not null."""
        try:
            return db.questions.count_documents({
                'username': username,
                'answer': {'$ne': None},
            })
        except Exception as e:
            raise Exception(f"Error counting answered questions: {e}") from e

    # ------------------------------------------------------------------
    # Preferences, theme and language
    # ------------------------------------------------------------------

    def get_user_preferences(self, username) -> dict:
        """Return the user's preference flags.

        Any flag that is not stored (or all of them, when the user has no
        preferences document) is reported as ``False``.
        """
        try:
            stored = db.preferences.find_one({'username': username}) or {}
            return {key: stored.get(key, False) for key in self._PREFERENCE_KEYS}
        except Exception as e:
            raise Exception(f"Error getting user preferences: {e}") from e

    def set_user_preferences(self, username, preferences) -> dict:
        """Upsert the user's preference flags and return the stored data.

        Each flag is read from ``preferences`` (missing means ``False``) and
        coerced with ``bool``. The returned dict also carries ``username``
        and ``updated_at``.
        """
        try:
            preferences_data = {'username': username}
            for key in self._PREFERENCE_KEYS:
                preferences_data[key] = bool(preferences.get(key, False))
            preferences_data['updated_at'] = datetime.now(timezone.utc)
            db.preferences.update_one(
                {'username': username},
                {'$set': preferences_data},
                upsert=True,
            )
            return preferences_data
        except Exception as e:
            raise Exception(f"Error setting user preferences: {e}") from e

    def get_user_theme(self, username) -> str:
        """Return the stored theme for ``username``, defaulting to ``'light'``."""
        try:
            user_theme = db.user_themes.find_one({'username': username})
            if not user_theme:
                return 'light'
            return user_theme.get('theme', 'light')
        except Exception as e:
            raise Exception(f"Error getting user theme: {e}") from e

    def set_user_theme(self, username, theme) -> dict:
        """Upsert the user's theme and return the stored data.

        A truthy ``theme`` stores ``"dark"``; a falsy one stores ``"light"``.
        """
        try:
            theme_data = {
                'username': username,
                'theme': "dark" if theme else "light",
                'updated_at': datetime.now(timezone.utc),
            }
            db.user_themes.update_one(
                {'username': username},
                {'$set': theme_data},
                upsert=True,
            )
            return theme_data
        except Exception as e:
            raise Exception(f"Error setting user theme: {e}") from e

    def get_user_language(self, user_id):
        """Return the stored language for ``user_id`` or ``None``."""
        try:
            record = db.languages.find_one({'user_id': user_id})
            return record.get('language') if record else None
        except Exception as e:
            raise Exception(f"Error retrieving user language: {e}") from e

    def set_user_language(self, user_id, language) -> dict:
        """Upsert the user's language and return the stored data."""
        try:
            language_data = {
                'user_id': user_id,
                'language': language,
                'updated_at': datetime.now(timezone.utc),
            }
            db.languages.update_one(
                {'user_id': user_id},
                {'$set': language_data},
                upsert=True,
            )
            return language_data
        except Exception as e:
            raise Exception(f"Error setting user language: {e}") from e

    def delete_user_language(self, user_id) -> bool:
        """Delete the user's language record; True if one was removed."""
        try:
            result = db.languages.delete_one({'user_id': user_id})
            return result.deleted_count > 0
        except Exception as e:
            raise Exception(f"Error deleting user language: {e}") from e

    # ------------------------------------------------------------------
    # Account deletion
    # ------------------------------------------------------------------

    def delete_account_permanently(self, username) -> dict:
        """Delete the user and the per-user data listed below.

        Removes chats, chat sessions, preferences, theme, location,
        questionnaires and finally the user document, and returns
        ``{'success': True, 'deleted_data': {...}}`` with one count per
        category.
        """
        try:
            # NOTE(review): ``username`` is used as the ``user_id`` for chats,
            # chat sessions and questionnaires — confirm both identifiers are
            # the same value in this application.
            # NOTE(review): ``languages``, ``skin_schedules``,
            # ``rag_interactions`` and ``image_uploads`` are also keyed by
            # user_id elsewhere in this class but are not removed here —
            # confirm whether that is intended.
            chat_deletion_result = self.delete_all_user_sessions_and_chats(username)
            preferences_result = db.preferences.delete_one({'username': username})
            theme_result = db.user_themes.delete_one({'username': username})
            location_result = db.locations.delete_one({'username': username})
            questionnaire_result = db.questionnaires.delete_many({'user_id': username})
            user_result = db.users.delete_one({'username': username})
            return {
                'success': True,
                'deleted_data': {
                    'chats': chat_deletion_result['deleted_chats'],
                    'chat_sessions': chat_deletion_result['deleted_sessions'],
                    'preferences': preferences_result.deleted_count,
                    'theme': theme_result.deleted_count,
                    'location': location_result.deleted_count,
                    'questionnaires': questionnaire_result.deleted_count,
                    'user_account': user_result.deleted_count,
                },
            }
        except Exception as e:
            raise Exception(f"Error deleting account permanently: {e}") from e

    # ------------------------------------------------------------------
    # Skin-care schedules
    # ------------------------------------------------------------------

    def get_today_schedule(self, user_id):
        """Return a schedule of ``user_id`` created during the current UTC day.

        Returns ``None`` when there is none.
        """
        try:
            day_start = datetime.now(timezone.utc).replace(
                hour=0, minute=0, second=0, microsecond=0
            )
            # Half-open window [midnight, next midnight) so documents created
            # in the final second of the day are still included.
            next_day_start = day_start + timedelta(days=1)
            return db.skin_schedules.find_one({
                "user_id": user_id,
                "created_at": {"$gte": day_start, "$lt": next_day_start},
            })
        except Exception as e:
            raise Exception(f"Error retrieving today's schedule: {e}") from e

    def save_schedule(self, user_id, schedule_data) -> str:
        """Store today's schedule unless one exists; return its id as a string.

        When a schedule for the current UTC day already exists, nothing is
        written and the existing document's id is returned.
        """
        try:
            existing_schedule = self.get_today_schedule(user_id)
            if existing_schedule:
                return str(existing_schedule["_id"])
            result = db.skin_schedules.insert_one({
                "user_id": user_id,
                "schedule_data": schedule_data,
                "created_at": datetime.now(timezone.utc),
            })
            return str(result.inserted_id)
        except Exception as e:
            raise Exception(f"Error saving schedule: {e}") from e

    def get_last_seven_days_schedules(self, user_id) -> list:
        """Return schedules created within the last 7 days, newest first."""
        try:
            seven_days_ago = datetime.now(timezone.utc) - timedelta(days=7)
            cursor = db.skin_schedules.find({
                "user_id": user_id,
                "created_at": {"$gte": seven_days_ago},
            })
            return list(cursor.sort("created_at", -1))
        except Exception as e:
            raise Exception(f"Error fetching last 7 days schedules: {e}") from e

    # ------------------------------------------------------------------
    # RAG interactions
    # ------------------------------------------------------------------

    def save_rag_interaction(self, user_id: str, session_id: str, context: str, query: str,
                             response: str, rag_start_time: datetime, rag_end_time: datetime):
        """Store one RAG interaction and return its generated id.

        The id is a freshly generated ``ObjectId`` rendered as a string and
        saved in the ``interaction_id`` field. Both timing values are
        converted to UTC with ``astimezone`` before being stored.
        """
        try:
            interaction = {
                "interaction_id": str(ObjectId()),
                "user_id": user_id,
                "session_id": session_id,
                "context": context,
                "query": query,
                "response": response,
                "rag_start_time": rag_start_time.astimezone(timezone.utc),
                "rag_end_time": rag_end_time.astimezone(timezone.utc),
                "created_at": datetime.now(timezone.utc),
            }
            db.rag_interactions.insert_one(interaction)
            return interaction["interaction_id"]
        except Exception as e:
            raise Exception(f"Error saving RAG interaction: {e}") from e

    def get_rag_interactions(
        self,
        user_id: Optional[str] = None,
        page: int = 1,
        page_size: int = 5
    ) -> dict:
        """Return one page of RAG interactions, newest first.

        When ``user_id`` is falsy, interactions of all users are returned.
        ``page`` and ``page_size`` values below 1 are treated as 1. The three
        timestamp fields of every result are converted to ISO-8601 strings.

        Returns a dict with ``total_interactions``, ``page``, ``page_size``,
        ``total_pages`` and ``results``.
        """
        try:
            # Clamp so that skip is never negative and the page count below
            # never divides by zero.
            page = max(1, page)
            page_size = max(1, page_size)

            query_filter = {}
            if user_id:
                query_filter["user_id"] = user_id

            total = db.rag_interactions.count_documents(query_filter)
            cursor = (
                db.rag_interactions.find(query_filter, {"_id": 0})
                .sort("created_at", DESCENDING)
                .skip((page - 1) * page_size)
                .limit(page_size)
            )

            results = []
            for interaction in cursor:
                for field in ("rag_start_time", "rag_end_time", "created_at"):
                    interaction[field] = interaction[field].isoformat()
                results.append(interaction)

            return {
                "total_interactions": total,
                "page": page,
                "page_size": page_size,
                # Ceiling division without floats.
                "total_pages": (total + page_size - 1) // page_size,
                "results": results,
            }
        except Exception as e:
            raise Exception(f"Error retrieving RAG interactions: {e}") from e

    # ------------------------------------------------------------------
    # Image uploads
    # ------------------------------------------------------------------

    def log_image_upload(self, user_id) -> bool:
        """Record an image upload for ``user_id`` at the current UTC time."""
        try:
            db.image_uploads.insert_one({
                "user_id": user_id,
                "timestamp": datetime.now(timezone.utc),
            })
            return True
        except Exception as e:
            raise Exception(f"Error logging image upload: {e}") from e

    def get_user_daily_uploads(self, user_id) -> int:
        """Return how many uploads ``user_id`` logged in the last 24 hours."""
        try:
            since = datetime.now(timezone.utc) - timedelta(days=1)
            return db.image_uploads.count_documents({
                "user_id": user_id,
                "timestamp": {"$gte": since},
            })
        except Exception as e:
            raise Exception(f"Error retrieving user daily uploads: {e}") from e

    def get_user_last_upload_time(self, user_id):
        """Return the timestamp of the user's newest upload or ``None``."""
        try:
            last_upload = db.image_uploads.find_one(
                {"user_id": user_id},
                sort=[("timestamp", DESCENDING)],
            )
            return last_upload["timestamp"] if last_upload else None
        except Exception as e:
            raise Exception(f"Error retrieving last upload time: {e}") from e