bookmyservice-ums / app /models /social_account_model.py
MukeshKapoor25's picture
refactor(user): rename user_id to customer_id across all modules
a9ccd3b
from fastapi import HTTPException
from app.core.nosql_client import db
from datetime import datetime
from typing import Optional, List, Dict, Any
import logging
logger = logging.getLogger("social_account_model")
class SocialAccountModel:
"""Model for managing social login accounts and linking"""
collection = db["social_accounts"]
@staticmethod
async def create_social_account(customer_id: str, provider: str, provider_customer_id: str, user_info: Dict[str, Any]) -> str:
"""Create a new social account record"""
try:
social_account = {
"customer_id": customer_id,
"provider": provider,
"provider_customer_id": provider_customer_id,
"email": user_info.get("email"),
"name": user_info.get("name"),
"picture": user_info.get("picture"),
"profile_data": user_info,
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow(),
"is_active": True,
"last_login": datetime.utcnow()
}
result = await SocialAccountModel.collection.insert_one(social_account)
logger.info(f"Created social account for user {customer_id} with provider {provider}")
return str(result.inserted_id)
except Exception as e:
logger.error(f"Error creating social account: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail="Failed to create social account")
@staticmethod
async def find_by_provider_and_customer_id(provider: str, provider_customer_id: str) -> Optional[Dict[str, Any]]:
"""Find social account by provider and provider user ID"""
try:
account = await SocialAccountModel.collection.find_one({
"provider": provider,
"provider_customer_id": provider_customer_id,
"is_active": True
})
return account
except Exception as e:
logger.error(f"Error finding social account: {str(e)}", exc_info=True)
return None
@staticmethod
async def find_by_customer_id(customer_id: str) -> List[Dict[str, Any]]:
"""Find all social accounts for a user"""
try:
cursor = SocialAccountModel.collection.find({
"customer_id": customer_id,
"is_active": True
})
accounts = await cursor.to_list(length=None)
return accounts
except Exception as e:
logger.error(f"Error finding social accounts for user {customer_id}: {str(e)}", exc_info=True)
return []
@staticmethod
async def update_social_account(provider: str, provider_customer_id: str, user_info: Dict[str, Any]) -> bool:
"""Update social account with latest user info"""
try:
update_data = {
"email": user_info.get("email"),
"name": user_info.get("name"),
"picture": user_info.get("picture"),
"profile_data": user_info,
"updated_at": datetime.utcnow(),
"last_login": datetime.utcnow()
}
result = await SocialAccountModel.collection.update_one(
{
"provider": provider,
"provider_customer_id": provider_customer_id,
"is_active": True
},
{"$set": update_data}
)
return result.modified_count > 0
except Exception as e:
logger.error(f"Error updating social account: {str(e)}", exc_info=True)
return False
@staticmethod
async def link_social_account(customer_id: str, provider: str, provider_customer_id: str, user_info: Dict[str, Any]) -> bool:
"""Link a social account to an existing user"""
try:
# Check if this social account is already linked to another user
existing_account = await SocialAccountModel.find_by_provider_and_customer_id(provider, provider_customer_id)
if existing_account and existing_account["customer_id"] != customer_id:
logger.warning(f"Social account {provider}:{provider_customer_id} already linked to user {existing_account['customer_id']}")
raise HTTPException(
status_code=409,
detail=f"This {provider} account is already linked to another user"
)
if existing_account and existing_account["customer_id"] == customer_id:
# Update existing account
await SocialAccountModel.update_social_account(provider, provider_customer_id, user_info)
return True
# Create new social account link
await SocialAccountModel.create_social_account(customer_id, provider, provider_customer_id, user_info)
return True
except HTTPException:
raise
except Exception as e:
logger.error(f"Error linking social account: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail="Failed to link social account")
@staticmethod
async def unlink_social_account(customer_id: str, provider: str) -> bool:
"""Unlink a social account from a user"""
try:
result = await SocialAccountModel.collection.update_one(
{
"customer_id": customer_id,
"provider": provider,
"is_active": True
},
{
"$set": {
"is_active": False,
"updated_at": datetime.utcnow()
}
}
)
if result.modified_count > 0:
logger.info(f"Unlinked {provider} account for user {customer_id}")
return True
else:
logger.warning(f"No active {provider} account found for user {customer_id}")
return False
except Exception as e:
logger.error(f"Error unlinking social account: {str(e)}", exc_info=True)
return False
@staticmethod
async def get_profile_picture(customer_id: str, preferred_provider: str = None) -> Optional[str]:
"""Get user's profile picture from social accounts"""
try:
query = {"customer_id": customer_id, "is_active": True}
# If preferred provider specified, try that first
if preferred_provider:
account = await SocialAccountModel.collection.find_one({
**query,
"provider": preferred_provider,
"picture": {"$exists": True, "$ne": None}
})
if account and account.get("picture"):
return account["picture"]
# Otherwise, get any account with a profile picture
account = await SocialAccountModel.collection.find_one({
**query,
"picture": {"$exists": True, "$ne": None}
})
return account.get("picture") if account else None
except Exception as e:
logger.error(f"Error getting profile picture for user {customer_id}: {str(e)}", exc_info=True)
return None
@staticmethod
async def get_social_account_summary(customer_id: str) -> Dict[str, Any]:
"""Get summary of all linked social accounts for a user"""
try:
accounts = await SocialAccountModel.find_by_customer_id(customer_id)
summary = {
"linked_accounts": [],
"total_accounts": len(accounts),
"profile_picture": None
}
for account in accounts:
summary["linked_accounts"].append({
"provider": account["provider"],
"email": account.get("email"),
"name": account.get("name"),
"linked_at": account["created_at"],
"last_login": account.get("last_login")
})
# Set profile picture if available
if not summary["profile_picture"] and account.get("picture"):
summary["profile_picture"] = account["picture"]
return summary
except Exception as e:
logger.error(f"Error getting social account summary for user {customer_id}: {str(e)}", exc_info=True)
return {"linked_accounts": [], "total_accounts": 0, "profile_picture": None}
@staticmethod
async def merge_social_accounts(primary_customer_id: str, secondary_customer_id: str) -> bool:
"""Merge social accounts from secondary user to primary user"""
try:
# Get all social accounts from secondary user
secondary_accounts = await SocialAccountModel.find_by_customer_id(secondary_customer_id)
for account in secondary_accounts:
# Check if primary user already has this provider linked
existing = await SocialAccountModel.collection.find_one({
"customer_id": primary_customer_id,
"provider": account["provider"],
"is_active": True
})
if not existing:
# Transfer the account to primary user
await SocialAccountModel.collection.update_one(
{"_id": account["_id"]},
{
"$set": {
"customer_id": primary_customer_id,
"updated_at": datetime.utcnow()
}
}
)
logger.info(f"Transferred {account['provider']} account from user {secondary_customer_id} to {primary_customer_id}")
else:
# Deactivate the secondary account
await SocialAccountModel.collection.update_one(
{"_id": account["_id"]},
{
"$set": {
"is_active": False,
"updated_at": datetime.utcnow()
}
}
)
logger.info(f"Deactivated duplicate {account['provider']} account for user {secondary_customer_id}")
return True
except Exception as e:
logger.error(f"Error merging social accounts: {str(e)}", exc_info=True)
return False