Auth.Nexus / main.py
ChandimaPrabath's picture
num user id
07c16c1
raw
history blame
14.7 kB
import os
from fastapi import FastAPI, HTTPException, Header, status, APIRouter
from pydantic import BaseModel, EmailStr
from typing import Dict, List
from datetime import datetime, timedelta, timezone
import secrets
import hashlib
import uuid
from user_agents import parse
from dotenv import load_dotenv
from supabase import create_client, Client
from typing import Optional
load_dotenv()
SERVER_NAME = "Nexus Authentication Service"
VERSION = "1.0.4 debug"
# Supabase Configuration
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
SYSTEM_USER = os.getenv("SYSTEM_USER")
SYSTEM_PASSWORD = os.getenv("SYSTEM_PASSWORD")
app = FastAPI()
# Routers
root_router = APIRouter()
auth_router = APIRouter(prefix="/v1/api/auth", tags=["Authentication"])
admin_router = APIRouter(prefix="/v1/api/admin", tags=["Admin"])
# Constants
TOKEN_EXPIRATION_MINUTES = 1440 # 24 hours
ACCESS_LEVELS = ["default", "member", "admin", "dev", "hush"]
# Models
class SignupRequest(BaseModel):
username: str
password: str
email: Optional[EmailStr] = None
class LoginRequest(BaseModel):
username: str
password: str
class LoginResponse(BaseModel):
user_id: str
username: str
email: Optional[EmailStr]
access_level: str
date_joined: datetime
access_token: str
token_type: str = "bearer"
class TokenResponse(BaseModel):
access_token: str
token_type: str = "bearer"
class UserResponse(BaseModel):
username: str
email: Optional[EmailStr]
access_level: str
date_joined: datetime
class UpdateUserRequest(BaseModel):
password: Optional[str] = None
email: Optional[EmailStr] = None
username: Optional[str] = None
class UpdateAccessLevelRequest(BaseModel):
access_level: str
def generate_numeric_user_id(length=10):
"""
Generates a numeric user ID with the specified length.
Args:
length: The desired length of the user ID.
Returns:
A string representing the numeric user ID.
"""
# Generate a random UUID
uuid_str = str(uuid.uuid4()).replace('-', '')
# Convert the UUID to an integer
uuid_int = int(uuid_str, 16)
# Generate a numeric ID with the specified length
numeric_id = str(uuid_int)[-length:]
return numeric_id
# Generate and print a numeric user ID
user_id = generate_numeric_user_id()
print(f"Generated User ID: {user_id}")
# Utility functions
def hash_password(password: str) -> str:
return hashlib.sha256(password.encode()).hexdigest()
def verify_password(password: str, hashed_password: str) -> bool:
return hash_password(password) == hashed_password
def create_device_token(username: str, user_agent: str) -> str:
device_info = parse(user_agent)
token_seed = f"{username}-{device_info.device.family}-{device_info.os.family}-{secrets.token_hex(16)}"
return hashlib.sha256(token_seed.encode()).hexdigest()
def is_token_expired(expiration_time: datetime) -> bool:
return datetime.now(timezone.utc) > expiration_time
# Initialize system user
async def init_system_user():
system_user_data = {
"user_id": str(uuid.uuid4()),
"username": SYSTEM_USER,
"password": hash_password(SYSTEM_PASSWORD),
"email": None,
"date_joined": datetime.now(timezone.utc).isoformat(),
"access_level": "hush"
}
# Check if system user exists
existing_user = supabase.table("users").select("*").eq("username", SYSTEM_USER).execute()
if not existing_user.data:
supabase.table("users").insert(system_user_data).execute()
@root_router.get("/", status_code=status.HTTP_200_OK)
async def root():
message = f'{SERVER_NAME} v{VERSION}'
return {"message": message}
# Authentication Routes
@auth_router.post("/signup", status_code=status.HTTP_201_CREATED)
async def signup(request: SignupRequest):
# Check if username exists
existing_user = supabase.table("users").select("*").eq("username", request.username).execute()
if existing_user.data:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Username already exists"
)
user_data = {
"user_id": int(generate_numeric_user_id(length=16)),
"username": request.username,
"password": hash_password(request.password),
"email": request.email,
"date_joined": datetime.now(timezone.utc).isoformat(),
"access_level": "default"
}
supabase.table("users").insert(user_data).execute()
return {"message": "User created successfully"}
from datetime import datetime, timedelta, timezone
@auth_router.post("/login", response_model=LoginResponse)
async def login(request: LoginRequest, user_agent: str = Header(...)):
# Query the user based on the username
user_query = supabase.table("users").select("*").eq("username", request.username).execute()
# If user not found or password verification fails, raise an error
if not user_query.data or not verify_password(request.password, user_query.data[0]["password"]):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials"
)
# Extract user details
user = user_query.data[0]
# Create a token and calculate expiration time in UTC
token = create_device_token(request.username, user_agent)
expiration_time = datetime.now(timezone.utc) + timedelta(minutes=TOKEN_EXPIRATION_MINUTES)
# Prepare session data with the expiration time in ISO 8601 format (UTC)
session_data = {
"user_id": user["user_id"],
"token": token,
"expires": expiration_time.isoformat(), # ISO 8601 format ensures the timezone is stored
"device": user_agent
}
# Insert the session data into the database
supabase.table("sessions").insert(session_data).execute()
# Return the login response with relevant user details
return LoginResponse(
user_id=user["user_id"],
username=user["username"],
email=user["email"],
access_level=user["access_level"],
date_joined=(user["date_joined"]),
access_token=token
)
@auth_router.post("/logout")
async def logout(user_id: str, token: str):
# Query to check if the session exists for the given user_id and token
session_query = (
supabase.table("sessions")
.select("*")
.eq("user_id", user_id)
.eq("token", token)
.execute()
)
# If session is not found, raise an unauthorized error
if not session_query.data:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Session not found or already expired"
)
# Delete the session using the composite key (user_id and token)
supabase.table("sessions").delete().eq("user_id", user_id).eq("token", token).execute()
return {"message": "Session forcefully expired"}
from datetime import datetime, timezone
@auth_router.get("/validate", response_model=TokenResponse)
async def validate_token(user_id: str, token: str, user_agent: str = Header(...)):
# Query to validate session by user_id, token, and device
session_query = (
supabase.table("sessions")
.select("*")
.eq("user_id", user_id)
.eq("token", token)
.eq("device", user_agent)
.execute()
)
# If no session found, raise unauthorized error
if not session_query.data:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token"
)
session = session_query.data[0]
# Get the current time (UTC) with timezone awareness
current_time = datetime.now(timezone.utc)
# Parse the 'expires' field from the session as an offset-aware datetime
session_expiry = datetime.fromisoformat(session["expires"])
# Check if the token has expired
if session_expiry <= current_time:
# Delete the session if expired
supabase.table("sessions").delete().eq("user_id", user_id).eq("token", token).execute()
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Token expired"
)
return TokenResponse(access_token=token)
@auth_router.get("/search-users", response_model=List[str])
async def search_users(query: str):
users = supabase.table("users").select("username").ilike("username", f"%{query}%").execute()
return [user["username"] for user in users.data]
@auth_router.get("/get-user-id", response_model=str)
async def get_user_id(username: str):
user_query = supabase.table("users").select("user_id").eq("username", username).execute()
if not user_query.data:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Username not found"
)
return user_query.data[0]["user_id"]
# Admin Routes
@admin_router.get("/users", response_model=List[UserResponse])
async def get_all_users(user_id: str, token: str, user_agent: str = Header(...)):
await validate_token(user_id, token, user_agent)
admin_query = supabase.table("users").select("access_level").eq("user_id", user_id).execute()
if not admin_query.data or admin_query.data[0]["access_level"] != "hush":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions"
)
users = supabase.table("users").select("*").execute()
return [
UserResponse(
username=user["username"],
email=user["email"],
access_level=user["access_level"],
date_joined=datetime.fromisoformat(user["date_joined"])
)
for user in users.data
]
@admin_router.get("/user/{user_id}", response_model=UserResponse)
async def get_user(admin_id: str, token: str, user_id: str, user_agent: str = Header(...)):
await validate_token(admin_id, token, user_agent)
admin_query = supabase.table("users").select("access_level").eq("user_id", admin_id).execute()
if not admin_query.data or admin_query.data[0]["access_level"] != "hush":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions"
)
user_query = supabase.table("users").select("*").eq("user_id", user_id).execute()
if not user_query.data:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
)
user = user_query.data[0]
return UserResponse(
username=user["username"],
email=user["email"],
access_level=user["access_level"],
date_joined=datetime.fromisoformat(user["date_joined"])
)
@admin_router.put("/user/{user_id}", response_model=UserResponse)
async def update_user(admin_id: str, token: str, user_id: str, request: UpdateUserRequest, user_agent: str = Header(...)):
await validate_token(admin_id, token, user_agent)
admin_query = supabase.table("users").select("access_level").eq("user_id", admin_id).execute()
if not admin_query.data or admin_query.data[0]["access_level"] != "hush":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions"
)
update_data = {}
if request.password:
update_data["password"] = hash_password(request.password)
if request.email:
update_data["email"] = request.email
if request.username:
update_data["username"] = request.username
updated_user = supabase.table("users").update(update_data).eq("user_id", user_id).execute()
if not updated_user.data:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
)
user = updated_user.data[0]
return UserResponse(
username=user["username"],
email=user["email"],
access_level=user["access_level"],
date_joined=datetime.fromisoformat(user["date_joined"])
)
@admin_router.put("/user/{user_id}/access-level")
async def update_access_level(admin_id: str, token: str, user_id: str, request: UpdateAccessLevelRequest, user_agent: str = Header(...)):
await validate_token(admin_id, token, user_agent)
admin_query = supabase.table("users").select("access_level").eq("user_id", admin_id).execute()
if not admin_query.data or admin_query.data[0]["access_level"] != "hush":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions"
)
user_query = supabase.table("users").select("*").eq("user_id", user_id).execute()
if not user_query.data:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
)
user = user_query.data[0]
new_access_level = request.access_level
if ACCESS_LEVELS.index(new_access_level) <= ACCESS_LEVELS.index(user["access_level"]):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot downgrade a user or change to the same level",
)
updated_user = supabase.table("users").update({"access_level": new_access_level}).eq("user_id", user_id).execute()
user = updated_user.data[0]
return UserResponse(
username=user["username"],
email=user["email"],
access_level=user["access_level"],
date_joined=datetime.fromisoformat(user["date_joined"])
)
@auth_router.put("/user/update", response_model=UserResponse)
async def update_own_data(user_id: str, request: UpdateUserRequest, token: str = Header(...), user_agent: str = Header(...)):
await validate_token(user_id, token, user_agent)
update_data = {}
if request.password:
update_data["password"] = hash_password(request.password)
if request.email:
update_data["email"] = request.email
if request.username:
update_data["username"] = request.username
updated_user = supabase.table("users").update(update_data).eq("user_id", user_id).execute()
if not updated_user.data:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
)
user = updated_user.data[0]
return UserResponse(
username=user["username"],
email=user["email"],
access_level=user["access_level"],
date_joined=datetime.fromisoformat(user["date_joined"])
)
# Include routes
app.include_router(root_router)
app.include_router(auth_router)
app.include_router(admin_router)
# Initialize system user on startup
@app.on_event("startup")
async def startup_event():
await init_system_user()