bat-6's picture
feat: implement sync worker service for automated FAISS index rebuilding and project preprocessing
334beca
Raw
History Blame Contribute Delete
3.64 kB
import logging
from pathlib import Path
from typing import List
import pandas as pd
import numpy as np
import faiss
from sentence_transformers import SentenceTransformer
from Data.database.sql_connector import load_preprocessed_projects
logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")
logger = logging.getLogger(__name__)
DEFAULT_MODEL = "all-mpnet-base-v2"
TEXT_COL = "clean_text"
TITLE_COL = "project_title"
TECH_COL = "technologies"
_PROJECT_ROOT = Path(__file__).resolve().parents[2]
MODEL_DIR = _PROJECT_ROOT / "models"
INDEX_PATH = MODEL_DIR / "faiss_index.bin"
META_PATH = MODEL_DIR / "metadata.parquet"
class ProjectEmbedder:
def __init__(self, model_name: str = DEFAULT_MODEL):
logger.info(f"Loading embedding model: {model_name}")
self.model = SentenceTransformer(model_name)
self.index = None
self.metadata = None
def generate_embeddings(self, texts: List[str], batch_size: int = 64) -> np.ndarray:
logger.info(f"Generating embeddings for {len(texts)} projects...")
vectors = self.model.encode(
texts,
batch_size=batch_size,
show_progress_bar=True,
convert_to_numpy=True,
normalize_embeddings=True
)
return vectors.astype("float32")
def build_index(self, df: pd.DataFrame):
"""Build FAISS cosine index."""
self.metadata = df.copy()
self.metadata = self.metadata.reset_index(drop=True)
for col in [TITLE_COL, TEXT_COL]:
if col not in self.metadata.columns:
self.metadata[col] = ""
if TECH_COL not in self.metadata.columns:
self.metadata[TECH_COL] = ""
FEATURE_COL = "features"
if FEATURE_COL not in self.metadata.columns:
self.metadata[FEATURE_COL] = ""
feature_text = self.metadata[FEATURE_COL].fillna("").astype(str)
rich_texts = (
self.metadata[TITLE_COL].fillna("").astype(str)
+ " "
+ self.metadata[TEXT_COL].fillna("").astype(str)
+ " "
+ feature_text
).tolist()
embeddings = self.generate_embeddings(rich_texts)
self.embeddings = embeddings
dim = embeddings.shape[1]
base_index = faiss.IndexFlatIP(dim)
self.index = faiss.IndexIDMap(base_index)
ids = np.arange(len(self.metadata)).astype("int64")
self.index.add_with_ids(embeddings, ids)
logger.info(f"FAISS index built successfully with {self.index.ntotal} vectors.")
def save_artifacts(self, folder: str = "models"):
path = Path(folder)
path.mkdir(parents=True, exist_ok=True)
faiss.write_index(self.index, str(path / "faiss_index.bin"))
self.metadata.to_parquet(path / "metadata.parquet", index=False)
if hasattr(self, "embeddings"):
np.save(str(path / "project_embeddings.npy"), self.embeddings)
logger.info(f"Artifacts saved to {folder}")
def load_artifacts(self, folder: str = "models"):
path = Path(folder)
self.index = faiss.read_index(str(path / "faiss_index.bin"))
self.metadata = pd.read_parquet(path / "metadata.parquet")
logger.info("Artifacts loaded successfully.")
def train_embedding_engine():
logger.info("Loading processed dataset from Azure SQL...")
df = load_preprocessed_projects()
engine = ProjectEmbedder()
engine.build_index(df)
engine.save_artifacts(str(MODEL_DIR))
logger.info("Embedding engine completed successfully.")
return engine