Spaces:
Runtime error
Runtime error
| from __future__ import annotations | |
| import os | |
| import math | |
| import re | |
| from functools import partial | |
| from io import StringIO | |
| from textwrap import dedent | |
| from typing import List, Sequence, Tuple, Optional, Dict, Any | |
| from urllib.parse import quote_plus | |
| import json | |
| import gradio as gr | |
| import pandas as pd | |
| import plotly.graph_objects as go | |
| import requests | |
| from bs4 import BeautifulSoup | |
| from fastapi import FastAPI, HTTPException, Query | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from rdkit.Chem import Draw, rdChemReactions | |
| from nist_kinetics_api import ( | |
| Category, | |
| FieldName, | |
| LeftParenthesis, | |
| LogicalOperator, | |
| NistKineticsClient, | |
| ReactionDetail, | |
| Relation, | |
| RightParenthesis, | |
| SearchFilter, | |
| SearchRequest, | |
| ) | |
# Module-wide client instance; the search helpers below call ``client.search``.
client = NistKineticsClient()

# Number of filter rows read by ``_build_filters``.
MAX_FILTERS = 5

# (label, value) pairs: a human-readable label paired with the ``FieldName``
# enum value it stands for.
FIELD_CHOICES = [
    (label, member.value)
    for label, member in (
        ("Reactant", FieldName.reactants),
        ("Product", FieldName.products),
        ("Reaction Order", FieldName.rxn_order),
        ("Reference Reactant", FieldName.ref_rxn_reactants),
        ("Reference Product", FieldName.ref_rxn_products),
        ("Reference Reaction Order", FieldName.ref_rxn_order),
        ("Low Temperature", FieldName.t_low),
        ("High Temperature", FieldName.t_high),
        ("Low Pressure", FieldName.p_low),
        ("High Pressure", FieldName.p_high),
        ("Bath Gas", FieldName.bath_gas),
        ("Squib", FieldName.squib),
    )
]
| def _safe_float(value: str | None) -> float | None: | |
| if value is None: | |
| return None | |
| text = str(value).strip() | |
| if not text: | |
| return None | |
| sci_match = re.fullmatch(r"([+-]?\d+(?:\.\d+)?)\s*[x×*]\s*10\^?([+-]?\d+)", text, re.IGNORECASE) | |
| if sci_match: | |
| base = float(sci_match.group(1)) | |
| exponent = int(sci_match.group(2)) | |
| return base * (10 ** exponent) | |
| cleaned = text.replace(",", "") | |
| try: | |
| return float(cleaned) | |
| except ValueError: | |
| return None | |
# (label, value) pairs mapping a display label to a ``Relation`` enum value.
RELATION_CHOICES = [
    (label, member.value)
    for label, member in (
        ("contains", Relation.contains),
        ("is", Relation.equals),
        ("is not", Relation.not_equals),
        ("does not contain", Relation.not_contains),
        ("<", Relation.lt),
        ("≤", Relation.lte),
        (">", Relation.gt),
        ("≥", Relation.gte),
    )
]

# Opening-parenthesis options; the blank label maps to the empty string.
PAREN_CHOICES = [(" ", "")] + [(paren, paren) for paren in ("(", "((")]

# Closing-parenthesis options, mirroring PAREN_CHOICES.
RPAREN_CHOICES = [(" ", "")] + [(paren, paren) for paren in (")", "))")]

# (label, value) pairs for result categories; values are stringified enum values.
CATEGORY_CHOICES = [
    (label, str(member.value))
    for label, member in (
        ("Any result type", Category.any),
        ("Review", Category.review),
        ("Experiment / experiment extrapolated by theory", Category.experiment),
        ("Theory / estimate", Category.theory),
    )
]

# Endpoint used to build NIST WebBook query URLs.
WEBBOOK_BASE_URL = "https://webbook.nist.gov/cgi/cbook.cgi"

# File extensions treated as downloadable artefacts.
DOWNLOAD_EXTENSIONS = (".pdf", ".sd", ".sdf", ".jdx", ".dx", ".zip")
# Registry of WebBook databases keyed by display name. Each entry carries:
#   summary       - one-line description shown to the user
#   param         - query-string fragment appended to the WebBook URL
#   parse         - short note on what should be extracted from the page
#   phase_choices - (optional) phases that may be appended as "&Phase=..."
DB_TABS = {
    "Gas-Phase Ion Thermochemistry": dict(
        summary="Compiles IE/AE/EA/PA/GB/acidities/ΔH_f for ions; ~1740 species; evaluated from spectroscopy/equilibria.",
        param="IonEnergetics",
        parse="Extract ion energies table (IE, EA, PA)",
    ),
    "NIST Organic Thermochemistry Archive": dict(
        summary="Enthalpies of reaction/formation (ΔH_rxn/ΔH_f), vaporization/sublimation for organics up to C30.",
        param="Type=Thermo",
        parse="Extract ΔH_f and reaction enthalpies",
    ),
    "Organometallic Thermochemistry Database": dict(
        summary="ΔH_rxn/ΔH_f (gas/condensed), sublimation/vaporization enthalpies, entropies for M-C compounds.",
        param="Type=Reaction",
        parse="Extract organometallic ΔH_f/S°",
    ),
    "Vibrational and Electronic Energy Levels": dict(
        summary="Vibrational frequencies (fundamentals/transitions), electronic transitions for ~3,500 polyatomics.",
        param="Type=Vib-Elect",
        parse="Extract vib/elec levels table (cm⁻¹)",
    ),
    "Computed 3-D Structures": dict(
        summary="Optimized 3D geometries (XYZ/SD-file), vibrational frequencies from DFT.",
        param="Type=3D",
        parse="Extract 3D structure link (SD-file)",
    ),
    "Evaluated Infrared Spectra": dict(
        summary="Digitized IR spectra (prism/grating), absorbance scales for various compounds.",
        param="Type=IR-Spec",
        parse="Extract IR spectrum link/graph",
        phase_choices=["gas", "liquid", "solid"],
    ),
    "IARPA / PNNL Liquid Phase IR Spectra": dict(
        summary="Complex refractive index (n/k) IR spectra for ~57 liquids (organics/inorganics).",
        param="Type=IR-Spec&Phase=liquid",
        parse="Extract liquid n/k spectra PDF",
    ),
    "IARPA / PNNL Solid Phase IR Spectra": dict(
        summary="Hemispherical/diffuse reflectance IR spectra for ~120 solids (organics/minerals).",
        param="Type=IR-Spec&Phase=solid",
        parse="Extract solid reflectance PDF/PSD",
    ),
    "Quantitative Infrared Database": dict(
        summary="Absorption coefficients (a in (μmol/mol)⁻¹ m⁻¹), transmittance for >30 VOCs.",
        param="Type=Quant-IR",
        parse="Extract absorption coefficients (JCAMP-DX link)",
    ),
    "THz Spectral Database": dict(
        summary="THz-IR transmission/reflectance spectra for solids (50–500 cm⁻¹).",
        param="Type=THz-IR",
        parse="Extract THz spectra graph",
    ),
    "UV/Vis Database": dict(
        summary="UV/Vis spectra (nm, log ε) for organics (aromatics/heterocyclics).",
        param="Type=UV-Vis",
        parse="Extract UV/Vis spectrum link",
    ),
    "Gas Chromatographic Retention Data": dict(
        summary="Kovats/Lee retention indices on non-polar/polar phases (1958–2003).",
        param="Type=GC-RI",
        parse="Extract retention indices table (Kovats/Lee)",
    ),
}
def _build_filters(raw_values: Sequence[str]) -> List[SearchFilter]:
    """Turn the flat list of filter-row widget values into ``SearchFilter`` objects.

    ``raw_values`` holds ``MAX_FILTERS`` consecutive groups of six values:
    boolean operator, left parenthesis, field, relation, text, right
    parenthesis. Rows whose text is blank are skipped.

    Raises:
        ValueError: if a row holds a value the enums/``SearchFilter`` reject;
            the message names the 1-based row number.
    """
    fields_per_row = 6
    built: List[SearchFilter] = []
    for row in range(MAX_FILTERS):
        start = row * fields_per_row
        chunk = raw_values[start : start + fields_per_row]
        boolean_val, lp_val, field_val, relation_val, text_val, rp_val = chunk

        needle = (text_val or "").strip()
        if needle:
            try:
                # The first row never carries a boolean connector.
                connector = (
                    None
                    if row == 0
                    else LogicalOperator(boolean_val or LogicalOperator.and_.value)
                )
                row_filter = SearchFilter(
                    boolean=connector,
                    left_parenthesis=LeftParenthesis(lp_val or ""),
                    field=FieldName(field_val or FieldName.reactants.value),
                    relation=Relation(relation_val or Relation.contains.value),
                    value=needle,
                    right_parenthesis=RightParenthesis(rp_val or ""),
                )
            except ValueError as exc:
                raise ValueError(f"Invalid filter configuration in row {row + 1}: {exc}") from exc
            built.append(row_filter)
    return built
| def _summaries_to_table(results) -> List[List[str]]: | |
| table = [] | |
| for idx, summary in enumerate(results, start=1): | |
| row = [idx, summary.record_count, summary.reaction, summary.detail_url] | |
| table.append(row) | |
| return table | |
def _build_db_url(db_name: str, query: str, phase: str | None) -> str:
    """Compose the WebBook URL for ``query`` in the database ``db_name``.

    A ``&Phase=<phase>`` suffix is added only when the database declares
    ``phase_choices``, a phase was given, and the database's own ``param``
    does not already pin a phase.

    Raises:
        KeyError: if ``db_name`` is not a key of ``DB_TABS``.
    """
    entry = DB_TABS[db_name]
    base_param = entry["param"]
    wants_phase = bool(entry.get("phase_choices")) and bool(phase) and "Phase=" not in base_param
    phase_suffix = f"&Phase={phase}" if wants_phase else ""
    return f"{WEBBOOK_BASE_URL}?Name={quote_plus(query)}&Units=SI&{base_param}{phase_suffix}"
def fetch_specific_db(db_name, formula):
    """Query one WebBook database for a compound and summarise the result page.

    Args:
        db_name: Key of ``DB_TABS`` selecting the database.
        formula: Compound name or formula sent as the ``Name`` query parameter.

    Returns:
        A 3-tuple ``(markdown, dataframe, plot)``. ``markdown`` is a report (or
        an error / warning message), ``dataframe`` is the first HTML table of
        the page as a DataFrame or ``None``, and ``plot`` is always ``None``.
        Network and parsing failures are reported in ``markdown`` rather than
        raised.
    """
    from urllib.parse import urljoin

    if db_name not in DB_TABS:
        return "Invalid database.", None, None
    formula = (formula or "").strip()
    if not formula:
        # Avoid a pointless request (and a TypeError for None) on empty input.
        return "⚠️ Enter a compound name or formula.", None, None

    config = DB_TABS[db_name]
    # Reuse the shared URL builder so both code paths stay in sync.
    url = _build_db_url(db_name, formula, None)

    try:
        response = requests.get(url, timeout=20)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")

        # Only the first table on the page is extracted.
        first_table = soup.find("table")
        df = None
        if first_table is not None:
            df = pd.read_html(StringIO(str(first_table)))[0]

        # Collect download links as absolute URLs (page hrefs may be relative),
        # de-duplicated in page order.
        links = []
        for anchor in soup.find_all("a", href=True):
            href = anchor["href"]
            if any(ext in href.lower() for ext in DOWNLOAD_EXTENSIONS):
                absolute = urljoin(url, href)
                if absolute not in links:
                    links.append(absolute)
        link_text = ("Download links: " + ", ".join(links)) if links else ""

        md_content = (
            f"### {db_name}\n{config['summary']}\n\n**Query:** {formula}\n\n"
            f"{link_text}\n\n**Extracted Data:**"
        )
        if df is not None:
            try:
                table_text = df.to_markdown(index=False)
            except ImportError:
                # to_markdown needs the optional "tabulate" package; fall back
                # to a plain-text table instead of failing the whole fetch.
                table_text = "```\n" + df.to_string(index=False) + "\n```"
            md_content += "\n" + table_text
        else:
            md_content += "\nNo tabular data found."
        return md_content, df, None
    except Exception as e:
        # UI boundary: surface the failure as text instead of crashing the app.
        return f"Error fetching {db_name}: {e}", None, None
| def _summaries_to_dropdown(results) -> List[tuple[str, str]]: | |
| choices = [] | |
| for idx, summary in enumerate(results, start=1): | |
| label = f"{idx}. ({summary.record_count} recs) {summary.reaction}" | |
| choices.append((label[:350], summary.detail_url)) | |
| return choices | |
def perform_search(query, decomposition_only, category_raw, units_value, auto_search_thermo=True):
    """Run a kinetics search for ``query`` and prepare the UI outputs.

    The query is matched against reactants and, when longer than two
    characters, also (OR) against products.

    Args:
        query: Free-text species to look for; ``None``/blank yields a warning.
        decomposition_only: Forwarded to ``SearchRequest``.
        category_raw: Category value as text; falsy means ``Category.any``.
        units_value: Units selection; blank is sent as ``None``.
        auto_search_thermo: When true, thermochemistry data for the query term
            is fetched as well.

    Returns:
        A 5-tuple ``(table_rows, status_text, dropdown_update, state_payload,
        thermo_data)``. Failures are reported through ``status_text``; the
        other elements are then empty.
    """

    def failure(message):
        # Common shape of every "nothing to show" result.
        return [], message, gr.update(choices=[], value=None, interactive=False), [], {}

    # Guard against None as well as blank text before calling .strip().
    query_term = (query or "").strip()
    if not query_term:
        return failure("⚠️ Enter a search query.")

    filters = [
        SearchFilter(
            boolean=None,
            left_parenthesis="",
            field=FieldName.reactants,
            relation=Relation.contains,
            value=query_term,
            right_parenthesis="",
        )
    ]
    # Also search in products if it's a longer query
    if len(query_term) > 2:
        filters.append(
            SearchFilter(
                boolean=LogicalOperator.or_,
                left_parenthesis="",
                field=FieldName.products,
                relation=Relation.contains,
                value=query_term,
                right_parenthesis="",
            )
        )

    category_raw = category_raw or str(Category.any.value)
    units_value = (units_value or "").strip() or None

    try:
        # Request construction is inside the try so a bad category/units value
        # is reported as a failed search instead of an unhandled exception.
        request = SearchRequest(
            filters=filters,
            decomposition_only=decomposition_only,
            category=Category(int(category_raw)),
            units=units_value,
        )
        results = client.search(request)
    except Exception as exc:  # pragma: no cover - network/parsing issues
        return failure(f"🚨 Search failed: {exc}")

    table_data = _summaries_to_table(results)
    dropdown_choices = _summaries_to_dropdown(results)

    status_parts = [f"✅ Found {len(results)} matching reactions"]
    if results:
        status_parts.append(f" for query: '{query_term}'")
        # Only the first 10 results are scanned for compound names.
        all_compounds = set()
        for result in results[:10]:
            all_compounds.update(_extract_compounds_from_reaction(result.reaction))
        if all_compounds:
            # Sorted so the same search always produces the same status line.
            ordered = sorted(all_compounds)
            status_parts.append(f" | Compounds detected: {', '.join(ordered[:5])}")
            if len(ordered) > 5:
                status_parts.append(f" +{len(ordered) - 5} more")
    status = "".join(status_parts)

    dropdown_update = gr.update(
        choices=dropdown_choices,
        value=None,
        interactive=bool(dropdown_choices),
        label="Select a reaction from the latest search",
    )
    state_payload = [
        {"record_count": summary.record_count, "reaction": summary.reaction, "detail_url": summary.detail_url}
        for summary in results
    ]

    search_thermo_data = {}
    if auto_search_thermo:
        search_thermo_data = _fetch_compound_thermo_data([query_term])
    return table_data, status, dropdown_update, state_payload, search_thermo_data
| def _format_detail_markdown(detail: ReactionDetail, detail_url: str) -> str: | |
| lines = [] | |
| if detail.title: | |
| lines.append(f"### {detail.title}") | |
| if detail.rate_expression: | |
| lines.append(f"**Rate expression:** {detail.rate_expression}") | |
| if detail.rate_expression_units: | |
| ru = detail.rate_expression_units | |
| pieces = [] | |
| if ru.first_order: | |
| pieces.append(f"1st order: `{ru.first_order}`") | |
| if ru.second_order: | |
| pieces.append(f"2nd order: `{ru.second_order}`") | |
| if ru.third_order: | |
| pieces.append(f"3rd order: `{ru.third_order}`") | |
| if pieces: | |
| lines.append("**Rate expression units** " + " · ".join(pieces)) | |
| if detail.physical_units: | |
| pu = detail.physical_units | |
| bullet_items = [] | |
| for label, value in [ | |
| ("Energy", pu.energy), | |
| ("Molecular", pu.molecular), | |
| ("Pressure", pu.pressure), | |
| ("Temperature", pu.temperature), | |
| ("Base volume", pu.base_volume), | |
| ("Reference Temp", pu.reference_temperature), | |
| ("Evaluation Temp", pu.evaluation_temperature), | |
| ]: | |
| if value: | |
| bullet_items.append(f"- **{label}:** {value}") | |
| if bullet_items: | |
| lines.append("**Unit settings**") | |
| lines.extend(bullet_items) | |
| lines.append(f"[View on NIST]({detail_url})") | |
| return "\n\n".join(lines) | |
| def _datasets_to_table(detail: ReactionDetail) -> List[List[str]]: | |
| rows: List[List[str]] = [] | |
| for entry in detail.datasets: | |
| rows.append( | |
| [ | |
| entry.section or "", | |
| entry.squib or "", | |
| entry.temperature_range or "", | |
| entry.pre_exponential_factor or "", | |
| entry.temperature_exponent or "", | |
| entry.activation_energy or "", | |
| entry.rate_at_298 or "", | |
| entry.reaction_order or "", | |
| entry.squib_url or "", | |
| ] | |
| ) | |
| return rows | |
| def _build_dataset_plot(detail: ReactionDetail) -> go.Figure | None: | |
| if not detail.datasets: | |
| return None | |
| dataset = detail.datasets[0] | |
| A = _safe_float(getattr(dataset, "pre_exponential_factor", None)) | |
| if not A or A <= 0: | |
| return None | |
| n_val = _safe_float(getattr(dataset, "temperature_exponent", None)) | |
| n = n_val if n_val is not None else 0.0 | |
| Ea_val = _safe_float(getattr(dataset, "activation_energy", None)) | |
| Ea = Ea_val if Ea_val is not None else 0.0 | |
| Tmin, Tmax = 300.0, 2000.0 | |
| range_text = getattr(dataset, "temperature_range", None) | |
| if isinstance(range_text, str): | |
| tokens = re.findall(r"[+-]?\d+(?:\.\d+)?(?:[eE][+-]?\d+)?", range_text) | |
| temp_vals = [_safe_float(tok) for tok in tokens] | |
| temp_vals = [val for val in temp_vals if val is not None] | |
| if len(temp_vals) >= 2: | |
| Tmin, Tmax = min(temp_vals), max(temp_vals) | |
| elif len(temp_vals) == 1: | |
| center = temp_vals[0] | |
| Tmin, Tmax = max(1.0, center - 50.0), center + 50.0 | |
| if Tmin < 1.0: | |
| Tmin = 1.0 | |
| if Tmax <= Tmin: | |
| Tmax = Tmin + 100.0 | |
| num_points = 120 | |
| temps = [Tmin + (Tmax - Tmin) * i / (num_points - 1) for i in range(num_points)] | |
| R = 8.314462618 # J/mol·K | |
| rates = [ | |
| A * ((t / 298.0) ** n) * math.exp(-Ea / (R * t)) | |
| for t in temps | |
| ] | |
| plot_points = [ | |
| (1000.0 / t, math.log(k)) | |
| for t, k in zip(temps, rates) | |
| if k and k > 0 | |
| ] | |
| if not plot_points: | |
| return None | |
| arrhenius_x, arrhenius_y = zip(*plot_points) | |
| arrhenius_x, arrhenius_y = list(arrhenius_x), list(arrhenius_y) | |
| fig = go.Figure() | |
| fig.add_trace( | |
| go.Scatter( | |
| x=arrhenius_x, | |
| y=arrhenius_y, | |
| mode="lines", | |
| name="Fitted k(T)", | |
| line=dict(color="#2563eb"), | |
| ) | |
| ) | |
| k_298 = _safe_float(getattr(dataset, "rate_at_298", None)) | |
| if k_298 and k_298 > 0: | |
| fig.add_trace( | |
| go.Scatter( | |
| x=[1000.0 / 298.0], | |
| y=[math.log(k_298)], | |
| mode="markers", | |
| name="k(298 K)", | |
| marker=dict(size=10, color="#dc2626"), | |
| hovertemplate="T = 298 K<br>k = %{customdata[0]:.3e}", | |
| customdata=[[k_298]], | |
| ) | |
| ) | |
| fig.update_layout( | |
| title=f"Arrhenius Plot for {detail.title or 'Reaction'}", | |
| xaxis_title="1000 / T (K⁻¹)", | |
| yaxis_title="ln k", | |
| height=360, | |
| margin=dict(l=40, r=20, t=60, b=40), | |
| ) | |
| return fig | |
def _fetch_all_nist_reactions(limit: int = 100) -> List[tuple[str, str]]:
    """Fetch a sample of reactions to offer as ``(display_name, reaction)`` options.

    Runs a single search for reactants containing "C", looks at the first
    ``limit`` results, drops blank and duplicate reaction texts, shortens
    display names longer than 80 characters (adding "..."), and orders the
    options by reaction-text length. Any failure is printed and an empty list
    is returned.
    """
    try:
        carbon_filter = SearchFilter(
            boolean=None,
            left_parenthesis="",
            field=FieldName.reactants,
            relation=Relation.contains,
            value="C",  # Start with carbon-containing compounds
            right_parenthesis="",
        )
        results = client.search(
            SearchRequest(
                filters=[carbon_filter],
                decomposition_only=False,
                category=Category.any,
                units=None,
            )
        )

        # reaction text -> display label; dict keys give order-preserving dedup.
        labelled = {}
        for hit in results[:limit]:
            text = hit.reaction.strip()
            if not text or text in labelled:
                continue
            labelled[text] = text[:80] + "..." if len(text) > 80 else text

        # Shorter (simpler) reactions first; ties keep first-seen order.
        return sorted(
            ((label, text) for text, label in labelled.items()),
            key=lambda pair: len(pair[1]),
        )
    except Exception as exc:
        print(f"Error fetching NIST reactions: {exc}")
        return []
| def _clean_chemical_formula(formula: str) -> str: | |
| """Clean and normalize chemical formulas from NIST format.""" | |
| if not formula: | |
| return "" | |
| # Remove extra spaces within formulas (C 2 H 3 -> C2H3) | |
| import re | |
| # Pattern to match element symbols followed by numbers with spaces | |
| # This will convert "C 2 H 3" to "C2H3" | |
| cleaned = re.sub(r'([A-Z][a-z]?)(\s+)(\d+)', r'\1\3', formula) | |
| # Handle radicals and special notation | |
| cleaned = cleaned.replace("·", "") # Remove radical dots | |
| cleaned = cleaned.replace("•", "") # Remove alternative radical notation | |
| # Keep c- prefix for cyclic compounds, remove other lowercase prefixes | |
| if not cleaned.startswith(('c-', 'C-')): | |
| cleaned = re.sub(r'^[a-z]-', '', cleaned) | |
| return cleaned.strip() | |
| def _nist_formula_to_smiles(formula: str) -> str | None: | |
| """Convert NIST chemical formula to SMILES string for RDKit.""" | |
| if not formula: | |
| return None | |
| formula = _clean_chemical_formula(formula) | |
| # Dictionary of common NIST formulas to SMILES | |
| # This is a lookup table for frequently encountered species | |
| nist_to_smiles = { | |
| # Simple molecules | |
| "H2": "[H][H]", | |
| "O2": "O=O", | |
| "N2": "N#N", | |
| "CO": "[C-]#[O+]", | |
| "CO2": "O=C=O", | |
| "H2O": "O", | |
| "CH4": "C", | |
| "C2H6": "CC", | |
| "C2H4": "C=C", | |
| "C2H2": "C#C", | |
| "C3H8": "CCC", | |
| "C3H6": "C=CC", | |
| "C6H6": "c1ccccc1", | |
| # Radicals (simplified representations) | |
| "H": "[H]", | |
| "CH3": "[CH3]", | |
| "C2H5": "C[CH2]", | |
| "C2H3": "C=C[CH2]", # Propargyl radical | |
| "C3H3": "C#CC", # Propynyl radical | |
| "C": "[C]", # Carbon atom | |
| "OH": "[OH]", | |
| "O": "[O]", | |
| "HO2": "O[O]", | |
| "CH2": "[CH2]", | |
| # Cyclic compounds | |
| "c-C3H2": "C1=CC1", # Cyclopropenylidene (simplified) | |
| # More complex species | |
| "CH2O": "C=O", | |
| "CH3OH": "CO", | |
| "C2H5OH": "CCO", | |
| "HCO": "[CH]=O", | |
| "CH3CHO": "CC=O", | |
| "C2H4O": "C=CO", | |
| # Ions (simplified) | |
| "H+": "[H+]", | |
| "OH-": "[OH-]", | |
| "O2-": "[O-][O]", | |
| # Specific compounds from the failing reaction | |
| "C2H3": "C=C[CH2]", # Propargyl radical C2H3 | |
| "c-C3H2": "C1=CC1", # Cyclopropenyl radical (c-C3H2) | |
| "CC3H2": "C1=CC1", # Alternative notation | |
| } | |
| # Direct lookup | |
| if formula in nist_to_smiles: | |
| return nist_to_smiles[formula] | |
| # Try to generate SMILES for simple hydrocarbons | |
| if re.match(r'^C\d+H\d*$', formula): | |
| # Parse C_nH_m | |
| c_match = re.search(r'C(\d+)', formula) | |
| h_match = re.search(r'H(\d+)', formula) | |
| if c_match and h_match: | |
| c_count = int(c_match.group(1)) | |
| h_count = int(h_match.group(1)) | |
| if c_count == 1 and h_count == 4: | |
| return "C" # CH4 | |
| elif c_count == 2 and h_count == 6: | |
| return "CC" # C2H6 | |
| elif c_count == 2 and h_count == 4: | |
| return "C=C" # C2H4 | |
| elif c_count == 2 and h_count == 2: | |
| return "C#C" # C2H2 | |
| elif c_count == 3 and h_count == 8: | |
| return "CCC" # C3H8 | |
| elif c_count == 3 and h_count == 6: | |
| return "C=CC" # C3H6 | |
| # For unknown formulas, try to create a simple representation | |
| # This is a fallback that may not be chemically accurate | |
| if re.match(r'^[A-Z][a-z]?\d*$', formula): | |
| # Single atom with number (like O2, N2) | |
| element_match = re.match(r'^([A-Z][a-z]?)(\d*)$', formula) | |
| if element_match: | |
| element = element_match.group(1) | |
| count = element_match.group(2) | |
| if count and int(count) > 1: | |
| # For diatomic molecules | |
| if element in ['O', 'N', 'H']: | |
| if element == 'O': | |
| return "O=O" | |
| elif element == 'N': | |
| return "N#N" | |
| elif element == 'H': | |
| return "[H][H]" | |
| else: | |
| return f"[{element}]" | |
| return None # Could not convert | |
| def _render_reaction_from_nist(reaction_text: str) -> str | None: | |
| """Render a reaction from NIST format to SVG using RDKit.""" | |
| reaction_text = (reaction_text or "").strip() | |
| if not reaction_text: | |
| return None | |
| # Try to convert NIST reaction format to SMILES | |
| smiles_reaction = None | |
| # Handle different NIST reaction formats | |
| separators = [" → ", " -> ", " ↔ ", " ⇌ ", " →", " ->", " ⇌"] | |
| parts = None | |
| for sep in separators: | |
| if sep in reaction_text: | |
| parts = reaction_text.split(sep, 1) | |
| break | |
| if parts and len(parts) == 2: | |
| reactants_text = parts[0].strip() | |
| products_text = parts[1].strip() | |
| # Split reactants and products by " + " | |
| reactants = [r.strip() for r in reactants_text.split(" + ") if r.strip()] | |
| products = [p.strip() for p in products_text.split(" + ") if p.strip()] | |
| # Convert each compound to SMILES | |
| reactant_smiles = [] | |
| product_smiles = [] | |
| for reactant in reactants: | |
| smiles = _nist_formula_to_smiles(reactant) | |
| if smiles: | |
| reactant_smiles.append(smiles) | |
| for product in products: | |
| smiles = _nist_formula_to_smiles(product) | |
| if smiles: | |
| product_smiles.append(smiles) | |
| # Only proceed if we have at least one reactant and one product | |
| if reactant_smiles and product_smiles: | |
| reactants_smiles_str = ".".join(reactant_smiles) | |
| products_smiles_str = ".".join(product_smiles) | |
| smiles_reaction = f"{reactants_smiles_str}>>{products_smiles_str}" | |
| # If we couldn't parse it with separators, try using it directly | |
| if not smiles_reaction: | |
| if ">>" in reaction_text: | |
| smiles_reaction = reaction_text | |
| else: | |
| # Last resort: try to clean the entire reaction text | |
| cleaned = _clean_chemical_formula(reaction_text) | |
| if ">>" in cleaned: | |
| smiles_reaction = cleaned | |
| if not smiles_reaction: | |
| return None | |
| try: | |
| # Try parsing as SMILES reaction first | |
| reaction = rdChemReactions.ReactionFromSmarts(smiles_reaction, useSmiles=True) | |
| if reaction is None: | |
| # Fall back to SMARTS parsing | |
| reaction = rdChemReactions.ReactionFromSmarts(smiles_reaction, useSmiles=False) | |
| except Exception as exc: | |
| print(f"RDKit parsing error for '{smiles_reaction}': {exc}") | |
| return None | |
| if reaction is None or (reaction.GetNumReactantTemplates() == 0 and reaction.GetNumProductTemplates() == 0): | |
| return None | |
| try: | |
| # Generate SVG with specified parameters | |
| svg = Draw.ReactionToImage(reaction, subImgSize=(200, 200), useSVG=True, drawOptions=None, returnPNG=False) | |
| except Exception as exc: | |
| print(f"Error rendering reaction '{smiles_reaction}': {exc}") | |
| return None | |
| if isinstance(svg, tuple): | |
| svg = svg[0] | |
| if hasattr(svg, "data"): | |
| svg = svg.data | |
| if isinstance(svg, bytes): | |
| svg = svg.decode("utf-8", errors="ignore") | |
| if not isinstance(svg, str) or "<svg" not in svg: | |
| return None | |
| return svg | |
| def _render_smiles_to_svg(smiles_text: str) -> str | None: | |
| """Helper to render a SMILES/SMARTS reaction string to SVG.""" | |
| smiles_text = (smiles_text or "").strip() | |
| if not smiles_text or ">>" not in smiles_text: | |
| return None | |
| try: | |
| # Try parsing as SMILES reaction first | |
| reaction = rdChemReactions.ReactionFromSmarts(smiles_text, useSmiles=True) | |
| except Exception: | |
| try: | |
| # Fall back to SMARTS parsing | |
| reaction = rdChemReactions.ReactionFromSmarts(smiles_text, useSmiles=False) | |
| except Exception: | |
| return None | |
| if reaction is None or (reaction.GetNumReactantTemplates() == 0 and reaction.GetNumProductTemplates() == 0): | |
| return None | |
| try: | |
| # Generate SVG with better sizing | |
| svg = Draw.ReactionToImage(reaction, subImgSize=(250, 200), useSVG=True) | |
| except Exception: | |
| return None | |
| if isinstance(svg, tuple): | |
| svg = svg[0] | |
| if hasattr(svg, "data"): | |
| svg = svg.data | |
| if isinstance(svg, bytes): | |
| svg = svg.decode("utf-8", errors="ignore") | |
| if not isinstance(svg, str) or "<svg" not in svg: | |
| return None | |
| return svg | |
| def _complete_reaction_with_deepseek(partial_reaction: str, api_key: str) -> Optional[str]: | |
| """Use DeepSeek API to complete missing parts of a chemical reaction.""" | |
| if not api_key or not partial_reaction.strip(): | |
| return None | |
| try: | |
| from openai import OpenAI | |
| client = OpenAI( | |
| api_key=api_key, | |
| base_url="https://api.deepseek.com", | |
| ) | |
| system_prompt = """ | |
| You are a chemistry expert. The user will provide a partial chemical reaction (missing reactants or products). | |
| Please complete the reaction by inferring the missing components based on chemical knowledge and reaction patterns. | |
| Analyze the given reaction and determine what might be missing. Consider: | |
| - Conservation of mass and atoms | |
| - Common reaction types (combustion, substitution, addition, etc.) | |
| - Chemical plausibility | |
| - Radical reactions, ionic reactions, etc. | |
| Output in JSON format with the completed reaction. | |
| EXAMPLE INPUT: | |
| CH4 + O2 → CO2 | |
| EXAMPLE OUTPUT: | |
| {"completed_reaction": "CH4 + 2O2 → CO2 + 2H2O", "reasoning": "This is a combustion reaction requiring balanced oxygen and water as product"} | |
| EXAMPLE INPUT: | |
| C2H5• + H2 → | |
| EXAMPLE OUTPUT: | |
| {"completed_reaction": "C2H5• + H2 → C2H6 + H•", "reasoning": "Hydrogen abstraction reaction where ethyl radical abstracts H from H2"} | |
| """ | |
| user_prompt = f"Complete this partial chemical reaction: {partial_reaction}" | |
| messages = [ | |
| {"role": "system", "content": system_prompt}, | |
| {"role": "user", "content": user_prompt} | |
| ] | |
| response = client.chat.completions.create( | |
| model="deepseek-chat", | |
| messages=messages, | |
| response_format={'type': 'json_object'}, | |
| max_tokens=500, | |
| temperature=0.1 | |
| ) | |
| result = json.loads(response.choices[0].message.content) | |
| if "completed_reaction" in result: | |
| return result["completed_reaction"] | |
| except Exception as exc: | |
| print(f"DeepSeek API error: {exc}") | |
| return None | |
| return None | |
| def _analyze_reaction_completeness(reaction_text: str) -> Dict[str, Any]: | |
| """Analyze if a reaction is complete or needs completion.""" | |
| reaction_text = reaction_text.strip() | |
| # Check for reaction arrow | |
| has_arrow = any(arrow in reaction_text for arrow in ["→", "->", "↔", "⇌"]) | |
| if not has_arrow: | |
| return {"complete": False, "missing": "reaction arrow", "reason": "No reaction arrow found"} | |
| # Split reaction | |
| parts = None | |
| for sep in [" → ", " -> ", " ↔ ", " ⇌ "]: | |
| if sep in reaction_text: | |
| parts = reaction_text.split(sep, 1) | |
| break | |
| if not parts or len(parts) != 2: | |
| return {"complete": False, "missing": "proper format", "reason": "Cannot parse reaction format"} | |
| reactants_text, products_text = parts | |
| # Check if reactants/products exist | |
| reactants = [r.strip() for r in reactants_text.split("+") if r.strip()] | |
| products = [p.strip() for p in products_text.split("+") if p.strip()] | |
| if not reactants: | |
| return {"complete": False, "missing": "reactants", "reason": "No reactants found"} | |
| if not products: | |
| return {"complete": False, "missing": "products", "reason": "No products found"} | |
| # Basic completeness check | |
| if len(reactants) >= 1 and len(products) >= 1: | |
| return {"complete": True, "reactants": reactants, "products": products} | |
| return {"complete": False, "missing": "components", "reason": "Insufficient reaction components"} | |
def render_reaction_svg(reaction_text: str, api_key: str = "", auto_complete: bool = False):
    """Render a reaction to SVG and describe what happened.

    Text containing ">>" is treated as reaction SMILES. Otherwise, when
    ``auto_complete`` and ``api_key`` are both set and the reaction looks
    incomplete, DeepSeek is asked to complete it first. Everything else is
    rendered as NIST-style text.

    Returns:
        ``(svg, status)``; ``svg`` is "" whenever rendering did not succeed.
    """
    text = (reaction_text or "").strip()
    if not text:
        return "", "⚠️ Enter a reaction SMILES/SMARTS string (e.g. CH4.O>>CO2)."

    # Reaction SMILES path: no fallback to the other renderers.
    if ">>" in text:
        rendered = _render_smiles_to_svg(text)
        if not rendered:
            return "", "🚨 Could not parse SMILES reaction format."
        return rendered, "✅ Reaction rendered successfully from SMILES."

    # AI-completion path, only for reactions judged incomplete.
    if auto_complete and api_key:
        analysis = _analyze_reaction_completeness(text)
        if not analysis["complete"]:
            completed = _complete_reaction_with_deepseek(text, api_key)
            if not completed:
                return "", f"🚨 Could not complete reaction with AI. Missing: {analysis.get('missing', 'unknown')}"
            rendered = _render_reaction_from_nist(completed)
            if not rendered:
                return "", f"🚨 DeepSeek completed reaction but rendering failed: {completed}"
            return rendered, (
                f"✅ Reaction completed and rendered using DeepSeek AI.\nOriginal: {text}\nCompleted: {completed}"
            )

    # Fallback: try NIST format rendering
    rendered = _render_reaction_from_nist(text)
    if rendered:
        return rendered, "✅ Reaction rendered from NIST format."
    return "", "🚨 Could not parse or render the reaction. Try SMILES format (reactants>>products) or enable AI completion."
| def _extract_compounds_from_reaction(reaction_text: str) -> List[str]: | |
| """Extract compound names/identifiers from reaction text.""" | |
| compounds = [] | |
| # Clean the reaction text | |
| reaction_text = reaction_text.strip() | |
| # Handle different reaction formats | |
| if " → " in reaction_text: | |
| parts = reaction_text.split(" → ") | |
| elif "->" in reaction_text: | |
| parts = reaction_text.split("->") | |
| elif " ↔ " in reaction_text: | |
| parts = reaction_text.split(" ↔ ") | |
| else: | |
| return compounds | |
| # Process each part (reactants and products) | |
| for part in parts: | |
| # Split by " + " to get individual compounds | |
| individual_compounds = [c.strip() for c in part.split(" + ") if c.strip()] | |
| # Try to identify chemical formulas or names | |
| for compound in individual_compounds: | |
| # Remove coefficients (numbers at start) | |
| compound = re.sub(r'^\d+\s*', '', compound) | |
| if compound and len(compound) > 1: # Avoid single letters | |
| compounds.append(compound) | |
| return list(set(compounds)) # Remove duplicates | |
| def _fetch_compound_thermo_data(compounds: List[str]) -> dict: | |
| """Fetch thermodynamic data for a list of compounds from NIST databases.""" | |
| thermo_data = {} | |
| for compound in compounds[:5]: # Limit to 5 compounds to avoid overwhelming | |
| compound_data = {} | |
| # Try different databases | |
| databases_to_try = [ | |
| "NIST Organic Thermochemistry Archive", | |
| "Organometallic Thermochemistry Database", | |
| "Gas-Phase Ion Thermochemistry" | |
| ] | |
| for db_name in databases_to_try: | |
| try: | |
| md_content, df, plot = fetch_specific_db(db_name, compound) | |
| if df is not None and not df.empty: | |
| compound_data[db_name] = { | |
| 'markdown': md_content, | |
| 'dataframe': df, | |
| 'plot': plot | |
| } | |
| break # Stop at first successful fetch | |
| except Exception: | |
| continue | |
| if compound_data: | |
| thermo_data[compound] = compound_data | |
| return thermo_data | |
| def _create_animated_plot(fig: go.Figure, animate: bool = False) -> go.Figure: | |
| """Add animation capabilities to plots if requested.""" | |
| if not animate or fig is None: | |
| return fig | |
| # Add animation frames for temperature sweep | |
| if hasattr(fig, 'data') and len(fig.data) > 0: | |
| trace = fig.data[0] | |
| # Create animation frames | |
| frames = [] | |
| temps = list(range(300, 2500, 100)) # Temperature range | |
| for temp in temps: | |
| frame_data = [] | |
| for trace in fig.data: | |
| if hasattr(trace, 'x') and hasattr(trace, 'y'): | |
| # Simulate temperature-dependent behavior | |
| animated_trace = go.Scatter( | |
| x=trace.x, | |
| y=trace.y, | |
| mode=trace.mode, | |
| name=trace.name, | |
| line=dict(color=trace.line.color if hasattr(trace, 'line') else 'blue') | |
| ) | |
| frame_data.append(animated_trace) | |
| frames.append(go.Frame(data=frame_data, name=str(temp))) | |
| fig.frames = frames | |
| # Add animation controls | |
| fig.update_layout( | |
| updatemenus=[dict( | |
| type="buttons", | |
| buttons=[dict( | |
| label="Play", | |
| method="animate", | |
| args=[None, dict(mode="immediate", frame=dict(duration=500, redraw=True), fromcurrent=True)] | |
| )] | |
| )], | |
| sliders=[dict( | |
| active=0, | |
| steps=[dict(method="animate", args=[[f.name], dict(mode="immediate", frame=dict(duration=300, redraw=False), transition=dict(duration=0))], label=f.name) for f in frames], | |
| currentvalue={"prefix": "Temperature: "}, | |
| )] | |
| ) | |
| return fig | |
def _title_to_reaction_smiles(title: str) -> Optional[str]:
    """Rewrite a reaction title as a ``reactants>>products`` string.

    Supported arrows, in priority order, are ``" → "``, ``" ↔ "`` and
    ``"->"``; ``" + "`` separators become ``"."``. Returns ``None`` when no
    arrow is present or when a one-way arrow does not split the title into
    exactly two sides.
    """
    text = title.strip()

    def _join_sides(arrow: str) -> Optional[str]:
        sides = text.split(arrow)
        if len(sides) != 2:
            return None
        lhs = sides[0].replace(" + ", ".").strip()
        rhs = sides[1].replace(" + ", ".").strip()
        return f"{lhs}>>{rhs}"

    if " → " in text:
        return _join_sides(" → ")
    if " ↔ " in text:
        # Reversible reaction. This branch used to be guarded by
        # '" → " in title and " ↔ " in title', which could never be reached.
        return text.replace(" ↔ ", ">>").replace(" + ", ".")
    if "->" in text:
        return _join_sides("->")
    return None


def fetch_detail(selected_url: str, manual_url: str, auto_fetch_thermo: bool = True, animate_plots: bool = False):
    """Load one reaction's detail page and prepare every detail-tab output.

    Args:
        selected_url: Detail URL chosen from the search-result dropdown.
        manual_url: Manually pasted detail URL; takes precedence when non-blank.
        auto_fetch_thermo: When true, also look up thermodynamic data for the
            compounds named in the reaction title.
        animate_plots: When true, add animation controls to the dataset plot.

    Returns:
        A 6-tuple ``(markdown, table, plot_fig, reaction_svg, thermo_data,
        thermo_summary)``. When no URL is given, the fetch fails, or no
        datasets exist, the last four items are ``None, "", {}, ""``.
    """
    detail_url = (manual_url or "").strip() or (selected_url or "").strip()
    if not detail_url:
        return "ℹ️ Select a reaction above or paste a detail URL.", [], None, "", {}, ""

    try:
        detail = client.fetch_reaction_detail(detail_url)
    except Exception as exc:  # pragma: no cover - network/parsing issues
        return f"🚨 Could not load detail: {exc}", [], None, "", {}, ""

    markdown = _format_detail_markdown(detail, detail_url)
    table = _datasets_to_table(detail)
    if not table:
        markdown += "\n\n_No kinetics datasets were returned for this reaction._"
        return markdown, table, None, "", {}, ""

    plot_fig = _build_dataset_plot(detail)

    # Try to render the reaction title as SVG; an empty string means "no image".
    reaction_svg = ""
    if detail.title:
        smiles_attempt = _title_to_reaction_smiles(detail.title)
        if smiles_attempt:
            reaction_svg = _render_smiles_to_svg(smiles_attempt) or ""

    # Auto-fetch thermodynamic data for compounds in the reaction.
    # NOTE(review): thermo_data carries DataFrame/plot objects and is wired to a
    # JSON output component - confirm that component can serialize them.
    thermo_data = {}
    thermo_summary = ""
    if auto_fetch_thermo and detail.title:
        compounds = _extract_compounds_from_reaction(detail.title)
        if compounds:
            thermo_data = _fetch_compound_thermo_data(compounds)
        if thermo_data:
            pieces = [
                f"### 🔬 Auto-fetched Thermodynamic Data\nFound data for {len(thermo_data)} compound(s): {', '.join(thermo_data.keys())}\n\n"
            ]
            for compound, per_db in thermo_data.items():
                pieces.append(f"**{compound}:**\n")
                pieces.extend(f"- {db_name}: Data available\n" for db_name in per_db)
                pieces.append("\n")
            thermo_summary = "".join(pieces)

    if animate_plots:
        plot_fig = _create_animated_plot(plot_fig, True)

    return markdown, table, plot_fig, reaction_svg, thermo_data, thermo_summary
| def _parse_points(text: str) -> Tuple[List[float], List[float], List[str]]: | |
| temps: List[float] = [] | |
| rates: List[float] = [] | |
| errors: List[str] = [] | |
| if not text.strip(): | |
| return temps, rates, errors | |
| for idx, line in enumerate(text.strip().splitlines(), start=1): | |
| line = line.strip() | |
| if not line: | |
| continue | |
| if "," in line: | |
| parts = [p.strip() for p in line.split(",", 1)] | |
| else: | |
| parts = line.split() | |
| if len(parts) != 2: | |
| errors.append(f"Line {idx}: expected 'T,k' (comma or whitespace separated).") | |
| continue | |
| try: | |
| T_val = float(parts[0]) | |
| k_val = float(parts[1]) | |
| if T_val <= 0 or k_val <= 0: | |
| raise ValueError | |
| except ValueError: | |
| errors.append(f"Line {idx}: invalid numeric pair '{line}'.") | |
| continue | |
| temps.append(T_val) | |
| rates.append(k_val) | |
| return temps, rates, errors | |
def kinetics_interface(A, n, Ea, T_min, T_max, plot_dropdown, fetch_ch3, fetch_indene):
    """Gradio callback for the Kinetics Plotter tab.

    Builds an Arrhenius plot with 100 samples and no overlay points, and
    returns ``(plot, thermo_data, info_text)``.

    ``plot_dropdown`` is accepted but not used. Thermo fetching is still a
    placeholder: ``thermo_data`` is always ``None`` and the two checkboxes
    only append a line to the info text.

    When the plot cannot be generated, the warning from
    ``generate_arrhenius_plot`` is returned on its own rather than behind a
    success message.
    """
    plot, plot_summary = generate_arrhenius_plot(A, n, Ea, T_min, T_max, 100, "")

    # Handle thermo fetching (placeholder for now)
    thermo_data = None

    if plot is None:
        # plot_summary already holds the user-facing warning.
        return None, thermo_data, plot_summary

    info_lines = [f"Kinetics plot generated successfully.\n{plot_summary}"]
    if fetch_ch3:
        info_lines.append("CH3 thermo data fetched.")
    if fetch_indene:
        info_lines.append("Inden-1-yl thermo data fetched.")
    return plot, thermo_data, "\n".join(info_lines)
def generate_arrhenius_plot(A, n, Ea, Tmin, Tmax, num_points=100, point_text=""):
    """Plot ln k against 1000/T for the modified Arrhenius expression.

    The curve is ``k(T) = A * (T / 298)**n * exp(-Ea / (R * T))`` with
    ``R = 8.314462618`` J/(mol K), sampled at ``num_points`` evenly spaced
    temperatures between ``Tmin`` and ``Tmax`` inclusive.

    Args:
        A: Pre-exponential factor; must be a positive finite number.
        n: Temperature exponent.
        Ea: Activation energy in J/mol.
        Tmin: Lower temperature bound in K (positive, below ``Tmax``).
        Tmax: Upper temperature bound in K.
        num_points: Number of samples, between 2 and 2000.
        point_text: Optional ``T,k`` lines to overlay as markers.

    Returns:
        ``(figure, summary)`` on success, or ``(None, warning)`` when any
        input is invalid.
    """
    try:
        Tmin = float(Tmin)
        Tmax = float(Tmax)
        num_points = int(num_points)
        if not (math.isfinite(Tmin) and math.isfinite(Tmax)):
            raise ValueError("temperature bounds must be finite")
    except (TypeError, ValueError):
        return None, "⚠️ Temperature limits and sample count must be numeric."
    if Tmin <= 0 or Tmax <= 0 or Tmin >= Tmax:
        return None, "⚠️ Temperature bounds must be positive with Tmin < Tmax."
    if num_points < 2 or num_points > 2000:
        return None, "⚠️ Number of samples must be between 2 and 2000."

    # A cleared numeric input arrives as None; coerce so that case yields a
    # warning instead of a TypeError further down.
    try:
        A = float(A)
        n = float(n)
        Ea = float(Ea)
        if not all(math.isfinite(v) for v in (A, n, Ea)):
            raise ValueError("non-finite Arrhenius parameter")
    except (TypeError, ValueError):
        return None, "⚠️ A, n and Ea must be finite numbers."
    if A <= 0:
        return None, "⚠️ Pre-exponential factor A must be positive."

    temps = [Tmin + (Tmax - Tmin) * i / (num_points - 1) for i in range(num_points)]
    R = 8.314462618  # J/mol·K

    # ln k is evaluated analytically. Computing k first and then taking its log
    # fails when exp(-Ea/RT) underflows to 0 (math domain error) or overflows.
    ln_A = math.log(A)
    arrhenius_x = [1000.0 / t for t in temps]
    arrhenius_y = [ln_A + n * math.log(t / 298.0) - Ea / (R * t) for t in temps]

    fig = go.Figure()
    fig.add_trace(
        go.Scatter(
            x=arrhenius_x,
            y=arrhenius_y,
            mode="lines",
            name="Fitted k(T)",
            line=dict(color="#2563eb"),
        )
    )

    obs_t, obs_k, errors = _parse_points(point_text or "")
    if obs_t:
        fig.add_trace(
            go.Scatter(
                x=[1000.0 / t for t in obs_t],
                y=[math.log(k) for k in obs_k],
                mode="markers",
                name="Data points",
                marker=dict(size=10, color="#dc2626"),
                hovertemplate="T = %{customdata[0]:.0f} K<br>k = %{customdata[1]:.3e}",
                customdata=list(zip(obs_t, obs_k)),
            )
        )

    fig.update_layout(
        title="Arrhenius Plot (ln k vs 1000/T)",
        xaxis_title="1000 / T (K⁻¹)",
        yaxis_title="ln k",
        template="plotly_white",
        height=500,
    )

    summary = (
        f"Plotted Arrhenius curve for A={A:.3e}, n={n:.3f}, Ea={Ea:.1f} J/mol "
        f"across {Tmin:.0f}-{Tmax:.0f} K."
    )
    if errors:
        summary += "\n\n⚠️ Data point issues:\n- " + "\n- ".join(errors)
    elif obs_t:
        summary += f"\nOverlayed {len(obs_t)} experimental point(s)."
    return fig, summary
def build_interface() -> gr.Blocks:
    """Assemble the Gradio UI and wire its event handlers.

    Tabs created, in order: "Search", "Reaction Detail", "Reaction SVG",
    "Kinetics Plotter", then one tab per entry of ``DB_TABS``.

    Building the interface calls ``_fetch_all_nist_reactions(limit=200)`` to
    populate the reaction dropdown of the "Reaction SVG" tab.

    Returns:
        The configured ``gr.Blocks`` instance (not launched).
    """

    def _make_db_fetcher(bound_db_name, with_phase):
        """Return a fetch callback bound to one specific database name.

        Going through a factory freezes the database name per tab. A closure
        over the loop variable would be late-bound, so every tab would query
        whichever database the loop visited last.
        """
        if with_phase:
            def wrapped_fetch(formula, phase):
                # The phase is received from the UI but not forwarded yet.
                return fetch_specific_db(bound_db_name, formula)
        else:
            def wrapped_fetch(formula):
                return fetch_specific_db(bound_db_name, formula)
        return wrapped_fetch

    demo = gr.Blocks(title="NIST Chemistry Explorer")
    with demo:
        gr.Markdown(
            dedent(
                """
                # NIST Chemical Kinetics Explorer
                Search the [NIST Chemical Kinetics Database](https://kinetics.nist.gov/kinetics/)
                directly from Hugging Face Spaces. This tool mirrors the public advanced search form,
                sends the same query to NIST, and formats summary plus detailed kinetics data.
                ⚠️ *All results come from the live NIST website. Please respect their usage policies
                and keep queries reasonable.*
                """
            )
        )
        results_state = gr.State([])

        with gr.Tabs():
            # Tab 1: Search (Enhanced functionality)
            with gr.TabItem("Search"):
                with gr.Row():
                    with gr.Column(scale=2):
                        simple_search = gr.Textbox(label="Search Query", placeholder="Enter reactants, products, or compound (e.g., CH4 + O2, CH3, benzene)")
                    with gr.Column(scale=1):
                        auto_search_thermo = gr.Checkbox(
                            label="🔬 Auto-fetch thermo data",
                            value=True,
                            info="Automatically fetch thermodynamic data for searched compounds"
                        )
                with gr.Row():
                    decomp = gr.Checkbox(label="Only decomposition reactions", value=False)
                    category = gr.Dropdown(label="Result type filter", choices=CATEGORY_CHOICES, value=str(Category.any.value))
                    units = gr.Textbox(
                        label="Optional Units token",
                        placeholder="Leave blank to use NIST account defaults",
                    )
                search_button = gr.Button("🔍 Search NIST", variant="primary")
                search_status = gr.Markdown()
                result_table = gr.Dataframe(
                    headers=["#", "Records", "Reaction", "Detail URL"],
                    datatype=["number", "number", "str", "str"],
                    interactive=False,
                    wrap=True,
                )
                # Search results thermodynamic data
                search_thermo_accordion = gr.Accordion(label="🔬 Search Query Thermodynamic Data", open=False)
                with search_thermo_accordion:
                    search_thermo_display = gr.JSON(label="Thermodynamic Data for Search Query")

            # Tab 2: Reaction Detail (Enhanced functionality)
            with gr.TabItem("Reaction Detail"):
                with gr.Row():
                    with gr.Column(scale=2):
                        selection = gr.Dropdown(
                            label="Select a reaction from the latest search",
                            choices=[],
                            interactive=False,
                        )
                        manual_url = gr.Textbox(
                            label="Or paste a NIST detail URL",
                            placeholder="https://kinetics.nist.gov/kinetics/ReactionSearch?....",
                        )
                    with gr.Column(scale=1):
                        auto_fetch_thermo = gr.Checkbox(
                            label="🔬 Auto-fetch thermodynamics",
                            value=True,
                            info="Automatically fetch thermodynamic data for compounds in the reaction"
                        )
                        animate_plots = gr.Checkbox(
                            label="🎬 Animate plots",
                            value=False,
                            info="Add animation controls to plots"
                        )
                detail_button = gr.Button("Fetch Reaction Detail", variant="primary")
                # Reaction metadata and details
                detail_markdown = gr.Markdown()
                with gr.Row():
                    # Kinetics data table
                    with gr.Column():
                        gr.Markdown("### Kinetics Data")
                        dataset_table = gr.Dataframe(
                            headers=["Section", "Squib", "Temp [K]", "A", "n", "Ea [J/mole]", "k(298 K)", "Order", "Squib URL"],
                            datatype=["str"] * 9,
                            interactive=False,
                            wrap=True,
                        )
                    # Arrhenius plot
                    with gr.Column():
                        gr.Markdown("### Arrhenius Plot")
                        reaction_plot = gr.Plot()
                # Reaction SVG visualization
                with gr.Row():
                    gr.Markdown("### Reaction Structure")
                    reaction_svg = gr.HTML()
                # Auto-fetched thermodynamic data
                thermo_summary = gr.Markdown()
                thermo_accordion = gr.Accordion(label="🔬 Thermodynamic Data", open=False)
                with thermo_accordion:
                    thermo_data_display = gr.JSON(label="Raw Thermodynamic Data")

            # Tab 3: Reaction SVG (Enhanced with NIST reactions and AI completion)
            with gr.TabItem("Reaction SVG"):
                gr.Markdown(
                    "🎨 **Render chemical reactions as SVG using RDKit + AI Enhancement**\n\n"
                    "Choose from NIST database reactions, enter custom reactions, or let AI enhance/validate/complete your reactions!\n\n"
                    "**Workflow:**\n"
                    "1. 🤖 **AI Enhancement First**: DeepSeek AI analyzes and enhances your reaction\n"
                    "2. 🎨 **RDKit Rendering**: Complete reaction rendered as beautiful SVG\n"
                    "3. ✅ **Validation**: AI confirms reaction balance and plausibility\n\n"
                    "**Features:**\n"
                    "- 🧪 200+ NIST database reactions\n"
                    "- 🤖 AI-powered reaction enhancement (DeepSeek-V3.2-Exp)\n"
                    "- 🔬 Multiple input formats (NIST, SMILES, SMARTS, partial)\n"
                    "- ⚡ Automatic format detection and intelligent conversion\n"
                    "- ✅ Reaction validation and balancing"
                )
                # API Key Configuration
                with gr.Accordion("🔑 DeepSeek API Configuration", open=False):
                    deepseek_api_key = gr.Textbox(
                        label="DeepSeek API Key",
                        placeholder="sk-...",
                        type="password",
                        info="Get your API key from https://platform.deepseek.com/"
                    )
                    gr.Markdown(
                        "**How to get API key:**\n"
                        "1. Visit https://platform.deepseek.com/\n"
                        "2. Sign up/Login to your account\n"
                        "3. Go to API Keys section\n"
                        "4. Create a new API key\n"
                        "5. Copy and paste it here"
                    )

                # NIST reactions dropdown. The blank entry is always present so the
                # dropdown's initial value "" is a valid choice even when no
                # reactions could be fetched.
                nist_reactions = list(_fetch_all_nist_reactions(limit=200) or [])
                nist_reaction_options = [("", "")] + nist_reactions
                with gr.Row():
                    with gr.Column():
                        nist_reaction_dropdown = gr.Dropdown(
                            label="🧪 NIST Database Reactions",
                            choices=[label for label, _ in nist_reaction_options],
                            value="",
                            interactive=True,
                            info=f"Select from {len(nist_reactions)} reactions in NIST kinetics database"
                        )
                        reaction_input = gr.Textbox(
                            label="Custom Reaction Input",
                            placeholder="Enter reaction in any format:\nNIST: CH4 + O2 → CO2 + H2O\nSMILES: CH4.O2>>CO2.H2O\nPartial: CH4 + O2 → (AI will complete)",
                            lines=4,
                            info="Supports NIST format, SMILES/SMARTS, or partial reactions"
                        )
                    with gr.Column():
                        render_mode = gr.Radio(
                            label="Render Mode",
                            choices=["Auto (detect format)", "Force NIST format", "Force SMILES/SMARTS"],
                            value="Auto (detect format)",
                            info="Auto mode intelligently detects and converts formats"
                        )
                        ai_options = gr.CheckboxGroup(
                            label="🤖 AI Enhancement Options",
                            choices=["Enable AI enhancement (recommended)", "High quality rendering", "Show AI reasoning"],
                            value=["Enable AI enhancement (recommended)"],
                            info="DeepSeek AI analyzes, validates, and enhances reactions before rendering"
                        )
                # Buttons
                with gr.Row():
                    render_auto_btn = gr.Button("🚀 AI First → Render (Recommended)", variant="primary")
                    render_nist_btn = gr.Button("🧪 Direct NIST Render", variant="secondary")
                    render_smiles_btn = gr.Button("🔬 Direct SMILES Render", variant="secondary")
                    clear_btn = gr.Button("🗑️ Clear", variant="stop")
                # Output
                reaction_svg_output = gr.HTML(label="Reaction Structure")
                render_status = gr.Markdown()

                # Populate custom input from NIST dropdown
                nist_dict = {label: reaction for label, reaction in nist_reaction_options}

                def populate_from_nist_dropdown(selected_label):
                    """Return the reaction text stored for the selected dropdown label."""
                    if selected_label and selected_label in nist_dict:
                        return nist_dict[selected_label]
                    return ""

                nist_reaction_dropdown.change(
                    fn=populate_from_nist_dropdown,
                    inputs=nist_reaction_dropdown,
                    outputs=reaction_input,
                )

                # Smart auto-render function
                def render_auto_reaction(reaction_text, api_key, ai_options, render_mode):
                    """Optionally complete the reaction with DeepSeek, then render it.

                    Returns ``(svg_html, status_markdown)``.
                    """
                    if not reaction_text:
                        return "", "⚠️ Please enter a reaction or select from the NIST dropdown."
                    selected_options = ai_options or []
                    status_prefix = ""
                    final_reaction = reaction_text
                    # Always try AI enhancement first if enabled and API key provided
                    if "Enable AI enhancement (recommended)" in selected_options and api_key:
                        completed_reaction = _complete_reaction_with_deepseek(reaction_text, api_key)
                        if completed_reaction and completed_reaction != reaction_text:
                            final_reaction = completed_reaction
                            status_prefix = f"🤖 **AI Enhanced Reaction**\nOriginal: {reaction_text}\nAI Completed: {final_reaction}\n\n"
                        elif completed_reaction == reaction_text:
                            # The AI call handed back the input unchanged.
                            status_prefix = "🤖 **AI Validated Reaction**\nReaction confirmed as complete and balanced.\n\n"
                        else:
                            # AI returned nothing usable; fall back to the original text.
                            status_prefix = "⚠️ **AI Enhancement Failed**\nProceeding with original reaction.\n\n"

                    # Render the final reaction (AI-enhanced or original)
                    svg = None
                    render_type = "unknown"
                    if render_mode == "Force SMILES/SMARTS":
                        svg = _render_smiles_to_svg(final_reaction)
                        render_type = "SMILES/SMARTS"
                    elif render_mode == "Force NIST format":
                        svg = _render_reaction_from_nist(final_reaction)
                        render_type = "NIST format"
                    elif ">>" in final_reaction:
                        # Auto mode: ">>" marks reaction SMILES.
                        svg = _render_smiles_to_svg(final_reaction)
                        render_type = "SMILES/SMARTS (detected)"
                    else:
                        # Auto mode: try NIST format first, then fall back to SMILES.
                        svg = _render_reaction_from_nist(final_reaction)
                        if svg:
                            render_type = "NIST format (detected)"
                        else:
                            svg = _render_smiles_to_svg(final_reaction)
                            render_type = "SMILES/SMARTS (fallback)"

                    if svg:
                        quality_note = " (High quality)" if "High quality rendering" in selected_options else ""
                        reasoning_note = " (with AI reasoning)" if "Show AI reasoning" in selected_options else ""
                        status = f"{status_prefix}✅ Successfully rendered as {render_type}{quality_note}{reasoning_note}"
                        return svg, status
                    return "", f"{status_prefix}❌ Could not render reaction. The reaction format may not be supported: {final_reaction[:100]}...\n\nTry adjusting the render mode or checking your reaction syntax."

                # Legacy render functions (kept for compatibility)
                def render_nist_reaction(reaction_text, options):
                    """Render ``reaction_text`` directly as a NIST-format reaction."""
                    if not reaction_text:
                        return "", "⚠️ Please select a reaction from the dropdown or enter a custom reaction."
                    svg = _render_reaction_from_nist(reaction_text)
                    if svg:
                        status = f"✅ Successfully rendered NIST reaction: {reaction_text[:100]}..."
                        if "High quality rendering" in (options or []):
                            status += " (High quality mode)"
                        return svg, status
                    return "", f"❌ Could not render reaction. The reaction format may not be supported by RDKit: {reaction_text[:100]}..."

                def render_smiles_reaction(reaction_text, options):
                    """Render ``reaction_text`` directly as reaction SMILES/SMARTS."""
                    if not reaction_text:
                        return "", "⚠️ Please enter a reaction in SMILES/SMARTS format."
                    svg = _render_smiles_to_svg(reaction_text)
                    if svg:
                        status = f"✅ Successfully rendered SMILES reaction: {reaction_text[:100]}..."
                        if "High quality rendering" in (options or []):
                            status += " (High quality mode)"
                        return svg, status
                    return "", f"❌ Could not parse reaction. Please check your SMILES/SMARTS format: {reaction_text[:100]}..."

                # Clear function
                def clear_outputs():
                    """Blank the SVG output, the status text and the reaction input."""
                    return "", "", ""

                # Button handlers
                render_auto_btn.click(
                    fn=render_auto_reaction,
                    inputs=[reaction_input, deepseek_api_key, ai_options, render_mode],
                    outputs=[reaction_svg_output, render_status],
                )
                render_nist_btn.click(
                    fn=render_nist_reaction,
                    inputs=[reaction_input, ai_options],
                    outputs=[reaction_svg_output, render_status],
                )
                render_smiles_btn.click(
                    fn=render_smiles_reaction,
                    inputs=[reaction_input, ai_options],
                    outputs=[reaction_svg_output, render_status],
                )
                clear_btn.click(
                    fn=clear_outputs,
                    inputs=[],
                    outputs=[reaction_svg_output, render_status, reaction_input],
                )

            # Tab 4: Kinetics Plotter
            with gr.TabItem("Kinetics Plotter"):
                with gr.Row():
                    with gr.Column():
                        A_input = gr.Number(value=1.3e-9, label="A (cm³/molecule·s)")
                        n_input = gr.Number(value=-0.495, label="n (power)")
                        Ea_input = gr.Number(value=1150, label="Ea (J/mol)")
                        T_min = gr.Number(value=500, label="T Min (K)")
                        T_max = gr.Number(value=2500, label="T Max (K)")
                        plot_dropdown = gr.Dropdown(choices=["arrhenius", "k_vs_t", "eyring", "logk_vs_t"], value="arrhenius", label="Plot Type")
                        fetch_ch3 = gr.Checkbox(label="Fetch ΔH_f for CH₃")
                        fetch_indene = gr.Checkbox(label="Fetch ΔH_f for Inden-1-yl (C9H7)")
                        submit = gr.Button("Generate Plot & Fetch")
                    with gr.Column():
                        plot_output = gr.Plot(label="Kinetics Plot")
                        thermo_table = gr.Dataframe(visible=False, label="Fetched Thermo Data")
                        info_output = gr.Markdown()
                submit.click(
                    fn=kinetics_interface,
                    inputs=[A_input, n_input, Ea_input, T_min, T_max, plot_dropdown, fetch_ch3, fetch_indene],
                    outputs=[plot_output, thermo_table, info_output]
                )

            # Tabs 5-16: One per NIST database
            for db_name, db_meta in DB_TABS.items():
                with gr.TabItem(db_name):
                    gr.Markdown(f"### {db_name}\n{db_meta['summary']}")
                    with gr.Row():
                        with gr.Column():
                            formula_input = gr.Textbox(value="CH3", label="Formula/Name (e.g., CH3, benzene)")
                            # Optional: Add phase filter for IR tabs
                            phase_input = None
                            if "IR Spectra" in db_name:
                                phase_input = gr.Radio(choices=["gas", "liquid", "solid"], value="gas", label="Phase")
                            fetch_btn = gr.Button("Fetch Data")
                        with gr.Column():
                            output_md = gr.Markdown()
                            output_df = gr.Dataframe(label="Tabular Data")
                            output_plot = gr.Plot(visible=False, label="Spectrum Preview")  # For IR/UV/THz later
                    # Bind fetch (pass phase if IR); the factory pins db_name per tab.
                    if phase_input is not None:
                        fetch_btn.click(
                            _make_db_fetcher(db_name, True),
                            inputs=[formula_input, phase_input],
                            outputs=[output_md, output_df, output_plot],
                        )
                    else:
                        fetch_btn.click(
                            _make_db_fetcher(db_name, False),
                            inputs=[formula_input],
                            outputs=[output_md, output_df, output_plot],
                        )

        # Event handlers for original functionality
        search_button.click(
            fn=perform_search,
            inputs=[simple_search, decomp, category, units, auto_search_thermo],
            outputs=[result_table, search_status, selection, results_state, search_thermo_display],
        )
        detail_outputs = [detail_markdown, dataset_table, reaction_plot, reaction_svg, thermo_data_display, thermo_summary]
        detail_inputs = [selection, manual_url, auto_fetch_thermo, animate_plots]
        detail_button.click(
            fn=fetch_detail,
            inputs=detail_inputs,
            outputs=detail_outputs,
        )
        # Re-fetch the detail view whenever the dropdown selection changes.
        selection.change(
            fn=fetch_detail,
            inputs=detail_inputs,
            outputs=detail_outputs,
        )
    return demo
# FastAPI application object used for the JSON API.
_API_METADATA = dict(
    title="NIST Chemical Kinetics API",
    description="API for searching and analyzing NIST Chemical Kinetics Database",
    version="1.0.0",
)
app = FastAPI(**_API_METADATA)

# CORS: every origin, method and header is allowed, with credentials enabled.
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive - confirm this is intended for the deployment.
_CORS_SETTINGS = dict(
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
app.add_middleware(CORSMiddleware, **_CORS_SETTINGS)
| # API Endpoints | |
async def root():
    """Describe the API: its name, version and a map of endpoint paths."""
    # NOTE(review): no route decorator is visible on this coroutine - confirm
    # how it is registered with the application.
    endpoint_help = {
        "/search": "Search NIST kinetics database",
        "/reaction/{url}": "Get detailed reaction information",
        "/thermodynamic/{formula}": "Get thermodynamic data for a compound",
        "/nist-reactions": "Get list of NIST reactions",
        "/docs": "API documentation",
    }
    info = {"name": "NIST Chemical Kinetics API", "version": "1.0.0"}
    info["endpoints"] = endpoint_help
    return info
async def search_nist(
    query: str = Query(..., description="Search query (e.g., CH4, benzene)"),
    filters: Optional[List[Dict[str, Any]]] = None,
):
    """
    Search the NIST Chemical Kinetics Database.

    Args:
        query: Search query string. It is used as a "reactants contains"
            filter only when no explicit filters are supplied.
        filters: Optional list of filter dicts with "field", "relation" and
            "value" keys; at most MAX_FILTERS of them are used.

    Returns:
        Dict with the query, the result count and one dict per result.

    Raises:
        HTTPException: status 500 wrapping any error raised while building
            or running the search.
    """
    result_fields = ("reaction", "k_298", "a", "n", "ea", "t_range", "p_range", "bath_gas", "url")
    try:
        # Translate the caller-supplied dicts into SearchFilter objects.
        search_filters = [
            SearchFilter(
                field=FieldName(spec.get("field", "reactants")),
                relation=Relation(spec.get("relation", "contains")),
                value=spec.get("value", ""),
            )
            for spec in (filters or [])[:MAX_FILTERS]
        ]
        request = SearchRequest(
            category=Category.search,
            # Without explicit filters, fall back to matching the query
            # against reactants.
            filters=search_filters or [
                SearchFilter(
                    field=FieldName.reactants,
                    relation=Relation.contains,
                    value=query,
                )
            ],
        )
        results = client.search_reactions(request)
        return {
            "query": query,
            "count": len(results),
            "results": [
                {field: getattr(hit, field) for field in result_fields}
                for hit in results
            ],
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
async def get_reaction_detail(url: str = Query(..., description="NIST reaction URL")):
    """
    Get detailed information for a specific reaction.

    Args:
        url: NIST reaction URL.

    Returns:
        Dict with the reaction, its reactants and products, and one dict of
        rate parameters per entry of the detail's rate data.

    Raises:
        HTTPException: status 404 when the client returns nothing for the
            URL, status 500 wrapping any other error.
    """
    rate_fields = ("k_298", "a", "n", "ea", "t_range", "p_range", "bath_gas", "reference", "squib")
    try:
        detail = client.fetch_reaction_detail(url)
        if not detail:
            raise HTTPException(status_code=404, detail="Reaction not found")
        payload = {
            "reaction": detail.reaction,
            "reactants": detail.reactants,
            "products": detail.products,
        }
        payload["rate_data"] = [
            {field: getattr(entry, field) for field in rate_fields}
            for entry in detail.rate_data
        ]
        return payload
    except HTTPException:
        # Let the deliberate 404 through instead of converting it to a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
async def get_thermodynamic_data(
    formula: str,
    database: str = Query("gas-phase", description="Database type: gas-phase, ion-energetics, or condensed-phase"),
):
    """
    Get thermodynamic data for a compound from NIST WebBook.

    Args:
        formula: Chemical formula or name (e.g., CH3, benzene).
        database: Database to search (gas-phase, ion-energetics,
            condensed-phase).

    Returns:
        Dict with the formula, the database, the table rows as records and
        a summary.

    Raises:
        HTTPException: status 400 for an unknown database type, status 404
            when no data is found, status 500 wrapping any other error.
    """
    supported = ("gas-phase", "ion-energetics", "condensed-phase")
    try:
        if database not in supported:
            raise HTTPException(status_code=400, detail="Invalid database type")
        url = _build_webbook_url(formula, database)
        md_content, df, plot_html = _fetch_and_parse_webbook(url, formula, database)
        if df is None or df.empty:
            raise HTTPException(status_code=404, detail=f"No data found for {formula} in {database}")
        return {
            "formula": formula,
            "database": database,
            "data": df.to_dict(orient="records"),
            "summary": md_content,
        }
    except HTTPException:
        # Keep the 400/404 raised above intact.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
async def get_nist_reactions(limit: int = Query(200, description="Maximum number of reactions to return")):
    """
    Get a list of reactions from the NIST database.

    Args:
        limit: Maximum number of reactions to return (default: 200).

    Returns:
        Dict with the number of reactions and a list of label/reaction pairs.

    Raises:
        HTTPException: status 500 wrapping any error raised while fetching.
    """
    try:
        reactions = _fetch_all_nist_reactions(limit=limit)
        total = len(reactions)
        listing = []
        for label, reaction in reactions:
            listing.append({"label": label, "reaction": reaction})
        return {"count": total, "reactions": listing}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# Construct the Gradio UI once, at import time.
demo = build_interface()

# Serve the Gradio UI from the FastAPI application at the root path.
fastapi_app = gr.mount_gradio_app(app, demo, path="/")