Starchik1 commited on
Commit
3c536cd
·
verified ·
1 Parent(s): 755d1cf

Create app.py

Browse files
Files changed (1) hide show
  1. app.py +258 -0
app.py ADDED
@@ -0,0 +1,258 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/usr/bin/env python3
2
+ # app.py -- minimal ntfy-like server implemented with Flask
3
+ import os
4
+ import sqlite3
5
+ import json
6
+ import uuid
7
+ import time
8
+ import threading
9
+ import queue
10
+ from datetime import datetime, timezone
11
+ from flask import Flask, request, jsonify, g, send_from_directory, Response, abort
12
+ from werkzeug.utils import secure_filename
13
+
14
+ DATA_DIR = os.environ.get("DATA_DIR", "/data")
15
+ ATTACH_DIR = os.path.join(DATA_DIR, "attachments")
16
+ DB_PATH = os.path.join(DATA_DIR, "ntfy.db")
17
+ PORT = int(os.environ.get("PORT", "8080"))
18
+ REQUIRE_AUTH = os.environ.get("REQUIRE_AUTH", "1") == "1" # "1" = require token by default
19
+
20
+ os.makedirs(ATTACH_DIR, exist_ok=True)
21
+ os.makedirs(DATA_DIR, exist_ok=True)
22
+
23
+ app = Flask(__name__)
24
+ app.config['MAX_CONTENT_LENGTH'] = int(os.environ.get("MAX_ATTACHMENT_BYTES", 50 * 1024 * 1024)) # 50MB default
25
+
26
+ # In-memory pubsub: topic -> list of Queue objects
27
+ subscribers_lock = threading.Lock()
28
+ subscribers = {} # topic -> [queue.Queue(), ...]
29
+
30
+ def get_db():
31
+ db = getattr(g, "_db", None)
32
+ if db is None:
33
+ db = g._db = sqlite3.connect(DB_PATH, check_same_thread=False)
34
+ db.row_factory = sqlite3.Row
35
+ return db
36
+
37
+ def init_db():
38
+ db = get_db()
39
+ cur = db.cursor()
40
+ cur.execute("""
41
+ CREATE TABLE IF NOT EXISTS users (
42
+ id INTEGER PRIMARY KEY,
43
+ username TEXT UNIQUE,
44
+ token TEXT UNIQUE,
45
+ created_at INTEGER
46
+ );
47
+ """)
48
+ cur.execute("""
49
+ CREATE TABLE IF NOT EXISTS messages (
50
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
51
+ topic TEXT,
52
+ title TEXT,
53
+ body TEXT,
54
+ headers TEXT,
55
+ attachment TEXT,
56
+ ts INTEGER
57
+ );
58
+ """)
59
+ db.commit()
60
+
61
+ @app.teardown_appcontext
62
+ def close_db(exc):
63
+ db = getattr(g, "_db", None)
64
+ if db is not None:
65
+ db.close()
66
+
67
+ def check_token(req):
68
+ """Return username if token valid, else None"""
69
+ auth = req.headers.get("Authorization", "") # Expect "Bearer <token>"
70
+ if auth.startswith("Bearer "):
71
+ token = auth.split(" ", 1)[1].strip()
72
+ else:
73
+ token = req.args.get("token", None)
74
+ if not token:
75
+ return None
76
+ db = get_db()
77
+ cur = db.execute("SELECT username FROM users WHERE token = ?", (token,))
78
+ row = cur.fetchone()
79
+ return row["username"] if row else None
80
+
81
+ def save_message(topic, title, body, headers, attachment_path=None):
82
+ ts = int(time.time())
83
+ db = get_db()
84
+ db.execute(
85
+ "INSERT INTO messages (topic, title, body, headers, attachment, ts) VALUES (?, ?, ?, ?, ?, ?)",
86
+ (topic, title, body, json.dumps(headers or {}), attachment_path, ts)
87
+ )
88
+ db.commit()
89
+ mid = db.execute("SELECT last_insert_rowid() as id").fetchone()["id"]
90
+ return {
91
+ "id": mid,
92
+ "topic": topic,
93
+ "title": title,
94
+ "body": body,
95
+ "headers": headers or {},
96
+ "attachment": os.path.basename(attachment_path) if attachment_path else None,
97
+ "ts": ts
98
+ }
99
+
100
+ def notify_subscribers(topic, message):
101
+ with subscribers_lock:
102
+ queues = subscribers.get(topic, []).copy()
103
+ for q in queues:
104
+ try:
105
+ q.put_nowait(message)
106
+ except Exception:
107
+ pass
108
+
109
+ @app.route("/attachments/<path:filename>", methods=["GET"])
110
+ def serve_attachment(filename):
111
+ # Security: only serve from attachments dir
112
+ safe_name = secure_filename(filename)
113
+ path = os.path.join(ATTACH_DIR, safe_name)
114
+ if not os.path.exists(path):
115
+ abort(404)
116
+ return send_from_directory(ATTACH_DIR, safe_name, as_attachment=True)
117
+
118
+ @app.route("/<path:topic>", methods=["POST"])
119
+ def publish(topic):
120
+ # publish message to topic
121
+ if REQUIRE_AUTH:
122
+ user = check_token(request)
123
+ if user is None:
124
+ return jsonify({"error": "auth required"}), 401
125
+
126
+ title = request.headers.get("Title") or request.form.get("title") or None
127
+ # body can be raw text body (curl -d) or 'message' field in form
128
+ if request.is_json:
129
+ body = (request.get_json(silent=True) or {}).get("message", "")
130
+ else:
131
+ body = request.get_data(as_text=True) or request.form.get("message") or ""
132
+
133
+ # collect headers to store (e.g., Priority, Tags)
134
+ headers = {}
135
+ for k, v in request.headers.items():
136
+ if k.lower() in ("title", "priority", "tags", "user-agent", "content-type"):
137
+ headers[k] = v
138
+
139
+ attachment_path = None
140
+ if 'file' in request.files:
141
+ f = request.files['file']
142
+ if f and f.filename:
143
+ orig = secure_filename(f.filename)
144
+ unique = f"{uuid.uuid4().hex}_{orig}"
145
+ store_path = os.path.join(ATTACH_DIR, unique)
146
+ f.save(store_path)
147
+ attachment_path = store_path
148
+
149
+ message = save_message(topic, title, body, headers, attachment_path)
150
+ notify_subscribers(topic, message)
151
+ return jsonify({"status": "ok", "message": message}), 201
152
+
153
+ @app.route("/<path:topic>", methods=["GET"])
154
+ def get_messages(topic):
155
+ # return recent messages for topic
156
+ limit = int(request.args.get("n", 20))
157
+ db = get_db()
158
+ cur = db.execute("SELECT * FROM messages WHERE topic = ? ORDER BY ts DESC LIMIT ?", (topic, limit))
159
+ rows = cur.fetchall()
160
+ out = []
161
+ for r in rows:
162
+ out.append({
163
+ "id": r["id"],
164
+ "topic": r["topic"],
165
+ "title": r["title"],
166
+ "body": r["body"],
167
+ "headers": json.loads(r["headers"]) if r["headers"] else {},
168
+ "attachment": r["attachment"],
169
+ "ts": r["ts"]
170
+ })
171
+ return jsonify(out)
172
+
173
+ @app.route("/<path:topic>/sse", methods=["GET"])
174
+ def sse_subscribe(topic):
175
+ # simple SSE stream for new messages
176
+ def gen(q):
177
+ # first, optionally send last messages if requested
178
+ send_last = int(request.args.get("last", 0))
179
+ if send_last:
180
+ db = get_db()
181
+ cur = db.execute("SELECT * FROM messages WHERE topic = ? ORDER BY ts DESC LIMIT ?", (topic, send_last))
182
+ for r in reversed(cur.fetchall()):
183
+ payload = {
184
+ "id": r["id"],
185
+ "topic": r["topic"],
186
+ "title": r["title"],
187
+ "body": r["body"],
188
+ "headers": json.loads(r["headers"]) if r["headers"] else {},
189
+ "attachment": r["attachment"],
190
+ "ts": r["ts"]
191
+ }
192
+ yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"
193
+
194
+ # then stream live messages
195
+ try:
196
+ while True:
197
+ msg = q.get()
198
+ yield f"data: {json.dumps(msg, ensure_ascii=False)}\n\n"
199
+ except GeneratorExit:
200
+ return
201
+
202
+ q = queue.Queue()
203
+ with subscribers_lock:
204
+ subscribers.setdefault(topic, []).append(q)
205
+
206
+ # cleanup when client disconnects: remove queue
207
+ def stream():
208
+ try:
209
+ for chunk in gen(q):
210
+ yield chunk
211
+ finally:
212
+ with subscribers_lock:
213
+ if topic in subscribers and q in subscribers[topic]:
214
+ subscribers[topic].remove(q)
215
+
216
+ headers = {
217
+ "Content-Type": "text/event-stream",
218
+ "Cache-Control": "no-cache",
219
+ "Connection": "keep-alive",
220
+ }
221
+ return Response(stream(), headers=headers)
222
+
223
+ # Utility CLI endpoints (optional, protected by admin token)
224
+ ADMIN_TOKEN = os.environ.get("ADMIN_TOKEN", None) # set via env to create default user
225
+ @app.route("/admin/create_user", methods=["POST"])
226
+ def admin_create_user():
227
+ token = request.headers.get("Admin-Token") or request.args.get("admin_token")
228
+ if not ADMIN_TOKEN or token != ADMIN_TOKEN:
229
+ return jsonify({"error": "forbidden"}), 403
230
+ data = request.get_json() or {}
231
+ username = data.get("username") or data.get("user")
232
+ if not username:
233
+ return jsonify({"error": "username required"}), 400
234
+ u_token = uuid.uuid4().hex
235
+ db = get_db()
236
+ try:
237
+ db.execute("INSERT INTO users (username, token, created_at) VALUES (?, ?, ?)", (username, u_token, int(time.time())))
238
+ db.commit()
239
+ except sqlite3.IntegrityError:
240
+ return jsonify({"error": "user exists"}), 409
241
+ return jsonify({"username": username, "token": u_token})
242
+
243
+ if __name__ == "__main__":
244
+ # initialize DB and (optionally) create a default user from env vars
245
+ with app.app_context():
246
+ init_db()
247
+ default_user = os.environ.get("DEFAULT_USER")
248
+ default_token = os.environ.get("DEFAULT_TOKEN")
249
+ if default_user and default_token:
250
+ db = get_db()
251
+ try:
252
+ db.execute("INSERT OR IGNORE INTO users (username, token, created_at) VALUES (?, ?, ?)",
253
+ (default_user, default_token, int(time.time())))
254
+ db.commit()
255
+ except Exception:
256
+ pass
257
+ print(f"Starting Flask ntfy-like server on 0.0.0.0:{PORT} (require_auth={REQUIRE_AUTH})")
258
+ app.run(host="0.0.0.0", port=PORT, threaded=True)