Spaces:
Sleeping
Sleeping
| import sys | |
| import json | |
| import asyncio | |
| import base64 | |
| import re | |
| import time | |
| import unicodedata | |
| import random | |
| import os | |
| from pathlib import Path | |
| CURRENT_FILE = Path(__file__).resolve() | |
| CORE_DIR = CURRENT_FILE.parent | |
| AI_DIR = CORE_DIR.parent | |
| DATA_DIR = AI_DIR / "data" | |
| STRATEGY_DIR = AI_DIR / "strategy" | |
| # Thêm CORE_DIR vào sys.path để tìm được file analyze.py nằm ngay cạnh | |
| sys.path.append(str(CORE_DIR)) | |
| sys.path.append(str(AI_DIR)) | |
| sys.path.append(str(DATA_DIR)) | |
| sys.path.append(str(STRATEGY_DIR)) | |
| # --- BÂY GIỜ MỚI IMPORT CÁC MODULE Ở FOLDER CHA --- | |
| try: | |
| from services import AIServices | |
| from database import db | |
| except ImportError as e: | |
| # Fallback nếu chạy từ root | |
| try: | |
| from backend.ai.services import AIServices | |
| from backend.ai.database import db | |
| except ImportError: | |
| print(f"❌ Lỗi Import: Không tìm thấy services hoặc database. {e}") | |
| sys.exit(1) | |
| # --- IMPORT MODULE VỆ TINH (CÓ DỰ PHÒNG) --- | |
| try: | |
| from analyze import VN_VoiceBot_Parallel | |
| from data_engine import DataEngine | |
| from strategy_competitor import CompetitorStrategy | |
| from strategy_low_data import LowDataStrategy | |
| from strategy_network import NetworkStrategy | |
| print(" [SYSTEM] Đã load đầy đủ các module vệ tinh.") | |
| except ImportError as e: | |
| print(f"⚠️ CẢNH BÁO: Thiếu file vệ tinh ({e}). Đang kích hoạt chế độ DỰ PHÒNG.") | |
| # --- TẠO CLASS GIẢ ĐỂ TRÁNH LỖI NAME ERROR --- | |
| class VN_VoiceBot_Parallel: | |
| def analyze_sentiment(self, text): | |
| return 0.0, "neutral" # Trả về cảm xúc trung tính | |
| def classify_issue(self, text): | |
| return None # Không phát hiện vấn đề gì | |
| # Nếu thiếu DataEngine thì tạo giả luôn (nếu cần) | |
| if 'DataEngine' not in locals(): | |
| class DataEngine: | |
| def __init__(self, *args): pass | |
| def get_full_context(self, cid): return {} | |
| # Nếu thiếu Strategy thì tạo giả | |
| class DummyStrategy: | |
| def __init__(self, *args): pass | |
| def execute_stream_gen(self, *args): | |
| async def gen(): yield "Dạ em nghe rồi ạ." | |
| return gen() | |
| def execute(self, *args): return "Dạ, em xin ghi nhận ý kiến ạ." | |
| if 'CompetitorStrategy' not in locals(): CompetitorStrategy = DummyStrategy | |
| if 'LowDataStrategy' not in locals(): LowDataStrategy = DummyStrategy | |
| if 'NetworkStrategy' not in locals(): NetworkStrategy = DummyStrategy | |
| # --- ADAPTER (GIỮ NGUYÊN) --- | |
| class GeminiAdapter: | |
| def __init__(self, ai_service): | |
| self.ai_service = ai_service | |
| self.main_loop = None | |
| def set_main_loop(self, loop): | |
| self.main_loop = loop | |
| def generate_content(self, prompt): | |
| if self.main_loop and self.main_loop.is_running(): | |
| future = asyncio.run_coroutine_threadsafe( | |
| self.ai_service.chat_gemini_fallback(prompt), | |
| self.main_loop | |
| ) | |
| try: text = future.result(timeout=10) | |
| except Exception: text = "Dạ, em nghe rõ ạ." | |
| return type('obj', (object,), {'text': text})() | |
| else: | |
| return type('obj', (object,), {'text': "Dạ vâng ạ."})() | |
| # --- CONTROLLER CHÍNH --- | |
| class TelesalesAgent: | |
| def __init__(self): | |
| print("\n" + "="*40) | |
| print(" [SYSTEM] KHỞI ĐỘNG LOGIC v3.2 (FIX DASHBOARD UPDATE)") | |
| print(f" [DEBUG] Logic Path: {CORE_DIR}") | |
| print(f" [DEBUG] Data Path: {DATA_DIR}") | |
| print("="*40 + "\n") | |
| self.ai_service = AIServices() | |
| self.analyzer = VN_VoiceBot_Parallel() | |
| # --- [SỬA ĐOẠN NÀY] DÙNG ĐƯỜNG DẪN ĐÃ TÍNH TOÁN Ở TRÊN --- | |
| csv_path = DATA_DIR / "test_customer.csv" | |
| json_path = DATA_DIR / "product_collection.json" | |
| # Kiểm tra file tồn tại chưa để báo lỗi rõ ràng | |
| if not csv_path.exists(): print(f"❌ KHÔNG TÌM THẤY: {csv_path}") | |
| if not json_path.exists(): print(f"❌ KHÔNG TÌM THẤY: {json_path}") | |
| # Truyền đường dẫn (chuyển về string) vào DataEngine | |
| self.data_engine = DataEngine(str(csv_path), str(json_path)) | |
| # --------------------------------------------------------- | |
| self.adapter = GeminiAdapter(self.ai_service) | |
| self.strategies = { | |
| "Đối thủ": CompetitorStrategy(self.adapter, self.data_engine), | |
| "Ít data": LowDataStrategy(self.adapter, self.data_engine), | |
| "Mạng nghẽn": NetworkStrategy(self.adapter, self.data_engine) | |
| } | |
| self.sales_data = {} | |
| self._load_scripts() | |
| self.sessions = {} | |
| self.MSG_FALLBACK = "Xin lỗi khách hàng nhưng trường hợp này chưa được tích hợp trong MVP. Xin hãy sử dụng những tình huống có trong kịch bản cung cấp." | |
| self.MSG_CLOSING = "Cảm ơn vì đã dành thời gian để nghe tư vấn, đội kỹ thuật sẽ liên hệ với {pronoun} để tích hợp gói cước sau ạ." | |
| self.MSG_MVP_END = "Đây là toàn bộ phần MVP của \"Quarter Zip.\"" | |
| self.sentence_split_regex = re.compile(r'(?<=[.?!;])\s+') | |
| def _load_scripts(self): | |
| try: | |
| # --- [SỬA ĐOẠN NÀY] --- | |
| script_path = DATA_DIR / 'final_voice_scripts.json' | |
| if not script_path.exists(): | |
| print(f"⚠️ Warning: Không thấy file kịch bản tại {script_path}") | |
| return | |
| with open(script_path, 'r', encoding='utf-8') as f: | |
| scripts = json.load(f) | |
| for item in scripts: | |
| self.sales_data[str(item['customer_id'])] = item | |
| print(f" [DATA] Đã load {len(self.sales_data)} kịch bản.") | |
| except Exception as e: | |
| print(f" [DATA ERROR] Lỗi đọc script: {e}") | |
| # --- HÀM HELPER GIỮ NGUYÊN --- | |
| def _create_metrics_payload(self, stt_latency, start_process_time, sentiment, intent): | |
| return json.dumps({ | |
| "type": "metrics_update", | |
| "data": { | |
| "latency_stt": round(stt_latency, 2), | |
| "latency_logic": round(time.time() - start_process_time, 2), | |
| "sentiment": round(sentiment, 2), | |
| "intent": intent | |
| } | |
| }) + "\n" | |
| # --- [MỚI] HÀM LƯU DB RIÊNG BIỆT (GỌI SỚM) --- | |
| def _save_call_to_db(self, cid, session, sentiment_score, intent, is_upsell=False): | |
| try: | |
| # Tính thời gian (Mock nếu session quá ngắn) | |
| session_start = session.get('start_time', time.time() - 30) | |
| duration = time.time() - session_start | |
| if duration < 5: duration = 30 | |
| cost_vnd = int(1000 + (duration * 50)) | |
| sent_label = "neutral" | |
| if sentiment_score >= 0.4: sent_label = "positive" | |
| elif sentiment_score <= -0.4: sent_label = "negative" | |
| csat_val = 5 if sent_label == "positive" else (2 if sent_label == "negative" else 4) | |
| # Map intent sang chuẩn DB | |
| intent_mapping = { | |
| "Mạng nghẽn": "network_issue", | |
| "Mạng yếu": "network_issue", | |
| "Ít data": "low_data", | |
| "Hủy gói": "cancel_package", | |
| "Đối thủ": "competitor", | |
| "Giá cao": "competitor", | |
| "Buying Signal": "network_issue", | |
| "General": "network_issue", | |
| "Angry/Handover": "cancel_package", | |
| "Fallback": "general" | |
| } | |
| final_intent = intent_mapping.get(intent, "general") | |
| db.add_call( | |
| customer_id=cid, | |
| duration=int(duration), | |
| intent=final_intent, | |
| sentiment=sent_label, | |
| ai_resolved=True, | |
| upsell=is_upsell, | |
| cost={'value': cost_vnd, 'csat': csat_val} | |
| ) | |
| print(f" [DB SAVED] Đã lưu cuộc gọi {cid} vào Database. Intent: {final_intent}") | |
| except Exception as e: | |
| print(f" [DB ERROR] Lỗi lưu DB: {e}") | |
| # --- HÀM PROCESS_STREAM --- | |
| async def process_stream(self, customer_id_str, user_audio): | |
| cid = str(customer_id_str) | |
| current_loop = asyncio.get_running_loop() | |
| self.adapter.set_main_loop(current_loop) | |
| # 1. INIT SESSION | |
| if cid not in self.sessions or user_audio is None: | |
| print(f"\n--- NEW SESSION: {cid} ---") | |
| ctx = self.data_engine.get_full_context(cid) | |
| cust_data = ctx['customer'] if ctx else {} | |
| cust_name = cust_data.get('Display_Name', 'Quý khách') | |
| gender_raw = cust_data.get('Gender', '').lower().strip() | |
| if gender_raw in ['nam', 'male', 'trai', 'mr', 'anh']: pronoun = "Anh" | |
| elif gender_raw in ['nu', 'nữ', 'female', 'gái', 'ms', 'mrs', 'chị']: pronoun = "Chị" | |
| else: pronoun = "Mình" | |
| self.sessions[cid] = { | |
| 'step': 1, | |
| 'history': [], | |
| 'pronoun': pronoun, | |
| 'start_time': time.time(), | |
| 'detected_intent': "General" | |
| } | |
| greeting = f"Dạ, em chào {pronoun} {cust_name}, em là nhân viên CSKH VNPT. Em thấy {pronoun.lower()} đang dùng dịch vụ bên em, không biết quá trình sử dụng có ổn định không ạ?" | |
| greeting = self._normalize_pronouns(greeting, pronoun) | |
| async for chunk in self._stream_text_and_audio(greeting): yield chunk | |
| return | |
| session = self.sessions[cid] | |
| step = session['step'] | |
| current_pronoun = session.get('pronoun', 'Mình') | |
| detected_intent = session.get('detected_intent', 'General') | |
| # [FILLER] | |
| if user_audio: | |
| filler_options = [ | |
| "Dạ, {p} chờ em một chút nhé.", | |
| "Dạ vâng, để em kiểm tra trên hệ thống giúp {p} ngay ạ.", | |
| "Dạ em nghe rõ rồi ạ, {p} đợi em xíu nhé.", | |
| "Vâng ạ, em đang tra cứu thông tin cho {p} đây ạ.", | |
| "Dạ, {p} giữ máy giúp em vài giây nhé." | |
| ] | |
| chosen_template = random.choice(filler_options) | |
| filler_text = chosen_template.format(p=current_pronoun.lower()) | |
| async for chunk in self._stream_text_and_audio(filler_text): yield chunk | |
| # 2. STT | |
| start_stt = time.time() | |
| user_text = await self.ai_service.speech_to_text(user_audio) | |
| stt_latency = time.time() - start_stt | |
| if not user_text: | |
| async for chunk in self._stream_text_and_audio("Alo, em không nghe rõ, nói lại giúp em nhé?"): yield chunk | |
| return | |
| user_text = unicodedata.normalize('NFC', user_text) | |
| print(f" [USER] {user_text}") | |
| session['history'].append(f"User: {user_text}") | |
| yield json.dumps({"user_text": user_text}) + "\n" | |
| bot_reply_accumulated = "" | |
| end_of_mvp = False | |
| start_process = time.time() | |
| sentiment_score = 0.0 | |
| # --- LOGIC XỬ LÝ --- | |
| triggers_agree = ["đồng ý", "đăng ký", "ok", "chốt", "lấy gói này", "được đấy", "nhất trí"] | |
| is_agreed = False | |
| if any(w in user_text.lower() for w in triggers_agree): | |
| is_agreed = True | |
| negation_patterns = [r"không\s+.*đăng\s+ký", r"không\s+.*đồng\s+ý", r"không\s+.*chốt", r"chưa\s+.*đăng\s+ký", r"không\s+cần", r"không\s+muốn"] | |
| for p in negation_patterns: | |
| if re.search(p, user_text.lower()): is_agreed = False; break | |
| if is_agreed: | |
| print(" [ACTION] Khách đồng ý -> Chốt đơn.") | |
| response_text = self.MSG_CLOSING.format(pronoun=current_pronoun) | |
| end_of_mvp = True; session['step'] = 0 | |
| sentiment_score = 1.0 | |
| # [LƯU DB NGAY LẬP TỨC] | |
| self._save_call_to_db(cid, session, sentiment_score, "Buying Signal", is_upsell=True) | |
| yield self._create_metrics_payload(stt_latency, start_process, sentiment_score, detected_intent) | |
| response_text = self._normalize_pronouns(response_text, current_pronoun) | |
| async for chunk in self._stream_text_and_audio(response_text): yield chunk | |
| bot_reply_accumulated = response_text | |
| else: | |
| sentiment_score, _ = self.analyzer.analyze_sentiment(user_text) | |
| toxic_keywords = ["lừa đảo", "mất dạy", "biến đi", "cút", "hủy ngay", "cắt ngay", "dẹp ngay", "hủy gói", "huỷ gói", "chán ngấy", "không thể chịu", "bực mình", "ức chế"] | |
| is_toxic_confirmed = False | |
| if any(kw in user_text.lower() for kw in toxic_keywords): | |
| print(f" [OVERRIDE] Phát hiện từ khóa TOXIC/HỦY -> Ép sentiment về -5.0.") | |
| sentiment_score = -5.0 | |
| is_toxic_confirmed = True | |
| if not is_toxic_confirmed: | |
| sales_keywords = ["đắt", "giá", "cước", "tiền", "so sánh", "tốc độ", "dung lượng", "nhà mạng", "ưu đãi", "gói", "lý do"] | |
| if any(kw in user_text.lower() for kw in sales_keywords): sentiment_score = 0.5 | |
| if sentiment_score <= -0.8: | |
| print(f" [ALERT] Toxic/Huỷ -> Chuyển tư vấn viên.") | |
| response_text = f"Dạ, em thành thật xin lỗi vì trải nghiệm chưa tốt của {current_pronoun.lower()}. Vấn đề này vượt quá thẩm quyền của em, em sẽ nối máy ngay tới chuyên viên cấp cao để hỗ trợ {current_pronoun.lower()} xử lý yêu cầu ạ." | |
| detected_intent = "Angry/Handover" | |
| end_of_mvp = True; session['step'] = 0 | |
| # [LƯU DB NGAY LẬP TỨC] | |
| self._save_call_to_db(cid, session, sentiment_score, detected_intent, is_upsell=False) | |
| yield self._create_metrics_payload(stt_latency, start_process, sentiment_score, detected_intent) | |
| response_text = self._normalize_pronouns(response_text, current_pronoun) | |
| async for chunk in self._stream_text_and_audio(response_text): yield chunk | |
| bot_reply_accumulated = response_text | |
| elif re.search(r"(không\s+cần|không\s+muốn|không\s+đăng\s+ký|thôi\s+em)", user_text.lower()): | |
| print(f" [INTENT] Khách từ chối -> Kết thúc.") | |
| response_text = f"Dạ vâng ạ, em cảm ơn {current_pronoun.lower()} đã dành thời gian trao đổi. Em chào {current_pronoun.lower()} ạ." | |
| end_of_mvp = True; session['step'] = 0 | |
| # [LƯU DB NGAY LẬP TỨC] | |
| self._save_call_to_db(cid, session, sentiment_score, "cancel_package", is_upsell=False) | |
| yield self._create_metrics_payload(stt_latency, start_process, sentiment_score, detected_intent) | |
| response_text = self._normalize_pronouns(response_text, current_pronoun) | |
| async for chunk in self._stream_text_and_audio(response_text): yield chunk | |
| bot_reply_accumulated = response_text | |
| else: | |
| detected = self.analyzer.classify_issue(user_text) | |
| issue = detected[0].split(" ('")[0] if detected else None | |
| if issue: | |
| detected_intent = issue | |
| session['detected_intent'] = issue | |
| if issue and issue in self.strategies: | |
| print(f" [PIPELINE] Kích hoạt Streaming Strategy: {issue}") | |
| if step == 1: session['step'] = 2 | |
| # [LƯU DB NGAY LẬP TỨC - Coi như đã nhận diện được vấn đề] | |
| #self._save_call_to_db(cid, session, sentiment_score, detected_intent, is_upsell=False) | |
| # Check có hàm stream không để gọi (Fix lỗi AttributeError) | |
| strategy_obj = self.strategies[issue] | |
| if hasattr(strategy_obj, 'execute_stream_gen'): | |
| stream_gen = strategy_obj.execute_stream_gen(cid, user_text) | |
| else: | |
| # Fallback cho strategy cũ | |
| async def fallback_stream(): | |
| res = await asyncio.to_thread(strategy_obj.execute, cid, user_text) | |
| yield res | |
| stream_gen = fallback_stream() | |
| buffer = "" | |
| yield self._create_metrics_payload(stt_latency, start_process, sentiment_score, detected_intent) | |
| async for chunk_text in stream_gen: | |
| buffer += chunk_text | |
| if " " in chunk_text or len(buffer) > 10: | |
| yield json.dumps({"bot_text": chunk_text}) + "\n" | |
| parts = self.sentence_split_regex.split(buffer) | |
| if len(parts) > 1: | |
| for i in range(len(parts) - 1): | |
| sentence = parts[i].strip() | |
| if sentence: | |
| sentence = self._normalize_pronouns(sentence, current_pronoun) | |
| bot_reply_accumulated += sentence + " " | |
| print(f" 🌊 [Flow] Gửi TTS câu: {sentence[:30]}...") | |
| async for audio_chunk in self._stream_audio_only(sentence): yield audio_chunk | |
| buffer = parts[-1] | |
| if buffer.strip(): | |
| buffer = self._normalize_pronouns(buffer, current_pronoun) | |
| bot_reply_accumulated += buffer | |
| print(f" 🌊 [Flow] Gửi TTS câu cuối: {buffer[:30]}...") | |
| async for chunk in self._stream_audio_only(buffer): yield chunk | |
| elif step == 2 and self.sales_data.get(cid): | |
| response_text = self.sales_data.get(cid)['script_greeting'] | |
| yield self._create_metrics_payload(stt_latency, start_process, sentiment_score, detected_intent) | |
| response_text = self._normalize_pronouns(response_text, current_pronoun) | |
| async for chunk in self._stream_text_and_audio(response_text): yield chunk | |
| bot_reply_accumulated = response_text | |
| else: | |
| if "đắt" in user_text.lower() or "giá" in user_text.lower(): | |
| if hasattr(self.strategies["Đối thủ"], 'execute'): | |
| response_text = await asyncio.to_thread(self.strategies["Đối thủ"].execute, cid, user_text) | |
| else: | |
| response_text = "Dạ bên em có nhiều ưu đãi hơn ạ." | |
| else: | |
| print(" [MVP] Out of scope -> Fallback Message.") | |
| response_text = self.MSG_FALLBACK | |
| detected_intent = "Fallback/Out of Scope" | |
| yield self._create_metrics_payload(stt_latency, start_process, sentiment_score, detected_intent) | |
| # [LƯU DB NGAY LẬP TỨC - Trường hợp Fallback] | |
| #self._save_call_to_db(cid, session, sentiment_score, detected_intent, is_upsell=False) | |
| response_text = self._normalize_pronouns(response_text, current_pronoun) | |
| async for chunk in self._stream_text_and_audio(response_text): yield chunk | |
| bot_reply_accumulated = response_text | |
| print(f" [BOT FINAL] {bot_reply_accumulated[:100]}...") | |
| if end_of_mvp: | |
| print(f" [MVP END] Gửi thông báo màn hình: {self.MSG_MVP_END}") | |
| yield json.dumps({ "bot_text": self.MSG_MVP_END, "end_session": True }) + "\n" | |
| def _normalize_pronouns(self, text, target_pronoun): | |
| candidates = ["quý khách", "bạn", "mình", "anh", "chị", "các bạn", "khách hàng", "khách"] | |
| for word in candidates: | |
| if word.lower() == target_pronoun.lower(): continue | |
| pattern = r'\b' + re.escape(word) + r'\b' | |
| text = re.sub(pattern, target_pronoun, text, flags=re.IGNORECASE) | |
| for _ in range(3): | |
| pattern = r'(?i)\b' + re.escape(target_pronoun) + r'\s*(?:[.,;]\s*)?' + re.escape(target_pronoun) + r'\b' | |
| text = re.sub(pattern, target_pronoun, text) | |
| text = text.strip() | |
| if text and text.lower().startswith(target_pronoun.lower()): | |
| text = target_pronoun.capitalize() + text[len(target_pronoun):] | |
| return text | |
| async def _stream_text_and_audio(self, full_text): | |
| yield json.dumps({"bot_text": full_text}) + "\n" | |
| sentences = re.split(r'(?<=[.?!])\s+', full_text) | |
| buffer = "" | |
| for s in sentences: | |
| if not s.strip(): continue | |
| buffer += s + " " | |
| if len(buffer) > 10 or s == sentences[-1]: | |
| async for audio_chunk in self._stream_audio_only(buffer): yield audio_chunk | |
| buffer = "" | |
| async def _stream_audio_only(self, text): | |
| if not text.strip(): return | |
| try: | |
| audio_bytes = await self.ai_service.text_to_speech(text) | |
| if audio_bytes: | |
| b64 = base64.b64encode(audio_bytes).decode("utf-8") | |
| yield json.dumps({"audio_base64": b64}) + "\n" | |
| except: pass | |
| agent = TelesalesAgent() |