ntfy / app.py
Starchik1's picture
Create app.py
3c536cd verified
#!/usr/bin/env python3
# app.py -- minimal ntfy-like server implemented with Flask
import json
import os
import queue
import secrets
import sqlite3
import threading
import time
import uuid
from datetime import datetime, timezone

from flask import Flask, request, jsonify, g, send_from_directory, Response, abort
from werkzeug.utils import secure_filename
# --- Storage layout: everything lives under DATA_DIR ---
DATA_DIR = os.environ.get("DATA_DIR", "/data")
DB_PATH = os.path.join(DATA_DIR, "ntfy.db")
ATTACH_DIR = os.path.join(DATA_DIR, "attachments")

# --- Settings read from the environment ---
PORT = int(os.environ.get("PORT", "8080"))
# Token auth is on by default; any REQUIRE_AUTH value other than "1" turns it off.
REQUIRE_AUTH = os.environ.get("REQUIRE_AUTH", "1") == "1"

# Make sure the data directory and its attachments subdirectory exist.
os.makedirs(DATA_DIR, exist_ok=True)
os.makedirs(ATTACH_DIR, exist_ok=True)

app = Flask(__name__)
# Upper bound on the request body size (50MB unless MAX_ATTACHMENT_BYTES is set).
app.config.update(
    MAX_CONTENT_LENGTH=int(os.environ.get("MAX_ATTACHMENT_BYTES", 50 * 1024 * 1024)),
)

# In-memory pubsub registry: topic -> list of queue.Queue objects, one per
# live subscriber. Guarded by subscribers_lock.
subscribers = {}
subscribers_lock = threading.Lock()
def get_db():
    """Return the SQLite connection for the current Flask application context.

    The connection is opened on first use, stored on ``g._db`` and reused by
    later calls within the same context. Rows come back as ``sqlite3.Row`` so
    columns can be read by name.
    """
    conn = getattr(g, "_db", None)
    if conn is not None:
        return conn
    conn = sqlite3.connect(DB_PATH, check_same_thread=False)
    conn.row_factory = sqlite3.Row
    g._db = conn
    return conn
def init_db():
    """Create the ``users`` and ``messages`` tables and the topic index.

    Every statement uses ``IF NOT EXISTS``, so calling this against an
    already-initialized database is a no-op. Must run inside an application
    context because it obtains its connection from ``get_db()``.
    """
    schema = (
        """
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY,
            username TEXT UNIQUE,
            token TEXT UNIQUE,
            created_at INTEGER
        );
        """,
        """
        CREATE TABLE IF NOT EXISTS messages (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            topic TEXT,
            title TEXT,
            body TEXT,
            headers TEXT,
            attachment TEXT,
            ts INTEGER
        );
        """,
        # All reads filter by topic and order by ts; without this index each
        # request scans the whole messages table.
        """
        CREATE INDEX IF NOT EXISTS idx_messages_topic_ts
        ON messages (topic, ts);
        """,
    )
    db = get_db()
    for statement in schema:
        db.execute(statement)
    db.commit()
@app.teardown_appcontext
def close_db(exc):
    """Close the context's SQLite connection, if one was opened.

    Registered as an app-context teardown hook; ``exc`` is the exception (or
    None) Flask passes to teardown callbacks and is not used here.
    """
    conn = getattr(g, "_db", None)
    if conn is None:
        return
    conn.close()
def check_token(req):
    """Return the username owning the request's token, or None.

    The token is taken from an ``Authorization: Bearer <token>`` header, or,
    when no Bearer header is present, from the ``token`` query parameter.

    Args:
        req: Request-like object exposing ``headers`` and ``args`` mappings.

    Returns:
        The matching ``users.username``, or None when no token was supplied
        or the token is unknown.
    """
    auth = req.headers.get("Authorization", "")
    scheme, sep, credentials = auth.partition(" ")
    # HTTP authentication scheme names are case-insensitive, so "bearer" and
    # "BEARER" must be accepted as well as "Bearer".
    if sep and scheme.lower() == "bearer":
        token = credentials.strip()
    else:
        token = req.args.get("token", None)
    if not token:
        return None
    row = get_db().execute(
        "SELECT username FROM users WHERE token = ?", (token,)
    ).fetchone()
    return row["username"] if row else None
def save_message(topic, title, body, headers, attachment_path=None):
    """Insert a message row and return it as a JSON-serializable dict.

    Args:
        topic: Topic the message belongs to.
        title: Optional message title.
        body: Message text.
        headers: Dict of request headers to keep; stored as a JSON string.
            A falsy value is stored as ``{}``.
        attachment_path: Path of the stored attachment file, or None.

    Returns:
        Dict with ``id``, ``topic``, ``title``, ``body``, ``headers``,
        ``attachment`` (file name only, not the full path) and ``ts``
        (Unix time in whole seconds).
    """
    now = int(time.time())
    kept_headers = headers or {}
    conn = get_db()
    conn.execute(
        "INSERT INTO messages (topic, title, body, headers, attachment, ts) VALUES (?, ?, ?, ?, ?, ?)",
        (topic, title, body, json.dumps(kept_headers), attachment_path, now),
    )
    conn.commit()
    inserted = conn.execute("SELECT last_insert_rowid() as id").fetchone()

    # Only the file name is exposed to clients.
    attachment_name = None
    if attachment_path:
        attachment_name = os.path.basename(attachment_path)

    return {
        "id": inserted["id"],
        "topic": topic,
        "title": title,
        "body": body,
        "headers": kept_headers,
        "attachment": attachment_name,
        "ts": now,
    }
def notify_subscribers(topic, message):
    """Push ``message`` onto the queue of every subscriber of ``topic``.

    The subscriber list is copied while holding ``subscribers_lock`` and the
    puts happen on that copy. Delivery is best effort: a failing put is
    ignored so the remaining subscribers are still served.
    """
    with subscribers_lock:
        targets = list(subscribers.get(topic, []))
    for listener in targets:
        try:
            listener.put_nowait(message)
        except Exception:
            pass
@app.route("/attachments/<path:filename>", methods=["GET"])
def serve_attachment(filename):
    """Send a stored attachment as a download, or respond 404 if it is missing.

    The requested name is passed through ``secure_filename`` first, so only
    files directly inside ``ATTACH_DIR`` can be addressed.
    """
    cleaned = secure_filename(filename)
    if os.path.exists(os.path.join(ATTACH_DIR, cleaned)):
        return send_from_directory(ATTACH_DIR, cleaned, as_attachment=True)
    abort(404)
@app.route("/<path:topic>", methods=["POST"])
def publish(topic):
    """Publish a message to ``topic`` and fan it out to live subscribers.

    The message body is taken from, depending on the request content type:
      * JSON: the ``message`` key of the JSON object;
      * multipart/form-data: the ``message`` form field;
      * anything else (including ``curl -d``): the ``message`` form field if
        present, otherwise the raw request body.

    The title comes from the ``Title`` header or the ``title`` form field.
    An uploaded ``file`` part is stored in ``ATTACH_DIR`` under a unique name.

    Returns:
        201 with ``{"status": "ok", "message": {...}}``, or 401 with an error
        object when auth is required and no valid token was supplied.
    """
    if REQUIRE_AUTH and check_token(request) is None:
        return jsonify({"error": "auth required"}), 401

    if request.is_json:
        payload = request.get_json(silent=True)
        # A JSON array or scalar has no "message" key; treat it as empty
        # instead of failing with an AttributeError.
        body = payload.get("message", "") if isinstance(payload, dict) else ""
    elif request.mimetype == "multipart/form-data":
        # The raw multipart payload embeds the uploaded file bytes, so it
        # must never be used as the message text.
        body = request.form.get("message") or ""
    else:
        # Read the raw body BEFORE touching request.form: get_data() caches
        # the bytes so form parsing still works afterwards, whereas parsing
        # the form first leaves get_data() empty and loses a `curl -d` body.
        raw = request.get_data(as_text=True)
        body = request.form.get("message") or raw or ""

    title = request.headers.get("Title") or request.form.get("title") or None

    # Keep only the headers that describe the message itself.
    kept = ("title", "priority", "tags", "user-agent", "content-type")
    headers = {k: v for k, v in request.headers.items() if k.lower() in kept}

    attachment_path = None
    upload = request.files.get("file")
    if upload and upload.filename:
        # Prefix with a random id so uploads with the same name never collide.
        unique = f"{uuid.uuid4().hex}_{secure_filename(upload.filename)}"
        attachment_path = os.path.join(ATTACH_DIR, unique)
        upload.save(attachment_path)

    message = save_message(topic, title, body, headers, attachment_path)
    notify_subscribers(topic, message)
    return jsonify({"status": "ok", "message": message}), 201
@app.route("/<path:topic>", methods=["GET"])
def get_messages(topic):
    """Return the most recent messages of ``topic`` as a JSON list, newest first.

    Query parameters:
        n: Maximum number of messages (default 20). Negative values are
           treated as 0 and values above 1000 are capped at 1000.

    Returns:
        200 with a list of message objects, or 400 when ``n`` is not an
        integer.

    NOTE(review): unlike publishing, this endpoint does not check a token
    even when REQUIRE_AUTH is on -- confirm that is intended.
    """
    max_limit = 1000
    try:
        limit = int(request.args.get("n", 20))
    except ValueError:
        return jsonify({"error": "n must be an integer"}), 400
    # SQLite treats a negative LIMIT as "no limit", so clamp explicitly.
    limit = max(0, min(limit, max_limit))

    rows = get_db().execute(
        # id breaks ties between messages stored within the same second.
        "SELECT * FROM messages WHERE topic = ? ORDER BY ts DESC, id DESC LIMIT ?",
        (topic, limit),
    ).fetchall()

    return jsonify([
        {
            "id": r["id"],
            "topic": r["topic"],
            "title": r["title"],
            "body": r["body"],
            "headers": json.loads(r["headers"]) if r["headers"] else {},
            # Expose only the file name, matching what save_message returns,
            # rather than the server-side storage path.
            "attachment": os.path.basename(r["attachment"]) if r["attachment"] else None,
            "ts": r["ts"],
        }
        for r in rows
    ])
@app.route("/<path:topic>/sse", methods=["GET"])
def sse_subscribe(topic):
    """Stream messages of ``topic`` as Server-Sent Events.

    Query parameters:
        last: Number of stored messages to replay (oldest first) before the
              live stream starts. Default 0; negative values are treated as
              0 and values above 1000 are capped at 1000.

    Returns:
        A ``text/event-stream`` response, or 400 when ``last`` is not an
        integer.
    """
    max_backlog = 1000
    keepalive_seconds = 15

    # Everything that needs `request` or the database runs here, inside the
    # view. The response generator is consumed after Flask has torn down the
    # request/application context, so it must not touch `request` or `g`.
    try:
        send_last = int(request.args.get("last", 0))
    except ValueError:
        return jsonify({"error": "last must be an integer"}), 400
    # SQLite treats a negative LIMIT as "no limit", so clamp explicitly.
    send_last = max(0, min(send_last, max_backlog))

    q = queue.Queue()

    def unsubscribe():
        # Safe to call more than once.
        with subscribers_lock:
            listeners = subscribers.get(topic)
            if listeners and q in listeners:
                listeners.remove(q)
                if not listeners:
                    del subscribers[topic]

    # Subscribe before reading the backlog so that no message published in
    # between is lost; duplicates are filtered in stream() below.
    with subscribers_lock:
        subscribers.setdefault(topic, []).append(q)

    backlog = []
    if send_last:
        try:
            rows = get_db().execute(
                "SELECT * FROM messages WHERE topic = ? ORDER BY ts DESC, id DESC LIMIT ?",
                (topic, send_last),
            ).fetchall()
            backlog = [
                {
                    "id": r["id"],
                    "topic": r["topic"],
                    "title": r["title"],
                    "body": r["body"],
                    "headers": json.loads(r["headers"]) if r["headers"] else {},
                    # File name only, same shape as live messages.
                    "attachment": os.path.basename(r["attachment"]) if r["attachment"] else None,
                    "ts": r["ts"],
                }
                for r in reversed(rows)
            ]
        except Exception:
            # Do not leave an orphaned queue registered if the replay fails.
            unsubscribe()
            raise

    def stream():
        replayed_ids = {m["id"] for m in backlog}
        try:
            for m in backlog:
                yield f"data: {json.dumps(m, ensure_ascii=False)}\n\n"
            while True:
                try:
                    msg = q.get(timeout=keepalive_seconds)
                except queue.Empty:
                    # SSE comment line: ignored by clients, but the write
                    # lets the server notice a disconnected client on a
                    # quiet topic instead of blocking forever.
                    yield ": keepalive\n\n"
                    continue
                if msg.get("id") in replayed_ids:
                    continue  # already sent as part of the backlog
                yield f"data: {json.dumps(msg, ensure_ascii=False)}\n\n"
        finally:
            # Runs when the client disconnects and the generator is closed.
            unsubscribe()

    headers = {
        "Content-Type": "text/event-stream",
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
    }
    return Response(stream(), headers=headers)
# Utility CLI endpoints (optional, protected by admin token)
# When ADMIN_TOKEN is unset, the admin endpoint rejects every request.
ADMIN_TOKEN = os.environ.get("ADMIN_TOKEN", None)


@app.route("/admin/create_user", methods=["POST"])
def admin_create_user():
    """Create a user and return its freshly generated access token.

    The caller must present the admin token in the ``Admin-Token`` header or
    the ``admin_token`` query parameter. The JSON body must be an object
    with a ``username`` (or ``user``) string.

    Returns:
        200 with ``{"username": ..., "token": ...}``; 403 when the admin
        token is missing, wrong, or not configured; 400 when no valid
        username was given; 409 when the username already exists.
    """
    supplied = request.headers.get("Admin-Token") or request.args.get("admin_token")
    # Constant-time comparison so response timing does not leak how much of
    # the admin token matched. Encoded to bytes because compare_digest
    # rejects non-ASCII str arguments.
    if (
        not ADMIN_TOKEN
        or not supplied
        or not secrets.compare_digest(supplied.encode("utf-8"), ADMIN_TOKEN.encode("utf-8"))
    ):
        return jsonify({"error": "forbidden"}), 403

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Missing, malformed or non-object JSON: handled as "no fields".
        data = {}
    username = data.get("username") or data.get("user")
    if not username or not isinstance(username, str):
        return jsonify({"error": "username required"}), 400

    # 32 hex characters from the OS CSPRNG.
    u_token = secrets.token_hex(16)
    db = get_db()
    try:
        db.execute(
            "INSERT INTO users (username, token, created_at) VALUES (?, ?, ?)",
            (username, u_token, int(time.time())),
        )
        db.commit()
    except sqlite3.IntegrityError:
        return jsonify({"error": "user exists"}), 409
    return jsonify({"username": username, "token": u_token})
if __name__ == "__main__":
    # Initialize the schema and, when both DEFAULT_USER and DEFAULT_TOKEN are
    # set, seed that user. get_db() needs an application context.
    with app.app_context():
        init_db()
        default_user = os.environ.get("DEFAULT_USER")
        default_token = os.environ.get("DEFAULT_TOKEN")
        if default_user and default_token:
            db = get_db()
            try:
                # OR IGNORE keeps restarts idempotent when the user exists.
                db.execute(
                    "INSERT OR IGNORE INTO users (username, token, created_at) VALUES (?, ?, ?)",
                    (default_user, default_token, int(time.time())),
                )
                db.commit()
            except sqlite3.Error as err:
                # Seeding is best effort: report the failure, keep starting.
                print(f"Warning: could not create default user: {err}")
    print(f"Starting Flask ntfy-like server on 0.0.0.0:{PORT} (require_auth={REQUIRE_AUTH})")
    app.run(host="0.0.0.0", port=PORT, threaded=True)