File size: 4,002 Bytes
3550904
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#!/usr/bin/env python3
"""
Derived from Andrej Karpathy's nanochat project.

MIT License

Copyright (c) 2025 Andrej Karpathy

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
"""

from __future__ import annotations

import argparse
import json
import math
from pathlib import Path


def pressure_dropout(
    *,
    coefficients: dict[str, float],
    feature_set: str,
    parameters: int,
    unique_tokens: int,
    sampled_tokens: int,
) -> float:
    """Evaluate the dropout formula selected by *feature_set*.

    Two log-ratio features are derived from the counts::

        x_model  = log10(parameters / unique_tokens)
        x_sample = log10(sampled_tokens / unique_tokens)

    and combined according to *feature_set*:

    * ``"base"``:        ``A*x_model + B*x_sample + C0``
    * ``"interaction"``: base terms plus ``D*x_model*x_sample``
    * ``"quadratic"``:   base terms plus ``Qp*x_model**2 + Qc*x_sample**2``

    Args:
        coefficients: Mapping holding the coefficient names required by the
            chosen feature set (``A``, ``B``, ``C0`` and, where applicable,
            ``D`` or ``Qp``/``Qc``).
        feature_set: One of ``"base"``, ``"interaction"``, ``"quadratic"``.
        parameters: Parameter count; must be positive.
        unique_tokens: Unique-token count; must be positive.
        sampled_tokens: Sampled-token count; must be positive.

    Returns:
        The raw formula value. No clipping is applied here.

    Raises:
        ValueError: If *feature_set* is unsupported or any count is not
            strictly positive.
        KeyError: If *coefficients* lacks a name the feature set needs.
    """
    if feature_set not in ("base", "interaction", "quadratic"):
        raise ValueError(f"unsupported feature set for anchors: {feature_set}")

    # Validate up front: a zero denominator would otherwise surface as a
    # ZeroDivisionError, a non-positive ratio as an opaque "math domain
    # error", and two negative counts would silently yield a bogus value.
    for label, count in (
        ("parameters", parameters),
        ("unique_tokens", unique_tokens),
        ("sampled_tokens", sampled_tokens),
    ):
        if count <= 0:
            raise ValueError(f"{label} must be positive, got {count}")

    x_model = math.log10(parameters / unique_tokens)
    x_sample = math.log10(sampled_tokens / unique_tokens)

    # Linear terms are shared by every feature set; the intercept is added
    # last so the floating-point summation order stays fixed.
    total = coefficients["A"] * x_model + coefficients["B"] * x_sample
    if feature_set == "interaction":
        total += coefficients["D"] * x_model * x_sample
    elif feature_set == "quadratic":
        total += coefficients["Qp"] * x_model * x_model
        total += coefficients["Qc"] * x_sample * x_sample
    return total + coefficients["C0"]


def build_parser() -> argparse.ArgumentParser:
    """Construct the command-line parser for this script.

    Required options: ``--coefficients-json`` (path), ``--name``,
    ``--parameters``, ``--stream-token-caps`` (one or more ints),
    ``--stage-steps``, ``--batch-size`` and ``--block-size``.
    Optional options: ``--min-rate`` (default 0.02), ``--max-rate``
    (default 0.65) and ``--precision`` (default 3).
    """
    cli = argparse.ArgumentParser(
        description="Create locked-stream anchor-dropout specs from coefficient JSON."
    )
    cli.add_argument("--coefficients-json", required=True, type=Path)
    cli.add_argument("--name", required=True)
    cli.add_argument("--parameters", required=True, type=int)
    cli.add_argument("--stream-token-caps", required=True, type=int, nargs="+")

    # Remaining mandatory options are all plain integers.
    for flag in ("--stage-steps", "--batch-size", "--block-size"):
        cli.add_argument(flag, required=True, type=int)

    # Optional tuning knobs: (flag, converter, default).
    optional_flags = (
        ("--min-rate", float, 0.02),
        ("--max-rate", float, 0.65),
        ("--precision", int, 3),
    )
    for flag, converter, fallback in optional_flags:
        cli.add_argument(flag, type=converter, default=fallback)
    return cli


def main() -> None:
    """Compute one clipped dropout anchor per stream-token cap and print them.

    Reads the coefficient JSON named on the command line, evaluates
    ``pressure_dropout`` for each cap using the running total of sampled
    tokens, clamps each result to ``[--min-rate, --max-rate]``, then prints
    a JSON report followed by a ``name:cap=rate,...`` line.
    """
    opts = build_parser().parse_args()
    spec = json.loads(opts.coefficients_json.read_text(encoding="utf-8"))
    feature_set = spec["feature_set"]
    # Coefficients may be nested under "coefficients" or sit at the top level.
    coefficients = spec.get("coefficients", spec)
    tokens_per_stage = opts.stage_steps * opts.batch_size * opts.block_size

    report: list[dict] = []
    anchor_specs: list[str] = []
    for stage, cap in enumerate(opts.stream_token_caps, start=1):
        # Every stage samples the same number of tokens, so the running
        # total after `stage` stages is an exact integer multiple.
        seen_so_far = stage * tokens_per_stage
        raw_rate = pressure_dropout(
            coefficients=coefficients,
            feature_set=feature_set,
            parameters=opts.parameters,
            unique_tokens=cap,
            sampled_tokens=seen_so_far,
        )
        floored = max(opts.min_rate, raw_rate)
        rate = min(opts.max_rate, floored)
        report.append(
            {
                "unique_tokens": cap,
                "cumulative_sampled_tokens": seen_so_far,
                "raw_dropout": raw_rate,
                "clipped_dropout": rate,
            }
        )
        anchor_specs.append(f"{cap}={rate:.{opts.precision}f}")

    summary = {"name": opts.name, "feature_set": feature_set, "anchors": report}
    print(json.dumps(summary, indent=2))
    print(f"{opts.name}:{','.join(anchor_specs)}")


# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()