"""CloudGallery: a small Flask image host backed by a Hugging Face dataset.

Images are uploaded to (and listed/deleted from) the dataset repository named
by the ``DATASET_NAME`` environment variable using ``HF_TOKEN``.  The admin
pages are protected by a session login when both ``ADMIN_USER`` and
``ADMIN_PASS`` are set; the ``/file/...`` and ``/view/...`` endpoints are not
wrapped in ``login_required`` and therefore stay reachable without a session.
"""
import datetime
import hmac
import os
import time
import uuid
from functools import wraps
from urllib.parse import quote

import requests
from flask import (
    Flask,
    Response,
    jsonify,
    redirect,
    render_template_string,
    request,
    session,
    url_for,
)
from flask_cors import CORS
from huggingface_hub import CommitOperationDelete, HfApi
from werkzeug.middleware.proxy_fix import ProxyFix

app = Flask(__name__)
CORS(app)
# Trust one proxy hop for scheme and host so generated URLs match the public
# address when running behind a reverse proxy.
app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1, x_host=1)
# NOTE(review): the fallback is a fixed, publicly visible key, so sessions can
# be forged by anyone who reads this file.  Set SECRET_KEY in the environment;
# the fallback only exists so already-issued sessions keep working.
app.secret_key = os.environ.get("SECRET_KEY", "my-fixed-secret-key-2026")
app.config.update(
    SESSION_COOKIE_SECURE=True,
    SESSION_COOKIE_SAMESITE='None',
    PERMANENT_SESSION_LIFETIME=datetime.timedelta(days=30),
)

HF_TOKEN = os.environ.get("HF_TOKEN")
DATASET_NAME = os.environ.get("DATASET_NAME")
SPACE_HOST = os.environ.get("SPACE_HOST", "localhost:7860")
BASE_URL = (
    f"https://{SPACE_HOST}"
    if "localhost" not in SPACE_HOST
    else f"http://{SPACE_HOST}"
)
ADMIN_USER = os.environ.get("ADMIN_USER")
ADMIN_PASS = os.environ.get("ADMIN_PASS")

# Always bind ``api`` so routes can test for a missing token instead of
# failing with NameError.
api = HfApi(token=HF_TOKEN) if HF_TOKEN else None

CACHE_DIR = "/app/cache"
os.makedirs(CACHE_DIR, exist_ok=True)

# File extensions that the home page lists as images.
IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp')
# (connect, read) timeouts in seconds for requests to huggingface.co.
UPSTREAM_TIMEOUT = (10, 60)


def format_size(size):
    """Return *size* (a byte count) as a human-readable string.

    ``None`` yields the placeholder "未知"; otherwise the value is scaled by
    1024 until it fits the unit, up to TB.
    """
    if size is None:
        return "未知"
    for unit in ['B', 'KB', 'MB', 'GB']:
        if size < 1024:
            return f"{size:.1f} {unit}"
        size /= 1024
    return f"{size:.1f} TB"


def login_required(f):
    """Redirect to the login page unless the session is logged in.

    The check is only enforced when both ADMIN_USER and ADMIN_PASS are set;
    otherwise the wrapped view is called directly.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if ADMIN_USER and ADMIN_PASS and not session.get('logged_in'):
            return redirect(url_for('login'))
        return f(*args, **kwargs)
    return decorated_function


def _credentials_match(username, password):
    """Return True when the submitted credentials equal the configured ones.

    Uses constant-time comparison and evaluates both fields regardless of the
    first result.  Returns False when no admin credentials are configured.
    """
    if not (ADMIN_USER and ADMIN_PASS):
        return False
    user_ok = hmac.compare_digest(
        (username or "").encode("utf-8"), ADMIN_USER.encode("utf-8")
    )
    pass_ok = hmac.compare_digest(
        (password or "").encode("utf-8"), ADMIN_PASS.encode("utf-8")
    )
    return user_ok and pass_ok


def _dataset_file_url(filename):
    """Return the huggingface.co "resolve" URL for *filename* in the dataset."""
    return (
        f"https://huggingface.co/datasets/{DATASET_NAME}"
        f"/resolve/main/{quote(filename)}"
    )


# NOTE(review): the three templates below contain almost no markup; the HTML
# appears to be missing from this copy of the file -- confirm against the
# deployed version before relying on them.

# 1. Login page
LOGIN_TEMPLATE = """登录

CloudGallery

{% if error %}
{{ error }}
{% endif %}
"""

# 2. Full-screen viewer (fallback)
VIEW_TEMPLATE = """查看"""

# =========================================================
# 3. Home page template (V11.0, clean white translucent edition)
# =========================================================
HTML_TEMPLATE = """ 我的图床
🔴 退出

☁️ CloudGallery

准备就绪
"""


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form; on a valid POST start a permanent session."""
    if request.method == 'POST':
        if _credentials_match(
            request.form.get('username'), request.form.get('password')
        ):
            session.permanent = True
            session['logged_in'] = True
            return redirect('/')
        return render_template_string(LOGIN_TEMPLATE, error="Error")
    return render_template_string(LOGIN_TEMPLATE)


@app.route('/logout')
def logout():
    """Clear the login flag and send the user back to the login page."""
    session.pop('logged_in', None)
    return redirect('/login')


@app.route('/')
@login_required
def home():
    """Render the gallery: every image at the top level of the dataset repo."""
    if api is None:
        return "Missing Env", 500
    try:
        tree = api.list_repo_tree(
            repo_id=DATASET_NAME,
            repo_type="dataset",
            token=HF_TOKEN,
            recursive=False,
        )
        images = []
        for item in tree:
            if not item.path.lower().endswith(IMAGE_EXTENSIONS):
                continue
            # Percent-encode the path for use in URLs; "name" stays raw
            # because it is what /delete expects.
            url_path = quote(item.path)
            images.append({
                "name": item.path,
                "raw_url": f"{BASE_URL}/file/{url_path}",
                "real_url": _dataset_file_url(item.path),
                "view_url": f"{BASE_URL}/view/{url_path}",
                "size_fmt": (
                    format_size(item.size) if hasattr(item, 'size') else "?"
                ),
            })
    except Exception:
        app.logger.exception("Failed to list images in %s", DATASET_NAME)
        return "Error loading images", 500
    # Reverse the listing order returned by the Hub.
    images.reverse()
    return render_template_string(
        HTML_TEMPLATE, images=images, dataset_name=DATASET_NAME
    )


@app.route('/upload', methods=['POST'])
@login_required
def upload_file():
    """Upload every file in the ``files`` form field to the dataset repo.

    Each file is stored under a new random name.  Files that fail are logged
    and skipped; the JSON response reports how many uploads succeeded.
    """
    if api is None:
        return jsonify({"status": "error", "error": "Missing Env", "count": 0}), 500
    count = 0
    for file in request.files.getlist('files'):
        if not file.filename:
            continue
        # Core change: use 4 random characters for the stored name.
        # NOTE(review): only 65536 distinct names per extension, so a
        # collision would overwrite an existing file in the repo.
        ext = os.path.splitext(file.filename)[1].lower() or ".jpg"
        name = f"{uuid.uuid4().hex[:4]}{ext}"
        path = os.path.join(CACHE_DIR, name)
        try:
            file.save(path)
            api.upload_file(
                path_or_fileobj=path,
                path_in_repo=name,
                repo_id=DATASET_NAME,
                repo_type="dataset",
                token=HF_TOKEN,
            )
            count += 1
        except Exception:
            # Best effort: keep going with the remaining files.
            app.logger.exception("Upload of %r failed", file.filename)
        finally:
            # Remove the cached copy whether or not the upload succeeded.
            try:
                os.remove(path)
            except FileNotFoundError:
                pass
            except OSError:
                app.logger.warning("Could not remove cache file %s", path)
    return jsonify({"status": "success", "count": count})


@app.route('/delete', methods=['POST'])
@login_required
def delete_file():
    """Delete the file named by the ``filename`` form field from the repo."""
    name = request.form.get('filename')
    if not name:
        return jsonify({"error": "filename is required"}), 400
    if api is None:
        return jsonify({"error": "Missing Env"}), 500
    try:
        api.create_commit(
            repo_id=DATASET_NAME,
            repo_type="dataset",
            operations=[CommitOperationDelete(path_in_repo=name)],
            commit_message=f"Del {name}",
        )
    except Exception as e:
        app.logger.exception("Delete of %r failed", name)
        return jsonify({"error": str(e)})
    return jsonify({"status": "success"})


# The URL variable is required: the view takes ``filename`` and home() links
# to /view/<path>.
@app.route('/view/<path:filename>')
def view_image(filename):
    """Render the viewer page for *filename*."""
    return render_template_string(
        VIEW_TEMPLATE, real_url=_dataset_file_url(filename)
    )


@app.route('/file/<path:filename>')
def get_image_file(filename):
    """Stream *filename* from the dataset repo through this server.

    The upstream status code and Content-Type are passed through; a failure
    to reach the upstream yields 502.
    """
    # Only send credentials when a token is configured.
    headers = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}
    try:
        upstream = requests.get(
            _dataset_file_url(filename),
            headers=headers,
            stream=True,
            timeout=UPSTREAM_TIMEOUT,
        )
    except requests.RequestException:
        app.logger.exception("Upstream fetch of %r failed", filename)
        return "Upstream error", 502
    response = Response(
        upstream.iter_content(chunk_size=1024),
        status=upstream.status_code,
        content_type=upstream.headers.get('Content-Type'),
    )
    # Release the upstream connection once the client response is finished.
    response.call_on_close(upstream.close)
    return response


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=7860)