"""Alist Sync Panel.

A small Flask application that stores its settings in a SQL database,
exposes a password-protected web panel, and runs a background thread that
copies files present in one Alist directory (``path1``) but missing from
another (``path2``) through the Alist HTTP API.
"""
import hmac
import logging
import os
import secrets
import threading
import time
from datetime import datetime
from functools import wraps

import requests
from flask import (
    Flask,
    jsonify,
    redirect,
    render_template_string,
    request,
    session,
    url_for,
)
from flask_sqlalchemy import SQLAlchemy

# ================= Configuration =================
DEFAULT_DB_URI = 'postgresql://postgres:password@192.168.1.10:5432/alist_sync'
DB_URI = os.environ.get('DB_URI', DEFAULT_DB_URI)

# Compatibility fix: rewrite the legacy "postgres://" scheme.
if DB_URI and DB_URI.startswith("postgres://"):
    DB_URI = DB_URI.replace("postgres://", "postgresql://", 1)

PORT = int(os.environ.get('PORT', 5000))

# Panel password used until the operator changes it.
DEFAULT_WEB_PASSWORD = "123456"
# Sync interval (minutes) used when the stored value is unusable.
DEFAULT_INTERVAL_MINUTES = 60
# Number of file names sent per /api/fs/copy request.
COPY_BATCH_SIZE = 20
# =================================================

app = Flask(__name__)
app.secret_key = os.environ.get('SECRET_KEY', secrets.token_hex(16))
app.config['SQLALCHEMY_DATABASE_URI'] = DB_URI
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {'pool_pre_ping': True}

db = SQLAlchemy(app)

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
logger = logging.getLogger(__name__)


# --- Database models ---
class AppConfig(db.Model):
    """Single-row table holding the Alist connection and sync settings."""

    __tablename__ = 'config'

    id = db.Column(db.Integer, primary_key=True)
    url = db.Column(db.String(255), default="http://localhost:5244")
    username = db.Column(db.String(255), default="admin")
    password = db.Column(db.String(255), default="")
    path1 = db.Column(db.String(500), default="")  # copy source directory
    path2 = db.Column(db.String(500), default="")  # copy destination directory
    interval = db.Column(db.Integer, default=60)  # minutes between sync passes
    auto_sync = db.Column(db.Boolean, default=False)
    web_password = db.Column(db.String(255), default=DEFAULT_WEB_PASSWORD)


class AppLog(db.Model):
    """One line of the log shown in the web panel."""

    __tablename__ = 'logs'

    id = db.Column(db.Integer, primary_key=True)
    timestamp = db.Column(db.DateTime, default=datetime.now)
    message = db.Column(db.Text)


# --- Decorators and helpers ---
def login_required(f):
    """Redirect to the login page unless the session is marked as logged in."""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'logged_in' not in session:
            return redirect(url_for('login'))
        return f(*args, **kwargs)
    return decorated_function


def add_log(msg):
    """Print *msg* and store it in the ``logs`` table (best effort).

    Never raises: a failure to persist the entry is reported through the
    module logger so that logging problems cannot break the caller.
    """
    print(f"[Log] {msg}")
    try:
        with app.app_context():
            try:
                db.session.add(AppLog(message=str(msg), timestamp=datetime.now()))
                db.session.commit()

                # Automatic cleanup: once ids pass 1000, drop everything
                # older than the newest ~500 entries.
                newest_id = db.session.query(db.func.max(AppLog.id)).scalar()
                if newest_id and newest_id > 1000:
                    AppLog.query.filter(AppLog.id < (newest_id - 500)).delete()
                    db.session.commit()
            except Exception:
                # Leave the session usable for whoever touches it next.
                db.session.rollback()
                raise
    except Exception:
        logger.exception("Failed to persist log entry")


def _coerce_interval(raw):
    """Return *raw* as a whole number of minutes, at least 1.

    Falls back to ``DEFAULT_INTERVAL_MINUTES`` when *raw* is ``None`` or
    cannot be converted to an integer.
    """
    try:
        return max(1, int(raw))
    except (TypeError, ValueError):
        return DEFAULT_INTERVAL_MINUTES


def _get_or_create_config():
    """Return the single ``AppConfig`` row, creating it with defaults if absent.

    Must be called inside an application context.
    """
    cfg = AppConfig.query.first()
    if not cfg:
        cfg = AppConfig(web_password=DEFAULT_WEB_PASSWORD)
        db.session.add(cfg)
        db.session.commit()
    return cfg


def get_token(base_url, username, password):
    """Log in to Alist and return the auth token, or ``None`` on any failure."""
    try:
        res = requests.post(
            f"{base_url.rstrip('/')}/api/auth/login",
            json={'username': username, 'password': password},
            timeout=10,
        )
        if res.status_code == 200:
            # "data" may be JSON null on a rejected login; treat it as empty.
            payload = res.json().get('data') or {}
            return payload.get('token')
    except Exception as e:
        add_log(f"登录失败: {e}")
    return None


def list_files(base_url, token, path):
    """Return the entries of *path* as a list; empty on failure or empty dir."""
    try:
        res = requests.post(
            f"{base_url.rstrip('/')}/api/fs/list",
            headers={'Authorization': token},
            json={"path": path, "password": "", "page": 1, "per_page": 0, "refresh": True},
            timeout=30,
        )
        if res.status_code == 200:
            # Both "data" and "content" may be JSON null (e.g. an empty
            # directory); always hand the caller an iterable list.
            payload = res.json().get('data') or {}
            return payload.get('content') or []
    except Exception as e:
        add_log(f"获取列表失败: {e}")
    return []


def copy_files_api(base_url, token, src_dir, dst_dir, file_names):
    """Ask Alist to copy *file_names* from *src_dir* to *dst_dir*.

    Returns the decoded JSON response, or ``{"code": 500}`` if the request
    or the decoding fails.
    """
    try:
        res = requests.post(
            f"{base_url.rstrip('/')}/api/fs/copy",
            headers={'Authorization': token},
            json={"src_dir": src_dir, "dst_dir": dst_dir, "names": file_names},
            timeout=30,
        )
        return res.json()
    except Exception as e:
        add_log(f"复制失败: {e}")
        return {"code": 500}


# --- Background task ---
def _sync_once(cfg):
    """Run one sync pass: copy every name in ``path1`` that ``path2`` lacks."""
    token = get_token(cfg.url, cfg.username, cfg.password)
    if not token:
        return

    src = list_files(cfg.url, token, cfg.path1)
    dst = list_files(cfg.url, token, cfg.path2)
    if not src:
        return

    dst_names = {entry['name'] for entry in dst}
    missing = [entry['name'] for entry in src if entry['name'] not in dst_names]
    if not missing:
        return

    add_log(f"自动同步: 发现 {len(missing)} 个文件,开始复制")
    for start in range(0, len(missing), COPY_BATCH_SIZE):
        copy_files_api(cfg.url, token, cfg.path1, cfg.path2,
                       missing[start:start + COPY_BATCH_SIZE])


def background_worker():
    """Loop forever, syncing every ``interval`` minutes while auto-sync is on.

    While auto-sync is off the configuration is re-checked every 10 seconds;
    after an unexpected error the loop waits 60 seconds before retrying.
    """
    while True:
        try:
            with app.app_context():
                cfg = _get_or_create_config()
                if cfg.auto_sync:
                    delay = _coerce_interval(cfg.interval) * 60
                    _sync_once(cfg)
                else:
                    delay = 10
            # Sleep outside the app context so the DB session is released
            # while the worker is idle.
            time.sleep(delay)
        except Exception as e:
            print(f"后台错误: {e}")
            time.sleep(60)


# ==========================================
# Material 3 style front-end templates
# ==========================================
# NOTE(review): only text content is present in these template strings in
# this copy of the file; the HTML markup appears to be missing — confirm
# against the original source before deploying.

# Shared page header.
HEAD_COMMON = """ Alist Sync Panel """

LOGIN_HTML = HEAD_COMMON + """
sync_lock

Alist Sync

请输入面板访问密码

{% if error %}
error {{ error }}
{% endif %}
key 登 录
Default: 123456
"""

INDEX_HTML = HEAD_COMMON + """
cloud_sync Alist Sync Panel
logout 退出
settings 同步配置
link person lock
folder_open save_alt
save 保存并应用
analytics 运行状态
stop_circle
已停止
security 修改面板密码
修改
terminal 系统日志
refresh
加载中...
Notification
"""


# --- Routes ---
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form; on POST, check the panel password."""
    error = None
    if request.method == 'POST':
        input_pass = request.form.get('password') or ""
        cfg = AppConfig.query.first()
        # Fall back to the default when no row or no stored password exists,
        # so a NULL column can never match a missing form field.
        real_pass = (cfg.web_password if cfg else None) or DEFAULT_WEB_PASSWORD
        # Constant-time comparison; encode first because compare_digest
        # rejects non-ASCII str arguments.
        if hmac.compare_digest(input_pass.encode('utf-8'), real_pass.encode('utf-8')):
            session['logged_in'] = True
            return redirect(url_for('index'))
        error = "密码错误 (默认为 123456)"
    return render_template_string(LOGIN_HTML, error=error)


@app.route('/logout')
def logout():
    """Clear the login flag and go back to the login page."""
    session.pop('logged_in', None)
    return redirect(url_for('login'))


@app.route('/')
@login_required
def index():
    """Render the main panel."""
    return render_template_string(INDEX_HTML)


@app.route('/api/config', methods=['GET', 'POST'])
@login_required
def api_config():
    """GET: return the stored settings. POST: update them from a JSON object.

    Keys absent from the POST body keep their stored value. ``interval`` is
    coerced to an integer of at least 1 and ``auto_sync`` to a boolean so the
    background worker never reads an unusable value.
    """
    cfg = _get_or_create_config()

    if request.method == 'POST':
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False}), 400

        cfg.url = data.get('url', cfg.url)
        cfg.username = data.get('username', cfg.username)
        cfg.password = data.get('password', cfg.password)
        cfg.path1 = data.get('path1', cfg.path1)
        cfg.path2 = data.get('path2', cfg.path2)
        cfg.interval = _coerce_interval(data.get('interval', cfg.interval))
        cfg.auto_sync = bool(data.get('auto_sync', cfg.auto_sync))
        db.session.commit()
        return jsonify({'success': True})

    return jsonify({
        'url': cfg.url,
        'username': cfg.username,
        'password': cfg.password,
        'path1': cfg.path1,
        'path2': cfg.path2,
        'interval': cfg.interval,
        'auto_sync': cfg.auto_sync,
    })


@app.route('/api/change_pass', methods=['POST'])
@login_required
def api_change_pass():
    """Replace the panel password with the non-empty ``password`` form field."""
    new_pass = request.form.get('password')
    if not new_pass:
        return jsonify({'msg': '无效密码'})

    cfg = _get_or_create_config()
    cfg.web_password = new_pass
    db.session.commit()
    return jsonify({'msg': '面板密码已修改'})


@app.route('/api/logs')
@login_required
def api_logs():
    """Return the 100 most recent log entries, newest first."""
    entries = AppLog.query.order_by(AppLog.timestamp.desc()).limit(100).all()
    return jsonify([
        {
            'timestamp': entry.timestamp.strftime("%H:%M:%S") if entry.timestamp else "",
            'message': entry.message,
        }
        for entry in entries
    ])


if __name__ == '__main__':
    with app.app_context():
        try:
            db.create_all()
        except Exception:
            # Keep starting up (as before), but leave a trace of the failure.
            logger.exception("db.create_all() failed")

    worker = threading.Thread(target=background_worker, daemon=True)
    worker.start()
    app.run(host='0.0.0.0', port=PORT, debug=False, use_reloader=False)