CPS-API / api /routes.py
Ali2206's picture
Update api/routes.py
51eb909 verified
raw
history blame
6.73 kB
from fastapi import APIRouter, HTTPException, Depends, Body
from fastapi.security import OAuth2PasswordRequestForm
from models.schemas import SignupForm, TokenResponse, PatientCreate, DoctorCreate, AppointmentCreate
from db.mongo import users_collection, patients_collection, appointments_collection
from core.security import hash_password, verify_password, create_access_token, get_current_user
from datetime import datetime
from bson import ObjectId
from bson.errors import InvalidId
from typing import Optional
from pydantic import BaseModel
router = APIRouter()
# --- SIGNUP ---
@router.post("/signup")
async def signup(data: SignupForm):
if data.role != "patient":
raise HTTPException(status_code=403, detail="Only patients can sign up via this route")
email = data.email.lower().strip()
existing = await users_collection.find_one({"email": email})
if existing:
raise HTTPException(status_code=409, detail="Email already exists")
hashed_pw = hash_password(data.password)
user_doc = {
"email": email,
"full_name": data.full_name.strip(),
"password": hashed_pw,
"role": "patient",
"created_at": datetime.utcnow()
}
await users_collection.insert_one(user_doc)
return {"success": True, "message": "Patient account created"}
# --- ADMIN: Create doctor account ---
@router.post("/admin/create-doctor")
async def create_doctor(data: DoctorCreate):
existing = await users_collection.find_one({"email": data.email})
if existing:
raise HTTPException(status_code=409, detail="Email already exists")
hashed_pw = hash_password(data.password)
user_doc = {
"matricule": data.matricule,
"email": data.email.lower().strip(),
"full_name": data.full_name.strip(),
"specialty": data.specialty,
"password": hashed_pw,
"role": "doctor",
"created_at": datetime.utcnow()
}
await users_collection.insert_one(user_doc)
return {"success": True, "message": "Doctor account created"}
# --- LOGIN ---
@router.post("/login", response_model=TokenResponse)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
email = form_data.username.lower().strip()
user = await users_collection.find_one({"email": email})
if not user or not verify_password(form_data.password, user["password"]):
raise HTTPException(status_code=401, detail="Invalid credentials")
access_token = create_access_token(data={"sub": user["email"]})
return {"access_token": access_token, "token_type": "bearer"}
# --- GET CURRENT USER ---
@router.get("/me")
async def get_me(current_user: dict = Depends(get_current_user)):
return {
"email": current_user["email"],
"full_name": current_user.get("full_name", ""),
"role": current_user.get("role", "unknown"),
"created_at": current_user.get("created_at", "")
}
# --- ADD NEW PATIENT ---
@router.post("/patients")
async def add_patient(data: PatientCreate, current_user: dict = Depends(get_current_user)):
if current_user.get("role") != "doctor":
raise HTTPException(status_code=403, detail="Only doctors can add patients")
patient_doc = {
**data.dict(),
"date_of_birth": datetime.combine(data.date_of_birth, datetime.min.time()),
"contact": data.contact.dict() if data.contact else {},
"created_by": current_user["email"],
"created_at": datetime.utcnow()
}
result = await patients_collection.insert_one(patient_doc)
return {"id": str(result.inserted_id), "message": "Patient created successfully"}
# --- GET ALL PATIENTS ---
@router.get("/patients")
async def list_patients(current_user: dict = Depends(get_current_user)):
if current_user.get("role") != "doctor":
raise HTTPException(status_code=403, detail="Only doctors can view patients")
patients_cursor = patients_collection.find({"created_by": current_user["email"]})
patients = []
async for patient in patients_cursor:
patients.append({
"id": str(patient["_id"]),
"full_name": patient.get("full_name", ""),
"date_of_birth": patient.get("date_of_birth"),
"gender": patient.get("gender", ""),
"notes": patient.get("notes", "")
})
return patients
# --- COUNT PATIENTS ---
@router.get("/patients-count")
async def count_patients(current_user: dict = Depends(get_current_user)):
if current_user.get("role") != "doctor":
raise HTTPException(status_code=403, detail="Only doctors can count patients")
count = await patients_collection.count_documents({"created_by": current_user["email"]})
return {"count": count}
# =========================
# APPOINTMENT ROUTES
# =========================
# --- CREATE APPOINTMENT (doctor only) ---
@router.post("/appointments")
async def create_appointment(data: AppointmentCreate, current_user: dict = Depends(get_current_user)):
if current_user.get("role") != "doctor":
raise HTTPException(status_code=403, detail="Only doctors can create appointments")
appointment_doc = {
"patient_id": ObjectId(data.patient_id),
"doctor_id": ObjectId(data.doctor_id),
"date": data.date,
"time": data.time,
"reason": data.reason,
"created_by": current_user["email"],
"created_at": datetime.utcnow()
}
await appointments_collection.insert_one(appointment_doc)
return {"message": "Appointment created successfully"}
# --- LIST DOCTOR'S APPOINTMENTS ---
@router.get("/appointments/doctor")
async def list_doctor_appointments(current_user: dict = Depends(get_current_user)):
if current_user.get("role") != "doctor":
raise HTTPException(status_code=403, detail="Only doctors can view this")
cursor = appointments_collection.find({"doctor_id": {"$exists": True}})
return [{**a, "_id": str(a["_id"]), "patient_id": str(a["patient_id"]), "doctor_id": str(a["doctor_id"])} async for a in cursor]
# --- LIST PATIENT'S APPOINTMENTS ---
@router.get("/appointments/patient")
async def list_my_appointments(current_user: dict = Depends(get_current_user)):
if current_user.get("role") != "patient":
raise HTTPException(status_code=403, detail="Only patients can view their appointments")
user = await users_collection.find_one({"email": current_user["email"]})
if not user:
raise HTTPException(status_code=404, detail="User not found")
patient_id = user.get("_id")
cursor = appointments_collection.find({"patient_id": patient_id})
return [{**a, "_id": str(a["_id"]), "patient_id": str(a["patient_id"]), "doctor_id": str(a["doctor_id"])} async for a in cursor]