soraapi / src /image_uploader.py
gallyga's picture
Update src/image_uploader.py
95e351c verified
raw
history blame
8.09 kB
import base64
import json
import logging
import mimetypes
import os
from urllib.parse import quote, urlparse

import aiofiles
import aiohttp
import requests

from .config import Config
# Module-level logger used by every uploader in this file
logger = logging.getLogger("sora-api.image_uploader")
class PicGoUploader:
    """Uploader that sends images to a PicGo-style HTTP image host.

    The file is read from disk, wrapped in a base64 ``data:`` URI and posted
    as JSON to ``<PICGO_URL>/upload``. When an API key is configured it is
    sent in the ``picgo-api-key`` request header.
    """

    def __init__(self):
        """Read the PicGo settings from ``Config`` and log the resulting state."""
        self.picgo_url = Config.PICGO_URL
        self.api_key = Config.PICGO_API_KEY
        self.enabled = Config.ENABLE_PICGO

        if self.enabled:
            if not self.picgo_url:
                # Same policy as WebDAVUploader: without a server address every
                # upload would fail, so switch the backend off up front.
                logger.warning("PicGo上传已启用,但未设置PicGo URL")
                self.enabled = False
            else:
                logger.info(f"PicGo上传已启用,服务器地址: {self.picgo_url}")
        else:
            logger.info("PicGo上传未启用")

    @staticmethod
    def _build_data_uri(file_path, content):
        """Return ``content`` encoded as a ``data:<mime>;base64,<payload>`` URI.

        Args:
            file_path: Path of the file; only its name is used to pick the
                MIME type.
            content: Raw bytes of the file.

        Returns:
            The data URI string.
        """
        mime_type, _ = mimetypes.guess_type(os.path.basename(file_path))
        if not mime_type:
            # Extension unknown to ``mimetypes``: fall back to image/<ext>, and
            # to a generic binary type when there is no extension at all
            # (avoids emitting the malformed "image/").
            extension = os.path.splitext(file_path)[1][1:].lower()
            mime_type = f"image/{extension}" if extension else "application/octet-stream"
        encoded = base64.b64encode(content).decode("ascii")
        return f"data:{mime_type};base64,{encoded}"

    @staticmethod
    def _extract_url(result):
        """Return the first uploaded URL from a response body, or ``None``.

        Args:
            result: Decoded JSON body of the upload response.

        Returns:
            The URL when ``result`` is a dict with a truthy ``success`` and a
            non-empty ``result`` entry; otherwise ``None``.
        """
        if not isinstance(result, dict) or not result.get("success"):
            return None
        urls = result.get("result")
        if isinstance(urls, str):
            # A bare string would otherwise be indexed to its first character.
            return urls or None
        if isinstance(urls, (list, tuple)) and urls:
            return urls[0]
        return None

    async def upload(self, file_path):
        """Upload an image file to the PicGo server.

        Args:
            file_path: Path of the image file to upload.

        Returns:
            The image URL on success, ``None`` on any failure (uploader
            disabled, file missing, request error, or unsuccessful response).
        """
        if not self.enabled:
            logger.debug("PicGo上传未启用,跳过上传")
            return None
        if not os.path.exists(file_path):
            logger.error(f"文件不存在: {file_path}")
            return None

        try:
            async with aiofiles.open(file_path, "rb") as f:
                file_content = await f.read()

            payload = {
                "list": [{"data": self._build_data_uri(file_path, file_content)}],
            }
            headers = {"Content-Type": "application/json"}
            if self.api_key:
                headers["picgo-api-key"] = self.api_key

            timeout = aiohttp.ClientTimeout(total=60)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.post(
                    # Tolerate a configured URL that already ends with "/".
                    f"{self.picgo_url.rstrip('/')}/upload",
                    headers=headers,
                    json=payload,
                ) as response:
                    # content_type=None: accept JSON bodies even when the
                    # server labels them with a non-JSON content type.
                    result = await response.json(content_type=None)

            image_url = self._extract_url(result)
            if image_url:
                logger.info(f"图片已成功上传到PicGo: {image_url}")
                return image_url

            message = result.get("message", "未知错误") if isinstance(result, dict) else "未知错误"
            logger.error(f"PicGo上传失败: {message}")
            return None
        except Exception as e:
            # Uploading is best-effort: report the failure (with traceback)
            # and let the caller fall back to another backend.
            logger.exception(f"PicGo上传过程出错: {e}")
            return None
class WebDAVUploader:
    """Uploader that stores images on a WebDAV server with HTTP PUT.

    Files are written to ``<WEBDAV_URL>/<base_path>/<filename>``. The URL
    handed back to the caller is built from ``WEBDAV_PUBLIC_URL`` when that is
    set, otherwise from the WebDAV URL itself.
    """

    def __init__(self):
        """Read the WebDAV settings from ``Config`` and normalise them."""
        self.webdav_url = Config.WEBDAV_URL
        self.username = Config.WEBDAV_USERNAME
        self.password = Config.WEBDAV_PASSWORD
        # Stored as "" (server root) or "/seg/seg" with no trailing slash, so
        # joining a filename never yields "None/..." or a double slash.
        self.base_path = self._normalize_base_path(Config.WEBDAV_BASE_PATH)
        self.enabled = Config.ENABLE_WEBDAV
        self.public_url = Config.WEBDAV_PUBLIC_URL

        # Keep the server URL ending in "/" so relative paths can be appended.
        if self.webdav_url and not self.webdav_url.endswith('/'):
            self.webdav_url = f"{self.webdav_url}/"

        if self.enabled:
            if not self.webdav_url:
                logger.warning("WebDAV上传已启用,但未设置WebDAV URL")
                self.enabled = False
            else:
                logger.info(f"WebDAV上传已启用,服务器地址: {self.webdav_url}")
        else:
            logger.info("WebDAV上传未启用")

    @staticmethod
    def _normalize_base_path(base_path):
        """Return ``base_path`` as ``""`` or ``"/a/b"`` (leading slash, no trailing)."""
        segments = [part for part in (base_path or "").split("/") if part]
        return "/" + "/".join(segments) if segments else ""

    def _remote_path(self, filename):
        """Return the server-relative path for ``filename`` under the base path."""
        # Percent-encode the name so "#", "?" or spaces cannot truncate or
        # corrupt the request target and the URL handed back to the caller.
        return f"{self.base_path}/{quote(filename)}"

    def _server_url(self, path):
        """Return the absolute WebDAV URL for a server-relative ``path``."""
        return f"{self.webdav_url}{path.lstrip('/')}"

    def _public_url_for(self, remote_path):
        """Return the URL under which an uploaded ``remote_path`` is reachable."""
        if self.public_url:
            return f"{self.public_url.rstrip('/')}{remote_path}"
        return self._server_url(remote_path)

    def _auth(self):
        """Return HTTP basic-auth credentials, or ``None`` when no username is set."""
        # aiohttp.BasicAuth rejects None for login/password, so only build it
        # when a username is actually configured.
        if not self.username:
            return None
        return aiohttp.BasicAuth(self.username, self.password or "")

    @staticmethod
    async def _mkcol(session, url):
        """Issue a MKCOL request for ``url`` and return the response status."""
        async with session.request("MKCOL", url) as response:
            return response.status

    async def upload(self, file_path):
        """Upload an image file to the WebDAV server.

        Args:
            file_path: Path of the image file to upload.

        Returns:
            The URL of the uploaded image on success, ``None`` on any failure
            (uploader disabled, file missing, request error, or a PUT status
            other than 200/201/204).
        """
        if not self.enabled:
            logger.debug("WebDAV上传未启用,跳过上传")
            return None
        if not os.path.exists(file_path):
            logger.error(f"文件不存在: {file_path}")
            return None

        try:
            async with aiofiles.open(file_path, "rb") as f:
                file_content = await f.read()

            remote_path = self._remote_path(os.path.basename(file_path))

            # Best effort: the result is intentionally not checked, the PUT
            # below reports the definitive outcome.
            await self._ensure_directory(self.base_path)

            timeout = aiohttp.ClientTimeout(total=60)
            async with aiohttp.ClientSession(timeout=timeout, auth=self._auth()) as session:
                async with session.put(
                    self._server_url(remote_path),
                    data=file_content,
                ) as response:
                    status = response.status

            if status not in (200, 201, 204):
                logger.error(f"WebDAV上传失败,状态码: {status}")
                return None

            url = self._public_url_for(remote_path)
            logger.info(f"图片已成功上传到WebDAV: {url}")
            return url
        except Exception as e:
            # Uploading is best-effort: report the failure (with traceback)
            # and signal it to the caller with None.
            logger.exception(f"WebDAV上传过程出错: {e}")
            return None

    async def _ensure_directory(self, directory):
        """Make sure ``directory`` exists on the WebDAV server.

        Args:
            directory: Server-relative directory path, e.g. ``"/images"``.

        Returns:
            ``True`` when the directory exists or was created, ``False`` when
            creation failed or the request raised.
        """
        segments = [part for part in (directory or "").split("/") if part]
        if not segments:
            # The base path is the server root; there is nothing to create.
            return True

        try:
            timeout = aiohttp.ClientTimeout(total=30)
            async with aiohttp.ClientSession(timeout=timeout, auth=self._auth()) as session:
                target = self._server_url("/".join(segments))

                # 207 Multi-Status on PROPFIND means the collection exists.
                async with session.request(
                    "PROPFIND",
                    target,
                    headers={"Depth": "0"},
                ) as response:
                    if response.status == 207:
                        return True

                status = await self._mkcol(session, target)
                if status == 409 and len(segments) > 1:
                    # 409 Conflict: an intermediate collection is missing.
                    # Create the ancestors top-down (best effort), then retry.
                    for depth in range(1, len(segments)):
                        await self._mkcol(session, self._server_url("/".join(segments[:depth])))
                    status = await self._mkcol(session, target)

                if status in (201, 405):  # 405: the collection already exists
                    return True

                logger.error(f"创建WebDAV目录失败: {directory}, 状态码: {status}")
                return False
        except Exception as e:
            logger.exception(f"确保WebDAV目录存在时出错: {e}")
            return False
# Create the shared uploader instances used by upload_image().
# Each constructor reads its settings from Config and logs whether the
# backend is enabled, so this runs when the module is imported.
picgo_uploader = PicGoUploader()
webdav_uploader = WebDAVUploader()
async def upload_image(file_path):
    """
    Upload an image through the configured backends.

    Backends are tried in priority order (PicGo first, then WebDAV); the
    first one that is enabled and returns a URL wins.

    Args:
        file_path: Path of the image file to upload.

    Returns:
        Tuple ``(success flag, image URL or None)``.
    """
    for backend in (picgo_uploader, webdav_uploader):
        # Disabled backends are skipped without attempting an upload.
        if not backend.enabled:
            continue
        uploaded_url = await backend.upload(file_path)
        if uploaded_url:
            return True, uploaded_url

    # No backend produced a URL.
    return False, None