Spaces:
Sleeping
Sleeping
| import os | |
| from fastapi import FastAPI, HTTPException, Header, status, APIRouter | |
| from pydantic import BaseModel, EmailStr | |
| from typing import Dict, List | |
| from datetime import datetime, timedelta, timezone | |
| from dateutil import parser | |
| import secrets | |
| import hashlib | |
| import uuid | |
| from user_agents import parse | |
| from dotenv import load_dotenv | |
| from supabase import create_client, Client | |
| from typing import Optional | |
# Load variables from a local .env file (if present) into the environment
# before they are read with os.getenv below.
load_dotenv()
# Name and version reported by the root handler.
SERVER_NAME = "Nexus Authentication Service"
VERSION = "1.0.6 debug"
# Supabase Configuration
# os.getenv returns None for an unset variable, so a missing SUPABASE_URL or
# SUPABASE_KEY is passed to create_client as None.
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
# Credentials of the built-in account that init_system_user creates with the
# "hush" access level; search_users hides SYSTEM_USER from its results.
SYSTEM_USER = os.getenv("SYSTEM_USER")
SYSTEM_PASSWORD = os.getenv("SYSTEM_PASSWORD")
app = FastAPI()
# Routers
root_router = APIRouter()
auth_router = APIRouter(prefix="/v1/api/auth", tags=["Authentication"])
admin_router = APIRouter(prefix="/v1/api/admin", tags=["Admin"])
# Constants
# Session lifetime applied by login when it computes the "expires" value.
TOKEN_EXPIRATION_MINUTES = 1440 # 24 hours
# Ordered from lowest to highest: update_access_level compares list indices
# and only allows moving to a later entry.
ACCESS_LEVELS = ["default", "member", "admin", "dev", "hush"]
| # Models | |
# Request body consumed by signup: credentials plus an optional email address.
class SignupRequest(BaseModel):
    username: str
    password: str  # plain text; signup hashes it before calling create_user
    email: Optional[EmailStr] = None  # omitted email is stored as None
# Request body consumed by login.
class LoginRequest(BaseModel):
    username: str
    password: str  # plain text; compared against the stored hash via verify_password
# Response returned by login and validate_token: the user's profile fields
# together with the session token.
class LoginResponse(BaseModel):
    user_id: str
    username: str
    email: Optional[EmailStr]  # may be None; no default is declared
    access_level: str
    date_joined: datetime  # callers pass the stored value straight from the users row
    access_token: str
    token_type: str = "bearer"
# Token-only response shape.
# NOTE(review): no handler visible in this view instantiates this model;
# validate_token returns LoginResponse instead - confirm whether it is still needed.
class TokenResponse(BaseModel):
    access_token: str
    token_type: str = "bearer"
# Public view of a user returned by the admin handlers and update_own_data;
# it carries neither the user_id nor the password hash.
class UserResponse(BaseModel):
    username: str
    email: Optional[EmailStr]  # may be None; no default is declared
    access_level: str
    date_joined: datetime
# Response returned by is_username_available.
class UsernameAvailabilityResponse(BaseModel):
    username: str  # the username that was checked
    is_available: bool  # True when no users row has exactly this username
# Partial-update body used by update_user and update_own_data. Those handlers
# only apply fields that are truthy, so None and empty strings are skipped.
class UpdateUserRequest(BaseModel):
    password: Optional[str] = None  # plain text; hashed by the handler before storage
    email: Optional[EmailStr] = None
    username: Optional[str] = None
# Request body consumed by update_access_level.
class UpdateAccessLevelRequest(BaseModel):
    access_level: str  # looked up in ACCESS_LEVELS by the handler; not validated here
def generate_numeric_user_id(length=16):
    """
    Generates a random numeric user ID with the specified length.

    Every digit is drawn independently from the operating system's secure
    random source, so the result always has exactly `length` digits (leading
    zeros are possible) regardless of how large `length` is.

    Args:
        length: The desired number of digits; must be at least 1.
    Returns:
        A string of exactly `length` decimal digits.
    Raises:
        ValueError: If length is smaller than 1.
    """
    if length < 1:
        raise ValueError("length must be at least 1")
    # Build the ID digit by digit instead of slicing a UUID's decimal form:
    # that form has at most 39 digits and was echoed to stdout on every call.
    return "".join(secrets.choice("0123456789") for _ in range(length))
| # Utility functions | |
def hash_password(password: str) -> str:
    """
    Returns the hexadecimal SHA-256 digest of the given password.

    NOTE(review): the digest is unsalted and computed with a fast hash; moving
    to a salted, slow password hash would need verify_password and the stored
    hashes to change with it.

    Args:
        password: The plain-text password.
    Returns:
        The 64-character lowercase hex digest.
    """
    hasher = hashlib.sha256()
    hasher.update(password.encode())
    return hasher.hexdigest()
def verify_password(password: str, hashed_password: str) -> bool:
    """
    Checks a plain-text password against a stored hash.

    Args:
        password: The plain-text password supplied by the caller.
        hashed_password: The stored digest as produced by hash_password.
    Returns:
        True if hashing `password` yields `hashed_password`, otherwise False
        (including when the stored value is missing or not a string).
    """
    if not isinstance(hashed_password, str):
        # A missing or malformed stored hash can never match.
        return False
    candidate = hash_password(password)
    # Compare in constant time so the response time does not reveal how many
    # leading characters of the digest matched. Bytes are compared because
    # compare_digest rejects non-ASCII str arguments.
    return secrets.compare_digest(candidate.encode(), hashed_password.encode())
def create_device_token(username: str, user_agent: str) -> str:
    """
    Builds a fresh session token for a user on a given device.

    The token is the SHA-256 hex digest of a seed made of the username, the
    device family and OS family parsed from the User-Agent string, and 16
    random bytes (hex-encoded), so two calls never return the same token.

    Args:
        username: The username the token is issued for.
        user_agent: The raw User-Agent header value.
    Returns:
        A 64-character hex token.
    """
    agent = parse(user_agent)
    nonce = secrets.token_hex(16)
    seed_parts = (username, agent.device.family, agent.os.family, nonce)
    seed = "-".join(str(part) for part in seed_parts)
    digest = hashlib.sha256(seed.encode())
    return digest.hexdigest()
def is_token_expired(expiration_time: datetime) -> bool:
    """
    Tells whether the given expiration time lies in the past.

    Args:
        expiration_time: The moment the token stops being valid. A naive
            datetime (no timezone) is interpreted as UTC.
    Returns:
        True if the current UTC time is strictly later than expiration_time.
    """
    if expiration_time.tzinfo is None or expiration_time.utcoffset() is None:
        # Comparing a naive datetime with an aware one raises TypeError, so
        # pin naive values to UTC before comparing.
        expiration_time = expiration_time.replace(tzinfo=timezone.utc)
    return datetime.now(timezone.utc) > expiration_time
async def create_user(
    username: str,
    password: str,
    email: Optional[str] = None,
    access_level: Optional[str] = None,
):
    """
    Creates a new user in the database.

    Args:
        username: The username of the new user.
        password: The password of the new user, already hashed by the caller.
        email: The email address of the new user (optional).
        access_level: The access level of the new user; "default" when omitted.
    Returns:
        The created user row as returned by the database (it includes the
        stored password hash, so callers must not expose it unfiltered).
    Raises:
        HTTPException: 400 if the username already exists; 500 if no unique
            user ID could be allocated or the insert returned no row.
    """
    # Check if the username already exists (only one column is needed for that)
    existing_user = supabase.table("users").select("user_id").eq("username", username).execute()
    if existing_user.data:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="Username already exists"
        )

    # Ensure the generated user_id is unique. The number of attempts is bounded
    # so a misbehaving lookup cannot keep the request spinning forever.
    max_attempts = 10
    for _ in range(max_attempts):
        user_id = generate_numeric_user_id()
        existing_user_id = supabase.table("users").select("user_id").eq("user_id", user_id).execute()
        if not existing_user_id.data:
            break
    else:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Could not allocate a unique user ID",
        )

    # Prepare user data
    user_data = {
        "user_id": user_id,
        "username": username,
        "password": password,
        "email": email,
        "date_joined": datetime.now(timezone.utc).isoformat(),
        "access_level": access_level or "default",
    }

    # Insert the user into the database
    inserted_user = supabase.table("users").insert(user_data).execute()
    if not inserted_user.data:
        # Indexing an empty result would surface as an opaque IndexError.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="User could not be created",
        )
    return inserted_user.data[0]
| # Initialize system user | |
async def init_system_user():
    """
    Creates the system account with the "hush" access level if it is missing.

    The credentials come from the SYSTEM_USER and SYSTEM_PASSWORD settings.
    When either is unset the step is skipped with a message instead of
    failing while trying to hash a missing password.

    Raises:
        HTTPException: Any error from create_user other than the
            "Username already exists" rejection, which is treated as success.
    """
    if not SYSTEM_USER or not SYSTEM_PASSWORD:
        print("SYSTEM_USER or SYSTEM_PASSWORD is not configured, skipping system user creation")
        return
    try:
        await create_user(SYSTEM_USER, hash_password(SYSTEM_PASSWORD), None, "hush")
    except HTTPException as e:
        already_exists = (
            e.status_code == status.HTTP_400_BAD_REQUEST
            and "Username already exists" in str(e.detail)
        )
        if not already_exists:
            # Bare raise keeps the original traceback intact.
            raise
        print("System user already exists, continuing...")
async def validate_session(user_id: str, token: str, user_agent: str) -> Optional[dict]:
    """
    Validates the session for the given user_id, token, and user_agent.

    Args:
        user_id: The user ID associated with the session.
        token: The token to validate.
        user_agent: The user agent string; it must equal the "device" value
            stored with the session.
    Returns:
        The session row if it exists and has not expired.
    Raises:
        HTTPException: 401 "Invalid token" if no matching session exists,
            401 "Token expired" if it has expired (the row is deleted first).
    """
    # Query to validate session by user_id, token, and device
    session_query = (
        supabase.table("sessions")
        .select("*")
        .eq("user_id", user_id)
        .eq("token", token)
        .eq("device", user_agent)
        .execute()
    )
    # If no session found, raise unauthorized error
    if not session_query.data:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token"
        )
    session = session_query.data[0]

    # Get the current time (UTC) with timezone awareness
    current_time = datetime.now(timezone.utc)
    # Parse the 'expires' field from the session using dateutil.parser
    session_expiry = parser.parse(session["expires"])
    if session_expiry.tzinfo is None or session_expiry.utcoffset() is None:
        # A value stored without an offset parses as naive, and comparing it
        # with the aware current time would raise TypeError. login writes UTC
        # timestamps, so interpret naive values as UTC.
        session_expiry = session_expiry.replace(tzinfo=timezone.utc)

    # Check if the token has expired
    if session_expiry <= current_time:
        # Delete the session if expired
        supabase.table("sessions").delete().eq("user_id", user_id).eq("token", token).execute()
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="Token expired"
        )
    return session
async def root():
    """
    Returns a banner with the service name and version.

    NOTE(review): no route decorator is visible on this handler in this
    view - confirm it is registered on a router.

    Returns:
        A dict with a single "message" key.
    """
    banner = "{} v{}".format(SERVER_NAME, VERSION)
    return {"message": banner}
| # Authentication Routes | |
async def signup(request: SignupRequest):
    """
    Signup route handler.

    Hashes the submitted password and delegates to create_user.

    NOTE(review): no route decorator is visible on this handler in this
    view - confirm it is registered on a router.

    Args:
        request: Signup request data.
    Returns:
        A dict with a success message and the created user, without the
        stored password hash.
    Raises:
        HTTPException: Propagated from create_user (e.g. username taken).
    """
    created_user = await create_user(request.username, hash_password(request.password), request.email)
    # create_user returns the raw database row; never send the password hash
    # back to the client.
    public_user = {key: value for key, value in created_user.items() if key != "password"}
    return {"message": "User created successfully", "user": public_user}
async def login(request: LoginRequest, user_agent: str = Header(...)):
    """
    Authenticates a user and opens a session bound to the calling device.

    NOTE(review): no route decorator is visible on this handler in this
    view - confirm it is registered on a router.

    Args:
        request: The username and plain-text password.
        user_agent: The User-Agent header; stored as the session's device.
    Returns:
        LoginResponse with the user's profile fields and the new token.
    Raises:
        HTTPException: 401 "Invalid credentials" when the username is unknown
            or the password does not match.
    """
    # Look the account up by username
    matches = supabase.table("users").select("*").eq("username", request.username).execute().data
    user = matches[0] if matches else None
    # Unknown user and wrong password are reported identically
    if user is None or not verify_password(request.password, user["password"]):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials"
        )

    # Issue the token and work out when it stops being valid (UTC)
    token = create_device_token(request.username, user_agent)
    lifetime = timedelta(minutes=TOKEN_EXPIRATION_MINUTES)
    expires_at = datetime.now(timezone.utc) + lifetime

    # Persist the session; isoformat() keeps the UTC offset in the stored value
    supabase.table("sessions").insert(
        {
            "user_id": user["user_id"],
            "token": token,
            "expires": expires_at.isoformat(),
            "device": user_agent,
        }
    ).execute()

    # Hand the profile and the token back to the caller
    return LoginResponse(
        user_id=user["user_id"],
        username=user["username"],
        email=user["email"],
        access_level=user["access_level"],
        date_joined=user["date_joined"],
        access_token=token,
    )
async def logout(user_id: str, token: str):
    """
    Ends the session identified by user_id and token.

    NOTE(review): no route decorator is visible on this handler in this
    view - confirm it is registered on a router.

    Args:
        user_id: The owner of the session.
        token: The session token to invalidate.
    Returns:
        A dict with a confirmation message.
    Raises:
        HTTPException: 401 when no such session exists.
    """
    # Look for the session first so a missing one can be reported
    found = (
        supabase.table("sessions")
        .select("*")
        .eq("user_id", user_id)
        .eq("token", token)
        .execute()
    ).data
    if not found:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Session not found or already expired",
        )

    # Remove the row addressed by the (user_id, token) pair
    removal = supabase.table("sessions").delete().eq("user_id", user_id).eq("token", token)
    removal.execute()
    return {"message": "Session forcefully expired"}
async def validate_token(user_id: str, token: str, user_agent: str = Header(...)):
    """
    Route to validate a token based on user_id, token, and user agent.

    NOTE(review): no route decorator is visible on this handler in this
    view - confirm it is registered on a router.

    Args:
        user_id: The user ID associated with the token.
        token: The token to validate.
        user_agent: The user agent (from request header).
    Returns:
        LoginResponse: The user's profile fields together with the same token.
    Raises:
        HTTPException: 401 from validate_session for an invalid or expired
            session; 404 when the user row no longer exists.
    """
    # Reject invalid or expired sessions before touching the users table
    await validate_session(user_id=user_id, token=token, user_agent=user_agent)

    rows = supabase.table("users").select("*").eq("user_id", user_id).execute().data
    # A valid session whose user row is gone is reported as not found
    if not rows:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
        )

    profile = rows[0]
    return LoginResponse(
        user_id=profile["user_id"],
        username=profile["username"],
        email=profile["email"],
        access_level=profile["access_level"],
        date_joined=profile["date_joined"],
        access_token=token,
    )
async def search_users(query: str):
    """
    Lists usernames that contain the given text, ignoring case.

    NOTE(review): no route decorator is visible on this handler in this
    view - confirm it is registered on a router.

    Args:
        query: The substring to look for.
    Returns:
        A list of matching usernames with one occurrence of the system
        username (when configured) removed.
    """
    pattern = f"%{query}%"
    result = supabase.table("users").select("username").ilike("username", pattern).execute()
    usernames = []
    for row in result.data:
        usernames.append(row["username"])
    # Hide the system account from search results
    if SYSTEM_USER:
        try:
            usernames.remove(SYSTEM_USER)
        except ValueError:
            # Not among the matches; nothing to hide.
            pass
    return usernames
async def is_username_available(query: str):
    """
    Reports whether a username is still free.

    NOTE(review): no route decorator is visible on this handler in this
    view - confirm it is registered on a router.

    Args:
        query: The exact username to check.
    Returns:
        UsernameAvailabilityResponse echoing the username and whether no
        users row has exactly that username.
    """
    lookup = supabase.table("users").select("username").eq("username", query)
    matches = lookup.execute().data
    # No matching rows means nobody holds the name yet
    nobody_has_it = len(matches) == 0
    return UsernameAvailabilityResponse(username=query, is_available=nobody_has_it)
async def get_user_id(username: str):
    """
    Resolves a username to its user ID.

    NOTE(review): no route decorator is visible on this handler in this
    view - confirm it is registered on a router.

    Args:
        username: The exact username to look up.
    Returns:
        The user_id of the first matching row.
    Raises:
        HTTPException: 404 when the username does not exist.
    """
    rows = supabase.table("users").select("user_id").eq("username", username).execute().data
    if rows:
        return rows[0]["user_id"]
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Username not found",
    )
| # Admin Routes | |
async def get_all_users(user_id: str, token: str, user_agent: str = Header(...)):
    """
    Lists every user; restricted to callers with the "hush" access level.

    NOTE(review): no route decorator is visible on this handler in this
    view - confirm it is registered on a router.

    Args:
        user_id: The ID of the calling administrator.
        token: The caller's session token.
        user_agent: The User-Agent header, matched against the session device.
    Returns:
        A list of UserResponse objects, one per users row.
    Raises:
        HTTPException: 401 from validate_session; 403 when the caller is not
            a "hush" user.
    """
    await validate_session(user_id, token, user_agent)
    admin_query = supabase.table("users").select("access_level").eq("user_id", user_id).execute()
    if not admin_query.data or admin_query.data[0]["access_level"] != "hush":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions"
        )
    users = supabase.table("users").select("*").execute()
    return [
        UserResponse(
            username=user["username"],
            email=user["email"],
            access_level=user["access_level"],
            # Use the same lenient parser as validate_session:
            # datetime.fromisoformat rejects some valid ISO 8601 variants
            # (e.g. short fractional seconds) on older Python versions.
            date_joined=parser.parse(user["date_joined"]),
        )
        for user in users.data
    ]
async def get_user(admin_id: str, token: str, user_id: str, user_agent: str = Header(...)):
    """
    Fetches a single user; restricted to callers with the "hush" access level.

    NOTE(review): no route decorator is visible on this handler in this
    view - confirm it is registered on a router.

    Args:
        admin_id: The ID of the calling administrator.
        token: The caller's session token.
        user_id: The ID of the user to fetch.
        user_agent: The User-Agent header, matched against the session device.
    Returns:
        UserResponse for the requested user.
    Raises:
        HTTPException: 401 from validate_session; 403 when the caller is not
            a "hush" user; 404 when the requested user does not exist.
    """
    await validate_session(admin_id, token, user_agent)
    admin_query = supabase.table("users").select("access_level").eq("user_id", admin_id).execute()
    if not admin_query.data or admin_query.data[0]["access_level"] != "hush":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions"
        )
    user_query = supabase.table("users").select("*").eq("user_id", user_id).execute()
    if not user_query.data:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
        )
    user = user_query.data[0]
    return UserResponse(
        username=user["username"],
        email=user["email"],
        access_level=user["access_level"],
        # Use the same lenient parser as validate_session:
        # datetime.fromisoformat rejects some valid ISO 8601 variants
        # (e.g. short fractional seconds) on older Python versions.
        date_joined=parser.parse(user["date_joined"]),
    )
async def update_user(admin_id: str, token: str, user_id: str, request: UpdateUserRequest, user_agent: str = Header(...)):
    """
    Updates another user's password, email and/or username; restricted to
    callers with the "hush" access level.

    NOTE(review): no route decorator is visible on this handler in this
    view - confirm it is registered on a router.

    Args:
        admin_id: The ID of the calling administrator.
        token: The caller's session token.
        user_id: The ID of the user to modify.
        request: The fields to change; falsy fields are left untouched.
        user_agent: The User-Agent header, matched against the session device.
    Returns:
        UserResponse reflecting the updated row.
    Raises:
        HTTPException: 401 from validate_session; 403 when the caller is not
            a "hush" user; 400 when nothing is to be changed or the new
            username belongs to someone else; 404 when the user does not exist.
    """
    await validate_session(admin_id, token, user_agent)
    admin_query = supabase.table("users").select("access_level").eq("user_id", admin_id).execute()
    if not admin_query.data or admin_query.data[0]["access_level"] != "hush":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions"
        )

    update_data = {}
    if request.password:
        update_data["password"] = hash_password(request.password)
    if request.email:
        update_data["email"] = request.email
    if request.username:
        update_data["username"] = request.username

    if not update_data:
        # Do not send an empty update to the database.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="No fields to update"
        )

    if "username" in update_data:
        # Mirror the uniqueness rule that create_user enforces at signup.
        holders = supabase.table("users").select("user_id").eq("username", request.username).execute()
        if any(str(row["user_id"]) != str(user_id) for row in (holders.data or [])):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST, detail="Username already exists"
            )

    updated_user = supabase.table("users").update(update_data).eq("user_id", user_id).execute()
    if not updated_user.data:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
        )
    user = updated_user.data[0]
    return UserResponse(
        username=user["username"],
        email=user["email"],
        access_level=user["access_level"],
        # Same lenient parser as validate_session; datetime.fromisoformat
        # rejects some valid ISO 8601 variants on older Python versions.
        date_joined=parser.parse(user["date_joined"]),
    )
async def update_access_level(admin_id: str, token: str, user_id: str, request: UpdateAccessLevelRequest, user_agent: str = Header(...)):
    """
    Raises a user's access level; restricted to callers with the "hush"
    access level. Levels are ordered as in ACCESS_LEVELS and can only go up.

    NOTE(review): no route decorator is visible on this handler in this
    view - confirm it is registered on a router.

    Args:
        admin_id: The ID of the calling administrator.
        token: The caller's session token.
        user_id: The ID of the user to promote.
        request: Carries the requested access level.
        user_agent: The User-Agent header, matched against the session device.
    Returns:
        UserResponse reflecting the updated row.
    Raises:
        HTTPException: 401 from validate_session; 403 when the caller is not
            a "hush" user; 404 when the user does not exist; 400 when the
            requested level is unknown, lower than, or equal to the current one.
    """
    await validate_session(admin_id, token, user_agent)
    admin_query = supabase.table("users").select("access_level").eq("user_id", admin_id).execute()
    if not admin_query.data or admin_query.data[0]["access_level"] != "hush":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions"
        )

    new_access_level = request.access_level
    if new_access_level not in ACCESS_LEVELS:
        # list.index would raise ValueError and surface as a 500.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid access level"
        )

    user_query = supabase.table("users").select("*").eq("user_id", user_id).execute()
    if not user_query.data:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
        )
    user = user_query.data[0]

    current_level = user["access_level"]
    # A stored level that is not in ACCESS_LEVELS ranks below every known
    # level, so such a row can still be repaired by assigning a valid level.
    current_rank = ACCESS_LEVELS.index(current_level) if current_level in ACCESS_LEVELS else -1
    if ACCESS_LEVELS.index(new_access_level) <= current_rank:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot downgrade a user or change to the same level",
        )

    updated_user = supabase.table("users").update({"access_level": new_access_level}).eq("user_id", user_id).execute()
    if not updated_user.data:
        # The row vanished between the lookup and the update.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
        )
    user = updated_user.data[0]
    return UserResponse(
        username=user["username"],
        email=user["email"],
        access_level=user["access_level"],
        # Same lenient parser as validate_session; datetime.fromisoformat
        # rejects some valid ISO 8601 variants on older Python versions.
        date_joined=parser.parse(user["date_joined"]),
    )
async def update_own_data(user_id: str, token: str, request: UpdateUserRequest, user_agent: str = Header(...)):
    """
    Lets a logged-in user change their own password, email and/or username.

    NOTE(review): no route decorator is visible on this handler in this
    view - confirm it is registered on a router.

    Args:
        user_id: The ID of the calling user.
        token: The caller's session token.
        request: The fields to change; falsy fields are left untouched.
        user_agent: The User-Agent header, matched against the session device.
    Returns:
        UserResponse reflecting the updated row.
    Raises:
        HTTPException: 401 from validate_session; 400 when nothing is to be
            changed or the new username belongs to someone else; 404 when the
            user row does not exist.
    """
    await validate_session(user_id, token, user_agent)

    update_data = {}
    if request.password:
        update_data["password"] = hash_password(request.password)
    if request.email:
        update_data["email"] = request.email
    if request.username:
        update_data["username"] = request.username

    if not update_data:
        # Do not send an empty update to the database.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="No fields to update"
        )

    if "username" in update_data:
        # Mirror the uniqueness rule that create_user enforces at signup.
        holders = supabase.table("users").select("user_id").eq("username", request.username).execute()
        if any(str(row["user_id"]) != str(user_id) for row in (holders.data or [])):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST, detail="Username already exists"
            )

    updated_user = supabase.table("users").update(update_data).eq("user_id", user_id).execute()
    if not updated_user.data:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
        )
    user = updated_user.data[0]
    return UserResponse(
        username=user["username"],
        email=user["email"],
        access_level=user["access_level"],
        # Same lenient parser as validate_session; datetime.fromisoformat
        # rejects some valid ISO 8601 variants on older Python versions.
        date_joined=parser.parse(user["date_joined"]),
    )
| # Include routes | |
# Attach the three routers to the application; auth_router and admin_router
# contribute their "/v1/api/auth" and "/v1/api/admin" prefixes.
# NOTE(review): no route decorators are visible on the handlers in this view -
# confirm the handlers are registered on these routers before this point.
app.include_router(root_router)
app.include_router(auth_router)
app.include_router(admin_router)
| # Initialize system user on startup | |
@app.on_event("startup")
async def startup_event():
    """
    Creates the system user when the application starts.

    The handler is registered on the application's startup event; without the
    registration nothing calls this coroutine and the system account is never
    initialised.
    """
    await init_system_user()