# civicpulse-nlp / app.py
# Source: Hugging Face Space "rayubaldo44/civicpulse-nlp", commit 6e03871
# ("feat: update sentiment tokenizer to dost-asti/RoBERTa-tl-sentiment-analysis")
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from transformers import (
pipeline,
AutoTokenizer,
)
from sentence_transformers import SentenceTransformer
import torch
import re
import hashlib
import logging
import spacy
from langdetect import detect, LangDetectException
# ── Logging ────────────────────────────────────────────────────────────────
# Module-level logger, named after this module per stdlib convention.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ── App ────────────────────────────────────────────────────────────────────
# FastAPI application object; endpoint functions below register routes on it
# via decorators. Version string is echoed by the "/" endpoint.
app = FastAPI(
    title="CivicPulse NLP API",
    description="NLP microservice for Civic Pulse Engine β€” Municipality of Pulilan, Bulacan",
    version="1.7.0",
)
# ═══════════════════════════════════════════════════════════════════════════
# MODEL LOADING
# All models loaded once at startup — never inside endpoint functions.
# ═══════════════════════════════════════════════════════════════════════════
# transformers pipeline device convention: 0 = first CUDA GPU, -1 = CPU.
DEVICE = 0 if torch.cuda.is_available() else -1
# ── 1. Sentiment Model ─────────────────────────────────────────────────────
# CHANGED in v1.7.0: Switched from tabularisai/multilingual-sentiment-analysis
# to rayubaldo44/civicpulse-sentiment-v2 (Stage 1 fine-tuned model).
# Base: dost-asti/RoBERTa-tl-sentiment-analysis, fine-tuned on:
#   - scaredmeow/shopee-reviews-tl-stars (15K Tagalog reviews, star→3-class mapped)
#   - legacy-datasets/hate_speech_filipino (10K election tweets, negative detection)
# Result: Direct 3-class output (negative/neutral/positive) — no aggregation needed.
# Fixes: "Salamat sa bagong street lights" now correctly classified as positive
# (was neutral at 0.99 with tabularisai). 6/6 civic smoke test passed.
logger.info("Loading sentiment model: rayubaldo44/civicpulse-sentiment-v2...")
sentiment_pipeline = pipeline(
    task="text-classification",
    model="rayubaldo44/civicpulse-sentiment-v2",
    # Tokenizer pinned to the base model repo (the fine-tuned repo reuses it).
    tokenizer="dost-asti/RoBERTa-tl-sentiment-analysis",
    device=DEVICE,
    # top_k=None returns scores for ALL classes, not just the argmax.
    top_k=None,
)
logger.info("Sentiment model loaded.")
# ── 2. Claim Detection Tokenizer ───────────────────────────────────────────
CLAIM_DETECTION_MODE = "heuristic"  # switch to "model" after fine-tuning
logger.info("Loading claim detection tokenizer: jcblaise/roberta-tagalog-large...")
# Tokenizer only — /claim-detection uses it for token counts while the
# classifier itself is still pending fine-tuning.
claim_tokenizer = AutoTokenizer.from_pretrained("jcblaise/roberta-tagalog-large")
logger.info("Claim tokenizer loaded. (Heuristic mode until fine-tuned.)")
# ── 3. Topic Classification Model ──────────────────────────────────────────
# CHANGED in v1.5.0: Switched from cross-encoder/nli-MiniLM2-L6-H768 (English-only, 82M params)
# to MoritzLaurer/bge-m3-zeroshot-v2.0 (multilingual 100+ langs, 568M params, newest v2.0 architecture).
# Reason: Previous model could not classify Taglish text — nearly all comments fell into "Other".
# bge-m3-zeroshot-v2.0 is the most accurate multilingual zero-shot model available (2024).
# NOTE v1.7.0: Kept as-is. Stage 1 fine-tuned topic model collapsed to single class due to
# insufficient civic topic diversity in public datasets. Zero-shot remains superior until
# Stage 2 fine-tuning with annotated civic data.
logger.info("Loading topic model: MoritzLaurer/bge-m3-zeroshot-v2.0...")
topic_pipeline_model = pipeline(
    task="zero-shot-classification",
    model="MoritzLaurer/bge-m3-zeroshot-v2.0",
    device=DEVICE,
)
logger.info("Topic classification model loaded.")
# ── 4. spaCy NER (for /preprocess PII masking) ─────────────────────────────
# English model — NOTE(review): presumably adequate for detecting PERSON names
# in Taglish text since Filipino names use Latin script; confirm recall.
logger.info("Loading spaCy NER model: en_core_web_sm...")
nlp_spacy = spacy.load("en_core_web_sm")
logger.info("spaCy NER model loaded.")
# ── 5. Embedding Model (for RAG pipeline) ──────────────────────────────────
logger.info("Loading embedding model: meedan/paraphrase-filipino-mpnet-base-v2...")
embedding_model = SentenceTransformer("meedan/paraphrase-filipino-mpnet-base-v2")
logger.info("Embedding model loaded.")
# ═══════════════════════════════════════════════════════════════════════════
# CONSTANTS
# ═══════════════════════════════════════════════════════════════════════════
# Sentiment predictions below this confidence are routed to human review.
CONFIDENCE_THRESHOLD = 0.65
# Topic predictions below this confidence are flagged for human review.
TOPIC_REVIEW_THRESHOLD = 0.50
# CHANGED in v1.5.0: Descriptive labels for better zero-shot matching on Taglish civic text.
CIVIC_TOPICS_DESCRIPTIVE = [
    "government infrastructure projects, roads, bridges, flood control, buildings, and construction",
    "healthcare, hospitals, medical services, clinics, and public health programs",
    "waste management, garbage collection, recycling, and environmental sanitation",
    "public safety, police, crime, drugs, peace and order, and law enforcement",
    "other government services, civic matters, elections, and general topics",
]
# Map descriptive labels back to clean database-friendly labels.
# Keys must match CIVIC_TOPICS_DESCRIPTIVE entries exactly.
TOPIC_LABEL_MAP = {
    "government infrastructure projects, roads, bridges, flood control, buildings, and construction": "Infrastructure and Public Works",
    "healthcare, hospitals, medical services, clinics, and public health programs": "Healthcare and Medical Services",
    "waste management, garbage collection, recycling, and environmental sanitation": "Waste Management and Sanitation",
    "public safety, police, crime, drugs, peace and order, and law enforcement": "Public Safety and Peace and Order",
    "other government services, civic matters, elections, and general topics": "Other",
}
# Reverse map for custom_labels fallback (keeps backward compatibility).
CIVIC_TOPICS = list(TOPIC_LABEL_MAP.values())
# Claim detection heuristic patterns — Filipino/Taglish signals of a verifiable claim.
# Intentionally broad — RAG + Claude Haiku filters false positives later.
CLAIM_PATTERNS = [
    r"\bsinabi\b", r"\bayon\b", r"\bdaw\b", r"\braw\b", r"\bdiba\b",
    r"\btotoo\b", r"\bkatotohanan\b", r"\bbalita\b", r"\bnews\b",
    r"\bconfirmed\b", r"\bofficial\b", r"\bpahayag\b", r"\bannounced\b",
    r"\bnagsabi\b", r"\bsinabi ng\b", r"\bayon sa\b", r"\bpumirma\b",
    # Numbers with money/measurement units — strong factual-claim signal.
    r"\b\d+\s*(piso|million|billion|porsyento|%|metro|km|kilometro)\b",
    r"\b\d+\s*(beses|taon|buwan|araw|oras)\b",
    # Official actor followed by an action verb prefix (nag-/mag-/ipa-/...).
    r"\b(mayor|gobernador|konseho|lgu|barangay|kapitan)\b.*\b(nag|mag|ipa|sini|apro)\w+",
    r"\baccording to\b", r"\breport(ed|s)?\b", r"\bstatement\b",
    r"\bproject\b.*\b(million|billion|piso)\b",
    r"\b(budget|funds|pondo)\b.*\b\d+\b",
]
# Pre-compiled once at import so /claim-detection does no regex compilation.
COMPILED_PATTERNS = [re.compile(p, re.IGNORECASE) for p in CLAIM_PATTERNS]
# Taglish SMS abbreviation map — high-frequency only.
# Keys are regex patterns; empty-string values DELETE the token (filler words).
TAGLISH_MAP = {
    r"\bkc\b": "kasi",
    r"\bksi\b": "kasi",
    r"\bdk\b": "di ko",
    r"\bnman\b": "naman",
    r"\bsna\b": "sana",
    r"\bkau\b": "kayo",
    r"\bkaw\b": "ikaw",
    r"\bnyo\b": "ninyo",
    r"\bbaket\b": "bakit",
    r"\bpede\b": "pwede",
    r"\bpls\b": "please",
    r"\bplz\b": "please",
    r"\bomg\b": "",
    r"\blol\b": "",
    r"\bhaha+\b": "",
    r"\bhehe+\b": "",
}
# ═══════════════════════════════════════════════════════════════════════════
# PREPROCESS HELPERS
# ═══════════════════════════════════════════════════════════════════════════
def hash_user_id(user_id: str) -> str:
    """Return the SHA-256 hex digest of *user_id* (irreversible PII strip)."""
    digest = hashlib.sha256(user_id.encode())
    return digest.hexdigest()
def normalize_platform(text: str) -> str:
    """Remove URLs, anonymize @mentions, strip the '#' symbol but keep the word."""
    without_urls = re.sub(r"http\S+", "", text)
    anonymized = re.sub(r"@\w+", "[USER]", without_urls)
    dehashed = re.sub(r"#(\w+)", r"\1", anonymized)
    # Collapse whatever whitespace the removals left behind.
    return re.sub(r"\s+", " ", dehashed).strip()
def normalize_taglish(text: str) -> str:
    """Expand high-frequency Taglish SMS abbreviations via TAGLISH_MAP."""
    result = text
    for abbrev_pattern, expansion in TAGLISH_MAP.items():
        result = re.sub(abbrev_pattern, expansion, result, flags=re.IGNORECASE)
    # Empty-string expansions can leave double spaces — collapse them.
    return re.sub(r"\s+", " ", result).strip()
def mask_named_entities(text: str) -> str:
    """Mask PERSON entities only — keep ORG/GPE so civic context survives."""
    doc = nlp_spacy(text)
    result = text
    # Walk entities right-to-left so earlier character offsets stay valid
    # after each replacement.
    for entity in reversed(doc.ents):
        if entity.label_ != "PERSON":
            continue
        result = result[:entity.start_char] + "[NAME]" + result[entity.end_char:]
    return result
def detect_language(text: str) -> str:
    """Return 'tl', 'en', or 'tl-en' (Taglish). Falls back to 'unknown'."""
    try:
        detected = detect(text)
    except LangDetectException:
        return "unknown"
    if detected == "en":
        # langdetect frequently labels Taglish as English; the presence of
        # two or more Tagalog function words upgrades the tag to 'tl-en'.
        tagalog_markers = {"ang", "ng", "mga", "na", "sa", "si", "ko", "ka", "po", "ba", "ay"}
        word_set = set(text.lower().split())
        if len(word_set & tagalog_markers) >= 2:
            return "tl-en"
        return "en"
    # 'tl' and any other language code pass through unchanged.
    return detected
def is_spam(text: str) -> bool:
"""True if text is too short or contains no real words."""
if len(text.strip()) < 5:
return True
return len(re.findall(r"[a-zA-ZΓ€-ΓΏ\u0100-\u024F]+", text)) == 0
# ═══════════════════════════════════════════════════════════════════════════
# SCHEMAS
# ═══════════════════════════════════════════════════════════════════════════
class PreprocessRequest(BaseModel):
    """Input for /preprocess: raw comment text plus the platform user id."""
    text: str                      # raw comment as received from the platform
    user_id: str                   # platform user id; SHA-256 hashed, never stored raw
    comment_id: str | None = None  # caller-supplied correlation id, echoed back
class PreprocessResponse(BaseModel):
    """Output of /preprocess."""
    comment_id: str | None  # echoed from the request
    hashed_user_id: str     # SHA-256 hex digest of user_id
    cleaned_text: str       # URL/mention-stripped, Taglish-normalized, PERSON-masked text
    language: str           # 'tl', 'en', 'tl-en', or 'unknown'
    is_spam: bool           # True when too short or containing no real words
class EmbedRequest(BaseModel):
    """Input for /embed."""
    text: str  # text to embed (LGU document chunk or flagged claim)
class EmbedResponse(BaseModel):
    """Output of /embed."""
    embedding: list[float]  # L2-normalized dense vector (normalize_embeddings=True)
    dimensions: int         # length of `embedding` (768 per the /embed docstring)
class SentimentRequest(BaseModel):
    """Input for /sentiment."""
    text: str                      # comment text (pass cleaned_text from /preprocess)
    comment_id: str | None = None  # correlation id, echoed back
class SentimentScore(BaseModel):
    """One (label, probability) pair from the sentiment model."""
    label: str
    score: float  # rounded to 4 decimals by the endpoint
class SentimentResponse(BaseModel):
    """Output of /sentiment."""
    comment_id: str | None
    sentiment: str                    # top label, or "review" when below threshold
    confidence: float                 # top class probability, rounded to 4 decimals
    all_scores: list[SentimentScore]  # probability for every class
    needs_human_review: bool          # True when confidence < CONFIDENCE_THRESHOLD
    model: str                        # model identifier used for inference
class ClaimRequest(BaseModel):
    """Input for /claim-detection."""
    text: str                      # comment text to scan for factual claims
    comment_id: str | None = None  # correlation id, echoed back
class ClaimResponse(BaseModel):
    """Output of /claim-detection."""
    comment_id: str | None
    has_claim: bool              # True means the comment goes to the RAG pipeline
    confidence: float            # heuristic mode: 1.0 if any pattern matched, else 0.0
    detection_mode: str          # currently always "heuristic"
    matched_patterns: list[str]  # deduplicated lowercased matched substrings
    token_count: int             # token count from the claim tokenizer (truncated at 512)
    model: str                   # tokenizer provenance string
class TopicRequest(BaseModel):
    """Input for /topic-classification."""
    text: str                               # comment text to classify
    comment_id: str | None = None           # correlation id, echoed back
    custom_labels: list[str] | None = None  # optional override of the default civic topics
class TopicScore(BaseModel):
    """One (label, score) pair from zero-shot topic classification."""
    label: str
    score: float  # rounded to 4 decimals by the endpoint
class TopicResponse(BaseModel):
    """Output of /topic-classification."""
    comment_id: str | None
    topic: str                    # winning (mapped) label
    confidence: float             # winning score, rounded to 4 decimals
    all_scores: list[TopicScore]  # every candidate label with its score
    needs_human_review: bool      # True when confidence < TOPIC_REVIEW_THRESHOLD
    model: str                    # model identifier used for inference
# ═══════════════════════════════════════════════════════════════════════════
# ENDPOINTS
# ═══════════════════════════════════════════════════════════════════════════
@app.get("/")
def root():
    """Service metadata: name, version, status, and available endpoints."""
    endpoint_list = [
        "/preprocess",
        "/embed",
        "/sentiment",
        "/claim-detection",
        "/topic-classification",
        "/health",
    ]
    return {
        "service": "CivicPulse NLP API",
        "version": "1.7.0",
        "status": "running",
        "endpoints": endpoint_list,
    }
@app.get("/health")
def health():
    """Keep-alive endpoint. GitHub Actions pings this every 25 min."""
    loaded_models = [
        "sentiment",
        "claim-tokenizer",
        "topic-classification",
        "spacy-ner",
        "embedding",
    ]
    return {
        "status": "ok",
        "models_loaded": loaded_models,
        "claim_detection_mode": CLAIM_DETECTION_MODE,
    }
# ── Pre-Processing ─────────────────────────────────────────────────────────
@app.post("/preprocess", response_model=PreprocessResponse)
def preprocess(request: PreprocessRequest):
    """
    Clean and anonymize a raw comment before NLP processing.
    Call this first — pass cleaned_text to the three NLP endpoints.

    Steps (in order):
      1. SHA-256 hash user_id (PII strip)
      2. Remove URLs, anonymize @mentions, strip # symbol
      3. Expand Taglish SMS abbreviations
      4. spaCy NER masks PERSON entities -> [NAME]
      5. Detect language: tl / en / tl-en
      6. Spam check (too short or no real words)

    Raises:
        HTTPException 422: empty text.
    """
    raw_text = request.text.strip()
    if not raw_text:
        raise HTTPException(status_code=422, detail="text field cannot be empty.")
    # Cleaning pipeline: platform artifacts -> SMS abbreviations -> PII mask.
    cleaned = mask_named_entities(normalize_taglish(normalize_platform(raw_text)))
    return PreprocessResponse(
        comment_id=request.comment_id,
        hashed_user_id=hash_user_id(request.user_id),
        cleaned_text=cleaned,
        # Language and spam checks run on the CLEANED text on purpose.
        language=detect_language(cleaned),
        is_spam=is_spam(cleaned),
    )
# ── Embed ──────────────────────────────────────────────────────────────────
@app.post("/embed", response_model=EmbedResponse)
def embed(request: EmbedRequest):
    """
    Generate a 768-dimension dense vector embedding for a text string.

    Used for: (1) embedding lgu_documents into pgvector, and
    (2) embedding flagged claims for cosine similarity search.
    Model: meedan/paraphrase-filipino-mpnet-base-v2

    Raises:
        HTTPException 422: empty text.
        HTTPException 500: embedding model failure.
    """
    text = request.text.strip()
    if not text:
        raise HTTPException(status_code=422, detail="text field cannot be empty.")
    try:
        # normalize_embeddings=True -> unit-length vector, so dot product
        # equals cosine similarity downstream.
        vector = embedding_model.encode(text, normalize_embeddings=True).tolist()
    except Exception as e:
        # logger.exception captures the full traceback; `from e` (PEP 3134)
        # chains the original cause instead of discarding it.
        logger.exception("Embedding error")
        raise HTTPException(status_code=500, detail=f"Embedding error: {str(e)}") from e
    # Response construction kept outside the try so only encode() errors map to 500.
    return EmbedResponse(embedding=vector, dimensions=len(vector))
# ── Sentiment ──────────────────────────────────────────────────────────────
@app.post("/sentiment", response_model=SentimentResponse)
def analyze_sentiment(request: SentimentRequest):
    """
    Classify a comment as positive, negative, or neutral.

    v1.7.0: Uses rayubaldo44/civicpulse-sentiment-v2 (Stage 1 fine-tuned).
    Base: dost-asti/RoBERTa-tl-sentiment-analysis fine-tuned on Shopee reviews
    (15K Tagalog) + hate speech Filipino (10K election tweets).
    Direct 3-class output — no aggregation needed.
    Confidence below 0.65 is routed to human review queue.

    Raises:
        HTTPException 422: empty text.
        HTTPException 500: model inference failure.
    """
    text = request.text.strip()
    if not text:
        raise HTTPException(status_code=422, detail="text field cannot be empty.")
    if len(text) > 1000:
        # Lazy %-args: the message is only formatted if the record is emitted.
        logger.warning("Long text (%d chars), tokenizer will truncate.", len(text))
    try:
        raw_results = sentiment_pipeline(text, truncation=True, max_length=512)
    except Exception as e:
        # logger.exception records the traceback; `from e` preserves the
        # exception chain instead of silently dropping the cause.
        logger.exception("Sentiment inference error")
        raise HTTPException(status_code=500, detail=f"Model inference error: {str(e)}") from e
    # v1.7.0: Model outputs 3 classes directly — negative, neutral, positive.
    # With top_k=None the pipeline returns per-class scores for the first
    # (and only) input.
    scores = raw_results[0]
    all_scores = [
        SentimentScore(label=s["label"], score=round(s["score"], 4))
        for s in scores
    ]
    # Highest-probability class drives the response.
    top = max(scores, key=lambda s: s["score"])
    top_label = top["label"]
    top_confidence = round(top["score"], 4)
    needs_review = top_confidence < CONFIDENCE_THRESHOLD
    return SentimentResponse(
        comment_id=request.comment_id,
        # Low-confidence predictions are labeled "review" for the human queue.
        sentiment="review" if needs_review else top_label,
        confidence=top_confidence,
        all_scores=all_scores,
        needs_human_review=needs_review,
        model="rayubaldo44/civicpulse-sentiment-v2",
    )
# ── Claim Detection ────────────────────────────────────────────────────────
@app.post("/claim-detection", response_model=ClaimResponse)
def detect_claim(request: ClaimRequest):
    """
    Detect whether a comment contains a verifiable factual claim.
    has_claim = True means the comment will be passed to the RAG pipeline.

    Raises:
        HTTPException 422: empty text.
        HTTPException 501: "model" mode requested before fine-tuning exists.
    """
    text = request.text.strip()
    if not text:
        raise HTTPException(status_code=422, detail="text field cannot be empty.")
    # Fail fast: reject unsupported "model" mode BEFORE doing tokenization work
    # (previously the tokenizer ran first and the result was thrown away).
    if CLAIM_DETECTION_MODE == "model":
        raise HTTPException(
            status_code=501,
            detail="Model mode not yet available. Fine-tune jcblaise/roberta-tagalog-large first."
        )
    tokens = claim_tokenizer(
        text, truncation=True, max_length=512, return_tensors=None,
    )
    token_count = len(tokens["input_ids"])
    matched = []
    for pattern in COMPILED_PATTERNS:
        match = pattern.search(text)
        if match:
            matched.append(match.group(0).lower())
    # dict.fromkeys dedupes while preserving first-match order, so the API
    # response is deterministic (list(set(...)) order varies with Python's
    # per-process string hash randomization).
    unique_matches = list(dict.fromkeys(matched))
    return ClaimResponse(
        comment_id=request.comment_id,
        has_claim=len(matched) > 0,
        confidence=1.0 if matched else 0.0,
        detection_mode="heuristic",
        matched_patterns=unique_matches,
        token_count=token_count,
        model="jcblaise/roberta-tagalog-large (tokenizer only β€” pending fine-tune)",
    )
# ── Topic Classification ───────────────────────────────────────────────────
@app.post("/topic-classification", response_model=TopicResponse)
def classify_topic(request: TopicRequest):
    """
    Classify a comment into one of five civic topic areas using
    zero-shot NLI. No training data required — labels passed at runtime.
    Confidence below 0.50 is flagged for human review.

    v1.5.0: Switched to bge-m3-zeroshot-v2.0 (568M params, 100+ languages)
    for most accurate multilingual Taglish classification. Uses descriptive
    candidate labels mapped back to clean database categories.

    Raises:
        HTTPException 422: empty text, or fewer than 2 candidate labels.
        HTTPException 500: model inference failure.
    """
    text = request.text.strip()
    if not text:
        raise HTTPException(status_code=422, detail="text field cannot be empty.")
    # Caller-supplied labels are used verbatim (no descriptive->clean mapping).
    if request.custom_labels:
        labels = request.custom_labels
        use_mapping = False
    else:
        labels = CIVIC_TOPICS_DESCRIPTIVE
        use_mapping = True
    if len(labels) < 2:
        raise HTTPException(status_code=422, detail="At least 2 candidate labels required.")
    try:
        result = topic_pipeline_model(
            text,
            candidate_labels=labels,
            truncation=True,
            max_length=512,
        )
    except Exception as e:
        # Keep the traceback in the logs and chain the original cause (PEP 3134).
        logger.exception("Topic classification error")
        raise HTTPException(status_code=500, detail=f"Model inference error: {str(e)}") from e
    # Map descriptive labels back to clean database labels; .get(l, l) leaves
    # unknown labels untouched as an identity fallback.
    if use_mapping:
        mapped_labels = [TOPIC_LABEL_MAP.get(l, l) for l in result["labels"]]
    else:
        mapped_labels = result["labels"]
    all_scores = [
        TopicScore(label=label, score=round(score, 4))
        for label, score in zip(mapped_labels, result["scores"])
    ]
    # Zero-shot pipeline returns labels sorted by descending score, so
    # index 0 is the winner.
    top_label = mapped_labels[0]
    top_score = round(result["scores"][0], 4)
    return TopicResponse(
        comment_id=request.comment_id,
        topic=top_label,
        confidence=top_score,
        all_scores=all_scores,
        needs_human_review=top_score < TOPIC_REVIEW_THRESHOLD,
        model="MoritzLaurer/bge-m3-zeroshot-v2.0",
    )