# Knowledge-Universe / src/format_adapters/embedding_adapter.py
# Committed by vlsiddarth in 0733aae:
#   "Commit latest version with ranking logic and API fixes"
"""
Knowledge Universe — Embedding Adapter
Produces vector embeddings for RAG pipeline consumption.
Uses local sentence-transformers (all-MiniLM-L6-v2) — no OpenAI API needed.
Output is a KnowledgeObject with its `embedding` field populated,
ready to be inserted into any vector store (Qdrant, Weaviate, Chroma, Pinecone).
"""
import logging
from typing import Any, Dict, List, Optional
from datetime import datetime
from src.api.models import Source, KnowledgeObject
from src.format_adapters.base_adapter import BaseFormatAdapter
logger = logging.getLogger(__name__)
# Module-level cache for the SentenceTransformer instance. It stays None until
# _get_model() loads the model on its first call; later calls reuse it.
_model = None
def _get_model():
    """Return the shared SentenceTransformer, loading it on first use.

    The loaded model is stored in the module-level ``_model`` so later calls
    return the cached instance. If loading fails, the error is logged and
    re-raised to the caller; ``_model`` stays ``None`` so a later call retries.
    """
    global _model
    if _model is not None:
        # Fast path: model already loaded by an earlier call.
        return _model
    try:
        from sentence_transformers import SentenceTransformer
        _model = SentenceTransformer("all-MiniLM-L6-v2")
        logger.info("SentenceTransformer loaded: all-MiniLM-L6-v2")
    except Exception as exc:
        logger.error(f"Failed to load embedding model: {exc}")
        raise
    return _model
class EmbeddingAdapter(BaseFormatAdapter):
    """
    Converts sources to KnowledgeObjects with vector embeddings.

    The embedded text combines title, summary, tags, platform, formats and
    the first two authors of each source (see ``_source_to_text``).
    Dimensionality: 384 (all-MiniLM-L6-v2)
    Ready for: Qdrant, Weaviate, ChromaDB, Pinecone, pgvector

    Embedding failures do not raise: affected objects are returned with
    ``embedding=None`` and the error is logged.
    """

    def transform(self, source: Source) -> KnowledgeObject:
        """Convert a single source into a KnowledgeObject with its embedding."""
        return self._build_object(source, self._embed(source))

    def transform_many(self, sources: List[Source]) -> List[KnowledgeObject]:
        """Batch embed — more efficient than one-by-one.

        All texts are encoded with a single model call. If anything in that
        step fails, every returned object gets ``embedding=None`` instead of
        the whole batch raising (matching ``transform``'s behavior).
        """
        if not sources:
            return []
        try:
            # Text building lives inside the try so a malformed source degrades
            # the batch the same way a model failure does, rather than raising.
            texts = [self._source_to_text(s) for s in sources]
            model = _get_model()
            embeddings_list = model.encode(texts, convert_to_numpy=True).tolist()
        except Exception as e:
            logger.error(f"Batch embedding failed: {e}")
            embeddings_list = [None] * len(sources)
        return [
            self._build_object(source, embedding)
            for source, embedding in zip(sources, embeddings_list)
        ]

    def _build_object(
        self, source: Source, embedding: Optional[List[float]]
    ) -> KnowledgeObject:
        """Assemble the KnowledgeObject for ``source`` with the given embedding."""
        return KnowledgeObject(
            source_id=source.id,
            title=source.title,
            url=source.url,
            platform=source.source_platform,
            format=source.formats[0] if source.formats else None,
            quality_score=source.quality_score,
            pedagogical_fit=source.pedagogical_fit,
            freshness_score=self._freshness_score(source),
            authors=source.authors,
            publication_date=source.publication_date,
            license=source.license,
            open_access=source.open_access,
            embedding=embedding,
            summary=source.summary,
            tags=source.tags,
        )

    def _embed(self, source: Source) -> Optional[List[float]]:
        """Embed one source; return None (and log) if embedding fails."""
        try:
            model = _get_model()
            text = self._source_to_text(source)
            vector = model.encode(text, convert_to_numpy=True)
            return vector.tolist()
        except Exception as e:
            logger.error(f"Embedding failed for {source.id}: {e}")
            return None

    def _source_to_text(self, source: Source) -> str:
        """Concatenate fields that describe the source's semantic content.

        The summary is truncated to 300 characters, tags to the first 10 and
        authors to the first 2. Empty parts are dropped and the rest are
        joined with `` | ``.
        """
        # `or []` tolerates collection fields set to None, consistent with the
        # `if source.formats` guard used when picking the primary format.
        tags = source.tags or []
        formats = source.formats or []
        authors = source.authors or []
        parts = [
            source.title,
            source.summary[:300] if source.summary else "",
            " ".join(tags[:10]),
            source.source_platform,
            " ".join(f.value for f in formats),
            " ".join(authors[:2]),
        ]
        return " | ".join(p for p in parts if p)

    def _freshness_score(self, source: Source) -> float:
        """Compute a 0-1 freshness score from the publication date.

        Age buckets: < 180 days -> 1.0, < 720 -> 0.8, < 1800 -> 0.6,
        otherwise 0.4. A missing or unusable date scores a neutral 0.5.
        """
        published = source.publication_date
        if not published:
            return 0.5
        try:
            if isinstance(published, datetime):
                # "now" in the same tz (or naive) so aware/naive never mix.
                days = (datetime.now(published.tzinfo) - published).days
            else:
                # A plain date has no .tzinfo; compare against today's date
                # instead of letting the attribute access fail into 0.5.
                days = (datetime.now().date() - published).days
        except Exception:
            return 0.5
        if days < 180:
            return 1.0
        if days < 720:
            return 0.8
        if days < 1800:
            return 0.6
        return 0.4
# ── Qdrant upload helper (optional, call from scripts) ─────────────────────
def upload_to_qdrant(
    knowledge_objects: List[KnowledgeObject],
    collection: str = "knowledge_universe",
    qdrant_url: str = "http://localhost:6333",
) -> int:
    """
    Upload KnowledgeObjects to a local Qdrant instance.
    Run Qdrant with: docker run -p 6333:6333 qdrant/qdrant
    This is completely free — no cloud dependency.

    Objects without an embedding are skipped. Point IDs are derived
    deterministically from ``source_id``, so re-uploading a source overwrites
    its existing point instead of adding a duplicate.

    The collection is created if missing. Its vector size is taken from the
    first embedding, falling back to 384 (all-MiniLM-L6-v2).

    Returns:
        Number of points upserted. Returns 0 when there is nothing to upload
        or on any error; errors are logged, never raised.
    """
    try:
        import uuid
        from qdrant_client import QdrantClient
        from qdrant_client.models import PointStruct, Distance, VectorParams
    except ImportError:
        logger.error("qdrant-client not installed. Run: pip install qdrant-client")
        return 0

    try:
        embedded = [obj for obj in knowledge_objects if obj.embedding]
        points = [
            PointStruct(
                # Built-in hash() of a str is salted per process (PYTHONHASHSEED),
                # so it cannot give stable IDs across runs. uuid5 is deterministic,
                # and Qdrant accepts UUID strings as point IDs.
                id=str(uuid.uuid5(uuid.NAMESPACE_URL, str(obj.source_id))),
                vector=obj.embedding,
                payload={
                    "source_id": obj.source_id,
                    "title": obj.title,
                    "url": obj.url,
                    "platform": obj.platform,
                    "quality_score": obj.quality_score,
                    "summary": obj.summary,
                    "tags": obj.tags,
                },
            )
            for obj in embedded
        ]

        client = QdrantClient(url=qdrant_url)
        # Check existence explicitly rather than swallowing every
        # create_collection error (which also hid connection failures).
        existing = {c.name for c in client.get_collections().collections}
        if collection not in existing:
            size = len(embedded[0].embedding) if embedded else 384
            client.create_collection(
                collection_name=collection,
                vectors_config=VectorParams(size=size, distance=Distance.COSINE),
            )

        if not points:
            logger.info(f"No embedded objects to upload to Qdrant collection '{collection}'")
            return 0

        client.upsert(collection_name=collection, points=points)
        logger.info(f"Uploaded {len(points)} vectors to Qdrant collection '{collection}'")
        return len(points)
    except Exception as e:
        logger.error(f"Qdrant upload failed: {e}")
        return 0