Instructions to use prometheus04/qwen3-4b-thinking-microagent with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- PEFT
How to use prometheus04/qwen3-4b-thinking-microagent with PEFT:
Task type is invalid.
- Notebooks
- Google Colab
- Kaggle
| """Code-specific filter for Nemotron-Terminal-Corpus code.parquet. | |
| Differences from convert.py: | |
| 1. Parse-error observations are KEPT (not rejection trigger) — rewritten to | |
| match the new XML format we are teaching. | |
| 2. Two acceptance paths: | |
| - SUCCESS: ends task_complete=true with zero final commands -> <finish> | |
| - GIVE_UP: never succeeded, but had >=5 turns and >=3 distinct commands | |
| attempted with a real error observed -> <give_up> | |
| 3. Hard rejects only the unsalvageable: | |
| - parse error inside the first 3 turns (no recovery context) | |
| - first turn not user / no task marker | |
| - extreme turn counts (<2 or >25 assistant) | |
| - cannot extract JSON from >=2 turns (filter handles isolated misses) | |
| 4. Per-task dedup cap (default 5) to prevent template overfitting. | |
| Output schema matches convert.py so train.py is unchanged. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import re | |
| from collections import Counter | |
| from dataclasses import dataclass, field | |
| from typing import Optional | |
| from convert import ( | |
| SYSTEM_PROMPT, | |
| OBS_PREFIX, | |
| OBS_HEAD_CHARS, | |
| OBS_TAIL_CHARS, | |
| TASK_START_MARKER, | |
| TASK_END_MARKER, | |
| INITIAL_STATE_MARKER, | |
| extract_task, | |
| extract_initial_state, | |
| extract_think, | |
| extract_json, | |
| truncate_output, | |
| clean_observation, | |
| keystrokes_to_command, | |
| build_bash_block, | |
| make_finish_summary, | |
| MAX_FINISH_SUMMARY_CHARS, | |
| ) | |
# Trajectories with fewer / more assistant turns than these bounds are rejected.
MIN_ASSISTANT_TURNS = 2
MAX_ASSISTANT_TURNS = 25
# A first parse-error observation at or before this turn index is a hard reject.
EARLY_PARSE_ERR_THRESHOLD = 3  # turn idx
# Give-up acceptance needs at least this many assistant turns ...
GIVE_UP_MIN_TURNS = 5
# ... and at least this many distinct command keystrokes.
GIVE_UP_MIN_DISTINCT_CMDS = 3
# Maximum number of accepted trajectories per task id.
PER_TASK_CAP = 5
# Matches the harness observations that report an unparseable JSON response.
_PARSE_ERR_RE = re.compile(r"(parsing error|no valid json found in response)", re.IGNORECASE)
# Matches text treated as evidence of a real command failure: a non-zero
# "[exit N]" marker, Python tracebacks, and common shell/tool error phrases.
# MULTILINE lets "^Error:" match at the start of any line.
_REAL_ERR_RE = re.compile(
    r"\[exit [1-9]\d*\]|Traceback|^Error:|fail(ed|ure)|permission denied|"
    r"command not found|No such file|syntax error|undefined",
    re.IGNORECASE | re.MULTILINE,
)
@dataclass
class CodeRejectStats:
    """Running accept/reject counters for one conversion pass.

    The class must be a dataclass: ``per_task_count`` is declared with
    ``field(default_factory=Counter)``, which only yields a fresh ``Counter``
    per instance when the ``@dataclass`` machinery processes it. Without the
    decorator the attribute is a shared ``Field`` object and indexing it fails.
    """

    # Rows seen, accepted or not.
    total: int = 0
    # Accepted trajectories, split by ending mode.
    accepted_success: int = 0
    accepted_giveup: int = 0
    # Rejection reasons (one counter per reason).
    rejected_too_few_turns: int = 0
    rejected_too_many_turns: int = 0
    rejected_no_task: int = 0
    rejected_early_parse_err: int = 0
    rejected_json_unrecoverable: int = 0
    rejected_per_task_cap: int = 0
    rejected_empty_pre_finish: int = 0
    rejected_no_real_attempt_failure: int = 0
    # NOTE(review): not incremented anywhere in the visible part of this file.
    rejected_giveup_cap: int = 0
    other: int = 0
    # Accepted-trajectory count per task id, used to enforce the per-task cap.
    per_task_count: Counter = field(default_factory=Counter)
def _is_parse_err_obs(content: str) -> bool:
    """Tell whether an observation is a JSON parse-error message.

    Empty or ``None`` content is treated as an empty string.
    """
    text = content or ""
    return _PARSE_ERR_RE.search(text) is not None
def _observation_has_real_error(content: str) -> bool:
    """Tell whether an observation contains text matching the real-error pattern.

    Empty or ``None`` content is treated as an empty string.
    """
    text = content or ""
    return _REAL_ERR_RE.search(text) is not None
| def _rewrite_parse_err_obs() -> str: | |
| """Replace original JSON parse-error obs with format-error obs for our XML format.""" | |
| return ( | |
| "[FORMAT ERROR] Your previous response did not match the required format.\n" | |
| "Required: <think>...</think><bash>...</bash> (or <finish>...</finish> when done).\n" | |
| "Re-emit the response in the correct format. Do not abandon the task." | |
| ) | |
def _clean_obs_or_format_err(content: str) -> str:
    """Clean an observation, swapping parse-error text for the format-error message.

    Non-parse-error observations are passed through ``clean_observation``.
    """
    if not _is_parse_err_obs(content):
        return clean_observation(content)
    return _rewrite_parse_err_obs()
def _count_attempts(conv: list[dict]) -> tuple[int, int, bool]:
    """Summarise how much the agent tried and whether anything visibly failed.

    Returns a triple of:
      * number of non-blank ``keystrokes`` strings across all assistant turns,
      * number of distinct ones (compared on their first 80 stripped chars),
      * whether any user turn matched the real-error pattern.
    """
    total_cmds = 0
    seen: set[str] = set()
    saw_error = False
    for turn in conv:
        who = turn.get("role")
        body = turn.get("content", "")
        if who == "user":
            if _observation_has_real_error(body):
                saw_error = True
            continue
        if who != "assistant":
            continue
        payload = extract_json(body)
        if not payload:
            continue
        commands = payload.get("commands")
        if not isinstance(commands, list):
            continue
        for entry in commands:
            if not isinstance(entry, dict):
                continue
            keys = entry.get("keystrokes", "")
            # Only real, non-blank string keystrokes count as an attempt.
            if isinstance(keys, str) and keys.strip():
                total_cmds += 1
                seen.add(keys.strip()[:80])
    return total_cmds, len(seen), saw_error
# Matches a whole line that starts with a capitalised identifier ending in
# Error / Exception / Warning / Interrupt (e.g. "ValueError: bad input"),
# optionally followed by ":" and a message. MULTILINE anchors ^/$ per line.
_EXCEPTION_LINE_RE = re.compile(
    r"^([A-Z][A-Za-z]*(Error|Exception|Warning|Interrupt)|KeyboardInterrupt):?.*$",
    re.MULTILINE,
)
| def _clean_snippet(line: str) -> str: | |
| line = re.sub(r"^\S+@[\w-]+:\S+[#$]\s*", "", line) | |
| line = re.sub(r"[\x00-\x08\x0b-\x1f\x7f]", "", line) | |
| line = line.strip("^").strip() | |
| if len(line) > 120: | |
| line = line[:117] + "..." | |
| return line | |
def _last_error_snippet(conv: list[dict]) -> str | None:
    """Find the most recent usable error line in the user turns.

    User turns are scanned newest-first. Within a turn, the last line that
    looks like a named exception wins; failing that, the line holding the last
    real-error match is used. A candidate that is empty after cleaning, or that
    is just a ``Traceback`` header, is discarded. Returns ``None`` when no turn
    yields a usable line.
    """

    def _usable(candidate: str) -> bool:
        # A bare "Traceback ..." header (optionally preceded by "C") says nothing useful.
        return bool(candidate) and not re.match(r"^C?Traceback", candidate, re.IGNORECASE)

    for turn in reversed(conv):
        if turn.get("role") != "user":
            continue
        text = turn.get("content", "") or ""

        exc_hits = list(_EXCEPTION_LINE_RE.finditer(text))
        if exc_hits:
            candidate = _clean_snippet(exc_hits[-1].group(0))
            if _usable(candidate):
                return candidate

        err_hits = list(_REAL_ERR_RE.finditer(text))
        if not err_hits:
            continue
        hit = err_hits[-1]
        # Widen the match to the full line that contains it.
        line_start = text.rfind("\n", 0, hit.start()) + 1
        line_end = text.find("\n", hit.end())
        if line_end < 0:
            line_end = len(text)
        candidate = _clean_snippet(text[line_start:line_end])
        if _usable(candidate):
            return candidate
    return None
def _count_distinct_first_tokens(conv: list[dict]) -> int:
    """Count distinct leading command tokens across all assistant turns.

    For each assistant turn whose JSON parses, only the first entry of its
    ``commands`` list is considered: the first whitespace-separated token on
    the first line of that entry's ``keystrokes`` (capped at 40 characters) is
    recorded. Turns with no usable first command contribute nothing.

    Returns the number of distinct tokens recorded.
    """
    distinct: set[str] = set()
    for t in conv:
        if t.get("role") != "assistant":
            continue
        p = extract_json(t.get("content", ""))
        if not p:
            continue
        cmds = p.get("commands") or []
        if not isinstance(cmds, list) or not cmds:
            continue
        first = cmds[0]
        if not isinstance(first, dict):
            continue
        ks = first.get("keystrokes")
        # Skip non-string keystrokes (e.g. a number or list in malformed JSON)
        # instead of crashing on .strip().
        if not isinstance(ks, str):
            continue
        ks = ks.strip()
        if not ks:
            continue
        # ks is stripped and non-empty, so its first line always starts with a
        # non-whitespace token and split() cannot come back empty.
        first_line = ks.splitlines()[0]
        distinct.add(first_line.split(None, 1)[0][:40])
    return len(distinct)
def _compose_giveup_summary(conv: list[dict], _unused_n: int = 0) -> str:
    """Compose the text placed inside the final give-up tag.

    The summary cites the number of distinct leading command tokens and, when
    one can be found, the last error line seen in the conversation. The second
    parameter is accepted but ignored. Output longer than
    ``MAX_FINISH_SUMMARY_CHARS`` is cut and suffixed with ``...``.
    """
    attempts = _count_distinct_first_tokens(conv)
    last_error = _last_error_snippet(conv)
    text = (
        f"tried {attempts} distinct approaches; last failure: {last_error}"
        if last_error
        else f"exceeded turn budget after {attempts} distinct attempts"
    )
    if len(text) <= MAX_FINISH_SUMMARY_CHARS:
        return text
    return text[: MAX_FINISH_SUMMARY_CHARS - 3].rstrip() + "..."
def convert_code_v2(row: dict, stats: CodeRejectStats) -> Optional[dict]:
    """Convert one source row into a training conversation, or reject it.

    Args:
        row: Source record. Reads ``conversations`` (list of role/content
            dicts) and copies ``task``, ``episode`` and ``run_id`` through.
        stats: Counters updated in place; exactly one accept/reject counter
            is incremented per call in addition to ``total``.

    Returns:
        A dict with the rewritten ``conversations`` plus metadata
        (``ending_mode`` is ``"finish"`` or ``"give_up"``), or ``None`` when
        the row is rejected.
    """
    stats.total += 1
    conv = row.get("conversations") or []
    if len(conv) < 3:
        stats.rejected_too_few_turns += 1
        return None
    if conv[0].get("role") != "user":
        stats.other += 1
        return None
    task = extract_task(conv[0]["content"])
    if not task:
        stats.rejected_no_task += 1
        return None

    # Per-task cap (avoid 500 copies of the same prime-generator).
    task_id = row.get("task") or task[:120]
    if stats.per_task_count[task_id] >= PER_TASK_CAP:
        stats.rejected_per_task_cap += 1
        return None

    initial_state = extract_initial_state(conv[0]["content"])
    # Everything after the task prompt: the actual agent/environment exchange.
    other = conv[1:]

    # Only the FIRST parse-error observation matters for the early-error rule.
    # i indexes `other` (which starts at conv[1]), so the turn index in the
    # full conversation is i + 1.
    for i, t in enumerate(other):
        if t.get("role") == "user" and _is_parse_err_obs(t.get("content", "")):
            if i + 1 <= EARLY_PARSE_ERR_THRESHOLD:
                stats.rejected_early_parse_err += 1
                return None
            break

    # Assistant turns.
    a_idxs = [i for i, t in enumerate(other) if t.get("role") == "assistant"]
    if len(a_idxs) < MIN_ASSISTANT_TURNS:
        stats.rejected_too_few_turns += 1
        return None
    if len(a_idxs) > MAX_ASSISTANT_TURNS:
        stats.rejected_too_many_turns += 1
        return None

    # Parse every assistant JSON. A single unparseable turn is tolerated
    # (it is replaced by a placeholder below); two or more is a reject.
    parsed = [extract_json(other[i].get("content", "")) for i in a_idxs]
    if sum(p is None for p in parsed) >= 2:
        stats.rejected_json_unrecoverable += 1
        return None
    parsed_by_pos = dict(zip(a_idxs, parsed))

    # Decide ending: success vs give_up.
    final_parsed = parsed[-1]
    is_success = bool(
        final_parsed
        and final_parsed.get("task_complete") is True
        and not (final_parsed.get("commands") or [])
    )
    if is_success:
        ending_mode = "finish"
    else:
        # Scan only the turns after the task prompt: error-like words in the
        # task description itself are not an observed failure. Command counts
        # are unaffected because conv[0] is a user turn.
        _, n_distinct, has_real_err = _count_attempts(other)
        eligible = (
            len(a_idxs) >= GIVE_UP_MIN_TURNS
            and n_distinct >= GIVE_UP_MIN_DISTINCT_CMDS
            and has_real_err
        )
        if not eligible:
            stats.rejected_no_real_attempt_failure += 1
            return None
        ending_mode = "give_up"

    # Build new conversation.
    new_conv = [{"role": "system", "content": SYSTEM_PROMPT}]
    pinned = f"TASK:\n{task}"
    if initial_state:
        pinned += f"\n\nInitial state:\n{initial_state}"
    new_conv.append({"role": "user", "content": pinned})

    final_assistant_pos = a_idxs[-1]
    for i, t in enumerate(other):
        role = t.get("role")
        if role == "user":
            new_conv.append({
                "role": "user",
                "content": _clean_obs_or_format_err(t.get("content", "")),
            })
            continue
        if role != "assistant":
            # Turns with any other role are dropped from the output.
            continue

        p = parsed_by_pos[i]
        is_final = i == final_assistant_pos
        think = extract_think(t.get("content", ""))

        if p is None:
            if is_final:
                # Final turn must parse; if it doesn't, this trajectory is dead.
                stats.rejected_json_unrecoverable += 1
                return None
            # Single unparseable mid-turn -> synthesize a minimal placeholder
            # so the surrounding context survives.
            action = "<bash>echo '(continuing)'</bash>"
        elif is_final and ending_mode == "finish":
            summary = make_finish_summary(p)
            action = f"<finish>{summary}</finish>"
        elif is_final and ending_mode == "give_up":
            # Retrospective summary: distinct attempts + last observed error,
            # again excluding the task prompt from the error search.
            reason = _compose_giveup_summary(other)
            action = f"<give_up>{reason}</give_up>"
        else:
            bash_block = build_bash_block(p)
            if bash_block is None:
                # Mid-trajectory turn with no commands. Keep it as a
                # re-read-and-think step (no-op echo) only if the agent left
                # real <think> analysis; otherwise drop the trajectory.
                if not think:
                    stats.rejected_empty_pre_finish += 1
                    return None
                bash_block = "echo '(reviewing previous output)'"
            action = f"<bash>{bash_block}</bash>"

        content = f"<think>{think}</think>\n{action}" if think else action
        new_conv.append({"role": "assistant", "content": content})

    stats.per_task_count[task_id] += 1
    if ending_mode == "finish":
        stats.accepted_success += 1
    else:
        stats.accepted_giveup += 1
    return {
        "conversations": new_conv,
        "task": row.get("task"),
        "episode": row.get("episode"),
        "run_id": row.get("run_id"),
        "source_config": "code.parquet",
        "n_assistant_turns": len(a_idxs),
        "ending_mode": ending_mode,
    }