import os

# HF_HOME must be set before transformers is imported: the library resolves
# its cache directory at import time, so setting it afterwards has no effect.
os.environ["HF_HOME"] = "/tmp"

from datetime import datetime

from fastapi import FastAPI
from pydantic import BaseModel
from transformers import pipeline

# Hugging Face Hub model IDs for each moderation task.
SPAM_MODEL = "valurank/distilroberta-spam-comments-detection"
TOXIC_MODEL = "s-nlp/roberta_toxicity_classifier"
SENTIMENT_MODEL = "nlptown/bert-base-multilingual-uncased-sentiment"
NSFW_MODEL = "michellejieli/NSFW_text_classifier"

# Load all four classifiers once at import time so requests don't pay the
# model-load cost; the first run will download the weights into HF_HOME.
spam = pipeline("text-classification", model=SPAM_MODEL)
toxic = pipeline("text-classification", model=TOXIC_MODEL)
sentiment = pipeline("text-classification", model=SENTIMENT_MODEL)
nsfw = pipeline("text-classification", model=NSFW_MODEL)

app = FastAPI(title="Plebzs AI Models API")


class Query(BaseModel):
    # Request body shared by all POST endpoints, e.g. {"text": "hello"}.
    text: str


@app.get("/")
def root():
    return {"status": "ok", "message": "Plebzs AI Models API"}


@app.get("/moderation/ping")
def moderation_ping():
    return {
        "status": "healthy",
        "models": ["spam", "toxic", "sentiment", "nsfw"],
        "timestamp": datetime.now().isoformat(),
        "version": "1.0.0",
    }


@app.post("/toxicity")
def predict_toxicity(query: Query):
    result = toxic(query.text)[0]

    # Normalise to a probability-of-toxic score whichever label the model
    # returned; the comparison is case-insensitive because label casing
    # varies between model cards ("toxic" vs "TOXIC").
    if result["label"].upper() == "TOXIC":
        toxicity_score = result["score"]
    else:
        toxicity_score = 1 - result["score"]

    return {
        "toxicity_score": round(toxicity_score, 3),
        "confidence": round(result["score"], 3),
        "raw_output": result,
    }
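
# Example request against this endpoint; the same JSON shape works for
# /sentiment, /spam, and /nsfw. Host and port are hypothetical and depend
# on how the server is launched:
#
#   curl -X POST http://localhost:8000/toxicity \
#        -H "Content-Type: application/json" \
#        -d '{"text": "some comment to score"}'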


@app.post("/sentiment")
def predict_sentiment(query: Query):
    result = sentiment(query.text)[0]

    # nlptown's model predicts a star rating ("1 star" ... "5 stars");
    # collapse it onto a coarse negative/neutral/positive scale.
    label = result["label"]
    if "1" in label or "2" in label:
        sentiment_score = -0.7
    elif "3" in label:
        sentiment_score = 0.0
    else:
        sentiment_score = 0.7

    return {
        "sentiment_score": round(sentiment_score, 3),
        "confidence": round(result["score"], 3),
        "raw_output": result,
    }


@app.post("/spam")
def predict_spam(query: Query):
    result = spam(query.text)[0]

    # Same case-insensitive normalisation as /toxicity.
    if result["label"].upper() == "SPAM":
        spam_score = result["score"]
    else:
        spam_score = 1 - result["score"]

    return {
        "spam_score": round(spam_score, 3),
        "confidence": round(result["score"], 3),
        "raw_output": result,
    }


@app.post("/nsfw")
def predict_nsfw(query: Query):
    result = nsfw(query.text)[0]

    # Same case-insensitive normalisation as /toxicity.
    if result["label"].upper() == "NSFW":
        nsfw_score = result["score"]
    else:
        nsfw_score = 1 - result["score"]

    return {
        "nsfw_score": round(nsfw_score, 3),
        "confidence": round(result["score"], 3),
        "raw_output": result,
    }
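
# The three binary classifiers above share the same normalisation pattern.
# A hypothetical helper like the one sketched below could replace the
# duplicated expressions (not wired in, shown for illustration only):
#
#   def positive_score(result: dict, positive_label: str) -> float:
#       """Return the probability of `positive_label`, whichever label won."""
#       if result["label"].upper() == positive_label:
#           return result["score"]
#       return 1 - result["score"]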


@app.get("/health")
def health_check():
    status = {
        "server": "running",
        "models": {},
    }

    models = {
        "spam": (SPAM_MODEL, spam),
        "toxic": (TOXIC_MODEL, toxic),
        "sentiment": (SENTIMENT_MODEL, sentiment),
        "nsfw": (NSFW_MODEL, nsfw),
    }

    # Push a tiny input through every pipeline; any exception marks that
    # model as errored in the report instead of failing the whole endpoint.
    for key, (model_name, model_pipeline) in models.items():
        try:
            model_pipeline("test")
            status["models"][key] = {
                "model_name": model_name,
                "status": "running",
            }
        except Exception as e:
            status["models"][key] = {
                "model_name": model_name,
                "status": f"error: {e}",
            }

    return status
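

# A minimal sketch for running the service directly. Assumes the `uvicorn`
# package is installed; with the uvicorn CLI you would instead run
# `uvicorn <module_name>:app`, where <module_name> matches this file's name.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)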