from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from transformers import (
    pipeline,
    AutoTokenizer,
)
from sentence_transformers import SentenceTransformer
import torch
import re
import hashlib
import logging
import spacy
from langdetect import detect, LangDetectException

# ── Logging ────────────────────────────────────────────────────────────────
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# ── App ────────────────────────────────────────────────────────────────────
app = FastAPI(
    title="CivicPulse NLP API",
    description="NLP microservice for Civic Pulse Engine — Municipality of Pulilan, Bulacan",
    version="1.7.0",
)

# ═══════════════════════════════════════════════════════════════════════════
# MODEL LOADING
# All models loaded once at startup — never inside endpoint functions.
# ═══════════════════════════════════════════════════════════════════════════

# HF pipelines: 0 selects the first CUDA device, -1 means CPU.
DEVICE = 0 if torch.cuda.is_available() else -1

# ── 1. Sentiment Model ─────────────────────────────────────────────────────
# CHANGED in v1.7.0: Switched from tabularisai/multilingual-sentiment-analysis
# to rayubaldo44/civicpulse-sentiment-v2 (Stage 1 fine-tuned model)
# Base: dost-asti/RoBERTa-tl-sentiment-analysis, fine-tuned on:
#   - scaredmeow/shopee-reviews-tl-stars (15K Tagalog reviews, star→3-class mapped)
#   - legacy-datasets/hate_speech_filipino (10K election tweets, negative detection)
# Result: Direct 3-class output (negative/neutral/positive) — no aggregation needed.
# Fixes: "Salamat sa bagong street lights" now correctly classified as positive
# (was neutral at 0.99 with tabularisai). 6/6 civic smoke test passed.
logger.info("Loading sentiment model: rayubaldo44/civicpulse-sentiment-v2...")
sentiment_pipeline = pipeline(
    task="text-classification",
    model="rayubaldo44/civicpulse-sentiment-v2",
    # Tokenizer comes from the base model the fine-tune was derived from.
    tokenizer="dost-asti/RoBERTa-tl-sentiment-analysis",
    device=DEVICE,
    top_k=None,  # return scores for ALL classes, not just the argmax
)
logger.info("Sentiment model loaded.")

# ── 2.
# Claim Detection Tokenizer ────────────────────────────────────────────────
# Heuristic regex mode until the claim model is fine-tuned; the tokenizer is
# still loaded now so /claim-detection can report token counts.
CLAIM_DETECTION_MODE = "heuristic"  # switch to "model" after fine-tuning
logger.info("Loading claim detection tokenizer: jcblaise/roberta-tagalog-large...")
claim_tokenizer = AutoTokenizer.from_pretrained("jcblaise/roberta-tagalog-large")
logger.info("Claim tokenizer loaded. (Heuristic mode until fine-tuned.)")

# ── 3. Topic Classification Model ──────────────────────────────────────────
# CHANGED in v1.5.0: Switched from cross-encoder/nli-MiniLM2-L6-H768
# (English-only, 82M params) to MoritzLaurer/bge-m3-zeroshot-v2.0
# (multilingual 100+ langs, 568M params, newest v2.0 architecture)
# Reason: Previous model could not classify Taglish text — nearly all comments
# fell into "Other". bge-m3-zeroshot-v2.0 is the most accurate multilingual
# zero-shot model available (2024).
# NOTE v1.7.0: Kept as-is. Stage 1 fine-tuned topic model collapsed to single
# class due to insufficient civic topic diversity in public datasets.
# Zero-shot remains superior until Stage 2 fine-tuning with annotated civic data.
logger.info("Loading topic model: MoritzLaurer/bge-m3-zeroshot-v2.0...")
topic_pipeline_model = pipeline(
    task="zero-shot-classification",
    model="MoritzLaurer/bge-m3-zeroshot-v2.0",
    device=DEVICE,
)
logger.info("Topic classification model loaded.")

# ── 4. spaCy NER (for /preprocess PII masking) ─────────────────────────────
# NOTE(review): en_core_web_sm is an English model — PERSON recall on
# Tagalog/Taglish names may be limited; confirm against production samples.
logger.info("Loading spaCy NER model: en_core_web_sm...")
nlp_spacy = spacy.load("en_core_web_sm")
logger.info("spaCy NER model loaded.")

# ── 5.
# Embedding Model (for RAG pipeline) ───────────────────────────────────────
logger.info("Loading embedding model: meedan/paraphrase-filipino-mpnet-base-v2...")
embedding_model = SentenceTransformer("meedan/paraphrase-filipino-mpnet-base-v2")
logger.info("Embedding model loaded.")

# ═══════════════════════════════════════════════════════════════════════════
# CONSTANTS
# ═══════════════════════════════════════════════════════════════════════════

# Sentiment predictions below this confidence go to the human review queue.
CONFIDENCE_THRESHOLD = 0.65
# Topic predictions below this confidence are flagged needs_human_review.
TOPIC_REVIEW_THRESHOLD = 0.50

# CHANGED in v1.5.0: Descriptive labels for better zero-shot matching on
# Taglish civic text.
CIVIC_TOPICS_DESCRIPTIVE = [
    "government infrastructure projects, roads, bridges, flood control, buildings, and construction",
    "healthcare, hospitals, medical services, clinics, and public health programs",
    "waste management, garbage collection, recycling, and environmental sanitation",
    "public safety, police, crime, drugs, peace and order, and law enforcement",
    "other government services, civic matters, elections, and general topics",
]

# Map descriptive labels back to clean database-friendly labels
TOPIC_LABEL_MAP = {
    "government infrastructure projects, roads, bridges, flood control, buildings, and construction": "Infrastructure and Public Works",
    "healthcare, hospitals, medical services, clinics, and public health programs": "Healthcare and Medical Services",
    "waste management, garbage collection, recycling, and environmental sanitation": "Waste Management and Sanitation",
    "public safety, police, crime, drugs, peace and order, and law enforcement": "Public Safety and Peace and Order",
    "other government services, civic matters, elections, and general topics": "Other",
}

# Reverse map for custom_labels fallback (keeps backward compatibility)
CIVIC_TOPICS = list(TOPIC_LABEL_MAP.values())

# Claim detection heuristic patterns — Filipino/Taglish signals of a verifiable claim.
# Intentionally broad — RAG + Claude Haiku filters false positives later.
CLAIM_PATTERNS = [
    # Reported-speech / hearsay markers (Filipino)
    r"\bsinabi\b", r"\bayon\b", r"\bdaw\b", r"\braw\b", r"\bdiba\b",
    r"\btotoo\b", r"\bkatotohanan\b", r"\bbalita\b", r"\bnews\b",
    r"\bconfirmed\b", r"\bofficial\b", r"\bpahayag\b", r"\bannounced\b",
    r"\bnagsabi\b", r"\bsinabi ng\b", r"\bayon sa\b", r"\bpumirma\b",
    # Quantified statements (amounts, distances, durations)
    r"\b\d+\s*(piso|million|billion|porsyento|%|metro|km|kilometro)\b",
    r"\b\d+\s*(beses|taon|buwan|araw|oras)\b",
    # Official actor + action verb prefix (e.g. "mayor nagsabi", "barangay aprubado")
    r"\b(mayor|gobernador|konseho|lgu|barangay|kapitan)\b.*\b(nag|mag|ipa|sini|apro)\w+",
    # English attribution / reporting markers
    r"\baccording to\b",
    r"\breport(ed|s)?\b",
    r"\bstatement\b",
    r"\bproject\b.*\b(million|billion|piso)\b",
    r"\b(budget|funds|pondo)\b.*\b\d+\b",
]
# Compile once at import time — reused on every /claim-detection request.
COMPILED_PATTERNS = [re.compile(p, re.IGNORECASE) for p in CLAIM_PATTERNS]

# Taglish SMS abbreviation map — high-frequency only.
# Empty-string values delete filler tokens (omg/lol/haha/hehe) outright.
TAGLISH_MAP = {
    r"\bkc\b": "kasi", r"\bksi\b": "kasi", r"\bdk\b": "di ko",
    r"\bnman\b": "naman", r"\bsna\b": "sana", r"\bkau\b": "kayo",
    r"\bkaw\b": "ikaw", r"\bnyo\b": "ninyo", r"\bbaket\b": "bakit",
    r"\bpede\b": "pwede", r"\bpls\b": "please", r"\bplz\b": "please",
    r"\bomg\b": "", r"\blol\b": "", r"\bhaha+\b": "", r"\bhehe+\b": "",
}


# ═══════════════════════════════════════════════════════════════════════════
# PREPROCESS HELPERS
# ═══════════════════════════════════════════════════════════════════════════

def hash_user_id(user_id: str) -> str:
    """Return the SHA-256 hex digest of user_id (one-way PII anonymization)."""
    return hashlib.sha256(user_id.encode()).hexdigest()


def normalize_platform(text: str) -> str:
    """Remove URLs, anonymize @mentions, strip # symbol but keep the word."""
    text = re.sub(r"http\S+", "", text)
    text = re.sub(r"@\w+", "[USER]", text)
    text = re.sub(r"#(\w+)", r"\1", text)  # "#baha" → "baha"
    # Collapse runs of whitespace left behind by the removals.
    return re.sub(r"\s+", " ", text).strip()


def normalize_taglish(text: str) -> str:
    """Expand high-frequency Taglish SMS abbreviations."""
    for pattern, replacement in TAGLISH_MAP.items():
        text = re.sub(pattern, replacement, text, flags=re.IGNORECASE)
    return re.sub(r"\s+", " ", text).strip()


def mask_named_entities(text: str) -> str:
    """Mask PERSON entities only — keep ORG/GPE so civic context survives."""
    doc = nlp_spacy(text)
    masked = text
    for ent in reversed(doc.ents):  # reversed so char offsets stay valid
        if ent.label_ == "PERSON":
            masked = masked[:ent.start_char] + "[NAME]" + masked[ent.end_char:]
    return masked


def detect_language(text: str) -> str:
    """Return 'tl', 'en', or 'tl-en' (Taglish). Falls back to 'unknown'."""
    try:
        lang = detect(text)
        if lang == "tl":
            return "tl"
        if lang == "en":
            # langdetect labels Taglish as "en"; if the text still carries
            # >= 2 common Tagalog function words, call it code-switched.
            tagalog_markers = {"ang", "ng", "mga", "na", "sa", "si", "ko", "ka", "po", "ba", "ay"}
            words = set(text.lower().split())
            if len(words & tagalog_markers) >= 2:
                return "tl-en"
            return "en"
        return lang
    except LangDetectException:
        return "unknown"


def is_spam(text: str) -> bool:
    """True if text is too short or contains no real words."""
    if len(text.strip()) < 5:
        return True
    # Accept Latin letters incl. accented ranges; digits/emoji alone = spam.
    return len(re.findall(r"[a-zA-ZÀ-ÿ\u0100-\u024F]+", text)) == 0


# ═══════════════════════════════════════════════════════════════════════════
# SCHEMAS
# ═══════════════════════════════════════════════════════════════════════════

class PreprocessRequest(BaseModel):
    text: str
    user_id: str
    comment_id: str | None = None


class PreprocessResponse(BaseModel):
    comment_id: str | None
    hashed_user_id: str
    cleaned_text: str
    language: str
    is_spam: bool


class EmbedRequest(BaseModel):
    text: str


class EmbedResponse(BaseModel):
    embedding: list[float]
    dimensions: int


class SentimentRequest(BaseModel):
    text: str
    comment_id: str | None = None


class SentimentScore(BaseModel):
    label: str
    score: float


class SentimentResponse(BaseModel):
    comment_id: str | None
    sentiment: str
    confidence: float
    all_scores: list[SentimentScore]
    needs_human_review: bool
    model: str


class ClaimRequest(BaseModel):
    text: str
    comment_id: str | None = None


class ClaimResponse(BaseModel):
    comment_id: str | None
    has_claim: bool
    confidence: float
    detection_mode: str
    matched_patterns: list[str]
    token_count: int
    model: str


class TopicRequest(BaseModel):
    text: str
    comment_id: str | None = None
    custom_labels: list[str] | None = None


class TopicScore(BaseModel):
    label: str
    score: float


class TopicResponse(BaseModel):
    comment_id: str | None
    topic: str
    confidence: float
    all_scores: list[TopicScore]
    needs_human_review: bool
    model: str


# ═══════════════════════════════════════════════════════════════════════════
# ENDPOINTS
# ═══════════════════════════════════════════════════════════════════════════

@app.get("/")
def root():
    """Service metadata and endpoint directory."""
    return {
        "service": "CivicPulse NLP API",
        "version": "1.7.0",
        "status": "running",
        "endpoints": ["/preprocess", "/embed", "/sentiment", "/claim-detection", "/topic-classification", "/health"],
    }


@app.get("/health")
def health():
    """Keep-alive endpoint. GitHub Actions pings this every 25 min."""
    return {
        "status": "ok",
        "models_loaded": ["sentiment", "claim-tokenizer", "topic-classification", "spacy-ner", "embedding"],
        "claim_detection_mode": CLAIM_DETECTION_MODE,
    }


# ── Pre-Processing ─────────────────────────────────────────────────────────
@app.post("/preprocess", response_model=PreprocessResponse)
def preprocess(request: PreprocessRequest):
    """
    Cleans and anonymizes a raw comment before NLP processing.
    Call this first — pass cleaned_text to the three NLP endpoints.

    Steps (in order):
      1. SHA-256 hash user_id (PII strip)
      2. Remove URLs, anonymize @mentions, strip # symbol
      3. Expand Taglish SMS abbreviations
      4. spaCy NER masks PERSON entities → [NAME]
      5. Detect language: tl / en / tl-en
      6. Spam check (too short or no real words)
    """
    text = request.text.strip()
    if not text:
        raise HTTPException(status_code=422, detail="text field cannot be empty.")
    hashed_uid = hash_user_id(request.user_id)
    text = normalize_platform(text)
    text = normalize_taglish(text)
    text = mask_named_entities(text)
    language = detect_language(text)
    spam = is_spam(text)
    return PreprocessResponse(
        comment_id=request.comment_id,
        hashed_user_id=hashed_uid,
        cleaned_text=text,
        language=language,
        is_spam=spam,
    )


# ── Embed ──────────────────────────────────────────────────────────────────
@app.post("/embed", response_model=EmbedResponse)
def embed(request: EmbedRequest):
    """
    Generate a 768-dimension dense vector embedding for a text string.
    Used for: (1) embedding lgu_documents into pgvector, and
    (2) embedding flagged claims for cosine similarity search.
    Model: meedan/paraphrase-filipino-mpnet-base-v2
    """
    text = request.text.strip()
    if not text:
        raise HTTPException(status_code=422, detail="text field cannot be empty.")
    try:
        # normalize_embeddings=True → unit vectors, so dot product == cosine sim.
        vector = embedding_model.encode(text, normalize_embeddings=True).tolist()
        return EmbedResponse(embedding=vector, dimensions=len(vector))
    except Exception as e:
        logger.error(f"Embedding error: {e}")
        raise HTTPException(status_code=500, detail=f"Embedding error: {str(e)}")


# ── Sentiment ──────────────────────────────────────────────────────────────
@app.post("/sentiment", response_model=SentimentResponse)
def analyze_sentiment(request: SentimentRequest):
    """
    Classify a comment as positive, negative, or neutral.

    v1.7.0: Uses rayubaldo44/civicpulse-sentiment-v2 (Stage 1 fine-tuned).
    Base: dost-asti/RoBERTa-tl-sentiment-analysis fine-tuned on Shopee reviews
    (15K Tagalog) + hate speech Filipino (10K election tweets).
    Direct 3-class output — no aggregation needed.
    Confidence below 0.65 is routed to human review queue.
    """
    text = request.text.strip()
    if not text:
        raise HTTPException(status_code=422, detail="text field cannot be empty.")
    if len(text) > 1000:
        logger.warning(f"Long text ({len(text)} chars), tokenizer will truncate.")
    try:
        raw_results = sentiment_pipeline(text, truncation=True, max_length=512)
    except Exception as e:
        logger.error(f"Sentiment inference error: {e}")
        raise HTTPException(status_code=500, detail=f"Model inference error: {str(e)}")
    # v1.7.0: Model outputs 3 classes directly — negative, neutral, positive.
    # With top_k=None the pipeline returns a list per input; take input 0.
    scores = raw_results[0]
    # Build all_scores list
    all_scores = [
        SentimentScore(label=s["label"], score=round(s["score"], 4))
        for s in scores
    ]
    # Top prediction
    top = max(scores, key=lambda s: s["score"])
    top_label = top["label"]
    top_confidence = round(top["score"], 4)
    needs_review = top_confidence < CONFIDENCE_THRESHOLD
    return SentimentResponse(
        comment_id=request.comment_id,
        # Low-confidence predictions are labeled "review" instead of a class.
        sentiment="review" if needs_review else top_label,
        confidence=top_confidence,
        all_scores=all_scores,
        needs_human_review=needs_review,
        model="rayubaldo44/civicpulse-sentiment-v2",
    )


# ── Claim Detection ────────────────────────────────────────────────────────
@app.post("/claim-detection", response_model=ClaimResponse)
def detect_claim(request: ClaimRequest):
    """
    Detect whether a comment contains a verifiable factual claim.
    has_claim = True means the comment will be passed to the RAG pipeline.
    """
    text = request.text.strip()
    if not text:
        raise HTTPException(status_code=422, detail="text field cannot be empty.")
    # Tokenize only to report token_count; heuristic matching is regex-based.
    tokens = claim_tokenizer(
        text,
        truncation=True,
        max_length=512,
        return_tensors=None,
    )
    token_count = len(tokens["input_ids"])
    if CLAIM_DETECTION_MODE == "model":
        raise HTTPException(
            status_code=501,
            detail="Model mode not yet available. Fine-tune jcblaise/roberta-tagalog-large first."
        )
    matched = []
    for pattern in COMPILED_PATTERNS:
        match = pattern.search(text)
        if match:
            matched.append(match.group(0).lower())
    return ClaimResponse(
        comment_id=request.comment_id,
        has_claim=len(matched) > 0,
        # Heuristic mode is binary: any pattern hit → 1.0, none → 0.0.
        confidence=1.0 if matched else 0.0,
        detection_mode="heuristic",
        matched_patterns=list(set(matched)),
        token_count=token_count,
        model="jcblaise/roberta-tagalog-large (tokenizer only — pending fine-tune)",
    )


# ── Topic Classification ───────────────────────────────────────────────────
@app.post("/topic-classification", response_model=TopicResponse)
def classify_topic(request: TopicRequest):
    """
    Classify a comment into one of five civic topic areas using zero-shot NLI.
    No training data required — labels passed at runtime.
    Confidence below 0.50 is flagged for human review.

    v1.5.0: Switched to bge-m3-zeroshot-v2.0 (568M params, 100+ languages)
    for most accurate multilingual Taglish classification.
    Uses descriptive candidate labels mapped back to clean database categories.
    """
    text = request.text.strip()
    if not text:
        raise HTTPException(status_code=422, detail="text field cannot be empty.")
    # If custom labels are provided, use them directly (no mapping)
    if request.custom_labels:
        labels = request.custom_labels
        use_mapping = False
    else:
        labels = CIVIC_TOPICS_DESCRIPTIVE
        use_mapping = True
    if len(labels) < 2:
        raise HTTPException(status_code=422, detail="At least 2 candidate labels required.")
    try:
        result = topic_pipeline_model(
            text,
            candidate_labels=labels,
            truncation=True,
            max_length=512,
        )
    except Exception as e:
        logger.error(f"Topic classification error: {e}")
        raise HTTPException(status_code=500, detail=f"Model inference error: {str(e)}")
    # Map descriptive labels back to clean database labels
    if use_mapping:
        mapped_labels = [TOPIC_LABEL_MAP.get(l, l) for l in result["labels"]]
    else:
        mapped_labels = result["labels"]
    all_scores = [
        TopicScore(label=label, score=round(score, 4))
        for label, score in zip(mapped_labels, result["scores"])
    ]
    # Pipeline returns labels sorted by score descending — index 0 is the top.
    top_label = mapped_labels[0]
    top_score = round(result["scores"][0], 4)
    return TopicResponse(
        comment_id=request.comment_id,
        topic=top_label,
        confidence=top_score,
        all_scores=all_scores,
        needs_human_review=top_score < TOPIC_REVIEW_THRESHOLD,
        model="MoritzLaurer/bge-m3-zeroshot-v2.0",
    )