File size: 6,658 Bytes
c4d486b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
import re
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB

# Absolute import from project root
from app.utils.config import get_logger


class NLPEngine:
    """Lightweight NLP pipeline for a support-bot style application.

    Provides language detection (en/zh), regex-based entity extraction
    (optionally augmented by spaCy NER), and hybrid intent classification
    (keyword rules first, TF-IDF + Multinomial Naive Bayes fallback).

    spaCy models are optional: if they cannot be imported or loaded, the
    engine degrades gracefully to pure-regex extraction.
    """

    # Patterns are compiled once at class level; extract_entities and the
    # preprocessing helpers run per message, so this hoists the work out
    # of the hot path.
    _CHINESE_CHAR_RE = re.compile(r"[\u4e00-\u9fff]")
    # Strip everything except word chars, whitespace, and CJK ideographs.
    _CLEAN_RE = re.compile(r"[^\w\s\u4e00-\u9fff]")
    _ORDER_RE = re.compile(r"\b\d{4,6}\b")  # order numbers: 4-6 digits
    # FIX: the original TLD class was [A-Z|a-z], which wrongly accepted a
    # literal '|' character inside the top-level domain.
    _EMAIL_RE = re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b")
    # Basic US-style phone formats: 555-123-4567, 5551234567, (555) 123-4567
    _PHONE_RE = re.compile(r"\b\d{3}-\d{3}-\d{4}\b|\b\d{10}\b|\(\d{3}\)\s*\d{3}-\d{4}")
    _MONEY_RE = re.compile(
        r"\$\d+(?:\.\d{2})?|\d+(?:\.\d{2})?\s*(?:dollars?|USD)", re.IGNORECASE
    )
    _DATE_RES = (
        re.compile(r"\b\d{1,2}/\d{1,2}/\d{4}\b", re.IGNORECASE),  # MM/DD/YYYY
        re.compile(r"\b\d{4}-\d{1,2}-\d{1,2}\b", re.IGNORECASE),  # YYYY-MM-DD
        re.compile(r"\b(?:today|tomorrow|yesterday)\b", re.IGNORECASE),  # relative
    )
    # Capitalized word runs, used as a naive product-name heuristic (en only).
    _PRODUCT_RE = re.compile(r"\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b")
    # Sentence-initial/courtesy words that the product heuristic must skip.
    _COMMON_CAPITALIZED = frozenset(
        {"Hello", "Please", "Thank", "Could", "Would", "Should"}
    )

    def __init__(self, config=None):
        """Initialize the engine.

        Args:
            config: Optional project config object. When given, the
                confidence threshold is read from ``config.nlp``; otherwise
                it defaults to 0.5.
        """
        self.config = config
        self.logger = get_logger(__name__)

        self.confidence_threshold = (
            config.nlp["confidence_threshold"] if config else 0.5
        )
        self.logger.info(
            "NLP Engine initialized with confidence threshold: %s",
            self.confidence_threshold,
        )

        # spaCy pipelines; None means "fall back to regex-only extraction".
        self.nlp_en = None
        self.nlp_zh = None

        try:
            import spacy

            # Load English model
            try:
                self.nlp_en = spacy.load("en_core_web_sm")
                self.logger.info("English spaCy models loaded successfully")
            except OSError:
                self.logger.warning("English spaCy model not available")

            # Load Chinese model (optional)
            try:
                self.nlp_zh = spacy.load("zh_core_web_sm")
                self.logger.info("Chinese spaCy models loaded successfully")
            except OSError:
                self.logger.info("Chinese spaCy model not available")

            if not self.nlp_en and not self.nlp_zh:
                self.logger.warning(
                    "No spaCy models available, using fallback methods."
                )

        except (ImportError, OSError):
            # spaCy itself is missing/broken: regex extraction still works.
            self.nlp_en = None
            self.nlp_zh = None
            self.logger.warning("spaCy models not available, using fallback methods.")

        self.intent_classifier = MultinomialNB()
        self.vectorizer = TfidfVectorizer()
        self.trained = False

    def detect_language(self, text):
        """Return ``"zh"`` if *text* contains any CJK ideograph, else ``"en"``."""
        return "zh" if self._CHINESE_CHAR_RE.search(text) else "en"

    def preprocess_text(self, text):
        """Lowercase, strip, and remove punctuation (CJK chars are kept)."""
        text = text.lower().strip()
        return self._CLEAN_RE.sub("", text)

    def extract_entities(self, text, language="en"):
        """Extract entities from *text* into a flat ``{label: value}`` dict.

        spaCy NER is used when a model for *language* is loaded; regex
        extraction (order numbers, emails, phones, amounts, dates, and a
        capitalized-word product heuristic for English) always runs on top.
        For each regex category only the first match is kept.

        Args:
            text: Raw (unpreprocessed) user message.
            language: ``"en"`` or ``"zh"``; selects the spaCy pipeline.

        Returns:
            dict mapping lowercase entity labels to matched strings.
        """
        entities = {}

        # Pick the best available spaCy pipeline; None means regex-only.
        if language == "zh" and self.nlp_zh:
            nlp = self.nlp_zh
        elif self.nlp_en:
            nlp = self.nlp_en
        else:
            nlp = None

        if nlp:
            # spaCy NER; later labels of the same type overwrite earlier ones.
            doc = nlp(text)
            for ent in doc.ents:
                entities[ent.label_.lower()] = ent.text

        # Regex-based extraction always runs (works with or without spaCy).
        orders = self._ORDER_RE.findall(text)
        if orders:
            entities["order_number"] = orders[0]

        emails = self._EMAIL_RE.findall(text)
        if emails:
            entities["email"] = emails[0]

        phones = self._PHONE_RE.findall(text)
        if phones:
            entities["phone"] = phones[0]

        money = self._MONEY_RE.findall(text)
        if money:
            entities["amount"] = money[0]

        # First date pattern that matches wins.
        for pattern in self._DATE_RES:
            dates = pattern.findall(text)
            if dates:
                entities["date"] = dates[0]
                break

        # Product names: runs of capitalized words, minus courtesy words.
        if language == "en":
            products = [
                p
                for p in self._PRODUCT_RE.findall(text)
                if p not in self._COMMON_CAPITALIZED
            ]
            if products:
                entities["product"] = products[0]

        return entities

    def train_intent_classifier(self, training_data):
        """Fit the TF-IDF + Naive Bayes intent classifier.

        Args:
            training_data: Mapping of ``intent_name -> list of example
                utterances``.

        Raises:
            ValueError: If *training_data* contains no examples at all.
        """
        self.logger.info("Starting intent classifier training...")
        texts = []
        labels = []

        for intent, examples in training_data.items():
            for example in examples:
                texts.append(self.preprocess_text(example))
                labels.append(intent)

        # Fail fast with a clear message instead of sklearn's opaque
        # "empty vocabulary" error.
        if not texts:
            raise ValueError("training_data must contain at least one example")

        X = self.vectorizer.fit_transform(texts)
        self.intent_classifier.fit(X, labels)
        self.trained = True

        self.logger.info(
            "Intent classifier trained with %d examples across %d intents",
            len(texts),
            len(training_data),
        )

    def classify_intent(self, text):
        """Classify *text* into an intent.

        Keyword rules are checked first (high precision); otherwise the
        trained Naive Bayes model is used.

        Returns:
            Tuple ``(intent, confidence)`` where *intent* is a plain ``str``
            and *confidence* a plain ``float``. Returns ``("unknown", 0.0)``
            if the classifier has not been trained.
        """
        if not self.trained:
            self.logger.error("Intent classifier not trained!")
            return "unknown", 0.0

        processed_text = self.preprocess_text(text)
        self.logger.debug("Classifying intent for: %s", processed_text)

        # Keyword-based rules for better accuracy: purchase intent, unless
        # cancellation words also appear.
        if any(
            word in processed_text
            for word in ["buy", "purchase", "want to buy", "interested in"]
        ):
            if not any(
                word in processed_text
                for word in ["cancel", "stop", "remove", "refund"]
            ):
                self.logger.debug("Keyword-based classification: product_inquiry")
                return "product_inquiry", 0.95

        if any(
            word in processed_text for word in ["cancel", "stop", "remove", "refund"]
        ):
            self.logger.debug("Keyword-based classification: cancel_order")
            return "cancel_order", 0.9

        # Fall back to ML classification
        X = self.vectorizer.transform([processed_text])
        intent = self.intent_classifier.predict(X)[0]
        confidence = max(self.intent_classifier.predict_proba(X)[0])

        self.logger.debug(
            "ML classification: %s (confidence: %.2f)", intent, confidence
        )

        # FIX: actually convert NumPy types to plain Python types, as the
        # original comment promised but never did.
        return str(intent), float(confidence)