File size: 5,328 Bytes
151a2fe
f40ba91
151a2fe
f40ba91
 
81f6e16
 
 
 
f40ba91
81f6e16
 
 
f40ba91
 
81f6e16
f40ba91
 
 
 
 
 
 
 
81f6e16
 
 
 
 
f40ba91
 
 
 
 
 
81f6e16
f40ba91
 
 
 
81f6e16
 
 
 
 
 
 
 
 
151a2fe
81f6e16
 
 
 
 
 
 
 
 
151a2fe
 
 
 
 
81f6e16
 
f40ba91
 
 
 
 
 
 
 
81f6e16
f40ba91
 
81f6e16
f40ba91
 
81f6e16
 
890a508
81f6e16
 
 
 
 
 
 
f40ba91
81f6e16
f40ba91
 
 
 
 
 
 
cee1cfe
151a2fe
cee1cfe
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
890a508
 
 
 
 
 
 
151a2fe
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import datetime
import hmac
import logging
import os
from typing import List

import jwt
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel

# Runs before SECRET_KEY is read below, so the os.getenv() lookup sees
# whatever load_dotenv() placed in the environment.
load_dotenv()

# FastAPI app setup
app = FastAPI()

# CORS setup to allow cross-origin requests
# NOTE(review): wildcard origins combined with allow_credentials=True is a
# very permissive policy; confirm this is intended outside of local development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allow all origins; use specific domains for security
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Secret key for JWT encoding/decoding
# Falls back to the literal "secret" when the SECRET_KEY environment variable
# is not set.
# NOTE(review): with that fallback anyone who knows the default can sign
# tokens this service will accept; consider failing fast when the variable is missing.
SECRET_KEY = os.getenv("SECRET_KEY", "secret")  # Use a more secure key in production
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30  # Token expiration time in minutes

# Temporary user dictionary for storing credentials
# Maps username -> password. Kept in process memory only and mutated by the
# register / update / delete handlers in this file; values are stored exactly
# as supplied (no hashing is applied anywhere in this module).
USERS = {
    "user1": "password1",
    "user2": "password2",
}

# Pydantic model to define the structure of user data
# Used as the request body of the /register, /login and
# PUT /api/user/{username} handlers in this file.
class User(BaseModel):
    username: str  # account name; used as the key into USERS
    password: str  # stored in / compared against USERS as-is by the handlers

# JWT token generation function
def create_access_token(data: dict, expires_delta: datetime.timedelta = None):
    """Create a signed JWT carrying *data* plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The mapping is copied, so the
            caller's object is never mutated. An existing ``"exp"`` entry is
            overwritten.
        expires_delta: Lifetime of the token. ``None`` (the default) selects
            ``ACCESS_TOKEN_EXPIRE_MINUTES``. Any explicit value is honoured,
            including ``timedelta(0)``.

    Returns:
        The encoded token produced by ``jwt.encode`` with ``SECRET_KEY`` and
        ``ALGORITHM``.
    """
    # Test against None rather than truthiness: timedelta(0) is falsy and
    # would otherwise be silently replaced by the default lifetime.
    if expires_delta is None:
        expires_delta = datetime.timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # Timezone-aware "now" in UTC; datetime.utcnow() returns a naive value and
    # is deprecated in recent Python releases.
    expire = datetime.datetime.now(datetime.timezone.utc) + expires_delta
    to_encode = {**data, "exp": expire}
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

# JWT token verification function
def verify_access_token(token: str):
    """Verify and decode the JWT token.

    Args:
        token: Encoded JWT to check against ``SECRET_KEY`` / ``ALGORITHM``.

    Returns:
        The decoded payload as returned by ``jwt.decode``.

    Raises:
        HTTPException: 401 with "Token has expired" when the ``exp`` claim is
            in the past; 401 with "Token is invalid" for any other token
            validation failure; 400 for an unexpected error.
    """
    try:
        return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.ExpiredSignatureError as exc:
        # Must precede InvalidTokenError: it is a subclass of it.
        raise HTTPException(status_code=401, detail="Token has expired") from exc
    except jwt.InvalidTokenError as exc:
        # Base class of PyJWT's validation errors (DecodeError included), so
        # every rejected token yields 401 rather than falling through to the
        # generic 400 branch below.
        raise HTTPException(status_code=401, detail="Token is invalid") from exc
    except Exception as e:
        # Catch any other exception and raise a generic HTTP 400 error
        raise HTTPException(status_code=400, detail=f"An error occurred: {str(e)}") from e

# Register a new user
@app.post("/register")
async def register_user(user: User):
    """Add the submitted credentials to USERS.

    Raises:
        HTTPException: 400 when the username is already a key in USERS.
    """
    name = user.username
    if name not in USERS:
        # NOTE(review): the password is stored exactly as received.
        USERS[name] = user.password
        return {"message": "User registered successfully"}
    raise HTTPException(status_code=400, detail="Username already exists")

# Login a user and generate a JWT session token
@app.post("/login")
async def login_user(user: User):
    """Authenticate a user and return a JWT token.

    Args:
        user: Submitted username and password.

    Returns:
        ``{"token": <jwt>}`` whose ``sub`` claim is the username.

    Raises:
        HTTPException: 401 with "Invalid credentials" when the username is
            unknown or the password does not match the stored one.
    """
    stored_password = USERS.get(user.username)
    # Compare in constant time so the response time does not reveal how many
    # leading characters of the password matched. Both sides are encoded to
    # bytes because hmac.compare_digest only accepts ASCII-only str values.
    # For an unknown user the comparison still runs (against an empty value)
    # and the result is discarded by the `is None` check.
    expected = (stored_password if stored_password is not None else "").encode("utf-8")
    supplied = user.password.encode("utf-8")
    password_ok = hmac.compare_digest(supplied, expected)
    if stored_password is None or not password_ok:
        raise HTTPException(status_code=401, detail="Invalid credentials")
    # Create JWT token
    access_token = create_access_token(data={"sub": user.username})
    return {"token": access_token}

# API to validate the session token
@app.get("/api/validate-token")
async def validate_token(token: str):
    """Check *token* and report the username stored in its ``sub`` claim.

    Verification is delegated to ``verify_access_token``; any HTTPException it
    raises propagates unchanged.
    """
    claims = verify_access_token(token)
    subject = claims.get("sub")
    return {"message": "Token is valid", "username": subject}

# Search for users by username
@app.get("/search")
async def search_users(query: str) -> List[str]:
    """Search for users by username.

    Matching is a case-insensitive substring test against every key in USERS.

    Args:
        query: Text to look for inside usernames.

    Returns:
        The usernames containing *query*, in USERS iteration order.

    Raises:
        HTTPException: 404 when no username matches.
    """
    # Lower-case the query once instead of once per username.
    needle = query.lower()
    matching_users = [username for username in USERS if needle in username.lower()]
    if not matching_users:
        raise HTTPException(status_code=404, detail="No users found matching the query")
    return matching_users

# API routes for CRUD operations
@app.get("/api/users")
async def get_all_users() -> List[str]:
    """Return every username currently held in USERS."""
    # Iterating a dict yields its keys, so no explicit .keys() call is needed.
    return list(USERS)

@app.get("/api/user/{username}")
async def get_user(username: str):
    """Return the stored record for *username*.

    Raises:
        HTTPException: 404 when the username is not a key in USERS.
    """
    try:
        stored_password = USERS[username]
    except KeyError:
        raise HTTPException(status_code=404, detail="User not found") from None
    # NOTE(review): the response echoes the stored password back to the
    # caller; confirm whether any client relies on this before removing it.
    return {"username": username, "password": stored_password}

@app.delete("/api/user/{username}")
async def delete_user(username: str):
    """Remove *username* from USERS.

    Raises:
        HTTPException: 404 when the username is not a key in USERS.
    """
    try:
        del USERS[username]
    except KeyError:
        raise HTTPException(status_code=404, detail="User not found") from None
    return {"message": f"User {username} deleted successfully"}

@app.put("/api/user/{username}")
async def update_user(username: str, user: User):
    """Replace the stored password of an existing user.

    The account is selected by the *username* path parameter; only the
    ``password`` field of the request body is read.

    Raises:
        HTTPException: 404 when the username is not a key in USERS.
    """
    if username in USERS:
        USERS[username] = user.password
        return {"message": f"User {username} password updated successfully"}
    raise HTTPException(status_code=404, detail="User not found")

@app.get("/api/user-exists/{username}")
async def user_exists(username: str):
    """Report whether *username* is a key in USERS.

    Raises:
        HTTPException: 404 when it is not.
    """
    if username not in USERS:
        raise HTTPException(status_code=404, detail="User not found")
    return {"message": f"User '{username}' exists"}

# Global error handling middleware
@app.middleware("http")
async def custom_error_handler(request: Request, call_next):
    """Convert exceptions escaping the downstream handlers into JSON responses.

    Args:
        request: Incoming request.
        call_next: Callable that forwards the request to the rest of the app.

    Returns:
        The downstream response on success; otherwise a JSONResponse with a
        ``detail`` field. An HTTPException keeps its own status code, detail
        and headers; any other exception becomes a generic 500.
    """
    try:
        return await call_next(request)
    except HTTPException as exc:
        # NOTE(review): FastAPI normally turns HTTPExceptions raised by route
        # handlers into responses before they reach middleware, so this branch
        # presumably only sees ones raised by other middleware - confirm.
        return JSONResponse(
            status_code=exc.status_code,
            content={"detail": exc.detail},
            # Preserve headers attached to the exception (e.g. WWW-Authenticate).
            headers=getattr(exc, "headers", None),
        )
    except Exception:
        # Record the traceback server-side; the client only gets a generic
        # message so internal details are not exposed.
        logging.getLogger(__name__).exception(
            "Unhandled error while processing %s %s",
            request.method,
            request.url.path,
        )
        return JSONResponse(
            status_code=500,
            content={"detail": "Internal Server Error"}
        )