LLM-front / app.py
Songyou's picture
Rename clm-frontend-dev.py to app.py
dddf919 verified
import gradio as gr
from fastapi import FastAPI
from starlette.staticfiles import StaticFiles
import uvicorn
import logging
from pydantic import BaseModel
import pandas as pd
import time
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class SmilesData(BaseModel):
smiles: str
app = FastAPI()
# 将 ketcher 目录挂载为静态资源,确保 ketcher/index.html 存在
app.mount("/ketcher", StaticFiles(directory="ketcher"), name="ketcher")
# 接收前端发送的SMILES数据的接口
@app.post("/update_smiles")
async def update_smiles(data: SmilesData):
logger.info(f"Received SMILES from front-end: {data.smiles}")
print(f"[PRINT-Backend] Received SMILES from front-end: {data.smiles}")
return {"status": "ok", "received_smiles": data.smiles}
# 前端嵌入的HTML和脚本
KETCHER_HTML = r'''
<iframe id="ifKetcher" src="/ketcher/index.html" width="100%" height="600px" style="border: 1px solid #ccc;"></iframe>
<script>
console.log("[Front-end] Ketcher-Gradio integration script loaded.");
let ketcher = null;
let lastSmiles = '';
function findSmilesInput() {
const inputContainer = document.getElementById('combined_smiles_input');
if (!inputContainer) {
console.warn("[Front-end] combined_smiles_input element not found.");
return null;
}
const input = inputContainer.querySelector('input[type="text"]');
return input;
}
function updateGradioInput(smiles) {
const input = findSmilesInput();
if (input && input.value !== smiles) {
input.value = smiles;
// 手动触发input事件让 Gradio 更新状态
input.dispatchEvent(new Event('input', { bubbles: true }));
console.log("[Front-end] Updated Gradio input with SMILES:", smiles);
}
}
async function handleKetcherChange() {
console.log("[Front-end] handleKetcherChange called, retrieving SMILES...");
try {
const smiles = await ketcher.getSmiles({ arom: false });
console.log("[Front-end] SMILES retrieved from Ketcher:", smiles);
if (smiles !== lastSmiles) {
lastSmiles = smiles;
updateGradioInput(smiles);
// 将SMILES发送给后端 (可以保留,但可能不是核心需求)
console.log("[Front-end] Sending SMILES to backend...");
fetch('/update_smiles', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({smiles: smiles})
})
.then(res => res.json())
.then(data => {
console.log("[Front-end] Backend response:", data);
})
.catch(err => console.error("[Front-end] Error sending SMILES to backend:", err));
}
} catch (err) {
console.error("[Front-end] Error getting SMILES from Ketcher:", err);
}
}
function initKetcher() {
console.log("[Front-end] initKetcher started.");
const iframe = document.getElementById('ifKetcher');
if (!iframe) {
console.error("[Front-end] iframe not found.");
setTimeout(initKetcher, 500);
return;
}
const ketcherWindow = iframe.contentWindow;
if (!ketcherWindow || !ketcherWindow.ketcher) {
console.log("[Front-end] ketcher not yet available in iframe, retrying...");
setTimeout(initKetcher, 500);
return;
}
ketcher = ketcherWindow.ketcher;
console.log("[Front-end] Ketcher instance acquired:", ketcher);
// 设置初始分子
ketcher.setMolecule('C').then(() => {
console.log("[Front-end] Initial molecule set to 'C'.");
});
const editor = ketcher.editor;
console.log("[Front-end] Editor object:", editor);
// 尝试绑定变化事件
let eventBound = false;
if (editor && typeof editor.subscribe === 'function') {
console.log("[Front-end] Using editor.subscribe('change', ...)");
editor.subscribe('change', handleKetcherChange);
eventBound = true;
}
if (!eventBound) {
console.error("[Front-end] No suitable event binding found. Check Ketcher version and event API.");
}
}
document.getElementById('ifKetcher').addEventListener('load', () => {
console.log("[Front-end] iframe loaded. Initializing Ketcher in 1s...");
setTimeout(initKetcher, 1000);
});
</script>
'''
# --- 模拟后端功能 (来自第二个版本) ---
def smiles_to_structure(smiles):
"""Dummy function"""
time.sleep(1)
return f"Structure Generated from: {smiles}"
def fragment_molecule(smiles):
"""Dummy function"""
time.sleep(2)
return "Fragment1", "Fragment2", "1-2"
def generate_analogs(main_cls, minor_cls, number, delta_value):
"""Dummy function"""
time.sleep(3)
return [
{"SMILE": "c1cccc1", "MolWt": 100, "TPSA": 20, "SLogP": 1, "SA": 30, "QED": 0.8},
{"SMILE": "c1ccccc1", "MolWt": 105, "TPSA": 25, "SLogP": 1.2, "SA": 32, "QED": 0.9},
]
def update_output_table(data):
df = pd.DataFrame(data)
return df
# --- Gradio 界面 ---
def create_combined_interface():
with gr.Blocks() as demo:
gr.Markdown("# Fragment Optimization Tools with Ketcher")
with gr.Row():
with gr.Column(scale=2):
gr.HTML(KETCHER_HTML) # 嵌入 Ketcher
with gr.Column(scale=1):
with gr.Group():
gr.Markdown("### Input SMILES (From Ketcher)")
combined_smiles_input = gr.Textbox(
label="",
value="C",
placeholder="SMILES from Ketcher will appear here",
elem_id="combined_smiles_input"
)
with gr.Row():
# 可以保留一个按钮来触发某些操作,例如手动同步
get_ketcher_smiles_btn = gr.Button("Get SMILES from Ketcher")
# 示例:将当前输入框的 SMILES 设置到 Ketcher (需要额外的前端 JavaScript)
# set_ketcher_smiles_btn = gr.Button("Set SMILES to Ketcher")
fragment_btn = gr.Button("Fragmentize Molecule")
with gr.Group():
with gr.Row():
constant_frag_input = gr.Textbox(label="Constant Fragment", placeholder="SMILES of constant fragment")
variable_frag_input = gr.Textbox(label="Variable Fragment", placeholder="SMILES of variable fragment")
attach_order_input = gr.Textbox(label="Attachment Order", placeholder="Attachment Order of SMILES")
with gr.Group():
gr.Markdown("### Generate analogs")
with gr.Row():
main_cls_dropdown = gr.Dropdown(label="Main Cls", choices=["None", "Cl", "Br"])
minor_cls_dropdown = gr.Dropdown(label="Minor Cls", choices=["None", "Cl", "Br"])
number_input = gr.Number(label="Number", value=5, step=1)
delta_value_slider = gr.Slider(minimum=0, maximum=10, step=1, label="Delta Value", interactive=True)
generate_analogs_btn = gr.Button("Generate")
with gr.Row():
with gr.Column():
selected_columns = gr.CheckboxGroup(["SMILE", "MolWt", "TPSA", "SLogP", "SA", "QED"], value=["SMILE", "MolWt", "TPSA", "SLogP"], label="")
output_table = gr.Dataframe(headers=["SMILE", "MolWt", "TPSA", "SLogP", "SA", "QED"])
with gr.Row():
download_all_btn = gr.Button("Download All")
download_selected_btn = gr.Button("Download Selected")
# --- 事件处理 ---
# 当点击按钮时,手动从 Ketcher 获取 SMILES 并更新输入框
get_ketcher_smiles_btn.click(
fn=None,
inputs=None,
outputs=combined_smiles_input,
js="async () => { const iframe = document.getElementById('ifKetcher'); if(iframe && iframe.contentWindow && iframe.contentWindow.ketcher) { const smiles = await iframe.contentWindow.ketcher.getSmiles(); return smiles; } else { console.error('Ketcher not ready'); return ''; } }"
)
fragment_btn.click(fragment_molecule, inputs=combined_smiles_input, outputs=[constant_frag_input, variable_frag_input, attach_order_input])
def update_table_with_analogs(main_cls, minor_cls, number, delta_value):
analogs_data = generate_analogs(main_cls, minor_cls, number, delta_value)
return update_output_table(analogs_data)
generate_analogs_btn.click(update_table_with_analogs,
inputs=[main_cls_dropdown, minor_cls_dropdown, number_input, delta_value_slider],
outputs=output_table)
# TODO: 添加下载功能的回调
return demo
combined_demo = create_combined_interface()
app = gr.mount_gradio_app(app, combined_demo, path="/")
if __name__ == "__main__":
uvicorn.run(app, host="127.0.0.1", port=7860)