Spaces:
Running
Running
| from flask import ( | |
| Blueprint, | |
| abort, | |
| after_this_request, | |
| flash, | |
| redirect, | |
| render_template, | |
| request, | |
| send_file, | |
| url_for, | |
| current_app, | |
| ) | |
| from flask_login import current_user, login_required | |
| from functools import wraps | |
| import os | |
| import sqlite3 | |
| import tempfile | |
| from datetime import datetime | |
| from sqlalchemy.engine import make_url | |
| from app.extensions import db | |
| from app.models import Note, NoteType, Subject, User | |
| admin_bp = Blueprint("admin", __name__) | |
| def admin_required(f): | |
| def decorated_function(*args, **kwargs): | |
| if not current_user.is_admin: | |
| return redirect(url_for("admin.verify")) | |
| return f(*args, **kwargs) | |
| return decorated_function | |
| def dashboard(): | |
| users_count = User.query.count() | |
| notes_count = Note.query.count() | |
| subjects_count = Subject.query.count() | |
| note_types_count = NoteType.query.count() | |
| return render_template( | |
| "admin/dashboard.html", | |
| users_count=users_count, | |
| notes_count=notes_count, | |
| subjects_count=subjects_count, | |
| note_types_count=note_types_count, | |
| ) | |
| def verify(): | |
| if current_user.is_admin: | |
| return redirect(url_for("admin.dashboard")) | |
| if request.method == "POST": | |
| code = request.form.get("code", "").strip() | |
| if code == current_app.config["ADMIN_SECRET_CODE"]: | |
| current_user.is_admin = True | |
| db.session.commit() | |
| return redirect(url_for("admin.dashboard")) | |
| else: | |
| pass | |
| return render_template("admin/verify.html") | |
| def subjects(): | |
| subjects = Subject.query.order_by(Subject.name.asc()).all() | |
| return render_template("admin/subjects.html", subjects=subjects) | |
| def add_subject(): | |
| name = request.form.get("name", "").strip() | |
| if name: | |
| existing = Subject.query.filter_by(name=name).first() | |
| if not existing: | |
| subject = Subject(name=name) | |
| db.session.add(subject) | |
| db.session.commit() | |
| else: | |
| pass | |
| return redirect(url_for("admin.subjects")) | |
| def update_subject(id): | |
| subject = Subject.query.get_or_404(id) | |
| name = request.form.get("name", "").strip() | |
| if not name: | |
| return redirect(url_for("admin.subjects")) | |
| existing = Subject.query.filter_by(name=name).first() | |
| if existing and existing.id != subject.id: | |
| return redirect(url_for("admin.subjects")) | |
| subject.name = name | |
| db.session.commit() | |
| return redirect(url_for("admin.subjects")) | |
| def delete_subject(id): | |
| subject = Subject.query.get_or_404(id) | |
| db.session.delete(subject) | |
| db.session.commit() | |
| return redirect(url_for("admin.subjects")) | |
| def note_types(): | |
| note_types = NoteType.query.order_by(NoteType.name.asc()).all() | |
| return render_template("admin/note_types.html", note_types=note_types) | |
| def add_note_type(): | |
| name = request.form.get("name", "").strip() | |
| if name: | |
| existing = NoteType.query.filter_by(name=name).first() | |
| if not existing: | |
| note_type = NoteType(name=name) | |
| db.session.add(note_type) | |
| db.session.commit() | |
| else: | |
| pass | |
| return redirect(url_for("admin.note_types")) | |
| def update_note_type(id): | |
| note_type = NoteType.query.get_or_404(id) | |
| name = request.form.get("name", "").strip() | |
| if not name: | |
| return redirect(url_for("admin.note_types")) | |
| existing = NoteType.query.filter_by(name=name).first() | |
| if existing and existing.id != note_type.id: | |
| return redirect(url_for("admin.note_types")) | |
| note_type.name = name | |
| db.session.commit() | |
| return redirect(url_for("admin.note_types")) | |
| def delete_note_type(id): | |
| note_type = NoteType.query.get_or_404(id) | |
| db.session.delete(note_type) | |
| db.session.commit() | |
| return redirect(url_for("admin.note_types")) | |
| def notes(): | |
| notes = Note.query.order_by(Note.created_at.desc()).all() | |
| subjects = Subject.query.order_by(Subject.name.asc()).all() | |
| note_types = NoteType.query.order_by(NoteType.name.asc()).all() | |
| return render_template( | |
| "admin/notes.html", notes=notes, subjects=subjects, note_types=note_types | |
| ) | |
| def update_note(id): | |
| note = Note.query.get_or_404(id) | |
| title = request.form.get("title", "").strip() | |
| description = request.form.get("description", "").strip() | |
| subject_id = request.form.get("subject_id") | |
| note_type_id = request.form.get("note_type_id") | |
| link = request.form.get("link", "").strip() | |
| if not title or not subject_id or not note_type_id: | |
| return redirect(url_for("admin.notes")) | |
| subject = Subject.query.get(subject_id) | |
| note_type = NoteType.query.get(note_type_id) | |
| if not subject or not note_type: | |
| return redirect(url_for("admin.notes")) | |
| note.title = title | |
| note.description = description or None | |
| note.subject_id = subject.id | |
| note.note_type_id = note_type.id | |
| if note.original_link is not None and link: | |
| note.link = link | |
| note.original_link = link | |
| db.session.commit() | |
| return redirect(url_for("admin.notes")) | |
| def delete_note(id): | |
| note = Note.query.get_or_404(id) | |
| db.session.delete(note) | |
| db.session.commit() | |
| return redirect(url_for("admin.notes")) | |
| def users(): | |
| users = User.query.all() | |
| return render_template("admin/users.html", users=users) | |
| def toggle_admin(id): | |
| user = User.query.get_or_404(id) | |
| if user.id == current_user.id: | |
| pass | |
| else: | |
| user.is_admin = not user.is_admin | |
| db.session.commit() | |
| return redirect(url_for("admin.users")) | |
| def export_db(): | |
| uri = current_app.config.get("SQLALCHEMY_DATABASE_URI", "") | |
| try: | |
| parsed = make_url(uri) | |
| except Exception: | |
| abort(400, "Invalid database URL.") | |
| if parsed.drivername != "sqlite": | |
| abort(400, "Export endpoint only supports SQLite source databases.") | |
| if not parsed.database or parsed.database == ":memory:": | |
| abort(400, "In-memory SQLite database cannot be exported.") | |
| db_path = parsed.database | |
| if not os.path.isabs(db_path): | |
| db_path = os.path.abspath(db_path) | |
| if not os.path.exists(db_path): | |
| abort(404, "Database file not found.") | |
| fd, backup_path = tempfile.mkstemp(prefix="porahobe_backup_", suffix=".db") | |
| os.close(fd) | |
| src = sqlite3.connect(db_path) | |
| dst = sqlite3.connect(backup_path) | |
| try: | |
| src.backup(dst) | |
| finally: | |
| src.close() | |
| dst.close() | |
| def _cleanup(response): | |
| try: | |
| os.remove(backup_path) | |
| except OSError: | |
| pass | |
| return response | |
| filename = f"porahobe_backup_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.db" | |
| return send_file(backup_path, as_attachment=True, download_name=filename) | |