Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| import numpy as np | |
| import streamlit as st | |
| from langchain_huggingface import HuggingFaceEmbeddings | |
| from langchain_chroma import Chroma | |
| from pathlib import Path | |
| from typing import Optional | |
| from utils.console_manager import console_manager | |
| from config import EMBEDDINGS_DIR, EMBEDDING_MODEL_NAME | |
| from data_pipeline.preprocess import PROCESSED_PARQUET | |
| from data_pipeline.preprocess import preprocess_and_save | |
def initialize_embedding_model(
    model_name: str = EMBEDDING_MODEL_NAME,
) -> HuggingFaceEmbeddings:
    """Build the HuggingFace embedding model used to vectorise documents.

    Args:
        model_name: Model identifier passed to ``HuggingFaceEmbeddings``;
            defaults to the project-wide ``EMBEDDING_MODEL_NAME``.

    Returns:
        The constructed ``HuggingFaceEmbeddings`` instance.
    """
    model = HuggingFaceEmbeddings(model_name=model_name)
    console_manager.print_info(f"Initialized embeddings model: {model_name}")
    return model
def initialize_chroma(
    embedding_model: HuggingFaceEmbeddings, chroma_path: Path = EMBEDDINGS_DIR
) -> Chroma:
    """Open the Chroma vector store persisted at ``chroma_path``.

    Logs whether a populated store directory already exists or a new one is
    about to be created, then constructs the ``Chroma`` client in both cases.

    Args:
        embedding_model: Embedding function handed to Chroma.
        chroma_path: Directory used for persistence; a ``str`` is accepted
            as well as a ``Path``.

    Returns:
        The ``Chroma`` instance bound to ``chroma_path``.
    """
    chroma_path = Path(chroma_path)

    # is_dir() (rather than exists()) keeps iterdir() from raising when the
    # path unexpectedly points at a regular file.
    if chroma_path.is_dir() and any(chroma_path.iterdir()):
        console_manager.print_info(f"Loading existing ChromaDB from {chroma_path}")
    else:
        console_manager.print_info(f"Creating new ChromaDB at: {chroma_path}")

    return Chroma(
        persist_directory=str(chroma_path), embedding_function=embedding_model
    )
def load_preprocessed_data() -> Optional[pd.DataFrame]:
    """Load the preprocessed parquet file and add a ``content`` text column.

    ``content`` concatenates title, abstract and categories into the single
    string that is later embedded.

    Returns:
        The DataFrame with the extra ``content`` column, or ``None`` (after
        logging an error) when ``PROCESSED_PARQUET`` does not exist.
    """
    if not PROCESSED_PARQUET.exists():
        console_manager.print_error(f"Processed file not found: {PROCESSED_PARQUET}")
        return None

    df = pd.read_parquet(PROCESSED_PARQUET)

    def _categories_as_text(value) -> str:
        # List-valued parquet columns can come back as numpy arrays rather
        # than Python lists; join those too instead of embedding their repr.
        if isinstance(value, (list, tuple, np.ndarray)):
            return ", ".join(str(item) for item in value)
        return str(value)

    # A missing title/abstract would otherwise turn the whole concatenation
    # into NaN, which cannot be embedded.
    title = df["title"].fillna("").astype(str)
    abstract = df["abstract"].fillna("").astype(str)
    categories = df["categories"].apply(_categories_as_text)

    df["content"] = (
        "Title: " + title + ". Abstract: " + abstract + ". Categories: " + categories
    )
    return df
def prepare_documents(df: pd.DataFrame) -> list[dict]:
    """Convert each DataFrame row into a document dict for the vector store.

    Args:
        df: Frame providing ``content``, ``title``, ``categories`` and
            ``year`` columns. The index label of each row becomes its id.

    Returns:
        One dict per row with keys ``id``, ``content`` and ``metadata``
        (``id``, ``title``, ``categories``, ``year``).

    Raises:
        ValueError: If a ``year`` value cannot be converted to ``int``.
    """

    def _scalar_categories(value):
        # Vector-store metadata values must be scalars, so list-like
        # categories are flattened to one comma-separated string. Values
        # that are already scalar (e.g. a plain string) pass through.
        if isinstance(value, (list, tuple, np.ndarray)):
            return ", ".join(str(item) for item in value)
        return value

    # Column-wise zip avoids building a Series per row as iterrows() does.
    rows = zip(df.index, df["content"], df["title"], df["categories"], df["year"])
    return [
        {
            "id": str(idx),
            "content": content,
            "metadata": {
                "id": str(idx),
                "title": title,
                "categories": _scalar_categories(categories),
                "year": int(year),
            },
        }
        for idx, content, title, categories, year in rows
    ]
def add_embeddings_to_chroma(
    vectordb: Chroma, docs: list[dict], batch_size: int = 1000
):
    """Embed ``docs`` and write them to ``vectordb`` in batches.

    Args:
        vectordb: Target Chroma store.
        docs: Dicts with ``content`` and ``metadata`` keys, and optionally
            ``id``.
        batch_size: Number of documents sent per ``add_texts`` call, so one
            oversized request cannot exceed the store's batch limit.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    if not docs:
        console_manager.print_info("No documents to embed; nothing was added.")
        return

    for start in range(0, len(docs), batch_size):
        batch = docs[start : start + batch_size]

        # Passing the prepared ids lets a re-run overwrite existing entries
        # rather than inserting duplicates under fresh random ids. Fall back
        # to store-generated ids when any document lacks one.
        ids = [d.get("id") for d in batch]
        if any(doc_id is None for doc_id in ids):
            ids = None

        vectordb.add_texts(
            texts=[d["content"] for d in batch],
            metadatas=[d["metadata"] for d in batch],
            ids=ids,
        )

    console_manager.print_success("Embeddings generated and stored successfully!")
def embed_and_store() -> Optional[Chroma]:
    """Embed the preprocessed dataset and store it in the Chroma database.

    Returns:
        The populated ``Chroma`` store, or ``None`` when the preprocessed
        data is missing or any step raises (the error is logged through
        ``console_manager``).
    """
    try:
        with console_manager.status("Generating embeddings..."):
            # Check for the data first: it is cheap, and it avoids loading
            # the embedding model when there is nothing to embed.
            df = load_preprocessed_data()
            if df is None:
                return None

            embedding_model = initialize_embedding_model()
            vectordb = initialize_chroma(embedding_model, EMBEDDINGS_DIR)
            add_embeddings_to_chroma(vectordb, prepare_documents(df))
            return vectordb
    except Exception as e:
        # Top-level pipeline boundary: report the failure and signal it to
        # the caller with None instead of propagating.
        console_manager.print_error(f"Embedding generation failed: {e}")
        return None
def extract_embeddings(_vectordb):
    """Read every stored embedding and its metadata from the vector store.

    Args:
        _vectordb: Object exposing a ``_collection`` whose ``get`` accepts an
            ``include`` list (the Chroma store built by this module).

    Returns:
        A ``(embeddings, metadata)`` tuple: ``embeddings`` is a numpy array
        built from the stored vectors (empty when none are returned), and
        ``metadata`` is the ``"metadatas"`` entry of the response.
    """
    collection = _vectordb._collection
    # Documents are not used here, so only vectors and metadata are requested.
    data = collection.get(include=["metadatas", "embeddings"])

    raw_embeddings = data["embeddings"]
    if raw_embeddings is None:
        # np.array(None) would produce a 0-d object array; return an empty
        # array instead so callers can rely on .size / len().
        raw_embeddings = []

    embeddings = np.array(raw_embeddings)
    metadata = data["metadatas"]
    return embeddings, metadata
def run_pipeline(force_run: bool = False):
    """Render the ingestion UI and run the full pipeline on button press.

    Steps when the button is clicked: preprocess the raw data, open the
    Chroma store, and generate embeddings unless the store is already
    populated and ``force_run`` is false.

    Args:
        force_run: Regenerate embeddings even when the store already has
            entries.

    Returns:
        The Chroma store when the pipeline ran or was skipped because
        embeddings already exist; ``None`` when the button was not pressed
        or embedding generation failed.
    """
    st.header("Ingestion & Embedding")
    if not st.button("Run Ingestion & Embeddings Pipeline"):
        return None

    with st.spinner("Running full pipeline..."):
        preprocess_and_save()  # Step 1: Extraction and basic cleaning

        embedding_model = initialize_embedding_model()
        vectordb = initialize_chroma(embedding_model, EMBEDDINGS_DIR)

        # count() answers "is the store populated?" without pulling every
        # vector into memory.
        embeddings_exist = vectordb._collection.count() > 0
        if embeddings_exist and not force_run:
            st.warning("Embeddings already exist. Skipping embedding generation.")
            return vectordb

        result = embed_and_store()

    # embed_and_store() reports failure by returning None; surface the
    # outcome in the UI instead of returning before any status is shown.
    if result is None:
        st.error("Pipeline failed. See the console log for details.")
    else:
        st.success("Pipeline finished successfully!")
    return result