Spaces:
Running
Running
| """ | |
| Phonix Database MCP Server | |
| Hugging Face Spaces deployment with Gradio + MCP | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import re | |
| from typing import Any | |
| import gradio as gr | |
| import pandas as pd | |
| from datasets import load_dataset | |
# Hugging Face dataset that hosts the Phonix summary table.
DATASET_NAME = "phonix-db/phonix-summary"

# Hard cap on rows returned by list-like endpoints (keeps payloads compact).
MAX_RETURN_ROWS = 50

# Keep the app robust against dataset schema drift.
# get_df() raises if any of these columns is missing after loading.
REQUIRED_COLUMNS = {
    "mp_id",
    "unique_id",
    "formula",
    "spg_number",
    "structure",
    "klat[W/mK]",
}

# Cached DataFrame (populated lazily by get_df()).
_df: pd.DataFrame | None = None

# Columns excluded from list-like responses (too large for compact payloads).
# get_entry() still returns them for a single row.
_HEAVY_COLUMNS = {
    "structure",
    "trans_conv2prim",
    "trans_conv2sc",
    "phfreq[cm^-1]",
    "phdos[a.u.]",
    "pdos[a.u.]",
    "kspec_freq[W/mK/cm^-1]",
    "kcumu_norm_freq",
    "mfp[nm]",
    "log10[mfp[nm]]",
    "kspec_mfp[W/mK/nm]",
    "kcumu_norm_mfp",
}
| # ── Data loading / validation ────────────────────────────────────── | |
def get_df() -> pd.DataFrame:
    """
    Load the dataset once and cache it as a pandas DataFrame.

    Validates required columns so failures are explicit.

    Returns:
        The cached summary DataFrame.

    Raises:
        ValueError: if the loaded dataset is missing any REQUIRED_COLUMNS.
    """
    global _df
    if _df is None:
        ds = load_dataset(DATASET_NAME, split="train")
        frame = ds.to_pandas()
        missing = REQUIRED_COLUMNS - set(frame.columns)
        if missing:
            # Validate BEFORE caching: assigning _df first would leave an
            # invalid frame cached, so later calls would silently succeed.
            raise ValueError(f"Missing required columns: {sorted(missing)}")
        _df = frame
    return _df
| # ── Helpers ──────────────────────────────────────────────────────── | |
| def _json_response(payload: Any) -> str: | |
| return json.dumps(payload, ensure_ascii=False, indent=2) | |
def _error(message: str) -> str:
    """Build the standard error envelope: {"error": message} as JSON."""
    payload = {"error": message}
    return _json_response(payload)
| def _safe_value(v: Any) -> Any: | |
| if pd.isna(v): | |
| return None | |
| return v | |
def _serialize(df: pd.DataFrame, max_rows: int = MAX_RETURN_ROWS) -> str:
    """
    Serialize a DataFrame to a compact JSON payload.

    Heavy columns are dropped and at most *max_rows* rows are included;
    NaN cells become JSON null.
    """
    light_cols = [name for name in df.columns if name not in _HEAVY_COLUMNS]
    page = df[light_cols].head(max_rows).copy()
    rows = page.where(pd.notna(page), None).to_dict(orient="records")
    payload = {
        "total_count": int(len(df)),
        "returned": int(len(page)),
        "entries": rows,
    }
    return _json_response(payload)
| def _normalize_formula(formula: str) -> str: | |
| return (formula or "").strip() | |
| def _formula_to_elements(formula: str) -> set[str]: | |
| """ | |
| Extract element symbols from a chemical formula. | |
| Examples: | |
| SiO2 -> {"Si", "O"} | |
| LaP7 -> {"La", "P"} | |
| MgAl2O4 -> {"Mg", "Al", "O"} | |
| """ | |
| if not formula: | |
| return set() | |
| return set(re.findall(r"[A-Z][a-z]?", formula)) | |
| def _prepare_element_query(elements: str) -> list[str]: | |
| """ | |
| Parse a comma-separated list like "Si,O" into ["Si", "O"]. | |
| """ | |
| items = [e.strip() for e in (elements or "").split(",")] | |
| items = [e for e in items if e] | |
| return items | |
| # ── MCP tools ────────────────────────────────────────────────────── | |
def search_by_formula(formula: str) -> str:
    """
    Search the Phonix database by chemical formula using partial match.

    Args:
        formula: Chemical formula fragment or element symbol, e.g. "Si", "MgO", "BeTe", "LaP"

    Returns:
        JSON string with matched entries (up to 50 rows).
    """
    try:
        q = _normalize_formula(formula)
        if not q:
            return _error("formula must not be empty.")
        df = get_df()
        # regex=False: treat the query as a literal substring. The default
        # (regex=True) would crash or silently mis-match on inputs containing
        # regex metacharacters, e.g. "Ca(OH)2" or "C.".
        mask = df["formula"].fillna("").astype(str).str.contains(
            q, case=False, na=False, regex=False
        )
        result = df[mask].sort_values(["formula", "unique_id"], kind="stable")
        return _serialize(result)
    except Exception as e:
        return _error(f"search_by_formula failed: {e}")
def get_by_formula_exact(formula: str) -> dict[str, Any]:
    """
    Search the Phonix database by exact chemical formula (case-insensitive).

    Args:
        formula: Full chemical formula, e.g. "Si", "MgO", "BeTe"

    Returns:
        dict with matched entries (up to 50 rows).
    """
    # NOTE: a docstring is required here for parity with the other MCP tools —
    # the MCP schema presumably derives tool descriptions from it.
    try:
        q = _normalize_formula(formula)
        if not q:
            return {"error": "formula must not be empty."}
        df = get_df()
        mask = df["formula"].fillna("").astype(str).str.lower() == q.lower()
        result = df[mask].sort_values(["formula", "unique_id"], kind="stable")
        # Same compact-payload shaping as _serialize, but returned as a dict.
        cols = [c for c in result.columns if c not in _HEAVY_COLUMNS]
        subset = result[cols].head(MAX_RETURN_ROWS).copy()
        records = subset.where(pd.notna(subset), None).to_dict(orient="records")
        return {
            "total_count": int(len(result)),
            "returned": int(len(subset)),
            "entries": records,
        }
    except Exception as e:
        return {"error": f"get_by_formula_exact failed: {e}"}
def search_by_elements(elements: str) -> dict:
    """
    Search entries that contain ALL specified elements.

    Element matching is symbol-aware rather than naive substring matching.

    Args:
        elements: Comma-separated element symbols, e.g. "Si,O" or "Mg,O"

    Returns:
        dict with matched entries (up to 50 rows).
    """
    try:
        requested = _prepare_element_query(elements)
        if not requested:
            return {"error": "elements must contain at least one element symbol."}
        needed = set(requested)
        df = get_df()
        # Per-row element sets derived from the formula string.
        row_elements = df["formula"].fillna("").astype(str).map(_formula_to_elements)
        hits = df[row_elements.map(needed.issubset)]
        hits = hits.sort_values(["formula", "unique_id"], kind="stable")
        keep = [name for name in hits.columns if name not in _HEAVY_COLUMNS]
        page = hits[keep].head(MAX_RETURN_ROWS).copy()
        entries = page.where(pd.notna(page), None).to_dict(orient="records")
        return {
            "total_count": int(len(hits)),
            "returned": int(len(page)),
            "entries": entries,
        }
    except Exception as e:
        return {"error": f"search_by_elements failed: {e}"}
def filter_by_kappa(min_klat: float, max_klat: float, only_converged: bool) -> str:
    """
    Filter entries by lattice thermal conductivity klat [W/mK].

    Args:
        min_klat: Minimum klat value in W/mK (use -1 to skip)
        max_klat: Maximum klat value in W/mK (use -1 to skip)
        only_converged: If True, exclude entries where klat is null

    Returns:
        JSON string with matched entries sorted by klat descending (up to 50 rows).
    """
    try:
        col = "klat[W/mK]"
        selected = get_df().copy()
        if only_converged:
            selected = selected[selected[col].notna()]
        # Negative sentinel (-1) means "no bound on this side".
        if min_klat >= 0:
            selected = selected[selected[col] >= min_klat]
        if max_klat >= 0:
            selected = selected[selected[col] <= max_klat]
        ordered = selected.sort_values(col, ascending=False, kind="stable")
        return _serialize(ordered)
    except Exception as e:
        return _error(f"filter_by_kappa failed: {e}")
def filter_by_spacegroup(spg_number: int) -> str:
    """
    Filter entries by space group number.

    Args:
        spg_number: International space group number (1-230), e.g. 225 for Fm-3m, 227 for Fd-3m

    Returns:
        JSON string with matched entries (up to 50 rows).
    """
    try:
        # int() may raise on non-numeric input; the except turns that into
        # a JSON error payload rather than a crash.
        number = int(spg_number)
        if number < 1 or number > 230:
            return _error("spg_number must be between 1 and 230.")
        df = get_df()
        matches = df[df["spg_number"] == number]
        matches = matches.sort_values(["formula", "unique_id"], kind="stable")
        return _serialize(matches)
    except Exception as e:
        return _error(f"filter_by_spacegroup failed: {e}")
def get_entry(unique_id: str) -> str:
    """
    Get full details for a specific calculation entry, including structure data.

    Args:
        unique_id: The unique entry identifier, e.g. "mp-149", "mp-149-2", "mp-24"

    Returns:
        JSON string with all columns including parsed structure data if possible.
    """
    try:
        entry_id = (unique_id or "").strip()
        if not entry_id:
            return _error("unique_id must not be empty.")
        df = get_df()
        matches = df[df["unique_id"] == entry_id]
        if matches.empty:
            return _error(f"Entry '{entry_id}' not found.")
        record = {k: _safe_value(v) for k, v in matches.iloc[0].to_dict().items()}
        # Columns stored as JSON text ("{...}" / "[...]") are parsed into
        # native objects; anything unparseable is left as-is.
        for name, value in list(record.items()):
            if isinstance(value, str) and value[:1] in ("{", "["):
                try:
                    record[name] = json.loads(value)
                except Exception:
                    pass
        return _json_response(record)
    except Exception as e:
        return _error(f"get_entry failed: {e}")
def list_columns() -> str:
    """
    List all available columns in the Phonix summary database with descriptions.

    Returns:
        JSON string with column names and their descriptions.
    """
    # BUG FIX: this literal previously used commas instead of colons, which
    # made it a *set* of interleaved strings — losing the name→description
    # pairing and crashing json.dumps (sets are not JSON-serializable).
    columns = {
        "mp_id": "Materials Project ID (e.g. mp-149 for Si diamond)",
        "unique_id": "Unique entry identifier (use this for get_entry)",
        "formula": "Chemical formula (e.g. Si, MgO, BeTe)",
        "spg_number": "International space group number (e.g. 227 for diamond silicon)",
        "natoms_prim": "Number of atoms in primitive cell",
        "natoms_conv": "Number of atoms in conventional cell",
        "natoms_sc": "Number of atoms in supercell for force constant calculations",
        "trans_conv2prim": "Transformation matrix: conventional to primitive cell (JSON 3x3)",
        "trans_conv2sc": "Transformation matrix: conventional to supercell (JSON 3x3)",
        "structure": "Crystal structure JSON (cell, positions, symbols, pbc)",
        "volume[A^3]": "Cell volume in cubic angstroms",
        "nac": "Non-analytical correction flag (0: no NAC, 1: damping method, 2: mixed-space approach, 3: Ewald method)",
        "volume_relaxation": "Additional volume relaxation flag (0: no additional relaxation, 1: additionally relaxed)",
        "scph": "Self-consistent phonon (SCPH) flag",
        "four": "4-phonon scattering flag",
        "modulus[GPa]": "Bulk modulus [GPa]",
        "fc2_error[%]": "2nd-order (Harmonic) force constants fitting error [%]",
        "fc3_error[%]": "3rd-order (Cubic) force constants fitting error [%]",
        "fc_higher_error[%]": "Higher-order force constants fitting error [%]",
        "kp[W/mK]": "Particle contribution to lattice thermal conductivity [W/mK]",
        "kc[W/mK]": "Coherence contribution to lattice thermal conductivity [W/mK]",
        "klat[W/mK]": "Total lattice thermal conductivity (kp+kc) [W/mK]",
        "qmesh": "q-point mesh for the Boltzmann transport equation (e.g. '19x19x19')",
        "qmesh_density": "q-point mesh density",
        "min_phfreq[cm^-1]": "Minimum phonon frequency [cm^-1]",
        "max_phfreq[cm^-1]": "Maximum phonon frequency [cm^-1]",
        "phfreq[cm^-1]": "Phonon frequency grid points (JSON array, shape=[51])",
        "phdos[a.u.]": "Total phonon density of states (JSON array, shape=[51])",
        "pdos[a.u.]": "Element-projected phonon DOS (JSON dict of arrays)",
        "kspec_freq[W/mK/cm^-1]": "Spectral thermal conductivity (kp) vs frequency (JSON array)",
        "kcumu_norm_freq": "Normalized cumulative kappa (kp) vs frequency (JSON array)",
        "mfp[nm]": "Mean-free-path grid (JSON array)",
        "log10[mfp[nm]]": "Log10 of mean-free-path grid (JSON array)",
        "kspec_mfp[W/mK/nm]": "Spectral thermal conductivity vs MFP (JSON array)",
        "kcumu_norm_mfp": "Normalized cumulative kappa vs MFP (JSON array)",
        "calc_time[sec]": "Computational time [sec]",
    }
    return json.dumps(columns, ensure_ascii=False, indent=2)
def dataset_summary() -> dict[str, Any]:
    """Return the dataset name, total row count, and available column names."""
    try:
        frame = get_df()
        return {
            "dataset": DATASET_NAME,
            "rows": int(frame.shape[0]),
            "columns": frame.columns.tolist(),
        }
    except Exception as e:
        return {"error": f"dataset_summary failed: {e}"}
| # ── Gradio UI ────────────────────────────────────────────────────── | |
# Each tab wires one MCP tool to a simple form; the api_name on each click
# handler is what exposes the function as an MCP endpoint.
with gr.Blocks(title="Phonix Database MCP") as demo:
    # Header: endpoint URLs and the citation for the underlying database.
    gr.Markdown(
        """
# Phonix Database MCP Server
**Database for Anharmonic Phonon Interactions** · First-Principles · ~28,000 calculations
**MCP Endpoint**:
`https://phonix-db-phonix-mcp-server.hf.space/gradio_api/mcp/sse`
**Schema**:
`https://phonix-db-phonix-mcp-server.hf.space/gradio_api/mcp/schema`
**Reference**:
M. Ohnishi et al., "Database and deep-learning scalability of anharmonic phonon
properties by automated brute-force first-principles calculations," npj Computational Materials (2026).
[arXiv:2504.21245](https://arxiv.org/abs/2504.21245)
"""
    )
    with gr.Tabs():
        # Partial (substring) formula search.
        with gr.Tab("Formula Search"):
            formula_in = gr.Textbox(
                label="Chemical Formula (partial match)",
                placeholder="Si, MgO, BeTe, LaP7 ...",
            )
            formula_btn = gr.Button("Search", variant="primary")
            formula_out = gr.Code(language="json", label="Results")
            formula_btn.click(
                search_by_formula,
                inputs=formula_in,
                outputs=formula_out,
                api_name="search_by_formula",
                queue=False,
            )
        # Case-insensitive exact formula match.
        with gr.Tab("Exact Formula"):
            formula_exact_in = gr.Textbox(
                label="Chemical Formula (exact match)",
                placeholder="Si, MgO, BeTe ...",
            )
            formula_exact_btn = gr.Button("Search", variant="primary")
            formula_exact_out = gr.Code(language="json", label="Results")
            formula_exact_btn.click(
                get_by_formula_exact,
                inputs=formula_exact_in,
                outputs=formula_exact_out,
                api_name="get_by_formula_exact",
                queue=False,
            )
        # Symbol-aware "contains ALL of these elements" search.
        with gr.Tab("Element Search"):
            elements_in = gr.Textbox(
                label="Elements (comma-separated)",
                placeholder="Si,O or Mg,O",
            )
            elements_btn = gr.Button("Search", variant="primary")
            elements_out = gr.Code(language="json", label="Results")
            elements_btn.click(
                search_by_elements,
                inputs=elements_in,
                outputs=elements_out,
                api_name="search_by_elements",
                queue=False,
            )
        # Range filter on lattice thermal conductivity (-1 skips a bound).
        with gr.Tab("Kappa Filter"):
            with gr.Row():
                min_k = gr.Number(label="Min klat [W/mK] (-1 to skip)", value=-1)
                max_k = gr.Number(label="Max klat [W/mK] (-1 to skip)", value=-1)
            converged = gr.Checkbox(
                label="Only converged entries (klat not null)",
                value=True,
            )
            kappa_btn = gr.Button("Filter", variant="primary")
            kappa_out = gr.Code(language="json", label="Results")
            kappa_btn.click(
                filter_by_kappa,
                inputs=[min_k, max_k, converged],
                outputs=kappa_out,
                api_name="filter_by_kappa",
                queue=False,
            )
        # Filter by international space group number.
        with gr.Tab("Space Group Filter"):
            spg_in = gr.Number(label="Space Group Number (1-230)", value=227)
            spg_btn = gr.Button("Filter", variant="primary")
            spg_out = gr.Code(language="json", label="Results")
            spg_btn.click(
                filter_by_spacegroup,
                inputs=spg_in,
                outputs=spg_out,
                api_name="filter_by_spacegroup",
                queue=False,
            )
        # Full single-entry detail, including heavy/structure columns.
        with gr.Tab("Entry Detail"):
            entry_in = gr.Textbox(
                label="unique_id",
                placeholder="mp-149, mp-149-2, mp-24 ...",
            )
            entry_btn = gr.Button("Get Entry", variant="primary")
            entry_out = gr.Code(language="json", label="Full Entry Data")
            entry_btn.click(
                get_entry,
                inputs=entry_in,
                outputs=entry_out,
                api_name="get_entry",
                queue=False,
            )
        # Static column-name → description reference.
        with gr.Tab("Column Guide"):
            col_btn = gr.Button("Show Column Descriptions", variant="primary")
            col_out = gr.Code(language="json", label="Columns")
            col_btn.click(
                list_columns,
                outputs=col_out,
                api_name="list_columns",
                queue=False,
            )
        # Dataset name, row count, and column list.
        with gr.Tab("Dataset Summary"):
            summary_btn = gr.Button("Show Dataset Summary", variant="primary")
            summary_out = gr.Code(language="json", label="Summary")
            summary_btn.click(
                dataset_summary,
                outputs=summary_out,
                api_name="dataset_summary",
                queue=False,
            )
    #
    # To Do: Add more UI interactions as needed, but the core MCP endpoints
    # are already defined above and can be used independently of the UI.
    #
    # Optional MCP-only endpoints (useful even without dedicated UI interactions)
    # with gr.Blocks(title="Phonix Database MCP") as demo:
    #     # ... UI definition ...
    #     gr.api(fn=get_by_formula_exact, api_name="get_by_formula_exact_api")
    #     gr.api(fn=dataset_summary, api_name="dataset_summary_api")
    gr.Markdown(
        "Dataset: [phonix-db/phonix-summary](https://huggingface.co/datasets/phonix-db/phonix-summary) · "
        "[phonix-db.org](https://phonix-db.org) · License: CC BY 4.0"
    )
| # ── Launch ───────────────────────────────────────────────────────── | |
if __name__ == "__main__":
    demo.launch(
        mcp_server=True,  # expose the api_name handlers as MCP tools
        server_name="0.0.0.0",  # listen on all interfaces (required on Spaces)
        server_port=7860,  # standard Hugging Face Spaces port
        ssr_mode=False,
        # NOTE(review): share=True creates a tunnel link; presumably redundant
        # when deployed on Spaces — confirm whether it is needed for local runs.
        share=True,
    )