|
|
import requests |
|
|
from app.domain.image_models import ImageMetadata, ImageUploader, UploadResponse |
|
|
from enum import Enum |
|
|
from typing import Optional, Any |
|
|
import hashlib |
|
|
import base64 |
|
|
import hmac |
|
|
from datetime import datetime |
|
|
from urllib.parse import quote |
|
|
from app.log.logger import get_image_create_logger |
|
|
|
|
|
class UploadErrorType(Enum): |
|
|
"""上传错误类型枚举""" |
|
|
NETWORK_ERROR = "network_error" |
|
|
AUTH_ERROR = "auth_error" |
|
|
INVALID_FILE = "invalid_file" |
|
|
SERVER_ERROR = "server_error" |
|
|
PARSE_ERROR = "parse_error" |
|
|
UNKNOWN = "unknown" |
|
|
|
|
|
|
|
|
class UploadError(Exception): |
|
|
"""图片上传错误异常类""" |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
message: str, |
|
|
error_type: UploadErrorType = UploadErrorType.UNKNOWN, |
|
|
status_code: Optional[int] = None, |
|
|
details: Optional[dict] = None, |
|
|
original_error: Optional[Exception] = None |
|
|
): |
|
|
""" |
|
|
初始化上传错误异常 |
|
|
|
|
|
Args: |
|
|
message: 错误消息 |
|
|
error_type: 错误类型 |
|
|
status_code: HTTP状态码 |
|
|
details: 详细错误信息 |
|
|
original_error: 原始异常 |
|
|
""" |
|
|
self.message = message |
|
|
self.error_type = error_type |
|
|
self.status_code = status_code |
|
|
self.details = details or {} |
|
|
self.original_error = original_error |
|
|
|
|
|
|
|
|
full_message = f"[{error_type.value}] {message}" |
|
|
if status_code: |
|
|
full_message = f"{full_message} (Status: {status_code})" |
|
|
if details: |
|
|
full_message = f"{full_message} - Details: {details}" |
|
|
|
|
|
super().__init__(full_message) |
|
|
|
|
|
@classmethod |
|
|
def from_response(cls, response: Any, message: Optional[str] = None) -> "UploadError": |
|
|
""" |
|
|
从HTTP响应创建错误实例 |
|
|
|
|
|
Args: |
|
|
response: HTTP响应对象 |
|
|
message: 自定义错误消息 |
|
|
""" |
|
|
try: |
|
|
error_data = response.json() |
|
|
details = error_data.get("data", {}) |
|
|
return cls( |
|
|
message=message or error_data.get("message", "Unknown error"), |
|
|
error_type=UploadErrorType.SERVER_ERROR, |
|
|
status_code=response.status_code, |
|
|
details=details |
|
|
) |
|
|
except Exception: |
|
|
return cls( |
|
|
message=message or "Failed to parse error response", |
|
|
error_type=UploadErrorType.PARSE_ERROR, |
|
|
status_code=response.status_code |
|
|
) |
|
|
|
|
|
|
|
|
class SmMsUploader(ImageUploader): |
|
|
API_URL = "https://sm.ms/api/v2/upload" |
|
|
|
|
|
def __init__(self, api_key: str): |
|
|
self.api_key = api_key |
|
|
|
|
|
def upload(self, file: bytes, filename: str) -> UploadResponse: |
|
|
try: |
|
|
|
|
|
headers = { |
|
|
"Authorization": f"Basic {self.api_key}" |
|
|
} |
|
|
|
|
|
|
|
|
files = { |
|
|
"smfile": (filename, file, "image/png") |
|
|
} |
|
|
|
|
|
|
|
|
response = requests.post( |
|
|
self.API_URL, |
|
|
headers=headers, |
|
|
files=files |
|
|
) |
|
|
|
|
|
|
|
|
response.raise_for_status() |
|
|
|
|
|
|
|
|
result = response.json() |
|
|
|
|
|
|
|
|
if not result.get("success"): |
|
|
raise UploadError(result.get("message", "Upload failed")) |
|
|
|
|
|
|
|
|
data = result["data"] |
|
|
image_metadata = ImageMetadata( |
|
|
width=data["width"], |
|
|
height=data["height"], |
|
|
filename=data["filename"], |
|
|
size=data["size"], |
|
|
url=data["url"], |
|
|
delete_url=data["delete"] |
|
|
) |
|
|
|
|
|
return UploadResponse( |
|
|
success=True, |
|
|
code="success", |
|
|
message="Upload success", |
|
|
data=image_metadata |
|
|
) |
|
|
|
|
|
except requests.RequestException as e: |
|
|
|
|
|
raise UploadError(f"Upload request failed: {str(e)}") |
|
|
except (KeyError, ValueError) as e: |
|
|
|
|
|
raise UploadError(f"Invalid response format: {str(e)}") |
|
|
except Exception as e: |
|
|
|
|
|
raise UploadError(f"Upload failed: {str(e)}") |
|
|
|
|
|
|
|
|
class QiniuUploader(ImageUploader): |
|
|
def __init__(self, access_key: str, secret_key: str): |
|
|
self.access_key = access_key |
|
|
self.secret_key = secret_key |
|
|
|
|
|
def upload(self, file: bytes, filename: str) -> UploadResponse: |
|
|
|
|
|
pass |
|
|
|
|
|
|
|
|
class PicGoUploader(ImageUploader): |
|
|
"""Chevereto API 图片上传器""" |
|
|
|
|
|
def __init__(self, api_key: str, api_url: str = "https://www.picgo.net/api/1/upload"): |
|
|
""" |
|
|
初始化 Chevereto 上传器 |
|
|
|
|
|
Args: |
|
|
api_key: Chevereto API 密钥 |
|
|
api_url: Chevereto API 上传地址 |
|
|
""" |
|
|
self.api_key = api_key |
|
|
self.api_url = api_url |
|
|
|
|
|
def upload(self, file: bytes, filename: str) -> UploadResponse: |
|
|
""" |
|
|
上传图片到 Chevereto 服务 |
|
|
|
|
|
Args: |
|
|
file: 图片文件二进制数据 |
|
|
filename: 文件名 |
|
|
|
|
|
Returns: |
|
|
UploadResponse: 上传响应对象 |
|
|
|
|
|
Raises: |
|
|
UploadError: 上传失败时抛出异常 |
|
|
""" |
|
|
try: |
|
|
|
|
|
headers = {} |
|
|
|
|
|
|
|
|
request_url = self.api_url |
|
|
|
|
|
|
|
|
if self.api_url == "https://www.picgo.net/api/1/upload": |
|
|
headers["X-API-Key"] = self.api_key |
|
|
else: |
|
|
|
|
|
from urllib.parse import urlparse, urlunparse, parse_qs, urlencode |
|
|
parsed_url = urlparse(request_url) |
|
|
query_params = parse_qs(parsed_url.query) |
|
|
query_params["key"] = self.api_key |
|
|
new_query = urlencode(query_params, doseq=True) |
|
|
request_url = urlunparse(parsed_url._replace(query=new_query)) |
|
|
|
|
|
|
|
|
files = { |
|
|
"source": (filename, file) |
|
|
} |
|
|
|
|
|
|
|
|
response = requests.post( |
|
|
request_url, |
|
|
headers=headers, |
|
|
files=files |
|
|
) |
|
|
|
|
|
|
|
|
response.raise_for_status() |
|
|
|
|
|
|
|
|
result = response.json() |
|
|
|
|
|
|
|
|
if "success" in result and "result" in result: |
|
|
|
|
|
if result["success"]: |
|
|
image_url = result["result"][0] if result["result"] and len(result["result"]) > 0 else "" |
|
|
image_metadata = ImageMetadata( |
|
|
width=0, |
|
|
height=0, |
|
|
filename=filename, |
|
|
size=0, |
|
|
url=image_url, |
|
|
delete_url=None |
|
|
) |
|
|
return UploadResponse( |
|
|
success=True, |
|
|
code="success", |
|
|
message="Upload success", |
|
|
data=image_metadata |
|
|
) |
|
|
else: |
|
|
raise UploadError( |
|
|
message="Upload failed", |
|
|
error_type=UploadErrorType.SERVER_ERROR, |
|
|
status_code=400, |
|
|
details=result |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
if result.get("status_code") != 200: |
|
|
error_message = "Upload failed" |
|
|
if "error" in result: |
|
|
error_message = result["error"].get("message", error_message) |
|
|
raise UploadError( |
|
|
message=error_message, |
|
|
error_type=UploadErrorType.SERVER_ERROR, |
|
|
status_code=result.get("status_code"), |
|
|
details=result.get("error") |
|
|
) |
|
|
|
|
|
|
|
|
image_data = result.get("image", {}) |
|
|
|
|
|
|
|
|
image_metadata = ImageMetadata( |
|
|
width=image_data.get("width", 0), |
|
|
height=image_data.get("height", 0), |
|
|
filename=image_data.get("filename", filename), |
|
|
size=image_data.get("size", 0), |
|
|
url=image_data.get("url", ""), |
|
|
delete_url=image_data.get("delete_url", None) |
|
|
) |
|
|
|
|
|
return UploadResponse( |
|
|
success=True, |
|
|
code="success", |
|
|
message=result.get("success", {}).get("message", "Upload success"), |
|
|
data=image_metadata |
|
|
) |
|
|
|
|
|
except requests.RequestException as e: |
|
|
|
|
|
raise UploadError( |
|
|
message=f"Upload request failed: {str(e)}", |
|
|
error_type=UploadErrorType.NETWORK_ERROR, |
|
|
original_error=e |
|
|
) |
|
|
except (KeyError, ValueError, TypeError) as e: |
|
|
|
|
|
raise UploadError( |
|
|
message=f"Invalid response format: {str(e)}", |
|
|
error_type=UploadErrorType.PARSE_ERROR, |
|
|
original_error=e |
|
|
) |
|
|
except UploadError: |
|
|
|
|
|
raise |
|
|
except Exception as e: |
|
|
|
|
|
raise UploadError( |
|
|
message=f"Upload failed: {str(e)}", |
|
|
error_type=UploadErrorType.UNKNOWN, |
|
|
original_error=e |
|
|
) |
|
|
|
|
|
|
|
|
class AliyunOSSUploader(ImageUploader): |
|
|
"""阿里云OSS图片上传器""" |
|
|
|
|
|
def __init__(self, access_key: str, access_key_secret: str, bucket_name: str, |
|
|
endpoint: str, region: str, use_internal: bool = False): |
|
|
""" |
|
|
初始化阿里云OSS上传器 |
|
|
|
|
|
Args: |
|
|
access_key: OSS访问密钥ID |
|
|
access_key_secret: OSS访问密钥 |
|
|
bucket_name: OSS存储桶名称 |
|
|
endpoint: OSS端点地址 |
|
|
region: OSS区域 |
|
|
use_internal: 是否使用内网端点 |
|
|
""" |
|
|
self.access_key = access_key |
|
|
self.access_key_secret = access_key_secret |
|
|
self.bucket_name = bucket_name |
|
|
self.endpoint = endpoint |
|
|
self.region = region |
|
|
self.use_internal = use_internal |
|
|
self.logger = get_image_create_logger() |
|
|
|
|
|
|
|
|
if not endpoint.startswith(('http://', 'https://')): |
|
|
self.base_url = f"https://{bucket_name}.{endpoint}" |
|
|
else: |
|
|
self.base_url = f"{endpoint}/{bucket_name}" |
|
|
|
|
|
self.logger.info(f"Initialized AliyunOSSUploader for bucket: {bucket_name}, region: {region}") |
|
|
|
|
|
def _sign_request(self, method: str, path: str, headers: dict, content: bytes = b'') -> dict: |
|
|
""" |
|
|
为OSS请求生成签名 |
|
|
|
|
|
Args: |
|
|
method: HTTP方法 |
|
|
path: 请求路径 |
|
|
headers: 请求头 |
|
|
content: 请求内容 |
|
|
|
|
|
Returns: |
|
|
包含签名的请求头 |
|
|
""" |
|
|
|
|
|
content_md5 = base64.b64encode(hashlib.md5(content).digest()).decode('utf-8') if content else '' |
|
|
|
|
|
|
|
|
date = datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT') |
|
|
|
|
|
|
|
|
headers['Date'] = date |
|
|
if content_md5: |
|
|
headers['Content-MD5'] = content_md5 |
|
|
headers['Content-Type'] = headers.get('Content-Type', 'image/png') |
|
|
|
|
|
|
|
|
oss_headers = [] |
|
|
for key, value in sorted(headers.items()): |
|
|
if key.lower().startswith('x-oss-'): |
|
|
oss_headers.append(f"{key.lower()}:{value}") |
|
|
canonicalized_oss_headers = '\n'.join(oss_headers) |
|
|
if canonicalized_oss_headers: |
|
|
canonicalized_oss_headers += '\n' |
|
|
|
|
|
|
|
|
canonicalized_resource = f"/{self.bucket_name}{path}" |
|
|
|
|
|
|
|
|
string_to_sign = f"{method}\n{content_md5}\n{headers.get('Content-Type', '')}\n{date}\n{canonicalized_oss_headers}{canonicalized_resource}" |
|
|
|
|
|
|
|
|
signature = base64.b64encode( |
|
|
hmac.new( |
|
|
self.access_key_secret.encode('utf-8'), |
|
|
string_to_sign.encode('utf-8'), |
|
|
hashlib.sha1 |
|
|
).digest() |
|
|
).decode('utf-8') |
|
|
|
|
|
|
|
|
headers['Authorization'] = f"OSS {self.access_key}:{signature}" |
|
|
|
|
|
return headers |
|
|
|
|
|
def upload(self, file: bytes, filename: str) -> UploadResponse: |
|
|
""" |
|
|
上传图片到阿里云OSS |
|
|
|
|
|
Args: |
|
|
file: 图片文件二进制数据 |
|
|
filename: 文件名(将作为OSS对象的key) |
|
|
|
|
|
Returns: |
|
|
UploadResponse: 上传响应对象 |
|
|
|
|
|
Raises: |
|
|
UploadError: 上传失败时抛出异常 |
|
|
""" |
|
|
|
|
|
self.logger.info(f"Starting OSS upload for file: {filename}, size: {len(file)} bytes") |
|
|
|
|
|
try: |
|
|
|
|
|
object_key = f"/{filename}" |
|
|
|
|
|
|
|
|
headers = { |
|
|
'Content-Type': 'image/png', |
|
|
'x-oss-object-acl': 'public-read' |
|
|
} |
|
|
|
|
|
|
|
|
signed_headers = self._sign_request('PUT', object_key, headers, file) |
|
|
|
|
|
|
|
|
upload_url = f"{self.base_url}{object_key}" |
|
|
self.logger.debug(f"OSS upload URL: {upload_url}") |
|
|
|
|
|
|
|
|
response = requests.put( |
|
|
upload_url, |
|
|
data=file, |
|
|
headers=signed_headers |
|
|
) |
|
|
|
|
|
|
|
|
if response.status_code != 200: |
|
|
error_msg = f"OSS upload failed with status {response.status_code}, response: {response.text}" |
|
|
self.logger.error(f"OSS upload failed for {filename}: {error_msg}") |
|
|
raise UploadError( |
|
|
message=f"OSS upload failed with status {response.status_code}", |
|
|
error_type=UploadErrorType.SERVER_ERROR, |
|
|
status_code=response.status_code, |
|
|
details={'response': response.text} |
|
|
) |
|
|
|
|
|
|
|
|
if self.endpoint.startswith(('http://', 'https://')): |
|
|
access_url = f"{self.endpoint}/{self.bucket_name}{object_key}" |
|
|
else: |
|
|
access_url = f"https://{self.bucket_name}.{self.endpoint}{object_key}" |
|
|
|
|
|
|
|
|
image_metadata = ImageMetadata( |
|
|
width=0, |
|
|
height=0, |
|
|
filename=filename, |
|
|
size=len(file), |
|
|
url=access_url, |
|
|
delete_url=None |
|
|
) |
|
|
|
|
|
|
|
|
self.logger.info(f"OSS upload successful for {filename}, URL: {access_url}") |
|
|
|
|
|
return UploadResponse( |
|
|
success=True, |
|
|
code="success", |
|
|
message="Upload to Aliyun OSS success", |
|
|
data=image_metadata |
|
|
) |
|
|
|
|
|
except requests.RequestException as e: |
|
|
error_msg = f"OSS upload request failed: {str(e)}" |
|
|
self.logger.error(f"OSS upload request failed for {filename}: {error_msg}") |
|
|
raise UploadError( |
|
|
message=error_msg, |
|
|
error_type=UploadErrorType.NETWORK_ERROR, |
|
|
original_error=e |
|
|
) |
|
|
except UploadError: |
|
|
|
|
|
raise |
|
|
except Exception as e: |
|
|
error_msg = f"OSS upload failed: {str(e)}" |
|
|
self.logger.error(f"OSS upload unexpected error for {filename}: {error_msg}") |
|
|
raise UploadError( |
|
|
message=error_msg, |
|
|
error_type=UploadErrorType.UNKNOWN, |
|
|
original_error=e |
|
|
) |
|
|
|
|
|
|
|
|
class CloudFlareImgBedUploader(ImageUploader): |
|
|
"""CloudFlare图床上传器""" |
|
|
|
|
|
def __init__(self, auth_code: str, api_url: str, upload_folder: str = ""): |
|
|
""" |
|
|
初始化CloudFlare图床上传器 |
|
|
|
|
|
Args: |
|
|
auth_code: 认证码 |
|
|
api_url: 上传API地址 |
|
|
upload_folder: 上传文件夹路径(可选) |
|
|
""" |
|
|
self.auth_code = auth_code |
|
|
self.api_url = api_url |
|
|
self.upload_folder = upload_folder |
|
|
|
|
|
def upload(self, file: bytes, filename: str) -> UploadResponse: |
|
|
""" |
|
|
上传图片到CloudFlare图床 |
|
|
|
|
|
Args: |
|
|
file: 图片文件二进制数据 |
|
|
filename: 文件名 |
|
|
|
|
|
Returns: |
|
|
UploadResponse: 上传响应对象 |
|
|
|
|
|
Raises: |
|
|
UploadError: 上传失败时抛出异常 |
|
|
""" |
|
|
try: |
|
|
|
|
|
params = [] |
|
|
if self.upload_folder: |
|
|
params.append(f"uploadFolder={self.upload_folder}") |
|
|
if self.auth_code: |
|
|
params.append(f"authCode={self.auth_code}") |
|
|
params.append("uploadNameType=origin") |
|
|
|
|
|
request_url = f"{self.api_url}?{'&'.join(params)}" |
|
|
|
|
|
|
|
|
files = { |
|
|
"file": (filename, file) |
|
|
} |
|
|
|
|
|
|
|
|
response = requests.post( |
|
|
request_url, |
|
|
files=files |
|
|
) |
|
|
|
|
|
|
|
|
response.raise_for_status() |
|
|
|
|
|
|
|
|
result = response.json() |
|
|
|
|
|
|
|
|
if not result or not isinstance(result, list) or len(result) == 0: |
|
|
raise UploadError( |
|
|
message="Invalid response format", |
|
|
error_type=UploadErrorType.PARSE_ERROR |
|
|
) |
|
|
|
|
|
|
|
|
file_path = result[0].get("src") |
|
|
if not file_path: |
|
|
raise UploadError( |
|
|
message="Missing file URL in response", |
|
|
error_type=UploadErrorType.PARSE_ERROR |
|
|
) |
|
|
|
|
|
|
|
|
base_url = self.api_url.split("/upload")[0] |
|
|
full_url = file_path if file_path.startswith(("http://", "https://")) else f"{base_url}{file_path}" |
|
|
|
|
|
|
|
|
image_metadata = ImageMetadata( |
|
|
width=0, |
|
|
height=0, |
|
|
filename=filename, |
|
|
size=0, |
|
|
url=full_url, |
|
|
delete_url=None |
|
|
) |
|
|
|
|
|
return UploadResponse( |
|
|
success=True, |
|
|
code="success", |
|
|
message="Upload success", |
|
|
data=image_metadata |
|
|
) |
|
|
|
|
|
except requests.RequestException as e: |
|
|
|
|
|
raise UploadError( |
|
|
message=f"Upload request failed: {str(e)}", |
|
|
error_type=UploadErrorType.NETWORK_ERROR, |
|
|
original_error=e |
|
|
) |
|
|
except (KeyError, ValueError, TypeError, IndexError) as e: |
|
|
|
|
|
raise UploadError( |
|
|
message=f"Invalid response format: {str(e)}", |
|
|
error_type=UploadErrorType.PARSE_ERROR, |
|
|
original_error=e |
|
|
) |
|
|
except UploadError: |
|
|
|
|
|
raise |
|
|
except Exception as e: |
|
|
|
|
|
raise UploadError( |
|
|
message=f"Upload failed: {str(e)}", |
|
|
error_type=UploadErrorType.UNKNOWN, |
|
|
original_error=e |
|
|
) |
|
|
|
|
|
class ImageUploaderFactory: |
|
|
@staticmethod |
|
|
def create(provider: str, **credentials) -> ImageUploader: |
|
|
if provider == "smms": |
|
|
return SmMsUploader(credentials["api_key"]) |
|
|
elif provider == "qiniu": |
|
|
return QiniuUploader( |
|
|
credentials["access_key"], |
|
|
credentials["secret_key"] |
|
|
) |
|
|
elif provider == "picgo": |
|
|
api_url = credentials.get("api_url") or "https://www.picgo.net/api/1/upload" |
|
|
return PicGoUploader(credentials["api_key"], api_url) |
|
|
elif provider == "cloudflare_imgbed": |
|
|
return CloudFlareImgBedUploader( |
|
|
credentials["auth_code"], |
|
|
credentials["base_url"], |
|
|
credentials.get("upload_folder", ""), |
|
|
) |
|
|
elif provider == "aliyun_oss": |
|
|
return AliyunOSSUploader( |
|
|
credentials["access_key"], |
|
|
credentials["access_key_secret"], |
|
|
credentials["bucket_name"], |
|
|
credentials["endpoint"], |
|
|
credentials["region"], |
|
|
credentials.get("use_internal", False) |
|
|
) |
|
|
raise ValueError(f"Unknown provider: {provider}") |
|
|
|