from __future__ import annotations
import os
import math
import re
from functools import partial
from io import StringIO
from textwrap import dedent
from typing import List, Sequence, Tuple, Optional, Dict, Any
from urllib.parse import quote_plus
import json
import gradio as gr
import pandas as pd
import plotly.graph_objects as go
import requests
from bs4 import BeautifulSoup
from fastapi import FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from rdkit.Chem import Draw, rdChemReactions
from nist_kinetics_api import (
Category,
FieldName,
LeftParenthesis,
LogicalOperator,
NistKineticsClient,
ReactionDetail,
Relation,
RightParenthesis,
SearchFilter,
SearchRequest,
)
# Module-level NIST kinetics client, constructed once at import time and
# shared by the search helpers in this module (they call ``client.search``).
client = NistKineticsClient()
# Number of filter rows that ``_build_filters`` reads from its flattened input.
MAX_FILTERS = 5
# (display label, value) pairs for the searchable fields. Each value is the raw
# ``FieldName`` enum value, which ``_build_filters`` converts back with
# ``FieldName(...)``.
# NOTE(review): presumably the choices of a Gradio dropdown - confirm at the UI wiring.
FIELD_CHOICES = [
("Reactant", FieldName.reactants.value),
("Product", FieldName.products.value),
("Reaction Order", FieldName.rxn_order.value),
("Reference Reactant", FieldName.ref_rxn_reactants.value),
("Reference Product", FieldName.ref_rxn_products.value),
("Reference Reaction Order", FieldName.ref_rxn_order.value),
("Low Temperature", FieldName.t_low.value),
("High Temperature", FieldName.t_high.value),
("Low Pressure", FieldName.p_low.value),
("High Pressure", FieldName.p_high.value),
("Bath Gas", FieldName.bath_gas.value),
("Squib", FieldName.squib.value),
]
def _safe_float(value: str | None) -> float | None:
if value is None:
return None
text = str(value).strip()
if not text:
return None
sci_match = re.fullmatch(r"([+-]?\d+(?:\.\d+)?)\s*[x×*]\s*10\^?([+-]?\d+)", text, re.IGNORECASE)
if sci_match:
base = float(sci_match.group(1))
exponent = int(sci_match.group(2))
return base * (10 ** exponent)
cleaned = text.replace(",", "")
try:
return float(cleaned)
except ValueError:
return None
# (display label, value) pairs for the comparison operators. Each value is the
# raw ``Relation`` enum value that ``_build_filters`` passes to ``Relation(...)``.
RELATION_CHOICES = [
("contains", Relation.contains.value),
("is", Relation.equals.value),
("is not", Relation.not_equals.value),
("does not contain", Relation.not_contains.value),
("<", Relation.lt.value),
("≤", Relation.lte.value),
(">", Relation.gt.value),
("≥", Relation.gte.value),
]
# (display label, value) pairs for opening parentheses. The first entry has a
# single-space label and an empty-string value, i.e. "no parenthesis".
PAREN_CHOICES = [
(" ", ""),
("(", "("),
("((", "(("),
]
# Closing-parenthesis counterpart of PAREN_CHOICES.
RPAREN_CHOICES = [
(" ", ""),
(")", ")"),
("))", "))"),
]
# (display label, value) pairs for the result category. Values are the
# ``Category`` enum values rendered as strings; ``perform_search`` converts
# them back with ``Category(int(...))``.
CATEGORY_CHOICES = [
("Any result type", str(Category.any.value)),
("Review", str(Category.review.value)),
("Experiment / experiment extrapolated by theory", str(Category.experiment.value)),
("Theory / estimate", str(Category.theory.value)),
]
# Endpoint that ``_build_db_url`` appends its query string to.
WEBBOOK_BASE_URL = "https://webbook.nist.gov/cgi/cbook.cgi"
# File extensions treated as downloadable artefacts.
# NOTE(review): not referenced in the part of the file reviewed here;
# ``fetch_specific_db`` carries its own shorter list - confirm which is intended.
DOWNLOAD_EXTENSIONS = (".pdf", ".sd", ".sdf", ".jdx", ".dx", ".zip")
# Per-database configuration, keyed by the display name of the database.
# Keys of each entry:
#   "summary"       - description shown in the Markdown output of ``fetch_specific_db``.
#   "param"         - query-string fragment appended verbatim to the WebBook URL.
#   "parse"         - free-text note; not read by any code visible in this section.
#   "phase_choices" - optional; when present, ``_build_db_url`` may add ``&Phase=...``.
# NOTE(review): the parameter names/values below have not been checked against
# the live WebBook CGI interface - confirm before relying on them.
DB_TABS = {
"Gas-Phase Ion Thermochemistry": {
"summary": "Compiles IE/AE/EA/PA/GB/acidities/ΔH_f for ions; ~1740 species; evaluated from spectroscopy/equilibria.",
# NOTE(review): unlike every other entry this fragment has no "key=value" form - confirm.
"param": "IonEnergetics",
"parse": "Extract ion energies table (IE, EA, PA)"
},
"NIST Organic Thermochemistry Archive": {
"summary": "Enthalpies of reaction/formation (ΔH_rxn/ΔH_f), vaporization/sublimation for organics up to C30.",
"param": "Type=Thermo",
"parse": "Extract ΔH_f and reaction enthalpies"
},
"Organometallic Thermochemistry Database": {
"summary": "ΔH_rxn/ΔH_f (gas/condensed), sublimation/vaporization enthalpies, entropies for M-C compounds.",
"param": "Type=Reaction",
"parse": "Extract organometallic ΔH_f/S°"
},
"Vibrational and Electronic Energy Levels": {
"summary": "Vibrational frequencies (fundamentals/transitions), electronic transitions for ~3,500 polyatomics.",
"param": "Type=Vib-Elect",
"parse": "Extract vib/elec levels table (cm⁻¹)"
},
"Computed 3-D Structures": {
"summary": "Optimized 3D geometries (XYZ/SD-file), vibrational frequencies from DFT.",
"param": "Type=3D",
"parse": "Extract 3D structure link (SD-file)"
},
"Evaluated Infrared Spectra": {
"summary": "Digitized IR spectra (prism/grating), absorbance scales for various compounds.",
"param": "Type=IR-Spec",
"parse": "Extract IR spectrum link/graph",
# The only entry with phase choices, so the only one that can receive "&Phase=".
"phase_choices": ["gas", "liquid", "solid"]
},
"IARPA / PNNL Liquid Phase IR Spectra": {
"summary": "Complex refractive index (n/k) IR spectra for ~57 liquids (organics/inorganics).",
# The phase is fixed inside the fragment itself.
"param": "Type=IR-Spec&Phase=liquid",
"parse": "Extract liquid n/k spectra PDF"
},
"IARPA / PNNL Solid Phase IR Spectra": {
"summary": "Hemispherical/diffuse reflectance IR spectra for ~120 solids (organics/minerals).",
"param": "Type=IR-Spec&Phase=solid",
"parse": "Extract solid reflectance PDF/PSD"
},
"Quantitative Infrared Database": {
"summary": "Absorption coefficients (a in (μmol/mol)⁻¹ m⁻¹), transmittance for >30 VOCs.",
"param": "Type=Quant-IR",
"parse": "Extract absorption coefficients (JCAMP-DX link)"
},
"THz Spectral Database": {
"summary": "THz-IR transmission/reflectance spectra for solids (50–500 cm⁻¹).",
"param": "Type=THz-IR",
"parse": "Extract THz spectra graph"
},
"UV/Vis Database": {
"summary": "UV/Vis spectra (nm, log ε) for organics (aromatics/heterocyclics).",
"param": "Type=UV-Vis",
"parse": "Extract UV/Vis spectrum link"
},
"Gas Chromatographic Retention Data": {
"summary": "Kovats/Lee retention indices on non-polar/polar phases (1958–2003).",
"param": "Type=GC-RI",
"parse": "Extract retention indices table (Kovats/Lee)"
}
}
def _build_filters(raw_values: Sequence[str]) -> List[SearchFilter]:
    """Convert flattened filter-row values into ``SearchFilter`` objects.

    ``raw_values`` is read as ``MAX_FILTERS`` consecutive groups of six
    entries: boolean operator, left parenthesis, field, relation, search text
    and right parenthesis. Rows whose search text is blank are skipped.

    Args:
        raw_values: Flat sequence of at least ``MAX_FILTERS * 6`` entries.

    Returns:
        One ``SearchFilter`` per non-blank row, in row order. The first filter
        returned never carries a boolean operator.

    Raises:
        ValueError: If a row is incomplete or holds a value that is not valid
            for the corresponding enum.
    """
    stride = 6
    filters: List[SearchFilter] = []
    for idx in range(MAX_FILTERS):
        offset = idx * stride
        row = raw_values[offset : offset + stride]
        if len(row) != stride:
            # Same exception type the bare tuple-unpacking used to raise, but
            # with a message that says which row is short.
            raise ValueError(
                f"Invalid filter configuration in row {idx + 1}: "
                f"expected {stride} values, got {len(row)}"
            )
        boolean_val, lp_val, field_val, relation_val, text_val, rp_val = row
        text_val = (text_val or "").strip()
        if not text_val:
            continue
        try:
            # Key the "no operator" rule on the first *emitted* filter rather
            # than on row 0: when leading rows are blank, the first real
            # filter has nothing before it to combine with.
            boolean = (
                LogicalOperator(boolean_val or LogicalOperator.and_.value)
                if filters
                else None
            )
            filter_obj = SearchFilter(
                boolean=boolean,
                left_parenthesis=LeftParenthesis(lp_val or ""),
                field=FieldName(field_val or FieldName.reactants.value),
                relation=Relation(relation_val or Relation.contains.value),
                value=text_val,
                right_parenthesis=RightParenthesis(rp_val or ""),
            )
        except ValueError as exc:
            raise ValueError(f"Invalid filter configuration in row {idx + 1}: {exc}") from exc
        filters.append(filter_obj)
    return filters
def _summaries_to_table(results) -> List[List[str]]:
table = []
for idx, summary in enumerate(results, start=1):
row = [idx, summary.record_count, summary.reaction, summary.detail_url]
table.append(row)
return table
def _build_db_url(db_name: str, query: str, phase: str | None) -> str:
    """Build the WebBook URL for one configured database.

    Args:
        db_name: Key into ``DB_TABS``.
        query: Compound name or formula; URL-encoded into the ``Name`` parameter.
        phase: Optional phase selector. It is appended as ``&Phase=...`` only
            when the database declares ``phase_choices`` and its ``param``
            fragment does not already fix a phase.

    Returns:
        The complete request URL.

    Raises:
        KeyError: If ``db_name`` is not a key of ``DB_TABS``.
    """
    config = DB_TABS[db_name]
    param = config["param"]
    url = f"{WEBBOOK_BASE_URL}?Name={quote_plus(query)}&Units=SI&{param}"
    if config.get("phase_choices") and phase and "Phase=" not in param:
        # Escape the phase like the query so a stray "&", "#" or space cannot
        # alter the rest of the query string.
        url += f"&Phase={quote_plus(str(phase))}"
    return url
def fetch_specific_db(db_name, formula):
    """Fetch one WebBook database page for a compound and summarise it.

    Args:
        db_name: Key into ``DB_TABS``.
        formula: Compound name or formula to look up.

    Returns:
        A 3-tuple ``(markdown, dataframe, None)``. ``markdown`` is a report or
        an error message; ``dataframe`` is the first HTML table on the page as
        a DataFrame, or ``None`` when there is none, it cannot be parsed, or
        the request failed. The third slot is always ``None``.
    """
    if db_name not in DB_TABS:
        return "Invalid database.", None, None
    # Guard before building the URL: quote_plus() raises on None, and that
    # call sits outside the network try-block.
    query = str(formula or "").strip()
    if not query:
        return "Enter a compound name or formula.", None, None

    config = DB_TABS[db_name]
    # Reuse the shared builder so the URL format is defined in one place.
    url = _build_db_url(db_name, query, None)
    try:
        response = requests.get(url, timeout=20)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")

        df = None
        first_table = soup.find("table")
        if first_table is not None:
            try:
                df = pd.read_html(StringIO(str(first_table)))[0]
            except ValueError:
                # pandas raises ValueError when the markup holds no parsable
                # table; report that as "no tabular data", not a failed fetch.
                df = None

        links = [
            anchor["href"]
            for anchor in soup.find_all("a", href=True)
            if any(ext in anchor["href"].lower() for ext in DOWNLOAD_EXTENSIONS)
        ]
        link_text = f"Download links: {links}" if links else ""

        md_content = (
            f"### {db_name}\n{config['summary']}\n\n**Query:** {query}"
            f"\n\n{link_text}\n\n**Extracted Data:**"
        )
        if df is not None:
            md_content += "\n" + df.to_markdown(index=False)
        else:
            md_content += "\nNo tabular data found."
        return md_content, df, None
    except Exception as e:
        # UI boundary: any network or parsing failure is surfaced as a message
        # instead of propagating into the caller.
        return f"Error fetching {db_name}: {e}", None, None
def _summaries_to_dropdown(results) -> List[tuple[str, str]]:
choices = []
for idx, summary in enumerate(results, start=1):
label = f"{idx}. ({summary.record_count} recs) {summary.reaction}"
choices.append((label[:350], summary.detail_url))
return choices
def perform_search(query, decomposition_only, category_raw, units_value, auto_search_thermo=True):
    """Run a NIST kinetics search for ``query`` and package the results.

    The query is matched against reactants and, when it is longer than two
    characters, also against products (combined with OR).

    Args:
        query: Free-text search term; ``None`` or blank yields a warning.
        decomposition_only: Passed through to ``SearchRequest``.
        category_raw: ``Category`` value as a string; falsy means "any".
        units_value: Units selector; blank is sent as ``None``.
        auto_search_thermo: When true, also call
            ``_fetch_compound_thermo_data`` for the query term.

    Returns:
        A 5-tuple ``(table_rows, status_message, dropdown_update,
        state_payload, thermo_data)``. On failure the status carries the
        message and the other slots are empty.
    """

    def _failure(message):
        # Same five-slot shape as the success path, with everything empty.
        return [], message, gr.update(choices=[], value=None, interactive=False), [], {}

    # Tolerate None (e.g. a cleared textbox) instead of raising AttributeError.
    query_term = (query or "").strip()
    if not query_term:
        return _failure("⚠️ Enter a search query.")

    filters = [
        SearchFilter(
            boolean=None,
            left_parenthesis="",
            field=FieldName.reactants,
            relation=Relation.contains,
            value=query_term,
            right_parenthesis="",
        )
    ]
    # Very short terms are only matched against reactants.
    if len(query_term) > 2:
        filters.append(
            SearchFilter(
                boolean=LogicalOperator.or_,
                left_parenthesis="",
                field=FieldName.products,
                relation=Relation.contains,
                value=query_term,
                right_parenthesis="",
            )
        )

    category_raw = category_raw or str(Category.any.value)
    units_value = (units_value or "").strip() or None
    try:
        category = Category(int(category_raw))
    except (TypeError, ValueError):
        # A malformed category used to escape as an unhandled exception.
        return _failure(f"🚨 Search failed: invalid result category {category_raw!r}")

    try:
        request = SearchRequest(
            filters=filters,
            decomposition_only=decomposition_only,
            category=category,
            units=units_value,
        )
        results = client.search(request)
    except Exception as exc:  # pragma: no cover - network/parsing issues
        return _failure(f"🚨 Search failed: {exc}")

    table_data = _summaries_to_table(results)
    dropdown_choices = _summaries_to_dropdown(results)

    status_parts = [f"✅ Found {len(results)} matching reactions"]
    if results:
        status_parts.append(f" for query: '{query_term}'")
        # Collect compounds from the first ten reactions in first-seen order;
        # a dict is used instead of a set so the status text is deterministic.
        detected = {}
        for result in results[:10]:
            for compound in _extract_compounds_from_reaction(result.reaction):
                detected.setdefault(compound, None)
        if detected:
            names = list(detected)
            status_parts.append(f" | Compounds detected: {', '.join(names[:5])}")
            if len(names) > 5:
                status_parts.append(f" +{len(names) - 5} more")
    status = "".join(status_parts)

    dropdown_update = gr.update(
        choices=dropdown_choices,
        value=None,
        interactive=bool(dropdown_choices),
        label="Select a reaction from the latest search",
    )
    state_payload = [
        {"record_count": summary.record_count, "reaction": summary.reaction, "detail_url": summary.detail_url}
        for summary in results
    ]

    # NOTE(review): _fetch_compound_thermo_data is defined outside the section
    # reviewed; whether it can raise on network errors should be confirmed.
    search_thermo_data = {}
    if auto_search_thermo:
        search_thermo_data = _fetch_compound_thermo_data([query_term])
    return table_data, status, dropdown_update, state_payload, search_thermo_data
def _format_detail_markdown(detail: ReactionDetail, detail_url: str) -> str:
lines = []
if detail.title:
lines.append(f"### {detail.title}")
if detail.rate_expression:
lines.append(f"**Rate expression:** {detail.rate_expression}")
if detail.rate_expression_units:
ru = detail.rate_expression_units
pieces = []
if ru.first_order:
pieces.append(f"1st order: `{ru.first_order}`")
if ru.second_order:
pieces.append(f"2nd order: `{ru.second_order}`")
if ru.third_order:
pieces.append(f"3rd order: `{ru.third_order}`")
if pieces:
lines.append("**Rate expression units** " + " · ".join(pieces))
if detail.physical_units:
pu = detail.physical_units
bullet_items = []
for label, value in [
("Energy", pu.energy),
("Molecular", pu.molecular),
("Pressure", pu.pressure),
("Temperature", pu.temperature),
("Base volume", pu.base_volume),
("Reference Temp", pu.reference_temperature),
("Evaluation Temp", pu.evaluation_temperature),
]:
if value:
bullet_items.append(f"- **{label}:** {value}")
if bullet_items:
lines.append("**Unit settings**")
lines.extend(bullet_items)
lines.append(f"[View on NIST]({detail_url})")
return "\n\n".join(lines)
def _datasets_to_table(detail: ReactionDetail) -> List[List[str]]:
rows: List[List[str]] = []
for entry in detail.datasets:
rows.append(
[
entry.section or "",
entry.squib or "",
entry.temperature_range or "",
entry.pre_exponential_factor or "",
entry.temperature_exponent or "",
entry.activation_energy or "",
entry.rate_at_298 or "",
entry.reaction_order or "",
entry.squib_url or "",
]
)
return rows
def _build_dataset_plot(detail: ReactionDetail) -> go.Figure | None:
if not detail.datasets:
return None
dataset = detail.datasets[0]
A = _safe_float(getattr(dataset, "pre_exponential_factor", None))
if not A or A <= 0:
return None
n_val = _safe_float(getattr(dataset, "temperature_exponent", None))
n = n_val if n_val is not None else 0.0
Ea_val = _safe_float(getattr(dataset, "activation_energy", None))
Ea = Ea_val if Ea_val is not None else 0.0
Tmin, Tmax = 300.0, 2000.0
range_text = getattr(dataset, "temperature_range", None)
if isinstance(range_text, str):
tokens = re.findall(r"[+-]?\d+(?:\.\d+)?(?:[eE][+-]?\d+)?", range_text)
temp_vals = [_safe_float(tok) for tok in tokens]
temp_vals = [val for val in temp_vals if val is not None]
if len(temp_vals) >= 2:
Tmin, Tmax = min(temp_vals), max(temp_vals)
elif len(temp_vals) == 1:
center = temp_vals[0]
Tmin, Tmax = max(1.0, center - 50.0), center + 50.0
if Tmin < 1.0:
Tmin = 1.0
if Tmax <= Tmin:
Tmax = Tmin + 100.0
num_points = 120
temps = [Tmin + (Tmax - Tmin) * i / (num_points - 1) for i in range(num_points)]
R = 8.314462618 # J/mol·K
rates = [
A * ((t / 298.0) ** n) * math.exp(-Ea / (R * t))
for t in temps
]
plot_points = [
(1000.0 / t, math.log(k))
for t, k in zip(temps, rates)
if k and k > 0
]
if not plot_points:
return None
arrhenius_x, arrhenius_y = zip(*plot_points)
arrhenius_x, arrhenius_y = list(arrhenius_x), list(arrhenius_y)
fig = go.Figure()
fig.add_trace(
go.Scatter(
x=arrhenius_x,
y=arrhenius_y,
mode="lines",
name="Fitted k(T)",
line=dict(color="#2563eb"),
)
)
k_298 = _safe_float(getattr(dataset, "rate_at_298", None))
if k_298 and k_298 > 0:
fig.add_trace(
go.Scatter(
x=[1000.0 / 298.0],
y=[math.log(k_298)],
mode="markers",
name="k(298 K)",
marker=dict(size=10, color="#dc2626"),
hovertemplate="T = 298 K
k = %{customdata[0]:.3e}",
customdata=[[k_298]],
)
)
fig.update_layout(
title=f"Arrhenius Plot for {detail.title or 'Reaction'}",
xaxis_title="1000 / T (K⁻¹)",
yaxis_title="ln k",
height=360,
margin=dict(l=40, r=20, t=60, b=40),
)
return fig
def _fetch_all_nist_reactions(limit: int = 100) -> List[tuple[str, str]]:
    """Return a sample of distinct reactions from the NIST kinetics database.

    Runs one broad search (reactants containing "C"), looks at the first
    ``limit`` results, drops blanks and duplicates, and returns
    ``(display_name, reaction_text)`` pairs sorted by reaction length. Display
    names longer than 80 characters are cut and suffixed with "...".
    Any failure is printed and an empty list is returned.
    """

    def _display_name(text):
        return text[:80] + "..." if len(text) > 80 else text

    try:
        carbon_filter = SearchFilter(
            boolean=None,
            left_parenthesis="",
            field=FieldName.reactants,
            relation=Relation.contains,
            value="C",  # Start with carbon-containing compounds
            right_parenthesis="",
        )
        hits = client.search(
            SearchRequest(
                filters=[carbon_filter],
                decomposition_only=False,
                category=Category.any,
                units=None,
            )
        )
        # dict.fromkeys removes duplicates while keeping first-seen order.
        distinct_texts = dict.fromkeys(hit.reaction.strip() for hit in hits[:limit])
        options = [(_display_name(text), text) for text in distinct_texts if text]
        # Stable sort: shorter (simpler) reactions first.
        return sorted(options, key=lambda option: len(option[1]))
    except Exception as exc:
        print(f"Error fetching NIST reactions: {exc}")
        return []
def _clean_chemical_formula(formula: str) -> str:
"""Clean and normalize chemical formulas from NIST format."""
if not formula:
return ""
# Remove extra spaces within formulas (C 2 H 3 -> C2H3)
import re
# Pattern to match element symbols followed by numbers with spaces
# This will convert "C 2 H 3" to "C2H3"
cleaned = re.sub(r'([A-Z][a-z]?)(\s+)(\d+)', r'\1\3', formula)
# Handle radicals and special notation
cleaned = cleaned.replace("·", "") # Remove radical dots
cleaned = cleaned.replace("•", "") # Remove alternative radical notation
# Keep c- prefix for cyclic compounds, remove other lowercase prefixes
if not cleaned.startswith(('c-', 'C-')):
cleaned = re.sub(r'^[a-z]-', '', cleaned)
return cleaned.strip()
def _nist_formula_to_smiles(formula: str) -> str | None:
"""Convert NIST chemical formula to SMILES string for RDKit."""
if not formula:
return None
formula = _clean_chemical_formula(formula)
# Dictionary of common NIST formulas to SMILES
# This is a lookup table for frequently encountered species
nist_to_smiles = {
# Simple molecules
"H2": "[H][H]",
"O2": "O=O",
"N2": "N#N",
"CO": "[C-]#[O+]",
"CO2": "O=C=O",
"H2O": "O",
"CH4": "C",
"C2H6": "CC",
"C2H4": "C=C",
"C2H2": "C#C",
"C3H8": "CCC",
"C3H6": "C=CC",
"C6H6": "c1ccccc1",
# Radicals (simplified representations)
"H": "[H]",
"CH3": "[CH3]",
"C2H5": "C[CH2]",
"C2H3": "C=C[CH2]", # Propargyl radical
"C3H3": "C#CC", # Propynyl radical
"C": "[C]", # Carbon atom
"OH": "[OH]",
"O": "[O]",
"HO2": "O[O]",
"CH2": "[CH2]",
# Cyclic compounds
"c-C3H2": "C1=CC1", # Cyclopropenylidene (simplified)
# More complex species
"CH2O": "C=O",
"CH3OH": "CO",
"C2H5OH": "CCO",
"HCO": "[CH]=O",
"CH3CHO": "CC=O",
"C2H4O": "C=CO",
# Ions (simplified)
"H+": "[H+]",
"OH-": "[OH-]",
"O2-": "[O-][O]",
# Specific compounds from the failing reaction
"C2H3": "C=C[CH2]", # Propargyl radical C2H3
"c-C3H2": "C1=CC1", # Cyclopropenyl radical (c-C3H2)
"CC3H2": "C1=CC1", # Alternative notation
}
# Direct lookup
if formula in nist_to_smiles:
return nist_to_smiles[formula]
# Try to generate SMILES for simple hydrocarbons
if re.match(r'^C\d+H\d*$', formula):
# Parse C_nH_m
c_match = re.search(r'C(\d+)', formula)
h_match = re.search(r'H(\d+)', formula)
if c_match and h_match:
c_count = int(c_match.group(1))
h_count = int(h_match.group(1))
if c_count == 1 and h_count == 4:
return "C" # CH4
elif c_count == 2 and h_count == 6:
return "CC" # C2H6
elif c_count == 2 and h_count == 4:
return "C=C" # C2H4
elif c_count == 2 and h_count == 2:
return "C#C" # C2H2
elif c_count == 3 and h_count == 8:
return "CCC" # C3H8
elif c_count == 3 and h_count == 6:
return "C=CC" # C3H6
# For unknown formulas, try to create a simple representation
# This is a fallback that may not be chemically accurate
if re.match(r'^[A-Z][a-z]?\d*$', formula):
# Single atom with number (like O2, N2)
element_match = re.match(r'^([A-Z][a-z]?)(\d*)$', formula)
if element_match:
element = element_match.group(1)
count = element_match.group(2)
if count and int(count) > 1:
# For diatomic molecules
if element in ['O', 'N', 'H']:
if element == 'O':
return "O=O"
elif element == 'N':
return "N#N"
elif element == 'H':
return "[H][H]"
else:
return f"[{element}]"
return None # Could not convert
def _render_reaction_from_nist(reaction_text: str) -> str | None:
"""Render a reaction from NIST format to SVG using RDKit."""
reaction_text = (reaction_text or "").strip()
if not reaction_text:
return None
# Try to convert NIST reaction format to SMILES
smiles_reaction = None
# Handle different NIST reaction formats
separators = [" → ", " -> ", " ↔ ", " ⇌ ", " →", " ->", " ⇌"]
parts = None
for sep in separators:
if sep in reaction_text:
parts = reaction_text.split(sep, 1)
break
if parts and len(parts) == 2:
reactants_text = parts[0].strip()
products_text = parts[1].strip()
# Split reactants and products by " + "
reactants = [r.strip() for r in reactants_text.split(" + ") if r.strip()]
products = [p.strip() for p in products_text.split(" + ") if p.strip()]
# Convert each compound to SMILES
reactant_smiles = []
product_smiles = []
for reactant in reactants:
smiles = _nist_formula_to_smiles(reactant)
if smiles:
reactant_smiles.append(smiles)
for product in products:
smiles = _nist_formula_to_smiles(product)
if smiles:
product_smiles.append(smiles)
# Only proceed if we have at least one reactant and one product
if reactant_smiles and product_smiles:
reactants_smiles_str = ".".join(reactant_smiles)
products_smiles_str = ".".join(product_smiles)
smiles_reaction = f"{reactants_smiles_str}>>{products_smiles_str}"
# If we couldn't parse it with separators, try using it directly
if not smiles_reaction:
if ">>" in reaction_text:
smiles_reaction = reaction_text
else:
# Last resort: try to clean the entire reaction text
cleaned = _clean_chemical_formula(reaction_text)
if ">>" in cleaned:
smiles_reaction = cleaned
if not smiles_reaction:
return None
try:
# Try parsing as SMILES reaction first
reaction = rdChemReactions.ReactionFromSmarts(smiles_reaction, useSmiles=True)
if reaction is None:
# Fall back to SMARTS parsing
reaction = rdChemReactions.ReactionFromSmarts(smiles_reaction, useSmiles=False)
except Exception as exc:
print(f"RDKit parsing error for '{smiles_reaction}': {exc}")
return None
if reaction is None or (reaction.GetNumReactantTemplates() == 0 and reaction.GetNumProductTemplates() == 0):
return None
try:
# Generate SVG with specified parameters
svg = Draw.ReactionToImage(reaction, subImgSize=(200, 200), useSVG=True, drawOptions=None, returnPNG=False)
except Exception as exc:
print(f"Error rendering reaction '{smiles_reaction}': {exc}")
return None
if isinstance(svg, tuple):
svg = svg[0]
if hasattr(svg, "data"):
svg = svg.data
if isinstance(svg, bytes):
svg = svg.decode("utf-8", errors="ignore")
if not isinstance(svg, str) or "