alho94 commited on
Commit
3d963b9
·
verified ·
1 Parent(s): ef05b24

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +32 -27
app.py CHANGED
@@ -5,6 +5,25 @@ import bcrypt
5
  from fastapi import FastAPI, HTTPException, Depends, Response, Request
6
  from fastapi.middleware.cors import CORSMiddleware
7
  from dotenv import load_dotenv
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8
 
9
  # Load environment variables
10
  load_dotenv()
@@ -13,21 +32,16 @@ load_dotenv()
13
  SECRET_KEY = os.getenv("SECRET_KEY")
14
  TOKEN_EXPIRATION_MINUTES = int(os.getenv("TOKEN_EXPIRATION_MINUTES", 30))
15
  REFRESH_TOKEN_EXPIRATION_DAYS = int(os.getenv("REFRESH_TOKEN_EXPIRATION_DAYS", 7))
16
- ALLOWED_ORIGIN = os.getenv("ALLOWED_ORIGIN")
17
 
18
-
19
-
20
- # Load environment variables
21
  hashed_password = os.getenv("DUMMY_USER_KEY")
22
-
23
  if hashed_password:
24
  # Ensure it's stored as a hashed password (not plain text)
25
  hashed_password = bcrypt.hashpw(hashed_password.encode(), bcrypt.gensalt()).decode()
26
 
27
  # Fake database of API keys (hashed)
28
- API_KEYS_DB = {
29
- "user1": hashed_password
30
- }
31
 
32
  def verify_api_key(api_key: str) -> bool:
33
  """Check if the provided API key is valid."""
@@ -36,53 +50,44 @@ def verify_api_key(api_key: str) -> bool:
36
  return True
37
  return False
38
 
39
- app = FastAPI()
40
-
41
  # Configure CORS for security (allow only trusted frontend)
42
  app.add_middleware(
43
  CORSMiddleware,
44
- allow_origins=[ALLOWED_ORIGIN],
45
  allow_credentials=True,
46
  allow_methods=["GET", "POST"],
47
  allow_headers=["Authorization", "Content-Type"],
48
  )
49
 
50
- def create_jwt_token(expiration_minutes: int):
51
- """Generate JWT Token."""
52
  expiration = datetime.datetime.utcnow() + datetime.timedelta(minutes=expiration_minutes)
53
- payload = {"exp": expiration}
54
  return jwt.encode(payload, SECRET_KEY, algorithm="HS256")
55
 
56
  def verify_jwt_token(token: str):
57
  """Verify and decode JWT token."""
58
  try:
59
- jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
60
- return True
61
  except jwt.ExpiredSignatureError:
62
  raise HTTPException(status_code=401, detail="Token expired")
63
  except jwt.InvalidTokenError:
64
  raise HTTPException(status_code=401, detail="Invalid token")
65
 
66
- def verify_api_key(api_key: str) -> bool:
67
- """Check if the provided API key is valid."""
68
- for hashed_key in API_KEYS_DB.values():
69
- if bcrypt.checkpw(api_key.encode(), hashed_key.encode()):
70
- return True
71
- return False
72
-
73
  @app.get("/sample-data")
74
  def get_sample_data():
75
  """Public endpoint: No token required"""
76
  return {"message": "This is public sample data."}
77
 
78
  @app.post("/login")
79
- def login(response: Response, api_key: str):
 
80
  """User must provide a valid API key to obtain JWT tokens."""
81
  if not verify_api_key(api_key):
82
  raise HTTPException(status_code=403, detail="Invalid API key")
83
 
84
- access_token = create_jwt_token(TOKEN_EXPIRATION_MINUTES)
85
- refresh_token = create_jwt_token(REFRESH_TOKEN_EXPIRATION_DAYS * 24 * 60)
86
 
87
  # Secure HTTP-only cookies (prevent XSS)
88
  response.set_cookie(
@@ -119,7 +124,7 @@ def refresh_token(request: Request, response: Response):
119
  raise HTTPException(status_code=401, detail="Invalid refresh token")
120
 
121
  # Issue new access token
122
- new_access_token = create_jwt_token(TOKEN_EXPIRATION_MINUTES)
123
  response.set_cookie(
124
  key="access_token",
125
  value=new_access_token,
 
5
  from fastapi import FastAPI, HTTPException, Depends, Response, Request
6
  from fastapi.middleware.cors import CORSMiddleware
7
  from dotenv import load_dotenv
8
+ from slowapi import Limiter
9
+ from slowapi.util import get_remote_address
10
+ from slowapi.errors import RateLimitExceeded
11
+ from starlette.responses import JSONResponse
12
+
13
+ # Initialize FastAPI app
14
+ app = FastAPI()
15
+
16
+ # Initialize rate limiter
17
+ limiter = Limiter(key_func=get_remote_address)
18
+ app.state.limiter = limiter
19
+
20
+ # Custom error handler for too many requests
21
+ @app.exception_handler(RateLimitExceeded)
22
+ async def ratelimit_handler(request: Request, exc: RateLimitExceeded):
23
+ return JSONResponse(
24
+ {"detail": "Too many login attempts. Try again later."},
25
+ status_code=429
26
+ )
27
 
28
  # Load environment variables
29
  load_dotenv()
 
32
  SECRET_KEY = os.getenv("SECRET_KEY")
33
  TOKEN_EXPIRATION_MINUTES = int(os.getenv("TOKEN_EXPIRATION_MINUTES", 30))
34
  REFRESH_TOKEN_EXPIRATION_DAYS = int(os.getenv("REFRESH_TOKEN_EXPIRATION_DAYS", 7))
35
+ ALLOWED_ORIGIN = os.getenv("ALLOWED_ORIGIN", "*").split(',') # Ensure it's a list
36
 
37
+ # Load dummy user API key
 
 
38
  hashed_password = os.getenv("DUMMY_USER_KEY")
 
39
  if hashed_password:
40
  # Ensure it's stored as a hashed password (not plain text)
41
  hashed_password = bcrypt.hashpw(hashed_password.encode(), bcrypt.gensalt()).decode()
42
 
43
  # Fake database of API keys (hashed)
44
+ API_KEYS_DB = {"user1": hashed_password} if hashed_password else {}
 
 
45
 
46
  def verify_api_key(api_key: str) -> bool:
47
  """Check if the provided API key is valid."""
 
50
  return True
51
  return False
52
 
 
 
53
  # Configure CORS for security (allow only trusted frontend)
54
  app.add_middleware(
55
  CORSMiddleware,
56
+ allow_origins=ALLOWED_ORIGIN,
57
  allow_credentials=True,
58
  allow_methods=["GET", "POST"],
59
  allow_headers=["Authorization", "Content-Type"],
60
  )
61
 
62
+ def create_jwt_token(user_id: str, expiration_minutes: int):
63
+ """Generate JWT Token with user ID."""
64
  expiration = datetime.datetime.utcnow() + datetime.timedelta(minutes=expiration_minutes)
65
+ payload = {"sub": user_id, "exp": expiration}
66
  return jwt.encode(payload, SECRET_KEY, algorithm="HS256")
67
 
68
  def verify_jwt_token(token: str):
69
  """Verify and decode JWT token."""
70
  try:
71
+ return jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
 
72
  except jwt.ExpiredSignatureError:
73
  raise HTTPException(status_code=401, detail="Token expired")
74
  except jwt.InvalidTokenError:
75
  raise HTTPException(status_code=401, detail="Invalid token")
76
 
 
 
 
 
 
 
 
77
  @app.get("/sample-data")
78
  def get_sample_data():
79
  """Public endpoint: No token required"""
80
  return {"message": "This is public sample data."}
81
 
82
  @app.post("/login")
83
+ @limiter.limit("5/minute") # Allow max 5 login attempts per minute
84
+ def login(request: Request, response: Response, api_key: str):
85
  """User must provide a valid API key to obtain JWT tokens."""
86
  if not verify_api_key(api_key):
87
  raise HTTPException(status_code=403, detail="Invalid API key")
88
 
89
+ access_token = create_jwt_token("user1", TOKEN_EXPIRATION_MINUTES)
90
+ refresh_token = create_jwt_token("user1", REFRESH_TOKEN_EXPIRATION_DAYS * 24 * 60)
91
 
92
  # Secure HTTP-only cookies (prevent XSS)
93
  response.set_cookie(
 
124
  raise HTTPException(status_code=401, detail="Invalid refresh token")
125
 
126
  # Issue new access token
127
+ new_access_token = create_jwt_token("user1", TOKEN_EXPIRATION_MINUTES)
128
  response.set_cookie(
129
  key="access_token",
130
  value=new_access_token,