File size: 7,466 Bytes
b8cde37 c6c590f b8cde37 c6c590f b8cde37 c6c590f b8cde37 c6c590f b8cde37 c6c590f b8cde37 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 |
import joblib
import numpy as np
import math
from collections import Counter
class BiologicalFeatureExtractor:
    """Standalone extractor for GenetiForest (RandomForest).

    Turns nucleotide sequences into fixed-width numeric rows laid out as
    ``[gc_content, shannon_entropy, <one frequency per k-mer>]``. With the
    default ``kmer_size=3`` each row has 2 + 4**3 = 66 columns.
    """

    def __init__(self, kmer_size=3):
        """Store the k-mer length and precompute the k-mer vocabulary.

        Args:
            kmer_size: Length of the k-mers whose frequencies become features.

        Raises:
            ValueError: If ``kmer_size`` is smaller than 1.
        """
        self.kmer_size = kmer_size
        self.kmers = self._generate_kmers(kmer_size)

    def _generate_kmers(self, k):
        """Return every k-mer over A/C/G/T as a list of 4**k strings.

        The first base varies slowest (AAA, AAC, AAG, AAT, ACA, ...). This
        order fixes the column order of the k-mer features.

        Raises:
            ValueError: If ``k`` is smaller than 1 (the recursion would
                otherwise never reach its base case).
        """
        if k < 1:
            raise ValueError(f"kmer_size must be >= 1, got {k!r}")
        bases = ['A', 'C', 'G', 'T']
        if k == 1:
            return bases
        # Build the (k-1)-mers once instead of once per leading base.
        suffixes = self._generate_kmers(k - 1)
        return [b + s for b in bases for s in suffixes]

    def transform(self, X):
        """Compute the feature matrix for an iterable of sequences.

        Each sequence is upper-cased and 'U' is replaced by 'T' before any
        counting. K-mers containing other characters still count towards the
        window total but have no column of their own.

        Args:
            X: Iterable of sequence strings.

        Returns:
            A 2-D float ``numpy.ndarray`` of shape
            ``(n_sequences, 2 + len(self.kmers))``; an empty input yields a
            matrix with zero rows and the same number of columns.
        """
        features = []
        for seq in X:
            seq = seq.upper().replace('U', 'T')
            length = len(seq)

            # 1. GC content and 2. Shannon entropy
            gc_content = (seq.count('G') + seq.count('C')) / length if length > 0 else 0
            row = [gc_content, self._calculate_entropy(seq)]

            # 3. K-mer frequencies over all overlapping windows
            total_kmers = length - self.kmer_size + 1
            if total_kmers > 0:
                counts = Counter(seq[i:i + self.kmer_size] for i in range(total_kmers))
                row.extend(counts.get(kmer, 0) / total_kmers for kmer in self.kmers)
            else:
                # Sequence shorter than one k-mer: all frequencies are zero.
                row.extend([0] * len(self.kmers))
            features.append(row)

        if not features:
            # Keep the result two-dimensional even when there are no rows.
            return np.empty((0, 2 + len(self.kmers)))
        return np.array(features, dtype=float)

    def _calculate_entropy(self, seq):
        """Return the Shannon entropy (in bits) of the characters in ``seq``.

        An empty sequence yields 0.
        """
        if not seq:
            return 0
        total = len(seq)
        entropy = 0
        for count in Counter(seq).values():
            p = count / total
            entropy -= p * math.log2(p)
        return entropy
class SequenceFeatureExtractor:
    """Standalone extractor for ViralBoost (GradientBoosting).

    Row layout, in order (54 columns with the default ``kmer_size=5``):
    GC content, GC skew, AT skew, Shannon entropy, frequencies of the first
    20 k-mers of the vocabulary, 16 dinucleotide frequencies, repeat score,
    CpG observed/expected ratio, and 12 codon-position base frequencies.
    """

    def __init__(self, kmer_size=5):
        """Store the k-mer length and precompute the lookup vocabularies.

        Args:
            kmer_size: Length of the k-mers whose frequencies become features.

        Raises:
            ValueError: If ``kmer_size`` is smaller than 1.
        """
        self.kmer_size = kmer_size
        self.kmers = self._generate_kmers(kmer_size)
        # The list order fixes the column order of the dinucleotide features.
        self.dinucleotides = ['AA', 'AT', 'AG', 'AC', 'TA', 'TT', 'TG', 'TC',
                              'GA', 'GT', 'GG', 'GC', 'CA', 'CT', 'CG', 'CC']

    def _generate_kmers(self, k):
        """Return every k-mer over A/C/G/T as a list of 4**k strings.

        The first base varies slowest (AA..A, AA..C, AA..G, AA..T, ...).

        Raises:
            ValueError: If ``k`` is smaller than 1 (the recursion would
                otherwise never reach its base case).
        """
        if k < 1:
            raise ValueError(f"kmer_size must be >= 1, got {k!r}")
        bases = ['A', 'C', 'G', 'T']
        if k == 1:
            return bases
        # Build the (k-1)-mers once instead of once per leading base.
        suffixes = self._generate_kmers(k - 1)
        return [b + s for b in bases for s in suffixes]

    def transform(self, X):
        """Compute the feature matrix for an iterable of sequences.

        Each sequence is upper-cased and 'U' is replaced by 'T' before any
        counting.

        Args:
            X: Iterable of sequence strings.

        Returns:
            A 2-D float ``numpy.ndarray`` with one row per sequence. Every
            row has the same width, including rows for sequences shorter
            than ``kmer_size``; an empty input yields zero rows.
        """
        # Only the first 20 k-mers of the vocabulary are used as features
        # (fewer if the vocabulary itself is smaller). This is a positional
        # slice, not a selection by frequency.
        selected_kmers = self.kmers[:20]
        features = []
        for seq in X:
            seq = seq.upper().replace('U', 'T')
            length = len(seq)
            row = [
                (seq.count('G') + seq.count('C')) / length if length > 0 else 0,  # GC
                self._calc_skew(seq, 'G', 'C'),  # GC skew
                self._calc_skew(seq, 'A', 'T'),  # AT skew
                self._calc_entropy(seq),  # entropy
            ]

            # K-mer frequencies over all overlapping windows
            t_kmers = length - self.kmer_size + 1
            if t_kmers > 0:
                k_counts = Counter(seq[i:i + self.kmer_size] for i in range(t_kmers))
                row.extend(k_counts.get(k, 0) / t_kmers for k in selected_kmers)
            else:
                # Pad with as many zeros as the other branch emits so all
                # rows keep the same width for any kmer_size.
                row.extend([0] * len(selected_kmers))

            # Dinucleotide frequencies over all overlapping pairs
            t_di = length - 1
            if t_di > 0:
                d_counts = Counter(seq[i:i + 2] for i in range(t_di))
                row.extend(d_counts.get(d, 0) / t_di for d in self.dinucleotides)
            else:
                row.extend([0] * len(self.dinucleotides))

            row.append(self._calc_repeat(seq))  # repeat score
            row.append(self._calc_cpg(seq, length))  # CpG
            row.extend(self._calc_codon_bias(seq))  # codon position bias
            features.append(row)

        if not features:
            # Keep the result two-dimensional even when there are no rows:
            # 4 scalar features + k-mers + dinucleotides + repeat + CpG + 12.
            width = 4 + len(selected_kmers) + len(self.dinucleotides) + 2 + 12
            return np.empty((0, width))
        return np.array(features, dtype=float)

    def _calc_skew(self, seq, b1, b2):
        """Return (count(b1) - count(b2)) / (count(b1) + count(b2)), or 0
        when neither base occurs."""
        c1, c2 = seq.count(b1), seq.count(b2)
        return (c1 - c2) / (c1 + c2) if (c1 + c2) > 0 else 0

    def _calc_entropy(self, seq):
        """Return the Shannon entropy (in bits) of the characters in ``seq``.

        An empty sequence yields 0.
        """
        if not seq:
            return 0
        total = len(seq)
        entropy = 0
        for count in Counter(seq).values():
            p = count / total
            if p > 0:
                entropy -= p * math.log2(p)
        return entropy

    def _calc_repeat(self, seq):
        """Return the number of adjacent tandem repeats per base.

        Counts positions where a unit of length 2, 3 or 4 is immediately
        followed by an identical copy, divided by ``len(seq)``. Sequences
        shorter than 6 yield 0.
        """
        if len(seq) < 6:
            return 0
        cnt = 0
        for unit in (2, 3, 4):
            # NOTE(review): the range stops one position early, so a repeat
            # pair ending exactly at the end of the sequence is not counted.
            # Presumably the trained model was fit on features computed this
            # way, so the bound is left unchanged; confirm before altering.
            for i in range(len(seq) - unit * 2):
                if seq[i:i + unit] == seq[i + unit:i + unit * 2]:
                    cnt += 1
        return cnt / len(seq)

    def _calc_cpg(self, seq, length):
        """Return the observed/expected CpG ratio.

        Observed is the count of 'CG'; expected is
        ``count('C') * count('G') / length``. Returns 0 when ``length`` is
        below 2 or the expected value is 0.
        """
        if length < 2:
            return 0
        obs = seq.count('CG')
        exp = (seq.count('C') * seq.count('G')) / length
        return obs / exp if exp > 0 else 0

    def _calc_codon_bias(self, seq):
        """Return base frequencies at each of the three codon positions.

        The sequence is read in frame 0 as complete triplets. The result is
        12 values: for positions 0, 1 and 2 in turn, the fraction of A, T, G
        and C among the recognised bases at that position. Sequences shorter
        than 3 yield twelve zeros.
        """
        if len(seq) < 3:
            return [0] * 12
        per_position = [{}, {}, {}]
        for start in range(0, len(seq) - 2, 3):
            for offset in range(3):
                base = seq[start + offset]
                if base in 'ATGC':
                    tally = per_position[offset]
                    tally[base] = tally.get(base, 0) + 1
        res = []
        for tally in per_position:
            # Avoid dividing by zero when a position saw no recognised base.
            total = sum(tally.values()) or 1
            for base in 'ATGC':
                res.append(tally.get(base, 0) / total)
        return res
def predict_dna(sequence, confidence_threshold=0.55, rare_class_threshold=0.65):
    """
    Classify a DNA sequence with both models and gate the virus label on
    its predicted probability.

    The four ``.joblib`` artifacts are loaded on every call from paths
    relative to the current working directory.

    Args:
        sequence: DNA sequence string
        confidence_threshold: Minimum probability required to report a
            virus class that is not in the rare-class list (default 0.55)
        rare_class_threshold: Stricter minimum applied when the predicted
            class is one of the rare classes, e.g. Influenza B (default 0.65)

    Returns:
        dict with keys ``classification``, ``classification_confidence``,
        ``virus_identity`` ('Unknown' when the probability is below the
        applicable threshold), ``virus_confidence``, ``raw_prediction`` and
        ``raw_confidence``.
    """
    # Load the models and their scalers.
    rf_model, rf_scaler, gb_model, gb_scaler = (
        joblib.load(artifact)
        for artifact in (
            "dna_classifier.joblib",
            "scaler_rf.joblib",
            "sequence_model.joblib",
            "scaler_gb.joblib",
        )
    )

    # 1. GenetiForest prediction (synthetic vs biological)
    scaled_rf = rf_scaler.transform(
        BiologicalFeatureExtractor().transform([sequence])
    )
    type_basic = rf_model.predict(scaled_rf)[0]
    rf_confidence = max(rf_model.predict_proba(scaled_rf)[0])

    # 2. ViralBoost prediction (virus type) with confidence check
    scaled_gb = gb_scaler.transform(
        SequenceFeatureExtractor().transform([sequence])
    )
    gb_proba = gb_model.predict_proba(scaled_gb)[0]
    gb_confidence = max(gb_proba)
    predicted_class = gb_model.classes_[gb_proba.argmax()]

    # Rare classes (Influenza B etc.) must clear a higher confidence bar.
    rare_classes = ['Influenza B', 'Chicken anemia virus']
    effective_threshold = (
        rare_class_threshold
        if predicted_class in rare_classes
        else confidence_threshold
    )

    # Below the threshold the sequence is reported as 'Unknown'; the
    # reported confidence is the model's top probability either way.
    type_virus = 'Unknown' if gb_confidence < effective_threshold else predicted_class

    return {
        "classification": type_basic,
        "classification_confidence": float(rf_confidence),
        "virus_identity": type_virus,
        "virus_confidence": float(gb_confidence),
        "raw_prediction": predicted_class,  # ungated prediction (for debugging)
        "raw_confidence": float(gb_confidence)
    }
if __name__ == "__main__":
    # Smoke run on a short hard-coded sequence.
    test_seq = "ATGCTAGCTAGCTAGCTAGCGGCTAGCTAGCTAGCTAGCTAGC"
    try:
        results = predict_dna(test_seq)
        # Show only the first 20 bases of the input alongside both labels.
        print(f"Results for sequence: {test_seq[:20]}...")
        print(f"GenetiForest Result: {results['classification']}")
        print(f"ViralBoost Result: {results['virus_identity']}")
    except Exception as e:
        # Any failure is reported together with a hint about the model files.
        print(f"Error: {e}")
        print("Ensure all .joblib files are in the same directory.")
|