# ruff: noqa: E402 """ Production EDA: Analyze data directly from Qdrant Cloud. Queries the production vector store to generate accurate statistics and visualizations. This ensures EDA reports match deployed data. Usage: python scripts/eda.py make eda Requires: QDRANT_URL and QDRANT_API_KEY environment variables. """ from __future__ import annotations import os import sys from collections import Counter from pathlib import Path from dotenv import load_dotenv load_dotenv() # Validate environment before imports if not os.getenv("QDRANT_URL"): print("ERROR: QDRANT_URL not set. Cannot run production EDA.") print("Set QDRANT_URL and QDRANT_API_KEY in .env or environment.") sys.exit(1) import matplotlib.pyplot as plt import numpy as np from sage.adapters.vector_store import get_client, get_collection_info from sage.config import COLLECTION_NAME, CHARS_PER_TOKEN FIGURES_DIR = Path("assets") FIGURES_DIR.mkdir(parents=True, exist_ok=True) REPORTS_DIR = Path("reports") REPORTS_DIR.mkdir(exist_ok=True) # Plot configuration plt.style.use("seaborn-v0_8-whitegrid") plt.rcParams.update( { "figure.figsize": (10, 5), "figure.dpi": 100, "savefig.dpi": 300, "savefig.bbox": "tight", "savefig.pad_inches": 0.1, "font.size": 11, "axes.titlesize": 12, "axes.labelsize": 11, "figure.autolayout": True, } ) PRIMARY_COLOR = "#05A0D1" SECONDARY_COLOR = "#FF9900" FIGURE_SIZE_WIDE = (12, 5) def scroll_all_payloads(client, batch_size: int = 1000, limit: int | None = None): """ Scroll through all points in the collection and yield payloads. Args: client: Qdrant client. batch_size: Points per scroll request. limit: Optional max points to retrieve (None = all). Yields: Payload dicts from each point. """ offset = None total = 0 while True: results = client.scroll( collection_name=COLLECTION_NAME, limit=batch_size, offset=offset, with_payload=True, with_vectors=False, ) points, next_offset = results if not points: break for point in points: yield point.payload total += 1 if limit and total >= limit: return offset = next_offset if offset is None: break def compute_stats(client, sample_size: int | None = None) -> dict: """ Compute statistics from production Qdrant data. Args: client: Qdrant client. sample_size: Optional limit for faster iteration. Returns: Dict with computed statistics. """ print("Scanning Qdrant collection...") ratings = [] text_lengths = [] timestamps = [] product_ids = set() review_ids = set() chunks_per_review = {} for i, payload in enumerate(scroll_all_payloads(client, limit=sample_size)): if i % 10000 == 0 and i > 0: print(f" Processed {i:,} chunks...") ratings.append(payload.get("rating", 0)) text_lengths.append(len(payload.get("text", ""))) timestamps.append(payload.get("timestamp", 0)) product_ids.add(payload.get("product_id")) review_ids.add(payload.get("review_id")) # Track chunks per review review_id = payload.get("review_id") total_chunks = payload.get("total_chunks", 1) if review_id: chunks_per_review[review_id] = total_chunks print(f" Scanned {len(ratings):,} total chunks") # Compute distributions rating_dist = Counter(ratings) chunk_dist = Counter(chunks_per_review.values()) # Estimate tokens from text length token_lengths = [length // CHARS_PER_TOKEN for length in text_lengths] return { "total_chunks": len(ratings), "unique_reviews": len(review_ids), "unique_products": len(product_ids), "ratings": ratings, "rating_dist": dict(sorted(rating_dist.items())), "text_lengths": text_lengths, "token_lengths": token_lengths, "timestamps": timestamps, "chunks_per_review": list(chunks_per_review.values()), "chunk_dist": dict(sorted(chunk_dist.items())), } def generate_figures(stats: dict) -> None: """Generate EDA figures from computed stats.""" # 1. Rating distribution fig, ax = plt.subplots() rating_counts = stats["rating_dist"] ratings = list(rating_counts.keys()) counts = list(rating_counts.values()) bars = ax.bar(ratings, counts, color=PRIMARY_COLOR, edgecolor="black") ax.set_xlabel("Rating") ax.set_ylabel("Chunk Count") ax.set_title("Rating Distribution (Production Data)") ax.set_xticks(ratings) for bar, count in zip(bars, counts, strict=True): ax.text( bar.get_x() + bar.get_width() / 2, bar.get_height() + max(counts) * 0.01, f"{count:,}", ha="center", va="bottom", fontsize=9, ) plt.savefig(FIGURES_DIR / "rating_distribution.png") plt.close() print(f" Saved: {FIGURES_DIR / 'rating_distribution.png'}") # 2. Chunk text length distribution fig, axes = plt.subplots(1, 2, figsize=FIGURE_SIZE_WIDE) ax1 = axes[0] lengths = np.array(stats["text_lengths"]) ax1.hist(lengths.clip(max=2000), bins=50, color=PRIMARY_COLOR, edgecolor="black") ax1.set_xlabel("Characters") ax1.set_ylabel("Chunk Count") ax1.set_title("Chunk Length Distribution") ax1.axvline( np.median(lengths), color=SECONDARY_COLOR, linestyle="--", label=f"Median: {np.median(lengths):.0f}", ) ax1.legend() ax2 = axes[1] tokens = np.array(stats["token_lengths"]) ax2.hist(tokens.clip(max=500), bins=50, color=SECONDARY_COLOR, edgecolor="black") ax2.set_xlabel("Estimated Tokens") ax2.set_ylabel("Chunk Count") ax2.set_title("Chunk Token Distribution") ax2.axvline( np.median(tokens), color=PRIMARY_COLOR, linestyle="--", label=f"Median: {np.median(tokens):.0f}", ) ax2.legend() plt.savefig(FIGURES_DIR / "chunk_lengths.png") plt.close() print(f" Saved: {FIGURES_DIR / 'chunk_lengths.png'}") # 3. Chunks per review distribution fig, ax = plt.subplots() chunk_counts = stats["chunk_dist"] x = list(chunk_counts.keys()) y = list(chunk_counts.values()) ax.bar(x, y, color=PRIMARY_COLOR, edgecolor="black") ax.set_xlabel("Chunks per Review") ax.set_ylabel("Number of Reviews") ax.set_title("Review Chunking Distribution") plt.savefig(FIGURES_DIR / "chunks_per_review.png") plt.close() print(f" Saved: {FIGURES_DIR / 'chunks_per_review.png'}") # 4. Temporal distribution (if timestamps exist) timestamps = [t for t in stats["timestamps"] if t and t > 0] if timestamps: from datetime import datetime fig, ax = plt.subplots() # Convert to dates and count by month dates = [datetime.fromtimestamp(t / 1000) for t in timestamps] months = [d.strftime("%Y-%m") for d in dates] month_counts = Counter(months) sorted_months = sorted(month_counts.items()) if len(sorted_months) > 24: # Show only last 24 months if too many sorted_months = sorted_months[-24:] x = [m[0] for m in sorted_months] y = [m[1] for m in sorted_months] ax.bar(range(len(x)), y, color=PRIMARY_COLOR) ax.set_xlabel("Month") ax.set_ylabel("Chunk Count") ax.set_title("Temporal Distribution") ax.set_xticks(range(0, len(x), max(1, len(x) // 6))) ax.set_xticklabels( [x[i] for i in range(0, len(x), max(1, len(x) // 6))], rotation=45 ) plt.savefig(FIGURES_DIR / "temporal_distribution.png") plt.close() print(f" Saved: {FIGURES_DIR / 'temporal_distribution.png'}") def generate_report(stats: dict, collection_info: dict) -> None: """Generate markdown EDA report.""" total_chunks = stats["total_chunks"] unique_reviews = stats["unique_reviews"] unique_products = stats["unique_products"] # Rating stats rating_dist = stats["rating_dist"] total_ratings = sum(rating_dist.values()) five_star_pct = ( rating_dist.get(5.0, rating_dist.get(5, 0)) / total_ratings * 100 if total_ratings else 0 ) one_star_pct = ( rating_dist.get(1.0, rating_dist.get(1, 0)) / total_ratings * 100 if total_ratings else 0 ) # Length stats lengths = stats["text_lengths"] tokens = stats["token_lengths"] median_chars = int(np.median(lengths)) if lengths else 0 median_tokens = int(np.median(tokens)) if tokens else 0 mean_chars = int(np.mean(lengths)) if lengths else 0 # Chunk distribution chunk_dist = stats["chunk_dist"] single_chunk_reviews = chunk_dist.get(1, 0) multi_chunk_reviews = unique_reviews - single_chunk_reviews expansion_ratio = total_chunks / unique_reviews if unique_reviews else 0 # Rating breakdown rating_lines = [] for rating in sorted(rating_dist.keys()): count = rating_dist[rating] pct = count / total_ratings * 100 if total_ratings else 0 rating_lines.append(f"| {int(rating)} | {count:,} | {pct:.1f}% |") report_content = f"""# Exploratory Data Analysis: Production Data **Source:** Qdrant Cloud (Collection: `{collection_info.get("name", COLLECTION_NAME)}`) **Status:** {collection_info.get("status", "unknown")} **Generated from live production data** --- ## Dataset Overview This report analyzes the actual data deployed in production, ensuring all statistics match what the recommendation system uses. | Metric | Value | |--------|-------| | Total Chunks | {total_chunks:,} | | Unique Reviews | {unique_reviews:,} | | Unique Products | {unique_products:,} | | Expansion Ratio | {expansion_ratio:.2f}x | --- ## Rating Distribution Amazon reviews exhibit a characteristic J-shaped distribution, heavily skewed toward 5-star ratings. ![Rating Distribution](../assets/rating_distribution.png) | Rating | Count | Percentage | |--------|-------|------------| {chr(10).join(rating_lines)} **Key Observations:** - 5-star ratings: {five_star_pct:.1f}% of chunks - 1-star ratings: {one_star_pct:.1f}% of chunks - This polarization is typical for e-commerce review data --- ## Chunk Length Analysis Chunk lengths affect retrieval quality and context window usage. ![Chunk Lengths](../assets/chunk_lengths.png) **Statistics:** - Median chunk length: {median_chars:,} characters (~{median_tokens} tokens) - Mean chunk length: {mean_chars:,} characters - Most chunks fit comfortably within embedding model context --- ## Chunking Distribution Reviews are chunked based on length: short reviews stay whole, longer reviews are split semantically. ![Chunks per Review](../assets/chunks_per_review.png) | Metric | Value | |--------|-------| | Single-chunk reviews | {single_chunk_reviews:,} | | Multi-chunk reviews | {multi_chunk_reviews:,} | | Expansion ratio | {expansion_ratio:.2f}x | **Chunking Strategy:** - Reviews < 200 tokens: No chunking (embedded whole) - Reviews 200-500 tokens: Semantic chunking - Reviews > 500 tokens: Semantic + sliding window --- ## Temporal Distribution Review timestamps enable chronological analysis and temporal evaluation splits. ![Temporal Distribution](../assets/temporal_distribution.png) --- ## Data Quality The production dataset has been through 5-core filtering (users and items with 5+ interactions) and quality checks: - All chunks have valid text content - All ratings are in [1, 5] range - All product identifiers present - Deterministic chunk IDs (MD5 hash of review_id + chunk_index) --- ## Summary This production EDA confirms the deployed data characteristics: 1. **Scale:** {total_chunks:,} chunks across {unique_products:,} products 2. **Quality:** 5-core filtered, validated payloads 3. **Distribution:** J-shaped ratings, typical e-commerce pattern 4. **Chunking:** {expansion_ratio:.2f}x expansion from reviews to chunks The data matches what the recommendation API queries in real-time. --- *Report generated from Qdrant Cloud. Run `make eda` to regenerate.* """ report_path = REPORTS_DIR / "eda_report.md" report_path.write_text(report_content) print(f" Report: {report_path}") def main(): print("=" * 60) print("PRODUCTION EDA: Querying Qdrant Cloud") print("=" * 60) client = get_client() # Get collection info try: info = get_collection_info(client) print(f"\nCollection: {info['name']}") print(f"Points: {info['points_count']:,}") print(f"Status: {info['status']}") except Exception as e: print(f"ERROR: Cannot access collection: {e}") print("Ensure QDRANT_URL and QDRANT_API_KEY are correct.") sys.exit(1) # Compute stats print("\n--- Computing Statistics ---") stats = compute_stats(client) # Generate figures print("\n--- Generating Figures ---") generate_figures(stats) # Generate report print("\n--- Generating Report ---") generate_report(stats, info) print("\n" + "=" * 60) print("EDA COMPLETE") print("=" * 60) print(f"Figures: {FIGURES_DIR}/") print(f"Report: {REPORTS_DIR / 'eda_report.md'}") client.close() if __name__ == "__main__": main()