noorprajuda's picture
first commit
9396cca
import pandas as pd
import bcrypt
import sqlite3
import os
from pydantic import BaseModel
from typing import Annotated, Optional, List
from fastapi import FastAPI, Depends, HTTPException, status, Response, Security
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from passlib.context import CryptContext
from jose import JWTError, jwt
from dotenv import load_dotenv
conn = sqlite3.connect('users.db')
cur = conn.cursor()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
load_dotenv(".env")
SECRET_KEY = os.getenv("SECRET_KEY")
ALGORITHM = os.getenv("ALGORITHM")
# Pydantic models
class User(BaseModel):
id: Optional[int]
username: str
password: str
role: str
class UserCreate(BaseModel):
username: str
password: str
role: str
class UserUpdate(BaseModel):
username: str
password: str
role: str
class TokenData(BaseModel):
username: str | None = None
tickers: tuple | None = None
class Ticker(BaseModel):
id: int
ticker: str
def hash_password(password):
salt = bcrypt.gensalt()
return bcrypt.hashpw(password.encode(), salt)
def create_user(user: UserCreate):
hashed_password = hash_password(user.password)
cur.execute("INSERT INTO users (username, password, role) VALUES (?, ?, ?)",
(user.username, hashed_password, "user"))
conn.commit()
user_id = cur.lastrowid
return {**user.dict(), "id": user_id}
def get_user(user_id: int):
cur.execute("SELECT * FROM users WHERE id = ?", (user_id,))
user = cur.fetchone()
if user:
return User(id=user[0], username=user[1], password=user[2], role=user[3])
return None
def update_user(user_id: int, user: UserUpdate):
hashed_password = hash_password(user.password)
cur.execute("UPDATE users SET username = ?, password = ?, role = ? WHERE id = ?",
(user.username, hashed_password, user.role, user_id))
conn.commit()
return user
def delete_user(user_id: int):
cur.execute("DELETE FROM users WHERE id = ?", (user_id,))
conn.commit()
def get_db_connection():
conn = sqlite3.connect('tickers.db')
conn.row_factory = sqlite3.Row
return conn
def get_all_tickers() -> List[Ticker]:
conn = get_db_connection()
cur = conn.cursor()
cur.execute("SELECT * FROM tickers")
rows = cur.fetchall()
conn.close()
return [Ticker(id=row['id'], ticker=row['ticker']) for row in rows]
app = FastAPI()
def create_access_token(data: dict):
encoded_jwt = jwt.encode(data, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_token(token: str = Depends(oauth2_scheme)):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except JWTError:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")
def get_all_users() -> List[User]:
cur.execute("SELECT * FROM users")
rows = cur.fetchall()
users = []
for row in rows:
user = User(id=row[0], username=row[1], password=row[2], role=row[3])
users.append(user)
return users
@app.get("/users/", response_model=List[User])
async def read_all_users(token: dict = Depends(verify_token)):
if token['role'] != 'admin' and token['user_id'] != user_id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions")
return get_all_users()
@app.post("/users/", response_model=User)
async def create_user_endpoint(user: UserCreate, token: dict = Depends(verify_token)):
if token['role'] != 'admin' and token['user_id'] != user_id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions")
return create_user(user)
@app.get("/users/{user_id}", response_model=User)
async def read_user_endpoint(user_id: int, token: dict = Depends(verify_token)):
if token['role'] != 'admin' and token['user_id'] != user_id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions")
user = get_user(user_id)
if not user:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
return user
@app.patch("/users/{user_id}", response_model=User)
async def update_user_endpoint(user_id: int, user: UserUpdate, token: dict = Depends(verify_token)):
if token['role'] != 'admin' and token['user_id'] != user_id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions")
return update_user(user_id, user)
@app.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user_endpoint(user_id: int, token: dict = Depends(verify_token)):
if token['role'] != 'admin' and token['user_id'] != user_id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions")
delete_user(user_id)
return Response(content=f"User {user_id} deleted successfully", status_code=200)
@app.post("/token")
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
username = form_data.username
password = form_data.password
cur.execute("SELECT * FROM users WHERE username = ?", (username,))
user = cur.fetchone()
if user and pwd_context.verify(password, user[2]):
user_id = user[0]
user_username = user[1]
user_role = user[3]
access_token = create_access_token(data={"user_id": user_id, "username": user_username, "role": user_role})
return {"access_token": access_token, "token_type": "bearer", "tickers": get_all_tickers()}
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password")
@app.get("/tickers/", response_model=List[Ticker])
async def read_all_tickers(token: dict = Security(verify_token)):
if not token:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions")
return get_all_tickers()