Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import os, csv, json, sqlite3 | |
| from pathlib import Path | |
| from typing import Optional, Dict, Any, List, Tuple | |
| DEFAULT_DIR = "/data/app_data" if os.access("/data", os.W_OK) else "/tmp/app_data" | |
| DB_DIR = Path(os.environ.get("APP_DATA_DIR", DEFAULT_DIR)) | |
| DB_DIR.mkdir(parents=True, exist_ok=True) | |
| DB_PATH = DB_DIR / "data.db" | |
| SCHEMA_SQL = """ | |
| PRAGMA journal_mode=WAL; | |
| CREATE TABLE IF NOT EXISTS campaigns ( | |
| campaign_id TEXT PRIMARY KEY, | |
| brand TEXT, | |
| product TEXT, | |
| target_audience TEXT, | |
| tone TEXT, | |
| language TEXT, | |
| constraints_json TEXT, | |
| value_per_conversion REAL DEFAULT 1.0, | |
| policy TEXT DEFAULT 'thompson', | |
| holdout_ratio REAL DEFAULT 0.0, | |
| stop_min_impressions INTEGER DEFAULT 200, | |
| stop_rel_ev_threshold REAL DEFAULT 0.5, | |
| created_at TEXT DEFAULT (datetime('now')) | |
| ); | |
| CREATE TABLE IF NOT EXISTS variants ( | |
| campaign_id TEXT, | |
| variant_id TEXT, | |
| text TEXT, | |
| status TEXT, | |
| rejection_reason TEXT, | |
| PRIMARY KEY (campaign_id, variant_id) | |
| ); | |
| CREATE TABLE IF NOT EXISTS metrics ( | |
| campaign_id TEXT, | |
| variant_id TEXT, | |
| impressions INTEGER DEFAULT 0, | |
| clicks INTEGER DEFAULT 0, | |
| conversions INTEGER DEFAULT 0, | |
| alpha_click REAL DEFAULT 1.0, | |
| beta_click REAL DEFAULT 1.0, | |
| alpha_conv REAL DEFAULT 1.0, | |
| beta_conv REAL DEFAULT 1.0, | |
| PRIMARY KEY (campaign_id, variant_id) | |
| ); | |
| CREATE TABLE IF NOT EXISTS events ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| campaign_id TEXT, | |
| variant_id TEXT, | |
| event_type TEXT, | |
| ts TEXT, | |
| value REAL | |
| ); | |
| -- LinUCB のパラメータをJSONで保持 | |
| CREATE TABLE IF NOT EXISTS linucb ( | |
| campaign_id TEXT, | |
| variant_id TEXT, | |
| d INTEGER, | |
| A_json TEXT, | |
| b_json TEXT, | |
| n_updates INTEGER DEFAULT 0, | |
| PRIMARY KEY (campaign_id, variant_id) | |
| ); | |
| -- コンプライアンス監査ログ(NG詳細/LLM修正案) | |
| CREATE TABLE IF NOT EXISTS compliance_logs ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| campaign_id TEXT, | |
| variant_id TEXT, | |
| status TEXT, | |
| ng_rules_json TEXT, | |
| llm_ok INTEGER, | |
| llm_reasons_json TEXT, | |
| llm_fixed TEXT, | |
| ts TEXT DEFAULT (datetime('now')) | |
| ); | |
| -- 任意の運用監査ログ | |
| CREATE TABLE IF NOT EXISTS audit_logs ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| campaign_id TEXT, | |
| action TEXT, | |
| payload_json TEXT, | |
| ts TEXT DEFAULT (datetime('now')) | |
| ); | |
| """ | |
| def get_conn(): | |
| conn = sqlite3.connect(DB_PATH) | |
| conn.row_factory = sqlite3.Row | |
| return conn | |
| def _ensure_columns(): | |
| need_cols = { | |
| "campaigns": [ | |
| ("policy", "TEXT", "'thompson'"), | |
| ("holdout_ratio", "REAL", "0.0"), | |
| ("stop_min_impressions", "INTEGER", "200"), | |
| ("stop_rel_ev_threshold", "REAL", "0.5"), | |
| ("created_at", "TEXT", "datetime('now')"), | |
| ] | |
| } | |
| with get_conn() as con: | |
| for table, cols in need_cols.items(): | |
| cur = con.execute(f"PRAGMA table_info({table})") | |
| have = {r["name"] for r in cur.fetchall()} | |
| for name, typ, default in cols: | |
| if name not in have: | |
| con.execute(f"ALTER TABLE {table} ADD COLUMN {name} {typ} DEFAULT ({default})") | |
| def init_db(): | |
| with get_conn() as con: | |
| con.executescript(SCHEMA_SQL) | |
| _ensure_columns() | |
| # ============== Campaign/Variant/Metric 基本 ============== | |
| def upsert_campaign(campaign_id: str, brand: str, product: str, target_audience: str, | |
| tone: str, language: str, constraints: Optional[Dict[str, Any]], | |
| value_per_conversion: float): | |
| with get_conn() as con: | |
| con.execute( | |
| """ | |
| INSERT INTO campaigns (campaign_id, brand, product, target_audience, tone, language, constraints_json, value_per_conversion) | |
| VALUES (?, ?, ?, ?, ?, ?, ?, ?) | |
| ON CONFLICT(campaign_id) DO UPDATE SET | |
| brand=excluded.brand, | |
| product=excluded.product, | |
| target_audience=excluded.target_audience, | |
| tone=excluded.tone, | |
| language=excluded.language, | |
| constraints_json=excluded.constraints_json, | |
| value_per_conversion=excluded.value_per_conversion | |
| """, | |
| (campaign_id, brand, product, target_audience, tone, language, json.dumps(constraints or {}, ensure_ascii=False), value_per_conversion), | |
| ) | |
| def set_campaign_settings(campaign_id: str, policy: str, holdout_ratio: float, stop_min_impressions: int, stop_rel_ev_threshold: float): | |
| with get_conn() as con: | |
| con.execute( | |
| """ | |
| UPDATE campaigns SET policy=?, holdout_ratio=?, stop_min_impressions=?, stop_rel_ev_threshold=? | |
| WHERE campaign_id=? | |
| """, | |
| (policy, float(holdout_ratio), int(stop_min_impressions), float(stop_rel_ev_threshold), campaign_id) | |
| ) | |
| def get_campaign(campaign_id: str): | |
| with get_conn() as con: | |
| cur = con.execute("SELECT * FROM campaigns WHERE campaign_id=?", (campaign_id,)) | |
| return cur.fetchone() | |
| def insert_variant(campaign_id: str, variant_id: str, text: str, status: str, rejection_reason: Optional[str]): | |
| with get_conn() as con: | |
| con.execute( | |
| """ | |
| INSERT OR REPLACE INTO variants (campaign_id, variant_id, text, status, rejection_reason) | |
| VALUES (?, ?, ?, ?, ?) | |
| """, | |
| (campaign_id, variant_id, text, status, rejection_reason), | |
| ) | |
| con.execute( | |
| "INSERT OR IGNORE INTO metrics (campaign_id, variant_id) VALUES (?, ?)", | |
| (campaign_id, variant_id) | |
| ) | |
| def get_variant(campaign_id: str, variant_id: str): | |
| with get_conn() as con: | |
| cur = con.execute("SELECT * FROM variants WHERE campaign_id=? AND variant_id=?", (campaign_id, variant_id)) | |
| return cur.fetchone() | |
| def get_variants(campaign_id: str) -> List[sqlite3.Row]: | |
| with get_conn() as con: | |
| cur = con.execute("SELECT * FROM variants WHERE campaign_id=?", (campaign_id,)) | |
| return cur.fetchall() | |
| def get_metrics(campaign_id: str) -> List[sqlite3.Row]: | |
| with get_conn() as con: | |
| cur = con.execute("SELECT * FROM metrics WHERE campaign_id=?", (campaign_id,)) | |
| return cur.fetchall() | |
| def update_metric(campaign_id: str, variant_id: str, field: str, inc: float = 1.0): | |
| assert field in {"impressions", "clicks", "conversions", "alpha_click", "beta_click", "alpha_conv", "beta_conv"} | |
| with get_conn() as con: | |
| con.execute(f"UPDATE metrics SET {field} = {field} + ? WHERE campaign_id=? AND variant_id=?", (inc, campaign_id, variant_id)) | |
| def log_event(campaign_id: str, variant_id: str, event_type: str, ts: str, value): | |
| with get_conn() as con: | |
| con.execute( | |
| "INSERT INTO events (campaign_id, variant_id, event_type, ts, value) VALUES (?, ?, ?, ?, ?)", | |
| (campaign_id, variant_id, event_type, ts, value) | |
| ) | |
| def get_campaign_value_per_conversion(campaign_id: str) -> float: | |
| with get_conn() as con: | |
| cur = con.execute("SELECT value_per_conversion FROM campaigns WHERE campaign_id=?", (campaign_id,)) | |
| row = cur.fetchone() | |
| return float(row[0]) if row else 1.0 | |
| # ============== Compliance / Audit ============== | |
| def record_compliance_log(campaign_id: str, variant_id: str, status: str, | |
| ng_rules: List[str], llm_ok: bool, llm_reasons: List[str], llm_fixed: Optional[str]): | |
| with get_conn() as con: | |
| con.execute( | |
| """ | |
| INSERT INTO compliance_logs (campaign_id, variant_id, status, ng_rules_json, llm_ok, llm_reasons_json, llm_fixed) | |
| VALUES (?, ?, ?, ?, ?, ?, ?) | |
| """, | |
| (campaign_id, variant_id, status, json.dumps(ng_rules, ensure_ascii=False), int(llm_ok), | |
| json.dumps(llm_reasons, ensure_ascii=False), llm_fixed) | |
| ) | |
| def audit(campaign_id: str, action: str, payload: Dict[str, Any]): | |
| with get_conn() as con: | |
| con.execute( | |
| "INSERT INTO audit_logs (campaign_id, action, payload_json) VALUES (?, ?, ?)", | |
| (campaign_id, action, json.dumps(payload, ensure_ascii=False)) | |
| ) | |
| # ============== LinUCB state ============== | |
| def get_linucb_state(campaign_id: str, variant_id: str): | |
| with get_conn() as con: | |
| cur = con.execute("SELECT d, A_json, b_json, n_updates FROM linucb WHERE campaign_id=? AND variant_id=?", | |
| (campaign_id, variant_id)) | |
| row = cur.fetchone() | |
| return row | |
| def upsert_linucb_state(campaign_id: str, variant_id: str, d: int, A_json: str, b_json: str, n_updates: int): | |
| with get_conn() as con: | |
| con.execute( | |
| """ | |
| INSERT INTO linucb (campaign_id, variant_id, d, A_json, b_json, n_updates) | |
| VALUES (?, ?, ?, ?, ?, ?) | |
| ON CONFLICT(campaign_id, variant_id) DO UPDATE SET | |
| d=excluded.d, A_json=excluded.A_json, b_json=excluded.b_json, n_updates=excluded.n_updates | |
| """, | |
| (campaign_id, variant_id, d, A_json, b_json, n_updates) | |
| ) | |
| # ============== Export / Reset / Stop rules ============== | |
| def export_csv(campaign_id: str, table: str) -> str: | |
| assert table in {"events", "metrics", "variants", "compliance_logs", "audit_logs"} | |
| out_dir = DB_DIR / "export" | |
| out_dir.mkdir(parents=True, exist_ok=True) | |
| out_path = out_dir / f"{campaign_id}_{table}.csv" | |
| with get_conn() as con, open(out_path, "w", newline="", encoding="utf-8") as f: | |
| cur = con.execute(f"SELECT * FROM {table} WHERE campaign_id=? ORDER BY rowid ASC", (campaign_id,)) | |
| rows = cur.fetchall() | |
| if not rows: | |
| f.write("") # 空でもファイルは作る | |
| return str(out_path) | |
| fieldnames = rows[0].keys() | |
| w = csv.DictWriter(f, fieldnames=fieldnames) | |
| w.writeheader() | |
| for r in rows: | |
| w.writerow({k: r[k] for k in fieldnames}) | |
| return str(out_path) | |
| def reset_all(): | |
| # 破壊的操作:全テーブル初期化 | |
| with get_conn() as con: | |
| con.executescript(""" | |
| DROP TABLE IF EXISTS linucb; | |
| DROP TABLE IF EXISTS compliance_logs; | |
| DROP TABLE IF EXISTS audit_logs; | |
| DROP TABLE IF EXISTS events; | |
| DROP TABLE IF EXISTS metrics; | |
| DROP TABLE IF EXISTS variants; | |
| DROP TABLE IF EXISTS campaigns; | |
| """) | |
| init_db() | |
| def evaluate_stop_rules(campaign_id: str) -> List[Tuple[str, str]]: | |
| """ | |
| 撤退基準: | |
| - impressions >= stop_min_impressions | |
| - EV(CTRmean*CVRmean*V) がベストの stop_rel_ev_threshold 倍未満 → pause | |
| 返り値: [(variant_id, reason), ...] (pause されたもの) | |
| """ | |
| cfg = get_campaign(campaign_id) | |
| if not cfg: | |
| return [] | |
| min_imp = int(cfg["stop_min_impressions"] or 200) | |
| thresh = float(cfg["stop_rel_ev_threshold"] or 0.5) | |
| vpc = float(cfg["value_per_conversion"] or 1.0) | |
| mets = get_metrics(campaign_id) | |
| if not mets: | |
| return [] | |
| def ev_of(r): | |
| imp = int(r["impressions"]); clk = int(r["clicks"]); conv = int(r["conversions"]) | |
| ctr = (clk / imp) if imp > 0 else 0.0 | |
| cvr = (conv / max(1, clk)) if clk > 0 else 0.0 | |
| return ctr * cvr * vpc | |
| best_ev = max((ev_of(r) for r in mets), default=0.0) | |
| paused = [] | |
| with get_conn() as con: | |
| for r in mets: | |
| vid = r["variant_id"] | |
| imp = int(r["impressions"]) | |
| if imp < min_imp: | |
| continue | |
| ev = ev_of(r) | |
| if best_ev <= 0.0: | |
| continue | |
| if ev < thresh * best_ev: | |
| con.execute("UPDATE variants SET status=?, rejection_reason=? WHERE campaign_id=? AND variant_id=?", | |
| ("paused", "auto_pause:low_EV", campaign_id, vid)) | |
| paused.append((vid, f"EV {ev:.6f} < {thresh:.2f} * best {best_ev:.6f}")) | |
| return paused | |