# Source: commit 405aa89 by MukeshKapoor25
# fix(pet_model): sanitize pet data before MongoDB update
from app.core.nosql_client import db
from datetime import datetime
from typing import List, Optional, Dict, Any
import uuid
import logging
from app.utils.db import prepare_for_db
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class PetModel:
    """Model for managing pet profiles embedded under customer documents.

    A customer document is looked up by ``customer_id`` and carries its pets
    in a ``pets`` array. Every mutating method reads the customer, edits the
    list in memory and writes the whole array back with a single ``$set``.

    NOTE(review): that read-modify-write cycle is not atomic; two concurrent
    writers for the same customer can overwrite each other's changes.

    Timestamps are the naive UTC values returned by ``datetime.utcnow()``.
    All public methods catch every exception, log it with its traceback and
    return a "failure" value (``None``, ``False``, ``[]`` or ``0``) instead
    of raising.
    """

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _collection():
        """Return ``BookMyServiceUserModel.collection``."""
        # Imported at call time, exactly as the original call sites did
        # (presumably to avoid a circular import -- TODO confirm).
        from app.models.user_model import BookMyServiceUserModel
        return BookMyServiceUserModel.collection

    @staticmethod
    async def _find_user(customer_id: str) -> Optional[Dict[str, Any]]:
        """Fetch the customer document for ``customer_id`` (``None`` if absent)."""
        return await PetModel._collection().find_one({"customer_id": customer_id})

    @staticmethod
    def _pets_of(user: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return the user's pets list, treating a missing or null field as empty."""
        # ``user.get("pets", [])`` would hand back None when the field is
        # stored as null, which breaks every len()/iteration below.
        return user.get("pets") or []

    @staticmethod
    async def _save_pets(customer_id: str, pets: List[Dict[str, Any]]):
        """Sanitize ``pets`` with ``prepare_for_db`` and ``$set`` it on the customer."""
        return await PetModel._collection().update_one(
            {"customer_id": customer_id},
            {"$set": {"pets": prepare_for_db(pets)}},
        )

    @staticmethod
    def _plain(value: Any) -> Any:
        """Return ``value.value`` when the attribute exists (e.g. enums), else ``value``."""
        return getattr(value, "value", value)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    @staticmethod
    async def create_pet(
        customer_id: str,
        pet_data: dict
    ) -> Optional[str]:
        """Create a new embedded pet profile under a user in customers collection.

        Args:
            customer_id: Identifier used to look up the customer document.
            pet_data: Mapping with the pet's fields; missing keys are stored
                as ``None`` (``is_default`` as ``False``).

        Returns:
            The generated ``pet_id`` (a UUID4 string), or ``None`` when the
            customer does not exist or an error occurs. The id is returned
            even if the update reports ``modified_count == 0``; only the log
            message differs in that case.
        """
        try:
            user = await PetModel._find_user(customer_id)
            if not user:
                logger.error(f"User not found for customer_id {customer_id}")
                return None

            pet_id = str(uuid.uuid4())
            current_time = datetime.utcnow()
            pet_doc = {
                "pet_id": pet_id,
                "pet_name": pet_data.get("pet_name"),
                "species": PetModel._plain(pet_data.get("species")),
                "breed": pet_data.get("breed"),
                "date_of_birth": pet_data.get("date_of_birth"),
                "age": pet_data.get("age"),
                "weight": pet_data.get("weight"),
                "gender": PetModel._plain(pet_data.get("gender")),
                "temperament": PetModel._plain(pet_data.get("temperament")),
                "health_notes": pet_data.get("health_notes"),
                "is_vaccinated": pet_data.get("is_vaccinated"),
                "pet_photo_url": pet_data.get("pet_photo_url"),
                # Coerce so an explicit None is stored as False, not null.
                "is_default": bool(pet_data.get("is_default", False)),
                "created_at": current_time,
                "updated_at": current_time,
            }

            pets = PetModel._pets_of(user)
            if pet_doc["is_default"]:
                # The new pet takes over as default: demote any current one.
                for existing in pets:
                    if existing.get("is_default"):
                        existing["is_default"] = False
                        existing["updated_at"] = current_time
            elif not pets:
                # The first pet of a user always becomes the default.
                pet_doc["is_default"] = True
            pets.append(pet_doc)

            update_res = await PetModel._save_pets(customer_id, pets)
            if update_res.modified_count > 0:
                logger.info(f"Embedded pet created successfully: {pet_id} for user: {customer_id}")
            else:
                logger.info(f"Embedded pet creation attempted with no modified_count for user: {customer_id}")
            return pet_id
        except Exception as e:
            logger.exception(f"Error creating embedded pet for user {customer_id}: {e}")
            return None

    @staticmethod
    async def get_user_pets(customer_id: str) -> List[Dict[str, Any]]:
        """Get all embedded pets for a specific user from customers collection.

        Returns:
            The pets sorted by ``created_at``, newest first, each annotated
            with a ``customer_id`` key. Empty list when the customer is
            missing or an error occurs.
        """
        try:
            user = await PetModel._find_user(customer_id)
            if not user:
                return []

            pets = PetModel._pets_of(user)
            # One shared fallback keeps the ordering stable and also covers a
            # ``created_at`` that is present but null (None is not comparable
            # with datetime and would abort the whole listing).
            fallback = datetime.utcnow()
            pets.sort(key=lambda pet: pet.get("created_at") or fallback, reverse=True)
            for pet in pets:
                pet["customer_id"] = customer_id
            logger.info(f"Retrieved {len(pets)} embedded pets for user: {customer_id}")
            return pets
        except Exception as e:
            logger.exception(f"Error getting embedded pets for user {customer_id}: {e}")
            return []

    @staticmethod
    async def get_pet_by_id(customer_id: str, pet_id: str) -> Optional[Dict[str, Any]]:
        """Get a specific embedded pet by ID for a user.

        Returns:
            A shallow copy of the first pet whose ``pet_id`` matches, with
            ``customer_id`` added, or ``None`` when the customer or pet is
            not found or an error occurs.
        """
        try:
            user = await PetModel._find_user(customer_id)
            if not user:
                return None

            match = next(
                (pet for pet in PetModel._pets_of(user) if pet.get("pet_id") == pet_id),
                None,
            )
            if match is None:
                logger.warning(f"Embedded pet not found: {pet_id} for user: {customer_id}")
                return None

            found = dict(match)
            found["customer_id"] = customer_id
            logger.info(f"Embedded pet found: {pet_id} for user: {customer_id}")
            return found
        except Exception as e:
            logger.exception(f"Error getting embedded pet {pet_id} for user {customer_id}: {e}")
            return None

    @staticmethod
    async def update_pet(customer_id: str, pet_id: str, update_fields: Dict[str, Any]) -> bool:
        """Update an embedded pet's information under a user.

        Args:
            customer_id: Identifier used to look up the customer document.
            pet_id: Identifier of the pet to update (first match wins).
            update_fields: Fields to merge over the stored pet; values with
                a ``.value`` attribute (e.g. enums) are unwrapped first.

        Returns:
            ``True`` once the write has been issued (regardless of
            ``modified_count``); ``False`` when the customer or pet is not
            found or an error occurs.
        """
        try:
            user = await PetModel._find_user(customer_id)
            if not user:
                return False

            pets = PetModel._pets_of(user)
            target_idx = next(
                (i for i, pet in enumerate(pets) if pet.get("pet_id") == pet_id),
                None,
            )
            if target_idx is None:
                logger.warning(f"Embedded pet not found for update: {pet_id} (user {customer_id})")
                return False

            now = datetime.utcnow()
            changes: Dict[str, Any] = {
                key: PetModel._plain(value) for key, value in update_fields.items()
            }
            changes["updated_at"] = now
            pets[target_idx] = {**pets[target_idx], **changes}

            if update_fields.get("is_default"):
                # Promoting this pet: demote any other pet still flagged default.
                for other in pets:
                    if other.get("pet_id") != pet_id and other.get("is_default"):
                        other["is_default"] = False
                        other["updated_at"] = now

            result = await PetModel._save_pets(customer_id, pets)
            if result.modified_count > 0:
                logger.info(f"Embedded pet updated successfully: {pet_id} (user {customer_id})")
            else:
                logger.info(f"Embedded pet update applied with no modified_count: {pet_id} (user {customer_id})")
            return True
        except Exception as e:
            logger.exception(f"Error updating embedded pet {pet_id} for user {customer_id}: {e}")
            return False

    @staticmethod
    async def delete_pet(customer_id: str, pet_id: str) -> bool:
        """Delete an embedded pet profile under a user.

        Returns:
            ``True`` once the write removing every pet with this ``pet_id``
            has been issued; ``False`` when the customer or pet is not found
            or an error occurs.

        NOTE(review): deleting the default pet does not promote another pet
        to default -- confirm whether that is intended.
        """
        try:
            user = await PetModel._find_user(customer_id)
            if not user:
                return False

            pets = PetModel._pets_of(user)
            remaining = [pet for pet in pets if pet.get("pet_id") != pet_id]
            if len(remaining) == len(pets):
                logger.warning(f"Embedded pet not found for deletion: {pet_id} (user {customer_id})")
                return False

            result = await PetModel._save_pets(customer_id, remaining)
            if result.modified_count > 0:
                logger.info(f"Embedded pet deleted successfully: {pet_id} (user {customer_id})")
            else:
                logger.info(f"Embedded pet deletion applied with no modified_count: {pet_id} (user {customer_id})")
            return True
        except Exception as e:
            logger.exception(f"Error deleting embedded pet {pet_id} for user {customer_id}: {e}")
            return False

    @staticmethod
    async def get_pet_count_for_user(customer_id: str) -> int:
        """Get the total number of embedded pets for a user.

        Returns:
            Length of the customer's ``pets`` list; ``0`` when the customer
            is missing, has no pets, or an error occurs.
        """
        try:
            user = await PetModel._find_user(customer_id)
            if not user:
                return 0
            return len(PetModel._pets_of(user))
        except Exception as e:
            logger.exception(f"Error counting embedded pets for user {customer_id}: {e}")
            return 0

    @staticmethod
    async def get_default_pet(customer_id: str) -> Optional[Dict[str, Any]]:
        """Get the default pet for a user.

        Returns:
            A shallow copy of the first pet with a truthy ``is_default``,
            with ``customer_id`` added, or ``None`` when there is no such
            pet, the customer is missing, or an error occurs.
        """
        try:
            user = await PetModel._find_user(customer_id)
            if not user:
                return None

            default = next(
                (pet for pet in PetModel._pets_of(user) if pet.get("is_default")),
                None,
            )
            if default is None:
                return None

            found = dict(default)
            found["customer_id"] = customer_id
            return found
        except Exception as e:
            logger.exception(f"Error getting default pet for user {customer_id}: {e}")
            return None

    @staticmethod
    async def set_default_pet(customer_id: str, pet_id: str) -> bool:
        """Set a pet as default for a user, unsetting any existing default.

        Returns:
            ``True`` once the write has been issued; ``False`` when the
            customer or pet is not found or an error occurs.
        """
        try:
            user = await PetModel._find_user(customer_id)
            if not user:
                return False

            pets = PetModel._pets_of(user)
            found = False
            now = datetime.utcnow()
            for pet in pets:
                if pet.get("pet_id") == pet_id:
                    pet["is_default"] = True
                    pet["updated_at"] = now
                    found = True
                elif pet.get("is_default"):
                    pet["is_default"] = False
                    pet["updated_at"] = now
            if not found:
                return False

            await PetModel._save_pets(customer_id, pets)
            return True
        except Exception as e:
            logger.exception(f"Error setting default pet {pet_id} for user {customer_id}: {e}")
            return False