Spaces:
Running
Running
"""
Migration Script - Extract Metadata for Existing Documents

Run this script once to populate the document_metadata and document_summaries
collections for all existing documents.

Usage:
    python migrate_metadata.py --user-id <user_id> [--bucket-id <bucket_id>]
"""
import argparse
import sys
import time
from typing import Dict, List, Optional

# Add parent directory to path so the services package resolves when the
# script is run from the repository root.
sys.path.insert(0, '.')

from services.chroma_service import chroma_service
from services.metadata_extractor import metadata_extractor
from services.rag_service import rag_service
def reconstruct_document_content(doc_id: str) -> str:
    """
    Rebuild a document's full text from its stored chunks.

    Args:
        doc_id: Identifier of the document whose chunks are fetched.

    Returns:
        The chunk texts joined with single spaces, ordered by chunk_index;
        an empty string when the document has no stored chunks.
    """
    stored_chunks = chroma_service.get_document_chunks(doc_id)
    if not stored_chunks:
        return ""
    # Chunks may come back unordered; restore original document order.
    ordered = sorted(stored_chunks, key=lambda c: c.get('chunk_index', 0))
    return ' '.join(c.get('text', '') for c in ordered)
def migrate_single_document(doc: Dict, user_id: str, skip_existing: bool = True) -> Dict:
    """
    Migrate a single document: extract metadata and create a summary.

    Args:
        doc: Document dictionary with doc_id, filename, bucket_id.
        user_id: User ID the document belongs to.
        skip_existing: If True, skip documents that already have both
            metadata and a stored summary.

    Returns:
        Result dict with keys: doc_id, filename,
        status ('success' | 'skipped' | 'error'),
        metadata_extracted (bool), summary_created (bool),
        error (message string or None).
    """
    doc_id = doc['doc_id']
    filename = doc.get('filename', '')
    bucket_id = doc.get('bucket_id', '')
    result = {
        'doc_id': doc_id,
        'filename': filename,
        'status': 'pending',
        'metadata_extracted': False,
        'summary_created': False,
        'error': None
    }
    try:
        # Fast skip: nothing to do when both artifacts already exist.
        if skip_existing:
            existing_metadata = chroma_service.get_document_metadata(doc_id, user_id)
            summary_id = f"{doc_id}_summary"
            existing_summary = chroma_service.summary_chunks_collection.get(ids=[summary_id])
            if existing_metadata and existing_summary['ids']:
                result['status'] = 'skipped'
                result['metadata_extracted'] = True
                result['summary_created'] = True
                # BUGFIX: previously printed the literal "(unknown)" instead
                # of interpolating the filename.
                print(f" [SKIP] Already migrated: {filename}")
                return result

        # Step 1: Reconstruct content from stored chunks.
        print(f" Reconstructing content for {filename}...")
        content = reconstruct_document_content(doc_id)
        if not content:
            result['status'] = 'skipped'
            result['error'] = 'No content found'
            return result

        # Step 2: Extract metadata (only if missing, unless skip_existing is False).
        needs_metadata = not skip_existing or not chroma_service.get_document_metadata(doc_id, user_id)
        if needs_metadata:
            print(f" Extracting metadata...")
            metadata = metadata_extractor.extract_metadata(content, filename)
            chroma_service.store_document_metadata(
                doc_id=doc_id,
                user_id=user_id,
                bucket_id=bucket_id,
                metadata=metadata
            )
            result['metadata_extracted'] = True
        else:
            result['metadata_extracted'] = True  # Already exists

        # Step 3: Generate and store summary (only if missing, unless
        # skip_existing is False). Summary failure is non-fatal by design:
        # metadata alone is still useful, so we warn and continue.
        summary_id = f"{doc_id}_summary"
        existing_summary = chroma_service.summary_chunks_collection.get(ids=[summary_id])
        needs_summary = not skip_existing or not existing_summary['ids']
        if needs_summary:
            print(f" Generating summary...")
            try:
                summary_result = rag_service.generate_summary(content, filename)
                # generate_summary may return a dict or a plain string;
                # normalize to a string with a filename-based fallback.
                # BUGFIX: fallbacks previously contained the literal "(unknown)".
                if isinstance(summary_result, dict):
                    summary = summary_result.get('summary', f'Document: {filename}')
                else:
                    summary = str(summary_result) if summary_result else f'Document: {filename}'
                if summary:
                    chroma_service.store_summary_chunk(
                        doc_id=doc_id,
                        user_id=user_id,
                        summary=summary,
                        bucket_id=bucket_id,
                        filename=filename
                    )
                    result['summary_created'] = True
            except Exception as e:
                print(f" Warning: Summary generation failed: {e}")
        else:
            result['summary_created'] = True  # Already exists

        result['status'] = 'success'
        print(f" [OK] Completed: {filename}")
    except Exception as e:
        result['status'] = 'error'
        # Sanitize error message so it prints safely on ASCII-only consoles.
        error_msg = str(e).encode('ascii', 'replace').decode('ascii')
        result['error'] = error_msg
        print(f" [ERROR] {filename} - {error_msg}")
    return result
def migrate_all_documents(user_id: str, bucket_id: Optional[str] = None,
                          batch_size: int = 10, delay: float = 0.5, skip_existing: bool = True):
    """
    Migrate all documents for a user (optionally restricted to one bucket).

    Args:
        user_id: User ID to migrate documents for.
        bucket_id: Optional bucket ID to filter by (None = all buckets).
            BUGFIX: annotation was `str = None` (implicit Optional).
        batch_size: Number of documents between progress updates.
        delay: Seconds to sleep between documents (rate limiting).
        skip_existing: If True, skip documents that already have metadata
            and a summary.

    Returns:
        Aggregate results dict (total, success, skipped, already_migrated,
        error, metadata_extracted, summaries_created), or None when no
        documents were found.
    """
    print("=" * 60)
    print("Document Metadata Migration")
    print("=" * 60)
    print(f"User ID: {user_id}")
    print(f"Bucket ID: {bucket_id or 'All buckets'}")
    print(f"Skip existing: {skip_existing}")
    print()

    print("Fetching documents...")
    documents = chroma_service.get_user_documents(user_id, bucket_id)
    total_docs = len(documents)
    print(f"Found {total_docs} documents to process")
    print()
    if total_docs == 0:
        print("No documents found. Exiting.")
        return

    results = {
        'total': total_docs,
        'success': 0,
        'skipped': 0,
        'already_migrated': 0,
        'error': 0,
        'metadata_extracted': 0,
        'summaries_created': 0
    }
    start_time = time.time()

    for i, doc in enumerate(documents, 1):
        print(f"\n[{i}/{total_docs}] Processing: {doc.get('filename', 'Unknown')}")
        result = migrate_single_document(doc, user_id, skip_existing=skip_existing)

        # Aggregate per-document outcome. A 'skipped' result with both
        # artifacts present means the doc was already migrated; otherwise
        # it was skipped for lack of content.
        if result['status'] == 'success':
            results['success'] += 1
        elif result['status'] == 'skipped':
            if result.get('metadata_extracted') and result.get('summary_created'):
                results['already_migrated'] += 1
            else:
                results['skipped'] += 1
        else:
            results['error'] += 1
        if result['metadata_extracted']:
            results['metadata_extracted'] += 1
        if result['summary_created']:
            results['summaries_created'] += 1

        # Rate limiting between documents.
        if delay > 0:
            time.sleep(delay)

        # Periodic progress report with a rough ETA.
        if i % batch_size == 0:
            elapsed = time.time() - start_time
            rate = i / elapsed if elapsed > 0 else 0
            remaining = (total_docs - i) / rate if rate > 0 else 0
            print(f"\n--- Progress: {i}/{total_docs} ({i/total_docs*100:.1f}%) ---")
            print(f" Elapsed: {elapsed:.1f}s | ETA: {remaining:.1f}s")
            print(f" Success: {results['success']} | Already migrated: {results['already_migrated']} | Errors: {results['error']}")

    # Final summary.
    elapsed = time.time() - start_time
    print("\n" + "=" * 60)
    print("Migration Complete!")
    print("=" * 60)
    print(f"Total documents: {results['total']}")
    print(f" [OK] Success: {results['success']}")
    print(f" [SKIP] Already migrated: {results['already_migrated']}")
    print(f" [SKIP] Skipped (no content): {results['skipped']}")
    print(f" [ERR] Errors: {results['error']}")
    print()
    print(f"Metadata extracted: {results['metadata_extracted']}")
    print(f"Summaries created: {results['summaries_created']}")
    print()
    print(f"Total time: {elapsed:.1f} seconds")
    if total_docs > 0:
        print(f"Average: {elapsed/total_docs:.2f} seconds per document")
    return results
def main():
    """
    CLI entry point: parse arguments and run the migration.

    Generalization: skip_existing was previously hard-coded to True even
    though migrate_all_documents parameterizes it; a --no-skip-existing
    flag now exposes re-migration. Default behavior is unchanged.
    """
    parser = argparse.ArgumentParser(description='Migrate existing documents to extract metadata')
    parser.add_argument('--user-id', required=True, help='User ID to migrate documents for')
    parser.add_argument('--bucket-id', help='Optional bucket ID to filter by')
    parser.add_argument('--batch-size', type=int, default=10, help='Batch size for progress updates')
    parser.add_argument('--delay', type=float, default=0.5, help='Delay between documents (seconds)')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be done without making changes')
    parser.add_argument('--no-skip-existing', action='store_true',
                        help='Re-process documents even if metadata/summary already exist')
    args = parser.parse_args()

    if args.dry_run:
        # Read-only preview: list (up to 10) documents that would be processed.
        print("DRY RUN MODE - No changes will be made")
        documents = chroma_service.get_user_documents(args.user_id, args.bucket_id)
        print(f"Would process {len(documents)} documents:")
        for doc in documents[:10]:
            print(f" - {doc.get('filename', 'Unknown')}")
        if len(documents) > 10:
            print(f" ... and {len(documents) - 10} more")
        return

    migrate_all_documents(
        user_id=args.user_id,
        bucket_id=args.bucket_id,
        batch_size=args.batch_size,
        delay=args.delay,
        skip_existing=not args.no_skip_existing
    )


if __name__ == '__main__':
    main()