import os
import json
import numpy as np
from dotenv import load_dotenv
import torch
from transformers import BertForSequenceClassification, AutoTokenizer
from flask import Flask, request, jsonify
from flask_cors import CORS
import zipfile
import shutil
from pathlib import Path
from werkzeug.utils import secure_filename
from docx import Document

load_dotenv()
local_model_path = os.getenv("LOCAL_MODEL_PATH", "model/checkpoint-606")
debug_mode = os.getenv("FLASK_DEBUG", 'false').lower() in ('true', '1', 't')

app = Flask(__name__)
CORS(app, resources={r"/*": {"origins": "*"}})

# File upload configuration
UPLOAD_FOLDER = 'uploads'
MAX_FILE_SIZE = 50 * 1024 * 1024  # 50MB
ALLOWED_EXTENSIONS = {'.zip'}
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
app.config['MAX_CONTENT_LENGTH'] = MAX_FILE_SIZE
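
# Flask rejects request bodies larger than MAX_CONTENT_LENGTH with an
# HTTP 413 (Request Entity Too Large) before the handler runs, so oversized
# ZIP uploads never reach upload_and_analyze() below.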
| print("Loading the model from the directory:", local_model_path) | |
| try: | |
| device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
| print(f"Using device: {device}") | |
| model = BertForSequenceClassification.from_pretrained(local_model_path) | |
| model.to(device) | |
| model.eval() | |
| tokenizer = AutoTokenizer.from_pretrained(local_model_path) | |
| print("Model loaded successfully") | |
| except Exception as e: | |
| print("Error loading model:", e) | |
| print("\nTrying to load from parent model directory...") | |
| try: | |
| parent_model_path = os.path.dirname(local_model_path) | |
| model = BertForSequenceClassification.from_pretrained(parent_model_path) | |
| model.to(device) | |
| model.eval() | |
| tokenizer = AutoTokenizer.from_pretrained(parent_model_path) | |
| print("Model loaded successfully from parent directory") | |
| except Exception as e2: | |
| print(f"Error loading from parent directory: {e2}") | |
| print("\nFallback: Loading base DistilBERT model from Hugging Face for memory efficiency...") | |
| try: | |
| # Use DistilBERT (smaller, ~260MB) instead of BERT (~440MB) to fit in 512MB RAM | |
| model_name = "distilbert-base-uncased" | |
| # Note: Using AutoModelForSequenceClassification to handle different architectures | |
| from transformers import AutoModelForSequenceClassification | |
| model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=2) | |
| model.to(device) | |
| model.eval() | |
| tokenizer = AutoTokenizer.from_pretrained(model_name) | |
| print(f"Successfully loaded fallback model: {model_name}") | |
| except Exception as e3: | |
| print(f"Critical Error: Could not load any model. {e3}") | |
| model = None | |
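
# Caveat on the fallback: loading "distilbert-base-uncased" with num_labels=2
# gives a freshly initialized classification head (transformers prints a
# warning to this effect), so its "similarity" outputs are effectively random
# until fine-tuned. The fallback mainly keeps the API endpoints responsive.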
def get_similarity_score(text_a: str, text_b: str) -> float:
    """
    Use BERT classification model to predict if two texts are similar.
    Returns probability that they are plagiarized (similar).
    """
    if not text_a or not text_b:
        return 0.0
    inputs = tokenizer(text_a, text_b, return_tensors="pt", padding=True, truncation=True, max_length=512)
    inputs = {k: v.to(device) for k, v in inputs.items()}
    with torch.no_grad():
        outputs = model(**inputs)
    logits = outputs.logits
    probabilities = torch.softmax(logits, dim=1)
    similarity_score = probabilities[0][1].item()
    return similarity_score
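
# Usage sketch (the example inputs are illustrative, not from a real dataset):
#
#   score = get_similarity_score(
#       "The committee approved the budget.",
#       "The budget was approved by the committee.",
#   )
#   # `score` is the softmax probability of class 1 ("similar"); values near
#   # 1.0 suggest a paraphrase, values near 0.0 an unrelated pair.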
def get_sentence_similarity_score(text_a: str, text_b: str) -> tuple[float, float]:
    """
    Compare two texts sentence by sentence and return similarity scores.
    """
    if not text_a or not text_b:
        return 0.0, 0.0
    sentences_1 = [s.strip() for s in text_a.split(".") if s.strip()]
    sentences_2 = [s.strip() for s in text_b.split(".") if s.strip()]
    if not sentences_1 or not sentences_2:
        return 0.0, 0.0
    scores = []
    for s1 in sentences_1:
        sentence_scores = []
        for s2 in sentences_2:
            score = get_similarity_score(s1, s2)
            sentence_scores.append(score)
        max_score = max(sentence_scores) if sentence_scores else 0.0
        scores.append(max_score)
    overall_max_score = max(scores) if scores else 0.0
    average_score = np.mean(scores) if scores else 0.0
    return float(overall_max_score), float(average_score)
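
# Note: this runs one model forward pass per sentence pair, i.e.
# O(len(sentences_1) * len(sentences_2)) calls, so long documents are slow.
# The "." split is also a naive sentence-boundary heuristic and will
# mishandle abbreviations and decimal numbers.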
def extract_text_from_file(file_path: Path) -> str:
    """
    Extract text content from various file types.
    """
    suffix = file_path.suffix.lower()
    try:
        if suffix in ['.py', '.java', '.cpp', '.js', '.txt']:
            # Plain text files
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                return f.read()
        elif suffix == '.docx':
            # Word documents
            doc = Document(file_path)
            return '\n'.join([paragraph.text for paragraph in doc.paragraphs])
        else:
            return ""
    except Exception as e:
        print(f"Error extracting text from {file_path}: {e}")
        return ""
def extract_zip(zip_path: Path, extract_to: Path, allowed_extensions: set) -> list[Path]:
    """
    Extract ZIP file and return list of files with allowed extensions.
    """
    extracted_files = []
    try:
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(extract_to)
        for file_path in extract_to.rglob('*'):
            if file_path.is_file() and file_path.suffix.lower() in allowed_extensions:
                extracted_files.append(file_path)
        return extracted_files
    except Exception as e:
        print(f"Error extracting ZIP: {e}")
        return []
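
# Caution: extractall() inflates whatever the archive contains, so a "zip
# bomb" or an archive with thousands of members can exhaust disk space. A
# hardened deployment would likely cap member count and uncompressed size
# (e.g. via ZipInfo.file_size) before extracting.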
def compare_all_files(file_paths: list[Path], similarity_threshold: float) -> list[dict]:
    """
    Compare all files pairwise and return suspicious pairs.
    """
    results = []
    n = len(file_paths)
    for i in range(n):
        for j in range(i + 1, n):
            file1 = file_paths[i]
            file2 = file_paths[j]
            text1 = extract_text_from_file(file1)
            text2 = extract_text_from_file(file2)
            if not text1 or not text2:
                continue
            similarity = get_similarity_score(text1, text2)
            similarity_percent = similarity * 100
            if similarity_percent >= similarity_threshold:
                if similarity_percent >= 90:
                    status = "Identical"
                elif similarity_percent >= 75:
                    status = "Flagged"
                else:
                    status = "Suspicious"
                results.append({
                    "id": f"{i}_{j}",
                    "student1": file1.stem,
                    "student2": file2.stem,
                    "similarity": round(similarity_percent, 2),
                    "status": status,
                    "matchedSentences": 0  # sentence-level matching is not computed in this pass
                })
    return results
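
# Note: the comparison is O(n^2) in the number of files, and the tokenizer
# truncates each pair to 512 tokens, so only the beginnings of long
# submissions are actually compared.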
# The route decorators below were missing from this listing; the paths are
# assumed from the function names and typical REST conventions.
@app.route("/")
def index():
    return jsonify({
        "status": "ok",
        "message": "Plagiarism Detection API",
        "model": local_model_path,
        "device": str(device)
    })
@app.route("/analyze", methods=["POST"])  # assumed path
def analyze_paraphrase():
    if model is None:
        return jsonify({"error": "Model not loaded"}), 500
    try:
        data = request.get_json()
        text_a = data.get("text_a")
        text_b = data.get("text_b")
        if not text_a or not text_b:
            return jsonify({"error": "Missing text_a or text_b"}), 400
        overall_max_score, average_score = get_sentence_similarity_score(text_a, text_b)
        return jsonify({
            "overall_max_score": overall_max_score,
            "average_score": average_score,
            "interpretation": {
                "overall_max": "highest similarity between any sentence pair",
                "average": "average similarity across all sentence pairs from text_a"
            }
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route("/analyze-simple", methods=["POST"])  # assumed path
def analyze_simple():
    """
    Simple endpoint that compares two texts as a whole.
    """
    if model is None:
        return jsonify({"error": "Model not loaded"}), 500
    try:
        data = request.get_json()
        text_a = data.get("text_a")
        text_b = data.get("text_b")
        if not text_a or not text_b:
            return jsonify({"error": "Missing text_a or text_b"}), 400
        similarity_score = get_similarity_score(text_a, text_b)
        return jsonify({
            "similarity_score": similarity_score,
            "is_plagiarized": similarity_score > 0.5,
            "interpretation": "probability that text_b is plagiarized from text_a"
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route("/upload", methods=["POST"])  # assumed path
def upload_and_analyze():
    """
    Handle ZIP file upload, extract files, and perform plagiarism analysis.
    """
    if model is None:
        return jsonify({"error": "Model not loaded"}), 500
    try:
        # Check if a file is present
        if 'file' not in request.files:
            return jsonify({"error": "No file provided"}), 400
        file = request.files['file']
        if file.filename == '':
            return jsonify({"error": "No file selected"}), 400
        # Get settings from form data
        assignment_name = request.form.get('assignmentName', 'Untitled')
        similarity_threshold = float(request.form.get('similarity', 70))
        # Parse the file types to analyze
        file_types = request.form.get('fileTypes', '')
        allowed_file_extensions = set()
        if file_types:
            file_types_dict = json.loads(file_types)
            for ext, enabled in file_types_dict.items():
                if enabled:
                    allowed_file_extensions.add(f'.{ext}')
        # If no file types were selected, fall back to all supported types
        if not allowed_file_extensions:
            allowed_file_extensions = {'.py', '.java', '.cpp', '.js', '.txt', '.docx'}
        # Secure the filename
        filename = secure_filename(file.filename)
        # Create a unique folder for this upload
        import uuid
        upload_id = str(uuid.uuid4())
        upload_path = Path(UPLOAD_FOLDER) / upload_id
        upload_path.mkdir(parents=True, exist_ok=True)
        # Save the ZIP file
        zip_path = upload_path / filename
        file.save(zip_path)
        # Extract the ZIP
        extract_path = upload_path / 'extracted'
        extracted_files = extract_zip(zip_path, extract_path, allowed_file_extensions)
        if not extracted_files:
            shutil.rmtree(upload_path, ignore_errors=True)  # cleanup
            return jsonify({
                "error": "No valid files found in ZIP. Please check file types."
            }), 400
        if len(extracted_files) < 2:
            shutil.rmtree(upload_path, ignore_errors=True)  # cleanup
            return jsonify({
                "error": "Need at least 2 files to compare. Found only 1 file."
            }), 400
        # Compare all files pairwise
        suspicious_pairs = compare_all_files(extracted_files, similarity_threshold)
        # Calculate statistics
        total_submissions = len(extracted_files)
        avg_similarity = 0
        high_risk_count = 0
        if suspicious_pairs:
            avg_similarity = sum(p['similarity'] for p in suspicious_pairs) / len(suspicious_pairs)
            high_risk_count = len([p for p in suspicious_pairs if p['similarity'] >= 90])
        # Clean up temp files
        shutil.rmtree(upload_path, ignore_errors=True)
        return jsonify({
            "success": True,
            "assignmentName": assignment_name,
            "totalSubmissions": total_submissions,
            "suspiciousPairs": suspicious_pairs,
            "statistics": {
                "totalPairs": len(suspicious_pairs),
                "avgSimilarity": round(avg_similarity, 2),
                "highRiskCount": high_risk_count
            }
        })
    except Exception as e:
        print(f"Upload error: {e}")
        return jsonify({"error": str(e)}), 500
@app.route("/health")  # assumed path
def health():
    if model is None:
        return jsonify({
            "status": "error",
            "message": "Model failed to load"
        }), 503
    return jsonify({
        "status": "ok",
        "message": "Model loaded successfully",
        "device": str(model.device)
    }), 200
if __name__ == "__main__":
    port = int(os.getenv("PORT", 5000))
    app.run(debug=debug_mode, host="0.0.0.0", port=port)