ใ……ใ…Žใ…‡
Add CodeWeaver Gradio app
515f392
import hashlib
import logging
import os
from typing import Dict, List, Optional
from dotenv import load_dotenv # type: ignore[import]
from qdrant_client import QdrantClient, models
from src.vector_db.local_embeddings import LocalEmbeddingManager
# .env ํŒŒ์ผ์—์„œ ํ™˜๊ฒฝ ๋ณ€์ˆ˜ ๋กœ๋“œ (๋กœ์ปฌ ๊ฐœ๋ฐœ ํŽธ์˜์„ฑ)
load_dotenv()
logger = logging.getLogger(__name__)
class QdrantManager:
"""Qdrant Cloud ๊ธฐ๋ฐ˜ ๋ฒกํ„ฐ ์บ์‹œ ๊ด€๋ฆฌ ํด๋ž˜์Šค.
- ์ž„๋ฒ ๋”ฉ ์ƒ์„ฑ: ๋กœ์ปฌ BAAI/bge-m3
- ๋ฒกํ„ฐ ์ €์žฅ/๊ฒ€์ƒ‰: Qdrant Cloud
"""
def __init__(self, collection_name: str = "CodeWeaver") -> None:
"""Qdrant Cloud ํด๋ผ์ด์–ธํŠธ๋ฅผ ์ดˆ๊ธฐํ™”ํ•˜๊ณ  ์ปฌ๋ ‰์…˜์„ ์ค€๋น„ํ•œ๋‹ค."""
qdrant_url = os.getenv("QDRANT_URL")
qdrant_api_key = os.getenv("QDRANT_API_KEY")
if not qdrant_url or not qdrant_api_key:
raise ValueError(
"QDRANT_URL ๋ฐ QDRANT_API_KEY ํ™˜๊ฒฝ ๋ณ€์ˆ˜๊ฐ€ ๋ชจ๋‘ ์„ค์ •๋˜์–ด ์žˆ์–ด์•ผ ํ•ฉ๋‹ˆ๋‹ค."
)
# Qdrant Cloud ๊ณต์‹ ๊ฐ€์ด๋“œ์™€ ์œ ์‚ฌํ•œ ์ดˆ๊ธฐํ™” ํ˜•ํƒœ ์‚ฌ์šฉ
# https://qdrant.tech/documentation/tutorials-and-examples/cloud-inference-hybrid-search/
self.client = QdrantClient(
url=qdrant_url,
api_key=qdrant_api_key,
timeout=30,
)
self.collection_name = collection_name
self.embedding_manager = LocalEmbeddingManager()
logger.info("QdrantManager ์ดˆ๊ธฐํ™”: collection=%s, url=%s", collection_name, qdrant_url)
# ์ปฌ๋ ‰์…˜์ด ์—†๋‹ค๋ฉด ์ƒ์„ฑ
self._init_collection()
def _init_collection(self) -> None:
"""์ปฌ๋ ‰์…˜์ด ์—†์œผ๋ฉด ์ƒ์„ฑํ•œ๋‹ค."""
try:
exists = self.client.collection_exists(self.collection_name)
except Exception as e: # pragma: no cover - ๋ฐฉ์–ด์  ์ฝ”๋“œ
logger.error("Qdrant ์ปฌ๋ ‰์…˜ ์กด์žฌ ์—ฌ๋ถ€ ํ™•์ธ ์‹คํŒจ: %s", e, exc_info=True)
raise
if exists:
logger.info("Qdrant ์ปฌ๋ ‰์…˜ ์ด๋ฏธ ์กด์žฌ: %s", self.collection_name)
return
try:
self.client.create_collection(
collection_name=self.collection_name,
vectors_config=models.VectorParams(
size=1024, # bge-m3 ์ž„๋ฒ ๋”ฉ ์ฐจ์›
distance=models.Distance.COSINE,
),
)
logger.info("Qdrant ์ปฌ๋ ‰์…˜ ์ƒ์„ฑ ์™„๋ฃŒ: %s", self.collection_name)
except Exception as e:
logger.error("Qdrant ์ปฌ๋ ‰์…˜ ์ƒ์„ฑ ์‹คํŒจ: %s", e, exc_info=True)
raise
async def get_embedding(self, text: str) -> List[float]:
"""๋กœ์ปฌ ์ž„๋ฒ ๋”ฉ ๋ชจ๋ธ์„ ์‚ฌ์šฉํ•ด ํ…์ŠคํŠธ ์ž„๋ฒ ๋”ฉ์„ ์ƒ์„ฑํ•œ๋‹ค."""
try:
embedding = self.embedding_manager.get_embedding(text)
logger.debug("์ž„๋ฒ ๋”ฉ ์ƒ์„ฑ ์™„๋ฃŒ (๊ธธ์ด=%d)", len(embedding))
return embedding
except Exception as e:
logger.error("์ž„๋ฒ ๋”ฉ ์ƒ์„ฑ ์‹คํŒจ: %s", e, exc_info=True)
raise
async def search_cache(
self,
question: str,
threshold: float = 0.85,
) -> Optional[str]:
"""์งˆ๋ฌธ์— ๋Œ€ํ•œ ์บ์‹œ๋œ ๋‹ต๋ณ€์„ Qdrant์—์„œ ๊ฒ€์ƒ‰ํ•œ๋‹ค.
threshold๋ณด๋‹ค ๋†’์€ score๋ฅผ ๊ฐ€์ง„ ๊ฒฐ๊ณผ๊ฐ€ ์žˆ์„ ๋•Œ๋งŒ answer๋ฅผ ๋ฐ˜ํ™˜ํ•œ๋‹ค.
"""
try:
embedding = await self.get_embedding(question)
except Exception:
# ์ด๋ฏธ get_embedding ๋‚ด๋ถ€์—์„œ ๋กœ๊ทธ๋ฅผ ๋‚จ๊ธฐ๋ฏ€๋กœ ์—ฌ๊ธฐ์„œ๋Š” ์กฐ์šฉํžˆ ์‹คํŒจ ์ฒ˜๋ฆฌ
return None
try:
# Qdrant ๊ณต์‹ ๋ฌธ์„œ: query_points๋ฅผ ์‚ฌ์šฉํ•œ ๋ฒกํ„ฐ ๊ฒ€์ƒ‰
# ๋‹จ์ผ ๋ฒกํ„ฐ ์ปฌ๋ ‰์…˜์˜ ๊ฒฝ์šฐ query ํŒŒ๋ผ๋ฏธํ„ฐ์— ๋ฒกํ„ฐ ๋ฆฌ์ŠคํŠธ๋ฅผ ์ง์ ‘ ์ „๋‹ฌ
# https://qdrant.tech/documentation/tutorials-and-examples/cloud-inference-hybrid-search/
results = self.client.query_points(
collection_name=self.collection_name,
query=embedding, # ๋‹จ์ผ ๋ฒกํ„ฐ ์ปฌ๋ ‰์…˜: ๋ฒกํ„ฐ๋ฅผ ์ง์ ‘ ์ „๋‹ฌ
limit=1,
with_payload=True,
)
except Exception as e:
logger.error("Qdrant ์บ์‹œ ๊ฒ€์ƒ‰ ์‹คํŒจ: %s", e, exc_info=True)
return None
if not results.points:
logger.info("์บ์‹œ ๋ฏธ์Šค: ๊ฒฐ๊ณผ ์—†์Œ (question=%s)", question)
return None
top = results.points[0]
score = getattr(top, "score", None)
payload = getattr(top, "payload", {}) or {}
if score is None:
logger.warning("๊ฒ€์ƒ‰ ๊ฒฐ๊ณผ์— score๊ฐ€ ์—†์Šต๋‹ˆ๋‹ค. payload=%s", payload)
return None
if score < threshold:
logger.info(
"์บ์‹œ ๋ฏธ์Šค: score(%.4f) < threshold(%.4f) (question=%s)",
score,
threshold,
question,
)
return None
answer = payload.get("answer")
if answer is None:
logger.info("์บ์‹œ ํžˆํŠธ์ด์ง€๋งŒ payload์— answer๊ฐ€ ์—†์Šต๋‹ˆ๋‹ค. payload=%s", payload)
return None
logger.info(
"์บ์‹œ ํžˆํŠธ: score=%.4f, question=%s, answer_length=%d",
score,
question,
len(str(answer)),
)
return str(answer)
async def save_to_cache(self, question: str, answer: str) -> None:
"""์งˆ๋ฌธ-๋‹ต๋ณ€ ์Œ์„ Qdrant ์บ์‹œ์— ์ €์žฅํ•œ๋‹ค.
๋™์ผํ•œ ์งˆ๋ฌธ์— ๋Œ€ํ•ด์„œ๋Š” deterministic ID๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ,
upsert ์‹œ ๊ธฐ์กด ์—”ํŠธ๋ฆฌ๋ฅผ ๋ฎ์–ด์“ฐ๊ฒŒ ํ•จ์œผ๋กœ์จ ์ค‘๋ณต์„ ๋ฐฉ์ง€ํ•œ๋‹ค.
"""
try:
embedding = await self.get_embedding(question)
except Exception:
# ์ž„๋ฒ ๋”ฉ ์‹คํŒจ ์‹œ ์บ์‹œ์— ์ €์žฅํ•˜์ง€ ์•Š๋Š”๋‹ค.
logger.warning("์ž„๋ฒ ๋”ฉ ์‹คํŒจ๋กœ ์ธํ•ด ์บ์‹œ์— ์ €์žฅํ•˜์ง€ ์•Š์Œ. question=%s", question)
return
# UUID ๋Œ€์‹  ์งˆ๋ฌธ ํ•ด์‹œ ๊ธฐ๋ฐ˜ deterministic ID ์‚ฌ์šฉ
# โ†’ ๋™์ผ ์งˆ๋ฌธ = ๋™์ผ ID โ†’ upsert๊ฐ€ ๋ฎ์–ด์“ฐ๊ธฐ๋กœ ๋™์ž‘ โ†’ ์ค‘๋ณต ๋ฐฉ์ง€
#
# ์ฃผ์˜: Qdrant point id๋Š” "unsigned int" ๋˜๋Š” "UUID"๋งŒ ํ—ˆ์šฉํ•œ๋‹ค.
# ๋”ฐ๋ผ์„œ sha256 hex(64์ž)๋ฅผ ๊ทธ๋Œ€๋กœ ์“ฐ์ง€ ์•Š๊ณ , ์•ž 32์ž๋ฅผ UUID ํฌ๋งท์œผ๋กœ ๋ณ€ํ™˜ํ•ด ์‚ฌ์šฉํ•œ๋‹ค.
digest = hashlib.sha256(question.encode("utf-8")).hexdigest()
point_id = f"{digest[:8]}-{digest[8:12]}-{digest[12:16]}-{digest[16:20]}-{digest[20:32]}"
# ๊ธฐ์กด ์—”ํŠธ๋ฆฌ ์กด์žฌ ์‹œ(๋ฎ์–ด์“ฐ๊ธฐ) ๋กœ๊ทธ๋ฅผ ๋‚จ๊ธด๋‹ค. ์‹คํŒจํ•ด๋„ upsert๋Š” ๊ณ„์† ์‹œ๋„.
try:
existing = self.client.retrieve(
collection_name=self.collection_name,
ids=[point_id],
with_payload=False,
with_vectors=False,
)
if existing:
logger.info("๊ธฐ์กด ์บ์‹œ ์—”ํŠธ๋ฆฌ๋ฅผ ๋ฎ์–ด์”๋‹ˆ๋‹ค: point_id=%s", point_id)
except Exception:
pass
point = models.PointStruct(
id=point_id,
vector=embedding,
payload={
"question": question,
"answer": answer,
},
)
try:
self.client.upsert(
collection_name=self.collection_name,
points=[point],
)
logger.info(
"Qdrant ์บ์‹œ์— ์ €์žฅ ์™„๋ฃŒ (hash ID๋กœ ์ค‘๋ณต ๋ฐฉ์ง€): point_id=%s, question_length=%d, answer_length=%d",
point_id,
len(question),
len(answer),
)
except Exception as e:
logger.error("Qdrant ์บ์‹œ ์ €์žฅ ์‹คํŒจ: %s", e, exc_info=True)
async def get_cache_stats(self) -> Dict[str, int]:
"""ํ˜„์žฌ ์ปฌ๋ ‰์…˜์˜ ์บ์‹œ ํ†ต๊ณ„๋ฅผ ๋ฐ˜ํ™˜ํ•œ๋‹ค."""
try:
info = self.client.get_collection(self.collection_name)
# qdrant_client์˜ CollectionInfo๋Š” points_count ์†์„ฑ์„ ์ œ๊ณต
points_count = getattr(info, "points_count", 0) or 0
logger.debug(
"Qdrant ์บ์‹œ ํ†ต๊ณ„ ์กฐํšŒ: collection=%s, total_entries=%d",
self.collection_name,
points_count,
)
return {"total_entries": int(points_count)}
except Exception as e:
logger.error("Qdrant ์บ์‹œ ํ†ต๊ณ„ ์กฐํšŒ ์‹คํŒจ: %s", e, exc_info=True)
# ํ˜ธ์ถœ ์ธก์—์„œ ์—๋Ÿฌ ๋ฉ”์‹œ์ง€๋ฅผ ์ฐธ๊ณ ํ•  ์ˆ˜ ์žˆ๋„๋ก ํฌํ•จ
return {
"total_entries": 0,
"error": str(e), # type: ignore[dict-item]
}