# masterllm/services/s3_manager.py
# Commit f0db057 (ganesh-vilje): feat: Implement V3 Architecture - S3 + Normalized MongoDB
# services/s3_manager.py
"""
Centralized S3 Manager for MasterLLM V3 Architecture
Handles all S3 operations:
- Upload JSON data and files
- Generate presigned URLs (max 7-day expiry)
- Download from S3
- Manage bucket structure
- Handle encryption (SSE-AES256 or KMS)
"""
import json
import os
from datetime import datetime, timedelta, timezone
from typing import Any, BinaryIO, Dict, Optional

import boto3
from boto3.s3.transfer import TransferConfig
from botocore.exceptions import ClientError
class S3Manager:
    """Centralized S3 operations manager for the MasterLLM V3 architecture.

    Provides JSON/file uploads, presigned GET URLs (capped at the AWS
    7-day maximum), downloads, deletions and existence checks, applying
    an optional common key prefix and optional server-side encryption
    (SSE-S3 AES256 or SSE-KMS).
    """

    # AWS hard maximum for presigned-URL expiry: 7 days, in seconds.
    MAX_PRESIGN_SECONDS = 604800

    def __init__(
        self,
        bucket_name: Optional[str] = None,
        region: Optional[str] = None,
        prefix: str = "masterllm"
    ):
        """
        Initialize S3 Manager.

        Args:
            bucket_name: S3 bucket name (defaults to env S3_BUCKET_NAME,
                then S3_BUCKET)
            region: AWS region (defaults to env AWS_REGION, then
                AWS_DEFAULT_REGION, then "us-east-1")
            prefix: S3 key prefix for all uploads (default: "masterllm")

        Raises:
            RuntimeError: If no bucket name can be resolved.
        """
        self.bucket_name = bucket_name or os.getenv("S3_BUCKET_NAME") or os.getenv("S3_BUCKET")
        self.region = region or os.getenv("AWS_REGION") or os.getenv("AWS_DEFAULT_REGION") or "us-east-1"
        self.prefix = prefix

        if not self.bucket_name:
            raise RuntimeError("S3 bucket name not configured. Set S3_BUCKET_NAME environment variable.")

        # Initialize S3 client
        self.s3 = boto3.client("s3", region_name=self.region)

        # Transfer config for large files: multipart above 8MB, 4 threads.
        self.transfer_config = TransferConfig(
            multipart_threshold=8 * 1024 * 1024,  # 8MB
            max_concurrency=4
        )

        # Encryption settings: S3_SSE is "AES256" or "KMS" (case-insensitive).
        self.sse = os.getenv("S3_SSE", "").upper()
        self.kms_key_id = os.getenv("S3_KMS_KEY_ID")

    def _full_key(self, key: str, add_prefix: bool) -> str:
        """Return the bucket-relative key, prepending self.prefix if requested."""
        return f"{self.prefix}/{key}" if add_prefix else key

    @staticmethod
    def _utc_now() -> datetime:
        """Current UTC time as a naive datetime (for the legacy "...Z" format).

        datetime.utcnow() is deprecated since Python 3.12; this uses the
        timezone-aware API and strips tzinfo so isoformat() output stays
        byte-compatible with the previous "<iso>Z" timestamps.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def _get_extra_args(self, content_type: str = "application/json") -> Dict[str, Any]:
        """Build the S3 ExtraArgs dict carrying content type and encryption."""
        extra_args: Dict[str, Any] = {"ContentType": content_type}
        if self.sse == "AES256":
            extra_args["ServerSideEncryption"] = "AES256"
        elif self.sse == "KMS":
            extra_args["ServerSideEncryption"] = "aws:kms"
            if self.kms_key_id:
                extra_args["SSEKMSKeyId"] = self.kms_key_id
        return extra_args

    def upload_json(
        self,
        key: str,
        data: Dict[str, Any],
        add_prefix: bool = True
    ) -> Dict[str, Any]:
        """
        Upload JSON data to S3.

        Args:
            key: S3 key (path within bucket)
            data: Dictionary to upload as JSON
            add_prefix: Whether to add self.prefix to key

        Returns:
            Dict with s3_uri, s3_key, s3_bucket, uploaded_at

        Raises:
            RuntimeError: If upload fails (chained from the ClientError)
        """
        full_key = self._full_key(key, add_prefix)
        # Serialize outside the try so a JSON error is not masked as an S3 error.
        json_bytes = json.dumps(data, ensure_ascii=False, indent=2).encode('utf-8')
        try:
            self.s3.put_object(
                Bucket=self.bucket_name,
                Key=full_key,
                Body=json_bytes,
                **self._get_extra_args("application/json")
            )
        except ClientError as e:
            error_code = e.response.get("Error", {}).get("Code", "Unknown")
            # Chain the original error so the AWS detail is not lost.
            raise RuntimeError(
                f"S3 JSON upload failed: {error_code}. "
                f"Check AWS credentials, permissions (s3:PutObject), region and bucket."
            ) from e
        return {
            "s3_uri": f"s3://{self.bucket_name}/{full_key}",
            "s3_key": full_key,
            "s3_bucket": self.bucket_name,
            "uploaded_at": self._utc_now().isoformat() + "Z"
        }

    def upload_file(
        self,
        key: str,
        file_obj: BinaryIO,
        content_type: str = "application/octet-stream",
        add_prefix: bool = True
    ) -> Dict[str, Any]:
        """
        Upload a file object to S3.

        Args:
            key: S3 key (path within bucket)
            file_obj: File-like object to upload (must be seekable — size is
                measured by seeking to the end and back)
            content_type: MIME type of file
            add_prefix: Whether to add self.prefix to key

        Returns:
            Dict with s3_uri, s3_key, s3_bucket, file_size, uploaded_at

        Raises:
            RuntimeError: If upload fails (chained from the ClientError)
        """
        full_key = self._full_key(key, add_prefix)
        try:
            # Measure size: seek to end, record position, rewind for upload.
            file_obj.seek(0, 2)
            file_size = file_obj.tell()
            file_obj.seek(0)

            self.s3.upload_fileobj(
                Fileobj=file_obj,
                Bucket=self.bucket_name,
                Key=full_key,
                ExtraArgs=self._get_extra_args(content_type),
                Config=self.transfer_config
            )
        except ClientError as e:
            error_code = e.response.get("Error", {}).get("Code", "Unknown")
            raise RuntimeError(
                f"S3 file upload failed: {error_code}. "
                f"Check AWS credentials, permissions (s3:PutObject), region and bucket."
            ) from e
        return {
            "s3_uri": f"s3://{self.bucket_name}/{full_key}",
            "s3_key": full_key,
            "s3_bucket": self.bucket_name,
            "file_size": file_size,
            "uploaded_at": self._utc_now().isoformat() + "Z"
        }

    def generate_presigned_url(
        self,
        key: str,
        expires_in: int = 604800,  # 7 days (max allowed by AWS)
        add_prefix: bool = False
    ) -> Dict[str, str]:
        """
        Generate a presigned GET URL for an S3 object.

        Args:
            key: S3 key (path within bucket)
            expires_in: Expiration time in seconds (clamped to 1..604800)
            add_prefix: Whether to add self.prefix to key

        Returns:
            Dict with presigned_url and presigned_expires_at

        Raises:
            RuntimeError: If URL generation fails (chained from the ClientError)

        Note:
            AWS maximum expiry is 7 days (604,800 seconds).
        """
        full_key = self._full_key(key, add_prefix)
        # Clamp to AWS's valid range; a non-positive expiry would yield a
        # URL that is rejected on use.
        expires_in = max(1, min(expires_in, self.MAX_PRESIGN_SECONDS))
        try:
            url = self.s3.generate_presigned_url(
                "get_object",
                Params={
                    "Bucket": self.bucket_name,
                    "Key": full_key
                },
                ExpiresIn=expires_in
            )
        except ClientError as e:
            raise RuntimeError(f"Failed to generate presigned URL: {str(e)}") from e
        expires_at = (self._utc_now() + timedelta(seconds=expires_in)).isoformat() + "Z"
        return {
            "presigned_url": url,
            "presigned_expires_at": expires_at
        }

    def download_json(
        self,
        key: str,
        add_prefix: bool = False
    ) -> Dict[str, Any]:
        """
        Download and parse JSON from S3.

        Args:
            key: S3 key (path within bucket)
            add_prefix: Whether to add self.prefix to key

        Returns:
            Parsed JSON as dictionary

        Raises:
            RuntimeError: If download or parsing fails
        """
        full_key = self._full_key(key, add_prefix)
        try:
            response = self.s3.get_object(
                Bucket=self.bucket_name,
                Key=full_key
            )
            content = response["Body"].read().decode('utf-8')
        except ClientError as e:
            error_code = e.response.get("Error", {}).get("Code", "Unknown")
            # get_object reports a missing key as "NoSuchKey"; some paths
            # surface the bare HTTP status "404" instead — treat both as absent.
            if error_code in ("NoSuchKey", "404"):
                raise RuntimeError(f"S3 object not found: {full_key}") from e
            raise RuntimeError(f"S3 download failed: {error_code}") from e
        try:
            return json.loads(content)
        except json.JSONDecodeError as e:
            raise RuntimeError(f"Invalid JSON in S3 object: {str(e)}") from e

    def download_file(
        self,
        key: str,
        local_path: str,
        add_prefix: bool = False
    ) -> str:
        """
        Download a file from S3 to a local path.

        Args:
            key: S3 key (path within bucket)
            local_path: Local file path to save to
            add_prefix: Whether to add self.prefix to key

        Returns:
            Local file path

        Raises:
            RuntimeError: If download fails
        """
        full_key = self._full_key(key, add_prefix)
        try:
            self.s3.download_file(
                Bucket=self.bucket_name,
                Key=full_key,
                Filename=local_path,
                Config=self.transfer_config
            )
            return local_path
        except ClientError as e:
            error_code = e.response.get("Error", {}).get("Code", "Unknown")
            # download_file HEADs the object first, so a missing key arrives
            # as code "404" rather than "NoSuchKey" — check both.
            if error_code in ("NoSuchKey", "404"):
                raise RuntimeError(f"S3 object not found: {full_key}") from e
            raise RuntimeError(f"S3 download failed: {error_code}") from e

    def delete_object(
        self,
        key: str,
        add_prefix: bool = False
    ) -> bool:
        """
        Delete an object from S3.

        Args:
            key: S3 key (path within bucket)
            add_prefix: Whether to add self.prefix to key

        Returns:
            True if successful

        Raises:
            RuntimeError: If deletion fails
        """
        full_key = self._full_key(key, add_prefix)
        try:
            self.s3.delete_object(
                Bucket=self.bucket_name,
                Key=full_key
            )
            return True
        except ClientError as e:
            error_code = e.response.get("Error", {}).get("Code", "Unknown")
            raise RuntimeError(f"S3 deletion failed: {error_code}") from e

    def object_exists(
        self,
        key: str,
        add_prefix: bool = False
    ) -> bool:
        """
        Check whether an object exists in S3.

        Args:
            key: S3 key (path within bucket)
            add_prefix: Whether to add self.prefix to key

        Returns:
            True if object exists

        Note:
            NOTE(review): any ClientError — including permission errors
            (403) — is reported as "does not exist"; callers cannot
            distinguish absence from access denial.
        """
        full_key = self._full_key(key, add_prefix)
        try:
            self.s3.head_object(
                Bucket=self.bucket_name,
                Key=full_key
            )
            return True
        except ClientError:
            return False
# Global singleton instance
_s3_manager_instance: Optional[S3Manager] = None


def get_s3_manager() -> S3Manager:
    """Return the process-wide S3Manager, constructing it lazily on first use."""
    global _s3_manager_instance
    if _s3_manager_instance is not None:
        return _s3_manager_instance
    _s3_manager_instance = S3Manager()
    return _s3_manager_instance
# Convenience functions for direct use
def upload_json_to_s3(key: str, data: Dict[str, Any]) -> Dict[str, Any]:
    """Upload JSON to S3 (convenience function)"""
    manager = get_s3_manager()
    return manager.upload_json(key, data)
def download_json_from_s3(key: str) -> Dict[str, Any]:
    """Download JSON from S3 (convenience function)"""
    manager = get_s3_manager()
    return manager.download_json(key)
def generate_s3_presigned_url(key: str, expires_in: int = 604800) -> Dict[str, str]:
    """Generate presigned URL (convenience function)"""
    manager = get_s3_manager()
    return manager.generate_presigned_url(key, expires_in=expires_in)