#!/usr/bin/env python3 """ 在线文件转存到网盘 - 基于 FastAPI 的现代化界面 通过云端高速通道实现文件转存到网盘中 """ import json import uuid import os import io import zipfile import requests from datetime import datetime from urllib.parse import urlparse, unquote from fastapi import FastAPI, Request, Query from fastapi.responses import HTMLResponse, JSONResponse from pydantic import BaseModel import uvicorn DEFAULT_TOKEN_Q = "" ACCEPT_VALUE = os.environ.get("ACCEPT_VALUE", "") API_VERSION_KEY = os.environ.get("API_VERSION_KEY", "") ACCESS_TOKEN = os.environ.get("ACCESS_TOKEN_G", "") API_BASE_URL = os.environ.get("API_BASE_URL", f"") QUARK_COOKIE = os.environ.get("ACCESS_TOKEN_Q", DEFAULT_TOKEN_Q) ENABLE_TEST_DATA = False def get_remote_processor(): token = ACCESS_TOKEN base_url = API_BASE_URL if not all([token, base_url]): return None return RemoteTaskProcessor(token, base_url) class RemoteTaskProcessor: def __init__(self, token, base_url): self.token = token self.base_url = base_url self.headers = { "Authorization": f"Bearer {token}", "Accept": ACCEPT_VALUE, API_VERSION_KEY: "2022-11-28" } def generate_task_id(self): date_str = datetime.now().strftime("%Y%m%d_%H%M%S") short_uuid = uuid.uuid4().hex[:12] return f"task_{date_str}_{short_uuid}" def exec_task(self, task_file, inputs, trace_id): url = f"{self.base_url}/actions/workflows/{task_file}/dispatches" dispatch_inputs = {"trace_id": trace_id, **inputs} data = {"ref": "main", "inputs": dispatch_inputs} response = requests.post(url, headers=self.headers, json=data) return response.status_code == 204, response def find_task_by_task_id(self, task_file, trace_id): url = f"{self.base_url}/actions/workflows/{task_file}/runs" params = {"event": "workflow_dispatch", "branch": "main", "per_page": 20} resp = requests.get(url, headers=self.headers, params=params) resp.raise_for_status() runs = resp.json().get("workflow_runs", []) for run in runs: run_name = run.get("name", "") display_title = run.get("display_title", "") if trace_id in run_name or trace_id in display_title: return run return None def get_task_status(self, task_id): url = f"{self.base_url}/actions/runs/{task_id}" response = requests.get(url, headers=self.headers) if response.status_code != 200: return None return response.json() def get_result(self, run_id, artifact_name="result"): url = f"{self.base_url}/actions/runs/{run_id}/artifacts" response = requests.get(url, headers=self.headers) if response.status_code != 200: return None artifacts = response.json().get("artifacts", []) target = None for a in artifacts: if a["name"] == artifact_name: target = a break if not target: return None download_url = f"{self.base_url}/actions/artifacts/{target['id']}/zip" response = requests.get(download_url, headers=self.headers) if response.status_code != 200: return None try: with zipfile.ZipFile(io.BytesIO(response.content)) as zf: if "result.json" in zf.namelist(): return json.loads(zf.read("result.json").decode("utf-8")) except Exception: pass return None def is_valid_url(url: str) -> bool: """初步判断是否是有效的 http/https 下载链接""" if not url: return False try: result = urlparse(url.strip()) return all([result.scheme, result.netloc]) and result.scheme in ["http", "https"] except Exception: return False def extract_filename_from_url(url): if not url or not url.strip(): return "" try: parsed = urlparse(url.strip()) path = unquote(parsed.path) path = path.rstrip("/") filename = os.path.basename(path) if filename and "." in filename: return filename if filename: return filename except Exception: pass return "" # ============================================================ # 核心刷新逻辑 # ============================================================ def _do_refresh(task_list, trigger): updated_count = 0 for task in task_list: if task["status"] not in ["正在转存", "未提交"]: continue run_id_str = task.get("run_id", "") if not run_id_str: run = trigger.find_task_by_task_id("upload.yml", task["trace_id"]) if run: task["run_id"] = str(run["id"]) run_id_str = str(run["id"]) else: continue try: run_data = trigger.get_task_status(int(run_id_str)) except (ValueError, TypeError): continue if not run_data: continue status = run_data.get("status") conclusion = run_data.get("conclusion") if status == "completed": if conclusion == "success": result = trigger.get_result(int(run_id_str)) if result and isinstance(result, dict): task["status"] = "已转存" task["share_url"] = result.get("share_url", "") else: task["status"] = "已转存" else: task["status"] = "失败" updated_count += 1 return updated_count def _has_active_tasks(task_list): return any(t["status"] in ["正在转存", "未提交"] for t in task_list) # ============================================================ # FastAPI 应用 # ============================================================ app = FastAPI(title="CloudFileRelay|Online Files to Cloud Drive") # 会话存储 SESSION_FILE = "sessions_db.json" sessions: dict = {} def load_sessions(): global sessions if os.path.exists(SESSION_FILE): try: with open(SESSION_FILE, "r", encoding="utf-8") as f: sessions = json.load(f) except Exception: sessions = {} def save_sessions(): try: with open(SESSION_FILE, "w", encoding="utf-8") as f: json.dump(sessions, f, ensure_ascii=False, indent=2) except Exception: pass load_sessions() def get_or_create_session(request: Request) -> tuple: # 优先从 header 获取 sid,作为 fallback 方案,解决 iframe 跨域 Cookie 丢失问题 sid = request.headers.get("X-Session-ID", "") if not sid: sid = request.cookies.get("sid", "") if sid and sid in sessions: return sid, sessions[sid] # 如果 sid 已经存在但不在 sessions 中(可能服务器重启),仍保留原 sid 以维持客户端一致性 if not sid: sid = uuid.uuid4().hex if sid not in sessions: sessions[sid] = [] save_sessions() return sid, sessions[sid] def json_resp(data: dict, sid: str) -> JSONResponse: # 每次有变更时保存会话(这里简单处理,实际高并发下建议异步或定时保存) save_sessions() # 在响应体中携带 sid,方便前端存入 localStorage data["sid"] = sid resp = JSONResponse(content=data) # 使用 samesite="none" 和 secure=True 以支持在 Hugging Face Spaces 等 iframe 环境中正常传递 Cookie resp.set_cookie("sid", sid, max_age=86400 * 7, httponly=True, samesite="none", secure=True) return resp # ---- 请求模型 ---- class SubmitRequest(BaseModel): url: str filename: str = "" class QueryRequest(BaseModel): trace_id: str # ---- 页面 ---- @app.get("/", response_class=HTMLResponse) async def index(request: Request): sid, _ = get_or_create_session(request) resp = HTMLResponse(content=HTML_TEMPLATE) resp.set_cookie("sid", sid, max_age=86400 * 7, httponly=True, samesite="none", secure=True) return resp # ---- API ---- @app.get("/api/extract-filename") async def api_extract_filename(url: str = Query("")): return {"filename": extract_filename_from_url(url)} @app.post("/api/submit") async def api_submit(req: SubmitRequest, request: Request): sid, task_list = get_or_create_session(request) if not req.url or not req.url.strip(): return json_resp({"success": False, "message": "请输入下载链接", "tasks": task_list}, sid) url_to_submit = req.url.strip() if not is_valid_url(url_to_submit): return json_resp({ "success": False, "message": "转存失败,请输入有效的下载链接地址", "tasks": task_list }, sid) # 检查是否已存在相同链接且处于活跃状态的任务 for task in task_list: if task.get("url") == url_to_submit and task.get("status") in ["正在转存", "未提交"]: return json_resp({ "success": False, "message": "该任务已在转存中,请耐心等待,无需重复提交。", "tasks": task_list }, sid) trigger = get_remote_processor() if not trigger: return json_resp({ "success": False, "message": "配置缺失!", "tasks": task_list }, sid) local_file = req.filename.strip() if req.filename and req.filename.strip() else extract_filename_from_url(req.url) trace_id = trigger.generate_task_id() cookie = QUARK_COOKIE inputs = { "url": req.url.strip(), "local_file": local_file, "cookie": cookie } success, resp = trigger.exec_task("upload.yml", inputs, trace_id) if not success: error_detail = "" try: error_detail = resp.text[:300] except Exception: pass return json_resp({ "success": False, "message": f"任务触发失败 (HTTP {resp.status_code})\n{error_detail}", "tasks": task_list }, sid) task = { "trace_id": trace_id, "run_id": "", "filename": local_file, "url": req.url.strip(), "status": "正在转存", "share_url": "", "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), } task_list.append(task) return json_resp({ "success": True, "message": f"转存任务已提交!\n任务 ID: {trace_id}\n文件名: {local_file}", "task_id": trace_id, "tasks": task_list }, sid) @app.get("/api/tasks") async def api_tasks(request: Request): sid, task_list = get_or_create_session(request) return json_resp({"tasks": task_list}, sid) @app.post("/api/refresh") async def api_refresh(request: Request): sid, task_list = get_or_create_session(request) trigger = get_remote_processor() if not trigger: return json_resp({"tasks": task_list, "message": "配置缺失", "all_done": True}, sid) if not task_list: return json_resp({"tasks": task_list, "message": "暂无任务", "all_done": True}, sid) count = _do_refresh(task_list, trigger) now = datetime.now().strftime("%H:%M:%S") all_done = not _has_active_tasks(task_list) if count > 0: msg = f"[{now}] 已更新 {count} 个任务的状态" else: msg = f"[{now}] 正在转存" if all_done and task_list: msg += " · 所有任务已完成" return json_resp({"tasks": task_list, "message": msg, "all_done": all_done}, sid) @app.post("/api/clear") async def api_clear(request: Request): sid, _ = get_or_create_session(request) sessions[sid] = [] return json_resp({"success": True, "tasks": [], "message": "任务列表已清空"}, sid) @app.post("/api/query") async def api_query(req: QueryRequest, request: Request): sid, task_list = get_or_create_session(request) if not req.trace_id or not req.trace_id.strip(): return json_resp({"success": False, "message": "请输入任务 ID", "tasks": task_list}, sid) trigger = get_remote_processor() if not trigger: return json_resp({"success": False, "message": "配置缺失", "tasks": task_list}, sid) trace_id = req.trace_id.strip() # 在本地任务列表中查找 run_id run_id = None for task in task_list: if task.get("trace_id") == trace_id and task.get("run_id"): try: run_id = int(task["run_id"]) except (ValueError, TypeError): pass break if not run_id: run = trigger.find_task_by_task_id("upload.yml", trace_id) if not run: return json_resp({ "success": False, "message": f"未找到任务 ID: {trace_id}\n可能任务尚未被创建,请稍后再试", "tasks": task_list }, sid) run_id = run["id"] for task in task_list: if task.get("trace_id") == trace_id: task["run_id"] = str(run_id) run_data = trigger.get_task_status(run_id) if not run_data: return json_resp({ "success": False, "message": f"无法获取任务状态\n任务 ID: {trace_id}", "tasks": task_list }, sid) status = run_data.get("status") conclusion = run_data.get("conclusion") html_url = run_data.get("html_url", "") if status != "completed": status_map = { "queued": "排队中", "in_progress": "执行中", "waiting": "等待中", "requested": "已请求", "pending": "等待中" } status_cn = status_map.get(status, status) return json_resp({ "success": True, "message": f"任务正在执行中,请稍后再查询\n\n任务 ID: {trace_id}\n当前状态: {status_cn}", "tasks": task_list }, sid) if conclusion == "success": result = trigger.get_result(run_id) for task in task_list: if task.get("trace_id") == trace_id: if result and isinstance(result, dict): task["status"] = "已转存" task["share_url"] = result.get("share_url", "") else: task["status"] = "已转存" if result and isinstance(result, dict): share_url = result.get("share_url", "无") local_file = result.get("local_file", "无") result_status = result.get("status", "unknown") return json_resp({ "success": True, "message": ( f"任务已完成!\n\n" f"任务 ID: {trace_id}\n" f"状态: {result_status}\n" f"文件名: {local_file}\n" f"网盘地址: {share_url}" ), "tasks": task_list }, sid) else: return json_resp({ "success": True, "message": f"任务已完成 (结论: {conclusion})\n但未找到结果文件 (artifact)", "tasks": task_list }, sid) else: for task in task_list: if task.get("trace_id") == trace_id: task["status"] = "失败" result = trigger.get_result(run_id) error_info = "" if result and isinstance(result, dict) and "error" in result: error_info = f"\n错误信息: {result['error']}" return json_resp({ "success": False, "message": f"任务失败\n\n任务 ID: {trace_id}\n结论: {conclusion}{error_info}", "tasks": task_list }, sid) # ============================================================ # HTML 模板 # ============================================================ HTML_TEMPLATE = r""" CloudFileRelay|Online Files to Cloud Drive
📦

在线文件云端中转到你的网盘

输入文件下载链接,自动转存到你的夸克网盘

📥 新建转存任务
📋 任务列表
自动刷新
📭

暂无转存任务

提交转存任务后,将在此处显示

🔍 查询指定任务
Powered by 小豹
""" # ============================================================ # 启动 # ============================================================ if __name__ == "__main__": uvicorn.run("app:app", host="0.0.0.0", port=7860)