Spaces:
Sleeping
Sleeping
| from fastapi import APIRouter, HTTPException, Depends, status, Request | |
| from fastapi.security import OAuth2PasswordRequestForm | |
| from db.mongo import users_collection | |
| from core.security import hash_password, verify_password, create_access_token, get_current_user | |
| from models.schemas import SignupForm, TokenResponse, DoctorCreate | |
| from datetime import datetime | |
| import logging | |
| # Configure logging | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(levelname)s - %(name)s - %(message)s' | |
| ) | |
| logger = logging.getLogger(__name__) | |
| router = APIRouter() | |
| async def signup(data: SignupForm): | |
| logger.info(f"Signup attempt for email: {data.email}") | |
| email = data.email.lower().strip() | |
| existing = await users_collection.find_one({"email": email}) | |
| if existing: | |
| logger.warning(f"Signup failed: Email already exists: {email}") | |
| raise HTTPException( | |
| status_code=status.HTTP_409_CONFLICT, | |
| detail="Email already exists" | |
| ) | |
| hashed_pw = hash_password(data.password) | |
| user_doc = { | |
| "email": email, | |
| "full_name": data.full_name.strip(), | |
| "password": hashed_pw, | |
| "role": "patient", | |
| "created_at": datetime.utcnow().isoformat(), | |
| "updated_at": datetime.utcnow().isoformat(), | |
| "device_token": data.device_token or "" # Store device token if provided | |
| } | |
| try: | |
| result = await users_collection.insert_one(user_doc) | |
| logger.info(f"User created successfully: {email}") | |
| return { | |
| "status": "success", | |
| "id": str(result.inserted_id), | |
| "email": email | |
| } | |
| except Exception as e: | |
| logger.error(f"Failed to create user {email}: {str(e)}") | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail=f"Failed to create user: {str(e)}" | |
| ) | |
| async def create_doctor( | |
| data: DoctorCreate, | |
| current_user: dict = Depends(get_current_user) | |
| ): | |
| logger.info(f"Doctor creation attempt by {current_user.get('email')}") | |
| if current_user.get('role') != 'admin': | |
| logger.warning(f"Unauthorized doctor creation attempt by {current_user.get('email')}") | |
| raise HTTPException( | |
| status_code=status.HTTP_403_FORBIDDEN, | |
| detail="Only admins can create doctor accounts" | |
| ) | |
| email = data.email.lower().strip() | |
| existing = await users_collection.find_one({"email": email}) | |
| if existing: | |
| logger.warning(f"Doctor creation failed: Email already exists: {email}") | |
| raise HTTPException( | |
| status_code=status.HTTP_409_CONFLICT, | |
| detail="Email already exists" | |
| ) | |
| hashed_pw = hash_password(data.password) | |
| doctor_doc = { | |
| "email": email, | |
| "full_name": data.full_name.strip(), | |
| "password": hashed_pw, | |
| "role": "doctor", | |
| "specialty": data.specialty, | |
| "license_number": data.license_number, | |
| "created_at": datetime.utcnow().isoformat(), | |
| "updated_at": datetime.utcnow().isoformat(), | |
| "device_token": data.device_token or "" # Store device token if provided | |
| } | |
| try: | |
| result = await users_collection.insert_one(doctor_doc) | |
| logger.info(f"Doctor created successfully: {email}") | |
| return { | |
| "status": "success", | |
| "id": str(result.inserted_id), | |
| "email": email | |
| } | |
| except Exception as e: | |
| logger.error(f"Failed to create doctor {email}: {str(e)}") | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail=f"Failed to create doctor: {str(e)}" | |
| ) | |
| async def login(form_data: OAuth2PasswordRequestForm = Depends()): | |
| logger.info(f"Login attempt for email: {form_data.username}") | |
| user = await users_collection.find_one({"email": form_data.username.lower()}) | |
| if not user or not verify_password(form_data.password, user["password"]): | |
| logger.warning(f"Login failed for {form_data.username}: Invalid credentials") | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Invalid credentials", | |
| headers={"WWW-Authenticate": "Bearer"}, | |
| ) | |
| # Update device token if provided in form_data (e.g., from frontend) | |
| if hasattr(form_data, 'device_token') and form_data.device_token: | |
| await users_collection.update_one( | |
| {"email": user["email"]}, | |
| {"$set": {"device_token": form_data.device_token}} | |
| ) | |
| logger.info(f"Device token updated for {form_data.username}") | |
| access_token = create_access_token(data={"sub": user["email"]}) | |
| logger.info(f"Successful login for {form_data.username}") | |
| return { | |
| "access_token": access_token, | |
| "token_type": "bearer", | |
| "role": user.get("role", "patient") | |
| } | |
| async def get_me(request: Request, current_user: dict = Depends(get_current_user)): | |
| logger.info(f"Fetching user profile for {current_user['email']} at {datetime.utcnow().isoformat()}") | |
| print(f"Headers: {request.headers}") | |
| try: | |
| user = await users_collection.find_one({"email": current_user["email"]}) | |
| if not user: | |
| logger.warning(f"User not found: {current_user['email']}") | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail="User not found" | |
| ) | |
| response = { | |
| "id": str(user["_id"]), | |
| "email": user["email"], | |
| "full_name": user.get("full_name", ""), | |
| "role": user.get("role", "patient"), | |
| "specialty": user.get("specialty"), | |
| "created_at": user.get("created_at"), | |
| "updated_at": user.get("updated_at"), | |
| "device_token": user.get("device_token", "") # Include device token in response | |
| } | |
| logger.info(f"User profile retrieved for {current_user['email']} at {datetime.utcnow().isoformat()}") | |
| return response | |
| except Exception as e: | |
| logger.error(f"Database error for user {current_user['email']}: {str(e)} at {datetime.utcnow().isoformat()}") | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail=f"Database error: {str(e)}" | |
| ) | |
| # Export the router as 'auth' for api.__init__.py | |
| auth = router |