File size: 3,962 Bytes
31c4369 581b93e 31c4369 581b93e 31c4369 581b93e 31c4369 581b93e 31c4369 581b93e 31c4369 581b93e 31c4369 581b93e 338f969 31c4369 bf2487a 31c4369 bf2487a 31c4369 581b93e ac6b83d 581b93e ac6b83d 581b93e ac6b83d 581b93e ac6b83d 581b93e ac6b83d 581b93e 31c4369 bf2487a 66f9f74 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
import base64
import hashlib
import hmac
import mimetypes
import os
import uuid
from datetime import datetime
from io import BytesIO

from flask import Flask, request, jsonify, redirect, render_template
from huggingface_hub import HfApi, HfFileSystem
from PIL import Image
# Flask application object that the route decorators below attach to.
app = Flask(__name__)

# Settings read from environment variables.
HF_TOKEN = os.getenv("HF_TOKEN")
DATASET_ID = os.getenv("DATASET_ID")
# MAX_SIZE is given in megabytes (default 10) and stored here in bytes.
MAX_SIZE = int(os.getenv("MAX_SIZE", 10)) * 1024 * 1024
API_KEY = os.getenv("API_KEY")

# Hugging Face clients, both constructed with the same token.
fs = HfFileSystem(token=HF_TOKEN)
hf_api = HfApi(token=HF_TOKEN)
def validate_auth():
    """Return True when the current request is allowed to use the API.

    Authentication is disabled (every request passes) when ``API_KEY`` is
    unset or empty. Otherwise the key must be supplied either in the
    ``X-API-Key`` request header or in the ``key`` query parameter.

    Returns:
        bool: True if no key is configured or a supplied key matches.
    """
    if not API_KEY:
        return True

    expected = API_KEY.encode("utf-8")
    candidates = (
        request.headers.get("X-API-Key"),
        request.args.get("key"),
    )
    # Compare as bytes with a constant-time routine so response timing does
    # not reveal how many leading characters of a guessed key were correct.
    # (compare_digest only accepts ASCII str, hence the explicit encoding.)
    return any(
        hmac.compare_digest(candidate.encode("utf-8"), expected)
        for candidate in candidates
        if candidate is not None
    )
def generate_filename(data, mime_type=None):
    """Build a file name of the form ``<timestamp>-<8 hex chars><extension>``.

    Args:
        data: Unused by this function; kept as part of the signature.
        mime_type: MIME type used to pick the extension. When it is missing
            or empty, ``application/octet-stream`` is looked up instead.

    Returns:
        str: The generated name. The extension falls back to ``.bin`` when
        ``mimetypes`` has no suggestion for the type.
    """
    effective_type = mime_type if mime_type else "application/octet-stream"
    suffix = mimetypes.guess_extension(effective_type)
    if not suffix:
        suffix = ".bin"

    # First eight hex digits of a random UUID keep same-second names apart.
    token = uuid.uuid4().hex[:8]
    stamp = datetime.now().strftime("%Y%m%d%H%M%S")
    return "-".join((stamp, token)) + suffix
def save_to_dataset(content, filename):
    """Write ``content`` into the dataset's ``images/`` folder and return its URL.

    Args:
        content: Data to store; passed unchanged to ``fs.write_bytes``.
        filename: Name of the file to create inside ``images/``.

    Returns:
        str: The ``hf-mirror.com`` ``resolve/main`` URL for the stored file.
    """
    images_dir = f"datasets/{DATASET_ID}/images"
    fs.mkdir(images_dir, exist_ok=True)
    fs.write_bytes(f"{images_dir}/{filename}", content)
    return f"https://hf-mirror.com/datasets/{DATASET_ID}/resolve/main/images/{filename}"
@app.route("/", methods=["GET"])
def index():
    """Serve the root URL by rendering the ``index.html`` template."""
    page = render_template("index.html")
    return page
def _error_response(status_code, message, status_txt):
    """Build a JSON error reply in the shape used by the upload endpoint."""
    payload = {
        "status_code": status_code,
        "error": {"message": message},
        "status_txt": status_txt,
    }
    return jsonify(payload), status_code


@app.route("/api/1/upload", methods=["GET", "POST"])
def upload():
    """Accept an upload, store it in the dataset, and report where it lives.

    The payload is read from ``source``:

    * POST: an uploaded file named ``source``, otherwise the ``source`` form
      field.
    * GET: the ``source`` query parameter.

    A text ``source`` may be a ``data:`` URI or a bare Base64 string.
    ``http(s)://`` sources are recognised but not implemented (501).

    The ``format`` query parameter selects the reply: ``txt`` returns the URL
    as plain text, ``redirect`` issues a 302 to the file, and anything else
    returns a JSON description of the stored file.

    Error replies: 401 for a bad API key, 400 for a missing or undecodable
    source, 413 when the decoded content exceeds ``MAX_SIZE``.
    """
    # Authentication check.
    if not validate_auth():
        return _error_response(401, "Invalid API key", "Unauthorized")

    # Fetch the raw payload from the request.
    if request.method == "POST":
        if "source" in request.files:
            source = request.files["source"].read()
        else:
            source = request.form.get("source")
    else:
        source = request.args.get("source")

    # A missing or empty source previously crashed on ``None.startswith``.
    if not source:
        return _error_response(400, "No source provided", "Bad Request")

    # Resolve the payload to bytes: uploaded file, URL, data URI, or Base64.
    declared_mime = None
    if isinstance(source, bytes):
        content = source
    elif source.startswith(("http://", "https://")):
        # Downloading from a URL is not implemented.
        return jsonify({"error": "URL download not implemented"}), 501
    else:
        encoded = source
        if source.startswith("data:"):
            header, comma, encoded = source.partition(",")
            if not comma:
                return _error_response(400, "Malformed data URI", "Bad Request")
            # "data:image/png;base64" -> "image/png"; empty type -> None.
            declared_mime = header[len("data:"):].split(";", 1)[0] or None
        try:
            content = base64.b64decode(encoded)
        except ValueError:
            # binascii.Error (bad padding/alphabet) is a ValueError subclass,
            # as is the error raised for non-ASCII input text.
            return _error_response(400, "Invalid Base64 data", "Bad Request")

    # Enforce the size limit.
    if len(content) > MAX_SIZE:
        return jsonify({
            "status_code": 413,
            "error": {"message": f"File exceeds {MAX_SIZE//1024//1024}MB limit"},
            "status_txt": "Payload Too Large"
        }), 413

    # Probe image attributes. This is best effort: content Pillow cannot read
    # is still stored, with zero dimensions and the MIME type declared in the
    # data URI (if any) instead of being silently reset to None.
    width = height = 0
    mime_type = declared_mime
    try:
        with Image.open(BytesIO(content)) as image:
            width, height = image.size
            mime_type = image.get_format_mimetype() or declared_mime
    except Exception:
        # Pillow raises several unrelated exception types for unreadable
        # input; catch Exception rather than a bare except so SystemExit and
        # KeyboardInterrupt still propagate.
        width = height = 0
        mime_type = declared_mime

    # Store the content.
    filename = generate_filename(content, mime_type)
    file_url = save_to_dataset(content, filename)

    # Build the reply in the requested format.
    response_format = request.args.get("format", "json")
    if response_format == "txt":
        return file_url
    if response_format == "redirect":
        return redirect(file_url, code=302)
    return jsonify({
        "status_code": 200,
        "success": {"message": "file uploaded", "code": 200},
        "image": {
            "filename": filename,
            "url": file_url,
            "size": len(content),
            "width": width,
            "height": height,
            "mime": mime_type,
            "size_formatted": f"{len(content)/1024/1024:.1f} MB"
        },
        "status_txt": "OK"
    })
if __name__ == "__main__":
    # The interactive debugger that debug mode enables lets anyone who can
    # reach the server execute arbitrary code, and this server binds to all
    # interfaces. Keep it off unless explicitly requested for local
    # development with FLASK_DEBUG=1 (also accepts true/yes/on).
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {
        "1",
        "true",
        "yes",
        "on",
    }
    app.run(host="0.0.0.0", port=7860, debug=debug_enabled)
|