# Author: Bima Ardhia — initial commit (d2d5a16)
"""
Tool for adding new data sources to a Vertex AI RAG corpus.
"""
import re
from typing import List
from google.adk.tools.tool_context import ToolContext
from vertexai import rag
from ..config import (
DEFAULT_CHUNK_OVERLAP,
DEFAULT_CHUNK_SIZE,
DEFAULT_EMBEDDING_REQUESTS_PER_MIN,
)
from .utils import check_corpus_exists, get_corpus_resource_name
# Precompiled URL patterns, hoisted to module level so they are compiled
# once rather than on every call / loop iteration.
_DOCS_URL_RE = re.compile(
    r"https:\/\/docs\.google\.com\/(?:document|spreadsheets|presentation)\/d\/([a-zA-Z0-9_-]+)(?:\/|$)"
)
_DRIVE_URL_RE = re.compile(
    r"https:\/\/drive\.google\.com\/(?:file\/d\/|open\?id=)([a-zA-Z0-9_-]+)(?:\/|$)"
)


def _validate_paths(paths: List[str]) -> tuple:
    """Classify and normalize each input path for RAG import.

    Google Docs/Sheets/Slides URLs and non-canonical Drive URLs are
    rewritten to the canonical ``https://drive.google.com/file/d/{id}/view``
    form; ``gs://`` paths pass through untouched; everything else is
    rejected.

    Args:
        paths (List[str]): Raw user-supplied URLs / GCS paths.

    Returns:
        tuple: ``(validated_paths, invalid_paths, conversions)`` where
            ``validated_paths`` are normalized importable paths,
            ``invalid_paths`` are human-readable rejection notes, and
            ``conversions`` records each ``original -> normalized`` rewrite.
    """
    validated_paths = []
    invalid_paths = []
    conversions = []

    for path in paths:
        # Guard each element individually: empty strings pass the caller's
        # "all str" check but are still unusable.
        if not path or not isinstance(path, str):
            invalid_paths.append(f"{path} (Not a valid string)")
            continue

        # Google Docs/Sheets/Slides URL -> Drive file URL.
        docs_match = _DOCS_URL_RE.match(path)
        if docs_match:
            file_id = docs_match.group(1)
            drive_url = f"https://drive.google.com/file/d/{file_id}/view"
            validated_paths.append(drive_url)
            conversions.append(f"{path} -> {drive_url}")
            continue

        # Drive URL (either /file/d/{id} or open?id={id}) -> canonical form.
        drive_match = _DRIVE_URL_RE.match(path)
        if drive_match:
            file_id = drive_match.group(1)
            drive_url = f"https://drive.google.com/file/d/{file_id}/view"
            validated_paths.append(drive_url)
            # Only record a conversion when normalization actually changed
            # the URL.
            if drive_url != path:
                conversions.append(f"{path} -> {drive_url}")
            continue

        # Google Cloud Storage paths need no rewriting.
        if path.startswith("gs://"):
            validated_paths.append(path)
            continue

        # Unrecognized format.
        invalid_paths.append(f"{path} (Invalid format)")

    return validated_paths, invalid_paths, conversions


def add_data(
    corpus_name: str,
    paths: List[str],
    tool_context: ToolContext,
) -> dict:
    """
    Add new data sources to a Vertex AI RAG corpus.

    Args:
        corpus_name (str): The name of the corpus to add data to. If empty, the current corpus will be used.
        paths (List[str]): List of URLs or GCS paths to add to the corpus.
                          Supported formats:
                          - Google Drive: "https://drive.google.com/file/d/{FILE_ID}/view"
                          - Google Docs/Sheets/Slides: "https://docs.google.com/{type}/d/{FILE_ID}/..."
                          - Google Cloud Storage: "gs://{BUCKET}/{PATH}"
                          Example: ["https://drive.google.com/file/d/123", "gs://my_bucket/my_files_dir"]
        tool_context (ToolContext): The tool context

    Returns:
        dict: Information about the added data and status. On success includes
            ``files_added``, the normalized ``paths``, any ``invalid_paths``,
            and any URL ``conversions`` performed; on error includes a
            descriptive ``message``.
    """
    # Check if the corpus exists
    if not check_corpus_exists(corpus_name, tool_context):
        return {
            "status": "error",
            "message": f"Corpus '{corpus_name}' does not exist. Please create it first using the create_corpus tool.",
            "corpus_name": corpus_name,
            "paths": paths,
        }

    # Validate inputs
    if not paths or not all(isinstance(path, str) for path in paths):
        return {
            "status": "error",
            "message": "Invalid paths: Please provide a list of URLs or GCS paths",
            "corpus_name": corpus_name,
            "paths": paths,
        }

    # Pre-process paths: validate, and convert Google Docs URLs to Drive format.
    validated_paths, invalid_paths, conversions = _validate_paths(paths)

    # Bail out if nothing survived validation.
    if not validated_paths:
        return {
            "status": "error",
            "message": "No valid paths provided. Please provide Google Drive URLs or GCS paths.",
            "corpus_name": corpus_name,
            "invalid_paths": invalid_paths,
        }

    try:
        # Get the corpus resource name
        corpus_resource_name = get_corpus_resource_name(corpus_name)

        # Set up chunking configuration
        transformation_config = rag.TransformationConfig(
            chunking_config=rag.ChunkingConfig(
                chunk_size=DEFAULT_CHUNK_SIZE,
                chunk_overlap=DEFAULT_CHUNK_OVERLAP,
            ),
        )

        # Import files to the corpus
        import_result = rag.import_files(
            corpus_resource_name,
            validated_paths,
            transformation_config=transformation_config,
            max_embedding_requests_per_min=DEFAULT_EMBEDDING_REQUESTS_PER_MIN,
        )

        # Set this as the current corpus if not already set
        if not tool_context.state.get("current_corpus"):
            tool_context.state["current_corpus"] = corpus_name

        # Build the success message
        conversion_msg = ""
        if conversions:
            conversion_msg = " (Converted Google Docs URLs to Drive format)"

        return {
            "status": "success",
            "message": f"Successfully added {import_result.imported_rag_files_count} file(s) to corpus '{corpus_name}'{conversion_msg}",
            "corpus_name": corpus_name,
            "files_added": import_result.imported_rag_files_count,
            "paths": validated_paths,
            "invalid_paths": invalid_paths,
            "conversions": conversions,
        }
    except Exception as e:
        return {
            "status": "error",
            "message": f"Error adding data to corpus: {str(e)}",
            "corpus_name": corpus_name,
            "paths": paths,
        }