| |
| |
| import os |
| import sqlite3 |
| import json |
| import uuid |
| import time |
| import threading |
| import queue |
| from datetime import datetime, timezone |
| from flask import Flask, request, jsonify, g, send_from_directory, Response, abort |
| from werkzeug.utils import secure_filename |
|
|
| DATA_DIR = os.environ.get("DATA_DIR", "/data") |
| ATTACH_DIR = os.path.join(DATA_DIR, "attachments") |
| DB_PATH = os.path.join(DATA_DIR, "ntfy.db") |
| PORT = int(os.environ.get("PORT", "8080")) |
| REQUIRE_AUTH = os.environ.get("REQUIRE_AUTH", "1") == "1" |
|
|
| os.makedirs(ATTACH_DIR, exist_ok=True) |
| os.makedirs(DATA_DIR, exist_ok=True) |
|
|
| app = Flask(__name__) |
| app.config['MAX_CONTENT_LENGTH'] = int(os.environ.get("MAX_ATTACHMENT_BYTES", 50 * 1024 * 1024)) |
|
|
| |
| subscribers_lock = threading.Lock() |
| subscribers = {} |
|
|
| def get_db(): |
| db = getattr(g, "_db", None) |
| if db is None: |
| db = g._db = sqlite3.connect(DB_PATH, check_same_thread=False) |
| db.row_factory = sqlite3.Row |
| return db |
|
|
| def init_db(): |
| db = get_db() |
| cur = db.cursor() |
| cur.execute(""" |
| CREATE TABLE IF NOT EXISTS users ( |
| id INTEGER PRIMARY KEY, |
| username TEXT UNIQUE, |
| token TEXT UNIQUE, |
| created_at INTEGER |
| ); |
| """) |
| cur.execute(""" |
| CREATE TABLE IF NOT EXISTS messages ( |
| id INTEGER PRIMARY KEY AUTOINCREMENT, |
| topic TEXT, |
| title TEXT, |
| body TEXT, |
| headers TEXT, |
| attachment TEXT, |
| ts INTEGER |
| ); |
| """) |
| db.commit() |
|
|
| @app.teardown_appcontext |
| def close_db(exc): |
| db = getattr(g, "_db", None) |
| if db is not None: |
| db.close() |
|
|
| def check_token(req): |
| """Return username if token valid, else None""" |
| auth = req.headers.get("Authorization", "") |
| if auth.startswith("Bearer "): |
| token = auth.split(" ", 1)[1].strip() |
| else: |
| token = req.args.get("token", None) |
| if not token: |
| return None |
| db = get_db() |
| cur = db.execute("SELECT username FROM users WHERE token = ?", (token,)) |
| row = cur.fetchone() |
| return row["username"] if row else None |
|
|
| def save_message(topic, title, body, headers, attachment_path=None): |
| ts = int(time.time()) |
| db = get_db() |
| db.execute( |
| "INSERT INTO messages (topic, title, body, headers, attachment, ts) VALUES (?, ?, ?, ?, ?, ?)", |
| (topic, title, body, json.dumps(headers or {}), attachment_path, ts) |
| ) |
| db.commit() |
| mid = db.execute("SELECT last_insert_rowid() as id").fetchone()["id"] |
| return { |
| "id": mid, |
| "topic": topic, |
| "title": title, |
| "body": body, |
| "headers": headers or {}, |
| "attachment": os.path.basename(attachment_path) if attachment_path else None, |
| "ts": ts |
| } |
|
|
| def notify_subscribers(topic, message): |
| with subscribers_lock: |
| queues = subscribers.get(topic, []).copy() |
| for q in queues: |
| try: |
| q.put_nowait(message) |
| except Exception: |
| pass |
|
|
| @app.route("/attachments/<path:filename>", methods=["GET"]) |
| def serve_attachment(filename): |
| |
| safe_name = secure_filename(filename) |
| path = os.path.join(ATTACH_DIR, safe_name) |
| if not os.path.exists(path): |
| abort(404) |
| return send_from_directory(ATTACH_DIR, safe_name, as_attachment=True) |
|
|
| @app.route("/<path:topic>", methods=["POST"]) |
| def publish(topic): |
| |
| if REQUIRE_AUTH: |
| user = check_token(request) |
| if user is None: |
| return jsonify({"error": "auth required"}), 401 |
|
|
| title = request.headers.get("Title") or request.form.get("title") or None |
| |
| if request.is_json: |
| body = (request.get_json(silent=True) or {}).get("message", "") |
| else: |
| body = request.get_data(as_text=True) or request.form.get("message") or "" |
|
|
| |
| headers = {} |
| for k, v in request.headers.items(): |
| if k.lower() in ("title", "priority", "tags", "user-agent", "content-type"): |
| headers[k] = v |
|
|
| attachment_path = None |
| if 'file' in request.files: |
| f = request.files['file'] |
| if f and f.filename: |
| orig = secure_filename(f.filename) |
| unique = f"{uuid.uuid4().hex}_{orig}" |
| store_path = os.path.join(ATTACH_DIR, unique) |
| f.save(store_path) |
| attachment_path = store_path |
|
|
| message = save_message(topic, title, body, headers, attachment_path) |
| notify_subscribers(topic, message) |
| return jsonify({"status": "ok", "message": message}), 201 |
|
|
| @app.route("/<path:topic>", methods=["GET"]) |
| def get_messages(topic): |
| |
| limit = int(request.args.get("n", 20)) |
| db = get_db() |
| cur = db.execute("SELECT * FROM messages WHERE topic = ? ORDER BY ts DESC LIMIT ?", (topic, limit)) |
| rows = cur.fetchall() |
| out = [] |
| for r in rows: |
| out.append({ |
| "id": r["id"], |
| "topic": r["topic"], |
| "title": r["title"], |
| "body": r["body"], |
| "headers": json.loads(r["headers"]) if r["headers"] else {}, |
| "attachment": r["attachment"], |
| "ts": r["ts"] |
| }) |
| return jsonify(out) |
|
|
| @app.route("/<path:topic>/sse", methods=["GET"]) |
| def sse_subscribe(topic): |
| |
| def gen(q): |
| |
| send_last = int(request.args.get("last", 0)) |
| if send_last: |
| db = get_db() |
| cur = db.execute("SELECT * FROM messages WHERE topic = ? ORDER BY ts DESC LIMIT ?", (topic, send_last)) |
| for r in reversed(cur.fetchall()): |
| payload = { |
| "id": r["id"], |
| "topic": r["topic"], |
| "title": r["title"], |
| "body": r["body"], |
| "headers": json.loads(r["headers"]) if r["headers"] else {}, |
| "attachment": r["attachment"], |
| "ts": r["ts"] |
| } |
| yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n" |
|
|
| |
| try: |
| while True: |
| msg = q.get() |
| yield f"data: {json.dumps(msg, ensure_ascii=False)}\n\n" |
| except GeneratorExit: |
| return |
|
|
| q = queue.Queue() |
| with subscribers_lock: |
| subscribers.setdefault(topic, []).append(q) |
|
|
| |
| def stream(): |
| try: |
| for chunk in gen(q): |
| yield chunk |
| finally: |
| with subscribers_lock: |
| if topic in subscribers and q in subscribers[topic]: |
| subscribers[topic].remove(q) |
|
|
| headers = { |
| "Content-Type": "text/event-stream", |
| "Cache-Control": "no-cache", |
| "Connection": "keep-alive", |
| } |
| return Response(stream(), headers=headers) |
|
|
| |
| ADMIN_TOKEN = os.environ.get("ADMIN_TOKEN", None) |
| @app.route("/admin/create_user", methods=["POST"]) |
| def admin_create_user(): |
| token = request.headers.get("Admin-Token") or request.args.get("admin_token") |
| if not ADMIN_TOKEN or token != ADMIN_TOKEN: |
| return jsonify({"error": "forbidden"}), 403 |
| data = request.get_json() or {} |
| username = data.get("username") or data.get("user") |
| if not username: |
| return jsonify({"error": "username required"}), 400 |
| u_token = uuid.uuid4().hex |
| db = get_db() |
| try: |
| db.execute("INSERT INTO users (username, token, created_at) VALUES (?, ?, ?)", (username, u_token, int(time.time()))) |
| db.commit() |
| except sqlite3.IntegrityError: |
| return jsonify({"error": "user exists"}), 409 |
| return jsonify({"username": username, "token": u_token}) |
|
|
| if __name__ == "__main__": |
| |
| with app.app_context(): |
| init_db() |
| default_user = os.environ.get("DEFAULT_USER") |
| default_token = os.environ.get("DEFAULT_TOKEN") |
| if default_user and default_token: |
| db = get_db() |
| try: |
| db.execute("INSERT OR IGNORE INTO users (username, token, created_at) VALUES (?, ?, ?)", |
| (default_user, default_token, int(time.time()))) |
| db.commit() |
| except Exception: |
| pass |
| print(f"Starting Flask ntfy-like server on 0.0.0.0:{PORT} (require_auth={REQUIRE_AUTH})") |
| app.run(host="0.0.0.0", port=PORT, threaded=True) |