"""Load fuel records from SQLite, featurize SMILES with RDKit, and select features."""

import functools
import os
import sqlite3
from contextlib import closing

import joblib
import numpy as np
import pandas as pd
from rdkit import Chem
from rdkit.Chem import Descriptors, rdFingerprintGenerator
from sklearn.ensemble import ExtraTreesRegressor
# Not used in the visible part of this module; kept in case later code relies on it.
from sklearn.model_selection import train_test_split
from tqdm import tqdm

PROJECT_ROOT = os.path.dirname(os.path.dirname(__file__))
DB_PATH = os.path.join(PROJECT_ROOT, "data", "database", "database_main.db")


# ============================================================================
# 1. DATA LOADING
# ============================================================================

def load_raw_data():
    """Load fuel names, SMILES strings and target values from the database.

    Reads ``FUEL`` left-joined with ``TARGET`` on ``fuel_id`` from the SQLite
    file at ``DB_PATH`` and drops every row that lacks either a ``cn`` value
    or a ``SMILES`` string.

    Returns:
        pandas.DataFrame: columns ``Fuel_Name``, ``SMILES`` and ``cn``
        (``cn`` is the ``Standardised_DCN`` column of ``TARGET``).
    """
    print("Connecting to SQLite database...")
    query = """
        SELECT
            F.Fuel_Name,
            F.SMILES,
            T.Standardised_DCN AS cn
        FROM FUEL F
        LEFT JOIN TARGET T ON F.fuel_id = T.fuel_id
    """
    # closing() releases the connection even when the query raises; sqlite3's
    # own context manager only handles transactions and does not close.
    with closing(sqlite3.connect(DB_PATH)) as conn:
        df = pd.read_sql_query(query, conn)

    # Rows without a target or without a structure are unusable downstream.
    return df.dropna(subset=["cn", "SMILES"])


# ============================================================================
# 2. FEATURIZATION MODULE
# ============================================================================

# Descriptor names and functions, in RDKit's own order. The order defines the
# column layout of the descriptor part of every feature vector.
DESCRIPTOR_NAMES = [d[0] for d in Descriptors._descList]
desc_functions = [d[1] for d in Descriptors._descList]


@functools.lru_cache(maxsize=8)
def _morgan_generator(radius, n_bits):
    """Return a cached Morgan fingerprint generator for the given settings."""
    return rdFingerprintGenerator.GetMorganGenerator(radius=radius, fpSize=n_bits)


def morgan_fp_from_mol(mol, radius=2, n_bits=2048):
    """Generate a Morgan fingerprint as a 0/1 integer vector.

    Args:
        mol: RDKit molecule.
        radius: Morgan radius passed to the fingerprint generator.
        n_bits: Length of the fingerprint.

    Returns:
        numpy.ndarray: integer array of length ``n_bits`` holding 0s and 1s.
    """
    # The generator depends only on (radius, n_bits), so it is built once and
    # reused instead of being re-created for every molecule.
    fp = _morgan_generator(radius, n_bits).GetFingerprint(mol)
    # The bit string is ASCII '0'/'1'; decode it in one pass rather than via a
    # Python list of single characters.
    ascii_bits = np.frombuffer(fp.ToBitString().encode("ascii"), dtype=np.uint8)
    return (ascii_bits - ord("0")).astype(int)


def physchem_desc_from_mol(mol):
    """Calculate the RDKit physicochemical descriptors of a molecule.

    Args:
        mol: RDKit molecule.

    Returns:
        numpy.ndarray | None: float32 vector with one entry per function in
        ``desc_functions``; NaN and infinite values are replaced by 0.0.
        ``None`` if any descriptor function raises.
    """
    try:
        desc = np.array([fn(mol) for fn in desc_functions], dtype=np.float32)
    except Exception:
        # Best effort: a molecule whose descriptors cannot be computed is
        # reported as None so callers can skip it. KeyboardInterrupt and
        # SystemExit are deliberately not swallowed.
        return None
    return np.nan_to_num(desc, nan=0.0, posinf=0.0, neginf=0.0)


def _mol_from_smiles(smiles):
    """Parse *smiles* into an RDKit molecule; return None if that is impossible."""
    # Missing values (None, NaN) are not strings and would make RDKit raise.
    if not isinstance(smiles, str):
        return None
    return Chem.MolFromSmiles(smiles)


def _features_from_mol(mol):
    """Return the fingerprint + descriptor vector of *mol*, or None on failure."""
    if mol is None:
        return None
    try:
        fp = morgan_fp_from_mol(mol)
    except Exception:
        return None
    desc = physchem_desc_from_mol(mol)
    if fp is None or desc is None:
        return None
    return np.hstack([fp, desc])


def featurize(smiles):
    """Convert a SMILES string to a feature vector.

    Args:
        smiles: SMILES string.

    Returns:
        numpy.ndarray | None: 1-D vector made of the Morgan fingerprint
        followed by the physicochemical descriptors, or ``None`` when the
        input is not a parseable SMILES string or featurization fails.
    """
    return _features_from_mol(_mol_from_smiles(smiles))


def featurize_df(df, smiles_col="SMILES", return_df=True):
    """Featurize every SMILES in a DataFrame, Series, array, list or tuple.

    Entries that cannot be parsed or featurized are skipped.

    Args:
        df: DataFrame containing ``smiles_col``, or a plain sequence / Series
            of SMILES strings.
        smiles_col: Name of the SMILES column.
        return_df: When true, also return the rows that were featurized.

    Returns:
        With ``return_df=True``: ``(X, df_valid)`` where ``X`` is a 2-D array
        with one row per featurized molecule and ``df_valid`` holds the
        matching input rows with a fresh 0..n-1 index; ``(None, None)`` when
        nothing could be featurized.
        With ``return_df=False``: ``X`` alone, or ``None``.
    """
    # Normalise bare sequences to a one-column DataFrame.
    if isinstance(df, (list, tuple, np.ndarray, pd.Series)):
        df = pd.DataFrame({smiles_col: df})

    features = []
    valid_indices = []  # positional indices of the rows that were featurized
    for i, smi in enumerate(tqdm(df[smiles_col], desc="Featurizing")):
        vec = featurize(smi)
        if vec is None:
            continue
        features.append(vec)
        valid_indices.append(i)

    if not features:
        return (None, None) if return_df else None

    X = np.vstack(features)
    if return_df:
        df_valid = df.iloc[valid_indices].reset_index(drop=True)
        return X, df_valid
    return X


# ============================================================================
# 3. FEATURE SELECTOR CLASS
# ============================================================================

class FeatureSelector:
    """Feature selection pipeline that can be saved and reused.

    The input matrix is expected to hold ``n_morgan`` fingerprint columns
    followed by descriptor columns. Fitting (1) drops descriptor columns whose
    absolute correlation with an earlier descriptor column exceeds
    ``corr_threshold`` and (2) keeps the ``top_k`` remaining columns with the
    highest ExtraTreesRegressor importance.
    """

    def __init__(self, n_morgan=2048, corr_threshold=0.95, top_k=300):
        """Store the selection settings; nothing is computed until ``fit``.

        Args:
            n_morgan: Number of leading columns that are fingerprint bits.
            corr_threshold: Absolute correlation above which a descriptor
                column is dropped.
            top_k: Maximum number of features kept after importance ranking.
        """
        self.n_morgan = n_morgan
        self.corr_threshold = corr_threshold
        self.top_k = top_k

        # Filled during fit()
        self.corr_cols_to_drop = None
        self.selected_indices = None
        self.n_features_in_ = None
        self.is_fitted = False

    def _split(self, X):
        """Split *X* into its fingerprint array and a descriptor DataFrame."""
        return X[:, :self.n_morgan], pd.DataFrame(X[:, self.n_morgan:])

    def _drop_correlated(self, X_mfp, desc_df):
        """Drop the recorded correlated descriptors and re-join with *X_mfp*."""
        desc_filtered = desc_df.drop(columns=self.corr_cols_to_drop).values
        return np.hstack([X_mfp, desc_filtered])

    def fit(self, X, y):
        """Fit the feature selector on training data.

        Args:
            X: 2-D array-like, fingerprint columns first, then descriptors.
            y: Target values, one per row of ``X``.

        Returns:
            FeatureSelector: ``self``, to allow chaining.

        Raises:
            ValueError: If ``X`` is not 2-D.
        """
        print("\n" + "="*70)
        print("FITTING FEATURE SELECTOR")
        print("="*70)

        X = np.asarray(X)
        if X.ndim != 2:
            raise ValueError(f"X must be 2-D, got array with shape {X.shape}")

        # Step 1: Split Morgan and descriptors
        X_mfp, desc_df = self._split(X)
        print(f"Morgan fingerprints: {X_mfp.shape[1]}")
        print(f"Descriptors: {desc_df.shape[1]}")

        # Step 2: Remove correlated descriptors. Only the strict upper
        # triangle is inspected, so of each correlated pair the later column
        # is the one that gets dropped.
        corr_matrix = desc_df.corr().abs()
        upper = corr_matrix.where(
            np.triu(np.ones(corr_matrix.shape, dtype=bool), k=1)
        )
        self.corr_cols_to_drop = [
            col for col in upper.columns
            if (upper[col] > self.corr_threshold).any()
        ]
        print(f"Correlated descriptors removed: {len(self.corr_cols_to_drop)}")

        X_corr = self._drop_correlated(X_mfp, desc_df)
        print(f"Features after correlation filter: {X_corr.shape[1]}")

        # Step 3: Feature importance selection
        print("Running feature importance selection...")
        model = ExtraTreesRegressor(n_estimators=100, random_state=42, n_jobs=-1)
        model.fit(X_corr, y)

        # Indices refer to columns of the correlation-filtered matrix, most
        # important first. Slicing tolerates top_k > number of columns.
        indices = np.argsort(model.feature_importances_)[::-1]
        self.selected_indices = indices[:self.top_k]
        print(f"Final selected features: {len(self.selected_indices)}")

        # Remembered so transform() can reject inputs of a different width.
        self.n_features_in_ = X.shape[1]
        self.is_fitted = True
        return self

    def transform(self, X):
        """Apply the fitted feature selection to new data.

        Args:
            X: 2-D array-like with the same column layout as the training
                data, or a single 1-D sample (treated as one row).

        Returns:
            numpy.ndarray: 2-D array holding only the selected columns.

        Raises:
            RuntimeError: If the selector has not been fitted.
            ValueError: If the number of columns differs from the training
                data.
        """
        if not self.is_fitted:
            raise RuntimeError("FeatureSelector must be fitted before transform!")

        X = np.atleast_2d(np.asarray(X))

        # Selectors pickled before n_features_in_ existed lack the attribute;
        # skip the check for them instead of failing.
        expected = getattr(self, "n_features_in_", None)
        if expected is not None and X.shape[1] != expected:
            raise ValueError(
                f"X has {X.shape[1]} features, but FeatureSelector was "
                f"fitted with {expected}"
            )

        # Steps 1-2: split, then remove the same correlated descriptors
        X_mfp, desc_df = self._split(X)
        X_corr = self._drop_correlated(X_mfp, desc_df)

        # Step 3: Select same important features
        return X_corr[:, self.selected_indices]

    def fit_transform(self, X, y):
        """Fit on ``(X, y)`` and return the transformed ``X``."""
        return self.fit(X, y).transform(X)

    def save(self, filepath='feature_selector.joblib'):
        """Save the fitted selector with joblib.

        Args:
            filepath: Destination file; missing parent directories are created.

        Raises:
            RuntimeError: If the selector has not been fitted.
        """
        if not self.is_fitted:
            raise RuntimeError("Cannot save unfitted selector!")

        # Create directory if it doesn't exist (a bare filename has none).
        directory = os.path.dirname(filepath)
        if directory:
            os.makedirs(directory, exist_ok=True)

        joblib.dump(self, filepath)
        print(f"✓ Feature selector saved to {filepath}")

    @staticmethod
    def load(filepath='feature_selector.joblib'):
        """Load a fitted selector.

        Args:
            filepath: File previously written by ``save``.

        Returns:
            FeatureSelector: the stored, fitted selector.

        Raises:
            RuntimeError: If the loaded object is not a fitted selector.
        """
        # SECURITY: joblib.load unpickles the file and can execute arbitrary
        # code. Only load files that come from a trusted source.
        selector = joblib.load(filepath)
        if not getattr(selector, "is_fitted", False):
            raise RuntimeError("Loaded selector is not fitted!")
        print(f"✓ Feature selector loaded from {filepath}")
        return selector