Spaces:
Runtime error
Runtime error
"""
data_gen.py — Training / test data for the elastic mesh.

Each sample is a triple (A, B, C) where:
    A ∈ ℝ^DIM encodes constraints ("what must be true")
    B ∈ ℝ^DIM encodes objectives ("what we want")
    C ∈ ℝ^DIM is the analytic solution — the feasibility center the mesh must learn to produce

Five problem families, each with a geometrically distinct C:
    1. box_proj    — clamp B into axis-aligned box defined by A
    2. halfspace   — project B onto hyperplane defined by A
    3. sphere      — project B onto sphere surface defined by A
    4. simplex     — project B onto probability simplex (A = uniform prior signal)
    5. elastic_bal — per-dimension weighted balance between A-center and B

These cover:
    - Bounded feasibility (box)
    - Equality constraints (halfspace)
    - Norm constraints (sphere)
    - Probability/sum=1 (simplex)
    - Soft trade-offs (elastic)

The mesh sees ONLY (A, B) during inference; C is what it must reconstruct.
"""
| import numpy as np | |
| import json, pathlib, argparse | |
| from typing import List, Dict | |
DIM = 32                 # embedding dimension (set to 768 for LLM-scale)
SAMPLES_PER_TYPE = 1000  # × 5 types = 5 000 total
# ── UTILITIES ─────────────────────────────────────────────────────────────────
def normalize(v: np.ndarray) -> np.ndarray:
    """Scale `v` to unit Euclidean norm; the epsilon keeps the zero vector finite."""
    length = np.linalg.norm(v)
    return v / (length + 1e-12)
def pack(*arrays: np.ndarray, dim: int) -> np.ndarray:
    """Concatenate `arrays`, then truncate or zero-pad the result to length `dim`."""
    flat = np.concatenate(arrays)
    if flat.size < dim:
        return np.pad(flat, (0, dim - flat.size))
    return flat[:dim]
# ── PROBLEM TYPE 1: BOX PROJECTION ────────────────────────────────────────────
#
# Constraint A : per-dimension box [lo, hi] over the FIRST dim//2 axes
#                A[:dim//2] = lo, A[dim//2:2*(dim//2)] = hi
# Objective  B : unconstrained target point in R^dim
# Solution   C : C[:dim//2] = clip(B[:dim//2], lo, hi); C[dim//2:] = B[dim//2:]
#
# Only the first dim//2 axes are constrained: A has dim slots but a full box
# needs 2*dim numbers, so clipping every axis (as before) made C depend on
# bounds the mesh never sees — unlearnable noise on half the axes.  The
# unconstrained axes simply pass B through, keeping C a deterministic
# function of (A, B) as required by the module contract.
#
# Meaning: "stay within resource/capacity bounds while aiming for B"
def gen_box(n: int, dim: int, rng: np.random.Generator) -> List[Dict]:
    """Generate `n` box-projection samples of dimension `dim`.

    Returns a list of dicts with keys 'A', 'B', 'C' (lists of floats)
    and 'type' == 'box_proj'.
    """
    half = dim // 2
    data = []
    for _ in range(n):
        center = rng.uniform(-2, 2, half)          # box centers (constrained axes only)
        half_width = rng.uniform(0.3, 2.0, half)   # strictly positive -> lo < hi
        lo, hi = center - half_width, center + half_width
        B = rng.uniform(-4, 4, dim)
        # Clamp only the axes whose bounds are actually encoded in A;
        # the remaining axes are unconstrained and copy B.
        C = B.copy()
        C[:half] = np.clip(B[:half], lo, hi)
        A = np.zeros(dim)
        A[:half] = lo
        A[half:2 * half] = hi
        data.append({'A': A.tolist(), 'B': B.tolist(), 'C': C.tolist(), 'type': 'box_proj'})
    return data
# ── PROBLEM TYPE 2: HALFSPACE PROJECTION ──────────────────────────────────────
#
# Constraint A : encodes a hyperplane n·x = b
#                A[1:] = normal components, A[0] = offset b
#                normal[0] is fixed to 0 by construction, so storing b in
#                slot 0 destroys no information.  (Previously C was computed
#                with a normal whose first component was then overwritten by
#                b, making C unrecoverable from (A, B).)
# Objective  B : unconstrained point in R^dim
# Solution   C : projection of B onto the hyperplane
#                C = B - (n·B - b) * n
#
# Meaning: "satisfy one hard equality constraint at minimum cost to B"
def gen_halfspace(n: int, dim: int, rng: np.random.Generator) -> List[Dict]:
    """Generate `n` hyperplane-projection samples of dimension `dim` (dim >= 2)."""
    data = []
    for _ in range(n):
        raw = rng.standard_normal(dim)
        raw[0] = 0.0                              # reserve slot 0 for the offset b
        normal = raw / (np.linalg.norm(raw) + 1e-12)
        b = float(rng.uniform(-1, 1))
        B = rng.uniform(-3, 3, dim)
        C = B - (float(np.dot(normal, B)) - b) * normal
        A = normal.copy()
        A[0] = b                                  # offset embedded in first slot
        data.append({'A': A.tolist(), 'B': B.tolist(), 'C': C.tolist(), 'type': 'halfspace'})
    return data
# ── PROBLEM TYPE 3: SPHERE SURFACE ────────────────────────────────────────────
#
# Constraint A : encodes a sphere (center, radius)
#                A[1:] = center components, A[0] = radius r
#                center[0] is fixed to 0 by construction, so storing r in
#                slot 0 destroys no information.  (Previously C was computed
#                with a center whose first component was then overwritten by
#                r, making C unrecoverable from (A, B).)
# Objective  B : external point
# Solution   C : point on the sphere surface nearest to B
#                C = center + r * (B - center) / ||B - center||
#
# Meaning: "satisfy a norm/budget constraint, move toward B as far as allowed"
def gen_sphere(n: int, dim: int, rng: np.random.Generator) -> List[Dict]:
    """Generate `n` sphere-surface projection samples of dimension `dim`."""
    data = []
    for _ in range(n):
        center = rng.uniform(-1.5, 1.5, dim)
        center[0] = 0.0                           # reserve slot 0 for the radius
        r = float(rng.uniform(1.0, 3.0))
        B = rng.uniform(-4, 4, dim)
        diff = B - center
        nd = np.linalg.norm(diff)
        if nd < 1e-10:
            # B coincides with the center: pick an arbitrary unit direction
            diff = np.ones(dim) / np.sqrt(dim)
            nd = 1.0
        C = center + r * diff / nd
        A = center.copy()
        A[0] = r                                  # radius in first slot
        data.append({'A': A.tolist(), 'B': B.tolist(), 'C': C.tolist(), 'type': 'sphere'})
    return data
| # ββ PROBLEM TYPE 4: SIMPLEX PROJECTION ββββββββββββββββββββββββββββββββββββββββ | |
| # | |
| # Constraint A : uniform-prior signal (all ones) β encodes simplex constraint Ξ£xα΅’=1, xα΅’β₯0 | |
| # Objective B : unconstrained "belief" vector | |
| # Solution C : nearest point on probability simplex to B | |
| # | |
| # Meaning: "find a valid probability distribution closest to unconstrained belief B" | |
| # Useful for softmax-like problems. | |
| def _proj_simplex(v: np.ndarray) -> np.ndarray: | |
| n = len(v) | |
| u = np.sort(v)[::-1] | |
| cs = np.cumsum(u) - 1.0 | |
| rho = int(np.where(u * np.arange(1, n + 1) > cs)[0][-1]) | |
| theta = cs[rho] / (rho + 1.0) | |
| return np.maximum(v - theta, 0.0) | |
def gen_simplex(n: int, dim: int, rng: np.random.Generator) -> List[Dict]:
    """Generate `n` simplex-projection samples of dimension `dim`.

    A is the all-ones constraint signal, B an unconstrained belief vector,
    C its Euclidean projection onto the probability simplex.
    """
    ones_signal = np.ones(dim)                # same constraint marker for every sample
    samples = []
    for _ in range(n):
        belief = rng.uniform(-1.0, 3.0, dim)  # unconstrained belief
        target = _proj_simplex(belief)
        samples.append({'A': ones_signal.tolist(), 'B': belief.tolist(),
                        'C': target.tolist(), 'type': 'simplex'})
    return samples
# ── PROBLEM TYPE 5: ELASTIC BALANCE ───────────────────────────────────────────
#
# Constraint A : soft constraint centers + per-dimension tightness w in [0,1],
#                both over the FIRST dim//2 axes:
#                A[:dim//2] = centers, A[dim//2:2*(dim//2)] = w
# Objective  B : desired goal point
# Solution   C : per-dimension elastic balance on the constrained axes
#                C[j] = w[j]*a_center[j] + (1-w[j])*B[j]   for j <  dim//2
#                C[j] = B[j]                               for j >= dim//2
#
# Only the first dim//2 axes carry a constraint: A cannot hold both a center
# and a weight for every axis, and previously C mixed in hidden centers and
# weights the mesh never sees — unlearnable noise on half the axes.  The
# unconstrained axes pass B through, keeping C a deterministic function
# of (A, B).
#
# Meaning: "each constrained dimension is pulled between constraint center
#           and objective, with w[j] controlling how hard the constraint is"
# This is the natural problem for the elastic mesh.
def gen_elastic(n: int, dim: int, rng: np.random.Generator) -> List[Dict]:
    """Generate `n` elastic-balance samples of dimension `dim`."""
    half = dim // 2
    data = []
    for _ in range(n):
        a_center = rng.uniform(-2, 2, half)
        w = rng.uniform(0.05, 0.95, half)         # per-dim tightness (constrained axes)
        B = rng.uniform(-3, 3, dim)
        # Blend only where the center/weight pair is actually encoded in A.
        C = B.copy()
        C[:half] = w * a_center + (1.0 - w) * B[:half]
        A = np.zeros(dim)
        A[:half] = a_center
        A[half:2 * half] = w
        data.append({'A': A.tolist(), 'B': B.tolist(), 'C': C.tolist(), 'type': 'elastic'})
    return data
# ── ASSEMBLY ──────────────────────────────────────────────────────────────────
# Registry mapping problem-family name -> generator function.
# NOTE: dict insertion order fixes the pre-shuffle concatenation order inside
# generate_all(), so reordering entries changes the resulting train/test split.
GENERATORS = {
    'box_proj': gen_box,
    'halfspace': gen_halfspace,
    'sphere': gen_sphere,
    'simplex': gen_simplex,
    'elastic': gen_elastic,
}
def generate_all(n_per_type: int = SAMPLES_PER_TYPE,
                 dim: int = DIM,
                 seed: int = 42) -> List[Dict]:
    """Produce and shuffle samples from every registered problem family.

    Draws `n_per_type` samples per generator with a single seeded RNG, then
    returns the pooled list in a random (but seed-deterministic) order.
    """
    rng = np.random.default_rng(seed)
    pooled: List[Dict] = []
    for generator in GENERATORS.values():
        pooled.extend(generator(n_per_type, dim, rng))
    shuffled_order = rng.permutation(len(pooled))
    return [pooled[i] for i in shuffled_order]
# ── MAIN ──────────────────────────────────────────────────────────────────────
if __name__ == '__main__':
    from collections import Counter

    parser = argparse.ArgumentParser(description='Generate elastic mesh training data')
    parser.add_argument('--dim', type=int, default=DIM, help='embedding dimension')
    parser.add_argument('--n', type=int, default=SAMPLES_PER_TYPE, help='samples per problem type')
    parser.add_argument('--out', type=str, default='data', help='output directory')
    args = parser.parse_args()

    print(f"\n{'─'*50}")
    # len(GENERATORS) instead of a hard-coded 5: stays correct if families change
    print(f" Generating {len(GENERATORS) * args.n} samples | dim={args.dim}")
    print(f"{'─'*50}")

    data = generate_all(args.n, args.dim)

    # 90/10 train/test split; data is already shuffled by generate_all
    split = int(len(data) * 0.9)
    train, test = data[:split], data[split:]

    out = pathlib.Path(args.out)
    # parents=True so nested --out paths (e.g. runs/exp1/data) don't crash
    out.mkdir(parents=True, exist_ok=True)
    with open(out / 'train.json', 'w', encoding='utf-8') as f:
        json.dump(train, f)
    with open(out / 'test.json', 'w', encoding='utf-8') as f:
        json.dump(test, f)

    # Per-type statistics
    train_types = Counter(d['type'] for d in train)
    test_types = Counter(d['type'] for d in test)
    print(f"\n Train : {len(train)}")
    print(f" Test : {len(test)}\n")
    print(f" {'Type':<14} {'Train':>8} {'Test':>7} C-norm (mean)")
    print(f" {'─'*14} {'─'*8} {'─'*7} {'─'*14}")
    for t in GENERATORS:
        subset = [d for d in data if d['type'] == t]
        norms = [np.linalg.norm(d['C']) for d in subset]
        print(f" {t:<14} {train_types[t]:>8} {test_types[t]:>7} "
              f"{np.mean(norms):.3f} ± {np.std(norms):.3f}")

    # Sanity check one sample per type
    print(f"\n Sanity check (first sample per type):")
    seen = set()
    for d in data:
        if d['type'] in seen:
            continue
        seen.add(d['type'])
        A, B, C = map(np.array, [d['A'], d['B'], d['C']])
        err = np.linalg.norm(A - B)
        print(f" [{d['type']:<12}] "
              f"‖A‖={np.linalg.norm(A):.2f} ‖B‖={np.linalg.norm(B):.2f} "
              f"‖C‖={np.linalg.norm(C):.2f} ‖A-B‖={err:.2f}")

    print(f"\n Saved → {out}/train.json {out}/test.json\n")