ChandimaPrabath commited on
Commit
151a2fe
·
verified ·
1 Parent(s): 890a508

fix token validation

Browse files
Files changed (1) hide show
  1. services.py +26 -9
services.py CHANGED
@@ -1,5 +1,6 @@
1
- from fastapi import FastAPI, HTTPException
2
  from fastapi.middleware.cors import CORSMiddleware
 
3
  from pydantic import BaseModel
4
  from typing import List
5
  import jwt
@@ -46,9 +47,7 @@ def create_access_token(data: dict, expires_delta: datetime.timedelta = None):
46
  else:
47
  expire = datetime.datetime.utcnow() + datetime.timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
48
  to_encode.update({"exp": expire})
49
- encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
50
- print("encoded jwt", encoded_jwt)
51
- return encoded_jwt
52
 
53
  # JWT token verification function
54
  def verify_access_token(token: str):
@@ -58,8 +57,11 @@ def verify_access_token(token: str):
58
  return payload
59
  except jwt.ExpiredSignatureError:
60
  raise HTTPException(status_code=401, detail="Token has expired")
61
- except jwt.JWTError:
62
- raise HTTPException(status_code=401, detail="Invalid token")
 
 
 
63
 
64
  # Register a new user
65
  @app.post("/register")
@@ -78,7 +80,6 @@ async def login_user(user: User):
78
  raise HTTPException(status_code=401, detail="Invalid credentials")
79
  # Create JWT token
80
  access_token = create_access_token(data={"sub": user.username})
81
- print("access token", access_token)
82
  return {"token": access_token}
83
 
84
  # API to validate the session token
@@ -97,7 +98,7 @@ async def search_users(query: str) -> List[str]:
97
  raise HTTPException(status_code=404, detail="No users found matching the query")
98
  return matching_users
99
 
100
- # New /api/ routes for CRUD operations
101
  @app.get("/api/users")
102
  async def get_all_users() -> List[str]:
103
  """Get a list of all users."""
@@ -126,10 +127,26 @@ async def update_user(username: str, user: User):
126
  USERS[username] = user.password
127
  return {"message": f"User {username} password updated successfully"}
128
 
129
- # New route to check if a user exists by username
130
  @app.get("/api/user-exists/{username}")
131
  async def user_exists(username: str):
132
  """Check if a user exists based on the username."""
133
  if username in USERS:
134
  return {"message": f"User '{username}' exists"}
135
  raise HTTPException(status_code=404, detail="User not found")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import FastAPI, HTTPException, Request
2
  from fastapi.middleware.cors import CORSMiddleware
3
+ from fastapi.responses import JSONResponse
4
  from pydantic import BaseModel
5
  from typing import List
6
  import jwt
 
47
  else:
48
  expire = datetime.datetime.utcnow() + datetime.timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
49
  to_encode.update({"exp": expire})
50
+ return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
 
 
51
 
52
  # JWT token verification function
53
  def verify_access_token(token: str):
 
57
  return payload
58
  except jwt.ExpiredSignatureError:
59
  raise HTTPException(status_code=401, detail="Token has expired")
60
+ except jwt.DecodeError:
61
+ raise HTTPException(status_code=401, detail="Token is invalid")
62
+ except Exception as e:
63
+ # Catch any other exception and raise a generic HTTP 400 error
64
+ raise HTTPException(status_code=400, detail=f"An error occurred: {str(e)}")
65
 
66
  # Register a new user
67
  @app.post("/register")
 
80
  raise HTTPException(status_code=401, detail="Invalid credentials")
81
  # Create JWT token
82
  access_token = create_access_token(data={"sub": user.username})
 
83
  return {"token": access_token}
84
 
85
  # API to validate the session token
 
98
  raise HTTPException(status_code=404, detail="No users found matching the query")
99
  return matching_users
100
 
101
+ # API routes for CRUD operations
102
  @app.get("/api/users")
103
  async def get_all_users() -> List[str]:
104
  """Get a list of all users."""
 
127
  USERS[username] = user.password
128
  return {"message": f"User {username} password updated successfully"}
129
 
 
130
  @app.get("/api/user-exists/{username}")
131
  async def user_exists(username: str):
132
  """Check if a user exists based on the username."""
133
  if username in USERS:
134
  return {"message": f"User '{username}' exists"}
135
  raise HTTPException(status_code=404, detail="User not found")
136
+
137
+ # Global error handling middleware
138
+ @app.middleware("http")
139
+ async def custom_error_handler(request: Request, call_next):
140
+ try:
141
+ response = await call_next(request)
142
+ return response
143
+ except HTTPException as exc:
144
+ return JSONResponse(
145
+ status_code=exc.status_code,
146
+ content={"detail": exc.detail}
147
+ )
148
+ except Exception as e:
149
+ return JSONResponse(
150
+ status_code=500,
151
+ content={"detail": "Internal Server Error"}
152
+ )