| import ast |
| import itertools |
| from dataclasses import dataclass |
| from typing import Callable, Dict, List, Tuple |
|
|
| import gradio as gr |
| import numpy as np |
| import pandas as pd |
| import plotly.graph_objects as go |
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
| @dataclass |
| class Step: |
| step: int |
| matrix: List[List[float]] |
| objective: float |
| action: str |
| column: int | None = None |
| before_order: List[int] | None = None |
| after_order: List[int] | None = None |
|
|
|
|
| def parse_matrix(text: str) -> np.ndarray: |
| """Parse a matrix from Python-list/JSON-ish text or CSV-like rows.""" |
| text = text.strip() |
| if not text: |
| raise ValueError("行列を入力してください。") |
|
|
| try: |
| if text.startswith("["): |
| value = ast.literal_eval(text) |
| x = np.array(value, dtype=float) |
| else: |
| rows = [] |
| for line in text.splitlines(): |
| line = line.strip() |
| if not line: |
| continue |
| rows.append([float(v.strip()) for v in line.split(",") if v.strip()]) |
| x = np.array(rows, dtype=float) |
| except Exception as exc: |
| raise ValueError("行列として解釈できませんでした。例: [[1,4,7],[2,5,8],[3,6,9]]") from exc |
|
|
| if x.ndim != 2: |
| raise ValueError("2次元の行列を入力してください。") |
| if x.shape[0] < 2 or x.shape[1] < 2: |
| raise ValueError("少なくとも 2 行 2 列が必要です。") |
| if not np.isfinite(x).all(): |
| raise ValueError("NaN や Inf は使えません。") |
| return x |
|
|
|
|
| def make_initial_matrix(n: int, d: int, seed: int, distribution: str) -> np.ndarray: |
| rng = np.random.default_rng(seed) |
|
|
| if distribution == "normal": |
| cols = [ |
| np.sort(rng.normal(loc=0.0, scale=1.0 + 0.25 * j, size=n)) |
| for j in range(d) |
| ] |
|
|
| elif distribution == "uniform": |
| cols = [ |
| np.sort(rng.uniform(-1.0 - j * 0.2, 1.0 + j * 0.2, size=n)) |
| for j in range(d) |
| ] |
|
|
| elif distribution == "lognormal": |
| cols = [ |
| np.sort(rng.lognormal(mean=0.0, sigma=0.35 + 0.1 * j, size=n)) |
| for j in range(d) |
| ] |
|
|
| elif distribution == "integer": |
| cols = [ |
| np.sort( |
| rng.integers( |
| low=0, |
| high=10 + 2 * j + 1, |
| size=n, |
| ) |
| ) |
| for j in range(d) |
| ] |
|
|
| else: |
| raise ValueError("未知の分布です。") |
|
|
| x = np.column_stack(cols) |
|
|
| |
| for j in range(d): |
| x[:, j] = rng.permutation(x[:, j]) |
|
|
| return x |
|
|
|
|
| def matrix_to_text(x: np.ndarray) -> str: |
| return "\n".join(", ".join(f"{v:.4g}" for v in row) for row in x) |
|
|
|
|
| def build_f(name: str, theta: float, custom_expr: str) -> Tuple[Callable[[np.ndarray], np.ndarray], str]: |
| """ |
| Return f for |
| |
| psi(x_1, ..., x_d) = f(sum_j x_j) |
| |
| Input s is a length-n NumPy array of row sums. |
| Output must also be length n. |
| """ |
| if name == "square": |
| return lambda s: s**2, "ψ(x₁, ..., x_d) = f(Σxᵢ), f(s) = s²" |
|
|
| if name == "absolute": |
| return lambda s: np.abs(s), "ψ(x₁, ..., x_d) = f(Σxᵢ), f(s) = |s|" |
|
|
| if name == "exponential": |
| return lambda s: np.exp(theta * s), f"ψ(x₁, ..., x_d) = f(Σxᵢ), f(s) = exp({theta:g} · s)" |
|
|
| if name == "positive_part": |
| return lambda s: np.maximum(s, 0.0), "ψ(x₁, ..., x_d) = f(Σxᵢ), f(s) = max(s, 0)" |
|
|
| if name == "custom": |
| expr = custom_expr.strip() |
| if not expr: |
| raise ValueError("custom を使う場合は f(s) の式を入力してください。") |
|
|
| allowed = { |
| "np": np, |
| "abs": np.abs, |
| "sqrt": np.sqrt, |
| "exp": np.exp, |
| "log": np.log, |
| "maximum": np.maximum, |
| "minimum": np.minimum, |
| } |
|
|
| def custom_f(s: np.ndarray) -> np.ndarray: |
| y = eval(expr, {"__builtins__": {}}, {**allowed, "s": s}) |
| y = np.asarray(y, dtype=float) |
| if y.ndim == 0: |
| y = np.full(s.shape[0], float(y)) |
| return y |
|
|
| return custom_f, f"ψ(x₁, ..., x_d) = f(Σxᵢ), f(s) = {expr}" |
|
|
| raise ValueError("未知の f です。") |
|
|
|
|
| def build_psi(name: str, theta: float, custom_expr: str) -> Tuple[Callable[[np.ndarray], np.ndarray], str]: |
| """ |
| Return row-wise psi with |
| |
| psi(x_1, ..., x_d) = f(sum_j x_j). |
| |
| Input X is n x d. |
| Output is length n. |
| """ |
| f, f_label = build_f(name, theta, custom_expr) |
|
|
| def psi(x: np.ndarray) -> np.ndarray: |
| s = np.sum(x, axis=1) |
| return f(s) |
|
|
| return psi, f_label |
|
|
|
|
| def objective(x: np.ndarray, psi: Callable[[np.ndarray], np.ndarray]) -> float: |
| values = psi(x) |
| if values.shape[0] != x.shape[0]: |
| raise ValueError("ψ は各行ごとに1つの値を返す必要があります。") |
| if not np.isfinite(values).all(): |
| raise ValueError("ψ の値に NaN または Inf が出ました。") |
| return float(np.mean(values)) |
|
|
|
|
| def is_better(new_value: float, old_value: float, tol: float = 1e-12) -> bool: |
| return new_value < old_value - tol |
|
|
|
|
| def greedy_sort_rearrangement( |
| x: np.ndarray, |
| psi_name: str, |
| theta: float, |
| custom_expr: str, |
| max_iter: int, |
| random_tie_break: bool, |
| seed: int, |
| ) -> Tuple[List[Step], str]: |
| """ |
| Heuristic RA. |
| |
| For one chosen column j, keep all other columns fixed. |
| We search for a permutation of column j that improves the selected objective. |
| |
| The objective is to minimize E[psi(X)] = E[f(sum_j X_j)]. |
| |
| For common convex increasing-in-sum losses, the classical RA idea is to |
| put a selected column in opposite order to the partial row sum of the other |
| columns. For more general f, we use this as a proposal and also run a |
| small pairwise local improvement pass. |
| """ |
| psi, objective_label = build_psi(psi_name, theta, custom_expr) |
| objective_fn = lambda z: objective(z, psi) |
| rng = np.random.default_rng(seed) |
| x = x.copy() |
| n, d = x.shape |
|
|
| steps = [Step(0, x.tolist(), objective_fn(x), "初期カップリング")] |
| current = steps[-1].objective |
|
|
| for it in range(1, max_iter + 1): |
| improved_any = False |
| columns = list(range(d)) |
| if random_tie_break: |
| rng.shuffle(columns) |
|
|
| for j in columns: |
| old_col = x[:, j].copy() |
| old_order = np.argsort(old_col, kind="mergesort").tolist() |
|
|
| |
| rest_sum = np.sum(x, axis=1) - x[:, j] |
| row_order = np.argsort(rest_sum, kind="mergesort") |
| col_sorted_desc = np.sort(old_col)[::-1] |
|
|
| candidate = x.copy() |
| candidate[row_order, j] = col_sorted_desc |
| cand_obj = objective_fn(candidate) |
|
|
| best = candidate |
| best_obj = cand_obj |
| best_action = f"列 {j + 1}: 他列の行和に対して反対順に rearrange" |
|
|
| |
| |
| pair_best = x.copy() |
| pair_best_obj = current |
| pair_action = None |
|
|
| max_pair_checks = min(n * (n - 1) // 2, 2500) |
| pairs = list(itertools.combinations(range(n), 2)) |
|
|
| if len(pairs) > max_pair_checks: |
| pairs_idx = rng.choice(len(pairs), size=max_pair_checks, replace=False) |
| pairs = [pairs[k] for k in pairs_idx] |
|
|
| for a, b in pairs: |
| tmp = x.copy() |
| tmp[a, j], tmp[b, j] = tmp[b, j], tmp[a, j] |
| tmp_obj = objective_fn(tmp) |
|
|
| if is_better(tmp_obj, pair_best_obj): |
| pair_best = tmp |
| pair_best_obj = tmp_obj |
| pair_action = f"列 {j + 1}: 行 {a + 1} と行 {b + 1} を swap" |
|
|
| if is_better(pair_best_obj, best_obj): |
| best = pair_best |
| best_obj = pair_best_obj |
| best_action = pair_action or f"列 {j + 1}: pairwise swap" |
|
|
| if is_better(best_obj, current): |
| x = best |
| current = best_obj |
| improved_any = True |
|
|
| new_col = x[:, j] |
| new_order = np.argsort(new_col, kind="mergesort").tolist() |
|
|
| steps.append( |
| Step( |
| len(steps), |
| x.tolist(), |
| current, |
| best_action, |
| column=j, |
| before_order=old_order, |
| after_order=new_order, |
| ) |
| ) |
|
|
| if not improved_any: |
| steps.append(Step(len(steps), x.tolist(), current, "改善なし: 局所解として停止")) |
| break |
|
|
| return steps, objective_label |
|
|
|
|
| def make_matrix_df(step: Step): |
| x = np.array(step.matrix) |
| df = pd.DataFrame(x, columns=[f"X{j + 1}" for j in range(x.shape[1])]) |
| df.insert(0, "row", np.arange(1, x.shape[0] + 1)) |
| df["sum"] = x.sum(axis=1) |
| df = df.round(6) |
|
|
| min_sum = df["sum"].min() |
| max_sum = df["sum"].max() |
|
|
| def highlight_sum_extremes(row): |
| styles = [""] * len(row) |
|
|
| sum_col_idx = row.index.get_loc("sum") |
|
|
| if row["sum"] == max_sum: |
| styles[sum_col_idx] = "background-color: #fecaca; color: #7f1d1d; font-weight: bold;" |
|
|
| if row["sum"] == min_sum: |
| styles[sum_col_idx] = "background-color: #bfdbfe; color: #1e3a8a; font-weight: bold;" |
|
|
| return styles |
|
|
| return df.style.apply(highlight_sum_extremes, axis=1) |
|
|
|
|
| def make_marginal_check_df(initial: np.ndarray, current: np.ndarray) -> pd.DataFrame: |
| rows = [] |
| for j in range(initial.shape[1]): |
| same = np.allclose(np.sort(initial[:, j]), np.sort(current[:, j])) |
| rows.append( |
| { |
| "列": f"X{j + 1}", |
| "周辺分布 preserved?": "YES" if same else "NO", |
| "初期 min": np.min(initial[:, j]), |
| "現在 min": np.min(current[:, j]), |
| "初期 max": np.max(initial[:, j]), |
| "現在 max": np.max(current[:, j]), |
| "初期 mean": np.mean(initial[:, j]), |
| "現在 mean": np.mean(current[:, j]), |
| } |
| ) |
| return pd.DataFrame(rows).round(6) |
|
|
|
|
| def make_heatmap(step: Step) -> go.Figure: |
| x = np.array(step.matrix) |
| fig = go.Figure( |
| data=go.Heatmap( |
| z=x, |
| x=[f"X{j + 1}" for j in range(x.shape[1])], |
| y=[f"row {i + 1}" for i in range(x.shape[0])], |
| colorbar=dict(title="value"), |
| ) |
| ) |
| title = f"Step {step.step}: {step.action}<br>目的関数 = {step.objective:.6g}" |
| fig.update_layout(title=title, height=450, margin=dict(l=70, r=30, t=80, b=40)) |
| return fig |
|
|
|
|
| def make_trace(steps: List[Step]) -> go.Figure: |
| fig = go.Figure() |
| fig.add_trace( |
| go.Scatter( |
| x=[s.step for s in steps], |
| y=[s.objective for s in steps], |
| mode="lines+markers", |
| hovertemplate="step %{x}<br>目的関数=%{y:.6g}<extra></extra>", |
| ) |
| ) |
| fig.update_layout( |
| title="目的関数の推移", |
| xaxis_title="step", |
| yaxis_title="目的関数", |
| height=360, |
| margin=dict(l=50, r=20, t=60, b=40), |
| ) |
| return fig |
|
|
|
|
|
|
|
|
| def _discrete_entropy_from_counts(counts: np.ndarray) -> float: |
| """Shannon entropy from empirical counts, using natural log.""" |
| counts = np.asarray(counts, dtype=float) |
| counts = counts[counts > 0] |
| if counts.size == 0: |
| return 0.0 |
| probs = counts / counts.sum() |
| return float(-np.sum(probs * np.log(probs))) |
|
|
|
|
| def _rank_bin_matrix(x: np.ndarray, n_bins: int) -> np.ndarray: |
| """ |
| Convert X to empirical-copula / rank-bin labels. |
| |
| RA preserves each column's multiset, so the marginal empirical distribution |
| is fixed. To visualize the copula part, we throw away the scale of each |
| marginal and keep only rank-bin labels within each column. |
| |
| The bin labels approximate U_j = F_j(X_j) on a finite sample. |
| """ |
| x = np.asarray(x, dtype=float) |
| n, d = x.shape |
| n_bins = int(max(2, min(int(n_bins), n))) |
| z = np.zeros((n, d), dtype=int) |
|
|
| for j in range(d): |
| |
| |
| |
| order = np.argsort(x[:, j], kind="mergesort") |
| ranks = np.empty(n, dtype=int) |
| ranks[order] = np.arange(n) |
| z[:, j] = np.floor(ranks * n_bins / n).astype(int) |
| z[:, j] = np.clip(z[:, j], 0, n_bins - 1) |
|
|
| return z |
|
|
|
|
| def copula_entropy_summary(x: np.ndarray, n_bins: int) -> Dict[str, float]: |
| """ |
| Rank-binned empirical entropy summary. |
| |
| H_joint is H(B_1, ..., B_d), where B_j is the rank-bin of X_j. |
| H_marginal_sum is sum_j H(B_j). Because RA preserves each marginal, |
| H_marginal_sum should be constant up to ties/binning. |
| |
| For a continuous copula density c, copula entropy is often defined as |
| H_c = h(c) = h(X_1,...,X_d) - sum_j h(X_j) |
| and mutual information is |
| I = sum_j h(X_j) - h(X_1,...,X_d) = -H_c. |
| |
| The values here are finite-sample, rank-binned approximations. |
| """ |
| bins = _rank_bin_matrix(np.asarray(x, dtype=float), int(n_bins)) |
|
|
| _, joint_counts = np.unique(bins, axis=0, return_counts=True) |
| h_joint = _discrete_entropy_from_counts(joint_counts) |
|
|
| h_marginals = [] |
| for j in range(bins.shape[1]): |
| _, counts = np.unique(bins[:, j], return_counts=True) |
| h_marginals.append(_discrete_entropy_from_counts(counts)) |
|
|
| h_marginal_sum = float(np.sum(h_marginals)) |
| copula_entropy = h_joint - h_marginal_sum |
| mutual_information = h_marginal_sum - h_joint |
|
|
| return { |
| "H_joint": h_joint, |
| "H_marginal_sum": h_marginal_sum, |
| "H_copula": copula_entropy, |
| "mutual_information": mutual_information, |
| } |
|
|
|
|
| def make_entropy_trace(steps: List[Step], n_bins: int) -> go.Figure: |
| rows = [] |
| for s in steps: |
| ent = copula_entropy_summary(np.array(s.matrix), int(n_bins)) |
| rows.append({"step": s.step, **ent}) |
|
|
| df = pd.DataFrame(rows) |
|
|
| fig = go.Figure() |
| fig.add_trace( |
| go.Scatter( |
| x=df["step"], |
| y=df["H_joint"], |
| mode="lines+markers", |
| name="joint entropy H(X)", |
| hovertemplate="step %{x}<br>H(X)=%{y:.6g}<extra></extra>", |
| ) |
| ) |
| fig.add_trace( |
| go.Scatter( |
| x=df["step"], |
| y=df["H_marginal_sum"], |
| mode="lines+markers", |
| name="marginal entropy sum ΣH(Fj)", |
| hovertemplate="step %{x}<br>ΣH(Fj)=%{y:.6g}<extra></extra>", |
| ) |
| ) |
| fig.add_trace( |
| go.Scatter( |
| x=df["step"], |
| y=df["H_copula"], |
| mode="lines+markers", |
| name="copula entropy Hc=H(X)-ΣH(Fj)", |
| hovertemplate="step %{x}<br>Hc=%{y:.6g}<extra></extra>", |
| ) |
| ) |
| fig.update_layout( |
| title=f"rank-binned copula entropy の推移(各列 {int(n_bins)} rank bins)", |
| xaxis_title="step", |
| yaxis_title="entropy / nats", |
| height=420, |
| margin=dict(l=50, r=20, t=70, b=80), |
| legend=dict(orientation="h", yanchor="bottom", y=-0.35, xanchor="left", x=0), |
| ) |
| return fig |
|
|
|
|
| def make_entropy_history_df(steps: List[Step], n_bins: int) -> pd.DataFrame: |
| rows = [] |
| for s in steps: |
| ent = copula_entropy_summary(np.array(s.matrix), int(n_bins)) |
| rows.append( |
| { |
| "step": s.step, |
| "H(X) joint": ent["H_joint"], |
| "ΣH(Fj) marginal sum": ent["H_marginal_sum"], |
| "Hc = H(X)-ΣH(Fj)": ent["H_copula"], |
| "I = ΣH(Fj)-H(X)": ent["mutual_information"], |
| } |
| ) |
| return pd.DataFrame(rows).round(6) |
|
|
| def make_sum_hist(step: Step) -> go.Figure: |
| x = np.array(step.matrix) |
| sums = x.sum(axis=1) |
| fig = go.Figure() |
| fig.add_trace(go.Histogram(x=sums, nbinsx=min(20, max(5, len(sums) // 2)))) |
| fig.update_layout( |
| title="行和 S = X1 + ... + Xd の分布", |
| xaxis_title="S", |
| yaxis_title="count", |
| height=320, |
| margin=dict(l=50, r=20, t=60, b=40), |
| ) |
| return fig |
|
|
|
|
| def state_to_steps(state: List[Dict]) -> List[Step]: |
| return [Step(**s) for s in state] |
|
|
|
|
| def run_algorithm( |
| matrix_text: str, |
| psi_name: str, |
| theta: float, |
| custom_expr: str, |
| max_iter: int, |
| random_tie_break: bool, |
| seed: int, |
| entropy_bins: int, |
| ): |
| try: |
| x0 = parse_matrix(matrix_text) |
| steps, objective_label = greedy_sort_rearrangement( |
| x0, |
| psi_name, |
| theta, |
| custom_expr, |
| int(max_iter), |
| bool(random_tie_break), |
| int(seed), |
| ) |
|
|
| state = [s.__dict__ for s in steps] |
| first = steps[0] |
| last = steps[-1] |
|
|
| improvement = steps[0].objective - last.objective |
| objective_name = "E[ψ(X)]" |
|
|
| summary = ( |
| f"{objective_label}\n" |
| f"初期 {objective_name} = {steps[0].objective:.8g}\n" |
| f"最終 {objective_name} = {last.objective:.8g}\n" |
| f"改善量 = {improvement:.8g}\n" |
| f"ステップ数 = {len(steps) - 1}\n" |
| f"エントロピー: 各列を {int(entropy_bins)} 個のrank binに変換し、経験copula上で計算" |
| ) |
|
|
| entropy_history = make_entropy_history_df(steps, int(entropy_bins)) |
| history = pd.DataFrame( |
| { |
| "step": [s.step for s in steps], |
| "目的関数": [s.objective for s in steps], |
| "操作": [s.action for s in steps], |
| } |
| ).merge(entropy_history, on="step", how="left") |
|
|
| return ( |
| state, |
| 0, |
| len(steps) - 1, |
| make_matrix_df(first), |
| make_heatmap(first), |
| make_trace(steps), |
| make_entropy_trace(steps, int(entropy_bins)), |
| make_sum_hist(first), |
| make_marginal_check_df(np.array(steps[0].matrix), np.array(first.matrix)), |
| history, |
| summary, |
| "", |
| ) |
|
|
| except Exception as exc: |
| return ( |
| [], |
| 0, |
| 0, |
| pd.DataFrame(), |
| go.Figure(), |
| go.Figure(), |
| go.Figure(), |
| go.Figure(), |
| pd.DataFrame(), |
| pd.DataFrame(), |
| "", |
| f"エラー: {exc}", |
| ) |
|
|
|
|
| def show_step(state: List[Dict], step_no: int): |
| if not state: |
| return pd.DataFrame(), go.Figure(), go.Figure(), pd.DataFrame(), "先に実行してください。" |
|
|
| steps = state_to_steps(state) |
| step_no = int(max(0, min(step_no, len(steps) - 1))) |
| step = steps[step_no] |
|
|
| initial = np.array(steps[0].matrix) |
| current = np.array(step.matrix) |
|
|
| message = ( |
| f"Step {step.step} / {len(steps) - 1}\n" |
| f"{step.action}\n" |
| f"目的関数 = {step.objective:.8g}" |
| ) |
|
|
| return ( |
| make_matrix_df(step), |
| make_heatmap(step), |
| make_sum_hist(step), |
| make_marginal_check_df(initial, current), |
| message, |
| ) |
|
|
|
|
| def move_step(state: List[Dict], current_step: int, delta: int): |
| if not state: |
| return 0, pd.DataFrame(), go.Figure(), go.Figure(), pd.DataFrame(), "先に実行してください。" |
|
|
| steps = state_to_steps(state) |
| new_step = int(max(0, min(int(current_step) + delta, len(steps) - 1))) |
|
|
| matrix_df, heatmap, hist, marginal_df, message = show_step(state, new_step) |
|
|
| return new_step, matrix_df, heatmap, hist, marginal_df, message |
|
|
|
|
| def autoplay_tick(state: List[Dict], current_step: int, interval_sec: float): |
| """Advance one step on each timer tick and stop automatically at the final step.""" |
| if not state: |
| return ( |
| 0, |
| pd.DataFrame(), |
| go.Figure(), |
| go.Figure(), |
| pd.DataFrame(), |
| "先に実行してください。", |
| gr.Timer(active=False), |
| ) |
|
|
| steps = state_to_steps(state) |
| current_step = int(current_step) |
|
|
| if current_step >= len(steps) - 1: |
| matrix_df, heatmap, hist, marginal_df, message = show_step(state, current_step) |
| return ( |
| current_step, |
| matrix_df, |
| heatmap, |
| hist, |
| marginal_df, |
| message + "再生終了。", |
| gr.Timer(active=False), |
| ) |
|
|
| new_step = current_step + 1 |
| matrix_df, heatmap, hist, marginal_df, message = show_step(state, new_step) |
|
|
| return ( |
| new_step, |
| matrix_df, |
| heatmap, |
| hist, |
| marginal_df, |
| message, |
| gr.Timer(value=float(interval_sec), active=True), |
| ) |
|
|
|
|
| def generate_matrix(n: int, d: int, seed: int, distribution: str): |
| try: |
| x = make_initial_matrix(int(n), int(d), int(seed), distribution) |
| return matrix_to_text(x), "" |
| except Exception as exc: |
| return "", f"エラー: {exc}" |
|
|
|
|
| CSS = """ |
| #title { text-align: center; } |
| .note { color: #475569; font-size: 0.95rem; } |
| """ |
|
|
| with gr.Blocks(css=CSS, title="Rearrangement Algorithm Visualizer") as demo: |
| gr.Markdown( |
| """ |
| # Rearrangement Algorithm Visualizer |
| |
| 各列を周辺分布の標本とみなし、**列内の並べ替えだけ**で結合分布を変えます。 |
| そのため、各周辺分布は保存されたまま、目的関数 `E[ψ(X)] = E[f(X1 + ... + Xd)]` が小さくなるようにヒューリスティックに rearrange します。 |
| |
| <p class="note"> |
| 行 = 同時実現シナリオ、列 = 周辺分布。列の値の multiset は不変です。 |
| </p> |
| """, |
| elem_id="title", |
| ) |
|
|
| state = gr.State([]) |
|
|
| with gr.Row(): |
| with gr.Column(scale=2): |
| matrix_input = gr.Textbox( |
| label="X: n 行 d 列", |
| value=( |
| "0.12, 1.80, -0.40\n" |
| "-1.10, 0.35, 1.25\n" |
| "0.65, -0.90, 0.10\n" |
| "1.40, 0.05, -1.30\n" |
| "-0.55, -1.20, 0.75\n" |
| "0.90, 1.10, -0.85" |
| ), |
| lines=8, |
| placeholder="例:\n1, 4, 7\n2, 5, 8\n3, 6, 9", |
| ) |
|
|
| with gr.Column(scale=1): |
| n_input = gr.Slider(label="生成 n", minimum=4, maximum=100, step=1, value=12) |
| d_input = gr.Slider(label="生成 d", minimum=2, maximum=10, step=1, value=3) |
| seed_input = gr.Number(label="seed", value=1, precision=0) |
| distribution_input = gr.Dropdown( |
| label="生成する周辺分布", |
| choices=["integer", "normal", "uniform", "lognormal"], |
| value="integer", |
| ) |
| generate_matrix_btn = gr.Button("ランダム初期値を生成") |
|
|
|
|
| with gr.Row(): |
| psi_input = gr.Dropdown( |
| label="f(ψ(x₁,...,x_d)=f(Σxᵢ))", |
| choices=["square", "absolute", "exponential", "positive_part", "custom"], |
| value="square", |
| ) |
| theta_input = gr.Number(label="exponential の θ", value=1.0) |
| custom_expr_input = gr.Textbox( |
| label="custom f(s) 式", |
| value="s**2", |
| placeholder="例: s**2, np.exp(0.5*s), maximum(s, 0) / 変数: s", |
| ) |
|
|
| with gr.Row(): |
| max_iter_input = gr.Slider(label="最大 sweep 数", minimum=1, maximum=100, step=1, value=20) |
| random_tie_input = gr.Checkbox(label="列順をランダム化", value=False) |
| entropy_bins_input = gr.Slider( |
| label="copula entropy 用の rank bin 数", |
| minimum=2, |
| maximum=10, |
| step=1, |
| value=2, |
| info="各列をrank binに変換し、周辺を固定したcopula側の経験エントロピーを近似します。少ない行数では2〜4を推奨。", |
| ) |
| run_btn = gr.Button("RA を実行", variant="primary") |
|
|
| error_box = gr.Textbox(label="メッセージ", interactive=False) |
| summary_box = gr.Textbox(label="サマリー", lines=5, interactive=False) |
|
|
| playback_timer = gr.Timer(value=0.8, active=False) |
|
|
| with gr.Row(): |
| prev_btn = gr.Button("← 前へ") |
| play_btn = gr.Button("▶ 再生", variant="secondary") |
| stop_btn = gr.Button("⏸ 停止") |
| next_btn = gr.Button("次へ →") |
|
|
| with gr.Row(): |
| step_slider = gr.Slider(label="Step", minimum=0, maximum=100, step=1, value=0) |
| interval_slider = gr.Slider(label="再生間隔 秒/step", minimum=0.1, maximum=3.0, step=0.1, value=0.8) |
|
|
| step_message = gr.Textbox(label="現在の状態", lines=3, interactive=False) |
|
|
| with gr.Row(): |
| matrix_df = gr.Dataframe(label="現在の X", interactive=False, wrap=True) |
| marginal_df = gr.Dataframe(label="周辺分布チェック", interactive=False, wrap=True) |
|
|
| with gr.Row(): |
| heatmap = gr.Plot(label="X のヒートマップ") |
| with gr.Column(): |
| trace_plot = gr.Plot(label="目的関数の推移") |
| entropy_plot = gr.Plot(label="rank-binned copula entropy の推移") |
|
|
| sum_hist = gr.Plot(label="行和の分布") |
| history_df = gr.Dataframe(label="履歴", interactive=False, wrap=True) |
|
|
| generate_matrix_btn.click( |
| generate_matrix, |
| inputs=[n_input, d_input, seed_input, distribution_input], |
| outputs=[matrix_input, error_box], |
| ) |
|
|
|
|
| run_btn.click( |
| run_algorithm, |
| inputs=[ |
| matrix_input, |
| psi_input, |
| theta_input, |
| custom_expr_input, |
| max_iter_input, |
| random_tie_input, |
| seed_input, |
| entropy_bins_input, |
| ], |
| outputs=[ |
| state, |
| step_slider, |
| step_slider, |
| matrix_df, |
| heatmap, |
| trace_plot, |
| entropy_plot, |
| sum_hist, |
| marginal_df, |
| history_df, |
| summary_box, |
| error_box, |
| ], |
| ) |
|
|
| step_slider.change( |
| show_step, |
| inputs=[state, step_slider], |
| outputs=[matrix_df, heatmap, sum_hist, marginal_df, step_message], |
| ) |
|
|
| prev_btn.click( |
| lambda s, c: move_step(s, c, -1), |
| inputs=[state, step_slider], |
| outputs=[step_slider, matrix_df, heatmap, sum_hist, marginal_df, step_message], |
| ) |
|
|
| next_btn.click( |
| lambda s, c: move_step(s, c, 1), |
| inputs=[state, step_slider], |
| outputs=[step_slider, matrix_df, heatmap, sum_hist, marginal_df, step_message], |
| ) |
|
|
| play_btn.click( |
| lambda interval: gr.Timer(value=float(interval), active=True), |
| inputs=[interval_slider], |
| outputs=[playback_timer], |
| ) |
|
|
| stop_btn.click( |
| lambda: gr.Timer(active=False), |
| inputs=None, |
| outputs=[playback_timer], |
| ) |
|
|
| playback_timer.tick( |
| autoplay_tick, |
| inputs=[state, step_slider, interval_slider], |
| outputs=[ |
| step_slider, |
| matrix_df, |
| heatmap, |
| sum_hist, |
| marginal_df, |
| step_message, |
| playback_timer, |
| ], |
| ) |
|
|
| gr.Markdown( |
| """ |
| # Acknowledgments |
| |
| 以下文献を参考にしました。 |
| |
| [1] 小池, 南, 白石, 「[再配列アルゴリズムを用いたVaR境界の算出](https://www.jstage.jst.go.jp/article/jjssj/45/2/45_353/_pdf/-char/ja)」, 2016. |
| |
| """, |
| elem_id="title", |
| ) |
|
|
|
|
| if __name__ == "__main__": |
| demo.launch() |