File size: 4,966 Bytes
6739f59
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import time
from os import environ as env
from urllib.parse import quote_plus, urlencode

import flask
from authlib.integrations.flask_client import OAuth
from flask import redirect, session, url_for

from src.core.config import get_settings

settings = get_settings()


class AdvancedAuth(object):
    def __init__(self, app, authorization_hook=None, _overwrite_index=True):
        self.app = app
        self._index_view_name = app.config["routes_pathname_prefix"]

        if _overwrite_index:
            self._overwrite_index()
            self._protect_views()
        self._index_view_name = app.config["routes_pathname_prefix"]
        self._auth_hooks = [authorization_hook] if authorization_hook else []

        self.app.server.config["SECRET_KEY"] = settings.SECRET_KEY

        self.oauth = OAuth(self.app.server)
        self.oauth.register(
            "auth0",
            client_id=settings.AUTH0_CLIENT_ID,
            client_secret=settings.AUTH0_CLIENT_SECRET,
            api_base_url=f"https://{settings.AUTH0_DOMAIN}",
            access_token_url=f"https://{settings.AUTH0_DOMAIN}/oauth/token",
            authorize_url=f"https://{settings.AUTH0_DOMAIN}/authorize",
            callback_url=settings.REDIRECT_URI,
            server_metadata_url=f"https://{settings.AUTH0_DOMAIN}/.well-known/openid-configuration",
            audience=settings.AUTH0_AUDIENCE,
            client_kwargs={
                "scope": "openid profile email",
            },
        )
        self.auth0 = self.oauth.auth0

        app.server.add_url_rule(
            settings.LOGIN_URL, view_func=self.login, methods=["GET"]
        )

        app.server.add_url_rule(
            settings.LOGOUT_URL, view_func=self.logout, methods=["GET"]
        )

        app.server.add_url_rule(
            "/callback",
            view_func=self.callback,
            methods=["GET", "POST"],
        )

    def _overwrite_index(self):
        original_index = self.app.server.view_functions[self._index_view_name]

        self.app.server.view_functions[self._index_view_name] = self.index_auth_wrapper(
            original_index
        )

    def _protect_views(self):
        for view_name, view_method in self.app.server.view_functions.items():
            if view_name != self._index_view_name:
                self.app.server.view_functions[view_name] = self.auth_wrapper(
                    view_method
                )

    def login(self):
        if self.auth0 is None:
            return redirect(settings.LOGIN_URL)

        return self.auth0.authorize_redirect(
            redirect_uri=url_for("callback", _external=True)
        )

    def logout(self):
        session.clear()
        # TODO: Use url joins instead of all of this
        return redirect(
            "https://"
            + settings.AUTH0_DOMAIN
            + "/v2/logout?"
            + urlencode(
                {
                    "client_id": env.get("AUTH0_CLIENT_ID"),
                },
                quote_via=quote_plus,
            )
        )

    def callback(self):
        """Callback handler"""
        if self.auth0 is None:
            return redirect(settings.LOGIN_URL)

        try:
            token = self.auth0.authorize_access_token()
        except Exception:
            return redirect(settings.LOGIN_URL)

        resp = self.auth0.get("userinfo")
        userinfo = resp.json()

        session["access_token"] = token["access_token"]
        session["created_at"] = time.time()
        session["expires_in"] = token["expires_in"]

        session[settings.JWT_PAYLOAD] = userinfo
        session[settings.PROFILE_KEY] = {
            "user_id": userinfo["sub"],
            "name": userinfo["name"],
            "picture": userinfo["picture"],
        }
        return redirect("/")

    def is_authorized(self) -> bool:
        """Verify the token is valid"""
        if flask.request.path == settings.CALLBACK_URL:
            return True
        token = session.get("access_token", None)
        if token is None:
            return False

        expires_in = session.get("expires_in", None)
        created_at = session.get("created_at", None)
        if expires_in is None or created_at is None:
            return False

        time_left = expires_in - ((time.time() - created_at) / 60)
        if time_left <= 0:
            return False

        return True

    def login_request(self):
        return redirect(settings.LOGIN_URL)

    def auth_wrapper(self, f):
        def wrap(*args, **kwargs):
            if not self.is_authorized():
                return self.login_request()
            response = f(*args, **kwargs)
            return response

        return wrap

    def index_auth_wrapper(self, original_index):
        def wrap(*args, **kwargs):
            if self.is_authorized():
                return original_index(*args, **kwargs)
            else:
                return self.login_request()

        return wrap