Auth.Nexus / main.py
ChandimaPrabath's picture
patch
5e372f6
import os
from fastapi import FastAPI, HTTPException, Header, status, APIRouter
from pydantic import BaseModel, EmailStr
from typing import Dict, List
from datetime import datetime, timedelta, timezone
from dateutil import parser
import secrets
import hashlib
import uuid
from user_agents import parse
from dotenv import load_dotenv
from supabase import create_client, Client
from typing import Optional
load_dotenv()
SERVER_NAME = "Nexus Authentication Service"
VERSION = "1.0.6 debug"
# Supabase Configuration
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
SYSTEM_USER = os.getenv("SYSTEM_USER")
SYSTEM_PASSWORD = os.getenv("SYSTEM_PASSWORD")
app = FastAPI()
# Routers
root_router = APIRouter()
auth_router = APIRouter(prefix="/v1/api/auth", tags=["Authentication"])
admin_router = APIRouter(prefix="/v1/api/admin", tags=["Admin"])
# Constants
TOKEN_EXPIRATION_MINUTES = 1440 # 24 hours
ACCESS_LEVELS = ["default", "member", "admin", "dev", "hush"]
# Models
class SignupRequest(BaseModel):
username: str
password: str
email: Optional[EmailStr] = None
class LoginRequest(BaseModel):
username: str
password: str
class LoginResponse(BaseModel):
user_id: str
username: str
email: Optional[EmailStr]
access_level: str
date_joined: datetime
access_token: str
token_type: str = "bearer"
class TokenResponse(BaseModel):
access_token: str
token_type: str = "bearer"
class UserResponse(BaseModel):
username: str
email: Optional[EmailStr]
access_level: str
date_joined: datetime
class UsernameAvailabilityResponse(BaseModel):
username: str
is_available: bool
class UpdateUserRequest(BaseModel):
password: Optional[str] = None
email: Optional[EmailStr] = None
username: Optional[str] = None
class UpdateAccessLevelRequest(BaseModel):
access_level: str
def generate_numeric_user_id(length=16):
"""
Generates a numeric user ID with the specified length.
Args:
length: The desired length of the user ID.
Returns:
A string representing the numeric user ID.
"""
# Generate a random UUID
uuid_str = str(uuid.uuid4()).replace('-', '')
# Convert the UUID to an integer
uuid_int = int(uuid_str, 16)
# Generate a numeric ID with the specified length
numeric_id = str(uuid_int)[-length:]
print(numeric_id)
return numeric_id
# Utility functions
def hash_password(password: str) -> str:
return hashlib.sha256(password.encode()).hexdigest()
def verify_password(password: str, hashed_password: str) -> bool:
return hash_password(password) == hashed_password
def create_device_token(username: str, user_agent: str) -> str:
device_info = parse(user_agent)
token_seed = f"{username}-{device_info.device.family}-{device_info.os.family}-{secrets.token_hex(16)}"
return hashlib.sha256(token_seed.encode()).hexdigest()
def is_token_expired(expiration_time: datetime) -> bool:
return datetime.now(timezone.utc) > expiration_time
async def create_user(
username: str,
password: str,
email: Optional[str] = None,
access_level: Optional[str] = None,
):
"""
Creates a new user in the database.
Args:
username: The username of the new user.
password: The password of the new user (hashed).
email: The email address of the new user (optional).
access_level: The access level of the new user (optional).
Returns:
The created user object.
Raises:
HTTPException: If the username already exists.
"""
# Check if the username already exists
existing_user = supabase.table("users").select("*").eq("username", username).execute()
if existing_user.data:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Username already exists"
)
# Ensure the generated user_id is unique
while True:
user_id = generate_numeric_user_id()
existing_user_id = supabase.table("users").select("*").eq("user_id", user_id).execute()
if not existing_user_id.data:
break
# Prepare user data
user_data = {
"user_id": user_id,
"username": username,
"password": password,
"email": email,
"date_joined": datetime.now(timezone.utc).isoformat(),
"access_level": access_level or "default",
}
# Insert the user into the database
inserted_user = supabase.table("users").insert(user_data).execute()
return inserted_user.data[0]
# Initialize system user
async def init_system_user():
try:
await create_user(SYSTEM_USER, hash_password(SYSTEM_PASSWORD), None, "hush")
except HTTPException as e:
if e.status_code == status.HTTP_400_BAD_REQUEST and "Username already exists" in e.detail:
print("System user already exists, continuing...")
else:
raise e
async def validate_session(user_id: str, token: str, user_agent: str) -> Optional[dict]:
"""
Validates the session for the given user_id, token, and user_agent.
Args:
user_id: The user ID associated with the session.
token: The token to validate.
user_agent: The user agent/device identifier.
Returns:
The session data if valid.
Raises:
HTTPException: If the session is invalid or the token is expired.
"""
# Query to validate session by user_id, token, and device
session_query = (
supabase.table("sessions")
.select("*")
.eq("user_id", user_id)
.eq("token", token)
.eq("device", user_agent)
.execute()
)
# If no session found, raise unauthorized error
if not session_query.data:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token"
)
session = session_query.data[0]
# Get the current time (UTC) with timezone awareness
current_time = datetime.now(timezone.utc)
# Parse the 'expires' field from the session using dateutil.parser
session_expiry = parser.parse(session["expires"])
# Check if the token has expired
if session_expiry <= current_time:
# Delete the session if expired
supabase.table("sessions").delete().eq("user_id", user_id).eq("token", token).execute()
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Token expired"
)
return session
@root_router.get("/", status_code=status.HTTP_200_OK)
async def root():
message = f'{SERVER_NAME} v{VERSION}'
return {"message": message}
# Authentication Routes
@auth_router.post("/signup", status_code=status.HTTP_201_CREATED)
async def signup(request: SignupRequest):
"""
Signup route handler.
Uses the create_user helper function to create a new user.
Args:
request: Signup request data.
Returns:
A JSON response with a success message.
"""
created_user = await create_user(request.username, hash_password(request.password), request.email)
return {"message": "User created successfully", "user": created_user}
@auth_router.post("/login", response_model=LoginResponse)
async def login(request: LoginRequest, user_agent: str = Header(...)):
# Query the user based on the username
user_query = supabase.table("users").select("*").eq("username", request.username).execute()
# If user not found or password verification fails, raise an error
if not user_query.data or not verify_password(request.password, user_query.data[0]["password"]):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials"
)
# Extract user details
user = user_query.data[0]
# Create a token and calculate expiration time in UTC
token = create_device_token(request.username, user_agent)
expiration_time = datetime.now(timezone.utc) + timedelta(minutes=TOKEN_EXPIRATION_MINUTES)
# Prepare session data with the expiration time in ISO 8601 format (UTC)
session_data = {
"user_id": user["user_id"],
"token": token,
"expires": expiration_time.isoformat(), # ISO 8601 format ensures the timezone is stored
"device": user_agent
}
# Insert the session data into the database
supabase.table("sessions").insert(session_data).execute()
# Return the login response with relevant user details
return LoginResponse(
user_id=user["user_id"],
username=user["username"],
email=user["email"],
access_level=user["access_level"],
date_joined=(user["date_joined"]),
access_token=token
)
@auth_router.post("/logout")
async def logout(user_id: str, token: str):
# Query to check if the session exists for the given user_id and token
session_query = (
supabase.table("sessions")
.select("*")
.eq("user_id", user_id)
.eq("token", token)
.execute()
)
# If session is not found, raise an unauthorized error
if not session_query.data:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Session not found or already expired"
)
# Delete the session using the composite key (user_id and token)
supabase.table("sessions").delete().eq("user_id", user_id).eq("token", token).execute()
return {"message": "Session forcefully expired"}
@auth_router.get("/validate", response_model=LoginResponse)
async def validate_token(user_id: str, token: str, user_agent: str = Header(...)):
"""
Route to validate a token based on user_id, token, and user agent.
Args:
user_id: The user ID associated with the token.
token: The token to validate.
user_agent: The user agent (from request header).
Returns:
TokenResponse: The validated token response.
"""
# Use the helper function to validate the session
await validate_session(user_id=user_id, token=token, user_agent=user_agent)
user_query = supabase.table("users").select("*").eq("user_id", user_id).execute()
# If user not found or password verification fails, raise an error
if not user_query.data:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
)
# Extract user details
user = user_query.data[0]
# Return the login response with relevant user details
return LoginResponse(
user_id=user["user_id"],
username=user["username"],
email=user["email"],
access_level=user["access_level"],
date_joined=(user["date_joined"]),
access_token=token
)
@auth_router.get("/search-users", response_model=List[str])
async def search_users(query: str):
users = supabase.table("users").select("username").ilike("username", f"%{query}%").execute()
usernames = [user["username"] for user in users.data]
# Exclude SYSTEM_USER from the list if it's present
if SYSTEM_USER and SYSTEM_USER in usernames:
usernames.remove(SYSTEM_USER)
return usernames
@auth_router.get("/is-username-available", response_model=UsernameAvailabilityResponse)
async def is_username_available(query: str):
users = supabase.table("users").select("username").eq("username", query).execute()
return UsernameAvailabilityResponse(
username=query,
is_available=len(users.data) == 0 # If no users are found, the username is available
)
@auth_router.get("/get-user-id", response_model=str)
async def get_user_id(username: str):
user_query = supabase.table("users").select("user_id").eq("username", username).execute()
if not user_query.data:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Username not found"
)
return user_query.data[0]["user_id"]
# Admin Routes
@admin_router.get("/users", response_model=List[UserResponse])
async def get_all_users(user_id: str, token: str, user_agent: str = Header(...)):
await validate_session(user_id, token, user_agent)
admin_query = supabase.table("users").select("access_level").eq("user_id", user_id).execute()
if not admin_query.data or admin_query.data[0]["access_level"] != "hush":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions"
)
users = supabase.table("users").select("*").execute()
return [
UserResponse(
username=user["username"],
email=user["email"],
access_level=user["access_level"],
date_joined=datetime.fromisoformat(user["date_joined"])
)
for user in users.data
]
@admin_router.get("/user/{user_id}", response_model=UserResponse)
async def get_user(admin_id: str, token: str, user_id: str, user_agent: str = Header(...)):
await validate_session(admin_id, token, user_agent)
admin_query = supabase.table("users").select("access_level").eq("user_id", admin_id).execute()
if not admin_query.data or admin_query.data[0]["access_level"] != "hush":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions"
)
user_query = supabase.table("users").select("*").eq("user_id", user_id).execute()
if not user_query.data:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
)
user = user_query.data[0]
return UserResponse(
username=user["username"],
email=user["email"],
access_level=user["access_level"],
date_joined=datetime.fromisoformat(user["date_joined"])
)
@admin_router.put("/user/{user_id}", response_model=UserResponse)
async def update_user(admin_id: str, token: str, user_id: str, request: UpdateUserRequest, user_agent: str = Header(...)):
await validate_session(admin_id, token, user_agent)
admin_query = supabase.table("users").select("access_level").eq("user_id", admin_id).execute()
if not admin_query.data or admin_query.data[0]["access_level"] != "hush":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions"
)
update_data = {}
if request.password:
update_data["password"] = hash_password(request.password)
if request.email:
update_data["email"] = request.email
if request.username:
update_data["username"] = request.username
updated_user = supabase.table("users").update(update_data).eq("user_id", user_id).execute()
if not updated_user.data:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
)
user = updated_user.data[0]
return UserResponse(
username=user["username"],
email=user["email"],
access_level=user["access_level"],
date_joined=datetime.fromisoformat(user["date_joined"])
)
@admin_router.put("/user/{user_id}/access-level")
async def update_access_level(admin_id: str, token: str, user_id: str, request: UpdateAccessLevelRequest, user_agent: str = Header(...)):
await validate_session(admin_id, token, user_agent)
admin_query = supabase.table("users").select("access_level").eq("user_id", admin_id).execute()
if not admin_query.data or admin_query.data[0]["access_level"] != "hush":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions"
)
user_query = supabase.table("users").select("*").eq("user_id", user_id).execute()
if not user_query.data:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
)
user = user_query.data[0]
new_access_level = request.access_level
if ACCESS_LEVELS.index(new_access_level) <= ACCESS_LEVELS.index(user["access_level"]):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot downgrade a user or change to the same level",
)
updated_user = supabase.table("users").update({"access_level": new_access_level}).eq("user_id", user_id).execute()
user = updated_user.data[0]
return UserResponse(
username=user["username"],
email=user["email"],
access_level=user["access_level"],
date_joined=datetime.fromisoformat(user["date_joined"])
)
@auth_router.put("/user/update", response_model=UserResponse)
async def update_own_data(user_id: str,token: str, request: UpdateUserRequest, user_agent: str = Header(...)):
await validate_session(user_id, token, user_agent)
update_data = {}
if request.password:
update_data["password"] = hash_password(request.password)
if request.email:
update_data["email"] = request.email
if request.username:
update_data["username"] = request.username
updated_user = supabase.table("users").update(update_data).eq("user_id", user_id).execute()
if not updated_user.data:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
)
user = updated_user.data[0]
return UserResponse(
username=user["username"],
email=user["email"],
access_level=user["access_level"],
date_joined=datetime.fromisoformat(user["date_joined"])
)
# Include routes
app.include_router(root_router)
app.include_router(auth_router)
app.include_router(admin_router)
# Initialize system user on startup
@app.on_event("startup")
async def startup_event():
await init_system_user()