Spaces:
Sleeping
Sleeping
| from app.core.nosql_client import db | |
| from datetime import datetime | |
| from typing import List, Optional, Dict, Any | |
| import uuid | |
| import logging | |
| from app.utils.db import prepare_for_db | |
| logger = logging.getLogger(__name__) | |
class GuestModel:
    """Model for managing guest profiles embedded under customer documents.

    Guests live in a list stored under the ``guests`` key of a customer
    document, which is looked up by ``customer_id``. Every public method is a
    static coroutine, so it can be awaited on the class or on an instance.
    Failures are logged and reported through the return value (``None``,
    ``False``, ``[]`` or ``0``) instead of being raised.

    NOTE(review): each mutating method reads the whole ``guests`` list, edits
    it in memory and writes it back with ``$set``. Two concurrent writes for
    the same customer can therefore overwrite each other's change.
    """

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _collection():
        """Return the collection object that holds the customer documents.

        The import is done lazily inside the call, exactly as the public
        methods always did (presumably to avoid a circular import - confirm).
        """
        from app.models.user_model import BookMyServiceUserModel

        return BookMyServiceUserModel.collection

    @staticmethod
    def _plain_value(value: Any) -> Any:
        """Return ``value.value`` for enum-like objects, else ``value`` itself."""
        return getattr(value, "value", value)

    @staticmethod
    def _with_owner(guest: Dict[str, Any], customer_id: str) -> Dict[str, Any]:
        """Return a shallow copy of ``guest`` with ``customer_id`` attached."""
        return {**guest, "customer_id": customer_id}

    @staticmethod
    def _clear_defaults(
        guests: List[Dict[str, Any]],
        now: datetime,
        keep_index: Optional[int] = None,
    ) -> None:
        """Unset ``is_default`` on every guest except the one at ``keep_index``.

        Guests whose flag is actually flipped also get ``updated_at`` set to
        ``now``. The list is modified in place.
        """
        for position, guest in enumerate(guests):
            if position != keep_index and guest.get("is_default"):
                guest["is_default"] = False
                guest["updated_at"] = now

    @staticmethod
    async def _load_guests(customer_id: str) -> Optional[List[Dict[str, Any]]]:
        """Fetch the guest list of a customer.

        Returns:
            ``None`` when no customer document matches ``customer_id``;
            otherwise the stored guest list. A missing, null or empty
            ``guests`` field yields an empty list, so callers never have to
            deal with ``None`` entries in place of a list.
        """
        user = await GuestModel._collection().find_one({"customer_id": customer_id})
        if not user:
            return None
        return user.get("guests") or []

    @staticmethod
    async def _save_guests(customer_id: str, guests: List[Dict[str, Any]]):
        """Replace the customer's ``guests`` field and return the update result.

        The list is passed through ``prepare_for_db`` first, as every write in
        this model did before.
        """
        return await GuestModel._collection().update_one(
            {"customer_id": customer_id},
            {"$set": {"guests": prepare_for_db(guests)}},
        )

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    @staticmethod
    async def create_guest(customer_id: str, guest_data: dict) -> Optional[str]:
        """Create a new embedded guest profile under a user in customers collection.

        Args:
            customer_id: Identifier of the owning customer document.
            guest_data: Mapping with the guest's fields; ``gender`` and
                ``relationship`` may be enum-like objects (their ``.value``
                is stored).

        Returns:
            The generated ``guest_id`` (a UUID4 string), or ``None`` when the
            customer does not exist or any error occurs.
        """
        try:
            guests = await GuestModel._load_guests(customer_id)
            if guests is None:
                logger.error(f"User not found for customer_id {customer_id}")
                return None

            guest_id = str(uuid.uuid4())
            current_time = datetime.utcnow()
            guest_doc = {
                "guest_id": guest_id,
                "first_name": guest_data.get("first_name"),
                "last_name": guest_data.get("last_name"),
                "email": guest_data.get("email"),
                "phone_number": guest_data.get("phone_number"),
                "gender": GuestModel._plain_value(guest_data.get("gender")),
                "date_of_birth": guest_data.get("date_of_birth"),
                "relationship": GuestModel._plain_value(guest_data.get("relationship")),
                "notes": guest_data.get("notes"),
                "is_default": guest_data.get("is_default", False),
                "created_at": current_time,
                "updated_at": current_time,
            }

            if guest_doc["is_default"]:
                # The new guest takes over as default: unset any existing one.
                GuestModel._clear_defaults(guests, current_time)
            elif not guests:
                # The very first guest becomes the default automatically.
                guest_doc["is_default"] = True

            guests.append(guest_doc)
            update_res = await GuestModel._save_guests(customer_id, guests)

            if update_res.modified_count > 0:
                logger.info(f"Guest created successfully: {guest_id} for user: {customer_id}")
            else:
                logger.info(f"Guest creation attempted with no modified_count for user: {customer_id}")
            return guest_id
        except Exception as e:
            logger.error(f"Error creating embedded guest for user {customer_id}: {str(e)}")
            return None

    @staticmethod
    async def get_user_guests(customer_id: str) -> List[Dict[str, Any]]:
        """Get all embedded guests for a specific user from customers collection.

        Returns:
            The guests sorted by ``created_at`` (newest first), each with
            ``customer_id`` added. An empty list when the customer does not
            exist or an error occurs.
        """
        try:
            guests = await GuestModel._load_guests(customer_id)
            if guests is None:
                return []

            # Guests with a missing or null ``created_at`` are treated as
            # created "now", so they sort first instead of breaking the sort.
            fallback = datetime.utcnow()
            guests.sort(key=lambda g: g.get("created_at") or fallback, reverse=True)
            for guest in guests:
                guest["customer_id"] = customer_id

            logger.info(f"Retrieved {len(guests)} embedded guests for user: {customer_id}")
            return guests
        except Exception as e:
            logger.error(f"Error getting embedded guests for user {customer_id}: {str(e)}")
            return []

    @staticmethod
    async def get_guest_by_id(customer_id: str, guest_id: str) -> Optional[Dict[str, Any]]:
        """Get a specific embedded guest by ID for a user.

        Returns:
            A copy of the guest with ``customer_id`` added, or ``None`` when
            the customer or guest is not found or an error occurs.
        """
        try:
            guests = await GuestModel._load_guests(customer_id)
            if guests is None:
                return None

            match = next((g for g in guests if g.get("guest_id") == guest_id), None)
            if match is None:
                logger.warning(f"Embedded guest not found: {guest_id} for user: {customer_id}")
                return None

            logger.info(f"Embedded guest found: {guest_id} for user: {customer_id}")
            return GuestModel._with_owner(match, customer_id)
        except Exception as e:
            logger.error(f"Error getting embedded guest {guest_id} for user {customer_id}: {str(e)}")
            return None

    @staticmethod
    async def update_guest(customer_id: str, guest_id: str, update_fields: Dict[str, Any]) -> bool:
        """Update an embedded guest's information under a user.

        Args:
            customer_id: Identifier of the owning customer document.
            guest_id: Identifier of the guest to update.
            update_fields: Fields to merge into the guest. Enum-like values
                are stored as their ``.value``. A ``guest_id`` entry is
                ignored so the guest's identity cannot be overwritten. A
                truthy ``is_default`` unsets the flag on all other guests.

        Returns:
            ``True`` when the guest was found and the write was issued,
            ``False`` when the customer or guest is missing or an error occurs.
        """
        try:
            guests = await GuestModel._load_guests(customer_id)
            if guests is None:
                return False

            target = next(
                (i for i, g in enumerate(guests) if g.get("guest_id") == guest_id),
                None,
            )
            if target is None:
                logger.warning(f"Embedded guest not found for update: {guest_id} (user {customer_id})")
                return False

            now = datetime.utcnow()
            changes: Dict[str, Any] = {
                key: GuestModel._plain_value(value)
                for key, value in update_fields.items()
                if key != "guest_id"  # the lookup key must stay stable
            }
            changes["updated_at"] = now
            guests[target] = {**guests[target], **changes}

            # If is_default is being set to True, unset default for others.
            if changes.get("is_default"):
                GuestModel._clear_defaults(guests, now, keep_index=target)

            result = await GuestModel._save_guests(customer_id, guests)
            if result.modified_count > 0:
                logger.info(f"Embedded guest updated successfully: {guest_id} (user {customer_id})")
            else:
                logger.info(f"Embedded guest update applied with no modified_count: {guest_id} (user {customer_id})")
            return True
        except Exception as e:
            logger.error(f"Error updating embedded guest {guest_id} for user {customer_id}: {str(e)}")
            return False

    @staticmethod
    async def delete_guest(customer_id: str, guest_id: str) -> bool:
        """Delete an embedded guest profile under a user.

        Returns:
            ``True`` when the guest existed and the write was issued,
            ``False`` when the customer or guest is missing or an error occurs.
        """
        try:
            guests = await GuestModel._load_guests(customer_id)
            if guests is None:
                return False

            remaining = [g for g in guests if g.get("guest_id") != guest_id]
            if len(remaining) == len(guests):
                logger.warning(f"Embedded guest not found for deletion: {guest_id} (user {customer_id})")
                return False

            # NOTE(review): deleting the default guest leaves the customer
            # without any default guest - confirm whether one of the remaining
            # guests should be promoted.
            result = await GuestModel._save_guests(customer_id, remaining)
            if result.modified_count > 0:
                logger.info(f"Embedded guest deleted successfully: {guest_id} (user {customer_id})")
            else:
                logger.info(f"Embedded guest deletion applied with no modified_count: {guest_id} (user {customer_id})")
            return True
        except Exception as e:
            logger.error(f"Error deleting embedded guest {guest_id} for user {customer_id}: {str(e)}")
            return False

    @staticmethod
    async def get_guest_count_for_user(customer_id: str) -> int:
        """Get the total number of embedded guests for a user.

        Returns:
            The number of guests, or ``0`` when the customer does not exist or
            an error occurs.
        """
        try:
            guests = await GuestModel._load_guests(customer_id)
            return len(guests) if guests is not None else 0
        except Exception as e:
            logger.error(f"Error counting embedded guests for user {customer_id}: {str(e)}")
            return 0

    @staticmethod
    async def check_guest_ownership(guest_id: str, customer_id: str) -> bool:
        """
        Check if a guest belongs to a specific user.

        Args:
            guest_id: ID of the guest
            customer_id: ID of the user

        Returns:
            True if guest belongs to user, False otherwise
        """
        try:
            guests = await GuestModel._load_guests(customer_id)
            if guests is None:
                return False
            return any(g.get("guest_id") == guest_id for g in guests)
        except Exception as e:
            logger.error(f"Error checking embedded guest ownership {guest_id} for user {customer_id}: {str(e)}")
            return False

    @staticmethod
    async def get_default_guest(customer_id: str) -> Optional[Dict[str, Any]]:
        """Get the default guest for a user.

        Returns:
            A copy of the first guest flagged ``is_default`` with
            ``customer_id`` added, or ``None`` when there is no such guest,
            the customer does not exist, or an error occurs.
        """
        try:
            guests = await GuestModel._load_guests(customer_id)
            if guests is None:
                return None

            default = next((g for g in guests if g.get("is_default")), None)
            if default is None:
                return None
            return GuestModel._with_owner(default, customer_id)
        except Exception as e:
            logger.error(f"Error getting default guest for user {customer_id}: {str(e)}")
            return None

    @staticmethod
    async def set_default_guest(customer_id: str, guest_id: str) -> bool:
        """Set a guest as default for a user, unsetting any existing default.

        Returns:
            ``True`` when the guest was found and the write was issued,
            ``False`` when the customer or guest is missing or an error occurs.
        """
        try:
            guests = await GuestModel._load_guests(customer_id)
            if guests is None:
                return False

            target = next(
                (i for i, g in enumerate(guests) if g.get("guest_id") == guest_id),
                None,
            )
            if target is None:
                return False

            now = datetime.utcnow()
            guests[target]["is_default"] = True
            guests[target]["updated_at"] = now
            GuestModel._clear_defaults(guests, now, keep_index=target)

            await GuestModel._save_guests(customer_id, guests)
            return True
        except Exception as e:
            logger.error(f"Error setting default guest {guest_id} for user {customer_id}: {str(e)}")
            return False