| | from typing import Dict, Optional, Tuple |
| | from pathlib import Path |
| | import tensorflow as tf |
| | import os |
| | import subprocess |
| | from datetime import datetime |
| | from logger_config import config_logger |
| |
|
# Module-level logger configured by the project's shared logging helper.
logger = config_logger(__name__)
| |
|
class EnvironmentSetup:
    """Detect compute devices and prepare the training environment.

    On construction this probes for a TPU/GPU/CPU and builds a matching
    ``tf.distribute`` strategy; cache and per-run training directories are
    created lazily via :meth:`initialize`.
    """

    def __init__(self):
        # Device probing happens eagerly so the strategy is available
        # immediately; directory setup is deferred to initialize() so the
        # caller can choose a cache location first.
        self.device_type, self.strategy = self.setup_devices()
        self.cache_dir = None
        # Initialized here so accessing the attribute before initialize()
        # cannot raise AttributeError.
        self.training_dirs = None

    def initialize(self, cache_dir: Optional[Path] = None):
        """Create the model cache and per-run training directories.

        Args:
            cache_dir: Optional cache root; defaults to ~/.chatbot_cache.
        """
        self.cache_dir = self.setup_model_cache(cache_dir)
        self.training_dirs = self.setup_training_directories()

    @staticmethod
    def setup_model_cache(cache_dir: Optional[Path] = None) -> Path:
        """Setup and manage model cache directory.

        Creates the directory if needed and points the Transformers/Torch/
        HuggingFace cache environment variables at subdirectories of it.

        Args:
            cache_dir: Cache root to use; defaults to ~/.chatbot_cache.

        Returns:
            The cache directory actually used.
        """
        if cache_dir is None:
            cache_dir = Path.home() / '.chatbot_cache'

        cache_dir.mkdir(parents=True, exist_ok=True)

        # Redirect third-party model caches into our managed directory so
        # downloads survive across runs.
        os.environ['TRANSFORMERS_CACHE'] = str(cache_dir / 'transformers')
        os.environ['TORCH_HOME'] = str(cache_dir / 'torch')
        os.environ['HF_HOME'] = str(cache_dir / 'huggingface')

        logger.info(f"Using cache directory: {cache_dir}")
        return cache_dir

    @staticmethod
    def setup_training_directories(base_dir: str = "chatbot_training") -> Dict[str, Path]:
        """Setup directory structure for training artifacts.

        Each call creates a fresh timestamped run directory under *base_dir*.

        Args:
            base_dir: Root directory for all training runs.

        Returns:
            Mapping with keys 'base', 'checkpoints', 'plots', 'logs', each
            an existing ``Path``.
        """
        base_path = Path(base_dir)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        train_dir = base_path / f"training_run_{timestamp}"

        directories = {
            'base': train_dir,
            'checkpoints': train_dir / 'checkpoints',
            'plots': train_dir / 'plots',
            'logs': train_dir / 'logs'
        }

        for dir_path in directories.values():
            dir_path.mkdir(parents=True, exist_ok=True)

        return directories

    @staticmethod
    def is_colab() -> bool:
        """Check if code is running in Google Colab."""
        try:
            # Presence check only — both modules exist in a Colab runtime.
            import google.colab  # noqa: F401
            import IPython  # noqa: F401
            return True
        except (ImportError, AttributeError):
            return False

    @staticmethod
    def _get_colab_gpu_name() -> Optional[str]:
        """Return the GPU model reported by nvidia-smi, or None on failure."""
        try:
            return subprocess.check_output(
                ['nvidia-smi', '--query-gpu=gpu_name', '--format=csv,noheader'],
                stderr=subprocess.DEVNULL
            ).decode('utf-8').strip()
        except (subprocess.SubprocessError, FileNotFoundError):
            return None

    @staticmethod
    def _enable_memory_growth(gpus) -> None:
        """Enable on-demand memory growth so TF does not grab all GPU RAM."""
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)

    def setup_colab_tpu(self) -> Optional[tf.distribute.Strategy]:
        """Setup TPU in Colab environment if available.

        Returns:
            A ``TPUStrategy``, or ``None`` when not running in Colab, no TPU
            runtime is attached, or initialization fails.
        """
        if not self.is_colab():
            return None

        try:
            # COLAB_TPU_ADDR is only set when a TPU runtime is attached.
            if 'COLAB_TPU_ADDR' not in os.environ:
                return None

            tpu_address = 'grpc://' + os.environ['COLAB_TPU_ADDR']
            resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu=tpu_address)
            tf.config.experimental_connect_to_cluster(resolver)
            tf.tpu.experimental.initialize_tpu_system(resolver)
            return tf.distribute.TPUStrategy(resolver)
        except Exception as e:
            # Best-effort: caller falls back to GPU/CPU on failure.
            logger.warning(f"Failed to initialize Colab TPU: {e}")
            return None

    def setup_devices(self) -> Tuple[str, tf.distribute.Strategy]:
        """Configure available compute devices with Colab optimizations.

        Preference order: Colab TPU -> Colab GPU -> directly-attached TPU ->
        (multi-)GPU -> CPU fallback.

        Returns:
            Tuple of device type label ("TPU"/"GPU"/"CPU") and the matching
            distribution strategy.
        """
        logger.info("Checking available compute devices...")

        if self.is_colab():
            logger.info("Running in Google Colab environment")

            tpu_strategy = self.setup_colab_tpu()
            if tpu_strategy is not None:
                logger.info("Colab TPU detected and initialized")
                return "TPU", tpu_strategy

            gpus = tf.config.list_physical_devices('GPU')
            if gpus:
                try:
                    self._enable_memory_growth(gpus)

                    gpu_name = self._get_colab_gpu_name()
                    if gpu_name is not None:
                        logger.info(f"Colab GPU detected: {gpu_name}")
                    else:
                        logger.warning("Could not detect specific GPU model")

                    strategy = tf.distribute.OneDeviceStrategy("/GPU:0")
                    return "GPU", strategy

                except Exception as e:
                    # Fall through to the CPU fallback below.
                    logger.error(f"Error configuring Colab GPU: {str(e)}")
        else:
            # Outside Colab, try a directly-attached TPU first.
            try:
                resolver = tf.distribute.cluster_resolver.TPUClusterResolver()
                tf.config.experimental_connect_to_cluster(resolver)
                tf.tpu.experimental.initialize_tpu_system(resolver)
                strategy = tf.distribute.TPUStrategy(resolver)
                logger.info("TPU detected and initialized")
                return "TPU", strategy
            except ValueError:
                logger.info("No TPU detected. Checking for GPUs...")

            gpus = tf.config.list_physical_devices('GPU')
            if gpus:
                try:
                    self._enable_memory_growth(gpus)

                    if len(gpus) > 1:
                        strategy = tf.distribute.MirroredStrategy()
                        logger.info(f"Multi-GPU strategy set up with {len(gpus)} GPUs")
                    else:
                        strategy = tf.distribute.OneDeviceStrategy("/GPU:0")
                        logger.info("Single GPU strategy set up")

                    return "GPU", strategy

                except Exception as e:
                    # Fall through to the CPU fallback below.
                    logger.error(f"Error configuring GPU: {str(e)}")

        # Fallback when no accelerator could be configured.
        strategy = tf.distribute.OneDeviceStrategy("/CPU:0")
        logger.info("Using CPU strategy")
        return "CPU", strategy

    def optimize_batch_size(self, base_batch_size: int = 16) -> int:
        """Colab-specific optimizations for training.

        Scales the batch size up for the detected Colab accelerator; the
        input is returned unchanged outside Colab or on unknown hardware.

        Args:
            base_batch_size: Starting batch size to scale.

        Returns:
            The (possibly scaled and capped) batch size.
        """
        if not self.is_colab():
            return base_batch_size

        if self.device_type == "GPU":
            gpu_name = self._get_colab_gpu_name()
            if gpu_name is None:
                logger.warning("Could not detect specific GPU model, using default settings")
            elif "A100" in gpu_name:
                logger.info("Optimizing for Colab A100 GPU")
                base_batch_size = min(base_batch_size * 8, 64)
            elif "T4" in gpu_name:
                logger.info("Optimizing for Colab T4 GPU")
                base_batch_size = min(base_batch_size * 2, 32)
            elif "V100" in gpu_name:
                logger.info("Optimizing for Colab V100 GPU")
                base_batch_size = min(base_batch_size * 3, 48)

        elif self.device_type == "TPU":
            # TPU cores favor larger global batches.
            base_batch_size = min(base_batch_size * 4, 64)
            logger.info("Optimizing for Colab TPU")

        logger.info(f"Optimized batch size for Colab: {base_batch_size}")
        return base_batch_size
| |
|