# smart-chatbot-api/app/models/nlp_engine.py
# GitHub Actions
# Deploy from GitHub Actions (2026-03-03 04:20 UTC)
# c4d486b
import re
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
# Absolute import from project root
from app.utils.config import get_logger
class NLPEngine:
    """Language detection, entity extraction and intent classification.

    Entity extraction uses spaCy models when they can be loaded and always
    adds regex-based extraction on top. Intent classification combines a few
    keyword rules with a TF-IDF + Multinomial Naive Bayes model that must be
    trained via ``train_intent_classifier`` before use.
    """

    # Keyword rules used by classify_intent. Matching is plain substring
    # containment on the preprocessed (lower-cased) text, so e.g. "stop"
    # also matches "stopped".
    _PURCHASE_KEYWORDS = ("buy", "purchase", "want to buy", "interested in")
    _CANCEL_KEYWORDS = ("cancel", "stop", "remove", "refund")

    _CHINESE_CHAR_RE = re.compile(r"[\u4e00-\u9fff]")
    _NON_TEXT_RE = re.compile(r"[^\w\s\u4e00-\u9fff]")

    # (entity key, pattern) pairs; the first match in the text wins.
    # None of the patterns contain capturing groups, so group(0) is the value.
    _FIRST_MATCH_FIELDS = (
        # Order numbers: any standalone run of 4-6 digits (this also matches
        # things like a year inside a date).
        ("order_number", re.compile(r"\b\d{4,6}\b")),
        # Email addresses. The TLD class is [A-Za-z]; a "|" inside a character
        # class would be a literal pipe, not an alternation.
        (
            "email",
            re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b"),
        ),
        # Phone numbers: 123-456-7890, 1234567890 or (123) 456-7890.
        (
            "phone",
            re.compile(
                r"\b\d{3}-\d{3}-\d{4}\b|\b\d{10}\b|\(\d{3}\)\s*\d{3}-\d{4}"
            ),
        ),
        # Money amounts: "$12", "$12.50", "12 dollars", "12.50 USD".
        (
            "amount",
            re.compile(
                r"\$\d+(?:\.\d{2})?|\d+(?:\.\d{2})?\s*(?:dollars?|USD)",
                re.IGNORECASE,
            ),
        ),
    )

    # Date patterns, tried in this order; the first pattern that matches wins.
    _DATE_RES = (
        re.compile(r"\b\d{1,2}/\d{1,2}/\d{4}\b", re.IGNORECASE),  # MM/DD/YYYY
        re.compile(r"\b\d{4}-\d{1,2}-\d{1,2}\b", re.IGNORECASE),  # YYYY-MM-DD
        re.compile(r"\b(?:today|tomorrow|yesterday)\b", re.IGNORECASE),
    )

    # Product names: one or more consecutive Capitalized words.
    _PRODUCT_RE = re.compile(r"\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b")
    _COMMON_WORDS = frozenset(
        {"Hello", "Please", "Thank", "Could", "Would", "Should"}
    )

    def __init__(self, config=None):
        """Set up logging, optional spaCy models and the intent classifier.

        Args:
            config: Optional configuration object. When truthy,
                ``config.nlp["confidence_threshold"]`` is read; otherwise the
                threshold defaults to 0.5.
        """
        self.config = config
        self.logger = get_logger(__name__)
        self.confidence_threshold = (
            config.nlp["confidence_threshold"] if config else 0.5
        )
        self.logger.info(
            "NLP Engine initialized with confidence threshold: %s",
            self.confidence_threshold,
        )

        # spaCy is optional: without it (or without its models) only the
        # regex-based extraction in extract_entities is used.
        self.nlp_en = None
        self.nlp_zh = None
        try:
            import spacy
        except (ImportError, OSError):
            self.logger.warning(
                "spaCy models not available, using fallback methods."
            )
        else:
            # Load English model
            try:
                self.nlp_en = spacy.load("en_core_web_sm")
                self.logger.info("English spaCy models loaded successfully")
            except OSError:
                self.logger.warning("English spaCy model not available")
            # Load Chinese model (optional)
            try:
                self.nlp_zh = spacy.load("zh_core_web_sm")
                self.logger.info("Chinese spaCy models loaded successfully")
            except OSError:
                self.logger.info("Chinese spaCy model not available")
            if not self.nlp_en and not self.nlp_zh:
                self.logger.warning(
                    "No spaCy models available, using fallback methods."
                )

        self.intent_classifier = MultinomialNB()
        self.vectorizer = TfidfVectorizer()
        self.trained = False

    def detect_language(self, text):
        """Return ``"zh"`` if *text* contains any character in U+4E00-U+9FFF,
        otherwise ``"en"``."""
        return "zh" if self._CHINESE_CHAR_RE.search(text) else "en"

    def preprocess_text(self, text):
        """Lower-case and strip *text*, then drop every character that is not
        a word character, whitespace or in the range U+4E00-U+9FFF."""
        return self._NON_TEXT_RE.sub("", text.lower().strip())

    def extract_entities(self, text, language="en"):
        """Extract entities from *text* into a dict.

        spaCy entities (when a model is loaded) are stored under their
        lower-cased label; if a label occurs more than once the last
        occurrence is kept. Regex extraction then always runs and may add the
        keys ``order_number``, ``email``, ``phone``, ``amount``, ``date`` and,
        for ``language == "en"``, ``product``; each holds the first match.

        Args:
            text: Raw input text (not preprocessed, so case is preserved).
            language: ``"zh"`` selects the Chinese spaCy model when loaded;
                anything else (or a missing Chinese model) uses the English
                model when loaded.

        Returns:
            dict mapping entity name to the matched text.
        """
        entities = {}

        if language == "zh" and self.nlp_zh:
            nlp = self.nlp_zh
        else:
            nlp = self.nlp_en
        if nlp:
            for ent in nlp(text).ents:
                entities[ent.label_.lower()] = ent.text

        # Regex-based extraction works with or without spaCy.
        for key, pattern in self._FIRST_MATCH_FIELDS:
            match = pattern.search(text)
            if match:
                entities[key] = match.group(0)

        for pattern in self._DATE_RES:
            match = pattern.search(text)
            if match:
                entities["date"] = match.group(0)
                break

        # Product names (simple approach: capitalized words), English only.
        if language == "en":
            for candidate in self._PRODUCT_RE.findall(text):
                if candidate not in self._COMMON_WORDS:
                    entities["product"] = candidate
                    break

        return entities

    def train_intent_classifier(self, training_data):
        """Fit the TF-IDF vectorizer and Naive Bayes classifier.

        Args:
            training_data: Mapping of intent label to an iterable of example
                utterances. Each example is passed through
                ``preprocess_text`` before vectorization.

        Raises:
            ValueError: If *training_data* contains no examples.
        """
        self.logger.info("Starting intent classifier training...")
        texts = []
        labels = []
        for intent, examples in training_data.items():
            for example in examples:
                texts.append(self.preprocess_text(example))
                labels.append(intent)

        # Fail early with a clear message instead of an opaque vectorizer
        # error, and without touching the fitted state.
        if not texts:
            raise ValueError(
                "training_data must contain at least one example"
            )

        X = self.vectorizer.fit_transform(texts)
        self.intent_classifier.fit(X, labels)
        self.trained = True
        self.logger.info(
            "Intent classifier trained with %d examples across %d intents",
            len(texts),
            len(training_data),
        )

    def classify_intent(self, text):
        """Classify the intent of *text*.

        Keyword rules are checked first: purchase keywords without any cancel
        keyword yield ``("product_inquiry", 0.95)``; any cancel keyword yields
        ``("cancel_order", 0.9)``. Otherwise the trained model decides and the
        confidence is the highest class probability.

        Returns:
            ``(intent, confidence)`` as a plain ``str`` and ``float``.
            ``("unknown", 0.0)`` if the classifier has not been trained.
        """
        if not self.trained:
            self.logger.error("Intent classifier not trained!")
            return "unknown", 0.0

        processed_text = self.preprocess_text(text)
        self.logger.debug("Classifying intent for: %s", processed_text)

        # Keyword-based rules take precedence over the ML model.
        wants_cancel = any(
            word in processed_text for word in self._CANCEL_KEYWORDS
        )
        wants_purchase = any(
            word in processed_text for word in self._PURCHASE_KEYWORDS
        )
        if wants_purchase and not wants_cancel:
            self.logger.debug("Keyword-based classification: product_inquiry")
            return "product_inquiry", 0.95
        if wants_cancel:
            self.logger.debug("Keyword-based classification: cancel_order")
            return "cancel_order", 0.9

        # Fall back to ML classification
        X = self.vectorizer.transform([processed_text])
        intent = self.intent_classifier.predict(X)[0]
        confidence = max(self.intent_classifier.predict_proba(X)[0])
        self.logger.debug(
            "ML classification: %s (confidence: %.2f)", intent, confidence
        )
        # Convert NumPy scalar types to built-in Python types so callers get
        # plain str/float values.
        return str(intent), float(confidence)