nexus-services / services.py
ChandimaPrabath's picture
fix token validation
151a2fe verified
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from typing import List
import jwt
import datetime
from dotenv import load_dotenv
import os
load_dotenv()
# FastAPI app setup
app = FastAPI()
# CORS setup to allow cross-origin requests
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Allow all origins; use specific domains for security
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Secret key for JWT encoding/decoding
SECRET_KEY = os.getenv("SECRET_KEY", "secret") # Use a more secure key in production
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30 # Token expiration time in minutes
# Temporary user dictionary for storing credentials
USERS = {
"user1": "password1",
"user2": "password2",
}
# Pydantic model to define the structure of user data
class User(BaseModel):
username: str
password: str
# JWT token generation function
def create_access_token(data: dict, expires_delta: datetime.timedelta = None):
"""Create a new JWT token."""
to_encode = data.copy()
if expires_delta:
expire = datetime.datetime.utcnow() + expires_delta
else:
expire = datetime.datetime.utcnow() + datetime.timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
# JWT token verification function
def verify_access_token(token: str):
"""Verify and decode the JWT token."""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token has expired")
except jwt.DecodeError:
raise HTTPException(status_code=401, detail="Token is invalid")
except Exception as e:
# Catch any other exception and raise a generic HTTP 400 error
raise HTTPException(status_code=400, detail=f"An error occurred: {str(e)}")
# Register a new user
@app.post("/register")
async def register_user(user: User):
"""Register a new user."""
if user.username in USERS:
raise HTTPException(status_code=400, detail="Username already exists")
USERS[user.username] = user.password
return {"message": "User registered successfully"}
# Login a user and generate a JWT session token
@app.post("/login")
async def login_user(user: User):
"""Authenticate a user and return JWT token."""
if user.username not in USERS or USERS[user.username] != user.password:
raise HTTPException(status_code=401, detail="Invalid credentials")
# Create JWT token
access_token = create_access_token(data={"sub": user.username})
return {"token": access_token}
# API to validate the session token
@app.get("/api/validate-token")
async def validate_token(token: str):
"""Validate the JWT token."""
payload = verify_access_token(token)
return {"message": "Token is valid", "username": payload.get("sub")}
# Search for users by username
@app.get("/search")
async def search_users(query: str) -> List[str]:
"""Search for users by username."""
matching_users = [username for username in USERS if query.lower() in username.lower()]
if not matching_users:
raise HTTPException(status_code=404, detail="No users found matching the query")
return matching_users
# API routes for CRUD operations
@app.get("/api/users")
async def get_all_users() -> List[str]:
"""Get a list of all users."""
return list(USERS.keys())
@app.get("/api/user/{username}")
async def get_user(username: str):
"""Get details of a specific user."""
if username not in USERS:
raise HTTPException(status_code=404, detail="User not found")
return {"username": username, "password": USERS[username]}
@app.delete("/api/user/{username}")
async def delete_user(username: str):
"""Delete a specific user."""
if username not in USERS:
raise HTTPException(status_code=404, detail="User not found")
del USERS[username]
return {"message": f"User {username} deleted successfully"}
@app.put("/api/user/{username}")
async def update_user(username: str, user: User):
"""Update a user's password."""
if username not in USERS:
raise HTTPException(status_code=404, detail="User not found")
USERS[username] = user.password
return {"message": f"User {username} password updated successfully"}
@app.get("/api/user-exists/{username}")
async def user_exists(username: str):
"""Check if a user exists based on the username."""
if username in USERS:
return {"message": f"User '{username}' exists"}
raise HTTPException(status_code=404, detail="User not found")
# Global error handling middleware
@app.middleware("http")
async def custom_error_handler(request: Request, call_next):
try:
response = await call_next(request)
return response
except HTTPException as exc:
return JSONResponse(
status_code=exc.status_code,
content={"detail": exc.detail}
)
except Exception as e:
return JSONResponse(
status_code=500,
content={"detail": "Internal Server Error"}
)