import ast
import itertools
from dataclasses import dataclass
from typing import Callable, Dict, List, Tuple
import gradio as gr
import numpy as np
import pandas as pd
import plotly.graph_objects as go
# ============================================================
# Rearrangement Algorithm visualizer
# ------------------------------------------------------------
# We represent a joint distribution by an n x d matrix X.
# Each column is one marginal sample. Rearrangement means:
# - values inside a column may be permuted;
# - the multiset in every column is unchanged;
# - hence all empirical marginal distributions are preserved.
# The algorithm decreases E[psi(X)] = E[f(sum_j X_j)]
# by rearranging columns while preserving empirical marginals.
# ============================================================
@dataclass
class Step:
step: int
matrix: List[List[float]]
objective: float
action: str
column: int | None = None
before_order: List[int] | None = None
after_order: List[int] | None = None
def parse_matrix(text: str) -> np.ndarray:
    """Parse user-supplied text into a finite 2-D float matrix.

    Two input styles are accepted:

    * text starting with ``[`` is read as a Python/JSON-style nested list
      through ``ast.literal_eval``;
    * anything else is read line by line, one matrix row per non-blank line,
      with cells separated by commas and/or whitespace (so tab-separated
      spreadsheet pastes work as well). Empty cells, e.g. from a trailing
      comma, are ignored.

    Args:
        text: Raw text typed or pasted by the user. ``None`` is treated like
            an empty string.

    Returns:
        A float array with at least 2 rows and 2 columns.

    Raises:
        ValueError: If the text is empty, cannot be interpreted as a numeric
            matrix, is not two-dimensional, is smaller than 2 x 2, or
            contains NaN/Inf.
    """
    text = (text or "").strip()
    if not text:
        raise ValueError("行列を入力してください。")
    try:
        if text.startswith("["):
            x = np.array(ast.literal_eval(text), dtype=float)
        else:
            # Turning commas into spaces lets one split() handle commas,
            # spaces and tabs alike, and drops empty cells for free.
            rows = [
                [float(cell) for cell in line.replace(",", " ").split()]
                for line in text.splitlines()
                if line.strip()
            ]
            x = np.array(rows, dtype=float)
    except Exception as exc:
        # Deliberately broad: any parsing failure (syntax, ragged rows,
        # overflow, ...) is reported to the user as one uniform ValueError,
        # with the original exception kept as the cause.
        raise ValueError("行列として解釈できませんでした。例: [[1,4,7],[2,5,8],[3,6,9]]") from exc
    if x.ndim != 2:
        raise ValueError("2次元の行列を入力してください。")
    if x.shape[0] < 2 or x.shape[1] < 2:
        raise ValueError("少なくとも 2 行 2 列が必要です。")
    if not np.isfinite(x).all():
        raise ValueError("NaN や Inf は使えません。")
    return x
def make_initial_matrix(n: int, d: int, seed: int, distribution: str) -> np.ndarray:
    """Generate a random n x d starting matrix for the rearrangement demo.

    Column ``j`` is sampled from the chosen family with a spread that grows
    with ``j``, sorted, and finally shuffled independently of the other
    columns so the rows form an arbitrary initial coupling.

    Args:
        n: Number of rows (samples per column).
        d: Number of columns (marginals).
        seed: Seed for ``np.random.default_rng``.
        distribution: One of "normal", "uniform", "lognormal", "integer".

    Returns:
        The generated matrix; its dtype follows the sampler's output.

    Raises:
        ValueError: If ``distribution`` is not one of the known names.
    """
    rng = np.random.default_rng(seed)

    def draw_normal(j):
        return rng.normal(loc=0.0, scale=1.0 + 0.25 * j, size=n)

    def draw_uniform(j):
        return rng.uniform(-1.0 - j * 0.2, 1.0 + j * 0.2, size=n)

    def draw_lognormal(j):
        return rng.lognormal(mean=0.0, sigma=0.35 + 0.1 * j, size=n)

    def draw_integer(j):
        return rng.integers(low=0, high=10 + 2 * j + 1, size=n)

    samplers = {
        "normal": draw_normal,
        "uniform": draw_uniform,
        "lognormal": draw_lognormal,
        "integer": draw_integer,
    }
    if distribution not in samplers:
        raise ValueError("未知の分布です。")
    draw = samplers[distribution]

    # Columns are drawn in index order so the random stream is consumed
    # one column at a time.
    sorted_columns = [np.sort(draw(j)) for j in range(d)]
    matrix = np.column_stack(sorted_columns)

    # Shuffle every column on its own to create the initial coupling.
    for j in range(d):
        matrix[:, j] = rng.permutation(matrix[:, j])
    return matrix
def matrix_to_text(x: np.ndarray) -> str:
    """Render a matrix as text: one row per line, cells joined by ", ".

    Every cell is formatted with four significant digits (``.4g``).
    """
    lines = []
    for row in x:
        cells = [format(value, ".4g") for value in row]
        lines.append(", ".join(cells))
    return "\n".join(lines)
def build_f(name: str, theta: float, custom_expr: str) -> Tuple[Callable[[np.ndarray], np.ndarray], str]:
    """Build the scalar loss f used in psi(x_1, ..., x_d) = f(sum_j x_j).

    Args:
        name: One of "square", "absolute", "exponential", "positive_part",
            "custom".
        theta: Rate used only by "exponential", f(s) = exp(theta * s).
        custom_expr: Expression in the variable ``s`` used only by "custom".
            It may reference ``np`` and the bare names ``abs``, ``sqrt``,
            ``exp``, ``log``, ``maximum``, ``minimum``.

    Returns:
        ``(f, label)``. ``f`` maps a length-n array of row sums to a length-n
        array of losses; ``label`` is a display string describing f.

    Raises:
        ValueError: If ``name`` is unknown, or "custom" is selected with an
            empty expression.
        SyntaxError: If the custom expression is not a valid Python
            expression (raised here, when the function is built).
    """
    prefix = "ψ(x₁, ..., x_d) = f(Σxᵢ), f(s) = "
    if name == "square":
        return lambda s: s**2, prefix + "s²"
    if name == "absolute":
        return lambda s: np.abs(s), prefix + "|s|"
    if name == "exponential":
        return lambda s: np.exp(theta * s), prefix + f"exp({theta:g} · s)"
    if name == "positive_part":
        return lambda s: np.maximum(s, 0.0), prefix + "max(s, 0)"
    if name == "custom":
        expr = (custom_expr or "").strip()
        if not expr:
            raise ValueError("custom を使う場合は f(s) の式を入力してください。")
        allowed = {
            "np": np,
            "abs": np.abs,
            "sqrt": np.sqrt,
            "exp": np.exp,
            "log": np.log,
            "maximum": np.maximum,
            "minimum": np.minimum,
        }
        # SECURITY: the expression comes from the user and is executed with
        # eval(). Emptying __builtins__ is NOT a sandbox: the expression still
        # has the whole numpy module at hand and attribute access is
        # unrestricted. Only expose this to trusted users; replace it with a
        # real expression parser before accepting untrusted input.
        #
        # The optimizer calls f once per candidate matrix, so compile the
        # expression a single time instead of re-parsing it on every call.
        code = compile(expr, "<custom f>", "eval")

        def custom_f(s: np.ndarray) -> np.ndarray:
            y = eval(code, {"__builtins__": {}}, {**allowed, "s": s})
            y = np.asarray(y, dtype=float)
            if y.ndim == 0:
                # A constant expression (no dependence on s) yields a scalar;
                # broadcast it so callers always get one value per row.
                y = np.full(s.shape[0], float(y))
            return y

        return custom_f, prefix + expr
    raise ValueError("未知の f です。")
def build_psi(name: str, theta: float, custom_expr: str) -> Tuple[Callable[[np.ndarray], np.ndarray], str]:
    """Build the row-wise loss psi(x_1, ..., x_d) = f(sum_j x_j).

    The arguments are forwarded unchanged to ``build_f``.

    Returns:
        ``(psi, label)``. ``psi`` takes an n x d matrix, sums each row and
        applies f to the row sums; ``label`` is the display string returned
        by ``build_f``.
    """
    loss, label = build_f(name, theta, custom_expr)

    def psi(x: np.ndarray) -> np.ndarray:
        row_totals = np.sum(x, axis=1)
        return loss(row_totals)

    return psi, label
def objective(x: np.ndarray, psi: Callable[[np.ndarray], np.ndarray]) -> float:
    """Return the mean of ``psi`` over the rows of ``x``.

    Args:
        x: Matrix whose rows are evaluated.
        psi: Row-wise loss; must return one value per row of ``x``.

    Raises:
        ValueError: If ``psi`` returns a different number of values than
            ``x`` has rows, or if any returned value is NaN or infinite.
    """
    per_row = psi(x)
    expected_rows = x.shape[0]
    if per_row.shape[0] != expected_rows:
        raise ValueError("ψ は各行ごとに1つの値を返す必要があります。")
    all_finite = np.isfinite(per_row).all()
    if not all_finite:
        raise ValueError("ψ の値に NaN または Inf が出ました。")
    average = np.mean(per_row)
    return float(average)
def is_better(new_value: float, old_value: float, tol: float = 1e-12) -> bool:
    """Tell whether ``new_value`` is smaller than ``old_value`` by more than ``tol``.

    The tolerance keeps differences at the level of floating-point noise from
    counting as improvements.
    """
    threshold = old_value - tol
    return new_value < threshold
def greedy_sort_rearrangement(
    x: np.ndarray,
    psi_name: str,
    theta: float,
    custom_expr: str,
    max_iter: int,
    random_tie_break: bool,
    seed: int,
) -> Tuple[List[Step], str]:
    """Heuristic rearrangement algorithm minimizing E[psi(X)] = E[f(sum_j X_j)].

    The matrix is swept column by column. For the chosen column j, with all
    other columns fixed, two proposals are evaluated:

    1. the anti-monotone arrangement: the column's values in descending order
       against the ascending row sums of the other columns (the classical RA
       move for convex losses of the sum);
    2. the best single swap of two entries of column j, which also helps for
       non-smooth or custom f.

    The better proposal is accepted only if it strictly improves the current
    objective (see ``is_better``). Sweeps repeat until one full sweep yields
    no improvement or ``max_iter`` sweeps have run.

    Args:
        x: Initial n x d matrix; it is copied, not modified.
        psi_name, theta, custom_expr: Forwarded to ``build_psi``.
        max_iter: Maximum number of sweeps over all columns.
        random_tie_break: If true, visit the columns in a freshly shuffled
            order on every sweep.
        seed: Seed for the column shuffling and the pair subsampling.

    Returns:
        ``(steps, label)``: the initial snapshot, one snapshot per accepted
        rearrangement, a final "no improvement" snapshot when the run stops
        at a local optimum, and the objective's display label.
    """
    psi, objective_label = build_psi(psi_name, theta, custom_expr)

    def objective_fn(z: np.ndarray) -> float:
        return objective(z, psi)

    rng = np.random.default_rng(seed)
    x = x.copy()
    n, d = x.shape
    steps = [Step(0, x.tolist(), objective_fn(x), "初期カップリング")]
    current = steps[-1].objective

    # The candidate row pairs depend only on n, so build them once instead
    # of on every column visit. At most 2500 swaps are tried per visit.
    all_pairs = list(itertools.combinations(range(n), 2))
    max_pair_checks = min(n * (n - 1) // 2, 2500)

    for _ in range(1, max_iter + 1):
        improved_any = False
        columns = list(range(d))
        if random_tie_break:
            rng.shuffle(columns)
        for j in columns:
            old_col = x[:, j].copy()
            old_order = np.argsort(old_col, kind="mergesort").tolist()

            # Proposal 1: anti-monotone arrangement against partial sums.
            rest_sum = np.sum(x, axis=1) - x[:, j]
            row_order = np.argsort(rest_sum, kind="mergesort")
            col_sorted_desc = np.sort(old_col)[::-1]
            candidate = x.copy()
            # The row with the smallest partial sum gets the largest value.
            candidate[row_order, j] = col_sorted_desc
            best = candidate
            best_obj = objective_fn(candidate)
            best_action = f"列 {j + 1}: 他列の行和に対して反対順に rearrange"

            # Proposal 2: best single swap inside column j, measured against
            # the current matrix (not against proposal 1).
            pair_best = x.copy()
            pair_best_obj = current
            pair_action = None
            if len(all_pairs) > max_pair_checks:
                # Too many pairs: examine a fresh random subset on each visit.
                picked = rng.choice(len(all_pairs), size=max_pair_checks, replace=False)
                pairs = [all_pairs[k] for k in picked]
            else:
                pairs = all_pairs
            for a, b in pairs:
                tmp = x.copy()
                tmp[a, j], tmp[b, j] = tmp[b, j], tmp[a, j]
                tmp_obj = objective_fn(tmp)
                if is_better(tmp_obj, pair_best_obj):
                    pair_best = tmp
                    pair_best_obj = tmp_obj
                    pair_action = f"列 {j + 1}: 行 {a + 1} と行 {b + 1} を swap"

            if is_better(pair_best_obj, best_obj):
                best = pair_best
                best_obj = pair_best_obj
                best_action = pair_action or f"列 {j + 1}: pairwise swap"

            # Accept the winning proposal only if it beats the current state.
            if is_better(best_obj, current):
                x = best
                current = best_obj
                improved_any = True
                new_order = np.argsort(x[:, j], kind="mergesort").tolist()
                steps.append(
                    Step(
                        len(steps),
                        x.tolist(),
                        current,
                        best_action,
                        column=j,
                        before_order=old_order,
                        after_order=new_order,
                    )
                )
        if not improved_any:
            # A full sweep changed nothing: record the local optimum and stop.
            steps.append(Step(len(steps), x.tolist(), current, "改善なし: 局所解として停止"))
            break
    return steps, objective_label
def make_matrix_df(step: Step):
    """Build a styled table of the matrix stored in ``step``.

    The table has a 1-based ``row`` column, one ``X<j>`` column per matrix
    column and a ``sum`` column holding the row sums, all rounded to six
    decimals. The ``sum`` cell is tinted red in rows attaining the maximum
    sum and blue in rows attaining the minimum; when a row attains both, the
    blue (minimum) style wins.

    Returns:
        The pandas Styler produced by ``DataFrame.style.apply``.
    """
    values = np.array(step.matrix)
    column_names = [f"X{j + 1}" for j in range(values.shape[1])]
    table = pd.DataFrame(values, columns=column_names)
    table.insert(0, "row", np.arange(1, values.shape[0] + 1))
    table["sum"] = values.sum(axis=1)
    table = table.round(6)

    # Extremes are taken after rounding so they compare equal to the cells.
    lowest = table["sum"].min()
    highest = table["sum"].max()
    max_css = "background-color: #fecaca; color: #7f1d1d; font-weight: bold;"
    min_css = "background-color: #bfdbfe; color: #1e3a8a; font-weight: bold;"

    def style_row(row):
        css = [""] * len(row)
        sum_pos = row.index.get_loc("sum")
        if row["sum"] == highest:
            css[sum_pos] = max_css
        if row["sum"] == lowest:
            css[sum_pos] = min_css
        return css

    return table.style.apply(style_row, axis=1)
def make_marginal_check_df(initial: np.ndarray, current: np.ndarray) -> pd.DataFrame:
    """Compare each column of ``current`` against the same column of ``initial``.

    A column counts as preserved when its sorted values match the sorted
    initial values up to ``np.allclose`` tolerance. The min, max and mean of
    both versions are reported next to the verdict.

    Returns:
        A DataFrame with one row per column, rounded to six decimals.
    """

    def describe(j: int) -> dict:
        before = initial[:, j]
        after = current[:, j]
        preserved = np.allclose(np.sort(before), np.sort(after))
        return {
            "列": f"X{j + 1}",
            "周辺分布 preserved?": "YES" if preserved else "NO",
            "初期 min": np.min(before),
            "現在 min": np.min(after),
            "初期 max": np.max(before),
            "現在 max": np.max(after),
            "初期 mean": np.mean(before),
            "現在 mean": np.mean(after),
        }

    records = [describe(j) for j in range(initial.shape[1])]
    return pd.DataFrame(records).round(6)
def make_heatmap(step: Step) -> go.Figure:
    """Draw the matrix stored in ``step`` as a Plotly heatmap.

    Columns are labelled ``X1..Xd`` and rows ``row 1..row n``. The title
    shows the step number and action on the first line and the objective
    value on the second.
    """
    x = np.array(step.matrix)
    fig = go.Figure(
        data=go.Heatmap(
            z=x,
            x=[f"X{j + 1}" for j in range(x.shape[1])],
            y=[f"row {i + 1}" for i in range(x.shape[0])],
            colorbar=dict(title="value"),
        )
    )
    # The title must be a single string literal; Plotly renders "<br>" as a
    # line break, giving the two-line title the top margin leaves room for.
    title = f"Step {step.step}: {step.action}<br>目的関数 = {step.objective:.6g}"
    fig.update_layout(title=title, height=450, margin=dict(l=70, r=30, t=80, b=40))
    return fig
def make_trace(steps: List[Step]) -> go.Figure:
fig = go.Figure()
fig.add_trace(
go.Scatter(
x=[s.step for s in steps],
y=[s.objective for s in steps],
mode="lines+markers",
hovertemplate="step %{x}
目的関数=%{y:.6g}
H(X)=%{y:.6g}
ΣH(Fj)=%{y:.6g}
Hc=%{y:.6g}
行 = 同時実現シナリオ、列 = 周辺分布。列の値の multiset は不変です。
""", elem_id="title", ) state = gr.State([]) with gr.Row(): with gr.Column(scale=2): matrix_input = gr.Textbox( label="X: n 行 d 列", value=( "0.12, 1.80, -0.40\n" "-1.10, 0.35, 1.25\n" "0.65, -0.90, 0.10\n" "1.40, 0.05, -1.30\n" "-0.55, -1.20, 0.75\n" "0.90, 1.10, -0.85" ), lines=8, placeholder="例:\n1, 4, 7\n2, 5, 8\n3, 6, 9", ) with gr.Column(scale=1): n_input = gr.Slider(label="生成 n", minimum=4, maximum=100, step=1, value=12) d_input = gr.Slider(label="生成 d", minimum=2, maximum=10, step=1, value=3) seed_input = gr.Number(label="seed", value=1, precision=0) distribution_input = gr.Dropdown( label="生成する周辺分布", choices=["integer", "normal", "uniform", "lognormal"], value="integer", ) generate_matrix_btn = gr.Button("ランダム初期値を生成") with gr.Row(): psi_input = gr.Dropdown( label="f(ψ(x₁,...,x_d)=f(Σxᵢ))", choices=["square", "absolute", "exponential", "positive_part", "custom"], value="square", ) theta_input = gr.Number(label="exponential の θ", value=1.0) custom_expr_input = gr.Textbox( label="custom f(s) 式", value="s**2", placeholder="例: s**2, np.exp(0.5*s), maximum(s, 0) / 変数: s", ) with gr.Row(): max_iter_input = gr.Slider(label="最大 sweep 数", minimum=1, maximum=100, step=1, value=20) random_tie_input = gr.Checkbox(label="列順をランダム化", value=False) entropy_bins_input = gr.Slider( label="copula entropy 用の rank bin 数", minimum=2, maximum=10, step=1, value=2, info="各列をrank binに変換し、周辺を固定したcopula側の経験エントロピーを近似します。少ない行数では2〜4を推奨。", ) run_btn = gr.Button("RA を実行", variant="primary") error_box = gr.Textbox(label="メッセージ", interactive=False) summary_box = gr.Textbox(label="サマリー", lines=5, interactive=False) playback_timer = gr.Timer(value=0.8, active=False) with gr.Row(): prev_btn = gr.Button("← 前へ") play_btn = gr.Button("▶ 再生", variant="secondary") stop_btn = gr.Button("⏸ 停止") next_btn = gr.Button("次へ →") with gr.Row(): step_slider = gr.Slider(label="Step", minimum=0, maximum=100, step=1, value=0) interval_slider = gr.Slider(label="再生間隔 秒/step", minimum=0.1, maximum=3.0, step=0.1, 
value=0.8) step_message = gr.Textbox(label="現在の状態", lines=3, interactive=False) with gr.Row(): matrix_df = gr.Dataframe(label="現在の X", interactive=False, wrap=True) marginal_df = gr.Dataframe(label="周辺分布チェック", interactive=False, wrap=True) with gr.Row(): heatmap = gr.Plot(label="X のヒートマップ") with gr.Column(): trace_plot = gr.Plot(label="目的関数の推移") entropy_plot = gr.Plot(label="rank-binned copula entropy の推移") sum_hist = gr.Plot(label="行和の分布") history_df = gr.Dataframe(label="履歴", interactive=False, wrap=True) generate_matrix_btn.click( generate_matrix, inputs=[n_input, d_input, seed_input, distribution_input], outputs=[matrix_input, error_box], ) run_btn.click( run_algorithm, inputs=[ matrix_input, psi_input, theta_input, custom_expr_input, max_iter_input, random_tie_input, seed_input, entropy_bins_input, ], outputs=[ state, step_slider, step_slider, matrix_df, heatmap, trace_plot, entropy_plot, sum_hist, marginal_df, history_df, summary_box, error_box, ], ) step_slider.change( show_step, inputs=[state, step_slider], outputs=[matrix_df, heatmap, sum_hist, marginal_df, step_message], ) prev_btn.click( lambda s, c: move_step(s, c, -1), inputs=[state, step_slider], outputs=[step_slider, matrix_df, heatmap, sum_hist, marginal_df, step_message], ) next_btn.click( lambda s, c: move_step(s, c, 1), inputs=[state, step_slider], outputs=[step_slider, matrix_df, heatmap, sum_hist, marginal_df, step_message], ) play_btn.click( lambda interval: gr.Timer(value=float(interval), active=True), inputs=[interval_slider], outputs=[playback_timer], ) stop_btn.click( lambda: gr.Timer(active=False), inputs=None, outputs=[playback_timer], ) playback_timer.tick( autoplay_tick, inputs=[state, step_slider, interval_slider], outputs=[ step_slider, matrix_df, heatmap, sum_hist, marginal_df, step_message, playback_timer, ], ) gr.Markdown( """ # Acknowledgments 以下文献を参考にしました。 [1] 小池, 南, 白石, 「[再配列アルゴリズムを用いたVaR境界の算出](https://www.jstage.jst.go.jp/article/jjssj/45/2/45_353/_pdf/-char/ja)」, 2016. 
""", elem_id="title", ) if __name__ == "__main__": demo.launch()