"""SQLite persistence helpers for campaigns, variants, metrics, events, LinUCB state and audit logs."""
from __future__ import annotations

import csv
import json
import os
import sqlite3
from contextlib import closing
from pathlib import Path
from typing import Optional, Dict, Any, List, Tuple

# Storage root: use /data/app_data when /data is writable by this process,
# otherwise fall back to /tmp/app_data.
DEFAULT_DIR = "/data/app_data" if os.access("/data", os.W_OK) else "/tmp/app_data"
# The APP_DATA_DIR environment variable, when set, overrides the default root.
DB_DIR = Path(os.environ.get("APP_DATA_DIR", DEFAULT_DIR))
# NOTE: import-time side effect -- the directory is created as soon as this
# module is imported.
DB_DIR.mkdir(parents=True, exist_ok=True)
# Location of the SQLite database file opened by get_conn().
DB_PATH = DB_DIR / "data.db"

# DDL script executed by init_db(). Every CREATE statement uses IF NOT EXISTS,
# so running it against an existing database leaves existing tables alone.
# Tables:
#   campaigns        - one row per campaign plus policy / holdout / stop-rule settings
#   variants         - variant text with status and rejection reason, keyed per campaign
#   metrics          - per-variant counters and alpha/beta columns for clicks and conversions
#   events           - one row per logged event (event_type, ts, value)
#   linucb           - LinUCB parameters stored as JSON text
#   compliance_logs  - compliance audit log (NG rule details / LLM fix suggestion)
#   audit_logs       - optional operational audit log
# The "--" lines inside the string are SQL comments; they are part of the
# script text and are therefore left exactly as they are.
SCHEMA_SQL = """
PRAGMA journal_mode=WAL;
CREATE TABLE IF NOT EXISTS campaigns (
  campaign_id TEXT PRIMARY KEY,
  brand TEXT,
  product TEXT,
  target_audience TEXT,
  tone TEXT,
  language TEXT,
  constraints_json TEXT,
  value_per_conversion REAL DEFAULT 1.0,
  policy TEXT DEFAULT 'thompson',
  holdout_ratio REAL DEFAULT 0.0,
  stop_min_impressions INTEGER DEFAULT 200,
  stop_rel_ev_threshold REAL DEFAULT 0.5,
  created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS variants (
  campaign_id TEXT,
  variant_id TEXT,
  text TEXT,
  status TEXT,
  rejection_reason TEXT,
  PRIMARY KEY (campaign_id, variant_id)
);
CREATE TABLE IF NOT EXISTS metrics (
  campaign_id TEXT,
  variant_id TEXT,
  impressions INTEGER DEFAULT 0,
  clicks INTEGER DEFAULT 0,
  conversions INTEGER DEFAULT 0,
  alpha_click REAL DEFAULT 1.0,
  beta_click REAL DEFAULT 1.0,
  alpha_conv REAL DEFAULT 1.0,
  beta_conv REAL DEFAULT 1.0,
  PRIMARY KEY (campaign_id, variant_id)
);
CREATE TABLE IF NOT EXISTS events (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  campaign_id TEXT,
  variant_id TEXT,
  event_type TEXT,
  ts TEXT,
  value REAL
);
-- LinUCB のパラメータをJSONで保持
CREATE TABLE IF NOT EXISTS linucb (
  campaign_id TEXT,
  variant_id TEXT,
  d INTEGER,
  A_json TEXT,
  b_json TEXT,
  n_updates INTEGER DEFAULT 0,
  PRIMARY KEY (campaign_id, variant_id)
);
-- コンプライアンス監査ログ(NG詳細/LLM修正案)
CREATE TABLE IF NOT EXISTS compliance_logs (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  campaign_id TEXT,
  variant_id TEXT,
  status TEXT,
  ng_rules_json TEXT,
  llm_ok INTEGER,
  llm_reasons_json TEXT,
  llm_fixed TEXT,
  ts TEXT DEFAULT (datetime('now'))
);
-- 任意の運用監査ログ
CREATE TABLE IF NOT EXISTS audit_logs (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  campaign_id TEXT,
  action TEXT,
  payload_json TEXT,
  ts TEXT DEFAULT (datetime('now'))
);
"""

def get_conn():
    """Open a new SQLite connection to DB_PATH that yields sqlite3.Row rows.

    A fresh connection is created on every call; the caller owns it.
    """
    connection = sqlite3.connect(DB_PATH)
    # Row objects allow access by column name as well as by index.
    connection.row_factory = sqlite3.Row
    return connection

def _ensure_columns():
    """Add columns that a database created by an older schema may be missing.

    For each table in ``need_cols`` the existing column names are read with
    ``PRAGMA table_info`` and every missing column is added via ALTER TABLE.

    SQLite rejects ``ADD COLUMN`` with a non-constant default (such as
    ``datetime('now')``) on a table that already holds rows. In that case the
    column is added without a default and existing rows are backfilled with
    the expression instead. Rows inserted later into such a migrated table do
    not get the default automatically.
    """
    # (column name, SQL type, default expression) per table.
    need_cols = {
        "campaigns": [
            ("policy", "TEXT", "'thompson'"),
            ("holdout_ratio", "REAL", "0.0"),
            ("stop_min_impressions", "INTEGER", "200"),
            ("stop_rel_ev_threshold", "REAL", "0.5"),
            ("created_at", "TEXT", "datetime('now')"),
        ]
    }
    # closing() releases the connection; the inner "con" context commits or rolls back.
    with closing(get_conn()) as con, con:
        for table, cols in need_cols.items():
            have = {r["name"] for r in con.execute(f"PRAGMA table_info({table})")}
            for name, typ, default in cols:
                if name in have:
                    continue
                try:
                    con.execute(f"ALTER TABLE {table} ADD COLUMN {name} {typ} DEFAULT ({default})")
                except sqlite3.OperationalError:
                    # Non-constant default on a populated table: add the bare
                    # column, then fill existing rows. A genuine failure (e.g.
                    # a locked database) is re-raised by the statements below.
                    con.execute(f"ALTER TABLE {table} ADD COLUMN {name} {typ}")
                    con.execute(f"UPDATE {table} SET {name} = {default} WHERE {name} IS NULL")

def init_db():
    """Create all tables that do not exist yet, then add columns missing from older databases."""
    # closing() releases the connection; sqlite3's own context manager only
    # commits or rolls back and would leave it open.
    with closing(get_conn()) as con, con:
        con.executescript(SCHEMA_SQL)
    _ensure_columns()

# ============== Campaign/Variant/Metric basics ==============

def upsert_campaign(campaign_id: str, brand: str, product: str, target_audience: str,
                    tone: str, language: str, constraints: Optional[Dict[str, Any]],
                    value_per_conversion: float):
    """Insert a campaign, or update its descriptive fields if campaign_id already exists.

    On conflict only the columns written here are overwritten; the settings
    columns (policy, holdout_ratio, stop_*) and created_at are left untouched.

    Args:
        campaign_id: Primary key of the campaign.
        brand, product, target_audience, tone, language: Stored as given.
        constraints: Mapping stored as JSON text; ``None`` (or an empty
            mapping) is stored as ``{}``.
        value_per_conversion: Stored as given.
    """
    params = (
        campaign_id, brand, product, target_audience, tone, language,
        json.dumps(constraints or {}, ensure_ascii=False), value_per_conversion,
    )
    # closing() releases the connection; the inner "con" context commits or rolls back.
    with closing(get_conn()) as con, con:
        con.execute(
            """
            INSERT INTO campaigns (campaign_id, brand, product, target_audience, tone, language, constraints_json, value_per_conversion)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(campaign_id) DO UPDATE SET
              brand=excluded.brand,
              product=excluded.product,
              target_audience=excluded.target_audience,
              tone=excluded.tone,
              language=excluded.language,
              constraints_json=excluded.constraints_json,
              value_per_conversion=excluded.value_per_conversion
            """,
            params,
        )

def set_campaign_settings(campaign_id: str, policy: str, holdout_ratio: float, stop_min_impressions: int, stop_rel_ev_threshold: float):
    """Update the policy, holdout and stop-rule settings of an existing campaign.

    Numeric arguments are coerced with float()/int() before being stored.
    Nothing is written when no row matches ``campaign_id``.
    """
    params = (
        policy,
        float(holdout_ratio),
        int(stop_min_impressions),
        float(stop_rel_ev_threshold),
        campaign_id,
    )
    # closing() releases the connection; the inner "con" context commits or rolls back.
    with closing(get_conn()) as con, con:
        con.execute(
            """
            UPDATE campaigns SET policy=?, holdout_ratio=?, stop_min_impressions=?, stop_rel_ev_threshold=?
            WHERE campaign_id=?
            """,
            params,
        )

def get_campaign(campaign_id: str) -> Optional[sqlite3.Row]:
    """Return the campaigns row for ``campaign_id``, or ``None`` if there is none."""
    # closing() releases the connection; the fetched row stays usable afterwards.
    with closing(get_conn()) as con:
        return con.execute("SELECT * FROM campaigns WHERE campaign_id=?", (campaign_id,)).fetchone()

def insert_variant(campaign_id: str, variant_id: str, text: str, status: str, rejection_reason: Optional[str]):
    """Insert or replace a variant and make sure it has a metrics row.

    The variants row is fully replaced when the (campaign_id, variant_id) key
    already exists. The metrics row is only created when absent, so existing
    counters are kept.
    """
    # closing() releases the connection; the inner "con" context commits both
    # statements together or rolls both back.
    with closing(get_conn()) as con, con:
        con.execute(
            """
            INSERT OR REPLACE INTO variants (campaign_id, variant_id, text, status, rejection_reason)
            VALUES (?, ?, ?, ?, ?)
            """,
            (campaign_id, variant_id, text, status, rejection_reason),
        )
        con.execute(
            "INSERT OR IGNORE INTO metrics (campaign_id, variant_id) VALUES (?, ?)",
            (campaign_id, variant_id),
        )

def get_variant(campaign_id: str, variant_id: str) -> Optional[sqlite3.Row]:
    """Return the variants row for (campaign_id, variant_id), or ``None`` if there is none."""
    # closing() releases the connection; the fetched row stays usable afterwards.
    with closing(get_conn()) as con:
        return con.execute(
            "SELECT * FROM variants WHERE campaign_id=? AND variant_id=?",
            (campaign_id, variant_id),
        ).fetchone()

def get_variants(campaign_id: str) -> List[sqlite3.Row]:
    """Return every variants row of the campaign (an empty list when there are none)."""
    # closing() releases the connection; the fetched rows stay usable afterwards.
    with closing(get_conn()) as con:
        return con.execute("SELECT * FROM variants WHERE campaign_id=?", (campaign_id,)).fetchall()

def get_metrics(campaign_id: str) -> List[sqlite3.Row]:
    """Return every metrics row of the campaign (an empty list when there are none)."""
    # closing() releases the connection; the fetched rows stay usable afterwards.
    with closing(get_conn()) as con:
        return con.execute("SELECT * FROM metrics WHERE campaign_id=?", (campaign_id,)).fetchall()

def update_metric(campaign_id: str, variant_id: str, field: str, inc: float = 1.0):
    """Add ``inc`` to one column of the metrics row for (campaign_id, variant_id).

    Args:
        field: Column to increment; must be one of the metrics counter or
            alpha/beta columns listed below.
        inc: Amount added to the current value (default 1.0).

    Raises:
        AssertionError: If ``field`` is not an allowed column name.
    """
    allowed = {"impressions", "clicks", "conversions", "alpha_click", "beta_click", "alpha_conv", "beta_conv"}
    if field not in allowed:
        # ``field`` is interpolated into the SQL text, so this check must not
        # disappear under ``python -O`` the way an ``assert`` statement does.
        # AssertionError is kept so callers see the same exception type.
        raise AssertionError(f"unsupported metric field: {field!r}")
    # closing() releases the connection; the inner "con" context commits or rolls back.
    with closing(get_conn()) as con, con:
        con.execute(
            f"UPDATE metrics SET {field} = {field} + ? WHERE campaign_id=? AND variant_id=?",
            (inc, campaign_id, variant_id),
        )

def log_event(campaign_id: str, variant_id: str, event_type: str, ts: str, value):
    """Append one row to the events table with the given fields, stored as passed."""
    # closing() releases the connection; the inner "con" context commits or rolls back.
    with closing(get_conn()) as con, con:
        con.execute(
            "INSERT INTO events (campaign_id, variant_id, event_type, ts, value) VALUES (?, ?, ?, ?, ?)",
            (campaign_id, variant_id, event_type, ts, value),
        )

def get_campaign_value_per_conversion(campaign_id: str) -> float:
    """Return the campaign's value_per_conversion as a float.

    Falls back to 1.0 when the campaign does not exist or the stored value is
    NULL (``float(None)`` would otherwise raise TypeError).
    """
    # closing() releases the connection; the fetched row stays usable afterwards.
    with closing(get_conn()) as con:
        row = con.execute(
            "SELECT value_per_conversion FROM campaigns WHERE campaign_id=?",
            (campaign_id,),
        ).fetchone()
    if row is None or row[0] is None:
        return 1.0
    return float(row[0])

# ============== Compliance / Audit ==============

def record_compliance_log(campaign_id: str, variant_id: str, status: str,
                          ng_rules: List[str], llm_ok: bool, llm_reasons: List[str], llm_fixed: Optional[str]):
    """Append one row to compliance_logs.

    ``ng_rules`` and ``llm_reasons`` are stored as JSON text, ``llm_ok`` as an
    integer, and ``llm_fixed`` as given. The ``ts`` column is filled by its
    column default.
    """
    params = (
        campaign_id,
        variant_id,
        status,
        json.dumps(ng_rules, ensure_ascii=False),
        int(llm_ok),
        json.dumps(llm_reasons, ensure_ascii=False),
        llm_fixed,
    )
    # closing() releases the connection; the inner "con" context commits or rolls back.
    with closing(get_conn()) as con, con:
        con.execute(
            """
            INSERT INTO compliance_logs (campaign_id, variant_id, status, ng_rules_json, llm_ok, llm_reasons_json, llm_fixed)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            """,
            params,
        )

def audit(campaign_id: str, action: str, payload: Dict[str, Any]):
    """Append one row to audit_logs, storing ``payload`` as JSON text.

    The ``ts`` column is filled by its column default.
    """
    payload_json = json.dumps(payload, ensure_ascii=False)
    # closing() releases the connection; the inner "con" context commits or rolls back.
    with closing(get_conn()) as con, con:
        con.execute(
            "INSERT INTO audit_logs (campaign_id, action, payload_json) VALUES (?, ?, ?)",
            (campaign_id, action, payload_json),
        )

# ============== LinUCB state ==============

def get_linucb_state(campaign_id: str, variant_id: str) -> Optional[sqlite3.Row]:
    """Return the stored (d, A_json, b_json, n_updates) row for a variant, or ``None`` if absent."""
    # closing() releases the connection; the fetched row stays usable afterwards.
    with closing(get_conn()) as con:
        return con.execute(
            "SELECT d, A_json, b_json, n_updates FROM linucb WHERE campaign_id=? AND variant_id=?",
            (campaign_id, variant_id),
        ).fetchone()

def upsert_linucb_state(campaign_id: str, variant_id: str, d: int, A_json: str, b_json: str, n_updates: int):
    """Insert the LinUCB state of a variant, or overwrite it when the row already exists.

    ``A_json`` and ``b_json`` are stored as given; this function does not
    parse or validate them.
    """
    # closing() releases the connection; the inner "con" context commits or rolls back.
    with closing(get_conn()) as con, con:
        con.execute(
            """
            INSERT INTO linucb (campaign_id, variant_id, d, A_json, b_json, n_updates)
            VALUES (?, ?, ?, ?, ?, ?)
            ON CONFLICT(campaign_id, variant_id) DO UPDATE SET
              d=excluded.d, A_json=excluded.A_json, b_json=excluded.b_json, n_updates=excluded.n_updates
            """,
            (campaign_id, variant_id, d, A_json, b_json, n_updates),
        )

# ============== Export / Reset / Stop rules ==============

def export_csv(campaign_id: str, table: str) -> str:
    """Write all rows of ``table`` for one campaign to a CSV file and return its path.

    The file is ``<DB_DIR>/export/<campaign_id>_<table>.csv``. Rows are
    ordered by rowid. When there are no rows an empty file (no header) is
    still created.

    Args:
        campaign_id: Campaign whose rows are exported.
        table: One of "events", "metrics", "variants", "compliance_logs",
            "audit_logs".

    Raises:
        AssertionError: If ``table`` is not one of the allowed names.
    """
    allowed = {"events", "metrics", "variants", "compliance_logs", "audit_logs"}
    if table not in allowed:
        # ``table`` is interpolated into the SQL text, so this check must not
        # disappear under ``python -O`` the way an ``assert`` statement does.
        # AssertionError is kept so callers see the same exception type.
        raise AssertionError(f"unsupported export table: {table!r}")

    out_dir = DB_DIR / "export"
    out_dir.mkdir(parents=True, exist_ok=True)
    # NOTE(review): campaign_id goes into the file name as-is -- confirm that
    # callers never pass path separators.
    out_path = out_dir / f"{campaign_id}_{table}.csv"

    # Query before opening the file so a failing query cannot truncate an
    # earlier export. closing() releases the connection once rows are fetched.
    with closing(get_conn()) as con:
        rows = con.execute(
            f"SELECT * FROM {table} WHERE campaign_id=? ORDER BY rowid ASC",
            (campaign_id,),
        ).fetchall()

    with open(out_path, "w", newline="", encoding="utf-8") as f:
        if rows:
            w = csv.DictWriter(f, fieldnames=rows[0].keys())
            w.writeheader()
            w.writerows(dict(r) for r in rows)
    return str(out_path)

def reset_all():
    """Destructive: drop every table defined by this module, then recreate the empty schema."""
    # closing() releases the connection; sqlite3's own context manager only
    # commits or rolls back and would leave it open.
    with closing(get_conn()) as con, con:
        con.executescript("""
            DROP TABLE IF EXISTS linucb;
            DROP TABLE IF EXISTS compliance_logs;
            DROP TABLE IF EXISTS audit_logs;
            DROP TABLE IF EXISTS events;
            DROP TABLE IF EXISTS metrics;
            DROP TABLE IF EXISTS variants;
            DROP TABLE IF EXISTS campaigns;
        """)
    init_db()

def evaluate_stop_rules(campaign_id: str) -> List[Tuple[str, str]]:
    """Pause variants whose expected value is far below the campaign's best.

    A variant is paused when both conditions hold:
      - its impressions are at least ``stop_min_impressions``;
      - its EV (mean CTR * mean CVR * value_per_conversion) is below
        ``stop_rel_ev_threshold`` times the best EV among all variants.

    Paused variants get status "paused" and rejection_reason
    "auto_pause:low_EV" in the variants table.

    Returns:
        ``[(variant_id, reason), ...]`` for the variants paused by this call.
        The list is empty when the campaign is unknown, has no metrics, or no
        variant has a positive EV.
    """
    cfg = get_campaign(campaign_id)
    if not cfg:
        return []

    def setting(key, fallback):
        # Only NULL falls back to the default. An explicit 0 / 0.0 is a valid
        # setting (a threshold of 0.0 disables auto-pause) and must not be
        # replaced the way ``value or fallback`` would.
        raw = cfg[key]
        return fallback if raw is None else raw

    min_imp = int(setting("stop_min_impressions", 200))
    thresh = float(setting("stop_rel_ev_threshold", 0.5))
    # value_per_conversion scales every EV equally; NULL or 0 falls back to
    # 1.0 so the relative comparison stays meaningful.
    vpc = float(cfg["value_per_conversion"] or 1.0)

    mets = get_metrics(campaign_id)
    if not mets:
        return []

    def ev_of(r):
        # NULL counters are treated as 0.
        imp = int(r["impressions"] or 0)
        clk = int(r["clicks"] or 0)
        conv = int(r["conversions"] or 0)
        ctr = (clk / imp) if imp > 0 else 0.0
        cvr = (conv / clk) if clk > 0 else 0.0
        return ctr * cvr * vpc

    scored = [(r["variant_id"], int(r["impressions"] or 0), ev_of(r)) for r in mets]
    best_ev = max(ev for _, _, ev in scored)
    if best_ev <= 0.0:
        # Without a positive best EV there is nothing to compare against.
        return []

    cutoff = thresh * best_ev
    losers = [(vid, ev) for vid, imp, ev in scored if imp >= min_imp and ev < cutoff]
    if not losers:
        return []

    # closing() releases the connection; the inner "con" context commits all
    # status updates together or rolls them back.
    with closing(get_conn()) as con, con:
        con.executemany(
            "UPDATE variants SET status=?, rejection_reason=? WHERE campaign_id=? AND variant_id=?",
            [("paused", "auto_pause:low_EV", campaign_id, vid) for vid, _ in losers],
        )
    return [(vid, f"EV {ev:.6f} < {thresh:.2f} * best {best_ev:.6f}") for vid, ev in losers]