|
|
from flask import ( |
|
|
Flask, |
|
|
render_template, |
|
|
request, |
|
|
redirect, |
|
|
url_for, |
|
|
abort, |
|
|
flash, |
|
|
session, |
|
|
send_from_directory, |
|
|
) |
|
|
from werkzeug.utils import secure_filename |
|
|
from werkzeug.security import check_password_hash, generate_password_hash |
|
|
from models import db, Board, Post, BannedIP, PostFile, User |
|
|
from forms import PostForm |
|
|
from datetime import datetime |
|
|
import os |
|
|
import uuid |
|
|
import re |
|
|
from markupsafe import Markup |
|
|
from functools import wraps |
|
|
|
|
|
app = Flask(__name__)

# Session-signing key. It can be supplied through the SECRET_KEY environment
# variable; the previous hard-coded development value remains the fallback so
# existing setups keep working.
app.config["SECRET_KEY"] = os.environ.get("SECRET_KEY", "dev-secret-key")

# The SQLite database file lives in the instance folder, which has to exist
# before the database is first opened.
os.makedirs(app.instance_path, exist_ok=True)
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///" + os.path.join(
    app.instance_path, "ghostboard.db"
)
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False

# Uploaded media is written below static/uploads inside the application root.
app.config["UPLOAD_FOLDER"] = os.path.join(app.root_path, "static/uploads")
os.makedirs(app.config["UPLOAD_FOLDER"], exist_ok=True)
app.config["MAX_CONTENT_LENGTH"] = 20 * 1024 * 1024  # 20 MiB
app.config["ALLOWED_EXTENSIONS"] = {"png", "jpg", "jpeg", "gif", "webp", "mp4", "webm"}

# Only the hash of the admin password is kept in the config. The plain
# password can be overridden with the ADMIN_PASSWORD environment variable;
# "admin" stays the default for backward compatibility.
app.config["ADMIN_PASSWORD_HASH"] = generate_password_hash(
    os.environ.get("ADMIN_PASSWORD", "admin")
)

db.init_app(app)
|
|
|
|
|
|
|
|
def init_db():
    """Create all tables and insert the default boards that do not exist yet."""
    db.create_all()

    # (slug, name, description) of every board seeded by default.
    default_boards = (
        ("g", "Général", "Discussion générale"),
        ("tech", "Technologie", "Ordinateurs, programmation, gadgets"),
        ("art", "Création", "Art, design, musique"),
        ("news", "Actualités", "Événements actuels"),
    )

    for slug, name, description in default_boards:
        # Boards are identified by slug; an existing one is left untouched.
        if Board.query.filter_by(slug=slug).first():
            continue
        db.session.add(Board(slug=slug, name=name, description=description))

    db.session.commit()
|
|
|
|
|
|
|
|
|
|
|
# First-run bootstrap, executed at import time: when the SQLite file is not
# present yet, create the schema and seed the default boards.
with app.app_context():
    _db_file = os.path.join(app.instance_path, "ghostboard.db")
    if not os.path.exists(_db_file):
        init_db()
|
|
|
|
|
|
|
|
def allowed_file(filename):
    """Tell whether ``filename`` ends in one of the configured upload extensions.

    The extension is the text after the last dot, compared case-insensitively
    against ``app.config["ALLOWED_EXTENSIONS"]``. A name without a dot is
    rejected.
    """
    if "." not in filename:
        return False
    _, extension = filename.rsplit(".", 1)
    return extension.lower() in app.config["ALLOWED_EXTENSIONS"]
|
|
|
|
|
|
|
|
def save_file(file):
    """Store an uploaded file in the upload folder under a random name.

    Args:
        file: The uploaded file object (it must expose ``filename`` and
            ``save``), or a falsy value when nothing was uploaded.

    Returns:
        ``(stored_name, original_name)`` on success, where ``stored_name`` is
        ``<uuid hex>.<extension>`` and ``original_name`` is the sanitized
        client-supplied name. ``(None, None)`` when there is no file or its
        extension is not allowed.
    """
    if not file or not allowed_file(file.filename):
        return None, None

    # Take the extension from the raw name that allowed_file() just validated.
    # secure_filename() can drop the dot entirely (a non-ASCII stem such as
    # "日本語.png" is reduced to "png"), so splitting its result could raise
    # IndexError.
    extension = file.filename.rsplit(".", 1)[1].lower()
    new_filename = f"{uuid.uuid4().hex}.{extension}"

    # Sanitizing may leave an empty string; fall back to the stored name so
    # the recorded original name is never blank.
    original_filename = secure_filename(file.filename) or new_filename

    file.save(os.path.join(app.config["UPLOAD_FOLDER"], new_filename))
    return new_filename, original_filename
|
|
|
|
|
|
|
|
def delete_post_files(post):
    """Remove every file attached to ``post`` from disk and delete its rows.

    The rows are only marked for deletion in the session; this function does
    not commit.

    Args:
        post: A post object exposing ``files``, each item having ``filename``.
    """
    for file in post.files:
        path = os.path.join(app.config["UPLOAD_FOLDER"], file.filename)
        try:
            os.remove(path)
        except FileNotFoundError:
            # Already gone (never written, or removed by a concurrent
            # request). Attempting the removal directly avoids the race
            # between a separate existence check and the delete.
            pass
        db.session.delete(file)
|
|
|
|
|
|
|
|
def delete_thread_content(thread):
    """Delete a thread: every reply first, then the opening post itself.

    For each post the attached files are removed before the post row is
    marked for deletion. Nothing is committed here.
    """
    replies = Post.query.filter_by(thread_id=thread.id).all()
    # Replies go first, the opening post last.
    for post in [*replies, thread]:
        delete_post_files(post)
        db.session.delete(post)
|
|
|
|
|
|
|
|
def prune_board(board_id, max_threads=100):
    """Delete the least recently updated threads beyond a board's thread limit.

    Threads are ordered by ``updated_at`` (newest first); everything after the
    first ``max_threads`` is deleted together with its replies and files, and
    the deletion is committed.

    Args:
        board_id: Id of the board to prune.
        max_threads: Number of threads to keep. Defaults to 100, the limit
            that used to be hard-coded.
    """
    threads_query = Post.query.filter_by(board_id=board_id, thread_id=None).order_by(
        Post.updated_at.desc()
    )
    if threads_query.count() <= max_threads:
        return

    for thread in threads_query.offset(max_threads).all():
        delete_thread_content(thread)
    db.session.commit()
|
|
|
|
|
|
|
|
def admin_required(f):
    """Decorator that only runs the view when the session is flagged as admin.

    Any other request is redirected to the admin login page, with the
    requested URL passed along as the ``next`` query parameter.
    """

    @wraps(f)
    def guarded_view(*args, **kwargs):
        if session.get("is_admin"):
            return f(*args, **kwargs)
        return redirect(url_for("admin_login", next=request.url))

    return guarded_view
|
|
|
|
|
|
|
|
def check_ban():
    """Abort the request with 403 when the client's address is in the ban list."""
    ban = BannedIP.query.filter_by(ip_address=request.remote_addr).first()
    if not ban:
        return
    abort(403, description=f"Vous êtes banni. Raison : {ban.reason}")
|
|
|
|
|
|
|
|
@app.cli.command("init-db")
def init_db_command():
    """Create database tables and seed initial data."""
    # Run the same initialization as the first-run bootstrap, inside an
    # application context, then report completion on stdout.
    with app.app_context():
        init_db()
        print("Base de données initialisée.")
|
|
|
|
|
|
|
|
@app.context_processor
def inject_boards():
    """Make the full list of boards available to templates as ``global_boards``."""
    return {"global_boards": Board.query.all()}
|
|
|
|
|
|
|
|
@app.template_filter("format_post")
def format_post(content, post_context=None):
    """Render raw post text as safe HTML with quote links and greentext.

    Args:
        content: Raw post text as entered by the user.
        post_context: Optional dict mapping a post id (as a string) to the
            nickname to display for ``>>12345`` references. Ids missing from
            it are looked up in the database.

    Returns:
        A ``Markup`` value with quote references turned into links, lines
        starting with ``>`` wrapped in a green span and newlines converted to
        ``<br>``; ``""`` when ``content`` is empty.
    """
    if not content:
        return ""

    # Escape first. From here on ">" appears as "&gt;", so the patterns below
    # match the escaped form (matching a literal ">" would never succeed).
    content = str(Markup.escape(content))

    def quote_link(post_id, label):
        """Build the anchor for a quoted post; ``label`` must be HTML-safe."""
        return (
            f'<a href="#post-{post_id}" class="text-blue-400 hover:underline" '
            f'onclick="highlightPost({post_id})">{label}</a>'
        )

    def mention(nickname):
        """Build the ``@nickname`` label; nicknames are user input, so escape."""
        return f'@<span class="font-semibold">{str(Markup.escape(nickname))}</span>'

    def replace_quote(match):
        post_id = match.group(1)
        if post_context and post_id in post_context:
            return quote_link(post_id, mention(post_context[post_id]))

        # Do not hand arbitrarily long digit strings to the database: values
        # beyond the 64-bit integer range cannot be real post ids.
        if len(post_id) > 18:
            return match.group(0)

        post = Post.query.get(int(post_id))
        if post and post.user:
            return quote_link(post_id, mention(post.user.nickname))
        if post:
            return quote_link(post_id, mention(post.nickname))
        # Unknown post: keep the reference text, still as a link.
        return quote_link(post_id, f"&gt;&gt;{post_id}")

    # Quote references: ">>123" (escaped as "&gt;&gt;123").
    content = re.sub(r"&gt;&gt;(\d+)", replace_quote, content)

    # Greentext: whole lines that start with ">" (escaped as "&gt;").
    content = re.sub(
        r"^(&gt;.*)$",
        r'<span class="text-green-400">\1</span>',
        content,
        flags=re.MULTILINE,
    )

    content = content.replace("\n", "<br>")

    return Markup(content)
|
|
|
|
|
|
|
|
|
|
|
@app.route("/api/set-nickname", methods=["POST"])
def set_nickname():
    """Set or update the nickname of the current visitor.

    Expects a JSON object with a ``nickname`` string. The nickname is
    stripped, must be non-empty and at most 50 characters, and must not belong
    to another user. The session's user is renamed when it exists; otherwise a
    new user is created and stored in the session.

    Returns:
        A JSON-serializable dict with ``success``, ``user_id`` and
        ``nickname``, or ``({"error": ...}, 400)`` on invalid input.
    """
    # silent=True returns None for a missing or malformed JSON body instead of
    # failing; a non-object payload or non-string nickname is treated like a
    # missing nickname rather than crashing on .get()/.strip().
    data = request.get_json(silent=True)
    raw_nickname = data.get("nickname") if isinstance(data, dict) else None
    nickname = raw_nickname.strip() if isinstance(raw_nickname, str) else ""

    if not nickname:
        return {"error": "Pseudo requis"}, 400

    if len(nickname) > 50:
        return {"error": "Pseudo trop long (max 50 caractères)"}, 400

    existing_user = User.query.filter_by(nickname=nickname).first()
    if existing_user:
        # Re-submitting one's own nickname is a no-op success.
        if session.get("user_id") == existing_user.id:
            return {"success": True, "user_id": existing_user.id, "nickname": nickname}
        return {"error": "Ce pseudo est déjà pris"}, 400

    user_id = session.get("user_id")
    user = User.query.get(user_id) if user_id else None
    if user:
        user.nickname = nickname
        user.last_seen_at = datetime.utcnow()
    else:
        # No session user, or the session points at a user that no longer
        # exists: start a fresh account.
        user = User(nickname=nickname)
        db.session.add(user)

    db.session.commit()
    session["user_id"] = user.id
    session["nickname"] = user.nickname

    return {"success": True, "user_id": user.id, "nickname": nickname}
|
|
|
|
|
|
|
|
@app.route("/api/me")
def get_current_user():
    """Return the session's user as a dict and refresh its last-seen time.

    Both ``user_id`` and ``nickname`` are ``None`` when the session has no
    user or the stored id no longer matches a user.
    """
    anonymous = {"user_id": None, "nickname": None}

    user_id = session.get("user_id")
    if not user_id:
        return anonymous

    user = User.query.get(user_id)
    if not user:
        return anonymous

    user.last_seen_at = datetime.utcnow()
    db.session.commit()
    return {"user_id": user.id, "nickname": user.nickname}
|
|
|
|
|
|
|
|
@app.route("/user/<int:user_id>")
def user_profile(user_id):
    """Render a user's profile with their 50 most recent posts (404 if unknown)."""
    user = User.query.get_or_404(user_id)
    newest_first = Post.query.filter_by(user_id=user_id).order_by(
        Post.created_at.desc()
    )
    posts = newest_first.limit(50).all()
    return render_template("user_profile.html", user=user, posts=posts)
|
|
|
|
|
|
|
|
@app.route("/manifest.json")
def manifest():
    """Serve ``manifest.json`` from the ``static`` directory at the site root."""
    response = send_from_directory("static", "manifest.json")
    return response
|
|
|
|
|
|
|
|
@app.route("/sw.js")
def service_worker():
    """Serve ``sw.js`` from the ``static`` directory at the site root."""
    response = send_from_directory("static", "sw.js")
    return response
|
|
|
|
|
|
|
|
@app.route("/")
def index():
    """Render the home page with the list of all boards."""
    return render_template("index.html", boards=Board.query.all())
|
|
|
|
|
|
|
|
@app.route("/<slug>/", methods=["GET", "POST"])
def board(slug):
    """Show a board's threads and handle the creation of a new thread.

    On a valid form submission the poster is checked against the ban list, a
    new opening post is stored together with its uploaded files, the board is
    pruned and the client is redirected back to the board. Otherwise the 50
    most recently updated threads are rendered.

    Args:
        slug: Board slug from the URL; an unknown slug yields a 404.
    """
    board = Board.query.filter_by(slug=slug).first_or_404()
    form = PostForm()

    if form.validate_on_submit():
        check_ban()

        # Resolve the poster: a valid session user wins; a stale session id
        # posts as "Anonyme"; visitors without a session may pick a nickname
        # in the form.
        user_id = session.get("user_id")
        nickname = "Anonyme"
        if user_id:
            user = User.query.get(user_id)
            if user:
                nickname = user.nickname
            else:
                user_id = None
        else:
            nickname = form.nickname.data or "Anonyme"

        post = Post(
            board_id=board.id,
            user_id=user_id,
            content=form.content.data,
            nickname=nickname,
            ip_address=request.remote_addr,
        )
        db.session.add(post)
        # flush() assigns post.id without ending the transaction, so the post
        # and its attachments are committed together: a failing upload no
        # longer leaves a committed thread without its files.
        db.session.flush()

        for f in form.files.data or []:
            filename, original_filename = save_file(f)
            if filename:
                db.session.add(
                    PostFile(
                        post_id=post.id,
                        filename=filename,
                        original_filename=original_filename,
                    )
                )

        db.session.commit()

        prune_board(board.id)

        return redirect(url_for("board", slug=slug))

    threads = (
        Post.query.filter_by(board_id=board.id, thread_id=None)
        .order_by(Post.updated_at.desc())
        .limit(50)
        .all()
    )

    # Map post id (as a string) -> display nickname for each listed thread and
    # its three most recent replies, used to resolve >>id references.
    post_context = {}
    for thread in threads:
        recent_replies = (
            Post.query.filter_by(thread_id=thread.id)
            .order_by(Post.created_at.desc())
            .limit(3)
            .all()
        )
        for item in [thread, *recent_replies]:
            # Prefer the linked user's current nickname over the one stored
            # on the post.
            post_context[str(item.id)] = (
                item.user.nickname if item.user else item.nickname
            )

    return render_template(
        "board.html", board=board, threads=threads, form=form, post_context=post_context
    )
|
|
|
|
|
|
|
|
@app.route("/thread/<int:thread_id>/", methods=["GET", "POST"])
def thread(thread_id):
    """Show a thread with its replies and handle posting a new reply.

    When ``thread_id`` refers to a reply, the client is redirected to the
    thread that contains it. On a valid form submission the poster is checked
    against the ban list, the reply and its uploaded files are stored, the
    thread's ``updated_at`` is bumped and the client is redirected back to the
    thread.

    Args:
        thread_id: Id of the post from the URL; an unknown id yields a 404.
    """
    thread_post = Post.query.get_or_404(thread_id)
    if thread_post.thread_id is not None:
        return redirect(url_for("thread", thread_id=thread_post.thread_id))

    form = PostForm()
    if form.validate_on_submit():
        check_ban()

        # Resolve the poster: a valid session user wins; a stale session id
        # posts as "Anonyme"; visitors without a session may pick a nickname
        # in the form.
        user_id = session.get("user_id")
        nickname = "Anonyme"
        if user_id:
            user = User.query.get(user_id)
            if user:
                nickname = user.nickname
            else:
                user_id = None
        else:
            nickname = form.nickname.data or "Anonyme"

        reply = Post(
            board_id=thread_post.board_id,
            thread_id=thread_post.id,
            user_id=user_id,
            content=form.content.data,
            nickname=nickname,
            ip_address=request.remote_addr,
        )
        db.session.add(reply)
        # reply.id is only assigned once the INSERT has been emitted. Without
        # this flush the attachments below were created with post_id=None and
        # never linked to the reply.
        db.session.flush()

        for f in form.files.data or []:
            filename, original_filename = save_file(f)
            if filename:
                db.session.add(
                    PostFile(
                        post_id=reply.id,
                        filename=filename,
                        original_filename=original_filename,
                    )
                )

        # Bump the thread so it sorts first on the board.
        thread_post.updated_at = datetime.utcnow()

        db.session.commit()
        return redirect(url_for("thread", thread_id=thread_post.id))

    replies = (
        Post.query.filter_by(thread_id=thread_post.id)
        .order_by(Post.created_at.asc())
        .all()
    )

    # Map post id (as a string) -> display nickname for every post on the
    # page, used to resolve >>id references. The linked user's current
    # nickname is preferred over the one stored on the post.
    post_context = {
        str(post.id): (post.user.nickname if post.user else post.nickname)
        for post in [thread_post, *replies]
    }

    return render_template(
        "thread.html",
        thread=thread_post,
        replies=replies,
        board=thread_post.board,
        form=form,
        post_context=post_context,
    )
|
|
|
|
|
|
|
|
|
|
|
@app.route("/admin/login", methods=["GET", "POST"])
def admin_login():
    """Show the admin login form and process a submitted password.

    A correct password sets ``session["is_admin"]`` and redirects to the admin
    dashboard; a wrong one flashes an error and shows the form again.
    """
    if request.method == "POST":
        # A POST without a "password" field yields None, which
        # check_password_hash() cannot process; treat it as an empty (and
        # therefore wrong) password instead of failing with a server error.
        password = request.form.get("password") or ""
        if check_password_hash(app.config["ADMIN_PASSWORD_HASH"], password):
            session["is_admin"] = True
            return redirect(url_for("admin_dashboard"))
        flash("Mot de passe invalide")
    return render_template("admin_login.html")
|
|
|
|
|
|
|
|
@app.route("/admin/logout")
def admin_logout():
    """Drop the admin flag from the session and return to the home page."""
    session.pop("is_admin", None)
    home_url = url_for("index")
    return redirect(home_url)
|
|
|
|
|
|
|
|
@app.route("/admin")
@admin_required
def admin_dashboard():
    """Render the admin dashboard with the 100 most recently created posts."""
    newest_first = Post.query.order_by(Post.created_at.desc())
    return render_template("admin.html", posts=newest_first.limit(100).all())
|
|
|
|
|
|
|
|
@app.route("/admin/delete/<int:post_id>", methods=["POST"])
@admin_required
def delete_post(post_id):
    """Delete a post (admin only); deleting an opening post removes the thread.

    Redirects to the referring page, or to the admin dashboard when the
    request carries no referrer.
    """
    post = Post.query.get_or_404(post_id)

    is_opening_post = post.thread_id is None
    if is_opening_post:
        # Takes the replies and all attached files with it.
        delete_thread_content(post)
    else:
        delete_post_files(post)
        db.session.delete(post)

    db.session.commit()
    flash("Post supprimé")
    return redirect(request.referrer or url_for("admin_dashboard"))
|
|
|
|
|
|
|
|
@app.route("/admin/ban/<int:post_id>", methods=["POST"])
@admin_required
def ban_ip(post_id):
    """Ban the IP address that authored the given post (admin only).

    Flashes whether the address was newly banned or already on the list, then
    redirects to the referring page or the admin dashboard.
    """
    post = Post.query.get_or_404(post_id)
    ip = post.ip_address

    already_banned = BannedIP.query.filter_by(ip_address=ip).first()
    if already_banned:
        flash(f"L'IP {ip} est déjà bannie")
    else:
        db.session.add(BannedIP(ip_address=ip, reason="Banni par l'administrateur"))
        db.session.commit()
        flash(f"IP {ip} bannie")

    return redirect(request.referrer or url_for("admin_dashboard"))
|
|
|
|
|
|
|
|
@app.route("/admin/boards")
@admin_required
def admin_boards():
    """Render the board management page with every board (admin only)."""
    return render_template("admin_boards.html", boards=Board.query.all())
|
|
|
|
|
|
|
|
@app.route("/admin/boards/add", methods=["POST"])
@admin_required
def admin_add_board():
    """Create a board from the submitted ``slug``, ``name`` and ``description``.

    ``slug`` and ``name`` are required and the slug must not be in use. The
    outcome is flashed and the client is redirected to the board management
    page.
    """
    # Strip surrounding whitespace so that blank-only values count as missing
    # and " g " cannot be created next to "g".
    slug = (request.form.get("slug") or "").strip()
    name = (request.form.get("name") or "").strip()
    description = request.form.get("description")

    if not (slug and name):
        flash("Données manquantes")
    elif Board.query.filter_by(slug=slug).first():
        flash("Ce slug existe déjà")
    else:
        db.session.add(Board(slug=slug, name=name, description=description))
        db.session.commit()
        flash("Planche ajoutée")

    return redirect(url_for("admin_boards"))
|
|
|
|
|
|
|
|
@app.route("/admin/boards/delete/<int:board_id>", methods=["POST"])
@admin_required
def admin_delete_board(board_id):
    """Delete a board together with all of its threads (admin only).

    Every thread of the board is removed with its replies and files before the
    board row itself; everything is committed at once.
    """
    board = Board.query.get_or_404(board_id)

    opening_posts = Post.query.filter_by(board_id=board.id, thread_id=None).all()
    for opening_post in opening_posts:
        delete_thread_content(opening_post)

    db.session.delete(board)
    db.session.commit()
    flash("Planche supprimée")
    return redirect(url_for("admin_boards"))
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Development entry point. Debug mode stays on by default, as before, but
    # can be switched off by setting FLASK_DEBUG to 0/false/no so the
    # interactive debugger is not enabled unconditionally.
    app.run(
        debug=os.environ.get("FLASK_DEBUG", "1").lower() not in {"0", "false", "no"}
    )
|
|
|