"""Flask web app for fuel-property prediction and generative molecule design.

At import time the module downloads six property models (cetane number, YSI,
boiling point, density, LHV, dynamic viscosity) from the Hugging Face Hub and
then exposes routes for single/CSV prediction, CSV export, evolutionary
molecule generation, and dataset downloads.
"""

import os

# Set before ``huggingface_hub`` is imported so the library sees the flags.
os.environ["HF_HUB_DISABLE_PROGRESS_BARS"] = "1"
os.environ["HF_HUB_DISABLE_TELEMETRY"] = "1"

import importlib.util
import io
import json
import sqlite3
import sys
from contextlib import closing
from typing import Optional

import joblib
import numpy as np
import pandas as pd
import pubchempy as pcp
from flask import Flask, render_template, request, redirect, url_for, send_file, session
from huggingface_hub import hf_hub_download
from rdkit import Chem
from rdkit.Chem import Descriptors, Draw
from sklearn.base import BaseEstimator, RegressorMixin

from core.config import EvolutionConfig
from core.evolution.evolution import MolecularEvolution

# ------------------------------
# MODEL REPOSITORY CONFIGURATION
# ------------------------------
REPO_ID_CN = "SalZa2004/Cetane_Number_Predictor"
REPO_ID_YSI = "SalZa2004/YSI_Predictor"
REPO_ID_BP = "SalZa2004/Boiling_Point_Predictor"
REPO_ID_DENSITY = "SalZa2004/Density_Predictor"
REPO_ID_LHV = "SalZa2004/LHV_Predictor"
REPO_ID_VISC = "SalZa2004/Dynamic_Viscosity_Predictor"


# -------------------------
# LOAD SHARED FEATURES
# -------------------------
def load_shared_features():
    """Download ``shared_features.py`` from the CN repo and import it.

    The module is registered in ``sys.modules`` under several names
    (``shared_features``, ``main``, ``__main__``) and ``FeatureSelector`` is
    injected into this module's globals so that ``joblib`` can resolve the
    pickled classes regardless of the module name they were saved under.

    Returns:
        The imported ``shared_features`` module object.
    """
    # NOTE(security): this executes Python code downloaded from the Hub;
    # only point REPO_ID_CN at a repository you trust.
    path = hf_hub_download(REPO_ID_CN, "shared_features.py")
    spec = importlib.util.spec_from_file_location("shared_features", path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)

    # Register module for joblib
    sys.modules["shared_features"] = module
    sys.modules["main"] = module
    sys.modules["__main__"] = module
    globals()["FeatureSelector"] = module.FeatureSelector
    return module


shared = load_shared_features()


# -------------------------
# GENERIC MODEL LOADER
# -------------------------
def load_model(repo_id):
    """Download and unpickle ``model.joblib`` and ``selector.joblib``.

    Args:
        repo_id: Hugging Face Hub repository id holding both artifacts.

    Returns:
        A ``(model, selector)`` tuple.
    """
    model_path = hf_hub_download(repo_id, "model.joblib")
    selector_path = hf_hub_download(repo_id, "selector.joblib")
    # NOTE(security): joblib.load unpickles arbitrary objects; the artifacts
    # come from an external repository, so the repository must be trusted.
    model = joblib.load(model_path)
    selector = joblib.load(selector_path)
    return model, selector


# -------------------------
# LOAD ALL MODELS
# -------------------------
cn_model, cn_selector = load_model(REPO_ID_CN)
ysi_model, ysi_selector = load_model(REPO_ID_YSI)
bp_model, bp_selector = load_model(REPO_ID_BP)
density_model, density_selector = load_model(REPO_ID_DENSITY)
lhv_model, lhv_selector = load_model(REPO_ID_LHV)
visc_model, visc_selector = load_model(REPO_ID_VISC)


# -------------------------
# HELPER FUNCTIONS
# -------------------------
def validate_smiles(smiles):
    """Return True when *smiles* is non-empty and RDKit can parse it."""
    if pd.isna(smiles) or smiles == "":
        return False
    return Chem.MolFromSmiles(smiles) is not None


def predict_property(smiles, model, selector):
    """Featurize one SMILES and return the model prediction as a float.

    Returns None when ``shared.featurize_df`` yields no features.
    """
    X = shared.featurize_df([smiles], return_df=False)
    if X is None:
        return None
    X = selector.transform(X)
    return float(model.predict(X)[0])


def predict_cn(smiles):
    """Predict cetane number for *smiles*."""
    return predict_property(smiles, cn_model, cn_selector)


def predict_ysi(smiles):
    """Predict YSI for *smiles*."""
    return predict_property(smiles, ysi_model, ysi_selector)


def predict_bp(smiles):
    """Predict boiling point for *smiles*."""
    return predict_property(smiles, bp_model, bp_selector)


def predict_density(smiles):
    """Predict density for *smiles*."""
    return predict_property(smiles, density_model, density_selector)


def predict_lhv(smiles):
    """Predict LHV for *smiles*."""
    return predict_property(smiles, lhv_model, lhv_selector)


def predict_dynamic_viscosity(smiles):
    """Predict dynamic viscosity for *smiles*."""
    return predict_property(smiles, visc_model, visc_selector)


def pubchem_name_to_smiles(name):
    """Return canonical SMILES from a compound name.

    Best-effort lookup: returns None for empty/non-string input, when PubChem
    has no match, or when the lookup raises for any reason.
    """
    if not name or not isinstance(name, str):
        return None
    name = name.strip()
    if name == "":
        return None
    try:
        results = pcp.get_compounds(name, "name")
        if not results:
            return None
        return results[0].canonical_smiles
    except Exception:
        # Deliberately best-effort: a failed lookup is reported as "not found".
        return None


def pubchem_smiles_to_name(smiles):
    """Return preferred IUPAC name from SMILES.

    Falls back to the compound title; returns None when nothing is found or
    the lookup raises.
    """
    try:
        results = pcp.get_compounds(smiles, "smiles")
        if not results:
            return None
        compound = results[0]
        # Prefer IUPAC name if available
        if getattr(compound, "iupac_name", None):
            return compound.iupac_name
        # Fallback to title
        return compound.title
    except Exception:
        # Deliberately best-effort: callers substitute a placeholder name.
        return None


def get_user_config_from_form(form) -> EvolutionConfig:
    """Flask version of cli.get_user_config() (same logic, different input source).

    Reads ``mode``, ``target_cn`` and ``minimize_ysi`` from the submitted form.

    Raises:
        ValueError: if ``target_cn`` is present but not a valid float.
    """
    mode = form.get("mode", "target")  # "target" or "maximize"
    maximize_cn = (mode == "maximize")
    if maximize_cn:
        target = 100.0  # dummy, same as CLI
    else:
        target = float(form.get("target_cn") or "50")
    minimize_ysi = (form.get("minimize_ysi") == "on")
    return EvolutionConfig(target_cn=target, maximize_cn=maximize_cn, minimize_ysi=minimize_ysi)


def display_results_as_tables(final_df: pd.DataFrame, pareto_df: pd.DataFrame, config: EvolutionConfig):
    """Flask version of results.display_results() (same cols logic, rendered as HTML).

    Returns:
        ``(final_table, pareto_table)`` as HTML strings. ``pareto_table`` is
        None unless YSI minimisation is enabled and a non-empty Pareto frame
        was supplied. The final table shows the first 10 rows, the Pareto
        table the first 20.
    """
    cols = ["rank", "smiles", "cn", "cn_error", "ysi", "bp", "density", "lhv", "dynamic_viscosity"]
    if config.maximize_cn:
        cols = [c for c in cols if c != "cn_error"]

    final_cols = [c for c in cols if c in final_df.columns]
    final_df = final_df.reset_index(drop=True)
    final_table = final_df.head(10)[final_cols].to_html(index=False, classes="table table-striped")

    pareto_table = None
    if config.minimize_ysi and pareto_df is not None and not pareto_df.empty:
        pareto_cols = [c for c in cols if c in pareto_df.columns]
        pareto_df = pareto_df.reset_index(drop=True)
        pareto_table = pareto_df.head(20)[pareto_cols].to_html(index=False, classes="table table-striped")
    return final_table, pareto_table


def build_history_images(history):
    """
    Convert a small set of history SMILES into RDKit images saved in static/generated/.
    Returns history enriched with SMILES, structure image, and name.
    """
    out = []
    base_dir = os.path.join("static", "generated")
    os.makedirs(base_dir, exist_ok=True)

    for h in history or []:
        gen = h.get("generation")
        samples = []
        for i, smi in enumerate(h.get("smiles", []), start=1):
            mol = Chem.MolFromSmiles(smi)
            if mol is None:
                continue

            # draw structure
            img = Draw.MolToImage(mol, size=(240, 200))
            img_filename = f"evo_gen_{gen}_{i}.png"
            img_path = os.path.join(base_dir, img_filename)
            img.save(img_path)

            # get molecule name via the PubChem helper, else a placeholder
            name = pubchem_smiles_to_name(smi)
            if not name:
                name = f"Gen {gen} – Rank {i}"

            samples.append({
                "rank": i,
                "name": name,
                "smiles": smi,
                "img_id": img_filename,
            })
        out.append({"generation": gen, "samples": samples})
    return out


DB_PATH = os.path.join("data", "database", "database_compiled.db")
PURE_TABLE = "main_pure_datasets_cn_ysi"


def canonicalize_smiles(smiles: str):
    """Return the RDKit canonical SMILES, or None if *smiles* cannot be parsed."""
    mol = Chem.MolFromSmiles(smiles)
    if mol is None:
        return None
    return Chem.MolToSmiles(mol, canonical=True)


def fetch_measured(name: Optional[str] = None, smiles: Optional[str] = None):
    """
    Look up measured CN / YSI values in the local SQLite database.

    Priority:
    1) If smiles provided: match SMILES_Standardized
    2) Else (or if not found): match Name (case-insensitive)

    Returns:
        ``{"measured_dcn": ..., "measured_ysi": ...}``; both values are None
        when no row matches.

    Raises:
        sqlite3.Error: propagated from the connection or the queries.
    """
    # ``closing`` guarantees the connection is released even if a query raises.
    with closing(sqlite3.connect(DB_PATH)) as conn:
        cur = conn.cursor()

        # 1) Try SMILES_Standardized
        if smiles:
            can = canonicalize_smiles(smiles)
            if can:
                cur.execute(f"""
                    SELECT CN_Measured, YSI_Unified_Measured
                    FROM "{PURE_TABLE}"
                    WHERE SMILES_Standardized = ?
                    LIMIT 1
                """, (can,))
                row = cur.fetchone()
                if row:
                    return {"measured_dcn": row[0], "measured_ysi": row[1]}

        # 2) Fallback: Name match (case-insensitive)
        if name and name.strip() and name != "-":
            cur.execute(f"""
                SELECT CN_Measured, YSI_Unified_Measured
                FROM "{PURE_TABLE}"
                WHERE LOWER(Name) = LOWER(?)
                LIMIT 1
            """, (name.strip(),))
            row = cur.fetchone()
            if row:
                return {"measured_dcn": row[0], "measured_ysi": row[1]}

    return {"measured_dcn": None, "measured_ysi": None}


# Decimal places used when reporting each predicted property. Shared by the
# CSV and manual input paths so both report identical precision.
_PREDICTION_DIGITS = {
    "dcn": 2,
    "ysi": 2,
    "bp": 2,
    "density": 4,
    "lhv": 2,
    "dynamic_viscosity": 4,
}


def _round_or_none(value, ndigits=2):
    """Round *value* to *ndigits*, passing None through unchanged."""
    return None if value is None else round(value, ndigits)


def _clean_cell(value):
    """Return a CSV cell as a stripped string, mapping NaN/missing to ""."""
    return "" if pd.isna(value) else str(value).strip()


def _save_molecule_image(smiles, img_filename):
    """Render *smiles* with RDKit and save it under static/generated/."""
    mol = Chem.MolFromSmiles(smiles)
    img = Draw.MolToImage(mol, size=(300, 250))
    img_path = os.path.join("static", "generated", img_filename)
    os.makedirs(os.path.dirname(img_path), exist_ok=True)
    img.save(img_path)


def _evaluate_fuel(name, smiles, img_filename):
    """Build one result entry for the pure-fuel predictor.

    Resolves the SMILES (from PubChem by name when it is empty), validates it,
    fills in the name, measured values, predictions, and a structure image.

    Args:
        name: User-supplied fuel name, already stripped ("" when absent).
        smiles: User-supplied SMILES, already stripped ("" when absent).
        img_filename: File name for the structure image in static/generated/.

    Returns:
        The entry dict; ``entry["error"]`` is set when any step fails.
    """
    entry = {
        "name": name if name else "-",
        "smiles": smiles,
        "dcn": None,
        "ysi": None,
        "bp": None,
        "density": None,
        "lhv": None,
        "dynamic_viscosity": None,
        "error": None,
        "img_id": None,
        "measured_dcn": None,
        "measured_ysi": None,
    }

    # STEP 1 — If SMILES empty → convert NAME → SMILES
    if smiles == "" and name not in ("", None, "-"):
        final_smiles = pubchem_name_to_smiles(name)
        if final_smiles is None:
            entry["error"] = "Name not found in PubChem"
            return entry
    else:
        final_smiles = smiles

    # STEP 2 — Validate SMILES
    if not validate_smiles(final_smiles):
        entry["error"] = "Invalid SMILES"
        return entry
    entry["smiles"] = final_smiles

    # STEP 3 — Convert SMILES → IUPAC name (only when the user gave no name)
    iupac_name = pubchem_smiles_to_name(final_smiles)
    if (not name or name == "-") and iupac_name:
        entry["name"] = iupac_name

    # Extra step for measured value
    meas = fetch_measured(name=entry["name"], smiles=final_smiles)
    for key in ("measured_dcn", "measured_ysi"):
        entry[key] = None if meas[key] is None else round(float(meas[key]), 2)

    # STEP 4 — Predict & draw molecule
    predictions = {
        "dcn": predict_cn(final_smiles),
        "ysi": predict_ysi(final_smiles),
        "bp": predict_bp(final_smiles),
        "density": predict_density(final_smiles),
        "lhv": predict_lhv(final_smiles),
        "dynamic_viscosity": predict_dynamic_viscosity(final_smiles),
    }
    if predictions["dcn"] is None and predictions["ysi"] is None:
        entry["error"] = "Prediction failed"
    else:
        for key, value in predictions.items():
            entry[key] = _round_or_none(value, _PREDICTION_DIGITS[key])

    # NOTE(review): the original indentation was lost; the image is drawn for
    # every valid SMILES, including when prediction fails — confirm.
    _save_molecule_image(final_smiles, img_filename)
    entry["img_id"] = img_filename
    return entry


# Run Flask app
# ``load_shared_features`` replaces ``sys.modules["__main__"]``; when this file
# runs as a script Flask would otherwise derive its root path (templates,
# static, send_file base) from the downloaded module's location, so pin it.
app = Flask(__name__, root_path=os.path.dirname(os.path.abspath(__file__)))


@app.route("/")
def dashboard():
    """Render the landing dashboard."""
    return render_template("dashboard.html")


@app.route("/pure", methods=["GET", "POST"])
def pure_predictor():
    """Predict pure-fuel properties from a CSV upload or manual form rows.

    GET renders the empty form. POST with ``mode == "csv"`` reads the uploaded
    file (requires a ``SMILES`` column, optional ``IUPAC names``); any other
    POST reads the parallel ``fuel_name[]`` / ``smiles[]`` form lists.
    """
    results = []
    error = None

    # ------------------
    # CSV FILE INPUT
    # ------------------
    if request.method == "POST" and request.form.get("mode") == "csv":
        csv_file = request.files.get("csv_file")
        if not csv_file:
            error = "No CSV file uploaded."
            return render_template("pure_predictor.html", results=results, error=error)

        try:
            df = pd.read_csv(csv_file)
            if "SMILES" not in df.columns:
                error = "CSV must contain a 'SMILES' column."
                return render_template("pure_predictor.html", results=results, error=error)

            for i, row in df.iterrows():
                name = _clean_cell(row.get("IUPAC names", ""))
                smiles = _clean_cell(row.get("SMILES", ""))
                results.append(_evaluate_fuel(name, smiles, f"mol_csv_{i}.png"))
            return render_template("pure_predictor.html", results=results)
        except Exception as e:
            # Top-level boundary for the upload: show the failure on the page.
            error = f"Failed to read CSV file: {e}"
            return render_template("pure_predictor.html", results=results, error=error)

    # ------------------
    # MANUAL INPUT
    # ------------------
    elif request.method == "POST":
        names = request.form.getlist("fuel_name[]")
        smiles_list = request.form.getlist("smiles[]")
        for i, (name, smiles) in enumerate(zip(names, smiles_list)):
            results.append(_evaluate_fuel(name.strip(), smiles.strip(), f"mol_{i}.png"))

    return render_template("pure_predictor.html", results=results, error=error)


# Column order of the exported predictions CSV.
_EXPORT_COLUMNS = [
    "IUPAC Name",
    "SMILES",
    "Predicted DCN",
    "Predicted YSI",
    "Predicted BP",
    "Predicted Density",
    "Predicted LHV",
    "Predicted Dynamic Viscosity",
    "Status",
]


@app.route("/download_results", methods=["POST"])
def download_results():
    """Return the posted prediction results as a CSV attachment.

    Expects the form field ``results_data`` to hold a JSON list of result
    entries. Responds with HTTP 400 when the field is missing or is not a
    JSON list; an empty list yields a header-only CSV.
    """
    results_json = request.form.get("results_data")
    try:
        results = json.loads(results_json) if results_json else None
    except ValueError:
        results = None
    if not isinstance(results, list):
        return "Missing or invalid results data.", 400

    cleaned_rows = [
        {
            "IUPAC Name": r.get("name", "-"),
            "SMILES": r.get("smiles", "-"),
            "Predicted DCN": r.get("dcn", None),
            "Predicted YSI": r.get("ysi", None),
            "Predicted BP": r.get("bp", None),
            "Predicted Density": r.get("density", None),
            "Predicted LHV": r.get("lhv", None),
            "Predicted Dynamic Viscosity": r.get("dynamic_viscosity", None),
            "Status": ("OK" if r.get("error") in (None, "", "OK") else r.get("error")),
        }
        for r in results
        if isinstance(r, dict)
    ]

    # Explicit columns fix the order and keep the header when there are no rows.
    df = pd.DataFrame(cleaned_rows, columns=_EXPORT_COLUMNS)
    payload = df.to_csv(index=False).encode()
    return send_file(
        io.BytesIO(payload),
        mimetype="text/csv",
        as_attachment=True,
        download_name="pure_fuel_predictions.csv",
    )


@app.route("/mixture")
def mixture_predictor():
    """Render the mixture predictor page."""
    return render_template("mixture_predictor.html")


@app.route("/generate", methods=["GET", "POST"])
def generative():
    """Run the molecular evolution on POST and render its results.

    Any exception raised while configuring or running the evolution is shown
    on the page via ``error`` instead of producing an HTTP 500.
    """
    final_table = None
    pareto_table = None
    error = None
    run_info = None
    history_view = None

    if request.method == "POST":
        try:
            # Flask version of cli.get_user_config() (same logic)
            config = get_user_config_from_form(request.form)

            # Flask version of main.py
            evolution = MolecularEvolution(config)
            final_df, pareto_df = evolution.evolve()

            # Flask version of results.display_results() (same cols logic)
            final_table, pareto_table = display_results_as_tables(final_df, pareto_df, config)
            run_info = {
                "mode": "Maximize CN" if config.maximize_cn else f"Target CN = {config.target_cn}",
                "minimize_ysi": "Yes" if config.minimize_ysi else "No",
            }
            history_view = build_history_images(getattr(evolution, "history", []))
        except Exception as e:
            error = str(e)

    return render_template(
        "generative.html",
        final_table=final_table,
        pareto_table=pareto_table,
        error=error,
        run_info=run_info,
        history=history_view,
    )


@app.route("/constraints")
def constraints():
    """Render the constraints page."""
    return render_template("constraints.html")


@app.route("/dataset")
def dataset():
    """Render the dataset page."""
    return render_template("dataset.html")


@app.route("/download/pure")
def download_pure():
    """Send the compiled pure-fuel dataset as an .xlsx attachment."""
    return send_file(
        "datasets/pure_fuel_properties_compiled_v2.xlsx",
        mimetype="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        as_attachment=True,
        download_name="pure_fuel_dataset.xlsx",
    )


@app.route("/download/mixture")
def download_mixture():
    """Send the mixture dataset as an .xlsx attachment."""
    return send_file(
        "datasets/mixture_database.xlsx",
        mimetype="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        as_attachment=True,
        download_name="mixture_fuel_dataset.xlsx",
    )


@app.route("/about")
def about():
    """Render the about page."""
    return render_template("about.html")


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=7860)