""" Migration Script - Extract Metadata for Existing Documents Run this script once to populate document_metadata and document_summaries collections for all existing documents. Usage: python migrate_metadata.py --user-id [--bucket-id ] """ import sys import time import argparse from typing import List, Dict # Add parent directory to path for imports sys.path.insert(0, '.') from services.chroma_service import chroma_service from services.metadata_extractor import metadata_extractor from services.rag_service import rag_service def reconstruct_document_content(doc_id: str) -> str: """ Reconstruct document content from stored chunks. """ chunks = chroma_service.get_document_chunks(doc_id) if not chunks: return "" # Sort by chunk index and combine chunks.sort(key=lambda x: x.get('chunk_index', 0)) content = ' '.join(chunk.get('text', '') for chunk in chunks) return content def migrate_single_document(doc: Dict, user_id: str, skip_existing: bool = True) -> Dict: """ Migrate a single document: extract metadata and create summary. Args: doc: Document dictionary with doc_id, filename, bucket_id user_id: User ID skip_existing: If True, skip documents that already have metadata and summary """ doc_id = doc['doc_id'] filename = doc.get('filename', '') bucket_id = doc.get('bucket_id', '') result = { 'doc_id': doc_id, 'filename': filename, 'status': 'pending', 'metadata_extracted': False, 'summary_created': False, 'error': None } try: # Check if already migrated (fast skip) if skip_existing: existing_metadata = chroma_service.get_document_metadata(doc_id, user_id) summary_id = f"{doc_id}_summary" existing_summary = chroma_service.summary_chunks_collection.get(ids=[summary_id]) if existing_metadata and existing_summary['ids']: result['status'] = 'skipped' result['metadata_extracted'] = True result['summary_created'] = True print(f" [SKIP] Already migrated: {filename}") return result # Step 1: Reconstruct content from chunks print(f" Reconstructing content for {filename}...") content = reconstruct_document_content(doc_id) if not content: result['status'] = 'skipped' result['error'] = 'No content found' return result # Step 2: Extract metadata (only if not exists or skip_existing is False) needs_metadata = not skip_existing or not chroma_service.get_document_metadata(doc_id, user_id) if needs_metadata: print(f" Extracting metadata...") metadata = metadata_extractor.extract_metadata(content, filename) # Store metadata chroma_service.store_document_metadata( doc_id=doc_id, user_id=user_id, bucket_id=bucket_id, metadata=metadata ) result['metadata_extracted'] = True else: result['metadata_extracted'] = True # Already exists # Step 3: Generate and store summary (only if not exists or skip_existing is False) summary_id = f"{doc_id}_summary" existing_summary = chroma_service.summary_chunks_collection.get(ids=[summary_id]) needs_summary = not skip_existing or not existing_summary['ids'] if needs_summary: print(f" Generating summary...") try: summary_result = rag_service.generate_summary(content, filename) # Extract the summary string from the result dict if isinstance(summary_result, dict): summary = summary_result.get('summary', f'Document: {filename}') else: summary = str(summary_result) if summary_result else f'Document: {filename}' if summary: chroma_service.store_summary_chunk( doc_id=doc_id, user_id=user_id, summary=summary, bucket_id=bucket_id, filename=filename ) result['summary_created'] = True except Exception as e: print(f" Warning: Summary generation failed: {e}") else: result['summary_created'] = 


def migrate_all_documents(user_id: str, bucket_id: Optional[str] = None, batch_size: int = 10,
                          delay: float = 0.5, skip_existing: bool = True):
    """
    Migrate all documents for a user/bucket.

    Args:
        user_id: User ID to migrate documents for
        bucket_id: Optional bucket ID to filter by
        batch_size: Number of documents to process before printing a progress update
        delay: Seconds to wait between documents (rate limiting)
        skip_existing: If True, skip documents that already have metadata and a summary
    """
    print("=" * 60)
    print("Document Metadata Migration")
    print("=" * 60)
    print(f"User ID: {user_id}")
    print(f"Bucket ID: {bucket_id or 'All buckets'}")
    print(f"Skip existing: {skip_existing}")
    print()

    # Get all documents
    print("Fetching documents...")
    documents = chroma_service.get_user_documents(user_id, bucket_id)
    total_docs = len(documents)
    print(f"Found {total_docs} documents to process")
    print()

    if total_docs == 0:
        print("No documents found. Exiting.")
        return

    # Track results
    results = {
        'total': total_docs,
        'success': 0,
        'skipped': 0,
        'already_migrated': 0,
        'error': 0,
        'metadata_extracted': 0,
        'summaries_created': 0
    }

    start_time = time.time()

    # Process documents
    for i, doc in enumerate(documents, 1):
        print(f"\n[{i}/{total_docs}] Processing: {doc.get('filename', 'Unknown')}")
        result = migrate_single_document(doc, user_id, skip_existing=skip_existing)

        # Update results
        if result['status'] == 'success':
            results['success'] += 1
        elif result['status'] == 'skipped':
            if result.get('metadata_extracted') and result.get('summary_created'):
                results['already_migrated'] += 1
            else:
                results['skipped'] += 1
        else:
            results['error'] += 1

        if result['metadata_extracted']:
            results['metadata_extracted'] += 1
        if result['summary_created']:
            results['summaries_created'] += 1

        # Rate limiting
        if delay > 0:
            time.sleep(delay)

        # Progress update every batch_size documents
        if i % batch_size == 0:
            elapsed = time.time() - start_time
            rate = i / elapsed if elapsed > 0 else 0
            remaining = (total_docs - i) / rate if rate > 0 else 0
            print(f"\n--- Progress: {i}/{total_docs} ({i/total_docs*100:.1f}%) ---")
            print(f"    Elapsed: {elapsed:.1f}s | ETA: {remaining:.1f}s")
            print(f"    Success: {results['success']} | Already migrated: {results['already_migrated']} | Errors: {results['error']}")

    # Final summary
    elapsed = time.time() - start_time
    print("\n" + "=" * 60)
    print("Migration Complete!")
    print("=" * 60)
    print(f"Total documents: {results['total']}")
    print(f"  [OK]   Success: {results['success']}")
    print(f"  [SKIP] Already migrated: {results['already_migrated']}")
    print(f"  [SKIP] Skipped (no content): {results['skipped']}")
    print(f"  [ERR]  Errors: {results['error']}")
    print()
    print(f"Metadata extracted: {results['metadata_extracted']}")
    print(f"Summaries created: {results['summaries_created']}")
    print()
    print(f"Total time: {elapsed:.1f} seconds")
    if total_docs > 0:
        print(f"Average: {elapsed/total_docs:.2f} seconds per document")

    return results


def main():
    parser = argparse.ArgumentParser(description='Migrate existing documents to extract metadata')
    parser.add_argument('--user-id', required=True, help='User ID to migrate documents for')
    parser.add_argument('--bucket-id', help='Optional bucket ID to filter by')
    parser.add_argument('--batch-size', type=int, default=10, help='Batch size for progress updates')
    parser.add_argument('--delay', type=float, default=0.5, help='Delay between documents (seconds)')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be done without making changes')
    args = parser.parse_args()

    if args.dry_run:
        print("DRY RUN MODE - No changes will be made")
        documents = chroma_service.get_user_documents(args.user_id, args.bucket_id)
        print(f"Would process {len(documents)} documents:")
        for doc in documents[:10]:
            print(f"  - {doc.get('filename', 'Unknown')}")
        if len(documents) > 10:
            print(f"  ... and {len(documents) - 10} more")
        return

    migrate_all_documents(
        user_id=args.user_id,
        bucket_id=args.bucket_id,
        batch_size=args.batch_size,
        delay=args.delay,
        skip_existing=True
    )


if __name__ == '__main__':
    main()
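
# Example invocations (hypothetical IDs), using the flags defined in main() above:
#
#   Preview what would be migrated, without making changes:
#       python migrate_metadata.py --user-id user_123 --dry-run
#
#   Migrate a single bucket with a gentler rate limit:
#       python migrate_metadata.py --user-id user_123 --bucket-id bucket_abc --delay 1.0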