Spaces:
Sleeping
Sleeping
import hashlib
import hmac
import json
import os
import secrets
import threading
import time
import uuid
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Union

from dotenv import load_dotenv
from fastapi import APIRouter, FastAPI, Header, HTTPException, status
from pydantic import BaseModel, EmailStr
from user_agents import parse
# Load settings from a local .env file (python-dotenv) before the environment
# lookups below run.
load_dotenv()

# Credentials for the built-in system account. Each is None when the
# corresponding environment variable is not set.
SYSTEM_USER = os.environ.get("SYSTEM_USER")
SYSTEM_PASSWORD = os.environ.get("SYSTEM_PASSWORD")

app = FastAPI()

# Routers
auth_router = APIRouter(tags=["Authentication"], prefix="/v1/api/auth")
admin_router = APIRouter(tags=["Admin"], prefix="/v1/api/admin")

# Simulated in-memory databases
users_db: Dict[str, dict] = {}  # user id (UUID string) -> user record
sessions_db: Dict[str, List[dict]] = {}  # user id -> list of session records

# Debug Mode: when enabled, both stores are mirrored to the JSON files below.
DEBUG = True
DB_FILE_USERS = "users_db.json"
DB_FILE_SESSIONS = "sessions_db.json"
def _load_json_db(path: str) -> dict:
    """Return the JSON document stored at *path*, or ``{}`` if the file is absent."""
    try:
        with open(path, "r") as db_file:
            return json.load(db_file)
    except FileNotFoundError:
        return {}


def _revive_timestamps(record: dict, *keys: str) -> None:
    """Convert stringified timestamps in *record* back into ``datetime`` objects.

    The stores are written with ``json.dump(..., default=str)``, so every
    ``datetime`` comes back from disk as a plain string. Code that compares
    those values against ``datetime.now(...)`` would otherwise raise
    ``TypeError`` after a restart.
    """
    if not isinstance(record, dict):
        return
    for key in keys:
        value = record.get(key)
        if isinstance(value, str):
            try:
                record[key] = datetime.fromisoformat(value)
            except ValueError:
                # Not an ISO timestamp: keep the stored text untouched rather
                # than dropping data we do not understand.
                pass


if DEBUG:
    users_db = _load_json_db(DB_FILE_USERS)
    sessions_db = _load_json_db(DB_FILE_SESSIONS)
    for _record in users_db.values():
        _revive_timestamps(_record, "date_joined")
    for _records in sessions_db.values():
        for _record in _records:
            _revive_timestamps(_record, "expires")
# Constants
# Lifetime of a login session, in minutes.
TOKEN_EXPIRATION_MINUTES = 60
# Known access levels, lowest first; position in this list is used as the rank
# when access levels are compared.
ACCESS_LEVELS = [
    "default",
    "member",
    "admin",
    "dev",
    "hush",
]
| # Models | |
class SignupRequest(BaseModel):
    """Request body for creating a new account."""

    username: str
    password: str
    # Optional: may be omitted or sent as null. Declared as Optional so the
    # annotation matches the None default instead of contradicting it.
    email: Optional[EmailStr] = None
class LoginRequest(BaseModel):
    """Credentials submitted when logging in."""

    username: str  # matched against the stored username
    password: str  # plaintext; checked against the stored hash
class TokenResponse(BaseModel):
    """Response body carrying a session token."""

    access_token: str
    token_type: str = "bearer"  # fixed default; callers only supply access_token
class UserResponse(BaseModel):
    """Public view of a user record (never includes the password hash)."""

    username: str
    # Nullable: the system user and accounts created without an email are
    # stored with email=None, which a plain EmailStr field would reject.
    email: Optional[EmailStr]
    access_level: str
    date_joined: datetime
class UpdateUserRequest(BaseModel):
    """Partial update of a user record; only the fields provided are applied."""

    # Every field is Optional so the annotation agrees with its None default
    # and an explicit null is accepted as "leave unchanged".
    password: Optional[str] = None
    email: Optional[EmailStr] = None
    username: Optional[str] = None
class UpdateAccessLevelRequest(BaseModel):
    """Request body naming the access level a user should be moved to."""

    access_level: str  # looked up in ACCESS_LEVELS by the handler
| # Utility functions | |
_PBKDF2_PREFIX = "pbkdf2_sha256"
_PBKDF2_ITERATIONS = 600_000


def hash_password(password: str, *, iterations: int = _PBKDF2_ITERATIONS) -> str:
    """Return a salted PBKDF2-HMAC-SHA256 hash of *password*.

    The result is self-describing,
    ``pbkdf2_sha256$<iterations>$<salt hex>$<digest hex>``, so
    ``verify_password`` can re-derive it without any external state. A fresh
    random salt is used on every call, so hashing the same password twice
    yields different strings.

    Args:
        password: The plaintext password.
        iterations: PBKDF2 work factor; stored in the returned string.

    Returns:
        The encoded hash string.
    """
    salt = secrets.token_bytes(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, iterations)
    return f"{_PBKDF2_PREFIX}${iterations}${salt.hex()}${digest.hex()}"


def verify_password(password: str, hashed_password: str) -> bool:
    """Check *password* against a stored hash.

    Accepts both the salted format produced by ``hash_password`` and the
    legacy unsalted SHA-256 hex digest, so records written before the format
    change still authenticate. Comparisons are constant-time.

    Args:
        password: The plaintext password supplied by the caller.
        hashed_password: The stored hash string.

    Returns:
        True if the password matches, False otherwise (including when the
        stored value is malformed).
    """
    if hashed_password.startswith(_PBKDF2_PREFIX + "$"):
        try:
            _, iterations, salt_hex, digest_hex = hashed_password.split("$")
            expected = bytes.fromhex(digest_hex)
            candidate = hashlib.pbkdf2_hmac(
                "sha256", password.encode(), bytes.fromhex(salt_hex), int(iterations)
            )
        except (ValueError, OverflowError):
            # Wrong number of parts, non-hex data, or an unusable iteration
            # count: treat a malformed stored hash as a failed match.
            return False
        return hmac.compare_digest(candidate, expected)

    # Legacy records: plain SHA-256 hex digest of the password.
    legacy = hashlib.sha256(password.encode()).hexdigest()
    return hmac.compare_digest(legacy.encode(), hashed_password.encode())
def create_device_token(username: str, user_agent: str) -> str:
    """Build a fresh session token for *username* on the given device.

    The token is the SHA-256 hex digest of the username, the device and OS
    families parsed from the User-Agent string, and 16 random bytes (as hex),
    joined with hyphens. The random part makes every call return a new token.

    Args:
        username: Account the token is issued for.
        user_agent: Raw User-Agent string of the client.

    Returns:
        A 64-character hex string.
    """
    parsed = parse(user_agent)
    nonce = secrets.token_hex(16)
    seed_parts = (username, parsed.device.family, parsed.os.family, nonce)
    seed = "-".join(str(part) for part in seed_parts)
    return hashlib.sha256(seed.encode()).hexdigest()
def is_token_expired(expiration_time: Union[datetime, str]) -> bool:
    """Return True if *expiration_time* lies in the past.

    Besides timezone-aware datetimes, this accepts the two shapes an expiry
    takes after a round trip through the JSON store, which would otherwise
    raise ``TypeError`` when compared with an aware "now":

    * an ISO-format string (what ``json.dump(default=str)`` writes), and
    * a naive datetime, which is interpreted as UTC.

    A string that cannot be parsed is treated as expired, so a damaged
    session record never grants access.

    Args:
        expiration_time: The moment the token stops being valid.

    Returns:
        True when the current UTC time is later than the expiry.
    """
    if isinstance(expiration_time, str):
        try:
            expiration_time = datetime.fromisoformat(expiration_time)
        except ValueError:
            return True
    if expiration_time.tzinfo is None:
        expiration_time = expiration_time.replace(tzinfo=timezone.utc)
    return datetime.now(timezone.utc) > expiration_time
def _write_json_atomically(path: str, payload: dict) -> None:
    """Serialise *payload* and swap it into place at *path* in one step.

    The data is serialised before any file is opened and written to a
    temporary sibling file that then replaces the target. If serialisation or
    the write fails, the previous file is left intact instead of truncated.
    """
    serialized = json.dumps(payload, default=str)
    # Per-process, per-thread name so concurrent savers never share a temp file.
    tmp_path = f"{path}.{os.getpid()}.{threading.get_ident()}.tmp"
    try:
        with open(tmp_path, "w") as tmp_file:
            tmp_file.write(serialized)
        os.replace(tmp_path, path)
    finally:
        # Only present if something failed before the replace.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)


def save_databases():
    """Persist both in-memory stores to their JSON files when DEBUG is on.

    Does nothing when DEBUG is false. Values that are not JSON-serialisable
    (such as datetimes) are written via ``str``.
    """
    if not DEBUG:
        return
    _write_json_atomically(DB_FILE_USERS, users_db)
    _write_json_atomically(DB_FILE_SESSIONS, sessions_db)
| # Background task for cleaning up expired sessions | |
def _session_still_active(session: dict, now: datetime) -> bool:
    """Return True if *session* expires after *now*.

    Tolerates expiries that were stringified by the JSON store or that lack a
    timezone (taken as UTC); an unparseable expiry counts as expired.
    """
    expires = session["expires"]
    if isinstance(expires, str):
        try:
            expires = datetime.fromisoformat(expires)
        except ValueError:
            return False
    if expires.tzinfo is None:
        expires = expires.replace(tzinfo=timezone.utc)
    return expires > now


def _purge_expired_sessions(now: datetime) -> bool:
    """Drop expired sessions from ``sessions_db``; return True if anything changed."""
    changed = False
    # Iterate over a snapshot because entries are removed during the pass.
    for user_id, sessions in list(sessions_db.items()):
        active = [s for s in sessions if _session_still_active(s, now)]
        if not active:
            # No sessions left for this user: remove the entry entirely.
            sessions_db.pop(user_id, None)
            changed = True
        elif len(active) != len(sessions):
            sessions_db[user_id] = active
            changed = True
    return changed


# Background task for cleaning up expired sessions
def cleanup_expired_sessions():
    """Run forever, purging expired sessions once every 60 seconds.

    Intended as the target of a daemon thread. The stores are written to disk
    only when a pass actually removed something. A failure in one pass is
    reported and the loop continues, so a single bad record cannot stop all
    future cleanups.
    """
    while True:
        try:
            if _purge_expired_sessions(datetime.now(timezone.utc)):
                save_databases()
        except Exception as exc:
            print(f"Session cleanup failed: {exc!r}")
        time.sleep(60)  # Run cleanup every 60 seconds
# Set timezone
# Capture the start time in UTC and report it at the UTC+05:30 offset.
now = datetime.now(timezone.utc)
local_start = now.astimezone(timezone(timedelta(hours=5, minutes=30)))
print(f"Server starting at: {local_start} (Sri Lankan Time)")

# Start the background cleanup task
# Daemon thread: it does not keep the process alive at shutdown.
cleanup_thread = threading.Thread(daemon=True, target=cleanup_expired_sessions)
cleanup_thread.start()
# Create system hush user
# Fail fast with a clear message instead of the AttributeError that hashing a
# missing password would raise, or a system account whose username is None.
if not SYSTEM_USER or not SYSTEM_PASSWORD:
    raise RuntimeError(
        "SYSTEM_USER and SYSTEM_PASSWORD must be set in the environment"
    )

# Reuse the system account if the loaded store already has one. Creating a new
# record under a fresh UUID on every start would pile up duplicate usernames,
# and username lookups take the first match.
SYSTEM_USER_ID = next(
    (
        uid
        for uid, user in users_db.items()
        if user.get("username") == SYSTEM_USER and user.get("access_level") == "hush"
    ),
    None,
)
if SYSTEM_USER_ID is None:
    SYSTEM_USER_ID = str(uuid.uuid4())
    users_db[SYSTEM_USER_ID] = {
        "username": SYSTEM_USER,
        "password": hash_password(SYSTEM_PASSWORD),
        "email": None,
        "date_joined": datetime.now(timezone.utc),
        "access_level": "hush",
    }
else:
    # Keep the stored credential in step with the configured password.
    users_db[SYSTEM_USER_ID]["password"] = hash_password(SYSTEM_PASSWORD)
save_databases()
| # Authentication Routes | |
def signup(request: SignupRequest):
    """Register a new account with the "default" access level.

    Args:
        request: Username, password and optional email for the new account.

    Returns:
        A dict with a confirmation message.

    Raises:
        HTTPException: 400 if the username is already in use.
    """
    name_taken = any(
        existing["username"] == request.username for existing in users_db.values()
    )
    if name_taken:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="Username already exists"
        )

    new_user_id = str(uuid.uuid4())
    joined_at = datetime.now(timezone.utc)
    # Only the password hash is stored, never the plaintext.
    record = {
        "username": request.username,
        "password": hash_password(request.password),
        "email": request.email,
        "date_joined": joined_at,
        "access_level": "default",  # Default access level
    }
    users_db[new_user_id] = record
    save_databases()
    return {"message": "User created successfully"}
def login(request: LoginRequest, user_agent: str = Header(...)):
    """Authenticate a user and open a new session for the calling device.

    Args:
        request: Username and password to check.
        user_agent: Required header value; stored with the session as its device.

    Returns:
        A TokenResponse carrying the new session token.

    Raises:
        HTTPException: 401 if the username is unknown or the password is wrong.
    """
    # First record whose username matches, if any.
    matched_id = None
    for uid, record in users_db.items():
        if record["username"] == request.username:
            matched_id = uid
            break

    if not (
        matched_id
        and verify_password(request.password, users_db[matched_id]["password"])
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials"
        )

    # Generate a new device-specific session token
    token = create_device_token(request.username, user_agent)
    lifetime = timedelta(minutes=TOKEN_EXPIRATION_MINUTES)
    expires_at = datetime.now(timezone.utc) + lifetime

    # Add the session, creating the user's session list on first login.
    new_session = {"token": token, "expires": expires_at, "device": user_agent}
    sessions_db.setdefault(matched_id, []).append(new_session)
    save_databases()
    return TokenResponse(access_token=token)
def logout(user_id: str, token: str):
    """End the session identified by *token* for the given user.

    Args:
        user_id: Id of the user whose session is being closed.
        token: Session token to remove.

    Returns:
        A dict with a confirmation message (also returned when no session
        carried that token).

    Raises:
        HTTPException: 400 if the user has no sessions at all.
    """
    current = sessions_db.get(user_id)
    if not current:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="No active sessions"
        )

    remaining = [entry for entry in current if entry["token"] != token]
    if remaining:
        sessions_db[user_id] = remaining
    else:
        # Last session gone: drop the user's entry from the session store.
        del sessions_db[user_id]
    save_databases()
    return {"message": "Session forcefully expired"}
def validate_token(user_id: str, token: str, user_agent: str = Header(...)):
    """Check that *token* is a live session of *user_id* on this device.

    Only the first session carrying the token is examined. An expired session
    is removed from the store before the error is raised.

    Args:
        user_id: Id of the user the token should belong to.
        token: Session token to validate.
        user_agent: Required header value; must equal the device stored with
            the session.

    Returns:
        A TokenResponse echoing the token when it is valid.

    Raises:
        HTTPException: 401 for no sessions, an unknown token, a device
            mismatch, or an expired token.
    """
    user_sessions = sessions_db.get(user_id)
    if not user_sessions:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="No active sessions"
        )

    match = next((s for s in user_sessions if s["token"] == token), None)
    if match is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token"
        )

    if match["device"] != user_agent:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="Device mismatch"
        )

    if is_token_expired(match["expires"]):
        user_sessions.remove(match)
        if not user_sessions:
            del sessions_db[user_id]
        save_databases()
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="Token expired"
        )

    return TokenResponse(access_token=token)
| # Admin Routes | |
def get_all_users(user_id: str, token: str, user_agent: str = Header(...)):
    """List every user's public fields; restricted to "hush" callers.

    Args:
        user_id: Id of the calling user.
        token: The caller's session token.
        user_agent: Required header value, checked against the session device.

    Returns:
        A list of dicts with username, email, access_level and date_joined.

    Raises:
        HTTPException: 401 from token validation, or 403 if the caller's
            access level is not "hush".
    """
    validate_token(user_id, token, user_agent)
    caller = users_db[user_id]
    if caller["access_level"] != "hush":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions"
        )

    # The password hash is deliberately left out of the listing.
    public_fields = ("username", "email", "access_level", "date_joined")
    listing = []
    for record in users_db.values():
        listing.append({field: record[field] for field in public_fields})
    return listing
def get_user(admin_id: str, token: str, user_id: str, user_agent: str = Header(...)):
    """Return one user's public fields; restricted to "hush" callers.

    Args:
        admin_id: Id of the calling user.
        token: The caller's session token.
        user_id: Id of the user to look up.
        user_agent: Required header value, checked against the session device.

    Returns:
        A dict with username, email, access_level and date_joined.

    Raises:
        HTTPException: 401 from token validation, 403 if the caller's access
            level is not "hush", or 404 if the target user does not exist.
    """
    validate_token(admin_id, token, user_agent)
    caller = users_db[admin_id]
    if caller["access_level"] != "hush":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions"
        )

    target = users_db.get(user_id)
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
        )

    public_fields = ("username", "email", "access_level", "date_joined")
    return {field: target[field] for field in public_fields}
def update_user(admin_id: str, token: str, user_id: str, request: UpdateUserRequest, user_agent: str = Header(...)):
    """Update another user's password, email and/or username; "hush" only.

    Only the fields provided (truthy) in the request are applied. A new
    username must not belong to any other account: usernames are what login
    looks accounts up by, so a duplicate would make one account unreachable.
    The check runs before anything is modified, so a rejected request leaves
    the record untouched.

    Args:
        admin_id: Id of the calling user.
        token: The caller's session token.
        user_id: Id of the user to update.
        request: Fields to change.
        user_agent: Required header value, checked against the session device.

    Returns:
        A dict with the user's username, email, access_level and date_joined
        after the update.

    Raises:
        HTTPException: 401 from token validation, 403 if the caller's access
            level is not "hush", 404 if the target user does not exist, or
            400 if the requested username is already taken.
    """
    validate_token(admin_id, token, user_agent)
    if users_db[admin_id]["access_level"] != "hush":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions"
        )
    user = users_db.get(user_id)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
        )

    new_username = request.username
    if new_username and new_username != user["username"]:
        taken = any(
            other["username"] == new_username
            for other_id, other in users_db.items()
            if other_id != user_id
        )
        if taken:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST, detail="Username already exists"
            )

    # Update only the fields provided in the request
    if request.password:
        user["password"] = hash_password(request.password)
    if request.email:
        user["email"] = request.email
    if new_username:
        user["username"] = new_username
    # The record is mutated in place, so it is already current in users_db.
    save_databases()
    return {
        "username": user["username"],
        "email": user["email"],
        "access_level": user["access_level"],
        "date_joined": user["date_joined"],
    }
def update_access_level(admin_id: str, token: str, user_id: str, request: UpdateAccessLevelRequest, user_agent: str = Header(...)):
    """Raise a user's access level; "hush" callers only.

    The new level must be one of ACCESS_LEVELS and strictly higher than the
    user's current level (levels are ranked by their position in that list).

    Args:
        admin_id: Id of the calling user.
        token: The caller's session token.
        user_id: Id of the user whose level is changed.
        request: Carries the requested access level.
        user_agent: Required header value, checked against the session device.

    Returns:
        A dict with the user's username, email, access_level and date_joined
        after the change.

    Raises:
        HTTPException: 401 from token validation, 403 if the caller's access
            level is not "hush", 404 if the target user does not exist, or
            400 if the level is unknown or not higher than the current one.
    """
    validate_token(admin_id, token, user_agent)
    if users_db[admin_id]["access_level"] != "hush":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions"
        )
    if user_id not in users_db:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
        )
    user = users_db[user_id]
    new_access_level = request.access_level

    # Reject unknown levels explicitly; ACCESS_LEVELS.index() would otherwise
    # raise ValueError and surface as a 500.
    if new_access_level not in ACCESS_LEVELS:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid access level"
        )

    # Only upgrades are allowed: the new level must rank above the current one.
    if ACCESS_LEVELS.index(new_access_level) <= ACCESS_LEVELS.index(user["access_level"]):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot downgrade a user or change to the same level",
        )

    user["access_level"] = new_access_level
    save_databases()
    return {
        "username": user["username"],
        "email": user["email"],
        "access_level": user["access_level"],
        "date_joined": user["date_joined"],
    }
| # User Auth Routes | |
def update_own_data(user_id: str, request: UpdateUserRequest, token: str = Header(...), user_agent: str = Header(...)):
    """Let a logged-in user change their own password, email and/or username.

    Only the fields provided (truthy) in the request are applied. A new
    username must not belong to any other account: usernames are what login
    looks accounts up by, so a duplicate would make one account unreachable.
    The check runs before anything is modified, so a rejected request leaves
    the record untouched.

    Args:
        user_id: Id of the calling user.
        request: Fields to change.
        token: The caller's session token, read from a required header.
        user_agent: Required header value, checked against the session device.

    Returns:
        A dict with the user's username, email, access_level and date_joined
        after the update.

    Raises:
        HTTPException: 401 from token validation, 404 if the user record does
            not exist, or 400 if the requested username is already taken.
    """
    validate_token(user_id, token, user_agent)
    user = users_db.get(user_id)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
        )

    new_username = request.username
    if new_username and new_username != user["username"]:
        taken = any(
            other["username"] == new_username
            for other_id, other in users_db.items()
            if other_id != user_id
        )
        if taken:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST, detail="Username already exists"
            )

    # Update only the fields provided in the request
    if request.password:
        user["password"] = hash_password(request.password)
    if request.email:
        user["email"] = request.email
    if new_username:
        user["username"] = new_username
    # The record is mutated in place, so it is already current in users_db.
    save_databases()
    return {
        "username": user["username"],
        "email": user["email"],
        "access_level": user["access_level"],
        "date_joined": user["date_joined"],
    }
# Include routes
# Mount the authentication router first, then the admin router.
# NOTE(review): no route decorators are visible on the handler functions in
# this view of the file; confirm the handlers are registered on these routers.
for _router in (auth_router, admin_router):
    app.include_router(_router)