# csds-project / embedder.py
# Source: Hugging Face Space upload by beatrizpm ("Upload 9 files", commit 532f1f0, verified)
import pandas as pd
import numpy as np
import streamlit as st
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_chroma import Chroma
from pathlib import Path
from typing import Optional
from utils.console_manager import console_manager
from config import EMBEDDINGS_DIR, EMBEDDING_MODEL_NAME
from data_pipeline.preprocess import PROCESSED_PARQUET
from data_pipeline.preprocess import preprocess_and_save
def initialize_embedding_model(
    model_name: str = EMBEDDING_MODEL_NAME,
) -> HuggingFaceEmbeddings:
    """Build the sentence-embedding model used for indexing and querying.

    Args:
        model_name: Hugging Face model identifier; defaults to the
            project-wide EMBEDDING_MODEL_NAME from config.

    Returns:
        A ready-to-use HuggingFaceEmbeddings instance.
    """
    model = HuggingFaceEmbeddings(model_name=model_name)
    console_manager.print_info(f"Initialized embeddings model: {model_name}")
    return model
def initialize_chroma(
    embedding_model: HuggingFaceEmbeddings, chroma_path: Path = EMBEDDINGS_DIR
) -> Chroma:
    """Open (or lazily create) the persistent ChromaDB vector store.

    Args:
        embedding_model: Embedding function used by the store for new texts.
        chroma_path: Directory holding the persisted Chroma data.

    Returns:
        A Chroma instance backed by ``chroma_path``; Chroma itself creates
        the directory contents on first write if none exist yet.
    """
    if chroma_path.exists() and any(chroma_path.iterdir()):
        console_manager.print_info(f"Loading existing ChromaDB from {chroma_path}")
    else:
        # Fixed duplicated word in the original message ("at at:").
        console_manager.print_info(f"Creating new ChromaDB at: {chroma_path}")
    vectordb = Chroma(
        persist_directory=str(chroma_path), embedding_function=embedding_model
    )
    return vectordb
def load_preprocessed_data() -> Optional[pd.DataFrame]:
    """Load the preprocessed parquet file and derive a 'content' column.

    The 'content' column concatenates title, abstract and categories into a
    single text field suitable for embedding.

    Returns:
        The DataFrame with the added 'content' column, or None when the
        preprocessed parquet file is missing.
    """
    if not PROCESSED_PARQUET.exists():
        console_manager.print_error(f"Processed file not found: {PROCESSED_PARQUET}")
        return None

    frame = pd.read_parquet(PROCESSED_PARQUET)

    def _format_categories(value):
        # Lists become comma-separated text; anything else is stringified as-is.
        return ", ".join(value) if isinstance(value, list) else str(value)

    frame["content"] = (
        "Title: " + frame["title"]
        + ". Abstract: " + frame["abstract"]
        + ". Categories: " + frame["categories"].apply(_format_categories)
    )
    return frame
def prepare_documents(df: pd.DataFrame) -> list[dict]:
    """Turn each DataFrame row into a document dict ready for indexing.

    Args:
        df: Frame with 'content', 'title', 'categories' and 'year' columns.

    Returns:
        One dict per row with 'id' (the stringified row index), 'content',
        and a 'metadata' sub-dict (id, title, categories, year as int).
    """
    documents = []
    for index, row in df.iterrows():
        key = str(index)
        documents.append(
            {
                "id": key,
                "content": row["content"],
                "metadata": {
                    "id": key,
                    "title": row["title"],
                    "categories": row["categories"],
                    "year": int(row["year"]),
                },
            }
        )
    return documents
def add_embeddings_to_chroma(vectordb: Chroma, docs: list[dict]):
    """Embed each document's content and store it (with metadata) in Chroma.

    Args:
        vectordb: Target Chroma store; its embedding function vectorizes texts.
        docs: Document dicts produced by prepare_documents.
    """
    texts, metadatas = [], []
    for doc in docs:
        texts.append(doc["content"])
        metadatas.append(doc["metadata"])
    vectordb.add_texts(texts=texts, metadatas=metadatas)
    console_manager.print_success("Embeddings generated and stored successfully!")
def embed_and_store():
    """Run the full embedding stage: load data, embed it, persist to Chroma.

    Returns:
        The populated Chroma store, or None when the preprocessed data is
        missing or any step raises (the error is reported, not re-raised,
        since this is the top-level boundary of the embedding stage).
    """
    try:
        # The original bound the status object to an unused local
        # (`as status`); the context manager alone is sufficient.
        with console_manager.status("Generating embeddings..."):
            embedding_model = initialize_embedding_model()
            vectordb = initialize_chroma(embedding_model, EMBEDDINGS_DIR)
            df = load_preprocessed_data()
            if df is None:
                return None
            docs = prepare_documents(df)
            add_embeddings_to_chroma(vectordb, docs)
            return vectordb
    except Exception as e:
        console_manager.print_error(f"Embedding generation failed: {e}")
        return None
def extract_embeddings(_vectordb):
    """Fetch every stored embedding vector and its metadata from ChromaDB.

    Args:
        _vectordb: A Chroma instance (leading underscore kept for callers
            that pass it to Streamlit caching helpers).

    Returns:
        Tuple of (embeddings as a numpy array, list of metadata dicts).
    """
    # NOTE: reaches into Chroma's private `_collection` attribute to bulk-read.
    payload = _vectordb._collection.get(
        include=["metadatas", "documents", "embeddings"]
    )
    vectors = np.array(payload["embeddings"])
    return vectors, payload["metadatas"]
def run_pipeline(force_run: bool = False):
    """Streamlit UI entry point: run ingestion + embedding on button press.

    Args:
        force_run: When True, regenerate embeddings even if some already
            exist in the store.

    Returns:
        The Chroma store on success (existing or freshly built), or None
        when the button was not pressed or the embedding stage failed.
    """
    st.header("Ingestion & Embedding")
    if st.button("Run Ingestion & Embeddings Pipeline"):
        with st.spinner("Running full pipeline..."):
            preprocess_and_save()  # Step 1: Extraction and basic cleaning
            embedding_model = initialize_embedding_model()
            vectordb = initialize_chroma(embedding_model, EMBEDDINGS_DIR)
            collection_data = vectordb._collection.get(include=["embeddings"])
            embeddings = collection_data["embeddings"]
            # Chroma may return None, a list, or a numpy array here, so the
            # emptiness check has to handle all three shapes.
            if embeddings is not None:
                if isinstance(embeddings, np.ndarray):
                    embeddings_exist = embeddings.size > 0
                else:
                    embeddings_exist = len(embeddings) > 0
            else:
                embeddings_exist = False
            if embeddings_exist and not force_run:
                st.warning("Embeddings already exist. Skipping embedding generation.")
                return vectordb
            result = embed_and_store()
        # BUG FIX: in the original, the unconditional `return` above made this
        # success message unreachable; report completion before returning.
        if result is not None:
            st.success("Pipeline finished successfully!")
        return result