File size: 4,329 Bytes
532f1f0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import pandas as pd
import numpy as np
import streamlit as st

from langchain_huggingface import HuggingFaceEmbeddings
from langchain_chroma import Chroma
from pathlib import Path
from typing import Optional

from utils.console_manager import console_manager
from config import EMBEDDINGS_DIR, EMBEDDING_MODEL_NAME
from data_pipeline.preprocess import PROCESSED_PARQUET

from data_pipeline.preprocess import preprocess_and_save


def initialize_embedding_model(
    model_name: str = EMBEDDING_MODEL_NAME,
) -> HuggingFaceEmbeddings:
    """Create the HuggingFace embedding model used by the pipeline.

    Args:
        model_name: Name of the model to load. Defaults to
            ``EMBEDDING_MODEL_NAME`` from the project config.

    Returns:
        The constructed ``HuggingFaceEmbeddings`` instance.
    """
    model = HuggingFaceEmbeddings(model_name=model_name)
    # Report which model was loaded on the shared console.
    console_manager.print_info(f"Initialized embeddings model: {model_name}")
    return model


def initialize_chroma(
    embedding_model: HuggingFaceEmbeddings, chroma_path: Path = EMBEDDINGS_DIR
) -> Chroma:
    """Open (or create) the persistent Chroma store at *chroma_path*.

    Args:
        embedding_model: Embedding function handed to Chroma.
        chroma_path: Directory used as Chroma's ``persist_directory``.
            A plain string is accepted and converted to ``Path``.

    Returns:
        A ``Chroma`` instance bound to ``chroma_path``.
    """
    # Coerce so callers passing a str do not fail on the Path-only calls below.
    chroma_path = Path(chroma_path)

    # is_dir() (rather than exists()) keeps iterdir() from raising when the
    # path points at a regular file.
    if chroma_path.is_dir() and any(chroma_path.iterdir()):
        console_manager.print_info(f"Loading existing ChromaDB from {chroma_path}")
    else:
        console_manager.print_info(f"Creating new ChromaDB at: {chroma_path}")

    return Chroma(
        persist_directory=str(chroma_path), embedding_function=embedding_model
    )


def load_preprocessed_data() -> Optional[pd.DataFrame]:
    """Load the preprocessed parquet file and add a ``content`` text column.

    ``content`` is the text that gets embedded. It is built as
    ``"Title: <title>. Abstract: <abstract>. Categories: <c1>, <c2>, ..."``.

    Returns:
        The DataFrame with the extra ``content`` column, or ``None`` when
        ``PROCESSED_PARQUET`` does not exist (an error is printed).
    """
    if not PROCESSED_PARQUET.exists():
        console_manager.print_error(f"Processed file not found: {PROCESSED_PARQUET}")
        return None

    df = pd.read_parquet(PROCESSED_PARQUET)

    def _join_categories(value) -> str:
        # read_parquet typically returns list-valued columns as NumPy arrays,
        # so checking only for ``list`` would embed the array repr instead of
        # a clean comma-separated string.
        if isinstance(value, (list, tuple, np.ndarray)):
            return ", ".join(str(item) for item in value)
        return str(value)

    # fillna("") keeps a missing title/abstract from turning the whole
    # concatenated cell into NaN.
    df["content"] = (
        "Title: "
        + df["title"].fillna("").astype(str)
        + ". Abstract: "
        + df["abstract"].fillna("").astype(str)
        + ". Categories: "
        + df["categories"].apply(_join_categories)
    )
    return df


def prepare_documents(df: pd.DataFrame) -> list[dict]:
    """Turn every DataFrame row into a document record.

    Args:
        df: Frame with ``content``, ``title``, ``categories`` and ``year``
            columns. The row index becomes the document id.

    Returns:
        One dict per row with keys ``id``, ``content`` and ``metadata``.
        ``metadata`` holds ``id``, ``title``, ``categories`` and ``year``
        (cast to ``int``).
    """
    records: list[dict] = []
    for index, record in df.iterrows():
        doc_id = str(index)
        text = record["content"]
        metadata = {
            "id": doc_id,
            "title": record["title"],
            "categories": record["categories"],
            "year": int(record["year"]),
        }
        records.append({"id": doc_id, "content": text, "metadata": metadata})
    return records


def add_embeddings_to_chroma(
    vectordb: Chroma, docs: list[dict], batch_size: int = 5000
):
    """Embed *docs* and write them to the Chroma store.

    Args:
        vectordb: Target Chroma vector store.
        docs: Records with ``content`` and ``metadata`` keys and, optionally,
            an ``id`` key (as produced by ``prepare_documents``).
        batch_size: Maximum number of documents sent per ``add_texts`` call.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    if not docs:
        console_manager.print_info("No documents to embed; nothing was stored.")
        return

    # Pass the documents' own ids so a re-run overwrites existing entries
    # instead of storing duplicates under freshly generated ids. Only do so
    # when every record has an id and the ids are unique; otherwise let the
    # store assign ids, as before.
    all_ids = [doc.get("id") for doc in docs]
    use_ids = all(doc_id is not None for doc_id in all_ids) and len(
        set(all_ids)
    ) == len(all_ids)

    # Chroma enforces a maximum batch size per write, so large corpora are
    # sent in slices rather than in one request.
    for start in range(0, len(docs), batch_size):
        batch = docs[start : start + batch_size]
        vectordb.add_texts(
            texts=[doc["content"] for doc in batch],
            metadatas=[doc["metadata"] for doc in batch],
            ids=all_ids[start : start + batch_size] if use_ids else None,
        )

    console_manager.print_success("Embeddings generated and stored successfully!")


def embed_and_store():
    """Load the preprocessed data, embed it and store it in Chroma.

    Returns:
        The populated ``Chroma`` store, or ``None`` when the preprocessed
        data is missing or any step raises (the error is printed to the
        console rather than propagated).
    """
    try:
        with console_manager.status("Generating embeddings..."):
            # Check for data first: it is cheap, and it avoids loading the
            # embedding model and opening the store when there is nothing
            # to embed.
            df = load_preprocessed_data()
            if df is None:
                return None

            embedding_model = initialize_embedding_model()
            vectordb = initialize_chroma(embedding_model, EMBEDDINGS_DIR)

            docs = prepare_documents(df)
            add_embeddings_to_chroma(vectordb, docs)
            return vectordb

    except Exception as e:
        # Top-level pipeline boundary: report the failure and signal it with
        # None so the caller (e.g. the Streamlit UI) keeps running.
        console_manager.print_error(f"Embedding generation failed: {e}")
        return None


def extract_embeddings(_vectordb):
    """Read all stored embedding vectors and their metadata from the store.

    Args:
        _vectordb: Vector store exposing a ``_collection`` attribute whose
            ``get(include=...)`` returns a mapping with ``embeddings`` and
            ``metadatas`` entries.

    Returns:
        A tuple ``(embeddings, metadata)``: ``embeddings`` is a NumPy array
        built from the stored vectors (empty when the store has none) and
        ``metadata`` is the ``metadatas`` entry as returned by the collection.
    """
    collection = _vectordb._collection
    # Only request what is returned; the document texts were fetched before
    # but never used.
    data = collection.get(include=["metadatas", "embeddings"])

    raw_embeddings = data["embeddings"]
    # `is None` (not truthiness) because the value may be a NumPy array.
    # np.array(None) would yield a 0-d object array with no usable length.
    if raw_embeddings is None:
        raw_embeddings = []

    embeddings = np.array(raw_embeddings)
    metadata = data["metadatas"]
    return embeddings, metadata


def run_pipeline(force_run: bool = False):
    """Render the Streamlit section that runs ingestion and embedding.

    Nothing runs until the button is pressed. On press, the data is
    preprocessed, the vector store is opened, and embeddings are generated
    unless the store already holds some and ``force_run`` is false.

    Args:
        force_run: Regenerate embeddings even when the store is not empty.

    Returns:
        The ``Chroma`` store when the pipeline ran or was skipped because
        embeddings already exist; ``None`` when the button was not pressed
        or embedding generation failed.
    """
    st.header("Ingestion & Embedding")

    if not st.button("Run Ingestion & Embeddings Pipeline"):
        return None

    with st.spinner("Running full pipeline..."):
        preprocess_and_save()  # Step 1: Extraction and basic cleaning
        embedding_model = initialize_embedding_model()
        vectordb = initialize_chroma(embedding_model, EMBEDDINGS_DIR)

        # count() answers "is the store empty?" without loading every
        # embedding vector into memory.
        embeddings_exist = vectordb._collection.count() > 0

        if embeddings_exist and not force_run:
            st.warning("Embeddings already exist. Skipping embedding generation.")
            return vectordb

        result = embed_and_store()

    # Report the outcome after the spinner closes. The success message used
    # to sit after unconditional returns and could never be shown.
    if result is None:
        st.error("Embedding generation failed. Check the console output for details.")
    else:
        st.success("Pipeline finished successfully!")
    return result