Spaces:
Sleeping
Sleeping
File size: 4,832 Bytes
e7c5f93 e691c59 79f40eb e7c5f93 e691c59 e7c5f93 e691c59 e7c5f93 e691c59 e7c5f93 e691c59 e7c5f93 e691c59 6c053bc e691c59 6c053bc e691c59 e7c5f93 e691c59 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
from __future__ import annotations
import os
import sqlite3
import json
from pathlib import Path
from typing import Optional, Dict, Any, List
# 永続領域 /data が書き込み可なら /data/app_data を使う。なければ /tmp/app_data へフォールバック。
DEFAULT_DIR = "/data/app_data" if os.access("/data", os.W_OK) else "/tmp/app_data"
DB_DIR = Path(os.environ.get("APP_DATA_DIR", DEFAULT_DIR))
DB_DIR.mkdir(parents=True, exist_ok=True)
DB_PATH = DB_DIR / "data.db"
SCHEMA_SQL = """
PRAGMA journal_mode=WAL;
CREATE TABLE IF NOT EXISTS campaigns (
campaign_id TEXT PRIMARY KEY,
brand TEXT,
product TEXT,
target_audience TEXT,
tone TEXT,
language TEXT,
constraints_json TEXT,
value_per_conversion REAL DEFAULT 1.0
);
CREATE TABLE IF NOT EXISTS variants (
campaign_id TEXT,
variant_id TEXT,
text TEXT,
status TEXT,
rejection_reason TEXT,
PRIMARY KEY (campaign_id, variant_id)
);
CREATE TABLE IF NOT EXISTS metrics (
campaign_id TEXT,
variant_id TEXT,
impressions INTEGER DEFAULT 0,
clicks INTEGER DEFAULT 0,
conversions INTEGER DEFAULT 0,
alpha_click REAL DEFAULT 1.0,
beta_click REAL DEFAULT 1.0,
alpha_conv REAL DEFAULT 1.0,
beta_conv REAL DEFAULT 1.0,
PRIMARY KEY (campaign_id, variant_id)
);
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
campaign_id TEXT,
variant_id TEXT,
event_type TEXT,
ts TEXT,
value REAL
);
"""
def get_conn():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def init_db():
with get_conn() as con:
con.executescript(SCHEMA_SQL)
def upsert_campaign(
campaign_id: str,
brand: str,
product: str,
target_audience: str,
tone: str,
language: str,
constraints: Optional[Dict[str, Any]],
value_per_conversion: float,
):
with get_conn() as con:
con.execute(
"""
INSERT INTO campaigns (campaign_id, brand, product, target_audience, tone, language, constraints_json, value_per_conversion)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(campaign_id) DO UPDATE SET
brand=excluded.brand,
product=excluded.product,
target_audience=excluded.target_audience,
tone=excluded.tone,
language=excluded.language,
constraints_json=excluded.constraints_json,
value_per_conversion=excluded.value_per_conversion
""",
(campaign_id, brand, product, target_audience, tone, language, json.dumps(constraints or {}), value_per_conversion),
)
def insert_variant(campaign_id: str, variant_id: str, text: str, status: str, rejection_reason: Optional[str]):
with get_conn() as con:
con.execute(
"""
INSERT OR REPLACE INTO variants (campaign_id, variant_id, text, status, rejection_reason)
VALUES (?, ?, ?, ?, ?)
""",
(campaign_id, variant_id, text, status, rejection_reason),
)
con.execute(
"""
INSERT OR IGNORE INTO metrics (campaign_id, variant_id)
VALUES (?, ?)
""",
(campaign_id, variant_id),
)
def get_variants(campaign_id: str) -> List[sqlite3.Row]:
with get_conn() as con:
cur = con.execute("SELECT * FROM variants WHERE campaign_id=?", (campaign_id,))
return cur.fetchall()
def get_variant(campaign_id: str, variant_id: str) -> Optional[sqlite3.Row]:
with get_conn() as con:
cur = con.execute("SELECT * FROM variants WHERE campaign_id=? AND variant_id=?", (campaign_id, variant_id))
return cur.fetchone()
def get_metrics(campaign_id: str) -> List[sqlite3.Row]:
with get_conn() as con:
cur = con.execute("SELECT * FROM metrics WHERE campaign_id=?", (campaign_id,))
return cur.fetchall()
def update_metric(campaign_id: str, variant_id: str, field: str, inc: float = 1.0):
assert field in {"impressions", "clicks", "conversions", "alpha_click", "beta_click", "alpha_conv", "beta_conv"}
with get_conn() as con:
con.execute(f"UPDATE metrics SET {field} = {field} + ? WHERE campaign_id=? AND variant_id=?", (inc, campaign_id, variant_id))
def log_event(campaign_id: str, variant_id: str, event_type: str, ts: str, value):
with get_conn() as con:
con.execute(
"INSERT INTO events (campaign_id, variant_id, event_type, ts, value) VALUES (?, ?, ?, ?, ?)",
(campaign_id, variant_id, event_type, ts, value),
)
def get_campaign_value_per_conversion(campaign_id: str) -> float:
with get_conn() as con:
cur = con.execute("SELECT value_per_conversion FROM campaigns WHERE campaign_id=?", (campaign_id,))
row = cur.fetchone()
return float(row[0]) if row else 1.0
|