# Auth.Nexus / main.py
# (Web-page residue kept as a comment: uploaded by ChandimaPrabath, commit "init" 08a1fe6, 11.7 kB.)
import hashlib
import json
import os
import secrets
import threading
import time
import uuid
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional

from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Header, status, APIRouter
from pydantic import BaseModel, EmailStr
from user_agents import parse
load_dotenv()
# Credentials for the built-in system account, taken from the process
# environment. Either value is None when the variable is not set.
SYSTEM_USER = os.environ.get("SYSTEM_USER")
SYSTEM_PASSWORD = os.environ.get("SYSTEM_PASSWORD")

app = FastAPI()

# Routers: one for authentication endpoints, one for admin endpoints.
auth_router = APIRouter(
    prefix="/v1/api/auth",
    tags=["Authentication"],
)
admin_router = APIRouter(
    prefix="/v1/api/admin",
    tags=["Admin"],
)
# Simulated in-memory databases.
# users_db maps a user id to that user's record; sessions_db maps a user id
# to the list of that user's session records.
users_db: Dict[str, dict] = {}
sessions_db: Dict[str, List[dict]] = {}

# Debug Mode: when enabled, both databases are loaded from (and saved to)
# JSON files in the current working directory.
DEBUG = True
DB_FILE_USERS = "users_db.json"
DB_FILE_SESSIONS = "sessions_db.json"


def _read_json_file(path: str) -> dict:
    """Return the JSON document stored at *path*, or ``{}`` if the file is missing."""
    try:
        with open(path, "r") as f:
            return json.load(f)
    except FileNotFoundError:
        return {}


def _revive_datetime(record: dict, key: str) -> None:
    """Convert ``record[key]`` back into a ``datetime`` if it is stored as text.

    The databases are written with ``json.dump(..., default=str)``, so
    datetimes come back from disk as strings. Code that compares them with
    ``datetime.now(...)`` needs real ``datetime`` objects again.
    """
    value = record.get(key)
    if not isinstance(value, str):
        return
    try:
        record[key] = datetime.fromisoformat(value)
    except ValueError:
        # Not a timestamp we can parse: keep the stored value untouched
        # rather than refusing to start.
        pass


if DEBUG:
    users_db = _read_json_file(DB_FILE_USERS)
    sessions_db = _read_json_file(DB_FILE_SESSIONS)
    for _record in users_db.values():
        _revive_datetime(_record, "date_joined")
    for _session_list in sessions_db.values():
        for _record in _session_list:
            _revive_datetime(_record, "expires")

# Constants
TOKEN_EXPIRATION_MINUTES = 60
# Ordered from lowest to highest: the access-level endpoint compares list
# positions to decide whether a change is an upgrade.
ACCESS_LEVELS = ["default", "member", "admin", "dev", "hush"]
# Models
class SignupRequest(BaseModel):
    # Request body accepted by the signup endpoint.
    username: str
    password: str
    # Email is optional: it may be omitted or sent as null. The annotation
    # now says so explicitly instead of pairing a non-optional type with a
    # None default.
    email: Optional[EmailStr] = None
class LoginRequest(BaseModel):
    # Request body accepted by the login endpoint: the account name and the
    # plain-text password to check against the stored hash.
    username: str
    password: str
class TokenResponse(BaseModel):
    # Response body returned by the login and validate endpoints.
    access_token: str
    # Defaults to "bearer" when the caller does not supply a value.
    token_type: str = "bearer"
class UserResponse(BaseModel):
    # Public view of a user record (the password hash is never included).
    username: str
    # Users can exist without an email: signup stores None when none is
    # given, and the system user is created with email None. A required
    # EmailStr here made responses containing such users fail validation.
    email: Optional[EmailStr] = None
    access_level: str
    date_joined: datetime
class UpdateUserRequest(BaseModel):
    # Partial update: every field is optional, and the update endpoints only
    # apply the fields that carry a truthy value.
    password: Optional[str] = None
    email: Optional[EmailStr] = None
    username: Optional[str] = None
class UpdateAccessLevelRequest(BaseModel):
    # Request body for changing a user's access level; the value is looked
    # up in ACCESS_LEVELS by the endpoint that consumes it.
    access_level: str
# Utility functions
# Format of newly created password hashes:
#   pbkdf2_sha256$<iterations>$<salt hex>$<digest hex>
# The iteration count travels with each hash, so it can be raised later
# without invalidating hashes that were stored with a lower count.
_PBKDF2_SCHEME = "pbkdf2_sha256"
_PBKDF2_ITERATIONS = 600_000
_PBKDF2_SALT_BYTES = 16


def hash_password(password: str) -> str:
    """Return a salted PBKDF2-HMAC-SHA256 hash of *password*.

    A fresh random salt is generated on every call, so hashing the same
    password twice gives different strings. Use ``verify_password`` to
    compare; do not compare hash strings directly.
    """
    salt = secrets.token_bytes(_PBKDF2_SALT_BYTES)
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode(), salt, _PBKDF2_ITERATIONS
    )
    return f"{_PBKDF2_SCHEME}${_PBKDF2_ITERATIONS}${salt.hex()}${digest.hex()}"


def verify_password(password: str, hashed_password: str) -> bool:
    """Return True if *password* matches *hashed_password*.

    Accepts both the salted format produced by ``hash_password`` and the
    legacy format (a bare SHA-256 hex digest), so records stored before the
    format change can still log in. Malformed stored values never match.
    """
    if hashed_password.startswith(_PBKDF2_SCHEME + "$"):
        try:
            _, iterations, salt_hex, digest_hex = hashed_password.split("$")
            expected = bytes.fromhex(digest_hex)
            candidate = hashlib.pbkdf2_hmac(
                "sha256",
                password.encode(),
                bytes.fromhex(salt_hex),
                int(iterations),
            )
        except (ValueError, OverflowError):
            # Wrong number of fields, non-hex data, or a bad iteration count.
            return False
        return secrets.compare_digest(candidate, expected)

    # Legacy unsalted SHA-256 hex digest. Compare as bytes in constant time.
    legacy = hashlib.sha256(password.encode()).hexdigest()
    return secrets.compare_digest(legacy.encode(), hashed_password.encode())
def create_device_token(username: str, user_agent: str) -> str:
    """Build a session token for *username* on the device described by *user_agent*.

    The token is the SHA-256 hex digest of a seed made of the username, the
    parsed device family, the parsed OS family and 16 random bytes (as hex),
    so each call yields a different token.
    """
    agent = parse(user_agent)
    device_family = agent.device.family
    os_family = agent.os.family
    nonce = secrets.token_hex(16)
    seed = f"{username}-{device_family}-{os_family}-{nonce}"
    digest = hashlib.sha256(seed.encode())
    return digest.hexdigest()
def is_token_expired(expiration_time: datetime) -> bool:
    """Return True if *expiration_time* lies in the past (UTC clock).

    Session records are persisted with ``json.dump(..., default=str)``, so an
    expiry read back from disk can be the string form of a datetime. Such a
    string is parsed first instead of raising ``TypeError`` on comparison.

    Raises:
        ValueError: if a string is passed that is not an ISO-format timestamp.
    """
    if isinstance(expiration_time, str):
        expiration_time = datetime.fromisoformat(expiration_time)
    return datetime.now(timezone.utc) > expiration_time
def save_databases():
    """Persist both in-memory databases to their JSON files (DEBUG mode only).

    Each file is written to a uniquely named temporary file first and then
    moved over the real one with ``os.replace``. The real file is therefore
    never seen half-written, which matters because this function is called
    both from request handlers and from the background cleanup thread.

    Values that JSON cannot encode natively (the datetimes) are stored via
    ``str``.
    """
    if not DEBUG:
        return
    targets = ((DB_FILE_USERS, users_db), (DB_FILE_SESSIONS, sessions_db))
    for path, database in targets:
        tmp_path = f"{path}.{uuid.uuid4().hex}.tmp"
        try:
            with open(tmp_path, "w") as f:
                json.dump(database, f, default=str)
            os.replace(tmp_path, path)
        except Exception:
            # Do not leave a stray temp file behind, then report the failure.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise
# Background task for cleaning up expired sessions
def _session_is_active(session: dict, now: datetime) -> bool:
    """Return True if *session* expires strictly after *now*."""
    expires = session["expires"]
    # Sessions reloaded from the JSON file carry their expiry as text
    # (they are saved with default=str); parse it before comparing.
    if isinstance(expires, str):
        expires = datetime.fromisoformat(expires)
    return expires > now


def cleanup_expired_sessions():
    """Run forever, dropping expired sessions once every 60 seconds.

    Intended to be the target of a background thread. After each pass the
    databases are saved via ``save_databases``.
    """
    while True:
        now = datetime.now(timezone.utc)
        # Iterate over a snapshot so entries can be replaced or removed.
        for user_id, sessions in list(sessions_db.items()):
            active = [s for s in sessions if _session_is_active(s, now)]
            if active:
                sessions_db[user_id] = active
            else:
                # No active sessions left: remove the user's entry. pop() is
                # used so an entry already removed elsewhere is not an error.
                sessions_db.pop(user_id, None)
        save_databases()
        time.sleep(60)  # Run cleanup every 60 seconds
# Log the startup time converted to Sri Lankan time (UTC+05:30).
now = datetime.now(timezone.utc)
print(f"Server starting at: {now.astimezone(timezone(timedelta(hours=5, minutes=30)))} (Sri Lankan Time)")

# Create (or refresh) the system hush user.
# Hashing a missing password would fail with an opaque AttributeError, so
# report the real cause instead.
if SYSTEM_PASSWORD is None:
    raise RuntimeError("SYSTEM_PASSWORD environment variable is not set")

# Reuse the system user already present in users_db (e.g. loaded from the
# JSON file) so restarts do not pile up duplicate system accounts.
SYSTEM_USER_ID = next(
    (
        uid
        for uid, user in users_db.items()
        if user.get("username") == SYSTEM_USER and user.get("access_level") == "hush"
    ),
    None,
)
if SYSTEM_USER_ID is None:
    SYSTEM_USER_ID = str(uuid.uuid4())

users_db[SYSTEM_USER_ID] = {
    "username": SYSTEM_USER,
    # Always re-hash so a changed SYSTEM_PASSWORD takes effect on restart.
    "password": hash_password(SYSTEM_PASSWORD),
    "email": None,
    # Keep the original join date when the account already existed.
    "date_joined": users_db.get(SYSTEM_USER_ID, {}).get(
        "date_joined", datetime.now(timezone.utc)
    ),
    "access_level": "hush",
}
save_databases()

# Start the background cleanup task. It is started only after the system
# user has been written, so the thread's first save does not run while
# users_db is still being modified here. daemon=True lets the process exit
# without waiting for the endless loop.
cleanup_thread = threading.Thread(target=cleanup_expired_sessions, daemon=True)
cleanup_thread.start()
# Authentication Routes
# Register a new account.
# Rejects the request with 400 when the username is already in use; otherwise
# stores the user under a fresh UUID with the "default" access level and
# persists the databases.
@auth_router.post("/signup", status_code=status.HTTP_201_CREATED)
def signup(request: SignupRequest):
    username_taken = any(
        existing["username"] == request.username for existing in users_db.values()
    )
    if username_taken:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="Username already exists"
        )

    new_id = str(uuid.uuid4())
    joined_at = datetime.now(timezone.utc)
    record = {
        "username": request.username,
        "password": hash_password(request.password),
        "email": request.email,
        "date_joined": joined_at,
        # Every new account starts at the default access level.
        "access_level": "default",
    }
    users_db[new_id] = record
    save_databases()
    return {"message": "User created successfully"}
# Authenticate a user and open a new session for the calling device.
# Responds 401 "Invalid credentials" for an unknown username or a wrong
# password; on success stores a session (token, expiry, raw User-Agent) and
# returns the token.
@auth_router.post("/login", response_model=TokenResponse)
def login(request: LoginRequest, user_agent: str = Header(...)):
    # Find the id of the first user whose username matches.
    matched_id = None
    for uid, record in users_db.items():
        if record["username"] == request.username:
            matched_id = uid
            break

    if not (matched_id and verify_password(request.password, users_db[matched_id]["password"])):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials"
        )

    # Generate a new device-specific session token.
    token = create_device_token(request.username, user_agent)
    lifetime = timedelta(minutes=TOKEN_EXPIRATION_MINUTES)
    expires_at = datetime.now(timezone.utc) + lifetime

    # Add the session, creating the user's session list on first login.
    new_session = {"token": token, "expires": expires_at, "device": user_agent}
    sessions_db.setdefault(matched_id, []).append(new_session)
    save_databases()
    return TokenResponse(access_token=token)
# End the session identified by (user_id, token).
# Responds 400 when the user has no sessions at all. Sessions whose token
# differs from the given one are kept.
@auth_router.post("/logout")
def logout(user_id: str, token: str):
    current = sessions_db.get(user_id)
    if not current:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="No active sessions"
        )

    remaining = []
    for entry in current:
        if entry["token"] != token:
            remaining.append(entry)

    if remaining:
        sessions_db[user_id] = remaining
    else:
        # Nothing left for this user: drop the entry entirely.
        del sessions_db[user_id]
    save_databases()
    return {"message": "Session forcefully expired"}
# Check that (user_id, token) names a live session opened from this device.
# All failures respond 401, with detail "No active sessions", "Invalid token",
# "Device mismatch" or "Token expired". An expired session is removed (and the
# databases saved) before the error is raised. Also called directly by other
# endpoints as an authentication check.
@auth_router.get("/validate", response_model=TokenResponse)
def validate_token(user_id: str, token: str, user_agent: str = Header(...)):
    sessions = sessions_db.get(user_id)
    if not sessions:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="No active sessions"
        )

    # Only the first session carrying this token is considered.
    match = next((s for s in sessions if s["token"] == token), None)
    if match is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token"
        )

    # The session is bound to the exact User-Agent string used at login.
    if match["device"] != user_agent:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="Device mismatch"
        )

    if is_token_expired(match["expires"]):
        sessions.remove(match)
        if not sessions:
            del sessions_db[user_id]
        save_databases()
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="Token expired"
        )

    return TokenResponse(access_token=token)
# Admin Routes
# List every user (without password hashes).
# The caller must hold a valid session and the "hush" access level,
# otherwise 401 (from validate_token) or 403 is returned.
@admin_router.get("/users", response_model=List[UserResponse])
def get_all_users(user_id: str, token: str, user_agent: str = Header(...)):
    validate_token(user_id, token, user_agent)
    caller_level = users_db[user_id]["access_level"]
    if caller_level != "hush":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions"
        )

    public_fields = ("username", "email", "access_level", "date_joined")
    listing = []
    for record in users_db.values():
        listing.append({field: record[field] for field in public_fields})
    return listing
# Fetch one user by id (without the password hash).
# admin_id/token identify the caller, who must hold the "hush" access level;
# responds 404 when user_id is unknown.
@admin_router.get("/user/{user_id}", response_model=UserResponse)
def get_user(admin_id: str, token: str, user_id: str, user_agent: str = Header(...)):
    validate_token(admin_id, token, user_agent)
    caller = users_db[admin_id]
    if caller["access_level"] != "hush":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions"
        )

    target = users_db.get(user_id)
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
        )

    public_fields = ("username", "email", "access_level", "date_joined")
    return {field: target[field] for field in public_fields}
# Admin endpoint: change another user's password, email and/or username.
# admin_id/token identify the caller, who must hold the "hush" access level.
# Responds 404 when user_id is unknown and 400 when the requested username
# already belongs to a different user (the same rule signup enforces).
# Only fields with a truthy value in the request are applied.
@admin_router.put("/user/{user_id}", response_model=UserResponse)
def update_user(admin_id: str, token: str, user_id: str, request: UpdateUserRequest, user_agent: str = Header(...)):
    validate_token(admin_id, token, user_agent)
    if users_db[admin_id]["access_level"] != "hush":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions"
        )
    user = users_db.get(user_id)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
        )

    # Usernames identify accounts at login, so they must stay unique. Check
    # before touching anything so a rejected request changes nothing.
    if request.username and request.username != user["username"]:
        username_taken = any(
            other["username"] == request.username
            for other_id, other in users_db.items()
            if other_id != user_id
        )
        if username_taken:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST, detail="Username already exists"
            )

    # Update only the fields provided in the request
    if request.password:
        user["password"] = hash_password(request.password)
    if request.email:
        user["email"] = request.email
    if request.username:
        user["username"] = request.username
    users_db[user_id] = user
    save_databases()
    return {
        "username": user["username"],
        "email": user["email"],
        "access_level": user["access_level"],
        "date_joined": user["date_joined"],
    }
# Admin endpoint: raise a user's access level.
# admin_id/token identify the caller, who must hold the "hush" access level.
# Responds 404 when user_id is unknown and 400 when the requested level is
# not one of ACCESS_LEVELS or is not strictly higher than the user's
# current level.
@admin_router.put("/user/{user_id}/access-level")
def update_access_level(admin_id: str, token: str, user_id: str, request: UpdateAccessLevelRequest, user_agent: str = Header(...)):
    validate_token(admin_id, token, user_agent)
    if users_db[admin_id]["access_level"] != "hush":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions"
        )
    if user_id not in users_db:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
        )
    user = users_db.get(user_id)
    new_access_level = request.access_level

    # An unknown level would make ACCESS_LEVELS.index() raise ValueError and
    # surface as a 500; reject it as a client error instead.
    if new_access_level not in ACCESS_LEVELS:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid access level",
        )

    # Only upgrades are allowed: the new level must sit strictly later in
    # ACCESS_LEVELS than the user's current one.
    if ACCESS_LEVELS.index(new_access_level) <= ACCESS_LEVELS.index(user["access_level"]):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot downgrade a user or change to the same level",
        )
    user["access_level"] = new_access_level
    users_db[user_id] = user
    save_databases()
    return {
        "username": user["username"],
        "email": user["email"],
        "access_level": user["access_level"],
        "date_joined": user["date_joined"],
    }
# User Auth Routes
# Let a logged-in user change their own password, email and/or username.
# The session token is read from the "token" request header. Responds 404
# when user_id is unknown and 400 when the requested username already belongs
# to a different user (the same rule signup enforces).
# Only fields with a truthy value in the request are applied.
@auth_router.put("/user/update", response_model=UserResponse)
def update_own_data(user_id: str, request: UpdateUserRequest, token: str = Header(...), user_agent: str = Header(...)):
    validate_token(user_id, token, user_agent)
    user = users_db.get(user_id)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
        )

    # Usernames identify accounts at login, so they must stay unique. Check
    # before touching anything so a rejected request changes nothing.
    if request.username and request.username != user["username"]:
        username_taken = any(
            other["username"] == request.username
            for other_id, other in users_db.items()
            if other_id != user_id
        )
        if username_taken:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST, detail="Username already exists"
            )

    # Update only the fields provided in the request
    if request.password:
        user["password"] = hash_password(request.password)
    if request.email:
        user["email"] = request.email
    if request.username:
        user["username"] = request.username
    users_db[user_id] = user
    save_databases()
    return {
        "username": user["username"],
        "email": user["email"],
        "access_level": user["access_level"],
        "date_joined": user["date_joined"],
    }
# Register both routers on the application, authentication first.
for _router in (auth_router, admin_router):
    app.include_router(_router)