Spaces:
Runtime error
Runtime error
File size: 4,966 Bytes
6739f59 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
import time
from functools import wraps
from os import environ as env
from urllib.parse import quote_plus, urlencode

import flask
from authlib.integrations.flask_client import OAuth
from flask import redirect, session, url_for

from src.core.config import get_settings
# Project settings object, built once at import time. This module reads the
# Auth0 credentials, route paths and session key names from it.
settings = get_settings()
class AdvancedAuth(object):
    """Put an Auth0 login in front of the views of an application.

    On construction the instance wraps the view functions already registered
    on ``app.server`` so that they redirect to ``settings.LOGIN_URL`` unless
    the session holds an unexpired access token, and then registers the
    login, logout and callback routes.
    """

    def __init__(self, app, authorization_hook=None, _overwrite_index=True):
        """Protect the existing views and register the Auth0 routes.

        Args:
            app: Application object exposing ``config["routes_pathname_prefix"]``
                and a Flask app as ``server`` (presumably a Dash app -- confirm
                against the caller).
            authorization_hook: Optional callable stored in ``_auth_hooks``.
                Nothing in this class calls it.
            _overwrite_index: When true, the index view is wrapped as well;
                when false, the index view is left unprotected.
        """
        self.app = app
        self._index_view_name = app.config["routes_pathname_prefix"]
        self._auth_hooks = [authorization_hook] if authorization_hook else []

        # Wrap only the views that exist right now. The login, logout and
        # callback routes are added further down, after this step, so they
        # stay reachable without an authenticated session.
        if _overwrite_index:
            self._overwrite_index()
        self._protect_views()

        self.app.server.config["SECRET_KEY"] = settings.SECRET_KEY

        self.oauth = OAuth(self.app.server)
        self.oauth.register(
            "auth0",
            client_id=settings.AUTH0_CLIENT_ID,
            client_secret=settings.AUTH0_CLIENT_SECRET,
            api_base_url=f"https://{settings.AUTH0_DOMAIN}",
            access_token_url=f"https://{settings.AUTH0_DOMAIN}/oauth/token",
            authorize_url=f"https://{settings.AUTH0_DOMAIN}/authorize",
            callback_url=settings.REDIRECT_URI,
            server_metadata_url=f"https://{settings.AUTH0_DOMAIN}/.well-known/openid-configuration",
            audience=settings.AUTH0_AUDIENCE,
            client_kwargs={
                "scope": "openid profile email",
            },
        )
        self.auth0 = self.oauth.auth0

        app.server.add_url_rule(
            settings.LOGIN_URL, view_func=self.login, methods=["GET"]
        )
        app.server.add_url_rule(
            settings.LOGOUT_URL, view_func=self.logout, methods=["GET"]
        )
        # NOTE(review): the path is hard-coded here while is_authorized()
        # compares against settings.CALLBACK_URL -- confirm they agree.
        app.server.add_url_rule(
            "/callback",
            view_func=self.callback,
            methods=["GET", "POST"],
        )

    def _overwrite_index(self):
        """Replace the index view with a login-checking wrapper around it."""
        views = self.app.server.view_functions
        views[self._index_view_name] = self.index_auth_wrapper(
            views[self._index_view_name]
        )

    def _protect_views(self):
        """Wrap every registered view except the index with ``auth_wrapper``."""
        views = self.app.server.view_functions
        # Iterate over a snapshot so that rebinding entries cannot interfere
        # with the iteration.
        for view_name, view_method in list(views.items()):
            if view_name != self._index_view_name:
                views[view_name] = self.auth_wrapper(view_method)

    def login(self):
        """Start the Auth0 authorization redirect."""
        if self.auth0 is None:
            # NOTE(review): LOGIN_URL is the route of this very view, so this
            # branch would redirect to itself. __init__ always assigns
            # self.auth0, so it is not expected to be reached.
            return redirect(settings.LOGIN_URL)
        # "callback" is the endpoint name Flask derives from the view function.
        return self.auth0.authorize_redirect(
            redirect_uri=url_for("callback", _external=True)
        )

    def logout(self):
        """Clear the local session and redirect to the Auth0 logout endpoint."""
        session.clear()
        # Use the same client id the OAuth client was registered with instead
        # of reading the process environment, which may not define it.
        query = urlencode(
            {"client_id": settings.AUTH0_CLIENT_ID},
            quote_via=quote_plus,
        )
        return redirect(f"https://{settings.AUTH0_DOMAIN}/v2/logout?{query}")

    def callback(self):
        """Callback handler: exchange the code for a token and fill the session.

        Stores the access token, its lifetime, the time it was obtained, the
        raw userinfo payload and a small profile dict in the session, then
        redirects to ``/``. If the token exchange fails, the failure is logged
        and the user is sent back to the login route.
        """
        if self.auth0 is None:
            return redirect(settings.LOGIN_URL)

        try:
            token = self.auth0.authorize_access_token()
        except Exception:
            # Best effort: restart the login flow, but leave a trace of why.
            self.app.server.logger.exception("Auth0 token exchange failed")
            return redirect(settings.LOGIN_URL)

        userinfo = self.auth0.get("userinfo").json()

        session["access_token"] = token["access_token"]
        session["created_at"] = time.time()
        session["expires_in"] = token["expires_in"]
        session[settings.JWT_PAYLOAD] = userinfo
        session[settings.PROFILE_KEY] = {
            "user_id": userinfo["sub"],
            # "name" and "picture" are optional profile claims; a profile
            # without them must not turn the login into a server error.
            "name": userinfo.get("name"),
            "picture": userinfo.get("picture"),
        }
        return redirect("/")

    def is_authorized(self) -> bool:
        """Return whether the current request may proceed.

        The callback path is always allowed. Otherwise the session must hold
        an access token together with ``created_at`` and ``expires_in``, and
        fewer than ``expires_in`` seconds must have passed since
        ``created_at``. The token itself is not validated here.
        """
        if flask.request.path == settings.CALLBACK_URL:
            return True

        if session.get("access_token") is None:
            return False

        expires_in = session.get("expires_in")
        created_at = session.get("created_at")
        if expires_in is None or created_at is None:
            return False

        # expires_in is a lifetime in seconds, so compare it with the elapsed
        # time in seconds.
        return time.time() - created_at < expires_in

    def login_request(self):
        """Return a redirect to the login route."""
        return redirect(settings.LOGIN_URL)

    def auth_wrapper(self, f):
        """Return ``f`` wrapped so that unauthorized requests go to login."""

        @wraps(f)
        def wrap(*args, **kwargs):
            if not self.is_authorized():
                return self.login_request()
            return f(*args, **kwargs)

        return wrap

    def index_auth_wrapper(self, original_index):
        """Return the index view wrapped so that unauthorized requests go to login."""

        @wraps(original_index)
        def wrap(*args, **kwargs):
            if self.is_authorized():
                return original_index(*args, **kwargs)
            return self.login_request()

        return wrap
|