"""
TD Lang Grammar — Lark parser for .td files.

Defines the syntax for Phase 1 commands (load, merge, heal, eval, commit)
plus gate/budget blocks. Phase 2 commands are parsed into stub nodes
so the compiler can reject them with a clear error until implemented.
"""

from lark import Lark, Token, Transformer, UnexpectedInput, v_args

from .ast_nodes import (
    AbsorbCmd,
    BudgetBlock,
    CommitCmd,
    DataContractBlock,
    DebateCmd,
    DiagnoseCmd,
    DistillCmd,
    EditCmd,
    EvalCmd,
    FuseCmd,
    ForkCmd,
    GateBlock,
    HealCmd,
    IfBlock,
    LoadCmd,
    MergeCmd,
    NotifyCmd,
    OnErrorBlock,
    PromptBlock,
    PruneCmd,
    RepeatBlock,
    ReportCmd,
    ResetCmd,
    RewardContractBlock,
    RollbackCmd,
    CurriculumCmd,
    StarCmd,
    BestOfCmd,
    ExploitCmd,
    ArenaCmd,
    ResearchArenaCmd,
    SaveCmd,
    ScheduleCmd,
    DownloadCmd,
    LogBlock,
    CompareCmd,
    VerifyCmd,
    VoteCmd,
    SetupBlock,
    SnapshotCmd,
    SynthCmd,
    TDProgram,
    TrainCmd,
)
from .errors import TDSyntaxError


# ============================================================================
# LARK GRAMMAR DEFINITION
# ============================================================================

TD_GRAMMAR = r"""
    // TD Lang Grammar v0.1.0
    // One command per line, blocks with curly braces, comments with #

    start: (_NL* statement _NL*)* _NL*

    ?statement: load_cmd
              | merge_cmd
              | heal_cmd
              | eval_cmd
              | commit_cmd
              | synth_cmd
              | train_cmd
              | debate_cmd
              | diagnose_cmd
              | fork_cmd
              | reset_cmd
              | prune_cmd
              | edit_cmd
              | fuse_cmd
              | absorb_cmd
              | repeat_block_cmd
              | if_block_cmd
              | snapshot_cmd
              | report_cmd
              | notify_cmd
              | save_cmd
              | gate_block
              | budget_block
              | data_contract_block
              | reward_contract_block
              | setup_block
              | on_error_block
              | schedule_cmd
              | download_cmd
              | log_block
              | compare_cmd
              | verify_cmd
              | vote_cmd
              | prompt_cmd
              | distill_cmd
              | rollback_cmd
              | curriculum_cmd
              | star_cmd
              | best_of_cmd
              | exploit_cmd
              | arena_cmd
              | research_arena_cmd

    // ======================== PHASE 1 COMMANDS ========================

    // load "model/path" as alias
    load_cmd: "load" string "as" IDENT

    // merge "source" into target using method [strength 0.5]
    merge_cmd: "merge" string "into" IDENT "using" IDENT (merge_strength)?
    merge_strength: "strength" NUMBER

    // heal target [lora_r 32] [epochs 2]
    heal_cmd: "heal" IDENT (heal_opt)*
    heal_opt: "lora_r" INT -> heal_lora_r
            | "epochs" INT -> heal_epochs

    // eval target [on "dataset"] [-> output.json]
    eval_cmd: "eval" IDENT (eval_on)? (eval_output)?
    eval_on: "on" string
    eval_output: "->" FILEPATH

    // commit target [if [gate1, gate2, gate3]]
    commit_cmd: "commit" IDENT (commit_gates)?
    commit_gates: "if" name_list

    // ======================== PHASE 2 COMMANDS ========================
    // (parsed but not compiled yet — will show "not implemented" message)

    // synth target from source [filter cherry_llm] [-> output.jsonl]
    synth_cmd: "synth" IDENT "from" IDENT (synth_filter)? (synth_output)?
    synth_filter: "filter" IDENT
    synth_output: "->" FILEPATH

    // train target on "dataset" using method [steps 100] [lr 0.0001]
    train_cmd: "train" IDENT "on" string "using" IDENT (train_opt)*
    train_opt: "steps" INT -> train_steps
             | "lr" NUMBER -> train_lr

    // debate target rounds 3 candidates 8 [-> output.jsonl]
    debate_cmd: "debate" IDENT "rounds" INT "candidates" INT (debate_output)?
    debate_output: "->" FILEPATH

    // diagnose target [-> weaknesses.json]
    diagnose_cmd: "diagnose" IDENT (diagnose_output)?
    diagnose_output: "->" FILEPATH

    // fork source as alias
    fork_cmd: "fork" IDENT "as" IDENT

    // reset target to checkpoint_name
    reset_cmd: "reset" IDENT "to" (string | IDENT)

    // prune target using method [aggressiveness 0.1]
    prune_cmd: "prune" IDENT "using" IDENT (prune_aggr)?
    prune_aggr: "aggressiveness" NUMBER

    // edit target layers 16-28 using lora [lr 0.0001]
    edit_cmd: "edit" IDENT "layers" LAYER_SPEC "using" IDENT (edit_lr)?
    edit_lr: "lr" NUMBER

    // ======================== PHASE 7 — LOOP CONTROL ========================

    // repeat N { commands... }
    repeat_block_cmd: "repeat" INT "{" _NL* body_cmd+ _NL* "}"

    // if condition target { commands... } [else { commands... }]
    if_block_cmd: "if" IDENT IDENT "{" _NL* body_cmd+ _NL* "}" (else_clause)?
    else_clause: "else" "{" _NL* body_cmd+ _NL* "}"

    // Commands allowed inside blocks (same as top-level minus config blocks)
    ?body_cmd: (load_cmd | merge_cmd | heal_cmd | eval_cmd | commit_cmd
              | synth_cmd | train_cmd | debate_cmd | diagnose_cmd
              | fork_cmd | reset_cmd | prune_cmd | edit_cmd
              | fuse_cmd | absorb_cmd
              | snapshot_cmd | report_cmd
              | notify_cmd | save_cmd
              | repeat_block_cmd | if_block_cmd
              | schedule_cmd | download_cmd
              | compare_cmd | verify_cmd
              | vote_cmd | prompt_cmd | distill_cmd | rollback_cmd
              | curriculum_cmd | star_cmd | best_of_cmd | exploit_cmd
              | arena_cmd | research_arena_cmd) _NL*

    // ======================== PHASE 6 — EASY MERGE COMMANDS ========================

    // fuse [model1, model2, model3] into target [using method] [strategy equal|weighted|sequential]
    fuse_cmd: "fuse" model_list "into" IDENT (fuse_method)? (fuse_strategy)?
    model_list: "[" string ("," string)* "]"
    fuse_method: "using" IDENT
    fuse_strategy: "strategy" IDENT

    // absorb "model" into target [strength 0.5]
    absorb_cmd: "absorb" string "into" IDENT (absorb_strength)?
    absorb_strength: "strength" NUMBER

    // ======================== PHASE 4 COMMANDS ========================

    // snapshot target [-> output_dir]
    snapshot_cmd: "snapshot" IDENT (snapshot_output)?
    snapshot_output: "->" FILEPATH

    // report [-> economics.json]
    report_cmd: "report" (report_output)?
    report_output: "->" FILEPATH

    // ======================== BLOCKS ========================

    // gate { must_pass = [canary, perplexity, thinking_mode] }
    gate_block: "gate" "{" _NL* gate_field+ _NL* "}"
    gate_field: "must_pass" "=" name_list _NL*

    // budget { max_gpu_hours = 8 \n max_cost = 50.00 }
    budget_block: "budget" "{" _NL* budget_field+ _NL* "}"
    budget_field: (budget_gpu | budget_cost | budget_tokens | budget_experiments) _NL*
    budget_gpu: "max_gpu_hours" "=" NUMBER
    budget_cost: "max_cost" "=" NUMBER
    budget_tokens: "max_tokens" "=" INT
    budget_experiments: "max_experiments" "=" INT

    // data_contract { required_fields = [prompt, response] \n min_samples = 100 \n max_perplexity = 50.0 }
    data_contract_block: "data_contract" "{" _NL* dc_field+ _NL* "}"
    dc_field: (dc_required | dc_min_samples | dc_max_ppl) _NL*
    dc_required: "required_fields" "=" name_list
    dc_min_samples: "min_samples" "=" INT
    dc_max_ppl: "max_perplexity" "=" NUMBER

    // reward_contract { verifiers = [code_compiles, math_correct] \n min_reward = 0.3 }
    reward_contract_block: "reward_contract" "{" _NL* rc_field+ _NL* "}"
    rc_field: (rc_verifiers | rc_min_reward) _NL*
    rc_verifiers: "verifiers" "=" name_list
    rc_min_reward: "min_reward" "=" NUMBER

    // ======================== PHASE 8 — AUTOPILOT ========================

    // notify "Training complete!"
    notify_cmd: "notify" string

    // save target to "gdrive:TD/models/v1"
    save_cmd: "save" IDENT "to" string

    // setup { pip = [torch, transformers] hf_token = env notify = "ntfy.sh/my_ai" }
    setup_block: "setup" "{" _NL* setup_field+ _NL* "}"
    setup_field: (setup_pip | setup_hf | setup_notify) _NL*
    setup_pip: "pip" "=" name_list
    setup_hf: "hf_token" "=" IDENT
    setup_notify: "notify" "=" string

    // on_error { retry = 3 fallback = reduce_batch notify = true }
    on_error_block: "on_error" "{" _NL* on_error_field+ _NL* "}"
    on_error_field: (onerr_retry | onerr_fallback | onerr_notify) _NL*
    onerr_retry: "retry" "=" INT
    onerr_fallback: "fallback" "=" IDENT
    onerr_notify: "notify" "=" IDENT

    // ======================== PHASE 9 — SCHEDULE ========================

    // schedule "every 6h" { commands... }
    // schedule "at 02:00" { commands... }
    // schedule "after 30m" { commands... }
    schedule_cmd: "schedule" string "{" _NL* body_cmd+ _NL* "}"

    // ======================== PHASE 10 - TOOLBOX ========================

    // download "gsm8k" as math_data [split train]
    download_cmd: "download" string "as" IDENT (download_split)?
    download_split: "split" IDENT

    // log "training_log.txt"
    log_block: "log" string

    // compare target vs "source_model" [questions 50] [-> output.json]
    compare_cmd: "compare" IDENT "vs" string (compare_questions)? (compare_output)?
    compare_questions: "questions" INT
    compare_output: "->" FILEPATH

    // verify target on "dataset" [questions 100] [-> results.json]
    verify_cmd: "verify" IDENT "on" string (verify_questions)? (verify_output)?
    verify_questions: "questions" INT
    verify_output: "->" FILEPATH

    // ======================== PHASE 11 - INTELLIGENCE ========================

    // vote target "question" [samples 5] [-> output.json]
    vote_cmd: "vote" IDENT string (vote_samples)? (vote_output)?
    vote_samples: "samples" INT
    vote_output: "->" FILEPATH

    // prompt target "system prompt text"
    prompt_cmd: "prompt" IDENT string

    // distill target into "small_model" [steps 200] [-> output_dir]
    distill_cmd: "distill" IDENT "into" string (distill_steps)? (distill_output)?
    distill_steps: "steps" INT
    distill_output: "->" FILEPATH

    // rollback target
    rollback_cmd: "rollback" IDENT

    // ======================== PHASE 12 - RL & FINE-TUNING ========================

    // curriculum target on "dataset" using method [levels 3] [steps 64]
    curriculum_cmd: "curriculum" IDENT "on" string "using" IDENT (curriculum_opt)*
    curriculum_opt: "levels" INT -> curriculum_levels
                  | "steps" INT -> curriculum_steps

    // star target on "dataset" [rounds 3] [samples 8]
    star_cmd: "star" IDENT "on" string (star_opt)*
    star_opt: "rounds" INT -> star_rounds
            | "samples" INT -> star_samples

    // best_of target on "dataset" [n 8] [steps 32]
    best_of_cmd: "best_of" IDENT "on" string (best_of_opt)*
    best_of_opt: "n" INT -> best_of_n
               | "steps" INT -> best_of_steps

    // exploit target on "dataset" [samples 16] [steps 32] [-> output.jsonl]
    exploit_cmd: "exploit" IDENT "on" string (exploit_opt)*
    exploit_opt: "samples" INT -> exploit_samples
               | "steps" INT -> exploit_steps
               | "->" FILEPATH -> exploit_output

    // ======================== PHASE 13 - REAL RL (ARENA) ========================

    // arena target on "dataset" [rounds 5] [episodes 50] [steps 64] [curiosity 0.3] [-> log.json]
    arena_cmd: "arena" IDENT "on" string (arena_opt)*
    arena_opt: "rounds" INT -> arena_rounds
             | "episodes" INT -> arena_episodes
             | "steps" INT -> arena_steps
             | "curiosity" NUMBER -> arena_curiosity
             | "->" FILEPATH -> arena_output

    // research_arena target topic "subject" [sources "web"|"pubmed"|"arxiv"|path]
    //   [rounds 5] [episodes 30] [steps 64] [curiosity 0.3] [difficulty_scale 0.25] [-> log.json]
    research_arena_cmd: "research_arena" IDENT "topic" string (ra_opt)*
    ra_opt: "sources" string -> ra_sources
          | "rounds" INT -> ra_rounds
          | "episodes" INT -> ra_episodes
          | "steps" INT -> ra_steps
          | "curiosity" NUMBER -> ra_curiosity
          | "difficulty_scale" NUMBER -> ra_difficulty
          | "->" FILEPATH -> ra_output

    // ======================== SHARED RULES ========================

    // List of names: [name1, name2, name3]
    name_list: "[" IDENT ("," IDENT)* "]"

    // String: double-quoted
    string: ESCAPED_STRING

    // Layer spec: "all", single number, or range like "16-28"
    LAYER_SPEC: /all|[0-9]+-[0-9]+|[0-9]+/

    // Filepath: word with dots, slashes, underscores (no spaces)
    FILEPATH: /[a-zA-Z0-9_.\-\/]+/

    // Identifier: letters, numbers, underscores, hyphens (but starts with letter/underscore)
    IDENT: /[a-zA-Z_][a-zA-Z0-9_\-]*/

    // Numbers
    NUMBER: /\d+\.?\d*([eE][+-]?\d+)?/
    INT: /\d+/

    // Whitespace and comments
    _NL: /\s*/ NEWLINE /\s*/
    COMMENT: /#[^\n]*/

    %import common.ESCAPED_STRING
    %import common.NEWLINE
    %import common.WS_INLINE
    %ignore WS_INLINE
    %ignore COMMENT
"""


# ============================================================================
# LARK TRANSFORMER — Parse Tree → AST Nodes
# ============================================================================

def _apply_options(node, options, attr_by_key: dict[str, str], converters=None):
    """Copy ``(key, value)`` option tuples onto attributes of an AST node.

    Args:
        node: The AST node to mutate.
        options: Iterable of already-transformed children. Items that are
            not tuples are ignored.
        attr_by_key: Maps an option key to the attribute name set on *node*.
            Keys missing from the map are ignored.
        converters: Optional map from option key to a one-argument callable
            applied to the value before assignment.

    Returns:
        The same *node*, to allow ``return _apply_options(...)``.
    """
    converters = converters or {}
    for option in options:
        if not isinstance(option, tuple):
            continue
        key, value = option
        attr = attr_by_key.get(key)
        if attr is None:
            continue
        convert = converters.get(key)
        setattr(node, attr, convert(value) if convert is not None else value)
    return node


def _is_true(value) -> bool:
    """Return True when *value* spells ``true`` (case-insensitive)."""
    return str(value).lower() == "true"


@v_args(inline=True)
class TDTransformer(Transformer):
    """Transforms Lark parse tree into td_lang AST nodes.

    Each method matches a grammar rule name and returns
    the corresponding dataclass from ast_nodes.py.

    Option sub-rules return ``(key, value)`` tuples; the owning command
    method folds them onto the node it builds. Trailing parameters with
    ``None`` defaults cover optional sub-rules that did not match.
    """

    # --- Helpers ---

    def string(self, s: Token) -> str:
        """Strip quotes from a string token."""
        # Only the surrounding quotes are removed; escape sequences inside
        # the literal are left exactly as written in the source.
        return str(s)[1:-1]

    def name_list(self, *names: Token) -> list[str]:
        """Convert name list tokens to Python list of strings."""
        return [str(n) for n in names]

    def IDENT(self, token: Token) -> str:
        return str(token)

    def INT(self, token: Token) -> int:
        return int(token)

    def NUMBER(self, token: Token) -> float:
        return float(token)

    def FILEPATH(self, token: Token) -> str:
        return str(token)

    def LAYER_SPEC(self, token: Token) -> str:
        return str(token)

    # --- Phase 1 Commands ---

    def load_cmd(self, model_ref: str, alias: str) -> LoadCmd:
        return LoadCmd(model_ref=model_ref, alias=alias)

    def merge_cmd(self, source: str, target: str, method: str,
                  strength: float | None = None) -> MergeCmd:
        return MergeCmd(
            source=source,
            target=target,
            method=method,
            strength=strength if strength is not None else 0.5,
        )

    def merge_strength(self, value: float) -> float:
        return value

    def heal_cmd(self, target: str, *opts) -> HealCmd:
        return _apply_options(
            HealCmd(target=target),
            opts,
            {"lora_r": "lora_r", "epochs": "epochs"},
        )

    def heal_lora_r(self, value: int) -> tuple:
        return ("lora_r", value)

    def heal_epochs(self, value: int) -> tuple:
        return ("epochs", value)

    def eval_cmd(self, target: str, *opts) -> EvalCmd:
        return _apply_options(
            EvalCmd(target=target),
            opts,
            {"on": "dataset", "output": "output"},
        )

    def eval_on(self, dataset: str) -> tuple:
        return ("on", dataset)

    def eval_output(self, filepath: str) -> tuple:
        return ("output", filepath)

    def commit_cmd(self, target: str, gates: list[str] | None = None) -> CommitCmd:
        return CommitCmd(target=target, gates=gates)

    def commit_gates(self, gates: list[str]) -> list[str]:
        return gates

    # --- Phase 2 Commands ---

    def synth_cmd(self, target: str, source: str, *opts) -> SynthCmd:
        return _apply_options(
            SynthCmd(target=target, source=source),
            opts,
            {"filter": "filter_method", "output": "output"},
        )

    def synth_filter(self, method: str) -> tuple:
        return ("filter", method)

    def synth_output(self, filepath: str) -> tuple:
        return ("output", filepath)

    def train_cmd(self, target: str, dataset: str, method: str, *opts) -> TrainCmd:
        return _apply_options(
            TrainCmd(target=target, dataset=dataset, method=method),
            opts,
            {"steps": "steps", "lr": "learning_rate"},
        )

    def train_steps(self, value: int) -> tuple:
        return ("steps", value)

    def train_lr(self, value: float) -> tuple:
        return ("lr", value)

    def debate_cmd(self, target: str, rounds: int, candidates: int,
                   output: tuple | None = None) -> DebateCmd:
        cmd = DebateCmd(target=target, rounds=rounds, candidates=candidates)
        return _apply_options(cmd, (output,), {"output": "output"})

    def debate_output(self, filepath: str) -> tuple:
        return ("output", filepath)

    def diagnose_cmd(self, target: str, output: tuple | None = None) -> DiagnoseCmd:
        return _apply_options(
            DiagnoseCmd(target=target), (output,), {"output": "output"}
        )

    def diagnose_output(self, filepath: str) -> tuple:
        return ("output", filepath)

    def fork_cmd(self, source: str, alias: str) -> ForkCmd:
        return ForkCmd(source=source, alias=alias)

    def reset_cmd(self, target: str, checkpoint) -> ResetCmd:
        # The checkpoint may arrive from either the string rule or IDENT.
        return ResetCmd(target=target, checkpoint=str(checkpoint))

    def prune_cmd(self, target: str, method: str,
                  aggressiveness: float | None = None) -> PruneCmd:
        return PruneCmd(
            target=target,
            method=method,
            aggressiveness=aggressiveness if aggressiveness is not None else 0.1,
        )

    def prune_aggr(self, value: float) -> float:
        return value

    def edit_cmd(self, target: str, layers: str, method: str,
                 lr: float | None = None) -> EditCmd:
        return EditCmd(
            target=target,
            layers=layers,
            method=method,
            learning_rate=lr,
        )

    def edit_lr(self, value: float) -> float:
        return value

    # --- Phase 7: Loop Control ---

    def repeat_block_cmd(self, count: int, *body_cmds) -> RepeatBlock:
        return RepeatBlock(count=count, body=list(body_cmds))

    def if_block_cmd(self, condition: str, target: str, *rest) -> IfBlock:
        """Parse if condition target { then... } [else { else... }]"""
        block = IfBlock(condition=condition, target=target)
        # rest holds the then-body commands, optionally followed by the
        # list produced by else_clause.
        for item in rest:
            if isinstance(item, list) and item:
                block.else_body = item
            else:
                block.then_body.append(item)
        return block

    def else_clause(self, *body_cmds) -> list:
        return list(body_cmds)

    # --- Phase 9: Schedule ---

    def schedule_cmd(self, timing: str, *body_cmds) -> ScheduleCmd:
        return ScheduleCmd(timing=timing, body=list(body_cmds))

    # --- Phase 10: Toolbox ---

    def download_cmd(self, dataset: str, alias: str,
                     split: str | None = None) -> DownloadCmd:
        cmd = DownloadCmd(dataset=dataset, alias=alias)
        # download_split yields a ("split", value) tuple; a bare string is
        # also accepted.
        if isinstance(split, tuple) and split[0] == "split":
            cmd.split = split[1]
        elif isinstance(split, str):
            cmd.split = split
        return cmd

    def download_split(self, value: str) -> tuple:
        return ("split", value)

    def log_block(self, filepath: str) -> LogBlock:
        return LogBlock(filepath=filepath)

    def compare_cmd(self, target: str, source: str, *opts) -> CompareCmd:
        return _apply_options(
            CompareCmd(target=target, source=source),
            opts,
            {"questions": "questions", "output": "output"},
        )

    def compare_questions(self, value: int) -> tuple:
        return ("questions", value)

    def compare_output(self, filepath: str) -> tuple:
        return ("output", filepath)

    def verify_cmd(self, target: str, dataset: str, *opts) -> VerifyCmd:
        return _apply_options(
            VerifyCmd(target=target, dataset=dataset),
            opts,
            {"questions": "questions", "output": "output"},
        )

    def verify_questions(self, value: int) -> tuple:
        return ("questions", value)

    def verify_output(self, filepath: str) -> tuple:
        return ("output", filepath)

    # --- Phase 11: Intelligence Commands ---

    def vote_cmd(self, target: str, question: str, *opts) -> VoteCmd:
        return _apply_options(
            VoteCmd(target=target, question=question),
            opts,
            {"samples": "samples", "output": "output"},
        )

    def vote_samples(self, value: int) -> tuple:
        return ("samples", value)

    def vote_output(self, filepath: str) -> tuple:
        return ("output", filepath)

    def prompt_cmd(self, target: str, text: str) -> PromptBlock:
        return PromptBlock(target=target, text=text)

    def distill_cmd(self, teacher: str, student: str, *opts) -> DistillCmd:
        return _apply_options(
            DistillCmd(teacher=teacher, student=student),
            opts,
            {"steps": "steps", "output": "output"},
        )

    def distill_steps(self, value: int) -> tuple:
        return ("steps", value)

    def distill_output(self, filepath: str) -> tuple:
        return ("output", filepath)

    def rollback_cmd(self, target: str) -> RollbackCmd:
        return RollbackCmd(target=target)

    # --- Phase 12: RL & Fine-Tuning Commands ---

    def curriculum_cmd(self, target: str, dataset: str, method: str,
                       *opts) -> CurriculumCmd:
        return _apply_options(
            CurriculumCmd(target=target, dataset=dataset, method=method),
            opts,
            {"levels": "levels", "steps": "steps"},
        )

    def curriculum_levels(self, value: int) -> tuple:
        return ("levels", value)

    def curriculum_steps(self, value: int) -> tuple:
        return ("steps", value)

    def star_cmd(self, target: str, dataset: str, *opts) -> StarCmd:
        return _apply_options(
            StarCmd(target=target, dataset=dataset),
            opts,
            {"rounds": "rounds", "samples": "samples"},
        )

    def star_rounds(self, value: int) -> tuple:
        return ("rounds", value)

    def star_samples(self, value: int) -> tuple:
        return ("samples", value)

    def best_of_cmd(self, target: str, dataset: str, *opts) -> BestOfCmd:
        return _apply_options(
            BestOfCmd(target=target, dataset=dataset),
            opts,
            {"n": "n", "steps": "steps"},
        )

    def best_of_n(self, value: int) -> tuple:
        return ("n", value)

    def best_of_steps(self, value: int) -> tuple:
        return ("steps", value)

    def exploit_cmd(self, target: str, dataset: str, *opts) -> ExploitCmd:
        return _apply_options(
            ExploitCmd(target=target, dataset=dataset),
            opts,
            {"samples": "samples", "steps": "steps", "output": "output"},
        )

    def exploit_samples(self, value: int) -> tuple:
        return ("samples", value)

    def exploit_steps(self, value: int) -> tuple:
        return ("steps", value)

    def exploit_output(self, filepath: str) -> tuple:
        return ("output", filepath)

    # --- Phase 13: Real RL (Arena) ---

    def arena_cmd(self, target: str, dataset: str, *opts) -> ArenaCmd:
        return _apply_options(
            ArenaCmd(target=target, dataset=dataset),
            opts,
            {
                "rounds": "rounds",
                "episodes": "episodes",
                "steps": "steps",
                "curiosity": "curiosity",
                "output": "output",
            },
        )

    def arena_rounds(self, value: int) -> tuple:
        return ("rounds", value)

    def arena_episodes(self, value: int) -> tuple:
        return ("episodes", value)

    def arena_steps(self, value: int) -> tuple:
        return ("steps", value)

    def arena_curiosity(self, value: float) -> tuple:
        return ("curiosity", value)

    def arena_output(self, filepath: str) -> tuple:
        return ("output", filepath)

    # --- Phase 13: Research Arena ---

    def research_arena_cmd(self, target: str, topic: str, *opts) -> ResearchArenaCmd:
        return _apply_options(
            ResearchArenaCmd(target=target, topic=topic),
            opts,
            {
                "sources": "sources",
                "rounds": "rounds",
                "episodes": "episodes",
                "steps": "steps",
                "curiosity": "curiosity",
                "difficulty_scale": "difficulty_scale",
                "output": "output",
            },
        )

    def ra_sources(self, value: str) -> tuple:
        return ("sources", value)

    def ra_rounds(self, value: int) -> tuple:
        return ("rounds", value)

    def ra_episodes(self, value: int) -> tuple:
        return ("episodes", value)

    def ra_steps(self, value: int) -> tuple:
        return ("steps", value)

    def ra_curiosity(self, value: float) -> tuple:
        return ("curiosity", value)

    def ra_difficulty(self, value: float) -> tuple:
        return ("difficulty_scale", value)

    def ra_output(self, filepath: str) -> tuple:
        return ("output", filepath)

    # --- Phase 6: Easy Merge Commands ---

    def fuse_cmd(self, sources: list[str], target: str, *opts) -> FuseCmd:
        return _apply_options(
            FuseCmd(sources=sources, target=target),
            opts,
            {"method": "method", "strategy": "strategy"},
        )

    def model_list(self, *models: str) -> list[str]:
        return [str(m) for m in models]

    def fuse_method(self, method: str) -> tuple:
        return ("method", method)

    def fuse_strategy(self, strategy: str) -> tuple:
        return ("strategy", strategy)

    def absorb_cmd(self, source: str, target: str,
                   strength: float | None = None) -> AbsorbCmd:
        return AbsorbCmd(
            source=source,
            target=target,
            strength=strength if strength is not None else 0.5,
        )

    def absorb_strength(self, value: float) -> float:
        return value

    # --- Phase 4 Commands ---

    def snapshot_cmd(self, target: str, output: tuple | None = None) -> SnapshotCmd:
        return _apply_options(
            SnapshotCmd(target=target), (output,), {"output": "output"}
        )

    def snapshot_output(self, filepath: str) -> tuple:
        return ("output", filepath)

    def report_cmd(self, output: tuple | None = None) -> ReportCmd:
        return _apply_options(ReportCmd(), (output,), {"output": "output"})

    def report_output(self, filepath: str) -> tuple:
        return ("output", filepath)

    # --- Blocks ---

    def gate_block(self, *fields) -> GateBlock:
        gate = GateBlock()
        # Each gate_field is a list of names; when several are given the
        # last one wins.
        for f in fields:
            if isinstance(f, list):
                gate.must_pass = f
        return gate

    def gate_field(self, names: list[str]) -> list[str]:
        return names

    def budget_block(self, *fields) -> BudgetBlock:
        return _apply_options(
            BudgetBlock(),
            fields,
            {
                "max_gpu_hours": "max_gpu_hours",
                "max_cost": "max_cost",
                "max_tokens": "max_tokens",
                "max_experiments": "max_experiments",
            },
            converters={"max_tokens": int, "max_experiments": int},
        )

    def budget_field(self, field_data) -> tuple:
        return field_data

    def budget_gpu(self, value: float) -> tuple:
        return ("max_gpu_hours", value)

    def budget_cost(self, value: float) -> tuple:
        return ("max_cost", value)

    def budget_tokens(self, value: int) -> tuple:
        return ("max_tokens", value)

    def budget_experiments(self, value: int) -> tuple:
        return ("max_experiments", value)

    # --- Phase 8: Autopilot Commands ---

    def notify_cmd(self, message: str) -> NotifyCmd:
        return NotifyCmd(message=message)

    def save_cmd(self, target: str, destination: str) -> SaveCmd:
        return SaveCmd(target=target, destination=destination)

    def setup_block(self, *fields) -> SetupBlock:
        return _apply_options(
            SetupBlock(),
            fields,
            {"pip": "pip_packages", "hf_token": "hf_token", "notify": "notify_url"},
        )

    def setup_field(self, field_data) -> tuple:
        return field_data

    def setup_pip(self, packages: list[str]) -> tuple:
        return ("pip", packages)

    def setup_hf(self, mode: str) -> tuple:
        return ("hf_token", mode)

    def setup_notify(self, url: str) -> tuple:
        return ("notify", url)

    def on_error_block(self, *fields) -> OnErrorBlock:
        return _apply_options(
            OnErrorBlock(),
            fields,
            {"retry": "retry", "fallback": "fallback", "notify": "notify"},
            # notify is parsed as an IDENT; only the word "true" enables it.
            converters={"retry": int, "notify": _is_true},
        )

    def on_error_field(self, field_data) -> tuple:
        return field_data

    def onerr_retry(self, value: int) -> tuple:
        return ("retry", value)

    def onerr_fallback(self, value: str) -> tuple:
        return ("fallback", value)

    def onerr_notify(self, value: str) -> tuple:
        return ("notify", value)

    # --- Contract Blocks (Phase 4) ---

    def data_contract_block(self, *fields) -> DataContractBlock:
        return _apply_options(
            DataContractBlock(),
            fields,
            {
                "required_fields": "required_fields",
                "min_samples": "min_samples",
                "max_perplexity": "max_perplexity",
            },
            converters={"min_samples": int},
        )

    def dc_field(self, field_data) -> tuple:
        return field_data

    def dc_required(self, names: list[str]) -> tuple:
        return ("required_fields", names)

    def dc_min_samples(self, value: int) -> tuple:
        return ("min_samples", value)

    def dc_max_ppl(self, value: float) -> tuple:
        return ("max_perplexity", value)

    def reward_contract_block(self, *fields) -> RewardContractBlock:
        return _apply_options(
            RewardContractBlock(),
            fields,
            {"verifiers": "verifiers", "min_reward": "min_reward"},
        )

    def rc_field(self, field_data) -> tuple:
        return field_data

    def rc_verifiers(self, names: list[str]) -> tuple:
        return ("verifiers", names)

    def rc_min_reward(self, value: float) -> tuple:
        return ("min_reward", value)

    # --- Top Level ---

    def start(self, *items) -> TDProgram:
        """Collect all parsed commands and blocks into a TDProgram."""
        program = TDProgram()
        for item in items:
            if item is None:
                continue
            # Configuration blocks fill dedicated slots (a later block of
            # the same kind replaces an earlier one); everything else is an
            # executable command kept in source order.
            if isinstance(item, GateBlock):
                program.gates = item
            elif isinstance(item, BudgetBlock):
                program.budget = item
            elif isinstance(item, DataContractBlock):
                program.data_contract = item
            elif isinstance(item, RewardContractBlock):
                program.reward_contract = item
            elif isinstance(item, SetupBlock):
                program.setup = item
            elif isinstance(item, OnErrorBlock):
                program.on_error = item
            elif isinstance(item, LogBlock):
                program.log = item
            else:
                program.commands.append(item)
        return program


# ============================================================================
# PUBLIC API
# ============================================================================

# Create the parser once — reuse for all files
_parser = Lark(
    TD_GRAMMAR,
    parser="earley",
    propagate_positions=True,
)

_transformer = TDTransformer()


def parse_td_string(source: str) -> TDProgram:
    """Parse a .td source string into a TDProgram AST.

    Args:
        source: The .td file content as a string.

    Returns:
        TDProgram with all commands and blocks.

    Raises:
        TDSyntaxError: If the source has invalid syntax.
    """
    try:
        tree = _parser.parse(source)
        return _transformer.transform(tree)
    except UnexpectedInput as e:
        # Not every UnexpectedInput subclass carries a token; fall back to
        # the exception's own text when it does not.
        raise TDSyntaxError(
            message=f"Unexpected {e.token!r}" if hasattr(e, "token") else str(e),
            line=getattr(e, "line", None),
            hint="Check for typos or missing quotes around model paths.",
        ) from e


def parse_td_file(filepath: str) -> TDProgram:
    """Parse a .td file into a TDProgram AST.

    The file is read as UTF-8 so that parsing does not depend on the
    platform's default text encoding.

    Args:
        filepath: Path to the .td file.

    Returns:
        TDProgram with all commands and blocks; ``source_file`` is set to
        *filepath*.

    Raises:
        TDSyntaxError: If the file has invalid syntax.
        FileNotFoundError: If the file doesn't exist.
    """
    with open(filepath, "r", encoding="utf-8") as f:
        source = f.read()
    program = parse_td_string(source)
    program.source_file = filepath
    return program