"""
Knowledge Universe — Embedding Adapter

Produces vector embeddings for RAG pipeline consumption.
Uses local sentence-transformers (all-MiniLM-L6-v2) — no OpenAI API needed.

Output is a KnowledgeObject with embedding field populated,
ready to be inserted into any vector store (Qdrant, Weaviate, Chroma, Pinecone).
"""

import logging
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional

from src.api.models import Source, KnowledgeObject
from src.format_adapters.base_adapter import BaseFormatAdapter

logger = logging.getLogger(__name__)

# Name of the sentence-transformers model and the size of the vectors it emits.
# Kept together so the Qdrant collection is created with a matching dimension.
_MODEL_NAME = "all-MiniLM-L6-v2"
_EMBEDDING_DIM = 384

# Lazy-loaded model (first call loads it, subsequent calls reuse)
_model = None


def _get_model():
    """Return the shared SentenceTransformer instance, loading it on first use.

    The import is done lazily so that importing this module does not require
    sentence-transformers to be installed.

    Raises:
        Exception: whatever the import or model construction raised; the
            error is logged and then re-raised unchanged.
    """
    global _model
    if _model is None:
        try:
            from sentence_transformers import SentenceTransformer

            _model = SentenceTransformer(_MODEL_NAME)
            logger.info("SentenceTransformer loaded: %s", _MODEL_NAME)
        except Exception as e:
            logger.error("Failed to load embedding model: %s", e)
            raise
    return _model


def _stable_point_id(source_id: Any) -> str:
    """Derive a deterministic UUID string to use as a Qdrant point id.

    The built-in ``hash()`` is salted per process for strings, so ids derived
    from it change between runs and re-uploads would create duplicates
    instead of overwriting. A name-based UUID is stable across runs.
    """
    return str(uuid.uuid5(uuid.NAMESPACE_URL, str(source_id)))


class EmbeddingAdapter(BaseFormatAdapter):
    """
    Converts sources to KnowledgeObjects with vector embeddings.

    The embedding encodes: title + summary + tags + platform + formats + authors
    Dimensionality: 384 (all-MiniLM-L6-v2)
    Ready for: Qdrant, Weaviate, ChromaDB, Pinecone, pgvector
    """

    def transform(self, source: Source) -> KnowledgeObject:
        """Embed a single source and wrap it in a KnowledgeObject.

        The ``embedding`` field is ``None`` when embedding fails.
        """
        return self._to_knowledge_object(source, self._embed(source))

    def transform_many(self, sources: List[Source]) -> List[KnowledgeObject]:
        """Batch embed — more efficient than one-by-one.

        Returns one KnowledgeObject per source, in input order. If the batch
        cannot be embedded, every object is returned with ``embedding=None``.
        """
        if not sources:
            return []

        try:
            # Text building lives inside the try so a malformed source degrades
            # to "no embedding", matching the behaviour of _embed().
            texts = [self._source_to_text(s) for s in sources]
            model = _get_model()
            embeddings = model.encode(texts, convert_to_numpy=True)
            embeddings_list = embeddings.tolist()
        except Exception as e:
            logger.error("Batch embedding failed: %s", e)
            embeddings_list = [None] * len(sources)

        return [
            self._to_knowledge_object(source, embedding)
            for source, embedding in zip(sources, embeddings_list)
        ]

    def _to_knowledge_object(
        self, source: Source, embedding: Optional[List[float]]
    ) -> KnowledgeObject:
        """Build the KnowledgeObject for ``source`` with the given embedding."""
        return KnowledgeObject(
            source_id=source.id,
            title=source.title,
            url=source.url,
            platform=source.source_platform,
            # Only the first declared format is carried over.
            format=source.formats[0] if source.formats else None,
            quality_score=source.quality_score,
            pedagogical_fit=source.pedagogical_fit,
            freshness_score=self._freshness_score(source),
            authors=source.authors,
            publication_date=source.publication_date,
            license=source.license,
            open_access=source.open_access,
            embedding=embedding,
            summary=source.summary,
            tags=source.tags,
        )

    def _embed(self, source: Source) -> Optional[List[float]]:
        """Return the embedding vector for one source, or ``None`` on failure."""
        try:
            model = _get_model()
            text = self._source_to_text(source)
            vector = model.encode(text, convert_to_numpy=True)
            return vector.tolist()
        except Exception as e:
            logger.error("Embedding failed for %s: %s", source.id, e)
            return None

    def _source_to_text(self, source: Source) -> str:
        """Concatenate fields that describe the source's semantic content.

        Uses the title, the first 300 characters of the summary, up to 10
        tags, the platform, the format values and up to 2 authors, joined
        with " | ". Empty parts are skipped; list fields that are ``None``
        are treated as empty.
        """
        parts = [
            source.title,
            source.summary[:300] if source.summary else "",
            " ".join((source.tags or [])[:10]),
            source.source_platform,
            " ".join(f.value for f in (source.formats or [])),
            " ".join((source.authors or [])[:2]),
        ]
        return " | ".join(p for p in parts if p)

    def _freshness_score(self, source: Source) -> float:
        """Compute 0-1 freshness score from publication date.

        Returns 0.5 when the date is missing or its age cannot be computed;
        otherwise 1.0 / 0.8 / 0.6 / 0.4 for ages under 180 / 720 / 1800 days
        and older.
        """
        if not source.publication_date:
            return 0.5
        try:
            # Use the date's own tzinfo so naive and aware values both subtract.
            now = datetime.now(source.publication_date.tzinfo)
            days = (now - source.publication_date).days
        except Exception:
            # Deliberate best-effort: an unusable date gets the neutral score.
            return 0.5
        if days < 180:
            return 1.0
        if days < 720:
            return 0.8
        if days < 1800:
            return 0.6
        return 0.4


# ── Qdrant upload helper (optional, call from scripts) ─────────────────────

def upload_to_qdrant(
    knowledge_objects: List[KnowledgeObject],
    collection: str = "knowledge_universe",
    qdrant_url: str = "http://localhost:6333",
    vector_size: int = _EMBEDDING_DIM,
) -> int:
    """
    Upload KnowledgeObjects to a local Qdrant instance.
    Run Qdrant with: docker run -p 6333:6333 qdrant/qdrant

    This is completely free — no cloud dependency.

    Args:
        knowledge_objects: objects to upload; those without an embedding
            are skipped.
        collection: target collection name, created if possible.
        qdrant_url: URL of the Qdrant server.
        vector_size: vector dimension used when creating the collection.

    Returns:
        Number of points uploaded, or 0 if qdrant-client is missing, there
        is nothing to upload, or the upload fails (failures are logged).
    """
    try:
        from qdrant_client import QdrantClient
        from qdrant_client.models import PointStruct, Distance, VectorParams
    except ImportError:
        logger.error("qdrant-client not installed. Run: pip install qdrant-client")
        return 0

    try:
        client = QdrantClient(url=qdrant_url)

        # Create collection if needed
        try:
            client.create_collection(
                collection_name=collection,
                vectors_config=VectorParams(size=vector_size, distance=Distance.COSINE),
            )
        except Exception as e:
            # Best-effort: the usual cause is that the collection already
            # exists. Any real problem resurfaces on the upsert below.
            logger.debug("create_collection skipped for '%s': %s", collection, e)

        points = [
            PointStruct(
                # Deterministic id so re-uploading a source overwrites its point.
                id=_stable_point_id(obj.source_id),
                vector=obj.embedding,
                payload={
                    "source_id": obj.source_id,
                    "title": obj.title,
                    "url": obj.url,
                    "platform": obj.platform,
                    "quality_score": obj.quality_score,
                    "summary": obj.summary,
                    "tags": obj.tags,
                },
            )
            for obj in knowledge_objects
            if obj.embedding
        ]

        if not points:
            # Nothing carries an embedding, so there is no upsert to send.
            logger.info("No vectors to upload to Qdrant collection '%s'", collection)
            return 0

        client.upsert(collection_name=collection, points=points)
        logger.info("Uploaded %d vectors to Qdrant collection '%s'", len(points), collection)
        return len(points)
    except Exception as e:
        logger.error("Qdrant upload failed: %s", e)
        return 0