# app.py, revision f75669c ("Update app.py", committed by carrotcake3, verified)
import os
os.environ["HF_HUB_DISABLE_PROGRESS_BARS"] = "1"
os.environ["HF_HUB_DISABLE_TELEMETRY"] = "1"
from flask import Flask, render_template, request, redirect, url_for, send_file, session
import sqlite3
import pandas as pd
import io
import json
import joblib
import numpy as np
from rdkit import Chem
from rdkit.Chem import Descriptors, Draw
from sklearn.base import BaseEstimator, RegressorMixin
from huggingface_hub import hf_hub_download
import sys
import pubchempy as pcp
import importlib.util
from core.config import EvolutionConfig
from core.evolution.evolution import MolecularEvolution
# ------------------------------
# MODEL REPOSITORY CONFIGURATION
# ------------------------------
# Hugging Face Hub repository ids, one per predicted property.
# Each id is handed to load_model() further down; the CN repo is also the
# source of shared_features.py (see load_shared_features()).
REPO_ID_CN = "SalZa2004/Cetane_Number_Predictor"
REPO_ID_YSI = "SalZa2004/YSI_Predictor"
REPO_ID_BP = "SalZa2004/Boiling_Point_Predictor"
REPO_ID_DENSITY = "SalZa2004/Density_Predictor"
REPO_ID_LHV = "SalZa2004/LHV_Predictor"
REPO_ID_VISC = "SalZa2004/Dynamic_Viscosity_Predictor"
# -------------------------
# LOAD SHARED FEATURES
# -------------------------
def load_shared_features():
    """Download ``shared_features.py`` from the CN repo and import it as a module.

    The file is executed from its downloaded location, then registered in
    ``sys.modules`` under the names ``shared_features``, ``main`` and
    ``__main__``; its ``FeatureSelector`` is also copied into this module's
    globals. Presumably this lets joblib resolve the classes referenced by
    the pickled selectors, whichever module name they were saved under.

    Returns:
        The imported module object.
    """
    module_path = hf_hub_download(REPO_ID_CN, "shared_features.py")
    module_spec = importlib.util.spec_from_file_location("shared_features", module_path)
    module = importlib.util.module_from_spec(module_spec)
    module_spec.loader.exec_module(module)

    # Register module for joblib
    # NOTE(review): this also replaces the real ``__main__`` entry in sys.modules.
    for alias in ("shared_features", "main", "__main__"):
        sys.modules[alias] = module
    globals()["FeatureSelector"] = module.FeatureSelector
    return module
# Import the shared feature module once at start-up; predict_property() calls
# shared.featurize_df on every prediction.
shared = load_shared_features()
# -------------------------
# GENERIC MODEL LOADER
# -------------------------
def load_model(repo_id):
    """Fetch and deserialize the model/selector pair stored in a Hub repo.

    Args:
        repo_id: Hugging Face Hub repository id that contains
            ``model.joblib`` and ``selector.joblib``.

    Returns:
        Tuple ``(model, selector)`` as returned by ``joblib.load``.
    """
    # Both files are downloaded first, then loaded in the same order.
    model_path, selector_path = (
        hf_hub_download(repo_id, filename)
        for filename in ("model.joblib", "selector.joblib")
    )
    # NOTE(review): joblib.load unpickles the files, so the repo must be trusted.
    return joblib.load(model_path), joblib.load(selector_path)
# -------------------------
# LOAD ALL MODELS
# -------------------------
# One (model, selector) pair per property, downloaded and deserialized at
# import time so the predict_* helpers below can use them directly.
cn_model, cn_selector = load_model(REPO_ID_CN)
ysi_model, ysi_selector = load_model(REPO_ID_YSI)
bp_model, bp_selector = load_model(REPO_ID_BP)
density_model, density_selector = load_model(REPO_ID_DENSITY)
lhv_model, lhv_selector = load_model(REPO_ID_LHV)
visc_model, visc_selector = load_model(REPO_ID_VISC)
# -------------------------
# HELPER FUNCTIONS
# -------------------------
def validate_smiles(smiles):
    """Return True only if *smiles* is present, non-empty and parseable by RDKit.

    Missing values (``None``/NaN) and the empty string are rejected before
    RDKit is consulted.
    """
    missing_or_blank = pd.isna(smiles) or smiles == ""
    if missing_or_blank:
        return False
    parsed = Chem.MolFromSmiles(smiles)
    return parsed is not None
def predict_property(smiles, model, selector):
    """Predict one property value for a single SMILES string.

    Args:
        smiles: SMILES string to featurize with ``shared.featurize_df``.
        model: Fitted estimator exposing ``predict``.
        selector: Fitted transformer exposing ``transform``.

    Returns:
        The first prediction as a ``float``, or ``None`` when featurization
        returns ``None``.
    """
    features = shared.featurize_df([smiles], return_df=False)
    if features is None:
        return None
    selected = selector.transform(features)
    prediction = model.predict(selected)
    return float(prediction[0])
def predict_cn(smiles):
    """Predict the cetane number for *smiles* using the CN model and selector."""
    return predict_property(smiles, model=cn_model, selector=cn_selector)
def predict_ysi(smiles):
    """Predict the YSI for *smiles* using the YSI model and selector."""
    return predict_property(smiles, model=ysi_model, selector=ysi_selector)
def predict_bp(smiles):
    """Predict the boiling point for *smiles* using the BP model and selector."""
    return predict_property(smiles, model=bp_model, selector=bp_selector)
def predict_density(smiles):
    """Predict the density for *smiles* using the density model and selector."""
    return predict_property(smiles, model=density_model, selector=density_selector)
def predict_lhv(smiles):
    """Predict the LHV for *smiles* using the LHV model and selector."""
    return predict_property(smiles, model=lhv_model, selector=lhv_selector)
def predict_dynamic_viscosity(smiles):
    """Predict the dynamic viscosity for *smiles* using the viscosity model and selector."""
    return predict_property(smiles, model=visc_model, selector=visc_selector)
def pubchem_name_to_smiles(name):
    """Look up a compound name on PubChem and return its canonical SMILES.

    Returns ``None`` when *name* is not a non-blank string, when PubChem
    yields no match, or when the lookup raises for any reason.
    """
    if not isinstance(name, str):
        return None
    query = name.strip()
    if not query:
        return None
    try:
        matches = pcp.get_compounds(query, "name")
        return matches[0].canonical_smiles if matches else None
    except Exception:
        # Best-effort lookup: any PubChem/network failure counts as "not found".
        return None
def pubchem_smiles_to_name(smiles):
    """Look up a SMILES string on PubChem and return a display name.

    The first match's ``iupac_name`` is returned when it is truthy; otherwise
    its ``title`` attribute is used. Returns ``None`` when there is no match
    or when anything in the lookup raises.
    """
    try:
        matches = pcp.get_compounds(smiles, "smiles")
        if not matches:
            return None
        best = matches[0]
        # Prefer IUPAC name if available, otherwise fall back to title
        iupac = getattr(best, "iupac_name", None)
        return iupac if iupac else best.title
    except Exception:
        # Best-effort lookup: failures are reported as "no name".
        return None
def get_user_config_from_form(form) -> EvolutionConfig:
    """Build an ``EvolutionConfig`` from submitted form fields.

    Fields read from *form*:
        mode: ``"target"`` (default) or ``"maximize"``.
        target_cn: target value, parsed with ``float``; ``"50"`` when absent
            or empty. Ignored in maximize mode, where a dummy 100.0 is used.
        minimize_ysi: enabled when the value is ``"on"``.

    Raises:
        ValueError: if ``target_cn`` is present but not a valid float.
    """
    maximize_cn = form.get("mode", "target") == "maximize"
    # In maximize mode the target is only a placeholder (same as the CLI).
    target = 100.0 if maximize_cn else float(form.get("target_cn") or "50")
    return EvolutionConfig(
        target_cn=target,
        maximize_cn=maximize_cn,
        minimize_ysi=form.get("minimize_ysi") == "on",
    )
def display_results_as_tables(final_df: pd.DataFrame, pareto_df: pd.DataFrame, config: EvolutionConfig):
    """Render the evolution results as HTML tables.

    Args:
        final_df: Final population; its first 10 rows are rendered.
        pareto_df: Pareto front (may be ``None`` or empty); its first 20 rows
            are rendered only when ``config.minimize_ysi`` is set.
        config: Run configuration; ``maximize_cn`` hides the ``cn_error`` column.

    Returns:
        Tuple ``(final_table, pareto_table)`` of HTML strings; ``pareto_table``
        is ``None`` when no Pareto table is rendered.
    """
    wanted = ["rank", "smiles", "cn", "cn_error", "ysi", "bp", "density", "lhv", "dynamic_viscosity"]
    if config.maximize_cn:
        wanted.remove("cn_error")

    def _to_html(frame, limit):
        # Keep only the wanted columns that the frame actually has, in order.
        present = [col for col in wanted if col in frame.columns]
        trimmed = frame.reset_index(drop=True).head(limit)[present]
        return trimmed.to_html(index=False, classes="table table-striped")

    final_table = _to_html(final_df, 10)

    pareto_table = None
    if config.minimize_ysi and pareto_df is not None and not pareto_df.empty:
        pareto_table = _to_html(pareto_df, 20)
    return final_table, pareto_table
def build_history_images(history):
    """Draw the molecules recorded in an evolution history and describe them.

    For every history record, each parseable SMILES is rendered with RDKit to
    ``static/generated/evo_gen_<generation>_<rank>.png`` and given a name from
    PubChem (or a "Gen/Rank" placeholder when no name is found). SMILES that
    RDKit cannot parse are skipped, but still consume a rank number.

    Args:
        history: Iterable of dicts with ``generation`` and ``smiles`` keys,
            or ``None``.

    Returns:
        List of ``{"generation": ..., "samples": [...]}`` dicts, where each
        sample holds ``rank``, ``name``, ``smiles`` and ``img_id``.
    """
    base_dir = os.path.join("static", "generated")
    os.makedirs(base_dir, exist_ok=True)

    enriched = []
    for record in history or []:
        gen = record.get("generation")
        samples = []
        for rank, smi in enumerate(record.get("smiles", []), start=1):
            mol = Chem.MolFromSmiles(smi)
            if mol is None:
                continue

            # draw structure
            img_filename = f"evo_gen_{gen}_{rank}.png"
            Draw.MolToImage(mol, size=(240, 200)).save(os.path.join(base_dir, img_filename))

            # PubChem name, with a positional label as fallback
            name = pubchem_smiles_to_name(smi) or f"Gen {gen} – Rank {rank}"

            samples.append({"rank": rank, "name": name, "smiles": smi, "img_id": img_filename})
        enriched.append({"generation": gen, "samples": samples})
    return enriched
# SQLite database file and table queried by fetch_measured() for measured values.
DB_PATH = os.path.join("data", "database", "database_compiled.db")
PURE_TABLE = "main_pure_datasets_cn_ysi"
def canonicalize_smiles(smiles: str):
    """Return RDKit's canonical SMILES for *smiles*, or ``None`` if it does not parse."""
    mol = Chem.MolFromSmiles(smiles)
    return None if mol is None else Chem.MolToSmiles(mol, canonical=True)
def fetch_measured(name: str = None, smiles: str = None):
    """Look up measured CN and YSI values for a compound in the SQLite database.

    Priority:
        1) If *smiles* is provided and canonicalizes, match ``SMILES_Standardized``.
        2) Otherwise (or if that finds nothing), match ``Name`` case-insensitively.
           Blank names and the ``"-"`` placeholder are never looked up.

    Args:
        name: Compound name, or ``None``.
        smiles: SMILES string, or ``None``.

    Returns:
        Dict with keys ``measured_dcn`` and ``measured_ysi``; both are ``None``
        when no row matches.

    Raises:
        sqlite3.Error: if the database cannot be opened or queried. The
            connection is closed before the error propagates.
    """
    select = f'SELECT CN_Measured, YSI_Unified_Measured FROM "{PURE_TABLE}"'

    def _as_result(row):
        return {"measured_dcn": row[0], "measured_ysi": row[1]}

    conn = sqlite3.connect(DB_PATH)
    # try/finally guarantees the connection is released on every path,
    # including when a query or the canonicalization raises.
    try:
        cur = conn.cursor()

        # 1) Try SMILES_Standardized
        if smiles:
            can = canonicalize_smiles(smiles)
            if can:
                cur.execute(f"{select} WHERE SMILES_Standardized = ? LIMIT 1", (can,))
                row = cur.fetchone()
                if row:
                    return _as_result(row)

        # 2) Fallback: Name match (case-insensitive)
        cleaned = name.strip() if name else ""
        if cleaned and cleaned != "-":
            cur.execute(f"{select} WHERE LOWER(Name) = LOWER(?) LIMIT 1", (cleaned,))
            row = cur.fetchone()
            if row:
                return _as_result(row)

        return {"measured_dcn": None, "measured_ysi": None}
    finally:
        conn.close()
# Create the Flask application; the routes below are registered on it.
# The server itself is only started by the __main__ guard at the end of the file.
app = Flask(__name__)
@app.route("/")
def dashboard():
    """Serve the landing page rendered from ``dashboard.html``."""
    return render_template("dashboard.html")
def _clean_cell(value):
    """Return a CSV cell as a stripped string, mapping missing values to ""."""
    return "" if pd.isna(value) else str(value).strip()


def _rounded(value, ndigits):
    """Round *value* to *ndigits* decimals, passing ``None`` through unchanged."""
    return None if value is None else round(value, ndigits)


def _evaluate_fuel(name, smiles, img_filename):
    """Resolve, validate, predict and draw one fuel; return its result row.

    Args:
        name: User-supplied name, already stripped ("" when absent).
        smiles: User-supplied SMILES, already stripped ("" when absent).
        img_filename: File name for the structure image under ``static/generated``.

    Returns:
        Dict with the keys the ``pure_predictor.html`` template receives:
        name, smiles, the six predictions, measured values, error and img_id.
    """
    entry = {
        "name": name if name else "-",
        "smiles": smiles,
        "dcn": None,
        "ysi": None,
        "bp": None,
        "density": None,
        "lhv": None,
        "dynamic_viscosity": None,
        "error": None,
        "img_id": None,
        "measured_dcn": None,
        "measured_ysi": None
    }
    has_name = name not in ("", None, "-")

    # STEP 1 — If SMILES empty → convert NAME → SMILES
    if smiles == "" and has_name:
        final_smiles = pubchem_name_to_smiles(name)
        if final_smiles is None:
            entry["error"] = "Name not found in PubChem"
            return entry
    else:
        final_smiles = smiles

    # STEP 2 — Validate SMILES
    if not validate_smiles(final_smiles):
        entry["error"] = "Invalid SMILES"
        return entry
    entry["smiles"] = final_smiles

    # STEP 3 — Convert SMILES → IUPAC name.
    # Only query PubChem when no name was supplied; the result is unused otherwise.
    if not has_name:
        iupac_name = pubchem_smiles_to_name(final_smiles)
        if iupac_name:
            entry["name"] = iupac_name

    # Extra step for measured value
    meas = fetch_measured(name=entry["name"], smiles=final_smiles)
    for key in ("measured_dcn", "measured_ysi"):
        entry[key] = None if meas[key] is None else round(float(meas[key]), 2)

    # STEP 4 — Predict & draw molecule
    pred_cn = predict_cn(final_smiles)
    pred_ysi = predict_ysi(final_smiles)
    pred_bp = predict_bp(final_smiles)
    pred_density = predict_density(final_smiles)
    pred_lhv = predict_lhv(final_smiles)
    pred_visc = predict_dynamic_viscosity(final_smiles)

    if pred_cn is None and pred_ysi is None:
        entry["error"] = "Prediction failed"
    else:
        entry["dcn"] = _rounded(pred_cn, 2)
        entry["ysi"] = _rounded(pred_ysi, 2)
        entry["bp"] = _rounded(pred_bp, 2)
        # Density and viscosity keep 4 decimals for both CSV and manual input
        # (the CSV path used to round them to 2, unlike the manual path).
        entry["density"] = _rounded(pred_density, 4)
        entry["lhv"] = _rounded(pred_lhv, 2)
        entry["dynamic_viscosity"] = _rounded(pred_visc, 4)

    # The SMILES has been validated above, so the structure can always be drawn.
    mol = Chem.MolFromSmiles(final_smiles)
    img = Draw.MolToImage(mol, size=(300, 250))
    img_path = os.path.join("static", "generated", img_filename)
    os.makedirs(os.path.dirname(img_path), exist_ok=True)
    img.save(img_path)
    entry["img_id"] = img_filename
    return entry


@app.route("/pure", methods=["GET", "POST"])
def pure_predictor():
    """Pure-fuel predictor page.

    GET renders the empty form. POST accepts either:
      * ``mode=csv`` with an uploaded ``csv_file`` that has a ``SMILES`` column
        and, optionally, an ``IUPAC names`` column; or
      * manual rows submitted as parallel ``fuel_name[]`` / ``smiles[]`` lists.

    Every row is processed by ``_evaluate_fuel`` and the resulting list is
    passed to ``pure_predictor.html`` as ``results``; problems are reported
    through the template's ``error`` variable or the per-row ``error`` field.
    """
    results = []
    error = None

    #------------------
    # CSV FILE INPUT
    #------------------
    if request.method == "POST" and request.form.get("mode") == "csv":
        csv_file = request.files.get("csv_file")
        if not csv_file:
            error = "No CSV file uploaded."
            return render_template("pure_predictor.html", results=results, error=error)
        try:
            df = pd.read_csv(csv_file)
            if "SMILES" not in df.columns:
                error = "CSV must contain a 'SMILES' column."
                return render_template("pure_predictor.html", results=results, error=error)
            for i, row in df.iterrows():
                name = _clean_cell(row.get("IUPAC names", ""))
                smiles = _clean_cell(row.get("SMILES", ""))
                results.append(_evaluate_fuel(name, smiles, f"mol_csv_{i}.png"))
            return render_template("pure_predictor.html", results=results)
        except Exception as e:
            # Covers both parsing and per-row processing failures; rows
            # completed before the failure are still shown.
            error = f"Failed to read CSV file: {e}"
            return render_template("pure_predictor.html", results=results, error=error)

    #------------------
    # MANUAL INPUT
    #------------------
    elif request.method == "POST":
        names = request.form.getlist("fuel_name[]")
        smiles_list = request.form.getlist("smiles[]")
        for i, (name, smiles) in enumerate(zip(names, smiles_list)):
            results.append(_evaluate_fuel(name.strip(), smiles.strip(), f"mol_{i}.png"))

    return render_template("pure_predictor.html", results=results, error=error)
@app.route("/download_results", methods=["POST"])
def download_results():
    """Return previously computed predictions as a CSV attachment.

    Expects the form field ``results_data`` to hold a JSON list of result
    dicts (the rows produced by the pure predictor). Responds with HTTP 400
    when the field is missing, is not valid JSON, or is not a list of objects,
    instead of failing with a server error. An empty list yields a CSV that
    contains only the header row.
    """
    columns = [
        "IUPAC Name", "SMILES", "Predicted DCN",
        "Predicted YSI", "Predicted BP", "Predicted Density",
        "Predicted LHV", "Predicted Dynamic Viscosity",
        "Status",
    ]

    try:
        results = json.loads(request.form.get("results_data") or "")
    except ValueError:
        # Covers a missing/empty field as well as malformed JSON.
        return "Invalid or missing results data.", 400
    if not isinstance(results, list) or not all(isinstance(r, dict) for r in results):
        return "Invalid or missing results data.", 400

    cleaned_rows = [
        {
            "IUPAC Name": r.get("name", "-"),
            "SMILES": r.get("smiles", "-"),
            "Predicted DCN": r.get("dcn", None),
            "Predicted YSI": r.get("ysi", None),
            "Predicted BP": r.get("bp", None),
            "Predicted Density": r.get("density", None),
            "Predicted LHV": r.get("lhv", None),
            "Predicted Dynamic Viscosity": r.get("dynamic_viscosity", None),
            "Status": ("OK" if r.get("error") in (None, "", "OK") else r.get("error")),
        }
        for r in results
    ]

    # Passing ``columns`` fixes the column order and keeps the header even
    # when there are no rows.
    df = pd.DataFrame(cleaned_rows, columns=columns)
    csv_text = df.to_csv(index=False)

    return send_file(
        io.BytesIO(csv_text.encode()),
        mimetype="text/csv",
        as_attachment=True,
        download_name="pure_fuel_predictions.csv"
    )
@app.route("/mixture")
def mixture_predictor():
    """Serve the mixture predictor page rendered from ``mixture_predictor.html``."""
    return render_template("mixture_predictor.html")
@app.route("/generate", methods=["GET", "POST"])
def generative():
    """Generative design page.

    GET renders the empty form. POST builds an ``EvolutionConfig`` from the
    form, runs ``MolecularEvolution.evolve()`` and renders the results:
    HTML tables, a short run summary and per-generation structure images.
    Any exception raised along the way is shown through the template's
    ``error`` variable; values computed before the failure are still passed.
    """
    context = {
        "final_table": None,
        "pareto_table": None,
        "error": None,
        "run_info": None,
        "history": None,
    }

    if request.method == "POST":
        try:
            # Flask version of cli.get_user_config() (same logic)
            config = get_user_config_from_form(request.form)

            # Flask version of main.py
            evolution = MolecularEvolution(config)
            final_df, pareto_df = evolution.evolve()

            # Flask version of results.display_results() (same cols logic)
            context["final_table"], context["pareto_table"] = display_results_as_tables(
                final_df, pareto_df, config
            )

            context["run_info"] = {
                "mode": "Maximize CN" if config.maximize_cn else f"Target CN = {config.target_cn}",
                "minimize_ysi": "Yes" if config.minimize_ysi else "No"
            }
            context["history"] = build_history_images(getattr(evolution, "history", []))
        except Exception as exc:
            context["error"] = str(exc)

    return render_template("generative.html", **context)
@app.route("/constraints")
def constraints():
    """Serve the constraints page rendered from ``constraints.html``."""
    return render_template("constraints.html")
@app.route("/dataset")
def dataset():
    """Serve the dataset page rendered from ``dataset.html``."""
    return render_template("dataset.html")
@app.route("/download/pure")
def download_pure():
    """Send the pure-fuel dataset workbook as ``pure_fuel_dataset.xlsx``."""
    xlsx_mimetype = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
    return send_file(
        "datasets/pure_fuel_properties_compiled_v2.xlsx",
        mimetype=xlsx_mimetype,
        as_attachment=True,
        download_name="pure_fuel_dataset.xlsx",
    )
@app.route("/download/mixture")
def download_mixture():
    """Send the mixture dataset workbook as ``mixture_fuel_dataset.xlsx``."""
    xlsx_mimetype = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
    return send_file(
        "datasets/mixture_database.xlsx",
        mimetype=xlsx_mimetype,
        as_attachment=True,
        download_name="mixture_fuel_dataset.xlsx",
    )
@app.route("/about")
def about():
    """Serve the about page rendered from ``about.html``."""
    return render_template("about.html")
# Start the built-in Flask server when the file is run directly:
# listens on every interface, port 7860.
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=7860)