import datetime
import hmac
import os
import time
import uuid
from functools import wraps
from urllib.parse import quote

import requests
from flask import Flask, request, jsonify, render_template_string, redirect, session, url_for, Response, stream_with_context
from flask_cors import CORS
from huggingface_hub import HfApi, CommitOperationDelete
from werkzeug.middleware.proxy_fix import ProxyFix
# Flask application object; CORS is enabled for the whole app with the
# flask_cors defaults.
app = Flask(__name__)
CORS(app)
# NOTE(review): ProxyFix is left disabled, exactly as before; confirm whether
# the X-Forwarded-* headers of the hosting proxy should be trusted.
# app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1, x_host=1)

# Session-signing key. Prefer the SECRET_KEY environment variable so the key
# does not have to live in source control. The previous literal stays as the
# fallback so existing deployments (and already-issued session cookies) keep
# working when the variable is not set.
app.secret_key = os.environ.get("SECRET_KEY") or "my-fixed-secret-key-2026"
app.config.update(
    # Session cookie is only sent over HTTPS and is allowed cross-site.
    SESSION_COOKIE_SECURE=True,
    SESSION_COOKIE_SAMESITE='None',
    # Lifetime applied to sessions that are marked permanent.
    PERMANENT_SESSION_LIFETIME=datetime.timedelta(days=30),
)
# --- Runtime configuration, read once from the environment at import time ---
# Hugging Face access token and the dataset repository that stores the images.
HF_TOKEN = os.getenv("HF_TOKEN")
DATASET_NAME = os.getenv("DATASET_NAME")
# Host this app is served from; defaults to the local development address.
SPACE_HOST = os.getenv("SPACE_HOST", "localhost:7860")
# Plain HTTP is used only when the host contains "localhost"; HTTPS otherwise.
BASE_URL = ("http://" if "localhost" in SPACE_HOST else "https://") + SPACE_HOST
# Admin credentials; login is only enforced when both are set.
ADMIN_USER = os.getenv("ADMIN_USER")
ADMIN_PASS = os.getenv("ADMIN_PASS")
# ==========================================
# Custom domain handling: normalise whatever format the operator supplied.
# ==========================================
def normalize_custom_domain(raw):
    """Return *raw* as an absolute origin without a trailing slash.

    Args:
        raw: Value of the CUSTOM_DOMAIN setting; may be None or empty.

    Returns:
        None when nothing usable was supplied. Otherwise the value with
        surrounding whitespace and all trailing slashes removed, prefixed
        with ``https://`` unless it already starts with ``http://`` or
        ``https://`` (case-insensitive).
    """
    if not raw:
        return None
    domain = raw.strip().rstrip("/")
    if not domain:
        # Input such as "/" carries no host; treat it as "not configured"
        # instead of producing the bare string "https://".
        return None
    # Check for a real scheme prefix. A plain startswith("http") would wrongly
    # accept host names such as "httpbin.example.com" as already absolute.
    if not domain.lower().startswith(("http://", "https://")):
        # Without a scheme the value would be treated as a relative path when
        # used as a redirect target, so make it absolute.
        domain = f"https://{domain}"
    return domain


CUSTOM_DOMAIN = normalize_custom_domain(os.environ.get("CUSTOM_DOMAIN"))
# ==========================================
# Hugging Face Hub client shared by the routes. It is bound to None when no
# token is configured so the name always exists; previously it was left
# undefined in that case and any use raised NameError.
api = HfApi(token=HF_TOKEN) if HF_TOKEN else None
# Scratch directory where uploads are staged before being pushed to the dataset.
CACHE_DIR = "/app/cache"
# exist_ok avoids the check-then-create race when several workers start at once.
os.makedirs(CACHE_DIR, exist_ok=True)
def format_size(size):
    """Format a byte count as a human-readable string with one decimal.

    Args:
        size: Number of bytes, or None when the size is not known.

    Returns:
        "未知" for None; otherwise the value scaled by powers of 1024 and
        suffixed with B, KB, MB, GB or TB (TB is the largest unit used).
    """
    if size is None:
        return "未知"
    units = ('B', 'KB', 'MB', 'GB')
    value = size
    idx = 0
    while idx < len(units):
        if value < 1024:
            return f"{value:.1f} {units[idx]}"
        value = value / 1024
        idx += 1
    # Anything that is still >= 1024 GB is reported in terabytes.
    return f"{value:.1f} TB"
def login_required(f):
    """Decorator that sends unauthenticated visitors to the login page.

    The check is only enforced when both ADMIN_USER and ADMIN_PASS are set;
    otherwise the wrapped view is called directly.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        auth_enabled = ADMIN_USER and ADMIN_PASS
        if not auth_enabled or session.get('logged_in'):
            return f(*args, **kwargs)
        # Not logged in: redirect to the login page, using the custom domain
        # as an absolute target when one is configured.
        if CUSTOM_DOMAIN:
            target = f"{CUSTOM_DOMAIN}/login"
        else:
            target = url_for('login')
        return redirect(target)
    return guarded_view
# 1. Login page template.
#    Rendered by the login view; shows `error` when that variable is passed.
LOGIN_TEMPLATE = """
登录CloudGallery
{% if error %}
{{ error }}
{% endif %}
"""
# 2. Full-screen viewer template (rendered by the /view/ route).
VIEW_TEMPLATE = """查看
"""
# 3. Home page template.
#    Iterates over `images`; each entry provides `name` and `size_fmt`.
HTML_TEMPLATE = """
CloudGallery
{% for img in images %}
{{ img.name }}
{{ img.size_fmt }}...
{% endfor %}
"""
def _safe_equals(supplied, expected):
    """Compare two optional strings without leaking timing information.

    Missing values (None) are treated as empty strings. Both sides are
    encoded to UTF-8 bytes because hmac.compare_digest only accepts ASCII
    str or bytes-like objects.
    """
    return hmac.compare_digest(
        (supplied or "").encode("utf-8"),
        (expected or "").encode("utf-8"),
    )


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or verify submitted credentials (POST).

    On success the session is marked permanent and logged in, then the
    client is redirected to the home page (on the custom domain when one is
    configured). On failure the form is rendered again with error="Error".
    """
    if request.method != 'POST':
        return render_template_string(LOGIN_TEMPLATE)

    # Evaluate both comparisons unconditionally so the response time does not
    # reveal which of the two fields was wrong.
    user_ok = _safe_equals(request.form.get('username'), ADMIN_USER)
    pass_ok = _safe_equals(request.form.get('password'), ADMIN_PASS)
    if not (user_ok and pass_ok):
        return render_template_string(LOGIN_TEMPLATE, error="Error")

    session.permanent = True
    session['logged_in'] = True
    if CUSTOM_DOMAIN:
        return redirect(f"{CUSTOM_DOMAIN}/")
    return redirect("/")
@app.route('/logout')
def logout():
    """Clear the login flag from the session and go back to the login page."""
    session.pop('logged_in', None)
    # Use an absolute URL on the custom domain when one is configured.
    login_page = f"{CUSTOM_DOMAIN}/login" if CUSTOM_DOMAIN else '/login'
    return redirect(login_page)
@app.route('/')
@login_required
def home():
    """Render the gallery page for the image files at the dataset root.

    Returns:
        "Missing Env" when HF_TOKEN is not configured; the rendered
        HTML_TEMPLATE on success; "Error loading images" with HTTP 500 when
        the dataset listing fails (the exception is logged).
    """
    if not HF_TOKEN:
        return "Missing Env"

    image_suffixes = ('.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp')
    current_host = CUSTOM_DOMAIN if CUSTOM_DOMAIN else BASE_URL
    images = []
    try:
        tree = api.list_repo_tree(repo_id=DATASET_NAME, repo_type="dataset", token=HF_TOKEN, recursive=False)
        # Iterate inside the try block: the listing may be produced lazily, so
        # errors can surface while looping rather than at the call above.
        for item in tree:
            if not item.path.lower().endswith(image_suffixes):
                continue
            # Percent-encode the path so names containing spaces, '#', '?'
            # and similar characters still yield valid URLs.
            quoted_path = quote(item.path)
            images.append({
                "name": item.path,
                # raw_url and view_url both go through this app's proxy routes.
                "raw_url": f"{current_host}/file/{quoted_path}",
                "real_url": f"https://huggingface.co/datasets/{DATASET_NAME}/resolve/main/{quoted_path}",
                "view_url": f"{current_host}/view/{quoted_path}",
                "size_fmt": format_size(item.size) if hasattr(item, 'size') else "?",
            })
    except Exception:
        # Top-level boundary for the listing: log the cause instead of hiding
        # it, and report the failure with an error status rather than 200.
        app.logger.exception("Failed to list images in dataset %s", DATASET_NAME)
        return "Error loading images", 500

    # Show the entries in the reverse of the order returned by the listing.
    images.reverse()
    return render_template_string(HTML_TEMPLATE, images=images, dataset_name=DATASET_NAME)
@app.route('/upload', methods=['POST'])
@login_required
def upload_file():
    """Upload every file in the ``files`` form field to the dataset.

    Each file is staged in CACHE_DIR under a random name that keeps the
    original extension (".jpg" when there is none), pushed to the dataset,
    and the staged copy is removed again.

    Returns:
        JSON ``{"status": "success", "count": <uploaded>, "failed": <failed>}``.
        ``failed`` is an additional key; ``status`` and ``count`` are unchanged.
    """
    uploaded = 0
    failed = 0
    for file in request.files.getlist('files'):
        if not file.filename:
            continue
        ext = os.path.splitext(file.filename)[1].lower() or ".jpg"
        # NOTE(review): only four hex characters (65,536 possible names), so
        # two uploads can collide and the later one would replace the earlier
        # file in the dataset. Kept as-is to preserve the short URLs; consider
        # a longer prefix.
        name = f"{uuid.uuid4().hex[:4]}{ext}"
        path = os.path.join(CACHE_DIR, name)
        try:
            file.save(path)
            api.upload_file(path_or_fileobj=path, path_in_repo=name, repo_id=DATASET_NAME, repo_type="dataset", token=HF_TOKEN)
            uploaded += 1
        except Exception:
            # Best effort per file: one failure must not abort the batch, but
            # it is logged and counted instead of being silently dropped.
            failed += 1
            app.logger.exception("Upload of %r failed", file.filename)
        finally:
            # Remove the staged copy even when the upload failed, so failed
            # uploads do not accumulate in the cache directory.
            if os.path.exists(path):
                os.remove(path)
    return jsonify({"status": "success", "count": uploaded, "failed": failed})
@app.route('/delete', methods=['POST'])
@login_required
def delete_file():
    """Delete the file named by the ``filename`` form field from the dataset.

    Returns:
        JSON ``{"status": "success"}`` on success; ``{"error": ...}`` with
        HTTP 400 when no filename was supplied; ``{"error": <message>}`` when
        the commit fails.
    """
    name = request.form.get('filename')
    if not name:
        # Fail fast with a clear message instead of passing None to the Hub
        # client and reporting whatever internal error that produces.
        return jsonify({"error": "Missing filename"}), 400
    try:
        api.create_commit(repo_id=DATASET_NAME, repo_type="dataset", operations=[CommitOperationDelete(path_in_repo=name)], commit_message=f"Del {name}")
    except Exception as e:
        app.logger.exception("Failed to delete %r from dataset %s", name, DATASET_NAME)
        # The response body and status are kept as before so existing clients
        # that inspect the "error" key keep working.
        return jsonify({"error": str(e)})
    return jsonify({"status": "success"})
@app.route('/view/<path:filename>')
def view_image(filename):
    """Render the full-screen viewer page for one image.

    The route must declare the ``filename`` URL variable: without it the
    rule only matched "/view/" and Flask could not supply the argument this
    view requires.

    Args:
        filename: Path of the image inside the dataset (may contain "/").
    """
    current_host = CUSTOM_DOMAIN if CUSTOM_DOMAIN else BASE_URL
    # Flask passes the decoded path; re-encode it for use inside a URL.
    real_url = f"{current_host}/file/{quote(filename)}"
    return render_template_string(VIEW_TEMPLATE, real_url=real_url)
@app.route('/file/<path:filename>')
def get_image_file(filename):
    """Stream one file from the dataset through this app.

    The route must declare the ``filename`` URL variable: without it the
    rule only matched "/file/" and Flask could not supply the argument this
    view requires.

    Args:
        filename: Path of the file inside the dataset (may contain "/").

    Returns:
        A streamed response carrying the upstream Content-Type and a one-year
        public Cache-Control header; ``"Error: <status>"`` with the upstream
        status when it is not 200; ``"Error: 400"`` for paths containing dot
        segments; ``"Proxy Error: <message>"`` with HTTP 500 when the
        upstream request itself fails.
    """
    # The upstream request carries the owner's token. Reject "." / ".."
    # segments so a crafted path cannot be resolved to a location outside
    # this dataset's resolve/main/ prefix.
    if any(part in (".", "..") for part in filename.split("/")):
        return "Error: 400", 400

    # Flask passes the decoded path; re-encode it so characters such as '?'
    # or '#' cannot change the structure of the upstream URL.
    url = f"https://huggingface.co/datasets/{DATASET_NAME}/resolve/main/{quote(filename)}"
    # Only send the Authorization header when a token is configured, instead
    # of the literal "Bearer None".
    headers = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}
    try:
        # (connect, read) timeouts so a stalled upstream cannot block a
        # worker indefinitely.
        r = requests.get(url, headers=headers, stream=True, timeout=(5, 30))
    except requests.RequestException as e:
        return f"Proxy Error: {str(e)}", 500

    if r.status_code != 200:
        # Release the streamed connection before returning the error.
        r.close()
        return f"Error: {r.status_code}", r.status_code

    # Build the streaming response from the upstream body.
    response = Response(stream_with_context(r.iter_content(chunk_size=1024)), content_type=r.headers.get('Content-Type'))
    # Close the upstream connection once the response has been sent or the
    # client disconnected.
    response.call_on_close(r.close)
    # Long-lived public caching for the proxied file.
    response.headers['Cache-Control'] = 'public, max-age=31536000'
    return response
if __name__ == '__main__':
    # Direct execution: serve on all interfaces, port 7860.
    app.run(host='0.0.0.0', port=7860)