from __future__ import annotations import os # ---- PyTensor(=PyMCの内部) のコンパイルキャッシュ先を /tmp に固定 ---- # 環境変数で上書き可: PYTENSOR_BASE=/data/.pytensor(永続化したい場合) base = os.environ.get("PYTENSOR_BASE", "/tmp/.pytensor") os.makedirs(base, exist_ok=True) if "PYTENSOR_FLAGS" in os.environ and "base_compiledir=" not in os.environ["PYTENSOR_FLAGS"]: os.environ["PYTENSOR_FLAGS"] = os.environ["PYTENSOR_FLAGS"] + f",base_compiledir={base}" elif "PYTENSOR_FLAGS" not in os.environ: os.environ["PYTENSOR_FLAGS"] = f"base_compiledir={base}" os.environ.setdefault("XDG_CACHE_HOME", "/tmp/.cache") # ---- 以降は従来どおり ---- import numpy as np import pandas as pd import pymc as pm import pytensor.tensor as at from typing import Dict, Any, Optional def _zscore(df: pd.DataFrame, cols): x = df[cols].astype(float) return (x - x.mean()) / (x.std(ddof=0) + 1e-9) def fit_uplift_binary( df: pd.DataFrame, outcome_col: str = "click_bin", medium_col: str = "medium", creative_col: str = "creative", control_flag_col: str = "is_control", feature_cols: Optional[list] = None, draws: int = 1000, target_accept: float = 0.9, random_seed: int = 42, ) -> Dict[str, Any]: d = df.copy().reset_index(drop=True) if outcome_col not in d.columns: d["n"] = d["impressions"].astype(int) d["y"] = d["clicks"].astype(int) binomial = True else: d["y"] = d[outcome_col].astype(int) d["n"] = 1 binomial = False creatives = d[creative_col].astype(str).unique().tolist() control_creatives = d.loc[d[control_flag_col] == 1, creative_col].astype(str).unique().tolist() control_ref = control_creatives[0] if len(control_creatives) else creatives[0] d["creative_idx"] = d[creative_col].astype(str).apply(lambda x: creatives.index(x)).astype(int) d["medium_idx"] = d[medium_col].astype(str).astype('category').cat.codes.values X = None if feature_cols: X = _zscore(d, feature_cols).values p = X.shape[1] else: p = 0 with pm.Model() as model: n_medium = int(np.max(d["medium_idx"])) + 1 mu_re = pm.Normal("mu_re", 0.0, 1.0) sd_re = pm.HalfNormal("sd_re", 1.0) z_re = pm.Normal("z_re", 0.0, 1.0, shape=n_medium) b_medium = pm.Deterministic("b_medium", mu_re + z_re * sd_re) n_creative = len(creatives) b0 = pm.Normal("intercept", 0.0, 1.5) b_cre = pm.Normal("b_cre_raw", 0.0, 1.0, shape=n_creative) ref_idx = creatives.index(control_ref) b_cre_adj = at.set_subtensor(b_cre[ref_idx], 0.0) if p > 0: b_x = pm.Normal("b_x", 0.0, 1.0, shape=p) lin = b0 + b_cre_adj[d["creative_idx"].values] + b_medium[d["medium_idx"].values] + at.dot(X, b_x) else: lin = b0 + b_cre_adj[d["creative_idx"].values] + b_medium[d["medium_idx"].values] p_click = pm.Deterministic("p_click", pm.math.sigmoid(lin)) if binomial: pm.Binomial("y_obs", n=d["n"].values, p=p_click, observed=d["y"].values) else: pm.Bernoulli("y_obs", p=p_click, observed=d["y"].values) idata = pm.sample( draws=draws, tune=draws, chains=2, target_accept=target_accept, random_seed=random_seed, progressbar=False ) post = idata.posterior b0_s = post["intercept"].stack(sample=("chain", "draw")) b_cre_s = post["b_cre_raw"].stack(sample=("chain", "draw")) mu_re = post["mu_re"].stack(sample=("chain", "draw")) def sigmoid(x): return 1 / (1 + np.exp(-x)) results = [] for cr in creatives: idx = creatives.index(cr) lin_t = b0_s + b_cre_s.isel(b_cre_raw_dim_0=idx) + mu_re lin_c = b0_s + b_cre_s.isel(b_cre_raw_dim_0=ref_idx) + mu_re uplift = sigmoid(lin_t) - sigmoid(lin_c) results.append({ "creative": cr, "uplift_mean": float(uplift.mean().item()), "uplift_p_gt0": float((uplift > 0).mean().item()), "control_ref": control_ref, }) return {"control_ref": control_ref, "creatives": creatives, "results": results}