Spaces:
Sleeping
Sleeping
File size: 5,866 Bytes
8bf4d58 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
"""Cloud storage agent for remote data access."""
import asyncio
import logging
import os
from typing import Optional

from src.agents.base_agent import BaseAgent
from src.core.config import get_settings

logger = logging.getLogger(__name__)
class CloudAgent(BaseAgent):
    """Agent specialized in accessing cloud storage and remote data.

    At construction time the agent reads the application settings and creates
    at most one storage client:

    * AWS S3, when ``aws_access_key_id`` and ``aws_s3_bucket`` are both set;
    * otherwise Google Cloud Storage, when ``google_application_credentials``
      and ``gcs_bucket_name`` are both set.

    Retrieval is currently a simple listing of the first few objects in the
    configured bucket; the user query is not yet used to select documents.
    """

    # Number of objects requested from the bucket and listed in the context.
    max_listed_objects = 5

    def __init__(self, use_planning: bool = False):
        """Initialize the cloud agent.

        Args:
            use_planning: Forwarded to ``BaseAgent``; memory is always enabled.
        """
        super().__init__(
            name="cloud_agent",
            description=(
                "You are a specialized agent for accessing cloud storage and remote data. "
                "You can retrieve documents and information from cloud storage services "
                "like AWS S3 or Google Cloud Storage."
            ),
            use_memory=True,
            use_planning=use_planning,
        )
        self.settings = get_settings()
        self._init_cloud_client()

    def _init_cloud_client(self) -> None:
        """Create the storage client selected by the current settings.

        Sets ``self.cloud_type`` (``"s3"``, ``"gcs"`` or ``None``),
        ``self.client`` and ``self.bucket_name``. S3 takes precedence: if its
        settings are present, GCS is not considered even when S3 setup fails.
        Setup errors are logged rather than raised, so the agent can still be
        constructed and will report that cloud storage is not configured.
        """
        # Always define all three attributes, even when no backend is set up.
        self.cloud_type = None
        self.client = None
        self.bucket_name = None

        if self.settings.aws_access_key_id and self.settings.aws_s3_bucket:
            try:
                import boto3  # optional dependency, imported lazily

                client = boto3.client(
                    "s3",
                    aws_access_key_id=self.settings.aws_access_key_id,
                    aws_secret_access_key=self.settings.aws_secret_access_key,
                    region_name=self.settings.aws_region,
                )
            except ImportError:
                logger.warning("boto3 not installed, AWS S3 unavailable")
            except Exception as e:
                logger.exception("Error initializing S3 client: %s", e)
            else:
                # Publish client, type and bucket together only on success.
                self.client = client
                self.cloud_type = "s3"
                self.bucket_name = self.settings.aws_s3_bucket
                logger.info("Initialized AWS S3 client")
        elif self.settings.google_application_credentials and self.settings.gcs_bucket_name:
            try:
                from google.cloud import storage  # optional dependency, imported lazily

                # NOTE(review): this sets the credentials path process-wide, so it
                # also affects any other Google client created in this process.
                os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = (
                    self.settings.google_application_credentials
                )
                client = storage.Client()
            except ImportError:
                logger.warning("google-cloud-storage not installed, GCS unavailable")
            except Exception as e:
                logger.exception("Error initializing GCS client: %s", e)
            else:
                self.client = client
                self.cloud_type = "gcs"
                self.bucket_name = self.settings.gcs_bucket_name
                logger.info("Initialized Google Cloud Storage client")

        if self.client is None:
            logger.warning("No cloud storage configured")

    async def retrieve_context(self, query: str) -> str:
        """Retrieve relevant context from cloud storage.

        Args:
            query: User query. Currently passed through to the backend helpers
                but not used to select documents.

        Returns:
            A human-readable listing of documents, or a message explaining why
            nothing could be retrieved. Errors are logged and returned as a
            message string rather than raised.
        """
        if self.client is None:
            return "Cloud storage is not configured."
        try:
            if self.cloud_type == "s3":
                return await self._retrieve_from_s3(query)
            if self.cloud_type == "gcs":
                return await self._retrieve_from_gcs(query)
            return "Unknown cloud storage type."
        except Exception as e:
            logger.exception("Error retrieving cloud context: %s", e)
            return f"Error retrieving from cloud storage: {str(e)}"

    @staticmethod
    async def _run_blocking(func, *args, **kwargs):
        """Run a blocking SDK call in the default executor and await its result.

        The boto3 and google-cloud-storage clients do synchronous network I/O;
        calling them directly inside a coroutine would stall the event loop.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, lambda: func(*args, **kwargs))

    async def _retrieve_from_s3(self, query: str) -> str:
        """List up to ``max_listed_objects`` objects from the S3 bucket.

        ``query`` is unused for now (simplified; a production version would
        select objects, e.g. with vector search).
        """
        try:
            response = await self._run_blocking(
                self.client.list_objects_v2,
                Bucket=self.bucket_name,
                MaxKeys=self.max_listed_objects,
            )
            contents = response.get("Contents")
            if not contents:
                return "No documents found in S3 bucket."
            lines = [f"Documents in S3 bucket '{self.bucket_name}':"]
            lines.extend(
                f"- {obj['Key']} ({obj['Size']} bytes)"
                for obj in contents[: self.max_listed_objects]
            )
            return "\n".join(lines)
        except Exception as e:
            logger.exception("Error listing S3 objects: %s", e)
            return f"Error accessing S3: {str(e)}"

    async def _retrieve_from_gcs(self, query: str) -> str:
        """List up to ``max_listed_objects`` blobs from the GCS bucket.

        ``query`` is unused for now (simplified listing, no relevance ranking).
        """
        try:
            bucket = self.client.bucket(self.bucket_name)
            # list_blobs returns a lazy iterator; materialize it inside the
            # worker thread so the listing requests run off the event loop.
            blobs = await self._run_blocking(
                lambda: list(bucket.list_blobs(max_results=self.max_listed_objects))
            )
            if not blobs:
                return "No documents found in GCS bucket."
            lines = [f"Documents in GCS bucket '{self.bucket_name}':"]
            lines.extend(
                f"- {blob.name} ({blob.size} bytes)"
                for blob in blobs[: self.max_listed_objects]
            )
            return "\n".join(lines)
        except Exception as e:
            logger.exception("Error listing GCS objects: %s", e)
            return f"Error accessing GCS: {str(e)}"

    async def process(
        self,
        query: str,
        session_id: Optional[str] = None,
        context: Optional[str] = None,
    ) -> dict:
        """Process a query, prepending any caller context to the cloud listing.

        Args:
            query: User query.
            session_id: Optional session ID, forwarded to ``BaseAgent.process``.
            context: Optional additional context; placed before the cloud
                context, separated by a blank line.

        Returns:
            Whatever ``BaseAgent.process`` returns (annotated as ``dict``).
        """
        cloud_context = await self.retrieve_context(query)
        full_context = f"{context}\n\n{cloud_context}" if context else cloud_context
        return await super().process(query, session_id, full_context)
|