Sage / scripts /pipeline.py
vxa8502's picture
Add docker-compose, multi-stage builds, and developer tooling
d507c32
"""
Data loading and embedding pipeline.
Loads Amazon Reviews, chunks them, generates embeddings, and uploads to
Qdrant for vector search.
Features:
- Caches data locally (parquet) and embeddings (.npy)
- Skips Qdrant upload if collection already populated (use --force to override)
- Always applies 5-core filtering (standard for recommendation systems)
Usage:
python scripts/pipeline.py
python scripts/pipeline.py --force
python scripts/pipeline.py --validate-tokenizer
python scripts/pipeline.py --test-chunking
Run from project root.
"""
import argparse
import numpy as np
from sage.config import (
CHARS_PER_TOKEN,
DEV_SUBSET_SIZE,
DATA_DIR,
EMBEDDING_MODEL,
get_logger,
log_banner,
log_section,
log_kv,
)
from sage.data import (
prepare_data,
get_review_stats,
create_temporal_splits,
verify_temporal_boundaries,
)
from sage.core.chunking import chunk_reviews_batch
from sage.adapters.embeddings import get_embedder
from sage.adapters.vector_store import (
get_client,
create_collection,
upload_chunks,
get_collection_info,
create_payload_indexes,
search,
)
logger = get_logger(__name__)
# ============================================================================
# TOKENIZER VALIDATION (--validate-tokenizer)
# ============================================================================
def run_tokenizer_validation():
"""Validate the chars/token ratio assumption used in chunker.py."""
from transformers import AutoTokenizer
log_banner(logger, "TOKENIZER VALIDATION")
df = prepare_data(subset_size=DEV_SUBSET_SIZE, verbose=False)
sample = df["text"].dropna().sample(500, random_state=42)
logger.info("Loaded reviews and sampled 500", extra={"total": len(df)})
logger.info("Loading E5 tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(EMBEDDING_MODEL)
ratios = []
for text in sample:
if text and text.strip():
tokens = tokenizer.encode(text, add_special_tokens=False)
if len(tokens) > 0:
ratios.append(len(text) / len(tokens))
ratios = np.array(ratios)
log_section(logger, "Results")
log_kv(logger, "Mean chars/token", np.mean(ratios))
log_kv(logger, "Std", np.std(ratios, ddof=1))
log_kv(logger, "Current assumption", 4.0)
status = "VALID" if abs(np.mean(ratios) - 4.0) <= 0.5 else "UPDATE NEEDED"
logger.info("Validation status: %s", status)
# ============================================================================
# CHUNKING QUALITY TEST (--test-chunking)
# ============================================================================
def run_chunking_test():
"""Test chunking quality on long reviews."""
import pandas as pd
from sage.core.chunking import (
chunk_text,
split_sentences,
estimate_tokens,
NO_CHUNK_THRESHOLD,
)
log_banner(logger, "CHUNKING QUALITY TEST", width=70)
df = prepare_data(subset_size=DEV_SUBSET_SIZE, verbose=False)
embedder = get_embedder()
df["tokens"] = df["text"].apply(estimate_tokens)
long_reviews = df[df["tokens"] > NO_CHUNK_THRESHOLD]
logger.info("Reviews needing chunking: %d", len(long_reviews))
sample = long_reviews.sample(min(50, len(long_reviews)), random_state=42)
results = []
for idx, row in enumerate(sample.itertuples(index=False)):
text, tokens, rating = row.text, row.tokens, int(row.rating)
chunks = chunk_text(text, embedder=embedder)
sentences = split_sentences(text)
results.append(
{
"tokens": tokens,
"sentences": len(sentences),
"chunks": len(chunks),
"avg_chunk_tokens": np.mean([estimate_tokens(c) for c in chunks]),
}
)
if idx < 5:
logger.info(
"Review %d [%d*] (%d tok) -> %d chunks",
idx + 1,
rating,
tokens,
len(chunks),
)
results_df = pd.DataFrame(results)
log_section(logger, f"Summary ({len(results_df)} reviews)")
logger.info(
"Chunks per review: %.2f (median: %.0f)",
results_df["chunks"].mean(),
results_df["chunks"].median(),
)
logger.info("Avg tokens/chunk: %.0f", results_df["avg_chunk_tokens"].mean())
expansion = (
results_df["chunks"] * results_df["avg_chunk_tokens"]
).sum() / results_df["tokens"].sum()
logger.info("Expansion ratio: %.2fx", expansion)
# ============================================================================
# MAIN PIPELINE
# ============================================================================
def run_pipeline(subset_size: int, force: bool):
"""Run the full data pipeline: load, chunk, embed, upload."""
logger.info("Config", extra={"subset_size": subset_size, "force": force})
# Load and prepare data
df = prepare_data(subset_size=subset_size, force=force)
logger.info("Prepared dataset", extra={"reviews": len(df)})
# Basic statistics
stats = get_review_stats(df)
log_section(logger, "Dataset Stats")
for key, value in stats.items():
if key == "rating_dist":
continue
if key == "sparsity":
logger.info("%s: %.4f (%.2f%% sparse)", key, value, value * 100)
elif isinstance(value, float):
log_kv(logger, key, value)
else:
log_kv(logger, key, value)
# Create temporal splits for recommendation evaluation
train_df, val_df, test_df = create_temporal_splits(df)
verify_temporal_boundaries(train_df, val_df, test_df)
# Review length analysis
df["text_length"] = df["text"].str.len()
df["estimated_tokens"] = df["text_length"] // CHARS_PER_TOKEN
needs_chunking = (df["estimated_tokens"] > 200).sum()
logger.info(
"Reviews needing chunking (>200 tokens): %d (%.1f%%)",
needs_chunking,
needs_chunking / len(df) * 100,
)
# Prepare reviews for chunking
reviews_for_chunking = df.to_dict("records")
for i, review in enumerate(reviews_for_chunking):
review["review_id"] = f"review_{i}"
review["product_id"] = review.get("parent_asin", review.get("asin", ""))
# Load embedder and Qdrant client
client = get_client()
embedder = get_embedder()
# Chunk reviews with semantic chunking
logger.info("Chunking %d reviews...", len(reviews_for_chunking))
chunks = chunk_reviews_batch(reviews_for_chunking, embedder=embedder)
logger.info(
"Created %d chunks from %d reviews (expansion: %.2fx)",
len(chunks),
len(reviews_for_chunking),
len(chunks) / len(reviews_for_chunking),
)
# Generate embeddings
chunk_texts = [c.text for c in chunks]
cache_path = DATA_DIR / f"embeddings_{len(chunks)}.npy"
logger.info("Embedding %d chunks...", len(chunk_texts))
embeddings = embedder.embed_passages(
chunk_texts, cache_path=cache_path, force=force
)
logger.info("Embeddings shape: %s", embeddings.shape)
# Embedding technical validation
log_section(logger, "Embedding Technical Validation")
logger.info("Shape: %s (expected: (n, 384))", embeddings.shape)
assert embeddings.shape[1] == 384, f"Wrong dimensions: {embeddings.shape[1]}"
nan_count = np.isnan(embeddings).sum()
inf_count = np.isinf(embeddings).sum()
logger.info("NaN values: %d", nan_count)
logger.info("Inf values: %d", inf_count)
assert nan_count == 0 and inf_count == 0, "Found NaN or Inf values"
norms = np.linalg.norm(embeddings, axis=1)
logger.info("L2 norms: mean=%.4f, std=%.6f", norms.mean(), norms.std())
assert np.allclose(norms, 1.0, atol=0.01), "Embeddings not normalized"
logger.info("Value range: [%.3f, %.3f]", embeddings.min(), embeddings.max())
logger.info("Technical validation: PASSED")
# Model smoke test (verifies model is loaded correctly)
# E5 captures topical/domain similarity, NOT sentiment polarity.
# "terrible battery life" scores HIGHER with "great battery life" (0.93) than
# "excellent screen resolution" (0.84) because topic > sentiment for retrieval.
log_section(logger, "Model Smoke Test")
logger.info("(Verifying E5 captures domain/topical similarity)")
test_query = "great battery life"
in_domain_similar = "long lasting charge"
in_domain_different = "excellent screen quality"
out_of_domain = "Shakespeare wrote many plays"
emb_query = embedder.embed_single_query(test_query)
emb_in_sim = embedder.embed_single_query(in_domain_similar)
emb_in_diff = embedder.embed_single_query(in_domain_different)
emb_out = embedder.embed_single_query(out_of_domain)
sim_in_similar = float(np.dot(emb_query, emb_in_sim))
sim_in_different = float(np.dot(emb_query, emb_in_diff))
sim_out = float(np.dot(emb_query, emb_out))
logger.info("Query: '%s'", test_query)
logger.info(
" In-domain (same topic): '%s' = %.3f", in_domain_similar, sim_in_similar
)
logger.info(
" In-domain (diff topic): '%s' = %.3f", in_domain_different, sim_in_different
)
logger.info(" Out-of-domain: '%s' = %.3f", out_of_domain, sim_out)
if sim_in_similar > sim_in_different > sim_out:
logger.info(
"Ranking correct: %.3f > %.3f > %.3f",
sim_in_similar,
sim_in_different,
sim_out,
)
else:
logger.warning("Unexpected ranking")
# Known limitation: Dense retrievers don't handle negation well
# -------------------------------------------------------------------------
# E5 (and most dense retrievers) encode topical similarity, not sentiment.
# A query for "good sound quality" will match "The sound quality is NOT good"
# because the content words overlap - the model largely ignores negation.
#
# MITIGATION: Rating filter (min_rating=4+) removes negative reviews before
# they reach the user. The embedding retrieval finds topically relevant
# content; the metadata filter enforces quality/sentiment alignment.
# -------------------------------------------------------------------------
# Create collection and upload
create_collection(client)
upload_chunks(client, chunks, embeddings)
create_payload_indexes(client)
# Verify upload
info = get_collection_info(client)
log_section(logger, "Collection Info")
for key, value in info.items():
log_kv(logger, key, value)
# Test search
log_section(logger, "Testing Search")
test_queries = [
"great battery life, lasts all day",
"poor quality, broke after a week",
"easy to set up and use",
]
for query in test_queries:
query_embedding = embedder.embed_single_query(query)
results = search(client, query_embedding.tolist(), limit=3)
logger.info("Query: '%s'", query)
for i, r in enumerate(results):
logger.info(" %d. [%.0f*] %s...", i + 1, r["rating"], r["text"][:80])
# Test filtered search (demonstrates negation mitigation)
log_section(logger, "Filtered Search (4+ stars)")
query = "good sound quality"
query_embedding = embedder.embed_single_query(query)
results = search(client, query_embedding.tolist(), limit=5, min_rating=4.0)
logger.info("Query: '%s' (min 4 stars)", query)
for i, r in enumerate(results):
logger.info(" %d. [%.0f*] %s...", i + 1, r["rating"], r["text"][:80])
client.close()
log_banner(logger, "PIPELINE COMPLETE")
def main():
parser = argparse.ArgumentParser(description="Run the data pipeline")
parser.add_argument(
"--force", action="store_true", help="Force recreate collection"
)
parser.add_argument(
"--subset-size",
type=int,
default=DEV_SUBSET_SIZE,
help="Number of reviews to load initially",
)
parser.add_argument(
"--validate-tokenizer",
action="store_true",
help="Run tokenizer validation only",
)
parser.add_argument(
"--test-chunking", action="store_true", help="Run chunking quality test only"
)
args = parser.parse_args()
if args.validate_tokenizer:
run_tokenizer_validation()
elif args.test_chunking:
run_chunking_test()
else:
run_pipeline(subset_size=args.subset_size, force=args.force)
if __name__ == "__main__":
main()