Upload DataProcessor_pdeeppp.py with huggingface_hub
Browse files- DataProcessor_pdeeppp.py +87 -0
DataProcessor_pdeeppp.py
ADDED
|
@@ -0,0 +1,87 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
from transformers.processing_utils import ProcessorMixin
|
| 2 |
+
from transformers.tokenization_utils_base import BatchEncoding
|
| 3 |
+
|
| 4 |
+
|
| 5 |
+
class PDeepPPProcessor(ProcessorMixin):
    """Cut protein sequences into fixed-length windows for the PDeepPP model.

    Two modes are offered through ``__call__``:

    * ``"PTM"``: one window per S/T/Y residue, with that residue placed at
      index ``target_length // 2`` of the window.
    * ``"BPS"``: sliding windows over the whole sequence.

    No tokenization happens here; the output only carries the prepared
    strings under the ``"raw_sequences"`` key.
    """

    def __init__(self, pad_char="X", target_length=33):
        """Store the padding character and the window length.

        Args:
            pad_char: Character used to fill missing positions. The length
                arithmetic below assumes it is a single character.
            target_length: Length of every window that is produced.
        """
        # NOTE(review): ProcessorMixin.__init__ is not called here, matching the
        # original code; confirm that is intentional before adding super().
        self.pad_char = pad_char
        self.target_length = target_length

    def pad_sequence(self, seq):
        """Return ``seq`` fitted to ``target_length`` characters.

        Shorter input is padded with ``pad_char`` on both sides; when the
        amount of padding is odd, the extra character goes on the right.
        Longer input is truncated on the right.
        """
        missing = self.target_length - len(seq)
        if missing > 0:
            left_padding = missing // 2
            right_padding = missing - left_padding
            seq = self.pad_char * left_padding + seq + self.pad_char * right_padding
        return seq[:self.target_length]

    def extract_ptm_sequences(self, sequences):
        """Build one window per S/T/Y residue, with that residue centred.

        For every occurrence of ``S``, ``T`` or ``Y`` a window of
        ``target_length`` characters is produced in which the residue sits at
        index ``target_length // 2``. Where the window would reach past either
        end of the sequence, the missing positions are filled with
        ``pad_char`` on that side only, so the residue stays centred even
        close to the termini.

        Args:
            sequences: Iterable of sequence strings.

        Returns:
            List of window strings, in order of sequence and then position.
        """
        ptm_sites = {'S', 'T', 'Y'}
        half = self.target_length // 2
        ptm_data = []
        for seq in sequences:
            seq_len = len(seq)
            for i, residue in enumerate(seq):
                if residue not in ptm_sites:
                    continue
                # Unclamped window bounds; either may fall outside the sequence.
                start = i - half
                end = start + self.target_length
                # Pad each side by exactly the overhang instead of clamping
                # the start and padding evenly, which would move the site
                # away from the centre.
                left_padding = max(0, -start)
                right_padding = max(0, end - seq_len)
                window = (
                    self.pad_char * left_padding
                    + seq[max(0, start):min(seq_len, end)]
                    + self.pad_char * right_padding
                )
                # Guard the length in case pad_char is not a single character.
                ptm_data.append(window[:self.target_length])
        return ptm_data

    def extract_bps_sequences(self, sequences, overlapping=True, step_size=5):
        """Split sequences into ``target_length`` windows for the BPS mode.

        A sequence shorter than ``target_length`` yields a single window
        padded by ``pad_sequence``. Otherwise windows are taken from position
        0 onward; only full-length windows are emitted, so residues after the
        last full window are not covered.

        Args:
            sequences: Iterable of sequence strings.
            overlapping: When true, consecutive windows start ``step_size``
                apart; when false, they start ``target_length`` apart.
            step_size: Distance between window starts when ``overlapping`` is
                true. A value of 0 makes ``range`` raise ``ValueError``.

        Returns:
            List of window strings.
        """
        window_len = self.target_length
        stride = step_size if overlapping else window_len
        bioactive_data = []
        for seq in sequences:
            if len(seq) < window_len:
                # Too short for a full window: pad up to the target length.
                bioactive_data.append(self.pad_sequence(seq))
                continue
            # Every slice here is exactly window_len long, so no padding or
            # truncation is needed.
            for start in range(0, len(seq) - window_len + 1, stride):
                bioactive_data.append(seq[start:start + window_len])
        return bioactive_data

    def __call__(
        self,
        sequences,
        mode,  # no default on purpose: the caller must choose a mode
        overlapping=True,
        step_size=5,
        **kwargs
    ):
        """Preprocess protein sequences into fixed-length windows.

        Args:
            sequences: A list of sequence strings, or a single string.
            mode: Processing mode, either ``"PTM"`` or ``"BPS"``.
            overlapping: BPS mode only; whether windows overlap.
            step_size: BPS mode only; distance between window starts.
            **kwargs: Accepted and ignored.

        Returns:
            A ``BatchEncoding`` whose ``"raw_sequences"`` entry is the list of
            prepared windows.

        Raises:
            ValueError: If ``mode`` is not ``"PTM"`` or ``"BPS"``, or if no
                window was produced.
        """
        # Accept a bare string as a one-element batch.
        if isinstance(sequences, str):
            sequences = [sequences]

        if mode == "PTM":
            processed_sequences = self.extract_ptm_sequences(sequences)
        elif mode == "BPS":
            processed_sequences = self.extract_bps_sequences(
                sequences,
                overlapping=overlapping,
                step_size=step_size,
            )
        else:
            raise ValueError("Invalid mode. Please choose 'PTM' or 'BPS'.")

        if not processed_sequences:
            raise ValueError("No sequences processed. Check input data and processing logic.")

        # Only the prepared strings are returned; nothing is tokenized here.
        model_inputs = {
            "raw_sequences": processed_sequences,
        }
        return BatchEncoding(data=model_inputs)
|