Auth.Nexus / main.py
ChandimaPrabath's picture
2 min expiring
5b4d7da
raw
history blame
12.5 kB
import os
from fastapi import FastAPI, HTTPException, Header, status, APIRouter
from pydantic import BaseModel, EmailStr
from typing import Dict, List
from datetime import datetime, timedelta, timezone
import secrets
import hashlib
import uuid
from user_agents import parse
from dotenv import load_dotenv
from supabase import create_client, Client
from typing import Optional
# Load environment variables (python-dotenv) before any os.getenv() call
# below reads them.
load_dotenv()
SERVER_NAME = "Nexus Authentication Service"
VERSION = "1.0"
# Supabase Configuration
# os.getenv() yields None for an unset variable; the values are passed to
# create_client() without any validation here.
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
# Credentials of the built-in account that init_system_user() inserts with
# the "hush" access level.
SYSTEM_USER = os.getenv("SYSTEM_USER")
SYSTEM_PASSWORD = os.getenv("SYSTEM_PASSWORD")
app = FastAPI()
# Routers
# root_router has no prefix; the other two are mounted under /v1/api/.
root_router = APIRouter()
auth_router = APIRouter(prefix="/v1/api/auth", tags=["Authentication"])
admin_router = APIRouter(prefix="/v1/api/admin", tags=["Admin"])
# Constants
# Lifetime, in minutes, of the session token issued by login().
TOKEN_EXPIRATION_MINUTES = 2
# Ordered from lowest to highest: update_access_level() compares list
# positions and only allows moving a user to a later entry.
ACCESS_LEVELS = ["default", "member", "admin", "dev", "hush"]
# Models
# Request body accepted by the signup endpoint. Only the e-mail address is
# optional; it defaults to None when omitted.
class SignupRequest(BaseModel):
    username: str
    password: str  # plain text as received; signup() hashes it before storing
    email: Optional[EmailStr] = None
# Request body accepted by the login endpoint.
class LoginRequest(BaseModel):
    username: str
    password: str  # plain text; checked against the stored hash by login()
# Response returned by login(): the user's profile fields plus the freshly
# issued session token.
class LoginResponse(BaseModel):
    user_id: str
    username: str
    email: Optional[EmailStr]
    access_level: str
    date_joined: datetime
    access_token: str
    token_type: str = "bearer"  # never overridden by the code in this file
# Response returned by validate_token() when the session is still valid; it
# echoes back the token that was checked.
class TokenResponse(BaseModel):
    access_token: str
    token_type: str = "bearer"
# Public view of a user row as returned by the admin routes and by
# update_own_data(). It carries neither the user id nor the password hash.
class UserResponse(BaseModel):
    username: str
    email: Optional[EmailStr]
    access_level: str
    date_joined: datetime
# Partial-update body used by update_user() and update_own_data(). Every
# field is optional; the handlers only apply fields whose value is truthy.
class UpdateUserRequest(BaseModel):
    password: Optional[str] = None
    email: Optional[EmailStr] = None
    username: Optional[str] = None
# Request body for update_access_level(). The model accepts any string; the
# handler looks the value up in ACCESS_LEVELS.
class UpdateAccessLevelRequest(BaseModel):
    access_level: str
# Utility functions
# Work factor for newly created password hashes. The value is embedded in
# every stored hash, so it can be raised later without invalidating old ones.
PBKDF2_ITERATIONS = 600_000
_PBKDF2_PREFIX = "pbkdf2_sha256"


def hash_password(password: str) -> str:
    """Return a salted PBKDF2-HMAC-SHA256 hash of *password*.

    The result is a self-describing string of the form
    ``pbkdf2_sha256$<iterations>$<salt hex>$<digest hex>``. A fresh random
    salt is drawn on every call, so hashing the same password twice yields
    different strings; use :func:`verify_password` to compare.

    NOTE: the result is longer (about 120 characters) than the 64-character
    SHA-256 hex digest produced previously, so the storage column must be
    wide enough.
    """
    salt = secrets.token_bytes(16)
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode(), salt, PBKDF2_ITERATIONS
    )
    return f"{_PBKDF2_PREFIX}${PBKDF2_ITERATIONS}${salt.hex()}${digest.hex()}"


def verify_password(password: str, hashed_password: str) -> bool:
    """Check *password* against a stored hash.

    Accepts both the salted format produced by :func:`hash_password` and the
    legacy unsalted SHA-256 hex digests stored before it was introduced, so
    existing accounts keep working. Comparisons are constant-time.

    Returns:
        True when the password matches, False otherwise (including when the
        stored value is malformed).
    """
    if hashed_password.startswith(_PBKDF2_PREFIX + "$"):
        try:
            _, iterations, salt_hex, digest_hex = hashed_password.split("$")
            expected = bytes.fromhex(digest_hex)
            candidate = hashlib.pbkdf2_hmac(
                "sha256",
                password.encode(),
                bytes.fromhex(salt_hex),
                int(iterations),
            )
        except (ValueError, OverflowError):
            # Wrong number of fields, non-hex data or an unusable iteration
            # count: treat a corrupt record as a failed match, not a crash.
            return False
        return secrets.compare_digest(candidate, expected)

    # Legacy record: plain SHA-256 hex digest without salt. Compare as bytes
    # because compare_digest() rejects non-ASCII str arguments.
    legacy_digest = hashlib.sha256(password.encode()).hexdigest()
    return secrets.compare_digest(
        legacy_digest.encode(), hashed_password.encode()
    )
def create_device_token(username: str, user_agent: str) -> str:
    """Build a fresh session token for *username* on the given device.

    The token is the SHA-256 hex digest of a seed made of the username, the
    device and OS families parsed from *user_agent*, and 16 random bytes
    (hex-encoded), so two calls never return the same token.
    """
    agent = parse(user_agent)
    device_family = agent.device.family
    os_family = agent.os.family
    nonce = secrets.token_hex(16)
    seed = f"{username}-{device_family}-{os_family}-{nonce}"
    hasher = hashlib.sha256()
    hasher.update(seed.encode())
    return hasher.hexdigest()
def is_token_expired(expiration_time: datetime) -> bool:
    """Return True when *expiration_time* lies in the past.

    Args:
        expiration_time: Expiry instant. A naive value (no UTC offset, e.g.
            parsed from an ISO string without one) is interpreted as UTC
            instead of raising ``TypeError`` on comparison with the aware
            current time.
    """
    if expiration_time.utcoffset() is None:
        expiration_time = expiration_time.replace(tzinfo=timezone.utc)
    return datetime.now(timezone.utc) > expiration_time
# Initialize system user
async def init_system_user():
    """Insert the built-in system account if it does not exist yet.

    The account takes its username and password from the SYSTEM_USER and
    SYSTEM_PASSWORD settings and is created with the "hush" access level.
    Nothing is written when a user with that username is already present.

    Raises:
        RuntimeError: If SYSTEM_USER or SYSTEM_PASSWORD is not configured.
    """
    # Fail with an explicit message instead of an AttributeError raised from
    # inside hash_password() when the environment variable is missing.
    if not SYSTEM_USER or not SYSTEM_PASSWORD:
        raise RuntimeError(
            "SYSTEM_USER and SYSTEM_PASSWORD must be set to initialize the system user"
        )

    # Check if system user exists before doing any hashing work.
    existing_user = supabase.table("users").select("*").eq("username", SYSTEM_USER).execute()
    if existing_user.data:
        return

    system_user_data = {
        "id": str(uuid.uuid4()),
        "username": SYSTEM_USER,
        "password": hash_password(SYSTEM_PASSWORD),
        "email": None,
        "date_joined": datetime.now(timezone.utc).isoformat(),
        "access_level": "hush",
    }
    supabase.table("users").insert(system_user_data).execute()
@root_router.get("/", status_code=status.HTTP_200_OK)
async def root():
    """Return a one-field JSON banner with the service name and version."""
    return {"message": f'{SERVER_NAME} v{VERSION}'}
# Authentication Routes
@auth_router.post("/signup", status_code=status.HTTP_201_CREATED)
async def signup(request: SignupRequest):
    """Register a new user with the "default" access level.

    Raises:
        HTTPException: 400 when the requested username is already present
            in the users table.
    """
    # Check if username exists
    taken = (
        supabase.table("users")
        .select("*")
        .eq("username", request.username)
        .execute()
        .data
    )
    if taken:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username already exists",
        )

    new_user = dict(
        id=str(uuid.uuid4()),
        username=request.username,
        password=hash_password(request.password),
        email=request.email,
        date_joined=datetime.now(timezone.utc).isoformat(),
        access_level="default",
    )
    supabase.table("users").insert(new_user).execute()
    return {"message": "User created successfully"}
@auth_router.post("/login", response_model=LoginResponse)
async def login(request: LoginRequest, user_agent: str = Header(...)):
    """Authenticate a user and open a session bound to the caller's device.

    On success a new token is stored in the sessions table together with
    the raw User-Agent header and an expiry TOKEN_EXPIRATION_MINUTES from
    now, and the user's profile plus the token is returned.

    Raises:
        HTTPException: 401 when the username is unknown or the password
            does not match.
    """
    rows = (
        supabase.table("users")
        .select("*")
        .eq("username", request.username)
        .execute()
        .data
    )
    # Same response for "no such user" and "wrong password".
    credentials_ok = bool(rows) and verify_password(request.password, rows[0]["password"])
    if not credentials_ok:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid credentials",
        )

    user = rows[0]
    token = create_device_token(request.username, user_agent)
    lifetime = timedelta(minutes=TOKEN_EXPIRATION_MINUTES)
    expires_at = datetime.now(timezone.utc) + lifetime

    supabase.table("sessions").insert(
        {
            "user_id": user["id"],
            "token": token,
            "expires": expires_at.isoformat(),
            "device": user_agent,
        }
    ).execute()

    joined = datetime.fromisoformat(user["date_joined"])
    return LoginResponse(
        user_id=user["id"],
        username=user["username"],
        email=user["email"],
        access_level=user["access_level"],
        date_joined=joined,
        access_token=token,
    )
@auth_router.post("/logout")
async def logout(user_id: str, token: str):
    """Delete the session row matching *user_id* and *token*.

    The confirmation message is returned whether or not a matching row
    existed.
    """
    matching_sessions = (
        supabase.table("sessions")
        .delete()
        .eq("user_id", user_id)
        .eq("token", token)
    )
    matching_sessions.execute()
    return {"message": "Session forcefully expired"}
@auth_router.get("/validate", response_model=TokenResponse)
async def validate_token(user_id: str, token: str, user_agent: str = Header(...)):
    """Check that a session token is known, device-bound and not expired.

    The session must match the user id, the token and the exact User-Agent
    string it was created with. An expired session is deleted before the
    error is raised.

    Raises:
        HTTPException: 401 "Invalid token" when no session matches,
            401 "Token expired" when the matching session has expired.
    """
    lookup = (
        supabase.table("sessions")
        .select("*")
        .eq("user_id", user_id)
        .eq("token", token)
        .eq("device", user_agent)
    )
    sessions = lookup.execute().data
    if not sessions:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
        )

    session = sessions[0]
    expires_at = datetime.fromisoformat(session["expires"])
    if not is_token_expired(expires_at):
        return TokenResponse(access_token=token)

    # Expired: drop the stale row so it cannot be presented again.
    supabase.table("sessions").delete().eq("id", session["id"]).execute()
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Token expired",
    )
@auth_router.get("/search-users", response_model=List[str])
async def search_users(query: str):
    """Return the usernames that contain *query*, ignoring case."""
    pattern = f"%{query}%"
    result = supabase.table("users").select("username").ilike("username", pattern).execute()
    usernames = []
    for row in result.data:
        usernames.append(row["username"])
    return usernames
@auth_router.get("/get-user-id", response_model=str)
async def get_user_id(username: str):
    """Look up the id of the user with exactly this username.

    Raises:
        HTTPException: 404 when no user has that username.
    """
    rows = supabase.table("users").select("id").eq("username", username).execute().data
    if rows:
        return rows[0]["id"]
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Username not found",
    )
# Admin Routes
@admin_router.get("/users", response_model=List[UserResponse])
async def get_all_users(user_id: str, token: str, user_agent: str = Header(...)):
    """List every user; restricted to callers with the "hush" access level.

    Raises:
        HTTPException: 401 from validate_token() for a bad or expired
            session, 403 when the caller is not a "hush" user.
    """
    await validate_token(user_id, token, user_agent)

    caller = supabase.table("users").select("access_level").eq("id", user_id).execute().data
    is_hush = bool(caller) and caller[0]["access_level"] == "hush"
    if not is_hush:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Insufficient permissions",
        )

    listing = []
    for row in supabase.table("users").select("*").execute().data:
        listing.append(
            UserResponse(
                username=row["username"],
                email=row["email"],
                access_level=row["access_level"],
                date_joined=datetime.fromisoformat(row["date_joined"]),
            )
        )
    return listing
@admin_router.get("/user/{user_id}", response_model=UserResponse)
async def get_user(admin_id: str, token: str, user_id: str, user_agent: str = Header(...)):
    """Fetch one user by id; restricted to callers with the "hush" level.

    Raises:
        HTTPException: 401 from validate_token() for a bad or expired
            session, 403 when the caller is not a "hush" user, 404 when
            *user_id* does not exist.
    """
    await validate_token(admin_id, token, user_agent)

    caller = supabase.table("users").select("access_level").eq("id", admin_id).execute().data
    is_hush = bool(caller) and caller[0]["access_level"] == "hush"
    if not is_hush:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Insufficient permissions",
        )

    found = supabase.table("users").select("*").eq("id", user_id).execute().data
    if not found:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )

    row = found[0]
    joined = datetime.fromisoformat(row["date_joined"])
    return UserResponse(
        username=row["username"],
        email=row["email"],
        access_level=row["access_level"],
        date_joined=joined,
    )
@admin_router.put("/user/{user_id}", response_model=UserResponse)
async def update_user(admin_id: str, token: str, user_id: str, request: UpdateUserRequest, user_agent: str = Header(...)):
    """Update another user's password, e-mail and/or username.

    Restricted to callers with the "hush" access level. Only fields with a
    truthy value in the request body are applied; a new password is hashed
    before it is stored.

    Raises:
        HTTPException: 401 from validate_token() for a bad or expired
            session, 403 when the caller is not a "hush" user, 400 when the
            body contains nothing to update, 404 when *user_id* does not
            exist.
    """
    await validate_token(admin_id, token, user_agent)
    admin_query = supabase.table("users").select("access_level").eq("id", admin_id).execute()
    if not admin_query.data or admin_query.data[0]["access_level"] != "hush":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions"
        )

    update_data = {}
    if request.password:
        update_data["password"] = hash_password(request.password)
    if request.email:
        update_data["email"] = request.email
    if request.username:
        update_data["username"] = request.username

    # Do not send an empty update to the database: it changes nothing and
    # would otherwise be reported as a misleading "User not found".
    if not update_data:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="No fields to update"
        )

    updated_user = supabase.table("users").update(update_data).eq("id", user_id).execute()
    if not updated_user.data:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
        )
    user = updated_user.data[0]
    return UserResponse(
        username=user["username"],
        email=user["email"],
        access_level=user["access_level"],
        date_joined=datetime.fromisoformat(user["date_joined"]),
    )
@admin_router.put("/user/{user_id}/access-level")
async def update_access_level(admin_id: str, token: str, user_id: str, request: UpdateAccessLevelRequest, user_agent: str = Header(...)):
    """Raise a user's access level; restricted to "hush" callers.

    The requested level must be one of ACCESS_LEVELS and must come later in
    that list than the user's current level; downgrades and no-op changes
    are rejected.

    Raises:
        HTTPException: 401 from validate_token() for a bad or expired
            session, 403 when the caller is not a "hush" user, 404 when
            *user_id* does not exist, 400 when the requested level is
            unknown or not higher than the current one.
    """
    await validate_token(admin_id, token, user_agent)
    admin_query = supabase.table("users").select("access_level").eq("id", admin_id).execute()
    if not admin_query.data or admin_query.data[0]["access_level"] != "hush":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions"
        )

    user_query = supabase.table("users").select("*").eq("id", user_id).execute()
    if not user_query.data:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
        )
    user = user_query.data[0]

    new_access_level = request.access_level
    # list.index() raises ValueError for an unknown value, which would
    # surface as a 500; reject bad input explicitly instead.
    if new_access_level not in ACCESS_LEVELS:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid access level. Must be one of: {', '.join(ACCESS_LEVELS)}",
        )
    if ACCESS_LEVELS.index(new_access_level) <= ACCESS_LEVELS.index(user["access_level"]):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot downgrade a user or change to the same level",
        )

    updated_user = supabase.table("users").update({"access_level": new_access_level}).eq("id", user_id).execute()
    # The row can disappear between the lookup and the update.
    if not updated_user.data:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
        )
    user = updated_user.data[0]
    return UserResponse(
        username=user["username"],
        email=user["email"],
        access_level=user["access_level"],
        date_joined=datetime.fromisoformat(user["date_joined"]),
    )
@auth_router.put("/user/update", response_model=UserResponse)
async def update_own_data(user_id: str, request: UpdateUserRequest, token: str = Header(...), user_agent: str = Header(...)):
    """Let an authenticated user change their own password, e-mail or username.

    The session token is read from the ``token`` request header. Only
    fields with a truthy value in the request body are applied; a new
    password is hashed before it is stored.

    Raises:
        HTTPException: 401 from validate_token() for a bad or expired
            session, 400 when the body contains nothing to update, 404 when
            *user_id* does not exist.
    """
    await validate_token(user_id, token, user_agent)

    update_data = {}
    if request.password:
        update_data["password"] = hash_password(request.password)
    if request.email:
        update_data["email"] = request.email
    if request.username:
        update_data["username"] = request.username

    # Do not send an empty update to the database: it changes nothing and
    # would otherwise be reported as a misleading "User not found".
    if not update_data:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="No fields to update"
        )

    updated_user = supabase.table("users").update(update_data).eq("id", user_id).execute()
    if not updated_user.data:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
        )
    user = updated_user.data[0]
    return UserResponse(
        username=user["username"],
        email=user["email"],
        access_level=user["access_level"],
        date_joined=datetime.fromisoformat(user["date_joined"]),
    )
# Include routes
# Attach the three routers to the application; the auth and admin routers
# carry their own /v1/api/... prefixes, the root router has none.
app.include_router(root_router)
app.include_router(auth_router)
app.include_router(admin_router)
# Initialize system user on startup
@app.on_event("startup")
async def startup_event():
    """Startup hook: make sure the built-in system account exists."""
    await init_system_user()