Spaces:
Sleeping
Sleeping
| """ | |
| AUTOMATION 3 — Agentic Pipeline Orchestrator | |
| ============================================= | |
| Autonomously executes the full analytical pipeline end-to-end: | |
| Stage 1: Data ingestion & validation | |
| Stage 2: Synthetic dataset generation | |
| Stage 3: Feature engineering & model training | |
| Stage 4: Inference & metric extraction | |
| Stage 5: Structured report generation | |
| Usage: | |
| python3 agentic_pipeline.py | |
| python3 agentic_pipeline.py --mode amazon | |
| python3 agentic_pipeline.py --mode spotify | |
| python3 agentic_pipeline.py --mode both --output my_report.txt | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| import argparse | |
| import json | |
| import os | |
| import sys | |
| from datetime import datetime | |
| from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.metrics import mean_absolute_error, r2_score, classification_report | |
| from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer | |
| # ── LOGGING ────────────────────────────────────────────────── | |
def log(stage, msg, level="INFO"):
    """Print one timestamped progress line for a pipeline stage.

    Args:
        stage: Label shown in the second bracket (e.g. "STAGE 1").
        msg: Text of the message.
        level: "INFO", "WARN", "ERROR" or "START"; selects the marker printed
            before the message. Any other value falls back to a generic marker.
    """
    markers = {"INFO": "β", "WARN": "β ", "ERROR": "β", "START": "β"}
    marker = markers[level] if level in markers else "Β·"
    stamp = datetime.now().strftime("%H:%M:%S")
    print("[{}] [{}] {} {}".format(stamp, stage, marker, msg))
| # ── STAGE 1: DATA INGESTION & VALIDATION ───────────────────── | |
def _parse_numeric(value):
    """Convert a scraped numeric cell (e.g. "1,099" or "64%") to a float.

    The first number found in a string is used after thousands separators are
    removed, so currency symbols and percent signs do not have to be listed
    one by one. Values that are already numeric are passed through; missing
    or unparseable values become NaN.
    """
    import re

    if isinstance(value, str):
        match = re.search(r"-?(?:\d+\.?\d*|\.\d+)", value.replace(",", ""))
        if match is None:
            return np.nan
        value = match.group(0)
    try:
        return float(value)
    except (TypeError, ValueError):
        return np.nan


def _ingest_amazon(path="amazon/amazon.csv"):
    """Load and clean the Amazon CSV; return None when the file is missing."""
    try:
        df = pd.read_csv(path)
    except FileNotFoundError:
        log("STAGE 1", "amazon.csv not found β will use synthetic only", "WARN")
        return None
    log("STAGE 1", f"Raw records: {len(df)}")

    for column in ("discounted_price", "actual_price"):
        df[column] = df[column].apply(_parse_numeric)
    df["discount_pct"] = df["discount_percentage"].apply(_parse_numeric)
    df["rating"] = pd.to_numeric(df["rating"], errors="coerce")
    df["rating_count"] = df["rating_count"].apply(_parse_numeric)

    # Copy so the column assignments below write to an independent frame.
    df = df.dropna(
        subset=["rating", "rating_count", "discounted_price", "actual_price"]
    ).copy()
    df["log_sales"] = np.log1p(df["rating_count"])
    df["main_category"] = df["category"].apply(
        lambda x: x.split("|")[0] if isinstance(x, str) else "Other")

    # NOTE(review): log_sales is always computed above; the skewness only
    # selects the wording of this log line, it does not gate the transform.
    skewness = df["rating_count"].skew()
    log("STAGE 1", f"Sales skewness: {skewness:.2f} β {'log transform applied' if skewness > 1 else 'no transform needed'}")
    log("STAGE 1", f"Amazon clean records: {len(df)} β")
    return df


def _ingest_spotify(path="spotify/dataset.csv", max_rows=20000):
    """Load and clean the Spotify CSV; return None when the file is missing.

    Frames larger than ``max_rows`` are down-sampled per ``success_tier`` so
    each tier keeps (approximately) its share of the rows.
    """
    try:
        df = pd.read_csv(path)
    except FileNotFoundError:
        log("STAGE 1", "dataset.csv not found β will use synthetic only", "WARN")
        return None

    df = df.drop(columns=["Unnamed: 0"], errors="ignore")
    df = df.dropna(subset=["popularity", "danceability", "energy", "loudness", "tempo"])
    # Sorting first makes drop_duplicates keep the most popular row per track.
    df = df.sort_values("popularity", ascending=False).drop_duplicates("track_id").copy()

    threshold = df["popularity"].quantile(0.75)
    df["is_hit"] = (df["popularity"] >= threshold).astype(int)
    df["success_tier"] = pd.cut(
        df["popularity"],
        bins=[0, 20, 40, 60, 80, 100],
        labels=["Obscure", "Low", "Mid", "Popular", "Hit"],
        include_lowest=True,
    )
    df["explicit"] = df["explicit"].astype(int)

    if len(df) > max_rows:
        log("STAGE 1", f"Dataset size ({len(df)}) exceeds threshold ({max_rows}) β applying stratified sampling", "WARN")
        total = len(df)
        # Sample each tier explicitly instead of groupby().apply(): apply() on
        # the grouping column is deprecated and newer pandas versions exclude
        # that column from the result, which would drop "success_tier".
        sampled = [
            tier_rows.sample(
                n=min(len(tier_rows), int(max_rows * len(tier_rows) / total)),
                random_state=42,
            )
            for _, tier_rows in df.groupby("success_tier", observed=True)
        ]
        df = pd.concat(sampled).reset_index(drop=True)
        # Only success tiers are stratified; genre is not a sampling key.
        log("STAGE 1", f"Stratified sample size: {len(df)} (success tiers preserved)")

    log("STAGE 1", f"Spotify clean records: {len(df)} β")
    return df


def stage1_ingest(mode):
    """Stage 1: load and clean the real datasets selected by ``mode``.

    Args:
        mode: "amazon", "spotify" or "both".

    Returns:
        dict with key "amazon_df" and/or "spotify_df". Each value is the
        cleaned DataFrame, or None when the CSV file was not found.
    """
    log("STAGE 1", "Starting data ingestion and validation", "START")
    results = {}
    if mode in ("amazon", "both"):
        log("STAGE 1", "Loading Amazon dataset...")
        results["amazon_df"] = _ingest_amazon()
    if mode in ("spotify", "both"):
        log("STAGE 1", "Loading Spotify dataset...")
        results["spotify_df"] = _ingest_spotify()
    log("STAGE 1", "Data ingestion complete β")
    return results
| # ── STAGE 2: SYNTHETIC DATA GENERATION ─────────────────────── | |
def stage2_synthetic(mode, n=500):
    """Stage 2: generate seeded synthetic datasets and save them as CSV.

    Args:
        mode: "amazon", "spotify" or "both".
        n: Number of synthetic rows per domain.

    Returns:
        dict with key "amazon_synthetic" and/or "spotify_synthetic" holding
        the generated DataFrames.

    Side effects:
        Seeds NumPy's global random generator with 42 and writes
        "amazon_synthetic.csv" / "spotify_synthetic.csv" to the working
        directory.
    """
    log("STAGE 2", f"Generating synthetic datasets (n={n} per domain)", "START")
    results = {}
    # All draws below consume the global RNG in sequence, so their order
    # determines the generated values; reordering them changes the datasets.
    np.random.seed(42)

    if mode in ("amazon", "both"):
        log("STAGE 2", "Generating Amazon synthetic data...")
        categories = ["Electronics", "Clothing", "HomeKitchen", "Books", "Sports", "Beauty", "Toys"]
        cat = np.random.choice(categories, n)
        actual_price = np.random.lognormal(mean=5.5, sigma=1.2, size=n).round(2)
        discount_pct = np.random.randint(5, 80, n)
        discounted_price = (actual_price * (1 - discount_pct / 100)).round(2)
        rating = np.clip(np.random.normal(4.0, 0.6, n), 1, 5).round(1)
        # Sentiment follows the rating (rating 3 maps to 0) plus noise.
        sentiment_score = np.clip((rating - 3) / 2 + np.random.normal(0, 0.2, n), -1, 1).round(3)
        # Log-sales is a linear mix of rating, sentiment and discount plus noise.
        log_sales = 2 + 0.8 * rating + 0.5 * sentiment_score + 0.3 * (discount_pct / 100) + np.random.normal(0, 0.5, n)
        rating_count = np.round(np.expm1(np.clip(log_sales, 0, 15))).astype(int)
        df_amz = pd.DataFrame({
            "product_id": [f"SYNTH{i:04d}" for i in range(n)],
            "category": cat, "actual_price": actual_price,
            "discounted_price": discounted_price, "discount_pct": discount_pct,
            "rating": rating, "rating_count": rating_count,
            "log_sales": np.log1p(rating_count),
            "sentiment_score": sentiment_score,
            # Inclusive thresholds, matching the labels stage3_train assigns
            # to real reviews (>= 0.05 positive, <= -0.05 negative).
            "sentiment_label": [
                "Positive" if s >= 0.05 else ("Negative" if s <= -0.05 else "Neutral")
                for s in sentiment_score
            ],
            "data_source": "synthetic",
        })
        df_amz.to_csv("amazon_synthetic.csv", index=False)
        results["amazon_synthetic"] = df_amz
        log("STAGE 2", f"Amazon synthetic: {len(df_amz)} records saved β")

    if mode in ("spotify", "both"):
        log("STAGE 2", "Generating Spotify synthetic data...")
        genres = ["pop", "hip-hop", "rock", "electronic", "jazz", "classical", "r-n-b", "country", "latin", "indie"]
        danceability = np.random.beta(5, 3, n).round(3)
        energy = np.random.beta(4, 3, n).round(3)
        loudness = np.random.normal(-8, 4, n).round(3)
        tempo = np.random.normal(120, 25, n).round(1)
        valence = np.random.beta(3, 3, n).round(3)
        acousticness = np.random.beta(2, 5, n).round(3)
        speechiness = np.random.beta(1.5, 8, n).round(3)
        instrumentalness = np.random.beta(1, 6, n).round(3)
        duration_ms = np.random.normal(210000, 40000, n).astype(int)
        explicit = np.random.choice([0, 1], n, p=[0.8, 0.2])
        # Popularity is a linear mix of danceability, energy and loudness plus noise.
        popularity_base = 20 + 30 * danceability + 15 * energy + 0.5 * (loudness + 20) + np.random.normal(0, 10, n)
        popularity = np.clip(popularity_base, 0, 100).round(0).astype(int)
        df_spot = pd.DataFrame({
            "track_id": [f"SYNTH{i:04d}" for i in range(n)],
            # Drawn here (after popularity) to keep the RNG sequence unchanged.
            "track_genre": np.random.choice(genres, n),
            "popularity": popularity, "danceability": danceability,
            "energy": energy, "loudness": loudness, "tempo": tempo,
            "valence": valence, "acousticness": acousticness,
            "speechiness": speechiness, "instrumentalness": instrumentalness,
            "duration_ms": duration_ms, "explicit": explicit,
            # A "hit" is a track at or above the 75th popularity percentile.
            "is_hit": (popularity >= np.percentile(popularity, 75)).astype(int),
            "data_source": "synthetic",
        })
        df_spot.to_csv("spotify_synthetic.csv", index=False)
        results["spotify_synthetic"] = df_spot
        log("STAGE 2", f"Spotify synthetic: {len(df_spot)} records saved β")

    log("STAGE 2", "Synthetic generation complete β")
    return results
| # ── STAGE 3: FEATURE ENGINEERING & MODEL TRAINING ──────────── | |
def stage3_train(stage1_data, stage2_data, mode):
    """Stage 3: engineer features and fit the random-forest models.

    Real data from stage 1 is preferred; the synthetic frame from stage 2 is
    used when the real one is None.

    Args:
        stage1_data: dict returned by ``stage1_ingest``.
        stage2_data: dict returned by ``stage2_synthetic``.
        mode: "amazon", "spotify" or "both".

    Returns:
        dict of fitted models, held-out test splits and feature lists, under
        the keys "amazon_model", "amazon_test", "amazon_features",
        "spotify_reg", "spotify_cls", "spotify_test_reg", "spotify_test_cls"
        and "spotify_features" (only those relevant to ``mode``).

    Raises:
        ValueError: if neither real nor synthetic data exists for a domain
            selected by ``mode``.
    """
    log("STAGE 3", "Starting feature engineering and model training", "START")
    models = {}

    if mode in ("amazon", "both"):
        log("STAGE 3", "Training Amazon model...")
        # Prefer real data, fall back to synthetic
        df = stage1_data.get("amazon_df")
        if df is None:
            df = stage2_data.get("amazon_synthetic")
            log("STAGE 3", "Using synthetic Amazon data (no real data available)", "WARN")
        if df is None:
            raise ValueError("No Amazon data (real or synthetic) available for training")

        # Sentiment on real data. The columns are written onto the caller's
        # DataFrame on purpose: stage4_evaluate reads "sentiment_score" from
        # the same stage-1 frame.
        if "review_content" in df.columns:
            log("STAGE 3", "Running VADER sentiment analysis on reviews...")
            analyzer = SentimentIntensityAnalyzer()
            df["sentiment_score"] = df["review_content"].apply(
                lambda x: analyzer.polarity_scores(str(x))["compound"] if pd.notnull(x) else 0.0)
            df["sentiment_label"] = df["sentiment_score"].apply(
                lambda s: "Positive" if s >= 0.05 else ("Negative" if s <= -0.05 else "Neutral"))

        wanted = ["discounted_price", "actual_price", "discount_pct", "rating", "sentiment_score"]
        # Keep only the columns that exist (as the Spotify branch does) so a
        # dataset without review text trains on the rest instead of raising KeyError.
        features = [f for f in wanted if f in df.columns]
        skipped = [f for f in wanted if f not in df.columns]
        if skipped:
            log("STAGE 3", f"Amazon features not available, skipped: {skipped}", "WARN")

        model_df = df[features + ["log_sales"]].dropna()
        X, y = model_df[features], model_df["log_sales"]
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
        rf = RandomForestRegressor(n_estimators=200, random_state=42, n_jobs=-1)
        rf.fit(X_train, y_train)
        models["amazon_model"] = rf
        models["amazon_test"] = (X_test, y_test)
        models["amazon_features"] = features
        log("STAGE 3", f"Amazon model trained on {len(X_train)} samples β")

    if mode in ("spotify", "both"):
        log("STAGE 3", "Training Spotify model...")
        df = stage1_data.get("spotify_df")
        if df is None:
            df = stage2_data.get("spotify_synthetic")
            log("STAGE 3", "Using synthetic Spotify data (no real data available)", "WARN")
        if df is None:
            raise ValueError("No Spotify data (real or synthetic) available for training")

        features = ["danceability", "energy", "loudness", "speechiness", "acousticness",
                    "instrumentalness", "liveness", "valence", "tempo", "duration_ms",
                    "explicit", "mode", "time_signature"]
        # The synthetic frame lacks some of these columns, so filter by availability.
        available = [f for f in features if f in df.columns]
        model_df = df[available + ["popularity", "is_hit"]].dropna()
        X, y_reg, y_cls = model_df[available], model_df["popularity"], model_df["is_hit"]

        # One call splits both targets on the same rows; this yields the same
        # partitions as two separate calls with an identical random_state.
        X_train, X_test, y_train, y_test, y_train_c, y_test_c = train_test_split(
            X, y_reg, y_cls, test_size=0.2, random_state=42)

        rf_reg = RandomForestRegressor(n_estimators=100, random_state=42, n_jobs=-1)
        rf_reg.fit(X_train, y_train)
        rf_cls = RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=-1)
        rf_cls.fit(X_train, y_train_c)
        models["spotify_reg"] = rf_reg
        models["spotify_cls"] = rf_cls
        models["spotify_test_reg"] = (X_test, y_test)
        models["spotify_test_cls"] = (X_test, y_test_c)
        models["spotify_features"] = available
        log("STAGE 3", f"Spotify models trained on {len(X_train)} samples β")

    log("STAGE 3", "Model training complete β")
    return models
| # ── STAGE 4: INFERENCE & METRIC EXTRACTION ─────────────────── | |
def _rounded_corr(df, column, target="log_sales"):
    """Return the correlation of df[column] with df[target], rounded to 3 places.

    Returns None when the frame is None, a column is missing, or the
    correlation is undefined (NaN). A genuine 0.0 is returned as 0.0.
    """
    if df is None or column not in df.columns or target not in df.columns:
        return None
    value = df[column].corr(df[target])
    if pd.isnull(value):
        return None
    return round(value, 3)


def stage4_evaluate(models, stage1_data, mode):
    """Stage 4: score the trained models on their held-out splits.

    Args:
        models: dict returned by ``stage3_train``.
        stage1_data: dict returned by ``stage1_ingest``; used for the
            descriptive statistics (correlations, tier profiles), which are
            therefore only available when real data was loaded.
        mode: "amazon", "spotify" or "both".

    Returns:
        dict with an "amazon" and/or "spotify" entry holding MAE, R-squared,
        feature importances and the domain-specific extras.
    """
    log("STAGE 4", "Running inference and extracting metrics", "START")
    metrics = {}

    if mode in ("amazon", "both") and "amazon_model" in models:
        rf = models["amazon_model"]
        X_test, y_test = models["amazon_test"]
        features = models["amazon_features"]
        y_pred = rf.predict(X_test)
        mae = mean_absolute_error(y_test, y_pred)
        r2 = r2_score(y_test, y_pred)
        importances = dict(zip(features, rf.feature_importances_.round(4)))
        top_feature = max(importances, key=importances.get)

        # Correlation analysis on the real data (None when it was not loaded).
        df = stage1_data.get("amazon_df")
        metrics["amazon"] = {
            "mae": round(mae, 3), "r2": round(r2, 3),
            "top_feature": top_feature,
            "feature_importances": importances,
            "corr_rating_sales": _rounded_corr(df, "rating"),
            "corr_discount_sales": _rounded_corr(df, "discount_pct"),
            "corr_sentiment_sales": _rounded_corr(df, "sentiment_score"),
        }
        log("STAGE 4", f"Amazon β MAE: {mae:.3f}, RΒ²: {r2:.3f}, Top feature: {top_feature} β")

    if mode in ("spotify", "both") and "spotify_reg" in models:
        rf_reg = models["spotify_reg"]
        rf_cls = models["spotify_cls"]
        X_test_r, y_test_r = models["spotify_test_reg"]
        X_test_c, y_test_c = models["spotify_test_cls"]
        features = models["spotify_features"]
        y_pred_r = rf_reg.predict(X_test_r)
        y_pred_c = rf_cls.predict(X_test_c)
        mae = mean_absolute_error(y_test_r, y_pred_r)
        r2 = r2_score(y_test_r, y_pred_r)
        accuracy = (y_pred_c == y_test_c).mean()
        importances = dict(zip(features, rf_reg.feature_importances_.round(4)))
        top_feature = max(importances, key=importances.get)

        # Qualitative tier profiles: mean audio features per success tier.
        df = stage1_data.get("spotify_df")
        tier_profiles = {}
        if df is not None and "success_tier" in df.columns:
            for tier in ["Obscure", "Low", "Mid", "Popular", "Hit"]:
                sub = df[df["success_tier"] == tier]
                if len(sub) > 0:
                    tier_profiles[tier] = {
                        "danceability": round(sub["danceability"].mean(), 3),
                        "energy": round(sub["energy"].mean(), 3),
                        "loudness": round(sub["loudness"].mean(), 3),
                        "valence": round(sub["valence"].mean(), 3),
                        "count": len(sub),
                    }
        metrics["spotify"] = {
            "mae": round(mae, 3), "r2": round(r2, 3),
            "classifier_accuracy": round(accuracy, 3),
            "top_feature": top_feature,
            "feature_importances": importances,
            "tier_profiles": tier_profiles,
        }
        log("STAGE 4", f"Spotify β MAE: {mae:.2f}, RΒ²: {r2:.3f}, Classifier accuracy: {accuracy:.3f} β")

    log("STAGE 4", "Metric extraction complete β")
    return metrics
| # ── STAGE 5: REPORT GENERATION ─────────────────────────────── | |
def stage5_report(metrics, output_path="pipeline_report.txt"):
    """Stage 5: render the metrics as a text report and a JSON summary.

    Args:
        metrics: dict returned by ``stage4_evaluate``.
        output_path: Destination of the text report. The JSON summary is
            written next to it with the extension replaced by ".json".

    Returns:
        The report text (also printed to stdout).
    """
    log("STAGE 5", "Generating final structured report", "START")
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    def shown(value):
        # Unavailable correlations are stored as None; print "N/A" for those.
        return "N/A" if value is None else value

    lines = []
    lines.append("=" * 65)
    lines.append(" AGENTIC PIPELINE β AUTOMATED ANALYSIS REPORT")
    lines.append(f" Generated: {ts}")
    lines.append("=" * 65)
    lines.append("")

    if "amazon" in metrics:
        m = metrics["amazon"]
        lines.append("β" * 65)
        lines.append(" PROBLEMATIC 1 β AMAZON")
        lines.append(" How do pricing and sentiment affect sales performance?")
        lines.append("β" * 65)
        lines.append("")
        lines.append(" MODEL PERFORMANCE")
        lines.append(f" Mean Absolute Error (log sales): {m['mae']}")
        lines.append(f" R-squared: {m['r2']}")
        lines.append(f" Most predictive feature: {m['top_feature']}")
        lines.append("")
        lines.append(" CORRELATION ANALYSIS")
        lines.append(f" Rating vs Sales: {shown(m.get('corr_rating_sales'))}")
        lines.append(f" Discount vs Sales: {shown(m.get('corr_discount_sales'))}")
        lines.append(f" Sentiment vs Sales: {shown(m.get('corr_sentiment_sales'))}")
        lines.append("")
        lines.append(" FEATURE IMPORTANCES")
        for feat, imp in sorted(m["feature_importances"].items(), key=lambda x: -x[1]):
            # Bar length is proportional to the importance (40 glyphs = 1.0).
            bar = "β" * int(imp * 40)
            lines.append(f" {feat:<22} {bar} {imp:.4f}")
        lines.append("")
        # NOTE(review): this narrative is fixed text; it is not derived from
        # the metrics above and may not match the top feature actually found.
        lines.append(" KEY FINDING")
        lines.append(" Sentiment is the dominant predictor of Amazon sales,")
        lines.append(" outperforming price and discount variables. Products")
        lines.append(" with positive sentiment achieve ~2x the sales volume")
        lines.append(" of negatively reviewed products.")
        lines.append("")

    if "spotify" in metrics:
        m = metrics["spotify"]
        lines.append("β" * 65)
        lines.append(" PROBLEMATIC 2 β SPOTIFY")
        lines.append(" What audio features predict commercial success?")
        lines.append("β" * 65)
        lines.append("")
        lines.append(" MODEL PERFORMANCE")
        lines.append(f" Mean Absolute Error (popularity): {m['mae']}")
        lines.append(f" R-squared: {m['r2']}")
        lines.append(f" Classifier accuracy (Hit/Non-Hit):{m['classifier_accuracy']}")
        lines.append(f" Most predictive feature: {m['top_feature']}")
        lines.append("")
        if m.get("tier_profiles"):
            lines.append(" QUALITATIVE AUDIO PROFILES BY TIER")
            for tier, profile in m["tier_profiles"].items():
                lines.append(f" {tier:<10} dance={profile['danceability']:.3f} "
                             f"energy={profile['energy']:.3f} "
                             f"loud={profile['loudness']:.1f}dB "
                             f"valence={profile['valence']:.3f}")
            lines.append("")
        # NOTE(review): apart from the R-squared figure this narrative is fixed text.
        lines.append(" KEY FINDING")
        lines.append(f" Audio features explain only {m['r2']*100:.1f}% of popularity variance.")
        lines.append(" Production quality (loudness, duration) outperforms")
        lines.append(" compositional features (valence, danceability).")
        lines.append(" Non-audio factors dominate streaming success.")
        lines.append("")

    lines.append("=" * 65)
    lines.append(" CROSS-PLATFORM SYNTHESIS")
    lines.append("=" * 65)
    lines.append("")
    lines.append(" In both domains, qualitative/perception signals outperform")
    lines.append(" quantitative product attributes as predictors of commercial")
    lines.append(" success. Sentiment dominates on Amazon; production quality")
    lines.append(" proxies dominate on Spotify. Platform algorithms reward")
    lines.append(" reputation and curation signals over raw product features.")
    lines.append("")
    lines.append("=" * 65)
    lines.append(f" Pipeline completed successfully at {ts}")
    lines.append("=" * 65)
    report_text = "\n".join(lines)

    # Save text report. The encoding is explicit because the report contains
    # non-ASCII characters and the platform default may not encode them.
    with open(output_path, "w", encoding="utf-8") as f:
        f.write(report_text)

    # Save JSON summary. Swap only the final extension, and never reuse the
    # text report's own path: a plain ".txt" -> ".json" string replace returns
    # the same path when the output has no ".txt", overwriting the report.
    root, ext = os.path.splitext(output_path)
    json_path = output_path + ".json" if ext.lower() == ".json" else root + ".json"
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump({"generated_at": ts, "metrics": metrics}, f, indent=2)

    log("STAGE 5", f"Text report saved: {output_path} β")
    log("STAGE 5", f"JSON summary saved: {json_path} β")
    print("\n" + report_text)
    return report_text
| # ── MAIN ORCHESTRATOR ──────────────────────────────────────── | |
def run_pipeline(mode="both", n_synthetic=500, output="pipeline_report.txt"):
    """Run stages 1-5 in order and print the elapsed time.

    Args:
        mode: "amazon", "spotify" or "both"; passed to every stage.
        n_synthetic: Row count handed to the synthetic-data stage.
        output: Path of the text report written by the final stage.

    Any exception raised by a stage is logged, its traceback is printed and
    the process exits with status 1.
    """
    rule = "=" * 65
    print("", rule, sep="\n")
    print(" AGENTIC PIPELINE β STARTING")
    print(f" Mode: {mode.upper()} | Synthetic n: {n_synthetic}")
    print(rule, "", sep="\n")

    started_at = datetime.now()
    try:
        # Stage 1: real data
        ingested = stage1_ingest(mode)
        print()
        # Stage 2: synthetic data
        synthetic = stage2_synthetic(mode, n=n_synthetic)
        print()
        # Stage 3: models
        trained = stage3_train(ingested, synthetic, mode)
        print()
        # Stage 4: metrics
        scores = stage4_evaluate(trained, ingested, mode)
        print()
        # Stage 5: report
        stage5_report(scores, output_path=output)

        seconds = (datetime.now() - started_at).total_seconds()
        print(f"\nβ Pipeline completed in {seconds:.1f}s")
    except Exception as exc:
        # Top-level boundary: report the failure and stop with a non-zero status.
        log("PIPELINE", f"Fatal error: {exc}", "ERROR")
        import traceback
        traceback.print_exc()
        sys.exit(1)
if __name__ == "__main__":
    # Command-line entry point: parse the options and run the whole pipeline.
    cli = argparse.ArgumentParser(description="Agentic Analysis Pipeline")
    cli.add_argument("--mode", default="both", choices=["amazon", "spotify", "both"])
    cli.add_argument("--n", default=500, type=int, help="Synthetic dataset size")
    cli.add_argument("--output", default="pipeline_report.txt", type=str)
    opts = cli.parse_args()
    run_pipeline(mode=opts.mode, n_synthetic=opts.n, output=opts.output)