Mangesh223 commited on
Commit
6330edf
·
verified ·
1 Parent(s): f660d90

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +39 -32
app.py CHANGED
@@ -1,19 +1,24 @@
1
  from fastapi import FastAPI, Depends, HTTPException, status
2
  from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
3
  from datetime import datetime, timedelta
4
- from auth import (
5
- authenticate_user,
6
- create_access_token,
7
- register_user,
8
- ACCESS_TOKEN_EXPIRE_MINUTES,
9
- SECRET_KEY,
10
- )
11
- import jwt
12
  from pydantic import BaseModel
 
13
 
14
- app = FastAPI()
15
 
16
- oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
 
 
 
 
 
 
 
 
 
 
 
17
 
18
  class Token(BaseModel):
19
  access_token: str
@@ -22,38 +27,40 @@ class Token(BaseModel):
22
  class User(BaseModel):
23
  username: str
24
 
 
 
 
 
 
 
 
 
 
 
25
  @app.post("/token", response_model=Token)
26
- async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
27
- user = authenticate_user(form_data.username, form_data.password)
28
- if not user:
29
  raise HTTPException(
30
  status_code=status.HTTP_401_UNAUTHORIZED,
31
  detail="Incorrect username or password",
32
- headers={"WWW-Authenticate": "Bearer"},
33
  )
34
- access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
35
  access_token = create_access_token(
36
- data={"sub": form_data.username}, expires_delta=access_token_expires
37
- )
38
  return {"access_token": access_token, "token_type": "bearer"}
39
 
40
- @app.post("/register")
41
- async def register(username: str, password: str):
42
- return register_user(username, password)
43
-
44
  @app.get("/users/me", response_model=User)
45
- async def read_users_me(token: str = Depends(oauth2_scheme)):
46
  try:
47
- payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
48
  username = payload.get("sub")
49
  if username is None:
50
- raise HTTPException(
51
- status_code=status.HTTP_401_UNAUTHORIZED,
52
- detail="Invalid authentication credentials",
53
- )
54
  return {"username": username}
55
- except jwt.PyJWTError:
56
- raise HTTPException(
57
- status_code=status.HTTP_401_UNAUTHORIZED,
58
- detail="Invalid authentication credentials",
59
- )
 
 
1
  from fastapi import FastAPI, Depends, HTTPException, status
2
  from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
3
  from datetime import datetime, timedelta
4
+ from jose import jwt
 
 
 
 
 
 
 
5
  from pydantic import BaseModel
6
+ import os
7
 
8
+ app = FastAPI(title="JWT Auth API")
9
 
10
+ # Configuration
11
+ SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key-keep-it-safe")
12
+ ALGORITHM = "HS256"
13
+ ACCESS_TOKEN_EXPIRE_MINUTES = 30
14
+
15
+ # Mock database
16
+ fake_users_db = {
17
+ "testuser": {
18
+ "username": "testuser",
19
+ "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", # password = "secret"
20
+ }
21
+ }
22
 
23
  class Token(BaseModel):
24
  access_token: str
 
27
  class User(BaseModel):
28
  username: str
29
 
30
+ def create_access_token(data: dict, expires_delta: timedelta = None):
31
+ to_encode = data.copy()
32
+ if expires_delta:
33
+ expire = datetime.utcnow() + expires_delta
34
+ else:
35
+ expire = datetime.utcnow() + timedelta(minutes=15)
36
+ to_encode.update({"exp": expire})
37
+ encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
38
+ return encoded_jwt
39
+
40
  @app.post("/token", response_model=Token)
41
+ async def login(form_data: OAuth2PasswordRequestForm = Depends()):
42
+ user = fake_users_db.get(form_data.username)
43
+ if not user or form_data.password != "secret": # In real app, verify hashed password
44
  raise HTTPException(
45
  status_code=status.HTTP_401_UNAUTHORIZED,
46
  detail="Incorrect username or password",
 
47
  )
 
48
  access_token = create_access_token(
49
+ data={"sub": form_data.username},
50
+ expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
51
  return {"access_token": access_token, "token_type": "bearer"}
52
 
 
 
 
 
53
  @app.get("/users/me", response_model=User)
54
+ async def read_users_me(token: str = Depends(OAuth2PasswordBearer(tokenUrl="token"))):
55
  try:
56
+ payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
57
  username = payload.get("sub")
58
  if username is None:
59
+ raise HTTPException(status_code=400, detail="Invalid token")
 
 
 
60
  return {"username": username}
61
+ except jwt.JWTError:
62
+ raise HTTPException(status_code=400, detail="Invalid token")
63
+
64
+ @app.get("/")
65
+ async def root():
66
+ return {"message": "JWT Authentication API"}