# Source provenance: Hugging Face Space by abdullah-azeemi, commit c491936 ("add files").
import os
import json
import numpy as np
from dotenv import load_dotenv
import torch
from transformers import BertForSequenceClassification, AutoTokenizer
from flask import Flask, request, jsonify
from flask_cors import CORS
import zipfile
import shutil
from pathlib import Path
from werkzeug.utils import secure_filename
from docx import Document
# Load environment variables from a local .env file (optional; defaults below).
load_dotenv()
# Directory holding the fine-tuned checkpoint to load.
local_model_path = os.getenv("LOCAL_MODEL_PATH", "model/checkpoint-606")
# FLASK_DEBUG accepts 'true'/'1'/'t' (case-insensitive); anything else is False.
debug_mode = os.getenv("FLASK_DEBUG", 'false').lower() in ('true', '1', 't')
app = Flask(__name__)
# NOTE(review): CORS is wide open ("*" on every route) — confirm this is
# intended for the deployment environment.
CORS(app, resources={r"/*": {"origins": "*"}})
# File upload configuration
UPLOAD_FOLDER = 'uploads'
MAX_FILE_SIZE = 50 * 1024 * 1024 # 50MB
ALLOWED_EXTENSIONS = {'.zip'}
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
# Flask enforces this as the maximum request body size (413 beyond it).
app.config['MAX_CONTENT_LENGTH'] = MAX_FILE_SIZE
print("Loading the model from the directory:", local_model_path)
# Three-stage model loading: (1) the configured checkpoint directory,
# (2) its parent directory, (3) a small public model from the Hub.
# If all three fail, `model` is set to None and the endpoints report 500/503.
try:
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")
    model = BertForSequenceClassification.from_pretrained(local_model_path)
    model.to(device)
    model.eval()
    tokenizer = AutoTokenizer.from_pretrained(local_model_path)
    print("Model loaded successfully")
except Exception as e:
    print("Error loading model:", e)
    print("\nTrying to load from parent model directory...")
    try:
        # NOTE(review): `device` is only bound if the first try got past
        # torch.device(); a failure on that line would make the fallback
        # paths raise NameError at `model.to(device)` — confirm acceptable.
        parent_model_path = os.path.dirname(local_model_path)
        model = BertForSequenceClassification.from_pretrained(parent_model_path)
        model.to(device)
        model.eval()
        tokenizer = AutoTokenizer.from_pretrained(parent_model_path)
        print("Model loaded successfully from parent directory")
    except Exception as e2:
        print(f"Error loading from parent directory: {e2}")
        print("\nFallback: Loading base DistilBERT model from Hugging Face for memory efficiency...")
        try:
            # Use DistilBERT (smaller, ~260MB) instead of BERT (~440MB) to fit in 512MB RAM
            model_name = "distilbert-base-uncased"
            # Note: Using AutoModelForSequenceClassification to handle different architectures
            from transformers import AutoModelForSequenceClassification
            model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=2)
            model.to(device)
            model.eval()
            tokenizer = AutoTokenizer.from_pretrained(model_name)
            print(f"Successfully loaded fallback model: {model_name}")
        except Exception as e3:
            # NOTE(review): only `model` is reset here; `tokenizer` (and
            # possibly `device`) may remain undefined on this path.
            print(f"Critical Error: Could not load any model. {e3}")
            model = None
def get_similarity_score(text_a: str, text_b: str) -> float:
    """
    Score two texts with the sequence-classification model.

    Encodes the pair as a single (truncated, max 512 token) input, runs
    a forward pass without gradients, and returns the softmax probability
    of class index 1 — interpreted as "plagiarized/similar".

    Returns 0.0 immediately when either text is empty.
    """
    if not (text_a and text_b):
        return 0.0
    encoded = tokenizer(
        text_a,
        text_b,
        return_tensors="pt",
        padding=True,
        truncation=True,
        max_length=512,
    )
    # Move every input tensor onto the same device as the model.
    encoded = {name: tensor.to(device) for name, tensor in encoded.items()}
    with torch.no_grad():
        logits = model(**encoded).logits
    # Probability of the positive ("similar") class for the single example.
    return torch.softmax(logits, dim=1)[0][1].item()
def get_sentence_similarity_score(text_a: str, text_b: str) -> tuple[float, float]:
    """
    Compare two texts sentence by sentence.

    Splits both texts on periods, scores every sentence of ``text_a``
    against every sentence of ``text_b``, and keeps the best match for
    each ``text_a`` sentence.

    Returns:
        (overall_max, average): the highest pairwise score observed, and
        the mean of the per-sentence best scores. Both are 0.0 when either
        text is empty or yields no sentences.
    """
    if not text_a or not text_b:
        return 0.0, 0.0
    left = [chunk.strip() for chunk in text_a.split(".") if chunk.strip()]
    right = [chunk.strip() for chunk in text_b.split(".") if chunk.strip()]
    if not left or not right:
        return 0.0, 0.0
    # Best matching text_b sentence for each text_a sentence.
    best_per_sentence = [
        max(get_similarity_score(s1, s2) for s2 in right)
        for s1 in left
    ]
    return float(max(best_per_sentence)), float(np.mean(best_per_sentence))
def extract_text_from_file(file_path: Path) -> str:
    """
    Extract plain text from a submission file.

    Source-code and text files are read directly with lenient UTF-8
    decoding; .docx files are read paragraph by paragraph via python-docx.
    Unsupported extensions and any read/parse failure yield "".
    """
    extension = file_path.suffix.lower()
    try:
        if extension in ('.py', '.java', '.cpp', '.js', '.txt'):
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as handle:
                return handle.read()
        if extension == '.docx':
            paragraphs = Document(file_path).paragraphs
            return '\n'.join(p.text for p in paragraphs)
        return ""
    except Exception as err:
        print(f"Error extracting text from {file_path}: {err}")
        return ""
def extract_zip(zip_path: Path, extract_to: Path, allowed_extensions: set) -> list[Path]:
    """
    Unpack a ZIP archive and collect the files worth analyzing.

    Extracts everything under ``extract_to``, then walks the tree and keeps
    only regular files whose lower-cased suffix is in ``allowed_extensions``.
    Returns an empty list when extraction fails for any reason.
    """
    try:
        with zipfile.ZipFile(zip_path, 'r') as archive:
            archive.extractall(extract_to)
        return [
            candidate
            for candidate in extract_to.rglob('*')
            if candidate.is_file()
            and candidate.suffix.lower() in allowed_extensions
        ]
    except Exception as err:
        print(f"Error extracting ZIP: {err}")
        return []
def compare_all_files(file_paths: list[Path], similarity_threshold: float) -> list[dict]:
    """
    Compare every pair of files and report the suspicious ones.

    Args:
        file_paths: files to compare pairwise.
        similarity_threshold: minimum similarity (percent, 0-100) for a
            pair to be included in the result.

    Returns:
        One dict per pair at/above the threshold, carrying the two file
        stems, the similarity percentage (rounded to 2 decimals), and a
        coarse status label ("Identical" >= 90, "Flagged" >= 75, else
        "Suspicious").
    """
    # Extract each file's text exactly once up front. The original
    # re-extracted both texts inside the pairwise loop, reading every
    # file O(n) times.
    texts = [extract_text_from_file(path) for path in file_paths]
    results = []
    n = len(file_paths)
    for i in range(n):
        if not texts[i]:
            continue  # skip unreadable/empty files entirely
        for j in range(i + 1, n):
            if not texts[j]:
                continue
            similarity_percent = get_similarity_score(texts[i], texts[j]) * 100
            if similarity_percent < similarity_threshold:
                continue
            if similarity_percent >= 90:
                status = "Identical"
            elif similarity_percent >= 75:
                status = "Flagged"
            else:
                status = "Suspicious"
            results.append({
                "id": f"{i}_{j}",
                "student1": file_paths[i].stem,
                "student2": file_paths[j].stem,
                "similarity": round(similarity_percent, 2),
                "status": status,
                # Sentence-level match counting is not implemented here yet.
                "matchedSentences": 0
            })
    return results
@app.route("/", methods=["GET"])
def index():
    """Root endpoint: service metadata for a quick sanity check."""
    payload = {
        "status": "ok",
        "message": "Plagiarism Detection API",
        "model": local_model_path,
        "device": str(device),
    }
    return jsonify(payload)
@app.route("/analyze-paraphrase", methods=["POST"])
def analyze_paraphrase():
    """
    Sentence-level comparison of two texts.

    Expects a JSON body with "text_a" and "text_b"; responds with the
    highest and the average sentence-pair similarity.
    """
    if model is None:
        return jsonify({"error": "Model not loaded"}), 500
    try:
        payload = request.get_json()
        text_a = payload.get("text_a")
        text_b = payload.get("text_b")
        if not (text_a and text_b):
            return jsonify({"error": "Missing text_a or text_b"}), 400
        max_score, avg_score = get_sentence_similarity_score(text_a, text_b)
        response = {
            "overall_max_score": max_score,
            "average_score": avg_score,
            "interpretation": {
                "overall_max": "highest similarity between any sentence pair",
                "average": "average similarity across all sentence pairs from text_a"
            }
        }
        return jsonify(response)
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route("/analyze-simple", methods=["POST"])
def analyze_simple():
    """
    Whole-text comparison endpoint.

    Expects a JSON body with "text_a" and "text_b"; responds with the raw
    model probability and a boolean flag (threshold 0.5).
    """
    if model is None:
        return jsonify({"error": "Model not loaded"}), 500
    try:
        payload = request.get_json()
        text_a = payload.get("text_a")
        text_b = payload.get("text_b")
        if not (text_a and text_b):
            return jsonify({"error": "Missing text_a or text_b"}), 400
        score = get_similarity_score(text_a, text_b)
        return jsonify({
            "similarity_score": score,
            "is_plagiarized": score > 0.5,
            "interpretation": "probability that text_b is plagiarized from text_a"
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500
def _parse_allowed_extensions(file_types: str) -> set:
    """
    Turn the 'fileTypes' form field (a JSON object of {ext: bool}) into a
    set of dotted extensions; fall back to all supported types when the
    field is empty or nothing is enabled.
    """
    allowed = set()
    if file_types:
        for ext, enabled in json.loads(file_types).items():
            if enabled:
                allowed.add(f'.{ext}')
    return allowed or {'.py', '.java', '.cpp', '.js', '.txt', '.docx'}


@app.route("/upload-and-analyze", methods=["POST"])
def upload_and_analyze():
    """
    Handle ZIP file upload, extract files, and perform plagiarism analysis.

    Expects multipart form data with a 'file' ZIP upload plus optional
    'assignmentName', 'similarity' (threshold percent, default 70) and
    'fileTypes' (JSON object of {extension: enabled}) fields.

    Responds with the suspicious pairs and summary statistics, or an
    error (400 for bad input, 500 for processing failures).
    """
    if model is None:
        return jsonify({"error": "Model not loaded"}), 500
    upload_path = None
    try:
        # Validate the upload is present.
        if 'file' not in request.files:
            return jsonify({"error": "No file provided"}), 400
        file = request.files['file']
        if file.filename == '':
            return jsonify({"error": "No file selected"}), 400
        # Read settings from the form data.
        assignment_name = request.form.get('assignmentName', 'Untitled')
        similarity_threshold = float(request.form.get('similarity', 70))
        allowed_file_extensions = _parse_allowed_extensions(request.form.get('fileTypes', ''))
        # Save the ZIP under a per-request unique folder so concurrent
        # uploads cannot collide.
        import uuid
        filename = secure_filename(file.filename)
        upload_id = str(uuid.uuid4())
        upload_path = Path(UPLOAD_FOLDER) / upload_id
        upload_path.mkdir(parents=True, exist_ok=True)
        zip_path = upload_path / filename
        file.save(zip_path)
        # Extract and keep only the selected file types.
        extract_path = upload_path / 'extracted'
        extracted_files = extract_zip(zip_path, extract_path, allowed_file_extensions)
        if not extracted_files:
            return jsonify({
                "error": "No valid files found in ZIP. Please check file types."
            }), 400
        if len(extracted_files) < 2:
            return jsonify({
                "error": "Need at least 2 files to compare. Found only 1 file."
            }), 400
        # Pairwise comparison of all extracted files.
        suspicious_pairs = compare_all_files(extracted_files, similarity_threshold)
        # Summary statistics over the reported pairs only.
        avg_similarity = 0
        high_risk_count = 0
        if suspicious_pairs:
            avg_similarity = sum(p['similarity'] for p in suspicious_pairs) / len(suspicious_pairs)
            high_risk_count = len([p for p in suspicious_pairs if p['similarity'] >= 90])
        return jsonify({
            "success": True,
            "assignmentName": assignment_name,
            "totalSubmissions": len(extracted_files),
            "suspiciousPairs": suspicious_pairs,
            "statistics": {
                "totalPairs": len(suspicious_pairs),
                "avgSimilarity": round(avg_similarity, 2),
                "highRiskCount": high_risk_count
            }
        })
    except Exception as e:
        print(f"Upload error: {e}")
        return jsonify({"error": str(e)}), 500
    finally:
        # Always remove the temp upload folder — including on error paths.
        # The original only cleaned up on early returns and on success, so
        # an exception mid-analysis leaked the extracted files on disk.
        if upload_path is not None:
            shutil.rmtree(upload_path, ignore_errors=True)
@app.route("/health", methods=["GET"])
def health():
    """Health probe: 200 when a model is loaded, 503 otherwise."""
    if model is None:
        return jsonify({"status": "error", "message": "Model failed to load"}), 503
    ok_payload = {
        "status": "ok",
        "message": "Model loaded successfully",
        "device": str(model.device),
    }
    return jsonify(ok_payload), 200
if __name__ == "__main__":
    # PORT is typically injected by the hosting platform; default 5000 locally.
    port = int(os.getenv("PORT", 5000))
    # Bind to 0.0.0.0 so the server is reachable from outside a container.
    app.run(debug=debug_mode, host="0.0.0.0", port=port)