# NOTE(review): the following header was HuggingFace Space page residue from a web
# scrape (status badges, file size, commit hash, line-number gutter) — not source code.
import pandas as pd
import numpy as np
import streamlit as st
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_chroma import Chroma
from pathlib import Path
from typing import Optional
from utils.console_manager import console_manager
from config import EMBEDDINGS_DIR, EMBEDDING_MODEL_NAME
from data_pipeline.preprocess import PROCESSED_PARQUET
from data_pipeline.preprocess import preprocess_and_save
def initialize_embedding_model(
    model_name: str = EMBEDDING_MODEL_NAME,
) -> HuggingFaceEmbeddings:
    """Build a HuggingFace sentence-embedding model and log which one was loaded.

    Args:
        model_name: HuggingFace model identifier; defaults to the project-wide
            ``EMBEDDING_MODEL_NAME`` from config.

    Returns:
        A ready-to-use ``HuggingFaceEmbeddings`` instance.
    """
    model = HuggingFaceEmbeddings(model_name=model_name)
    console_manager.print_info(f"Initialized embeddings model: {model_name}")
    return model
def initialize_chroma(
    embedding_model: HuggingFaceEmbeddings, chroma_path: Path = EMBEDDINGS_DIR
) -> Chroma:
    """Open (or lazily create) a persistent Chroma vector store.

    Chroma itself creates the directory on first write; this function only
    reports whether an existing, non-empty store was found at ``chroma_path``.

    Args:
        embedding_model: Embedding function used for all future adds/queries.
        chroma_path: Directory where Chroma persists its collection.

    Returns:
        A ``Chroma`` instance bound to ``chroma_path``.
    """
    if chroma_path.exists() and any(chroma_path.iterdir()):
        console_manager.print_info(f"Loading existing ChromaDB from {chroma_path}")
    else:
        # Fixed duplicated word in the original message ("at at:").
        console_manager.print_info(f"Creating new ChromaDB at: {chroma_path}")
    vectordb = Chroma(
        persist_directory=str(chroma_path), embedding_function=embedding_model
    )
    return vectordb
def load_preprocessed_data() -> Optional[pd.DataFrame]:
    """Load the preprocessed parquet file and derive a combined ``content`` column.

    The ``content`` column concatenates title, abstract, and categories into a
    single text field suitable for embedding.

    Returns:
        The dataframe with the added ``content`` column, or ``None`` when the
        parquet file produced by the preprocessing step does not exist.
    """
    if not PROCESSED_PARQUET.exists():
        console_manager.print_error(f"Processed file not found: {PROCESSED_PARQUET}")
        return None

    df = pd.read_parquet(PROCESSED_PARQUET)

    def _format_categories(cats):
        # Categories may be stored as a list or a plain value; normalize to text.
        return ", ".join(cats) if isinstance(cats, list) else str(cats)

    df["content"] = (
        "Title: " + df["title"]
        + ". Abstract: " + df["abstract"]
        + ". Categories: " + df["categories"].apply(_format_categories)
    )
    return df
def prepare_documents(df: pd.DataFrame) -> list[dict]:
    """Convert dataframe rows into document dicts ready for Chroma ingestion.

    Args:
        df: Dataframe with ``content``, ``title``, ``categories`` and ``year``
            columns (e.g. from ``load_preprocessed_data``).

    Returns:
        One dict per row with ``id`` (the stringified dataframe index),
        ``content``, and a ``metadata`` mapping carrying id/title/categories/year.
    """
    documents = []
    for idx, row in df.iterrows():
        doc_id = str(idx)
        documents.append(
            {
                "id": doc_id,
                "content": row["content"],
                "metadata": {
                    "id": doc_id,
                    "title": row["title"],
                    "categories": row["categories"],
                    # year is cast to a plain int so metadata stays JSON-friendly
                    "year": int(row["year"]),
                },
            }
        )
    return documents
def add_embeddings_to_chroma(vectordb: Chroma, docs: list[dict]):
    """Embed all document texts and persist them (with metadata) into Chroma.

    Args:
        vectordb: Target Chroma vector store.
        docs: Document dicts as produced by ``prepare_documents``.
    """
    texts = []
    metadatas = []
    for doc in docs:
        texts.append(doc["content"])
        metadatas.append(doc["metadata"])
    vectordb.add_texts(texts=texts, metadatas=metadatas)
    console_manager.print_success("Embeddings generated and stored successfully!")
def embed_and_store():
    """Run the embedding stage end to end: load data, embed, store in Chroma.

    Returns:
        The populated ``Chroma`` store on success; ``None`` when the
        preprocessed data is missing or any step raises.
    """
    try:
        with console_manager.status("Generating embeddings..."):
            model = initialize_embedding_model()
            vectordb = initialize_chroma(model, EMBEDDINGS_DIR)
            df = load_preprocessed_data()
            if df is None:
                return None
            add_embeddings_to_chroma(vectordb, prepare_documents(df))
            return vectordb
    except Exception as exc:
        # Top-level boundary: report and signal failure rather than crash the UI.
        console_manager.print_error(f"Embedding generation failed: {exc}")
        return None
def extract_embeddings(_vectordb):
    """Pull every stored embedding vector and its metadata out of a Chroma store.

    Args:
        _vectordb: A Chroma vector store (leading underscore keeps Streamlit
            caching from hashing it, if used with ``st.cache_*``).

    Returns:
        Tuple of (``np.ndarray`` of embeddings, list of metadata dicts).
    """
    # NOTE(review): relies on Chroma's private `_collection` attribute — may
    # break across langchain_chroma versions; confirm against the pinned version.
    payload = _vectordb._collection.get(
        include=["metadatas", "documents", "embeddings"]
    )
    return np.array(payload["embeddings"]), payload["metadatas"]
def run_pipeline(force_run: bool = False):
    """Render the Streamlit "Ingestion & Embedding" section and run the pipeline.

    When the button is pressed: preprocess the raw data, open the Chroma store,
    and generate embeddings unless they already exist (or ``force_run`` is set).

    Args:
        force_run: When True, regenerate embeddings even if the store is populated.

    Returns:
        The ``Chroma`` store when the pipeline ran (or was skipped because
        embeddings already exist); ``None`` when the button was not pressed or
        embedding generation failed.
    """
    st.header("Ingestion & Embedding")
    if not st.button("Run Ingestion & Embeddings Pipeline"):
        return None
    with st.spinner("Running full pipeline..."):
        preprocess_and_save()  # Step 1: Extraction and basic cleaning
        embedding_model = initialize_embedding_model()
        vectordb = initialize_chroma(embedding_model, EMBEDDINGS_DIR)
        # NOTE(review): uses Chroma's private `_collection` — same caveat as
        # extract_embeddings; confirm against the pinned langchain_chroma version.
        collection_data = vectordb._collection.get(include=["embeddings"])
        embeddings = collection_data["embeddings"]
        # Chroma may return None, a list, or an ndarray depending on version.
        if embeddings is None:
            embeddings_exist = False
        elif isinstance(embeddings, np.ndarray):
            embeddings_exist = embeddings.size > 0
        else:
            embeddings_exist = len(embeddings) > 0
        if embeddings_exist and not force_run:
            st.warning("Embeddings already exist. Skipping embedding generation.")
            return vectordb
        result = embed_and_store()
        # Bug fix: the success banner used to sit after the button block, so it
        # showed on every rerun without the button being pressed and never after
        # an actual run (the branch returned first). Show it only on real success.
        if result is not None:
            st.success("Pipeline finished successfully!")
        return result
# NOTE(review): trailing "|" was scrape residue; removed.