"""
SFT (Supervised Fine-Tuning) dataset for the Korean LLM project.

Reads JSONL files in three supported formats:

  1. Alpaca format
     {"instruction": "...", "input": "...", "output": "..."}

  2. Alpaca format without optional input
     {"instruction": "...", "output": "..."}

  3. Conversation format
     {"conversations": [{"role": "user", "content": "..."},
                        {"role": "assistant", "content": "..."}]}

Chat template applied:
    <|user|>
    {instruction or user turn}
    <|assistant|>
    {output or assistant turn}</s>

Loss masking: ``labels`` is -1 for all prompt tokens so
``nn.CrossEntropyLoss(ignore_index=-1)`` only trains on
the assistant responses.
"""
from __future__ import annotations
import json
import multiprocessing
import time
from pathlib import Path
from typing import Union
import torch
from torch.utils.data import Dataset
from tokenizers import Tokenizer # HuggingFace tokenizers (fast, Rust-based)
# ---------------------------------------------------------------------------
# Role tags used in the chat template.
# ---------------------------------------------------------------------------
_USER_TAG = "<|user|>\n"
_ASSISTANT_TAG = "<|assistant|>\n"
_EOS_STRING = "</s>"


def _build_alpaca_turns(
    instruction: str,
    input_text: str,
    output: str,
) -> tuple[str, str]:
    """
    Convert an Alpaca-format sample into (prompt, response) strings.

    The *prompt* includes the user tag, the instruction (+ optional input),
    and the trailing assistant tag. The *response* is the output plus EOS.

    Args:
        instruction: The task instruction.
        input_text:  Optional additional input context. May be empty.
        output:      The expected assistant response.

    Returns:
        Tuple of (prompt_text, response_text).
    """
    user_body = instruction
    if input_text and input_text.strip():
        user_body = f"{instruction}\n{input_text.strip()}"

    prompt = f"{_USER_TAG}{user_body}\n{_ASSISTANT_TAG}"
    response = f"{output}{_EOS_STRING}"
    return prompt, response
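
To make the chat template concrete, here is a minimal sketch that renders one Alpaca record the same way. It uses local copies of the tag constants so it runs on its own; the helper name `render_alpaca` and the sample record are illustrative assumptions, not part of the module.

```python
from __future__ import annotations

USER_TAG = "<|user|>\n"
ASSISTANT_TAG = "<|assistant|>\n"
EOS = "</s>"


def render_alpaca(record: dict) -> tuple[str, str]:
    """Hypothetical helper mirroring the Alpaca rendering for one JSONL record."""
    body = record["instruction"]
    extra = (record.get("input") or "").strip()
    if extra:
        # Optional input is appended to the instruction on its own line.
        body = f"{body}\n{extra}"
    prompt = f"{USER_TAG}{body}\n{ASSISTANT_TAG}"
    response = f"{record['output']}{EOS}"
    return prompt, response


prompt, response = render_alpaca(
    {"instruction": "Translate to English.", "input": "안녕하세요", "output": "Hello"}
)
print(repr(prompt))
print(repr(response))
```

The assistant tag ends the prompt rather than starting the response, so only the output text and the EOS token fall in the trained part of the sequence.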


def _build_conversation_turns(
    conversations: list[dict],
) -> list[tuple[str, str]]:
    """
    Convert a conversation list into a sequence of (prompt, response) pairs.

    For a multi-turn conversation the prompt for turn *k* is the entire
    dialogue history up to (but not including) assistant turn *k*.

    Only user→assistant pairs contribute training samples. Consecutive
    user messages are merged. An assistant turn with no pending user
    message is skipped, and turns with any other role are ignored. A
    conversation that yields no valid pair returns an empty list.

    Args:
        conversations: List of dicts with ``role`` and ``content`` keys.
                       Roles are expected to be ``"user"`` or ``"assistant"``.

    Returns:
        List of (prompt_text, response_text) tuples, one per assistant turn.
    """
    pairs: list[tuple[str, str]] = []
    history = ""       # accumulated dialogue so far
    pending_user = ""  # user content not yet closed by an assistant turn

    for turn in conversations:
        role = turn.get("role", "").lower()
        content = turn.get("content", "")

        if role == "user":
            if pending_user:
                # Two consecutive user turns: concatenate them.
                pending_user = f"{pending_user}\n{content}"
            else:
                pending_user = content

        elif role == "assistant":
            if not pending_user:
                # Assistant turn without a preceding user turn: skip it.
                continue

            prompt = f"{history}{_USER_TAG}{pending_user}\n{_ASSISTANT_TAG}"
            response = f"{content}{_EOS_STRING}"
            pairs.append((prompt, response))

            # Update history to include this full exchange (without the EOS
            # so the model does not treat it as a hard stop mid-context).
            history = f"{history}{_USER_TAG}{pending_user}\n{_ASSISTANT_TAG}{content}\n"
            pending_user = ""

    return pairs
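
The multi-turn expansion can be seen on a two-exchange conversation. The sketch below is a compact, standalone restatement of the same rule, written only for illustration. `expand_turns` and the sample dialogue are hypothetical names and data.

```python
USER_TAG = "<|user|>\n"
ASSISTANT_TAG = "<|assistant|>\n"
EOS = "</s>"


def expand_turns(conversations):
    """Illustrative restatement: one (prompt, response) pair per assistant turn."""
    pairs, history, pending = [], "", ""
    for turn in conversations:
        role = turn.get("role", "").lower()
        content = turn.get("content", "")
        if role == "user":
            # Consecutive user messages are merged with a newline.
            pending = f"{pending}\n{content}" if pending else content
        elif role == "assistant" and pending:
            prompt = f"{history}{USER_TAG}{pending}\n{ASSISTANT_TAG}"
            pairs.append((prompt, f"{content}{EOS}"))
            # Earlier replies enter the history without EOS.
            history = f"{prompt}{content}\n"
            pending = ""
    return pairs


pairs = expand_turns([
    {"role": "user", "content": "Hi"},
    {"role": "assistant", "content": "Hello!"},
    {"role": "user", "content": "How are you?"},
    {"role": "assistant", "content": "Fine."},
])
for prompt, response in pairs:
    print(repr(prompt), "->", repr(response))
```

Each assistant turn becomes its own sample, so the first reply appears again inside the second prompt. There it counts as context only, because prompt tokens are masked out of the loss.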


# ---------------------------------------------------------------------------
# Multiprocessing worker for parallel tokenization.
# ---------------------------------------------------------------------------

_worker_tokenizer: Tokenizer | None = None
_worker_eos_id: int = -1
_worker_max_seq_len: int = 4096


def _worker_init(tokenizer_path: str, eos_string: str, max_seq_len: int) -> None:
    """Initializer for each pool worker; loads its own tokenizer instance."""
    global _worker_tokenizer, _worker_eos_id, _worker_max_seq_len
    _worker_tokenizer = Tokenizer.from_file(tokenizer_path)
    eos_id = _worker_tokenizer.token_to_id(eos_string)
    if eos_id is None:
        raise ValueError(f"EOS token '{eos_string}' not found in worker tokenizer.")
    _worker_eos_id = eos_id
    _worker_max_seq_len = max_seq_len


def _worker_tokenize_batch(
    batch: list[tuple[str, str]],
) -> list[tuple[list[int], list[int]] | None]:
    """
    Tokenize a batch of (prompt, response) pairs in a worker process.

    Returns a list of (prompt_ids, response_ids) as Python lists, or None
    for samples that should be skipped.
    """
    global _worker_tokenizer, _worker_eos_id, _worker_max_seq_len
    tok = _worker_tokenizer
    eos_id = _worker_eos_id
    max_seq_len = _worker_max_seq_len
    results = []

    for prompt_text, response_text in batch:
        prompt_ids = tok.encode(prompt_text).ids
        response_ids = tok.encode(response_text).ids

        # Skip samples where the prompt alone leaves no room for output.
        if len(prompt_ids) >= max_seq_len - 10:
            results.append(None)
            continue

        full_len = len(prompt_ids) + len(response_ids)

        # Truncate response if combined sequence is too long.
        if full_len > max_seq_len:
            allowed_response = max_seq_len - len(prompt_ids)
            if allowed_response <= 0:
                results.append(None)
                continue
            response_ids = response_ids[:allowed_response]
            # Force EOS at end after truncation.
            if response_ids[-1] != eos_id:
                response_ids[-1] = eos_id

        results.append((prompt_ids, response_ids))

    return results
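
The skip and truncate rule, together with the way the dataset later places response ids into `labels`, can be traced on plain Python lists. This is a sketch under stated assumptions: `build_example`, its `margin` parameter (standing in for the hard-coded 10) and the toy token ids are invented for illustration.

```python
IGNORE = -1


def build_example(prompt_ids, response_ids, max_seq_len, eos_id, margin=10):
    """Hypothetical list-only version of the skip / truncate / label rule."""
    # Skip when the prompt leaves (almost) no room for a response.
    if len(prompt_ids) >= max_seq_len - margin:
        return None
    room = max_seq_len - len(prompt_ids)
    if len(response_ids) > room:
        # Cut the response from the end and force EOS as its last token.
        response_ids = response_ids[:room]
        response_ids[-1] = eos_id
    input_ids = prompt_ids + response_ids
    # Labels are pre-shifted: position i holds the token expected at i + 1.
    labels = [IGNORE] * len(input_ids)
    start = max(0, len(prompt_ids) - 1)
    labels[start:start + len(response_ids)] = response_ids
    return input_ids, labels


input_ids, labels = build_example([5, 6, 7], [10, 11, 12, 13, 2], max_seq_len=16, eos_id=2)
print(input_ids)
print(labels)

long_ids, long_labels = build_example([5, 6, 7], list(range(100, 120)), max_seq_len=16, eos_id=2)
print(len(long_ids), long_ids[-1])
```

Because the response ids are written one position to the left, the logits at position `i` can be compared to `labels[i]` directly. The last position has no next token and stays at `-1`.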
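

class SFTDataset(Dataset):
    """
    Supervised Fine-Tuning dataset built from JSONL files.

    Each JSONL line must conform to one of three schemas described in the
    module docstring.  After tokenisation each sample is stored unpadded as::

        input_ids: [prompt tokens ...] [response tokens ...]
        labels:    [-1 ...] [response token ids, shifted left by one] [-1]

    ``labels`` is stored pre-shifted: ``labels[i]`` holds ``input_ids[i + 1]``
    wherever the next token belongs to the response, and -1 elsewhere, so
    that ``nn.CrossEntropyLoss(ignore_index=-1)`` only penalises the model
    on the assistant response tokens.  No padding is applied here;
    ``pad_token_id`` is only stored on the instance and reported in the log.

    Args:
        data_path:      Path to a single ``.jsonl`` file or a directory that
                        contains multiple ``.jsonl`` files (all are loaded).
        tokenizer:      A ``tokenizers.Tokenizer`` instance (HuggingFace fast
                        tokenizer loaded from ``tokenizer.json``).
        max_seq_len:    Maximum sequence length (tokens).  Samples exceeding
                        this are truncated from the *end of the response*.
                        Default: 4096.
        pad_token_id:   Token id used for right-padding.  Default: 0.
        tokenizer_path: Optional path to the tokenizer file.  When given,
                        tokenization runs in parallel worker processes that
                        each load the tokenizer from this path; otherwise
                        tokenization is sequential.  Default: None.
        num_workers:    Number of worker processes for parallel
                        tokenization.  Default: 60.
    """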

    def __init__(
        self,
        data_path: Union[str, Path],
        tokenizer: Tokenizer,
        max_seq_len: int = 4096,
        pad_token_id: int = 0,
        tokenizer_path: Union[str, Path, None] = None,
        num_workers: int = 60,
    ) -> None:
        super().__init__()

        self.tokenizer = tokenizer
        self.max_seq_len = max_seq_len
        self.pad_token_id = pad_token_id

        # Resolve EOS token id from the vocabulary.
        eos_id = tokenizer.token_to_id(_EOS_STRING)
        if eos_id is None:
            raise ValueError(
                f"EOS token '{_EOS_STRING}' not found in the tokenizer vocabulary. "
                "Check that the tokenizer was trained with this special token."
            )
        self.eos_token_id: int = eos_id

        # ------------------------------------------------------------------
        # Load raw JSONL samples.
        # ------------------------------------------------------------------
        data_path = Path(data_path)
        raw_samples = self._load_jsonl(data_path)

        # ------------------------------------------------------------------
        # Try loading from cache first.
        # ------------------------------------------------------------------
        cache_path = Path(f"{data_path}.sft_cache.pt")
        cache_key = self._make_cache_key(data_path, max_seq_len, tokenizer)
        cached = self._try_load_cache(cache_path, cache_key)
        if cached is not None:
            self.samples = cached
            return

        # ------------------------------------------------------------------
        # Tokenise and build (input_ids, labels) pairs.
        # ------------------------------------------------------------------
        t0 = time.time()
        if tokenizer_path is not None:
            self.samples = self._tokenize_parallel(
                raw_samples, str(tokenizer_path), max_seq_len, num_workers,
            )
        else:
            self.samples = self._tokenize_sequential(
                raw_samples, tokenizer, max_seq_len,
            )
        elapsed = time.time() - t0
        print(f"[SFTDataset] Tokenization took {elapsed:.1f}s")

        # ------------------------------------------------------------------
        # Save cache.
        # ------------------------------------------------------------------
        self._save_cache(cache_path, cache_key)

    # ------------------------------------------------------------------
    # Cache helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _make_cache_key(
        data_path: Path, max_seq_len: int, tokenizer: Tokenizer,
    ) -> tuple:
        """Build a cheap cache key from file metadata + settings."""
        if data_path.is_file():
            stat = data_path.stat()
            file_sig = (stat.st_size, stat.st_mtime)
        else:
            # Directory: combine stats of all jsonl files.
            parts = []
            for f in sorted(data_path.glob("*.jsonl")):
                s = f.stat()
                parts.append((str(f), s.st_size, s.st_mtime))
            file_sig = tuple(parts)
        return (file_sig, max_seq_len, tokenizer.get_vocab_size())

    def _try_load_cache(
        self, cache_path: Path, cache_key: tuple,
    ) -> list[tuple[torch.Tensor, torch.Tensor]] | None:
        """Load cached tokenized samples if cache is valid."""
        if not cache_path.exists():
            print(f"[SFTDataset] Cache miss: no cache file at {cache_path}")
            return None
        try:
            t0 = time.time()
            # weights_only=False unpickles arbitrary Python objects, so only
            # load cache files that this process (or a trusted one) wrote.
            cache = torch.load(cache_path, map_location="cpu", weights_only=False)
            if cache.get("cache_key") != cache_key:
                print("[SFTDataset] Cache miss: stale cache (key mismatch)")
                return None
            samples = cache["samples"]
            elapsed = time.time() - t0
            print(
                f"[SFTDataset] Cache hit! Loaded {len(samples)} samples "
                f"from {cache_path} in {elapsed:.1f}s"
            )
            return samples
        except Exception as exc:
            print(f"[SFTDataset] Cache miss: failed to load: {exc}")
            return None

    def _save_cache(self, cache_path: Path, cache_key: tuple) -> None:
        """Save tokenized samples to cache file."""
        try:
            t0 = time.time()
            torch.save(
                {"cache_key": cache_key, "samples": self.samples},
                cache_path,
            )
            elapsed = time.time() - t0
            size_mb = cache_path.stat().st_size / (1024 * 1024)
            print(
                f"[SFTDataset] Saved cache ({size_mb:.0f} MB) "
                f"to {cache_path} in {elapsed:.1f}s"
            )
        except Exception as exc:
            print(f"[SFTDataset] WARNING: Failed to save cache: {exc}")

    # ------------------------------------------------------------------
    # Tokenization strategies
    # ------------------------------------------------------------------

    def _tokenize_sequential(
        self,
        raw_samples: list[tuple[str, str]],
        tokenizer: Tokenizer,
        max_seq_len: int,
    ) -> list[tuple[torch.Tensor, torch.Tensor]]:
        """Original sequential tokenization (fallback when no tokenizer_path)."""
        samples: list[tuple[torch.Tensor, torch.Tensor]] = []
        total_loaded = 0
        total_tokens = 0
        skipped_too_long = 0
        truncated_count = 0

        for prompt_text, response_text in raw_samples:
            total_loaded += 1

            prompt_ids = tokenizer.encode(prompt_text).ids
            response_ids = tokenizer.encode(response_text).ids

            if len(prompt_ids) >= max_seq_len - 10:
                skipped_too_long += 1
                continue

            full_ids = prompt_ids + response_ids

            if len(full_ids) > max_seq_len:
                truncated_count += 1
                allowed_response = max_seq_len - len(prompt_ids)
                if allowed_response <= 0:
                    skipped_too_long += 1
                    continue
                response_ids = response_ids[:allowed_response]
                if response_ids[-1] != self.eos_token_id:
                    response_ids[-1] = self.eos_token_id
                full_ids = prompt_ids + response_ids

            seq_len = len(full_ids)
            total_tokens += seq_len

            input_ids = torch.tensor(full_ids, dtype=torch.int32)
            labels = torch.full((seq_len,), fill_value=-1, dtype=torch.int32)

            resp_start = len(prompt_ids)
            resp_label_start = max(0, resp_start - 1)
            resp_label_end = resp_label_start + len(response_ids)
            labels[resp_label_start:resp_label_end] = torch.tensor(
                response_ids, dtype=torch.int32
            )

            samples.append((input_ids, labels))

        n = len(samples)
        avg_len = (total_tokens / n) if n > 0 else 0.0
        print(
            f"[SFTDataset] Loaded {n} samples "
            f"(raw={total_loaded}, "
            f"skipped_too_long={skipped_too_long}, "
            f"truncated={truncated_count})"
        )
        print(
            f"[SFTDataset] avg_seq_len={avg_len:.1f}, "
            f"max_seq_len={max_seq_len}, "
            f"pad_token_id={self.pad_token_id}, "
            f"eos_token_id={self.eos_token_id}"
        )
        return samples

    def _tokenize_parallel(
        self,
        raw_samples: list[tuple[str, str]],
        tokenizer_path: str,
        max_seq_len: int,
        num_workers: int,
    ) -> list[tuple[torch.Tensor, torch.Tensor]]:
        """Parallel tokenization using multiprocessing.Pool."""
        total = len(raw_samples)
        print(
            f"[SFTDataset] Starting parallel tokenization: "
            f"{total} samples, {num_workers} workers"
        )

        # Split raw_samples into chunks for imap_unordered.
        chunk_size = 1000
        chunks = []
        for i in range(0, total, chunk_size):
            chunks.append(raw_samples[i : i + chunk_size])

        # Collect tokenized results from workers.
        all_token_pairs: list[tuple[list[int], list[int]] | None] = []
        processed = 0

        # Use 'spawn' context to avoid fork+CUDA issues when called
        # after model is already on GPU (e.g., in DDP training).
        ctx = multiprocessing.get_context("spawn")
        with ctx.Pool(
            processes=num_workers,
            initializer=_worker_init,
            initargs=(tokenizer_path, _EOS_STRING, max_seq_len),
        ) as pool:
            for batch_results in pool.imap_unordered(
                _worker_tokenize_batch, chunks, chunksize=1,
            ):
                all_token_pairs.extend(batch_results)
                processed += len(batch_results)
                if processed % 100_000 < chunk_size:
                    print(
                        f"[SFTDataset] Tokenized {processed}/{total} "
                        f"({100.0 * processed / total:.1f}%)"
                    )

        # Print final progress if not already printed.
        if processed % 100_000 >= chunk_size:
            print(f"[SFTDataset] Tokenized {processed}/{total} (100.0%)")

        # Convert to tensors and build samples.
        samples: list[tuple[torch.Tensor, torch.Tensor]] = []
        total_tokens = 0
        skipped_too_long = 0
        truncated_count = 0

        for pair in all_token_pairs:
            if pair is None:
                skipped_too_long += 1
                continue

            prompt_ids, response_ids = pair
            full_ids = prompt_ids + response_ids

            # Count truncated: if combined length exactly equals max_seq_len,
            # the worker likely truncated the response.
            if len(full_ids) == max_seq_len:
                truncated_count += 1

            seq_len = len(full_ids)
            total_tokens += seq_len

            input_ids = torch.tensor(full_ids, dtype=torch.int32)
            labels = torch.full((seq_len,), fill_value=-1, dtype=torch.int32)

            resp_start = len(prompt_ids)
            resp_label_start = max(0, resp_start - 1)
            resp_label_end = resp_label_start + len(response_ids)
            labels[resp_label_start:resp_label_end] = torch.tensor(
                response_ids, dtype=torch.int32
            )

            samples.append((input_ids, labels))

        n = len(samples)
        avg_len = (total_tokens / n) if n > 0 else 0.0
        print(
            f"[SFTDataset] Loaded {n} samples "
            f"(raw={total}, "
            f"skipped_too_long={skipped_too_long}, "
            f"truncated={truncated_count})"
        )
        print(
            f"[SFTDataset] avg_seq_len={avg_len:.1f}, "
            f"max_seq_len={max_seq_len}, "
            f"pad_token_id={self.pad_token_id}, "
            f"eos_token_id={self.eos_token_id}"
        )
        return samples

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _load_jsonl(self, path: Path) -> list[tuple[str, str]]:
        """
        Discover and parse JSONL files, returning (prompt, response) pairs.

        If ``path`` is a file, load that file only.  If it is a directory,
        load all ``*.jsonl`` files found directly inside it (non-recursive).

        Args:
            path: File or directory path.

        Returns:
            List of (prompt_text, response_text) tuples.

        Raises:
            FileNotFoundError: If ``path`` does not exist.
            ValueError:        If no ``.jsonl`` files are found under a
                               directory path.
        """
        if not path.exists():
            raise FileNotFoundError(f"Data path not found: {path}")

        if path.is_dir():
            jsonl_files = sorted(path.glob("*.jsonl"))
            if not jsonl_files:
                raise ValueError(f"No .jsonl files found in directory: {path}")
        else:
            jsonl_files = [path]

        pairs: list[tuple[str, str]] = []
        for jsonl_file in jsonl_files:
            pairs.extend(self._parse_jsonl_file(jsonl_file))
        return pairs

    def _parse_jsonl_file(self, path: Path) -> list[tuple[str, str]]:
        """
        Parse a single JSONL file into (prompt, response) pairs.

        Empty or whitespace-only lines are skipped silently.  Lines that
        fail JSON parsing, or whose schema cannot be recognised, are
        skipped with a warning.

        Args:
            path: Path to a ``.jsonl`` file.

        Returns:
            List of (prompt_text, response_text) tuples extracted from
            the file.
        """
        pairs: list[tuple[str, str]] = []

        with path.open("r", encoding="utf-8") as fh:
            for lineno, line in enumerate(fh, start=1):
                line = line.strip()
                if not line:
                    continue

                try:
                    obj = json.loads(line)
                except json.JSONDecodeError as exc:
                    print(
                        f"[SFTDataset] WARNING: JSON parse error in "
                        f"{path}:{lineno}: {exc}"
                    )
                    continue

                # A valid JSON line that is not an object (e.g. a bare list)
                # has no ``.get``; treat it as an unrecognised schema.
                if not isinstance(obj, dict):
                    print(
                        f"[SFTDataset] WARNING: Unrecognised schema at "
                        f"{path}:{lineno}, skipping."
                    )
                    continue

                # ---- Conversation format ------------------------------------
                # Support both "conversations" and "messages" keys
                conv_list = obj.get("conversations") or obj.get("messages")
                if conv_list and isinstance(conv_list, list):
                    turn_pairs = _build_conversation_turns(conv_list)
                    if not turn_pairs:
                        print(
                            f"[SFTDataset] WARNING: No valid user→assistant "
                            f"pairs in {path}:{lineno}, skipping."
                        )
                    pairs.extend(turn_pairs)

                # ---- Alpaca / Alpaca-no-input format -----------------------
                elif "instruction" in obj and "output" in obj:
                    prompt, response = _build_alpaca_turns(
                        instruction=obj["instruction"],
                        input_text=obj.get("input", ""),
                        output=obj["output"],
                    )
                    pairs.append((prompt, response))

                else:
                    print(
                        f"[SFTDataset] WARNING: Unrecognised schema at "
                        f"{path}:{lineno}, skipping."
                    )

        return pairs

    # ------------------------------------------------------------------
    # Dataset interface
    # ------------------------------------------------------------------

    def __len__(self) -> int:
        """Return the number of valid samples in the dataset."""
        return len(self.samples)

    def __getitem__(self, idx: int) -> tuple[torch.Tensor, torch.Tensor]:
        """
        Return a single training sample.

        Args:
            idx: Sample index.

        Returns:
            Tuple ``(input_ids, labels)`` where both tensors have shape
            ``[seq_len]`` (variable per sample) and dtype ``torch.long``
            (stored as int32 and converted on access).
            Use a collate function to pad batches dynamically.

            - ``input_ids``: Full token sequence (prompt + response),
                             NO padding (raw length).
            - ``labels``:    Response token ids, pre-shifted by one position
                             (``labels[i]`` is the target for the token that
                             follows ``input_ids[i]``); ``-1`` everywhere else.
                             Use ``ignore_index=-1`` in your loss function.
        """
        input_ids, labels = self.samples[idx]
        return input_ids.long(), labels.long()
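
The docstring above leaves batching to a collate function. One possible shape for it is sketched below, assuming right-padding with `pad_token_id` for `input_ids` and `-1` for `labels`. The name `collate_sft`, the toy batch and the random logits are illustrative assumptions, not part of this file.

```python
import torch
import torch.nn.functional as F
from torch.nn.utils.rnn import pad_sequence


def collate_sft(batch, pad_token_id=0, ignore_index=-1):
    """Hypothetical collate: right-pad variable-length (input_ids, labels) pairs."""
    input_ids = pad_sequence(
        [ids for ids, _ in batch], batch_first=True, padding_value=pad_token_id
    )
    # Padded label positions get the ignore value so they never reach the loss.
    labels = pad_sequence(
        [lab for _, lab in batch], batch_first=True, padding_value=ignore_index
    )
    return input_ids, labels


# Two toy samples in the layout __getitem__ returns (labels already shifted).
batch = [
    (torch.tensor([5, 6, 7, 10, 2]), torch.tensor([-1, -1, 10, 2, -1])),
    (torch.tensor([5, 6, 11, 2]), torch.tensor([-1, 11, 2, -1])),
]
input_ids, labels = collate_sft(batch)

# Stand-in for model output: random logits over a tiny vocabulary.
vocab_size = 32
logits = torch.randn(*input_ids.shape, vocab_size)
loss = F.cross_entropy(
    logits.view(-1, vocab_size), labels.view(-1), ignore_index=-1
)
print(input_ids.shape, labels.tolist(), loss.item())
```

Since the labels are stored pre-shifted, this sketch compares `logits[:, i]` with `labels[:, i]` directly and does not shift again inside the loss. A `DataLoader` could take the function through its `collate_fn` argument.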