# File size: 11,891 Bytes
# f0db057
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
# services/s3_manager.py
"""
Centralized S3 Manager for MasterLLM V3 Architecture

Handles all S3 operations:
- Upload JSON data and files
- Generate presigned URLs (max 7-day expiry)
- Download from S3
- Manage bucket structure
- Handle encryption (SSE-AES256 or KMS)
"""
import json
import os
from datetime import datetime, timedelta, timezone
from typing import Any, BinaryIO, Dict, Optional

import boto3
from boto3.s3.transfer import TransferConfig
from botocore.exceptions import ClientError


class S3Manager:
    """Centralized S3 operations manager.

    Wraps a boto3 S3 client bound to one bucket and provides JSON/file
    upload, download, deletion, existence checks and presigned GET URLs.

    Key handling: every public method accepts ``add_prefix``. Uploads default
    to ``add_prefix=True`` (the object is stored under ``<prefix>/<key>``);
    every other operation defaults to ``add_prefix=False`` so the ``s3_key``
    returned by an upload can be passed back unchanged.
    """

    # Longest presigned-URL lifetime this class will request (7 days).
    MAX_PRESIGN_SECONDS = 604800

    # Error codes treated as "object does not exist". GetObject reports
    # "NoSuchKey"; the managed download path issues a HeadObject first, whose
    # failure carries the bare HTTP status ("404") instead.
    _NOT_FOUND_CODES = frozenset({"NoSuchKey", "404", "NotFound"})

    # Accepted (upper-cased) spellings of S3_SSE that select SSE-KMS.
    _KMS_ALIASES = frozenset({"KMS", "AWS:KMS"})

    def __init__(
        self,
        bucket_name: Optional[str] = None,
        region: Optional[str] = None,
        prefix: str = "masterllm"
    ):
        """
        Initialize S3 Manager

        Args:
            bucket_name: S3 bucket name (defaults to env S3_BUCKET_NAME,
                then S3_BUCKET)
            region: AWS region (defaults to env AWS_REGION, then
                AWS_DEFAULT_REGION, then "us-east-1")
            prefix: S3 key prefix for all uploads (default: "masterllm")

        Raises:
            RuntimeError: If no bucket name is given or configured
        """
        self.bucket_name = bucket_name or os.getenv("S3_BUCKET_NAME") or os.getenv("S3_BUCKET")
        self.region = region or os.getenv("AWS_REGION") or os.getenv("AWS_DEFAULT_REGION") or "us-east-1"
        self.prefix = prefix

        if not self.bucket_name:
            raise RuntimeError("S3 bucket name not configured. Set S3_BUCKET_NAME environment variable.")

        # Initialize S3 client
        self.s3 = boto3.client("s3", region_name=self.region)

        # Transfer config for large files
        self.transfer_config = TransferConfig(
            multipart_threshold=8 * 1024 * 1024,  # 8MB
            max_concurrency=4
        )

        # Encryption settings (S3_SSE: "AES256", "KMS" or "aws:kms"; case-insensitive)
        self.sse = os.getenv("S3_SSE", "").strip().upper()
        self.kms_key_id = os.getenv("S3_KMS_KEY_ID")

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _full_key(self, key: str, add_prefix: bool) -> str:
        """Return ``<prefix>/<key>`` when add_prefix is set, else key unchanged."""
        return f"{self.prefix}/{key}" if add_prefix else key

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current UTC time as a naive datetime.

        Replaces the deprecated ``datetime.utcnow()``; dropping tzinfo keeps
        ``isoformat() + "Z"`` producing the same string shape as before.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _error_code(error: "ClientError") -> str:
        """Extract the AWS error code from a ClientError ("Unknown" if absent)."""
        return error.response.get("Error", {}).get("Code", "Unknown")

    def _get_extra_args(self, content_type: str = "application/json") -> Dict[str, Any]:
        """Get S3 extra args for encryption and content type

        Args:
            content_type: MIME type recorded on the object

        Returns:
            Dict with ContentType plus, depending on S3_SSE,
            ServerSideEncryption and (for KMS with a key id) SSEKMSKeyId
        """
        extra_args: Dict[str, Any] = {"ContentType": content_type}

        if self.sse == "AES256":
            extra_args["ServerSideEncryption"] = "AES256"
        elif self.sse in self._KMS_ALIASES:
            # Accept the literal AWS value ("aws:kms") as well as the short
            # form, so a natural setting never silently disables encryption.
            extra_args["ServerSideEncryption"] = "aws:kms"
            if self.kms_key_id:
                extra_args["SSEKMSKeyId"] = self.kms_key_id

        return extra_args

    # ------------------------------------------------------------------
    # Uploads
    # ------------------------------------------------------------------

    def upload_json(
        self,
        key: str,
        data: Dict[str, Any],
        add_prefix: bool = True
    ) -> Dict[str, Any]:
        """
        Upload JSON data to S3

        Args:
            key: S3 key (path within bucket)
            data: Dictionary to upload as JSON
            add_prefix: Whether to add self.prefix to key

        Returns:
            Dict with s3_uri, s3_key, s3_bucket, uploaded_at

        Raises:
            RuntimeError: If upload fails
        """
        full_key = self._full_key(key, add_prefix)

        # Serialize outside the try block: only the S3 call can raise
        # ClientError, and serialization errors should surface as themselves.
        json_bytes = json.dumps(data, ensure_ascii=False, indent=2).encode("utf-8")

        try:
            self.s3.put_object(
                Bucket=self.bucket_name,
                Key=full_key,
                Body=json_bytes,
                **self._get_extra_args("application/json")
            )
        except ClientError as e:
            error_code = self._error_code(e)
            raise RuntimeError(
                f"S3 JSON upload failed: {error_code}. "
                f"Check AWS credentials, permissions (s3:PutObject), region and bucket."
            ) from e

        return {
            "s3_uri": f"s3://{self.bucket_name}/{full_key}",
            "s3_key": full_key,
            "s3_bucket": self.bucket_name,
            "uploaded_at": self._utc_now().isoformat() + "Z"
        }

    def upload_file(
        self,
        key: str,
        file_obj: BinaryIO,
        content_type: str = "application/octet-stream",
        add_prefix: bool = True
    ) -> Dict[str, Any]:
        """
        Upload file object to S3

        The stream must be seekable: its size is measured by seeking to the
        end, and it is then rewound to offset 0 so the whole content is sent.

        Args:
            key: S3 key (path within bucket)
            file_obj: File-like object to upload
            content_type: MIME type of file
            add_prefix: Whether to add self.prefix to key

        Returns:
            Dict with s3_uri, s3_key, s3_bucket, file_size, uploaded_at

        Raises:
            RuntimeError: If upload fails
        """
        full_key = self._full_key(key, add_prefix)

        # Measure the payload, then rewind so the upload starts at byte 0.
        file_obj.seek(0, os.SEEK_END)
        file_size = file_obj.tell()
        file_obj.seek(0)

        try:
            self.s3.upload_fileobj(
                Fileobj=file_obj,
                Bucket=self.bucket_name,
                Key=full_key,
                ExtraArgs=self._get_extra_args(content_type),
                Config=self.transfer_config
            )
        except ClientError as e:
            error_code = self._error_code(e)
            raise RuntimeError(
                f"S3 file upload failed: {error_code}. "
                f"Check AWS credentials, permissions (s3:PutObject), region and bucket."
            ) from e

        return {
            "s3_uri": f"s3://{self.bucket_name}/{full_key}",
            "s3_key": full_key,
            "s3_bucket": self.bucket_name,
            "file_size": file_size,
            "uploaded_at": self._utc_now().isoformat() + "Z"
        }

    # ------------------------------------------------------------------
    # Presigned URLs
    # ------------------------------------------------------------------

    def generate_presigned_url(
        self,
        key: str,
        expires_in: int = 604800,  # 7 days (max allowed by AWS)
        add_prefix: bool = False
    ) -> Dict[str, str]:
        """
        Generate presigned GET URL for S3 object

        Args:
            key: S3 key (path within bucket)
            expires_in: Expiration time in seconds (max: 604800 = 7 days);
                larger values are clamped to the maximum
            add_prefix: Whether to add self.prefix to key

        Returns:
            Dict with presigned_url and presigned_expires_at

        Raises:
            RuntimeError: If the URL cannot be generated

        Note:
            AWS maximum expiry is 7 days (604,800 seconds)
        """
        full_key = self._full_key(key, add_prefix)

        # Enforce AWS maximum
        expires_in = min(expires_in, self.MAX_PRESIGN_SECONDS)

        try:
            url = self.s3.generate_presigned_url(
                "get_object",
                Params={
                    "Bucket": self.bucket_name,
                    "Key": full_key
                },
                ExpiresIn=expires_in
            )
        except ClientError as e:
            raise RuntimeError(f"Failed to generate presigned URL: {str(e)}") from e

        expires_at = (self._utc_now() + timedelta(seconds=expires_in)).isoformat() + "Z"

        return {
            "presigned_url": url,
            "presigned_expires_at": expires_at
        }

    # ------------------------------------------------------------------
    # Downloads
    # ------------------------------------------------------------------

    def download_json(
        self,
        key: str,
        add_prefix: bool = False
    ) -> Dict[str, Any]:
        """
        Download and parse JSON from S3

        Args:
            key: S3 key (path within bucket)
            add_prefix: Whether to add self.prefix to key

        Returns:
            Parsed JSON as dictionary

        Raises:
            RuntimeError: If download or parsing fails (including content
                that is not valid UTF-8)
        """
        full_key = self._full_key(key, add_prefix)

        try:
            response = self.s3.get_object(
                Bucket=self.bucket_name,
                Key=full_key
            )
            raw = response["Body"].read()
        except ClientError as e:
            error_code = self._error_code(e)
            if error_code in self._NOT_FOUND_CODES:
                raise RuntimeError(f"S3 object not found: {full_key}") from e
            raise RuntimeError(f"S3 download failed: {error_code}") from e

        # Decode and parse separately from the network call so that malformed
        # bytes are reported as a parsing failure, as the contract promises.
        try:
            return json.loads(raw.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError) as e:
            raise RuntimeError(f"Invalid JSON in S3 object: {str(e)}") from e

    def download_file(
        self,
        key: str,
        local_path: str,
        add_prefix: bool = False
    ) -> str:
        """
        Download file from S3 to local path

        Args:
            key: S3 key (path within bucket)
            local_path: Local file path to save to
            add_prefix: Whether to add self.prefix to key

        Returns:
            Local file path

        Raises:
            RuntimeError: If download fails
        """
        full_key = self._full_key(key, add_prefix)

        try:
            self.s3.download_file(
                Bucket=self.bucket_name,
                Key=full_key,
                Filename=local_path,
                Config=self.transfer_config
            )
        except ClientError as e:
            error_code = self._error_code(e)
            # A missing object is reported here as "404" (from the HeadObject
            # the transfer performs first), not "NoSuchKey".
            if error_code in self._NOT_FOUND_CODES:
                raise RuntimeError(f"S3 object not found: {full_key}") from e
            raise RuntimeError(f"S3 download failed: {error_code}") from e

        return local_path

    # ------------------------------------------------------------------
    # Deletion / existence
    # ------------------------------------------------------------------

    def delete_object(
        self,
        key: str,
        add_prefix: bool = False
    ) -> bool:
        """
        Delete object from S3

        Args:
            key: S3 key (path within bucket)
            add_prefix: Whether to add self.prefix to key

        Returns:
            True if successful

        Raises:
            RuntimeError: If deletion fails
        """
        full_key = self._full_key(key, add_prefix)

        try:
            self.s3.delete_object(
                Bucket=self.bucket_name,
                Key=full_key
            )
        except ClientError as e:
            raise RuntimeError(f"S3 deletion failed: {self._error_code(e)}") from e

        return True

    def object_exists(
        self,
        key: str,
        add_prefix: bool = False
    ) -> bool:
        """
        Check if object exists in S3

        Args:
            key: S3 key (path within bucket)
            add_prefix: Whether to add self.prefix to key

        Returns:
            True if the HEAD request succeeds. False on ANY ClientError, so
            a permission or service error is indistinguishable from a
            missing object; treat False as "not confirmed to exist".
        """
        full_key = self._full_key(key, add_prefix)

        try:
            self.s3.head_object(
                Bucket=self.bucket_name,
                Key=full_key
            )
        except ClientError:
            return False

        return True


# Module-level cache for the shared S3Manager. Stays None until
# get_s3_manager() builds the instance on first use.
_s3_manager_instance: Optional[S3Manager] = None


def get_s3_manager() -> S3Manager:
    """Return the shared S3Manager, constructing it on the first call.

    The instance is built with default arguments, so the bucket and region
    come from the environment variables S3Manager reads.
    """
    global _s3_manager_instance

    # Fast path: reuse the instance created by an earlier call.
    if _s3_manager_instance is not None:
        return _s3_manager_instance

    _s3_manager_instance = S3Manager()
    return _s3_manager_instance


# Convenience functions for direct use
def upload_json_to_s3(key: str, data: Dict[str, Any]) -> Dict[str, Any]:
    """Upload a dict as JSON through the shared S3Manager.

    Delegates to ``S3Manager.upload_json`` with its default ``add_prefix``,
    and returns that method's result dict unchanged.
    """
    manager = get_s3_manager()
    return manager.upload_json(key, data)


def download_json_from_s3(key: str) -> Dict[str, Any]:
    """Download and parse a JSON object through the shared S3Manager.

    Delegates to ``S3Manager.download_json`` with its default ``add_prefix``,
    so ``key`` should be the full object key.
    """
    manager = get_s3_manager()
    return manager.download_json(key)


def generate_s3_presigned_url(key: str, expires_in: int = 604800) -> Dict[str, str]:
    """Create a presigned GET URL through the shared S3Manager.

    Delegates to ``S3Manager.generate_presigned_url``; ``expires_in`` is in
    seconds and defaults to 604800 (7 days).
    """
    manager = get_s3_manager()
    return manager.generate_presigned_url(key, expires_in)