from flask import ( Blueprint, abort, after_this_request, flash, redirect, render_template, request, send_file, url_for, current_app, ) from flask_login import current_user, login_required from functools import wraps import os import sqlite3 import tempfile from datetime import datetime from sqlalchemy.engine import make_url from app.extensions import db from app.models import Note, NoteType, Subject, User admin_bp = Blueprint("admin", __name__) def admin_required(f): @wraps(f) @login_required def decorated_function(*args, **kwargs): if not current_user.is_admin: return redirect(url_for("admin.verify")) return f(*args, **kwargs) return decorated_function @admin_bp.route("/") @admin_required def dashboard(): users_count = User.query.count() notes_count = Note.query.count() subjects_count = Subject.query.count() note_types_count = NoteType.query.count() return render_template( "admin/dashboard.html", users_count=users_count, notes_count=notes_count, subjects_count=subjects_count, note_types_count=note_types_count, ) @admin_bp.route("/verify", methods=["GET", "POST"]) @login_required def verify(): if current_user.is_admin: return redirect(url_for("admin.dashboard")) if request.method == "POST": code = request.form.get("code", "").strip() if code == current_app.config["ADMIN_SECRET_CODE"]: current_user.is_admin = True db.session.commit() return redirect(url_for("admin.dashboard")) else: pass return render_template("admin/verify.html") @admin_bp.route("/subjects") @admin_required def subjects(): subjects = Subject.query.order_by(Subject.name.asc()).all() return render_template("admin/subjects.html", subjects=subjects) @admin_bp.route("/subjects/add", methods=["POST"]) @admin_required def add_subject(): name = request.form.get("name", "").strip() if name: existing = Subject.query.filter_by(name=name).first() if not existing: subject = Subject(name=name) db.session.add(subject) db.session.commit() else: pass return redirect(url_for("admin.subjects")) @admin_bp.route("/subjects/update/", methods=["POST"]) @admin_required def update_subject(id): subject = Subject.query.get_or_404(id) name = request.form.get("name", "").strip() if not name: return redirect(url_for("admin.subjects")) existing = Subject.query.filter_by(name=name).first() if existing and existing.id != subject.id: return redirect(url_for("admin.subjects")) subject.name = name db.session.commit() return redirect(url_for("admin.subjects")) @admin_bp.route("/subjects/delete/", methods=["POST"]) @admin_required def delete_subject(id): subject = Subject.query.get_or_404(id) db.session.delete(subject) db.session.commit() return redirect(url_for("admin.subjects")) @admin_bp.route("/note-types") @admin_required def note_types(): note_types = NoteType.query.order_by(NoteType.name.asc()).all() return render_template("admin/note_types.html", note_types=note_types) @admin_bp.route("/note-types/add", methods=["POST"]) @admin_required def add_note_type(): name = request.form.get("name", "").strip() if name: existing = NoteType.query.filter_by(name=name).first() if not existing: note_type = NoteType(name=name) db.session.add(note_type) db.session.commit() else: pass return redirect(url_for("admin.note_types")) @admin_bp.route("/note-types/update/", methods=["POST"]) @admin_required def update_note_type(id): note_type = NoteType.query.get_or_404(id) name = request.form.get("name", "").strip() if not name: return redirect(url_for("admin.note_types")) existing = NoteType.query.filter_by(name=name).first() if existing and existing.id != note_type.id: return redirect(url_for("admin.note_types")) note_type.name = name db.session.commit() return redirect(url_for("admin.note_types")) @admin_bp.route("/note-types/delete/", methods=["POST"]) @admin_required def delete_note_type(id): note_type = NoteType.query.get_or_404(id) db.session.delete(note_type) db.session.commit() return redirect(url_for("admin.note_types")) @admin_bp.route("/notes") @admin_required def notes(): notes = Note.query.order_by(Note.created_at.desc()).all() subjects = Subject.query.order_by(Subject.name.asc()).all() note_types = NoteType.query.order_by(NoteType.name.asc()).all() return render_template( "admin/notes.html", notes=notes, subjects=subjects, note_types=note_types ) @admin_bp.route("/notes/update/", methods=["POST"]) @admin_required def update_note(id): note = Note.query.get_or_404(id) title = request.form.get("title", "").strip() description = request.form.get("description", "").strip() subject_id = request.form.get("subject_id") note_type_id = request.form.get("note_type_id") link = request.form.get("link", "").strip() if not title or not subject_id or not note_type_id: return redirect(url_for("admin.notes")) subject = Subject.query.get(subject_id) note_type = NoteType.query.get(note_type_id) if not subject or not note_type: return redirect(url_for("admin.notes")) note.title = title note.description = description or None note.subject_id = subject.id note.note_type_id = note_type.id if note.original_link is not None and link: note.link = link note.original_link = link db.session.commit() return redirect(url_for("admin.notes")) @admin_bp.route("/notes/delete/", methods=["POST"]) @admin_required def delete_note(id): note = Note.query.get_or_404(id) db.session.delete(note) db.session.commit() return redirect(url_for("admin.notes")) @admin_bp.route("/users") @admin_required def users(): users = User.query.all() return render_template("admin/users.html", users=users) @admin_bp.route("/users/toggle-admin/", methods=["POST"]) @admin_required def toggle_admin(id): user = User.query.get_or_404(id) if user.id == current_user.id: pass else: user.is_admin = not user.is_admin db.session.commit() return redirect(url_for("admin.users")) @admin_bp.route("/export-db") @admin_required def export_db(): uri = current_app.config.get("SQLALCHEMY_DATABASE_URI", "") try: parsed = make_url(uri) except Exception: abort(400, "Invalid database URL.") if parsed.drivername != "sqlite": abort(400, "Export endpoint only supports SQLite source databases.") if not parsed.database or parsed.database == ":memory:": abort(400, "In-memory SQLite database cannot be exported.") db_path = parsed.database if not os.path.isabs(db_path): db_path = os.path.abspath(db_path) if not os.path.exists(db_path): abort(404, "Database file not found.") fd, backup_path = tempfile.mkstemp(prefix="porahobe_backup_", suffix=".db") os.close(fd) src = sqlite3.connect(db_path) dst = sqlite3.connect(backup_path) try: src.backup(dst) finally: src.close() dst.close() @after_this_request def _cleanup(response): try: os.remove(backup_path) except OSError: pass return response filename = f"porahobe_backup_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.db" return send_file(backup_path, as_attachment=True, download_name=filename)