|
|
import numpy as np |
|
|
import faiss |
|
|
import os |
|
|
from typing import List, Union |
|
|
from openai import OpenAI |
|
|
|
|
|
|
|
|
try: |
|
|
from dotenv import load_dotenv |
|
|
load_dotenv() |
|
|
except ImportError: |
|
|
pass |
|
|
|
|
|
|
|
|
openai_client = None |
|
|
|
|
|
def get_openai_client(): |
|
|
"""Get the OpenAI client, loading it if necessary.""" |
|
|
global openai_client |
|
|
if openai_client is None: |
|
|
try: |
|
|
api_key = os.getenv("OPENAI_API_KEY") |
|
|
if not api_key: |
|
|
raise ValueError("OPENAI_API_KEY environment variable not set") |
|
|
openai_client = OpenAI(api_key=api_key) |
|
|
print("β
OpenAI embedding client initialized") |
|
|
except Exception as e: |
|
|
print(f"β Warning: Failed to initialize OpenAI client: {e}") |
|
|
return None |
|
|
return openai_client |
|
|
|
|
|
def get_openai_embeddings(texts: List[str], model: str = "text-embedding-3-small") -> List[List[float]]: |
|
|
"""Get embeddings from OpenAI API for a list of texts.""" |
|
|
client = get_openai_client() |
|
|
if client is None: |
|
|
raise RuntimeError("Failed to initialize OpenAI client") |
|
|
|
|
|
try: |
|
|
|
|
|
cleaned_texts = [] |
|
|
for text in texts: |
|
|
if isinstance(text, str) and text.strip(): |
|
|
|
|
|
cleaned_text = text.strip()[:8000] |
|
|
cleaned_texts.append(cleaned_text) |
|
|
else: |
|
|
cleaned_texts.append("empty") |
|
|
|
|
|
response = client.embeddings.create( |
|
|
input=cleaned_texts, |
|
|
model=model |
|
|
) |
|
|
|
|
|
embeddings = [item.embedding for item in response.data] |
|
|
return embeddings |
|
|
except Exception as e: |
|
|
print(f"β OpenAI API error: {e}") |
|
|
raise RuntimeError(f"Failed to get OpenAI embeddings: {e}") |
|
|
|
|
|
def get_embedding_dimension(model: str = "text-embedding-3-small") -> int: |
|
|
"""Get the embedding dimension for the specified model.""" |
|
|
model_dimensions = { |
|
|
"text-embedding-3-small": 1536, |
|
|
"text-embedding-3-large": 3072, |
|
|
"text-embedding-ada-002": 1536 |
|
|
} |
|
|
return model_dimensions.get(model, 1536) |
|
|
|
|
|
def chunk_text(text, chunk_size=512): |
|
|
tokens = text.split() |
|
|
return [' '.join(tokens[i:i + chunk_size]) for i in range(0, len(tokens), chunk_size)] |
|
|
|
|
|
def build_faiss_index(chunks, batch_size=100, max_chunks=100000, embedding_model="text-embedding-3-small"): |
|
|
"""Build FAISS index using OpenAI embeddings.""" |
|
|
if len(chunks) > max_chunks: |
|
|
chunks = chunks[:max_chunks] |
|
|
print(f"β οΈ Truncated to {max_chunks} chunks for embedding") |
|
|
|
|
|
client = get_openai_client() |
|
|
if client is None: |
|
|
raise RuntimeError("Failed to initialize OpenAI client for embeddings") |
|
|
|
|
|
dim = get_embedding_dimension(embedding_model) |
|
|
index = faiss.IndexFlatL2(dim) |
|
|
|
|
|
print(f"π§ Building FAISS index with OpenAI {embedding_model} (dim={dim})") |
|
|
print(f"π Processing {len(chunks)} chunks in batches of {batch_size}") |
|
|
|
|
|
|
|
|
for start in range(0, len(chunks), batch_size): |
|
|
end = min(start + batch_size, len(chunks)) |
|
|
batch = chunks[start:end] |
|
|
|
|
|
try: |
|
|
|
|
|
embeddings = get_openai_embeddings(batch, embedding_model) |
|
|
vectors = np.array(embeddings, dtype=np.float32) |
|
|
index.add(vectors) |
|
|
|
|
|
if start % (batch_size * 5) == 0: |
|
|
print(f"π Processed {end}/{len(chunks)} chunks...") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"β Error processing batch {start}-{end}: {e}") |
|
|
|
|
|
continue |
|
|
|
|
|
print(f"β
FAISS index built with {index.ntotal} vectors") |
|
|
return index, chunks |
|
|
|
|
|
|
|
|
def embed_dataframe(df, content_col="content", embedding_model="text-embedding-3-small"): |
|
|
""" |
|
|
Create OpenAI embeddings for dataframe content without saving to disk. |
|
|
Returns index and chunks for in-memory use only. |
|
|
""" |
|
|
|
|
|
df_copy = df.copy() |
|
|
df_copy["chunks"] = df_copy[content_col].apply(lambda x: chunk_text(x) if isinstance(x, str) else []) |
|
|
all_chunks = df_copy["chunks"].explode().dropna().tolist() |
|
|
|
|
|
print(f"π Creating OpenAI embeddings for {len(all_chunks)} chunks...") |
|
|
|
|
|
|
|
|
try: |
|
|
index, chunks = build_faiss_index(all_chunks, embedding_model=embedding_model) |
|
|
return index, chunks |
|
|
except Exception as e: |
|
|
print(f"β Failed to create dataframe embeddings: {e}") |
|
|
return None, [] |
|
|
|
|
|
|
|
|
def retrieve_similar_chunks(query, index, chunks, k=3, embedding_model="text-embedding-3-small"): |
|
|
""" |
|
|
Retrieve similar chunks using OpenAI embeddings for the query. |
|
|
No file loading required - works with in-memory objects. |
|
|
""" |
|
|
if index is None or chunks is None: |
|
|
raise ValueError( |
|
|
"Must provide both index and chunks for similarity search." |
|
|
) |
|
|
|
|
|
client = get_openai_client() |
|
|
if client is None: |
|
|
raise RuntimeError("Failed to initialize OpenAI client for query embedding") |
|
|
|
|
|
try: |
|
|
|
|
|
query_embeddings = get_openai_embeddings([query], embedding_model) |
|
|
query_vector = np.array(query_embeddings[0], dtype=np.float32).reshape(1, -1) |
|
|
|
|
|
|
|
|
distances, indices = index.search(query_vector, k) |
|
|
|
|
|
|
|
|
valid_indices = [i for i in indices[0] if i < len(chunks)] |
|
|
return [chunks[i] for i in valid_indices] |
|
|
|
|
|
except Exception as e: |
|
|
print(f"β Error during similarity search: {e}") |
|
|
raise RuntimeError(f"Failed to retrieve similar chunks: {e}") |
|
|
|
|
|
|
|
|
def embed_email_rows(email_df, content_col="content", embedding_model="text-embedding-3-small"): |
|
|
""" |
|
|
Create OpenAI embeddings for email DataFrame with all metadata preserved. |
|
|
Returns index, chunks, and row mapping for in-memory use only. |
|
|
|
|
|
Args: |
|
|
email_df: DataFrame with all email columns |
|
|
content_col: Column name containing email content for embedding |
|
|
embedding_model: OpenAI embedding model to use |
|
|
|
|
|
Returns: |
|
|
tuple: (index, chunks, email_rows) where email_rows maps chunks to rows |
|
|
""" |
|
|
|
|
|
df_copy = email_df.copy() |
|
|
|
|
|
|
|
|
print(f"π§ Processing {len(df_copy)} emails for embedding...") |
|
|
df_copy["chunks"] = df_copy[content_col].apply(lambda x: chunk_text(x) if isinstance(x, str) else []) |
|
|
|
|
|
|
|
|
email_rows = [] |
|
|
all_chunks = [] |
|
|
|
|
|
for idx, row in df_copy.iterrows(): |
|
|
chunks = row["chunks"] |
|
|
for chunk in chunks: |
|
|
if chunk and chunk.strip(): |
|
|
all_chunks.append(chunk) |
|
|
email_rows.append(row.to_dict()) |
|
|
|
|
|
print(f"π Created {len(all_chunks)} chunks from {len(df_copy)} emails") |
|
|
|
|
|
|
|
|
if all_chunks: |
|
|
try: |
|
|
index, _ = build_faiss_index(all_chunks, embedding_model=embedding_model) |
|
|
print(f"β
Successfully created OpenAI embeddings index") |
|
|
return index, all_chunks, email_rows |
|
|
except Exception as e: |
|
|
print(f"β Failed to create embeddings: {e}") |
|
|
return None, [], [] |
|
|
else: |
|
|
print("β οΈ No valid chunks found for embedding") |
|
|
return None, [], [] |
|
|
|