Spaces:
Sleeping
Sleeping
| """ | |
| Two-Stage Semantic Cache với MongoDB Atlas Vector Search. | |
| Kiến trúc 3 lớp: | |
| - Lớp 0 (Normalize): LLM chuẩn hóa câu hỏi → dạng cốt lõi (tránh miss do paraphrase) | |
| - Lớp 1 (Recall): Vector Search tìm câu hỏi tương đồng (cosine > 0.80) | |
| - Lớp 2 (Precision): NER + Regex so sánh thực thể (số, địa danh, tên người) | |
| Cache HIT → trả kết quả cũ trong < 3 giây | |
| Cache MISS → chạy Multi-Agent pipeline → lưu kết quả mới vào DB | |
| """ | |
| import os | |
| import re | |
| from datetime import datetime, timedelta, timezone | |
| from typing import Any | |
| from dotenv import load_dotenv | |
| from pathlib import Path | |
| from pymongo import MongoClient | |
| from pymongo.errors import PyMongoError | |
| from sentence_transformers import SentenceTransformer # type: ignore[import-untyped] | |
| from underthesea import ner # type: ignore[import-untyped] | |
| from utils.logger import get_logger, log_agent_step | |
| # Load .env | |
| load_dotenv(Path(__file__).resolve().parent.parent / ".env") | |
| logger = get_logger("Cache.Mongo") | |
| # ============================================================ | |
| # Constants | |
| # ============================================================ | |
| SIMILARITY_THRESHOLD = 0.88 | |
| CACHE_TTL_DAYS = 7 | |
| VECTOR_INDEX_NAME = "vector_index" | |
| DB_NAME = "FakeNewsDB" | |
| COLLECTION_NAME = "CacheLogs" | |
| class MongoSemanticCache: | |
| """ | |
| Two-Stage Semantic Cache cho hệ thống kiểm chứng tin tức. | |
| Stage 1: Embedding + Vector Search (MongoDB Atlas) → tìm candidate | |
| Stage 2: NER entity comparison → xác nhận chính xác | |
| Usage: | |
| cache = MongoSemanticCache() | |
| result = cache.check_cache("Bộ TC đề xuất thuế 5%...") | |
| if result["hit"]: | |
| print(result["data"]) # Trả kết quả cũ | |
| else: | |
| verdict = run_pipeline(...) | |
| cache.save_to_cache(query, verdict) | |
| """ | |
| def __init__(self) -> None: | |
| """ | |
| Khởi tạo kết nối MongoDB và load Embedding model. | |
| Raises: | |
| ValueError: Nếu MONGODB_URI chưa được cấu hình | |
| """ | |
| # --- MongoDB --- | |
| mongo_uri = os.getenv("MONGODB_URI", "") | |
| if not mongo_uri: | |
| raise ValueError( | |
| "MONGODB_URI chưa được cấu hình.\n" | |
| "Mở file .env và thêm dòng:\n" | |
| "MONGODB_URI=mongodb+srv://<user>:<pass>@<cluster>.mongodb.net/" | |
| ) | |
| try: | |
| self.client: MongoClient = MongoClient(mongo_uri, serverSelectionTimeoutMS=5000) | |
| # Ping kiểm tra kết nối ngay | |
| self.client.admin.command("ping") | |
| self.db = self.client[DB_NAME] | |
| self.collection = self.db[COLLECTION_NAME] | |
| # --- TTL Index: tự động xóa document khi expires_at đã qua --- | |
| self.collection.create_index( | |
| "expires_at", | |
| expireAfterSeconds=0, | |
| name="ttl_expires_at", | |
| ) | |
| logger.info(f"[Cache] Connected to MongoDB Atlas — {DB_NAME}.{COLLECTION_NAME}") | |
| logger.info("[Cache] TTL index ensured — documents auto-delete after 7 days") | |
| except PyMongoError as e: | |
| logger.error(f"[Cache] MongoDB connection failed: {e}") | |
| raise | |
| # --- Embedding Model --- | |
| # QUAN TRỌNG: Revert lại bản vietnamese-sbert theo yêu cầu của anh để khớp với DB cũ | |
| import gc | |
| logger.info("[Cache] Loading embedding model: keepitreal/vietnamese-sbert") | |
| self.embedder: SentenceTransformer = SentenceTransformer("keepitreal/vietnamese-sbert") | |
| gc.collect() # Buộc Python dọn dẹp RAM dư thừa sau khi load model nặng | |
| logger.info("[Cache] Embedding model loaded successfully") | |
| # --- LLM cho Query Normalization (dùng Groq, siêu nhanh ~1s) --- | |
| from agents.query_agent import QueryAgent | |
| self.normalizer_agent = QueryAgent() | |
| logger.info("[Cache] Query normalizer Agent loaded (AGENT1/Groq)") | |
| # ============================================================ | |
| # Stage 0: Query Normalization (LLM-based) | |
| # ============================================================ | |
| _NORMALIZE_PROMPT = ( | |
| "Trích xuất các sự kiện cốt lõi từ tin đồn sau thành 1 câu ngắn gọn duy nhất.\n" | |
| "BẮT BUỘC giữ lại nguyên vẹn TẤT CẢ các con số (số tiền, ngày tháng, phần trăm...).\n" | |
| "Giữ lại: chủ thể, hành động chính, thời gian, con số, địa danh, tên tổ chức.\n" | |
| "Bỏ hết: lời kêu gọi, cảm xúc, từ ngữ cường điệu, từ đệm.\n" | |
| "Viết thường, không dấu câu thừa, chỉ trả về đúng 1 câu.\n\n" | |
| 'Tin đồn: "{query}"\n' | |
| "Câu chuẩn hóa:" | |
| ) | |
| def _normalize_query(self, user_query: str) -> str: | |
| """ | |
| Chuẩn hóa câu hỏi bằng LLM (Groq ~1s). | |
| Biến câu dài/cường điệu thành dạng cốt lõi ngắn gọn để embedding chính xác hơn. | |
| VD: "Cảnh báo khẩn cấp: Từ ngày 15/05/2026, tất cả thẻ BHYT bản giấy..." | |
| → "thẻ bhyt giấy hủy bỏ 15/05/2026 phí 150000 vssid tạm dừng quyền lợi" | |
| """ | |
| try: | |
| prompt = self._NORMALIZE_PROMPT.format(query=user_query) | |
| normalized = self.normalizer_agent.call_llm( | |
| system_prompt="Bạn là chuyên gia ngôn ngữ học. Chỉ trả về một câu duy nhất.", | |
| user_prompt=prompt | |
| ) | |
| normalized = normalized.strip().strip('"').strip() | |
| # Fallback nếu LLM trả về rỗng hoặc quá dài | |
| if not normalized or len(normalized) > len(user_query): | |
| logger.warning("[Cache] Normalize returned empty/too long, using raw query") | |
| return user_query | |
| log_agent_step(logger, "Cache", "Normalized query", f"{normalized[:100]}...") | |
| return normalized | |
| except Exception as e: | |
| logger.warning(f"[Cache] Normalize failed, using raw query: {e}") | |
| return user_query | |
| # ============================================================ | |
| # Stage 2: Entity Extraction (NER + Regex) | |
| # ============================================================ | |
| def _extract_entities(self, text: str) -> dict[str, set[str]]: | |
| """ | |
| Bóc tách thực thể từ text bằng Regex (số) + Underthesea NER. | |
| Args: | |
| text: Văn bản cần trích xuất thực thể | |
| Returns: | |
| Dict chứa các set: | |
| { | |
| "nums": {"5", "50000000"}, | |
| "loc": {"việt nam"}, | |
| "org": {"bộ tài chính"}, | |
| "per": set() | |
| } | |
| """ | |
| entities: dict[str, set[str]] = { | |
| "nums": set(), | |
| "loc": set(), | |
| "org": set(), | |
| "per": set(), | |
| } | |
| # --- Regex: trích xuất số có ≥ 2 chữ số (bỏ qua số 1 chữ số vì thường là noise) --- | |
| numbers = re.findall(r'\d{2,}', text) | |
| entities["nums"] = set(numbers) | |
| # --- Regex: trích xuất ngày tháng --- | |
| dates = re.findall(r'\d{1,2}/\d{1,4}(?:/\d{2,4})?', text) | |
| for d in dates: | |
| entities["nums"].add(d) | |
| # --- Underthesea NER --- | |
| try: | |
| ner_results = ner(text) | |
| # ner trả về list of tuples: [(word, pos, chunking, ner_tag), ...] | |
| current_entity = "" | |
| current_tag = "" | |
| for token in ner_results: | |
| word = token[0] | |
| tag = token[3] # NER tag: B-PER, I-PER, B-LOC, I-LOC, B-ORG, I-ORG, O | |
| if tag.startswith("B-"): | |
| # Lưu entity trước đó nếu có | |
| if current_entity and current_tag: | |
| self._add_entity(entities, current_tag, current_entity.strip()) | |
| # Bắt đầu entity mới | |
| current_entity = word | |
| current_tag = tag[2:] # "PER", "LOC", "ORG" | |
| elif tag.startswith("I-") and tag[2:] == current_tag: | |
| # Nối tiếp entity hiện tại | |
| current_entity += " " + word | |
| else: | |
| # Kết thúc entity | |
| if current_entity and current_tag: | |
| self._add_entity(entities, current_tag, current_entity.strip()) | |
| current_entity = "" | |
| current_tag = "" | |
| # Xử lý entity cuối cùng | |
| if current_entity and current_tag: | |
| self._add_entity(entities, current_tag, current_entity.strip()) | |
| except Exception as e: | |
| logger.warning(f"[Cache] NER extraction error: {e}") | |
| return entities | |
| def _add_entity(entities: dict[str, set[str]], tag: str, value: str) -> None: | |
| """Thêm entity vào đúng category.""" | |
| tag_map = {"PER": "per", "LOC": "loc", "ORG": "org"} | |
| key = tag_map.get(tag) | |
| if key: | |
| entities[key].add(value.lower()) | |
| # ============================================================ | |
| # Stage 1: Vector Search (MongoDB Atlas) | |
| # ============================================================ | |
| def check_cache(self, user_query: str) -> dict[str, Any]: | |
| """ | |
| Kiểm tra cache 2 lớp. | |
| Lớp 1: Vector Search tìm candidate (cosine > SIMILARITY_THRESHOLD) | |
| Lớp 2: So sánh entities (nums + loc phải khớp 100%) | |
| Args: | |
| user_query: Câu hỏi/tin đồn từ user | |
| Returns: | |
| {"hit": True, "data": {...verdict...}} nếu cache hit | |
| {"hit": False} nếu cache miss | |
| """ | |
| try: | |
| log_agent_step(logger, "Cache", "Checking cache", f"Query: {user_query[:80]}...") | |
| # --- Lớp 0: Normalize query bằng LLM --- | |
| normalized = self._normalize_query(user_query) | |
| # --- Lớp 1: Embedding + Vector Search (dùng câu đã normalize) --- | |
| query_vector = self.embedder.encode(normalized).tolist() | |
| pipeline = [ | |
| { | |
| "$vectorSearch": { | |
| "index": VECTOR_INDEX_NAME, | |
| "path": "vector", | |
| "queryVector": query_vector, | |
| "numCandidates": 10, | |
| "limit": 3, # Top 3 candidates | |
| } | |
| }, | |
| { | |
| "$addFields": { | |
| "score": {"$meta": "vectorSearchScore"} | |
| } | |
| }, | |
| { | |
| # Lọc bỏ documents đã hết hạn | |
| "$match": { | |
| "expires_at": {"$gte": datetime.now(timezone.utc)} | |
| } | |
| }, | |
| ] | |
| candidates = list(self.collection.aggregate(pipeline)) | |
| if not candidates: | |
| log_agent_step(logger, "Cache", "Cache MISS", "Không tìm thấy candidate nào") | |
| return {"hit": False} | |
| # --- Lớp 2: NER Comparison cho từng candidate --- | |
| new_ent_raw = self._extract_entities(user_query) | |
| new_ent_norm = self._extract_entities(normalized) | |
| new_entities = {k: new_ent_raw.get(k, set()) | new_ent_norm.get(k, set()) for k in new_ent_raw} | |
| log_agent_step( | |
| logger, "Cache", "Extracted entities (query)", | |
| f"nums={new_entities['nums']}, loc={new_entities['loc']}, " | |
| f"org={new_entities['org']}, per={new_entities['per']}" | |
| ) | |
| for candidate in candidates: | |
| score = candidate.get("score", 0.0) | |
| if score < SIMILARITY_THRESHOLD: | |
| log_agent_step( | |
| logger, "Cache", "Score too low", | |
| f"score={score:.4f} < {SIMILARITY_THRESHOLD}" | |
| ) | |
| continue | |
| # Lấy entities đã lưu trong cache | |
| cached_entities_raw = candidate.get("entities", {}) | |
| cached_entities: dict[str, set[str]] = { | |
| k: set(v) for k, v in cached_entities_raw.items() | |
| } | |
| log_agent_step( | |
| logger, "Cache", "Comparing entities", | |
| f"score={score:.4f} | " | |
| f"cached_nums={cached_entities.get('nums', set[str]())}, " | |
| f"cached_loc={cached_entities.get('loc', set[str]())}" | |
| ) | |
| nums_match = ( | |
| not new_entities["nums"] # query mới không có số → skip | |
| or new_entities["nums"].issubset(cached_entities.get("nums", set())) | |
| ) | |
| # Check chống false positive tên người (vd: Tô Lâm vs Volodin) | |
| # Nếu query mới có tên người, phải ĐẢM BẢO toàn bộ các tên người đều xuất hiện trong cache | |
| # Tránh tình trạng: Query = "Tô Lâm và Putin" map nhầm vào cache "Tô Lâm" đơn thuần | |
| person_match = True | |
| if new_entities["per"]: | |
| cached_per = cached_entities.get("per", set()) | |
| for np in new_entities["per"]: | |
| # Tìm xem tên np có khớp một phần với cp nào trong cached không | |
| match_found_for_this_person = False | |
| for cp in cached_per: | |
| if np in cp or cp in np: | |
| match_found_for_this_person = True | |
| break | |
| if not match_found_for_this_person: | |
| person_match = False | |
| break | |
| if nums_match and person_match: | |
| # Cache HIT! | |
| log_agent_step( | |
| logger, "Cache", "Cache HIT ✅", | |
| f"score={score:.4f} | Entities khớp an toàn" | |
| ) | |
| # Tăng hit_count | |
| self.collection.update_one( | |
| {"_id": candidate["_id"]}, | |
| {"$inc": {"hit_count": 1}} | |
| ) | |
| return { | |
| "hit": True, | |
| "data": candidate.get("full_response", {}), | |
| "score": score, | |
| "cached_query": candidate.get("query", ""), | |
| } | |
| log_agent_step( | |
| logger, "Cache", "Entities LỆCH ❌", | |
| f"nums_match={nums_match}" | |
| ) | |
| # Không candidate nào pass Lớp 2 | |
| log_agent_step(logger, "Cache", "Cache MISS", "Có candidate nhưng entities không khớp") | |
| return {"hit": False} | |
| except PyMongoError as e: | |
| logger.error(f"[Cache] MongoDB error during check: {e}") | |
| return {"hit": False} | |
| except Exception as e: | |
| logger.error(f"[Cache] Unexpected error during check: {e}") | |
| return {"hit": False} | |
| # ============================================================ | |
| # Save to Cache | |
| # ============================================================ | |
| def save_to_cache(self, user_query: str, full_response: dict) -> bool: | |
| """ | |
| Lưu kết quả kiểm chứng vào MongoDB để phục vụ cache tương lai. | |
| Args: | |
| user_query: Câu hỏi gốc từ user | |
| full_response: Toàn bộ verdict dict từ Agent 3 | |
| Returns: | |
| True nếu lưu thành công, False nếu lỗi | |
| """ | |
| try: | |
| log_agent_step(logger, "Cache", "Saving to cache", f"Query: {user_query[:80]}...") | |
| # Normalize query trước khi encode (đảm bảo cùng dạng với check_cache) | |
| normalized = self._normalize_query(user_query) | |
| # Encode vector từ câu đã normalize | |
| query_vector = self.embedder.encode(normalized).tolist() | |
| # Trích xuất entities (kết hợp cả raw query & normalized để không lọt số) | |
| ent_raw = self._extract_entities(user_query) | |
| ent_norm = self._extract_entities(normalized) | |
| entities = {k: ent_raw.get(k, set()) | ent_norm.get(k, set()) for k in ent_raw} | |
| # Convert sets → lists cho BSON serialization | |
| entities_serializable = {k: list(v) for k, v in entities.items()} | |
| now = datetime.now(timezone.utc) | |
| document = { | |
| "query": user_query, | |
| "vector": query_vector, | |
| "entities": entities_serializable, | |
| "full_response": full_response, | |
| "created_at": now, | |
| "expires_at": now + timedelta(days=CACHE_TTL_DAYS), | |
| "hit_count": 0, | |
| } | |
| self.collection.insert_one(document) | |
| log_agent_step( | |
| logger, "Cache", "Saved ✅", | |
| f"entities: nums={entities['nums']}, loc={entities['loc']}" | |
| ) | |
| return True | |
| except PyMongoError as e: | |
| logger.error(f"[Cache] MongoDB save error: {e}") | |
| return False | |
| except Exception as e: | |
| logger.error(f"[Cache] Unexpected save error: {e}") | |
| return False | |
| # ============================================================ | |
| # Utility | |
| # ============================================================ | |
| def get_stats(self) -> dict: | |
| """Trả về thống kê cache.""" | |
| try: | |
| total = self.collection.count_documents({}) | |
| active = self.collection.count_documents( | |
| {"expires_at": {"$gte": datetime.now(timezone.utc)}} | |
| ) | |
| total_hits = sum( | |
| doc.get("hit_count", 0) | |
| for doc in self.collection.find({}, {"hit_count": 1}) | |
| ) | |
| return { | |
| "total_documents": total, | |
| "active_documents": active, | |
| "expired_documents": total - active, | |
| "total_cache_hits": total_hits, | |
| } | |
| except Exception: | |
| return {"error": "Cannot retrieve stats"} | |
| # ============================================================ | |
| # Singleton helper — dùng chung 1 instance trong toàn app | |
| # ============================================================ | |
| _cache_instance: MongoSemanticCache | None = None | |
| def get_cache() -> MongoSemanticCache | None: | |
| """ | |
| Trả về singleton MongoSemanticCache. | |
| Nếu chưa khởi tạo thì tạo mới, lưu lại để tái sử dụng. | |
| Trả về None nếu MONGODB_URI chưa cấu hình hoặc kết nối lỗi. | |
| """ | |
| global _cache_instance | |
| if _cache_instance is None: | |
| try: | |
| _cache_instance = MongoSemanticCache() | |
| except Exception as e: | |
| logger.warning(f"[Cache] Khởi tạo cache thất bại, fallback no-cache: {e}") | |
| return None | |
| return _cache_instance | |