Mangesh223 commited on
Commit
bbbd47a
·
verified ·
1 Parent(s): f23c26b

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +57 -108
app.py CHANGED
@@ -1,110 +1,59 @@
1
- import os
2
- from flask import Flask, jsonify, request
3
- from flask_jwt_extended import (
4
- JWTManager,
 
5
  create_access_token,
6
- create_refresh_token,
7
- jwt_required,
8
- get_jwt_identity,
9
- get_jwt
10
  )
11
- from passlib.hash import pbkdf2_sha256
12
- from dotenv import load_dotenv
13
-
14
- # Load environment variables
15
- load_dotenv()
16
-
17
- app = Flask(__name__)
18
-
19
- # Configure JWT
20
- app.config["JWT_SECRET_KEY"] = os.getenv("JWT_SECRET_KEY", "super-secret") # Change this in production!
21
- app.config["JWT_ACCESS_TOKEN_EXPIRES"] = 3600 # 1 hour
22
- app.config["JWT_REFRESH_TOKEN_EXPIRES"] = 86400 # 24 hours
23
- app.config["JWT_TOKEN_LOCATION"] = ["headers", "cookies", "json", "query_string"]
24
- app.config["JWT_COOKIE_SECURE"] = True # Only send over HTTPS
25
- app.config["JWT_COOKIE_CSRF_PROTECT"] = True # Enable CSRF protection for cookies
26
- app.config["JWT_CSRF_CHECK_FORM"] = True # Check CSRF token in form data
27
-
28
- jwt = JWTManager(app)
29
-
30
- # In-memory user storage (replace with database in production)
31
- users = {
32
- "testuser": {
33
- "username": "testuser",
34
- "password": pbkdf2_sha256.hash("testpassword"),
35
- "role": "user"
36
- }
37
- }
38
-
39
- # In-memory token blocklist (replace with database in production)
40
- blacklist = set()
41
-
42
- @jwt.token_in_blocklist_loader
43
- def check_if_token_in_blacklist(jwt_header, jwt_payload):
44
- jti = jwt_payload["jti"]
45
- return jti in blacklist
46
-
47
- @app.route("/register", methods=["POST"])
48
- def register():
49
- data = request.get_json()
50
- username = data.get("username")
51
- password = data.get("password")
52
-
53
- if not username or not password:
54
- return jsonify({"msg": "Username and password required"}), 400
55
-
56
- if username in users:
57
- return jsonify({"msg": "Username already exists"}), 400
58
-
59
- users[username] = {
60
- "username": username,
61
- "password": pbkdf2_sha256.hash(password),
62
- "role": "user"
63
- }
64
-
65
- return jsonify({"msg": "User created successfully"}), 201
66
-
67
- @app.route("/login", methods=["POST"])
68
- def login():
69
- data = request.get_json()
70
- username = data.get("username")
71
- password = data.get("password")
72
-
73
- if not username or not password:
74
- return jsonify({"msg": "Username and password required"}), 400
75
-
76
- user = users.get(username)
77
- if not user or not pbkdf2_sha256.verify(password, user["password"]):
78
- return jsonify({"msg": "Invalid credentials"}), 401
79
-
80
- access_token = create_access_token(identity=username)
81
- refresh_token = create_refresh_token(identity=username)
82
-
83
- return jsonify({
84
- "access_token": access_token,
85
- "refresh_token": refresh_token,
86
- "username": username
87
- })
88
-
89
- @app.route("/refresh", methods=["POST"])
90
- @jwt_required(refresh=True)
91
- def refresh():
92
- current_user = get_jwt_identity()
93
- new_token = create_access_token(identity=current_user)
94
- return jsonify({"access_token": new_token})
95
-
96
- @app.route("/protected", methods=["GET"])
97
- @jwt_required()
98
- def protected():
99
- current_user = get_jwt_identity()
100
- return jsonify(logged_in_as=current_user), 200
101
-
102
- @app.route("/logout", methods=["DELETE"])
103
- @jwt_required()
104
- def logout():
105
- jti = get_jwt()["jti"]
106
- blacklist.add(jti)
107
- return jsonify({"msg": "Successfully logged out"}), 200
108
-
109
- if __name__ == "__main__":
110
- app.run(ssl_context="adhoc") # Using adhoc SSL for development
 
1
+ from fastapi import FastAPI, Depends, HTTPException, status
2
+ from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
3
+ from datetime import datetime, timedelta
4
+ from auth import (
5
+ authenticate_user,
6
  create_access_token,
7
+ register_user,
8
+ ACCESS_TOKEN_EXPIRE_MINUTES,
9
+ SECRET_KEY,
 
10
  )
11
+ import jwt
12
+ from pydantic import BaseModel
13
+
14
+ app = FastAPI()
15
+
16
+ oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
17
+
18
+ class Token(BaseModel):
19
+ access_token: str
20
+ token_type: str
21
+
22
+ class User(BaseModel):
23
+ username: str
24
+
25
+ @app.post("/token", response_model=Token)
26
+ async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
27
+ user = authenticate_user(form_data.username, form_data.password)
28
+ if not user:
29
+ raise HTTPException(
30
+ status_code=status.HTTP_401_UNAUTHORIZED,
31
+ detail="Incorrect username or password",
32
+ headers={"WWW-Authenticate": "Bearer"},
33
+ )
34
+ access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
35
+ access_token = create_access_token(
36
+ data={"sub": form_data.username}, expires_delta=access_token_expires
37
+ )
38
+ return {"access_token": access_token, "token_type": "bearer"}
39
+
40
+ @app.post("/register")
41
+ async def register(username: str, password: str):
42
+ return register_user(username, password)
43
+
44
+ @app.get("/users/me", response_model=User)
45
+ async def read_users_me(token: str = Depends(oauth2_scheme)):
46
+ try:
47
+ payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
48
+ username = payload.get("sub")
49
+ if username is None:
50
+ raise HTTPException(
51
+ status_code=status.HTTP_401_UNAUTHORIZED,
52
+ detail="Invalid authentication credentials",
53
+ )
54
+ return {"username": username}
55
+ except jwt.PyJWTError:
56
+ raise HTTPException(
57
+ status_code=status.HTTP_401_UNAUTHORIZED,
58
+ detail="Invalid authentication credentials",
59
+ )