File size: 16,716 Bytes
75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 35dd900 cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab 75ba54e cb18dab |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 |
from flask import (
Flask,
render_template,
request,
redirect,
url_for,
abort,
flash,
session,
send_from_directory,
)
from werkzeug.utils import secure_filename
from werkzeug.security import check_password_hash, generate_password_hash
from models import db, Board, Post, BannedIP, PostFile, User
from forms import PostForm
from datetime import datetime
import os
import uuid
import re
from markupsafe import Markup
from functools import wraps
app = Flask(__name__)

# Secrets can be supplied through the environment so that a deployment never
# has to run with the well-known development values below. The fallbacks
# keep local development working exactly as before.
app.config["SECRET_KEY"] = os.environ.get(
    "GHOSTBOARD_SECRET_KEY", "dev-secret-key"
)  # Change in production

# The SQLite database lives in Flask's instance folder.
os.makedirs(app.instance_path, exist_ok=True)
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///" + os.path.join(
    app.instance_path, "ghostboard.db"
)
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False

# Uploaded media is stored under static/uploads inside the application root.
app.config["UPLOAD_FOLDER"] = os.path.join(app.root_path, "static/uploads")
os.makedirs(app.config["UPLOAD_FOLDER"], exist_ok=True)
app.config["MAX_CONTENT_LENGTH"] = 20 * 1024 * 1024  # 20 MB limit
app.config["ALLOWED_EXTENSIONS"] = {"png", "jpg", "jpeg", "gif", "webp", "mp4", "webm"}

# Only the hash of the admin password is kept in the config.
app.config["ADMIN_PASSWORD_HASH"] = generate_password_hash(
    os.environ.get("GHOSTBOARD_ADMIN_PASSWORD", "admin")
)  # Default password: admin

db.init_app(app)
def init_db():
    """Create all tables and insert the default boards that are missing.

    A board is only added when no board with the same slug exists yet.
    """
    db.create_all()

    # (slug, name, description) for every board seeded by default.
    default_boards = (
        ("g", "Général", "Discussion générale"),
        ("tech", "Technologie", "Ordinateurs, programmation, gadgets"),
        ("art", "Création", "Art, design, musique"),
        ("news", "Actualités", "Événements actuels"),
    )
    for slug, name, description in default_boards:
        if Board.query.filter_by(slug=slug).first():
            continue
        db.session.add(Board(slug=slug, name=name, description=description))
    db.session.commit()
# Bootstrap the schema and the default boards when the SQLite file is not
# yet present in the instance folder.
with app.app_context():
    database_file = os.path.join(app.instance_path, "ghostboard.db")
    if not os.path.exists(database_file):
        init_db()
def allowed_file(filename):
    """Return True when *filename* ends with an allowed extension.

    The extension is the text after the last dot, compared
    case-insensitively against ``app.config["ALLOWED_EXTENSIONS"]``.
    Names without a dot are rejected.
    """
    if "." not in filename:
        return False
    extension = filename.rsplit(".", 1)[1].lower()
    return extension in app.config["ALLOWED_EXTENSIONS"]
def save_file(file):
    """Store an uploaded file under a random name in the upload folder.

    Args:
        file: the uploaded file object; it must expose ``filename`` and
            ``save(path)``.

    Returns:
        ``(stored_name, original_name)`` on success, where ``stored_name`` is
        a random hex name with the upload's extension and ``original_name``
        is the sanitised client filename. ``(None, None)`` when there is no
        file or its extension is not allowed.
    """
    if not (file and allowed_file(file.filename)):
        return None, None

    # Take the extension from the name that allowed_file() just validated.
    # secure_filename() may drop the dot (e.g. ".png" or a non-ASCII stem
    # becomes "png"), so splitting its result could raise IndexError.
    extension = file.filename.rsplit(".", 1)[1].lower()

    original_filename = secure_filename(file.filename)
    if "." not in original_filename:
        # Nothing usable survived sanitising; fall back to a neutral name.
        original_filename = f"file.{extension}"

    new_filename = f"{uuid.uuid4().hex}.{extension}"
    file.save(os.path.join(app.config["UPLOAD_FOLDER"], new_filename))
    return new_filename, original_filename
def delete_post_files(post):
    """Remove a post's attachments from disk and mark their rows for deletion.

    The session is not committed here; the caller does that.
    """
    upload_dir = app.config["UPLOAD_FOLDER"]
    for attachment in post.files:
        disk_path = os.path.join(upload_dir, attachment.filename)
        # The file may already be gone; only the row is deleted in that case.
        if os.path.exists(disk_path):
            os.remove(disk_path)
        db.session.delete(attachment)
def delete_thread_content(thread):
    """Mark a thread, its replies and all their attachments for deletion.

    Replies are handled first, then the thread starter itself. The session
    is not committed here.
    """
    replies = Post.query.filter_by(thread_id=thread.id).all()
    for post in [*replies, thread]:
        delete_post_files(post)
        db.session.delete(post)
def prune_board(board_id):
    """Keep at most 100 threads on a board, dropping the least recently updated.

    Threads are ordered by ``updated_at`` descending; everything past the
    first 100 is deleted together with its replies and files.
    """
    max_threads = 100
    newest_first = Post.query.filter_by(board_id=board_id, thread_id=None).order_by(
        Post.updated_at.desc()
    )
    if newest_first.count() <= max_threads:
        return

    for stale_thread in newest_first.offset(max_threads).all():
        delete_thread_content(stale_thread)
    db.session.commit()
def admin_required(f):
    """Decorator that restricts a view to sessions flagged ``is_admin``.

    Other visitors are redirected to the admin login page, with the URL
    they requested passed along as the ``next`` query parameter.
    """

    @wraps(f)
    def guarded_view(*args, **kwargs):
        if session.get("is_admin"):
            return f(*args, **kwargs)
        return redirect(url_for("admin_login", next=request.url))

    return guarded_view
def check_ban():
    """Abort the request with 403 when the client's IP address is banned."""
    matching_ban = BannedIP.query.filter_by(ip_address=request.remote_addr).first()
    if not matching_ban:
        return
    abort(403, description=f"Vous êtes banni. Raison : {matching_ban.reason}")
@app.cli.command("init-db")
def init_db_command():
    """Create database tables and seed initial data."""
    # The docstring above is left untouched because the CLI uses it as the
    # command's help text.
    with app.app_context():
        init_db()
        print("Base de données initialisée.")
@app.context_processor
def inject_boards():
    """Expose every board to all templates as ``global_boards``."""
    return {"global_boards": Board.query.all()}
# Both patterns run on text that has ALREADY been HTML-escaped, so a ">"
# typed by the author appears as "&gt;" by the time they are applied.
#
# Quote reference (">>123"). The digit run is capped at 18 so that
# int(post_id) always stays within a signed 64-bit integer; longer runs are
# simply not treated as references.
_QUOTE_REF_RE = re.compile(r"&gt;&gt;(\d{1,18})(?!\d)")
# Greentext: a whole line whose first character is ">".
_GREENTEXT_RE = re.compile(r"^(&gt;.*)$", flags=re.MULTILINE)


@app.template_filter("format_post")
def format_post(content, post_context=None):
    """Render raw post text as safe HTML with quote links and greentext.

    Args:
        content: the raw post text; falsy values render as ``""``.
        post_context: optional dict mapping post id (as a string) to the
            nickname to display for ``>>id`` references. Ids missing from it
            are looked up in the database instead.

    Returns:
        A ``Markup`` string: the text is HTML-escaped, ``>>id`` references
        become anchor links, lines starting with ``>`` are wrapped in a green
        span, and newlines become ``<br>``.
    """
    if not content:
        return ""

    # Escape HTML first (safety). Everything inserted after this point must
    # itself be safe HTML.
    content = str(Markup.escape(content))

    def anchor(post_id, inner_html):
        """Build the in-page link to a post around already-safe HTML."""
        return (
            f'<a href="#post-{post_id}" class="text-blue-400 hover:underline" '
            f'onclick="highlightPost({post_id})">{inner_html}</a>'
        )

    def mention(post_id, nickname):
        """Build an ``@nickname`` link; the nickname is user input, so escape it."""
        safe_nickname = str(Markup.escape(nickname))
        return anchor(post_id, f'@<span class="font-semibold">{safe_nickname}</span>')

    def replace_quote(match):
        post_id = match.group(1)
        if post_context and post_id in post_context:
            return mention(post_id, post_context[post_id])

        # Fallback: try to get from database (slower)
        post = Post.query.get(int(post_id))
        if not post:
            # Unknown post: keep the reference text, still rendered as a link.
            return anchor(post_id, f"&gt;&gt;{post_id}")
        return mention(post_id, post.user.nickname if post.user else post.nickname)

    # Quotes are replaced first so that a line opening with a resolved
    # reference starts with "<a" and is not picked up as greentext.
    content = _QUOTE_REF_RE.sub(replace_quote, content)
    content = _GREENTEXT_RE.sub(r'<span class="text-green-400">\1</span>', content)

    # Convert newlines to <br>
    content = content.replace("\n", "<br>")
    return Markup(content)
# API Routes for user management
@app.route("/api/set-nickname", methods=["POST"])
def set_nickname():
    """Set or update the nickname of the current session's user.

    Expects a JSON object with a ``nickname`` string. Returns a JSON body
    with ``success``, ``user_id`` and ``nickname`` on success, or an
    ``error`` message with status 400 when the nickname is missing, longer
    than 50 characters, or already used by another user.
    """
    # silent=True yields None instead of raising on a missing or malformed
    # JSON body; anything that is not a JSON object is treated as empty so
    # the client gets the normal 400 answer rather than a server error.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    raw_nickname = data.get("nickname", "")
    nickname = raw_nickname.strip() if isinstance(raw_nickname, str) else ""

    if not nickname:
        return {"error": "Pseudo requis"}, 400
    if len(nickname) > 50:
        return {"error": "Pseudo trop long (max 50 caractères)"}, 400

    # Check if nickname already exists
    existing_user = User.query.filter_by(nickname=nickname).first()
    if existing_user:
        # If it's the same user, allow it
        if session.get("user_id") == existing_user.id:
            return {"success": True, "user_id": existing_user.id, "nickname": nickname}
        return {"error": "Ce pseudo est déjà pris"}, 400

    # Get or create user
    user_id = session.get("user_id")
    user = User.query.get(user_id) if user_id else None
    if user:
        user.nickname = nickname
        user.last_seen_at = datetime.utcnow()
    else:
        # No session user, or the session points at a user that no longer
        # exists: create a new one.
        user = User(nickname=nickname)
        db.session.add(user)

    db.session.commit()

    session["user_id"] = user.id
    session["nickname"] = user.nickname
    return {"success": True, "user_id": user.id, "nickname": nickname}
@app.route("/api/me")
def get_current_user():
    """Return the session user's id and nickname, or nulls when there is none.

    When a user is found, their ``last_seen_at`` timestamp is updated.
    """
    anonymous = {"user_id": None, "nickname": None}

    user_id = session.get("user_id")
    if not user_id:
        return anonymous

    user = User.query.get(user_id)
    if not user:
        return anonymous

    user.last_seen_at = datetime.utcnow()
    db.session.commit()
    return {"user_id": user.id, "nickname": user.nickname}
@app.route("/user/<int:user_id>")
def user_profile(user_id):
    """Render a user's profile with their 50 most recent posts (404 if unknown)."""
    user = User.query.get_or_404(user_id)
    newest_first = Post.query.filter_by(user_id=user_id).order_by(
        Post.created_at.desc()
    )
    posts = newest_first.limit(50).all()
    return render_template("user_profile.html", user=user, posts=posts)
@app.route("/manifest.json")
def manifest():
    """Serve ``manifest.json`` from the static folder at the site root."""
    static_dir = "static"
    return send_from_directory(static_dir, "manifest.json")
@app.route("/sw.js")
def service_worker():
    """Serve ``sw.js`` from the static folder at the site root."""
    static_dir = "static"
    return send_from_directory(static_dir, "sw.js")
@app.route("/")
def index():
    """Render the home page listing every board."""
    return render_template("index.html", boards=Board.query.all())
@app.route("/<slug>/", methods=["GET", "POST"])
def board(slug):
    """Show a board's threads and handle the creation of a new thread.

    A valid form submission creates a thread (with optional attachments),
    prunes the board and redirects back to it. Otherwise the 50 most
    recently updated threads are rendered.
    """
    current_board = Board.query.filter_by(slug=slug).first_or_404()
    form = PostForm()

    if form.validate_on_submit():
        check_ban()

        # Work out who is posting: a registered session user, or an
        # anonymous visitor using the nickname typed into the form.
        user_id = session.get("user_id")
        nickname = "Anonyme"
        if not user_id:
            nickname = form.nickname.data or "Anonyme"
        else:
            author = User.query.get(user_id)
            if author:
                nickname = author.nickname
            else:
                # The session references a user that no longer exists.
                user_id = None

        # The thread starter is committed first so that it has an id for
        # its attachments to reference.
        new_thread = Post(
            board_id=current_board.id,
            user_id=user_id,
            content=form.content.data,
            nickname=nickname,
            ip_address=request.remote_addr,
        )
        db.session.add(new_thread)
        db.session.commit()

        uploads = form.files.data
        if uploads:
            for upload in uploads:
                stored_name, original_name = save_file(upload)
                if not stored_name:
                    continue
                db.session.add(
                    PostFile(
                        post_id=new_thread.id,
                        filename=stored_name,
                        original_filename=original_name,
                    )
                )
            db.session.commit()

        prune_board(current_board.id)
        return redirect(url_for("board", slug=slug))

    # Threads are posts without a thread_id; the most recently bumped
    # (updated_at) come first.
    threads = (
        Post.query.filter_by(board_id=current_board.id, thread_id=None)
        .order_by(Post.updated_at.desc())
        .limit(50)
        .all()
    )

    # Map post id -> display nickname for the format_post filter, covering
    # each thread starter and its three newest replies.
    post_context = {}
    for starter in threads:
        post_context[str(starter.id)] = (
            starter.user.nickname if starter.user else starter.nickname
        )
        latest_replies = (
            Post.query.filter_by(thread_id=starter.id)
            .order_by(Post.created_at.desc())
            .limit(3)
            .all()
        )
        for reply in latest_replies:
            post_context[str(reply.id)] = (
                reply.user.nickname if reply.user else reply.nickname
            )

    return render_template(
        "board.html",
        board=current_board,
        threads=threads,
        form=form,
        post_context=post_context,
    )
@app.route("/thread/<int:thread_id>/", methods=["GET", "POST"])
def thread(thread_id):
    """Show a thread with its replies and handle the posting of a new reply.

    Requesting the id of a reply redirects to the thread it belongs to.
    A valid form submission stores the reply with its attachments, bumps the
    thread's ``updated_at`` and redirects back to the thread.
    """
    thread_post = Post.query.get_or_404(thread_id)
    if thread_post.thread_id is not None:
        # If it's a reply, redirect to the main thread
        return redirect(url_for("thread", thread_id=thread_post.thread_id))

    form = PostForm()
    if form.validate_on_submit():
        check_ban()

        # Get user_id from session or use anonymous
        user_id = session.get("user_id")
        nickname = "Anonyme"
        if user_id:
            user = User.query.get(user_id)
            if user:
                nickname = user.nickname
            else:
                user_id = None  # Invalid user_id
        else:
            # Anonymous post
            nickname = form.nickname.data or "Anonyme"

        # Create reply
        reply = Post(
            board_id=thread_post.board_id,
            thread_id=thread_post.id,
            user_id=user_id,
            content=form.content.data,
            nickname=nickname,
            ip_address=request.remote_addr,
        )
        db.session.add(reply)
        # Flush so the database assigns reply.id now. Without it reply.id is
        # still None when the PostFile rows below are built, and the
        # attachments would not be linked to the reply.
        db.session.flush()

        # Handle multiple files
        if form.files.data:
            for f in form.files.data:
                filename, original_filename = save_file(f)
                if filename:
                    post_file = PostFile(
                        post_id=reply.id,
                        filename=filename,
                        original_filename=original_filename,
                    )
                    db.session.add(post_file)

        # Bump the thread
        thread_post.updated_at = datetime.utcnow()
        db.session.commit()
        return redirect(url_for("thread", thread_id=thread_post.id))

    replies = (
        Post.query.filter_by(thread_id=thread_post.id)
        .order_by(Post.created_at.asc())
        .all()
    )

    # Create post context for format_post filter (maps post_id -> nickname)
    post_context = {}
    for post in [thread_post] + replies:
        post_context[str(post.id)] = post.user.nickname if post.user else post.nickname

    return render_template(
        "thread.html",
        thread=thread_post,
        replies=replies,
        board=thread_post.board,
        form=form,
        post_context=post_context,
    )
# Admin Routes
@app.route("/admin/login", methods=["GET", "POST"])
def admin_login():
    """Show the admin login form and check a submitted password.

    A correct password sets ``session["is_admin"]`` and redirects to the
    admin dashboard; a wrong one flashes an error and shows the form again.
    """
    if request.method == "POST":
        # Default to "" so a POST without a "password" field is handled as a
        # wrong password instead of passing None to check_password_hash.
        password = request.form.get("password", "")
        if check_password_hash(app.config["ADMIN_PASSWORD_HASH"], password):
            session["is_admin"] = True
            return redirect(url_for("admin_dashboard"))
        flash("Mot de passe invalide")
    return render_template("admin_login.html")
@app.route("/admin/logout")
def admin_logout():
    """Clear the session's admin flag and go back to the home page."""
    # pop() with a default does nothing when the flag was never set.
    session.pop("is_admin", None)
    home_url = url_for("index")
    return redirect(home_url)
@app.route("/admin")
@admin_required
def admin_dashboard():
    """Render the moderation view with the 100 newest posts (threads and replies)."""
    newest_first = Post.query.order_by(Post.created_at.desc())
    return render_template("admin.html", posts=newest_first.limit(100).all())
@app.route("/admin/delete/<int:post_id>", methods=["POST"])
@admin_required
def delete_post(post_id):
    """Delete a post; deleting a thread starter removes the whole thread.

    Redirects to the referring page, or to the dashboard when the request
    carries no referrer.
    """
    post = Post.query.get_or_404(post_id)

    is_thread_starter = post.thread_id is None
    if is_thread_starter:
        delete_thread_content(post)
    else:
        delete_post_files(post)
        db.session.delete(post)
    db.session.commit()

    flash("Post supprimé")
    destination = request.referrer or url_for("admin_dashboard")
    return redirect(destination)
@app.route("/admin/ban/<int:post_id>", methods=["POST"])
@admin_required
def ban_ip(post_id):
    """Ban the IP address a post was made from, unless it is already banned.

    Redirects to the referring page, or to the dashboard when the request
    carries no referrer.
    """
    post = Post.query.get_or_404(post_id)
    ip = post.ip_address

    if BannedIP.query.filter_by(ip_address=ip).first():
        flash(f"L'IP {ip} est déjà bannie")
    else:
        db.session.add(BannedIP(ip_address=ip, reason="Banni par l'administrateur"))
        db.session.commit()
        flash(f"IP {ip} bannie")

    destination = request.referrer or url_for("admin_dashboard")
    return redirect(destination)
@app.route("/admin/boards")
@admin_required
def admin_boards():
    """Render the board management page listing every board."""
    return render_template("admin_boards.html", boards=Board.query.all())
@app.route("/admin/boards/add", methods=["POST"])
@admin_required
def admin_add_board():
    """Create a board from the submitted form, then return to the board list.

    ``slug`` and ``name`` are required and the slug must not already be in
    use; the outcome is reported through a flash message.
    """
    slug = request.form.get("slug")
    name = request.form.get("name")
    description = request.form.get("description")

    if not (slug and name):
        flash("Données manquantes")
    elif Board.query.filter_by(slug=slug).first():
        flash("Ce slug existe déjà")
    else:
        db.session.add(Board(slug=slug, name=name, description=description))
        db.session.commit()
        flash("Planche ajoutée")

    return redirect(url_for("admin_boards"))
@app.route("/admin/boards/delete/<int:board_id>", methods=["POST"])
@admin_required
def admin_delete_board(board_id):
    """Delete a board together with all of its threads, replies and files."""
    board = Board.query.get_or_404(board_id)

    # Removing every thread starter also removes its replies and attachments.
    thread_starters = Post.query.filter_by(board_id=board.id, thread_id=None).all()
    for starter in thread_starters:
        delete_thread_content(starter)

    db.session.delete(board)
    db.session.commit()
    flash("Planche supprimée")
    return redirect(url_for("admin_boards"))
if __name__ == "__main__":
    # Running this file directly starts Flask's built-in server with debug
    # mode switched on.
    app.run(debug=True)
|