""" S3工具类 - 用于上传图片和视频到S3 简化版本,适配 Hugging Face Space """ import os import logging import uuid import boto3 import requests from typing import Optional from datetime import datetime logger = logging.getLogger(__name__) class S3Utils: """S3工具类,用于处理文件上传到S3""" def __init__(self): # 从环境变量读取AWS配置 aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID', '') aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY', '') aws_region = 'ap-southeast-2' # AWS 区域 self.bucket_name = 'cuti-agent-assets-dev-699475938168-ap-southeast-2' # S3 Bucket 名称 self.cdn_domain = 'https://cdn-dev.newai.land' # CDN 域名 if not aws_access_key_id or not aws_secret_access_key: logger.warning("AWS凭证未配置,S3功能将不可用") self.s3_client = None else: self.s3_client = boto3.client( 's3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, region_name=aws_region ) def upload_image_from_path(self, image_path: str, folder: str = "evaluator/images") -> Optional[str]: """ 从本地路径上传图片到S3 Args: image_path: 本地图片路径 folder: S3文件夹路径 Returns: str: 图片的CDN URL,失败返回None """ if not self.s3_client or not self.bucket_name: logger.error("S3未配置,无法上传图片") return None try: # 读取图片文件 with open(image_path, 'rb') as f: image_data = f.read() # 生成唯一文件名 file_extension = os.path.splitext(image_path)[1] or '.jpg' unique_filename = f"{uuid.uuid4().hex}{file_extension}" # S3文件键 s3_key = f"{folder}/{datetime.now().strftime('%Y/%m/%d')}/{unique_filename}" # 确定Content-Type content_type_map = { '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg', '.png': 'image/png', '.webp': 'image/webp', '.gif': 'image/gif' } content_type = content_type_map.get(file_extension.lower(), 'image/jpeg') # 上传到S3 self.s3_client.put_object( Bucket=self.bucket_name, Key=s3_key, Body=image_data, ContentType=content_type ) # 构建CDN URL if self.cdn_domain: cdn_url = f"{self.cdn_domain.rstrip('/')}/{s3_key}" else: # 如果没有CDN,使用S3 URL cdn_url = f"https://{self.bucket_name}.s3.amazonaws.com/{s3_key}" logger.info(f"图片上传成功: {s3_key} -> {cdn_url}") return cdn_url except Exception as e: logger.error(f"上传图片到S3失败: {e}") return None def download_and_upload_video(self, video_url: str, folder: str = "evaluator/videos") -> Optional[str]: """ 从URL下载视频并上传到S3 Args: video_url: 视频URL folder: S3文件夹路径 Returns: str: 视频的CDN URL,失败返回None """ if not self.s3_client or not self.bucket_name: logger.error("S3未配置,无法上传视频") return None try: # 下载视频 logger.info(f"开始下载视频: {video_url}") response = requests.get(video_url, stream=True, timeout=300) response.raise_for_status() video_data = response.content if not video_data or len(video_data) < 1000: logger.error(f"下载的视频数据无效: {len(video_data)} bytes") return None logger.info(f"视频下载成功: {len(video_data)} bytes") # 生成唯一文件名 file_extension = '.mp4' # 默认mp4 if video_url: try: from urllib.parse import urlparse parsed = urlparse(video_url) path = parsed.path.lower() if path.endswith(('.mp4', '.avi', '.mov', '.mkv', '.webm')): file_extension = path[path.rfind('.'):] except: pass unique_filename = f"{uuid.uuid4().hex}{file_extension}" # S3文件键 s3_key = f"{folder}/{datetime.now().strftime('%Y/%m/%d')}/{unique_filename}" # 确定Content-Type content_type_map = { '.mp4': 'video/mp4', '.avi': 'video/x-msvideo', '.mov': 'video/quicktime', '.mkv': 'video/x-matroska', '.webm': 'video/webm' } content_type = content_type_map.get(file_extension.lower(), 'video/mp4') # 上传到S3 self.s3_client.put_object( Bucket=self.bucket_name, Key=s3_key, Body=video_data, ContentType=content_type ) # 构建CDN URL if self.cdn_domain: cdn_url = f"{self.cdn_domain.rstrip('/')}/{s3_key}" else: # 如果没有CDN,使用S3 URL cdn_url = f"https://{self.bucket_name}.s3.amazonaws.com/{s3_key}" logger.info(f"视频上传成功: {s3_key} -> {cdn_url}") return cdn_url except Exception as e: logger.error(f"下载并上传视频失败: {e}") return None # 创建全局实例 s3_utils = S3Utils()