# server.py - 7-INPUT SUPER ENSEMBLE + DYNAMIC HUGGING FACE LOADING
#
# NOTE(review): several log messages and response labels below contain what
# looks like mis-encoded emoji (mojibake). They are left untouched on purpose
# because the labels are part of the API response; confirm the intended
# characters with the frontend before changing them.
import os
import io
import gc
import cv2
import math
import uuid
import shutil
import joblib
import zipfile
import numpy as np
import torch
import torch.nn.functional as F
import timm
from collections import Counter
from typing import Optional

# API & Image Handling
from fastapi import FastAPI, HTTPException, UploadFile, File, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from PIL import Image
from torchvision import transforms
from skimage.measure import shannon_entropy
from scipy.stats import pearsonr
from docx import Document

# Transformers & Hub
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from huggingface_hub import hf_hub_download, list_repo_files

# ==========================================================
# 1. CONFIGURATION & HUGGING FACE REPOS
# ==========================================================

# --- Text Models (The 3 Judges for the Ensemble) ---
TEXT_MODEL_1_ID = "Yuvrajg2107/deberta-v3-hybrid-detector_v2_universal"
TEXT_MODEL_2_ID = "Yuvrajg2107/roberta-base-cpp-final"
TEXT_MODEL_3_ID = "Yuvrajg2107/electra-large-discriminator-cpp-final"

# --- Code Model ---
CODE_MODEL_ID = "Yashodhar29/Qwen2.5-Coder-0.5B-Instruct-cpp"

# --- Image Model ---
# The .pth file inside this repo is discovered dynamically at load time.
IMAGE_REPO_ID = "Yashodhar29/ConvNext-large-cpp"

# --- Local Ensemble File ---
ENSEMBLE_PATH = "model_ensemble_pro.pkl"  # Ensure this is in your folder!

# --- Device Setup ---
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"šŸš€ Server starting on device: {device.upper()}")

# ==========================================================
# 2. MODEL LOADING INFRASTRUCTURE
# ==========================================================
app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Global Model Storage.
# "text" holds one entry per text model, in the order
# [TEXT_MODEL_1_ID, TEXT_MODEL_2_ID, TEXT_MODEL_3_ID]; an entry is None when
# that model failed to load.
models = {
    "text": [],
    "code": None,
    "image": None,
    "ensemble": None,
}


def load_text_model(model_id):
    """Load a Hugging Face sequence-classification model and its tokenizer.

    Args:
        model_id: Hub repository id passed to ``from_pretrained``.

    Returns:
        A dict with keys ``"model"`` (moved to ``device`` and put in eval
        mode), ``"tokenizer"`` and ``"name"``, or ``None`` if loading raised.
    """
    print(f" ā³ Loading {model_id}...")
    try:
        tokenizer = AutoTokenizer.from_pretrained(model_id)
        model = AutoModelForSequenceClassification.from_pretrained(model_id).to(device)
        model.eval()
        return {"model": model, "tokenizer": tokenizer, "name": model_id}
    except Exception as e:
        # Best-effort loading: a failed model is reported and represented
        # as None so the server can still start.
        print(f" āŒ Failed to load {model_id}: {e}")
        return None


def load_image_model_from_hub(repo_id):
    """Download a ``.pth`` checkpoint from the Hub and load it into ConvNeXt.

    Args:
        repo_id: Hub repository id that contains at least one ``.pth`` file.

    Returns:
        The timm ConvNeXt-Large model (2 output classes) in eval mode on
        ``device``, or ``None`` if no ``.pth`` file exists or loading raised.
    """
    print(f" ā³ Checking Image Repo: {repo_id}...")
    try:
        # 1. Find the .pth file dynamically
        files = list_repo_files(repo_id)
        pth_files = [f for f in files if f.endswith('.pth')]
        if not pth_files:
            print(" āŒ No .pth file found in image repo!")
            return None

        # The first .pth in the listing is used; no "best" prioritisation
        # is applied when the repo holds several checkpoints.
        weights_filename = pth_files[0]
        print(f" ā¬‡ļø Downloading weights: {weights_filename}")
        weights_path = hf_hub_download(repo_id=repo_id, filename=weights_filename)

        # 2. Create Architecture
        model = timm.create_model(
            "convnext_large.fb_in22k_ft_in1k",
            pretrained=False,
            num_classes=2,
        )

        # 3. Load Weights
        # SECURITY NOTE: torch.load unpickles the downloaded file. Only point
        # IMAGE_REPO_ID at a trusted repository, or consider
        # ``weights_only=True`` once the checkpoint format is confirmed to be
        # a plain state_dict.
        state_dict = torch.load(weights_path, map_location=device)
        model.load_state_dict(state_dict)
        model.to(device)
        model.eval()
        print(" āœ… Image Model Ready.")
        return model
    except Exception as e:
        print(f" āŒ Image Model Error: {e}")
        return None


# --- INITIALIZATION ---
print("\nāš™ļø --- LOADING MODELS ---")

# 1. Load Text Models (DeBERTa, RoBERTa, ELECTRA)
models["text"].append(load_text_model(TEXT_MODEL_1_ID))
models["text"].append(load_text_model(TEXT_MODEL_2_ID))
models["text"].append(load_text_model(TEXT_MODEL_3_ID))

# 2. Load Code Model (Qwen)
print(f" ā³ Loading Code Model: {CODE_MODEL_ID}...")
try:
    models["code"] = {
        "tokenizer": AutoTokenizer.from_pretrained(CODE_MODEL_ID),
        "model": AutoModelForSequenceClassification.from_pretrained(CODE_MODEL_ID).to(device),
    }
    models["code"]["model"].eval()
except Exception as e:
    print(f" āŒ Code Model Failed: {e}")

# 3. Load Image Model (ConvNeXt)
models["image"] = load_image_model_from_hub(IMAGE_REPO_ID)

# 4. Load Scikit-Learn Ensemble
print(f" ā³ Loading 'The Judge' ({ENSEMBLE_PATH})...")
try:
    # SECURITY NOTE: joblib.load unpickles the file; ENSEMBLE_PATH must be a
    # trusted local artifact.
    models["ensemble"] = joblib.load(ENSEMBLE_PATH)
    print(" āœ… Ensemble Loaded (VotingClassifier).")
except Exception as e:
    print(f" āš ļø Ensemble Pickle Not Found or Invalid: {e}")
    print(" āš ļø Server will fall back to raw DeBERTa scores.")


# ==========================================================
# 3. HELPER FUNCTIONS
# ==========================================================
def get_stylometric_features(text):
    """Compute four stylometric statistics for a piece of text.

    Args:
        text: The input string.

    Returns:
        ``[entropy, burstiness, ttr, ngram_ratio]`` where
        entropy is the character-level Shannon entropy in bits,
        burstiness is the standard deviation of per-sentence word counts,
        ttr is unique lowercase words divided by total words, and
        ngram_ratio is the number of distinct word trigrams that occur more
        than once divided by the total number of trigrams.
        Returns ``[0, 0, 0, 0]`` for empty input.
    """
    if not text:
        return [0, 0, 0, 0]

    # 1. Entropy
    # A single Counter pass replaces one text.count() scan per distinct
    # character; Counter keeps first-seen order, so the summation order (and
    # therefore the floating-point result) matches the per-character scan.
    total_chars = len(text)
    prob = [count / total_chars for count in Counter(text).values()]
    entropy = - sum([p * math.log(p) / math.log(2.0) for p in prob])

    # 2. Burstiness
    sentences = text.replace('!', '.').replace('?', '.').split('.')
    lengths = [len(s.split()) for s in sentences if len(s.split()) > 0]
    burstiness = np.std(lengths) if lengths else 0

    # 3. TTR (Type-Token Ratio)
    words = text.lower().split()
    ttr = len(set(words)) / len(words) if words else 0

    # 4. N-Gram Repetition
    if len(words) < 3:
        ngram_ratio = 0
    else:
        ngrams = list(zip(*[words[i:] for i in range(3)]))
        ngram_counts = Counter(ngrams)
        repeated = sum(1 for count in ngram_counts.values() if count > 1)
        ngram_ratio = repeated / len(ngrams)

    return [entropy, burstiness, ttr, ngram_ratio]


def get_image_transforms():
    """Return the preprocessing pipeline applied before the image model.

    Resizes to 384x384, converts to a tensor and normalizes with the
    mean/std constants listed below.
    """
    return transforms.Compose([
        transforms.Resize((384, 384)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])


def get_forensics(img_pil):
    """Calculates non-ML forensic metrics for images.

    Args:
        img_pil: A PIL image. It is converted with ``COLOR_RGB2GRAY``, so
            callers pass an RGB image (the endpoints call ``.convert('RGB')``).

    Returns:
        A dict with ``"spectral_artifacts"`` (mean log-magnitude of the
        shifted 2-D FFT), ``"perplexity"`` (Shannon entropy of the grayscale
        image) and ``"burstiness"`` (standard deviation of the Canny edge
        map), each rounded to 3 decimals.
    """
    img_np = np.array(img_pil)
    gray = cv2.cvtColor(img_np, cv2.COLOR_RGB2GRAY)

    dft = np.fft.fft2(gray)
    dft_shift = np.fft.fftshift(dft)
    # +1 keeps the logarithm defined where the magnitude is zero.
    magnitude_spectrum = np.log(np.abs(dft_shift) + 1)
    spectral_score = np.mean(magnitude_spectrum)

    perplexity = shannon_entropy(gray)

    edges = cv2.Canny(gray, 100, 200)
    burstiness = np.std(edges)

    return {
        "spectral_artifacts": round(float(spectral_score), 3),
        "perplexity": round(float(perplexity), 3),
        "burstiness": round(float(burstiness), 3),
    }


def _text_ai_probability(tokenizer, model, text):
    """Return the softmax probability of class index 1 for ``text``.

    Input is truncated to 512 tokens. Index 1 is assumed to be the "AI"
    class for the text/code models — TODO confirm against training labels.
    """
    inputs = tokenizer(
        text,
        return_tensors="pt",
        truncation=True,
        max_length=512,
    ).to(device)
    with torch.no_grad():
        outputs = model(**inputs)
    probs = F.softmax(outputs.logits, dim=-1)
    return probs[0][1].item()


def _image_ai_probability(pil_img, transform):
    """Return the softmax probability of class index 0 from the image model.

    Index 0 is assumed to be the "AI" class for the ConvNeXt fine-tune —
    TODO confirm; if training used label 1 for AI, switch to ``probs[0][1]``.
    """
    img_t = transform(pil_img).unsqueeze(0).to(device)
    with torch.no_grad():
        logits = models["image"](img_t)
    probs = F.softmax(logits, dim=1)
    return probs[0][0].item()


# ==========================================================
# 4. API ENDPOINTS
# ==========================================================
class DetectionRequest(BaseModel):
    # Raw text (or source code) submitted for analysis.
    text: str


@app.post("/analyze")
async def analyze_text(request: DetectionRequest):
    """
    Main Text Detection Endpoint.
    Uses the 7-Input Super Ensemble:
    [DeBERTa, RoBERTa, ELECTRA, Entropy, Burstiness, TTR, NGram]
    """
    user_text = request.text
    if len(user_text.strip()) < 5:
        return {"ai_score": 0, "label": "Too Short", "stats": {}}

    # --- A. Check for Code (Routing) ---
    # Crude substring heuristic: text that looks like Python code is not
    # scored here; the caller is told to use the code endpoint instead.
    if "def " in user_text and ("return" in user_text or "class" in user_text):
        return {"ai_score": 0.0, "label": "Use /analyze_code endpoint", "stats": {}}

    # --- B. Get DL Probabilities (The 3 Inputs) ---
    dl_probs = []

    # We rely on DeBERTa (Index 0) heavily, so if it fails, we abort.
    if not models["text"][0]:
        raise HTTPException(status_code=500, detail="Primary model (DeBERTa) not active.")

    for entry in models["text"]:
        if entry:
            try:
                dl_probs.append(
                    _text_ai_probability(entry["tokenizer"], entry["model"], user_text)
                )
            except Exception as e:
                print(f"Inference Error on {entry['name']}: {e}")
                dl_probs.append(0.5)  # Neutral fallback
        else:
            dl_probs.append(0.5)  # Missing model fallback

    # --- C. Get Stylometry (The 4 Inputs) ---
    stats = get_stylometric_features(user_text)  # [Entropy, Burstiness, TTR, NGram]

    # --- D. Final Ensemble Prediction ---
    final_prob = dl_probs[0]  # Default to DeBERTa if ensemble fails

    if models["ensemble"]:
        # Input Vector: [M1, M2, M3, Stat1, Stat2, Stat3, Stat4]
        input_vector = np.array([dl_probs + stats])
        try:
            ensemble_probs = models["ensemble"].predict_proba(input_vector)
            final_prob = ensemble_probs[0][1]
        except Exception as e:
            # Keep the DeBERTa score when the ensemble cannot vote.
            print(f"Ensemble Voting Failed: {e}")

    return {
        "ai_score": round(float(final_prob), 4),
        "label": "šŸ¤– AI GENERATED" if final_prob > 0.5 else "šŸ‘¤ HUMAN WRITTEN",
        "detailed_scores": {
            "deberta": round(dl_probs[0], 4),
            "roberta": round(dl_probs[1], 4),
            "electra": round(dl_probs[2], 4),
        },
        "stats": {
            "entropy": round(stats[0], 2),
            "burstiness": round(stats[1], 2),
            "ttr": round(stats[2], 2),
            "ngram_ratio": round(stats[3], 2),
        },
    }


@app.post("/analyze_code")
async def analyze_code(request: DetectionRequest):
    """
    Dedicated Code Detection using Qwen2.5-Coder.
    """
    if not models["code"]:
        raise HTTPException(status_code=503, detail="Code model (Qwen) not loaded.")

    user_code = request.text
    try:
        ai_prob = _text_ai_probability(
            models["code"]["tokenizer"],
            models["code"]["model"],
            user_code,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Code analysis failed: {e}")

    # Basic stats for frontend display
    stats = get_stylometric_features(user_code)

    return {
        "ai_score": round(float(ai_prob), 4),
        "label": "šŸ¤– AI CODE" if ai_prob > 0.5 else "šŸ‘¤ HUMAN CODE",
        "stats": {
            "entropy": round(stats[0], 2),
            "burstiness": round(stats[1], 2),
        },
    }


@app.post("/analyze_image")
async def analyze_image(file: UploadFile = File(...)):
    """
    Image Detection using ConvNeXt-Large.
    """
    if not models["image"]:
        raise HTTPException(status_code=503, detail="Image model not loaded.")

    try:
        contents = await file.read()
        pil_img = Image.open(io.BytesIO(contents)).convert('RGB')

        # 1. Forensic Stats
        forensics = get_forensics(pil_img)

        # 2. AI Detection (class index 0 is treated as "AI"; see helper)
        ai_score = _image_ai_probability(pil_img, get_image_transforms())
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Image processing error: {str(e)}")

    return {
        "ai_score": round(float(ai_score), 4),
        "label": "AI Generated" if ai_score > 0.5 else "Real / Human",
        "forensics": forensics,
    }


@app.post("/analyze_video")
async def analyze_video(file: UploadFile = File(...), num_samples: int = 10):
    """
    Video Frame Extraction + Analysis.

    Samples up to ``num_samples`` evenly spaced frames, scores each with the
    image model and averages the scores. On failure the response body is
    ``{"error": <message>}`` rather than an HTTP error status.
    """
    if not models["image"]:
        raise HTTPException(status_code=503, detail="Image model needed for video.")

    unique_name = f"temp_vid_{uuid.uuid4()}.mp4"
    cap = None
    try:
        with open(unique_name, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        cap = cv2.VideoCapture(unique_name)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if total_frames < 1:
            raise ValueError("Empty video")

        indices = np.linspace(
            0,
            total_frames - 1,
            num=min(num_samples, total_frames),
            dtype=int,
        )
        scores = []
        transform = get_image_transforms()

        for i in indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, i)
            ret, frame = cap.read()
            if not ret:
                continue  # Skip frames that could not be decoded.

            # Convert BGR (OpenCV) to RGB (PIL)
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            pil_img = Image.fromarray(frame_rgb)

            # Using same index assumption as image
            scores.append(_image_ai_probability(pil_img, transform))

        if not scores:
            return {"ai_score": 0, "label": "Error"}

        avg_score = sum(scores) / len(scores)
        return {
            "ai_score": round(avg_score, 4),
            "label": "AI Video" if avg_score > 0.5 else "Real Video",
            "frames_analyzed": len(scores),
        }
    except Exception as e:
        print(f"Video Error: {e}")
        return {"error": str(e)}
    finally:
        # Release the capture on every path (including errors) and before
        # deleting the temp file, so the handle never leaks and the file is
        # no longer held open when it is removed.
        if cap is not None:
            cap.release()
        if os.path.exists(unique_name):
            os.remove(unique_name)


@app.post("/analyze_document")
async def analyze_document(file: UploadFile = File(...)):
    """
    Hybrid Document Analysis (Text + Images inside Doc).

    Only the text part is analysed in this version: paragraphs are extracted
    from the uploaded docx and, when longer than 50 characters, the first
    2000 characters are scored with the primary text model.
    """
    try:
        content = await file.read()
        file_bytes = io.BytesIO(content)

        # 1. Extract Text
        try:
            doc = Document(file_bytes)
            full_text = "\n".join([para.text for para in doc.paragraphs])
        except Exception:
            # Unreadable / non-docx upload: treat as a document with no text.
            full_text = ""

        # 2. Analyze Text
        text_res = None
        if len(full_text) > 50:
            primary = models["text"][0]
            if not primary:
                # Same condition and message as /analyze, instead of an
                # opaque "'NoneType' object is not subscriptable" error.
                raise HTTPException(
                    status_code=500,
                    detail="Primary model (DeBERTa) not active.",
                )
            t_prob = _text_ai_probability(
                primary["tokenizer"],
                primary["model"],
                full_text[:2000],
            )
            text_res = {"ai_score": t_prob, "preview": full_text[:100]}

        return {
            "type": "document_report",
            "text_analysis": text_res,
            "note": "Image extraction from docx disabled for brevity in this version.",
        }
    except HTTPException:
        # Already a well-formed HTTP error; do not re-wrap it.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)