"""Image upload helpers for the PicGo and WebDAV back ends.

The module exposes two uploader classes, one shared instance of each, and
the ``upload_image`` coroutine that tries the enabled back ends in order.
"""

import base64
import json
import logging
import os
from typing import Optional, Tuple
from urllib.parse import urlparse

import aiofiles
import aiohttp
import requests

from .config import Config

# Module logger
logger = logging.getLogger("sora-api.image_uploader")


class PicGoUploader:
    """Uploader that posts images to a PicGo HTTP endpoint."""

    def __init__(self):
        """Read the PicGo settings from ``Config`` and log the resulting state."""
        self.picgo_url = Config.PICGO_URL
        self.api_key = Config.PICGO_API_KEY
        self.enabled = Config.ENABLE_PICGO

        if self.enabled:
            if not self.picgo_url:
                # Mirror WebDAVUploader: an enabled uploader without a URL
                # can never succeed, so switch it off up front.
                logger.warning("PicGo上传已启用,但未设置PicGo URL")
                self.enabled = False
            else:
                logger.info(f"PicGo上传已启用,服务器地址: {self.picgo_url}")
        else:
            logger.info("PicGo上传未启用")

    async def upload(self, file_path: str) -> Optional[str]:
        """Upload an image file to PicGo.

        Args:
            file_path: Path of the image file to upload.

        Returns:
            The image URL reported by the server, or ``None`` when the
            uploader is disabled, the file is missing, or the upload fails.
        """
        if not self.enabled:
            logger.debug("PicGo上传未启用,跳过上传")
            return None

        if not os.path.exists(file_path):
            logger.error(f"文件不存在: {file_path}")
            return None

        try:
            # Read the image and embed it in the JSON body as a data URI.
            async with aiofiles.open(file_path, "rb") as f:
                file_content = await f.read()
            base64_content = base64.b64encode(file_content).decode('utf-8')
            extension = os.path.splitext(file_path)[1][1:]
            data = {
                "list": [{
                    "data": f"data:image/{extension};base64,{base64_content}"
                }]
            }

            headers = {"Content-Type": "application/json"}
            if self.api_key:
                headers["picgo-api-key"] = self.api_key

            # Tolerate a configured URL that already ends with a slash.
            endpoint = f"{self.picgo_url.rstrip('/')}/upload"
            timeout = aiohttp.ClientTimeout(total=60)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.post(
                    endpoint, headers=headers, json=data
                ) as response:
                    result = await response.json()

            # A success flag with a missing or empty URL list is a failure.
            urls = result.get("result") or []
            if result.get("success") and urls:
                image_url = urls[0]
                logger.info(f"图片已成功上传到PicGo: {image_url}")
                return image_url

            logger.error(f"PicGo上传失败: {result.get('message', '未知错误')}")
            return None
        except Exception as e:
            # Best effort: any failure is logged and reported as None.
            logger.error(f"PicGo上传过程出错: {str(e)}")
            return None


class WebDAVUploader:
    """Uploader that stores images on a WebDAV server."""

    def __init__(self):
        """Read the WebDAV settings from ``Config`` and normalise the URLs."""
        self.webdav_url = Config.WEBDAV_URL
        self.username = Config.WEBDAV_USERNAME
        self.password = Config.WEBDAV_PASSWORD
        self.base_path = Config.WEBDAV_BASE_PATH
        self.enabled = Config.ENABLE_WEBDAV
        self.public_url = Config.WEBDAV_PUBLIC_URL

        # Make sure base_path starts with "/".
        if self.base_path and not self.base_path.startswith('/'):
            self.base_path = f"/{self.base_path}"

        # Make sure webdav_url ends with "/".
        if self.webdav_url and not self.webdav_url.endswith('/'):
            self.webdav_url = f"{self.webdav_url}/"

        if self.enabled:
            if not self.webdav_url:
                logger.warning("WebDAV上传已启用,但未设置WebDAV URL")
                self.enabled = False
            else:
                logger.info(f"WebDAV上传已启用,服务器地址: {self.webdav_url}")
        else:
            logger.info("WebDAV上传未启用")

    def _auth(self):
        """Return basic-auth credentials, or ``None`` when no username is set.

        ``aiohttp.BasicAuth`` raises ``ValueError`` for a ``None`` login or
        password, so anonymous servers must be given no auth object at all.
        """
        if not self.username:
            return None
        return aiohttp.BasicAuth(self.username, self.password or "")

    async def upload(self, file_path: str) -> Optional[str]:
        """Upload an image file to the WebDAV server.

        Args:
            file_path: Path of the image file to upload.

        Returns:
            The URL of the uploaded image (built from ``public_url`` when
            set, otherwise from the WebDAV URL), or ``None`` when the
            uploader is disabled, the file is missing, or the upload fails.
        """
        if not self.enabled:
            logger.debug("WebDAV上传未启用,跳过上传")
            return None

        if not os.path.exists(file_path):
            logger.error(f"文件不存在: {file_path}")
            return None

        try:
            async with aiofiles.open(file_path, "rb") as f:
                file_content = await f.read()

            # Build the remote path; an unset base path or one ending in "/"
            # must not yield "None/<file>" or "//<file>".
            filename = os.path.basename(file_path)
            base_path = (self.base_path or "").rstrip('/')
            remote_path = f"{base_path}/{filename}"

            # Best effort: a failure is logged by the helper and the PUT is
            # still attempted.
            await self._ensure_directory(self.base_path)

            target = f"{self.webdav_url}{remote_path.lstrip('/')}"
            timeout = aiohttp.ClientTimeout(total=60)
            async with aiohttp.ClientSession(
                timeout=timeout, auth=self._auth()
            ) as session:
                async with session.put(target, data=file_content) as response:
                    if response.status in (200, 201, 204):
                        # Prefer the public URL when one is configured.
                        if self.public_url:
                            url = f"{self.public_url.rstrip('/')}{remote_path}"
                        else:
                            url = target
                        logger.info(f"图片已成功上传到WebDAV: {url}")
                        return url

                    logger.error(f"WebDAV上传失败,状态码: {response.status}")
                    return None
        except Exception as e:
            # Best effort: any failure is logged and reported as None.
            logger.error(f"WebDAV上传过程出错: {str(e)}")
            return None

    async def _ensure_directory(self, directory):
        """Make sure ``directory`` exists on the WebDAV server.

        Missing parent collections are created one level at a time, because
        MKCOL fails with 409 when an intermediate collection is absent.

        Args:
            directory: Server-side directory path, relative to the WebDAV URL.

        Returns:
            ``True`` when the directory exists or was created, ``False`` when
            creation failed or an error occurred.
        """
        segments = [part for part in (directory or "").split('/') if part]
        if not segments:
            # Nothing to create: the target is the WebDAV root itself.
            return True

        try:
            timeout = aiohttp.ClientTimeout(total=30)
            async with aiohttp.ClientSession(
                timeout=timeout, auth=self._auth()
            ) as session:
                # Fast path: the full directory already exists.
                async with session.request(
                    "PROPFIND",
                    f"{self.webdav_url}{directory.lstrip('/')}",
                    headers={"Depth": "0"},
                ) as response:
                    if response.status == 207:
                        return True

                # Create each level in turn. Only the status of the final
                # level decides the outcome; earlier levels may legitimately
                # answer 405 (already exists) or refuse the request.
                status = None
                prefix = ""
                for part in segments:
                    prefix = f"{prefix}/{part}" if prefix else part
                    async with session.request(
                        "MKCOL", f"{self.webdav_url}{prefix}"
                    ) as response:
                        status = response.status

                if status in (201, 405):  # 405 means it already exists
                    return True

                logger.error(f"创建WebDAV目录失败: {directory}, 状态码: {status}")
                return False
        except Exception as e:
            logger.error(f"确保WebDAV目录存在时出错: {str(e)}")
            return False


# Shared uploader instances
picgo_uploader = PicGoUploader()
webdav_uploader = WebDAVUploader()


async def upload_image(file_path: str) -> Tuple[bool, Optional[str]]:
    """Upload an image through the configured services.

    PicGo is tried first, then WebDAV; the first URL obtained wins.

    Args:
        file_path: Path of the image file to upload.

    Returns:
        A ``(success, url)`` tuple; ``url`` is ``None`` when every enabled
        uploader failed or none is enabled.
    """
    # Try PicGo first.
    if picgo_uploader.enabled:
        picgo_url = await picgo_uploader.upload(file_path)
        if picgo_url:
            return True, picgo_url

    # Fall back to WebDAV.
    if webdav_uploader.enabled:
        webdav_url = await webdav_uploader.upload(file_path)
        if webdav_url:
            return True, webdav_url

    # Every upload attempt failed.
    return False, None