Mangesh223 commited on
Commit
cb9d24d
·
verified ·
1 Parent(s): d6a01f4

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +50 -63
app.py CHANGED
@@ -1,68 +1,55 @@
1
- from fastapi import FastAPI, Depends, HTTPException, status
2
- from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
3
- from datetime import datetime, timedelta
4
- from jose import jwt
5
- from pydantic import BaseModel
6
- import os
7
-
8
- app = FastAPI(title="JWT Auth API")
9
-
10
- # Configuration
11
- SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key-keep-it-safe")
12
- ALGORITHM = "HS256"
13
- ACCESS_TOKEN_EXPIRE_MINUTES = 30
14
-
15
- # Mock database
16
- fake_users_db = {
17
- "testuser": {
18
- "username": "testuser",
19
- "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", # password = "secret"
20
- }
21
- }
22
-
23
- class Token(BaseModel):
24
- access_token: str
25
- token_type: str
26
-
27
- class User(BaseModel):
28
- username: str
29
-
30
- def create_access_token(data: dict, expires_delta: timedelta = None):
31
- to_encode = data.copy()
32
- if expires_delta:
33
- expire = datetime.utcnow() + expires_delta
34
- else:
35
- expire = datetime.utcnow() + timedelta(minutes=15)
36
- to_encode.update({"exp": expire})
37
- encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
38
- return encoded_jwt
39
 
40
- @app.post("/token", response_model=Token)
41
- async def login(form_data: OAuth2PasswordRequestForm = Depends()):
42
- user = fake_users_db.get(form_data.username)
43
- if not user or form_data.password != "secret": # In real app, verify hashed password
44
- raise HTTPException(
45
- status_code=status.HTTP_401_UNAUTHORIZED,
46
- detail="Incorrect username or password",
47
- )
48
  access_token = create_access_token(
49
- data={"sub": form_data.username},
50
- expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
51
- ) # <-- Correctly closed here
52
- return {"access_token": access_token, "token_type": "bearer"}
53
-
54
-
55
- @app.get("/users/me", response_model=User)
56
- async def read_users_me(token: str = Depends(OAuth2PasswordBearer(tokenUrl="token"))):
 
 
57
  try:
58
- payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
59
  username = payload.get("sub")
60
  if username is None:
61
- raise HTTPException(status_code=400, detail="Invalid token")
62
- return {"username": username}
63
- except jwt.JWTError:
64
- raise HTTPException(status_code=400, detail="Invalid token")
65
-
66
- @app.get("/")
67
- async def root():
68
- return {"message": "JWT Authentication API"}
 
 
1
+ # app.py
2
+ from flask import Flask, request, jsonify
3
+ from auth import (
4
+ authenticate_user,
5
+ create_access_token,
6
+ register_user,
7
+ ACCESS_TOKEN_EXPIRE_MINUTES,
8
+ SECRET_KEY
9
+ )
10
+ import jwt
11
+ from datetime import timedelta
12
+
13
+ app = Flask(__name__)
14
+
15
+ @app.route("/register", methods=["POST"])
16
+ def register():
17
+ data = request.get_json()
18
+ try:
19
+ result = register_user(data["username"], data["password"])
20
+ return jsonify(result), 201
21
+ except ValueError as e:
22
+ return jsonify({"detail": str(e)}), 400
23
+
24
+ @app.route("/token", methods=["POST"])
25
+ def login():
26
+ username = request.form.get("username")
27
+ password = request.form.get("password")
28
+ user = authenticate_user(username, password)
29
+ if not user:
30
+ return jsonify({"detail": "Invalid username or password"}), 401
 
 
 
 
 
 
 
 
31
 
 
 
 
 
 
 
 
 
32
  access_token = create_access_token(
33
+ data={"sub": username}, expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
34
+ )
35
+ return jsonify({"access_token": access_token, "token_type": "bearer"})
36
+
37
+ @app.route("/users/me", methods=["GET"])
38
+ def get_user():
39
+ auth_header = request.headers.get("Authorization")
40
+ if not auth_header or not auth_header.startswith("Bearer "):
41
+ return jsonify({"detail": "Missing or invalid token"}), 401
42
+ token = auth_header.split(" ")[1]
43
  try:
44
+ payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
45
  username = payload.get("sub")
46
  if username is None:
47
+ raise jwt.InvalidTokenError
48
+ return jsonify({"username": username})
49
+ except jwt.ExpiredSignatureError:
50
+ return jsonify({"detail": "Token expired"}), 401
51
+ except jwt.InvalidTokenError:
52
+ return jsonify({"detail": "Invalid token"}), 401
53
+
54
+ if __name__ == "__main__":
55
+ app.run(host="0.0.0.0", port=8000)