# Detextly AI Detector API — FastAPI service (deployed as a Hugging Face Space).
import logging
import sys
import time
from typing import List, Optional

import torch
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, ValidationInfo, field_validator
from transformers import AutoModelForSequenceClassification, AutoTokenizer
# ---------------- Logging ----------------
# Log to stdout so container platforms (e.g. HF Spaces) capture the output.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    stream=sys.stdout
)
# Module-wide logger used by every function/class below.
logger = logging.getLogger("detector")
# ---------------- FastAPI ----------------
app = FastAPI(
    title="Detextly AI Detector API",
    description="AI Detector with chunked scoring and low-confidence filter",
    version="2.1.0"
)
# CORS
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# rejected by browsers for credentialed requests; tighten the origin list if
# cookies/auth headers are ever needed cross-origin.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
| # ---------------- Pydantic Models ---------------- | |
class ScanRequest(BaseModel):
    """Request body for the scan endpoint.

    Accepts both the canonical ``scan_type`` and the legacy ``scanType``
    parameter names for backward compatibility.
    """
    text: str
    scan_type: Optional[str] = None
    scanType: Optional[str] = None  # legacy alias, kept for old clients
    userId: Optional[str] = None

    # Fix: the original method had no @field_validator/@classmethod decorators,
    # so pydantic never invoked it and the legacy alias was silently ignored.
    @field_validator("scanType")
    @classmethod
    def map_scantype_to_scan_type(cls, v: Optional[str], info: ValidationInfo) -> Optional[str]:
        """Copy the legacy 'scanType' value into 'scan_type' when the latter is unset.

        NOTE(review): mutating ``info.data`` is not guaranteed to propagate to
        the built model in pydantic v2, so ``get_scan_type`` also falls back to
        ``self.scanType`` directly.
        """
        if v is not None and info.data.get("scan_type") is None:
            info.data["scan_type"] = v
        return v

    def get_scan_type(self) -> str:
        """Return the effective scan type, defaulting to 'basic' if not provided."""
        # scan_type takes precedence as the canonical field; fall back to the
        # legacy alias before applying the default.
        return self.scan_type or self.scanType or "basic"
class ScanResponse(BaseModel):
    """Envelope returned by the scan endpoint."""
    success: bool
    result: dict  # scan payload; keys depend on the scan type (basic/highlight)
    processingTime: int  # wall-clock processing time in milliseconds
    credits: Optional[dict] = None  # credit/quota info consumed by the worker
    test_mode: bool = False
# ---------------- AI Detector Core ----------------
# Hugging Face checkpoint used for detection (RoBERTa-large GPT-2 output detector).
MODEL_NAME = "openai-community/roberta-large-openai-detector"
class AIDetector:
    """Lazily-loaded wrapper around a sequence-classification AI-text detector.

    The checkpoint is only downloaded/loaded on the first call to predict(),
    so importing this module stays cheap.
    """

    # Lower-cased labels that identify the machine-generated class in id2label.
    _AI_LABELS = {"fake", "ai", "machine", "machine-generated"}

    def __init__(self):
        self.model = None
        self.tokenizer = None
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.label_map = None
        # Output index corresponding to "AI-generated". Defaults to the
        # original hard-coded assumption (1) and is re-resolved from the
        # model's id2label mapping in load_model().
        self._ai_index = 1
        logger.info(f"Using device: {self.device}")

    @classmethod
    def _resolve_ai_index(cls, label_map: dict) -> int:
        """Return the class index representing AI text, based on id2label.

        Fix: the original hard-coded 0=human / 1=AI, but the openai-detector
        checkpoints publish id2label = {0: "Fake", 1: "Real"}, i.e. the
        opposite order. A recognised label wins; otherwise fall back to the
        historical assumption that index 1 is the AI class.
        """
        for idx, label in label_map.items():
            if str(label).lower() in cls._AI_LABELS:
                return int(idx)
        return 1

    def load_model(self):
        """Load tokenizer and model once; subsequent calls are no-ops."""
        if self.model is not None:
            return
        logger.info(f"Loading model: {MODEL_NAME}")
        try:
            self.tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
            self.model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME)
            # Store label mapping for debugging and class-index resolution
            if hasattr(self.model.config, 'id2label'):
                self.label_map = self.model.config.id2label
                logger.info(f"Model label mapping: {self.label_map}")
                self._ai_index = self._resolve_ai_index(self.label_map)
            else:
                logger.warning("No label mapping found in model config")
        except Exception as e:
            logger.error(f"Error loading model: {e}")
            raise RuntimeError(f"Failed to load model: {e}")
        self.model.to(self.device)
        self.model.eval()
        logger.info("Model loaded successfully.")

    def predict(self, text: str, max_length: int = 512) -> dict:
        """Return both human and AI probabilities for *text*."""
        if self.model is None:
            self.load_model()
        # Tokenize input (truncated to the model's max context length)
        tokens = self.tokenizer(
            text,
            return_tensors="pt",
            truncation=True,
            max_length=max_length,
            padding=True
        )
        tokens = {k: v.to(self.device) for k, v in tokens.items()}
        with torch.no_grad():
            outputs = self.model(**tokens)
            probs = torch.softmax(outputs.logits, dim=-1)
        # Map class indices via the resolved label mapping instead of assuming
        # a fixed order (see _resolve_ai_index).
        ai_idx = self._ai_index
        human_idx = 1 - ai_idx
        human_prob = float(probs[0][human_idx].item())
        ai_prob = float(probs[0][ai_idx].item())
        logger.debug(f"Human: {human_prob:.4f}, AI: {ai_prob:.4f}")
        # Sanity check: the two class probabilities should sum to ~1.0
        total = human_prob + ai_prob
        if abs(total - 1.0) > 0.01:
            logger.warning(f"Probabilities don't sum to 1.0: {total:.4f}")
        return {
            "human_probability": human_prob,
            "ai_probability": ai_prob,
            "raw_probs": probs.tolist()
        }
| detector = AIDetector() | |
| # ---------------- Pattern Detection ---------------- | |
def detect_chatgpt_patterns(text: str) -> bool:
    """Return True if ChatGPT patterns are detected."""
    boilerplate = (
        "as an ai language model",
        "i am an ai model",
        "i cannot provide medical",
        "as a language model",
        "based on the information provided",
        "my training data",
        "i don't have personal experiences",
        "i don't have feelings",
        "as an artificial intelligence",
        "i don't have personal opinions",
    )
    haystack = text.lower()
    # Scan lazily for the first boilerplate phrase present in the text.
    match = next((phrase for phrase in boilerplate if phrase in haystack), None)
    if match is None:
        return False
    logger.debug(f"ChatGPT pattern detected: {match}")
    return True
| # ---------------- Highlight / Chunked Scan ---------------- | |
def analyze_sections(text: str, chunk_size: int = 40) -> List[dict]:
    """Split text into smaller chunks and compute AI probability for each."""
    words = text.split()
    n_chunks = (len(words) + chunk_size - 1) // chunk_size
    logger.info(f"Analyzing {len(words)} words in {n_chunks} chunks")
    results: List[dict] = []
    for start in range(0, len(words), chunk_size):
        piece = " ".join(words[start:start + chunk_size])
        # Skip fragments too short to score meaningfully.
        if len(piece.strip()) < 20:
            continue
        # Model probabilities for this chunk.
        scores = detector.predict(piece)
        ai_p = scores["ai_probability"]
        human_p = scores["human_probability"]
        # Boilerplate ChatGPT phrasing forces the AI score up to at least 0.9.
        pattern_hit = detect_chatgpt_patterns(piece)
        if pattern_hit:
            ai_p = max(ai_p, 0.9)
            human_p = 1 - ai_p
        preview = piece if len(piece) <= 200 else piece[:200] + "..."
        results.append({
            "text": preview,
            "ai_probability": round(ai_p, 4),
            "human_probability": round(human_p, 4),
            "words": len(piece.split()),
            "has_chatgpt_pattern": pattern_hit,
        })
    logger.info(f"Generated {len(results)} sections for analysis")
    return results
def compute_overall_score(sections: List[dict], confidence_threshold: float = 0.3) -> dict:
    """Compute word-count-weighted average probabilities with confidence filtering.

    Sections whose ai_probability lies within *confidence_threshold* of the
    uncertain midpoint (0.5) are dropped before averaging; if every section is
    dropped, all sections are used instead.

    Always returns the keys ai_probability, human_probability, confidence,
    sections_analyzed and confident_sections so callers can index them safely.
    (Fix: the original early returns omitted the two count keys, which made the
    highlight scan path raise KeyError on very short input.)
    """
    if not sections:
        return {
            "ai_probability": 0.0,
            "human_probability": 1.0,
            "confidence": "low",
            "sections_analyzed": 0,
            "confident_sections": 0,
        }
    # Filter out low-confidence predictions (close to 0.5)
    confident_sections = [
        s for s in sections
        if abs(s["ai_probability"] - 0.5) >= confidence_threshold
    ]
    if not confident_sections:
        # If no confident sections, fall back to using all of them
        confident_sections = sections
    # Weighted average by word count
    total_words = sum(s["words"] for s in confident_sections)
    if total_words == 0:
        return {
            "ai_probability": 0.5,
            "human_probability": 0.5,
            "confidence": "low",
            "sections_analyzed": len(sections),
            "confident_sections": len(confident_sections),
        }
    overall_ai = sum(s["ai_probability"] * s["words"] for s in confident_sections) / total_words
    overall_human = sum(s["human_probability"] * s["words"] for s in confident_sections) / total_words
    # Confidence level = distance of the aggregate score from the midpoint
    distance_from_mid = abs(overall_ai - 0.5)
    if distance_from_mid > 0.4:
        confidence_level = "high"
    elif distance_from_mid > 0.2:
        confidence_level = "medium"
    else:
        confidence_level = "low"
    return {
        "ai_probability": round(overall_ai, 4),
        "human_probability": round(overall_human, 4),
        "confidence": confidence_level,
        "sections_analyzed": len(sections),
        "confident_sections": len(confident_sections)
    }
| # ---------------- API Endpoints ---------------- | |
# Fix: the handler was defined but never registered, so the model was never
# warm-loaded at startup.
@app.on_event("startup")
async def startup():
    """Warm-load the detector model when the server starts."""
    logger.info("Starting Detextly AI Detector API...")
    try:
        detector.load_model()
        logger.info("API startup complete")
    except Exception as e:
        logger.error(f"Failed to start API: {e}")
        raise
# Fix: the handler was defined but never registered with the app.
@app.get("/")
async def root():
    """Service metadata / liveness summary served at the API root."""
    return {
        "status": "online",
        "model": MODEL_NAME,
        "device": str(detector.device),
        "version": "2.1.0",
        "features": ["basic_scan", "highlight_scan", "chatgpt_pattern_detection"],
        "note": "Accepts both 'scan_type' and 'scanType' parameters"
    }
# Fix: the handler was defined but never registered with the app.
# NOTE(review): route path restored as a guess — confirm against deploy config.
@app.get("/health")
async def health():
    """Lightweight health check reporting whether the model is loaded."""
    return {
        "status": "healthy",
        "model_loaded": detector.model is not None,
        "model": MODEL_NAME,
        "timestamp": time.time()
    }
# Fix: the handler was defined but never registered with the app.
# NOTE(review): route path restored as a guess — confirm against clients.
@app.get("/debug/test")
async def debug_test():
    """Test endpoint to verify model is working correctly."""
    # One clearly human sentence, one obvious ChatGPT phrase, one pangram.
    test_texts = [
        "I went to the store yesterday to buy groceries.",
        "As an AI language model, I don't have personal experiences.",
        "The quick brown fox jumps over the lazy dog."
    ]
    results = []
    for text in test_texts:
        probs = detector.predict(text)
        results.append({
            "text": text[:50] + "..." if len(text) > 50 else text,
            "human_probability": probs["human_probability"],
            "ai_probability": probs["ai_probability"]
        })
    return {
        "test_results": results,
        "model_info": {
            "name": MODEL_NAME,
            "labels": detector.label_map,
            "device": str(detector.device)
        }
    }
# Fix: the handler was defined but never registered with the app.
# NOTE(review): route path restored as a guess — confirm against the worker.
@app.post("/scan")
async def scan_text(request: ScanRequest):
    """Main scanning endpoint.

    Runs either a single-pass "basic" scan or a chunked "highlight" scan and
    returns AI/human probabilities plus confidence metadata wrapped in a
    ScanResponse.
    """
    start_time = time.time()
    try:
        # Validate input
        if not request.text or len(request.text.strip()) < 10:
            raise HTTPException(status_code=400, detail="Text must be at least 10 characters long.")
        # Get scan type (handles both scan_type and scanType)
        scan_type = request.get_scan_type()
        logger.info(f"Scan request: type={scan_type}, userId={request.userId}, text_length={len(request.text)}")
        # Limit text length for performance
        text = request.text[:5000]
        # Check for ChatGPT patterns once over the whole (truncated) text
        chatgpt_detected = detect_chatgpt_patterns(text)
        if scan_type == "highlight":
            # Chunked analysis
            sections = analyze_sections(text, chunk_size=40)
            overall = compute_overall_score(sections)
            # Identify AI-heavy sections (> 60% AI probability)
            ai_sections = [
                {
                    "text": s["text"],
                    "ai_probability": s["ai_probability"],
                    "human_probability": s["human_probability"],
                    "words": s["words"]
                }
                for s in sections if s["ai_probability"] > 0.6
            ]
            result = {
                "overall": overall["human_probability"],  # Human probability for backward compatibility
                "ai_probability": overall["ai_probability"],
                "human_probability": overall["human_probability"],
                "model": MODEL_NAME,
                "confidence": overall["confidence"],
                "chatgpt_detected": chatgpt_detected,
                "scan_type": "highlight",
                "section_count": len(sections),
                "ai_section_count": len(ai_sections),
                "sections_analyzed": overall["sections_analyzed"],
                "confident_sections": overall["confident_sections"],
                "ai_sections": ai_sections[:10]  # Limit to first 10
            }
        else:
            # Basic scan (single analysis)
            probs = detector.predict(text)
            human_prob = probs["human_probability"]
            ai_prob = probs["ai_probability"]
            # Boost AI probability if ChatGPT patterns detected
            if chatgpt_detected:
                ai_prob = max(ai_prob, 0.9)
                human_prob = 1 - ai_prob
            # Confidence = distance of the score from the uncertain midpoint
            distance_from_mid = abs(ai_prob - 0.5)
            confidence = "high" if distance_from_mid > 0.4 else "medium" if distance_from_mid > 0.2 else "low"
            result = {
                "overall": human_prob,  # Human probability for backward compatibility
                "ai_probability": ai_prob,
                "human_probability": human_prob,
                "model": MODEL_NAME,
                "confidence": confidence,
                "chatgpt_detected": chatgpt_detected,
                "scan_type": "basic"
            }
        # Calculate processing time in milliseconds
        processing_time = int((time.time() - start_time) * 1000)
        logger.info(f"Scan completed in {processing_time}ms: AI={result.get('ai_probability', 0):.2%}")
        return ScanResponse(
            success=True,
            result=result,
            processingTime=processing_time,
            credits={
                "basic": 5,
                "highlight": 1,
                "resetTime": "2024-12-31T23:59:59Z",
                "test_mode": False
            },
            test_mode=False
        )
    except HTTPException:
        # Re-raise intentional HTTP errors untouched (e.g. the 400 above)
        raise
    except Exception as e:
        logger.error(f"Scan error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
# Fix: the handler was defined but never registered with the app.
# NOTE(review): route path restored as a guess — confirm against the worker.
@app.get("/credits")
async def get_credits(userId: Optional[str] = None):
    """Get credits information (for compatibility with worker)."""
    # Static placeholder values; mirror the credits block returned by scan_text.
    return {
        "basic": 5,
        "highlight": 1,
        "resetTime": "2024-12-31T23:59:59Z",
        "test_mode": False,
        "userId": userId or "unknown"
    }
# ---------------- Main Entry Point ----------------
if __name__ == "__main__":
    import uvicorn
    # Bind all interfaces on port 7860 (the conventional HF Spaces port).
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
        log_level="info",
        access_log=True
    )