|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import logging |
|
|
import os |
|
|
from typing import Any |
|
|
|
|
|
import vertexai |
|
|
from google.adk.artifacts import GcsArtifactService, InMemoryArtifactService |
|
|
from google.cloud import logging as google_cloud_logging |
|
|
from vertexai.agent_engines.templates.adk import AdkApp |
|
|
|
|
|
from rag_agent.agent import app as adk_app |
|
|
from rag_agent.app_utils.telemetry import setup_telemetry |
|
|
from rag_agent.app_utils.typing import Feedback |
|
|
|
|
|
|
|
|
class AgentEngineApp(AdkApp):
    """AdkApp subclass that adds Cloud Logging and a feedback operation."""

    def set_up(self) -> None:
        """Prepare Vertex AI, telemetry, the base app, and logging.

        Runs, in order: ``vertexai.init()``, ``setup_telemetry()``, the
        parent ``set_up()``, and standard-library logging configuration at
        INFO level. A Cloud Logging logger named after this module is stored
        on ``self.logger`` for later use by ``register_feedback``.
        """
        vertexai.init()
        setup_telemetry()
        super().set_up()

        logging.basicConfig(level=logging.INFO)
        self.logger = google_cloud_logging.Client().logger(__name__)

        # ``gemini_location`` is the module-level value captured from the
        # GOOGLE_CLOUD_LOCATION environment variable; when it is set, write it
        # back into the environment.
        # NOTE(review): presumably this restores the value after an earlier
        # setup step changed it -- confirm.
        if not gemini_location:
            return
        os.environ["GOOGLE_CLOUD_LOCATION"] = gemini_location

    def register_feedback(self, feedback: dict[str, Any]) -> None:
        """Validate a feedback payload and write it to Cloud Logging.

        Args:
            feedback: Raw feedback mapping; it is validated against the
                ``Feedback`` model before being logged.
        """
        validated = Feedback.model_validate(feedback)
        payload = validated.model_dump()
        self.logger.log_struct(payload, severity="INFO")

    def register_operations(self) -> dict[str, list[str]]:
        """Return the parent's operations with ``register_feedback`` added.

        Returns:
            The mapping produced by the parent class, with
            ``"register_feedback"`` appended to the list stored under the
            ``""`` key (created if absent).
        """
        registered = super().register_operations()
        existing = registered.get("", [])
        registered[""] = existing + ["register_feedback"]
        return registered
|
|
|
|
|
|
|
|
# Deployment settings read from the environment at import time; either may be
# None when the corresponding variable is unset.
gemini_location = os.getenv("GOOGLE_CLOUD_LOCATION")
logs_bucket_name = os.getenv("LOGS_BUCKET_NAME")

# Module-level app instance. The artifact service is built lazily by the
# lambda: a GCS-backed service when a logs bucket is configured, otherwise an
# in-memory one.
agent_engine = AgentEngineApp(
    app=adk_app,
    artifact_service_builder=lambda: (
        GcsArtifactService(bucket_name=logs_bucket_name)
        if logs_bucket_name
        else InMemoryArtifactService()
    ),
)
|
|
|