"""Nexus Authentication Service.

FastAPI application exposing signup/login/session endpoints and admin user
management, backed by the Supabase tables ``users`` and ``sessions``.
"""

import hashlib
import os
import re
import secrets
import uuid
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional

from dotenv import load_dotenv
from fastapi import APIRouter, FastAPI, Header, HTTPException, status
from pydantic import BaseModel, EmailStr
from supabase import Client, create_client
from user_agents import parse

load_dotenv()

SERVER_NAME = "Nexus Authentication Service"
VERSION = "1.0.4 debug"

# Supabase Configuration
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)

SYSTEM_USER = os.getenv("SYSTEM_USER")
SYSTEM_PASSWORD = os.getenv("SYSTEM_PASSWORD")

app = FastAPI()

# Routers
root_router = APIRouter()
auth_router = APIRouter(prefix="/v1/api/auth", tags=["Authentication"])
admin_router = APIRouter(prefix="/v1/api/admin", tags=["Admin"])

# Constants
TOKEN_EXPIRATION_MINUTES = 1440  # 24 hours
# Ordered from least to most privileged; the index doubles as the rank.
ACCESS_LEVELS = ["default", "member", "admin", "dev", "hush"]
# Access level required by every admin route in this module.
_ADMIN_ACCESS_LEVEL = "hush"

# Password hashing parameters. The stored format is
# "pbkdf2_sha256$<iterations>$<salt hex>$<digest hex>" so the iteration count
# can be raised later without invalidating existing hashes.
_PBKDF2_ALGORITHM = "pbkdf2_sha256"
_PBKDF2_ITERATIONS = 600_000
_PBKDF2_SALT_BYTES = 16

# ISO 8601 timestamp with optional fractional seconds and optional offset.
# Used to normalize values that ``datetime.fromisoformat`` rejects on
# Python < 3.11 (e.g. "Z", "+00", or 1-5 fractional digits).
_TIMESTAMP_RE = re.compile(
    r"^(?P<date>\d{4}-\d{2}-\d{2})[T ](?P<time>\d{2}:\d{2}:\d{2})"
    r"(?:\.(?P<frac>\d+))?"
    r"(?P<tz>[Zz]|[+-]\d{2}(?::?\d{2})?)?$"
)


# Models
class SignupRequest(BaseModel):
    """Request body for ``POST /signup``."""

    username: str
    password: str
    email: Optional[EmailStr] = None


class LoginRequest(BaseModel):
    """Request body for ``POST /login``."""

    username: str
    password: str


class LoginResponse(BaseModel):
    """User details plus the freshly issued session token."""

    user_id: str
    username: str
    email: Optional[EmailStr]
    access_level: str
    date_joined: datetime
    access_token: str
    token_type: str = "bearer"


class TokenResponse(BaseModel):
    """Echo of a token that passed validation."""

    access_token: str
    token_type: str = "bearer"


class UserResponse(BaseModel):
    """Public view of a row in the ``users`` table (no password, no id)."""

    username: str
    email: Optional[EmailStr]
    access_level: str
    date_joined: datetime


class UpdateUserRequest(BaseModel):
    """Partial update of a user; omitted or empty fields are left untouched."""

    password: Optional[str] = None
    email: Optional[EmailStr] = None
    username: Optional[str] = None


class UpdateAccessLevelRequest(BaseModel):
    """Request body for changing a user's access level."""

    access_level: str


def generate_numeric_user_id(length=10):
    """
    Generates a random numeric user ID with the specified length.

    Args:
        length: The desired number of digits; must be at least 1.

    Returns:
        A string of exactly ``length`` decimal digits (may start with "0").

    Raises:
        ValueError: If ``length`` is less than 1.
    """
    if length < 1:
        raise ValueError("length must be a positive integer")
    # randbelow() draws uniformly from [0, 10**length); zero-padding keeps the
    # result at exactly ``length`` digits for every draw.
    return str(secrets.randbelow(10 ** length)).zfill(length)


# Generate and print a numeric user ID
# NOTE(review): import-time debug output. Kept so the module-level ``user_id``
# name and the startup log line stay unchanged; consider removing.
user_id = generate_numeric_user_id()
print(f"Generated User ID: {user_id}")


# Utility functions
def hash_password(password: str) -> str:
    """Return a salted PBKDF2-HMAC-SHA256 hash of ``password``.

    The result is a self-describing string,
    ``pbkdf2_sha256$<iterations>$<salt hex>$<digest hex>``, suitable for
    storing and later passing to :func:`verify_password`.

    NOTE(review): the stored value is longer than the 64-character SHA-256 hex
    digest this function used to return - confirm the ``users.password``
    column can hold it.
    """
    salt = secrets.token_bytes(_PBKDF2_SALT_BYTES)
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode(), salt, _PBKDF2_ITERATIONS
    )
    return f"{_PBKDF2_ALGORITHM}${_PBKDF2_ITERATIONS}${salt.hex()}${digest.hex()}"


def verify_password(password: str, hashed_password: str) -> bool:
    """Check ``password`` against a stored hash in constant time.

    Accepts both the PBKDF2 format produced by :func:`hash_password` and the
    legacy unsalted SHA-256 hex digest, so accounts created before the format
    change can still log in.

    Returns:
        True when the password matches; False on mismatch or when the stored
        hash is empty or malformed.
    """
    if not hashed_password:
        return False

    if hashed_password.startswith(f"{_PBKDF2_ALGORITHM}$"):
        try:
            _, iterations, salt_hex, digest_hex = hashed_password.split("$")
            expected = bytes.fromhex(digest_hex)
            candidate = hashlib.pbkdf2_hmac(
                "sha256",
                password.encode(),
                bytes.fromhex(salt_hex),
                int(iterations),
            )
        except (ValueError, OverflowError):
            # Wrong field count, non-hex data, or an invalid iteration count.
            return False
        return secrets.compare_digest(candidate, expected)

    # Legacy rows: plain SHA-256 hex digest.
    # TODO(review): re-hash these with hash_password() on successful login.
    legacy = hashlib.sha256(password.encode()).hexdigest()
    # Compare as bytes: compare_digest() rejects non-ASCII str arguments.
    return secrets.compare_digest(legacy.encode(), hashed_password.encode())


def create_device_token(username: str, user_agent: str) -> str:
    """Create an opaque session token.

    The token is the SHA-256 hex digest of the username, the device and OS
    families parsed from ``user_agent``, and 16 random bytes, so it is
    unpredictable even for identical username/device pairs.
    """
    device_info = parse(user_agent)
    token_seed = f"{username}-{device_info.device.family}-{device_info.os.family}-{secrets.token_hex(16)}"
    return hashlib.sha256(token_seed.encode()).hexdigest()


def is_token_expired(expiration_time: datetime) -> bool:
    """Return True when the current UTC time is past ``expiration_time``.

    ``expiration_time`` must be timezone-aware; comparing a naive datetime
    with the aware "now" raises ``TypeError``.
    """
    return datetime.now(timezone.utc) > expiration_time


def _parse_timestamp(value) -> datetime:
    """Parse a stored timestamp into a timezone-aware ``datetime``.

    Normalizes forms that ``datetime.fromisoformat`` rejects on Python < 3.11
    ("Z" suffix, hour-only offsets such as "+00", fractional seconds that are
    not exactly 3 or 6 digits). Naive values are treated as UTC because this
    service writes all of its timestamps with ``datetime.now(timezone.utc)``.

    Raises:
        ValueError: If ``value`` is not a recognizable ISO 8601 timestamp.
    """
    if isinstance(value, datetime):
        parsed = value
    else:
        text = str(value).strip()
        match = _TIMESTAMP_RE.match(text)
        if match:
            # Pad/truncate to microsecond precision.
            frac = (match["frac"] or "")[:6].ljust(6, "0")
            tz = match["tz"] or ""
            if tz in ("Z", "z"):
                tz = "+00:00"
            elif tz and ":" not in tz:
                # "+HH" or "+HHMM" -> "+HH:MM"
                tz = f"{tz[:3]}:{tz[3:] or '00'}"
            text = f"{match['date']}T{match['time']}.{frac}{tz}"
        parsed = datetime.fromisoformat(text)

    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def _build_user_response(user: dict) -> UserResponse:
    """Convert a ``users`` row into the public :class:`UserResponse`."""
    return UserResponse(
        username=user["username"],
        email=user["email"],
        access_level=user["access_level"],
        date_joined=_parse_timestamp(user["date_joined"]),
    )


def _build_update_payload(request: UpdateUserRequest, user_id: str) -> dict:
    """Validate an update request and return the column values to write.

    Raises:
        HTTPException: 400 when no field is supplied, or when the requested
            username already belongs to a different user.
    """
    update_data = {}

    if request.email:
        update_data["email"] = request.email

    if request.username:
        # Same uniqueness rule signup enforces; the user's own row is allowed
        # so re-submitting the current username is not an error.
        clash = (
            supabase.table("users")
            .select("user_id")
            .eq("username", request.username)
            .execute()
        )
        if any(str(row["user_id"]) != str(user_id) for row in clash.data or []):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Username already exists"
            )
        update_data["username"] = request.username

    if request.password:
        # Hash last: it is the expensive step, so skip it when validation fails.
        update_data["password"] = hash_password(request.password)

    if not update_data:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="No fields to update"
        )
    return update_data


# Initialize system user
async def init_system_user():
    """Create the system ("hush") user from the environment if it is missing.

    Raises:
        RuntimeError: If ``SYSTEM_USER`` or ``SYSTEM_PASSWORD`` is not set.
    """
    if not SYSTEM_USER or not SYSTEM_PASSWORD:
        raise RuntimeError(
            "SYSTEM_USER and SYSTEM_PASSWORD must be set to initialize the system user"
        )

    # Check if system user exists
    existing_user = (
        supabase.table("users")
        .select("user_id")
        .eq("username", SYSTEM_USER)
        .execute()
    )
    if existing_user.data:
        return

    system_user_data = {
        # NOTE(review): signup stores a numeric id while this stores a UUID
        # string - confirm the ``user_id`` column accepts both.
        "user_id": str(uuid.uuid4()),
        "username": SYSTEM_USER,
        "password": hash_password(SYSTEM_PASSWORD),
        "email": None,
        "date_joined": datetime.now(timezone.utc).isoformat(),
        "access_level": _ADMIN_ACCESS_LEVEL
    }
    supabase.table("users").insert(system_user_data).execute()


@root_router.get("/", status_code=status.HTTP_200_OK)
async def root():
    """Return the service name and version."""
    message = f'{SERVER_NAME} v{VERSION}'
    return {"message": message}


# Authentication Routes
@auth_router.post("/signup", status_code=status.HTTP_201_CREATED)
async def signup(request: SignupRequest):
    """Register a new user with the "default" access level.

    Raises:
        HTTPException: 400 when the username is already taken.
    """
    # Check if username exists
    existing_user = supabase.table("users").select("*").eq("username", request.username).execute()
    if existing_user.data:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username already exists"
        )

    user_data = {
        "user_id": int(generate_numeric_user_id(length=16)),
        "username": request.username,
        "password": hash_password(request.password),
        "email": request.email,
        "date_joined": datetime.now(timezone.utc).isoformat(),
        "access_level": "default"
    }

    supabase.table("users").insert(user_data).execute()
    return {"message": "User created successfully"}


@auth_router.post("/login", response_model=LoginResponse)
async def login(request: LoginRequest, user_agent: str = Header(...)):
    """Authenticate a user and open a session bound to the caller's User-Agent.

    Raises:
        HTTPException: 401 when the username is unknown or the password is wrong
            (deliberately the same error for both).
    """
    # Query the user based on the username
    user_query = supabase.table("users").select("*").eq("username", request.username).execute()

    # If user not found or password verification fails, raise an error
    if not user_query.data or not verify_password(request.password, user_query.data[0]["password"]):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid credentials"
        )

    # Extract user details
    user = user_query.data[0]

    # Create a token and calculate expiration time in UTC
    token = create_device_token(request.username, user_agent)
    expiration_time = datetime.now(timezone.utc) + timedelta(minutes=TOKEN_EXPIRATION_MINUTES)

    # Prepare session data with the expiration time in ISO 8601 format (UTC)
    session_data = {
        "user_id": user["user_id"],
        "token": token,
        "expires": expiration_time.isoformat(),  # ISO 8601 format ensures the timezone is stored
        "device": user_agent
    }

    # Insert the session data into the database
    supabase.table("sessions").insert(session_data).execute()

    # Return the login response with relevant user details
    return LoginResponse(
        # signup stores the id as an int; the response model declares a str.
        user_id=str(user["user_id"]),
        username=user["username"],
        email=user["email"],
        access_level=user["access_level"],
        date_joined=_parse_timestamp(user["date_joined"]),
        access_token=token
    )


@auth_router.post("/logout")
async def logout(user_id: str, token: str):
    """Delete the session identified by ``user_id`` and ``token``.

    Raises:
        HTTPException: 401 when no such session exists.
    """
    # Query to check if the session exists for the given user_id and token
    session_query = (
        supabase.table("sessions")
        .select("*")
        .eq("user_id", user_id)
        .eq("token", token)
        .execute()
    )

    # If session is not found, raise an unauthorized error
    if not session_query.data:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Session not found or already expired"
        )

    # Delete the session using the composite key (user_id and token)
    supabase.table("sessions").delete().eq("user_id", user_id).eq("token", token).execute()

    return {"message": "Session forcefully expired"}


@auth_router.get("/validate", response_model=TokenResponse)
async def validate_token(user_id: str, token: str, user_agent: str = Header(...)):
    """Validate a session token for the given user and device.

    Also called directly by the protected routes in this module. An expired
    session is deleted as a side effect.

    Raises:
        HTTPException: 401 when no matching session exists or it has expired.
    """
    # Query to validate session by user_id, token, and device
    session_query = (
        supabase.table("sessions")
        .select("*")
        .eq("user_id", user_id)
        .eq("token", token)
        .eq("device", user_agent)
        .execute()
    )

    # If no session found, raise unauthorized error
    if not session_query.data:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token"
        )

    session = session_query.data[0]

    # Get the current time (UTC) with timezone awareness
    current_time = datetime.now(timezone.utc)

    # Parse the 'expires' field from the session as an offset-aware datetime
    session_expiry = _parse_timestamp(session["expires"])

    # Check if the token has expired
    if session_expiry <= current_time:
        # Delete the session if expired
        supabase.table("sessions").delete().eq("user_id", user_id).eq("token", token).execute()
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token expired"
        )

    return TokenResponse(access_token=token)


@auth_router.get("/search-users", response_model=List[str])
async def search_users(query: str):
    """Return usernames containing ``query`` (case-insensitive).

    NOTE(review): ``query`` is interpolated into the ILIKE pattern unescaped,
    so wildcard characters supplied by the caller act as wildcards.
    """
    users = supabase.table("users").select("username").ilike("username", f"%{query}%").execute()
    return [user["username"] for user in users.data]


@auth_router.get("/get-user-id", response_model=str)
async def get_user_id(username: str):
    """Look up the user id for an exact username.

    Raises:
        HTTPException: 404 when the username does not exist.
    """
    user_query = supabase.table("users").select("user_id").eq("username", username).execute()
    if not user_query.data:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Username not found"
        )
    # signup stores the id as an int; the response model declares a str.
    return str(user_query.data[0]["user_id"])


async def _require_admin(admin_id: str, token: str, user_agent: str) -> None:
    """Validate the caller's session and require the "hush" access level.

    Raises:
        HTTPException: 401 from :func:`validate_token`, or 403 when the caller
            is unknown or lacks the required access level.
    """
    await validate_token(admin_id, token, user_agent)
    admin_query = supabase.table("users").select("access_level").eq("user_id", admin_id).execute()
    if not admin_query.data or admin_query.data[0]["access_level"] != _ADMIN_ACCESS_LEVEL:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Insufficient permissions"
        )


# Admin Routes
@admin_router.get("/users", response_model=List[UserResponse])
async def get_all_users(user_id: str, token: str, user_agent: str = Header(...)):
    """List every user. Requires a valid "hush" session.

    Raises:
        HTTPException: 401 for an invalid session, 403 for insufficient access.
    """
    await _require_admin(user_id, token, user_agent)
    users = supabase.table("users").select("*").execute()
    return [_build_user_response(user) for user in users.data]


@admin_router.get("/user/{user_id}", response_model=UserResponse)
async def get_user(admin_id: str, token: str, user_id: str, user_agent: str = Header(...)):
    """Fetch a single user by id. Requires a valid "hush" session.

    Raises:
        HTTPException: 401/403 for the caller, 404 when the user does not exist.
    """
    await _require_admin(admin_id, token, user_agent)
    user_query = supabase.table("users").select("*").eq("user_id", user_id).execute()
    if not user_query.data:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )
    return _build_user_response(user_query.data[0])


@admin_router.put("/user/{user_id}", response_model=UserResponse)
async def update_user(admin_id: str, token: str, user_id: str, request: UpdateUserRequest, user_agent: str = Header(...)):
    """Update another user's password, email and/or username.

    Requires a valid "hush" session.

    Raises:
        HTTPException: 401/403 for the caller, 400 for an empty update or a
            username that is already taken, 404 when the user does not exist.
    """
    await _require_admin(admin_id, token, user_agent)
    update_data = _build_update_payload(request, user_id)
    updated_user = supabase.table("users").update(update_data).eq("user_id", user_id).execute()
    if not updated_user.data:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )
    return _build_user_response(updated_user.data[0])


@admin_router.put("/user/{user_id}/access-level")
async def update_access_level(admin_id: str, token: str, user_id: str, request: UpdateAccessLevelRequest, user_agent: str = Header(...)):
    """Raise a user's access level. Requires a valid "hush" session.

    Only upgrades are allowed: the new level must rank strictly higher in
    ``ACCESS_LEVELS`` than the user's current one.

    Raises:
        HTTPException: 401/403 for the caller, 404 when the user does not
            exist, 400 for an unknown level, a downgrade, or no change.
    """
    await _require_admin(admin_id, token, user_agent)

    user_query = supabase.table("users").select("*").eq("user_id", user_id).execute()
    if not user_query.data:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )
    user = user_query.data[0]

    new_access_level = request.access_level
    if new_access_level not in ACCESS_LEVELS:
        # Without this check list.index() raises ValueError -> HTTP 500.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Unknown access level; expected one of: {', '.join(ACCESS_LEVELS)}",
        )
    new_rank = ACCESS_LEVELS.index(new_access_level)

    try:
        current_rank = ACCESS_LEVELS.index(user["access_level"])
    except ValueError:
        # A stored level outside ACCESS_LEVELS ranks below everything, so any
        # valid level can replace it.
        current_rank = -1

    if new_rank <= current_rank:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot downgrade a user or change to the same level",
        )

    updated_user = supabase.table("users").update({"access_level": new_access_level}).eq("user_id", user_id).execute()
    if not updated_user.data:
        # The row disappeared between the lookup and the update.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )
    return _build_user_response(updated_user.data[0])


@auth_router.put("/user/update", response_model=UserResponse)
async def update_own_data(user_id: str, request: UpdateUserRequest, token: str = Header(...), user_agent: str = Header(...)):
    """Update the caller's own password, email and/or username.

    Unlike the other routes, the session token is read from the ``token``
    request header rather than the query string.

    Raises:
        HTTPException: 401 for an invalid session, 400 for an empty update or
            a username that is already taken, 404 when the user does not exist.
    """
    await validate_token(user_id, token, user_agent)
    update_data = _build_update_payload(request, user_id)
    updated_user = supabase.table("users").update(update_data).eq("user_id", user_id).execute()
    if not updated_user.data:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )
    return _build_user_response(updated_user.data[0])


# Include routes
app.include_router(root_router)
app.include_router(auth_router)
app.include_router(admin_router)


# Initialize system user on startup
@app.on_event("startup")
async def startup_event():
    """Ensure the system user exists before the app starts serving requests."""
    await init_system_user()