# app.py — Detextly AI Detector (Hugging Face Space upload "Update app.py", commit 1583931 verified)
import logging
import sys
import time
from typing import List, Optional

import torch
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, ValidationInfo, field_validator, model_validator
from transformers import AutoTokenizer, AutoModelForSequenceClassification
# ---------------- Logging ----------------
# Emit to stdout so platform log collectors (e.g. HF Spaces) capture output.
logging.basicConfig(
    stream=sys.stdout,
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger("detector")
# ---------------- FastAPI ----------------
app = FastAPI(
    title="Detextly AI Detector API",
    description="AI Detector with chunked scoring and low-confidence filter",
    version="2.1.0"
)
# CORS
# NOTE(review): wildcard origins combined with allow_credentials=True is
# disallowed by the CORS spec — browsers will not send credentials to a
# "*" origin. If credentialed requests are needed, pin explicit origins;
# confirm intended clients before tightening.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ---------------- Pydantic Models ----------------
class ScanRequest(BaseModel):
    """Request body for /api/scan.

    Accepts both the canonical ``scan_type`` and the legacy camelCase
    ``scanType`` parameter for backward compatibility with older clients.
    """
    text: str
    scan_type: Optional[str] = None  # canonical field name
    scanType: Optional[str] = None   # legacy alias kept for old workers
    userId: Optional[str] = None

    @model_validator(mode='after')
    def _map_scantype_to_scan_type(self) -> 'ScanRequest':
        """Copy the legacy 'scanType' value into 'scan_type' when only the alias was sent.

        Bug fix: the previous implementation mutated ``info.data`` inside a
        ``field_validator`` — in Pydantic v2 that mutation is not a supported
        way to populate another field, so ``scan_type`` could stay ``None``
        even when the client sent ``scanType``. An after-mode model validator
        assigns the field on the constructed instance, which is guaranteed.
        """
        if self.scan_type is None and self.scanType is not None:
            self.scan_type = self.scanType
        return self

    def get_scan_type(self) -> str:
        """Get the scan type, defaulting to 'basic' if not provided."""
        # scan_type takes precedence as it's the canonical field name
        return self.scan_type or "basic"
class ScanResponse(BaseModel):
    """Envelope returned by /api/scan."""
    # Whether the scan completed without raising.
    success: bool
    # Detector output payload; key set varies by scan_type (see scan_text).
    result: dict
    # Wall-clock processing time in milliseconds.
    processingTime: int
    # Optional credit/quota info echoed back to the calling worker.
    credits: Optional[dict] = None
    # Kept for client compatibility; this deployment always sends False.
    test_mode: bool = False
# ---------------- AI Detector Core ----------------
MODEL_NAME = "openai-community/roberta-large-openai-detector"
class AIDetector:
    """Lazily-loaded wrapper around the RoBERTa OpenAI-output detector.

    The model is downloaded/initialised on first use (or at API startup),
    not at construction time, so instantiating this class is cheap.
    """

    def __init__(self) -> None:
        # Model and tokenizer are populated by load_model() on demand.
        self.model = None
        self.tokenizer = None
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # id2label mapping captured from the model config, for debugging.
        self.label_map = None
        logger.info(f"Using device: {self.device}")

    def load_model(self) -> None:
        """Load tokenizer and model once; no-op if already loaded.

        Raises:
            RuntimeError: if the Hugging Face download/initialisation fails.
        """
        if self.model is not None:
            return
        logger.info(f"Loading model: {MODEL_NAME}")
        try:
            self.tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
            self.model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME)
            # Store label mapping for debugging
            if hasattr(self.model.config, 'id2label'):
                self.label_map = self.model.config.id2label
                logger.info(f"Model label mapping: {self.label_map}")
            else:
                logger.warning("No label mapping found in model config")
        except Exception as e:
            logger.error(f"Error loading model: {e}")
            raise RuntimeError(f"Failed to load model: {e}")
        self.model.to(self.device)
        self.model.eval()
        logger.info("Model loaded successfully.")

    def predict(self, text: str, max_length: int = 512) -> dict:
        """Return both human and AI probabilities.

        Args:
            text: Raw text to classify (truncated to max_length tokens).
            max_length: Tokenizer truncation limit.

        Returns:
            Dict with "human_probability", "ai_probability" and "raw_probs".
        """
        if self.model is None:
            # Lazy load in case startup initialisation was skipped.
            self.load_model()
        # Tokenize input
        tokens = self.tokenizer(
            text,
            return_tensors="pt",
            truncation=True,
            max_length=max_length,
            padding=True
        )
        tokens = {k: v.to(self.device) for k, v in tokens.items()}
        with torch.no_grad():
            outputs = self.model(**tokens)
            probs = torch.softmax(outputs.logits, dim=-1)
        # NOTE(review): this assumes class 0 == human and class 1 == AI.
        # The published id2label for this checkpoint is commonly reported as
        # {0: "Fake", 1: "Real"} — compare against the label_map logged in
        # load_model(); if it matches, these two assignments are inverted.
        # Get probabilities for both classes
        human_prob = float(probs[0][0].item())  # Class 0
        ai_prob = float(probs[0][1].item())  # Class 1
        # Debug logging
        logger.debug(f"Class 0 (Human): {human_prob:.4f}, Class 1 (AI): {ai_prob:.4f}")
        # Verify probabilities sum to ~1.0
        total = human_prob + ai_prob
        if abs(total - 1.0) > 0.01:
            logger.warning(f"Probabilities don't sum to 1.0: {total:.4f}")
        return {
            "human_probability": human_prob,
            "ai_probability": ai_prob,
            "raw_probs": probs.tolist()
        }
detector = AIDetector()
# ---------------- Pattern Detection ----------------
def detect_chatgpt_patterns(text: str) -> bool:
    """Return True if ChatGPT patterns are detected.

    Performs a case-insensitive substring search for known AI-assistant
    boilerplate phrases.
    """
    boilerplate = (
        "as an ai language model",
        "i am an ai model",
        "i cannot provide medical",
        "as a language model",
        "based on the information provided",
        "my training data",
        "i don't have personal experiences",
        "i don't have feelings",
        "as an artificial intelligence",
        "i don't have personal opinions",
    )
    lowered = text.lower()
    hit = next((phrase for phrase in boilerplate if phrase in lowered), None)
    if hit is None:
        return False
    logging.getLogger("detector").debug(f"ChatGPT pattern detected: {hit}")
    return True
# ---------------- Highlight / Chunked Scan ----------------
def analyze_sections(text: str, chunk_size: int = 40) -> List[dict]:
    """Split text into smaller chunks and compute AI probability for each.

    Each chunk of up to ``chunk_size`` words is scored independently;
    chunks under 20 characters are skipped as too short to score.
    """
    words = text.split()
    n_chunks = (len(words) + chunk_size - 1) // chunk_size
    logger.info(f"Analyzing {len(words)} words in {n_chunks} chunks")
    results: List[dict] = []
    for start in range(0, len(words), chunk_size):
        piece = " ".join(words[start:start + chunk_size])
        if len(piece.strip()) < 20:
            continue  # fragment too small for a meaningful prediction
        scores = detector.predict(piece)
        ai_p = scores["ai_probability"]
        human_p = scores["human_probability"]
        pattern_hit = detect_chatgpt_patterns(piece)
        if pattern_hit:
            # Boilerplate phrase found: force a strong AI score.
            ai_p = max(ai_p, 0.9)
            human_p = 1 - ai_p
        preview = piece if len(piece) <= 200 else piece[:200] + "..."
        results.append({
            "text": preview,
            "ai_probability": round(ai_p, 4),
            "human_probability": round(human_p, 4),
            "words": len(piece.split()),
            "has_chatgpt_pattern": pattern_hit,
        })
    logger.info(f"Generated {len(results)} sections for analysis")
    return results
def compute_overall_score(sections: List[dict], confidence_threshold: float = 0.3) -> dict:
    """Compute word-weighted average probabilities with confidence filtering.

    Sections whose AI probability lies within ``confidence_threshold`` of the
    uncertain midpoint (0.5) are excluded from the average, unless no section
    is confident, in which case all sections are used.

    Args:
        sections: Dicts from analyze_sections() with at least the keys
            "ai_probability", "human_probability" and "words".
        confidence_threshold: Minimum distance from 0.5 to count as confident.

    Returns:
        Dict that ALWAYS contains "ai_probability", "human_probability",
        "confidence", "sections_analyzed" and "confident_sections".
        (Bug fix: the early-return paths previously omitted the last two
        keys, which scan_text reads unconditionally — a very short
        highlight scan with zero scoreable chunks raised KeyError.)
    """
    if not sections:
        return {
            "ai_probability": 0.0,
            "human_probability": 1.0,
            "confidence": "low",
            "sections_analyzed": 0,
            "confident_sections": 0,
        }
    # Filter out low-confidence predictions (close to 0.5).
    confident = [
        s for s in sections
        if abs(s["ai_probability"] - 0.5) >= confidence_threshold
    ]
    if not confident:
        # No confident sections: fall back to using everything.
        confident = sections
    # Weighted average by word count.
    total_words = sum(s["words"] for s in confident)
    if total_words == 0:
        return {
            "ai_probability": 0.5,
            "human_probability": 0.5,
            "confidence": "low",
            "sections_analyzed": len(sections),
            "confident_sections": len(confident),
        }
    overall_ai = sum(s["ai_probability"] * s["words"] for s in confident) / total_words
    overall_human = sum(s["human_probability"] * s["words"] for s in confident) / total_words
    # Confidence level from the overall score's distance to the midpoint.
    distance_from_mid = abs(overall_ai - 0.5)
    if distance_from_mid > 0.4:
        confidence_level = "high"
    elif distance_from_mid > 0.2:
        confidence_level = "medium"
    else:
        confidence_level = "low"
    return {
        "ai_probability": round(overall_ai, 4),
        "human_probability": round(overall_human, 4),
        "confidence": confidence_level,
        "sections_analyzed": len(sections),
        "confident_sections": len(confident),
    }
# ---------------- API Endpoints ----------------
@app.on_event("startup")
async def startup():
    """Warm-load the detector model when the API process boots."""
    logger.info("Starting Detextly AI Detector API...")
    try:
        detector.load_model()
        logger.info("API startup complete")
    except Exception as exc:
        logger.error(f"Failed to start API: {exc}")
        # Re-raise so the process fails fast instead of serving 500s later.
        raise
@app.get("/")
async def root():
    """Service banner: model, device, version and supported features."""
    banner = {
        "status": "online",
        "model": MODEL_NAME,
        "device": str(detector.device),
        "version": "2.1.0",
        "features": ["basic_scan", "highlight_scan", "chatgpt_pattern_detection"],
        "note": "Accepts both 'scan_type' and 'scanType' parameters",
    }
    return banner
@app.get("/health")
async def health():
    """Liveness probe; also reports whether the model is loaded yet."""
    model_ready = detector.model is not None
    return {
        "status": "healthy",
        "model_loaded": model_ready,
        "model": MODEL_NAME,
        "timestamp": time.time(),
    }
@app.get("/debug/test")
async def debug_test():
    """Run the detector over fixed sample sentences to sanity-check scoring."""
    samples = (
        "I went to the store yesterday to buy groceries.",
        "As an AI language model, I don't have personal experiences.",
        "The quick brown fox jumps over the lazy dog.",
    )
    outcomes = []
    for sample in samples:
        scores = detector.predict(sample)
        label = sample if len(sample) <= 50 else sample[:50] + "..."
        outcomes.append({
            "text": label,
            "human_probability": scores["human_probability"],
            "ai_probability": scores["ai_probability"],
        })
    return {
        "test_results": outcomes,
        "model_info": {
            "name": MODEL_NAME,
            "labels": detector.label_map,
            "device": str(detector.device),
        },
    }
@app.post("/api/scan", response_model=ScanResponse)
async def scan_text(request: ScanRequest):
    """Main scanning endpoint.

    Modes (selected by scan_type/scanType):
      - "highlight": chunked per-section analysis with an overall weighted score.
      - anything else: single-pass "basic" scan over the whole text.

    Raises:
        HTTPException 400: text missing or shorter than 10 characters.
        HTTPException 500: any unexpected failure during scoring.
    """
    start_time = time.time()
    try:
        # Validate input
        if not request.text or len(request.text.strip()) < 10:
            raise HTTPException(status_code=400, detail="Text must be at least 10 characters long.")
        # Get scan type (handles both scan_type and scanType via the validator)
        scan_type = request.get_scan_type()
        logger.info(f"Scan request: type={scan_type}, userId={request.userId}, text_length={len(request.text)}")
        # Limit text length for performance
        text = request.text[:5000]
        # Check for ChatGPT patterns
        chatgpt_detected = detect_chatgpt_patterns(text)
        if scan_type == "highlight":
            # Chunked analysis
            sections = analyze_sections(text, chunk_size=40)
            overall = compute_overall_score(sections)
            # Identify AI-heavy sections (threshold 0.6 on per-chunk AI probability)
            ai_sections = [
                {
                    "text": s["text"],
                    "ai_probability": s["ai_probability"],
                    "human_probability": s["human_probability"],
                    "words": s["words"]
                }
                for s in sections if s["ai_probability"] > 0.6
            ]
            # NOTE(review): when every chunk is skipped (very short input),
            # compute_overall_score's empty-sections return omits the
            # "sections_analyzed"/"confident_sections" keys read below,
            # which would raise KeyError here — verify and harden upstream.
            result = {
                "overall": overall["human_probability"],  # Human probability for backward compatibility
                "ai_probability": overall["ai_probability"],
                "human_probability": overall["human_probability"],
                "model": MODEL_NAME,
                "confidence": overall["confidence"],
                "chatgpt_detected": chatgpt_detected,
                "scan_type": "highlight",
                "section_count": len(sections),
                "ai_section_count": len(ai_sections),
                "sections_analyzed": overall["sections_analyzed"],
                "confident_sections": overall["confident_sections"],
                "ai_sections": ai_sections[:10]  # Limit to first 10
            }
        else:
            # Basic scan (single analysis)
            probs = detector.predict(text)
            human_prob = probs["human_probability"]
            ai_prob = probs["ai_probability"]
            # Boost AI probability if ChatGPT patterns detected
            if chatgpt_detected:
                ai_prob = max(ai_prob, 0.9)
                human_prob = 1 - ai_prob
            # Determine confidence from distance to the 0.5 midpoint
            distance_from_mid = abs(ai_prob - 0.5)
            confidence = "high" if distance_from_mid > 0.4 else "medium" if distance_from_mid > 0.2 else "low"
            result = {
                "overall": human_prob,  # Human probability for backward compatibility
                "ai_probability": ai_prob,
                "human_probability": human_prob,
                "model": MODEL_NAME,
                "confidence": confidence,
                "chatgpt_detected": chatgpt_detected,
                "scan_type": "basic"
            }
        # Calculate processing time (milliseconds)
        processing_time = int((time.time() - start_time) * 1000)
        logger.info(f"Scan completed in {processing_time}ms: AI={result.get('ai_probability', 0):.2%}")
        return ScanResponse(
            success=True,
            result=result,
            # Static placeholder credits; presumably consumed by the worker — confirm.
            processingTime=processing_time,
            credits={
                "basic": 5,
                "highlight": 1,
                "resetTime": "2024-12-31T23:59:59Z",
                "test_mode": False
            },
            test_mode=False
        )
    except HTTPException:
        # Re-raise client errors untouched (don't wrap 400s into 500s).
        raise
    except Exception as e:
        logger.error(f"Scan error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
@app.get("/api/credits")
async def get_credits(userId: Optional[str] = None):
    """Return static credits information (compatibility shim for the worker)."""
    who = userId if userId else "unknown"
    return {
        "basic": 5,
        "highlight": 1,
        "resetTime": "2024-12-31T23:59:59Z",
        "test_mode": False,
        "userId": who,
    }
# ---------------- Main Entry Point ----------------
if __name__ == "__main__":
    # Standalone server; port 7860 is the Hugging Face Spaces convention.
    import uvicorn

    server_options = dict(
        host="0.0.0.0",
        port=7860,
        log_level="info",
        access_log=True,
    )
    uvicorn.run(app, **server_options)