Spaces:
Sleeping
Sleeping
File size: 11,839 Bytes
6a62501 d5be10a 6a62501 d5be10a 6a62501 d5be10a 6a62501 d5be10a 6a62501 d5be10a 6a62501 d5be10a 6a62501 d5be10a 6a62501 d5be10a 6a62501 d5be10a 6a62501 d5be10a 6a62501 d5be10a 6a62501 d5be10a 6a62501 d5be10a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 |
from __future__ import annotations
import os, csv, json, sqlite3
from pathlib import Path
from typing import Optional, Dict, Any, List, Tuple
# Storage location: prefer the /data volume when it is writable, else /tmp.
if os.access("/data", os.W_OK):
    DEFAULT_DIR = "/data/app_data"
else:
    DEFAULT_DIR = "/tmp/app_data"
# The APP_DATA_DIR environment variable overrides the default directory,
# which is created (with parents) at import time.
DB_DIR = Path(os.environ.get("APP_DATA_DIR", DEFAULT_DIR))
DB_DIR.mkdir(parents=True, exist_ok=True)
DB_PATH = DB_DIR / "data.db"
# DDL script run by init_db() through executescript(). It switches the database
# to WAL journaling and creates each table only if it does not exist yet:
#   campaigns       - one row per campaign, incl. bandit policy, holdout ratio
#                     and stop-rule settings
#   variants        - ad-copy variants per campaign with status/rejection_reason
#   metrics         - per-variant impression/click/conversion counters plus
#                     alpha/beta columns (presumably Beta-posterior parameters
#                     for Thompson sampling — confirm against the bandit code)
#   events          - raw event log
#   linucb          - LinUCB parameters (d, A, b) kept as JSON text
#   compliance_logs - compliance audit log (rule violations, LLM verdict/fix)
#   audit_logs      - optional operational audit log
# The "-- ..." lines inside the string are Japanese SQL comments that restate
# the last three descriptions; SQLite ignores them.
SCHEMA_SQL = """
PRAGMA journal_mode=WAL;
CREATE TABLE IF NOT EXISTS campaigns (
campaign_id TEXT PRIMARY KEY,
brand TEXT,
product TEXT,
target_audience TEXT,
tone TEXT,
language TEXT,
constraints_json TEXT,
value_per_conversion REAL DEFAULT 1.0,
policy TEXT DEFAULT 'thompson',
holdout_ratio REAL DEFAULT 0.0,
stop_min_impressions INTEGER DEFAULT 200,
stop_rel_ev_threshold REAL DEFAULT 0.5,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS variants (
campaign_id TEXT,
variant_id TEXT,
text TEXT,
status TEXT,
rejection_reason TEXT,
PRIMARY KEY (campaign_id, variant_id)
);
CREATE TABLE IF NOT EXISTS metrics (
campaign_id TEXT,
variant_id TEXT,
impressions INTEGER DEFAULT 0,
clicks INTEGER DEFAULT 0,
conversions INTEGER DEFAULT 0,
alpha_click REAL DEFAULT 1.0,
beta_click REAL DEFAULT 1.0,
alpha_conv REAL DEFAULT 1.0,
beta_conv REAL DEFAULT 1.0,
PRIMARY KEY (campaign_id, variant_id)
);
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
campaign_id TEXT,
variant_id TEXT,
event_type TEXT,
ts TEXT,
value REAL
);
-- LinUCB のパラメータをJSONで保持
CREATE TABLE IF NOT EXISTS linucb (
campaign_id TEXT,
variant_id TEXT,
d INTEGER,
A_json TEXT,
b_json TEXT,
n_updates INTEGER DEFAULT 0,
PRIMARY KEY (campaign_id, variant_id)
);
-- コンプライアンス監査ログ(NG詳細/LLM修正案)
CREATE TABLE IF NOT EXISTS compliance_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
campaign_id TEXT,
variant_id TEXT,
status TEXT,
ng_rules_json TEXT,
llm_ok INTEGER,
llm_reasons_json TEXT,
llm_fixed TEXT,
ts TEXT DEFAULT (datetime('now'))
);
-- 任意の運用監査ログ
CREATE TABLE IF NOT EXISTS audit_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
campaign_id TEXT,
action TEXT,
payload_json TEXT,
ts TEXT DEFAULT (datetime('now'))
);
"""
def get_conn():
    """Open a fresh SQLite connection to ``DB_PATH`` with name-addressable rows.

    Rows are returned as ``sqlite3.Row``, so callers can index them by column
    name. Using the result in ``with get_conn() as con:`` commits on success
    and rolls back on error, but it does not close the connection.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
def _ensure_columns():
    """Add columns introduced after the first schema version to existing tables.

    ``CREATE TABLE IF NOT EXISTS`` never alters a table that already exists, so
    databases created by an older version of this module are migrated here
    with ``ALTER TABLE ... ADD COLUMN``. Columns that are already present are
    skipped, so calling this repeatedly is safe.
    """
    # Each entry: (column, type, constant DEFAULT or None, backfill SQL expr or None)
    need_cols = {
        "campaigns": [
            ("policy", "TEXT", "'thompson'", None),
            ("holdout_ratio", "REAL", "0.0", None),
            ("stop_min_impressions", "INTEGER", "200", None),
            ("stop_rel_ev_threshold", "REAL", "0.5", None),
            # SQLite rejects ADD COLUMN with a non-constant default such as
            # datetime('now') ("Cannot add a column with non-constant default"),
            # at least once the table holds rows -- exactly the migration case.
            # Add the column without a default and backfill existing rows instead.
            # NOTE: rows inserted later into a migrated table get NULL here.
            ("created_at", "TEXT", None, "datetime('now')"),
        ]
    }
    with get_conn() as con:
        for table, cols in need_cols.items():
            have = {row["name"] for row in con.execute(f"PRAGMA table_info({table})")}
            for name, typ, default, backfill in cols:
                if name in have:
                    continue
                ddl = f"ALTER TABLE {table} ADD COLUMN {name} {typ}"
                if default is not None:
                    ddl += f" DEFAULT ({default})"
                con.execute(ddl)
                if backfill is not None:
                    con.execute(f"UPDATE {table} SET {name} = {backfill} WHERE {name} IS NULL")
def init_db():
    """Create any missing tables, then add columns missing from older tables."""
    conn_ctx = get_conn()
    with conn_ctx as con:
        con.executescript(SCHEMA_SQL)
    # Tables that pre-date newer columns are upgraded in place.
    _ensure_columns()
# ============== Campaign/Variant/Metric basics ==============
def upsert_campaign(campaign_id: str, brand: str, product: str, target_audience: str,
                    tone: str, language: str, constraints: Optional[Dict[str, Any]],
                    value_per_conversion: float):
    """Create a campaign row, or refresh its descriptive fields if it exists.

    On a ``campaign_id`` conflict, only these columns are overwritten:
    brand, product, target_audience, tone, language, constraints_json and
    value_per_conversion. Policy and stop-rule settings keep their current
    values.

    Args:
        constraints: JSON-serialisable dict stored as text; ``None`` or an
            empty value is stored as ``{}``.
    """
    constraints_json = json.dumps(constraints or {}, ensure_ascii=False)
    params = (
        campaign_id,
        brand,
        product,
        target_audience,
        tone,
        language,
        constraints_json,
        value_per_conversion,
    )
    with get_conn() as con:
        con.execute(
            """
INSERT INTO campaigns (campaign_id, brand, product, target_audience, tone, language, constraints_json, value_per_conversion)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(campaign_id) DO UPDATE SET
brand=excluded.brand,
product=excluded.product,
target_audience=excluded.target_audience,
tone=excluded.tone,
language=excluded.language,
constraints_json=excluded.constraints_json,
value_per_conversion=excluded.value_per_conversion
""",
            params,
        )
def set_campaign_settings(campaign_id: str, policy: str, holdout_ratio: float, stop_min_impressions: int, stop_rel_ev_threshold: float):
    """Overwrite the bandit policy and stop-rule settings of a campaign.

    Numeric arguments are coerced to float/int/float before binding. If no
    row has ``campaign_id``, the UPDATE matches nothing and the call does
    nothing.
    """
    values = (
        policy,
        float(holdout_ratio),
        int(stop_min_impressions),
        float(stop_rel_ev_threshold),
        campaign_id,
    )
    with get_conn() as con:
        con.execute(
            """
UPDATE campaigns SET policy=?, holdout_ratio=?, stop_min_impressions=?, stop_rel_ev_threshold=?
WHERE campaign_id=?
""",
            values,
        )
def get_campaign(campaign_id: str):
    """Return the ``campaigns`` row for ``campaign_id`` (a ``sqlite3.Row``) or None."""
    with get_conn() as con:
        row = con.execute("SELECT * FROM campaigns WHERE campaign_id=?", (campaign_id,)).fetchone()
    return row
def insert_variant(campaign_id: str, variant_id: str, text: str, status: str, rejection_reason: Optional[str]):
    """Store (or replace) a variant and make sure a metrics row exists for it.

    The variant is written with INSERT OR REPLACE. The metrics row is only
    created when missing, so existing counters survive a re-insert. Both
    statements run inside the same ``with`` block and are committed when it
    exits.
    """
    key = (campaign_id, variant_id)
    with get_conn() as con:
        con.execute(
            """
INSERT OR REPLACE INTO variants (campaign_id, variant_id, text, status, rejection_reason)
VALUES (?, ?, ?, ?, ?)
""",
            key + (text, status, rejection_reason),
        )
        con.execute(
            "INSERT OR IGNORE INTO metrics (campaign_id, variant_id) VALUES (?, ?)",
            key,
        )
def get_variant(campaign_id: str, variant_id: str):
    """Return the ``variants`` row keyed by (campaign_id, variant_id), or None."""
    key = (campaign_id, variant_id)
    with get_conn() as con:
        found = con.execute(
            "SELECT * FROM variants WHERE campaign_id=? AND variant_id=?", key
        ).fetchone()
    return found
def get_variants(campaign_id: str) -> List[sqlite3.Row]:
    """Return every ``variants`` row of ``campaign_id`` (an empty list if none)."""
    query = "SELECT * FROM variants WHERE campaign_id=?"
    with get_conn() as con:
        rows = con.execute(query, (campaign_id,)).fetchall()
    return rows
def get_metrics(campaign_id: str) -> List[sqlite3.Row]:
    """Return every ``metrics`` row of ``campaign_id`` (an empty list if none)."""
    query = "SELECT * FROM metrics WHERE campaign_id=?"
    with get_conn() as con:
        rows = con.execute(query, (campaign_id,)).fetchall()
    return rows
# Columns of ``metrics`` that update_metric may increment. The column name is
# interpolated into SQL, so this whitelist is the injection guard.
_METRIC_FIELDS = frozenset({
    "impressions", "clicks", "conversions",
    "alpha_click", "beta_click", "alpha_conv", "beta_conv",
})


def update_metric(campaign_id: str, variant_id: str, field: str, inc: float = 1.0):
    """Add ``inc`` to one column of a variant's ``metrics`` row.

    The increment is a single ``UPDATE ... SET col = col + ?`` statement. If
    no metrics row exists for the pair, nothing is changed.

    Args:
        field: Column to increment; must be one of ``_METRIC_FIELDS``.
        inc: Amount to add (default 1.0).

    Raises:
        ValueError: If ``field`` is not an allowed metrics column.
    """
    # A plain ``assert`` is removed under ``python -O``, which would let any
    # string reach the f-string SQL below; use a real exception instead.
    if field not in _METRIC_FIELDS:
        raise ValueError(f"unknown metrics field: {field!r}")
    with get_conn() as con:
        con.execute(
            f"UPDATE metrics SET {field} = {field} + ? WHERE campaign_id=? AND variant_id=?",
            (inc, campaign_id, variant_id),
        )
def log_event(campaign_id: str, variant_id: str, event_type: str, ts: str, value):
    """Append one raw event row to ``events``.

    ``ts`` is stored as given. ``value`` goes into the REAL ``value`` column.
    """
    row = (campaign_id, variant_id, event_type, ts, value)
    with get_conn() as con:
        con.execute(
            "INSERT INTO events (campaign_id, variant_id, event_type, ts, value) VALUES (?, ?, ?, ?, ?)",
            row,
        )
def get_campaign_value_per_conversion(campaign_id: str) -> float:
    """Return the campaign's value per conversion as a float, defaulting to 1.0.

    The 1.0 default applies both when the campaign does not exist and when the
    stored value is NULL (for example, ``upsert_campaign`` was called with
    ``value_per_conversion=None``). Previously ``float(None)`` raised
    TypeError in that case.
    """
    with get_conn() as con:
        row = con.execute(
            "SELECT value_per_conversion FROM campaigns WHERE campaign_id=?", (campaign_id,)
        ).fetchone()
    if row is None or row[0] is None:
        return 1.0
    return float(row[0])
# ============== Compliance / Audit ==============
def record_compliance_log(campaign_id: str, variant_id: str, status: str,
                          ng_rules: List[str], llm_ok: bool, llm_reasons: List[str], llm_fixed: Optional[str]):
    """Append one compliance-check result to ``compliance_logs``.

    ``ng_rules`` and ``llm_reasons`` are stored as JSON, with non-ASCII text
    kept unescaped. ``llm_ok`` is stored as an int (0/1). ``ts`` is filled by
    the column default.
    """
    def as_json(obj):
        return json.dumps(obj, ensure_ascii=False)

    record = (
        campaign_id,
        variant_id,
        status,
        as_json(ng_rules),
        int(llm_ok),
        as_json(llm_reasons),
        llm_fixed,
    )
    with get_conn() as con:
        con.execute(
            """
INSERT INTO compliance_logs (campaign_id, variant_id, status, ng_rules_json, llm_ok, llm_reasons_json, llm_fixed)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
            record,
        )
def audit(campaign_id: str, action: str, payload: Dict[str, Any]):
    """Append an operational audit entry, storing ``payload`` as JSON text."""
    payload_json = json.dumps(payload, ensure_ascii=False)
    with get_conn() as con:
        con.execute(
            "INSERT INTO audit_logs (campaign_id, action, payload_json) VALUES (?, ?, ?)",
            (campaign_id, action, payload_json),
        )
# ============== LinUCB state ==============
def get_linucb_state(campaign_id: str, variant_id: str):
    """Fetch the stored LinUCB parameters of one variant.

    Returns a ``sqlite3.Row`` with ``d``, ``A_json``, ``b_json`` and
    ``n_updates``, or None if no state has been saved for the pair.
    """
    sql = "SELECT d, A_json, b_json, n_updates FROM linucb WHERE campaign_id=? AND variant_id=?"
    with get_conn() as con:
        return con.execute(sql, (campaign_id, variant_id)).fetchone()
def upsert_linucb_state(campaign_id: str, variant_id: str, d: int, A_json: str, b_json: str, n_updates: int):
    """Save one variant's LinUCB parameters, replacing any previous state.

    ``A_json`` and ``b_json`` are stored verbatim; serialisation is up to the
    caller.
    """
    state = (campaign_id, variant_id, d, A_json, b_json, n_updates)
    with get_conn() as con:
        con.execute(
            """
INSERT INTO linucb (campaign_id, variant_id, d, A_json, b_json, n_updates)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(campaign_id, variant_id) DO UPDATE SET
d=excluded.d, A_json=excluded.A_json, b_json=excluded.b_json, n_updates=excluded.n_updates
""",
            state,
        )
# ============== Export / Reset / Stop rules ==============
def export_csv(campaign_id: str, table: str) -> str:
    """Dump one campaign's rows of ``table`` to ``DB_DIR/export/<id>_<table>.csv``.

    Rows are written in rowid (insertion) order, preceded by a header line of
    column names. If the campaign has no rows in ``table``, an empty file is
    still created.

    Args:
        campaign_id: Campaign to export. "/" and "\\" are replaced with "_" in
            the file name only, so the output cannot escape the export dir.
        table: One of ``events``, ``metrics``, ``variants``,
            ``compliance_logs`` or ``audit_logs``.

    Returns:
        Path of the written CSV file, as a string.

    Raises:
        ValueError: If ``table`` is not an exportable table.
    """
    allowed = {"events", "metrics", "variants", "compliance_logs", "audit_logs"}
    # ``table`` is interpolated into SQL, and ``assert`` is stripped under
    # ``python -O``, so validate with a real exception.
    if table not in allowed:
        raise ValueError(f"table not exportable: {table!r}")
    out_dir = DB_DIR / "export"
    out_dir.mkdir(parents=True, exist_ok=True)
    # Without this, an id such as "/etc/x" or "../../x" would make pathlib
    # resolve outside out_dir (an absolute component replaces the base path).
    safe_id = campaign_id.replace("/", "_").replace("\\", "_")
    out_path = out_dir / f"{safe_id}_{table}.csv"
    with get_conn() as con, open(out_path, "w", newline="", encoding="utf-8") as f:
        rows = con.execute(
            f"SELECT * FROM {table} WHERE campaign_id=? ORDER BY rowid ASC", (campaign_id,)
        ).fetchall()
        if not rows:
            # The file was already created (empty) by open(..., "w").
            return str(out_path)
        fieldnames = rows[0].keys()
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows({k: r[k] for k in fieldnames} for r in rows)
    return str(out_path)
def reset_all():
    """Destructive: drop every application table, then recreate the empty schema.

    All campaigns, variants, metrics, events, LinUCB state and logs are lost.
    ``init_db`` rebuilds the tables afterwards.
    """
    drop_script = """
DROP TABLE IF EXISTS linucb;
DROP TABLE IF EXISTS compliance_logs;
DROP TABLE IF EXISTS audit_logs;
DROP TABLE IF EXISTS events;
DROP TABLE IF EXISTS metrics;
DROP TABLE IF EXISTS variants;
DROP TABLE IF EXISTS campaigns;
"""
    with get_conn() as con:
        con.executescript(drop_script)
    # Recreate the (now empty) schema so the app keeps working.
    init_db()
def evaluate_stop_rules(campaign_id: str) -> List[Tuple[str, str]]:
    """Auto-pause variants whose expected value lags far behind the best one.

    Stop rule (settings are read from the campaign row):
      - a variant is evaluated only if ``impressions >= stop_min_impressions``
        and it has at least one impression;
      - EV = CTR * CVR * value_per_conversion, where CTR = clicks/impressions
        and CVR = conversions/clicks (each 0 when its denominator is 0);
      - a variant with EV < ``stop_rel_ev_threshold`` * best EV gets
        ``status='paused'`` and ``rejection_reason='auto_pause:low_EV'``.

    Returns:
        ``[(variant_id, reason), ...]`` for the variants paused by this call.
        The list is empty when the campaign or its metrics are missing, or
        when no variant has positive EV.
    """
    cfg = get_campaign(campaign_id)
    if not cfg:
        return []
    # ``is None`` rather than ``or``: an explicit 0 is a real setting
    # (threshold 0.0 disables pausing, minimum 0 evaluates immediately),
    # whereas ``or`` silently replaced it with 200 / 0.5.
    raw_min_imp = cfg["stop_min_impressions"]
    raw_thresh = cfg["stop_rel_ev_threshold"]
    min_imp = 200 if raw_min_imp is None else int(raw_min_imp)
    thresh = 0.5 if raw_thresh is None else float(raw_thresh)
    # vpc scales every EV equally; keep the 1.0 fallback for NULL/0 so the
    # ranking does not collapse to all-zero.
    vpc = float(cfg["value_per_conversion"] or 1.0)
    mets = get_metrics(campaign_id)
    if not mets:
        return []

    def ev_of(r):
        imp = int(r["impressions"])
        clk = int(r["clicks"])
        conv = int(r["conversions"])
        ctr = clk / imp if imp > 0 else 0.0
        cvr = conv / clk if clk > 0 else 0.0
        return ctr * cvr * vpc

    scored = [(r, ev_of(r)) for r in mets]
    # NOTE(review): best EV is taken over all variants, including those below
    # min_imp, so a lucky low-traffic variant can set the bar — confirm intent.
    best_ev = max(ev for _, ev in scored)
    if best_ev <= 0.0:
        return []
    paused = []
    with get_conn() as con:
        for r, ev in scored:
            # Never pause on zero evidence, even when min_imp is configured as 0.
            if int(r["impressions"]) < max(min_imp, 1):
                continue
            if ev < thresh * best_ev:
                vid = r["variant_id"]
                con.execute("UPDATE variants SET status=?, rejection_reason=? WHERE campaign_id=? AND variant_id=?",
                            ("paused", "auto_pause:low_EV", campaign_id, vid))
                paused.append((vid, f"EV {ev:.6f} < {thresh:.2f} * best {best_ev:.6f}"))
    return paused
|