CPS-API / api /routes /auth.py
Ali2206's picture
Update api/routes/auth.py
b7c0a0c verified
from fastapi import APIRouter, HTTPException, Depends, status, Request
from fastapi.security import OAuth2PasswordRequestForm
from db.mongo import users_collection
from core.security import hash_password, verify_password, create_access_token, get_current_user
from models.schemas import SignupForm, TokenResponse, DoctorCreate
from datetime import datetime
import logging
# Logging setup: INFO threshold; each record shows timestamp, level,
# logger name and message.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(name)s - %(message)s',
    level=logging.INFO,
)
# Module-scoped logger used by the route handlers in this file.
logger = logging.getLogger(__name__)
# Router on which the endpoints defined in this module are registered.
router = APIRouter()
@router.post("/signup", status_code=status.HTTP_201_CREATED)
async def signup(data: SignupForm):
    """Register a new account with the ``patient`` role.

    The email is lower-cased and stripped before the duplicate check and
    before storage. The password is stored only as the value returned by
    ``hash_password``.

    Args:
        data: Signup payload; ``email``, ``password``, ``full_name`` and
            ``device_token`` are read from it.

    Returns:
        A dict with ``status``, the inserted document ``id`` (as a string)
        and the normalised ``email``.

    Raises:
        HTTPException: 409 when the email is already registered, 500 when
            the insert fails.
    """
    logger.info(f"Signup attempt for email: {data.email}")
    email = data.email.lower().strip()

    # NOTE(review): check-then-insert is not atomic; two concurrent signups
    # with the same email could both pass this check. Presumably a unique
    # index on "email" is (or should be) defined on the collection - confirm.
    existing = await users_collection.find_one({"email": email})
    if existing:
        logger.warning(f"Signup failed: Email already exists: {email}")
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Email already exists"
        )

    # Take the timestamp once so created_at and updated_at are identical.
    now = datetime.utcnow().isoformat()
    user_doc = {
        "email": email,
        "full_name": data.full_name.strip(),
        "password": hash_password(data.password),
        "role": "patient",
        "created_at": now,
        "updated_at": now,
        "device_token": data.device_token or ""  # Store device token if provided
    }

    # Only the insert is guarded; everything after it is the success path.
    try:
        result = await users_collection.insert_one(user_doc)
    except Exception as e:
        # Full details (with traceback) go to the log only; the client gets
        # a generic message so driver/database internals are not exposed.
        logger.exception(f"Failed to create user {email}: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to create user"
        ) from e

    logger.info(f"User created successfully: {email}")
    return {
        "status": "success",
        "id": str(result.inserted_id),
        "email": email
    }
@router.post("/admin/doctors", status_code=status.HTTP_201_CREATED)
async def create_doctor(
    data: DoctorCreate,
    current_user: dict = Depends(get_current_user)
):
    """Create an account with the ``doctor`` role; admin callers only.

    The email is lower-cased and stripped before the duplicate check and
    before storage. The password is stored only as the value returned by
    ``hash_password``.

    Args:
        data: Doctor payload; ``email``, ``password``, ``full_name``,
            ``specialty``, ``license_number`` and ``device_token`` are read.
        current_user: The authenticated caller as resolved by
            ``get_current_user``; its ``role`` must equal ``"admin"``.

    Returns:
        A dict with ``status``, the inserted document ``id`` (as a string)
        and the normalised ``email``.

    Raises:
        HTTPException: 403 when the caller is not an admin, 409 when the
            email is already registered, 500 when the insert fails.
    """
    caller_email = current_user.get('email')
    logger.info(f"Doctor creation attempt by {caller_email}")
    if current_user.get('role') != 'admin':
        logger.warning(f"Unauthorized doctor creation attempt by {caller_email}")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only admins can create doctor accounts"
        )

    email = data.email.lower().strip()

    # NOTE(review): check-then-insert is not atomic; presumably a unique
    # index on "email" backs this up at the database level - confirm.
    existing = await users_collection.find_one({"email": email})
    if existing:
        logger.warning(f"Doctor creation failed: Email already exists: {email}")
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Email already exists"
        )

    # Take the timestamp once so created_at and updated_at are identical.
    now = datetime.utcnow().isoformat()
    doctor_doc = {
        "email": email,
        "full_name": data.full_name.strip(),
        "password": hash_password(data.password),
        "role": "doctor",
        "specialty": data.specialty,
        "license_number": data.license_number,
        "created_at": now,
        "updated_at": now,
        "device_token": data.device_token or ""  # Store device token if provided
    }

    # Only the insert is guarded; everything after it is the success path.
    try:
        result = await users_collection.insert_one(doctor_doc)
    except Exception as e:
        # Full details (with traceback) go to the log only; the client gets
        # a generic message so driver/database internals are not exposed.
        logger.exception(f"Failed to create doctor {email}: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to create doctor"
        ) from e

    logger.info(f"Doctor created successfully: {email}")
    return {
        "status": "success",
        "id": str(result.inserted_id),
        "email": email
    }
@router.post("/login", response_model=TokenResponse)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    """Authenticate by email and password and issue an access token.

    The submitted username is treated as the email and is normalised the
    same way the account-creation endpoints normalise it (lower-cased and
    stripped), so stored emails and login lookups always agree.

    Args:
        form_data: OAuth2 password form; ``username`` and ``password`` are
            read, plus ``device_token`` if the form object carries one.

    Returns:
        A dict with ``access_token``, ``token_type`` (``"bearer"``) and the
        user's ``role`` (``"patient"`` when the stored document has none).

    Raises:
        HTTPException: 401 when no user matches or the password does not
            verify.
    """
    logger.info(f"Login attempt for email: {form_data.username}")

    # Must mirror the normalisation used when the email was stored.
    email = form_data.username.lower().strip()
    user = await users_collection.find_one({"email": email})
    if not user or not verify_password(form_data.password, user["password"]):
        logger.warning(f"Login failed for {form_data.username}: Invalid credentials")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Update device token if provided in form_data (e.g., from frontend).
    # NOTE(review): the stock OAuth2PasswordRequestForm does not appear to
    # define a device_token field, so this branch likely never runs unless a
    # custom form class is substituted - confirm and add a Form field if the
    # update is actually wanted.
    device_token = getattr(form_data, "device_token", None)
    if device_token:
        await users_collection.update_one(
            {"email": user["email"]},
            {"$set": {"device_token": device_token}}
        )
        logger.info(f"Device token updated for {form_data.username}")

    access_token = create_access_token(data={"sub": user["email"]})
    logger.info(f"Successful login for {form_data.username}")
    return {
        "access_token": access_token,
        "token_type": "bearer",
        "role": user.get("role", "patient")
    }
@router.get("/me")
async def get_me(request: Request, current_user: dict = Depends(get_current_user)):
    """Return the stored profile of the authenticated user.

    Args:
        request: The incoming request; only its header names are logged
            (at debug level).
        current_user: The authenticated caller as resolved by
            ``get_current_user``; its ``email`` is used for the lookup.

    Returns:
        A dict with ``id``, ``email``, ``full_name``, ``role``,
        ``specialty``, ``created_at``, ``updated_at`` and ``device_token``.

    Raises:
        HTTPException: 404 when no document matches the caller's email,
            500 when the database lookup itself fails.
    """
    email = current_user["email"]
    logger.info(f"Fetching user profile for {email} at {datetime.utcnow().isoformat()}")
    # Log header names only: the values include the Authorization bearer
    # token, which must not be written to stdout or the log.
    logger.debug("Request header names: %s", list(request.headers.keys()))

    # Guard only the database call. The 404 below is raised outside this
    # try so it is not swallowed by the handler and rewritten as a 500.
    try:
        user = await users_collection.find_one({"email": email})
    except Exception as e:
        logger.error(f"Database error for user {email}: {str(e)} at {datetime.utcnow().isoformat()}")
        # Generic detail: driver/database internals stay in the log.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Database error"
        ) from e

    if not user:
        logger.warning(f"User not found: {email}")
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    response = {
        "id": str(user["_id"]),
        "email": user["email"],
        "full_name": user.get("full_name", ""),
        "role": user.get("role", "patient"),
        "specialty": user.get("specialty"),
        "created_at": user.get("created_at"),
        "updated_at": user.get("updated_at"),
        "device_token": user.get("device_token", "")  # Include device token in response
    }
    logger.info(f"User profile retrieved for {email} at {datetime.utcnow().isoformat()}")
    return response
# Alias the router as 'auth' so it can be imported under that name
# (intended for api.__init__.py).
auth = router