Spaces:
Running
Running
| import re | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| from sklearn.naive_bayes import MultinomialNB | |
| # Absolute import from project root | |
| from app.utils.config import get_logger | |
class NLPEngine:
    """Small NLP helper for English / Chinese text.

    Offers language detection, text normalisation, entity extraction (spaCy
    when a model is installed, regular expressions always) and intent
    classification (keyword rules first, then a TF-IDF + multinomial
    naive Bayes model).

    Attributes:
        config: The configuration object passed to the constructor (may be None).
        logger: Logger obtained from ``get_logger(__name__)``.
        confidence_threshold: Value read from the config, or 0.5 without one.
            No method of this class applies it; it is only stored and logged.
        nlp_en / nlp_zh: Loaded spaCy pipelines, or None when unavailable.
        intent_classifier / vectorizer: The scikit-learn model pair.
        trained: True once ``train_intent_classifier`` has completed.
    """

    # Patterns are compiled once here instead of on every method call.
    # U+4E00-U+9FFF is the CJK Unified Ideographs block.
    _CJK_RE = re.compile(r"[\u4e00-\u9fff]")
    _PUNCTUATION_RE = re.compile(r"[^\w\s\u4e00-\u9fff]")

    # NOTE: any standalone run of 4-6 digits matches, so the year of an ISO
    # date or the tail of a dashed phone number can be reported as an order.
    _ORDER_RE = re.compile(r"\b\d{4,6}\b")
    # The TLD class is [A-Za-z]; a "|" inside a character class is a literal
    # pipe, which previously let "user@example.com|admin" match as a whole.
    _EMAIL_RE = re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b")
    _PHONE_RE = re.compile(
        r"\b\d{3}-\d{3}-\d{4}\b|\b\d{10}\b|\(\d{3}\)\s*\d{3}-\d{4}"
    )
    _MONEY_RE = re.compile(
        r"\$\d+(?:\.\d{2})?|\d+(?:\.\d{2})?\s*(?:dollars?|USD)", re.IGNORECASE
    )
    # Tried in order; the first pattern that matches wins.
    _DATE_RES = (
        re.compile(r"\b\d{1,2}/\d{1,2}/\d{4}\b", re.IGNORECASE),  # MM/DD/YYYY
        re.compile(r"\b\d{4}-\d{1,2}-\d{1,2}\b", re.IGNORECASE),  # YYYY-MM-DD
        re.compile(r"\b(?:today|tomorrow|yesterday)\b", re.IGNORECASE),
    )
    # One or more consecutive Capitalised words.
    _PRODUCT_RE = re.compile(r"\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b")
    _COMMON_WORDS = frozenset(
        {"Hello", "Please", "Thank", "Could", "Would", "Should"}
    )

    # Keyword rules used by classify_intent (matched as substrings).
    _PURCHASE_KEYWORDS = ("buy", "purchase", "want to buy", "interested in")
    _CANCEL_KEYWORDS = ("cancel", "stop", "remove", "refund")

    def __init__(self, config=None):
        """Set up logging, optional spaCy pipelines and the untrained classifier.

        Args:
            config: Optional object exposing an ``nlp`` mapping with a
                ``"confidence_threshold"`` entry. When falsy, the threshold
                defaults to 0.5.
        """
        self.config = config
        self.logger = get_logger(__name__)
        self.confidence_threshold = (
            config.nlp["confidence_threshold"] if config else 0.5
        )
        self.logger.info(
            "NLP Engine initialized with confidence threshold: %s",
            self.confidence_threshold,
        )

        self.nlp_en = None
        self.nlp_zh = None
        try:
            # Imported lazily so the engine still works without spaCy installed.
            import spacy

            # Load English model
            try:
                self.nlp_en = spacy.load("en_core_web_sm")
                self.logger.info("English spaCy models loaded successfully")
            except OSError:
                self.logger.warning("English spaCy model not available")

            # Load Chinese model (optional)
            try:
                self.nlp_zh = spacy.load("zh_core_web_sm")
                self.logger.info("Chinese spaCy models loaded successfully")
            except OSError:
                self.logger.info("Chinese spaCy model not available")

            if not self.nlp_en and not self.nlp_zh:
                self.logger.warning(
                    "No spaCy models available, using fallback methods."
                )
        except (ImportError, OSError):
            # Fallback if spaCy itself cannot be imported or loading fails
            # in a way the inner handlers did not cover.
            self.nlp_en = None
            self.nlp_zh = None
            self.logger.warning("spaCy models not available, using fallback methods.")

        self.intent_classifier = MultinomialNB()
        self.vectorizer = TfidfVectorizer()
        self.trained = False

    def detect_language(self, text):
        """Return ``"zh"`` if *text* has any U+4E00-U+9FFF character, else ``"en"``."""
        return "zh" if self._CJK_RE.search(text) else "en"

    def preprocess_text(self, text):
        """Lower-case and strip *text*, then drop every character that is not a
        word character, whitespace or a U+4E00-U+9FFF character."""
        normalized = text.lower().strip()
        return self._PUNCTUATION_RE.sub("", normalized)

    def extract_entities(self, text, language="en"):
        """Extract entities from *text* using spaCy (if loaded) and regexes.

        Args:
            text: Raw input text. Product detection relies on capitalisation,
                so pass the text before ``preprocess_text`` lower-cases it.
            language: ``"zh"`` selects the Chinese spaCy model when loaded;
                any other value (or a missing Chinese model) uses the English
                model when loaded. Product detection runs only for ``"en"``.

        Returns:
            dict mapping entity name to a matched string. spaCy entries are
            keyed by the lower-cased entity label (a later entity with the
            same label overwrites an earlier one). Regex entries use the keys
            ``order_number``, ``email``, ``phone``, ``amount``, ``date`` and
            ``product``, each holding the first match, and overwrite a spaCy
            entry stored under the same key.
        """
        entities = {}

        # Prefer the Chinese pipeline for Chinese text, otherwise fall back to
        # the English one; either may be None, in which case spaCy is skipped.
        nlp = self.nlp_zh if language == "zh" and self.nlp_zh else self.nlp_en
        if nlp:
            for ent in nlp(text).ents:
                entities[ent.label_.lower()] = ent.text

        # Regex-based extraction always runs (works with or without spaCy).
        first_match_rules = (
            ("order_number", self._ORDER_RE),
            ("email", self._EMAIL_RE),
            ("phone", self._PHONE_RE),
            ("amount", self._MONEY_RE),
        )
        for key, pattern in first_match_rules:
            match = pattern.search(text)
            if match is not None:
                entities[key] = match.group(0)

        for pattern in self._DATE_RES:
            match = pattern.search(text)
            if match is not None:
                entities["date"] = match.group(0)
                break

        # Product names: simple heuristic based on capitalised words.
        if language == "en":
            product = next(
                (
                    candidate
                    for candidate in self._PRODUCT_RE.findall(text)
                    if candidate not in self._COMMON_WORDS
                ),
                None,
            )
            if product is not None:
                entities["product"] = product

        return entities

    def train_intent_classifier(self, training_data):
        """Fit the TF-IDF vectorizer and naive Bayes classifier.

        Args:
            training_data: Mapping of intent label to an iterable of example
                sentences. Each example is passed through ``preprocess_text``.

        Raises:
            ValueError: If *training_data* contains no examples at all.
        """
        self.logger.info("Starting intent classifier training...")

        texts = []
        labels = []
        for intent, examples in training_data.items():
            for example in examples:
                texts.append(self.preprocess_text(example))
                labels.append(intent)

        if not texts:
            # Fail early with a clear message instead of letting the
            # vectorizer fail on an empty corpus; ``trained`` stays False.
            raise ValueError("training_data contains no examples to train on")

        features = self.vectorizer.fit_transform(texts)
        self.intent_classifier.fit(features, labels)
        self.trained = True
        self.logger.info(
            "Intent classifier trained with %d examples across %d intents",
            len(texts),
            len(training_data),
        )

    def classify_intent(self, text):
        """Classify *text* into an intent.

        Keyword rules are checked first against the preprocessed text
        (substring matching): purchase keywords without any cancel keyword
        give ``("product_inquiry", 0.95)``; any cancel keyword gives
        ``("cancel_order", 0.9)``. Otherwise the trained model decides.

        Returns:
            ``(intent, confidence)``. ``("unknown", 0.0)`` when the classifier
            has not been trained. Model results are converted to plain Python
            objects so they can be serialised safely.
        """
        if not self.trained:
            self.logger.error("Intent classifier not trained!")
            return "unknown", 0.0

        processed_text = self.preprocess_text(text)
        self.logger.debug("Classifying intent for: %s", processed_text)

        mentions_purchase = any(
            keyword in processed_text for keyword in self._PURCHASE_KEYWORDS
        )
        mentions_cancel = any(
            keyword in processed_text for keyword in self._CANCEL_KEYWORDS
        )

        # Keyword-based rules take precedence over the model.
        if mentions_purchase and not mentions_cancel:
            self.logger.debug("Keyword-based classification: product_inquiry")
            return "product_inquiry", 0.95
        if mentions_cancel:
            self.logger.debug("Keyword-based classification: cancel_order")
            return "cancel_order", 0.9

        # Fall back to ML classification
        features = self.vectorizer.transform([processed_text])
        intent = self.intent_classifier.predict(features)[0]
        confidence = max(self.intent_classifier.predict_proba(features)[0])
        self.logger.debug(
            "ML classification: %s (confidence: %.2f)", intent, confidence
        )

        # Convert NumPy scalar types to native Python types; ``.item()``
        # keeps the label's own type (str, int, ...) while unwrapping it.
        if hasattr(intent, "item"):
            intent = intent.item()
        return intent, float(confidence)