# server.py - 7-INPUT SUPER ENSEMBLE + DYNAMIC HUGGING FACE LOADING
import os
import io
import gc
import cv2
import math
import uuid
import shutil
import joblib
import zipfile
import numpy as np
import torch
import torch.nn.functional as F
import timm
from collections import Counter
from typing import Optional

# API & Image Handling
from fastapi import FastAPI, HTTPException, UploadFile, File, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from PIL import Image
from torchvision import transforms
from skimage.measure import shannon_entropy
from scipy.stats import pearsonr
from docx import Document

# Transformers & Hub
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from huggingface_hub import hf_hub_download, list_repo_files

# ==========================================================
# 1. CONFIGURATION & HUGGING FACE REPOS
# ==========================================================

# --- Text Models (The 3 Judges for the Ensemble) ---
TEXT_MODEL_1_ID = "Yuvrajg2107/deberta-v3-hybrid-detector_v2_universal"
TEXT_MODEL_2_ID = "Yuvrajg2107/roberta-base-cpp-final"
TEXT_MODEL_3_ID = "Yuvrajg2107/electra-large-discriminator-cpp-final"

# --- Code Model ---
CODE_MODEL_ID = "Yashodhar29/Qwen2.5-Coder-0.5B-Instruct-cpp"

# --- Image Model ---
IMAGE_REPO_ID = "Yashodhar29/ConvNext-large-cpp"
# The .pth file in this repo is located dynamically at load time.

# --- Local Ensemble File ---
ENSEMBLE_PATH = "model_ensemble_pro.pkl"  # Ensure this file is in your working directory!
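# The pickle is expected to hold a scikit-learn classifier (a VotingClassifier, per
# the load message below) trained offline on the 7-feature vector
# [deberta, roberta, electra, entropy, burstiness, ttr, ngram_ratio].
# A minimal sketch of how such a file could be produced -- the estimators and the
# training data here are assumptions, not the actual training recipe:
#
#   from sklearn.ensemble import VotingClassifier, RandomForestClassifier
#   from sklearn.linear_model import LogisticRegression
#   judge = VotingClassifier(
#       estimators=[("lr", LogisticRegression()), ("rf", RandomForestClassifier())],
#       voting="soft",  # soft voting is required for the predict_proba() call below
#   )
#   judge.fit(X_train, y_train)  # X_train: (n_samples, 7), y_train: 0=human, 1=AI
#   joblib.dump(judge, "model_ensemble_pro.pkl")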

# --- Device Setup ---
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"🚀 Server starting on device: {device.upper()}")

# ==========================================================
# 2. MODEL LOADING INFRASTRUCTURE
# ==========================================================
app = FastAPI()
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])

# Global Model Storage
models = {
    "text": [],
    "code": None,
    "image": None,
    "ensemble": None
}
def load_text_model(model_id):
    """Loads a Hugging Face text classification model and its tokenizer."""
    print(f" ⏳ Loading {model_id}...")
    try:
        tokenizer = AutoTokenizer.from_pretrained(model_id)
        model = AutoModelForSequenceClassification.from_pretrained(model_id).to(device)
        model.eval()
        return {"model": model, "tokenizer": tokenizer, "name": model_id}
    except Exception as e:
        print(f" ❌ Failed to load {model_id}: {e}")
        return None
def load_image_model_from_hub(repo_id):
    """Downloads a .pth checkpoint from the Hub and loads it into ConvNeXt."""
    print(f" ⏳ Checking Image Repo: {repo_id}...")
    try:
        # 1. Find the .pth file dynamically
        files = list_repo_files(repo_id)
        pth_files = [f for f in files if f.endswith('.pth')]
        if not pth_files:
            print(" ❌ No .pth file found in image repo!")
            return None
        # Pick the first one (or prioritize 'best' if multiple)
        weights_filename = pth_files[0]
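        # One way to realize the 'prioritize best' idea mentioned above -- this sorts
        # filenames containing "best" to the front (the naming convention is an assumption):
        #   pth_files.sort(key=lambda f: "best" not in f.lower())
        #   weights_filename = pth_files[0]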
| print(f" ⬇️ Downloading weights: {weights_filename}") | |
| weights_path = hf_hub_download(repo_id=repo_id, filename=weights_filename) | |
| # 2. Create Architecture | |
| model = timm.create_model("convnext_large.fb_in22k_ft_in1k", pretrained=False, num_classes=2) | |
| # 3. Load Weights | |
| state_dict = torch.load(weights_path, map_location=device) | |
| model.load_state_dict(state_dict) | |
| model.to(device) | |
| model.eval() | |
| print(" ✅ Image Model Ready.") | |
| return model | |
| except Exception as e: | |
| print(f" ❌ Image Model Error: {e}") | |
| return None | |

# --- INITIALIZATION ---
print("\n⚙️ --- LOADING MODELS ---")

# 1. Load Text Models (DeBERTa, RoBERTa, ELECTRA)
for text_model_id in (TEXT_MODEL_1_ID, TEXT_MODEL_2_ID, TEXT_MODEL_3_ID):
    models["text"].append(load_text_model(text_model_id))

# 2. Load Code Model (Qwen)
print(f" ⏳ Loading Code Model: {CODE_MODEL_ID}...")
try:
    models["code"] = {
        "tokenizer": AutoTokenizer.from_pretrained(CODE_MODEL_ID),
        "model": AutoModelForSequenceClassification.from_pretrained(CODE_MODEL_ID).to(device)
    }
    models["code"]["model"].eval()
except Exception as e:
    print(f" ❌ Code Model Failed: {e}")

# 3. Load Image Model (ConvNeXt)
models["image"] = load_image_model_from_hub(IMAGE_REPO_ID)

# 4. Load Scikit-Learn Ensemble
print(f" ⏳ Loading 'The Judge' ({ENSEMBLE_PATH})...")
try:
    models["ensemble"] = joblib.load(ENSEMBLE_PATH)
    print(" ✅ Ensemble Loaded (VotingClassifier).")
except Exception as e:
    print(f" ⚠️ Ensemble Pickle Not Found or Invalid: {e}")
    print(" ⚠️ Server will fall back to raw DeBERTa scores.")

# ==========================================================
# 3. HELPER FUNCTIONS
# ==========================================================
def get_stylometric_features(text):
    if not text:
        return [0.0, 0.0, 0.0, 0.0]
    # 1. Character-level Shannon entropy (bits per character)
    prob = [float(text.count(c)) / len(text) for c in dict.fromkeys(text)]
    entropy = -sum(p * math.log2(p) for p in prob)
    # 2. Burstiness (standard deviation of sentence lengths, in words)
    sentences = text.replace('!', '.').replace('?', '.').split('.')
    lengths = [len(s.split()) for s in sentences if len(s.split()) > 0]
    # Cast to a plain float so the score stays JSON-serializable downstream
    burstiness = float(np.std(lengths)) if lengths else 0.0
    # 3. TTR (Type-Token Ratio)
    words = text.lower().split()
    ttr = len(set(words)) / len(words) if words else 0.0
    # 4. N-Gram Repetition (share of 3-grams that occur more than once)
    if len(words) < 3:
        ngram_ratio = 0.0
    else:
        ngrams = list(zip(*[words[i:] for i in range(3)]))
        counts = Counter(ngrams)
        repeated = sum(1 for count in counts.values() if count > 1)
        ngram_ratio = repeated / len(ngrams)
    return [entropy, burstiness, ttr, ngram_ratio]
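
# A quick worked example of the four features:
#   get_stylometric_features("I came. I saw. I conquered.")
#   -> [entropy, burstiness, ttr, ngram_ratio]
# Here burstiness is 0.0 (every sentence is two words long), ttr is 4/6 ~ 0.67
# (words are lowercased but punctuation is kept, so "came." counts as one token),
# and ngram_ratio is 0.0 (no repeated trigram).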

def get_image_transforms():
    return transforms.Compose([
        transforms.Resize((384, 384)),
        transforms.ToTensor(),
        # Standard ImageNet normalization statistics
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

def get_forensics(img_pil):
    """Calculates non-ML forensic metrics for images."""
    img_np = np.array(img_pil)
    gray = cv2.cvtColor(img_np, cv2.COLOR_RGB2GRAY)
    # Mean log-magnitude of the 2D FFT spectrum (generative upsamplers often leave periodic artifacts)
    dft = np.fft.fft2(gray)
    dft_shift = np.fft.fftshift(dft)
    magnitude_spectrum = np.log(np.abs(dft_shift) + 1)
    spectral_score = np.mean(magnitude_spectrum)
    # Shannon entropy of the grayscale image (reported as "perplexity")
    perplexity = shannon_entropy(gray)
    # Standard deviation of the Canny edge map (reported as "burstiness")
    edges = cv2.Canny(gray, 100, 200)
    burstiness = np.std(edges)
    return {
        "spectral_artifacts": round(float(spectral_score), 3),
        "perplexity": round(float(perplexity), 3),
        "burstiness": round(float(burstiness), 3)
    }

# ==========================================================
# 4. API ENDPOINTS
# ==========================================================
class DetectionRequest(BaseModel):
    text: str

@app.post("/analyze")
async def analyze_text(request: DetectionRequest):
    """
    Main Text Detection Endpoint.
    Uses the 7-Input Super Ensemble: [DeBERTa, RoBERTa, ELECTRA, Entropy, Burstiness, TTR, NGram]
    """
    user_text = request.text
    if len(user_text.strip()) < 5:
        return {"ai_score": 0, "label": "Too Short", "stats": {}}

    # --- A. Check for Code (Routing) ---
    # If the text looks like code, advise the client to use the code endpoint instead.
    if "def " in user_text and ("return" in user_text or "class" in user_text):
        return {"ai_score": 0.0, "label": "Use /analyze_code endpoint", "stats": {}}
    # --- B. Get DL Probabilities (The 3 Inputs) ---
    dl_probs = []
    # We rely on DeBERTa (index 0) heavily, so if it failed to load, abort.
    if not models["text"][0]:
        raise HTTPException(status_code=500, detail="Primary model (DeBERTa) not active.")
    for entry in models["text"]:
        if entry:
            try:
                inputs = entry["tokenizer"](user_text, return_tensors="pt", truncation=True, max_length=512).to(device)
                with torch.no_grad():
                    outputs = entry["model"](**inputs)
                probs = F.softmax(outputs.logits, dim=-1)
                # Index 1 is the AI class (standard for these models)
                dl_probs.append(probs[0][1].item())
            except Exception as e:
                print(f"Inference Error on {entry['name']}: {e}")
                dl_probs.append(0.5)  # Neutral fallback
        else:
            dl_probs.append(0.5)  # Missing-model fallback

    # --- C. Get Stylometry (The 4 Inputs) ---
    stats = get_stylometric_features(user_text)  # [Entropy, Burstiness, TTR, NGram]

    # --- D. Final Ensemble Prediction ---
    final_prob = dl_probs[0]  # Default to DeBERTa if the ensemble fails
    if models["ensemble"]:
        # Input Vector: [M1, M2, M3, Stat1, Stat2, Stat3, Stat4]
        input_vector = np.array([dl_probs + stats])
        try:
            ensemble_probs = models["ensemble"].predict_proba(input_vector)
            final_prob = ensemble_probs[0][1]
        except Exception as e:
            print(f"Ensemble Voting Failed: {e}")

    return {
        "ai_score": round(float(final_prob), 4),
        "label": "🤖 AI GENERATED" if final_prob > 0.5 else "👤 HUMAN WRITTEN",
        "detailed_scores": {
            "deberta": round(dl_probs[0], 4),
            "roberta": round(dl_probs[1], 4),
            "electra": round(dl_probs[2], 4)
        },
        "stats": {
            "entropy": round(stats[0], 2),
            "burstiness": round(stats[1], 2),
            "ttr": round(stats[2], 2),
            "ngram_ratio": round(stats[3], 2)
        }
    }
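
# Example response shape from /analyze (values are illustrative, not from a real run):
#   {"ai_score": 0.9312, "label": "🤖 AI GENERATED",
#    "detailed_scores": {"deberta": 0.95, "roberta": 0.91, "electra": 0.89},
#    "stats": {"entropy": 4.12, "burstiness": 3.5, "ttr": 0.62, "ngram_ratio": 0.04}}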

@app.post("/analyze_code")
async def analyze_code(request: DetectionRequest):
    """
    Dedicated Code Detection using Qwen2.5-Coder.
    """
    if not models["code"]:
        raise HTTPException(status_code=503, detail="Code model (Qwen) not loaded.")
    user_code = request.text
    try:
        inputs = models["code"]["tokenizer"](user_code, return_tensors="pt", truncation=True, max_length=512).to(device)
        with torch.no_grad():
            outputs = models["code"]["model"](**inputs)
        probs = F.softmax(outputs.logits, dim=-1)
        ai_prob = probs[0][1].item()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Code analysis failed: {e}")

    # Basic stats for frontend display
    stats = get_stylometric_features(user_code)
    return {
        "ai_score": round(float(ai_prob), 4),
        "label": "🤖 AI CODE" if ai_prob > 0.5 else "👤 HUMAN CODE",
        "stats": {
            "entropy": round(stats[0], 2),
            "burstiness": round(stats[1], 2)
        }
    }

@app.post("/analyze_image")  # route name chosen to mirror /analyze_code
async def analyze_image(file: UploadFile = File(...)):
    """
    Image Detection using ConvNeXt-Large.
    """
    if not models["image"]:
        raise HTTPException(status_code=503, detail="Image model not loaded.")
    try:
        contents = await file.read()
        pil_img = Image.open(io.BytesIO(contents)).convert('RGB')
        # 1. Forensic Stats
        forensics = get_forensics(pil_img)
        # 2. AI Detection
        transform = get_image_transforms()
        img_t = transform(pil_img).unsqueeze(0).to(device)
        with torch.no_grad():
            logits = models["image"](img_t)
            probs = F.softmax(logits, dim=1)
        # Index 0 is assumed to be the AI class, matching how this checkpoint was
        # trained. If your fine-tune used label 1 for AI, switch to probs[0][1].
        ai_score = probs[0][0].item()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Image processing error: {str(e)}")
    return {
        "ai_score": round(float(ai_score), 4),
        "label": "AI Generated" if ai_score > 0.5 else "Real / Human",
        "forensics": forensics
    }

@app.post("/analyze_video")
async def analyze_video(file: UploadFile = File(...), num_samples: int = 10):
    """
    Video Frame Extraction + Analysis.
    Samples num_samples frames evenly across the clip and averages their scores.
    """
    if not models["image"]:
        raise HTTPException(status_code=503, detail="Image model needed for video.")
    unique_name = f"temp_vid_{uuid.uuid4()}.mp4"
    try:
        with open(unique_name, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
        cap = cv2.VideoCapture(unique_name)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if total_frames < 1:
            raise ValueError("Empty video")
        # Evenly spaced frame indices across the whole clip
        indices = np.linspace(0, total_frames - 1, num=min(num_samples, total_frames), dtype=int)
        scores = []
        transform = get_image_transforms()
        for i in indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, int(i))
            ret, frame = cap.read()
            if not ret:
                continue
            # Convert BGR (OpenCV) to RGB (PIL)
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            pil_img = Image.fromarray(frame_rgb)
            img_t = transform(pil_img).unsqueeze(0).to(device)
            with torch.no_grad():
                logits = models["image"](img_t)
                probs = F.softmax(logits, dim=1)
            scores.append(probs[0][0].item())  # Same class-index assumption as /analyze_image
        cap.release()
        if not scores:
            return {"ai_score": 0, "label": "Error"}
        avg_score = sum(scores) / len(scores)
        return {
            "ai_score": round(avg_score, 4),
            "label": "AI Video" if avg_score > 0.5 else "Real Video",
            "frames_analyzed": len(scores)
        }
    except Exception as e:
        print(f"Video Error: {e}")
        return {"error": str(e)}
    finally:
        if os.path.exists(unique_name):
            os.remove(unique_name)

@app.post("/analyze_document")
async def analyze_document(file: UploadFile = File(...)):
    """
    Hybrid Document Analysis (Text + Images inside Doc).
    """
    try:
        content = await file.read()
        file_bytes = io.BytesIO(content)
        # 1. Extract Text
        try:
            doc = Document(file_bytes)
            full_text = "\n".join([para.text for para in doc.paragraphs])
        except Exception:
            full_text = ""
        # 2. Analyze Text: a quick single-model pass with DeBERTa rather than
        #    the full /analyze ensemble, to keep this endpoint simple.
        text_res = None
        if len(full_text) > 50 and models["text"][0]:
            t_inputs = models["text"][0]["tokenizer"](full_text[:2000], return_tensors="pt", truncation=True, max_length=512).to(device)
            with torch.no_grad():
                t_out = models["text"][0]["model"](**t_inputs)
            t_prob = F.softmax(t_out.logits, dim=-1)[0][1].item()
            text_res = {"ai_score": t_prob, "preview": full_text[:100]}
        return {
            "type": "document_report",
            "text_analysis": text_res,
            "note": "Image extraction from docx disabled for brevity in this version."
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
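
# Illustrative client calls once the server is up (host/port match the uvicorn
# settings above; num_samples is read as a query parameter):
#   curl -X POST http://localhost:8000/analyze \
#        -H "Content-Type: application/json" -d '{"text": "Paragraph to check."}'
#   curl -X POST http://localhost:8000/analyze_image -F "file=@photo.jpg"
#   curl -X POST "http://localhost:8000/analyze_video?num_samples=5" -F "file=@clip.mp4"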