#!/usr/bin/env python3
"""
Upload Documents to Google Gemini File Search Store

This script uploads PDF documents to a Gemini File Search store for RAG.
It scans the reports directory recursively, derives metadata (year, source,
district) from each file's path for display, and uploads every PDF using its
filename as the display name.

Configuration is read from environment variables (optionally via a .env file):

- GEMINI_API_KEY: required API key.
- GEMINI_FILESTORE_NAME: optional store name or display name to reuse/create.
- REPORTS_DIR: optional directory containing the PDF reports.
"""

import os
import sys
import json
import time
import traceback
from pathlib import Path
from typing import List, Dict, Any, Optional

try:
    from dotenv import load_dotenv
except ImportError:
    # python-dotenv is a convenience only: without it the script still works
    # with variables exported in the process environment.
    print("⚠️ python-dotenv not installed; reading configuration from environment variables only")

    def load_dotenv(*args: Any, **kwargs: Any) -> bool:
        """No-op stand-in used when python-dotenv is not installed."""
        return False

try:
    from google import genai
    from google.genai import types
    GEMINI_AVAILABLE = True
except ImportError:
    GEMINI_AVAILABLE = False
    print("❌ google-genai package not installed. Install with: pip install google-genai")

# Load .env file
load_dotenv()

# Years recognised in path components, checked in this order.
_KNOWN_YEARS = ("2018", "2019", "2020", "2021", "2022", "2023", "2024", "2025")

# (keywords, source, district) rules matched against the lower-cased file stem.
# The first rule with any keyword present wins, so order matters.
_SOURCE_RULES = (
    (("consolidated", "oag"), "Consolidated", None),
    (("gulu",), "Gulu DLG", "Gulu"),
    (("kalangala",), "Kalangala DLG", "Kalangala"),
    (("kcca",), "KCCA", "Kampala"),
    (("maaif",), "MAAIF", None),
    (("mwts",), "MWTS", None),
)

# Substrings in an error message that this script treats as
# "the file is probably already in the store".
_DUPLICATE_HINTS = ("terminated", "already")

# Developer-machine location tried first and used as the last-resort default.
_DEFAULT_REPORTS_DIR = "/Users/ayeroyan/workspace/chatbot-rag/reports"


def extract_metadata_from_path(file_path: Path) -> Dict[str, Any]:
    """Extract metadata from file path structure.

    Example path:
    /path/to/reports/Annual Consolidated OAG audit reports 2018/Annual Consolidated OAG audit reports 2018.pdf

    Args:
        file_path: Path of the report file.

    Returns:
        A dict that always has "filename" and "filepath", plus "year" when a
        known year appears in any path component, and "source" / "district"
        when the file stem matches one of the known keywords.
    """
    metadata: Dict[str, Any] = {
        "filename": file_path.name,
        "filepath": str(file_path),
    }

    # First path component (from the root) containing a known year wins;
    # within a component the earliest year in _KNOWN_YEARS wins.
    year_match = next(
        (year for part in file_path.parts for year in _KNOWN_YEARS if year in part),
        None,
    )
    if year_match:
        metadata["year"] = year_match

    # Source/district come from the file name only (without extension).
    filename_lower = file_path.stem.lower()
    for keywords, source, district in _SOURCE_RULES:
        if any(keyword in filename_lower for keyword in keywords):
            metadata["source"] = source
            if district:
                metadata["district"] = district
            break

    return metadata


def get_or_create_filestore(client: "genai.Client", store_name: Optional[str] = None) -> str:
    """Get existing file search store or create a new one.

    Args:
        client: Gemini client. The annotation is quoted so that this module
            can still be imported when google-genai is not installed.
        store_name: Resource name or display name of the store to reuse. When
            omitted, a new store named "Audit Reports" is created.

    Returns:
        The resource name of the existing or newly created store.

    Raises:
        Exception: Whatever the client raises when store creation fails
            (after printing a message). Listing failures are only reported.
    """
    if store_name:
        # Try to get existing store
        try:
            for store in client.file_search_stores.list():
                if store_name in (store.name, store.display_name):
                    print(f"✅ Using existing store: {store.display_name} ({store.name})")
                    return store.name
        except Exception as e:
            # Best effort: fall through and try to create the store instead.
            print(f"⚠️ Could not list stores: {e}")

    # Create new store
    display_name = store_name or "Audit Reports"
    print(f"📁 Creating new file search store: '{display_name}'...")

    try:
        file_search_store = client.file_search_stores.create(
            config={'display_name': display_name}
        )
    except Exception as e:
        print(f"❌ Failed to create store: {e}")
        raise

    print(f"✅ Created store: {file_search_store.display_name} ({file_search_store.name})")
    return file_search_store.name


def format_metadata_for_gemini(metadata: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Format metadata dictionary for Gemini API customMetadata format.

    Produces a list of entries of the form
    {'key': ..., 'numeric_value': ...} or {'key': ..., 'string_value': ...}:

    - "year" becomes a numeric_value when it parses as an int, otherwise a
      string_value.
    - "source", "district" and "filename" become string_value entries.

    Missing or falsy fields are skipped.

    Args:
        metadata: Dict as produced by extract_metadata_from_path.

    Returns:
        The list of custom metadata entries, in the order year, source,
        district, filename.
    """
    custom_metadata: List[Dict[str, Any]] = []

    year = metadata.get('year')
    if year:
        try:
            custom_metadata.append({'key': 'year', 'numeric_value': int(year)})
        except (ValueError, TypeError):
            # Fallback to string if not numeric
            custom_metadata.append({'key': 'year', 'string_value': str(year)})

    for key in ('source', 'district', 'filename'):
        value = metadata.get(key)
        if value:
            custom_metadata.append({'key': key, 'string_value': str(value)})

    return custom_metadata


def check_file_exists(client: "genai.Client", store_name: str, filename: str) -> bool:
    """Check if a file with the same name already exists in the store.

    This is currently a placeholder: it fetches the store but does not inspect
    its contents, so it always returns False. Duplicates are instead handled
    through error messages in upload_file_to_store.

    Args:
        client: Gemini client.
        store_name: Resource name of the file search store.
        filename: Name of the file to look for (currently unused).

    Returns:
        Always False.
    """
    try:
        # The result is intentionally discarded; there is no file listing here.
        client.file_search_stores.get(name=store_name)
    except Exception:
        return False  # If we can't check, assume it doesn't exist
    return False  # Assume not exists for now


def _looks_like_duplicate(message: str) -> bool:
    """Return True when an error message hints the file is already stored."""
    lowered = message.lower()
    return any(hint in lowered for hint in _DUPLICATE_HINTS)


def upload_file_to_store(
    client: "genai.Client",
    file_path: Path,
    store_name: str,
    metadata: Dict[str, Any],
    skip_existing: bool = True,
    *,
    max_wait: float = 300,
    poll_interval: float = 2,
) -> Optional[bool]:
    """Upload a single file to the file search store.

    The extracted metadata is formatted and printed, but only the display
    name (the filename) is sent with the upload.
    NOTE(review): the original author found custom metadata unsupported in
    the upload config; confirm against the current SDK before sending it.

    Args:
        client: Gemini client.
        file_path: PDF to upload.
        store_name: Resource name of the target file search store.
        metadata: Dict as produced by extract_metadata_from_path.
        skip_existing: Currently has no effect; duplicates are detected from
            error messages regardless of this flag.
        max_wait: Maximum seconds to wait for the import operation.
        poll_interval: Seconds to sleep between operation status checks.

    Returns:
        True when the upload succeeded, None when the file was skipped
        (it probably already exists), False when the upload failed or
        timed out.
    """
    try:
        print(f" 📤 Uploading: {file_path.name}...")

        # Display the metadata extracted for this file.
        metadata_parts = []
        for entry in format_metadata_for_gemini(metadata):
            if 'numeric_value' in entry:
                metadata_parts.append(f"{entry['key']}={entry['numeric_value']}")
            elif 'string_value' in entry:
                metadata_parts.append(f"{entry['key']}={entry['string_value']}")
        if metadata_parts:
            print(f" 📋 Metadata: {', '.join(metadata_parts)}")

        operation = client.file_search_stores.upload_to_file_search_store(
            file=str(file_path),
            file_search_store_name=store_name,
            config={'display_name': metadata.get('filename', file_path.name)},
        )

        # Wait for import to complete. A monotonic clock keeps the timeout
        # correct even if the system clock is adjusted meanwhile.
        start_time = time.monotonic()
        while not operation.done:
            if time.monotonic() - start_time > max_wait:
                print(" ⚠️ Timeout waiting for upload to complete")
                return False
            time.sleep(poll_interval)
            try:
                operation = client.operations.get(operation)
            except Exception as op_error:
                # A "terminated" error may mean the file already exists.
                if _looks_like_duplicate(str(op_error)):
                    print(" ⚠️ File may already exist or upload was interrupted")
                    print(" 💡 Skipping this file")
                    return None  # Return None to indicate "skipped"
                raise

        # Check for errors in the operation result
        operation_error = getattr(operation, 'error', None)
        if operation_error:
            if _looks_like_duplicate(str(operation_error)):
                print(" ⚠️ File may already exist in the store")
                print(" 💡 Skipping this file")
                return None  # "skipped" rather than "failed"
            print(f" ❌ Upload failed: {operation_error}")
            return False

        print(" ✅ Uploaded successfully")
        return True

    except Exception as e:
        error_str = str(e).lower()
        # NOTE: any message containing "400" is treated as a probable
        # duplicate, which can also hide genuine bad-request errors; the
        # details are printed so they remain visible.
        if _looks_like_duplicate(error_str) or '400' in error_str:
            print(" ⚠️ Upload error: File may already exist or upload was interrupted")
            print(f" 💡 Error details: {e}")
            print(" 💡 Skipping this file")
            return None  # Return None to indicate "skipped"

        print(f" ❌ Error uploading {file_path.name}: {e}")
        traceback.print_exc()
        return False


def find_report_files(reports_dir: Path) -> List[Path]:
    """Find all PDF report files in the reports directory.

    Args:
        reports_dir: Directory searched recursively for "*.pdf" files.

    Returns:
        The sorted list of PDF paths, or an empty list (after printing a
        message) when the directory does not exist.
    """
    if not reports_dir.exists():
        print(f"❌ Reports directory not found: {reports_dir}")
        return []

    return sorted(reports_dir.rglob("*.pdf"))


def _resolve_reports_dir() -> Path:
    """Return REPORTS_DIR if set, else the first existing common location."""
    configured = os.getenv("REPORTS_DIR")
    if configured:
        return Path(configured)

    candidates = [
        Path(_DEFAULT_REPORTS_DIR),
        Path(__file__).parent / "reports",
        Path.cwd() / "reports",
    ]
    for candidate in candidates:
        if candidate.exists():
            return candidate

    # Nothing exists; fall back to the default so the error names a path.
    return Path(_DEFAULT_REPORTS_DIR)


def main() -> int:
    """Main function to upload documents to Gemini File Search store.

    Returns:
        Process exit code: 0 when no upload failed, 1 on configuration or
        connection errors, when no PDFs are found, or when any upload failed.
    """
    print("=" * 60)
    print("Gemini File Search Store Upload Tool")
    print("=" * 60)

    if not GEMINI_AVAILABLE:
        print("\n❌ Please install google-genai package:")
        print(" pip install google-genai")
        return 1

    # Get API key
    api_key = os.getenv("GEMINI_API_KEY")
    if not api_key:
        print("\n❌ GEMINI_API_KEY not found in environment variables")
        print(" Please add GEMINI_API_KEY to your .env file")
        return 1

    # Get store name (optional)
    store_name = os.getenv("GEMINI_FILESTORE_NAME")

    # Get reports directory - try multiple possible locations
    reports_dir = _resolve_reports_dir()

    # Initialize Gemini client
    print("\n🔌 Connecting to Gemini API...")
    try:
        client = genai.Client(api_key=api_key)
        print(" ✅ Connected")
    except Exception as e:
        print(f" ❌ Failed to connect: {e}")
        return 1

    # Get or create file search store
    print("\n📦 Setting up file search store...")
    try:
        store_name = get_or_create_filestore(client, store_name)
    except Exception as e:
        print(f" ❌ Failed to setup store: {e}")
        return 1

    # Find all PDF files
    print(f"\n🔍 Scanning for PDF files in: {reports_dir}")
    pdf_files = find_report_files(reports_dir)
    if not pdf_files:
        print(f" ❌ No PDF files found in {reports_dir}")
        return 1
    print(f" ✅ Found {len(pdf_files)} PDF files")

    # Upload files
    print("\n📤 Uploading files to store...")
    print(f" Store: {store_name}")
    print(f" Files: {len(pdf_files)}")

    uploaded = 0
    failed = 0
    skipped = 0

    for i, pdf_file in enumerate(pdf_files, 1):
        print(f"\n[{i}/{len(pdf_files)}] Processing: {pdf_file.name}")

        # Extract metadata
        metadata = extract_metadata_from_path(pdf_file)

        # Display extracted metadata
        metadata_info = []
        if metadata.get('year'):
            metadata_info.append(f"Year: {metadata['year']}")
        if metadata.get('source'):
            metadata_info.append(f"Source: {metadata['source']}")
        if metadata.get('district'):
            metadata_info.append(f"District: {metadata['district']}")
        if metadata_info:
            print(f" 📊 Extracted metadata: {', '.join(metadata_info)}")

        result = upload_file_to_store(client, pdf_file, store_name, metadata, skip_existing=True)

        if result is True:
            uploaded += 1
        elif result is None:
            # Skipped (already exists)
            skipped += 1
        else:
            failed += 1

        # Small delay between uploads to avoid rate limits
        if i < len(pdf_files):
            time.sleep(1)

    # Summary
    print("\n" + "=" * 60)
    print("Upload Summary")
    print("=" * 60)
    print(f" ✅ Uploaded: {uploaded}")
    if skipped > 0:
        print(f" ⏭️ Skipped (already exists): {skipped}")
    print(f" ❌ Failed: {failed}")
    print(f" 📦 Store: {store_name}")

    if uploaded > 0:
        print(f"\n✅ Successfully uploaded {uploaded} files to Gemini File Search store!")
        print(" You can now use this store in the beta version of the chatbot.")

    return 0 if failed == 0 else 1


if __name__ == "__main__":
    sys.exit(main())