"""Detextly AI Detector API.

FastAPI service that scores text as human- vs AI-written using the
``roberta-large-openai-detector`` sequence classifier.  Supports a single-pass
"basic" scan and a chunked "highlight" scan with per-section probabilities,
plus a lightweight ChatGPT phrase-pattern heuristic that boosts the AI score.
"""

import logging
import sys
import time
from typing import List, Optional

import torch
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, ValidationInfo, field_validator
from transformers import AutoModelForSequenceClassification, AutoTokenizer

# ---------------- Logging ----------------
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    stream=sys.stdout
)
logger = logging.getLogger("detector")

# ---------------- FastAPI ----------------
app = FastAPI(
    title="Detextly AI Detector API",
    description="AI Detector with chunked scoring and low-confidence filter",
    version="2.1.0"
)

# CORS: open to all origins.
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# rejected by browsers for credentialed requests — confirm this is intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# ---------------- Pydantic Models ----------------
class ScanRequest(BaseModel):
    """Scan request body.

    Accepts both the canonical ``scan_type`` field and the legacy camelCase
    ``scanType`` alias for backward compatibility with older clients.
    """

    text: str
    scan_type: Optional[str] = None   # canonical field
    scanType: Optional[str] = None    # legacy alias, still accepted
    userId: Optional[str] = None

    @field_validator('scanType')
    @classmethod
    def map_scantype_to_scan_type(cls, v: Optional[str], info: ValidationInfo) -> Optional[str]:
        """Mapper to ensure backward compatibility with old 'scanType' parameter name."""
        if v is not None:
            # Best-effort: copy the legacy value over to the canonical field.
            # In Pydantic v2 mutating info.data is NOT guaranteed to set the
            # model attribute, so get_scan_type() below also falls back to
            # self.scanType explicitly.
            info.data['scan_type'] = v
        return v

    def get_scan_type(self) -> str:
        """Get the scan type, defaulting to 'basic' if not provided.

        ``scan_type`` takes precedence as the canonical field; the legacy
        ``scanType`` value is used when only it was supplied (the validator's
        info.data mutation is not reliable across Pydantic versions).
        """
        return self.scan_type or self.scanType or "basic"


class ScanResponse(BaseModel):
    """Scan response envelope returned by /api/scan."""

    success: bool
    result: dict
    processingTime: int                 # milliseconds
    credits: Optional[dict] = None
    test_mode: bool = False


# ---------------- AI Detector Core ----------------
MODEL_NAME = "openai-community/roberta-large-openai-detector"


class AIDetector:
    """Lazy-loading wrapper around the RoBERTa AI-text classifier."""

    def __init__(self):
        # Model and tokenizer are loaded on first use (or at startup).
        self.model = None
        self.tokenizer = None
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.label_map = None
        logger.info(f"Using device: {self.device}")

    def load_model(self):
        """Load tokenizer and model once; no-op if already loaded.

        Raises:
            RuntimeError: if the model cannot be downloaded/initialized.
        """
        if self.model is not None:
            return
        logger.info(f"Loading model: {MODEL_NAME}")
        try:
            self.tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
            self.model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME)
            # Store label mapping for debugging
            if hasattr(self.model.config, 'id2label'):
                self.label_map = self.model.config.id2label
                logger.info(f"Model label mapping: {self.label_map}")
            else:
                logger.warning("No label mapping found in model config")
        except Exception as e:
            logger.error(f"Error loading model: {e}")
            raise RuntimeError(f"Failed to load model: {e}")
        self.model.to(self.device)
        self.model.eval()
        logger.info("Model loaded successfully.")

    def predict(self, text: str, max_length: int = 512) -> dict:
        """Return both human and AI probabilities.

        Args:
            text: Input text to classify (truncated to ``max_length`` tokens).
            max_length: Tokenizer truncation length.

        Returns:
            dict with ``human_probability`` (class 0), ``ai_probability``
            (class 1), and the raw softmax row under ``raw_probs``.
            Assumes the model is a binary classifier with class 0 = human,
            class 1 = AI, per this checkpoint's label convention.
        """
        if self.model is None:
            self.load_model()

        # Tokenize input
        tokens = self.tokenizer(
            text,
            return_tensors="pt",
            truncation=True,
            max_length=max_length,
            padding=True
        )
        tokens = {k: v.to(self.device) for k, v in tokens.items()}

        with torch.no_grad():
            outputs = self.model(**tokens)
            probs = torch.softmax(outputs.logits, dim=-1)

        # Get probabilities for both classes
        human_prob = float(probs[0][0].item())  # Class 0
        ai_prob = float(probs[0][1].item())     # Class 1

        # Debug logging
        logger.debug(f"Class 0 (Human): {human_prob:.4f}, Class 1 (AI): {ai_prob:.4f}")

        # Verify probabilities sum to ~1.0 (sanity check on the binary head)
        total = human_prob + ai_prob
        if abs(total - 1.0) > 0.01:
            logger.warning(f"Probabilities don't sum to 1.0: {total:.4f}")

        return {
            "human_probability": human_prob,
            "ai_probability": ai_prob,
            "raw_probs": probs.tolist()
        }


detector = AIDetector()

# ---------------- Pattern Detection ----------------
# Hoisted to module level so the list is not rebuilt on every call.
_CHATGPT_PATTERNS = [
    "as an ai language model",
    "i am an ai model",
    "i cannot provide medical",
    "as a language model",
    "based on the information provided",
    "my training data",
    "i don't have personal experiences",
    "i don't have feelings",
    "as an artificial intelligence",
    "i don't have personal opinions"
]


def detect_chatgpt_patterns(text: str) -> bool:
    """Return True if ChatGPT patterns are detected (case-insensitive substring match)."""
    lower = text.lower()
    for pattern in _CHATGPT_PATTERNS:
        if pattern in lower:
            logger.debug(f"ChatGPT pattern detected: {pattern}")
            return True
    return False


# ---------------- Highlight / Chunked Scan ----------------
def analyze_sections(text: str, chunk_size: int = 40) -> List[dict]:
    """Split text into smaller chunks and compute AI probability for each.

    Args:
        text: Full input text.
        chunk_size: Number of words per chunk.

    Returns:
        List of per-chunk dicts with probabilities, word counts, and the
        ChatGPT-pattern flag. Chunks shorter than 20 characters are skipped.
    """
    sections = []
    words = text.split()
    total_chunks = (len(words) + chunk_size - 1) // chunk_size
    logger.info(f"Analyzing {len(words)} words in {total_chunks} chunks")

    for i in range(0, len(words), chunk_size):
        chunk = " ".join(words[i:i + chunk_size])
        if len(chunk.strip()) < 20:
            continue

        # Get probabilities from model
        probs = detector.predict(chunk)
        human_prob = probs["human_probability"]
        ai_prob = probs["ai_probability"]

        # Check for ChatGPT patterns
        has_pattern = detect_chatgpt_patterns(chunk)
        if has_pattern:
            ai_prob = max(ai_prob, 0.9)  # Boost AI probability if pattern found
            human_prob = 1 - ai_prob

        sections.append({
            "text": chunk[:200] + "..." if len(chunk) > 200 else chunk,
            "ai_probability": round(ai_prob, 4),
            "human_probability": round(human_prob, 4),
            "words": len(chunk.split()),
            "has_chatgpt_pattern": has_pattern
        })

    logger.info(f"Generated {len(sections)} sections for analysis")
    return sections


def compute_overall_score(sections: List[dict], confidence_threshold: float = 0.3) -> dict:
    """Compute weighted average probabilities with confidence filtering.

    Sections whose AI probability is within ``confidence_threshold`` of 0.5
    are considered uncertain and excluded from the word-count-weighted
    average — unless every section is uncertain, in which case all are used.

    Returns:
        dict with overall probabilities, a confidence label
        (high/medium/low), and section counts.
    """
    if not sections:
        return {"ai_probability": 0.0, "human_probability": 1.0, "confidence": "low"}

    # Filter out low-confidence predictions (close to 0.5)
    confident_sections = []
    for section in sections:
        ai_prob = section["ai_probability"]
        confidence = abs(ai_prob - 0.5)  # Distance from uncertain (0.5)
        if confidence >= confidence_threshold:
            confident_sections.append(section)

    if not confident_sections:
        # If no confident sections, use all sections
        confident_sections = sections

    # Weighted average by word count
    total_words = sum(s["words"] for s in confident_sections)
    if total_words == 0:
        return {"ai_probability": 0.5, "human_probability": 0.5, "confidence": "low"}

    weighted_ai_sum = sum(s["ai_probability"] * s["words"] for s in confident_sections)
    weighted_human_sum = sum(s["human_probability"] * s["words"] for s in confident_sections)

    overall_ai = weighted_ai_sum / total_words
    overall_human = weighted_human_sum / total_words

    # Determine confidence level from distance to the 0.5 decision boundary
    distance_from_mid = abs(overall_ai - 0.5)
    if distance_from_mid > 0.4:
        confidence_level = "high"
    elif distance_from_mid > 0.2:
        confidence_level = "medium"
    else:
        confidence_level = "low"

    return {
        "ai_probability": round(overall_ai, 4),
        "human_probability": round(overall_human, 4),
        "confidence": confidence_level,
        "sections_analyzed": len(sections),
        "confident_sections": len(confident_sections)
    }


# ---------------- API Endpoints ----------------
@app.on_event("startup")
async def startup():
    """Initialize the model on startup."""
    logger.info("Starting Detextly AI Detector API...")
    try:
        detector.load_model()
        logger.info("API startup complete")
    except Exception as e:
        logger.error(f"Failed to start API: {e}")
        raise


@app.get("/")
async def root():
    """Service metadata / liveness page."""
    return {
        "status": "online",
        "model": MODEL_NAME,
        "device": str(detector.device),
        "version": "2.1.0",
        "features": ["basic_scan", "highlight_scan", "chatgpt_pattern_detection"],
        "note": "Accepts both 'scan_type' and 'scanType' parameters"
    }


@app.get("/health")
async def health():
    """Health check: reports whether the model is loaded."""
    return {
        "status": "healthy",
        "model_loaded": detector.model is not None,
        "model": MODEL_NAME,
        "timestamp": time.time()
    }


@app.get("/debug/test")
async def debug_test():
    """Test endpoint to verify model is working correctly."""
    test_texts = [
        "I went to the store yesterday to buy groceries.",
        "As an AI language model, I don't have personal experiences.",
        "The quick brown fox jumps over the lazy dog."
    ]
    results = []
    for text in test_texts:
        probs = detector.predict(text)
        results.append({
            "text": text[:50] + "..." if len(text) > 50 else text,
            "human_probability": probs["human_probability"],
            "ai_probability": probs["ai_probability"]
        })
    return {
        "test_results": results,
        "model_info": {
            "name": MODEL_NAME,
            "labels": detector.label_map,
            "device": str(detector.device)
        }
    }


@app.post("/api/scan", response_model=ScanResponse)
async def scan_text(request: ScanRequest):
    """Main scanning endpoint.

    Runs either a single-pass "basic" scan or a chunked "highlight" scan
    depending on the request's scan type. Input is truncated to 5000 chars.

    Raises:
        HTTPException 400: text missing or shorter than 10 characters.
        HTTPException 500: any unexpected failure during scoring.
    """
    start_time = time.time()

    try:
        # Validate input
        if not request.text or len(request.text.strip()) < 10:
            raise HTTPException(status_code=400, detail="Text must be at least 10 characters long.")

        # Get scan type (handles both scan_type and scanType via the validator)
        scan_type = request.get_scan_type()
        logger.info(f"Scan request: type={scan_type}, userId={request.userId}, text_length={len(request.text)}")

        # Limit text length for performance
        text = request.text[:5000]

        # Check for ChatGPT patterns
        chatgpt_detected = detect_chatgpt_patterns(text)

        if scan_type == "highlight":
            # Chunked analysis
            sections = analyze_sections(text, chunk_size=40)
            overall = compute_overall_score(sections)

            # Identify AI-heavy sections
            ai_sections = [
                {
                    "text": s["text"],
                    "ai_probability": s["ai_probability"],
                    "human_probability": s["human_probability"],
                    "words": s["words"]
                }
                for s in sections
                if s["ai_probability"] > 0.6
            ]

            result = {
                "overall": overall["human_probability"],  # Human probability for backward compatibility
                "ai_probability": overall["ai_probability"],
                "human_probability": overall["human_probability"],
                "model": MODEL_NAME,
                "confidence": overall["confidence"],
                "chatgpt_detected": chatgpt_detected,
                "scan_type": "highlight",
                "section_count": len(sections),
                "ai_section_count": len(ai_sections),
                "sections_analyzed": overall["sections_analyzed"],
                "confident_sections": overall["confident_sections"],
                "ai_sections": ai_sections[:10]  # Limit to first 10
            }
        else:
            # Basic scan (single analysis)
            probs = detector.predict(text)
            human_prob = probs["human_probability"]
            ai_prob = probs["ai_probability"]

            # Boost AI probability if ChatGPT patterns detected
            if chatgpt_detected:
                ai_prob = max(ai_prob, 0.9)
                human_prob = 1 - ai_prob

            # Determine confidence
            distance_from_mid = abs(ai_prob - 0.5)
            confidence = "high" if distance_from_mid > 0.4 else "medium" if distance_from_mid > 0.2 else "low"

            result = {
                "overall": human_prob,  # Human probability for backward compatibility
                "ai_probability": ai_prob,
                "human_probability": human_prob,
                "model": MODEL_NAME,
                "confidence": confidence,
                "chatgpt_detected": chatgpt_detected,
                "scan_type": "basic"
            }

        # Calculate processing time
        processing_time = int((time.time() - start_time) * 1000)
        logger.info(f"Scan completed in {processing_time}ms: AI={result.get('ai_probability', 0):.2%}")

        return ScanResponse(
            success=True,
            result=result,
            processingTime=processing_time,
            credits={
                "basic": 5,
                "highlight": 1,
                "resetTime": "2024-12-31T23:59:59Z",
                "test_mode": False
            },
            test_mode=False
        )

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Scan error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


@app.get("/api/credits")
async def get_credits(userId: Optional[str] = None):
    """Get credits information (for compatibility with worker)."""
    return {
        "basic": 5,
        "highlight": 1,
        "resetTime": "2024-12-31T23:59:59Z",
        "test_mode": False,
        "userId": userId or "unknown"
    }


# ---------------- Main Entry Point ----------------
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
        log_level="info",
        access_log=True
    )