Spaces:
Runtime error
Runtime error
File size: 5,951 Bytes
8de1004 d85ed28 b1df455 02426f0 8de1004 d85ed28 93e700d b1df455 93e700d d85ed28 93e700d 02426f0 93e700d d85ed28 93e700d 8de1004 93e700d 8de1004 93e700d b1df455 93e700d d85ed28 b1df455 d85ed28 b1df455 d85ed28 b1df455 d85ed28 b1df455 d85ed28 b1df455 d85ed28 b1df455 d85ed28 b1df455 d85ed28 b1df455 d85ed28 b1df455 d85ed28 b1df455 d85ed28 b1df455 d85ed28 02426f0 7e666bd 93e700d | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 | import os
from typing import Optional
from flask import Flask, flash, redirect, url_for
from oauthlib.oauth2.rfc6749.errors import InvalidGrantError
from werkzeug.middleware.proxy_fix import ProxyFix
# Allow OAuth scope to change (e.g. Discord adding 'guilds.join') without raising an error
os.environ["OAUTHLIB_RELAX_TOKEN_SCOPE"] = "1"
from flask_dance.consumer import oauth_authorized, oauth_error
from flask_dance.contrib.discord import make_discord_blueprint
from flask_dance.contrib.google import make_google_blueprint
from flask_login import current_user, login_user
from .extensions import db, login_manager, migrate
from .models import OAuth, User
def create_app():
app = Flask(__name__)
app.config.from_object("config.Config")
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_port=1)
app.config.setdefault("PREFERRED_URL_SCHEME", "https")
db.init_app(app)
migrate.init_app(app, db)
login_manager.init_app(app)
login_manager.login_view = "main.login"
@login_manager.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
from .blueprints.main import main_bp
from .blueprints.notes import notes_bp
from .blueprints.admin import admin_bp
app.register_blueprint(main_bp)
app.register_blueprint(notes_bp, url_prefix="/notes")
app.register_blueprint(admin_bp, url_prefix="/admin")
google_bp = make_google_blueprint(
client_id=app.config["GOOGLE_CLIENT_ID"],
client_secret=app.config["GOOGLE_CLIENT_SECRET"],
scope=[
"https://www.googleapis.com/auth/userinfo.profile",
"https://www.googleapis.com/auth/userinfo.email",
"openid",
],
reprompt_consent=True,
)
app.register_blueprint(google_bp, url_prefix="/login")
discord_bp = make_discord_blueprint(
client_id=app.config["DISCORD_CLIENT_ID"],
client_secret=app.config["DISCORD_CLIENT_SECRET"],
scope=["identify", "email"],
)
app.register_blueprint(discord_bp, url_prefix="/login")
def _finish_login(
provider_name: str,
provider_user_id: str,
email: Optional[str],
name: Optional[str],
token: dict,
):
if not email:
return False
if current_user.is_authenticated:
user = current_user
oauth = OAuth.query.filter_by(
provider=provider_name, provider_user_id=provider_user_id
).first()
if not oauth:
oauth = OAuth(
provider=provider_name,
provider_user_id=provider_user_id,
token=token,
user_id=user.id,
)
db.session.add(oauth)
else:
oauth.token = token
db.session.commit()
return redirect(url_for("main.settings"))
oauth = OAuth.query.filter_by(
provider=provider_name,
provider_user_id=provider_user_id,
).first()
if oauth:
user = oauth.user
oauth.token = token
else:
user = User.query.filter_by(email=email).first()
if not user:
user = User(name=name or email.split("@")[0], email=email)
db.session.add(user)
db.session.flush()
oauth = OAuth(
provider=provider_name,
provider_user_id=provider_user_id,
token=token,
user=user,
)
db.session.add(oauth)
db.session.commit()
login_user(user)
return redirect(url_for("main.index"))
@oauth_authorized.connect_via(google_bp)
def google_logged_in(blueprint, token):
if not token:
return False
resp = blueprint.session.get("/oauth2/v2/userinfo")
if not resp.ok:
return False
info = resp.json()
google_user_id = str(info["id"])
if "refresh_token" in token:
existing = OAuth.query.filter_by(
provider=blueprint.name, provider_user_id=google_user_id
).first()
if existing:
existing.token["refresh_token"] = token["refresh_token"]
return _finish_login(
provider_name=blueprint.name,
provider_user_id=google_user_id,
email=info.get("email"),
name=info.get("name"),
token=token,
)
@oauth_authorized.connect_via(discord_bp)
def discord_logged_in(blueprint, token):
if not token:
return False
resp = blueprint.session.get("/api/users/@me")
if not resp.ok:
return False
info = resp.json()
discord_user_id = str(info["id"])
response = _finish_login(
provider_name=blueprint.name,
provider_user_id=discord_user_id,
email=info.get("email"),
name=info.get("username"),
token=token,
)
return response
@oauth_error.connect_via(google_bp)
@oauth_error.connect_via(discord_bp)
def oauth_error_handler(blueprint, error, error_description=None, error_uri=None):
message = f"{blueprint.name.title()} OAuth error: {error_description or error}"
app.logger.error(message)
flash(
"Login failed. Please try again. If it keeps happening, recheck OAuth callback URL and server time.",
"error",
)
return redirect(url_for("main.login"))
@app.errorhandler(InvalidGrantError)
def handle_invalid_grant(error):
app.logger.warning(f"OAuth invalid_grant: {error}")
flash(
"Google/Discord sign-in expired or mismatched. Please retry login once.",
"error",
)
return redirect(url_for("main.login"))
return app
|