# services/s3_manager.py
"""
Centralized S3 Manager for MasterLLM V3 Architecture

Handles all S3 operations:
- Upload JSON data and files
- Generate presigned URLs (max 7-day expiry)
- Download from S3
- Manage bucket structure
- Handle encryption (SSE-AES256 or KMS)
"""
import json
import os
from datetime import datetime, timedelta, timezone
from typing import Any, BinaryIO, Dict, Optional

import boto3
from boto3.s3.transfer import TransferConfig
from botocore.exceptions import ClientError
class S3Manager:
    """Centralized S3 operations manager.

    Wraps a boto3 S3 client with helpers for JSON/file upload, download,
    presigned URLs, deletion, and existence checks. Uploads honor the
    optional server-side encryption configured via S3_SSE / S3_KMS_KEY_ID.
    """

    # AWS hard limit on presigned URL expiry: 7 days, in seconds.
    MAX_PRESIGNED_EXPIRY: int = 604800

    def __init__(
        self,
        bucket_name: Optional[str] = None,
        region: Optional[str] = None,
        prefix: str = "masterllm"
    ):
        """
        Initialize S3 Manager.

        Args:
            bucket_name: S3 bucket name (defaults to env S3_BUCKET_NAME, then S3_BUCKET)
            region: AWS region (defaults to env AWS_REGION, then AWS_DEFAULT_REGION,
                then "us-east-1")
            prefix: S3 key prefix for all uploads (default: "masterllm")

        Raises:
            RuntimeError: If no bucket name is configured.
        """
        self.bucket_name = bucket_name or os.getenv("S3_BUCKET_NAME") or os.getenv("S3_BUCKET")
        self.region = region or os.getenv("AWS_REGION") or os.getenv("AWS_DEFAULT_REGION") or "us-east-1"
        self.prefix = prefix

        if not self.bucket_name:
            raise RuntimeError("S3 bucket name not configured. Set S3_BUCKET_NAME environment variable.")

        # Initialize S3 client
        self.s3 = boto3.client("s3", region_name=self.region)

        # Transfer config for large files: multipart kicks in above 8MB.
        self.transfer_config = TransferConfig(
            multipart_threshold=8 * 1024 * 1024,  # 8MB
            max_concurrency=4
        )

        # Encryption settings: "" disables SSE, "AES256" or "KMS" enables it.
        self.sse = os.getenv("S3_SSE", "").upper()
        self.kms_key_id = os.getenv("S3_KMS_KEY_ID")

    @staticmethod
    def _utc_timestamp() -> str:
        """Return current UTC time as an ISO-8601 string with 'Z' suffix."""
        # datetime.utcnow() is deprecated since Python 3.12; use an aware
        # datetime and drop tzinfo to keep the original "...Z" output format.
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"

    def _full_key(self, key: str, add_prefix: bool) -> str:
        """Return key with self.prefix prepended when add_prefix is True."""
        return f"{self.prefix}/{key}" if add_prefix else key

    def _get_extra_args(self, content_type: str = "application/json") -> Dict[str, Any]:
        """Get S3 extra args for encryption and content type."""
        extra_args: Dict[str, Any] = {"ContentType": content_type}

        if self.sse == "AES256":
            extra_args["ServerSideEncryption"] = "AES256"
        elif self.sse == "KMS":
            extra_args["ServerSideEncryption"] = "aws:kms"
            # Without an explicit key id, S3 falls back to the account's
            # default aws/s3 KMS key.
            if self.kms_key_id:
                extra_args["SSEKMSKeyId"] = self.kms_key_id

        return extra_args

    def upload_json(
        self,
        key: str,
        data: Dict[str, Any],
        add_prefix: bool = True
    ) -> Dict[str, Any]:
        """
        Upload JSON data to S3.

        Args:
            key: S3 key (path within bucket)
            data: Dictionary to upload as JSON
            add_prefix: Whether to add self.prefix to key

        Returns:
            Dict with s3_uri, s3_key, s3_bucket, uploaded_at

        Raises:
            RuntimeError: If upload fails
        """
        full_key = self._full_key(key, add_prefix)

        try:
            # Convert to JSON bytes
            json_bytes = json.dumps(data, ensure_ascii=False, indent=2).encode('utf-8')

            # Upload
            self.s3.put_object(
                Bucket=self.bucket_name,
                Key=full_key,
                Body=json_bytes,
                **self._get_extra_args("application/json")
            )

            return {
                "s3_uri": f"s3://{self.bucket_name}/{full_key}",
                "s3_key": full_key,
                "s3_bucket": self.bucket_name,
                "uploaded_at": self._utc_timestamp()
            }
        except ClientError as e:
            error_code = e.response.get("Error", {}).get("Code", "Unknown")
            # Chain the cause so the boto3 traceback is preserved.
            raise RuntimeError(
                f"S3 JSON upload failed: {error_code}. "
                f"Check AWS credentials, permissions (s3:PutObject), region and bucket."
            ) from e

    def upload_file(
        self,
        key: str,
        file_obj: BinaryIO,
        content_type: str = "application/octet-stream",
        add_prefix: bool = True
    ) -> Dict[str, Any]:
        """
        Upload file object to S3.

        Args:
            key: S3 key (path within bucket)
            file_obj: Seekable file-like object to upload
            content_type: MIME type of file
            add_prefix: Whether to add self.prefix to key

        Returns:
            Dict with s3_uri, s3_key, s3_bucket, file_size, uploaded_at

        Raises:
            RuntimeError: If upload fails
        """
        full_key = self._full_key(key, add_prefix)

        try:
            # Measure size by seeking to the end, then rewind so the
            # upload starts from the beginning of the stream.
            file_obj.seek(0, 2)
            file_size = file_obj.tell()
            file_obj.seek(0)

            # Upload (multipart for large files per self.transfer_config)
            self.s3.upload_fileobj(
                Fileobj=file_obj,
                Bucket=self.bucket_name,
                Key=full_key,
                ExtraArgs=self._get_extra_args(content_type),
                Config=self.transfer_config
            )

            return {
                "s3_uri": f"s3://{self.bucket_name}/{full_key}",
                "s3_key": full_key,
                "s3_bucket": self.bucket_name,
                "file_size": file_size,
                "uploaded_at": self._utc_timestamp()
            }
        except ClientError as e:
            error_code = e.response.get("Error", {}).get("Code", "Unknown")
            raise RuntimeError(
                f"S3 file upload failed: {error_code}. "
                f"Check AWS credentials, permissions (s3:PutObject), region and bucket."
            ) from e

    def generate_presigned_url(
        self,
        key: str,
        expires_in: int = 604800,  # 7 days (max allowed by AWS)
        add_prefix: bool = False
    ) -> Dict[str, str]:
        """
        Generate presigned GET URL for S3 object.

        Args:
            key: S3 key (path within bucket)
            expires_in: Expiration time in seconds (capped at MAX_PRESIGNED_EXPIRY)
            add_prefix: Whether to add self.prefix to key

        Returns:
            Dict with presigned_url and presigned_expires_at

        Raises:
            RuntimeError: If URL generation fails

        Note:
            AWS maximum expiry is 7 days (604,800 seconds)
        """
        full_key = self._full_key(key, add_prefix)

        # Enforce AWS maximum
        expires_in = min(expires_in, self.MAX_PRESIGNED_EXPIRY)

        try:
            url = self.s3.generate_presigned_url(
                "get_object",
                Params={
                    "Bucket": self.bucket_name,
                    "Key": full_key
                },
                ExpiresIn=expires_in
            )

            expires_at = (
                datetime.now(timezone.utc) + timedelta(seconds=expires_in)
            ).replace(tzinfo=None).isoformat() + "Z"

            return {
                "presigned_url": url,
                "presigned_expires_at": expires_at
            }
        except ClientError as e:
            raise RuntimeError(f"Failed to generate presigned URL: {str(e)}") from e

    def download_json(
        self,
        key: str,
        add_prefix: bool = False
    ) -> Dict[str, Any]:
        """
        Download and parse JSON from S3.

        Args:
            key: S3 key (path within bucket)
            add_prefix: Whether to add self.prefix to key

        Returns:
            Parsed JSON as dictionary

        Raises:
            RuntimeError: If download or parsing fails
        """
        full_key = self._full_key(key, add_prefix)

        try:
            response = self.s3.get_object(
                Bucket=self.bucket_name,
                Key=full_key
            )

            # Read and parse JSON
            content = response["Body"].read().decode('utf-8')
            return json.loads(content)
        except ClientError as e:
            error_code = e.response.get("Error", {}).get("Code", "Unknown")
            if error_code == "NoSuchKey":
                raise RuntimeError(f"S3 object not found: {full_key}") from e
            else:
                raise RuntimeError(f"S3 download failed: {error_code}") from e
        except json.JSONDecodeError as e:
            raise RuntimeError(f"Invalid JSON in S3 object: {str(e)}") from e

    def download_file(
        self,
        key: str,
        local_path: str,
        add_prefix: bool = False
    ) -> str:
        """
        Download file from S3 to local path.

        Args:
            key: S3 key (path within bucket)
            local_path: Local file path to save to
            add_prefix: Whether to add self.prefix to key

        Returns:
            Local file path

        Raises:
            RuntimeError: If download fails
        """
        full_key = self._full_key(key, add_prefix)

        try:
            self.s3.download_file(
                Bucket=self.bucket_name,
                Key=full_key,
                Filename=local_path,
                Config=self.transfer_config
            )
            return local_path
        except ClientError as e:
            error_code = e.response.get("Error", {}).get("Code", "Unknown")
            if error_code == "NoSuchKey":
                raise RuntimeError(f"S3 object not found: {full_key}") from e
            else:
                raise RuntimeError(f"S3 download failed: {error_code}") from e

    def delete_object(
        self,
        key: str,
        add_prefix: bool = False
    ) -> bool:
        """
        Delete object from S3.

        Args:
            key: S3 key (path within bucket)
            add_prefix: Whether to add self.prefix to key

        Returns:
            True if successful

        Raises:
            RuntimeError: If deletion fails
        """
        full_key = self._full_key(key, add_prefix)

        try:
            self.s3.delete_object(
                Bucket=self.bucket_name,
                Key=full_key
            )
            return True
        except ClientError as e:
            error_code = e.response.get("Error", {}).get("Code", "Unknown")
            raise RuntimeError(f"S3 deletion failed: {error_code}") from e

    def object_exists(
        self,
        key: str,
        add_prefix: bool = False
    ) -> bool:
        """
        Check if object exists in S3.

        Args:
            key: S3 key (path within bucket)
            add_prefix: Whether to add self.prefix to key

        Returns:
            True if object exists

        Note:
            Any ClientError (including permission errors, not just 404)
            is reported as "does not exist" — callers relying on this
            for access checks should use head_object directly.
        """
        full_key = self._full_key(key, add_prefix)

        try:
            self.s3.head_object(
                Bucket=self.bucket_name,
                Key=full_key
            )
            return True
        except ClientError:
            return False
# Module-level singleton, created lazily on first access.
_s3_manager_instance: Optional[S3Manager] = None


def get_s3_manager() -> S3Manager:
    """Return the process-wide S3Manager, constructing it on first call."""
    global _s3_manager_instance
    if _s3_manager_instance is not None:
        return _s3_manager_instance
    _s3_manager_instance = S3Manager()
    return _s3_manager_instance
# Convenience functions for direct use
def upload_json_to_s3(key: str, data: Dict[str, Any]) -> Dict[str, Any]:
    """Upload *data* as JSON under *key* via the shared S3Manager."""
    manager = get_s3_manager()
    return manager.upload_json(key, data)
def download_json_from_s3(key: str) -> Dict[str, Any]:
    """Fetch and parse the JSON object at *key* via the shared S3Manager."""
    manager = get_s3_manager()
    return manager.download_json(key)
def generate_s3_presigned_url(key: str, expires_in: int = 604800) -> Dict[str, str]:
    """Produce a presigned GET URL for *key* via the shared S3Manager."""
    manager = get_s3_manager()
    return manager.generate_presigned_url(key, expires_in)