| """ | |
| Vector DB integration using Qdrant for semantic embedding storage and similarity search. | |
| Env vars: QDRANT_URL, QDRANT_API_KEY, QDRANT_COLLECTION | |
| """ | |
| import httpx | |
| from typing import List, Optional, Dict | |
| from backend.app.core.config import settings | |
| from backend.app.core.logging import get_logger | |
| logger = get_logger(__name__) | |
| _TIMEOUT = httpx.Timeout(15.0, connect=5.0) | |
| def _headers() -> Dict[str, str]: | |
| h = {"Content-Type": "application/json"} | |
| if settings.QDRANT_API_KEY: | |
| h["api-key"] = settings.QDRANT_API_KEY | |
| return h | |
| async def ensure_collection(vector_size: int = 768): | |
| """Create collection if it doesn't exist.""" | |
| try: | |
| async with httpx.AsyncClient(timeout=_TIMEOUT) as client: | |
| resp = await client.get( | |
| f"{settings.QDRANT_URL}/collections/{settings.QDRANT_COLLECTION}", | |
| headers=_headers(), | |
| ) | |
| if resp.status_code == 404: | |
| await client.put( | |
| f"{settings.QDRANT_URL}/collections/{settings.QDRANT_COLLECTION}", | |
| json={"vectors": {"size": vector_size, "distance": "Cosine"}}, | |
| headers=_headers(), | |
| ) | |
| logger.info("Created Qdrant collection", collection=settings.QDRANT_COLLECTION) | |
| except Exception as e: | |
| logger.warning("Qdrant collection setup failed (non-fatal)", error=str(e)) | |
| async def upsert_embedding(point_id: str, vector: List[float], payload: Optional[Dict] = None): | |
| """Store an embedding vector in Qdrant.""" | |
| try: | |
| async with httpx.AsyncClient(timeout=_TIMEOUT) as client: | |
| await client.put( | |
| f"{settings.QDRANT_URL}/collections/{settings.QDRANT_COLLECTION}/points", | |
| json={ | |
| "points": [ | |
| {"id": point_id, "vector": vector, "payload": payload or {}} | |
| ] | |
| }, | |
| headers=_headers(), | |
| ) | |
| except Exception as e: | |
| logger.warning("Qdrant upsert failed (non-fatal)", error=str(e)) | |
| async def search_similar(vector: List[float], top_k: int = 5) -> List[Dict]: | |
| """Search for similar embeddings in Qdrant.""" | |
| try: | |
| async with httpx.AsyncClient(timeout=_TIMEOUT) as client: | |
| resp = await client.post( | |
| f"{settings.QDRANT_URL}/collections/{settings.QDRANT_COLLECTION}/points/search", | |
| json={"vector": vector, "limit": top_k, "with_payload": True}, | |
| headers=_headers(), | |
| ) | |
| resp.raise_for_status() | |
| data = resp.json() | |
| return data.get("result", []) | |
| except Exception as e: | |
| logger.warning("Qdrant search failed (non-fatal)", error=str(e)) | |
| return [] | |
| async def compute_cluster_score(vector: List[float]) -> float: | |
| """ | |
| Compute a cluster density score for the given vector. | |
| Higher score = more similar to existing content (potential coordinated campaign). | |
| Returns 0 if no similar items found. | |
| """ | |
| similar = await search_similar(vector, top_k=10) | |
| if not similar: | |
| return 0.0 | |
| scores = [item.get("score", 0.0) for item in similar] | |
| avg_similarity = sum(scores) / len(scores) | |
| return round(min(1.0, avg_similarity), 4) | |