Spaces:
Paused
Paused
| import os | |
| import requests | |
| import aiohttp | |
| import logging | |
| import json | |
| import aiofiles | |
| import base64 | |
| from urllib.parse import urlparse | |
| from .config import Config | |
| # 初始化日志 | |
| logger = logging.getLogger("sora-api.image_uploader") | |
| class PicGoUploader: | |
| """PicGo图床上传器""" | |
| def __init__(self): | |
| """初始化PicGo上传器""" | |
| self.picgo_url = Config.PICGO_URL | |
| self.api_key = Config.PICGO_API_KEY | |
| self.enabled = Config.ENABLE_PICGO | |
| # 日志输出 | |
| if self.enabled: | |
| logger.info(f"PicGo上传已启用,服务器地址: {self.picgo_url}") | |
| else: | |
| logger.info("PicGo上传未启用") | |
| async def upload(self, file_path): | |
| """ | |
| 上传图片到PicGo | |
| Args: | |
| file_path: 图片文件路径 | |
| Returns: | |
| 成功: 图片URL | |
| 失败: None | |
| """ | |
| if not self.enabled: | |
| logger.debug("PicGo上传未启用,跳过上传") | |
| return None | |
| if not os.path.exists(file_path): | |
| logger.error(f"文件不存在: {file_path}") | |
| return None | |
| try: | |
| # 读取图片文件 | |
| async with aiofiles.open(file_path, "rb") as f: | |
| file_content = await f.read() | |
| # 编码为Base64 | |
| base64_content = base64.b64encode(file_content).decode('utf-8') | |
| # 准备上传数据 | |
| data = { | |
| "list": [{ | |
| "data": f"data:image/{os.path.splitext(file_path)[1][1:]};base64,{base64_content}" | |
| }] | |
| } | |
| # 准备请求头 | |
| headers = {"Content-Type": "application/json"} | |
| if self.api_key: | |
| headers["picgo-api-key"] = self.api_key | |
| # 发送请求 | |
| timeout = aiohttp.ClientTimeout(total=60) | |
| async with aiohttp.ClientSession(timeout=timeout) as session: | |
| async with session.post( | |
| f"{self.picgo_url}/upload", | |
| headers=headers, | |
| json=data | |
| ) as response: | |
| result = await response.json() | |
| # 处理响应 | |
| if result.get("success"): | |
| image_url = result.get("result")[0] | |
| logger.info(f"图片已成功上传到PicGo: {image_url}") | |
| return image_url | |
| else: | |
| logger.error(f"PicGo上传失败: {result.get('message', '未知错误')}") | |
| return None | |
| except Exception as e: | |
| logger.error(f"PicGo上传过程出错: {str(e)}") | |
| return None | |
| class WebDAVUploader: | |
| """WebDAV上传器""" | |
| def __init__(self): | |
| """初始化WebDAV上传器""" | |
| self.webdav_url = Config.WEBDAV_URL | |
| self.username = Config.WEBDAV_USERNAME | |
| self.password = Config.WEBDAV_PASSWORD | |
| self.base_path = Config.WEBDAV_BASE_PATH | |
| self.enabled = Config.ENABLE_WEBDAV | |
| self.public_url = Config.WEBDAV_PUBLIC_URL | |
| # 确保base_path以/开头 | |
| if self.base_path and not self.base_path.startswith('/'): | |
| self.base_path = f"/{self.base_path}" | |
| # 确保webdav_url以/结尾 | |
| if self.webdav_url and not self.webdav_url.endswith('/'): | |
| self.webdav_url = f"{self.webdav_url}/" | |
| # 日志输出 | |
| if self.enabled: | |
| if not self.webdav_url: | |
| logger.warning("WebDAV上传已启用,但未设置WebDAV URL") | |
| self.enabled = False | |
| else: | |
| logger.info(f"WebDAV上传已启用,服务器地址: {self.webdav_url}") | |
| else: | |
| logger.info("WebDAV上传未启用") | |
| async def upload(self, file_path): | |
| """ | |
| 上传图片到WebDAV服务器 | |
| Args: | |
| file_path: 图片文件路径 | |
| Returns: | |
| 成功: 图片URL | |
| 失败: None | |
| """ | |
| if not self.enabled: | |
| logger.debug("WebDAV上传未启用,跳过上传") | |
| return None | |
| if not os.path.exists(file_path): | |
| logger.error(f"文件不存在: {file_path}") | |
| return None | |
| try: | |
| # 读取图片文件 | |
| async with aiofiles.open(file_path, "rb") as f: | |
| file_content = await f.read() | |
| # 准备上传路径 | |
| filename = os.path.basename(file_path) | |
| remote_path = f"{self.base_path}/{filename}" | |
| # 确保目录存在 | |
| await self._ensure_directory(self.base_path) | |
| # 上传文件 | |
| timeout = aiohttp.ClientTimeout(total=60) | |
| auth = aiohttp.BasicAuth(self.username, self.password) | |
| async with aiohttp.ClientSession(timeout=timeout, auth=auth) as session: | |
| async with session.put( | |
| f"{self.webdav_url}{remote_path.lstrip('/')}", | |
| data=file_content | |
| ) as response: | |
| if response.status in (200, 201, 204): | |
| # 生成访问URL | |
| if self.public_url: | |
| url = f"{self.public_url.rstrip('/')}{remote_path}" | |
| else: | |
| url = f"{self.webdav_url}{remote_path.lstrip('/')}" | |
| logger.info(f"图片已成功上传到WebDAV: {url}") | |
| return url | |
| else: | |
| logger.error(f"WebDAV上传失败,状态码: {response.status}") | |
| return None | |
| except Exception as e: | |
| logger.error(f"WebDAV上传过程出错: {str(e)}") | |
| return None | |
| async def _ensure_directory(self, directory): | |
| """确保WebDAV目录存在""" | |
| try: | |
| timeout = aiohttp.ClientTimeout(total=30) | |
| auth = aiohttp.BasicAuth(self.username, self.password) | |
| async with aiohttp.ClientSession(timeout=timeout, auth=auth) as session: | |
| # 检查目录是否存在 | |
| async with session.request( | |
| "PROPFIND", | |
| f"{self.webdav_url}{directory.lstrip('/')}", | |
| headers={"Depth": "0"}, | |
| ) as response: | |
| if response.status == 207: | |
| return True | |
| # 目录不存在,创建目录 | |
| async with session.request( | |
| "MKCOL", | |
| f"{self.webdav_url}{directory.lstrip('/')}" | |
| ) as response: | |
| if response.status in (201, 405): # 405表示目录已存在 | |
| return True | |
| else: | |
| logger.error(f"创建WebDAV目录失败: {directory}, 状态码: {response.status}") | |
| return False | |
| except Exception as e: | |
| logger.error(f"确保WebDAV目录存在时出错: {str(e)}") | |
| return False | |
| # 创建上传器实例 | |
| picgo_uploader = PicGoUploader() | |
| webdav_uploader = WebDAVUploader() | |
| async def upload_image(file_path): | |
| """ | |
| 上传图片到配置的服务 | |
| Args: | |
| file_path: 图片文件路径 | |
| Returns: | |
| 元组 (成功标志, 图片URL或None) | |
| """ | |
| # 尝试PicGo上传 | |
| if picgo_uploader.enabled: | |
| picgo_url = await picgo_uploader.upload(file_path) | |
| if picgo_url: | |
| return True, picgo_url | |
| # 尝试WebDAV上传 | |
| if webdav_uploader.enabled: | |
| webdav_url = await webdav_uploader.upload(file_path) | |
| if webdav_url: | |
| return True, webdav_url | |
| # 都上传失败 | |
| return False, None |