CPU-Based-AI-Guardrail / training2.py
boying07's picture
Upload 2 files
8487231 verified
Raw
History Blame Contribute Delete
7.39 kB
# training_final.py
import os
import json
import random
import argparse
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModel
from torch.utils.data import Dataset, DataLoader
def set_seed(seed=42):
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
class JsonDataset(Dataset):
def __init__(self, rows, tokenizer, max_length=256):
self.rows = rows
self.tokenizer = tokenizer
self.max_length = max_length
def __len__(self):
return len(self.rows)
def __getitem__(self, idx):
row = self.rows[idx]
enc = self.tokenizer(
row["text"],
truncation=True,
padding="max_length",
max_length=self.max_length,
return_tensors="pt"
)
return {
"input_ids": enc["input_ids"].squeeze(0),
"attention_mask": enc["attention_mask"].squeeze(0),
"label": torch.tensor(row["label"], dtype=torch.float)
}
class PairDataset(Dataset):
def __init__(self, rows, tokenizer, max_length=256):
self.rows = rows
self.tokenizer = tokenizer
self.max_length = max_length
self.normal = [x for x in rows if x["label"] == 0]
self.attack = [x for x in rows if x["label"] == 1]
def __len__(self):
return len(self.rows)
def encode(self, text):
enc = self.tokenizer(
text,
truncation=True,
padding="max_length",
max_length=self.max_length,
return_tensors="pt"
)
return enc
def __getitem__(self, idx):
anchor = self.rows[idx]
if random.random() < 0.5:
other = random.choice(
self.normal if anchor["label"] == 0 else self.attack
)
target = 1
else:
other = random.choice(
self.attack if anchor["label"] == 0 else self.normal
)
target = -1
a = self.encode(anchor["text"])
b = self.encode(other["text"])
return {
"a_input_ids": a["input_ids"].squeeze(0),
"a_attention_mask": a["attention_mask"].squeeze(0),
"b_input_ids": b["input_ids"].squeeze(0),
"b_attention_mask": b["attention_mask"].squeeze(0),
"target": torch.tensor(target, dtype=torch.float)
}
class SharedEncoder(nn.Module):
def __init__(self, model_name):
super().__init__()
self.encoder = AutoModel.from_pretrained(model_name)
def mean_pool(self, hidden, mask):
mask = mask.unsqueeze(-1).expand(hidden.size()).float()
masked = hidden * mask
summed = masked.sum(1)
counts = mask.sum(1).clamp(min=1e-9)
return summed / counts
def forward(self, input_ids, attention_mask):
outputs = self.encoder(
input_ids=input_ids,
attention_mask=attention_mask
)
pooled = self.mean_pool(outputs.last_hidden_state, attention_mask)
pooled = F.normalize(pooled, p=2, dim=-1)
return pooled
class ClassifierHead(nn.Module):
def __init__(self, dim=768):
super().__init__()
self.net = nn.Sequential(
nn.Linear(dim, 256),
nn.ReLU(),
nn.Dropout(0.2),
nn.Linear(256, 1)
)
def forward(self, x):
return self.net(x).squeeze(-1)
def load_jsonl(path):
rows = []
with open(path, "r", encoding="utf-8") as f:
for line in f:
row = json.loads(line)
label = row["label"]
label = 0 if label == "NORMAL" else 1
rows.append({
"text": row["text"],
"label": label
})
return rows
def train_encoder(model, loader, device, epochs=2):
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5)
criterion = nn.CosineEmbeddingLoss(margin=0.2)
model.train()
for epoch in range(epochs):
losses = []
for batch in loader:
emb1 = model(
batch["a_input_ids"].to(device),
batch["a_attention_mask"].to(device)
)
emb2 = model(
batch["b_input_ids"].to(device),
batch["b_attention_mask"].to(device)
)
loss = criterion(emb1, emb2, batch["target"].to(device))
optimizer.zero_grad()
loss.backward()
optimizer.step()
losses.append(loss.item())
print(f"[Encoder Epoch {epoch+1}] Loss: {np.mean(losses):.4f}")
def extract_embeddings(model, loader, device):
model.eval()
X = []
y = []
with torch.no_grad():
for batch in loader:
emb = model(
batch["input_ids"].to(device),
batch["attention_mask"].to(device)
)
X.append(emb.cpu().numpy())
y.extend(batch["label"].numpy())
return np.concatenate(X), np.array(y)
def train_classifier(X_train, y_train, device):
model = ClassifierHead().to(device)
X = torch.tensor(X_train).float().to(device)
y = torch.tensor(y_train).float().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.BCEWithLogitsLoss()
for epoch in range(8):
logits = model(X)
loss = criterion(logits, y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
print(f"[Classifier Epoch {epoch+1}] Loss: {loss.item():.4f}")
return model
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--train", nargs="+", required=True)
parser.add_argument("--output-dir", required=True)
parser.add_argument("--model-name", default="dbmdz/bert-base-turkish-cased")
args = parser.parse_args()
set_seed()
os.makedirs(args.output_dir, exist_ok=True)
device = "cuda" if torch.cuda.is_available() else "cpu"
rows = []
for path in args.train:
print(f"Loading: {path}")
rows.extend(load_jsonl(path))
print("Loaded rows:", len(rows))
tokenizer = AutoTokenizer.from_pretrained(args.model_name)
pair_dataset = PairDataset(rows, tokenizer)
pair_loader = DataLoader(pair_dataset, batch_size=16, shuffle=True)
encoder = SharedEncoder(args.model_name).to(device)
print("Training encoder...")
train_encoder(encoder, pair_loader, device)
ds = JsonDataset(rows, tokenizer)
loader = DataLoader(ds, batch_size=16)
print("Extracting embeddings...")
X, y = extract_embeddings(encoder, loader, device)
normal_bank = X[y == 0]
attack_bank = X[y == 1]
np.save(
os.path.join(args.output_dir, "homayshield_normal_bank.npy"),
normal_bank
)
np.save(
os.path.join(args.output_dir, "homayshield_attack_bank.npy"),
attack_bank
)
torch.save(
encoder.state_dict(),
os.path.join(args.output_dir, "homayshield_encoder.pt")
)
print("Training classifier...")
classifier = train_classifier(X, y, device)
torch.save(
classifier.state_dict(),
os.path.join(args.output_dir, "homayshield_classifier.pt")
)
print("Training completed.")
if __name__ == "__main__":
main()