|
|
from rest_framework.views import APIView |
|
|
from rest_framework.response import Response |
|
|
from rest_framework import status |
|
|
from django.conf import settings |
|
|
import torch |
|
|
from transformers import AutoTokenizer, AutoModelForMaskedLM |
|
|
import os |
|
|
import math |
|
|
|
|
|
|
|
|
# Hugging Face model used for pseudo-perplexity scoring (masked LM).
MODEL_NAME = "roberta-base"


print(f"Loading Model: {MODEL_NAME}...")


# Load tokenizer + model once at import time so every request reuses them.
# On any failure, `model`/`tokenizer` are set to None and the views below
# respond with 503 instead of crashing.
try:

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    print(f"Using Device: {device}")

    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

    model = AutoModelForMaskedLM.from_pretrained(MODEL_NAME)

    model.to(device)

    # Inference mode: disables dropout so scores are deterministic.
    model.eval()

    print("Model Loaded Successfully!")

except Exception as e:

    print(f"Error loading model: {e}")

    model = None

    tokenizer = None


import time
|
|
|
|
|
def calculate_token_scores(text):
    """
    Analyzes text at token level to identify AI-generated regions.

    Each non-special token is masked in turn and re-predicted by the masked
    LM. A *low* cross-entropy loss means the token was highly predictable
    from context, which this detector treats as a signal of AI generation.

    Args:
        text: Raw input string to analyze.

    Returns:
        List of (start_char, end_char, loss) tuples for tokens whose loss is
        below HIGHLIGHT_THRESHOLD, suitable for character-range highlighting.
        Empty list if the model is unavailable or the text is too short.
    """
    if not tokenizer or not model:
        return []

    encodings = tokenizer(text, return_tensors="pt", truncation=True, max_length=512, return_offsets_mapping=True)
    input_ids = encodings.input_ids.to(device)
    offsets = encodings.offset_mapping[0].cpu().numpy()
    seq_len = input_ids.shape[1]

    # Too short to score meaningfully (BOS + EOS + at least 2 real tokens).
    if seq_len < 4:
        return []

    BATCH_SIZE = 4
    # Pre-replicate the sequence so each batch row can mask a different position.
    tensor_input_ids = input_ids.repeat(BATCH_SIZE, 1)

    # Skip position 0 (BOS) and the final position (EOS).
    indices_to_mask = list(range(1, seq_len - 1))

    token_losses = [0.0] * seq_len

    loss_fct = torch.nn.CrossEntropyLoss(reduction='none')

    # Only meaningful on GPU; guard so CPU-only hosts skip the call.
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    for i in range(0, len(indices_to_mask), BATCH_SIZE):
        batch_indices = indices_to_mask[i : i + BATCH_SIZE]
        # Renamed from `current_curr_size` for consistency with
        # calculate_perplexity.
        current_batch_size = len(batch_indices)
        if current_batch_size == 0:
            break

        batch_input_ids = tensor_input_ids[:current_batch_size].clone()
        # -100 is ignored by CrossEntropyLoss, so only the masked position
        # contributes to each row's loss.
        batch_labels = torch.full(batch_input_ids.shape, -100).to(device)

        for j, pos in enumerate(batch_indices):
            batch_labels[j, pos] = batch_input_ids[j, pos].item()
            batch_input_ids[j, pos] = tokenizer.mask_token_id

        with torch.no_grad():
            outputs = model(batch_input_ids)
            predictions = outputs.logits

        # CrossEntropyLoss expects (batch, vocab, seq).
        predictions = predictions.permute(0, 2, 1)
        loss = loss_fct(predictions, batch_labels)

        # Exactly one labelled position per row, so the row sum IS that
        # position's loss.
        masked_losses = loss.sum(dim=1)

        for j, pos in enumerate(batch_indices):
            token_losses[pos] = masked_losses[j].item()

    # Tokens with loss below this are "too predictable" and get highlighted.
    HIGHLIGHT_THRESHOLD = 1.0

    highlights = []

    for i in range(1, seq_len - 1):
        loss = token_losses[i]
        if loss < HIGHLIGHT_THRESHOLD:
            start, end = offsets[i]
            # Zero-width offsets correspond to special tokens; skip them.
            if start == end:
                continue
            # Cast numpy ints to plain Python ints so the result is
            # JSON-serializable downstream.
            highlights.append((int(start), int(end), loss))

    return highlights
|
|
|
|
|
def calculate_perplexity(text):
    """
    Calculates Pseudo-Perplexity (PPL) for Masked Language Models (like RoBERTa).

    Formula: PPL = exp( -1/N * sum( log(P(w_i | context)) ) )

    OPTIMIZATION:
    - Strided Masking (Stride=3): Mask every 3rd token. 3x Speedup.
    - Batch Size: 8  (docstring previously said 16 — the code uses 8)

    Args:
        text: Raw input string to score.

    Returns:
        Pseudo-perplexity as a float. Returns 100.0 (a neutral,
        "human-like" value) for texts too short to score.

    Raises:
        RuntimeError: If the model/tokenizer failed to load at startup
            (mirrors the guard in calculate_token_scores; previously this
            crashed with an opaque TypeError).
    """
    if not tokenizer or not model:
        raise RuntimeError("Model not loaded")

    t0 = time.time()

    encodings = tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
    input_ids = encodings.input_ids.to(device)
    seq_len = input_ids.shape[1]

    # Too short to score meaningfully (BOS + EOS + at least 2 real tokens).
    if seq_len < 4:
        return 100.0

    nlls = []

    BATCH_SIZE = 8
    # Score every 3rd token only: ~3x faster, still a good PPL estimate.
    STRIDE = 3

    # Pre-replicate the sequence so each batch row can mask a different position.
    tensor_input_ids = input_ids.repeat(BATCH_SIZE, 1)

    # Skip position 0 (BOS) and the final position (EOS).
    start_idx = 1
    end_idx = seq_len - 1

    loss_fct = torch.nn.CrossEntropyLoss(reduction='none')

    indices_to_mask = list(range(start_idx, end_idx, STRIDE))
    total_steps = len(indices_to_mask)

    for i in range(0, total_steps, BATCH_SIZE):

        batch_indices = indices_to_mask[i : i + BATCH_SIZE]
        current_batch_size = len(batch_indices)

        if current_batch_size == 0:
            break

        batch_input_ids = tensor_input_ids[:current_batch_size].clone()

        # -100 is ignored by CrossEntropyLoss, so only the masked position
        # contributes to each row's loss.
        batch_labels = torch.full(batch_input_ids.shape, -100).to(device)

        for j, token_pos in enumerate(batch_indices):
            batch_labels[j, token_pos] = batch_input_ids[j, token_pos].item()
            batch_input_ids[j, token_pos] = tokenizer.mask_token_id

        with torch.no_grad():
            outputs = model(batch_input_ids)
            predictions = outputs.logits

        # CrossEntropyLoss expects (batch, vocab, seq).
        predictions = predictions.permute(0, 2, 1)
        loss = loss_fct(predictions, batch_labels)

        # Exactly one labelled position per row: the row sum is that
        # token's negative log-likelihood.
        masked_losses = loss.sum(dim=1)
        nlls.append(masked_losses)

    # Defensive: unreachable for seq_len >= 4, but kept as a safety net.
    if not nlls:
        return 0.0

    all_nlls = torch.cat(nlls)
    mean_nll = all_nlls.mean()

    # PPL = exp(mean negative log-likelihood)
    ppl = torch.exp(mean_nll)

    print(f"Inference Time: {time.time() - t0:.2f}s")
    return ppl.item()
|
|
|
|
|
class AnalyzeView(APIView):
    """POST endpoint: scores input text and returns an AI-likelihood verdict."""

    def post(self, request):
        """
        Analyze text for AI generation.

        Body: {"text": "..."} (or a 'file' upload — currently stubbed).

        Returns JSON with:
            score: AI probability in percent (clamped to [2.5, 99.5])
            label: "AI Generated" / "Human Written"
            perplexity: pseudo-perplexity of the text
            device: compute device used
        """
        if not model or not tokenizer:
            return Response({"error": "Model not loaded"}, status=status.HTTP_503_SERVICE_UNAVAILABLE)

        data = request.data
        text = data.get('text', '')
        if not text and 'file' in request.FILES:
            # TODO: real file extraction not implemented yet.
            text = "File content placeholder"

        if not text:
            return Response({"error": "No text provided"}, status=status.HTTP_400_BAD_REQUEST)

        try:
            ppl = calculate_perplexity(text)

            # Logistic mapping: low perplexity (predictable text) -> high AI
            # score. Midpoint at ppl = 2.5, steepness 3.
            # BUGFIX: math.exp raises OverflowError for arguments above ~709
            # (i.e. ppl > ~239, realistic for very "human"/random text),
            # which previously turned such requests into 500s. Clamp the
            # exponent; the score saturates to ~0 long before 700 anyway.
            exponent = min(3 * (ppl - 2.5), 700.0)
            ai_score = 100 / (1 + math.exp(exponent))

            # Never report absolute certainty in either direction.
            ai_score = max(2.5, min(99.5, ai_score))

            label = "AI Generated" if ai_score > 50 else "Human Written"

            return Response({
                "score": round(ai_score, 1),
                "label": label,
                "perplexity": round(ppl, 2),
                "device": str(device)
            })

        except Exception as e:
            return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
|
|
|
from django.http import FileResponse |
|
|
from reportlab.lib.pagesizes import letter |
|
|
from reportlab.lib import colors |
|
|
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer |
|
|
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle |
|
|
from reportlab.lib.enums import TA_JUSTIFY |
|
|
import io |
|
|
|
|
|
class ReportView(APIView):
    """POST endpoint: renders the token-level analysis as a PDF report."""

    @staticmethod
    def _build_highlighted_markup(text, highlights):
        """
        Return the text as reportlab inline markup, with each highlighted
        character range wrapped in a yellow <font> tag.

        Args:
            text: Original analyzed text.
            highlights: List of (start_char, end_char, loss) tuples, sorted
                by start position.
        """
        import html  # local import, as in the original inline usage

        # Build with a list + join instead of repeated string concatenation
        # (the old += loop was quadratic on long documents).
        parts = []
        current_idx = 0
        for start, end, loss in highlights:
            # Skip ranges that overlap an already-emitted highlight.
            if start < current_idx:
                continue
            # Plain segment before the highlight.
            parts.append(html.escape(text[current_idx:start]))
            # Highlighted segment.
            parts.append(f'<font backColor="yellow">{html.escape(text[start:end])}</font>')
            current_idx = end
        # Remainder after the last highlight.
        parts.append(html.escape(text[current_idx:]))
        # Preserve line breaks inside the PDF paragraph.
        return "".join(parts).replace('\n', '<br/>')

    def post(self, request):
        """Build and stream a PDF summarizing the AI-detection results."""
        try:
            data = request.data
            text = data.get('text', '')
            score = data.get('score', 0)
            label = data.get('label', 'Unknown')

            # BUGFIX: request.data values can arrive as strings (e.g. form
            # posts). The old code compared `score > 50` directly, which
            # raises TypeError for str in Python 3 and 500'd the request.
            # Coerce for the comparison only; display keeps the raw value.
            try:
                score_value = float(score)
            except (TypeError, ValueError):
                score_value = 0.0

            # Re-run token-level analysis to find highlight ranges.
            highlights = calculate_token_scores(text)
            highlights.sort(key=lambda x: x[0])

            print(f"Generating PDF for text length: {len(text)} with {len(highlights)} highlights.")

            formatted_text = self._build_highlighted_markup(text, highlights)

            buffer = io.BytesIO()
            doc = SimpleDocTemplate(buffer, pagesize=letter,
                                    rightMargin=72, leftMargin=72,
                                    topMargin=72, bottomMargin=18)

            Story = []

            styles = getSampleStyleSheet()
            styles.add(ParagraphStyle(name='Justify', alignment=TA_JUSTIFY))

            # Header.
            Story.append(Paragraph("DetectAI Analysis Report", styles["Heading1"]))
            Story.append(Paragraph(f"Date: {time.strftime('%Y-%m-%d %H:%M:%S')}", styles["Normal"]))
            Story.append(Spacer(1, 12))

            # Fraction of characters flagged as AI-like.
            total_chars = len(text)
            highlighted_chars = sum((h[1] - h[0]) for h in highlights)
            highlight_ratio = (highlighted_chars / total_chars * 100) if total_chars > 0 else 0

            res_color = "red" if score_value > 50 else "green"

            Story.append(Paragraph(f'AI Probability: <font color="{res_color}"><b>{score}%</b></font>', styles["Heading2"]))
            Story.append(Paragraph(f'Highlighted Content: <b>{highlight_ratio:.1f}%</b>', styles["Normal"]))
            Story.append(Spacer(1, 12))

            # Body: the analyzed text with highlights.
            Story.append(Paragraph("Analyzed Content:", styles["Heading3"]))
            Story.append(Spacer(1, 6))
            Story.append(Paragraph(formatted_text, styles["Justify"]))

            doc.build(Story)

            buffer.seek(0)
            return FileResponse(buffer, as_attachment=True, filename='detectAI_report.pdf')

        except Exception as e:
            print(f"Report Generation Error: {e}")
            return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
|