Spaces:
Sleeping
Sleeping
| from fastapi import HTTPException | |
| from app.core.nosql_client import db | |
| from datetime import datetime | |
| from typing import Optional, List, Dict, Any | |
| import logging | |
| logger = logging.getLogger("social_account_model") | |
| class SocialAccountModel: | |
| """Model for managing social login accounts and linking""" | |
| collection = db["social_accounts"] | |
| async def create_social_account(customer_id: str, provider: str, provider_customer_id: str, user_info: Dict[str, Any]) -> str: | |
| """Create a new social account record""" | |
| try: | |
| social_account = { | |
| "customer_id": customer_id, | |
| "provider": provider, | |
| "provider_customer_id": provider_customer_id, | |
| "email": user_info.get("email"), | |
| "name": user_info.get("name"), | |
| "picture": user_info.get("picture"), | |
| "profile_data": user_info, | |
| "created_at": datetime.utcnow(), | |
| "updated_at": datetime.utcnow(), | |
| "is_active": True, | |
| "last_login": datetime.utcnow() | |
| } | |
| result = await SocialAccountModel.collection.insert_one(social_account) | |
| logger.info(f"Created social account for user {customer_id} with provider {provider}") | |
| return str(result.inserted_id) | |
| except Exception as e: | |
| logger.error(f"Error creating social account: {str(e)}", exc_info=True) | |
| raise HTTPException(status_code=500, detail="Failed to create social account") | |
| async def find_by_provider_and_customer_id(provider: str, provider_customer_id: str) -> Optional[Dict[str, Any]]: | |
| """Find social account by provider and provider user ID""" | |
| try: | |
| account = await SocialAccountModel.collection.find_one({ | |
| "provider": provider, | |
| "provider_customer_id": provider_customer_id, | |
| "is_active": True | |
| }) | |
| return account | |
| except Exception as e: | |
| logger.error(f"Error finding social account: {str(e)}", exc_info=True) | |
| return None | |
| async def find_by_customer_id(customer_id: str) -> List[Dict[str, Any]]: | |
| """Find all social accounts for a user""" | |
| try: | |
| cursor = SocialAccountModel.collection.find({ | |
| "customer_id": customer_id, | |
| "is_active": True | |
| }) | |
| accounts = await cursor.to_list(length=None) | |
| return accounts | |
| except Exception as e: | |
| logger.error(f"Error finding social accounts for user {customer_id}: {str(e)}", exc_info=True) | |
| return [] | |
| async def update_social_account(provider: str, provider_customer_id: str, user_info: Dict[str, Any]) -> bool: | |
| """Update social account with latest user info""" | |
| try: | |
| update_data = { | |
| "email": user_info.get("email"), | |
| "name": user_info.get("name"), | |
| "picture": user_info.get("picture"), | |
| "profile_data": user_info, | |
| "updated_at": datetime.utcnow(), | |
| "last_login": datetime.utcnow() | |
| } | |
| result = await SocialAccountModel.collection.update_one( | |
| { | |
| "provider": provider, | |
| "provider_customer_id": provider_customer_id, | |
| "is_active": True | |
| }, | |
| {"$set": update_data} | |
| ) | |
| return result.modified_count > 0 | |
| except Exception as e: | |
| logger.error(f"Error updating social account: {str(e)}", exc_info=True) | |
| return False | |
| async def link_social_account(customer_id: str, provider: str, provider_customer_id: str, user_info: Dict[str, Any]) -> bool: | |
| """Link a social account to an existing user""" | |
| try: | |
| # Check if this social account is already linked to another user | |
| existing_account = await SocialAccountModel.find_by_provider_and_customer_id(provider, provider_customer_id) | |
| if existing_account and existing_account["customer_id"] != customer_id: | |
| logger.warning(f"Social account {provider}:{provider_customer_id} already linked to user {existing_account['customer_id']}") | |
| raise HTTPException( | |
| status_code=409, | |
| detail=f"This {provider} account is already linked to another user" | |
| ) | |
| if existing_account and existing_account["customer_id"] == customer_id: | |
| # Update existing account | |
| await SocialAccountModel.update_social_account(provider, provider_customer_id, user_info) | |
| return True | |
| # Create new social account link | |
| await SocialAccountModel.create_social_account(customer_id, provider, provider_customer_id, user_info) | |
| return True | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Error linking social account: {str(e)}", exc_info=True) | |
| raise HTTPException(status_code=500, detail="Failed to link social account") | |
| async def unlink_social_account(customer_id: str, provider: str) -> bool: | |
| """Unlink a social account from a user""" | |
| try: | |
| result = await SocialAccountModel.collection.update_one( | |
| { | |
| "customer_id": customer_id, | |
| "provider": provider, | |
| "is_active": True | |
| }, | |
| { | |
| "$set": { | |
| "is_active": False, | |
| "updated_at": datetime.utcnow() | |
| } | |
| } | |
| ) | |
| if result.modified_count > 0: | |
| logger.info(f"Unlinked {provider} account for user {customer_id}") | |
| return True | |
| else: | |
| logger.warning(f"No active {provider} account found for user {customer_id}") | |
| return False | |
| except Exception as e: | |
| logger.error(f"Error unlinking social account: {str(e)}", exc_info=True) | |
| return False | |
| async def get_profile_picture(customer_id: str, preferred_provider: str = None) -> Optional[str]: | |
| """Get user's profile picture from social accounts""" | |
| try: | |
| query = {"customer_id": customer_id, "is_active": True} | |
| # If preferred provider specified, try that first | |
| if preferred_provider: | |
| account = await SocialAccountModel.collection.find_one({ | |
| **query, | |
| "provider": preferred_provider, | |
| "picture": {"$exists": True, "$ne": None} | |
| }) | |
| if account and account.get("picture"): | |
| return account["picture"] | |
| # Otherwise, get any account with a profile picture | |
| account = await SocialAccountModel.collection.find_one({ | |
| **query, | |
| "picture": {"$exists": True, "$ne": None} | |
| }) | |
| return account.get("picture") if account else None | |
| except Exception as e: | |
| logger.error(f"Error getting profile picture for user {customer_id}: {str(e)}", exc_info=True) | |
| return None | |
| async def get_social_account_summary(customer_id: str) -> Dict[str, Any]: | |
| """Get summary of all linked social accounts for a user""" | |
| try: | |
| accounts = await SocialAccountModel.find_by_customer_id(customer_id) | |
| summary = { | |
| "linked_accounts": [], | |
| "total_accounts": len(accounts), | |
| "profile_picture": None | |
| } | |
| for account in accounts: | |
| summary["linked_accounts"].append({ | |
| "provider": account["provider"], | |
| "email": account.get("email"), | |
| "name": account.get("name"), | |
| "linked_at": account["created_at"], | |
| "last_login": account.get("last_login") | |
| }) | |
| # Set profile picture if available | |
| if not summary["profile_picture"] and account.get("picture"): | |
| summary["profile_picture"] = account["picture"] | |
| return summary | |
| except Exception as e: | |
| logger.error(f"Error getting social account summary for user {customer_id}: {str(e)}", exc_info=True) | |
| return {"linked_accounts": [], "total_accounts": 0, "profile_picture": None} | |
| async def merge_social_accounts(primary_customer_id: str, secondary_customer_id: str) -> bool: | |
| """Merge social accounts from secondary user to primary user""" | |
| try: | |
| # Get all social accounts from secondary user | |
| secondary_accounts = await SocialAccountModel.find_by_customer_id(secondary_customer_id) | |
| for account in secondary_accounts: | |
| # Check if primary user already has this provider linked | |
| existing = await SocialAccountModel.collection.find_one({ | |
| "customer_id": primary_customer_id, | |
| "provider": account["provider"], | |
| "is_active": True | |
| }) | |
| if not existing: | |
| # Transfer the account to primary user | |
| await SocialAccountModel.collection.update_one( | |
| {"_id": account["_id"]}, | |
| { | |
| "$set": { | |
| "customer_id": primary_customer_id, | |
| "updated_at": datetime.utcnow() | |
| } | |
| } | |
| ) | |
| logger.info(f"Transferred {account['provider']} account from user {secondary_customer_id} to {primary_customer_id}") | |
| else: | |
| # Deactivate the secondary account | |
| await SocialAccountModel.collection.update_one( | |
| {"_id": account["_id"]}, | |
| { | |
| "$set": { | |
| "is_active": False, | |
| "updated_at": datetime.utcnow() | |
| } | |
| } | |
| ) | |
| logger.info(f"Deactivated duplicate {account['provider']} account for user {secondary_customer_id}") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error merging social accounts: {str(e)}", exc_info=True) | |
| return False |