audit_assistant / upload_to_gemini_filestore.py
Ara Yeroyan
refactor + add gemini
72eb0bf
raw
history blame
14.2 kB
#!/usr/bin/env python3
"""
Upload Documents to Google Gemini File Search Store
This script uploads PDF documents to a Gemini File Search store for RAG.
It processes documents from the reports directory and uploads them with metadata.
"""
import os
import sys
import json
import time
from pathlib import Path
from typing import List, Dict, Any, Optional
from dotenv import load_dotenv
try:
from google import genai
from google.genai import types
GEMINI_AVAILABLE = True
except ImportError:
GEMINI_AVAILABLE = False
print("❌ google-genai package not installed. Install with: pip install google-genai")
# Load .env file
load_dotenv()
def extract_metadata_from_path(file_path: Path) -> Dict[str, Any]:
    """Derive upload metadata from a report's location and file name.

    Example input:
    /path/to/reports/Annual Consolidated OAG audit reports 2018/Annual Consolidated OAG audit reports 2018.pdf

    Args:
        file_path: Path to a report file.

    Returns:
        A dict that always contains ``filename`` (name with extension) and
        ``filepath`` (the path as a string). ``year`` (a string), ``source``
        and ``district`` are added only when they can be recognised.
    """
    # Years recognised anywhere in the path, in priority order.
    known_years = ('2018', '2019', '2020', '2021', '2022', '2023', '2024', '2025')
    # (keywords, source, district) rules; the first rule with a keyword found
    # in the lower-cased file stem wins. district is None when not applicable.
    source_rules = (
        (("consolidated", "oag"), "Consolidated", None),
        (("gulu",), "Gulu DLG", "Gulu"),
        (("kalangala",), "Kalangala DLG", "Kalangala"),
        (("kcca",), "KCCA", "Kampala"),
        (("maaif",), "MAAIF", None),
        (("mwts",), "MWTS", None),
    )

    metadata: Dict[str, Any] = {
        "filename": file_path.name,
        "filepath": str(file_path),
    }

    # Path components are scanned root-first; within a single component the
    # earliest entry of known_years that occurs as a substring is taken.
    year_match = next(
        (year for part in file_path.parts for year in known_years if year in part),
        None,
    )
    if year_match is not None:
        metadata["year"] = year_match

    # Source/district come from the file name only (extension stripped).
    stem_lower = file_path.stem.lower()
    for keywords, source, district in source_rules:
        if any(keyword in stem_lower for keyword in keywords):
            metadata["source"] = source
            if district is not None:
                metadata["district"] = district
            break

    return metadata
def get_or_create_filestore(client: "genai.Client", store_name: Optional[str] = None) -> str:
    """Return the name of a file search store, creating one if needed.

    The ``client`` annotation is a string so that this module can still be
    imported when the optional google-genai package is missing.

    Args:
        client: Gemini client exposing ``file_search_stores.list()`` and
            ``file_search_stores.create(config=...)``.
        store_name: Resource name or display name of a store to reuse. When
            empty or not found, a new store is created using this value (or
            "Audit Reports") as its display name.

    Returns:
        The ``name`` attribute of the matched or newly created store.

    Raises:
        Exception: Whatever ``file_search_stores.create`` raises is re-raised
            after being reported.
    """
    if store_name:
        # Best effort: a failure to list stores is reported and we fall
        # through to creating a new store.
        try:
            for store in client.file_search_stores.list():
                if store.name == store_name or store.display_name == store_name:
                    print(f"βœ… Using existing store: {store.display_name} ({store.name})")
                    return store.name
        except Exception as e:
            print(f"⚠️ Could not list stores: {e}")

    display_name = store_name or "Audit Reports"
    print(f"πŸ“ Creating new file search store: '{display_name}'...")
    try:
        file_search_store = client.file_search_stores.create(
            config={'display_name': display_name}
        )
    except Exception as e:
        print(f"❌ Failed to create store: {e}")
        raise
    print(f"βœ… Created store: {file_search_store.display_name} ({file_search_store.name})")
    return file_search_store.name
def format_metadata_for_gemini(metadata: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Convert a metadata dict into a list of key/value entries.

    Each entry is a dict with a ``key`` plus either ``numeric_value`` or
    ``string_value``. Missing or falsy fields are omitted.

    Args:
        metadata: Mapping that may contain ``year``, ``source``,
            ``district`` and ``filename``.

    Returns:
        Entries in the order year, source, district, filename.
    """
    entries: List[Dict[str, Any]] = []

    year = metadata.get('year')
    if year:
        try:
            # Year is sent as a number when it parses as an integer...
            entries.append({'key': 'year', 'numeric_value': int(year)})
        except (ValueError, TypeError):
            # ...and as plain text otherwise.
            entries.append({'key': 'year', 'string_value': str(year)})

    # The remaining fields are always emitted as strings.
    for field in ('source', 'district', 'filename'):
        value = metadata.get(field)
        if value:
            entries.append({'key': field, 'string_value': str(value)})

    return entries
def check_file_exists(client: "genai.Client", store_name: str, filename: str) -> bool:
    """Placeholder duplicate check that currently always returns False.

    The store is fetched, but its contents are not inspected, so
    ``filename`` is not actually looked up.

    The ``client`` annotation is a string so that this module can still be
    imported when the optional google-genai package is missing.

    Args:
        client: Gemini client exposing ``file_search_stores.get(name=...)``.
        store_name: Name of the file search store to probe.
        filename: File name to look for (currently unused).

    Returns:
        Always False, whether or not the store lookup succeeds.
    """
    try:
        # Only probes that the store can be fetched; no per-file listing is
        # implemented here, so the result is intentionally discarded.
        client.file_search_stores.get(name=store_name)
    except Exception:
        # If the store cannot be fetched, assume the file does not exist.
        return False
    return False
def upload_file_to_store(
    client: "genai.Client",
    file_path: Path,
    store_name: str,
    metadata: Dict[str, Any],
    skip_existing: bool = True
) -> Optional[bool]:
    """Upload one file to the file search store and wait for the import.

    The formatted metadata is only printed; it is not sent with the upload.
    The upload config carries just a ``display_name``.

    The ``client`` annotation is a string so that this module can still be
    imported when the optional google-genai package is missing.

    Args:
        client: Gemini client used for the upload and for polling the
            returned operation.
        file_path: File to upload.
        store_name: Name of the target file search store.
        metadata: Extracted metadata; ``filename`` (if present) becomes the
            display name, otherwise ``file_path.name`` is used.
        skip_existing: Accepted for interface compatibility. No pre-upload
            existence check is made; likely duplicates are detected from
            error messages instead.

    Returns:
        True on success, False on failure or timeout, and None when the
        file was skipped because an error message suggests it already
        exists or the upload was interrupted.
    """
    try:
        print(f" πŸ“€ Uploading: {file_path.name}...")

        # Show the metadata that was derived for this file.
        custom_metadata = format_metadata_for_gemini(metadata)
        metadata_parts = []
        for entry in custom_metadata:
            if 'numeric_value' in entry:
                metadata_parts.append(f"{entry['key']}={entry['numeric_value']}")
            elif 'string_value' in entry:
                metadata_parts.append(f"{entry['key']}={entry['string_value']}")
        if metadata_parts:
            print(f" πŸ“‹ Metadata: {', '.join(metadata_parts)}")

        # Upload without custom metadata; the display name carries the
        # file name.
        operation = client.file_search_stores.upload_to_file_search_store(
            file=str(file_path),
            file_search_store_name=store_name,
            config={'display_name': metadata.get('filename', file_path.name)},
        )

        # Poll until the import finishes, giving up after 5 minutes per file.
        # A monotonic clock keeps the timeout immune to system clock changes.
        max_wait = 300
        start_time = time.monotonic()
        while not operation.done:
            if time.monotonic() - start_time > max_wait:
                print(" ⚠️ Timeout waiting for upload to complete")
                return False
            time.sleep(2)
            try:
                operation = client.operations.get(operation)
            except Exception as op_error:
                # A "terminated"/"already" error is treated as a likely
                # duplicate or interrupted upload rather than a failure.
                error_str = str(op_error).lower()
                if 'terminated' in error_str or 'already' in error_str:
                    print(" ⚠️ File may already exist or upload was interrupted")
                    print(" πŸ’‘ Skipping this file")
                    return None
                raise

        # The operation finished; inspect its error field, if any.
        operation_error = getattr(operation, 'error', None)
        if operation_error:
            error_msg = str(operation_error).lower()
            if 'terminated' in error_msg or 'already' in error_msg:
                print(" ⚠️ File may already exist in the store")
                print(" πŸ’‘ Skipping this file")
                return None  # "skipped", as opposed to False for "failed"
            print(f" ❌ Upload failed: {operation_error}")
            return False

        print(" βœ… Uploaded successfully")
        return True
    except Exception as e:
        error_str = str(e).lower()
        # NOTE(review): any error whose text contains "400" is counted as
        # skipped, which may hide genuine bad-request failures; confirm this
        # is intended.
        if 'terminated' in error_str or 'already' in error_str or '400' in error_str:
            print(" ⚠️ Upload error: File may already exist or upload was interrupted")
            print(f" πŸ’‘ Error details: {e}")
            print(" πŸ’‘ Skipping this file")
            return None
        print(f" ❌ Error uploading {file_path.name}: {e}")
        import traceback
        traceback.print_exc()
        return False
def find_report_files(reports_dir: Path) -> List[Path]:
    """Collect every ``*.pdf`` file beneath ``reports_dir``, recursively.

    Args:
        reports_dir: Root directory to search.

    Returns:
        The matching paths in sorted order, or an empty list (after
        printing a message) when ``reports_dir`` does not exist.
    """
    if reports_dir.exists():
        return sorted(reports_dir.rglob("*.pdf"))

    print(f"❌ Reports directory not found: {reports_dir}")
    return []
def main():
    """Run the upload tool from start to finish.

    Reads ``GEMINI_API_KEY``, ``GEMINI_FILESTORE_NAME`` and ``REPORTS_DIR``
    from the environment, connects to Gemini, resolves the file search
    store, uploads every PDF found under the reports directory and prints a
    summary.

    Returns:
        0 when no upload failed; 1 on any setup problem or failed upload.
    """
    banner = "=" * 60
    print(banner)
    print("Gemini File Search Store Upload Tool")
    print(banner)

    if not GEMINI_AVAILABLE:
        print("\n❌ Please install google-genai package:")
        print(" pip install google-genai")
        return 1

    api_key = os.getenv("GEMINI_API_KEY")
    if not api_key:
        print("\n❌ GEMINI_API_KEY not found in environment variables")
        print(" Please add GEMINI_API_KEY to your .env file")
        return 1

    # Optional: name of an existing store to reuse.
    store_name = os.getenv("GEMINI_FILESTORE_NAME")

    # Reports directory: explicit setting first, then the first existing
    # candidate location, then the default fallback path.
    configured_dir = os.getenv("REPORTS_DIR")
    if configured_dir:
        reports_dir = Path(configured_dir)
    else:
        default_dir = "/Users/ayeroyan/workspace/chatbot-rag/reports"
        candidates = (
            Path(default_dir),
            Path(__file__).parent / "reports",
            Path.cwd() / "reports",
        )
        reports_dir = next(
            (candidate for candidate in candidates if candidate.exists()),
            Path(default_dir),
        )

    print("\nπŸ”Œ Connecting to Gemini API...")
    try:
        client = genai.Client(api_key=api_key)
        print(" βœ… Connected")
    except Exception as e:
        print(f" ❌ Failed to connect: {e}")
        return 1

    print("\nπŸ“¦ Setting up file search store...")
    try:
        store_name = get_or_create_filestore(client, store_name)
    except Exception as e:
        print(f" ❌ Failed to setup store: {e}")
        return 1

    print(f"\nπŸ” Scanning for PDF files in: {reports_dir}")
    pdf_files = find_report_files(reports_dir)
    if not pdf_files:
        print(f" ❌ No PDF files found in {reports_dir}")
        return 1
    total = len(pdf_files)
    print(f" βœ… Found {total} PDF files")

    print("\nπŸ“€ Uploading files to store...")
    print(f" Store: {store_name}")
    print(f" Files: {total}")

    uploaded = failed = skipped = 0
    labelled_fields = (("Year", 'year'), ("Source", 'source'), ("District", 'district'))
    for i, pdf_file in enumerate(pdf_files, 1):
        print(f"\n[{i}/{total}] Processing: {pdf_file.name}")

        metadata = extract_metadata_from_path(pdf_file)
        metadata_info = [
            f"{label}: {metadata[key]}"
            for label, key in labelled_fields
            if metadata.get(key)
        ]
        if metadata_info:
            print(f" πŸ“Š Extracted metadata: {', '.join(metadata_info)}")

        result = upload_file_to_store(client, pdf_file, store_name, metadata, skip_existing=True)
        if result is None:
            # None means the upload was skipped (likely already present).
            skipped += 1
        elif result is True:
            uploaded += 1
        else:
            failed += 1

        # Pause between uploads (not after the last) to avoid rate limits.
        if i < total:
            time.sleep(1)

    print("\n" + banner)
    print("Upload Summary")
    print(banner)
    print(f" βœ… Uploaded: {uploaded}")
    if skipped > 0:
        print(f" ⏭️ Skipped (already exists): {skipped}")
    print(f" ❌ Failed: {failed}")
    print(f" πŸ“¦ Store: {store_name}")

    if uploaded > 0:
        print(f"\nβœ… Successfully uploaded {uploaded} files to Gemini File Search store!")
        print(" You can now use this store in the beta version of the chatbot.")

    return 1 if failed else 0
if __name__ == "__main__":
sys.exit(main())