import os
import csv
import hashlib

import chromadb
import tiktoken
from llama_index.core import Document, Settings, VectorStoreIndex
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import TokenTextSplitter
from llama_index.core.schema import BaseNode
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
from llama_index.vector_stores.chroma import ChromaVectorStore

from config import CHROMA_PATH, CHROMA_COLLECTION, FILES, CHUNK_SIZE, CHUNK_OVERLAP

api_key = os.getenv("OPENAI_API_KEY")

# Configure the LlamaIndex-wide default LLM and embedding model.
Settings.llm = OpenAI(temperature=0, model="gpt-4o-mini", api_key=api_key)
Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-small")


def deterministic_id_func(i: int, doc: BaseNode) -> str:
    """Deterministic ID function for the text splitter.

    Generates a unique, repeatable identifier for each node by hashing the
    parent document's id together with the chunk index.
    """
    unique_identifier = doc.id_ + str(i)
    hasher = hashlib.sha256()
    hasher.update(unique_identifier.encode("utf-8"))
    return hasher.hexdigest()
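
# A quick illustration of why this matters (a hypothetical check, not part of the
# app): hashing doc.id_ plus the chunk index means re-splitting the same corpus
# reproduces the same node ids, so re-ingesting updates the existing Chroma entries
# instead of piling up duplicates under fresh uuids. For example:
#   doc = Document(text="hello world"); doc.id_ = "doc_0"
#   assert deterministic_id_func(0, doc) == deterministic_id_func(0, doc)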


def create_db(return_nodes=False):
    rows = []
    # Load the data rows from each CSV file, skipping the header row.
    for FILE in FILES:
        with open(FILE, mode="r", encoding="utf-8") as file:
            csv_reader = csv.reader(file)
            for idx, row in enumerate(csv_reader):
                if idx == 0:
                    continue  # Skip header row
                rows.append(row)

    # Convert the rows to Document objects so the LlamaIndex framework can process them.
    documents = [
        Document(text=row[1], metadata={"title": row[0], "url": row[2]})
        for row in rows
    ]

    # By default, node/chunk ids are random uuids. To get the same ids on every run,
    # we set the document ids manually.
    for idx, doc in enumerate(documents):
        doc.id_ = f"doc_{idx}"

    # Define the splitter that cuts the text into segments of CHUNK_SIZE tokens,
    # with CHUNK_OVERLAP tokens of overlap between consecutive segments.
    text_splitter = TokenTextSplitter(
        separator=" ",
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
        id_func=deterministic_id_func,
    )

    chroma_client = chromadb.PersistentClient(path=CHROMA_PATH)
    chroma_collection = chroma_client.get_or_create_collection(CHROMA_COLLECTION)
    vector_store = ChromaVectorStore(chroma_collection=chroma_collection)

    # Create the pipeline that applies the transformations (splitting and embedding)
    # to each document and stores the resulting nodes in the Chroma vector store.
    pipeline = IngestionPipeline(
        transformations=[
            text_splitter,
            OpenAIEmbedding(model="text-embedding-3-small"),
        ],
        vector_store=vector_store,
    )

    # Run the transformation pipeline.
    nodes = pipeline.run(documents=documents, show_progress=True)

    # Wrap the freshly populated vector store in a LlamaIndex index.
    index = VectorStoreIndex.from_vector_store(vector_store)

    if return_nodes:
        return nodes
    return index


def load_db():
    """Load the existing Chroma collection; get_collection raises if create_db() has not run yet."""
    chroma_client = chromadb.PersistentClient(path=CHROMA_PATH)
    chroma_collection = chroma_client.get_collection(CHROMA_COLLECTION)
    vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
    index = VectorStoreIndex.from_vector_store(
        vector_store=vector_store,
        show_progress=True,
        use_async=True,
        embed_model=Settings.embed_model,
    )
    return index


def load_asset(file):
    """Load a text asset (e.g. CSS) from an external file."""
    if os.path.exists(file):
        with open(file, "r", encoding="utf-8") as f:
            return f.read()
    return ""  # Return an empty string instead of None when the asset is missing.


def num_tokens_from_messages(messages, model="gpt-4"):
    """Return the number of tokens used by a list of chat messages."""
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        encoding = tiktoken.get_encoding("cl100k_base")
    # Per-message overhead used by gpt-3.5/gpt-4 style chat models.
    tokens_per_message = 3
    tokens_per_name = 1
    num_tokens = 0
    for message in messages:
        num_tokens += tokens_per_message
        for key, value in message.items():
            num_tokens += len(encoding.encode(value))
            if key == "name":
                num_tokens += tokens_per_name
    num_tokens += 3  # every reply is primed with <|start|>assistant<|message|>
    return num_tokens
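

# Minimal usage sketch (an illustration, not part of the original app). It assumes
# OPENAI_API_KEY is set and that the CSV files listed in config.FILES exist on disk.
if __name__ == "__main__":
    index = create_db()  # build the collection; use load_db() on subsequent runs
    query_engine = index.as_query_engine()
    print(query_engine.query("What topics do the indexed documents cover?"))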