| | |
| | """ |
| | Background pre-computation script for processing ALL models incrementally. |
| | Designed to run in the background and save progress so it can be resumed. |
| | |
| | Features: |
| | - Processes models in batches to manage memory |
| | - Saves progress incrementally |
| | - Can be resumed if interrupted |
| | - Provides status updates via JSON file |
| | |
| | Usage: |
| | # Process all models (default ~500k batch) |
| | python scripts/precompute_background.py --all |
| | |
| | # Process specific number of models |
| | python scripts/precompute_background.py --sample-size 500000 |
| | |
| | # Resume from previous run |
| | python scripts/precompute_background.py --resume |
| | |
| | # Check status |
| | python scripts/precompute_background.py --status |
| | """ |
| |
|
| | import argparse |
| | import os |
| | import sys |
| | import json |
| | import time |
| | import logging |
| | import signal |
| | from datetime import datetime |
| | from pathlib import Path |
| | from typing import Optional, Dict, Any |
| | import threading |
| |
|
| | import pandas as pd |
| | import numpy as np |
| | from umap import UMAP |
| | from sklearn.decomposition import PCA, IncrementalPCA |
| |
|
| | |
| | backend_dir = Path(__file__).parent.parent |
| | sys.path.insert(0, str(backend_dir)) |
| |
|
| | from utils.data_loader import ModelDataLoader |
| | from utils.embeddings import ModelEmbedder |
| |
|
| | logging.basicConfig( |
| | level=logging.INFO, |
| | format='%(asctime)s - %(levelname)s - %(message)s', |
| | handlers=[ |
| | logging.StreamHandler(), |
| | logging.FileHandler('precompute_background.log') |
| | ] |
| | ) |
| | logger = logging.getLogger(__name__) |
| |
|
| | |
| | shutdown_requested = False |
| |
|
| | def signal_handler(signum, frame): |
| | global shutdown_requested |
| | logger.warning("Shutdown requested - will save progress and exit...") |
| | shutdown_requested = True |
| |
|
| | signal.signal(signal.SIGINT, signal_handler) |
| | signal.signal(signal.SIGTERM, signal_handler) |
| |
|
| |
|
| | class BackgroundPrecomputer: |
| | """Handles incremental pre-computation of model embeddings and coordinates.""" |
| | |
| | def __init__( |
| | self, |
| | output_dir: str = "precomputed_data", |
| | version: str = "v1", |
| | batch_size: int = 50000, |
| | embedding_batch_size: int = 256 |
| | ): |
| | self.output_dir = Path(output_dir) |
| | self.version = version |
| | self.batch_size = batch_size |
| | self.embedding_batch_size = embedding_batch_size |
| | |
| | |
| | self.status_file = self.output_dir / f"background_status_{version}.json" |
| | self.output_dir.mkdir(parents=True, exist_ok=True) |
| | |
| | |
| | self.data_loader = ModelDataLoader() |
| | self.embedder = ModelEmbedder() |
| | |
| | def get_status(self) -> Dict[str, Any]: |
| | """Get current computation status.""" |
| | if self.status_file.exists(): |
| | with open(self.status_file, 'r') as f: |
| | return json.load(f) |
| | return { |
| | 'status': 'not_started', |
| | 'total_models': 0, |
| | 'processed_models': 0, |
| | 'current_batch': 0, |
| | 'started_at': None, |
| | 'last_updated': None, |
| | 'error': None |
| | } |
| | |
| | def save_status(self, status: Dict[str, Any]): |
| | """Save computation status.""" |
| | status['last_updated'] = datetime.now().isoformat() |
| | with open(self.status_file, 'w') as f: |
| | json.dump(status, f, indent=2) |
| | |
| | def load_full_dataset(self) -> pd.DataFrame: |
| | """Load the full dataset without sampling.""" |
| | logger.info("Loading full dataset from HuggingFace...") |
| | from datasets import load_dataset |
| | dataset = load_dataset("modelbiome/ai_ecosystem", split="train") |
| | df = dataset.to_pandas() |
| | logger.info(f"Loaded {len(df):,} total models") |
| | return df |
| | |
| | def precompute_all( |
| | self, |
| | sample_size: Optional[int] = None, |
| | resume: bool = False, |
| | pca_dims: int = 50 |
| | ): |
| | """ |
| | Pre-compute embeddings and coordinates for all or specified number of models. |
| | |
| | Args: |
| | sample_size: If None, process all models. Otherwise, process this many. |
| | resume: If True, resume from previous progress |
| | pca_dims: Number of PCA dimensions for pre-reduction |
| | """ |
| | global shutdown_requested |
| | |
| | start_time = time.time() |
| | |
| | |
| | status = self.get_status() if resume else { |
| | 'status': 'initializing', |
| | 'total_models': 0, |
| | 'processed_models': 0, |
| | 'current_batch': 0, |
| | 'started_at': datetime.now().isoformat(), |
| | 'error': None, |
| | 'batches_completed': [] |
| | } |
| | |
| | try: |
| | |
| | status['status'] = 'loading_data' |
| | self.save_status(status) |
| | |
| | if sample_size: |
| | logger.info(f"Loading {sample_size:,} models with stratified sampling...") |
| | df = self.data_loader.load_data(sample_size=sample_size, prioritize_base_models=True) |
| | else: |
| | logger.info("Loading ALL models...") |
| | df = self.load_full_dataset() |
| | |
| | total_models = len(df) |
| | status['total_models'] = total_models |
| | logger.info(f"Total models to process: {total_models:,}") |
| | |
| | |
| | logger.info("Building combined text for embeddings...") |
| | df['combined_text'] = ( |
| | df.get('tags', '').astype(str) + ' ' + |
| | df.get('pipeline_tag', '').astype(str) + ' ' + |
| | df.get('library_name', '').astype(str) |
| | ) |
| | if 'modelCard' in df.columns: |
| | df['combined_text'] = df['combined_text'] + ' ' + df['modelCard'].astype(str).str[:500] |
| | |
| | |
| | status['status'] = 'generating_embeddings' |
| | self.save_status(status) |
| | |
| | logger.info("Generating embeddings...") |
| | all_embeddings = [] |
| | texts = df['combined_text'].tolist() |
| | |
| | num_batches = (len(texts) + self.batch_size - 1) // self.batch_size |
| | |
| | for batch_idx in range(num_batches): |
| | if shutdown_requested: |
| | logger.warning("Shutdown requested - saving partial progress...") |
| | break |
| | |
| | batch_start = batch_idx * self.batch_size |
| | batch_end = min(batch_start + self.batch_size, len(texts)) |
| | batch_texts = texts[batch_start:batch_end] |
| | |
| | logger.info(f"Processing embedding batch {batch_idx + 1}/{num_batches} " |
| | f"(models {batch_start:,} - {batch_end:,})...") |
| | |
| | batch_embeddings = self.embedder.generate_embeddings( |
| | batch_texts, |
| | batch_size=self.embedding_batch_size |
| | ) |
| | all_embeddings.append(batch_embeddings) |
| | |
| | status['processed_models'] = batch_end |
| | status['current_batch'] = batch_idx + 1 |
| | status['progress_percent'] = round(100 * batch_end / total_models, 1) |
| | self.save_status(status) |
| | |
| | if shutdown_requested: |
| | status['status'] = 'interrupted' |
| | self.save_status(status) |
| | return |
| | |
| | embeddings = np.vstack(all_embeddings) |
| | logger.info(f"Generated embeddings: {embeddings.shape}") |
| | |
| | |
| | status['status'] = 'pca_reduction' |
| | self.save_status(status) |
| | |
| | logger.info(f"Applying PCA reduction ({embeddings.shape[1]} -> {pca_dims} dims)...") |
| | pca = PCA(n_components=pca_dims, random_state=42) |
| | embeddings_reduced = pca.fit_transform(embeddings) |
| | explained_var = pca.explained_variance_ratio_.sum() |
| | logger.info(f"PCA complete (preserved {explained_var:.1%} variance)") |
| | |
| | if shutdown_requested: |
| | status['status'] = 'interrupted' |
| | self.save_status(status) |
| | return |
| | |
| | |
| | status['status'] = 'umap_3d' |
| | self.save_status(status) |
| | |
| | logger.info("Running UMAP for 3D coordinates...") |
| | reducer_3d = UMAP( |
| | n_components=3, |
| | n_neighbors=15, |
| | min_dist=0.1, |
| | metric='euclidean', |
| | n_jobs=-1, |
| | low_memory=True if total_models > 200000 else False, |
| | spread=1.5, |
| | verbose=True |
| | ) |
| | coords_3d = reducer_3d.fit_transform(embeddings_reduced) |
| | logger.info(f"3D coordinates: {coords_3d.shape}") |
| | |
| | if shutdown_requested: |
| | status['status'] = 'interrupted' |
| | self.save_status(status) |
| | return |
| | |
| | |
| | status['status'] = 'umap_2d' |
| | self.save_status(status) |
| | |
| | logger.info("Running UMAP for 2D coordinates...") |
| | reducer_2d = UMAP( |
| | n_components=2, |
| | n_neighbors=15, |
| | min_dist=0.1, |
| | metric='euclidean', |
| | n_jobs=-1, |
| | low_memory=True if total_models > 200000 else False, |
| | spread=1.5, |
| | verbose=True |
| | ) |
| | coords_2d = reducer_2d.fit_transform(embeddings_reduced) |
| | logger.info(f"2D coordinates: {coords_2d.shape}") |
| | |
| | |
| | status['status'] = 'saving' |
| | self.save_status(status) |
| | |
| | logger.info("Saving results...") |
| | |
| | |
| | output_df = df.copy() |
| | output_df['x_3d'] = coords_3d[:, 0] |
| | output_df['y_3d'] = coords_3d[:, 1] |
| | output_df['z_3d'] = coords_3d[:, 2] |
| | output_df['x_2d'] = coords_2d[:, 0] |
| | output_df['y_2d'] = coords_2d[:, 1] |
| | |
| | |
| | models_file = self.output_dir / f"models_{self.version}.parquet" |
| | output_df.to_parquet(models_file, compression='snappy', index=False) |
| | logger.info(f"Saved: {models_file} ({models_file.stat().st_size / 1024 / 1024:.1f} MB)") |
| | |
| | |
| | embeddings_file = self.output_dir / f"embeddings_{self.version}.parquet" |
| | embeddings_df = pd.DataFrame({ |
| | 'model_id': df['modelId'].values, |
| | 'embedding': [emb.tolist() for emb in embeddings] |
| | }) |
| | embeddings_df.to_parquet(embeddings_file, compression='snappy', index=False) |
| | logger.info(f"Saved: {embeddings_file} ({embeddings_file.stat().st_size / 1024 / 1024:.1f} MB)") |
| | |
| | |
| | total_time = time.time() - start_time |
| | metadata = { |
| | 'version': self.version, |
| | 'created_at': datetime.now().isoformat(), |
| | 'total_models': int(total_models), |
| | 'embedding_dim': int(embeddings.shape[1]), |
| | 'umap_3d_shape': list(coords_3d.shape), |
| | 'umap_2d_shape': list(coords_2d.shape), |
| | 'unique_libraries': int(df['library_name'].nunique()), |
| | 'unique_pipelines': int(df['pipeline_tag'].nunique()), |
| | 'processing_time_seconds': total_time, |
| | 'processing_time_hours': total_time / 3600, |
| | 'pca_dims': pca_dims, |
| | 'pca_variance_preserved': float(explained_var) |
| | } |
| | |
| | metadata_file = self.output_dir / f"metadata_{self.version}.json" |
| | with open(metadata_file, 'w') as f: |
| | json.dump(metadata, f, indent=2) |
| | logger.info(f"Saved: {metadata_file}") |
| | |
| | |
| | status['status'] = 'completed' |
| | status['completed_at'] = datetime.now().isoformat() |
| | status['processing_time_hours'] = round(total_time / 3600, 2) |
| | self.save_status(status) |
| | |
| | logger.info("="*60) |
| | logger.info("BACKGROUND PRE-COMPUTATION COMPLETE!") |
| | logger.info("="*60) |
| | logger.info(f"Total time: {total_time/3600:.2f} hours ({total_time/60:.1f} minutes)") |
| | logger.info(f"Models processed: {total_models:,}") |
| | logger.info(f"Output directory: {self.output_dir}") |
| | logger.info("="*60) |
| | |
| | except Exception as e: |
| | logger.error(f"Pre-computation failed: {e}", exc_info=True) |
| | status['status'] = 'failed' |
| | status['error'] = str(e) |
| | self.save_status(status) |
| | raise |
| |
|
| |
|
| | def main(): |
| | parser = argparse.ArgumentParser( |
| | description="Background pre-computation of HF model embeddings and coordinates" |
| | ) |
| | parser.add_argument( |
| | "--sample-size", type=int, default=None, |
| | help="Number of models to process (default: all)" |
| | ) |
| | parser.add_argument( |
| | "--all", action="store_true", |
| | help="Process ALL models (may take many hours)" |
| | ) |
| | parser.add_argument( |
| | "--resume", action="store_true", |
| | help="Resume from previous progress" |
| | ) |
| | parser.add_argument( |
| | "--status", action="store_true", |
| | help="Show current computation status and exit" |
| | ) |
| | parser.add_argument( |
| | "--output-dir", type=str, default="../precomputed_data", |
| | help="Output directory" |
| | ) |
| | parser.add_argument( |
| | "--version", type=str, default="v1", |
| | help="Version tag" |
| | ) |
| | parser.add_argument( |
| | "--batch-size", type=int, default=50000, |
| | help="Batch size for processing" |
| | ) |
| | |
| | args = parser.parse_args() |
| | |
| | precomputer = BackgroundPrecomputer( |
| | output_dir=args.output_dir, |
| | version=args.version, |
| | batch_size=args.batch_size |
| | ) |
| | |
| | if args.status: |
| | status = precomputer.get_status() |
| | print(json.dumps(status, indent=2)) |
| | return |
| | |
| | sample_size = None if args.all else (args.sample_size or 150000) |
| | |
| | if sample_size: |
| | logger.info(f"Processing {sample_size:,} models...") |
| | else: |
| | logger.info("Processing ALL models (this may take many hours)...") |
| | |
| | try: |
| | precomputer.precompute_all( |
| | sample_size=sample_size, |
| | resume=args.resume |
| | ) |
| | except Exception as e: |
| | logger.error(f"Failed: {e}") |
| | sys.exit(1) |
| |
|
| |
|
| | if __name__ == "__main__": |
| | main() |
| |
|
| |
|