Spaces:
Sleeping
Sleeping
File size: 7,824 Bytes
da3fe02 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 |
from __future__ import annotations
import json
import logging
import sqlite3
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, Optional
# Module-wide logger, registered under the fixed name "labeler".
logger = logging.getLogger("labeler")
@dataclass
class LabelInput:
    """One image/item pair to be labeled.

    Instances are produced by the manifest and database iterators in this
    module. Field order is part of the interface because the dataclass
    accepts positional construction.
    """

    # --- image location ---
    image_path: str  # path string as stored in the source record
    image_abspath: Optional[Path]  # resolved filesystem path, when available
    image_url: Optional[str]
    image_sha256: Optional[str]

    # --- item metadata ---
    item_name: str
    item_description: Optional[str]
    item_part: Optional[str]

    # --- provenance ---
    source_type: str
    ocid: Optional[str]
    ranking: Optional[int]
def iter_inputs(
    *,
    input_path: Optional[Path],
    db_path: Optional[Path],
    only_source: str,
    max_samples: Optional[int],
    run_id: Optional[str],
) -> Iterable[LabelInput]:
    """Yield label inputs from a manifest file or, failing that, a database.

    A manifest (``input_path``) takes precedence over ``db_path``. The
    manifest reader is chosen by file suffix: ``.jsonl``/``.json`` or
    ``.parquet``.

    Because this is a generator, the ``ValueError`` cases below surface on
    first iteration, not at call time.

    Args:
        input_path: Manifest file to read, or ``None`` to use the database.
        db_path: SQLite database path; used only when no manifest is given.
        only_source: Source filter passed through to the readers.
        max_samples: Cap on yielded samples; a falsy value means no cap.
        run_id: Optional run filter; only used for database input.

    Raises:
        ValueError: If the manifest suffix is unsupported, or if neither
            ``input_path`` nor ``db_path`` is provided.
    """
    if not input_path:
        # No manifest: the database is the only remaining option.
        if not db_path:
            raise ValueError("Provide --input or --db")
        yield from _iter_db(db_path, only_source, max_samples, run_id)
        return

    suffix = input_path.suffix.lower()
    if suffix == ".parquet":
        yield from _iter_manifest_parquet(input_path, only_source, max_samples)
    elif suffix in (".jsonl", ".json"):
        yield from _iter_manifest_jsonl(input_path, only_source, max_samples)
    else:
        raise ValueError(f"Unsupported input format: {input_path}")
def _iter_manifest_jsonl(
    path: Path,
    only_source: str,
    max_samples: Optional[int],
) -> Iterable[LabelInput]:
    """Yield label inputs from a JSON-lines manifest.

    Each non-blank line is parsed as one JSON document. Lines that are not
    valid JSON, or that decode to something other than a JSON object, are
    logged and skipped so a single bad line does not abort the whole stream.
    Relative image paths are resolved against the manifest's directory.

    Args:
        path: Manifest file, read as UTF-8.
        only_source: Source filter forwarded to ``_build_from_record``.
        max_samples: Cap on yielded samples; a falsy value means no cap.

    Yields:
        One ``LabelInput`` per accepted record.
    """
    base_dir = path.parent
    count = 0
    with path.open("r", encoding="utf-8") as handle:
        for line in handle:
            line = line.strip()
            if not line:
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                logger.warning("Skipping invalid JSON line in %s", path)
                continue
            # A line can be valid JSON without being an object (e.g. a list
            # or a bare string); the record builder needs dict-style access.
            if not isinstance(record, dict):
                logger.warning("Skipping non-object JSON line in %s", path)
                continue
            sample = _build_from_record(record, base_dir, only_source)
            if not sample:
                continue
            yield sample
            count += 1
            if max_samples and count >= max_samples:
                break
def _iter_manifest_parquet(
    path: Path,
    only_source: str,
    max_samples: Optional[int],
) -> Iterable[LabelInput]:
    """Yield label inputs from a Parquet manifest.

    The whole table is read and converted to a list of row dicts before
    iteration starts. Relative image paths are resolved against the
    manifest's directory.

    Args:
        path: Parquet manifest file.
        only_source: Source filter forwarded to ``_build_from_record``.
        max_samples: Cap on yielded samples; a falsy value means no cap.

    Raises:
        RuntimeError: If ``pyarrow`` cannot be imported.
    """
    # pyarrow is imported lazily so the dependency is only needed for
    # Parquet input.
    try:
        import pyarrow.parquet as pq
    except ImportError as exc:
        raise RuntimeError("pyarrow is required for parquet input") from exc

    manifest_dir = path.parent
    records = pq.read_table(path).to_pylist()

    emitted = 0
    for record in records:
        sample = _build_from_record(record, manifest_dir, only_source)
        if sample:
            yield sample
            emitted += 1
            if max_samples and emitted >= max_samples:
                return
def _build_from_record(
    record: dict[str, object],
    base_dir: Path,
    only_source: str,
) -> Optional[LabelInput]:
    """Convert one manifest record into a ``LabelInput``.

    Args:
        record: Mapping with the manifest fields (``source_type``,
            ``image_path``, ``item_name`` and the optional extras).
        base_dir: Directory used to resolve a relative ``image_path``.
        only_source: Source filter; records that fail it are dropped
            without logging.

    Returns:
        The populated ``LabelInput``, or ``None`` when the record is
        filtered out or lacks ``image_path`` / ``item_name`` (a warning is
        logged for the missing-field cases).
    """
    source = str(record.get("source_type") or "")
    if not _source_allowed(source, only_source):
        return None

    raw_path = str(record.get("image_path") or "").strip()
    if raw_path == "":
        logger.warning("Missing image_path in manifest record")
        return None

    # Absolute paths are used as given; relative ones are anchored at the
    # manifest directory and resolved.
    candidate = Path(raw_path)
    resolved = candidate if candidate.is_absolute() else (base_dir / candidate).resolve()

    name = str(record.get("item_name") or "").strip()
    if name == "":
        logger.warning("Missing item_name in manifest record")
        return None

    return LabelInput(
        image_path=raw_path,
        image_abspath=resolved,
        image_url=_optional_str(record.get("image_url")),
        image_sha256=_optional_str(record.get("image_sha256")),
        item_name=name,
        item_description=_optional_str(record.get("item_description")),
        item_part=_optional_str(record.get("item_part")),
        source_type=source,
        ocid=_optional_str(record.get("ocid")),
        ranking=_optional_int(record.get("ranking")),
    )
def _iter_db(
    db_path: Path,
    only_source: str,
    max_samples: Optional[int],
    run_id: Optional[str],
) -> Iterable[LabelInput]:
    """Yield label inputs read from the SQLite database at ``db_path``.

    Equipment-shape rows are streamed first, then cash rows, depending on
    ``only_source`` ("equipment_shape", "cash" or "all"). ``max_samples``
    is a single budget shared by both sources. Rows with an empty item
    name are skipped. A row's ``local_path`` is resolved relative to the
    database's directory; rows without one get ``image_abspath=None``.

    The connection is closed in a ``finally`` clause, so it is released
    when the stream is exhausted, when the sample cap is hit, when a query
    fails, and when the consumer closes or abandons the generator early.

    Args:
        db_path: SQLite database file.
        only_source: Which source(s) to read.
        max_samples: Cap on yielded samples; a falsy value means no cap.
        run_id: Optional run filter forwarded to the query builders.

    Yields:
        One ``LabelInput`` per accepted row, with ``ranking`` set to None.
    """
    conn = sqlite3.connect(db_path)
    try:
        conn.row_factory = sqlite3.Row
        base_dir = db_path.parent

        def stream(
            query: str, params: tuple[object, ...], source_type: str
        ) -> Iterable[LabelInput]:
            # Map each result row of one source query onto a LabelInput.
            for row in conn.execute(query, params):
                item_name = row["item_name"] or ""
                if not item_name:
                    continue
                local_path = row["local_path"]
                yield LabelInput(
                    image_path=local_path or "",
                    image_abspath=(base_dir / local_path).resolve() if local_path else None,
                    image_url=row["image_url"],
                    image_sha256=row["sha256"],
                    item_name=item_name,
                    item_description=row["item_description"],
                    item_part=_build_item_part(row["item_part"], row["item_slot"]),
                    source_type=source_type,
                    ocid=row["ocid"],
                    ranking=None,
                )

        # Sources in emission order; each query is built lazily, just
        # before its rows are streamed.
        sources = []
        if only_source in ("equipment_shape", "all"):
            sources.append(("equipment_shape", _equipment_query))
        if only_source in ("cash", "all"):
            sources.append(("cash", _cash_query))

        count = 0
        for source_type, build_query in sources:
            query, params = build_query(run_id)
            for sample in stream(query, params, source_type):
                yield sample
                count += 1
                if max_samples and count >= max_samples:
                    return
    finally:
        conn.close()
def _equipment_query(run_id: Optional[str]) -> tuple[str, tuple[object, ...]]:
query = (
"SELECT e.item_shape_icon_url AS image_url, a.sha256 AS sha256, a.local_path AS local_path, "
"e.item_name AS item_name, e.item_description AS item_description, "
"e.item_equipment_part AS item_part, e.equipment_slot AS item_slot, e.ocid AS ocid "
"FROM equipment_shape_items e "
"LEFT JOIN icon_assets a ON a.url = e.item_shape_icon_url "
"WHERE e.item_shape_icon_url IS NOT NULL AND e.item_shape_icon_url != ''"
)
if run_id:
query += " AND e.run_id = ?"
return query, (run_id,)
return query, ()
def _cash_query(run_id: Optional[str]) -> tuple[str, tuple[object, ...]]:
query = (
"SELECT c.cash_item_icon_url AS image_url, a.sha256 AS sha256, a.local_path AS local_path, "
"c.cash_item_name AS item_name, c.cash_item_description AS item_description, "
"c.cash_item_equipment_part AS item_part, c.cash_item_equipment_slot AS item_slot, c.ocid AS ocid "
"FROM cash_items c "
"LEFT JOIN icon_assets a ON a.url = c.cash_item_icon_url "
"WHERE c.cash_item_icon_url IS NOT NULL AND c.cash_item_icon_url != ''"
)
if run_id:
query += " AND c.run_id = ?"
return query, (run_id,)
return query, ()
def _source_allowed(source_type: str, only_source: str) -> bool:
if only_source == "all":
return source_type in {"equipment_shape", "cash"}
return source_type == only_source
def _build_item_part(part: Optional[str], slot: Optional[str]) -> Optional[str]:
part = (part or "").strip()
slot = (slot or "").strip()
if part and slot and part != slot:
return f"{part}/{slot}"
return part or slot or None
def _optional_str(value: object) -> Optional[str]:
if value is None:
return None
text = str(value).strip()
return text or None
def _optional_int(value: object) -> Optional[int]:
try:
return int(value) if value is not None else None
except (TypeError, ValueError):
return None
|