Spaces:
Runtime error
Runtime error
| import pandas as pd | |
| import bcrypt | |
| import sqlite3 | |
| import os | |
| from pydantic import BaseModel | |
| from typing import Annotated, Optional, List | |
| from fastapi import FastAPI, Depends, HTTPException, status, Response, Security | |
| from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm | |
| from passlib.context import CryptContext | |
| from jose import JWTError, jwt | |
| from dotenv import load_dotenv | |
| conn = sqlite3.connect('users.db') | |
| cur = conn.cursor() | |
| pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") | |
| oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") | |
| load_dotenv(".env") | |
| SECRET_KEY = os.getenv("SECRET_KEY") | |
| ALGORITHM = os.getenv("ALGORITHM") | |
| # Pydantic models | |
| class User(BaseModel): | |
| id: Optional[int] | |
| username: str | |
| password: str | |
| role: str | |
| class UserCreate(BaseModel): | |
| username: str | |
| password: str | |
| role: str | |
| class UserUpdate(BaseModel): | |
| username: str | |
| password: str | |
| role: str | |
| class TokenData(BaseModel): | |
| username: str | None = None | |
| tickers: tuple | None = None | |
| class Ticker(BaseModel): | |
| id: int | |
| ticker: str | |
| def hash_password(password): | |
| salt = bcrypt.gensalt() | |
| return bcrypt.hashpw(password.encode(), salt) | |
| def create_user(user: UserCreate): | |
| hashed_password = hash_password(user.password) | |
| cur.execute("INSERT INTO users (username, password, role) VALUES (?, ?, ?)", | |
| (user.username, hashed_password, "user")) | |
| conn.commit() | |
| user_id = cur.lastrowid | |
| return {**user.dict(), "id": user_id} | |
| def get_user(user_id: int): | |
| cur.execute("SELECT * FROM users WHERE id = ?", (user_id,)) | |
| user = cur.fetchone() | |
| if user: | |
| return User(id=user[0], username=user[1], password=user[2], role=user[3]) | |
| return None | |
| def update_user(user_id: int, user: UserUpdate): | |
| hashed_password = hash_password(user.password) | |
| cur.execute("UPDATE users SET username = ?, password = ?, role = ? WHERE id = ?", | |
| (user.username, hashed_password, user.role, user_id)) | |
| conn.commit() | |
| return user | |
| def delete_user(user_id: int): | |
| cur.execute("DELETE FROM users WHERE id = ?", (user_id,)) | |
| conn.commit() | |
| def get_db_connection(): | |
| conn = sqlite3.connect('tickers.db') | |
| conn.row_factory = sqlite3.Row | |
| return conn | |
| def get_all_tickers() -> List[Ticker]: | |
| conn = get_db_connection() | |
| cur = conn.cursor() | |
| cur.execute("SELECT * FROM tickers") | |
| rows = cur.fetchall() | |
| conn.close() | |
| return [Ticker(id=row['id'], ticker=row['ticker']) for row in rows] | |
| app = FastAPI() | |
| def create_access_token(data: dict): | |
| encoded_jwt = jwt.encode(data, SECRET_KEY, algorithm=ALGORITHM) | |
| return encoded_jwt | |
| def verify_token(token: str = Depends(oauth2_scheme)): | |
| try: | |
| payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) | |
| return payload | |
| except JWTError: | |
| raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") | |
| def get_all_users() -> List[User]: | |
| cur.execute("SELECT * FROM users") | |
| rows = cur.fetchall() | |
| users = [] | |
| for row in rows: | |
| user = User(id=row[0], username=row[1], password=row[2], role=row[3]) | |
| users.append(user) | |
| return users | |
| async def read_all_users(token: dict = Depends(verify_token)): | |
| if token['role'] != 'admin' and token['user_id'] != user_id: | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions") | |
| return get_all_users() | |
| async def create_user_endpoint(user: UserCreate, token: dict = Depends(verify_token)): | |
| if token['role'] != 'admin' and token['user_id'] != user_id: | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions") | |
| return create_user(user) | |
| async def read_user_endpoint(user_id: int, token: dict = Depends(verify_token)): | |
| if token['role'] != 'admin' and token['user_id'] != user_id: | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions") | |
| user = get_user(user_id) | |
| if not user: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found") | |
| return user | |
| async def update_user_endpoint(user_id: int, user: UserUpdate, token: dict = Depends(verify_token)): | |
| if token['role'] != 'admin' and token['user_id'] != user_id: | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions") | |
| return update_user(user_id, user) | |
| async def delete_user_endpoint(user_id: int, token: dict = Depends(verify_token)): | |
| if token['role'] != 'admin' and token['user_id'] != user_id: | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions") | |
| delete_user(user_id) | |
| return Response(content=f"User {user_id} deleted successfully", status_code=200) | |
| async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): | |
| username = form_data.username | |
| password = form_data.password | |
| cur.execute("SELECT * FROM users WHERE username = ?", (username,)) | |
| user = cur.fetchone() | |
| if user and pwd_context.verify(password, user[2]): | |
| user_id = user[0] | |
| user_username = user[1] | |
| user_role = user[3] | |
| access_token = create_access_token(data={"user_id": user_id, "username": user_username, "role": user_role}) | |
| return {"access_token": access_token, "token_type": "bearer", "tickers": get_all_tickers()} | |
| raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password") | |
| async def read_all_tickers(token: dict = Security(verify_token)): | |
| if not token: | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions") | |
| return get_all_tickers() |