import hmac
import json
import os
from datetime import datetime, timedelta
from hashlib import sha256

import jwt
from fastapi import HTTPException, status, Request
from fastapi.security import HTTPBasicCredentials

from logger import log_write, logger
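# Credential table, built once at import time from the USER_KEYS environment
# variable. The value is a JSON object mapping each username to its password,
# plus a "master" entry holding the shared secret.
_raw_user_keys = os.getenv("USER_KEYS")
if _raw_user_keys is None:
    # Fail closed with a clear message instead of trying to parse the
    # string "None", which only produced an opaque JSON decode error.
    raise RuntimeError("USER_KEYS environment variable is not set")

users = json.loads(_raw_user_keys.replace("\n", ""))
if not isinstance(users, dict):
    raise RuntimeError("USER_KEYS must contain a JSON object")
if not isinstance(users.get("master"), str) or not users["master"]:
    # An absent or empty master secret would leave tokens signed with a
    # missing/empty HMAC key; refuse to start in that configuration.
    raise RuntimeError('USER_KEYS must define a non-empty "master" secret')

# Replace every plaintext password with sha256(username + password + master)
# so the process keeps only digests for regular users. The "master" entry is
# left untouched because it is the pepper itself.
# NOTE(review): one SHA-256 pass is cheap to brute-force if the digests and
# the master secret are ever exposed; a password KDF would be the usual choice.
for key in users:
    if key == "master":
        continue
    password = key + users[key] + users["master"]
    users[key] = sha256(password.encode('UTF-8')).hexdigest()

# NOTE(review): the master secret doubles as the password pepper and the JWT
# HMAC key, so a leak of one compromises both; a dedicated signing key would
# separate the two.
JWT_SECRET = users["master"]
JWT_ALGORITHM = "HS256"
JWT_EXPIRATION_TIME_MINUTES = 30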
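def authenticate_user(credentials: HTTPBasicCredentials) -> bool:
    """Check HTTP Basic credentials against the in-memory ``users`` table.

    The candidate digest is sha256(username + password + master secret),
    which is the same construction used to fill ``users`` at import time.

    Args:
        credentials: Username and password taken from the Basic auth header.

    Returns:
        True when the credentials match a stored digest.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Basic`` header when the
            username is unknown or the digest does not match.
    """
    username = credentials.username
    candidate = sha256(
        (username + credentials.password + users["master"]).encode('UTF-8')
    ).hexdigest()

    # "master" stores the raw shared secret rather than a password digest,
    # so it is never accepted as a login name.
    expected = None if username == "master" else users.get(username)

    # compare_digest runs in time independent of where the digests differ,
    # unlike ``!=`` which stops at the first mismatching character.
    if expected is None or not hmac.compare_digest(candidate, expected):
        # NOTE(review): the username is attacker-controlled and log_write is
        # defined elsewhere; confirm it neutralises control characters.
        log_write(username, "Autenticacion usuario fallida", "")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Basic"},
        )

    log_write(username, "Usuario autenticado", "")
    return True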
def create_jwt_token(data):
    """Return a signed JWT that wraps *data* and carries an expiry claim.

    The payload has two claims: ``data`` (the caller's value, unchanged) and
    ``exp``, set JWT_EXPIRATION_TIME_MINUTES minutes after the current UTC
    time. The token is signed with JWT_SECRET using JWT_ALGORITHM.
    """
    # utcnow() yields a naive datetime holding UTC wall-clock time.
    # NOTE(review): utcnow() is deprecated since Python 3.12.
    expires_at = datetime.utcnow() + timedelta(minutes=JWT_EXPIRATION_TIME_MINUTES)
    claims = {"data": data, "exp": expires_at}
    return jwt.encode(claims, JWT_SECRET, algorithm=JWT_ALGORITHM)
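async def validate_token(request: Request):
    """Verify the JWT carried in the JSON request body and return the body.

    The body must be a JSON object with a ``token`` field. The token is
    removed from the body, verified with JWT_SECRET and JWT_ALGORITHM only,
    and its ``data`` claim is stored under ``token_data``.

    Args:
        request: Incoming request whose body is parsed as JSON.

    Returns:
        The parsed body without ``token`` and with ``token_data`` added.

    Raises:
        HTTPException: 404 with detail "Token inválido" on any failure:
            unparsable body, missing token, bad signature, expired token,
            or a payload without a ``data`` claim.
    """
    data = {}
    try:
        data = await request.json()
        token = data.pop("token")
        # The explicit algorithms list pins verification to HS256, so the
        # token header cannot select a different algorithm.
        payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
        data["token_data"] = payload["data"]
    except Exception as e:
        # Log the body's field names only: the body is untrusted and may
        # hold sensitive values that should not be copied into error logs.
        if isinstance(data, dict):
            body_shape = sorted(map(str, data))
        else:
            body_shape = type(data).__name__
        logger.error(repr(e) + " - fields: " + str(body_shape))
        # NOTE(review): 401 is the conventional status for a rejected token;
        # 404 is kept because existing clients may depend on it.
        raise HTTPException(status_code=404, detail="Token inválido")
    return data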