# Setup CI/CD for Hugging Face (commit 171eb01, vivek1192) — pasted commit
# header converted to a comment so the module remains valid Python.
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from django.conf import settings
import torch
from transformers import AutoTokenizer, AutoModelForMaskedLM
import os
import math
# Model: roberta-base (Masked LM)
# Logic: Low Perplexity = High AI Probability (Predictable)
MODEL_NAME = "roberta-base"
print(f"Loading Model: {MODEL_NAME}...")

# Pick the device BEFORE the try block so `device` is always bound.
# (Previously it was assigned inside the try: a failed model load left
# `device` undefined and later requests crashed with NameError instead
# of taking the intended "model is None" -> 503 path.)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using Device: {device}")

try:
    # Use MaskedLM so we can compute per-token loss / pseudo-perplexity.
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    model = AutoModelForMaskedLM.from_pretrained(MODEL_NAME)
    model.to(device)
    model.eval()
    print("Model Loaded Successfully!")
except Exception as e:
    # Keep the module importable even if the download/load fails; the
    # views check these for None and answer 503 instead of crashing.
    print(f"Error loading model: {e}")
    model = None
    tokenizer = None

import time
def calculate_token_scores(text):
    """Score each token of *text* by how predictable the masked LM finds it.

    Every token (excluding CLS/SEP) is masked one at a time and the masked-LM
    loss at that position is recorded.  Low loss means the model predicted
    the token easily, which this app treats as a sign of AI-generated text.

    Args:
        text: Input string; tokenization is truncated at 512 tokens, so only
            the beginning of long texts is scored.

    Returns:
        List of ``(start_char, end_char, loss)`` tuples for tokens whose loss
        fell below the highlight threshold.  Empty list when the model is not
        loaded or the text is too short.
    """
    if not tokenizer or not model:
        return []

    encodings = tokenizer(
        text,
        return_tensors="pt",
        truncation=True,
        max_length=512,
        return_offsets_mapping=True,
    )
    input_ids = encodings.input_ids.to(device)
    # Character offsets per token, e.g. [(0, 0), (0, 3), (3, 4), ...]
    offsets = encodings.offset_mapping[0].cpu().numpy()
    seq_len = input_ids.shape[1]
    if seq_len < 4:
        return []

    # A batch of identical copies of the sequence; each row gets a different
    # position masked.  BATCH_SIZE kept at 4 to avoid OOM on a 4GB GPU.
    BATCH_SIZE = 4
    tensor_input_ids = input_ids.repeat(BATCH_SIZE, 1)
    # Mask every position except CLS (0) and SEP (seq_len - 1); stride 1
    # here (unlike calculate_perplexity) for maximum granularity.
    indices_to_mask = list(range(1, seq_len - 1))
    token_losses = [0.0] * seq_len
    loss_fct = torch.nn.CrossEntropyLoss(reduction='none')

    if torch.cuda.is_available():
        torch.cuda.empty_cache()  # Clear GPU memory before the loop.

    for i in range(0, len(indices_to_mask), BATCH_SIZE):
        batch_indices = indices_to_mask[i:i + BATCH_SIZE]
        batch_size = len(batch_indices)
        if batch_size == 0:
            break
        batch_input_ids = tensor_input_ids[:batch_size].clone()
        # -100 is CrossEntropyLoss's ignore_index, so only the one masked
        # position per row contributes to the loss.
        batch_labels = torch.full(batch_input_ids.shape, -100).to(device)
        for j, pos in enumerate(batch_indices):
            batch_labels[j, pos] = batch_input_ids[j, pos].item()
            batch_input_ids[j, pos] = tokenizer.mask_token_id
        with torch.no_grad():
            outputs = model(batch_input_ids)
            # [B, L, V] -> [B, V, L], the layout CrossEntropyLoss expects.
            predictions = outputs.logits.permute(0, 2, 1)
            loss = loss_fct(predictions, batch_labels)  # [B, L]
            # Ignored positions contribute 0, so the row sum is exactly the
            # loss at the single masked position.
            masked_losses = loss.sum(dim=1)  # [B]
        for j, pos in enumerate(batch_indices):
            token_losses[pos] = masked_losses[j].item()

    # Threshold chosen empirically: 1.0 (strict) avoids highlighting common
    # human words.  Lower loss = more predictable = more likely AI.
    HIGHLIGHT_THRESHOLD = 1.0
    highlights = []
    for i in range(1, seq_len - 1):
        loss = token_losses[i]
        if loss < HIGHLIGHT_THRESHOLD:
            start, end = offsets[i]
            # Skip special tokens / empty offset spans.
            if start == end:
                continue
            highlights.append((start, end, loss))
    return highlights
def calculate_perplexity(text):
    """Calculate pseudo-perplexity (PPL) of *text* under the masked LM.

    Formula: PPL = exp( -1/N * sum( log P(w_i | context) ) ), estimated by
    masking tokens one at a time and averaging the masked-LM losses.

    Optimizations:
        - Strided masking (STRIDE=3): only every 3rd token is masked,
          a ~3x speedup at a small accuracy cost.
        - Batched forward passes (BATCH_SIZE=8).

    Args:
        text: Input string; tokenization is truncated at 512 tokens.

    Returns:
        float: the pseudo-perplexity.  Returns the default 100.0 (reads as
        "human") when the model is unavailable or the text is too short.
    """
    # Same guard as calculate_token_scores: stay graceful if loading failed.
    if not tokenizer or not model:
        return 100.0

    t0 = time.time()

    # 1. Tokenize (shape: [1, seq_len]).
    encodings = tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
    input_ids = encodings.input_ids.to(device)
    seq_len = input_ids.shape[1]
    if seq_len < 4:
        # Too short for a meaningful estimate.
        return 100.0

    nlls = []  # Negative log-likelihoods, one tensor per batch.

    # 2. Batched masking for efficiency.
    BATCH_SIZE = 8
    # STRIDE 1 = every token (slowest, most accurate);
    # STRIDE 3 = every 3rd token (3x faster, good approximation).
    STRIDE = 3
    tensor_input_ids = input_ids.repeat(BATCH_SIZE, 1)  # [BATCH, seq_len]

    loss_fct = torch.nn.CrossEntropyLoss(reduction='none')
    # Mask positions 1 .. seq_len-2 to avoid CLS/SEP.
    indices_to_mask = list(range(1, seq_len - 1, STRIDE))

    for i in range(0, len(indices_to_mask), BATCH_SIZE):
        batch_indices = indices_to_mask[i:i + BATCH_SIZE]
        current_batch_size = len(batch_indices)
        if current_batch_size == 0:
            break
        # Clone the repeated text; each row masks a different position.
        batch_input_ids = tensor_input_ids[:current_batch_size].clone()
        # -100 = ignore_index: only the masked position counts toward loss.
        batch_labels = torch.full(batch_input_ids.shape, -100).to(device)
        for j, token_pos in enumerate(batch_indices):
            # Save the original token ID as the label, then mask the input.
            batch_labels[j, token_pos] = batch_input_ids[j, token_pos].item()
            batch_input_ids[j, token_pos] = tokenizer.mask_token_id
        with torch.no_grad():
            outputs = model(batch_input_ids)
            # [batch, seq_len, vocab] -> [batch, vocab, seq_len] for CE loss.
            predictions = outputs.logits.permute(0, 2, 1)
            loss = loss_fct(predictions, batch_labels)
            # Ignored positions are 0, so the row sum is the masked NLL.
            masked_losses = loss.sum(dim=1)  # [current_batch_size]
        nlls.append(masked_losses)

    if not nlls:
        return 0.0

    # Perplexity = exp(mean NLL).
    ppl = torch.exp(torch.cat(nlls).mean())
    print(f"Inference Time: {time.time() - t0:.2f}s")
    return ppl.item()
class AnalyzeView(APIView):
    """POST endpoint: score a piece of text as AI-generated vs human."""

    def post(self, request):
        """Compute pseudo-perplexity for ``request.data['text']`` and map it
        to an AI-probability score.

        Returns JSON with ``score``, ``label``, ``perplexity`` and ``device``,
        or an error response (400 no text, 503 model missing, 500 otherwise).
        """
        if not model or not tokenizer:
            return Response({"error": "Model not loaded"}, status=status.HTTP_503_SERVICE_UNAVAILABLE)
        data = request.data
        text = data.get('text', '')
        if not text and 'file' in request.FILES:
            # NOTE(review): uploaded files are not actually parsed yet.
            text = "File content placeholder"
        if not text:
            return Response({"error": "No text provided"}, status=status.HTTP_400_BAD_REQUEST)
        try:
            ppl = calculate_perplexity(text)
            # Sigmoid curve calibrated at PPL = 2.5:
            #   PPL < 2.5 -> AI, PPL > 2.5 -> Human.
            # Formula: 100 / (1 + exp(3 * (ppl - 2.5)))
            exponent = 3 * (ppl - 2.5)
            if exponent > 700:
                # math.exp overflows around exp(709); a huge exponent means
                # very high perplexity, i.e. a score of ~0 (clearly human).
                # Without this guard such requests failed with HTTP 500.
                ai_score = 0.0
            else:
                ai_score = 100 / (1 + math.exp(exponent))
            # Clamp to [2.5, 99.5] so the UI never claims absolute certainty.
            ai_score = max(2.5, min(99.5, ai_score))
            label = "AI Generated" if ai_score > 50 else "Human Written"
            return Response({
                "score": round(ai_score, 1),
                "label": label,
                "perplexity": round(ppl, 2),
                "device": str(device)
            })
        except Exception as e:
            return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
from django.http import FileResponse
from reportlab.lib.pagesizes import letter
from reportlab.lib import colors
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.enums import TA_JUSTIFY
import io
class ReportView(APIView):
    """POST endpoint: render the analysis as a downloadable PDF report."""

    @staticmethod
    def _build_highlighted_markup(text, highlights):
        """Return *text* as ReportLab mini-markup with AI spans highlighted.

        XML-escapes the text (ReportLab parses Paragraph content as XML) and
        wraps each ``(start, end, loss)`` span in a yellow-background
        ``<font>`` tag.  Overlapping spans are skipped greedily (first wins).
        """
        import html
        formatted_text = ""
        current_idx = 0
        for start, end, _loss in highlights:
            if start < current_idx:
                continue  # Skip overlaps.
            # Plain text up to the highlight, then the highlighted span.
            formatted_text += html.escape(text[current_idx:start])
            formatted_text += f'<font backColor="yellow">{html.escape(text[start:end])}</font>'
            current_idx = end
        # Append whatever remains after the last highlight.
        formatted_text += html.escape(text[current_idx:])
        # ReportLab paragraphs need explicit <br/> for line breaks.
        return formatted_text.replace('\n', '<br/>')

    def post(self, request):
        """Generate a PDF for previously analyzed text.

        Expects ``text`` and ``score`` in the request body and streams back a
        PDF with per-token highlighting; 500 with an error message on failure.
        """
        try:
            data = request.data
            text = data.get('text', '')
            score = data.get('score', 0)
            # The score may arrive as a string (e.g. form-encoded data);
            # coerce a numeric copy so the comparison below cannot raise
            # TypeError, while the original value is kept for display.
            try:
                score_value = float(score)
            except (TypeError, ValueError):
                score_value = 0.0

            # Granular highlighting: token spans sorted by start offset.
            highlights = calculate_token_scores(text)
            highlights.sort(key=lambda x: x[0])
            print(f"Generating PDF for text length: {len(text)} with {len(highlights)} highlights.")
            formatted_text = self._build_highlighted_markup(text, highlights)

            # Build the PDF entirely in memory.
            buffer = io.BytesIO()
            doc = SimpleDocTemplate(buffer, pagesize=letter,
                                    rightMargin=72, leftMargin=72,
                                    topMargin=72, bottomMargin=18)
            Story = []
            styles = getSampleStyleSheet()
            styles.add(ParagraphStyle(name='Justify', alignment=TA_JUSTIFY))

            # Header.
            Story.append(Paragraph("DetectAI Analysis Report", styles["Heading1"]))
            Story.append(Paragraph(f"Date: {time.strftime('%Y-%m-%d %H:%M:%S')}", styles["Normal"]))
            Story.append(Spacer(1, 12))

            # Metrics: overall AI probability plus share of highlighted text.
            total_chars = len(text)
            highlighted_chars = sum((h[1] - h[0]) for h in highlights)
            highlight_ratio = (highlighted_chars / total_chars * 100) if total_chars > 0 else 0
            res_color = "red" if score_value > 50 else "green"
            Story.append(Paragraph(f'AI Probability: <font color="{res_color}"><b>{score}%</b></font>', styles["Heading2"]))
            Story.append(Paragraph(f'Highlighted Content: <b>{highlight_ratio:.1f}%</b>', styles["Normal"]))
            Story.append(Spacer(1, 12))

            # The analyzed text with highlights, justified.
            Story.append(Paragraph("Analyzed Content:", styles["Heading3"]))
            Story.append(Spacer(1, 6))
            Story.append(Paragraph(formatted_text, styles["Justify"]))

            doc.build(Story)
            buffer.seek(0)
            return FileResponse(buffer, as_attachment=True, filename='detectAI_report.pdf')
        except Exception as e:
            print(f"Report Generation Error: {e}")
            return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)