Spaces:
Sleeping
Sleeping
| from app.core.nosql_client import db | |
| from datetime import datetime | |
| from typing import List, Optional, Dict, Any | |
| import uuid | |
| import logging | |
| from app.utils.db import prepare_for_db | |
| logger = logging.getLogger(__name__) | |
| class PetModel: | |
| """Model for managing pet profiles embedded under customer documents""" | |
| async def create_pet( | |
| customer_id: str, | |
| pet_data: dict | |
| ) -> Optional[str]: | |
| """Create a new embedded pet profile under a user in customers collection""" | |
| try: | |
| from app.models.user_model import BookMyServiceUserModel | |
| user = await BookMyServiceUserModel.collection.find_one({"customer_id": customer_id}) | |
| if not user: | |
| logger.error(f"User not found for customer_id {customer_id}") | |
| return None | |
| pet_id = str(uuid.uuid4()) | |
| current_time = datetime.utcnow() | |
| pet_doc = { | |
| "pet_id": pet_id, | |
| "pet_name": pet_data.get('pet_name'), | |
| "species": getattr(pet_data.get('species'), 'value', pet_data.get('species')), | |
| "breed": pet_data.get('breed'), | |
| "date_of_birth": pet_data.get('date_of_birth'), | |
| "age": pet_data.get('age'), | |
| "weight": pet_data.get('weight'), | |
| "gender": getattr(pet_data.get('gender'), 'value', pet_data.get('gender')), | |
| "temperament": getattr(pet_data.get('temperament'), 'value', pet_data.get('temperament')), | |
| "health_notes": pet_data.get('health_notes'), | |
| "is_vaccinated": pet_data.get('is_vaccinated'), | |
| "pet_photo_url": pet_data.get('pet_photo_url'), | |
| "is_default": pet_data.get('is_default', False), | |
| "created_at": current_time, | |
| "updated_at": current_time | |
| } | |
| pets = user.get("pets", []) | |
| if pet_doc.get("is_default"): | |
| for existing in pets: | |
| if existing.get("is_default"): | |
| existing["is_default"] = False | |
| existing["updated_at"] = current_time | |
| else: | |
| if len(pets) == 0: | |
| pet_doc["is_default"] = True | |
| pets.append(pet_doc) | |
| # Sanitize pets list for MongoDB (convert date to datetime, etc.) | |
| sanitized_pets = prepare_for_db(pets) | |
| update_res = await BookMyServiceUserModel.collection.update_one( | |
| {"customer_id": customer_id}, | |
| {"$set": {"pets": sanitized_pets}} | |
| ) | |
| if update_res.modified_count > 0: | |
| logger.info(f"Embedded pet created successfully: {pet_id} for user: {customer_id}") | |
| return pet_id | |
| else: | |
| logger.info(f"Embedded pet creation attempted with no modified_count for user: {customer_id}") | |
| return pet_id | |
| except Exception as e: | |
| logger.error(f"Error creating embedded pet for user {customer_id}: {str(e)}") | |
| return None | |
| async def get_user_pets(customer_id: str) -> List[Dict[str, Any]]: | |
| """Get all embedded pets for a specific user from customers collection""" | |
| try: | |
| from app.models.user_model import BookMyServiceUserModel | |
| user = await BookMyServiceUserModel.collection.find_one({"customer_id": customer_id}) | |
| if not user: | |
| return [] | |
| pets = user.get("pets", []) | |
| pets.sort(key=lambda x: x.get("created_at", datetime.utcnow()), reverse=True) | |
| for p in pets: | |
| p["customer_id"] = customer_id | |
| logger.info(f"Retrieved {len(pets)} embedded pets for user: {customer_id}") | |
| return pets | |
| except Exception as e: | |
| logger.error(f"Error getting embedded pets for user {customer_id}: {str(e)}") | |
| return [] | |
| async def get_pet_by_id(customer_id: str, pet_id: str) -> Optional[Dict[str, Any]]: | |
| """Get a specific embedded pet by ID for a user""" | |
| try: | |
| from app.models.user_model import BookMyServiceUserModel | |
| user = await BookMyServiceUserModel.collection.find_one({"customer_id": customer_id}) | |
| if not user: | |
| return None | |
| pets = user.get("pets", []) | |
| for p in pets: | |
| if p.get("pet_id") == pet_id: | |
| p_copy = dict(p) | |
| p_copy["customer_id"] = customer_id | |
| logger.info(f"Embedded pet found: {pet_id} for user: {customer_id}") | |
| return p_copy | |
| logger.warning(f"Embedded pet not found: {pet_id} for user: {customer_id}") | |
| return None | |
| except Exception as e: | |
| logger.error(f"Error getting embedded pet {pet_id} for user {customer_id}: {str(e)}") | |
| return None | |
| async def update_pet(customer_id: str, pet_id: str, update_fields: Dict[str, Any]) -> bool: | |
| """Update an embedded pet's information under a user""" | |
| try: | |
| from app.models.user_model import BookMyServiceUserModel | |
| user = await BookMyServiceUserModel.collection.find_one({"customer_id": customer_id}) | |
| if not user: | |
| return False | |
| pets = user.get("pets", []) | |
| updated = False | |
| for idx, p in enumerate(pets): | |
| if p.get("pet_id") == pet_id: | |
| normalized_updates: Dict[str, Any] = {} | |
| for k, v in update_fields.items(): | |
| if hasattr(v, "value"): | |
| normalized_updates[k] = v.value | |
| else: | |
| normalized_updates[k] = v | |
| normalized_updates["updated_at"] = datetime.utcnow() | |
| pets[idx] = {**p, **normalized_updates} | |
| updated = True | |
| if update_fields.get("is_default"): | |
| for j, other in enumerate(pets): | |
| if other.get("pet_id") != pet_id and other.get("is_default"): | |
| other["is_default"] = False | |
| other["updated_at"] = datetime.utcnow() | |
| pets[j] = other | |
| break | |
| if not updated: | |
| logger.warning(f"Embedded pet not found for update: {pet_id} (user {customer_id})") | |
| return False | |
| # Sanitize pets list for MongoDB | |
| sanitized_pets = prepare_for_db(pets) | |
| result = await BookMyServiceUserModel.collection.update_one( | |
| {"customer_id": customer_id}, | |
| {"$set": {"pets": sanitized_pets}} | |
| ) | |
| if result.modified_count > 0: | |
| logger.info(f"Embedded pet updated successfully: {pet_id} (user {customer_id})") | |
| return True | |
| else: | |
| logger.info(f"Embedded pet update applied with no modified_count: {pet_id} (user {customer_id})") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error updating embedded pet {pet_id} for user {customer_id}: {str(e)}") | |
| return False | |
| async def delete_pet(customer_id: str, pet_id: str) -> bool: | |
| """Delete an embedded pet profile under a user""" | |
| try: | |
| from app.models.user_model import BookMyServiceUserModel | |
| user = await BookMyServiceUserModel.collection.find_one({"customer_id": customer_id}) | |
| if not user: | |
| return False | |
| pets = user.get("pets", []) | |
| new_pets = [p for p in pets if p.get("pet_id") != pet_id] | |
| if len(new_pets) == len(pets): | |
| logger.warning(f"Embedded pet not found for deletion: {pet_id} (user {customer_id})") | |
| return False | |
| # Sanitize pets list for MongoDB | |
| sanitized_new_pets = prepare_for_db(new_pets) | |
| result = await BookMyServiceUserModel.collection.update_one( | |
| {"customer_id": customer_id}, | |
| {"$set": {"pets": sanitized_new_pets}} | |
| ) | |
| if result.modified_count > 0: | |
| logger.info(f"Embedded pet deleted successfully: {pet_id} (user {customer_id})") | |
| return True | |
| else: | |
| logger.info(f"Embedded pet deletion applied with no modified_count: {pet_id} (user {customer_id})") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error deleting embedded pet {pet_id} for user {customer_id}: {str(e)}") | |
| return False | |
| async def get_pet_count_for_user(customer_id: str) -> int: | |
| """Get the total number of embedded pets for a user""" | |
| try: | |
| from app.models.user_model import BookMyServiceUserModel | |
| user = await BookMyServiceUserModel.collection.find_one({"customer_id": customer_id}) | |
| if not user: | |
| return 0 | |
| return len(user.get("pets", [])) | |
| except Exception as e: | |
| logger.error(f"Error counting embedded pets for user {customer_id}: {str(e)}") | |
| return 0 | |
| async def get_default_pet(customer_id: str) -> Optional[Dict[str, Any]]: | |
| """Get the default pet for a user""" | |
| try: | |
| from app.models.user_model import BookMyServiceUserModel | |
| user = await BookMyServiceUserModel.collection.find_one({"customer_id": customer_id}) | |
| if not user: | |
| return None | |
| pets = user.get("pets", []) | |
| for p in pets: | |
| if p.get("is_default"): | |
| p_copy = dict(p) | |
| p_copy["customer_id"] = customer_id | |
| return p_copy | |
| return None | |
| except Exception as e: | |
| logger.error(f"Error getting default pet for user {customer_id}: {str(e)}") | |
| return None | |
| async def set_default_pet(customer_id: str, pet_id: str) -> bool: | |
| """Set a pet as default for a user, unsetting any existing default""" | |
| try: | |
| from app.models.user_model import BookMyServiceUserModel | |
| user = await BookMyServiceUserModel.collection.find_one({"customer_id": customer_id}) | |
| if not user: | |
| return False | |
| pets = user.get("pets", []) | |
| found = False | |
| now = datetime.utcnow() | |
| for p in pets: | |
| if p.get("pet_id") == pet_id: | |
| p["is_default"] = True | |
| p["updated_at"] = now | |
| found = True | |
| else: | |
| if p.get("is_default"): | |
| p["is_default"] = False | |
| p["updated_at"] = now | |
| if not found: | |
| return False | |
| # Sanitize pets list for MongoDB | |
| sanitized_pets = prepare_for_db(pets) | |
| res = await BookMyServiceUserModel.collection.update_one( | |
| {"customer_id": customer_id}, | |
| {"$set": {"pets": sanitized_pets}} | |
| ) | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error setting default pet {pet_id} for user {customer_id}: {str(e)}") | |
| return False |