# AgenticAI-RAG / src/agents/cloud_agent.py
# Uploaded by GreymanT ("Upload 80 files"), commit 8bf4d58 (verified).
"""Cloud storage agent for remote data access."""
import asyncio
import logging
import os
from typing import Optional

from src.agents.base_agent import BaseAgent
from src.core.config import get_settings
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


class CloudAgent(BaseAgent):
    """Agent specialized in accessing cloud storage and remote data.

    On construction it reads the application settings and, when either AWS S3
    (access key + bucket) or Google Cloud Storage (credentials file + bucket)
    is configured, creates the matching SDK client. S3 takes precedence when
    both are configured. Retrieval is currently a simplified listing of a few
    objects in the configured bucket, which is passed to the base agent as
    context.
    """

    # Maximum number of objects fetched from the bucket and described in the
    # retrieved context.
    max_listed_objects = 5

    def __init__(self, use_planning: bool = False):
        """Initialize cloud agent.

        Args:
            use_planning: Forwarded to ``BaseAgent`` as ``use_planning``.
        """
        super().__init__(
            name="cloud_agent",
            description=(
                "You are a specialized agent for accessing cloud storage and remote data. "
                "You can retrieve documents and information from cloud storage services "
                "like AWS S3 or Google Cloud Storage."
            ),
            use_memory=True,
            use_planning=use_planning,
        )
        self.settings = get_settings()
        self._init_cloud_client()

    def _init_cloud_client(self):
        """Initialize the cloud storage client based on configuration.

        Sets ``self.client``, ``self.cloud_type`` (``"s3"`` or ``"gcs"``) and
        ``self.bucket_name``. Initialization failures are logged, not raised;
        in that case all three attributes stay ``None``.
        """
        self.cloud_type = None
        self.client = None
        # Always defined so callers never hit AttributeError when no backend
        # could be configured.
        self.bucket_name = None

        if self.settings.aws_access_key_id and self.settings.aws_s3_bucket:
            self._init_s3_client()
        elif self.settings.google_application_credentials and self.settings.gcs_bucket_name:
            self._init_gcs_client()

        if self.client is None:
            logger.warning("No cloud storage configured")

    def _init_s3_client(self):
        """Create a boto3 S3 client from the configured AWS settings."""
        try:
            import boto3
        except ImportError:
            logger.warning("boto3 not installed, AWS S3 unavailable")
            return
        try:
            client = boto3.client(
                "s3",
                aws_access_key_id=self.settings.aws_access_key_id,
                aws_secret_access_key=self.settings.aws_secret_access_key,
                region_name=self.settings.aws_region,
            )
        except Exception as e:
            logger.error("Error initializing S3 client: %s", e)
            return
        # Only publish state once the client was created successfully.
        self.client = client
        self.cloud_type = "s3"
        self.bucket_name = self.settings.aws_s3_bucket
        logger.info("Initialized AWS S3 client")

    def _init_gcs_client(self):
        """Create a Google Cloud Storage client from the configured credentials file."""
        try:
            from google.cloud import storage
        except ImportError:
            logger.warning("google-cloud-storage not installed, GCS unavailable")
            return
        try:
            # NOTE: this mutates the process-wide environment so that
            # storage.Client() picks up the credentials file; anything else in
            # this process that reads GOOGLE_APPLICATION_CREDENTIALS sees it too.
            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self.settings.google_application_credentials
            client = storage.Client()
        except Exception as e:
            logger.error("Error initializing GCS client: %s", e)
            return
        self.client = client
        self.cloud_type = "gcs"
        self.bucket_name = self.settings.gcs_bucket_name
        logger.info("Initialized Google Cloud Storage client")

    @staticmethod
    async def _run_blocking(func):
        """Run a synchronous zero-argument callable in the default executor.

        The cloud SDK calls used here are synchronous network I/O; running them
        off the event loop keeps other coroutines responsive while they wait.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, func)

    async def retrieve_context(self, query: str) -> str:
        """
        Retrieve relevant context from cloud storage.

        Args:
            query: User query

        Returns:
            Context string from cloud documents, or a human-readable message
            when storage is unconfigured or an error occurs (errors are logged,
            not raised).
        """
        if self.client is None:
            return "Cloud storage is not configured."

        try:
            if self.cloud_type == "s3":
                return await self._retrieve_from_s3(query)
            if self.cloud_type == "gcs":
                return await self._retrieve_from_gcs(query)
            return "Unknown cloud storage type."
        except Exception as e:
            logger.exception("Error retrieving cloud context: %s", e)
            return f"Error retrieving from cloud storage: {str(e)}"

    async def _retrieve_from_s3(self, query: str) -> str:
        """Retrieve documents from S3.

        ``query`` is currently unused: this lists the first objects in the
        bucket rather than performing a relevance search (simplified - in
        production, use vector search).
        """
        try:
            response = await self._run_blocking(
                lambda: self.client.list_objects_v2(
                    Bucket=self.bucket_name,
                    MaxKeys=self.max_listed_objects,
                )
            )
            # S3 omits "Contents" entirely when the listing is empty.
            objects = response.get("Contents")
            if not objects:
                return "No documents found in S3 bucket."

            context_parts = [f"Documents in S3 bucket '{self.bucket_name}':"]
            context_parts.extend(
                f"- {obj['Key']} ({obj['Size']} bytes)"
                for obj in objects[: self.max_listed_objects]
            )
            return "\n".join(context_parts)
        except Exception as e:
            logger.exception("Error listing S3 objects: %s", e)
            return f"Error accessing S3: {str(e)}"

    async def _retrieve_from_gcs(self, query: str) -> str:
        """Retrieve documents from GCS.

        ``query`` is currently unused: this lists the first blobs in the bucket
        rather than performing a relevance search.
        """
        try:
            bucket = self.client.bucket(self.bucket_name)
            # list_blobs pages lazily; materialize it inside the executor so the
            # network requests happen off the event loop.
            blobs = await self._run_blocking(
                lambda: list(bucket.list_blobs(max_results=self.max_listed_objects))
            )
            if not blobs:
                return "No documents found in GCS bucket."

            context_parts = [f"Documents in GCS bucket '{self.bucket_name}':"]
            context_parts.extend(
                f"- {blob.name} ({blob.size} bytes)"
                for blob in blobs[: self.max_listed_objects]
            )
            return "\n".join(context_parts)
        except Exception as e:
            logger.exception("Error listing GCS objects: %s", e)
            return f"Error accessing GCS: {str(e)}"

    async def process(
        self,
        query: str,
        session_id: Optional[str] = None,
        context: Optional[str] = None,
    ) -> dict:
        """
        Process query with cloud storage access.

        Args:
            query: User query
            session_id: Optional session ID
            context: Optional additional context, placed before the cloud context

        Returns:
            Response dictionary produced by ``BaseAgent.process``
        """
        cloud_context = await self.retrieve_context(query)

        # Caller-supplied context (if any) comes first, then the cloud listing.
        full_context = f"{context}\n\n{cloud_context}" if context else cloud_context

        return await super().process(query, session_id, full_context)