| |
| """Build text-only claimed-CBU extraction requests from caption JSONL files.""" |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import hashlib |
| import json |
| from pathlib import Path |
| from typing import Any |
|
|
|
|
# Closed set of category labels for extracted units. The list is used as the
# "category" enum in CBU_JSON_SCHEMA and is spelled out in the user prompt.
UNIT_CATEGORIES = [
    "object",
    "attribute",
    "relation",
    "style",
    "camera",
    "lighting",
    "count",
    "text_rendering",
]
|
|
|
|
# System message copied verbatim into the "system_prompt" field of every
# emitted request row.
SYSTEM_PROMPT = """You extract atomic controllable visual content units from captions for text-to-image training-data evaluation.
Return only valid compact JSON. Extract only facts explicitly claimed by the caption. Do not infer image content beyond the caption."""
|
|
|
|
# JSON Schema describing the expected response object. It is serialized into
# the user prompt and stored in the manifest; schema_with_max_units() works on
# a copy so this module-level template is never mutated.
CBU_JSON_SCHEMA: dict[str, Any] = {
    "type": "object",
    "properties": {
        "caption_id": {"type": "string"},
        "claimed_units": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    # One of UNIT_CATEGORIES.
                    "category": {"type": "string", "enum": UNIT_CATEGORIES},
                    "unit": {"type": "string", "maxLength": 80},
                    "span": {"type": "string", "maxLength": 120},
                    "target": {"type": "string", "maxLength": 80},
                },
                # Every unit record must carry all four fields and nothing else.
                "required": ["category", "unit", "span", "target"],
                "additionalProperties": False,
            },
        },
    },
    "required": ["caption_id", "claimed_units"],
    "additionalProperties": False,
}
|
|
|
|
| def parse_args() -> argparse.Namespace: |
| parser = argparse.ArgumentParser(description="Build claimed-CBU extraction request JSONL") |
| parser.add_argument("--input", required=True, help="Caption JSONL") |
| parser.add_argument("--output", required=True) |
| parser.add_argument("--text-field", default="caption") |
| parser.add_argument("--id-field", default=None) |
| parser.add_argument("--surface", required=True) |
| parser.add_argument("--max-records", type=int, default=None) |
| parser.add_argument("--sample-records", type=int, default=None) |
| parser.add_argument("--sample-seed", type=int, default=0) |
| parser.add_argument("--max-caption-chars", type=int, default=1800) |
| parser.add_argument( |
| "--token-budget", |
| type=int, |
| default=None, |
| help="Optional whitespace token prefix budget for length-controlled CBU@B requests", |
| ) |
| parser.add_argument( |
| "--max-units", |
| type=int, |
| default=None, |
| help="Optional maximum atomic units in the JSON schema; use only for stress/debug caps", |
| ) |
| return parser.parse_args() |
|
|
|
|
def stable_float(*parts: object) -> float:
    """Hash *parts* into a reproducible float between 0 and 1.

    The parts are stringified, joined with ``":"`` and hashed with an 8-byte
    BLAKE2b digest; the digest, read as a big-endian unsigned integer, is
    scaled by ``2**64``. The same parts always give the same value, which lets
    callers use it as a deterministic sort key.
    """
    key = ":".join(map(str, parts)).encode("utf-8")
    hasher = hashlib.blake2b(key, digest_size=8)
    # A hex digest parsed base-16 is the big-endian integer value of the bytes.
    as_integer = int(hasher.hexdigest(), 16)
    return as_integer / 2**64
|
|
|
|
| def iter_rows(args: argparse.Namespace) -> list[tuple[int, str | None, str]]: |
| rows: list[tuple[int, str | None, str]] = [] |
| with Path(args.input).open("r", encoding="utf-8") as handle: |
| for row_index, line in enumerate(handle): |
| if args.max_records is not None and args.sample_records is None and len(rows) >= args.max_records: |
| break |
| if not line.strip(): |
| continue |
| row = json.loads(line) |
| text = row.get(args.text_field) |
| if not isinstance(text, str) or not text.strip(): |
| continue |
| row_id = row.get(args.id_field) if args.id_field else None |
| rows.append((row_index, str(row_id) if row_id is not None else None, text)) |
| if args.sample_records is not None: |
| rows.sort(key=lambda item: stable_float(args.sample_seed, args.surface, item[0], item[1] or "")) |
| rows = rows[: args.sample_records] |
| rows.sort(key=lambda item: item[0]) |
| return rows |
|
|
|
|
def schema_with_max_units(max_units: int | None) -> dict[str, Any]:
    """Return a fresh copy of ``CBU_JSON_SCHEMA``, optionally capped.

    Args:
        max_units: When not ``None``, stored as ``maxItems`` on the
            ``claimed_units`` array of the copy.

    Returns:
        A new schema dict; the module-level template is left untouched.
    """
    # A JSON round-trip produces an independent deep copy of the template.
    serialized = json.dumps(CBU_JSON_SCHEMA)
    schema_copy: dict[str, Any] = json.loads(serialized)
    if max_units is None:
        return schema_copy
    schema_copy["properties"]["claimed_units"]["maxItems"] = max_units
    return schema_copy
|
|
|
|
def build_user_prompt(caption_id: str, caption: str, max_caption_chars: int, max_units: int | None) -> str:
    """Assemble the user message for one caption.

    Args:
        caption_id: Identifier echoed on the ``caption_id=`` line.
        caption: Caption text; only its first ``max_caption_chars`` characters
            are used, with newlines replaced by spaces.
        max_caption_chars: Character limit applied to ``caption``.
        max_units: Optional ``maxItems`` cap forwarded to the embedded schema.

    Returns:
        The prompt text: task line, category list, rules, compact JSON schema,
        a blank line, then the ``caption_id=`` and ``caption=`` lines.
    """
    schema_text = json.dumps(
        schema_with_max_units(max_units),
        ensure_ascii=False,
        separators=(",", ":"),
    )
    # Clip first, then flatten newlines so the caption stays on one line.
    truncated = caption[:max_caption_chars]
    one_line_caption = truncated.replace("\n", " ")
    category_list = ", ".join(UNIT_CATEGORIES)
    pieces = [
        "Extract caption-claimed controllable visual units as atomic records.\n",
        f"Unit categories: {category_list}.\n",
        "Rules:\n",
        "- Each record must contain exactly one visual control fact.\n",
        "- Use each semantic fact once; choose the single best category.\n",
        "- unit is a short canonical phrase, not a full clause.\n",
        "- span is the shortest caption span supporting the unit.\n",
        "- target is the object or scene element modified by the unit; use \"scene\" when global.\n",
        "- relation units must include both the relation and participating objects; do not output lone verbs or prepositions.\n",
        "- count units must attach a number to a target object; never output articles such as a, an, or the.\n",
        "- text_rendering units are only visible rendered text explicitly claimed by the caption; absent text claims are not units.\n",
        "- Do not output negative or absent facts, metadata, captioner phrases, or duplicate paraphrases.\n",
        "- Keep text_rendering units short; do not copy long copyright, table, or legal text blocks.\n",
        "- Use [] when the caption contains no controllable visual units.\n",
        "Return only JSON matching this schema:\n",
        f"{schema_text}\n\n",
        f"caption_id={caption_id}\ncaption={one_line_caption}",
    ]
    return "".join(pieces)
|
|
|
|
| def apply_token_budget(caption: str, token_budget: int | None) -> str: |
| if token_budget is None: |
| return caption |
| return " ".join(caption.split()[:token_budget]) |
|
|
|
|
def main() -> int:
    """Build the request JSONL and its manifest from the command-line options.

    One request row is written per usable caption, followed by a sibling
    ``*.manifest.json`` file recording the options and the schema used. A
    short JSON summary is printed to stdout.

    Returns:
        ``0`` on success.

    Raises:
        SystemExit: If both ``--max-records`` and ``--sample-records`` are set.
    """
    args = parse_args()
    if args.max_records is not None and args.sample_records is not None:
        raise SystemExit("--max-records and --sample-records are mutually exclusive")
    output = Path(args.output)
    output.parent.mkdir(parents=True, exist_ok=True)
    rows = iter_rows(args)
    # Same for every row, so compute it once instead of per iteration.
    budget_tag = f"b{args.token_budget}" if args.token_budget is not None else "full"
    with output.open("w", encoding="utf-8") as handle:
        for emitted_index, (source_row, row_id, caption) in enumerate(rows):
            # Fall back to a surface-scoped row number when the row has no id
            # (or an empty one).
            caption_id = row_id or f"{args.surface}:{source_row}"
            request_caption = apply_token_budget(caption, args.token_budget)
            # The id is derived from task, budget, surface, row and caption id
            # only, so it is stable across reruns with the same options.
            request_id = hashlib.blake2b(
                f"claimed_cbu_v2:{budget_tag}:{args.surface}:{source_row}:{caption_id}".encode("utf-8"),
                digest_size=16,
            ).hexdigest()
            row = {
                "request_id": request_id,
                "task": "claimed_cbu_v2",
                "token_budget": args.token_budget,
                "surface": args.surface,
                "caption_id": caption_id,
                "source_row": source_row,
                "emitted_index": emitted_index,
                "caption": request_caption,
                "source_caption": caption,
                "system_prompt": SYSTEM_PROMPT,
                "user_prompt": build_user_prompt(caption_id, request_caption, args.max_caption_chars, args.max_units),
            }
            handle.write(json.dumps(row, ensure_ascii=False) + "\n")
    manifest = {
        "task": "claimed_cbu_v2",
        "input": args.input,
        "output": str(output),
        "surface": args.surface,
        "text_field": args.text_field,
        "id_field": args.id_field,
        "max_records": args.max_records,
        "sample_records": args.sample_records,
        "sample_seed": args.sample_seed,
        # Recorded because it changes the caption text placed in each prompt;
        # without it a run cannot be reproduced from the manifest alone.
        "max_caption_chars": args.max_caption_chars,
        "token_budget": args.token_budget,
        "max_units": args.max_units,
        "rows": len(rows),
        "schema": schema_with_max_units(args.max_units),
    }
    manifest_path = output.with_suffix(".manifest.json")
    manifest_path.write_text(json.dumps(manifest, indent=2, ensure_ascii=False), encoding="utf-8")
    print(json.dumps({"output": str(output), "manifest": str(manifest_path), "requests": len(rows)}, indent=2))
    return 0
|
|
|
|
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|
|