td-toolkit / td_lang /grammar.py
td-builder's picture
Fixed code: vocab mismatch fix for cross-arch merging (Llama/Falcon)
5d61448 verified
"""
TD Lang Grammar — Lark parser for .td files.
Defines the syntax for the Phase 1 commands (load, merge, heal, eval, commit),
the later-phase commands, and the configuration blocks (gate, budget,
data_contract, reward_contract, setup, on_error, log). Commands that are not
implemented yet are still parsed into AST nodes so the compiler can reject
them with a clear error.
"""
from lark import Lark, Token, Transformer, UnexpectedInput, v_args
from .ast_nodes import (
AbsorbCmd,
BudgetBlock,
CommitCmd,
DataContractBlock,
DebateCmd,
DiagnoseCmd,
DistillCmd,
EditCmd,
EvalCmd,
FuseCmd,
ForkCmd,
GateBlock,
HealCmd,
IfBlock,
LoadCmd,
MergeCmd,
NotifyCmd,
OnErrorBlock,
PromptBlock,
PruneCmd,
RepeatBlock,
ReportCmd,
ResetCmd,
RewardContractBlock,
RollbackCmd,
CurriculumCmd,
StarCmd,
BestOfCmd,
ExploitCmd,
ArenaCmd,
ResearchArenaCmd,
SaveCmd,
ScheduleCmd,
DownloadCmd,
LogBlock,
CompareCmd,
VerifyCmd,
VoteCmd,
SetupBlock,
SnapshotCmd,
SynthCmd,
TDProgram,
TrainCmd,
)
from .errors import TDSyntaxError
# ============================================================================
# LARK GRAMMAR DEFINITION
# ============================================================================
# Lark grammar source for TD Lang. It is a raw string so the backslashes in
# the regex terminals (e.g. NUMBER, FILEPATH, _NL) reach Lark unchanged.
# Layout: the `start`/`statement` entry rules first, then the per-phase
# command and block rules, and the shared rules and terminals at the end.
# Lines starting with `//` inside the string are grammar-level comments.
# Rule and alias names must stay in sync with the method names on
# TDTransformer, which converts the parse tree into AST nodes by rule name.
TD_GRAMMAR = r"""
// TD Lang Grammar v0.1.0
// One command per line, blocks with curly braces, comments with #
start: (_NL* statement _NL*)* _NL*
?statement: load_cmd
| merge_cmd
| heal_cmd
| eval_cmd
| commit_cmd
| synth_cmd
| train_cmd
| debate_cmd
| diagnose_cmd
| fork_cmd
| reset_cmd
| prune_cmd
| edit_cmd
| fuse_cmd
| absorb_cmd
| repeat_block_cmd
| if_block_cmd
| snapshot_cmd
| report_cmd
| notify_cmd
| save_cmd
| gate_block
| budget_block
| data_contract_block
| reward_contract_block
| setup_block
| on_error_block
| schedule_cmd
| download_cmd
| log_block
| compare_cmd
| verify_cmd
| vote_cmd
| prompt_cmd
| distill_cmd
| rollback_cmd
| curriculum_cmd
| star_cmd
| best_of_cmd
| exploit_cmd
| arena_cmd
| research_arena_cmd
// ======================== PHASE 1 COMMANDS ========================
// load "model/path" as alias
load_cmd: "load" string "as" IDENT
// merge "source" into target using method [strength 0.5]
merge_cmd: "merge" string "into" IDENT "using" IDENT (merge_strength)?
merge_strength: "strength" NUMBER
// heal target [lora_r 32] [epochs 2]
heal_cmd: "heal" IDENT (heal_opt)*
heal_opt: "lora_r" INT -> heal_lora_r
| "epochs" INT -> heal_epochs
// eval target [on "dataset"] [-> output.json]
eval_cmd: "eval" IDENT (eval_on)? (eval_output)?
eval_on: "on" string
eval_output: "->" FILEPATH
// commit target [if [gate1, gate2, gate3]]
commit_cmd: "commit" IDENT (commit_gates)?
commit_gates: "if" name_list
// ======================== PHASE 2 COMMANDS ========================
// (parsed but not compiled yet — will show "not implemented" message)
// synth target from source [filter cherry_llm] [-> output.jsonl]
synth_cmd: "synth" IDENT "from" IDENT (synth_filter)? (synth_output)?
synth_filter: "filter" IDENT
synth_output: "->" FILEPATH
// train target on "dataset" using method [steps 100] [lr 0.0001]
train_cmd: "train" IDENT "on" string "using" IDENT (train_opt)*
train_opt: "steps" INT -> train_steps
| "lr" NUMBER -> train_lr
// debate target rounds 3 candidates 8 [-> output.jsonl]
debate_cmd: "debate" IDENT "rounds" INT "candidates" INT (debate_output)?
debate_output: "->" FILEPATH
// diagnose target [-> weaknesses.json]
diagnose_cmd: "diagnose" IDENT (diagnose_output)?
diagnose_output: "->" FILEPATH
// fork source as alias
fork_cmd: "fork" IDENT "as" IDENT
// reset target to checkpoint_name
reset_cmd: "reset" IDENT "to" (string | IDENT)
// prune target using method [aggressiveness 0.1]
prune_cmd: "prune" IDENT "using" IDENT (prune_aggr)?
prune_aggr: "aggressiveness" NUMBER
// edit target layers 16-28 using lora [lr 0.0001]
edit_cmd: "edit" IDENT "layers" LAYER_SPEC "using" IDENT (edit_lr)?
edit_lr: "lr" NUMBER
// ======================== PHASE 7 — LOOP CONTROL ========================
// repeat N { commands... }
repeat_block_cmd: "repeat" INT "{" _NL* body_cmd+ _NL* "}"
// if condition target { commands... } [else { commands... }]
if_block_cmd: "if" IDENT IDENT "{" _NL* body_cmd+ _NL* "}" (else_clause)?
else_clause: "else" "{" _NL* body_cmd+ _NL* "}"
// Commands allowed inside blocks (same as top-level minus config blocks)
?body_cmd: (load_cmd | merge_cmd | heal_cmd | eval_cmd | commit_cmd
| synth_cmd | train_cmd | debate_cmd | diagnose_cmd
| fork_cmd | reset_cmd | prune_cmd | edit_cmd
| fuse_cmd | absorb_cmd | snapshot_cmd | report_cmd
| notify_cmd | save_cmd
| repeat_block_cmd | if_block_cmd | schedule_cmd
| download_cmd | compare_cmd | verify_cmd
| vote_cmd | prompt_cmd | distill_cmd | rollback_cmd
| curriculum_cmd | star_cmd | best_of_cmd | exploit_cmd
| arena_cmd | research_arena_cmd) _NL*
// ======================== PHASE 6 — EASY MERGE COMMANDS ========================
// fuse [model1, model2, model3] into target [using method] [strategy equal|weighted|sequential]
fuse_cmd: "fuse" model_list "into" IDENT (fuse_method)? (fuse_strategy)?
model_list: "[" string ("," string)* "]"
fuse_method: "using" IDENT
fuse_strategy: "strategy" IDENT
// absorb "model" into target [strength 0.5]
absorb_cmd: "absorb" string "into" IDENT (absorb_strength)?
absorb_strength: "strength" NUMBER
// ======================== PHASE 4 COMMANDS ========================
// snapshot target [-> output_dir]
snapshot_cmd: "snapshot" IDENT (snapshot_output)?
snapshot_output: "->" FILEPATH
// report [-> economics.json]
report_cmd: "report" (report_output)?
report_output: "->" FILEPATH
// ======================== BLOCKS ========================
// gate { must_pass = [canary, perplexity, thinking_mode] }
gate_block: "gate" "{" _NL* gate_field+ _NL* "}"
gate_field: "must_pass" "=" name_list _NL*
// budget { max_gpu_hours = 8 \n max_cost = 50.00 }
budget_block: "budget" "{" _NL* budget_field+ _NL* "}"
budget_field: (budget_gpu | budget_cost | budget_tokens | budget_experiments) _NL*
budget_gpu: "max_gpu_hours" "=" NUMBER
budget_cost: "max_cost" "=" NUMBER
budget_tokens: "max_tokens" "=" INT
budget_experiments: "max_experiments" "=" INT
// data_contract { required_fields = [prompt, response] \n min_samples = 100 \n max_perplexity = 50.0 }
data_contract_block: "data_contract" "{" _NL* dc_field+ _NL* "}"
dc_field: (dc_required | dc_min_samples | dc_max_ppl) _NL*
dc_required: "required_fields" "=" name_list
dc_min_samples: "min_samples" "=" INT
dc_max_ppl: "max_perplexity" "=" NUMBER
// reward_contract { verifiers = [code_compiles, math_correct] \n min_reward = 0.3 }
reward_contract_block: "reward_contract" "{" _NL* rc_field+ _NL* "}"
rc_field: (rc_verifiers | rc_min_reward) _NL*
rc_verifiers: "verifiers" "=" name_list
rc_min_reward: "min_reward" "=" NUMBER
// ======================== PHASE 8 — AUTOPILOT ========================
// notify "Training complete!"
notify_cmd: "notify" string
// save target to "gdrive:TD/models/v1"
save_cmd: "save" IDENT "to" string
// setup { pip = [torch, transformers] hf_token = env notify = "ntfy.sh/my_ai" }
setup_block: "setup" "{" _NL* setup_field+ _NL* "}"
setup_field: (setup_pip | setup_hf | setup_notify) _NL*
setup_pip: "pip" "=" name_list
setup_hf: "hf_token" "=" IDENT
setup_notify: "notify" "=" string
// on_error { retry = 3 fallback = reduce_batch notify = true }
on_error_block: "on_error" "{" _NL* on_error_field+ _NL* "}"
on_error_field: (onerr_retry | onerr_fallback | onerr_notify) _NL*
onerr_retry: "retry" "=" INT
onerr_fallback: "fallback" "=" IDENT
onerr_notify: "notify" "=" IDENT
// ======================== PHASE 9 — SCHEDULE ========================
// schedule "every 6h" { commands... }
// schedule "at 02:00" { commands... }
// schedule "after 30m" { commands... }
schedule_cmd: "schedule" string "{" _NL* body_cmd+ _NL* "}"
// ======================== PHASE 10 - TOOLBOX ========================
// download "gsm8k" as math_data [split train]
download_cmd: "download" string "as" IDENT (download_split)?
download_split: "split" IDENT
// log "training_log.txt"
log_block: "log" string
// compare target vs "source_model" [questions 50] [-> output.json]
compare_cmd: "compare" IDENT "vs" string (compare_questions)? (compare_output)?
compare_questions: "questions" INT
compare_output: "->" FILEPATH
// verify target on "dataset" [questions 100] [-> results.json]
verify_cmd: "verify" IDENT "on" string (verify_questions)? (verify_output)?
verify_questions: "questions" INT
verify_output: "->" FILEPATH
// ======================== PHASE 11 - INTELLIGENCE ========================
// vote target "question" [samples 5] [-> output.json]
vote_cmd: "vote" IDENT string (vote_samples)? (vote_output)?
vote_samples: "samples" INT
vote_output: "->" FILEPATH
// prompt target "system prompt text"
prompt_cmd: "prompt" IDENT string
// distill target into "small_model" [steps 200] [-> output_dir]
distill_cmd: "distill" IDENT "into" string (distill_steps)? (distill_output)?
distill_steps: "steps" INT
distill_output: "->" FILEPATH
// rollback target
rollback_cmd: "rollback" IDENT
// ======================== PHASE 12 - RL & FINE-TUNING ========================
// curriculum target on "dataset" using method [levels 3] [steps 64]
curriculum_cmd: "curriculum" IDENT "on" string "using" IDENT (curriculum_opt)*
curriculum_opt: "levels" INT -> curriculum_levels
| "steps" INT -> curriculum_steps
// star target on "dataset" [rounds 3] [samples 8]
star_cmd: "star" IDENT "on" string (star_opt)*
star_opt: "rounds" INT -> star_rounds
| "samples" INT -> star_samples
// best_of target on "dataset" [n 8] [steps 32]
best_of_cmd: "best_of" IDENT "on" string (best_of_opt)*
best_of_opt: "n" INT -> best_of_n
| "steps" INT -> best_of_steps
// exploit target on "dataset" [samples 16] [steps 32] [-> output.jsonl]
exploit_cmd: "exploit" IDENT "on" string (exploit_opt)*
exploit_opt: "samples" INT -> exploit_samples
| "steps" INT -> exploit_steps
| "->" FILEPATH -> exploit_output
// ======================== PHASE 13 - REAL RL (ARENA) ========================
// arena target on "dataset" [rounds 5] [episodes 50] [steps 64] [curiosity 0.3] [-> log.json]
arena_cmd: "arena" IDENT "on" string (arena_opt)*
arena_opt: "rounds" INT -> arena_rounds
| "episodes" INT -> arena_episodes
| "steps" INT -> arena_steps
| "curiosity" NUMBER -> arena_curiosity
| "->" FILEPATH -> arena_output
// research_arena target topic "subject" [sources "web"|"pubmed"|"arxiv"|path]
// [rounds 5] [episodes 30] [steps 64] [curiosity 0.3] [difficulty_scale 0.25] [-> log.json]
research_arena_cmd: "research_arena" IDENT "topic" string (ra_opt)*
ra_opt: "sources" string -> ra_sources
| "rounds" INT -> ra_rounds
| "episodes" INT -> ra_episodes
| "steps" INT -> ra_steps
| "curiosity" NUMBER -> ra_curiosity
| "difficulty_scale" NUMBER -> ra_difficulty
| "->" FILEPATH -> ra_output
// ======================== SHARED RULES ========================
// List of names: [name1, name2, name3]
name_list: "[" IDENT ("," IDENT)* "]"
// String: double-quoted
string: ESCAPED_STRING
// Layer spec: "all", single number, or range like "16-28"
LAYER_SPEC: /all|[0-9]+-[0-9]+|[0-9]+/
// Filepath: word with dots, slashes, underscores (no spaces)
FILEPATH: /[a-zA-Z0-9_.\-\/]+/
// Identifier: letters, numbers, underscores, hyphens (but starts with letter/underscore)
IDENT: /[a-zA-Z_][a-zA-Z0-9_\-]*/
// Numbers
NUMBER: /\d+\.?\d*([eE][+-]?\d+)?/
INT: /\d+/
// Whitespace and comments
_NL: /\s*/ NEWLINE /\s*/
COMMENT: /#[^\n]*/
%import common.ESCAPED_STRING
%import common.NEWLINE
%import common.WS_INLINE
%ignore WS_INLINE
%ignore COMMENT
"""
# ============================================================================
# LARK TRANSFORMER — Parse Tree → AST Nodes
# ============================================================================
def _assign_options(node, options, attr_for_key, converters=None):
    """Copy tagged ``(key, value)`` option tuples onto an AST node.

    Optional grammar clauses are turned into ``(key, value)`` tuples by their
    own rule callbacks; the owning command/block callback then receives them
    as extra children. Options are applied in order, so a clause repeated in
    the source overrides its earlier occurrence.

    Args:
        node: AST node to update in place.
        options: Transformed children; anything that is not a tuple is ignored.
        attr_for_key: Maps an option key to the attribute name it sets on
            ``node``. Keys missing from the mapping are ignored.
        converters: Optional map from option key to a callable applied to the
            value before it is stored.

    Returns:
        ``node``, so callers can ``return _assign_options(...)`` directly.
    """
    for option in options:
        if not isinstance(option, tuple):
            continue
        key, value = option
        attr = attr_for_key.get(key)
        if attr is None:
            continue
        if converters and key in converters:
            value = converters[key](value)
        setattr(node, attr, value)
    return node


@v_args(inline=True)
class TDTransformer(Transformer):
    """Transforms Lark parse tree into td_lang AST nodes.

    Each method matches a grammar rule (or terminal) name and returns the
    corresponding dataclass from ast_nodes.py, a plain Python value, or a
    tagged ``(key, value)`` tuple for optional clauses. Because of
    ``v_args(inline=True)`` the children of a rule arrive as positional
    arguments rather than as a single list.
    """

    # --- Helpers ---

    def string(self, s: Token) -> str:
        """Strip the surrounding quotes from a string token.

        Only the first and last character are removed; backslash escapes
        inside the literal are returned exactly as written.
        """
        return str(s)[1:-1]

    def name_list(self, *names: Token) -> list[str]:
        """Convert name list tokens to Python list of strings."""
        return [str(n) for n in names]

    def IDENT(self, token: Token) -> str:
        """Return the identifier text."""
        return str(token)

    def INT(self, token: Token) -> int:
        """Return the integer value of the token."""
        return int(token)

    def NUMBER(self, token: Token) -> float:
        """Return the numeric token as a float."""
        return float(token)

    def FILEPATH(self, token: Token) -> str:
        """Return the file path text."""
        return str(token)

    def LAYER_SPEC(self, token: Token) -> str:
        """Return the layer spec text unchanged (it is not parsed here)."""
        return str(token)

    # --- Phase 1 Commands ---

    def load_cmd(self, model_ref: str, alias: str) -> LoadCmd:
        """Build a LoadCmd from ``load "model" as alias``."""
        return LoadCmd(model_ref=model_ref, alias=alias)

    def merge_cmd(self, source: str, target: str, method: str,
                  strength: float | None = None) -> MergeCmd:
        """Build a MergeCmd; strength falls back to 0.5 when omitted."""
        return MergeCmd(
            source=source,
            target=target,
            method=method,
            strength=strength if strength is not None else 0.5,
        )

    def merge_strength(self, value: float) -> float:
        return value

    def heal_cmd(self, target: str, *opts) -> HealCmd:
        """Build a HealCmd, applying any ``lora_r`` / ``epochs`` options."""
        return _assign_options(
            HealCmd(target=target),
            opts,
            {"lora_r": "lora_r", "epochs": "epochs"},
        )

    def heal_lora_r(self, value: int) -> tuple:
        return ("lora_r", value)

    def heal_epochs(self, value: int) -> tuple:
        return ("epochs", value)

    def eval_cmd(self, target: str, *opts) -> EvalCmd:
        """Build an EvalCmd, applying the optional dataset and output path."""
        return _assign_options(
            EvalCmd(target=target),
            opts,
            {"on": "dataset", "output": "output"},
        )

    def eval_on(self, dataset: str) -> tuple:
        return ("on", dataset)

    def eval_output(self, filepath: str) -> tuple:
        return ("output", filepath)

    def commit_cmd(self, target: str, gates: list[str] | None = None) -> CommitCmd:
        """Build a CommitCmd; ``gates`` stays None when no ``if [...]`` is given."""
        return CommitCmd(target=target, gates=gates)

    def commit_gates(self, gates: list[str]) -> list[str]:
        return gates

    # --- Phase 2 Commands ---

    def synth_cmd(self, target: str, source: str, *opts) -> SynthCmd:
        """Build a SynthCmd, applying the optional filter and output path."""
        return _assign_options(
            SynthCmd(target=target, source=source),
            opts,
            {"filter": "filter_method", "output": "output"},
        )

    def synth_filter(self, method: str) -> tuple:
        return ("filter", method)

    def synth_output(self, filepath: str) -> tuple:
        return ("output", filepath)

    def train_cmd(self, target: str, dataset: str, method: str, *opts) -> TrainCmd:
        """Build a TrainCmd, applying the optional ``steps`` / ``lr`` options."""
        return _assign_options(
            TrainCmd(target=target, dataset=dataset, method=method),
            opts,
            {"steps": "steps", "lr": "learning_rate"},
        )

    def train_steps(self, value: int) -> tuple:
        return ("steps", value)

    def train_lr(self, value: float) -> tuple:
        return ("lr", value)

    def debate_cmd(self, target: str, rounds: int, candidates: int,
                   output: tuple | None = None) -> DebateCmd:
        """Build a DebateCmd with an optional output path."""
        cmd = DebateCmd(target=target, rounds=rounds, candidates=candidates)
        if isinstance(output, tuple) and output[0] == "output":
            cmd.output = output[1]
        return cmd

    def debate_output(self, filepath: str) -> tuple:
        return ("output", filepath)

    def diagnose_cmd(self, target: str, output: tuple | None = None) -> DiagnoseCmd:
        """Build a DiagnoseCmd with an optional output path."""
        cmd = DiagnoseCmd(target=target)
        if isinstance(output, tuple) and output[0] == "output":
            cmd.output = output[1]
        return cmd

    def diagnose_output(self, filepath: str) -> tuple:
        return ("output", filepath)

    def fork_cmd(self, source: str, alias: str) -> ForkCmd:
        """Build a ForkCmd from ``fork source as alias``."""
        return ForkCmd(source=source, alias=alias)

    def reset_cmd(self, target: str, checkpoint) -> ResetCmd:
        """Build a ResetCmd; the checkpoint (string or identifier) is stored as str."""
        return ResetCmd(target=target, checkpoint=str(checkpoint))

    def prune_cmd(self, target: str, method: str,
                  aggressiveness: float | None = None) -> PruneCmd:
        """Build a PruneCmd; aggressiveness falls back to 0.1 when omitted."""
        return PruneCmd(
            target=target,
            method=method,
            aggressiveness=aggressiveness if aggressiveness is not None else 0.1,
        )

    def prune_aggr(self, value: float) -> float:
        return value

    def edit_cmd(self, target: str, layers: str, method: str,
                 lr: float | None = None) -> EditCmd:
        """Build an EditCmd; ``learning_rate`` is None when ``lr`` is omitted."""
        return EditCmd(
            target=target,
            layers=layers,
            method=method,
            learning_rate=lr,
        )

    def edit_lr(self, value: float) -> float:
        return value

    # --- Phase 7: Loop Control ---

    def repeat_block_cmd(self, count: int, *body_cmds) -> RepeatBlock:
        """Build a RepeatBlock holding the transformed body commands."""
        return RepeatBlock(count=count, body=list(body_cmds))

    def if_block_cmd(self, condition: str, target: str, *rest) -> IfBlock:
        """Parse if condition target { then... } [else { else... }]"""
        block = IfBlock(condition=condition, target=target)
        for item in rest:
            # else_clause is the only child that arrives as a (non-empty)
            # list; every other child is a command of the "then" body.
            if isinstance(item, list) and item:
                block.else_body = item
            else:
                block.then_body.append(item)
        return block

    def else_clause(self, *body_cmds) -> list:
        return list(body_cmds)

    # --- Phase 9: Schedule ---

    def schedule_cmd(self, timing: str, *body_cmds) -> ScheduleCmd:
        """Build a ScheduleCmd; the timing string is stored unparsed."""
        return ScheduleCmd(timing=timing, body=list(body_cmds))

    # --- Phase 10: Toolbox ---

    def download_cmd(self, dataset: str, alias: str,
                     split: tuple | str | None = None) -> DownloadCmd:
        """Build a DownloadCmd; ``split`` may be a tagged tuple or a bare name."""
        cmd = DownloadCmd(dataset=dataset, alias=alias)
        if isinstance(split, tuple) and split[0] == "split":
            cmd.split = split[1]
        elif isinstance(split, str):
            cmd.split = split
        return cmd

    def download_split(self, value: str) -> tuple:
        return ("split", value)

    def log_block(self, filepath: str) -> LogBlock:
        """Build a LogBlock from ``log "file"``."""
        return LogBlock(filepath=filepath)

    def compare_cmd(self, target: str, source: str, *opts) -> CompareCmd:
        """Build a CompareCmd, applying the optional question count and output."""
        return _assign_options(
            CompareCmd(target=target, source=source),
            opts,
            {"questions": "questions", "output": "output"},
        )

    def compare_questions(self, value: int) -> tuple:
        return ("questions", value)

    def compare_output(self, filepath: str) -> tuple:
        return ("output", filepath)

    def verify_cmd(self, target: str, dataset: str, *opts) -> VerifyCmd:
        """Build a VerifyCmd, applying the optional question count and output."""
        return _assign_options(
            VerifyCmd(target=target, dataset=dataset),
            opts,
            {"questions": "questions", "output": "output"},
        )

    def verify_questions(self, value: int) -> tuple:
        return ("questions", value)

    def verify_output(self, filepath: str) -> tuple:
        return ("output", filepath)

    # --- Phase 11: Intelligence Commands ---

    def vote_cmd(self, target: str, question: str, *opts) -> VoteCmd:
        """Build a VoteCmd, applying the optional sample count and output."""
        return _assign_options(
            VoteCmd(target=target, question=question),
            opts,
            {"samples": "samples", "output": "output"},
        )

    def vote_samples(self, value: int) -> tuple:
        return ("samples", value)

    def vote_output(self, filepath: str) -> tuple:
        return ("output", filepath)

    def prompt_cmd(self, target: str, text: str) -> PromptBlock:
        """Build a PromptBlock from ``prompt target "text"``."""
        return PromptBlock(target=target, text=text)

    def distill_cmd(self, teacher: str, student: str, *opts) -> DistillCmd:
        """Build a DistillCmd, applying the optional step count and output."""
        return _assign_options(
            DistillCmd(teacher=teacher, student=student),
            opts,
            {"steps": "steps", "output": "output"},
        )

    def distill_steps(self, value: int) -> tuple:
        return ("steps", value)

    def distill_output(self, filepath: str) -> tuple:
        return ("output", filepath)

    def rollback_cmd(self, target: str) -> RollbackCmd:
        """Build a RollbackCmd for ``target``."""
        return RollbackCmd(target=target)

    # --- Phase 12: RL & Fine-Tuning Commands ---

    def curriculum_cmd(self, target: str, dataset: str, method: str, *opts) -> CurriculumCmd:
        """Build a CurriculumCmd, applying optional ``levels`` / ``steps``."""
        return _assign_options(
            CurriculumCmd(target=target, dataset=dataset, method=method),
            opts,
            {"levels": "levels", "steps": "steps"},
        )

    def curriculum_levels(self, value: int) -> tuple:
        return ("levels", value)

    def curriculum_steps(self, value: int) -> tuple:
        return ("steps", value)

    def star_cmd(self, target: str, dataset: str, *opts) -> StarCmd:
        """Build a StarCmd, applying optional ``rounds`` / ``samples``."""
        return _assign_options(
            StarCmd(target=target, dataset=dataset),
            opts,
            {"rounds": "rounds", "samples": "samples"},
        )

    def star_rounds(self, value: int) -> tuple:
        return ("rounds", value)

    def star_samples(self, value: int) -> tuple:
        return ("samples", value)

    def best_of_cmd(self, target: str, dataset: str, *opts) -> BestOfCmd:
        """Build a BestOfCmd, applying optional ``n`` / ``steps``."""
        return _assign_options(
            BestOfCmd(target=target, dataset=dataset),
            opts,
            {"n": "n", "steps": "steps"},
        )

    def best_of_n(self, value: int) -> tuple:
        return ("n", value)

    def best_of_steps(self, value: int) -> tuple:
        return ("steps", value)

    def exploit_cmd(self, target: str, dataset: str, *opts) -> ExploitCmd:
        """Build an ExploitCmd, applying optional samples, steps and output."""
        return _assign_options(
            ExploitCmd(target=target, dataset=dataset),
            opts,
            {"samples": "samples", "steps": "steps", "output": "output"},
        )

    def exploit_samples(self, value: int) -> tuple:
        return ("samples", value)

    def exploit_steps(self, value: int) -> tuple:
        return ("steps", value)

    def exploit_output(self, filepath: str) -> tuple:
        return ("output", filepath)

    # --- Phase 13: Real RL (Arena) ---

    def arena_cmd(self, target: str, dataset: str, *opts) -> ArenaCmd:
        """Build an ArenaCmd, applying whichever arena options were given."""
        return _assign_options(
            ArenaCmd(target=target, dataset=dataset),
            opts,
            {
                "rounds": "rounds",
                "episodes": "episodes",
                "steps": "steps",
                "curiosity": "curiosity",
                "output": "output",
            },
        )

    def arena_rounds(self, value: int) -> tuple:
        return ("rounds", value)

    def arena_episodes(self, value: int) -> tuple:
        return ("episodes", value)

    def arena_steps(self, value: int) -> tuple:
        return ("steps", value)

    def arena_curiosity(self, value: float) -> tuple:
        return ("curiosity", value)

    def arena_output(self, filepath: str) -> tuple:
        return ("output", filepath)

    # --- Phase 13: Research Arena ---

    def research_arena_cmd(self, target: str, topic: str, *opts) -> ResearchArenaCmd:
        """Build a ResearchArenaCmd, applying whichever options were given."""
        return _assign_options(
            ResearchArenaCmd(target=target, topic=topic),
            opts,
            {
                "sources": "sources",
                "rounds": "rounds",
                "episodes": "episodes",
                "steps": "steps",
                "curiosity": "curiosity",
                "difficulty_scale": "difficulty_scale",
                "output": "output",
            },
        )

    def ra_sources(self, value: str) -> tuple:
        return ("sources", value)

    def ra_rounds(self, value: int) -> tuple:
        return ("rounds", value)

    def ra_episodes(self, value: int) -> tuple:
        return ("episodes", value)

    def ra_steps(self, value: int) -> tuple:
        return ("steps", value)

    def ra_curiosity(self, value: float) -> tuple:
        return ("curiosity", value)

    def ra_difficulty(self, value: float) -> tuple:
        return ("difficulty_scale", value)

    def ra_output(self, filepath: str) -> tuple:
        return ("output", filepath)

    # --- Phase 6: Easy Merge Commands ---

    def fuse_cmd(self, sources: list[str], target: str, *opts) -> FuseCmd:
        """Build a FuseCmd, applying the optional method and strategy."""
        return _assign_options(
            FuseCmd(sources=sources, target=target),
            opts,
            {"method": "method", "strategy": "strategy"},
        )

    def model_list(self, *models: str) -> list[str]:
        return [str(m) for m in models]

    def fuse_method(self, method: str) -> tuple:
        return ("method", method)

    def fuse_strategy(self, strategy: str) -> tuple:
        return ("strategy", strategy)

    def absorb_cmd(self, source: str, target: str,
                   strength: float | None = None) -> AbsorbCmd:
        """Build an AbsorbCmd; strength falls back to 0.5 when omitted."""
        return AbsorbCmd(
            source=source,
            target=target,
            strength=strength if strength is not None else 0.5,
        )

    def absorb_strength(self, value: float) -> float:
        return value

    # --- Phase 4 Commands ---

    def snapshot_cmd(self, target: str, output: tuple | None = None) -> SnapshotCmd:
        """Build a SnapshotCmd with an optional output path."""
        cmd = SnapshotCmd(target=target)
        if isinstance(output, tuple) and output[0] == "output":
            cmd.output = output[1]
        return cmd

    def snapshot_output(self, filepath: str) -> tuple:
        return ("output", filepath)

    def report_cmd(self, output: tuple | None = None) -> ReportCmd:
        """Build a ReportCmd with an optional output path."""
        cmd = ReportCmd()
        if isinstance(output, tuple) and output[0] == "output":
            cmd.output = output[1]
        return cmd

    def report_output(self, filepath: str) -> tuple:
        return ("output", filepath)

    # --- Blocks ---

    def gate_block(self, *fields) -> GateBlock:
        """Build a GateBlock; a later ``must_pass`` line replaces an earlier one."""
        gate = GateBlock()
        for names in fields:
            if isinstance(names, list):
                gate.must_pass = names
        return gate

    def gate_field(self, names: list[str]) -> list[str]:
        return names

    def budget_block(self, *fields) -> BudgetBlock:
        """Build a BudgetBlock from its ``key = value`` fields."""
        return _assign_options(
            BudgetBlock(),
            fields,
            {
                "max_gpu_hours": "max_gpu_hours",
                "max_cost": "max_cost",
                "max_tokens": "max_tokens",
                "max_experiments": "max_experiments",
            },
            converters={"max_tokens": int, "max_experiments": int},
        )

    def budget_field(self, field_data) -> tuple:
        return field_data

    def budget_gpu(self, value: float) -> tuple:
        return ("max_gpu_hours", value)

    def budget_cost(self, value: float) -> tuple:
        return ("max_cost", value)

    def budget_tokens(self, value: int) -> tuple:
        return ("max_tokens", value)

    def budget_experiments(self, value: int) -> tuple:
        return ("max_experiments", value)

    # --- Phase 8: Autopilot Commands ---

    def notify_cmd(self, message: str) -> NotifyCmd:
        """Build a NotifyCmd carrying ``message``."""
        return NotifyCmd(message=message)

    def save_cmd(self, target: str, destination: str) -> SaveCmd:
        """Build a SaveCmd from ``save target to "destination"``."""
        return SaveCmd(target=target, destination=destination)

    def setup_block(self, *fields) -> SetupBlock:
        """Build a SetupBlock from its ``key = value`` fields."""
        return _assign_options(
            SetupBlock(),
            fields,
            {"pip": "pip_packages", "hf_token": "hf_token", "notify": "notify_url"},
        )

    def setup_field(self, field_data) -> tuple:
        return field_data

    def setup_pip(self, packages: list[str]) -> tuple:
        return ("pip", packages)

    def setup_hf(self, mode: str) -> tuple:
        return ("hf_token", mode)

    def setup_notify(self, url: str) -> tuple:
        return ("notify", url)

    def on_error_block(self, *fields) -> OnErrorBlock:
        """Build an OnErrorBlock from its ``key = value`` fields.

        ``notify`` becomes True only for the identifier ``true``
        (case-insensitive); any other identifier yields False.
        """
        return _assign_options(
            OnErrorBlock(),
            fields,
            {"retry": "retry", "fallback": "fallback", "notify": "notify"},
            converters={
                "retry": int,
                "notify": lambda flag: str(flag).lower() == "true",
            },
        )

    def on_error_field(self, field_data) -> tuple:
        return field_data

    def onerr_retry(self, value: int) -> tuple:
        return ("retry", value)

    def onerr_fallback(self, value: str) -> tuple:
        return ("fallback", value)

    def onerr_notify(self, value: str) -> tuple:
        return ("notify", value)

    # --- Contract Blocks (Phase 4) ---

    def data_contract_block(self, *fields) -> DataContractBlock:
        """Build a DataContractBlock from its ``key = value`` fields."""
        return _assign_options(
            DataContractBlock(),
            fields,
            {
                "required_fields": "required_fields",
                "min_samples": "min_samples",
                "max_perplexity": "max_perplexity",
            },
            converters={"min_samples": int},
        )

    def dc_field(self, field_data) -> tuple:
        return field_data

    def dc_required(self, names: list[str]) -> tuple:
        return ("required_fields", names)

    def dc_min_samples(self, value: int) -> tuple:
        return ("min_samples", value)

    def dc_max_ppl(self, value: float) -> tuple:
        return ("max_perplexity", value)

    def reward_contract_block(self, *fields) -> RewardContractBlock:
        """Build a RewardContractBlock from its ``key = value`` fields."""
        return _assign_options(
            RewardContractBlock(),
            fields,
            {"verifiers": "verifiers", "min_reward": "min_reward"},
        )

    def rc_field(self, field_data) -> tuple:
        return field_data

    def rc_verifiers(self, names: list[str]) -> tuple:
        return ("verifiers", names)

    def rc_min_reward(self, value: float) -> tuple:
        return ("min_reward", value)

    # --- Top Level ---

    def start(self, *items) -> TDProgram:
        """Collect all parsed commands and blocks into a TDProgram.

        Configuration blocks are stored in their dedicated TDProgram slot
        (a later block of the same kind replaces an earlier one); everything
        else is appended to ``program.commands`` in source order.
        """
        # Checked in this order, mirroring the block kinds the grammar allows
        # at the top level.
        block_slots = (
            (GateBlock, "gates"),
            (BudgetBlock, "budget"),
            (DataContractBlock, "data_contract"),
            (RewardContractBlock, "reward_contract"),
            (SetupBlock, "setup"),
            (OnErrorBlock, "on_error"),
            (LogBlock, "log"),
        )
        program = TDProgram()
        for item in items:
            if item is None:
                continue
            for block_type, slot in block_slots:
                if isinstance(item, block_type):
                    setattr(program, slot, item)
                    break
            else:
                program.commands.append(item)
        return program
# ============================================================================
# PUBLIC API
# ============================================================================
# Build the parser and the transformer a single time at import; every parse
# call in this module shares these two instances.
_parser = Lark(TD_GRAMMAR, parser="earley", propagate_positions=True)
_transformer = TDTransformer()
def parse_td_string(source: str) -> TDProgram:
    """Parse a .td source string into a TDProgram AST.

    Args:
        source: The .td file content as a string.

    Returns:
        TDProgram with all commands and blocks.

    Raises:
        TDSyntaxError: If the source has invalid syntax. ``line`` is None
            when the parser could not attribute the error to a source line
            (for example when the input ends unexpectedly).
    """
    try:
        tree = _parser.parse(source)
        return _transformer.transform(tree)
    except UnexpectedInput as e:
        # Lark's end-of-input error carries a placeholder position
        # (line == -1); report "unknown" instead of a bogus line number.
        line = getattr(e, "line", None)
        if isinstance(line, int) and line < 1:
            line = None
        raise TDSyntaxError(
            message=f"Unexpected {e.token!r}" if hasattr(e, "token") else str(e),
            line=line,
            hint="Check for typos or missing quotes around model paths.",
        ) from e
def parse_td_file(filepath: str) -> TDProgram:
    """Parse a .td file into a TDProgram AST.

    The file is decoded as UTF-8 regardless of the platform's default
    encoding, so the same .td file parses identically on every OS.

    Args:
        filepath: Path to the .td file.

    Returns:
        TDProgram with all commands and blocks; ``source_file`` is set to
        ``filepath``.

    Raises:
        TDSyntaxError: If the file has invalid syntax.
        FileNotFoundError: If the file doesn't exist.
    """
    with open(filepath, "r", encoding="utf-8") as f:
        source = f.read()
    program = parse_td_string(source)
    program.source_file = filepath
    return program