Spaces:
Runtime error
Runtime error
| import time | |
| from os import environ as env | |
| from urllib.parse import quote_plus, urlencode | |
| import flask | |
| from authlib.integrations.flask_client import OAuth | |
| from flask import redirect, session, url_for | |
| from src.core.config import get_settings | |
| settings = get_settings() | |
| class AdvancedAuth(object): | |
| def __init__(self, app, authorization_hook=None, _overwrite_index=True): | |
| self.app = app | |
| self._index_view_name = app.config["routes_pathname_prefix"] | |
| if _overwrite_index: | |
| self._overwrite_index() | |
| self._protect_views() | |
| self._index_view_name = app.config["routes_pathname_prefix"] | |
| self._auth_hooks = [authorization_hook] if authorization_hook else [] | |
| self.app.server.config["SECRET_KEY"] = settings.SECRET_KEY | |
| self.oauth = OAuth(self.app.server) | |
| self.oauth.register( | |
| "auth0", | |
| client_id=settings.AUTH0_CLIENT_ID, | |
| client_secret=settings.AUTH0_CLIENT_SECRET, | |
| api_base_url=f"https://{settings.AUTH0_DOMAIN}", | |
| access_token_url=f"https://{settings.AUTH0_DOMAIN}/oauth/token", | |
| authorize_url=f"https://{settings.AUTH0_DOMAIN}/authorize", | |
| callback_url=settings.REDIRECT_URI, | |
| server_metadata_url=f"https://{settings.AUTH0_DOMAIN}/.well-known/openid-configuration", | |
| audience=settings.AUTH0_AUDIENCE, | |
| client_kwargs={ | |
| "scope": "openid profile email", | |
| }, | |
| ) | |
| self.auth0 = self.oauth.auth0 | |
| app.server.add_url_rule( | |
| settings.LOGIN_URL, view_func=self.login, methods=["GET"] | |
| ) | |
| app.server.add_url_rule( | |
| settings.LOGOUT_URL, view_func=self.logout, methods=["GET"] | |
| ) | |
| app.server.add_url_rule( | |
| "/callback", | |
| view_func=self.callback, | |
| methods=["GET", "POST"], | |
| ) | |
| def _overwrite_index(self): | |
| original_index = self.app.server.view_functions[self._index_view_name] | |
| self.app.server.view_functions[self._index_view_name] = self.index_auth_wrapper( | |
| original_index | |
| ) | |
| def _protect_views(self): | |
| for view_name, view_method in self.app.server.view_functions.items(): | |
| if view_name != self._index_view_name: | |
| self.app.server.view_functions[view_name] = self.auth_wrapper( | |
| view_method | |
| ) | |
| def login(self): | |
| if self.auth0 is None: | |
| return redirect(settings.LOGIN_URL) | |
| return self.auth0.authorize_redirect( | |
| redirect_uri=url_for("callback", _external=True) | |
| ) | |
| def logout(self): | |
| session.clear() | |
| # TODO: Use url joins instead of all of this | |
| return redirect( | |
| "https://" | |
| + settings.AUTH0_DOMAIN | |
| + "/v2/logout?" | |
| + urlencode( | |
| { | |
| "client_id": env.get("AUTH0_CLIENT_ID"), | |
| }, | |
| quote_via=quote_plus, | |
| ) | |
| ) | |
| def callback(self): | |
| """Callback handler""" | |
| if self.auth0 is None: | |
| return redirect(settings.LOGIN_URL) | |
| try: | |
| token = self.auth0.authorize_access_token() | |
| except Exception: | |
| return redirect(settings.LOGIN_URL) | |
| resp = self.auth0.get("userinfo") | |
| userinfo = resp.json() | |
| session["access_token"] = token["access_token"] | |
| session["created_at"] = time.time() | |
| session["expires_in"] = token["expires_in"] | |
| session[settings.JWT_PAYLOAD] = userinfo | |
| session[settings.PROFILE_KEY] = { | |
| "user_id": userinfo["sub"], | |
| "name": userinfo["name"], | |
| "picture": userinfo["picture"], | |
| } | |
| return redirect("/") | |
| def is_authorized(self) -> bool: | |
| """Verify the token is valid""" | |
| if flask.request.path == settings.CALLBACK_URL: | |
| return True | |
| token = session.get("access_token", None) | |
| if token is None: | |
| return False | |
| expires_in = session.get("expires_in", None) | |
| created_at = session.get("created_at", None) | |
| if expires_in is None or created_at is None: | |
| return False | |
| time_left = expires_in - ((time.time() - created_at) / 60) | |
| if time_left <= 0: | |
| return False | |
| return True | |
| def login_request(self): | |
| return redirect(settings.LOGIN_URL) | |
| def auth_wrapper(self, f): | |
| def wrap(*args, **kwargs): | |
| if not self.is_authorized(): | |
| return self.login_request() | |
| response = f(*args, **kwargs) | |
| return response | |
| return wrap | |
| def index_auth_wrapper(self, original_index): | |
| def wrap(*args, **kwargs): | |
| if self.is_authorized(): | |
| return original_index(*args, **kwargs) | |
| else: | |
| return self.login_request() | |
| return wrap | |