"""Loaders for training text corpora and evaluation prompt suites.

Sources may be plain-text files (``.txt``, ``.md``, ``.text``), JSON Lines
files (``.jsonl``) or JSON documents (``.json``).  ``load_text_corpus`` also
accepts a directory, which is walked recursively.
"""

import json
from collections.abc import Iterable
from pathlib import Path

from .text_quality import clean_answer_text, clean_context_text, clean_training_text

TEXT_EXTENSIONS = {".txt", ".md", ".text"}
STRUCTURED_EXTENSIONS = {".jsonl", ".json"}

# Upper bound on how many times one record may be repeated in a corpus.
_MAX_REPEAT_COUNT = 8

# Default repeat count per ``record_type``; any other type repeats once.
_DEFAULT_RECORD_WEIGHTS = {
    "dialogue_turn": 2,
    "instruction_answer": 2,
    "preference_chosen": 3,
    "preference_rejected": 0,
}


def _default_record_weight(record_type: str) -> int:
    """Return the repeat count used for *record_type* when no weight is given."""
    return _DEFAULT_RECORD_WEIGHTS.get(record_type, 1)


def _record_repeat_count(record: object) -> int:
    """Return how many times *record* should be emitted into the corpus.

    Non-dict records are emitted once.  A truthy ``"drop"`` key yields 0.
    An explicit ``"weight"`` is rounded to the nearest integer and clamped to
    ``[0, _MAX_REPEAT_COUNT]``; a weight that cannot be interpreted as a
    number (including NaN) falls back to 1.  Without a weight, the default
    for the record's ``"record_type"`` is used.
    """
    if not isinstance(record, dict):
        return 1
    if record.get("drop"):
        return 0

    raw_weight = record.get("weight")
    if raw_weight is None:
        return _default_record_weight(str(record.get("record_type", "")))

    try:
        weight = float(raw_weight)
    except OverflowError:
        # float() overflows on integers too large to represent; keep only the
        # sign so the value clamps like any other out-of-range weight.
        negative = isinstance(raw_weight, int) and raw_weight < 0
        weight = float("-inf") if negative else float("inf")
    except (TypeError, ValueError):
        return 1

    if weight != weight:  # NaN is the only value unequal to itself.
        return 1

    # Clamp before rounding: round() raises OverflowError on +/-infinity
    # (json.loads accepts ``Infinity`` and literals such as ``1e999``), and
    # for finite values clamp-then-round equals round-then-clamp.
    clamped = min(float(_MAX_REPEAT_COUNT), max(0.0, weight))
    return int(round(clamped))


def _as_text(value: object) -> str:
    """Return *value* as a stripped string, mapping ``None`` to ``""``.

    Without the ``None`` check a JSON ``null`` would be stringified to the
    literal text ``"None"``.
    """
    return "" if value is None else str(value).strip()


def _coerce_text_record(record: object) -> str:
    """Extract the cleaned training text from *record*, or ``""`` if none.

    A string is cleaned directly.  For a dict, the first of the ``"text"`` or
    ``"content"`` keys that is present is used (even if its value is empty);
    otherwise a ``"context"``/``"answer"`` pair is joined when both parts are
    non-empty after cleaning.  Any other input yields ``""``.
    """
    if isinstance(record, str):
        return clean_training_text(record.strip())
    if not isinstance(record, dict):
        return ""

    for key in ("text", "content"):
        if key in record:
            return clean_training_text(_as_text(record[key]))

    if "context" in record and "answer" in record:
        context = clean_context_text(_as_text(record["context"]))
        answer = clean_answer_text(_as_text(record["answer"]))
        if context and answer:
            # NOTE(review): the leading space is preserved from the original
            # format string; it may be a leftover from a removed prefix
            # token - confirm before changing.
            return f" {context} {answer}"
    return ""


def _coerce_prompt_record(record: object) -> dict[str, object] | None:
    """Normalize *record* into a prompt dict, or ``None`` if it has no prompt.

    A string becomes ``{"prompt": <stripped>, "tags": []}``.  A dict is
    shallow-copied with ``"prompt"`` taken from its ``"prompt"`` key (falling
    back to ``"context"``) and cleaned, and ``"tags"`` coerced to a list of
    strings (empty when the original value is not a list).  All other keys
    are carried over unchanged.
    """
    if isinstance(record, str):
        prompt = record.strip()
        return {"prompt": prompt, "tags": []} if prompt else None
    if not isinstance(record, dict):
        return None

    raw_prompt = record.get("prompt", record.get("context", ""))
    prompt = clean_context_text(_as_text(raw_prompt))
    if not prompt:
        return None

    raw_tags = record.get("tags", [])
    normalized = dict(record)
    normalized["prompt"] = prompt
    normalized["tags"] = (
        [str(tag) for tag in raw_tags] if isinstance(raw_tags, list) else []
    )
    return normalized


def _expand_records(items: Iterable[object]) -> list[str]:
    """Return the text of each usable item, repeated per its repeat count."""
    expanded: list[str] = []
    for item in items:
        text = _coerce_text_record(item)
        if text:
            expanded.extend([text] * _record_repeat_count(item))
    return expanded


def _collect_prompts(items: Iterable[object]) -> list[dict[str, object]]:
    """Return the normalized prompt records for *items*, skipping unusable ones."""
    return [
        prompt
        for item in items
        if (prompt := _coerce_prompt_record(item)) is not None
    ]


def _iter_jsonl(path: Path) -> Iterable[object]:
    """Lazily yield the parsed JSON value of every non-blank line in *path*."""
    for line in path.read_text(encoding="utf-8").splitlines():
        if line.strip():
            yield json.loads(line)


def load_text_corpus(source: str | Path) -> str:
    """Load *source* into a single newline-joined training corpus string.

    A directory is walked recursively (in sorted order) and every file with a
    supported extension is loaded; non-blank results are joined with
    newlines.  Plain-text files are returned verbatim.  ``.jsonl`` files
    contribute one record per non-blank line.  ``.json`` files may hold a
    list of records, a dict with a ``"texts"`` or ``"records"`` list, or a
    single record dict.  Each record is repeated according to
    ``_record_repeat_count``.

    Raises:
        ValueError: if the extension is unsupported, or a ``.json`` payload
            is neither a list nor a dict that yields any text.
    """
    path = Path(source)

    if path.is_dir():
        supported = TEXT_EXTENSIONS | STRUCTURED_EXTENSIONS
        parts = [
            load_text_corpus(child)
            for child in sorted(path.rglob("*"))
            if child.is_file() and child.suffix.lower() in supported
        ]
        return "\n".join(part for part in parts if part.strip())

    suffix = path.suffix.lower()
    if suffix in TEXT_EXTENSIONS:
        return path.read_text(encoding="utf-8")

    if suffix == ".jsonl":
        return "\n".join(_expand_records(_iter_jsonl(path)))

    if suffix == ".json":
        payload = json.loads(path.read_text(encoding="utf-8"))
        if isinstance(payload, list):
            return "\n".join(_expand_records(payload))
        if isinstance(payload, dict):
            for key in ("texts", "records"):
                if isinstance(payload.get(key), list):
                    return "\n".join(_expand_records(payload[key]))
            # Single-record document: a record with text but a repeat count
            # of 0 yields "", whereas a record without text is unsupported.
            text = _coerce_text_record(payload)
            if text:
                return "\n".join([text] * _record_repeat_count(payload))

    raise ValueError(f"Unsupported corpus source: {path}")


def load_prompt_suite(source: str | Path) -> list[dict[str, object]]:
    """Load *source* into a list of normalized prompt records.

    Plain-text files contribute one prompt per non-blank line.  ``.jsonl``
    files contribute one record per non-blank line.  ``.json`` files may hold
    a list of records, a dict with a ``"prompts"`` list, or a single prompt
    dict.  Entries without a usable prompt are skipped.

    Raises:
        ValueError: if the extension is unsupported, or a ``.json`` payload
            is neither a list nor a dict that yields a prompt.
    """
    path = Path(source)
    suffix = path.suffix.lower()

    if suffix in TEXT_EXTENSIONS:
        return _collect_prompts(path.read_text(encoding="utf-8").splitlines())

    if suffix == ".jsonl":
        return _collect_prompts(_iter_jsonl(path))

    if suffix == ".json":
        payload = json.loads(path.read_text(encoding="utf-8"))
        if isinstance(payload, list):
            return _collect_prompts(payload)
        if isinstance(payload, dict):
            if isinstance(payload.get("prompts"), list):
                return _collect_prompts(payload["prompts"])
            record = _coerce_prompt_record(payload)
            if record is not None:
                return [record]

    raise ValueError(f"Unsupported prompt suite: {path}")