| import json | |
| import os | |
| import torch | |
| import torch.nn as nn | |
| import torch.optim as optim | |
| import torch.nn.functional as F | |
| from torch.utils.data import Dataset, DataLoader | |
| from transformers import AutoTokenizer, AutoModel | |
# Run on the GPU when torch reports CUDA as available, otherwise on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# The tokenizer and the encoder are loaded from the same checkpoint name so
# that the token ids produced by one match what the other expects.
_EMBEDDING_CHECKPOINT = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
tokenizer_embeddings = AutoTokenizer.from_pretrained(_EMBEDDING_CHECKPOINT)
model_embeddings = AutoModel.from_pretrained(_EMBEDDING_CHECKPOINT)
model_embeddings = model_embeddings.to(device)
class ModerationModel(nn.Module):
    """Two-layer feed-forward classifier head over a fixed-size embedding.

    The network is ``Linear -> ReLU -> Linear`` and returns raw, unnormalised
    scores (logits); no sigmoid/softmax is applied here, so callers that need
    probabilities must apply the activation themselves.

    Args:
        input_size: Width of the input embedding vector. Defaults to 384
            (presumably the embedding width of the sentence encoder used in
            this file -- confirm against the checkpoint).
        hidden_size: Width of the hidden layer. Defaults to 128.
        output_size: Number of output scores. Defaults to 11, which matches
            the number of category names used by ``predict`` in this file.
    """

    def __init__(self, input_size=384, hidden_size=128, output_size=11):
        super().__init__()
        # Attribute names ``fc1``/``fc2`` are kept so existing checkpoints
        # (state dicts) continue to load.
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return one row of ``output_size`` logits per input row of ``x``."""
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x
def mean_pooling(model_output, attention_mask):
    """Average the token embeddings of each sequence, ignoring masked tokens.

    Args:
        model_output: Indexable whose element ``0`` is the token-embedding
            tensor; the sum below runs over its dimension 1.
        attention_mask: Tensor that, after a trailing axis is added, can be
            expanded to the token-embedding shape; non-zero entries mark the
            tokens that take part in the average.

    Returns:
        The mask-weighted sum over dimension 1 divided by the mask total
        over the same dimension.
    """
    per_token = model_output[0]
    weights = attention_mask.unsqueeze(-1).expand(per_token.size()).float()
    weighted_total = (per_token * weights).sum(1)
    # Clamp so that a fully masked row divides by 1e-9 rather than by zero.
    weight_total = weights.sum(1).clamp(min=1e-9)
    return weighted_total / weight_total
def getEmbeddings(sentences):
    """Encode ``sentences`` and return their mean-pooled embeddings on the CPU.

    The input is tokenised with padding and truncation enabled, moved to the
    module-level ``device``, run through ``model_embeddings`` without
    gradient tracking, and reduced with ``mean_pooling``.

    Args:
        sentences: Text input accepted by ``tokenizer_embeddings``.

    Returns:
        The pooled embedding tensor, moved to the CPU.
    """
    batch = tokenizer_embeddings(
        sentences,
        padding=True,
        truncation=True,
        return_tensors='pt',
    )
    batch = batch.to(device)

    # Inference only: no autograd graph is needed for the encoder pass.
    with torch.no_grad():
        encoder_out = model_embeddings(**batch)

    pooled = mean_pooling(encoder_out, batch['attention_mask'])
    return pooled.cpu()
def getEmb(text):
    """Return the embedding of a single ``text`` as a plain Python list.

    The text is wrapped in a one-element batch, passed to ``getEmbeddings``,
    and the first (only) row of the result is returned as a list.
    """
    batch_embeddings = getEmbeddings([text])
    as_rows = batch_embeddings.tolist()
    return as_rows[0]
# Category labels, in the order the model's output scores are paired with them.
CATEGORY_NAMES = (
    "harassment",
    "harassment-threatening",
    "hate",
    "hate-threatening",
    "self-harm",
    "self-harm-instructions",
    "self-harm-intent",
    "sexual",
    "sexual-minors",
    "violence",
    "violence-graphic",
)


def predict(model, embeddings):
    """Score one embedding against every moderation category.

    The model is switched to eval mode (and left in it), the embedding is
    given a leading batch axis, and the model's outputs are passed through
    a sigmoid to obtain one independent score per category.

    Args:
        model: Module that maps a batched embedding to one logit per
            category; it must provide ``eval()`` and ``parameters()``.
        embeddings: A single embedding, either as a sequence of floats or as
            a tensor.

    Returns:
        A dict with three keys:
            ``"category_scores"``: category name -> sigmoid score.
            ``"detect"``: category name -> ``True`` when the score is
                strictly greater than 0.5.
            ``"detected"``: ``True`` when any category was flagged.
    """
    model.eval()

    # Build the input on the same device as the model's weights; creating it
    # unconditionally on the CPU fails when the model has been moved to CUDA.
    # A model without parameters falls back to torch's default placement.
    first_param = next(iter(model.parameters()), None)
    target_device = first_param.device if first_param is not None else None

    with torch.no_grad():
        # ``as_tensor`` accepts both lists and existing tensors without the
        # copy-construct warning that ``torch.tensor(tensor)`` emits.
        embeddings_tensor = torch.as_tensor(
            embeddings, dtype=torch.float, device=target_device
        )
        outputs = model(embeddings_tensor.unsqueeze(0))
        predicted_scores = torch.sigmoid(outputs).squeeze(0).tolist()

    # ``zip`` pairs names with scores positionally and stops at the shorter
    # of the two, as the original implementation did.
    result = dict(zip(CATEGORY_NAMES, predicted_scores))
    detected = {category: score > 0.5 for category, score in result.items()}
    detect_value = any(detected.values())
    return {"category_scores": result, 'detect': detected, 'detected': detect_value}