from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from django.conf import settings
from django.http import FileResponse

import html
import io
import math
import os
import time

import torch
from transformers import AutoTokenizer, AutoModelForMaskedLM

from reportlab.lib.pagesizes import letter
from reportlab.lib import colors
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.enums import TA_JUSTIFY

# Model: roberta-base (Masked LM)
# Logic: Low Perplexity = High AI Probability (Predictable text is likely
# machine-generated; surprising text is likely human).
MODEL_NAME = "roberta-base"

print(f"Loading Model: {MODEL_NAME}...")
try:
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using Device: {device}")
    # Use MaskedLM so we can compute per-token loss / pseudo-perplexity.
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    model = AutoModelForMaskedLM.from_pretrained(MODEL_NAME)
    model.to(device)
    model.eval()
    print("Model Loaded Successfully!")
except Exception as e:
    # Views check for None and answer 503 instead of crashing at import time.
    print(f"Error loading model: {e}")
    model = None
    tokenizer = None


def calculate_token_scores(text):
    """
    Analyze text at token level to identify likely AI-generated regions.

    Masks one token at a time (stride 1) and records the masked-LM loss at
    each position; low loss means the model found the token predictable.

    Returns:
        List of (start_char, end_char, loss) tuples for character spans whose
        token loss fell below HIGHLIGHT_THRESHOLD. Empty list if the model is
        unavailable or the text is too short.
    """
    if not tokenizer or not model:
        return []

    encodings = tokenizer(
        text,
        return_tensors="pt",
        truncation=True,
        max_length=512,
        return_offsets_mapping=True,
    )
    input_ids = encodings.input_ids.to(device)
    # Character offsets per token, e.g. [(0,0), (0,3), (3,4), ...]
    offsets = encodings.offset_mapping[0].cpu().numpy()

    seq_len = input_ids.shape[1]
    if seq_len < 4:
        return []

    # Reduced BATCH_SIZE to 4 to prevent OOM on a 4GB GPU.
    BATCH_SIZE = 4
    tensor_input_ids = input_ids.repeat(BATCH_SIZE, 1)

    # Mask every token except CLS (index 0) and SEP (last index).
    indices_to_mask = list(range(1, seq_len - 1))
    token_losses = [0.0] * seq_len
    loss_fct = torch.nn.CrossEntropyLoss(reduction='none')

    torch.cuda.empty_cache()  # Clear memory before the loop (no-op on CPU)

    for i in range(0, len(indices_to_mask), BATCH_SIZE):
        batch_indices = indices_to_mask[i : i + BATCH_SIZE]
        current_batch_size = len(batch_indices)
        if current_batch_size == 0:
            break

        batch_input_ids = tensor_input_ids[:current_batch_size].clone()
        # -100 is the ignore_index for CrossEntropyLoss: only the masked
        # position in each row contributes to the loss.
        batch_labels = torch.full(batch_input_ids.shape, -100).to(device)

        for j, pos in enumerate(batch_indices):
            batch_labels[j, pos] = batch_input_ids[j, pos].item()
            batch_input_ids[j, pos] = tokenizer.mask_token_id

        with torch.no_grad():
            outputs = model(batch_input_ids)
            predictions = outputs.logits  # [B, L, V]

        # CrossEntropyLoss expects [B, V, L] for sequence targets.
        predictions = predictions.permute(0, 2, 1)
        loss = loss_fct(predictions, batch_labels)  # [B, L]
        # All non-masked positions are zero, so the row sum is exactly the
        # loss at the single masked position.
        masked_losses = loss.sum(dim=1)  # [B]

        for j, pos in enumerate(batch_indices):
            token_losses[pos] = masked_losses[j].item()

    # Identify ranges to highlight.
    # A loss below ~2.0-3.0 already suggests AI; 1.0 is deliberately strict
    # to avoid highlighting common human words.
    HIGHLIGHT_THRESHOLD = 1.0

    highlights = []
    for i in range(1, seq_len - 1):
        loss = token_losses[i]
        # Lower loss = more likely AI.
        if loss < HIGHLIGHT_THRESHOLD:
            start, end = int(offsets[i][0]), int(offsets[i][1])
            # Filter out special tokens / empty offsets.
            if start == end:
                continue
            highlights.append((start, end, loss))

    return highlights


def calculate_perplexity(text):
    """
    Calculate Pseudo-Perplexity (PPL) for Masked Language Models (RoBERTa).

    Formula: PPL = exp( -1/N * sum( log(P(w_i | context)) ) )

    OPTIMIZATION:
    - Strided Masking (Stride=3): mask every 3rd token. 3x speedup.
    - Batched masking (batch size 8).

    Returns:
        float pseudo-perplexity; 100.0 for sequences too short to score.
    """
    t0 = time.time()

    # 1. Tokenize
    encodings = tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
    input_ids = encodings.input_ids.to(device)  # Shape: [1, seq_len]
    seq_len = input_ids.shape[1]

    # If sequence is too short, return a neutral default.
    if seq_len < 4:
        return 100.0

    nlls = []  # Negative log likelihoods (one tensor per batch)

    # 2. Batched masking for efficiency
    BATCH_SIZE = 8
    # STRIDE: how many tokens to skip between masked positions.
    # Stride 1 = all tokens (slowest, most accurate)
    # Stride 3 = every 3rd token (3x faster, good approximation)
    STRIDE = 3

    tensor_input_ids = input_ids.repeat(BATCH_SIZE, 1)  # [BATCH, seq_len]

    # Iterate tokens [1 .. seq_len-2] to skip CLS/SEP.
    start_idx = 1
    end_idx = seq_len - 1
    loss_fct = torch.nn.CrossEntropyLoss(reduction='none')

    indices_to_mask = list(range(start_idx, end_idx, STRIDE))
    total_steps = len(indices_to_mask)

    for i in range(0, total_steps, BATCH_SIZE):
        batch_indices = indices_to_mask[i : i + BATCH_SIZE]
        current_batch_size = len(batch_indices)
        if current_batch_size == 0:
            break

        # Prepare inputs: clone the repeated text.
        batch_input_ids = tensor_input_ids[:current_batch_size].clone()
        # -100 = ignore index for the loss; only masked positions count.
        batch_labels = torch.full(batch_input_ids.shape, -100).to(device)

        # Mask the target tokens.
        for j, token_pos in enumerate(batch_indices):
            # Save the original token ID as the label.
            batch_labels[j, token_pos] = batch_input_ids[j, token_pos].item()
            # Replace input with [MASK].
            batch_input_ids[j, token_pos] = tokenizer.mask_token_id

        # Forward pass.
        with torch.no_grad():
            outputs = model(batch_input_ids)
            predictions = outputs.logits  # [batch, seq_len, vocab]

        # Loss only at the masked positions (others are ignore_index).
        predictions = predictions.permute(0, 2, 1)
        loss = loss_fct(predictions, batch_labels)
        # Scalar loss per row = loss at that row's single masked token.
        masked_losses = loss.sum(dim=1)  # [current_batch_size]
        nlls.append(masked_losses)

    if not nlls:
        return 0.0

    # Pseudo-perplexity = exp(mean NLL over all masked positions).
    all_nlls = torch.cat(nlls)
    mean_nll = all_nlls.mean()
    ppl = torch.exp(mean_nll)

    print(f"Inference Time: {time.time() - t0:.2f}s")
    return ppl.item()


class AnalyzeView(APIView):
    """POST {text}: returns an AI-probability score derived from pseudo-PPL."""

    def post(self, request):
        if not model or not tokenizer:
            return Response(
                {"error": "Model not loaded"},
                status=status.HTTP_503_SERVICE_UNAVAILABLE,
            )

        data = request.data
        text = data.get('text', '')

        if not text and 'file' in request.FILES:
            text = "File content placeholder"

        if not text:
            return Response(
                {"error": "No text provided"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        try:
            ppl = calculate_perplexity(text)

            # PPL < 2.5 -> AI, PPL > 2.5 -> Human.
            # Sigmoid curve calibrated at 2.5:
            #   score = 100 / (1 + exp(3 * (ppl - 2.5)))
            # Guard the exponent: math.exp overflows above ~709, which a
            # large (very human) perplexity would otherwise trigger.
            exponent = 3 * (ppl - 2.5)
            if exponent > 700:
                ai_score = 0.0
            else:
                ai_score = 100 / (1 + math.exp(exponent))

            # Clamp between 2.5 and 99.5 so the UI never shows 0% or 100%.
            ai_score = max(2.5, min(99.5, ai_score))

            label = "AI Generated" if ai_score > 50 else "Human Written"

            return Response({
                "score": round(ai_score, 1),
                "label": label,
                "perplexity": round(ppl, 2),
                "device": str(device),
            })
        except Exception as e:
            return Response(
                {"error": str(e)},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )


class ReportView(APIView):
    """POST {text, score, label}: returns a PDF report with AI highlights."""

    def post(self, request):
        try:
            data = request.data
            text = data.get('text', '')
            score = data.get('score', 0)
            label = data.get('label', 'Unknown')

            # Granular highlighting logic.
            # 1. Get highlight ranges.
            highlights = calculate_token_scores(text)
            # 2. Sort highlights by start index.
            highlights.sort(key=lambda x: x[0])

            # 3. Construct an XML-tagged string for ReportLab.
            # IMPORTANT: escape text to prevent XML errors in ReportLab.
            formatted_text = ""
            current_idx = 0

            # Simple greedy tagging.
            print(
                f"Generating PDF for text length: {len(text)} "
                f"with {len(highlights)} highlights."
            )

            for start, end, loss in highlights:
                if start < current_idx:
                    continue  # Skip overlaps

                # Append non-highlighted text.
                segment = text[current_idx:start]
                formatted_text += html.escape(segment)

                # Append highlighted text.
                segment = text[start:end]
                # ReportLab intra-paragraph markup for a yellow background.
                # NOTE(review): tag reconstructed from the "XML tag for
                # yellow background" intent — confirm rendering.
                formatted_text += (
                    f'<font backColor="yellow">{html.escape(segment)}</font>'
                )
                current_idx = end

            # Append remaining text.
            formatted_text += html.escape(text[current_idx:])

            # Convert newlines to ReportLab line breaks.
            # NOTE(review): replacement tag reconstructed — confirm output.
            formatted_text = formatted_text.replace('\n', '<br/>')

            # Create PDF.
            buffer = io.BytesIO()
            doc = SimpleDocTemplate(
                buffer,
                pagesize=letter,
                rightMargin=72,
                leftMargin=72,
                topMargin=72,
                bottomMargin=18,
            )
            Story = []
            styles = getSampleStyleSheet()
            styles.add(ParagraphStyle(name='Justify', alignment=TA_JUSTIFY))

            # Header
            Story.append(Paragraph("DetectAI Analysis Report", styles["Heading1"]))
            Story.append(
                Paragraph(
                    f"Date: {time.strftime('%Y-%m-%d %H:%M:%S')}",
                    styles["Normal"],
                )
            )
            Story.append(Spacer(1, 12))

            # Result: percentage of characters highlighted as likely-AI.
            total_chars = len(text)
            highlighted_chars = sum((h[1] - h[0]) for h in highlights)
            highlight_ratio = (
                (highlighted_chars / total_chars * 100) if total_chars > 0 else 0
            )

            # Primary metric: AI probability.
            Story.append(Paragraph(f'AI Probability: {score}%', styles["Heading2"]))
            # Secondary metric: highlighted content.
            Story.append(
                Paragraph(
                    f'Highlighted Content: {highlight_ratio:.1f}%',
                    styles["Normal"],
                )
            )
            Story.append(Spacer(1, 12))

            # Content header.
            Story.append(Paragraph("Analyzed Content:", styles["Heading3"]))
            Story.append(Spacer(1, 6))

            # The content (with highlights).
            Story.append(Paragraph(formatted_text, styles["Justify"]))

            doc.build(Story)
            buffer.seek(0)
            return FileResponse(
                buffer, as_attachment=True, filename='detectAI_report.pdf'
            )

        except Exception as e:
            print(f"Report Generation Error: {e}")
            return Response(
                {"error": str(e)},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )