ChandimaPrabath commited on
Commit
81f6e16
·
verified ·
1 Parent(s): 4af01b9

session token test

Browse files
Files changed (1) hide show
  1. services.py +52 -6
services.py CHANGED
@@ -2,10 +2,17 @@ from fastapi import FastAPI, HTTPException
2
  from fastapi.middleware.cors import CORSMiddleware
3
  from pydantic import BaseModel
4
  from typing import List
5
- import json
 
 
 
6
 
 
 
 
7
  app = FastAPI()
8
 
 
9
  app.add_middleware(
10
  CORSMiddleware,
11
  allow_origins=["*"], # Allow all origins; use specific domains for security
@@ -14,16 +21,46 @@ app.add_middleware(
14
  allow_headers=["*"],
15
  )
16
 
 
 
 
 
 
17
  # Temporary user dictionary for storing credentials
18
  USERS = {
19
  "user1": "password1",
20
  "user2": "password2",
21
  }
22
 
 
23
  class User(BaseModel):
24
  username: str
25
  password: str
26
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
27
  @app.post("/register")
28
  async def register_user(user: User):
29
  """Register a new user."""
@@ -32,24 +69,33 @@ async def register_user(user: User):
32
  USERS[user.username] = user.password
33
  return {"message": "User registered successfully"}
34
 
 
35
  @app.post("/login")
36
  async def login_user(user: User):
37
- """Authenticate a user."""
38
  if user.username not in USERS or USERS[user.username] != user.password:
39
  raise HTTPException(status_code=401, detail="Invalid credentials")
40
- return {"message": "Login successful"}
 
 
 
 
 
 
 
 
 
41
 
 
42
  @app.get("/search")
43
  async def search_users(query: str) -> List[str]:
44
  """Search for users by username."""
45
  matching_users = [username for username in USERS if query.lower() in username.lower()]
46
-
47
  if not matching_users:
48
  raise HTTPException(status_code=404, detail="No users found matching the query")
49
-
50
  return matching_users
51
 
52
- # New /api/ routes
53
  @app.get("/api/users")
54
  async def get_all_users() -> List[str]:
55
  """Get a list of all users."""
 
2
  from fastapi.middleware.cors import CORSMiddleware
3
  from pydantic import BaseModel
4
  from typing import List
5
+ import jwt
6
+ import datetime
7
+ from dotenv import load_dotenv
8
+ import os
9
 
10
+ load_dotenv()
11
+
12
+ # FastAPI app setup
13
  app = FastAPI()
14
 
15
+ # CORS setup to allow cross-origin requests
16
  app.add_middleware(
17
  CORSMiddleware,
18
  allow_origins=["*"], # Allow all origins; use specific domains for security
 
21
  allow_headers=["*"],
22
  )
23
 
24
+ # Secret key for JWT encoding/decoding
25
+ SECRET_KEY = os.getenv("SECRET_KEY", "secret") # Use a more secure key in production
26
+ ALGORITHM = "HS256"
27
+ ACCESS_TOKEN_EXPIRE_MINUTES = 30 # Token expiration time in minutes
28
+
29
  # Temporary user dictionary for storing credentials
30
  USERS = {
31
  "user1": "password1",
32
  "user2": "password2",
33
  }
34
 
35
+ # Pydantic model to define the structure of user data
36
  class User(BaseModel):
37
  username: str
38
  password: str
39
 
40
+ # JWT token generation function
41
+ def create_access_token(data: dict, expires_delta: datetime.timedelta = None):
42
+ """Create a new JWT token."""
43
+ to_encode = data.copy()
44
+ if expires_delta:
45
+ expire = datetime.datetime.utcnow() + expires_delta
46
+ else:
47
+ expire = datetime.datetime.utcnow() + datetime.timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
48
+ to_encode.update({"exp": expire})
49
+ encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
50
+ return encoded_jwt
51
+
52
+ # JWT token verification function
53
+ def verify_access_token(token: str):
54
+ """Verify and decode the JWT token."""
55
+ try:
56
+ payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
57
+ return payload
58
+ except jwt.ExpiredSignatureError:
59
+ raise HTTPException(status_code=401, detail="Token has expired")
60
+ except jwt.JWTError:
61
+ raise HTTPException(status_code=401, detail="Invalid token")
62
+
63
+ # Register a new user
64
  @app.post("/register")
65
  async def register_user(user: User):
66
  """Register a new user."""
 
69
  USERS[user.username] = user.password
70
  return {"message": "User registered successfully"}
71
 
72
+ # Login a user and generate a JWT session token
73
  @app.post("/login")
74
  async def login_user(user: User):
75
+ """Authenticate a user and return JWT token."""
76
  if user.username not in USERS or USERS[user.username] != user.password:
77
  raise HTTPException(status_code=401, detail="Invalid credentials")
78
+ # Create JWT token
79
+ access_token = create_access_token(data={"sub": user.username})
80
+ return {"access_token": access_token}
81
+
82
+ # API to validate the session token
83
+ @app.get("/api/validate-token")
84
+ async def validate_token(token: str):
85
+ """Validate the JWT token."""
86
+ payload = verify_access_token(token)
87
+ return {"message": "Token is valid", "username": payload.get("sub")}
88
 
89
+ # Search for users by username
90
  @app.get("/search")
91
  async def search_users(query: str) -> List[str]:
92
  """Search for users by username."""
93
  matching_users = [username for username in USERS if query.lower() in username.lower()]
 
94
  if not matching_users:
95
  raise HTTPException(status_code=404, detail="No users found matching the query")
 
96
  return matching_users
97
 
98
+ # New /api/ routes for CRUD operations
99
  @app.get("/api/users")
100
  async def get_all_users() -> List[str]:
101
  """Get a list of all users."""