import hashlib import logging import os from typing import Dict, List, Optional from dotenv import load_dotenv # type: ignore[import] from qdrant_client import QdrantClient, models from src.vector_db.local_embeddings import LocalEmbeddingManager # .env 파일에서 환경 변수 로드 (로컬 개발 편의성) load_dotenv() logger = logging.getLogger(__name__) class QdrantManager: """Qdrant Cloud 기반 벡터 캐시 관리 클래스. - 임베딩 생성: 로컬 BAAI/bge-m3 - 벡터 저장/검색: Qdrant Cloud """ def __init__(self, collection_name: str = "CodeWeaver") -> None: """Qdrant Cloud 클라이언트를 초기화하고 컬렉션을 준비한다.""" qdrant_url = os.getenv("QDRANT_URL") qdrant_api_key = os.getenv("QDRANT_API_KEY") if not qdrant_url or not qdrant_api_key: raise ValueError( "QDRANT_URL 및 QDRANT_API_KEY 환경 변수가 모두 설정되어 있어야 합니다." ) # Qdrant Cloud 공식 가이드와 유사한 초기화 형태 사용 # https://qdrant.tech/documentation/tutorials-and-examples/cloud-inference-hybrid-search/ self.client = QdrantClient( url=qdrant_url, api_key=qdrant_api_key, timeout=30, ) self.collection_name = collection_name self.embedding_manager = LocalEmbeddingManager() logger.info("QdrantManager 초기화: collection=%s, url=%s", collection_name, qdrant_url) # 컬렉션이 없다면 생성 self._init_collection() def _init_collection(self) -> None: """컬렉션이 없으면 생성한다.""" try: exists = self.client.collection_exists(self.collection_name) except Exception as e: # pragma: no cover - 방어적 코드 logger.error("Qdrant 컬렉션 존재 여부 확인 실패: %s", e, exc_info=True) raise if exists: logger.info("Qdrant 컬렉션 이미 존재: %s", self.collection_name) return try: self.client.create_collection( collection_name=self.collection_name, vectors_config=models.VectorParams( size=1024, # bge-m3 임베딩 차원 distance=models.Distance.COSINE, ), ) logger.info("Qdrant 컬렉션 생성 완료: %s", self.collection_name) except Exception as e: logger.error("Qdrant 컬렉션 생성 실패: %s", e, exc_info=True) raise async def get_embedding(self, text: str) -> List[float]: """로컬 임베딩 모델을 사용해 텍스트 임베딩을 생성한다.""" try: embedding = self.embedding_manager.get_embedding(text) logger.debug("임베딩 생성 완료 (길이=%d)", len(embedding)) return embedding except Exception as e: logger.error("임베딩 생성 실패: %s", e, exc_info=True) raise async def search_cache( self, question: str, threshold: float = 0.85, ) -> Optional[str]: """질문에 대한 캐시된 답변을 Qdrant에서 검색한다. threshold보다 높은 score를 가진 결과가 있을 때만 answer를 반환한다. """ try: embedding = await self.get_embedding(question) except Exception: # 이미 get_embedding 내부에서 로그를 남기므로 여기서는 조용히 실패 처리 return None try: # Qdrant 공식 문서: query_points를 사용한 벡터 검색 # 단일 벡터 컬렉션의 경우 query 파라미터에 벡터 리스트를 직접 전달 # https://qdrant.tech/documentation/tutorials-and-examples/cloud-inference-hybrid-search/ results = self.client.query_points( collection_name=self.collection_name, query=embedding, # 단일 벡터 컬렉션: 벡터를 직접 전달 limit=1, with_payload=True, ) except Exception as e: logger.error("Qdrant 캐시 검색 실패: %s", e, exc_info=True) return None if not results.points: logger.info("캐시 미스: 결과 없음 (question=%s)", question) return None top = results.points[0] score = getattr(top, "score", None) payload = getattr(top, "payload", {}) or {} if score is None: logger.warning("검색 결과에 score가 없습니다. payload=%s", payload) return None if score < threshold: logger.info( "캐시 미스: score(%.4f) < threshold(%.4f) (question=%s)", score, threshold, question, ) return None answer = payload.get("answer") if answer is None: logger.info("캐시 히트이지만 payload에 answer가 없습니다. payload=%s", payload) return None logger.info( "캐시 히트: score=%.4f, question=%s, answer_length=%d", score, question, len(str(answer)), ) return str(answer) async def save_to_cache(self, question: str, answer: str) -> None: """질문-답변 쌍을 Qdrant 캐시에 저장한다. 동일한 질문에 대해서는 deterministic ID를 사용하여, upsert 시 기존 엔트리를 덮어쓰게 함으로써 중복을 방지한다. """ try: embedding = await self.get_embedding(question) except Exception: # 임베딩 실패 시 캐시에 저장하지 않는다. logger.warning("임베딩 실패로 인해 캐시에 저장하지 않음. question=%s", question) return # UUID 대신 질문 해시 기반 deterministic ID 사용 # → 동일 질문 = 동일 ID → upsert가 덮어쓰기로 동작 → 중복 방지 # # 주의: Qdrant point id는 "unsigned int" 또는 "UUID"만 허용한다. # 따라서 sha256 hex(64자)를 그대로 쓰지 않고, 앞 32자를 UUID 포맷으로 변환해 사용한다. digest = hashlib.sha256(question.encode("utf-8")).hexdigest() point_id = f"{digest[:8]}-{digest[8:12]}-{digest[12:16]}-{digest[16:20]}-{digest[20:32]}" # 기존 엔트리 존재 시(덮어쓰기) 로그를 남긴다. 실패해도 upsert는 계속 시도. try: existing = self.client.retrieve( collection_name=self.collection_name, ids=[point_id], with_payload=False, with_vectors=False, ) if existing: logger.info("기존 캐시 엔트리를 덮어씁니다: point_id=%s", point_id) except Exception: pass point = models.PointStruct( id=point_id, vector=embedding, payload={ "question": question, "answer": answer, }, ) try: self.client.upsert( collection_name=self.collection_name, points=[point], ) logger.info( "Qdrant 캐시에 저장 완료 (hash ID로 중복 방지): point_id=%s, question_length=%d, answer_length=%d", point_id, len(question), len(answer), ) except Exception as e: logger.error("Qdrant 캐시 저장 실패: %s", e, exc_info=True) async def get_cache_stats(self) -> Dict[str, int]: """현재 컬렉션의 캐시 통계를 반환한다.""" try: info = self.client.get_collection(self.collection_name) # qdrant_client의 CollectionInfo는 points_count 속성을 제공 points_count = getattr(info, "points_count", 0) or 0 logger.debug( "Qdrant 캐시 통계 조회: collection=%s, total_entries=%d", self.collection_name, points_count, ) return {"total_entries": int(points_count)} except Exception as e: logger.error("Qdrant 캐시 통계 조회 실패: %s", e, exc_info=True) # 호출 측에서 에러 메시지를 참고할 수 있도록 포함 return { "total_entries": 0, "error": str(e), # type: ignore[dict-item] }