import os from fastapi import FastAPI, HTTPException, Header, status, APIRouter from pydantic import BaseModel, EmailStr from typing import Dict, List from datetime import datetime, timedelta, timezone from dateutil import parser import secrets import hashlib import uuid from user_agents import parse from dotenv import load_dotenv from supabase import create_client, Client from typing import Optional load_dotenv() SERVER_NAME = "Nexus Authentication Service" VERSION = "1.0.6 debug" # Supabase Configuration SUPABASE_URL = os.getenv("SUPABASE_URL") SUPABASE_KEY = os.getenv("SUPABASE_KEY") supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY) SYSTEM_USER = os.getenv("SYSTEM_USER") SYSTEM_PASSWORD = os.getenv("SYSTEM_PASSWORD") app = FastAPI() # Routers root_router = APIRouter() auth_router = APIRouter(prefix="/v1/api/auth", tags=["Authentication"]) admin_router = APIRouter(prefix="/v1/api/admin", tags=["Admin"]) # Constants TOKEN_EXPIRATION_MINUTES = 1440 # 24 hours ACCESS_LEVELS = ["default", "member", "admin", "dev", "hush"] # Models class SignupRequest(BaseModel): username: str password: str email: Optional[EmailStr] = None class LoginRequest(BaseModel): username: str password: str class LoginResponse(BaseModel): user_id: str username: str email: Optional[EmailStr] access_level: str date_joined: datetime access_token: str token_type: str = "bearer" class TokenResponse(BaseModel): access_token: str token_type: str = "bearer" class UserResponse(BaseModel): username: str email: Optional[EmailStr] access_level: str date_joined: datetime class UsernameAvailabilityResponse(BaseModel): username: str is_available: bool class UpdateUserRequest(BaseModel): password: Optional[str] = None email: Optional[EmailStr] = None username: Optional[str] = None class UpdateAccessLevelRequest(BaseModel): access_level: str def generate_numeric_user_id(length=16): """ Generates a numeric user ID with the specified length. Args: length: The desired length of the user ID. Returns: A string representing the numeric user ID. """ # Generate a random UUID uuid_str = str(uuid.uuid4()).replace('-', '') # Convert the UUID to an integer uuid_int = int(uuid_str, 16) # Generate a numeric ID with the specified length numeric_id = str(uuid_int)[-length:] print(numeric_id) return numeric_id # Utility functions def hash_password(password: str) -> str: return hashlib.sha256(password.encode()).hexdigest() def verify_password(password: str, hashed_password: str) -> bool: return hash_password(password) == hashed_password def create_device_token(username: str, user_agent: str) -> str: device_info = parse(user_agent) token_seed = f"{username}-{device_info.device.family}-{device_info.os.family}-{secrets.token_hex(16)}" return hashlib.sha256(token_seed.encode()).hexdigest() def is_token_expired(expiration_time: datetime) -> bool: return datetime.now(timezone.utc) > expiration_time async def create_user( username: str, password: str, email: Optional[str] = None, access_level: Optional[str] = None, ): """ Creates a new user in the database. Args: username: The username of the new user. password: The password of the new user (hashed). email: The email address of the new user (optional). access_level: The access level of the new user (optional). Returns: The created user object. Raises: HTTPException: If the username already exists. """ # Check if the username already exists existing_user = supabase.table("users").select("*").eq("username", username).execute() if existing_user.data: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Username already exists" ) # Ensure the generated user_id is unique while True: user_id = generate_numeric_user_id() existing_user_id = supabase.table("users").select("*").eq("user_id", user_id).execute() if not existing_user_id.data: break # Prepare user data user_data = { "user_id": user_id, "username": username, "password": password, "email": email, "date_joined": datetime.now(timezone.utc).isoformat(), "access_level": access_level or "default", } # Insert the user into the database inserted_user = supabase.table("users").insert(user_data).execute() return inserted_user.data[0] # Initialize system user async def init_system_user(): try: await create_user(SYSTEM_USER, hash_password(SYSTEM_PASSWORD), None, "hush") except HTTPException as e: if e.status_code == status.HTTP_400_BAD_REQUEST and "Username already exists" in e.detail: print("System user already exists, continuing...") else: raise e async def validate_session(user_id: str, token: str, user_agent: str) -> Optional[dict]: """ Validates the session for the given user_id, token, and user_agent. Args: user_id: The user ID associated with the session. token: The token to validate. user_agent: The user agent/device identifier. Returns: The session data if valid. Raises: HTTPException: If the session is invalid or the token is expired. """ # Query to validate session by user_id, token, and device session_query = ( supabase.table("sessions") .select("*") .eq("user_id", user_id) .eq("token", token) .eq("device", user_agent) .execute() ) # If no session found, raise unauthorized error if not session_query.data: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token" ) session = session_query.data[0] # Get the current time (UTC) with timezone awareness current_time = datetime.now(timezone.utc) # Parse the 'expires' field from the session using dateutil.parser session_expiry = parser.parse(session["expires"]) # Check if the token has expired if session_expiry <= current_time: # Delete the session if expired supabase.table("sessions").delete().eq("user_id", user_id).eq("token", token).execute() raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Token expired" ) return session @root_router.get("/", status_code=status.HTTP_200_OK) async def root(): message = f'{SERVER_NAME} v{VERSION}' return {"message": message} # Authentication Routes @auth_router.post("/signup", status_code=status.HTTP_201_CREATED) async def signup(request: SignupRequest): """ Signup route handler. Uses the create_user helper function to create a new user. Args: request: Signup request data. Returns: A JSON response with a success message. """ created_user = await create_user(request.username, hash_password(request.password), request.email) return {"message": "User created successfully", "user": created_user} @auth_router.post("/login", response_model=LoginResponse) async def login(request: LoginRequest, user_agent: str = Header(...)): # Query the user based on the username user_query = supabase.table("users").select("*").eq("username", request.username).execute() # If user not found or password verification fails, raise an error if not user_query.data or not verify_password(request.password, user_query.data[0]["password"]): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials" ) # Extract user details user = user_query.data[0] # Create a token and calculate expiration time in UTC token = create_device_token(request.username, user_agent) expiration_time = datetime.now(timezone.utc) + timedelta(minutes=TOKEN_EXPIRATION_MINUTES) # Prepare session data with the expiration time in ISO 8601 format (UTC) session_data = { "user_id": user["user_id"], "token": token, "expires": expiration_time.isoformat(), # ISO 8601 format ensures the timezone is stored "device": user_agent } # Insert the session data into the database supabase.table("sessions").insert(session_data).execute() # Return the login response with relevant user details return LoginResponse( user_id=user["user_id"], username=user["username"], email=user["email"], access_level=user["access_level"], date_joined=(user["date_joined"]), access_token=token ) @auth_router.post("/logout") async def logout(user_id: str, token: str): # Query to check if the session exists for the given user_id and token session_query = ( supabase.table("sessions") .select("*") .eq("user_id", user_id) .eq("token", token) .execute() ) # If session is not found, raise an unauthorized error if not session_query.data: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Session not found or already expired" ) # Delete the session using the composite key (user_id and token) supabase.table("sessions").delete().eq("user_id", user_id).eq("token", token).execute() return {"message": "Session forcefully expired"} @auth_router.get("/validate", response_model=LoginResponse) async def validate_token(user_id: str, token: str, user_agent: str = Header(...)): """ Route to validate a token based on user_id, token, and user agent. Args: user_id: The user ID associated with the token. token: The token to validate. user_agent: The user agent (from request header). Returns: TokenResponse: The validated token response. """ # Use the helper function to validate the session await validate_session(user_id=user_id, token=token, user_agent=user_agent) user_query = supabase.table("users").select("*").eq("user_id", user_id).execute() # If user not found or password verification fails, raise an error if not user_query.data: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) # Extract user details user = user_query.data[0] # Return the login response with relevant user details return LoginResponse( user_id=user["user_id"], username=user["username"], email=user["email"], access_level=user["access_level"], date_joined=(user["date_joined"]), access_token=token ) @auth_router.get("/search-users", response_model=List[str]) async def search_users(query: str): users = supabase.table("users").select("username").ilike("username", f"%{query}%").execute() usernames = [user["username"] for user in users.data] # Exclude SYSTEM_USER from the list if it's present if SYSTEM_USER and SYSTEM_USER in usernames: usernames.remove(SYSTEM_USER) return usernames @auth_router.get("/is-username-available", response_model=UsernameAvailabilityResponse) async def is_username_available(query: str): users = supabase.table("users").select("username").eq("username", query).execute() return UsernameAvailabilityResponse( username=query, is_available=len(users.data) == 0 # If no users are found, the username is available ) @auth_router.get("/get-user-id", response_model=str) async def get_user_id(username: str): user_query = supabase.table("users").select("user_id").eq("username", username).execute() if not user_query.data: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Username not found" ) return user_query.data[0]["user_id"] # Admin Routes @admin_router.get("/users", response_model=List[UserResponse]) async def get_all_users(user_id: str, token: str, user_agent: str = Header(...)): await validate_session(user_id, token, user_agent) admin_query = supabase.table("users").select("access_level").eq("user_id", user_id).execute() if not admin_query.data or admin_query.data[0]["access_level"] != "hush": raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions" ) users = supabase.table("users").select("*").execute() return [ UserResponse( username=user["username"], email=user["email"], access_level=user["access_level"], date_joined=datetime.fromisoformat(user["date_joined"]) ) for user in users.data ] @admin_router.get("/user/{user_id}", response_model=UserResponse) async def get_user(admin_id: str, token: str, user_id: str, user_agent: str = Header(...)): await validate_session(admin_id, token, user_agent) admin_query = supabase.table("users").select("access_level").eq("user_id", admin_id).execute() if not admin_query.data or admin_query.data[0]["access_level"] != "hush": raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions" ) user_query = supabase.table("users").select("*").eq("user_id", user_id).execute() if not user_query.data: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) user = user_query.data[0] return UserResponse( username=user["username"], email=user["email"], access_level=user["access_level"], date_joined=datetime.fromisoformat(user["date_joined"]) ) @admin_router.put("/user/{user_id}", response_model=UserResponse) async def update_user(admin_id: str, token: str, user_id: str, request: UpdateUserRequest, user_agent: str = Header(...)): await validate_session(admin_id, token, user_agent) admin_query = supabase.table("users").select("access_level").eq("user_id", admin_id).execute() if not admin_query.data or admin_query.data[0]["access_level"] != "hush": raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions" ) update_data = {} if request.password: update_data["password"] = hash_password(request.password) if request.email: update_data["email"] = request.email if request.username: update_data["username"] = request.username updated_user = supabase.table("users").update(update_data).eq("user_id", user_id).execute() if not updated_user.data: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) user = updated_user.data[0] return UserResponse( username=user["username"], email=user["email"], access_level=user["access_level"], date_joined=datetime.fromisoformat(user["date_joined"]) ) @admin_router.put("/user/{user_id}/access-level") async def update_access_level(admin_id: str, token: str, user_id: str, request: UpdateAccessLevelRequest, user_agent: str = Header(...)): await validate_session(admin_id, token, user_agent) admin_query = supabase.table("users").select("access_level").eq("user_id", admin_id).execute() if not admin_query.data or admin_query.data[0]["access_level"] != "hush": raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions" ) user_query = supabase.table("users").select("*").eq("user_id", user_id).execute() if not user_query.data: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) user = user_query.data[0] new_access_level = request.access_level if ACCESS_LEVELS.index(new_access_level) <= ACCESS_LEVELS.index(user["access_level"]): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot downgrade a user or change to the same level", ) updated_user = supabase.table("users").update({"access_level": new_access_level}).eq("user_id", user_id).execute() user = updated_user.data[0] return UserResponse( username=user["username"], email=user["email"], access_level=user["access_level"], date_joined=datetime.fromisoformat(user["date_joined"]) ) @auth_router.put("/user/update", response_model=UserResponse) async def update_own_data(user_id: str,token: str, request: UpdateUserRequest, user_agent: str = Header(...)): await validate_session(user_id, token, user_agent) update_data = {} if request.password: update_data["password"] = hash_password(request.password) if request.email: update_data["email"] = request.email if request.username: update_data["username"] = request.username updated_user = supabase.table("users").update(update_data).eq("user_id", user_id).execute() if not updated_user.data: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) user = updated_user.data[0] return UserResponse( username=user["username"], email=user["email"], access_level=user["access_level"], date_joined=datetime.fromisoformat(user["date_joined"]) ) # Include routes app.include_router(root_router) app.include_router(auth_router) app.include_router(admin_router) # Initialize system user on startup @app.on_event("startup") async def startup_event(): await init_system_user()