Spaces:
Sleeping
Sleeping
File size: 5,564 Bytes
d2d5a16 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
"""
Tool for adding new data sources to a Vertex AI RAG corpus.
"""
import re
from typing import List
from google.adk.tools.tool_context import ToolContext
from vertexai import rag
from ..config import (
DEFAULT_CHUNK_OVERLAP,
DEFAULT_CHUNK_SIZE,
DEFAULT_EMBEDDING_REQUESTS_PER_MIN,
)
from .utils import check_corpus_exists, get_corpus_resource_name
# URL patterns compiled once at import time (hoisted out of the per-path
# loop so they are not rebuilt for every path on every call).
_DOCS_URL_RE = re.compile(
    r"https:\/\/docs\.google\.com\/(?:document|spreadsheets|presentation)\/d\/([a-zA-Z0-9_-]+)(?:\/|$)"
)
_DRIVE_URL_RE = re.compile(
    r"https:\/\/drive\.google\.com\/(?:file\/d\/|open\?id=)([a-zA-Z0-9_-]+)(?:\/|$)"
)


def _validate_paths(paths: List[str]) -> tuple:
    """Validate and normalize a list of candidate paths.

    Google Docs/Sheets/Slides URLs are converted to the canonical Drive
    file URL; Drive URLs are normalized to the standard
    ``https://drive.google.com/file/d/{id}/view`` form; ``gs://`` paths
    pass through unchanged; everything else is rejected.

    Returns:
        tuple: (validated_paths, invalid_paths, conversions) where
            - validated_paths (List[str]): normalized, importable paths
            - invalid_paths (List[str]): rejected inputs with a reason suffix
            - conversions (List[str]): human-readable "old → new" records
    """
    validated_paths = []
    invalid_paths = []
    conversions = []

    for path in paths:
        # Defensive re-check: empty strings and non-strings are rejected here
        # even though the caller pre-filters non-string lists.
        if not path or not isinstance(path, str):
            invalid_paths.append(f"{path} (Not a valid string)")
            continue

        # Google Docs/Sheets/Slides URL -> canonical Drive URL.
        docs_match = _DOCS_URL_RE.match(path)
        if docs_match:
            drive_url = f"https://drive.google.com/file/d/{docs_match.group(1)}/view"
            validated_paths.append(drive_url)
            conversions.append(f"{path} → {drive_url}")
            continue

        # Drive URL (file/d/... or open?id=...) -> normalized standard form.
        drive_match = _DRIVE_URL_RE.match(path)
        if drive_match:
            drive_url = f"https://drive.google.com/file/d/{drive_match.group(1)}/view"
            validated_paths.append(drive_url)
            # Only record a conversion when normalization actually changed it.
            if drive_url != path:
                conversions.append(f"{path} → {drive_url}")
            continue

        # Google Cloud Storage paths are accepted verbatim.
        if path.startswith("gs://"):
            validated_paths.append(path)
            continue

        # Unrecognized format.
        invalid_paths.append(f"{path} (Invalid format)")

    return validated_paths, invalid_paths, conversions


def add_data(
    corpus_name: str,
    paths: List[str],
    tool_context: ToolContext,
) -> dict:
    """
    Add new data sources to a Vertex AI RAG corpus.

    Args:
        corpus_name (str): The name of the corpus to add data to. If empty, the current corpus will be used.
        paths (List[str]): List of URLs or GCS paths to add to the corpus.
                          Supported formats:
                          - Google Drive: "https://drive.google.com/file/d/{FILE_ID}/view"
                          - Google Docs/Sheets/Slides: "https://docs.google.com/{type}/d/{FILE_ID}/..."
                          - Google Cloud Storage: "gs://{BUCKET}/{PATH}"
                          Example: ["https://drive.google.com/file/d/123", "gs://my_bucket/my_files_dir"]
        tool_context (ToolContext): The tool context

    Returns:
        dict: Status dict. On success it contains "files_added", the
            normalized "paths", any "invalid_paths", and "conversions";
            on error it contains a "message" describing the failure.
    """
    # The corpus must already exist; we never create it implicitly.
    if not check_corpus_exists(corpus_name, tool_context):
        return {
            "status": "error",
            "message": f"Corpus '{corpus_name}' does not exist. Please create it first using the create_corpus tool.",
            "corpus_name": corpus_name,
            "paths": paths,
        }

    # Reject an empty list or any non-string entry up front.
    if not paths or not all(isinstance(path, str) for path in paths):
        return {
            "status": "error",
            "message": "Invalid paths: Please provide a list of URLs or GCS paths",
            "corpus_name": corpus_name,
            "paths": paths,
        }

    # Normalize Docs/Drive URLs and filter out unsupported formats.
    validated_paths, invalid_paths, conversions = _validate_paths(paths)

    # Nothing importable remained after validation.
    if not validated_paths:
        return {
            "status": "error",
            "message": "No valid paths provided. Please provide Google Drive URLs or GCS paths.",
            "corpus_name": corpus_name,
            "invalid_paths": invalid_paths,
        }

    try:
        # Resolve the short corpus name to its full resource name.
        corpus_resource_name = get_corpus_resource_name(corpus_name)

        # Chunking configuration taken from project-wide defaults.
        transformation_config = rag.TransformationConfig(
            chunking_config=rag.ChunkingConfig(
                chunk_size=DEFAULT_CHUNK_SIZE,
                chunk_overlap=DEFAULT_CHUNK_OVERLAP,
            ),
        )

        # Import the validated files into the corpus.
        import_result = rag.import_files(
            corpus_resource_name,
            validated_paths,
            transformation_config=transformation_config,
            max_embedding_requests_per_min=DEFAULT_EMBEDDING_REQUESTS_PER_MIN,
        )

        # Remember this corpus as the session default if none is set yet.
        if not tool_context.state.get("current_corpus"):
            tool_context.state["current_corpus"] = corpus_name

        # Mention URL normalization in the message only when it happened.
        conversion_msg = ""
        if conversions:
            conversion_msg = " (Converted Google Docs URLs to Drive format)"

        return {
            "status": "success",
            "message": f"Successfully added {import_result.imported_rag_files_count} file(s) to corpus '{corpus_name}'{conversion_msg}",
            "corpus_name": corpus_name,
            "files_added": import_result.imported_rag_files_count,
            "paths": validated_paths,
            "invalid_paths": invalid_paths,
            "conversions": conversions,
        }

    except Exception as e:
        # Surface the import failure to the caller instead of raising.
        return {
            "status": "error",
            "message": f"Error adding data to corpus: {str(e)}",
            "corpus_name": corpus_name,
            "paths": paths,
        }
|