from flask import Flask, render_template_string, request, send_file, jsonify, redirect, url_for, session import os import sqlite3 import hashlib from datetime import datetime from pathlib import Path from PIL import Image import uuid import threading import time import random import shutil # 环境变量 ACCESS_PASSWORD = os.environ.get("ACCESS_PASSWORD", "changeme") HF_USERNAME = os.environ.get("HF_USERNAME", "") HF_SPACE_NAME = os.environ.get("HF_SPACE_NAME", "") # Flask应用 app = Flask(__name__) app.secret_key = os.environ.get("ACCESS_PASSWORD", "your-secret-key-change-this") # 文件存储路径 IMAGE_DIR = Path("uploaded_images") DB_PATH = "image_database.db" # 创建目录 IMAGE_DIR.mkdir(exist_ok=True) def init_db(): """初始化数据库""" try: conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute(""" CREATE TABLE IF NOT EXISTS images ( id INTEGER PRIMARY KEY AUTOINCREMENT, hash TEXT NOT NULL UNIQUE, filename TEXT NOT NULL, original_filename TEXT NOT NULL, file_path TEXT NOT NULL, file_size INTEGER, mime_type TEXT, upload_time TEXT NOT NULL, description TEXT ) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_hash ON images(hash) """) conn.commit() conn.close() print("数据库初始化成功") return True except Exception as e: print(f"数据库初始化失败: {e}") return False def get_db_connection(): conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row return conn def check_password(password): return password == ACCESS_PASSWORD def generate_image_hash(): return uuid.uuid4().hex[:12] def generate_filename(original_filename): timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f') hash_suffix = hashlib.md5(str(datetime.now().timestamp()).encode()).hexdigest()[:8] ext = Path(original_filename).suffix.lower() if not ext: ext = ".png" return f"{timestamp}_{hash_suffix}{ext}" def generate_full_url(image_hash): """生成完整的图片URL""" if HF_USERNAME and HF_SPACE_NAME: return f"https://{HF_USERNAME}-{HF_SPACE_NAME}.hf.space/img/{image_hash}" else: return f"/img/{image_hash}" def keep_alive(): """防止系统休眠""" while True: sleep_time = random.randint(60, 120) time.sleep(sleep_time) print(f"[Keep-Alive] {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") # 初始化 init_db() keep_alive_thread = threading.Thread(target=keep_alive, daemon=True) keep_alive_thread.start() # HTML模板 HTML_TEMPLATE = """ My图床

My图床

上传图片

点击选择图片(支持多选)

图片列表

导出数据

导出所有图片的元数据(包含完整URL)为JSON格式

""" @app.route('/') def index(): return render_template_string(HTML_TEMPLATE) @app.route('/api/upload', methods=['POST']) def upload(): try: password = request.form.get('password') if not check_password(password): return jsonify({'success': False, 'error': '密码错误'}) files = request.files.getlist('images') description = request.form.get('description', '') if not files: return jsonify({'success': False, 'error': '没有文件'}) results = [] for file in files: if file.filename == '': continue # 生成hash和文件名 image_hash = generate_image_hash() original_filename = file.filename new_filename = generate_filename(original_filename) file_path = IMAGE_DIR / new_filename # 保存文件 file.save(file_path) # 获取信息 file_size = file_path.stat().st_size mime_type = f"image/{file_path.suffix[1:]}" upload_time = datetime.now().isoformat() # 保存到数据库 conn = get_db_connection() cursor = conn.cursor() cursor.execute( "INSERT INTO images (hash, filename, original_filename, file_path, file_size, mime_type, upload_time, description) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", (image_hash, new_filename, original_filename, str(file_path), file_size, mime_type, upload_time, description) ) conn.commit() conn.close() results.append({ 'hash': image_hash, 'url': generate_full_url(image_hash), 'filename': original_filename }) return jsonify({'success': True, 'data': results}) except Exception as e: return jsonify({'success': False, 'error': str(e)}) @app.route('/img/') def serve_image(image_hash): try: conn = get_db_connection() cursor = conn.cursor() cursor.execute("SELECT file_path, mime_type, original_filename FROM images WHERE hash = ?", (image_hash,)) row = cursor.fetchone() conn.close() if not row: return "Image not found", 404 file_path = Path(row['file_path']) if not file_path.exists(): return "File not found", 404 return send_file(file_path, mimetype=row['mime_type']) except Exception as e: return str(e), 500 @app.route('/api/images') def get_images(): try: password = request.args.get('password') if not check_password(password): return jsonify({'success': False, 'error': '密码错误'}) conn = get_db_connection() cursor = conn.cursor() cursor.execute("SELECT hash, original_filename, file_size, upload_time, description FROM images ORDER BY upload_time DESC") rows = cursor.fetchall() conn.close() images = [] for row in rows: images.append({ 'hash': row['hash'], 'original_filename': row['original_filename'], 'file_size': row['file_size'], 'upload_time': row['upload_time'], 'description': row['description'], 'url': generate_full_url(row['hash']) }) return jsonify({'success': True, 'data': images}) except Exception as e: return jsonify({'success': False, 'error': str(e)}) @app.route('/api/delete/', methods=['POST']) def delete_image(image_hash): try: data = request.get_json() password = data.get('password') if not check_password(password): return jsonify({'success': False, 'error': '密码错误'}) conn = get_db_connection() cursor = conn.cursor() cursor.execute("SELECT file_path FROM images WHERE hash = ?", (image_hash,)) row = cursor.fetchone() if not row: conn.close() return jsonify({'success': False, 'error': '图片不存在'}) file_path = Path(row['file_path']) cursor.execute("DELETE FROM images WHERE hash = ?", (image_hash,)) conn.commit() conn.close() if file_path.exists(): file_path.unlink() return jsonify({'success': True}) except Exception as e: return jsonify({'success': False, 'error': str(e)}) @app.route('/api/export') def export_data(): try: password = request.args.get('password') if not check_password(password): return jsonify({'success': False, 'error': '密码错误'}) conn = get_db_connection() cursor = conn.cursor() cursor.execute("SELECT * FROM images ORDER BY upload_time DESC") rows = cursor.fetchall() conn.close() data = [] for row in rows: data.append({ 'hash': row['hash'], 'filename': row['filename'], 'original_filename': row['original_filename'], 'file_size': row['file_size'], 'upload_time': row['upload_time'], 'description': row['description'], 'url': generate_full_url(row['hash']) }) import json from flask import Response return Response( json.dumps(data, ensure_ascii=False, indent=2), mimetype='application/json', headers={'Content-Disposition': 'attachment; filename=image_metadata.json'} ) except Exception as e: return jsonify({'success': False, 'error': str(e)}) if __name__ == '__main__': app.run(host='0.0.0.0', port=7860, debug=False)