# api_host/api.py — branch Pedrinho-Dev01, commit c7d60e2 ("API DeBERTa Update")
"""
Spam Detection + Emotion Analysis API
Ensemble of RoBERTa-Large + ELECTRA-Large classifiers for spam,
and RoBERTa-Large + ELECTRA-Large + DeBERTa-v3-Large for emotion.
Run with: uvicorn api:app --reload
"""
import json
from typing import Optional
import email
from email import policy as email_policy
import torch
from fastapi import FastAPI, HTTPException, UploadFile, File
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from transformers import (
AutoTokenizer,
DebertaV2ForSequenceClassification,
ElectraForSequenceClassification,
RobertaForSequenceClassification,
)
# ── Config ────────────────────────────────────────────────────────────────────
# Hugging Face Hub repo IDs for the fine-tuned checkpoints.
ROBERTA_SPAM_REPO = "Dpedrinho01/trained_roberta_large"
ELECTRA_SPAM_REPO = "Dpedrinho01/trained_electra_large"
ROBERTA_EMOTION_REPO = "Dpedrinho01/trained_roberta_emotion"
ELECTRA_EMOTION_REPO = "Dpedrinho01/trained_electra_emotion"
DEBERTA_EMOTION_REPO = "Dpedrinho01/trained_deberta_v3_large_emotion"
# Single shared device for all five models.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
MAYBE_SPAM_UPPER = 0.50  # [threshold, MAYBE_SPAM_UPPER) -> "maybe spam" grey zone
# ── App ───────────────────────────────────────────────────────────────────────
app = FastAPI(
    title="Spam Detection + Emotion Analysis API",
    description="Ensemble of RoBERTa-Large + ELECTRA-Large for spam/ham classification and emotion detection.",
    version="2.0.0",
)
# NOTE(review): wildcard CORS accepts any origin — fine for a demo, tighten for production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# ── Model loading ─────────────────────────────────────────────────────────────
class SpamModelBundle:
    """Wraps one binary spam/ham model together with its recommended decision threshold.

    Loads the tokenizer and model weights from the Hub, moves the model to
    DEVICE, and reads `recommended_threshold` from the repo's
    threshold_config.json.
    """

    def __init__(self, repo_id: str, model_class):
        print(f"Loading {repo_id} …")
        self.tokenizer = AutoTokenizer.from_pretrained(repo_id)
        self.model = model_class.from_pretrained(repo_id).to(DEVICE)
        self.model.eval()
        from huggingface_hub import hf_hub_download
        cfg_path = hf_hub_download(repo_id=repo_id, filename="threshold_config.json")
        with open(cfg_path) as fh:
            config = json.load(fh)
        # Per-model operating point shipped alongside the weights.
        self.threshold: float = config["recommended_threshold"]
        print(f" βœ“ {repo_id} loaded (threshold={self.threshold}, device={DEVICE})")

    @torch.no_grad()
    def predict_proba(self, text: str) -> float:
        """Return P(spam) as a float in [0, 1] for a single input text."""
        encoded = self.tokenizer(
            text, return_tensors="pt", truncation=True, max_length=512, padding=True
        )
        encoded = {name: tensor.to(DEVICE) for name, tensor in encoded.items()}
        # Binary head: softmax over two logits, index 1 is the spam class.
        scores = torch.softmax(self.model(**encoded).logits, dim=-1)
        return scores[0, 1].item()
class EmotionModelBundle:
    """Wraps one multi-label emotion model plus its per-class decision thresholds.

    Reads label names, a global fallback threshold, and per-class thresholds
    from the repo's model_config.json.
    """

    def __init__(self, repo_id: str, model_class):
        print(f"Loading {repo_id} …")
        self.tokenizer = AutoTokenizer.from_pretrained(repo_id)
        self.model = model_class.from_pretrained(repo_id).to(DEVICE)
        self.model.eval()
        from huggingface_hub import hf_hub_download
        cfg_path = hf_hub_download(repo_id=repo_id, filename="model_config.json")
        with open(cfg_path) as fh:
            cfg = json.load(fh)
        # JSON object keys are strings, so id2label is keyed by str(index).
        self.id2label: dict[str, str] = cfg["id2label"]
        self.threshold_global: float = cfg["threshold_global"]
        self.threshold_per_class: dict[str, float] = cfg["threshold_per_class"]
        self.num_labels: int = cfg["num_labels"]
        print(f" βœ“ {repo_id} loaded ({self.num_labels} emotions, device={DEVICE})")

    @torch.no_grad()
    def predict_proba(self, text: str) -> dict[str, float]:
        """Return {emotion: probability} for every emotion class."""
        encoded = self.tokenizer(
            text, return_tensors="pt", truncation=True, max_length=512, padding=True
        )
        encoded = {name: tensor.to(DEVICE) for name, tensor in encoded.items()}
        # Multi-label task: each logit gets an independent sigmoid.
        probs = torch.sigmoid(self.model(**encoded).logits)[0].cpu().tolist()
        return {
            self.id2label[str(idx)]: round(probs[idx], 4)
            for idx in range(self.num_labels)
        }
# Global model instances — all None until load_models() populates them at app
# startup; endpoints assume startup has completed before they are called.
roberta_spam_bundle: Optional[SpamModelBundle] = None
electra_spam_bundle: Optional[SpamModelBundle] = None
roberta_emotion_bundle: Optional[EmotionModelBundle] = None
electra_emotion_bundle: Optional[EmotionModelBundle] = None
deberta_emotion_bundle: Optional[EmotionModelBundle] = None
@app.on_event("startup")
def load_models():
    """Eagerly load all five model bundles once when the server starts.

    NOTE(review): ``@app.on_event`` is deprecated in recent FastAPI versions in
    favor of lifespan handlers — still works, worth migrating eventually.
    """
    global roberta_spam_bundle, electra_spam_bundle
    global roberta_emotion_bundle, electra_emotion_bundle, deberta_emotion_bundle
    # Spam ensemble (binary classifiers).
    roberta_spam_bundle = SpamModelBundle(ROBERTA_SPAM_REPO, RobertaForSequenceClassification)
    electra_spam_bundle = SpamModelBundle(ELECTRA_SPAM_REPO, ElectraForSequenceClassification)
    # Emotion ensemble (multi-label classifiers).
    roberta_emotion_bundle = EmotionModelBundle(ROBERTA_EMOTION_REPO, RobertaForSequenceClassification)
    electra_emotion_bundle = EmotionModelBundle(ELECTRA_EMOTION_REPO, ElectraForSequenceClassification)
    deberta_emotion_bundle = EmotionModelBundle(DEBERTA_EMOTION_REPO, DebertaV2ForSequenceClassification)
    print(f"All models ready on {DEVICE}.")
# ── Schemas ───────────────────────────────────────────────────────────────────
class PredictRequest(BaseModel):
    """Request body for /predict."""
    text: str  # raw text to classify; must be non-empty after strip
    model: str = "ensemble"  # one of "ensemble" | "roberta" | "electra"
class ModelResult(BaseModel):
    """Per-model spam verdict inside a /predict response."""
    spam_probability: float  # P(spam) for this single model, 4 decimals
    is_spam: bool  # spam_probability >= MAYBE_SPAM_UPPER
    threshold: float  # this model's recommended_threshold from its repo config
class PredictResponse(BaseModel):
    """Response for /predict (and each item of /predict/batch)."""
    text: str  # echo of the analyzed text
    model_used: str  # which model key produced the final verdict
    is_spam: bool  # final probability >= maybe_spam_upper_threshold
    maybe_spam: bool  # ensemble_threshold <= probability < maybe_spam_upper_threshold
    spam_probability: float  # final (possibly averaged) P(spam), 4 decimals
    ensemble_threshold: float  # threshold actually applied (averaged for ensemble)
    maybe_spam_upper_threshold: float  # upper bound of the "maybe spam" band
    roberta: Optional[ModelResult] = None  # individual model breakdown
    electra: Optional[ModelResult] = None  # individual model breakdown
class EmotionScore(BaseModel):
    """One emotion's probability and detection flag."""
    emotion: str  # label name from the model's id2label
    probability: float  # sigmoid output (or 3-model average), 4 decimals
    detected: bool  # probability >= threshold
    threshold: float  # per-class cutoff used for detection
class EmotionModelResult(BaseModel):
    """All emotion scores from a single model, sorted by probability."""
    emotions: list[EmotionScore]
class EmotionPredictRequest(BaseModel):
    """Request body for /predict/emotion."""
    text: str  # raw text to analyze; must be non-empty after strip
class EmotionPredictResponse(BaseModel):
    """Response for /predict/emotion."""
    text: str  # echo of the analyzed text
    detected_emotions: list[str]  # emotions whose ensemble average cleared their threshold
    all_scores: list[EmotionScore]  # ensemble averaged, sorted by probability
    roberta: Optional[EmotionModelResult] = None  # per-model breakdown
    electra: Optional[EmotionModelResult] = None  # per-model breakdown
    deberta: Optional[EmotionModelResult] = None  # per-model breakdown
class EmlRequest(BaseModel):
    """Request body for /predict/eml."""
    filename: str  # must end in .eml
    content: str  # base64 encoded raw bytes of the .eml file
class FullEmlResponse(BaseModel):
    """Combined spam + emotion analysis of one .eml file."""
    spam: PredictResponse
    emotion: EmotionPredictResponse
# ── Helpers ───────────────────────────────────────────────────────────────────
def classify_spam(proba: float, threshold: float, upper: Optional[float] = None) -> dict:
    """Map a spam probability onto is_spam / maybe_spam flags.

    Bands:
        proba >= upper              -> spam
        threshold <= proba < upper  -> "maybe spam" grey zone
        proba < threshold           -> ham

    Args:
        proba: P(spam) in [0, 1].
        threshold: lower bound of the grey zone (the model's operating point).
        upper: upper bound of the grey zone; defaults to the module-level
            MAYBE_SPAM_UPPER. Added as a keyword parameter to generalize the
            boundary without changing existing call sites.

    Returns:
        {"is_spam": bool, "maybe_spam": bool}

    NOTE(review): if threshold > upper the grey zone is empty — assumed not to
    happen with the shipped threshold configs; confirm against the repos.
    """
    if upper is None:
        upper = MAYBE_SPAM_UPPER
    maybe_spam = threshold <= proba < upper
    is_spam = proba >= upper
    return {"is_spam": is_spam, "maybe_spam": maybe_spam}
def ensemble_emotions(
    roberta_probas: dict[str, float],
    electra_probas: dict[str, float],
    deberta_probas: dict[str, float],
    threshold_per_class: dict[str, float],
) -> tuple[list[str], list[EmotionScore]]:
    """Average the three models' probabilities per emotion and apply per-class thresholds.

    Iterates over RoBERTa's label set; emotions a model is missing contribute
    0.0 to the average. Returns (detected labels in label order,
    all scores sorted by descending probability).
    """
    scores: list[EmotionScore] = []
    for emotion, r_prob in roberta_probas.items():
        mean_prob = round(
            (
                r_prob
                + electra_probas.get(emotion, 0.0)
                + deberta_probas.get(emotion, 0.0)
            )
            / 3,
            4,
        )
        cutoff = threshold_per_class.get(emotion, 0.4)
        scores.append(
            EmotionScore(
                emotion=emotion,
                probability=mean_prob,
                detected=mean_prob >= cutoff,
                threshold=cutoff,
            )
        )
    # Collect detections before sorting so the list keeps label order.
    detected = [s.emotion for s in scores if s.detected]
    scores.sort(key=lambda s: s.probability, reverse=True)
    return detected, scores
def _emotion_model_result(bundle: EmotionModelBundle, probas: dict[str, float]) -> EmotionModelResult:
    """Build one model's score list, using its own per-class (or global fallback) thresholds."""
    entries: list[EmotionScore] = []
    for name, prob in probas.items():
        cutoff = bundle.threshold_per_class.get(name, bundle.threshold_global)
        entries.append(
            EmotionScore(
                emotion=name,
                probability=prob,
                detected=prob >= cutoff,
                threshold=cutoff,
            )
        )
    ordered = sorted(entries, key=lambda s: s.probability, reverse=True)
    return EmotionModelResult(emotions=ordered)
# ── EML parser ────────────────────────────────────────────────────────────────
def extract_text_from_eml(raw_bytes: bytes) -> str:
    """Extract Subject, From and body text from a raw .eml message.

    Prefers text/plain body parts; falls back to tag-stripped text/html parts
    only when the message contains no text/plain part. Attachments are skipped.

    Bug fix: the previous html-fallback condition checked whether the literal
    substring "plain" appeared in any already-collected text (including the
    Subject/From lines) instead of whether a text/plain part had been found,
    so plain+html emails usually got the body duplicated.

    Args:
        raw_bytes: the raw bytes of the .eml file.

    Returns:
        Newline-joined header lines and body text, stripped.
    """
    import html as html_lib
    import re

    msg = email.message_from_bytes(raw_bytes, policy=email_policy.default)
    parts: list[str] = []
    subject = msg.get("subject", "")
    if subject:
        parts.append(f"Subject: {subject}")
    from_addr = msg.get("from", "")
    if from_addr:
        parts.append(f"From: {from_addr}")
    if msg.is_multipart():
        saw_plain = False
        html_bodies: list[str] = []
        for part in msg.walk():
            ct = part.get_content_type()
            cd = str(part.get("Content-Disposition", ""))
            if "attachment" in cd:
                continue
            if ct == "text/plain":
                parts.append(part.get_content())
                saw_plain = True
            elif ct == "text/html":
                html_bodies.append(part.get_content())
        if not saw_plain:
            for raw_html in html_bodies:
                # Crude tag strip + entity unescape + whitespace collapse.
                text = re.sub(r"<[^>]+>", " ", raw_html)
                text = html_lib.unescape(text)
                text = re.sub(r"\s+", " ", text).strip()
                parts.append(text)
    else:
        parts.append(msg.get_content())
    return "\n".join(parts).strip()
# ── Endpoints ─────────────────────────────────────────────────────────────────
@app.get("/")
def root():
return {"status": "ok", "message": "Spam Detection + Emotion Analysis API is running."}
@app.get("/health")
def health():
return {
"status": "healthy",
"device": DEVICE,
"spam_models_loaded": roberta_spam_bundle is not None and electra_spam_bundle is not None,
"emotion_models_loaded": (
roberta_emotion_bundle is not None
and electra_emotion_bundle is not None
and deberta_emotion_bundle is not None
),
}
@app.post("/predict", response_model=PredictResponse)
def predict(req: PredictRequest):
if not req.text.strip():
raise HTTPException(status_code=422, detail="text must not be empty.")
model_key = req.model.lower()
if model_key not in ("ensemble", "roberta", "electra"):
raise HTTPException(status_code=422, detail="model must be 'ensemble', 'roberta', or 'electra'.")
roberta_proba = roberta_spam_bundle.predict_proba(req.text)
electra_proba = electra_spam_bundle.predict_proba(req.text)
roberta_result = ModelResult(
spam_probability=round(roberta_proba, 4),
is_spam=roberta_proba >= MAYBE_SPAM_UPPER,
threshold=roberta_spam_bundle.threshold,
)
electra_result = ModelResult(
spam_probability=round(electra_proba, 4),
is_spam=electra_proba >= MAYBE_SPAM_UPPER,
threshold=electra_spam_bundle.threshold,
)
if model_key == "roberta":
final_proba = roberta_proba
ensemble_threshold = roberta_spam_bundle.threshold
elif model_key == "electra":
final_proba = electra_proba
ensemble_threshold = electra_spam_bundle.threshold
else:
final_proba = (roberta_proba + electra_proba) / 2
ensemble_threshold = (roberta_spam_bundle.threshold + electra_spam_bundle.threshold) / 2
flags = classify_spam(final_proba, ensemble_threshold)
return PredictResponse(
text=req.text,
model_used=model_key,
is_spam=flags["is_spam"],
maybe_spam=flags["maybe_spam"],
spam_probability=round(final_proba, 4),
ensemble_threshold=ensemble_threshold,
maybe_spam_upper_threshold=MAYBE_SPAM_UPPER,
roberta=roberta_result,
electra=electra_result,
)
@app.post("/predict/emotion", response_model=EmotionPredictResponse)
def predict_emotion(req: EmotionPredictRequest):
if not req.text.strip():
raise HTTPException(status_code=422, detail="text must not be empty.")
roberta_probas = roberta_emotion_bundle.predict_proba(req.text)
electra_probas = electra_emotion_bundle.predict_proba(req.text)
deberta_probas = deberta_emotion_bundle.predict_proba(req.text)
# Use roberta's per-class thresholds (all models share the same config structure)
detected, all_scores = ensemble_emotions(
roberta_probas,
electra_probas,
deberta_probas,
roberta_emotion_bundle.threshold_per_class,
)
return EmotionPredictResponse(
text=req.text,
detected_emotions=detected,
all_scores=all_scores,
roberta=_emotion_model_result(roberta_emotion_bundle, roberta_probas),
electra=_emotion_model_result(electra_emotion_bundle, electra_probas),
deberta=_emotion_model_result(deberta_emotion_bundle, deberta_probas),
)
@app.post("/predict/batch")
def predict_batch(texts: list[str], model: str = "ensemble"):
if len(texts) > 50:
raise HTTPException(status_code=422, detail="Batch size limit is 50.")
return [predict(PredictRequest(text=t, model=model)) for t in texts]
@app.post("/predict/eml", response_model=FullEmlResponse)
async def predict_eml(req: EmlRequest):
if not req.filename.endswith(".eml"):
raise HTTPException(status_code=422, detail="Only .eml files are accepted.")
import base64
raw = base64.b64decode(req.content)
if len(raw) > 5 * 1024 * 1024:
raise HTTPException(status_code=413, detail="File too large (max 5 MB).")
try:
text = extract_text_from_eml(raw)
except Exception as e:
raise HTTPException(status_code=422, detail=f"Failed to parse .eml: {e}")
if not text.strip():
raise HTTPException(status_code=422, detail="Could not extract any text from the .eml file.")
analyzed_text = text.strip()
print("\n=== [EMAIL SCAN] Content analyzed ===")
print(analyzed_text)
print("=== [END EMAIL CONTENT] ===\n")
spam_result = predict(PredictRequest(text=analyzed_text, model="ensemble"))
emotion_result = predict_emotion(EmotionPredictRequest(text=analyzed_text))
return FullEmlResponse(spam=spam_result, emotion=emotion_result)