import asyncio
import functools
import logging
from urllib.parse import quote

import boto3
from botocore.exceptions import ClientError

from core.config import settings
| |
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
| |
|
class S3Service:
    """Wrapper around a boto3 S3 client bound to the bucket named in settings.

    Objects are stored under ``users/<user_id>/sources/<filename>``. The
    coroutine methods hand the blocking boto3 network calls to the event
    loop's default executor so that awaiting them does not stall other tasks.
    """

    def __init__(self):
        """Build the S3 client and record the bucket and region from settings."""
        self.s3_client = boto3.client(
            's3',
            aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
            aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
            region_name=settings.AWS_REGION,
        )
        self.bucket_name = settings.AWS_S3_BUCKET
        # Kept on the instance so URLs are built from the same region the
        # client was created with.
        self.region_name = settings.AWS_REGION

    @staticmethod
    async def _run_blocking(func, *args, **kwargs):
        """Run a blocking callable in the default executor and await its result."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, functools.partial(func, *args, **kwargs)
        )

    def _collect_objects(self, prefix: str):
        """Return every object entry under *prefix*, following pagination."""
        paginator = self.s3_client.get_paginator('list_objects_v2')
        objects = []
        for page in paginator.paginate(Bucket=self.bucket_name, Prefix=prefix):
            # A page without matches carries no 'Contents' entry at all.
            objects.extend(page.get('Contents', []))
        return objects

    def get_public_url(self, key: str):
        """
        Generates the standard S3 public URL for a given key.

        The key is percent-encoded ('/' is kept as the path separator) so that
        keys containing spaces, '+', '#', '?' and similar characters still
        yield a valid URL. Returns the URL as a string.
        """
        encoded_key = quote(key, safe="/")
        return (
            f"https://{self.bucket_name}.s3.{self.region_name}"
            f".amazonaws.com/{encoded_key}"
        )

    def get_presigned_url(self, key: str, expires_in: int = 3600):
        """
        Generates a pre-signed URL for secure access. Default: 1 hour.

        Returns the URL string, or None when boto3 raises ClientError (the
        error is logged).
        """
        try:
            return self.s3_client.generate_presigned_url(
                'get_object',
                Params={'Bucket': self.bucket_name, 'Key': key},
                ExpiresIn=expires_in,
            )
        except ClientError as e:
            logger.error("Failed to generate presigned URL: %s", e)
            return None

    async def upload_file(self, file_content: bytes, filename: str, user_id: str):
        """
        Uploads a file to S3 under a user-specific folder.

        Returns a dict with the object's "key", its "public_url" and a
        pre-signed "private_url" (None if signing failed).

        Raises:
            Exception: "S3 Upload Failed" when boto3 raises ClientError; the
                original error is attached as ``__cause__``.
        """
        key = f"users/{user_id}/sources/{filename}"
        try:
            await self._run_blocking(
                self.s3_client.put_object,
                Bucket=self.bucket_name,
                Key=key,
                Body=file_content,
            )
        except ClientError as e:
            logger.error("Failed to upload to S3: %s", e)
            raise Exception("S3 Upload Failed") from e
        return {
            "key": key,
            "public_url": self.get_public_url(key),
            "private_url": self.get_presigned_url(key),
        }

    async def list_user_files(self, user_id: str):
        """
        Lists files for a specific user.

        Walks every result page, so folders with more objects than a single
        list call returns are reported in full. Returns a list of dicts with
        "filename", "key", "public_url", "private_url", "size" and
        "last_modified".

        Raises:
            Exception: "S3 List Failed" when boto3 raises ClientError; the
                original error is attached as ``__cause__``.
        """
        prefix = f"users/{user_id}/sources/"
        try:
            objects = await self._run_blocking(self._collect_objects, prefix)
        except ClientError as e:
            logger.error("Failed to list S3 files: %s", e)
            raise Exception("S3 List Failed") from e

        files = []
        for obj in objects:
            key = obj['Key']
            # Strip only the leading prefix; the same text may legitimately
            # reappear later in the key.
            filename = key[len(prefix):] if key.startswith(prefix) else key
            if not filename:
                # The bare prefix itself (folder placeholder) is not a file.
                continue
            files.append({
                "filename": filename,
                "key": key,
                "public_url": self.get_public_url(key),
                "private_url": self.get_presigned_url(key),
                "size": obj['Size'],
                "last_modified": obj['LastModified'],
            })
        return files

    async def delete_file(self, key: str):
        """
        Deletes a file from S3.

        Returns True when the delete call succeeds, False when boto3 raises
        ClientError (the error is logged).
        """
        try:
            await self._run_blocking(
                self.s3_client.delete_object,
                Bucket=self.bucket_name,
                Key=key,
            )
        except ClientError as e:
            logger.error("Failed to delete S3 object: %s", e)
            return False
        logger.info("Deleted S3 object: %s", key)
        return True
| |
|
# Shared module-level instance. It is constructed at import time, so importing
# this module builds a boto3 client from the current settings.
s3_service = S3Service()
| |
|