Spaces:
Sleeping
Sleeping
| """ | |
| Tool for adding new data sources to a Vertex AI RAG corpus. | |
| """ | |
| import re | |
| from typing import List | |
| from google.adk.tools.tool_context import ToolContext | |
| from vertexai import rag | |
| from ..config import ( | |
| DEFAULT_CHUNK_OVERLAP, | |
| DEFAULT_CHUNK_SIZE, | |
| DEFAULT_EMBEDDING_REQUESTS_PER_MIN, | |
| ) | |
| from .utils import check_corpus_exists, get_corpus_resource_name | |
| def add_data( | |
| corpus_name: str, | |
| paths: List[str], | |
| tool_context: ToolContext, | |
| ) -> dict: | |
| """ | |
| Add new data sources to a Vertex AI RAG corpus. | |
| Args: | |
| corpus_name (str): The name of the corpus to add data to. If empty, the current corpus will be used. | |
| paths (List[str]): List of URLs or GCS paths to add to the corpus. | |
| Supported formats: | |
| - Google Drive: "https://drive.google.com/file/d/{FILE_ID}/view" | |
| - Google Docs/Sheets/Slides: "https://docs.google.com/{type}/d/{FILE_ID}/..." | |
| - Google Cloud Storage: "gs://{BUCKET}/{PATH}" | |
| Example: ["https://drive.google.com/file/d/123", "gs://my_bucket/my_files_dir"] | |
| tool_context (ToolContext): The tool context | |
| Returns: | |
| dict: Information about the added data and status | |
| """ | |
| # Check if the corpus exists | |
| if not check_corpus_exists(corpus_name, tool_context): | |
| return { | |
| "status": "error", | |
| "message": f"Corpus '{corpus_name}' does not exist. Please create it first using the create_corpus tool.", | |
| "corpus_name": corpus_name, | |
| "paths": paths, | |
| } | |
| # Validate inputs | |
| if not paths or not all(isinstance(path, str) for path in paths): | |
| return { | |
| "status": "error", | |
| "message": "Invalid paths: Please provide a list of URLs or GCS paths", | |
| "corpus_name": corpus_name, | |
| "paths": paths, | |
| } | |
| # Pre-process paths to validate and convert Google Docs URLs to Drive format if needed | |
| validated_paths = [] | |
| invalid_paths = [] | |
| conversions = [] | |
| for path in paths: | |
| if not path or not isinstance(path, str): | |
| invalid_paths.append(f"{path} (Not a valid string)") | |
| continue | |
| # Check for Google Docs/Sheets/Slides URLs and convert them to Drive format | |
| docs_match = re.match( | |
| r"https:\/\/docs\.google\.com\/(?:document|spreadsheets|presentation)\/d\/([a-zA-Z0-9_-]+)(?:\/|$)", | |
| path, | |
| ) | |
| if docs_match: | |
| file_id = docs_match.group(1) | |
| drive_url = f"https://drive.google.com/file/d/{file_id}/view" | |
| validated_paths.append(drive_url) | |
| conversions.append(f"{path} → {drive_url}") | |
| continue | |
| # Check for valid Drive URL format | |
| drive_match = re.match( | |
| r"https:\/\/drive\.google\.com\/(?:file\/d\/|open\?id=)([a-zA-Z0-9_-]+)(?:\/|$)", | |
| path, | |
| ) | |
| if drive_match: | |
| # Normalize to the standard Drive URL format | |
| file_id = drive_match.group(1) | |
| drive_url = f"https://drive.google.com/file/d/{file_id}/view" | |
| validated_paths.append(drive_url) | |
| if drive_url != path: | |
| conversions.append(f"{path} → {drive_url}") | |
| continue | |
| # Check for GCS paths | |
| if path.startswith("gs://"): | |
| validated_paths.append(path) | |
| continue | |
| # If we're here, the path wasn't in a recognized format | |
| invalid_paths.append(f"{path} (Invalid format)") | |
| # Check if we have any valid paths after validation | |
| if not validated_paths: | |
| return { | |
| "status": "error", | |
| "message": "No valid paths provided. Please provide Google Drive URLs or GCS paths.", | |
| "corpus_name": corpus_name, | |
| "invalid_paths": invalid_paths, | |
| } | |
| try: | |
| # Get the corpus resource name | |
| corpus_resource_name = get_corpus_resource_name(corpus_name) | |
| # Set up chunking configuration | |
| transformation_config = rag.TransformationConfig( | |
| chunking_config=rag.ChunkingConfig( | |
| chunk_size=DEFAULT_CHUNK_SIZE, | |
| chunk_overlap=DEFAULT_CHUNK_OVERLAP, | |
| ), | |
| ) | |
| # Import files to the corpus | |
| import_result = rag.import_files( | |
| corpus_resource_name, | |
| validated_paths, | |
| transformation_config=transformation_config, | |
| max_embedding_requests_per_min=DEFAULT_EMBEDDING_REQUESTS_PER_MIN, | |
| ) | |
| # Set this as the current corpus if not already set | |
| if not tool_context.state.get("current_corpus"): | |
| tool_context.state["current_corpus"] = corpus_name | |
| # Build the success message | |
| conversion_msg = "" | |
| if conversions: | |
| conversion_msg = " (Converted Google Docs URLs to Drive format)" | |
| return { | |
| "status": "success", | |
| "message": f"Successfully added {import_result.imported_rag_files_count} file(s) to corpus '{corpus_name}'{conversion_msg}", | |
| "corpus_name": corpus_name, | |
| "files_added": import_result.imported_rag_files_count, | |
| "paths": validated_paths, | |
| "invalid_paths": invalid_paths, | |
| "conversions": conversions, | |
| } | |
| except Exception as e: | |
| return { | |
| "status": "error", | |
| "message": f"Error adding data to corpus: {str(e)}", | |
| "corpus_name": corpus_name, | |
| "paths": paths, | |
| } | |