# an / app.py
# Youssouf ⚜️
# h
# 35dd900
from flask import (
Flask,
render_template,
request,
redirect,
url_for,
abort,
flash,
session,
send_from_directory,
)
from werkzeug.utils import secure_filename
from werkzeug.security import check_password_hash, generate_password_hash
from models import db, Board, Post, BannedIP, PostFile, User
from forms import PostForm
from datetime import datetime
import os
import uuid
import re
from markupsafe import Markup
from functools import wraps
app = Flask(__name__)

# Key used to sign the session cookie. Read from the SECRET_KEY environment
# variable when it is set; the literal fallback is for development only.
app.config["SECRET_KEY"] = os.environ.get("SECRET_KEY", "dev-secret-key")

# The SQLite database file lives in the Flask instance folder, which is
# created on demand.
os.makedirs(app.instance_path, exist_ok=True)
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///" + os.path.join(
    app.instance_path, "ghostboard.db"
)
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False

# Upload settings: destination folder, request size cap and accepted extensions.
app.config["UPLOAD_FOLDER"] = os.path.join(app.root_path, "static/uploads")
os.makedirs(app.config["UPLOAD_FOLDER"], exist_ok=True)
app.config["MAX_CONTENT_LENGTH"] = 20 * 1024 * 1024  # 20 MB limit
app.config["ALLOWED_EXTENSIONS"] = {"png", "jpg", "jpeg", "gif", "webp", "mp4", "webm"}

# Password hash checked by the admin login view. A pre-computed hash can be
# supplied through the ADMIN_PASSWORD_HASH environment variable; when it is
# absent or empty the development default below is used.
app.config["ADMIN_PASSWORD_HASH"] = os.environ.get(
    "ADMIN_PASSWORD_HASH"
) or generate_password_hash("admin")  # Default password: admin

db.init_app(app)
def init_db():
    """Create all tables and insert any default board that is not present yet."""
    db.create_all()

    # (slug, name, description) of every board seeded by default.
    default_boards = (
        ("g", "Général", "Discussion générale"),
        ("tech", "Technologie", "Ordinateurs, programmation, gadgets"),
        ("art", "Création", "Art, design, musique"),
        ("news", "Actualités", "Événements actuels"),
    )
    for slug, name, description in default_boards:
        if Board.query.filter_by(slug=slug).first():
            # A board with this slug already exists; leave it untouched.
            continue
        db.session.add(Board(slug=slug, name=name, description=description))
    db.session.commit()
# Create and seed the database on first start, i.e. when its file is missing.
with app.app_context():
    database_file = os.path.join(app.instance_path, "ghostboard.db")
    if not os.path.exists(database_file):
        init_db()
def allowed_file(filename):
    """Return True when *filename* has an extension listed in ALLOWED_EXTENSIONS.

    The extension is the text after the last dot, compared case-insensitively.
    """
    if "." not in filename:
        return False
    extension = filename.rsplit(".", 1)[1].lower()
    return extension in app.config["ALLOWED_EXTENSIONS"]
def save_file(file):
    """Store an uploaded file in the upload folder under a random name.

    Args:
        file: Uploaded file object; it must expose ``filename`` and
            ``save(path)``.

    Returns:
        ``(stored_name, original_name)`` on success, where ``stored_name`` is
        ``<uuid4 hex>.<extension>`` and ``original_name`` is the sanitized
        client-side filename. ``(None, None)`` when there is no file or its
        extension is not allowed.
    """
    if not file or not allowed_file(file.filename):
        return None, None

    # Take the extension from the raw name that allowed_file() just validated.
    # secure_filename() can drop the dot (".png" and "画像.png" both become
    # "png"), so splitting the sanitized name raised IndexError.
    extension = file.filename.rsplit(".", 1)[1].lower()
    new_filename = f"{uuid.uuid4().hex}.{extension}"

    # Keep a non-empty display name even if sanitizing removed every character.
    original_filename = secure_filename(file.filename) or new_filename

    file.save(os.path.join(app.config["UPLOAD_FOLDER"], new_filename))
    return new_filename, original_filename
def delete_post_files(post):
    """Remove the attachments of *post* from disk and delete their rows.

    The session is not committed here; the caller is expected to do it.
    """
    upload_dir = app.config["UPLOAD_FOLDER"]
    for attachment in post.files:
        stored_path = os.path.join(upload_dir, attachment.filename)
        if os.path.exists(stored_path):
            os.remove(stored_path)
        db.session.delete(attachment)
def delete_thread_content(thread):
    """Delete a thread, its replies and all of their attachments (no commit)."""
    # Replies go first, then the opening post itself.
    for reply in Post.query.filter_by(thread_id=thread.id).all():
        delete_post_files(reply)
        db.session.delete(reply)

    delete_post_files(thread)
    db.session.delete(thread)
def prune_board(board_id, max_threads=100):
    """Delete the least recently bumped threads beyond the board's thread cap.

    Args:
        board_id: Primary key of the board to prune.
        max_threads: Number of threads to keep. Defaults to 100, the limit
            that used to be hard-coded.
    """
    # Threads are posts without a thread_id. They are ordered newest bump
    # first, so every row past the first ``max_threads`` is overflow. Fetching
    # the overflow directly avoids a separate COUNT query.
    overflow = (
        Post.query.filter_by(board_id=board_id, thread_id=None)
        .order_by(Post.updated_at.desc())
        .offset(max_threads)
        .all()
    )
    if not overflow:
        return

    for thread in overflow:
        delete_thread_content(thread)
    db.session.commit()
def admin_required(f):
    """Decorator that redirects visitors without the admin flag to the login page."""

    @wraps(f)
    def guarded_view(*args, **kwargs):
        if session.get("is_admin"):
            return f(*args, **kwargs)
        # The requested URL is passed along as the ``next`` query argument.
        return redirect(url_for("admin_login", next=request.url))

    return guarded_view
def check_ban():
    """Abort with 403 when the requesting IP address has a BannedIP entry."""
    ban = BannedIP.query.filter_by(ip_address=request.remote_addr).first()
    if not ban:
        return
    abort(403, description=f"Vous êtes banni. Raison : {ban.reason}")
@app.cli.command("init-db")
def init_db_command():
    """Create database tables and seed initial data."""
    # Run the initialization inside an explicitly pushed application context.
    context = app.app_context()
    with context:
        init_db()
        print("Base de données initialisée.")
@app.context_processor
def inject_boards():
    """Expose the list of all boards to every template as ``global_boards``."""
    return {"global_boards": Board.query.all()}
@app.template_filter("format_post")
def format_post(content, post_context=None):
    """
    Format post content with quote links and greentext.

    content: raw post text; it is HTML-escaped before any markup is added.
    post_context: dict mapping post_id (as str) -> nickname, used to resolve
        >>12345 references without a database lookup.

    Returns a Markup string, or "" when content is empty.
    """
    if not content:
        return ""

    # Escape HTML first (safety)
    content = str(Markup.escape(content))

    # Longest digit run still looked up in the database. Longer runs cannot be
    # real post ids and would overflow SQLite's 64-bit integer binding.
    max_id_digits = 18

    def quote_link(post_id, label):
        # Build the anchor for a quoted post; ``label`` must already be HTML-safe.
        return (
            f'<a href="#post-{post_id}" class="text-blue-400 hover:underline" '
            f'onclick="highlightPost({post_id})">{label}</a>'
        )

    def mention(nickname):
        # Nicknames are user-controlled and the result is returned as Markup,
        # so they must be escaped here to prevent HTML/script injection.
        return f'@<span class="font-semibold">{str(Markup.escape(nickname))}</span>'

    # Link to quotes >>12345
    def replace_quote(match):
        post_id = match.group(1)
        if post_context and post_id in post_context:
            return quote_link(post_id, mention(post_context[post_id]))

        # Fallback: try to get from database (slower)
        post = None
        if len(post_id) <= max_id_digits:
            post = Post.query.get(int(post_id))
        if post and post.user:
            return quote_link(post_id, mention(post.user.nickname))
        if post:
            return quote_link(post_id, mention(post.nickname))
        # Unknown post: keep the literal >>id text as the link label.
        return quote_link(post_id, f"&gt;&gt;{post_id}")

    content = re.sub(r"&gt;&gt;(\d+)", replace_quote, content)

    # Greentext (lines starting with >)
    content = re.sub(
        r"^(&gt;.*)$",
        r'<span class="text-green-400">\1</span>',
        content,
        flags=re.MULTILINE,
    )

    # Convert newlines to <br>
    content = content.replace("\n", "<br>")
    return Markup(content)
# API Routes for user management
@app.route("/api/set-nickname", methods=["POST"])
def set_nickname():
    """Set or update user nickname

    Expects a JSON object body with a "nickname" string. Responds with
    ``{"success": True, "user_id": ..., "nickname": ...}`` on success, or with
    ``{"error": ...}`` and status 400 when the nickname is missing, too long
    or already used by another user.
    """
    # silent=True: a missing, non-JSON or malformed body yields None instead of
    # raising, so it is reported through the same 400 response as an empty
    # nickname. Non-dict bodies and non-string nicknames are treated likewise
    # rather than crashing on .get() / .strip().
    data = request.get_json(silent=True)
    raw_nickname = data.get("nickname", "") if isinstance(data, dict) else ""
    nickname = raw_nickname.strip() if isinstance(raw_nickname, str) else ""

    if not nickname:
        return {"error": "Pseudo requis"}, 400
    if len(nickname) > 50:
        return {"error": "Pseudo trop long (max 50 caractères)"}, 400

    # Check if nickname already exists
    existing_user = User.query.filter_by(nickname=nickname).first()
    if existing_user:
        # If it's the same user, allow it
        if session.get("user_id") == existing_user.id:
            return {"success": True, "user_id": existing_user.id, "nickname": nickname}
        return {"error": "Ce pseudo est déjà pris"}, 400

    # Get or create user
    user_id = session.get("user_id")
    user = User.query.get(user_id) if user_id else None
    if user:
        user.nickname = nickname
        user.last_seen_at = datetime.utcnow()
    else:
        # No session user, or the session points at a user that no longer
        # exists: create a new one.
        user = User(nickname=nickname)
        db.session.add(user)
    db.session.commit()

    session["user_id"] = user.id
    session["nickname"] = user.nickname
    return {"success": True, "user_id": user.id, "nickname": nickname}
@app.route("/api/me")
def get_current_user():
    """Return the session user's id and nickname, or nulls when there is none.

    When the user exists, its ``last_seen_at`` is refreshed and committed.
    """
    anonymous = {"user_id": None, "nickname": None}

    user_id = session.get("user_id")
    if not user_id:
        return anonymous

    user = User.query.get(user_id)
    if not user:
        return anonymous

    user.last_seen_at = datetime.utcnow()
    db.session.commit()
    return {"user_id": user.id, "nickname": user.nickname}
@app.route("/user/<int:user_id>")
def user_profile(user_id):
    """Render a user's profile page with their 50 most recent posts (404 if unknown)."""
    user = User.query.get_or_404(user_id)
    newest_first = Post.query.filter_by(user_id=user_id).order_by(
        Post.created_at.desc()
    )
    posts = newest_first.limit(50).all()
    return render_template("user_profile.html", user=user, posts=posts)
@app.route("/manifest.json")
def manifest():
    """Serve manifest.json from the static directory at the site root."""
    directory, filename = "static", "manifest.json"
    return send_from_directory(directory, filename)
@app.route("/sw.js")
def service_worker():
    """Serve sw.js from the static directory at the site root."""
    directory, filename = "static", "sw.js"
    return send_from_directory(directory, filename)
@app.route("/")
def index():
    """Render the home page with the list of all boards."""
    return render_template("index.html", boards=Board.query.all())
@app.route("/<slug>/", methods=["GET", "POST"])
def board(slug):
    """Show a board's threads and, on a valid form submission, start a new thread."""
    current_board = Board.query.filter_by(slug=slug).first_or_404()
    form = PostForm()

    if form.validate_on_submit():
        check_ban()

        # Resolve the author: the session's user when it still exists,
        # otherwise an anonymous poster.
        user_id = session.get("user_id")
        nickname = "Anonyme"
        if not user_id:
            # Anonymous post: use the nickname typed in the form, if any.
            nickname = form.nickname.data or "Anonyme"
        else:
            author = User.query.get(user_id)
            if author:
                nickname = author.nickname
            else:
                user_id = None  # Session refers to a user that does not exist

        # Create the thread and commit so it has an id for its attachments.
        new_thread = Post(
            board_id=current_board.id,
            user_id=user_id,
            content=form.content.data,
            nickname=nickname,
            ip_address=request.remote_addr,
        )
        db.session.add(new_thread)
        db.session.commit()

        # Store every accepted upload and link it to the new thread.
        uploads = form.files.data
        if uploads:
            for upload in uploads:
                stored_name, original_name = save_file(upload)
                if not stored_name:
                    continue
                db.session.add(
                    PostFile(
                        post_id=new_thread.id,
                        filename=stored_name,
                        original_filename=original_name,
                    )
                )
            db.session.commit()

        # Drop the oldest threads if the board grew past its limit.
        prune_board(current_board.id)
        return redirect(url_for("board", slug=slug))

    # Threads are posts with no thread_id, ordered by last bump.
    threads = (
        Post.query.filter_by(board_id=current_board.id, thread_id=None)
        .order_by(Post.updated_at.desc())
        .limit(50)
        .all()
    )

    # post_id (str) -> nickname for the format_post filter, covering each
    # listed thread and its three newest replies.
    post_context = {}
    for thread in threads:
        post_context[str(thread.id)] = (
            thread.user.nickname if thread.user else thread.nickname
        )
        newest_replies = (
            Post.query.filter_by(thread_id=thread.id)
            .order_by(Post.created_at.desc())
            .limit(3)
            .all()
        )
        for reply in newest_replies:
            post_context[str(reply.id)] = (
                reply.user.nickname if reply.user else reply.nickname
            )

    return render_template(
        "board.html",
        board=current_board,
        threads=threads,
        form=form,
        post_context=post_context,
    )
@app.route("/thread/<int:thread_id>/", methods=["GET", "POST"])
def thread(thread_id):
    """Show a thread with its replies and, on a valid form submission, add a reply.

    Requesting the id of a reply redirects to the thread it belongs to. A
    successful reply bumps the thread's ``updated_at`` and redirects back to
    the thread page.
    """
    thread_post = Post.query.get_or_404(thread_id)
    if thread_post.thread_id is not None:
        # If it's a reply, redirect to the main thread
        return redirect(url_for("thread", thread_id=thread_post.thread_id))

    form = PostForm()
    if form.validate_on_submit():
        check_ban()

        # Get user_id from session or use anonymous
        user_id = session.get("user_id")
        nickname = "Anonyme"
        if user_id:
            user = User.query.get(user_id)
            if user:
                nickname = user.nickname
            else:
                user_id = None  # Invalid user_id
        else:
            # Anonymous post
            nickname = form.nickname.data or "Anonyme"

        # Create reply
        reply = Post(
            board_id=thread_post.board_id,
            thread_id=thread_post.id,
            user_id=user_id,
            content=form.content.data,
            nickname=nickname,
            ip_address=request.remote_addr,
        )
        db.session.add(reply)
        # Flush so the reply gets its primary key now. Without this, reply.id
        # is still None when the PostFile rows below are built and the
        # attachments are not linked to the reply.
        db.session.flush()

        # Handle multiple files
        if form.files.data:
            for f in form.files.data:
                filename, original_filename = save_file(f)
                if filename:
                    post_file = PostFile(
                        post_id=reply.id,
                        filename=filename,
                        original_filename=original_filename,
                    )
                    db.session.add(post_file)

        # Bump the thread
        thread_post.updated_at = datetime.utcnow()
        db.session.commit()
        return redirect(url_for("thread", thread_id=thread_post.id))

    replies = (
        Post.query.filter_by(thread_id=thread_post.id)
        .order_by(Post.created_at.asc())
        .all()
    )

    # Create post context for format_post filter (maps post_id -> nickname)
    post_context = {
        str(post.id): post.user.nickname if post.user else post.nickname
        for post in [thread_post, *replies]
    }

    return render_template(
        "thread.html",
        thread=thread_post,
        replies=replies,
        board=thread_post.board,
        form=form,
        post_context=post_context,
    )
# Admin Routes
@app.route("/admin/login", methods=["GET", "POST"])
def admin_login():
    """Show the admin login form and, on POST, check the submitted password.

    A correct password sets ``session["is_admin"]`` and redirects to the admin
    dashboard. A wrong or missing one flashes an error and shows the form again.
    """
    if request.method == "POST":
        # Default to "" so that a POST without a password field is rejected as
        # a wrong password instead of passing None to check_password_hash.
        password = request.form.get("password", "")
        if check_password_hash(app.config["ADMIN_PASSWORD_HASH"], password):
            session["is_admin"] = True
            return redirect(url_for("admin_dashboard"))
        flash("Mot de passe invalide")
    return render_template("admin_login.html")
@app.route("/admin/logout")
def admin_logout():
    """Remove the admin flag from the session and go back to the home page."""
    session.pop("is_admin", None)
    home_url = url_for("index")
    return redirect(home_url)
@app.route("/admin")
@admin_required
def admin_dashboard():
    """Render the moderation page with the 100 newest posts (threads and replies)."""
    newest_first = Post.query.order_by(Post.created_at.desc())
    return render_template("admin.html", posts=newest_first.limit(100).all())
@app.route("/admin/delete/<int:post_id>", methods=["POST"])
@admin_required
def delete_post(post_id):
    """Delete a post; deleting a thread starter removes the whole thread."""
    post = Post.query.get_or_404(post_id)

    is_reply = post.thread_id is not None
    if is_reply:
        delete_post_files(post)
        db.session.delete(post)
    else:
        # Thread starter: its replies and attachments go with it.
        delete_thread_content(post)
    db.session.commit()

    flash("Post supprimé")
    # Go back to the page the request came from, or to the dashboard.
    return redirect(request.referrer or url_for("admin_dashboard"))
@app.route("/admin/ban/<int:post_id>", methods=["POST"])
@admin_required
def ban_ip(post_id):
    """Ban the IP address that authored the given post, unless already banned."""
    post = Post.query.get_or_404(post_id)
    address = post.ip_address

    already_banned = BannedIP.query.filter_by(ip_address=address).first()
    if already_banned:
        flash(f"L'IP {address} est déjà bannie")
    else:
        db.session.add(
            BannedIP(ip_address=address, reason="Banni par l'administrateur")
        )
        db.session.commit()
        flash(f"IP {address} bannie")

    # Go back to the page the request came from, or to the dashboard.
    return redirect(request.referrer or url_for("admin_dashboard"))
@app.route("/admin/boards")
@admin_required
def admin_boards():
    """Render the board management page with every existing board."""
    return render_template("admin_boards.html", boards=Board.query.all())
@app.route("/admin/boards/add", methods=["POST"])
@admin_required
def admin_add_board():
    """Create a board from the submitted form and return to the board list.

    A slug and a name are required, and the slug must not already be in use.
    The outcome is reported through a flashed message.
    """
    slug = request.form.get("slug")
    name = request.form.get("name")
    description = request.form.get("description")

    if not (slug and name):
        flash("Données manquantes")
    elif Board.query.filter_by(slug=slug).first():
        flash("Ce slug existe déjà")
    else:
        db.session.add(Board(slug=slug, name=name, description=description))
        db.session.commit()
        flash("Planche ajoutée")

    return redirect(url_for("admin_boards"))
@app.route("/admin/boards/delete/<int:board_id>", methods=["POST"])
@admin_required
def admin_delete_board(board_id):
    """Delete a board together with all of its threads, replies and attachments."""
    doomed_board = Board.query.get_or_404(board_id)

    # Removing each thread starter also removes its replies and files.
    starters = Post.query.filter_by(board_id=doomed_board.id, thread_id=None).all()
    for starter in starters:
        delete_thread_content(starter)

    db.session.delete(doomed_board)
    db.session.commit()
    flash("Planche supprimée")
    return redirect(url_for("admin_boards"))
if __name__ == "__main__":
    # Direct execution starts Flask's built-in server with debug mode on.
    debug_mode = True
    app.run(debug=debug_mode)