# notebooklm-fast / migrate_metadata.py
# Author: jashdoshi77
# feat: Add AI-powered query understanding with DeepSeek parsing (commit 64deb3c)
"""
Migration Script - Extract Metadata for Existing Documents
Run this script once to populate document_metadata and document_summaries
collections for all existing documents.
Usage:
python migrate_metadata.py --user-id <user_id> [--bucket-id <bucket_id>]
"""
import argparse
import sys
import time
from typing import Dict, List, Optional
# Add parent directory to path for imports
sys.path.insert(0, '.')
from services.chroma_service import chroma_service
from services.metadata_extractor import metadata_extractor
from services.rag_service import rag_service
def reconstruct_document_content(doc_id: str) -> str:
    """
    Rebuild a document's full text from its stored chunks.

    Fetches every chunk for *doc_id*, orders them by their ``chunk_index``
    metadata, and joins their text with single spaces.

    Returns:
        The reconstructed content, or "" when no chunks exist.
    """
    stored_chunks = chroma_service.get_document_chunks(doc_id)
    if not stored_chunks:
        return ""
    # Chunks may come back in arbitrary order; restore original sequence.
    ordered = sorted(stored_chunks, key=lambda c: c.get('chunk_index', 0))
    return ' '.join(c.get('text', '') for c in ordered)
def migrate_single_document(doc: Dict, user_id: str, skip_existing: bool = True) -> Dict:
    """
    Migrate a single document: extract metadata and create a summary.

    Args:
        doc: Document dictionary with doc_id, filename, bucket_id
        user_id: User ID
        skip_existing: If True, skip documents that already have both
            metadata and a summary stored

    Returns:
        Result dict with keys: doc_id, filename, status
        ('success' | 'skipped' | 'error'), metadata_extracted,
        summary_created, error.
    """
    doc_id = doc['doc_id']
    filename = doc.get('filename', '')
    bucket_id = doc.get('bucket_id', '')
    result = {
        'doc_id': doc_id,
        'filename': filename,
        'status': 'pending',
        'metadata_extracted': False,
        'summary_created': False,
        'error': None
    }
    try:
        summary_id = f"{doc_id}_summary"
        # Look up existing state once; reused by both the fast-skip check
        # and the per-step "needs work" decisions below.
        existing_metadata = chroma_service.get_document_metadata(doc_id, user_id)
        existing_summary = chroma_service.summary_chunks_collection.get(ids=[summary_id])

        # Fast skip: both metadata and summary already present.
        if skip_existing and existing_metadata and existing_summary['ids']:
            result['status'] = 'skipped'
            result['metadata_extracted'] = True
            result['summary_created'] = True
            # FIX: the original printed a literal "(unknown)" here — the
            # filename placeholder had been lost from the f-string.
            print(f"  [SKIP] Already migrated: {filename}")
            return result

        # Step 1: Reconstruct content from stored chunks.
        print(f"  Reconstructing content for {filename}...")
        content = reconstruct_document_content(doc_id)
        if not content:
            result['status'] = 'skipped'
            result['error'] = 'No content found'
            return result

        # Step 2: Extract metadata (only if missing, or forced by
        # skip_existing=False).
        needs_metadata = not skip_existing or not existing_metadata
        if needs_metadata:
            print(f"  Extracting metadata...")
            metadata = metadata_extractor.extract_metadata(content, filename)
            chroma_service.store_document_metadata(
                doc_id=doc_id,
                user_id=user_id,
                bucket_id=bucket_id,
                metadata=metadata
            )
            result['metadata_extracted'] = True
        else:
            result['metadata_extracted'] = True  # Already exists

        # Step 3: Generate and store summary (only if missing, or forced).
        needs_summary = not skip_existing or not existing_summary['ids']
        if needs_summary:
            print(f"  Generating summary...")
            try:
                summary_result = rag_service.generate_summary(content, filename)
                # The RAG service may return a dict or a plain value; fall
                # back to a minimal placeholder naming the file.
                if isinstance(summary_result, dict):
                    summary = summary_result.get('summary', f'Document: {filename}')
                else:
                    summary = str(summary_result) if summary_result else f'Document: {filename}'
                if summary:
                    chroma_service.store_summary_chunk(
                        doc_id=doc_id,
                        user_id=user_id,
                        summary=summary,
                        bucket_id=bucket_id,
                        filename=filename
                    )
                    result['summary_created'] = True
            except Exception as e:
                # Best-effort: a failed summary does not fail the migration.
                print(f"  Warning: Summary generation failed: {e}")
        else:
            result['summary_created'] = True  # Already exists

        result['status'] = 'success'
        print(f"  [OK] Completed: {filename}")
    except Exception as e:
        result['status'] = 'error'
        # Sanitize error message for console encoding (non-ASCII consoles).
        error_msg = str(e).encode('ascii', 'replace').decode('ascii')
        result['error'] = error_msg
        print(f"  [ERROR] {filename} - {error_msg}")
    return result
def migrate_all_documents(user_id: str, bucket_id: Optional[str] = None,
                          batch_size: int = 10, delay: float = 0.5,
                          skip_existing: bool = True):
    """
    Migrate all documents for a user/bucket.

    Runs migrate_single_document over every document, with simple
    rate limiting and periodic progress output.

    Args:
        user_id: User ID to migrate documents for
        bucket_id: Optional bucket ID to filter by (None = all buckets)
        batch_size: Number of documents to process between progress updates
        delay: Seconds to wait between documents (rate limiting)
        skip_existing: If True, skip documents that already have metadata
            and summary

    Returns:
        Dict of aggregate counters (total, success, skipped,
        already_migrated, error, metadata_extracted, summaries_created),
        or None when no documents were found.
    """
    print("=" * 60)
    print("Document Metadata Migration")
    print("=" * 60)
    print(f"User ID: {user_id}")
    print(f"Bucket ID: {bucket_id or 'All buckets'}")
    print(f"Skip existing: {skip_existing}")
    print()

    # Get all documents
    print("Fetching documents...")
    documents = chroma_service.get_user_documents(user_id, bucket_id)
    total_docs = len(documents)
    print(f"Found {total_docs} documents to process")
    print()
    if total_docs == 0:
        print("No documents found. Exiting.")
        return

    # Aggregate counters across all documents.
    results = {
        'total': total_docs,
        'success': 0,
        'skipped': 0,
        'already_migrated': 0,
        'error': 0,
        'metadata_extracted': 0,
        'summaries_created': 0
    }
    start_time = time.time()

    for i, doc in enumerate(documents, 1):
        print(f"\n[{i}/{total_docs}] Processing: {doc.get('filename', 'Unknown')}")
        result = migrate_single_document(doc, user_id, skip_existing=skip_existing)

        # A 'skipped' status covers two cases: already fully migrated
        # (both flags set) vs. skipped for missing content.
        if result['status'] == 'success':
            results['success'] += 1
        elif result['status'] == 'skipped':
            if result.get('metadata_extracted') and result.get('summary_created'):
                results['already_migrated'] += 1
            else:
                results['skipped'] += 1
        else:
            results['error'] += 1
        if result['metadata_extracted']:
            results['metadata_extracted'] += 1
        if result['summary_created']:
            results['summaries_created'] += 1

        # Rate limiting between documents (avoids hammering the LLM API).
        if delay > 0:
            time.sleep(delay)

        # Progress update every batch_size documents, with a naive ETA
        # based on the average rate so far.
        if i % batch_size == 0:
            elapsed = time.time() - start_time
            rate = i / elapsed if elapsed > 0 else 0
            remaining = (total_docs - i) / rate if rate > 0 else 0
            print(f"\n--- Progress: {i}/{total_docs} ({i/total_docs*100:.1f}%) ---")
            print(f" Elapsed: {elapsed:.1f}s | ETA: {remaining:.1f}s")
            print(f" Success: {results['success']} | Already migrated: {results['already_migrated']} | Errors: {results['error']}")

    # Final summary
    elapsed = time.time() - start_time
    print("\n" + "=" * 60)
    print("Migration Complete!")
    print("=" * 60)
    print(f"Total documents: {results['total']}")
    print(f" [OK] Success: {results['success']}")
    print(f" [SKIP] Already migrated: {results['already_migrated']}")
    print(f" [SKIP] Skipped (no content): {results['skipped']}")
    print(f" [ERR] Errors: {results['error']}")
    print()
    print(f"Metadata extracted: {results['metadata_extracted']}")
    print(f"Summaries created: {results['summaries_created']}")
    print()
    print(f"Total time: {elapsed:.1f} seconds")
    if total_docs > 0:
        print(f"Average: {elapsed/total_docs:.2f} seconds per document")
    return results
def main():
    """Parse CLI arguments and run the migration (or a dry-run preview)."""
    arg_parser = argparse.ArgumentParser(description='Migrate existing documents to extract metadata')
    arg_parser.add_argument('--user-id', required=True, help='User ID to migrate documents for')
    arg_parser.add_argument('--bucket-id', help='Optional bucket ID to filter by')
    arg_parser.add_argument('--batch-size', type=int, default=10, help='Batch size for progress updates')
    arg_parser.add_argument('--delay', type=float, default=0.5, help='Delay between documents (seconds)')
    arg_parser.add_argument('--dry-run', action='store_true', help='Show what would be done without making changes')
    opts = arg_parser.parse_args()

    if opts.dry_run:
        # Preview only: list up to ten affected documents, change nothing.
        print("DRY RUN MODE - No changes will be made")
        documents = chroma_service.get_user_documents(opts.user_id, opts.bucket_id)
        doc_count = len(documents)
        print(f"Would process {doc_count} documents:")
        for doc in documents[:10]:
            print(f" - {doc.get('filename', 'Unknown')}")
        if doc_count > 10:
            print(f" ... and {doc_count - 10} more")
        return

    migrate_all_documents(
        user_id=opts.user_id,
        bucket_id=opts.bucket_id,
        batch_size=opts.batch_size,
        delay=opts.delay,
        skip_existing=True
    )


if __name__ == '__main__':
    main()