Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| import os | |
| from pathlib import Path | |
| from mmpdblib.fragment_io import read_fragment_records | |
| from rdkit import Chem | |
| class Index_Dummy: | |
| """对 dummy 原子进行编号:变量和常量部分分别处理""" | |
| def __init__(self, df): | |
| self.df = df | |
| def index_constant(self, constSmi, attachmentOrder): | |
| count = -1 | |
| newConstSmi = "" | |
| for idx, ichar in enumerate(constSmi): | |
| if ichar == '*': | |
| count += 1 | |
| # 注意:attachmentOrder 应为可迭代对象,这里假设传入的 attachmentOrder 为列表或可转换为列表 | |
| ichar = f"[*:{int(attachmentOrder[count]) + 1}]" | |
| newConstSmi += ichar | |
| return newConstSmi | |
| def index_var(self, varSmi): | |
| count = 0 | |
| newVarSmi = "" | |
| for idx, ichar in enumerate(varSmi): | |
| if ichar == '*': | |
| count += 1 | |
| ichar = f"[*:{count}]" | |
| newVarSmi += ichar | |
| return newVarSmi | |
| def add_index(self): | |
| for idx, row in self.df.iterrows(): | |
| varSmi = row['variable_smiles'] | |
| constSmi = row['constant_smiles'] | |
| attachmentOrder = row['attachment_order'] | |
| self.df.loc[idx, 'variable_smiles'] = self.index_var(varSmi) | |
| self.df.loc[idx, 'constant_smiles'] = self.index_constant(constSmi, attachmentOrder) | |
| return self.df | |
| def count_heavy_atoms(smi): | |
| mol = Chem.MolFromSmiles(smi) | |
| if not mol: | |
| return 0 | |
| heavy_count = len([atom for atom in mol.GetAtoms() if atom.GetAtomicNum() > 1]) | |
| return heavy_count | |
| def fragmentize_molecule(smiles_string: str, max_ratio: float = 0.8) -> pd.DataFrame: | |
| """ | |
| 对单个分子进行 fragment 化处理: | |
| 1. 将 SMILES 字符串写入临时文件(同时写入标题信息) | |
| 2. 使用 mmpdb 工具 fragment 化分子 | |
| 3. 读取 fragment 文件,并依据 heavy atom 个数筛选合适的 fragment | |
| 4. 对 fragment 中 dummy 原子添加编号 | |
| 5. 最后返回 DataFrame 格式的 fragment 数据 | |
| """ | |
| # 定义临时文件名(这里保证文件名唯一性可根据需要进一步改进) | |
| input_file = "temp_input.smi" | |
| output_file = "temp_output.fragments" | |
| try: | |
| # 将 SMILES 字符串写入临时输入文件(标题默认写 “Molecule”) | |
| with open(input_file, "w") as f: | |
| f.write(smiles_string + "\t" + "Molecule" + "\n") | |
| # 使用 mmpdb 工具进行分子碎片化 | |
| ret = os.system(f"mmpdb fragment {input_file} -o {output_file}") | |
| if ret != 0: | |
| raise Exception("mmpdb fragment 命令执行失败,请确保 mmpdb 工具安装并配置正确。") | |
| # 读取并处理碎片 | |
| fragment_reader = read_fragment_records(output_file) | |
| frag_list = [] | |
| for record in fragment_reader: | |
| # 打印或记录当前处理的 record 信息,可根据需要选择注释掉 | |
| print(f"Processing record: {record.id}, {record.normalized_smiles}") | |
| for frag in record.fragments: | |
| if count_heavy_atoms(frag.variable_smiles) < count_heavy_atoms(record.normalized_smiles) * max_ratio: | |
| frag_list.append({ | |
| 'variable_smiles': frag.variable_smiles, | |
| 'constant_smiles': frag.constant_smiles, | |
| 'record_id': record.id, | |
| 'normalized_smiles': record.normalized_smiles, | |
| 'attachment_order': frag.attachment_order | |
| }) | |
| if not frag_list: | |
| raise Exception("未找到满足筛选条件的碎片。") | |
| # 构造 DataFrame,并对 dummy 原子添加编号 | |
| df_frag = pd.DataFrame(frag_list) | |
| index_dummy = Index_Dummy(df_frag) | |
| df_frag = index_dummy.add_index() | |
| return df_frag | |
| finally: | |
| # 删除临时文件,确保每次调用结束后文件被清理 | |
| if Path(input_file).exists(): | |
| os.remove(input_file) | |
| if Path(output_file).exists(): | |
| os.remove(output_file) | |