from flask import ( Flask, render_template, request, redirect, url_for, abort, flash, session, send_from_directory, ) from werkzeug.utils import secure_filename from werkzeug.security import check_password_hash, generate_password_hash from models import db, Board, Post, BannedIP, PostFile, User from forms import PostForm from datetime import datetime import os import uuid import re from markupsafe import Markup from functools import wraps app = Flask(__name__) app.config["SECRET_KEY"] = "dev-secret-key" # Change in production os.makedirs(app.instance_path, exist_ok=True) app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///" + os.path.join( app.instance_path, "ghostboard.db" ) app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False app.config["UPLOAD_FOLDER"] = os.path.join(app.root_path, "static/uploads") os.makedirs(app.config["UPLOAD_FOLDER"], exist_ok=True) app.config["MAX_CONTENT_LENGTH"] = 20 * 1024 * 1024 # 20 MB limit app.config["ALLOWED_EXTENSIONS"] = {"png", "jpg", "jpeg", "gif", "webp", "mp4", "webm"} app.config["ADMIN_PASSWORD_HASH"] = generate_password_hash( "admin" ) # Default password: admin db.init_app(app) def init_db(): db.create_all() # Seed default boards boards = [ {"slug": "g", "name": "Général", "description": "Discussion générale"}, { "slug": "tech", "name": "Technologie", "description": "Ordinateurs, programmation, gadgets", }, {"slug": "art", "name": "Création", "description": "Art, design, musique"}, {"slug": "news", "name": "Actualités", "description": "Événements actuels"}, ] for b_data in boards: if not Board.query.filter_by(slug=b_data["slug"]).first(): board = Board( slug=b_data["slug"], name=b_data["name"], description=b_data["description"], ) db.session.add(board) db.session.commit() # Auto-initialize database if it doesn't exist with app.app_context(): if not os.path.exists(os.path.join(app.instance_path, "ghostboard.db")): init_db() def allowed_file(filename): return ( "." in filename and filename.rsplit(".", 1)[1].lower() in app.config["ALLOWED_EXTENSIONS"] ) def save_file(file): if file and allowed_file(file.filename): original_filename = secure_filename(file.filename) extension = original_filename.rsplit(".", 1)[1].lower() new_filename = f"{uuid.uuid4().hex}.{extension}" file.save(os.path.join(app.config["UPLOAD_FOLDER"], new_filename)) return new_filename, original_filename return None, None def delete_post_files(post): for file in post.files: path = os.path.join(app.config["UPLOAD_FOLDER"], file.filename) if os.path.exists(path): os.remove(path) db.session.delete(file) def delete_thread_content(thread): # Delete all replies replies = Post.query.filter_by(thread_id=thread.id).all() for reply in replies: delete_post_files(reply) db.session.delete(reply) delete_post_files(thread) db.session.delete(thread) def prune_board(board_id): # Limit to 100 threads threads_query = Post.query.filter_by(board_id=board_id, thread_id=None).order_by( Post.updated_at.desc() ) if threads_query.count() > 100: threads_to_delete = threads_query.offset(100).all() for thread in threads_to_delete: delete_thread_content(thread) db.session.commit() def admin_required(f): @wraps(f) def decorated_function(*args, **kwargs): if not session.get("is_admin"): return redirect(url_for("admin_login", next=request.url)) return f(*args, **kwargs) return decorated_function def check_ban(): ip = request.remote_addr ban = BannedIP.query.filter_by(ip_address=ip).first() if ban: abort(403, description=f"Vous êtes banni. Raison : {ban.reason}") @app.cli.command("init-db") def init_db_command(): """Create database tables and seed initial data.""" with app.app_context(): init_db() print("Base de données initialisée.") @app.context_processor def inject_boards(): return dict(global_boards=Board.query.all()) @app.template_filter("format_post") def format_post(content, post_context=None): """ Format post content with quotes and greentext. post_context: dict mapping post_id -> nickname for resolving >>12345 references """ if not content: return "" # Escape HTML first (safety) content = str(Markup.escape(content)) # Link to quotes >>12345 def replace_quote(match): post_id = match.group(1) if post_context and post_id in post_context: nickname = post_context[post_id] return f'@{nickname}' else: # Fallback: try to get from database (slower) post = Post.query.get(int(post_id)) if post and post.user: return f'@{post.user.nickname}' elif post: return f'@{post.nickname}' else: return f'>>{post_id}' content = re.sub(r">>(\d+)", replace_quote, content) # Greentext (lines starting with >) content = re.sub( r"^(>.*)$", r'\1', content, flags=re.MULTILINE, ) # Convert newlines to
content = content.replace("\n", "
") return Markup(content) # API Routes for user management @app.route("/api/set-nickname", methods=["POST"]) def set_nickname(): """Set or update user nickname""" data = request.get_json() nickname = data.get("nickname", "").strip() if not nickname: return {"error": "Pseudo requis"}, 400 if len(nickname) > 50: return {"error": "Pseudo trop long (max 50 caractères)"}, 400 # Check if nickname already exists existing_user = User.query.filter_by(nickname=nickname).first() if existing_user: # If it's the same user, allow it if session.get("user_id") == existing_user.id: return {"success": True, "user_id": existing_user.id, "nickname": nickname} else: return {"error": "Ce pseudo est déjà pris"}, 400 # Get or create user user_id = session.get("user_id") if user_id: user = User.query.get(user_id) if user: user.nickname = nickname user.last_seen_at = datetime.utcnow() else: # Session invalid, create new user user = User(nickname=nickname) db.session.add(user) else: user = User(nickname=nickname) db.session.add(user) db.session.commit() session["user_id"] = user.id session["nickname"] = user.nickname return {"success": True, "user_id": user.id, "nickname": nickname} @app.route("/api/me") def get_current_user(): """Get current user info""" user_id = session.get("user_id") if user_id: user = User.query.get(user_id) if user: user.last_seen_at = datetime.utcnow() db.session.commit() return {"user_id": user.id, "nickname": user.nickname} return {"user_id": None, "nickname": None} @app.route("/user/") def user_profile(user_id): """Show user profile and posts""" user = User.query.get_or_404(user_id) posts = ( Post.query.filter_by(user_id=user_id) .order_by(Post.created_at.desc()) .limit(50) .all() ) return render_template("user_profile.html", user=user, posts=posts) @app.route("/manifest.json") def manifest(): return send_from_directory("static", "manifest.json") @app.route("/sw.js") def service_worker(): return send_from_directory("static", "sw.js") @app.route("/") def index(): boards = Board.query.all() return render_template("index.html", boards=boards) @app.route("//", methods=["GET", "POST"]) def board(slug): board = Board.query.filter_by(slug=slug).first_or_404() form = PostForm() if form.validate_on_submit(): check_ban() # Get user_id from session or use anonymous user_id = session.get("user_id") nickname = "Anonyme" if user_id: user = User.query.get(user_id) if user: nickname = user.nickname else: user_id = None # Invalid user_id else: # Anonymous post nickname = form.nickname.data or "Anonyme" # Create new thread post = Post( board_id=board.id, user_id=user_id, content=form.content.data, nickname=nickname, ip_address=request.remote_addr, ) db.session.add(post) db.session.commit() # Handle multiple files if form.files.data: for f in form.files.data: filename, original_filename = save_file(f) if filename: post_file = PostFile( post_id=post.id, filename=filename, original_filename=original_filename, ) db.session.add(post_file) db.session.commit() # Prune board prune_board(board.id) return redirect(url_for("board", slug=slug)) # Get threads (posts with no thread_id), sorted by updated_at (bumping) threads = ( Post.query.filter_by(board_id=board.id, thread_id=None) .order_by(Post.updated_at.desc()) .limit(50) .all() ) # Create post context for format_post filter # Include all threads and their recent replies post_context = {} for thread in threads: if thread.user: post_context[str(thread.id)] = thread.user.nickname else: post_context[str(thread.id)] = thread.nickname # Also include recent replies for this thread recent_replies = ( Post.query.filter_by(thread_id=thread.id) .order_by(Post.created_at.desc()) .limit(3) .all() ) for reply in recent_replies: if reply.user: post_context[str(reply.id)] = reply.user.nickname else: post_context[str(reply.id)] = reply.nickname return render_template( "board.html", board=board, threads=threads, form=form, post_context=post_context ) @app.route("/thread//", methods=["GET", "POST"]) def thread(thread_id): thread_post = Post.query.get_or_404(thread_id) if thread_post.thread_id is not None: # If it's a reply, redirect to the main thread return redirect(url_for("thread", thread_id=thread_post.thread_id)) form = PostForm() if form.validate_on_submit(): check_ban() # Get user_id from session or use anonymous user_id = session.get("user_id") nickname = "Anonyme" if user_id: user = User.query.get(user_id) if user: nickname = user.nickname else: user_id = None # Invalid user_id else: # Anonymous post nickname = form.nickname.data or "Anonyme" # Create reply reply = Post( board_id=thread_post.board_id, thread_id=thread_post.id, user_id=user_id, content=form.content.data, nickname=nickname, ip_address=request.remote_addr, ) db.session.add(reply) # Handle multiple files if form.files.data: for f in form.files.data: filename, original_filename = save_file(f) if filename: post_file = PostFile( post_id=reply.id, filename=filename, original_filename=original_filename, ) db.session.add(post_file) # Bump the thread thread_post.updated_at = datetime.utcnow() db.session.commit() return redirect(url_for("thread", thread_id=thread_post.id)) replies = ( Post.query.filter_by(thread_id=thread_post.id) .order_by(Post.created_at.asc()) .all() ) # Create post context for format_post filter (maps post_id -> nickname) post_context = {} all_posts = [thread_post] + replies for post in all_posts: if post.user: post_context[str(post.id)] = post.user.nickname else: post_context[str(post.id)] = post.nickname return render_template( "thread.html", thread=thread_post, replies=replies, board=thread_post.board, form=form, post_context=post_context, ) # Admin Routes @app.route("/admin/login", methods=["GET", "POST"]) def admin_login(): if request.method == "POST": password = request.form.get("password") if check_password_hash(app.config["ADMIN_PASSWORD_HASH"], password): session["is_admin"] = True return redirect(url_for("admin_dashboard")) else: flash("Mot de passe invalide") return render_template("admin_login.html") @app.route("/admin/logout") def admin_logout(): session.pop("is_admin", None) return redirect(url_for("index")) @app.route("/admin") @admin_required def admin_dashboard(): # Show recent posts (both threads and replies) for moderation recent_posts = Post.query.order_by(Post.created_at.desc()).limit(100).all() return render_template("admin.html", posts=recent_posts) @app.route("/admin/delete/", methods=["POST"]) @admin_required def delete_post(post_id): post = Post.query.get_or_404(post_id) if post.thread_id is None: # It's a thread starter, delete whole thread delete_thread_content(post) else: # It's a reply delete_post_files(post) db.session.delete(post) db.session.commit() flash("Post supprimé") return redirect(request.referrer or url_for("admin_dashboard")) @app.route("/admin/ban/", methods=["POST"]) @admin_required def ban_ip(post_id): post = Post.query.get_or_404(post_id) if not BannedIP.query.filter_by(ip_address=post.ip_address).first(): ban = BannedIP(ip_address=post.ip_address, reason="Banni par l'administrateur") db.session.add(ban) db.session.commit() flash(f"IP {post.ip_address} bannie") else: flash(f"L'IP {post.ip_address} est déjà bannie") return redirect(request.referrer or url_for("admin_dashboard")) @app.route("/admin/boards") @admin_required def admin_boards(): boards = Board.query.all() return render_template("admin_boards.html", boards=boards) @app.route("/admin/boards/add", methods=["POST"]) @admin_required def admin_add_board(): slug = request.form.get("slug") name = request.form.get("name") description = request.form.get("description") if slug and name: if not Board.query.filter_by(slug=slug).first(): board = Board(slug=slug, name=name, description=description) db.session.add(board) db.session.commit() flash("Planche ajoutée") else: flash("Ce slug existe déjà") else: flash("Données manquantes") return redirect(url_for("admin_boards")) @app.route("/admin/boards/delete/", methods=["POST"]) @admin_required def admin_delete_board(board_id): board = Board.query.get_or_404(board_id) # Delete all posts in board posts = Post.query.filter_by(board_id=board.id, thread_id=None).all() for post in posts: delete_thread_content(post) db.session.delete(board) db.session.commit() flash("Planche supprimée") return redirect(url_for("admin_boards")) if __name__ == "__main__": app.run(debug=True)