# Spaces:
# Running
# Running
# File size: 6,197 Bytes
"""
S3 utilities for uploading images and videos to S3.
Simplified version adapted for Hugging Face Spaces.
"""
import os
import logging
import uuid
import boto3
import requests
from typing import Optional
from datetime import datetime
# Module-level logger named after this module; used by S3Utils for all
# warning, info and error messages.
logger = logging.getLogger(__name__)
class S3Utils:
    """Upload images and videos to S3 and return a public URL for each object.

    AWS credentials are read from the ``AWS_ACCESS_KEY_ID`` and
    ``AWS_SECRET_ACCESS_KEY`` environment variables. When either one is
    missing no client is created, and every upload method logs an error and
    returns ``None`` instead of raising.
    """

    # Values used when the constructor is called without overrides.
    DEFAULT_REGION = 'ap-southeast-2'
    DEFAULT_BUCKET_NAME = 'cuti-agent-assets-dev-699475938168-ap-southeast-2'
    DEFAULT_CDN_DOMAIN = 'https://cdn-dev.newai.land'

    # Downloads smaller than this many bytes are rejected as invalid videos.
    MIN_VIDEO_BYTES = 1000

    # Extension used when the video URL does not end in a known extension.
    DEFAULT_VIDEO_EXTENSION = '.mp4'

    # Lower-cased file extension -> Content-Type sent to S3.
    IMAGE_CONTENT_TYPES = {
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.png': 'image/png',
        '.webp': 'image/webp',
        '.gif': 'image/gif',
    }
    VIDEO_CONTENT_TYPES = {
        '.mp4': 'video/mp4',
        '.avi': 'video/x-msvideo',
        '.mov': 'video/quicktime',
        '.mkv': 'video/x-matroska',
        '.webm': 'video/webm',
    }

    def __init__(
        self,
        *,
        region: Optional[str] = None,
        bucket_name: Optional[str] = None,
        cdn_domain: Optional[str] = None,
    ):
        """Create the S3 client from environment credentials.

        Args:
            region: AWS region for the client; defaults to ``DEFAULT_REGION``.
            bucket_name: Target bucket; defaults to ``DEFAULT_BUCKET_NAME``.
            cdn_domain: Base URL placed in front of the object key. Pass an
                empty string to get plain S3 URLs instead. Defaults to
                ``DEFAULT_CDN_DOMAIN``.
        """
        # Read the AWS credentials from the environment.
        aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID', '')
        aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY', '')
        aws_region = self.DEFAULT_REGION if region is None else region

        # ``is None`` (not truthiness) so that an explicit empty string can
        # be used to disable the bucket or the CDN.
        self.bucket_name = (
            self.DEFAULT_BUCKET_NAME if bucket_name is None else bucket_name
        )
        self.cdn_domain = (
            self.DEFAULT_CDN_DOMAIN if cdn_domain is None else cdn_domain
        )

        if not aws_access_key_id or not aws_secret_access_key:
            logger.warning("AWS凭证未配置,S3功能将不可用")
            self.s3_client = None
        else:
            self.s3_client = boto3.client(
                's3',
                aws_access_key_id=aws_access_key_id,
                aws_secret_access_key=aws_secret_access_key,
                region_name=aws_region,
            )

    def _upload_bytes(
        self,
        data: bytes,
        folder: str,
        file_extension: str,
        content_type: str,
    ) -> tuple:
        """Store *data* under a new date-partitioned key; return ``(s3_key, url)``."""
        unique_filename = f"{uuid.uuid4().hex}{file_extension}"
        s3_key = f"{folder}/{datetime.now().strftime('%Y/%m/%d')}/{unique_filename}"

        self.s3_client.put_object(
            Bucket=self.bucket_name,
            Key=s3_key,
            Body=data,
            ContentType=content_type,
        )

        if self.cdn_domain:
            url = f"{self.cdn_domain.rstrip('/')}/{s3_key}"
        else:
            # No CDN configured: fall back to the bucket's S3 URL.
            url = f"https://{self.bucket_name}.s3.amazonaws.com/{s3_key}"
        return s3_key, url

    @classmethod
    def _video_extension(cls, video_url: str) -> str:
        """Return the known video extension the URL path ends with, else ``.mp4``."""
        from urllib.parse import urlparse

        if not video_url:
            return cls.DEFAULT_VIDEO_EXTENSION
        try:
            path = urlparse(video_url).path.lower()
            if path.endswith(tuple(cls.VIDEO_CONTENT_TYPES)):
                return path[path.rfind('.'):]
        except (ValueError, TypeError):
            # Best effort only: a malformed or non-text URL keeps the default.
            pass
        return cls.DEFAULT_VIDEO_EXTENSION

    def upload_image_from_path(self, image_path: str, folder: str = "evaluator/images") -> Optional[str]:
        """Upload a local image file to S3.

        Args:
            image_path: Path of the image file on the local filesystem.
            folder: Key prefix (folder) inside the bucket.

        Returns:
            The CDN URL of the uploaded image (or the S3 URL when no CDN
            domain is set), or ``None`` if S3 is not configured or the
            upload fails.
        """
        if not self.s3_client or not self.bucket_name:
            logger.error("S3未配置,无法上传图片")
            return None

        try:
            with open(image_path, 'rb') as f:
                image_data = f.read()

            # Files without an extension are stored as .jpg.
            file_extension = os.path.splitext(image_path)[1] or '.jpg'
            content_type = self.IMAGE_CONTENT_TYPES.get(
                file_extension.lower(), 'image/jpeg'
            )

            s3_key, cdn_url = self._upload_bytes(
                image_data, folder, file_extension, content_type
            )
            logger.info(f"图片上传成功: {s3_key} -> {cdn_url}")
            return cdn_url
        except Exception as e:
            # Boundary handler: callers rely on None (not an exception) on
            # failure, so log with the traceback and return None.
            logger.exception(f"上传图片到S3失败: {e}")
            return None

    def download_and_upload_video(self, video_url: str, folder: str = "evaluator/videos") -> Optional[str]:
        """Download a video from a URL and upload it to S3.

        Args:
            video_url: URL the video is downloaded from.
            folder: Key prefix (folder) inside the bucket.

        Returns:
            The CDN URL of the uploaded video (or the S3 URL when no CDN
            domain is set), or ``None`` if S3 is not configured, the download
            is smaller than ``MIN_VIDEO_BYTES``, or any step fails.
        """
        if not self.s3_client or not self.bucket_name:
            logger.error("S3未配置,无法上传视频")
            return None

        try:
            logger.info(f"开始下载视频: {video_url}")
            # The context manager releases the streamed connection even when
            # raise_for_status() raises before the body has been read.
            with requests.get(video_url, stream=True, timeout=300) as response:
                response.raise_for_status()
                video_data = response.content

            # Compute the size defensively so an empty/None body is reported
            # as invalid data instead of failing inside the log call.
            video_size = len(video_data) if video_data else 0
            if video_size < self.MIN_VIDEO_BYTES:
                logger.error(f"下载的视频数据无效: {video_size} bytes")
                return None
            logger.info(f"视频下载成功: {video_size} bytes")

            file_extension = self._video_extension(video_url)
            content_type = self.VIDEO_CONTENT_TYPES.get(
                file_extension.lower(), 'video/mp4'
            )

            s3_key, cdn_url = self._upload_bytes(
                video_data, folder, file_extension, content_type
            )
            logger.info(f"视频上传成功: {s3_key} -> {cdn_url}")
            return cdn_url
        except Exception as e:
            # Boundary handler: callers rely on None (not an exception) on
            # failure, so log with the traceback and return None.
            logger.exception(f"下载并上传视频失败: {e}")
            return None
# Create the module-level shared instance. This runs at import time, so the
# S3Utils constructor (and its environment-variable lookup) executes on import.
s3_utils = S3Utils()
|