# services/s3_manager.py
"""
Centralized S3 Manager for MasterLLM V3 Architecture

Handles all S3 operations:
- Upload JSON data and files
- Generate presigned URLs (max 7-day expiry)
- Download from S3
- Manage bucket structure
- Handle encryption (SSE-AES256 or KMS)
"""

import json
import os
from datetime import datetime, timedelta, timezone
from typing import Any, BinaryIO, Dict, Optional

import boto3
from boto3.s3.transfer import TransferConfig
from botocore.exceptions import ClientError

# AWS hard limit on presigned URL lifetime: 7 days (604,800 seconds).
MAX_PRESIGN_EXPIRY_SECONDS = 604800


def _utc_now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string with a 'Z' suffix.

    Uses a timezone-aware datetime (datetime.utcnow() is deprecated in
    Python 3.12+) while keeping the original '...Z'-suffixed format.
    """
    return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")


class S3Manager:
    """Centralized S3 operations manager."""

    def __init__(
        self,
        bucket_name: Optional[str] = None,
        region: Optional[str] = None,
        prefix: str = "masterllm"
    ):
        """
        Initialize S3 Manager

        Args:
            bucket_name: S3 bucket name (defaults to env S3_BUCKET_NAME)
            region: AWS region (defaults to env AWS_REGION)
            prefix: S3 key prefix for all uploads (default: "masterllm")

        Raises:
            RuntimeError: If no bucket name is configured.
        """
        self.bucket_name = bucket_name or os.getenv("S3_BUCKET_NAME") or os.getenv("S3_BUCKET")
        self.region = region or os.getenv("AWS_REGION") or os.getenv("AWS_DEFAULT_REGION") or "us-east-1"
        self.prefix = prefix

        if not self.bucket_name:
            # Fail fast: every other method needs a bucket.
            raise RuntimeError(
                "S3 bucket name not configured. "
                "Set S3_BUCKET_NAME environment variable."
            )

        # Initialize S3 client
        self.s3 = boto3.client("s3", region_name=self.region)

        # Transfer config for large files (multipart above 8MB)
        self.transfer_config = TransferConfig(
            multipart_threshold=8 * 1024 * 1024,  # 8MB
            max_concurrency=4
        )

        # Encryption settings (S3_SSE: "", "AES256", or "KMS")
        self.sse = os.getenv("S3_SSE", "").upper()
        self.kms_key_id = os.getenv("S3_KMS_KEY_ID")

    def _get_extra_args(self, content_type: str = "application/json") -> Dict[str, Any]:
        """Get S3 extra args for encryption and content type."""
        extra_args: Dict[str, Any] = {"ContentType": content_type}
        if self.sse == "AES256":
            extra_args["ServerSideEncryption"] = "AES256"
        elif self.sse == "KMS":
            extra_args["ServerSideEncryption"] = "aws:kms"
            if self.kms_key_id:
                extra_args["SSEKMSKeyId"] = self.kms_key_id
        return extra_args

    def upload_json(
        self,
        key: str,
        data: Dict[str, Any],
        add_prefix: bool = True
    ) -> Dict[str, Any]:
        """
        Upload JSON data to S3

        Args:
            key: S3 key (path within bucket)
            data: Dictionary to upload as JSON
            add_prefix: Whether to add self.prefix to key

        Returns:
            Dict with s3_uri, s3_key, s3_bucket

        Raises:
            RuntimeError: If upload fails
        """
        full_key = f"{self.prefix}/{key}" if add_prefix else key

        try:
            # Convert to JSON bytes
            json_bytes = json.dumps(data, ensure_ascii=False, indent=2).encode('utf-8')

            # Upload
            self.s3.put_object(
                Bucket=self.bucket_name,
                Key=full_key,
                Body=json_bytes,
                **self._get_extra_args("application/json")
            )

            return {
                "s3_uri": f"s3://{self.bucket_name}/{full_key}",
                "s3_key": full_key,
                "s3_bucket": self.bucket_name,
                "uploaded_at": _utc_now_iso()
            }
        except ClientError as e:
            error_code = e.response.get("Error", {}).get("Code", "Unknown")
            # Chain the original ClientError so the AWS details survive.
            raise RuntimeError(
                f"S3 JSON upload failed: {error_code}. "
                f"Check AWS credentials, permissions (s3:PutObject), region and bucket."
            ) from e

    def upload_file(
        self,
        key: str,
        file_obj: BinaryIO,
        content_type: str = "application/octet-stream",
        add_prefix: bool = True
    ) -> Dict[str, Any]:
        """
        Upload file object to S3

        Args:
            key: S3 key (path within bucket)
            file_obj: File-like object to upload
            content_type: MIME type of file
            add_prefix: Whether to add self.prefix to key

        Returns:
            Dict with s3_uri, s3_key, s3_bucket, file_size

        Raises:
            RuntimeError: If upload fails
        """
        full_key = f"{self.prefix}/{key}" if add_prefix else key

        try:
            # Get file size (seek to end, tell, rewind)
            file_obj.seek(0, 2)
            file_size = file_obj.tell()
            file_obj.seek(0)

            # Upload (multipart for large files via transfer_config)
            self.s3.upload_fileobj(
                Fileobj=file_obj,
                Bucket=self.bucket_name,
                Key=full_key,
                ExtraArgs=self._get_extra_args(content_type),
                Config=self.transfer_config
            )

            return {
                "s3_uri": f"s3://{self.bucket_name}/{full_key}",
                "s3_key": full_key,
                "s3_bucket": self.bucket_name,
                "file_size": file_size,
                "uploaded_at": _utc_now_iso()
            }
        except ClientError as e:
            error_code = e.response.get("Error", {}).get("Code", "Unknown")
            raise RuntimeError(
                f"S3 file upload failed: {error_code}. "
                f"Check AWS credentials, permissions (s3:PutObject), region and bucket."
            ) from e

    def generate_presigned_url(
        self,
        key: str,
        expires_in: int = MAX_PRESIGN_EXPIRY_SECONDS,  # 7 days (max allowed by AWS)
        add_prefix: bool = False
    ) -> Dict[str, str]:
        """
        Generate presigned GET URL for S3 object

        Args:
            key: S3 key (path within bucket)
            expires_in: Expiration time in seconds (max: 604800 = 7 days)
            add_prefix: Whether to add self.prefix to key

        Returns:
            Dict with presigned_url and presigned_expires_at

        Note:
            AWS maximum expiry is 7 days (604,800 seconds)
        """
        full_key = f"{self.prefix}/{key}" if add_prefix else key

        # Enforce AWS maximum
        expires_in = min(expires_in, MAX_PRESIGN_EXPIRY_SECONDS)

        try:
            url = self.s3.generate_presigned_url(
                "get_object",
                Params={
                    "Bucket": self.bucket_name,
                    "Key": full_key
                },
                ExpiresIn=expires_in
            )

            expires_at = (
                datetime.now(timezone.utc) + timedelta(seconds=expires_in)
            ).isoformat().replace("+00:00", "Z")

            return {
                "presigned_url": url,
                "presigned_expires_at": expires_at
            }
        except ClientError as e:
            raise RuntimeError(f"Failed to generate presigned URL: {str(e)}") from e

    def download_json(
        self,
        key: str,
        add_prefix: bool = False
    ) -> Dict[str, Any]:
        """
        Download and parse JSON from S3

        Args:
            key: S3 key (path within bucket)
            add_prefix: Whether to add self.prefix to key

        Returns:
            Parsed JSON as dictionary

        Raises:
            RuntimeError: If download or parsing fails
        """
        full_key = f"{self.prefix}/{key}" if add_prefix else key

        try:
            response = self.s3.get_object(
                Bucket=self.bucket_name,
                Key=full_key
            )

            # Read and parse JSON
            content = response["Body"].read().decode('utf-8')
            return json.loads(content)
        except ClientError as e:
            error_code = e.response.get("Error", {}).get("Code", "Unknown")
            if error_code == "NoSuchKey":
                raise RuntimeError(f"S3 object not found: {full_key}") from e
            else:
                raise RuntimeError(f"S3 download failed: {error_code}") from e
        except json.JSONDecodeError as e:
            raise RuntimeError(f"Invalid JSON in S3 object: {str(e)}") from e

    def download_file(
        self,
        key: str,
        local_path: str,
        add_prefix: bool = False
    ) -> str:
        """
        Download file from S3 to local path

        Args:
            key: S3 key (path within bucket)
            local_path: Local file path to save to
            add_prefix: Whether to add self.prefix to key

        Returns:
            Local file path

        Raises:
            RuntimeError: If download fails
        """
        full_key = f"{self.prefix}/{key}" if add_prefix else key

        try:
            self.s3.download_file(
                Bucket=self.bucket_name,
                Key=full_key,
                Filename=local_path,
                Config=self.transfer_config
            )
            return local_path
        except ClientError as e:
            error_code = e.response.get("Error", {}).get("Code", "Unknown")
            if error_code == "NoSuchKey":
                raise RuntimeError(f"S3 object not found: {full_key}") from e
            else:
                raise RuntimeError(f"S3 download failed: {error_code}") from e

    def delete_object(
        self,
        key: str,
        add_prefix: bool = False
    ) -> bool:
        """
        Delete object from S3

        Args:
            key: S3 key (path within bucket)
            add_prefix: Whether to add self.prefix to key

        Returns:
            True if successful

        Raises:
            RuntimeError: If deletion fails
        """
        full_key = f"{self.prefix}/{key}" if add_prefix else key

        try:
            self.s3.delete_object(
                Bucket=self.bucket_name,
                Key=full_key
            )
            return True
        except ClientError as e:
            error_code = e.response.get("Error", {}).get("Code", "Unknown")
            raise RuntimeError(f"S3 deletion failed: {error_code}") from e

    def object_exists(
        self,
        key: str,
        add_prefix: bool = False
    ) -> bool:
        """
        Check if object exists in S3

        Args:
            key: S3 key (path within bucket)
            add_prefix: Whether to add self.prefix to key

        Returns:
            True if object exists
        """
        full_key = f"{self.prefix}/{key}" if add_prefix else key

        try:
            self.s3.head_object(
                Bucket=self.bucket_name,
                Key=full_key
            )
            return True
        except ClientError:
            # head_object raises for both missing keys and access errors;
            # either way the object is not readable, so report absent.
            return False


# Global singleton instance
_s3_manager_instance: Optional[S3Manager] = None


def get_s3_manager() -> S3Manager:
    """Get or create global S3Manager instance"""
    global _s3_manager_instance
    if _s3_manager_instance is None:
        _s3_manager_instance = S3Manager()
    return _s3_manager_instance


# Convenience functions for direct use

def upload_json_to_s3(key: str, data: Dict[str, Any]) -> Dict[str, Any]:
    """Upload JSON to S3 (convenience function)"""
    return get_s3_manager().upload_json(key, data)


def download_json_from_s3(key: str) -> Dict[str, Any]:
    """Download JSON from S3 (convenience function)"""
    return get_s3_manager().download_json(key)


def generate_s3_presigned_url(key: str, expires_in: int = MAX_PRESIGN_EXPIRY_SECONDS) -> Dict[str, str]:
    """Generate presigned URL (convenience function)"""
    return get_s3_manager().generate_presigned_url(key, expires_in)