Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Upload Documents to Google Gemini File Search Store | |
| This script uploads PDF documents to a Gemini File Search store for RAG. | |
| It processes documents from the reports directory and uploads them with metadata. | |
| """ | |
| import os | |
| import sys | |
| import json | |
| import time | |
| from pathlib import Path | |
| from typing import List, Dict, Any, Optional | |
| from dotenv import load_dotenv | |
| try: | |
| from google import genai | |
| from google.genai import types | |
| GEMINI_AVAILABLE = True | |
| except ImportError: | |
| GEMINI_AVAILABLE = False | |
| print("β google-genai package not installed. Install with: pip install google-genai") | |
| # Load .env file | |
| load_dotenv() | |
def extract_metadata_from_path(file_path: Path) -> Dict[str, Any]:
    """Derive upload metadata from a report's location and file name.

    Example path:
        reports/Annual Consolidated OAG audit reports 2018/
            Annual Consolidated OAG audit reports 2018.pdf

    Args:
        file_path: Path to the report file.

    Returns:
        A dict that always holds ``filename`` and ``filepath`` and, when they
        can be detected, ``year`` (a four-digit string), ``source`` and
        ``district``.
    """
    import re  # local import keeps this block self-contained

    metadata: Dict[str, Any] = {
        "filename": file_path.name,
        "filepath": str(file_path),
    }

    # A year is four digits starting with 19 or 20 that do not continue a
    # longer digit run on the left, so "2019", "FY2019-20" and "20190630"
    # all yield 2019 while "12019" yields nothing.
    year_pattern = re.compile(r"(?<!\d)(?:19|20)\d{2}")

    # Walk from the file name toward the root so the most specific path
    # component wins over a year that happens to sit in an ancestor directory.
    for part in reversed(file_path.parts):
        found = year_pattern.search(part)
        if found:
            metadata["year"] = found.group(0)
            break

    # Ordered rules: the first rule with a keyword in the file name wins.
    # Each rule is (keywords, source, district or None).
    source_rules = (
        (("consolidated", "oag"), "Consolidated", None),
        (("gulu",), "Gulu DLG", "Gulu"),
        (("kalangala",), "Kalangala DLG", "Kalangala"),
        (("kcca",), "KCCA", "Kampala"),
        (("maaif",), "MAAIF", None),
        (("mwts",), "MWTS", None),
    )
    stem_lower = file_path.stem.lower()
    for keywords, source, district in source_rules:
        if any(keyword in stem_lower for keyword in keywords):
            metadata["source"] = source
            if district is not None:
                metadata["district"] = district
            break

    return metadata
def get_or_create_filestore(client: "genai.Client", store_name: Optional[str] = None) -> str:
    """Return the resource name of a file search store, creating one if needed.

    The ``genai.Client`` annotation is quoted on purpose: annotations are
    evaluated when the ``def`` statement runs, and ``genai`` is undefined when
    the optional ``google-genai`` import failed. Quoting keeps the module
    importable so the caller can report the missing package.

    Args:
        client: Gemini client exposing ``file_search_stores``.
        store_name: Resource name or display name of an existing store. When
            given and a listed store matches either field, that store is
            reused; otherwise it becomes the display name of the new store.

    Returns:
        The ``name`` of the matched or newly created store.

    Raises:
        Exception: Whatever the client raises when store creation fails
            (re-raised after the failure is printed).
    """
    if store_name:
        # Try to reuse an existing store first.
        try:
            for store in client.file_search_stores.list():
                if store_name in (store.name, store.display_name):
                    print(f"β Using existing store: {store.display_name} ({store.name})")
                    return store.name
        except Exception as e:
            # Best effort: a failed listing falls through to creation.
            print(f"β οΈ Could not list stores: {e}")

    display_name = store_name or "Audit Reports"
    print(f"π Creating new file search store: '{display_name}'...")
    try:
        file_search_store = client.file_search_stores.create(
            config={'display_name': display_name}
        )
        print(f"β Created store: {file_search_store.display_name} ({file_search_store.name})")
        return file_search_store.name
    except Exception as e:
        print(f"β Failed to create store: {e}")
        raise
def format_metadata_for_gemini(metadata: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Convert a metadata dict into a list of customMetadata-style entries.

    Each entry is a dict with a ``key`` plus either ``numeric_value`` or
    ``string_value``. Fields that are missing or falsy are left out.

    Args:
        metadata: Dict that may hold ``year``, ``source``, ``district`` and
            ``filename``.

    Returns:
        Entries in the order year, source, district, filename. ``year`` is
        numeric when it converts to ``int`` and a string otherwise; the other
        fields are always strings.
    """
    entries: List[Dict[str, Any]] = []

    year = metadata.get('year')
    if year:
        try:
            entries.append({'key': 'year', 'numeric_value': int(year)})
        except (ValueError, TypeError):
            # Not an integer-like value: keep it as text instead.
            entries.append({'key': 'year', 'string_value': str(year)})

    # The remaining fields share one shape, so handle them in a single pass.
    entries.extend(
        {'key': field, 'string_value': str(metadata[field])}
        for field in ('source', 'district', 'filename')
        if metadata.get(field)
    )
    return entries
def check_file_exists(client: "genai.Client", store_name: str, filename: str) -> bool:
    """Report whether the store already holds a document named *filename*.

    This is a best-effort check: any failure while listing (SDK without a
    document listing, network error, permission problem) is treated as
    "not found", so a caller never blocks an upload because the check
    itself broke.

    The ``genai.Client`` annotation is quoted so the module still imports
    when the optional ``google-genai`` package is missing.

    Args:
        client: Gemini client exposing ``file_search_stores``.
        store_name: Resource name of the file search store to inspect.
        filename: Display name to look for among the store's documents.

    Returns:
        True when a listed document's ``display_name`` equals *filename*,
        False otherwise or when the check could not be performed.
    """
    try:
        # NOTE(review): relies on the SDK's per-store document listing
        # (`file_search_stores.documents.list(parent=...)`); confirm against
        # the installed google-genai version.
        documents = client.file_search_stores.documents.list(parent=store_name)
        return any(
            getattr(document, "display_name", None) == filename
            for document in documents
        )
    except Exception:
        return False  # If we can't check, assume it doesn't exist
def upload_file_to_store(
    client: "genai.Client",
    file_path: Path,
    store_name: str,
    metadata: Dict[str, Any],
    skip_existing: bool = True
) -> Optional[bool]:
    """Upload one file to the file search store and wait for the import.

    The metadata is formatted with ``format_metadata_for_gemini`` and sent as
    ``custom_metadata`` in the upload config. If that attempt raises, the
    upload is retried once without metadata so a rejection of the metadata
    never prevents the file itself from being stored.

    The ``genai.Client`` annotation is quoted so the module still imports
    when the optional ``google-genai`` package is missing.

    Args:
        client: Gemini client exposing ``file_search_stores`` and
            ``operations``.
        file_path: Local file to upload.
        store_name: Resource name of the target file search store.
        metadata: Metadata dict; its ``filename`` entry (or the file's own
            name) becomes the document display name.
        skip_existing: When true, ``check_file_exists`` is consulted first
            and a positive answer skips the upload.

    Returns:
        True when the upload finished, None when the file was skipped
        (existing or duplicate-looking error), False on failure or timeout.
    """
    max_wait = 300  # 5 minutes max per file
    poll_interval = 2  # seconds between operation status checks

    def hints_duplicate(text: str) -> bool:
        # Heuristic: these words in an error are treated as "file is
        # probably already there / upload was cut off", not a hard failure.
        lowered = text.lower()
        return 'terminated' in lowered or 'already' in lowered

    try:
        print(f" π€ Uploading: {file_path.name}...")

        custom_metadata = format_metadata_for_gemini(metadata)
        display_name = metadata.get('filename', file_path.name)

        # Show the metadata that will accompany the upload.
        if custom_metadata:
            metadata_parts = []
            for entry in custom_metadata:
                if 'numeric_value' in entry:
                    metadata_parts.append(f"{entry['key']}={entry['numeric_value']}")
                elif 'string_value' in entry:
                    metadata_parts.append(f"{entry['key']}={entry['string_value']}")
            if metadata_parts:
                print(f" π Metadata: {', '.join(metadata_parts)}")

        if skip_existing and check_file_exists(client, store_name, display_name):
            print(" β οΈ File may already exist in the store")
            print(" π‘ Skipping this file")
            return None  # Return None to indicate "skipped"

        upload = client.file_search_stores.upload_to_file_search_store
        operation = None
        if custom_metadata:
            # NOTE(review): `custom_metadata` is the upload-config field for
            # per-document metadata; confirm against the installed SDK.
            try:
                operation = upload(
                    file=str(file_path),
                    file_search_store_name=store_name,
                    config={
                        'display_name': display_name,
                        'custom_metadata': custom_metadata,
                    },
                )
            except Exception as metadata_error:
                # Metadata is best effort: fall back to a plain upload.
                print(f"   Upload with metadata failed ({metadata_error}); retrying without metadata")
        if operation is None:
            operation = upload(
                file=str(file_path),
                file_search_store_name=store_name,
                config={'display_name': display_name},
            )

        # Wait for the import to complete. A monotonic clock keeps the
        # timeout immune to system clock adjustments.
        deadline = time.monotonic() + max_wait
        while not operation.done:
            if time.monotonic() > deadline:
                print(" β οΈ Timeout waiting for upload to complete")
                return False
            time.sleep(poll_interval)
            try:
                operation = client.operations.get(operation)
            except Exception as op_error:
                if hints_duplicate(str(op_error)):
                    print(" β οΈ File may already exist or upload was interrupted")
                    print(" π‘ Skipping this file")
                    return None  # Return None to indicate "skipped"
                raise

        # Check for errors in the operation result.
        operation_error = getattr(operation, 'error', None)
        if operation_error:
            if hints_duplicate(str(operation_error)):
                print(" β οΈ File may already exist in the store")
                print(" π‘ Skipping this file")
                return None  # "skipped" rather than False for "failed"
            print(f" β Upload failed: {operation_error}")
            return False

        print(" β Uploaded successfully")
        return True
    except Exception as e:
        # NOTE(review): any error mentioning "400" is also counted as a
        # skip; this can hide genuine bad-request failures.
        if hints_duplicate(str(e)) or '400' in str(e):
            print(" β οΈ Upload error: File may already exist or upload was interrupted")
            print(f" π‘ Error details: {e}")
            print(" π‘ Skipping this file")
            return None  # Return None to indicate "skipped"
        print(f" β Error uploading {file_path.name}: {e}")
        import traceback
        traceback.print_exc()
        return False
def find_report_files(reports_dir: Path) -> List[Path]:
    """Collect every PDF file below *reports_dir*, sorted by path.

    The extension is matched case-insensitively, so ``.PDF`` files are found
    on case-sensitive file systems too. Only regular files are returned; a
    directory whose name ends in ``.pdf`` is ignored.

    Args:
        reports_dir: Directory to search recursively.

    Returns:
        Sorted list of matching paths; empty when the directory does not
        exist (a message is printed in that case) or holds no PDFs.
    """
    if not reports_dir.exists():
        print(f"β Reports directory not found: {reports_dir}")
        return []

    return sorted(
        candidate
        for candidate in reports_dir.rglob("*")
        if candidate.is_file() and candidate.suffix.lower() == ".pdf"
    )
def _resolve_reports_dir() -> Path:
    """Pick the reports directory: REPORTS_DIR first, then known locations."""
    configured = os.getenv("REPORTS_DIR")
    if configured:
        # Expand "~" so values such as "~/reports" from a .env file work.
        return Path(configured).expanduser()

    candidates = [
        Path("/Users/ayeroyan/workspace/chatbot-rag/reports"),  # legacy developer checkout
        Path(__file__).parent / "reports",
        Path.cwd() / "reports",
    ]
    for candidate in candidates:
        if candidate.exists():
            return candidate

    # Nothing exists: return the working-directory location so the
    # "not found" message names a path that is meaningful on any machine.
    return candidates[-1]


def _print_upload_summary(uploaded: int, skipped: int, failed: int, store_name: str) -> None:
    """Print the final counts and the store the files were sent to."""
    print("\n" + "=" * 60)
    print("Upload Summary")
    print("=" * 60)
    print(f" β Uploaded: {uploaded}")
    if skipped > 0:
        print(f" βοΈ Skipped (already exists): {skipped}")
    print(f" β Failed: {failed}")
    print(f" π¦ Store: {store_name}")
    if uploaded > 0:
        print(f"\nβ Successfully uploaded {uploaded} files to Gemini File Search store!")
        print(" You can now use this store in the beta version of the chatbot.")


def main():
    """Upload every PDF report to the Gemini File Search store.

    Configuration is read from the environment: ``GEMINI_API_KEY``
    (required), ``GEMINI_FILESTORE_NAME`` (optional store to reuse or create)
    and ``REPORTS_DIR`` (optional directory to scan).

    Returns:
        0 when no upload failed; 1 when the package or API key is missing,
        the client or store could not be set up, no PDFs were found, or at
        least one upload failed.
    """
    print("=" * 60)
    print("Gemini File Search Store Upload Tool")
    print("=" * 60)

    if not GEMINI_AVAILABLE:
        print("\nβ Please install google-genai package:")
        print(" pip install google-genai")
        return 1

    api_key = os.getenv("GEMINI_API_KEY")
    if not api_key:
        print("\nβ GEMINI_API_KEY not found in environment variables")
        print(" Please add GEMINI_API_KEY to your .env file")
        return 1

    store_name = os.getenv("GEMINI_FILESTORE_NAME")  # optional
    reports_dir = _resolve_reports_dir()

    # Initialize Gemini client
    print("\nπ Connecting to Gemini API...")
    try:
        client = genai.Client(api_key=api_key)
        print(" β Connected")
    except Exception as e:
        print(f" β Failed to connect: {e}")
        return 1

    # Get or create file search store
    print("\nπ¦ Setting up file search store...")
    try:
        store_name = get_or_create_filestore(client, store_name)
    except Exception as e:
        print(f" β Failed to setup store: {e}")
        return 1

    # Find all PDF files
    print(f"\nπ Scanning for PDF files in: {reports_dir}")
    pdf_files = find_report_files(reports_dir)
    if not pdf_files:
        print(f" β No PDF files found in {reports_dir}")
        return 1
    print(f" β Found {len(pdf_files)} PDF files")

    # Upload files
    print("\nπ€ Uploading files to store...")
    print(f" Store: {store_name}")
    print(f" Files: {len(pdf_files)}")

    uploaded = failed = skipped = 0
    metadata_labels = (("year", "Year"), ("source", "Source"), ("district", "District"))
    total = len(pdf_files)

    for index, pdf_file in enumerate(pdf_files, 1):
        print(f"\n[{index}/{total}] Processing: {pdf_file.name}")

        metadata = extract_metadata_from_path(pdf_file)
        metadata_info = [
            f"{label}: {metadata[key]}"
            for key, label in metadata_labels
            if metadata.get(key)
        ]
        if metadata_info:
            print(f" π Extracted metadata: {', '.join(metadata_info)}")

        result = upload_file_to_store(client, pdf_file, store_name, metadata, skip_existing=True)
        if result is True:
            uploaded += 1
        elif result is None:  # Skipped (already exists)
            skipped += 1
        else:  # Failed
            failed += 1

        # Small delay between uploads to avoid rate limits
        if index < total:
            time.sleep(1)

    _print_upload_summary(uploaded, skipped, failed, store_name)
    return 0 if failed == 0 else 1
# Script entry point: run the uploader and pass main()'s status code to the shell.
if __name__ == "__main__":
    raise SystemExit(main())