Spaces:
Sleeping
Sleeping
| import os | |
| os.environ["HF_HUB_DISABLE_PROGRESS_BARS"] = "1" | |
| os.environ["HF_HUB_DISABLE_TELEMETRY"] = "1" | |
| from flask import Flask, render_template, request, redirect, url_for, send_file, session | |
| import sqlite3 | |
| import pandas as pd | |
| import io | |
| import json | |
| import joblib | |
| import numpy as np | |
| from rdkit import Chem | |
| from rdkit.Chem import Descriptors, Draw | |
| from sklearn.base import BaseEstimator, RegressorMixin | |
| from huggingface_hub import hf_hub_download | |
| import sys | |
| import pubchempy as pcp | |
| import importlib.util | |
| from core.config import EvolutionConfig | |
| from core.evolution.evolution import MolecularEvolution | |
| # ------------------------------ | |
| # MODEL REPOSITORY CONFIGURATION | |
| # ------------------------------ | |
| REPO_ID_CN = "SalZa2004/Cetane_Number_Predictor" | |
| REPO_ID_YSI = "SalZa2004/YSI_Predictor" | |
| REPO_ID_BP = "SalZa2004/Boiling_Point_Predictor" | |
| REPO_ID_DENSITY = "SalZa2004/Density_Predictor" | |
| REPO_ID_LHV = "SalZa2004/LHV_Predictor" | |
| REPO_ID_VISC = "SalZa2004/Dynamic_Viscosity_Predictor" | |
| # ------------------------- | |
| # LOAD SHARED FEATURES | |
| # ------------------------- | |
def load_shared_features():
    """Download ``shared_features.py`` from the CN model repo and import it.

    The file is fetched with ``hf_hub_download`` and executed as a module.
    After execution the module object is registered in ``sys.modules`` under
    the names ``shared_features``, ``main`` and ``__main__``, and its
    ``FeatureSelector`` attribute is copied into this module's globals.

    Returns:
        The imported module object.
    """
    # NOTE(review): this executes code downloaded from the Hub at import time;
    # confirm the repository is trusted / pinned.
    source_path = hf_hub_download(REPO_ID_CN, "shared_features.py")
    module_spec = importlib.util.spec_from_file_location("shared_features", source_path)
    module = importlib.util.module_from_spec(module_spec)
    module_spec.loader.exec_module(module)

    # Register the module under every alias (presumably the names the pickled
    # joblib artefacts reference their classes by — confirm).
    for alias in ("shared_features", "main", "__main__"):
        sys.modules[alias] = module

    globals()["FeatureSelector"] = module.FeatureSelector
    return module


shared = load_shared_features()
| # ------------------------- | |
| # GENERIC MODEL LOADER | |
| # ------------------------- | |
def load_model(repo_id):
    """Fetch and deserialize the model/selector pair stored in ``repo_id``.

    Downloads ``model.joblib`` first and ``selector.joblib`` second, then
    loads both with ``joblib.load``.

    Returns:
        A ``(model, selector)`` tuple.
    """
    artefact_paths = [
        hf_hub_download(repo_id, filename)
        for filename in ("model.joblib", "selector.joblib")
    ]
    model, selector = (joblib.load(path) for path in artefact_paths)
    return model, selector
| # ------------------------- | |
| # LOAD ALL MODELS | |
| # ------------------------- | |
# Load every (model, selector) pair once at import time, in the same order as
# the repository constants are listed.
(
    (cn_model, cn_selector),
    (ysi_model, ysi_selector),
    (bp_model, bp_selector),
    (density_model, density_selector),
    (lhv_model, lhv_selector),
    (visc_model, visc_selector),
) = [
    load_model(repo_id)
    for repo_id in (
        REPO_ID_CN,
        REPO_ID_YSI,
        REPO_ID_BP,
        REPO_ID_DENSITY,
        REPO_ID_LHV,
        REPO_ID_VISC,
    )
]
| # ------------------------- | |
| # HELPER FUNCTIONS | |
| # ------------------------- | |
def validate_smiles(smiles):
    """Return True when ``smiles`` is non-empty and RDKit can parse it.

    Missing values (as judged by ``pd.isna``) and the empty string are
    rejected without calling RDKit.
    """
    is_blank = pd.isna(smiles) or smiles == ""
    if is_blank:
        return False
    parsed = Chem.MolFromSmiles(smiles)
    return parsed is not None
def predict_property(smiles, model, selector):
    """Predict one property for a single SMILES string.

    The SMILES is featurized with ``shared.featurize_df``; when that returns
    ``None`` the function returns ``None``. Otherwise the features go through
    ``selector.transform`` and the first element of ``model.predict`` is
    returned as a ``float``.
    """
    features = shared.featurize_df([smiles], return_df=False)
    if features is None:
        return None
    selected = selector.transform(features)
    predictions = model.predict(selected)
    return float(predictions[0])
def predict_cn(smiles):
    """Predict with the CN model/selector pair via ``predict_property``."""
    return predict_property(smiles, model=cn_model, selector=cn_selector)
def predict_ysi(smiles):
    """Predict with the YSI model/selector pair via ``predict_property``."""
    return predict_property(smiles, model=ysi_model, selector=ysi_selector)
def predict_bp(smiles):
    """Predict with the BP model/selector pair via ``predict_property``."""
    return predict_property(smiles, model=bp_model, selector=bp_selector)
def predict_density(smiles):
    """Predict with the density model/selector pair via ``predict_property``."""
    return predict_property(smiles, model=density_model, selector=density_selector)
def predict_lhv(smiles):
    """Predict with the LHV model/selector pair via ``predict_property``."""
    return predict_property(smiles, model=lhv_model, selector=lhv_selector)
def predict_dynamic_viscosity(smiles):
    """Predict with the viscosity model/selector pair via ``predict_property``."""
    return predict_property(smiles, model=visc_model, selector=visc_selector)
def pubchem_name_to_smiles(name):
    """Look up a compound by name on PubChem and return its canonical SMILES.

    Returns ``None`` when ``name`` is falsy, not a string, blank after
    stripping, has no PubChem match, or when the lookup raises.
    """
    if not (name and isinstance(name, str)):
        return None

    query = name.strip()
    if not query:
        return None

    try:
        matches = pcp.get_compounds(query, "name")
        return matches[0].canonical_smiles if matches else None
    except Exception:
        # Best-effort lookup: any failure is reported as "not found".
        return None
def pubchem_smiles_to_name(smiles):
    """Look up a SMILES on PubChem and return a display name for it.

    The first match's ``iupac_name`` is used when it is truthy, otherwise its
    ``title``. Returns ``None`` when there is no match or the lookup raises.
    """
    try:
        matches = pcp.get_compounds(smiles, "smiles")
        if not matches:
            return None
        best = matches[0]
        # IUPAC name wins; the record title is the fallback.
        return getattr(best, "iupac_name", None) or best.title
    except Exception:
        # Best-effort lookup: any failure is reported as "no name".
        return None
def get_user_config_from_form(form) -> EvolutionConfig:
    """Build an ``EvolutionConfig`` from submitted form fields.

    Reads ``mode`` (defaults to ``"target"``; ``"maximize"`` enables CN
    maximisation), ``target_cn`` (parsed as float, ``"50"`` when absent or
    empty) and ``minimize_ysi`` (true only for the value ``"on"``).
    """
    wants_maximum = form.get("mode", "target") == "maximize"

    # In maximise mode the target is a fixed placeholder value.
    target_value = 100.0 if wants_maximum else float(form.get("target_cn") or "50")

    ysi_flag = form.get("minimize_ysi") == "on"
    return EvolutionConfig(
        target_cn=target_value,
        maximize_cn=wants_maximum,
        minimize_ysi=ysi_flag,
    )
def display_results_as_tables(final_df: pd.DataFrame, pareto_df: pd.DataFrame, config: "EvolutionConfig"):
    """Render the evolution results as HTML tables.

    Args:
        final_df: Final population; its first 10 rows are rendered.
        pareto_df: Pareto-front candidates, or ``None``; its first 20 rows are
            rendered only when ``config.minimize_ysi`` is set and the frame is
            non-empty.
        config: Object exposing ``maximize_cn`` and ``minimize_ysi``.

    Returns:
        ``(final_table, pareto_table)`` where each item is an HTML string and
        ``pareto_table`` is ``None`` when no Pareto table applies.
    """
    cols = ["rank", "smiles", "cn", "cn_error", "ysi", "bp", "density", "lhv", "dynamic_viscosity"]
    if config.maximize_cn:
        # There is no target to be off from when maximising, so drop the column.
        cols = [c for c in cols if c != "cn_error"]

    def _to_html(df, limit):
        # Keep only the known columns actually present, in canonical order.
        present = [c for c in cols if c in df.columns]
        return (
            df.reset_index(drop=True)
            .head(limit)[present]
            .to_html(index=False, classes="table table-striped")
        )

    final_table = _to_html(final_df, 10)

    pareto_table = None
    if config.minimize_ysi and pareto_df is not None and not pareto_df.empty:
        # pareto_df is known to be non-None here, so no further None check is needed.
        pareto_table = _to_html(pareto_df, 20)

    return final_table, pareto_table
def build_history_images(history):
    """Render structure images for the SMILES recorded in ``history``.

    Each history item is read as a mapping with ``generation`` and ``smiles``
    keys. Every parseable SMILES is drawn with RDKit and saved as
    ``static/generated/evo_gen_<generation>_<rank>.png``; unparseable ones are
    skipped (their rank number is not reused). The name comes from
    ``pubchem_smiles_to_name`` with a "Gen/Rank" label as fallback.

    Returns:
        A list of ``{"generation", "samples"}`` dicts, where each sample has
        ``rank``, ``name``, ``smiles`` and ``img_id`` (the image file name).
    """
    output_dir = os.path.join("static", "generated")
    os.makedirs(output_dir, exist_ok=True)

    enriched = []
    for record in history or []:
        generation = record.get("generation")
        rendered = []

        for rank, smiles in enumerate(record.get("smiles", []), start=1):
            molecule = Chem.MolFromSmiles(smiles)
            if molecule is None:
                continue

            # Draw the structure and store it under a generation/rank file name.
            file_name = f"evo_gen_{generation}_{rank}.png"
            picture = Draw.MolToImage(molecule, size=(240, 200))
            picture.save(os.path.join(output_dir, file_name))

            # Prefer the PubChem name; fall back to a positional label.
            label = pubchem_smiles_to_name(smiles)
            if not label:
                label = f"Gen {generation} – Rank {rank}"

            rendered.append(
                {"rank": rank, "name": label, "smiles": smiles, "img_id": file_name}
            )

        enriched.append({"generation": generation, "samples": rendered})

    return enriched
| DB_PATH = os.path.join("data", "database", "database_compiled.db") | |
| PURE_TABLE = "main_pure_datasets_cn_ysi" | |
def canonicalize_smiles(smiles: str):
    """Return RDKit's canonical SMILES for ``smiles``, or ``None`` if it cannot be parsed."""
    molecule = Chem.MolFromSmiles(smiles)
    return None if molecule is None else Chem.MolToSmiles(molecule, canonical=True)
def fetch_measured(name: str = None, smiles: str = None):
    """Look up measured CN / YSI values for a compound in the SQLite database.

    Lookup priority:
      1) If ``smiles`` is given and can be canonicalized, match it against
         ``SMILES_Standardized``.
      2) Otherwise, or when that finds nothing, match ``name``
         case-insensitively against ``Name`` (blank names and the ``"-"``
         placeholder are skipped).

    Args:
        name: Compound name, or ``None``.
        smiles: SMILES string, or ``None``.

    Returns:
        ``{"measured_dcn": ..., "measured_ysi": ...}``; both values are
        ``None`` when no row matches.

    Raises:
        sqlite3.Error: propagated from the database layer (e.g. missing table).
    """
    conn = sqlite3.connect(DB_PATH)
    # try/finally guarantees the connection is released on every path,
    # including when canonicalization or a query raises.
    try:
        cur = conn.cursor()

        def _lookup(where_clause, value):
            # where_clause is a fixed SQL fragment; the value is always bound
            # as a parameter. The table name comes from a module constant.
            cur.execute(f"""
                SELECT CN_Measured, YSI_Unified_Measured
                FROM "{PURE_TABLE}"
                WHERE {where_clause}
                LIMIT 1
            """, (value,))
            return cur.fetchone()

        row = None

        # 1) Try SMILES_Standardized
        if smiles:
            can = canonicalize_smiles(smiles)
            if can:
                row = _lookup("SMILES_Standardized = ?", can)

        # 2) Fallback: Name match (case-insensitive)
        if not row:
            cleaned = name.strip() if name else ""
            if cleaned and cleaned != "-":
                row = _lookup("LOWER(Name) = LOWER(?)", cleaned)

        if row:
            return {"measured_dcn": row[0], "measured_ysi": row[1]}
        return {"measured_dcn": None, "measured_ysi": None}
    finally:
        conn.close()
| # Create the Flask application (the server itself is started at the bottom of the file) | |
| app = Flask(__name__) | |
def dashboard():
    """Render the ``dashboard.html`` template."""
    # NOTE(review): no @app.route decorator is visible for this view in this
    # listing — confirm how it is registered.
    template_name = "dashboard.html"
    return render_template(template_name)
def _blank_fuel_entry(name, smiles):
    """Return a result row for one fuel with every computed field unset."""
    return {
        "name": name if name else "-",
        "smiles": smiles,
        "dcn": None,
        "ysi": None,
        "bp": None,
        "density": None,
        "lhv": None,
        "dynamic_viscosity": None,
        "error": None,
        "img_id": None,
        "measured_dcn": None,
        "measured_ysi": None,
    }


def _round_or_none(value, digits):
    """Round ``value`` to ``digits`` decimals, passing ``None`` through."""
    return None if value is None else round(value, digits)


def _clean_csv_cell(value):
    """Turn a CSV cell into a stripped string, mapping missing values to ''."""
    return "" if pd.isna(value) else str(value).strip()


def _evaluate_fuel(name, smiles, img_filename):
    """Run the full per-fuel pipeline and return its result row.

    Steps: resolve a SMILES from the name when none was given, validate it,
    fill in a PubChem name when the user gave none, attach measured values,
    run all property predictions, and save a structure image under
    ``static/generated/<img_filename>``.
    """
    entry = _blank_fuel_entry(name, smiles)

    # STEP 1 — If SMILES is empty, convert NAME → SMILES
    if smiles == "" and name not in ("", None, "-"):
        final_smiles = pubchem_name_to_smiles(name)
        if final_smiles is None:
            entry["error"] = "Name not found in PubChem"
            return entry
    else:
        final_smiles = smiles

    # STEP 2 — Validate SMILES
    if not validate_smiles(final_smiles):
        entry["error"] = "Invalid SMILES"
        return entry
    entry["smiles"] = final_smiles

    # STEP 3 — Convert SMILES → name, only when the user supplied none
    # (avoids a PubChem round-trip whose result would be discarded).
    if not name or name == "-":
        iupac_name = pubchem_smiles_to_name(final_smiles)
        if iupac_name:
            entry["name"] = iupac_name

    # Attach measured values from the local database when available.
    meas = fetch_measured(name=entry["name"], smiles=final_smiles)
    for key in ("measured_dcn", "measured_ysi"):
        if meas[key] is not None:
            entry[key] = round(float(meas[key]), 2)

    # STEP 4 — Predict all properties
    pred_cn = predict_cn(final_smiles)
    pred_ysi = predict_ysi(final_smiles)
    pred_bp = predict_bp(final_smiles)
    pred_density = predict_density(final_smiles)
    pred_lhv = predict_lhv(final_smiles)
    pred_visc = predict_dynamic_viscosity(final_smiles)

    if pred_cn is None and pred_ysi is None:
        entry["error"] = "Prediction failed"
    else:
        entry["dcn"] = _round_or_none(pred_cn, 2)
        entry["ysi"] = _round_or_none(pred_ysi, 2)
        entry["bp"] = _round_or_none(pred_bp, 2)
        # Density and viscosity keep 4 decimals for both input modes; the CSV
        # path previously rounded them to 2, diverging from the manual path.
        entry["density"] = _round_or_none(pred_density, 4)
        entry["lhv"] = _round_or_none(pred_lhv, 2)
        entry["dynamic_viscosity"] = _round_or_none(pred_visc, 4)

    # STEP 5 — Draw the molecule
    mol = Chem.MolFromSmiles(final_smiles)
    img = Draw.MolToImage(mol, size=(300, 250))
    img_path = os.path.join("static", "generated", img_filename)
    os.makedirs(os.path.dirname(img_path), exist_ok=True)
    img.save(img_path)
    entry["img_id"] = img_filename

    return entry


def pure_predictor():
    """Handle the pure-fuel predictor page.

    * POST with form field ``mode == "csv"``: reads the uploaded ``csv_file``
      (must contain a ``SMILES`` column; ``IUPAC names`` is optional) and
      evaluates every row.
    * Any other POST: evaluates the paired ``fuel_name[]`` / ``smiles[]``
      form lists.
    * Otherwise: renders the page with no results.

    Always renders ``pure_predictor.html`` with ``results`` and ``error``.
    """
    # NOTE(review): no @app.route decorator is visible for this view in this
    # listing — confirm how it is registered (it must accept POST).
    results = []
    error = None

    # ------------------
    # CSV FILE INPUT
    # ------------------
    if request.method == "POST" and request.form.get("mode") == "csv":
        csv_file = request.files.get("csv_file")
        if not csv_file:
            error = "No CSV file uploaded."
            return render_template("pure_predictor.html", results=results, error=error)

        try:
            df = pd.read_csv(csv_file)
            if "SMILES" not in df.columns:
                error = "CSV must contain a 'SMILES' column."
                return render_template("pure_predictor.html", results=results, error=error)

            for i, row in df.iterrows():
                name = _clean_csv_cell(row.get("IUPAC names", ""))
                smiles = _clean_csv_cell(row.get("SMILES", ""))
                results.append(_evaluate_fuel(name, smiles, f"mol_csv_{i}.png"))
        except Exception as e:
            # Rows processed before the failure are still shown with the error.
            error = f"Failed to read CSV file: {e}"

        return render_template("pure_predictor.html", results=results, error=error)

    # ------------------
    # MANUAL INPUT
    # ------------------
    elif request.method == "POST":
        names = request.form.getlist("fuel_name[]")
        smiles_list = request.form.getlist("smiles[]")
        for i, (name, smiles) in enumerate(zip(names, smiles_list)):
            results.append(_evaluate_fuel(name.strip(), smiles.strip(), f"mol_{i}.png"))

    return render_template("pure_predictor.html", results=results, error=error)
def download_results():
    """Return the posted prediction results as a CSV attachment.

    Reads the ``results_data`` form field, which must hold a JSON list of
    result dicts, and maps each one to the export columns. ``Status`` is
    ``"OK"`` when the row's ``error`` is ``None``, ``""`` or ``"OK"``,
    otherwise the error text.

    Returns:
        A ``send_file`` response named ``pure_fuel_predictions.csv``, or a
        ``(message, 400)`` tuple when the payload is missing or malformed.
    """
    # NOTE(review): no @app.route decorator is visible for this view in this
    # listing — confirm how it is registered (it must accept POST).
    columns = [
        "IUPAC Name", "SMILES", "Predicted DCN",
        "Predicted YSI", "Predicted BP", "Predicted Density",
        "Predicted LHV", "Predicted Dynamic Viscosity",
        "Status",
    ]

    results_json = request.form.get("results_data")
    if not results_json:
        return "No results data supplied.", 400
    try:
        results = json.loads(results_json)
    except ValueError:
        # json.JSONDecodeError is a ValueError subclass.
        return "Results data is not valid JSON.", 400
    if not isinstance(results, list) or not all(isinstance(r, dict) for r in results):
        return "Results data must be a list of result objects.", 400

    cleaned_rows = [
        {
            "IUPAC Name": r.get("name", "-"),
            "SMILES": r.get("smiles", "-"),
            "Predicted DCN": r.get("dcn", None),
            "Predicted YSI": r.get("ysi", None),
            "Predicted BP": r.get("bp", None),
            "Predicted Density": r.get("density", None),
            "Predicted LHV": r.get("lhv", None),
            "Predicted Dynamic Viscosity": r.get("dynamic_viscosity", None),
            "Status": ("OK" if r.get("error") in (None, "", "OK") else r.get("error")),
        }
        for r in results
    ]

    # Passing columns= fixes the column order and keeps the header row even
    # when there are no results (selecting columns afterwards raised KeyError).
    df = pd.DataFrame(cleaned_rows, columns=columns)
    payload = df.to_csv(index=False).encode()

    return send_file(
        io.BytesIO(payload),
        mimetype="text/csv",
        as_attachment=True,
        download_name="pure_fuel_predictions.csv"
    )
def mixture_predictor():
    """Render the ``mixture_predictor.html`` template."""
    # NOTE(review): no @app.route decorator is visible for this view in this
    # listing — confirm how it is registered.
    template_name = "mixture_predictor.html"
    return render_template(template_name)
def generative():
    """Handle the generative-design page.

    On POST, builds an ``EvolutionConfig`` from the form, runs
    ``MolecularEvolution(config).evolve()``, renders the result tables and the
    history images. Any exception is caught and its text is shown as
    ``error``; values computed before the failure are still passed to the
    template. Always renders ``generative.html``.
    """
    # NOTE(review): no @app.route decorator is visible for this view in this
    # listing — confirm how it is registered (it must accept POST).
    context = {
        "final_table": None,
        "pareto_table": None,
        "error": None,
        "run_info": None,
        "history": None,
    }

    if request.method == "POST":
        try:
            # Form → config, then run the evolution.
            config = get_user_config_from_form(request.form)
            evolution = MolecularEvolution(config)
            final_df, pareto_df = evolution.evolve()

            # Result frames → HTML tables.
            context["final_table"], context["pareto_table"] = display_results_as_tables(
                final_df, pareto_df, config
            )

            if config.maximize_cn:
                mode_label = "Maximize CN"
            else:
                mode_label = f"Target CN = {config.target_cn}"
            context["run_info"] = {
                "mode": mode_label,
                "minimize_ysi": "Yes" if config.minimize_ysi else "No",
            }

            context["history"] = build_history_images(getattr(evolution, "history", []))
        except Exception as e:
            context["error"] = str(e)

    return render_template("generative.html", **context)
def constraints():
    """Render the ``constraints.html`` template."""
    # NOTE(review): no @app.route decorator is visible for this view in this
    # listing — confirm how it is registered.
    template_name = "constraints.html"
    return render_template(template_name)
def dataset():
    """Render the ``dataset.html`` template."""
    # NOTE(review): no @app.route decorator is visible for this view in this
    # listing — confirm how it is registered.
    template_name = "dataset.html"
    return render_template(template_name)
def download_pure():
    """Send the compiled pure-fuel workbook as ``pure_fuel_dataset.xlsx``."""
    # NOTE(review): no @app.route decorator is visible for this view in this
    # listing — confirm how it is registered.
    xlsx_mimetype = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
    source_path = "datasets/pure_fuel_properties_compiled_v2.xlsx"
    return send_file(
        source_path,
        mimetype=xlsx_mimetype,
        as_attachment=True,
        download_name="pure_fuel_dataset.xlsx",
    )
def download_mixture():
    """Send the mixture database workbook as ``mixture_fuel_dataset.xlsx``."""
    # NOTE(review): no @app.route decorator is visible for this view in this
    # listing — confirm how it is registered.
    xlsx_mimetype = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
    source_path = "datasets/mixture_database.xlsx"
    return send_file(
        source_path,
        mimetype=xlsx_mimetype,
        as_attachment=True,
        download_name="mixture_fuel_dataset.xlsx",
    )
def about():
    """Render the ``about.html`` template."""
    # NOTE(review): no @app.route decorator is visible for this view in this
    # listing — confirm how it is registered.
    template_name = "about.html"
    return render_template(template_name)
if __name__ == "__main__":
    # Listen on all interfaces on port 7860 when run as a script.
    server_options = {"host": "0.0.0.0", "port": 7860}
    app.run(**server_options)