Spaces:
Sleeping
Sleeping
| # ============================================================================ | |
| # 1. DATA LOADING MODULE | |
| # ============================================================================ | |
| import os | |
| import sqlite3 | |
| import pandas as pd | |
| import numpy as np | |
| from sklearn.model_selection import train_test_split | |
PROJECT_ROOT = os.path.dirname(os.path.dirname(__file__))


def load_raw_data(path="density.csv"):
    """Load the raw density dataset and drop unusable rows.

    Args:
        path: Location of the CSV file. Defaults to ``"density.csv"``, which
            pandas resolves against the current working directory, exactly
            as the previous hard-coded path did.

    Returns:
        DataFrame containing only the rows in which both the ``density`` and
        ``SMILES`` columns are present. The original row index is kept.

    Raises:
        FileNotFoundError: If ``path`` does not exist (raised by pandas).
        KeyError: If the file has no ``density`` or ``SMILES`` column.
    """
    df = pd.read_csv(path)
    # Rows without a target value or a structure cannot be used downstream.
    return df.dropna(subset=["density", "SMILES"])
| # ============================================================================ | |
| # 2. FEATURIZATION MODULE | |
| # ============================================================================ | |
| from rdkit import Chem | |
| from rdkit.Chem import Descriptors, rdFingerprintGenerator | |
| from tqdm import tqdm | |
# Split RDKit's descriptor registry into two parallel module-level lists:
# the descriptor names and the callables that compute them. Both lists keep
# the registry's order, so position i in one matches position i in the other.
DESCRIPTOR_NAMES = [name for name, _ in Descriptors._descList]
desc_functions = [func for _, func in Descriptors._descList]
# Fingerprint generators keyed by (radius, n_bits). Building a generator is
# loop-invariant work, so each configuration is constructed only once and
# reused for every molecule.
_MORGAN_GENERATORS = {}


def morgan_fp_from_mol(mol, radius=2, n_bits=2048):
    """Generate a Morgan fingerprint as a vector of 0/1 integers.

    Args:
        mol: RDKit molecule to fingerprint.
        radius: Radius handed to the Morgan generator.
        n_bits: Fingerprint size handed to the generator as ``fpSize``.

    Returns:
        1-D integer NumPy array with one 0/1 entry per character of the
        fingerprint's bit string.
    """
    key = (radius, n_bits)
    fpgen = _MORGAN_GENERATORS.get(key)
    if fpgen is None:
        fpgen = rdFingerprintGenerator.GetMorganGenerator(
            radius=radius, fpSize=n_bits
        )
        _MORGAN_GENERATORS[key] = fpgen

    bit_string = fpgen.GetFingerprint(mol).ToBitString()
    # Decode the ASCII '0'/'1' characters in one vectorised step instead of
    # materialising a Python list with one element per bit.
    bits = np.frombuffer(bit_string.encode("ascii"), dtype=np.uint8) - ord("0")
    # astype(int) keeps the dtype the previous implementation produced and
    # returns a writable copy (frombuffer views are read-only).
    return bits.astype(int)
def physchem_desc_from_mol(mol):
    """Calculate physicochemical descriptors for one molecule.

    Every callable in the module-level ``desc_functions`` list is evaluated
    on ``mol`` and the results are collected in list order.

    Args:
        mol: Molecule object passed unchanged to each descriptor function.

    Returns:
        float32 NumPy array in which NaN, +inf and -inf have been replaced
        by 0.0, or ``None`` if computing the descriptors raised an error.
    """
    try:
        values = [fn(mol) for fn in desc_functions]
        desc = np.array(values, dtype=np.float32)
    except Exception:
        # Deliberate best-effort: a molecule whose descriptors cannot be
        # computed is reported as None so the caller can skip it. Catching
        # Exception (rather than a bare except) lets KeyboardInterrupt and
        # SystemExit propagate so a long featurization run can be stopped.
        return None
    return np.nan_to_num(desc, nan=0.0, posinf=0.0, neginf=0.0)
def featurize(smiles):
    """Convert one SMILES string to a feature vector.

    Args:
        smiles: SMILES string describing the molecule.

    Returns:
        1-D NumPy array made of the Morgan fingerprint followed by the
        physicochemical descriptors, or ``None`` when ``smiles`` is not a
        string, cannot be parsed, or its features cannot be computed.
    """
    # Missing values (None, NaN) and other non-string entries are rejected
    # here so that a single bad cell is skipped like any other invalid
    # molecule instead of raising inside the parser and aborting the batch.
    if not isinstance(smiles, str):
        return None

    mol = Chem.MolFromSmiles(smiles)
    if mol is None:
        return None

    fp = morgan_fp_from_mol(mol)
    desc = physchem_desc_from_mol(mol)
    if fp is None or desc is None:
        return None

    # Fingerprint bits come first; FeatureSelector relies on this layout
    # when it splits the matrix at n_morgan.
    return np.hstack([fp, desc])
def featurize_df(df, smiles_col="SMILES", return_df=True):
    """Featurize a DataFrame or a collection of SMILES strings.

    Args:
        df: DataFrame with a SMILES column, or a list / tuple / array /
            Series / other iterable of SMILES strings. A single string is
            treated as one SMILES.
        smiles_col: Name of the SMILES column (used as the column name when
            ``df`` is not already a DataFrame).
        return_df: If True, return ``(X, df_valid)``. If False, return only
            ``X``.

    Returns:
        X: Feature matrix with one row per successfully featurized entry.
        df_valid: The rows of the input that produced a feature vector, with
            a fresh 0..n-1 index (only if ``return_df=True``).
        When nothing could be featurized, ``(None, None)`` is returned if
        ``return_df`` is True and ``None`` otherwise.
    """
    # Normalise every supported input to a DataFrame with a SMILES column.
    if isinstance(df, str):
        # A bare string would otherwise be iterated character by character.
        df = pd.DataFrame({smiles_col: [df]})
    elif isinstance(df, (list, np.ndarray, pd.Series)):
        df = pd.DataFrame({smiles_col: df})
    elif not isinstance(df, pd.DataFrame):
        # Tuples, sets, generators, ...: materialise once.
        df = pd.DataFrame({smiles_col: list(df)})

    features = []
    valid_indices = []
    progress = tqdm(
        enumerate(df[smiles_col]), total=len(df), desc="Featurizing"
    )
    for i, smi in progress:
        fv = featurize(smi)
        if fv is not None:
            features.append(fv)
            valid_indices.append(i)

    if not features:
        return (None, None) if return_df else None

    X = np.vstack(features)
    if not return_df:
        return X

    # valid_indices are positions (from enumerate), hence iloc rather than loc.
    df_valid = df.iloc[valid_indices].reset_index(drop=True)
    return X, df_valid
| # ============================================================================ | |
| # 3. FEATURE SELECTOR CLASS | |
| # ============================================================================ | |
| import joblib | |
class FeatureSelector:
    """Feature selection pipeline that can be saved and reused.

    The input matrix is expected to hold ``n_morgan`` fingerprint columns
    followed by descriptor columns. Fitting (1) drops descriptor columns
    whose absolute correlation with an earlier descriptor exceeds
    ``corr_threshold`` and (2) keeps the ``top_k`` columns ranked by
    ExtraTrees feature importance. ``transform`` replays both steps.
    """

    def __init__(self, n_morgan=2048, corr_threshold=0.95, top_k=300):
        """Store the configuration; nothing is computed until ``fit``.

        Args:
            n_morgan: Number of leading columns that are fingerprint bits.
            corr_threshold: Absolute correlation above which a descriptor
                column is dropped.
            top_k: Maximum number of columns kept after importance ranking.
        """
        self.n_morgan = n_morgan
        self.corr_threshold = corr_threshold
        self.top_k = top_k
        # Filled during fit()
        self.corr_cols_to_drop = None
        self.selected_indices = None
        self.n_features_in_ = None
        self.is_fitted = False

    def fit(self, X, y):
        """Fit the feature selector on training data.

        Args:
            X: 2-D feature matrix (fingerprint columns first).
            y: Target values, one per row of ``X``.

        Returns:
            The fitted selector itself, so calls can be chained.
        """
        print("\n" + "="*70)
        print("FITTING FEATURE SELECTOR")
        print("="*70)

        X = np.asarray(X)

        # Step 1: Split Morgan and descriptors
        X_mfp = X[:, :self.n_morgan]
        X_desc = X[:, self.n_morgan:]
        print(f"Morgan fingerprints: {X_mfp.shape[1]}")
        print(f"Descriptors: {X_desc.shape[1]}")

        # Step 2: Remove correlated descriptors. Only the strict upper
        # triangle is inspected, so of each correlated pair the later
        # column is the one that gets dropped.
        desc_df = pd.DataFrame(X_desc)
        corr_matrix = desc_df.corr().abs()
        upper = corr_matrix.where(
            np.triu(np.ones(corr_matrix.shape), k=1).astype(bool)
        )
        self.corr_cols_to_drop = [
            col for col in upper.columns
            if (upper[col] > self.corr_threshold).any()
        ]
        print(f"Correlated descriptors removed: {len(self.corr_cols_to_drop)}")

        desc_filtered = desc_df.drop(columns=self.corr_cols_to_drop).values
        X_corr = np.hstack([X_mfp, desc_filtered])
        print(f"Features after correlation filter: {X_corr.shape[1]}")

        # Step 3: Feature importance selection
        from sklearn.ensemble import ExtraTreesRegressor
        print("Running feature importance selection...")
        model = ExtraTreesRegressor(n_estimators=100, random_state=42, n_jobs=-1)
        model.fit(X_corr, y)
        importances = model.feature_importances_
        indices = np.argsort(importances)[::-1]
        self.selected_indices = indices[:self.top_k]
        print(f"Final selected features: {len(self.selected_indices)}")

        # Remembered so transform() can reject matrices of a different width.
        self.n_features_in_ = X.shape[1]
        self.is_fitted = True
        return self

    def transform(self, X):
        """Apply the fitted feature selection to new data.

        Args:
            X: 2-D feature matrix laid out like the one passed to ``fit``.

        Returns:
            Matrix containing only the selected columns, in importance order.

        Raises:
            RuntimeError: If the selector has not been fitted.
            ValueError: If ``X`` has a different number of columns than the
                matrix the selector was fitted on.
        """
        if not self.is_fitted:
            raise RuntimeError("FeatureSelector must be fitted before transform!")

        X = np.asarray(X)
        # getattr: selectors pickled before this attribute existed lack it;
        # for those the check is skipped.
        expected = getattr(self, "n_features_in_", None)
        if expected is not None and X.shape[1] != expected:
            raise ValueError(
                f"X has {X.shape[1]} features, but FeatureSelector was "
                f"fitted with {expected}."
            )

        # Step 1: Split Morgan and descriptors
        X_mfp = X[:, :self.n_morgan]
        X_desc = X[:, self.n_morgan:]

        # Step 2: Remove same correlated descriptors
        desc_df = pd.DataFrame(X_desc)
        desc_filtered = desc_df.drop(columns=self.corr_cols_to_drop).values
        X_corr = np.hstack([X_mfp, desc_filtered])

        # Step 3: Select same important features
        return X_corr[:, self.selected_indices]

    def fit_transform(self, X, y):
        """Fit and transform in one step."""
        return self.fit(X, y).transform(X)

    def save(self, filepath='feature_selector.joblib'):
        """Save the fitted selector.

        Args:
            filepath: Destination file; missing parent directories are
                created.

        Raises:
            RuntimeError: If the selector has not been fitted.
        """
        if not self.is_fitted:
            raise RuntimeError("Cannot save unfitted selector!")
        # Create directory if it doesn't exist ('.' when filepath has none)
        os.makedirs(os.path.dirname(filepath) or '.', exist_ok=True)
        joblib.dump(self, filepath)
        print(f"✓ Feature selector saved to {filepath}")

    @staticmethod
    def load(filepath='feature_selector.joblib'):
        """Load a fitted selector.

        Declared as a static method so it works both as
        ``FeatureSelector.load(path)`` and on an existing instance.

        Args:
            filepath: File previously written by ``save``.

        Returns:
            The deserialised, fitted selector.

        Raises:
            RuntimeError: If the loaded object is not marked as fitted.
        """
        # NOTE: joblib.load unpickles arbitrary objects; only load files
        # from a trusted source.
        selector = joblib.load(filepath)
        if not selector.is_fitted:
            raise RuntimeError("Loaded selector is not fitted!")
        print(f"✓ Feature selector loaded from {filepath}")
        return selector
| # ============================================================================ | |
| # 4. TRAINING PIPELINE | |
| # ============================================================================ | |
| import optuna | |
| from optuna.samplers import TPESampler | |
| from sklearn.ensemble import ExtraTreesRegressor | |
| from sklearn.model_selection import cross_val_score, KFold | |
def prepare_training_data():
    """Load and prepare clean training data.

    Loads the raw table with ``load_raw_data``, featurizes it with
    ``featurize_df`` and removes rows whose ``density`` target is NaN.

    Returns:
        Tuple ``(X, y)``: the feature matrix and the matching density values.

    Raises:
        ValueError: If no molecule could be featurized, or if no sample is
            left after removing NaN targets.
    """
    print("\n" + "="*70)
    print("PREPARING TRAINING DATA")
    print("="*70)

    # Load raw data
    df = load_raw_data()
    print(f"Raw samples: {len(df)}")

    # Featurize
    X, df_valid = featurize_df(df, return_df=True)
    if X is None:
        # featurize_df signals "nothing valid" with (None, None); fail with
        # a clear message instead of an opaque TypeError on the next line.
        raise ValueError("No valid molecules could be featurized from the raw data.")
    y = df_valid["density"].values

    # Remove any remaining NaN in target
    nan_mask = ~np.isnan(y)
    X = X[nan_mask]
    y = y[nan_mask]
    if len(y) == 0:
        raise ValueError("No training samples left after removing NaN targets.")

    print(f"Valid samples after cleaning: {len(y)}")
    print(f"Feature matrix shape: {X.shape}")
    print(f"Target range: [{y.min():.1f}, {y.max():.1f}]")
    return X, y
def objective(trial, X_train, y_train):
    """Score one Optuna trial of ExtraTrees hyperparameters.

    Args:
        trial: Optuna trial used to draw the hyperparameter suggestions.
        X_train: Feature matrix used for cross-validation.
        y_train: Target values matching ``X_train``.

    Returns:
        Square root of the mean of the five cross-validation fold MSEs.
    """
    # Suggestions are drawn in a fixed order, one statement per parameter.
    n_estimators = trial.suggest_int("n_estimators", 100, 500)
    max_depth = trial.suggest_int("max_depth", 10, 30)
    min_samples_split = trial.suggest_int("min_samples_split", 10, 40)
    min_samples_leaf = trial.suggest_int("min_samples_leaf", 5, 20)
    max_features = trial.suggest_float("max_features", 0.3, 0.8)
    min_impurity_decrease = trial.suggest_float("min_impurity_decrease", 0.0, 0.02)

    regressor = ExtraTreesRegressor(
        n_estimators=n_estimators,
        max_depth=max_depth,
        min_samples_split=min_samples_split,
        min_samples_leaf=min_samples_leaf,
        max_features=max_features,
        min_impurity_decrease=min_impurity_decrease,
        # Fixed (non-tuned) settings.
        bootstrap=True,
        random_state=42,
        n_jobs=-1,
    )

    # Shuffled 5-fold split with a fixed seed so every trial sees the same folds.
    folds = KFold(n_splits=5, shuffle=True, random_state=42)
    neg_mse_per_fold = cross_val_score(
        regressor,
        X_train,
        y_train,
        cv=folds,
        scoring="neg_mean_squared_error",
        n_jobs=-1,
    )

    # The scorer reports negated MSE; flip the sign before taking the root.
    mean_mse = -neg_mse_per_fold.mean()
    return np.sqrt(mean_mse)
def train_and_save_model(n_trials=100,
                         output_dir='density_model/artifacts'):
    """
    Complete training pipeline:
    1. Load and featurize data
    2. Fit and save feature selector
    3. Optimize hyperparameters
    4. Train final model with best params
    5. Save model

    Args:
        n_trials: Number of Optuna trials to run.
        output_dir: Directory that receives ``selector.joblib``,
            ``model.joblib`` and ``optuna_study.joblib``; created if missing.

    Returns:
        Tuple ``(final_model, selector, study)``.
    """
    # Create output directory
    os.makedirs(output_dir, exist_ok=True)
    selector_path = os.path.join(output_dir, 'selector.joblib')
    model_path = os.path.join(output_dir, 'model.joblib')
    study_path = os.path.join(output_dir, 'optuna_study.joblib')

    # Step 1: Prepare data
    X_full, y = prepare_training_data()

    # Step 2: Create and fit feature selector
    print("\n" + "="*70)
    print("FEATURE SELECTION")
    print("="*70)
    selector = FeatureSelector(n_morgan=2048, corr_threshold=0.95, top_k=300)
    X_selected = selector.fit_transform(X_full, y)

    # Save selector
    selector.save(selector_path)

    # Step 3: Hyperparameter optimization
    print("\n" + "="*70)
    print("HYPERPARAMETER OPTIMIZATION")
    print("="*70)
    study = optuna.create_study(
        direction="minimize",
        sampler=TPESampler(seed=42),
        study_name="extratrees_density_tuning"
    )
    print(f"Running {n_trials} trials...")
    study.optimize(
        lambda trial: objective(trial, X_selected, y),
        n_trials=n_trials,
        show_progress_bar=True,
        n_jobs=1
    )
    print(f"\n✓ Best CV RMSE: {study.best_value:.4f}")
    print(f"\nBest parameters:")
    for key, value in study.best_params.items():
        print(f" {key}: {value}")

    # Step 4: Train final model
    print("\n" + "="*70)
    print("TRAINING FINAL MODEL")
    print("="*70)
    # study.best_params holds only the values drawn through trial.suggest_*,
    # so the fixed settings that objective() used must be re-applied here.
    # Without bootstrap=True the final model would be trained with the
    # estimator's default instead of the configuration that was tuned.
    best_params = study.best_params.copy()
    best_params["bootstrap"] = True
    best_params["random_state"] = 42
    best_params["n_jobs"] = -1
    final_model = ExtraTreesRegressor(**best_params)
    final_model.fit(X_selected, y)

    # Evaluate (on the training data itself, so these are fit metrics, not
    # generalisation estimates)
    train_pred = final_model.predict(X_selected)
    train_rmse = np.sqrt(np.mean((y - train_pred)**2))
    train_mae = np.mean(np.abs(y - train_pred))
    print(f"Train RMSE: {train_rmse:.4f}")
    print(f"Train MAE: {train_mae:.4f}")

    # Step 5: Save model
    joblib.dump(final_model, model_path)
    print(f"\n✓ Model saved to {model_path}")

    # Save study for reference
    joblib.dump(study, study_path)
    print(f"✓ Optuna study saved to {study_path}")

    print("\n" + "="*70)
    print("TRAINING COMPLETE!")
    print("="*70)
    print(f"Artifacts saved in: {output_dir}/")
    print(f" - model.joblib")
    print(f" - selector.joblib")
    print(f" - optuna_study.joblib")
    return final_model, selector, study
| # ============================================================================ | |
| # 5. PREDICTION CLASS | |
| # ============================================================================ | |
class DensityPredictor:
    """
    Simple predictor class for density number prediction.

    Usage:
        predictor = DensityPredictor()
        density_values = predictor.predict(["CCCCCCCC", "CC(C)C"])
    """

    def __init__(self, model_path="density_model/artifacts/model.joblib",
                 selector_path="density_model/artifacts/selector.joblib"):
        """Load the trained model and feature selector.

        Args:
            model_path: File holding the trained regressor.
            selector_path: File holding the fitted FeatureSelector.
        """
        print("Loading Density Predictor...")
        self.model = joblib.load(model_path)
        self.selector = FeatureSelector.load(selector_path)
        print("✓ Predictor ready!\n")

    def predict(self, smiles_list):
        """
        Predict Density numbers for SMILES strings.

        Args:
            smiles_list: Single SMILES string, list of SMILES, or pandas Series

        Returns:
            List of predicted Density values. Entries that cannot be
            featurized are skipped, so the list can be shorter than the
            input; use ``predict_with_details`` when predictions must stay
            aligned with the input rows.
        """
        # Handle single SMILES
        if isinstance(smiles_list, str):
            smiles_list = [smiles_list]

        # Featurize
        X_full = featurize_df(smiles_list, return_df=False)
        if X_full is None:
            print("⚠ Warning: No valid molecules found!")
            return []

        # Apply feature selection
        X_selected = self.selector.transform(X_full)

        # Predict
        predictions = self.model.predict(X_selected)
        return predictions.tolist()

    def predict_with_details(self, smiles_list):
        """
        Predict with validation info.

        Args:
            smiles_list: Single SMILES string, list of SMILES, or pandas Series

        Returns:
            DataFrame with one row per input entry, in input order, and the
            columns ``SMILES``, ``Predicted_Density`` (NaN for entries that
            could not be featurized) and ``Valid``. When no entry is valid,
            an empty DataFrame with those columns is returned.
        """
        if isinstance(smiles_list, str):
            smiles_list = [smiles_list]
        smiles_list = list(smiles_list)

        # Featurize with validation. The helper column records each entry's
        # input position so predictions can be written back by position;
        # joining on the SMILES text would multiply rows whenever the same
        # SMILES appears more than once.
        df = pd.DataFrame({
            "SMILES": smiles_list,
            "_row": range(len(smiles_list)),
        })
        X_full, df_valid = featurize_df(df, return_df=True)
        if X_full is None:
            print("⚠ Warning: No valid molecules found!")
            return pd.DataFrame(columns=["SMILES", "Predicted_Density", "Valid"])

        # Apply feature selection
        X_selected = self.selector.transform(X_full)

        # Predict
        predictions = self.model.predict(X_selected)

        # Start with every entry marked invalid, then fill in the valid rows.
        all_results = pd.DataFrame({"SMILES": smiles_list})
        all_results["Predicted_Density"] = np.nan
        all_results["Valid"] = False
        valid_rows = df_valid["_row"].to_numpy()
        all_results.loc[valid_rows, "Predicted_Density"] = predictions
        all_results.loc[valid_rows, "Valid"] = True
        return all_results
| # ============================================================================ | |
| # 6. MAIN EXECUTION | |
| # ============================================================================ | |
# Script entry point: "train" as the first command-line argument runs the
# training pipeline; any other invocation runs the prediction demo.
# NOTE(review): the nesting below was reconstructed from a source whose
# indentation was lost. The prediction example and the usage text are assumed
# to belong to the else branch — confirm against the original file.
if __name__ == "__main__":
    import sys
    if len(sys.argv) > 1 and sys.argv[1] == "train":
        # Training mode
        print("="*70)
        print("TRAINING MODE")
        print("="*70)
        model, selector, study = train_and_save_model(
            n_trials=100,
            output_dir='density_model/artifacts'
        )
    else:
        # Prediction mode - check if model exists
        model_path = "density_model/artifacts/model.joblib"
        selector_path = "density_model/artifacts/selector.joblib"
        if not os.path.exists(model_path) or not os.path.exists(selector_path):
            # Both artifacts are required; exit with status 1 if either is missing.
            print("="*70)
            print("MODEL NOT FOUND")
            print("="*70)
            print("\nNo trained model found. Please train first:")
            print(" python train.py train")
            print("\nThis will create:")
            print(" - density_model/artifacts/model.joblib")
            print(" - density_model/artifacts/selector.joblib")
            sys.exit(1)
        # Prediction example
        print("="*70)
        print("PREDICTION EXAMPLE")
        print("="*70)
        # Create predictor (uses the default artifact paths of DensityPredictor)
        predictor = DensityPredictor()
        # Example SMILES, including one deliberately invalid entry
        test_smiles = [
            "CC(C)C", # Isobutane
            "CCCCCCCC", # Octane
            "C1CCCCC1", # Cyclohexane
            "INVALID" # Invalid SMILES
        ]
        print("Testing with example SMILES:")
        results = predictor.predict_with_details(test_smiles)
        print("\n" + results.to_string(index=False))
        print("\n" + "="*70)
        print("Simple prediction:")
        # Both inputs are expected to be valid, so two predictions are indexed.
        predictions = predictor.predict(["CCCCCCCC", "CC(C)C"])
        print(f"Octane density: {predictions[0]:.2f}")
        print(f"Isobutane density: {predictions[1]:.2f}")
        print("\n" + "="*70)
        print("Usage:")
        print(" Training: python train.py train")
        print(" Prediction: python train.py")
        print("\nIn your code:")
        print(" from train import DensityPredictor")
        print(" predictor = DensityPredictor()")
        print(" predictions = predictor.predict(['CCCCCCCC'])")
        print("="*70)