gyubin02 committed on
Commit
da3fe02
·
0 Parent(s):

Initial commit

.env.example ADDED
@@ -0,0 +1,6 @@
+# Nexon Open API key
+NEXON_API_KEY=
+
+# Optional output locations
+OUTPUT_DIR=data
+DB_PATH=
.gitattributes ADDED
@@ -0,0 +1,3 @@
+*.sqlite3 filter=lfs diff=lfs merge=lfs -text
+*.bin filter=lfs diff=lfs merge=lfs -text
+*.pickle filter=lfs diff=lfs merge=lfs -text
.gitignore ADDED
@@ -0,0 +1,29 @@
+# Python
+__pycache__/
+*.pyc
+*.pyo
+*.pyd
+*.egg-info/
+.eggs/
+.venv/
+venv/
+ENV/
+
+# Env files
+.env
+
+# Test/coverage
+.pytest_cache/
+.coverage
+htmlcov/
+
+# Editor/OS
+.DS_Store
+.idea/
+.vscode/
+
+# Data outputs
+/data/
+
+# Logs
+*.log
README.md ADDED
@@ -0,0 +1,90 @@
+# MapleStory Ranking Icon Pipeline
+
+Data based on NEXON Open API.
+
+## Overview
+Collects MapleStory overall ranking (1-100) characters and stores:
+- Equipment shape icons (`item_shape_icon`) + metadata
+- Cash item icons (`cash_item_icon`) + metadata
+
+Output includes:
+- Raw JSON responses for audit/replay
+- SQLite database with idempotent upserts
+- Optional icon downloads with SHA256 integrity tracking
+
+## Requirements
+- Python 3.11+
+- Nexon Open API key (set `NEXON_API_KEY`)
+
+## Install
+```bash
+python -m venv .venv
+source .venv/bin/activate
+pip install -r requirements.txt
+```
+
+## Configuration
+Create `.env` (see `.env.example`) or export env vars:
+- `NEXON_API_KEY` (required)
+- `OUTPUT_DIR` (optional, default `data/`)
+- `DB_PATH` (optional, default `data/YYYY-MM-DD/db.sqlite`)
+
+## Usage
+```bash
+python -m pipeline run --date 2026-01-10 --top 100 --download-icons --concurrency 8 --rps 5
+python -m pipeline run --top 100 --no-download
+python -m pipeline --top 100
+python -m pipeline --start-rank 101 --end-rank 200 --date 2026-01-10
+```
+
+The `run` subcommand is optional.
+
+Optional filters:
+- `--world-name`
+- `--world-type`
+- `--class-name`
+
+Rank range:
+- `--start-rank` (default 1)
+- `--end-rank` (default = `--top`)
+- `--top` remains as an alias for `--end-rank`
+
+Merge additional ranges into the same run:
+- Use `--run-id` with a previous `Run ID` from `README_run.md`
+
+Preset handling:
+- Default: only current preset (or preset 1 if missing)
+- `--all-presets` to store all presets
+
+## Output Layout
+```
+project/
+  src/
+  data/
+    YYYY-MM-DD/
+      raw/
+        ranking_overall.json
+        ocid/{rank}_{character_name}.json
+        item_equipment/{ocid}.json
+        cashitem_equipment/{ocid}.json
+      db.sqlite
+      icons/
+        equipment_shape/
+        cash/
+```
+
+## Idempotency
+Runs are keyed by a deterministic `run_id` derived from `target_date` and ranking parameters, so re-running with the same inputs updates existing rows instead of creating duplicates.
+
+## Compliance
+- Data based on NEXON Open API.
+- Refresh data within 30 days to stay compliant; the CLI is scheduler-friendly (cron, etc.).
+
+## Tests
+```bash
+pytest
+```
+
+## Notes
+- Ranking pagination continues until the requested rank range is collected or pages are exhausted.
+- Raw JSON is stored unmodified (for recovery if field names change).
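
Note on the Idempotency section of README.md above: it states that `run_id` is deterministic and derived from `target_date` and the ranking parameters, but the derivation itself is not shown in this part of the diff. The following is a minimal sketch of one way to get that property, assuming a SHA-256 over a canonical JSON encoding. The function name `make_run_id`, the exact parameter set, and the 16-character truncation are hypothetical and not taken from the repository.

```python
import hashlib
import json
from typing import Optional


def make_run_id(
    target_date: str,
    start_rank: int,
    end_rank: int,
    world_name: Optional[str] = None,
    world_type: Optional[str] = None,
    class_name: Optional[str] = None,
) -> str:
    """Hypothetical helper: hash the run parameters into a stable ID."""
    params = {
        "target_date": target_date,
        "start_rank": start_rank,
        "end_rank": end_rank,
        "world_name": world_name,
        "world_type": world_type,
        "class_name": class_name,
    }
    # sort_keys plus fixed separators give a canonical byte string, so the
    # same inputs always hash to the same ID regardless of dict ordering.
    canonical = json.dumps(
        params, sort_keys=True, separators=(",", ":"), ensure_ascii=False
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16]


first = make_run_id("2026-01-10", 1, 100)
again = make_run_id("2026-01-10", 1, 100)
other = make_run_id("2026-01-10", 101, 200)
```

Under this scheme a different rank range yields a different ID, which is consistent with the README asking for an explicit `--run-id` when merging additional ranges into an earlier run.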
README_labeling.md ADDED
@@ -0,0 +1,111 @@
+# Labeling Pipeline (CLIP Text Labels)
+
+Data based on NEXON Open API.
+
+## Overview
+This pipeline generates CLIP-ready text labels for MapleStory item icons using Qwen2-VL.
+It consumes either a manifest file or the SQLite DB and writes:
+- `labels.jsonl` (one JSON record per image)
+- `labels.parquet` (optional)
+
+## Requirements
+- Python 3.11+
+- GPU recommended for Qwen2-VL inference
+
+## Install
+```bash
+python -m venv .venv
+source .venv/bin/activate
+pip install -r requirements.txt
+```
+
+Optional (for 4-bit quantization):
+```bash
+pip install bitsandbytes
+```
+
+## Input Adapters
+You can use one of the following:
+
+A) Manifest (recommended)
+- `data/<DATE>/manifest.parquet` or `manifest.jsonl`
+- Required columns: `image_path`, `item_name`, `source_type`
+
+B) SQLite DB
+- `data/<DATE>/db.sqlite`
+- Joins `equipment_shape_items` / `cash_items` with `icon_assets`
+
+## Run
+```bash
+python -m labeler run \
+  --input data/2026-01-10/manifest.parquet \
+  --outdir data/2026-01-10/labels \
+  --model Qwen/Qwen2-VL-2B-Instruct \
+  --device auto \
+  --batch-size 8 \
+  --upscale 2 \
+  --resume
+```
+
+Using DB input:
+```bash
+python -m labeler run \
+  --db data/2026-01-10/db.sqlite \
+  --outdir data/2026-01-10/labels
+```
+
+Range filter by run_id (optional):
+```bash
+python -m labeler run --db data/2026-01-10/db.sqlite --run-id <RUN_ID>
+```
+
+## Output Schema
+Each line in `labels.jsonl` is a JSON object:
+
+```json
+{
+  "image_path": "...",
+  "image_sha256": "...",
+  "source_type": "equipment_shape" | "cash",
+  "item_name": "...",
+  "item_description": "...",
+  "label_ko": "...",
+  "label_en": "...",
+  "tags_ko": ["..."],
+  "attributes": {
+    "colors": ["..."],
+    "theme": ["..."],
+    "material": ["..."],
+    "vibe": ["..."],
+    "item_type_guess": "..."
+  },
+  "query_variants_ko": ["..."],
+  "quality_flags": {
+    "is_uncertain": true,
+    "reasons": ["too_small", "ambiguous_icon"]
+  },
+  "model": "Qwen/Qwen2-VL-2B-Instruct",
+  "prompt_version": "v1",
+  "generated_at": "ISO-8601"
+}
+```
+
+## Prompt Versioning
+- Prompt version is stored as `prompt_version` in each record.
+- Current version: `v1` (see `src/labeler/prompts.py`).
+
+## Resume / Idempotency
+- If `labels.jsonl` already exists, use `--resume`.
+- The pipeline skips images already labeled by `image_path` or `image_sha256`.
+
+## Comparisons
+You can compare modes:
+- `--no-image` (metadata only)
+- `--no-metadata` (image only)
+
+## Example Output (3 lines)
+```json
+{"image_path":"icons/equipment_shape/abc.png","image_sha256":"sha...","source_type":"equipment_shape","item_name":"Sample Hat","item_description":null,"label_ko":"샘플 모자 아이콘, 붉은 색감","label_en":null,"tags_ko":["모자","붉은","아이콘","장비","캐릭터"],"attributes":{"colors":["red"],"theme":["fantasy"],"material":["cloth"],"vibe":["cute"],"item_type_guess":"hat"},"query_variants_ko":["샘플 모자","붉은 모자 아이콘","메이플 모자"],"quality_flags":{"is_uncertain":false,"reasons":[]},"model":"Qwen/Qwen2-VL-2B-Instruct","prompt_version":"v1","generated_at":"2026-01-10T00:00:00Z"}
+{"image_path":"icons/cash/def.png","image_sha256":"sha...","source_type":"cash","item_name":"Sample Cape","item_description":"Example","label_ko":"샘플 망토 아이콘, 푸른 계열","label_en":null,"tags_ko":["망토","푸른","코디","캐시","아이콘"],"attributes":{"colors":["blue"],"theme":["classic"],"material":["silk"],"vibe":["elegant"],"item_type_guess":"cape"},"query_variants_ko":["푸른 망토","샘플 망토 아이콘","메이플 캐시 망토"],"quality_flags":{"is_uncertain":false,"reasons":[]},"model":"Qwen/Qwen2-VL-2B-Instruct","prompt_version":"v1","generated_at":"2026-01-10T00:00:00Z"}
+{"image_path":"icons/equipment_shape/ghi.png","image_sha256":"sha...","source_type":"equipment_shape","item_name":"Sample Sword","item_description":null,"label_ko":"샘플 검 아이콘, 금속 느낌","label_en":null,"tags_ko":["검","무기","금속","아이콘","장비"],"attributes":{"colors":["silver"],"theme":["fantasy"],"material":["metal"],"vibe":["sharp"],"item_type_guess":"sword"},"query_variants_ko":["샘플 검","메이플 검 아이콘","금속 검"],"quality_flags":{"is_uncertain":false,"reasons":[]},"model":"Qwen/Qwen2-VL-2B-Instruct","prompt_version":"v1","generated_at":"2026-01-10T00:00:00Z"}
+```
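
Note on the Resume / Idempotency section of README_labeling.md above: already-labeled images are skipped by `image_path` or `image_sha256`. The sketch below shows one way to build those two skip sets from an existing `labels.jsonl`. The name `load_existing_keys` is hypothetical, and the repository's own loader is not visible in this part of the diff, so treat this as an illustration of the idea and not as the project's implementation.

```python
from __future__ import annotations

import json
import tempfile
from pathlib import Path


def load_existing_keys(path: Path) -> tuple[set[str], set[str]]:
    """Collect image_path and image_sha256 values from a labels.jsonl file."""
    paths: set[str] = set()
    hashes: set[str] = set()
    with path.open("r", encoding="utf-8") as handle:
        for line in handle:
            line = line.strip()
            if not line:
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                # A half-written last line (e.g. after a crash) is ignored.
                continue
            if not isinstance(record, dict):
                continue
            if record.get("image_path"):
                paths.add(str(record["image_path"]))
            if record.get("image_sha256"):
                hashes.add(str(record["image_sha256"]))
    return paths, hashes


with tempfile.TemporaryDirectory() as tmp:
    labels = Path(tmp) / "labels.jsonl"
    labels.write_text(
        '{"image_path": "icons/cash/def.png", "image_sha256": "abc123"}\n'
        '{"image_path": "icons/equipment_shape/abc.png", "image_sha256": null}\n'
        '{"image_path": "icons/cash/trunc',  # truncated final line
        encoding="utf-8",
    )
    seen_paths, seen_hashes = load_existing_keys(labels)
```

Keeping both sets means an icon that was re-downloaded to a different path is still recognized by its hash, while records without a hash fall back to the path.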
labeler/__init__.py ADDED
@@ -0,0 +1,14 @@
+from __future__ import annotations
+
+import sys
+from pathlib import Path
+from pkgutil import extend_path
+
+_root = Path(__file__).resolve().parent.parent
+_src = _root / "src"
+if _src.exists():
+    src_str = str(_src)
+    if src_str not in sys.path:
+        sys.path.insert(0, src_str)
+
+__path__ = extend_path(__path__, __name__)
labeler/__main__.py ADDED
@@ -0,0 +1,12 @@
+from __future__ import annotations
+
+import sys
+
+import typer
+
+from labeler.cli import run
+
+if len(sys.argv) > 1 and sys.argv[1] == "run":
+    sys.argv.pop(1)
+
+typer.run(run)
pipeline/__init__.py ADDED
@@ -0,0 +1,14 @@
+from __future__ import annotations
+
+import sys
+from pathlib import Path
+from pkgutil import extend_path
+
+_root = Path(__file__).resolve().parent.parent
+_src = _root / "src"
+if _src.exists():
+    src_str = str(_src)
+    if src_str not in sys.path:
+        sys.path.insert(0, src_str)
+
+__path__ = extend_path(__path__, __name__)
pipeline/__main__.py ADDED
@@ -0,0 +1,12 @@
+from __future__ import annotations
+
+import sys
+
+import typer
+
+from pipeline.cli import run
+
+if len(sys.argv) > 1 and sys.argv[1] == "run":
+    sys.argv.pop(1)
+
+typer.run(run)
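
Note on the two `__main__.py` shims above: both make the `run` subcommand optional by dropping it from `sys.argv` before `typer.run(run)` parses the remaining options. The same logic can be written as a pure function, which is easier to unit test. `strip_optional_subcommand` is a hypothetical name and is not part of this commit.

```python
from __future__ import annotations


def strip_optional_subcommand(argv: list[str], name: str = "run") -> list[str]:
    """Return a copy of argv without a leading subcommand token, if present."""
    if len(argv) > 1 and argv[1] == name:
        return [argv[0], *argv[2:]]
    return list(argv)


with_run = strip_optional_subcommand(["pipeline", "run", "--top", "100"])
without_run = strip_optional_subcommand(["pipeline", "--top", "100"])
bare = strip_optional_subcommand(["pipeline"])
```

Both invocation styles from the README end up with the same argument list, so the single Typer command sees identical input either way.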
requirements.txt ADDED
@@ -0,0 +1,9 @@
+httpx>=0.25
+python-dotenv>=1.0
+typer>=0.9
+pytest>=7.0
+accelerate>=0.27
+pillow>=10.0
+pyarrow>=14.0
+torch>=2.1
+transformers>=4.41
src/labeler/__init__.py ADDED
@@ -0,0 +1,2 @@
+__all__ = ["__version__"]
+__version__ = "0.1.0"
src/labeler/adapters.py ADDED
@@ -0,0 +1,246 @@
+from __future__ import annotations
+
+import json
+import logging
+import sqlite3
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Iterable, Optional
+
+logger = logging.getLogger("labeler")
+
+
+@dataclass
+class LabelInput:
+    image_path: str
+    image_abspath: Optional[Path]
+    image_url: Optional[str]
+    image_sha256: Optional[str]
+    item_name: str
+    item_description: Optional[str]
+    item_part: Optional[str]
+    source_type: str
+    ocid: Optional[str]
+    ranking: Optional[int]
+
+
+def iter_inputs(
+    *,
+    input_path: Optional[Path],
+    db_path: Optional[Path],
+    only_source: str,
+    max_samples: Optional[int],
+    run_id: Optional[str],
+) -> Iterable[LabelInput]:
+    if input_path:
+        if input_path.suffix.lower() in {".jsonl", ".json"}:
+            yield from _iter_manifest_jsonl(input_path, only_source, max_samples)
+        elif input_path.suffix.lower() == ".parquet":
+            yield from _iter_manifest_parquet(input_path, only_source, max_samples)
+        else:
+            raise ValueError(f"Unsupported input format: {input_path}")
+        return
+
+    if not db_path:
+        raise ValueError("Provide --input or --db")
+
+    yield from _iter_db(db_path, only_source, max_samples, run_id)
+
+
+def _iter_manifest_jsonl(
+    path: Path,
+    only_source: str,
+    max_samples: Optional[int],
+) -> Iterable[LabelInput]:
+    base_dir = path.parent
+    count = 0
+    with path.open("r", encoding="utf-8") as handle:
+        for line in handle:
+            line = line.strip()
+            if not line:
+                continue
+            try:
+                record = json.loads(line)
+            except json.JSONDecodeError:
+                logger.warning("Skipping invalid JSON line in %s", path)
+                continue
+            sample = _build_from_record(record, base_dir, only_source)
+            if not sample:
+                continue
+            yield sample
+            count += 1
+            if max_samples and count >= max_samples:
+                break
+
+
+def _iter_manifest_parquet(
+    path: Path,
+    only_source: str,
+    max_samples: Optional[int],
+) -> Iterable[LabelInput]:
+    try:
+        import pyarrow.parquet as pq
+    except ImportError as exc:
+        raise RuntimeError("pyarrow is required for parquet input") from exc
+
+    base_dir = path.parent
+    table = pq.read_table(path)
+    rows = table.to_pylist()
+    count = 0
+    for record in rows:
+        sample = _build_from_record(record, base_dir, only_source)
+        if not sample:
+            continue
+        yield sample
+        count += 1
+        if max_samples and count >= max_samples:
+            break
+
+
+def _build_from_record(
+    record: dict[str, object],
+    base_dir: Path,
+    only_source: str,
+) -> Optional[LabelInput]:
+    source_type = str(record.get("source_type") or "")
+    if not _source_allowed(source_type, only_source):
+        return None
+
+    image_path = str(record.get("image_path") or "").strip()
+    if not image_path:
+        logger.warning("Missing image_path in manifest record")
+        return None
+
+    image_abspath = Path(image_path)
+    if not image_abspath.is_absolute():
+        image_abspath = (base_dir / image_abspath).resolve()
+
+    item_name = str(record.get("item_name") or "").strip()
+    if not item_name:
+        logger.warning("Missing item_name in manifest record")
+        return None
+
+    return LabelInput(
+        image_path=image_path,
+        image_abspath=image_abspath,
+        image_url=_optional_str(record.get("image_url")),
+        image_sha256=_optional_str(record.get("image_sha256")),
+        item_name=item_name,
+        item_description=_optional_str(record.get("item_description")),
+        item_part=_optional_str(record.get("item_part")),
+        source_type=source_type,
+        ocid=_optional_str(record.get("ocid")),
+        ranking=_optional_int(record.get("ranking")),
+    )
+
+
+def _iter_db(
+    db_path: Path,
+    only_source: str,
+    max_samples: Optional[int],
+    run_id: Optional[str],
+) -> Iterable[LabelInput]:
+    conn = sqlite3.connect(db_path)
+    conn.row_factory = sqlite3.Row
+    base_dir = db_path.parent
+
+    def stream(query: str, params: tuple[object, ...], source_type: str) -> Iterable[LabelInput]:
+        cursor = conn.execute(query, params)
+        for row in cursor:
+            local_path = row["local_path"]
+            image_path = local_path or ""
+            image_abspath = (base_dir / local_path).resolve() if local_path else None
+            item_name = row["item_name"] or ""
+            if not item_name:
+                continue
+            yield LabelInput(
+                image_path=image_path,
+                image_abspath=image_abspath,
+                image_url=row["image_url"],
+                image_sha256=row["sha256"],
+                item_name=item_name,
+                item_description=row["item_description"],
+                item_part=_build_item_part(row["item_part"], row["item_slot"]),
+                source_type=source_type,
+                ocid=row["ocid"],
+                ranking=None,
+            )
+
+    count = 0
+    if only_source in ("equipment_shape", "all"):
+        query, params = _equipment_query(run_id)
+        for sample in stream(query, params, "equipment_shape"):
+            yield sample
+            count += 1
+            if max_samples and count >= max_samples:
+                conn.close()
+                return
+
+    if only_source in ("cash", "all"):
+        query, params = _cash_query(run_id)
+        for sample in stream(query, params, "cash"):
+            yield sample
+            count += 1
+            if max_samples and count >= max_samples:
+                conn.close()
+                return
+
+    conn.close()
+
+
+def _equipment_query(run_id: Optional[str]) -> tuple[str, tuple[object, ...]]:
+    query = (
+        "SELECT e.item_shape_icon_url AS image_url, a.sha256 AS sha256, a.local_path AS local_path, "
+        "e.item_name AS item_name, e.item_description AS item_description, "
+        "e.item_equipment_part AS item_part, e.equipment_slot AS item_slot, e.ocid AS ocid "
+        "FROM equipment_shape_items e "
+        "LEFT JOIN icon_assets a ON a.url = e.item_shape_icon_url "
+        "WHERE e.item_shape_icon_url IS NOT NULL AND e.item_shape_icon_url != ''"
+    )
+    if run_id:
+        query += " AND e.run_id = ?"
+        return query, (run_id,)
+    return query, ()
+
+
+def _cash_query(run_id: Optional[str]) -> tuple[str, tuple[object, ...]]:
+    query = (
+        "SELECT c.cash_item_icon_url AS image_url, a.sha256 AS sha256, a.local_path AS local_path, "
+        "c.cash_item_name AS item_name, c.cash_item_description AS item_description, "
+        "c.cash_item_equipment_part AS item_part, c.cash_item_equipment_slot AS item_slot, c.ocid AS ocid "
+        "FROM cash_items c "
+        "LEFT JOIN icon_assets a ON a.url = c.cash_item_icon_url "
+        "WHERE c.cash_item_icon_url IS NOT NULL AND c.cash_item_icon_url != ''"
+    )
+    if run_id:
+        query += " AND c.run_id = ?"
+        return query, (run_id,)
+    return query, ()
+
+
+def _source_allowed(source_type: str, only_source: str) -> bool:
+    if only_source == "all":
+        return source_type in {"equipment_shape", "cash"}
+    return source_type == only_source
+
+
+def _build_item_part(part: Optional[str], slot: Optional[str]) -> Optional[str]:
+    part = (part or "").strip()
+    slot = (slot or "").strip()
+    if part and slot and part != slot:
+        return f"{part}/{slot}"
+    return part or slot or None
+
+
+def _optional_str(value: object) -> Optional[str]:
+    if value is None:
+        return None
+    text = str(value).strip()
+    return text or None
+
+
+def _optional_int(value: object) -> Optional[int]:
+    try:
+        return int(value) if value is not None else None
+    except (TypeError, ValueError):
+        return None
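
Note on `_equipment_query` and `_cash_query` above: both use a `LEFT JOIN` on `icon_assets`, so items whose icon was never downloaded still come back, with `local_path` and `sha256` as `NULL`. `_iter_db` then yields those rows with `image_abspath=None`. The small in-memory demonstration below shows that behavior. The table definitions are a trimmed-down assumption containing only the columns this reduced query needs, not the project's real schema, and the URLs are placeholders.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.executescript(
    """
    CREATE TABLE cash_items (
        run_id TEXT, ocid TEXT, cash_item_name TEXT, cash_item_icon_url TEXT
    );
    CREATE TABLE icon_assets (url TEXT PRIMARY KEY, sha256 TEXT, local_path TEXT);
    INSERT INTO cash_items VALUES ('r1', 'o1', 'Sample Cape', 'https://example.invalid/a.png');
    INSERT INTO cash_items VALUES ('r1', 'o2', 'Sample Hat', 'https://example.invalid/b.png');
    INSERT INTO cash_items VALUES ('r2', 'o3', 'Other Run', 'https://example.invalid/a.png');
    INSERT INTO icon_assets VALUES ('https://example.invalid/a.png', 'abc123', 'icons/cash/a.png');
    """
)

# Same join and filters as _cash_query, reduced to three output columns.
# The ORDER BY is added here only to make the demo output deterministic.
query = (
    "SELECT c.cash_item_name AS item_name, a.sha256 AS sha256, a.local_path AS local_path "
    "FROM cash_items c "
    "LEFT JOIN icon_assets a ON a.url = c.cash_item_icon_url "
    "WHERE c.cash_item_icon_url IS NOT NULL AND c.cash_item_icon_url != '' "
    "AND c.run_id = ? "
    "ORDER BY c.ocid"
)
rows = [dict(row) for row in conn.execute(query, ("r1",))]
conn.close()
```

The second row has no matching asset, which is the case the labeling loop later reports as `missing_image` unless `--no-image` is set.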
src/labeler/cli.py ADDED
@@ -0,0 +1,131 @@
+from __future__ import annotations
+
+import logging
+from pathlib import Path
+from typing import Optional
+
+import typer
+
+from .pipeline import LabelingConfig, run_labeling
+
+
+def run(
+    input_path: Optional[Path] = typer.Option(
+        None,
+        "--input",
+        help="Path to manifest.jsonl or manifest.parquet",
+    ),
+    db_path: Optional[Path] = typer.Option(
+        None,
+        "--db",
+        help="Path to SQLite db.sqlite (used when --input is not provided)",
+    ),
+    outdir: Optional[Path] = typer.Option(
+        None,
+        "--outdir",
+        help="Output directory for labels (default: input/db parent + /labels)",
+    ),
+    model: str = typer.Option(
+        "Qwen/Qwen2-VL-2B-Instruct",
+        "--model",
+        help="Model ID",
+    ),
+    device: str = typer.Option(
+        "auto",
+        "--device",
+        help="Device string (auto, cpu, cuda, cuda:0)",
+    ),
+    precision: str = typer.Option(
+        "auto",
+        "--precision",
+        help="auto|fp16|bf16|fp32",
+    ),
+    batch_size: int = typer.Option(8, "--batch-size", help="Batch size"),
+    upscale: int = typer.Option(1, "--upscale", help="Upscale factor (e.g. 2 or 4)"),
+    alpha_bg: str = typer.Option(
+        "white",
+        "--alpha-bg",
+        help="Background for transparent PNGs: white|black|none",
+    ),
+    resume: bool = typer.Option(False, "--resume", help="Resume from existing labels.jsonl"),
+    lang: str = typer.Option("ko", "--lang", help="ko|en|both"),
+    only_source: str = typer.Option(
+        "all",
+        "--only-source",
+        help="equipment_shape|cash|all",
+    ),
+    max_samples: Optional[int] = typer.Option(
+        None,
+        "--max-samples",
+        help="Limit number of samples (for testing)",
+    ),
+    no_image: bool = typer.Option(False, "--no-image", help="Use metadata only"),
+    no_metadata: bool = typer.Option(False, "--no-metadata", help="Use image only"),
+    log_level: str = typer.Option("info", "--log-level", help="info|debug"),
+    parquet: bool = typer.Option(False, "--parquet", help="Write labels.parquet"),
+    load_4bit: bool = typer.Option(False, "--load-4bit", help="Enable 4-bit quantization"),
+    max_new_tokens: int = typer.Option(384, "--max-new-tokens", help="Max new tokens"),
+    run_id: Optional[str] = typer.Option(
+        None,
+        "--run-id",
+        help="Filter DB inputs by run_id",
+    ),
+) -> None:
+    """Generate CLIP-ready labels for MapleStory item icons."""
+
+    logging.basicConfig(level=_parse_log_level(log_level), format="%(levelname)s: %(message)s")
+
+    if not input_path and not db_path:
+        typer.echo("Provide --input or --db")
+        raise typer.Exit(code=1)
+    if alpha_bg not in {"white", "black", "none"}:
+        typer.echo("--alpha-bg must be white, black, or none")
+        raise typer.Exit(code=1)
+    if lang not in {"ko", "en", "both"}:
+        typer.echo("--lang must be ko, en, or both")
+        raise typer.Exit(code=1)
+    if only_source not in {"equipment_shape", "cash", "all"}:
+        typer.echo("--only-source must be equipment_shape, cash, or all")
+        raise typer.Exit(code=1)
+    if precision not in {"auto", "fp16", "bf16", "fp32"}:
+        typer.echo("--precision must be auto, fp16, bf16, or fp32")
+        raise typer.Exit(code=1)
+
+    resolved_outdir = outdir
+    if not resolved_outdir:
+        if input_path:
+            resolved_outdir = input_path.parent / "labels"
+        else:
+            resolved_outdir = db_path.parent / "labels"
+
+    config = LabelingConfig(
+        input_path=input_path,
+        db_path=db_path,
+        outdir=resolved_outdir,
+        model_id=model,
+        device=device,
+        precision=precision,
+        batch_size=batch_size,
+        upscale=upscale,
+        alpha_bg=alpha_bg,
+        resume=resume,
+        lang=lang,
+        only_source=only_source,
+        max_samples=max_samples,
+        no_image=no_image,
+        no_metadata=no_metadata,
+        log_level=log_level,
+        parquet=parquet,
+        load_4bit=load_4bit,
+        max_new_tokens=max_new_tokens,
+        run_id=run_id,
+    )
+
+    run_labeling(config)
+
+
+def _parse_log_level(value: str) -> int:
+    value = value.lower()
+    if value == "debug":
+        return logging.DEBUG
+    return logging.INFO
src/labeler/image_utils.py ADDED
@@ -0,0 +1,32 @@
+from __future__ import annotations
+
+from pathlib import Path
+from typing import Optional
+
+from PIL import Image
+
+
+def load_image(
+    path: Path,
+    upscale: int,
+    alpha_background: str,
+) -> Image.Image:
+    image = Image.open(path)
+    image = _apply_alpha(image, alpha_background)
+    if upscale and upscale > 1:
+        image = image.resize(
+            (image.width * upscale, image.height * upscale),
+            resample=Image.BICUBIC,
+        )
+    return image
+
+
+def _apply_alpha(image: Image.Image, alpha_background: str) -> Image.Image:
+    if image.mode in ("RGBA", "LA") or (image.mode == "P" and "transparency" in image.info):
+        if alpha_background == "none":
+            return image.convert("RGBA")
+        color = (255, 255, 255, 255) if alpha_background == "white" else (0, 0, 0, 255)
+        background = Image.new("RGBA", image.size, color)
+        foreground = image.convert("RGBA")
+        return Image.alpha_composite(background, foreground).convert("RGB")
+    return image.convert("RGB")
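
Note on `_apply_alpha` above: it flattens transparent icons onto a solid background with `Image.alpha_composite`. Because that background is fully opaque, the per-channel result reduces to the standard "over" blend, `fg * alpha + bg * (1 - alpha)`. The sketch below reproduces that arithmetic in plain Python for a single pixel, without Pillow. `composite_over_opaque` is a hypothetical helper, and its rounding may differ by one level from Pillow's integer implementation.

```python
from __future__ import annotations


def composite_over_opaque(
    fg: tuple[int, int, int, int],
    bg: tuple[int, int, int],
) -> tuple[int, ...]:
    """Blend one RGBA pixel over an opaque RGB background ("over" operator)."""
    r, g, b, a = fg
    alpha = a / 255.0
    return tuple(
        round(channel * alpha + back * (1.0 - alpha))
        for channel, back in zip((r, g, b), bg)
    )


WHITE = (255, 255, 255)
opaque_red = composite_over_opaque((255, 0, 0, 255), WHITE)
transparent = composite_over_opaque((255, 0, 0, 0), WHITE)
half_black = composite_over_opaque((0, 0, 0, 128), WHITE)
```

Fully transparent pixels take the background color, which is why the choice of `--alpha-bg` changes how the area around a small icon looks to the vision model.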
src/labeler/model.py ADDED
@@ -0,0 +1,122 @@
+from __future__ import annotations
+
+import logging
+from dataclasses import dataclass
+from typing import Iterable, Optional
+
+import torch
+from transformers import AutoProcessor, Qwen2VLForConditionalGeneration
+
+logger = logging.getLogger("labeler")
+
+
+@dataclass
+class ModelConfig:
+    model_id: str
+    device: str
+    precision: str
+    max_new_tokens: int
+    load_4bit: bool
+
+
+class LabelerModel:
+    def __init__(self, config: ModelConfig) -> None:
+        self.config = config
+        self.device = _resolve_device(config.device)
+        self.dtype = _resolve_dtype(config.precision, self.device)
+        quantization_config = None
+        load_kwargs: dict[str, object] = {}
+        if config.load_4bit:
+            try:
+                from transformers import BitsAndBytesConfig
+            except ImportError as exc:
+                raise RuntimeError("bitsandbytes is required for 4-bit loading") from exc
+            quantization_config = BitsAndBytesConfig(
+                load_in_4bit=True,
+                bnb_4bit_use_double_quant=True,
+                bnb_4bit_compute_dtype=self.dtype,
+            )
+            load_kwargs["quantization_config"] = quantization_config
+            load_kwargs["device_map"] = "auto"
+        elif self.device.startswith("cuda"):
+            load_kwargs["device_map"] = "auto"
+
+        self.processor = AutoProcessor.from_pretrained(config.model_id)
+        self.model = Qwen2VLForConditionalGeneration.from_pretrained(
+            config.model_id,
+            torch_dtype=self.dtype,
+            low_cpu_mem_usage=True,
+            **load_kwargs,
+        )
+        if not load_kwargs.get("device_map"):
+            self.model.to(self.device)
+        self.model.eval()
+
+    def generate_texts(
+        self,
+        messages_list: list[list[dict[str, object]]],
+        images: Optional[list[object]],
+    ) -> list[str]:
+        prompts = [
+            self.processor.apply_chat_template(
+                messages,
+                tokenize=False,
+                add_generation_prompt=True,
+            )
+            for messages in messages_list
+        ]
+
+        if images is None:
+            inputs = self.processor(
+                text=prompts,
+                padding=True,
+                return_tensors="pt",
+            )
+        else:
+            inputs = self.processor(
+                text=prompts,
+                images=images,
+                padding=True,
+                return_tensors="pt",
+            )
+        inputs = _move_to_device(inputs, self.model.device)
+
+        with torch.inference_mode():
+            output_ids = self.model.generate(
+                **inputs,
+                max_new_tokens=self.config.max_new_tokens,
+                do_sample=False,
+            )
+        prompt_length = inputs["input_ids"].shape[1]
+        generated_ids = output_ids[:, prompt_length:]
+        return self.processor.batch_decode(generated_ids, skip_special_tokens=True)
+
+
+def _resolve_device(device: str) -> str:
+    if device == "auto":
+        return "cuda" if torch.cuda.is_available() else "cpu"
+    return device
+
+
+def _resolve_dtype(precision: str, device: str) -> torch.dtype:
+    if precision == "fp32":
+        return torch.float32
+    if precision == "bf16":
+        if device.startswith("cuda") and torch.cuda.is_bf16_supported():
+            return torch.bfloat16
+        return torch.float16
+    if precision == "fp16":
+        return torch.float16
+    if device.startswith("cuda"):
+        return torch.float16
+    return torch.float32
+
+
+def _move_to_device(inputs: dict[str, object], device: torch.device | str) -> dict[str, object]:
+    moved = {}
+    for key, value in inputs.items():
+        if hasattr(value, "to"):
+            moved[key] = value.to(device)
+        else:
+            moved[key] = value
+    return moved
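
Note on `generate_texts` above: it decodes only the newly generated tokens by slicing off the first `prompt_length` columns of `output_ids`. This works because `padding=True` gives every row in the batch the same prompt width, and the generated sequence starts with the prompt columns. A plain-list sketch of that slicing follows; the token IDs and the padding value are made up and stand in for the real tensors.

```python
PAD = 0  # placeholder padding token ID, for illustration only

# Two prompts padded to the same width (3), as padding=True would produce.
input_ids = [
    [PAD, 11, 12],
    [21, 22, 23],
]
# The generated sequences: prompt columns first, then the new tokens.
output_ids = [
    [PAD, 11, 12, 101, 102],
    [21, 22, 23, 201, 202],
]

# Equivalent of output_ids[:, prompt_length:] on a 2-D tensor.
prompt_length = len(input_ids[0])
generated_ids = [row[prompt_length:] for row in output_ids]
```

Only `generated_ids` is passed to `batch_decode`, so the prompt text is never echoed back into the label.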
src/labeler/pipeline.py ADDED
@@ -0,0 +1,379 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from __future__ import annotations
2
+
3
+ import json
4
+ import logging
5
+ from dataclasses import dataclass
6
+ from pathlib import Path
7
+ from typing import Iterable, Optional
8
+
9
+ from .adapters import LabelInput, iter_inputs
10
+ from .image_utils import load_image
11
+ from .model import LabelerModel, ModelConfig
12
+ from .prompts import PROMPT_VERSION, PromptInputs, build_messages, build_user_prompt
13
+ from pipeline.utils import ensure_dir, utc_now_iso
14
+
15
+ logger = logging.getLogger("labeler")
16
+
17
+
18
+ @dataclass
19
+ class LabelingConfig:
20
+ input_path: Optional[Path]
21
+ db_path: Optional[Path]
22
+ outdir: Path
23
+ model_id: str
24
+ device: str
25
+ precision: str
26
+ batch_size: int
27
+ upscale: int
28
+ alpha_bg: str
29
+ resume: bool
30
+ lang: str
31
+ only_source: str
32
+ max_samples: Optional[int]
33
+ no_image: bool
34
+ no_metadata: bool
35
+ log_level: str
36
+ parquet: bool
37
+ load_4bit: bool
38
+ max_new_tokens: int
39
+ run_id: Optional[str]
40
+
41
+
42
+ def run_labeling(config: LabelingConfig) -> None:
+     ensure_dir(config.outdir)
+     output_jsonl = config.outdir / "labels.jsonl"
+     output_parquet = config.outdir / "labels.parquet"
+     error_log = config.outdir / "labels_errors.log"
+
+     if output_jsonl.exists() and not config.resume:
+         raise RuntimeError("labels.jsonl already exists; use --resume or remove the file")
+
+     existing_paths: set[str] = set()
+     existing_sha: set[str] = set()
+     if config.resume and output_jsonl.exists():
+         existing_paths, existing_sha = _load_existing(output_jsonl)
+
+     model = LabelerModel(
+         ModelConfig(
+             model_id=config.model_id,
+             device=config.device,
+             precision=config.precision,
+             max_new_tokens=config.max_new_tokens,
+             load_4bit=config.load_4bit,
+         )
+     )
+
+     records_for_parquet: list[dict[str, object]] = []
+     seen_paths: set[str] = set()
+     seen_sha: set[str] = set()
+
+     with output_jsonl.open("a", encoding="utf-8") as out_handle, error_log.open(
+         "a", encoding="utf-8"
+     ) as err_handle:
+         batch: list[LabelInput] = []
+         for sample in iter_inputs(
+             input_path=config.input_path,
+             db_path=config.db_path,
+             only_source=config.only_source,
+             max_samples=config.max_samples,
+             run_id=config.run_id,
+         ):
+             if sample.image_path in existing_paths or sample.image_path in seen_paths:
+                 continue
+             if sample.image_sha256 and (
+                 sample.image_sha256 in existing_sha or sample.image_sha256 in seen_sha
+             ):
+                 continue
+             if not config.no_image:
+                 if not sample.image_abspath or not sample.image_abspath.exists():
+                     _log_error(err_handle, sample.image_path, "missing_image")
+                     continue
+             batch.append(sample)
+             if len(batch) >= config.batch_size:
+                 _process_batch(
+                     batch,
+                     model,
+                     config,
+                     out_handle,
+                     err_handle,
+                     records_for_parquet,
+                     seen_paths,
+                     seen_sha,
+                 )
+                 batch = []
+
+         if batch:
+             _process_batch(
+                 batch,
+                 model,
+                 config,
+                 out_handle,
+                 err_handle,
+                 records_for_parquet,
+                 seen_paths,
+                 seen_sha,
+             )
+
+     if config.parquet:
+         _write_parquet(output_parquet, records_for_parquet)
+
+
+ def _process_batch(
+     batch: list[LabelInput],
+     model: LabelerModel,
+     config: LabelingConfig,
+     out_handle,
+     err_handle,
+     parquet_buffer: list[dict[str, object]],
+     seen_paths: set[str],
+     seen_sha: set[str],
+ ) -> None:
+     include_image = not config.no_image
+     include_metadata = not config.no_metadata
+
+     messages_list = []
+     images = []
+     active_samples: list[LabelInput] = []
+     for sample in batch:
+         prompt_inputs = PromptInputs(
+             item_name=sample.item_name,
+             item_description=sample.item_description,
+             item_part=sample.item_part,
+             source_type=sample.source_type,
+             include_image=include_image,
+             include_metadata=include_metadata,
+             lang=config.lang,
+         )
+         user_prompt = build_user_prompt(prompt_inputs)
+         messages_list.append(build_messages(user_prompt, include_image, strict=False))
+         if include_image:
+             try:
+                 images.append(load_image(sample.image_abspath, config.upscale, config.alpha_bg))
+             except Exception:
+                 _log_error(err_handle, sample.image_path, "image_load_failed")
+                 messages_list.pop()
+                 continue
+         active_samples.append(sample)
+
+     if not active_samples:
+         return
+
+     outputs = model.generate_texts(messages_list, images if include_image else None)
+
+     for sample, raw_text in zip(active_samples, outputs):
+         record = _parse_and_build(
+             sample,
+             raw_text,
+             model,
+             config,
+             err_handle,
+             include_image,
+             include_metadata,
+         )
+         if not record:
+             continue
+         out_handle.write(json.dumps(record, ensure_ascii=False) + "\n")
+         out_handle.flush()
+         parquet_buffer.append(record)
+         seen_paths.add(sample.image_path)
+         if sample.image_sha256:
+             seen_sha.add(sample.image_sha256)
+
+
+ def _parse_and_build(
+     sample: LabelInput,
+     raw_text: str,
+     model: LabelerModel,
+     config: LabelingConfig,
+     err_handle,
+     include_image: bool,
+     include_metadata: bool,
+ ) -> Optional[dict[str, object]]:
+     parsed = _try_parse(raw_text)
+     if parsed is None:
+         parsed = _retry_strict(sample, model, config, include_image, include_metadata)
+     if parsed is None:
+         _log_error(err_handle, sample.image_path, "invalid_json")
+         return None
+
+     try:
+         record = _normalize_record(parsed, sample, config)
+     except ValueError as exc:
+         _log_error(err_handle, sample.image_path, str(exc))
+         return None
+     return record
+
+
+ def _retry_strict(
+     sample: LabelInput,
+     model: LabelerModel,
+     config: LabelingConfig,
+     include_image: bool,
+     include_metadata: bool,
+ ) -> Optional[dict[str, object]]:
+     prompt_inputs = PromptInputs(
+         item_name=sample.item_name,
+         item_description=sample.item_description,
+         item_part=sample.item_part,
+         source_type=sample.source_type,
+         include_image=include_image,
+         include_metadata=include_metadata,
+         lang=config.lang,
+     )
+     user_prompt = build_user_prompt(prompt_inputs)
+     messages = [build_messages(user_prompt, include_image, strict=True)]
+     images = None
+     if include_image:
+         try:
+             images = [load_image(sample.image_abspath, config.upscale, config.alpha_bg)]
+         except Exception:
+             return None
+     output = model.generate_texts(messages, images)
+     if not output:
+         return None
+     return _try_parse(output[0])
+
+
+ def _try_parse(raw_text: str) -> Optional[dict[str, object]]:
+     text = raw_text.strip()
+     # Strip a Markdown code fence if the model wrapped its answer in one.
+     if text.startswith("```"):
+         text = text.split("\n", 1)[-1]
+     if text.endswith("```"):
+         text = text.rsplit("```", 1)[0]
+     text = text.strip()
+     try:
+         parsed = json.loads(text)
+     except json.JSONDecodeError:
+         # Fall back to the outermost {...} span when extra text surrounds the JSON.
+         start = text.find("{")
+         end = text.rfind("}")
+         if start == -1 or end == -1 or end <= start:
+             return None
+         try:
+             parsed = json.loads(text[start : end + 1])
+         except json.JSONDecodeError:
+             return None
+     # Callers use dict methods on the result, so reject arrays and scalars.
+     return parsed if isinstance(parsed, dict) else None
+
+
+ def _normalize_record(
+     parsed: dict[str, object],
+     sample: LabelInput,
+     config: LabelingConfig,
+ ) -> dict[str, object]:
+     label_ko = _clean_text(parsed.get("label_ko"))
+     if not label_ko:
+         raise ValueError("label_ko_missing")
+
+     label_en = _clean_text(parsed.get("label_en"))
+     if config.lang == "ko":
+         label_en = None
+
+     tags = _normalize_list(parsed.get("tags_ko"), max_items=15)
+     queries = _normalize_list(parsed.get("query_variants_ko"), max_items=8)
+
+     attributes = parsed.get("attributes") if isinstance(parsed.get("attributes"), dict) else {}
+     normalized_attributes = {
+         "colors": _normalize_list(attributes.get("colors"), max_items=10),
+         "theme": _normalize_list(attributes.get("theme"), max_items=10),
+         "material": _normalize_list(attributes.get("material"), max_items=10),
+         "vibe": _normalize_list(attributes.get("vibe"), max_items=10),
+         "item_type_guess": _clean_text(attributes.get("item_type_guess")),
+     }
+
+     quality = parsed.get("quality_flags") if isinstance(parsed.get("quality_flags"), dict) else {}
+     is_uncertain = bool(quality.get("is_uncertain", False))
+     reasons = _normalize_list(quality.get("reasons"))
+
+     if len(tags) < 5:
+         is_uncertain = True
+         reasons.append("few_tags")
+     if len(queries) < 3:
+         is_uncertain = True
+         reasons.append("few_queries")
+
+     reasons = _unique_list(reasons)
+
+     return {
+         "image_path": sample.image_path,
+         "image_sha256": sample.image_sha256,
+         "source_type": sample.source_type,
+         "item_name": sample.item_name,
+         "item_description": sample.item_description,
+         "label_ko": label_ko,
+         "label_en": label_en,
+         "tags_ko": tags,
+         "attributes": normalized_attributes,
+         "query_variants_ko": queries,
+         "quality_flags": {"is_uncertain": is_uncertain, "reasons": reasons},
+         "model": config.model_id,
+         "prompt_version": PROMPT_VERSION,
+         "generated_at": utc_now_iso(),
+     }
+
+
+ def _normalize_list(value: object, max_items: Optional[int] = None) -> list[str]:
+     if value is None:
+         items: list[str] = []
+     elif isinstance(value, list):
+         items = [str(item).strip() for item in value if str(item).strip()]
+     else:
+         # Treat any other value as a comma-separated string.
+         items = [item.strip() for item in str(value).split(",") if item.strip()]
+     if max_items is not None:
+         items = items[:max_items]
+     return items
+
+
+ def _clean_text(value: object) -> Optional[str]:
+     if value is None:
+         return None
+     text = str(value).strip()
+     return text or None
+
+
+ def _unique_list(values: Iterable[str]) -> list[str]:
+     seen = set()
+     result = []
+     for value in values:
+         if value in seen:
+             continue
+         seen.add(value)
+         result.append(value)
+     return result
+
+
+ def _log_error(handle, image_path: str, message: str) -> None:
+     handle.write(f"{image_path}\t{message}\n")
+     handle.flush()
+
+
+ def _load_existing(path: Path) -> tuple[set[str], set[str]]:
+     paths: set[str] = set()
+     shas: set[str] = set()
+     with path.open("r", encoding="utf-8") as handle:
+         for line in handle:
+             line = line.strip()
+             if not line:
+                 continue
+             try:
+                 record = json.loads(line)
+             except json.JSONDecodeError:
+                 continue
+             image_path = record.get("image_path")
+             image_sha = record.get("image_sha256")
+             if isinstance(image_path, str):
+                 paths.add(image_path)
+             if isinstance(image_sha, str):
+                 shas.add(image_sha)
+     return paths, shas
+
+
+ def _write_parquet(path: Path, records: list[dict[str, object]]) -> None:
+     if not records:
+         return
+     try:
+         import pyarrow as pa
+         import pyarrow.parquet as pq
+     except ImportError:
+         logger.warning("pyarrow not installed; skipping parquet output")
+         return
+     table = pa.Table.from_pylist(records)
+     pq.write_table(table, path)
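The fence-stripping fallback in `_try_parse` can be tried on its own. The sketch below is a standalone re-implementation for illustration only: the name `extract_json_object` and the sample strings are hypothetical and not part of this commit. It assumes the model may wrap its JSON in a Markdown fence or surround it with extra text.

```python
import json
from typing import Optional

FENCE = "`" * 3  # three backticks, built at runtime to keep this example fence-safe


def extract_json_object(raw_text: str) -> Optional[dict]:
    """Hypothetical helper following the same parsing strategy as _try_parse."""
    text = raw_text.strip()
    # Drop an opening fence line (possibly with a language tag) and a closing fence.
    if text.startswith(FENCE):
        text = text.split("\n", 1)[-1]
    if text.endswith(FENCE):
        text = text.rsplit(FENCE, 1)[0]
    text = text.strip()
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        # Fall back to the outermost {...} span.
        start, end = text.find("{"), text.rfind("}")
        if start == -1 or end <= start:
            return None
        try:
            parsed = json.loads(text[start : end + 1])
        except json.JSONDecodeError:
            return None
    # Only JSON objects are useful to the caller.
    return parsed if isinstance(parsed, dict) else None


fenced = FENCE + 'json\n{"label_ko": "red hat"}\n' + FENCE
chatty = 'Here is the result: {"tags_ko": ["a", "b"]} Hope it helps.'
print(extract_json_object(fenced))
print(extract_json_object(chatty))
print(extract_json_object("not json"))
```

Inputs that parse as JSON but are not objects (for example `[1, 2]`) return `None` in this sketch, so the caller can fall through to a stricter retry.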
src/labeler/prompts.py ADDED
@@ -0,0 +1,80 @@
+ from __future__ import annotations
+
+ from dataclasses import dataclass
+ from typing import Optional
+
+ PROMPT_VERSION = "v1"
+
+ SYSTEM_PROMPT_BASE = (
+     "You are generating labels for MapleStory item icons for CLIP training. "
+     "Return a single JSON object only. Do not output markdown or extra text. "
+     "Use item_name as the primary identifier; keep it intact. "
+     "Use metadata if provided and avoid guessing. "
+     "If uncertain, set quality_flags.is_uncertain=true and add reasons. "
+     "label_ko must be one short Korean sentence with key visual keywords. "
+     "tags_ko must be 5-15 short Korean keywords. "
+     "query_variants_ko must be 3-8 natural Korean search queries. "
+     "attributes must include colors/theme/material/vibe lists and item_type_guess string or null. "
+     "If label_en is not requested, set it to null."
+ )
+
+ SYSTEM_PROMPT_STRICT = (
+     SYSTEM_PROMPT_BASE
+     + " Output must be valid JSON with double quotes and no trailing commas."
+ )
+
+
+ @dataclass
+ class PromptInputs:
+     item_name: str
+     item_description: Optional[str]
+     item_part: Optional[str]
+     source_type: str
+     include_image: bool
+     include_metadata: bool
+     lang: str
+
+
+ def build_user_prompt(inputs: PromptInputs) -> str:
+     lines = []
+     if inputs.include_metadata:
+         lines.append(f"item_name: {inputs.item_name}")
+         if inputs.item_description:
+             lines.append(f"item_description: {inputs.item_description}")
+         else:
+             lines.append("item_description: (none)")
+         if inputs.item_part:
+             lines.append(f"item_part: {inputs.item_part}")
+         else:
+             lines.append("item_part: (none)")
+         lines.append(f"source_type: {inputs.source_type}")
+     else:
+         lines.append("metadata: (not provided)")
+         lines.append(f"source_type: {inputs.source_type}")
+
+     if inputs.include_image:
+         lines.append("image: provided")
+     else:
+         lines.append("image: not provided (metadata-only)")
+
+     lines.append(f"language: {inputs.lang}")
+     lines.append(
+         "Return JSON with keys: label_ko, label_en, tags_ko, attributes, "
+         "query_variants_ko, quality_flags."
+     )
+     return "\n".join(lines)
+
+
+ def build_messages(user_prompt: str, include_image: bool, strict: bool) -> list[dict[str, object]]:
+     system_prompt = SYSTEM_PROMPT_STRICT if strict else SYSTEM_PROMPT_BASE
+     if include_image:
+         content: list[dict[str, object]] = [
+             {"type": "image"},
+             {"type": "text", "text": user_prompt},
+         ]
+     else:
+         content = [{"type": "text", "text": user_prompt}]
+     return [
+         {"role": "system", "content": system_prompt},
+         {"role": "user", "content": content},
+     ]
src/pipeline/__init__.py ADDED
@@ -0,0 +1,2 @@
+ __all__ = ["__version__"]
+ __version__ = "0.1.0"
src/pipeline/api.py ADDED
@@ -0,0 +1,166 @@
+ from __future__ import annotations
+
+ import asyncio
+ import json
+ import random
+ from dataclasses import dataclass
+ from typing import Any, Optional
+
+ import httpx
+
+ from .utils import ApiMetrics, RateLimiter
+
+ BASE_URL = "https://open.api.nexon.com"
+
+
+ class ApiError(RuntimeError):
+     def __init__(self, status_code: int, message: str, payload: Any = None) -> None:
+         super().__init__(message)
+         self.status_code = status_code
+         self.payload = payload
+
+
+ class RateLimitError(ApiError):
+     def __init__(self, status_code: int, message: str, payload: Any = None, retry_after: Optional[float] = None) -> None:
+         super().__init__(status_code, message, payload)
+         self.retry_after = retry_after
+
+
+ class ServerError(ApiError):
+     pass
+
+
+ class DataPreparingError(ApiError):
+     pass
+
+
+ class TransportError(RuntimeError):
+     pass
+
+
+ @dataclass
+ class ApiClient:
+     api_key: str
+     concurrency: int = 8
+     rps: float = 5.0
+     timeout_seconds: float = 30.0
+     max_attempts: int = 5
+
+     def __post_init__(self) -> None:
+         self._client: Optional[httpx.AsyncClient] = None
+         self._semaphore = asyncio.Semaphore(self.concurrency)
+         self._rate_limiter = RateLimiter(self.rps)
+         self.metrics = ApiMetrics()
+
+     async def __aenter__(self) -> "ApiClient":
+         headers = {"x-nxopen-api-key": self.api_key}
+         self._client = httpx.AsyncClient(base_url=BASE_URL, headers=headers)
+         return self
+
+     async def __aexit__(self, exc_type, exc, tb) -> None:
+         if self._client:
+             await self._client.aclose()
+
+     async def get(self, path: str, params: dict[str, Any]) -> dict[str, Any]:
+         return await self._request_json("GET", path, params=params)
+
+     async def _request_json(self, method: str, path: str, params: dict[str, Any]) -> dict[str, Any]:
+         attempt = 0
+         while True:
+             attempt += 1
+             try:
+                 await self._rate_limiter.acquire()
+                 async with self._semaphore:
+                     assert self._client is not None
+                     response = await self._client.request(
+                         method,
+                         path,
+                         params=params,
+                         timeout=self.timeout_seconds,
+                     )
+                 self.metrics.total_requests += 1
+                 if 200 <= response.status_code < 300:
+                     return response.json()
+
+                 payload = _safe_json(response)
+                 message = _extract_message(payload)
+                 if response.status_code == 400 and _is_data_preparing(payload):
+                     self.metrics.data_preparing_hits += 1
+                     raise DataPreparingError(response.status_code, message, payload)
+                 if response.status_code == 429:
+                     self.metrics.rate_limit_hits += 1
+                     retry_after = _retry_after_seconds(response)
+                     raise RateLimitError(response.status_code, message, payload, retry_after=retry_after)
+                 if response.status_code >= 500:
+                     self.metrics.server_errors += 1
+                     raise ServerError(response.status_code, message, payload)
+
+                 self.metrics.other_errors += 1
+                 raise ApiError(response.status_code, message, payload)
+             except (httpx.TimeoutException, httpx.TransportError) as exc:
+                 self.metrics.other_errors += 1
+                 error = TransportError(str(exc))
+             except (RateLimitError, ServerError, DataPreparingError) as exc:
+                 error = exc
+             except ApiError:
+                 raise
+
+             if attempt >= self.max_attempts:
+                 raise error
+
+             await asyncio.sleep(_compute_wait_seconds(error, attempt))
+
+
+ def _safe_json(response: httpx.Response) -> Any:
+     try:
+         return response.json()
+     except json.JSONDecodeError:
+         return {"message": response.text}
+
+
+ def _extract_message(payload: Any) -> str:
+     if isinstance(payload, dict):
+         if isinstance(payload.get("error"), dict):
+             return payload["error"].get("message") or "API error"
+         return payload.get("message") or "API error"
+     return "API error"
+
+
+ def _extract_code(payload: Any) -> Optional[str]:
+     if isinstance(payload, dict):
+         if isinstance(payload.get("error"), dict):
+             return payload["error"].get("code") or payload["error"].get("error_code")
+         return payload.get("code") or payload.get("error_code")
+     return None
+
+
+ def _is_data_preparing(payload: Any) -> bool:
+     code = _extract_code(payload)
+     message = _extract_message(payload)
+     if code and code.upper() == "OPENAPI00009":
+         return True
+     if message and "Data being prepared" in message:
+         return True
+     return False
+
+
+ def _retry_after_seconds(response: httpx.Response) -> Optional[float]:
+     value = response.headers.get("Retry-After")
+     if not value:
+         return None
+     try:
+         return float(value)
+     except ValueError:
+         return None
+
+
+ def _compute_wait_seconds(error: Exception, attempt: int) -> float:
+     if isinstance(error, DataPreparingError):
+         return random.uniform(30, 120)
+     base = 1.0
+     max_wait = 30.0
+     wait = min(max_wait, base * (2 ** (attempt - 1)))
+     jitter = random.uniform(0, 0.5)
+     if isinstance(error, RateLimitError) and error.retry_after:
+         return max(wait + jitter, error.retry_after)
+     return wait + jitter
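For a sense of the retry timing, the deterministic part of `_compute_wait_seconds` can be tabulated. This is a minimal sketch, not the committed code: it leaves out the random jitter (0 to 0.5 s) and the 30 to 120 s wait used for `DataPreparingError`, and `base_wait` is a hypothetical name introduced here.

```python
from typing import Optional


def base_wait(attempt: int, retry_after: Optional[float] = None) -> float:
    """Exponential backoff without jitter: 1 s, doubling per attempt, capped at 30 s."""
    wait = min(30.0, 1.0 * (2 ** (attempt - 1)))
    # A Retry-After hint from a 429 response can only lengthen the wait.
    if retry_after:
        return max(wait, retry_after)
    return wait


# With max_attempts = 5, the client sleeps after attempts 1 through 4.
schedule = [base_wait(attempt) for attempt in range(1, 5)]
print(schedule)
print(base_wait(2, retry_after=10.0))
print(base_wait(7))
```

Under these assumptions the jitter-free waits before the final error add up to 15 s, and the cap only comes into play from the sixth attempt onward.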
src/pipeline/cli.py ADDED
@@ -0,0 +1,104 @@
+ from __future__ import annotations
+
+ import asyncio
+ import logging
+ from pathlib import Path
+ from typing import Optional
+
+ import typer
+
+ from .pipeline import run_pipeline
+ from .utils import get_env_or_none, kst_yesterday_date, load_dotenv_if_available
+
+
+ def run(
+     date: Optional[str] = typer.Option(
+         None,
+         "--date",
+         help="Target date in YYYY-MM-DD (KST). Defaults to yesterday in KST.",
+     ),
+     top: int = typer.Option(100, "--top", help="Alias for --end-rank (default 100)."),
+     start_rank: int = typer.Option(1, "--start-rank", help="Starting ranking (inclusive)."),
+     end_rank: Optional[int] = typer.Option(
+         None,
+         "--end-rank",
+         help="Ending ranking (inclusive). Defaults to --top.",
+     ),
+     download_icons: bool = typer.Option(
+         True,
+         "--download-icons/--no-download",
+         help="Download icon assets locally.",
+     ),
+     concurrency: int = typer.Option(8, "--concurrency", help="Max concurrent requests."),
+     rps: float = typer.Option(5.0, "--rps", help="Requests per second throttle."),
+     output_dir: Optional[Path] = typer.Option(
+         None,
+         "--output-dir",
+         help="Base output directory for data storage.",
+     ),
+     db_path: Optional[Path] = typer.Option(
+         None,
+         "--db-path",
+         help="Override SQLite DB path (default is output_dir/date/db.sqlite).",
+     ),
+     world_name: Optional[str] = typer.Option(None, "--world-name", help="Ranking world name filter."),
+     world_type: Optional[int] = typer.Option(None, "--world-type", help="Ranking world type filter."),
+     class_name: Optional[str] = typer.Option(None, "--class-name", help="Ranking class filter."),
+     all_presets: bool = typer.Option(
+         False,
+         "--all-presets",
+         help="Store all cash equipment presets instead of current/default.",
+     ),
+     run_id: Optional[str] = typer.Option(
+         None,
+         "--run-id",
+         help="Reuse an existing run_id to merge additional ranking ranges.",
+     ),
+     api_key: Optional[str] = typer.Option(
+         None,
+         "--api-key",
+         help="Override NEXON_API_KEY environment variable.",
+     ),
+ ) -> None:
+     """Collect MapleStory ranking equipment and cash icons via Nexon Open API."""
+
+     load_dotenv_if_available()
+     logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
+
+     resolved_key = api_key or get_env_or_none("NEXON_API_KEY")
+     if not resolved_key:
+         typer.echo("Missing NEXON_API_KEY. Set it in the environment or pass --api-key.")
+         raise typer.Exit(code=1)
+
+     resolved_date = date or kst_yesterday_date()
+     resolved_output_dir = output_dir or Path(get_env_or_none("OUTPUT_DIR") or "data")
+
+     resolved_end_rank = end_rank or top
+     if start_rank < 1:
+         typer.echo("--start-rank must be >= 1")
+         raise typer.Exit(code=1)
+     if resolved_end_rank < start_rank:
+         typer.echo("--end-rank must be >= --start-rank")
+         raise typer.Exit(code=1)
+
+     report = asyncio.run(
+         run_pipeline(
+             api_key=resolved_key,
+             target_date=resolved_date,
+             start_rank=start_rank,
+             end_rank=resolved_end_rank,
+             download_icon_assets=download_icons,
+             output_dir=resolved_output_dir,
+             db_path=db_path,
+             concurrency=concurrency,
+             rps=rps,
+             world_name=world_name,
+             world_type=world_type,
+             class_name=class_name,
+             all_presets=all_presets,
+             run_id_override=run_id,
+         )
+     )
+
+     typer.echo("Run complete")
+     typer.echo(report.to_markdown())
src/pipeline/db.py ADDED
@@ -0,0 +1,344 @@
+ from __future__ import annotations
+
+ import sqlite3
+ from pathlib import Path
+ from typing import Any, Iterable
+
+
+ def connect(db_path: Path) -> sqlite3.Connection:
+     db_path.parent.mkdir(parents=True, exist_ok=True)
+     conn = sqlite3.connect(db_path)
+     conn.execute("PRAGMA foreign_keys = ON")
+     return conn
+
+
+ def init_db(conn: sqlite3.Connection) -> None:
+     conn.executescript(
+         """
+         CREATE TABLE IF NOT EXISTS runs (
+             run_id TEXT PRIMARY KEY,
+             target_date TEXT NOT NULL,
+             created_at TEXT NOT NULL,
+             params_json TEXT NOT NULL
+         );
+
+         CREATE TABLE IF NOT EXISTS ranking_entries (
+             run_id TEXT NOT NULL,
+             ranking INTEGER NOT NULL,
+             character_name TEXT,
+             world_name TEXT,
+             class_name TEXT,
+             sub_class_name TEXT,
+             character_level INTEGER,
+             character_exp INTEGER,
+             character_popularity INTEGER,
+             character_guildname TEXT,
+             UNIQUE(run_id, ranking)
+         );
+
+         CREATE TABLE IF NOT EXISTS characters (
+             ocid TEXT PRIMARY KEY,
+             character_name TEXT,
+             first_seen_at TEXT,
+             last_seen_at TEXT
+         );
+
+         CREATE TABLE IF NOT EXISTS equipment_shape_items (
+             run_id TEXT NOT NULL,
+             ocid TEXT NOT NULL,
+             item_equipment_part TEXT,
+             equipment_slot TEXT,
+             item_name TEXT,
+             item_icon_url TEXT,
+             item_description TEXT,
+             item_shape_name TEXT,
+             item_shape_icon_url TEXT,
+             raw_json TEXT,
+             UNIQUE(run_id, ocid, item_equipment_part, equipment_slot)
+         );
+
+         CREATE TABLE IF NOT EXISTS cash_items (
+             run_id TEXT NOT NULL,
+             ocid TEXT NOT NULL,
+             preset_no INTEGER,
+             cash_item_equipment_part TEXT,
+             cash_item_equipment_slot TEXT,
+             cash_item_name TEXT,
+             cash_item_icon_url TEXT,
+             cash_item_description TEXT,
+             cash_item_label TEXT,
+             date_expire TEXT,
+             date_option_expire TEXT,
+             raw_json TEXT,
+             UNIQUE(run_id, ocid, preset_no, cash_item_equipment_part, cash_item_equipment_slot)
+         );
+
+         CREATE TABLE IF NOT EXISTS icon_assets (
+             url TEXT PRIMARY KEY,
+             sha256 TEXT,
+             local_path TEXT,
+             content_type TEXT,
+             byte_size INTEGER,
+             fetched_at TEXT,
+             error TEXT
+         );
+         """
+     )
+
+
+ def fetch_run(conn: sqlite3.Connection, run_id: str) -> dict[str, Any] | None:
+     cursor = conn.execute(
+         "SELECT run_id, target_date, created_at, params_json FROM runs WHERE run_id = ?",
+         (run_id,),
+     )
+     row = cursor.fetchone()
+     if not row:
+         return None
+     return {
+         "run_id": row[0],
+         "target_date": row[1],
+         "created_at": row[2],
+         "params_json": row[3],
+     }
+
+
+ def insert_run(
+     conn: sqlite3.Connection,
+     run_id: str,
+     target_date: str,
+     created_at: str,
+     params_json: str,
+ ) -> None:
+     conn.execute(
+         """
+         INSERT INTO runs (run_id, target_date, created_at, params_json)
+         VALUES (?, ?, ?, ?)
+         ON CONFLICT(run_id) DO UPDATE SET
+             target_date = excluded.target_date,
+             created_at = excluded.created_at,
+             params_json = excluded.params_json
+         """,
+         (run_id, target_date, created_at, params_json),
+     )
+
+
+ def upsert_ranking_entries(
+     conn: sqlite3.Connection,
+     run_id: str,
+     entries: Iterable[dict[str, Any]],
+ ) -> None:
+     rows = [
+         (
+             run_id,
+             entry.get("ranking"),
+             entry.get("character_name"),
+             entry.get("world_name"),
+             entry.get("class_name"),
+             entry.get("sub_class_name"),
+             entry.get("character_level"),
+             entry.get("character_exp"),
+             entry.get("character_popularity"),
+             entry.get("character_guildname"),
+         )
+         for entry in entries
+     ]
+     conn.executemany(
+         """
+         INSERT INTO ranking_entries (
+             run_id,
+             ranking,
+             character_name,
+             world_name,
+             class_name,
+             sub_class_name,
+             character_level,
+             character_exp,
+             character_popularity,
+             character_guildname
+         )
+         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+         ON CONFLICT(run_id, ranking) DO UPDATE SET
+             character_name = excluded.character_name,
+             world_name = excluded.world_name,
+             class_name = excluded.class_name,
+             sub_class_name = excluded.sub_class_name,
+             character_level = excluded.character_level,
+             character_exp = excluded.character_exp,
+             character_popularity = excluded.character_popularity,
+             character_guildname = excluded.character_guildname
+         """,
+         rows,
+     )
+
+
+ def upsert_characters(
+     conn: sqlite3.Connection,
+     rows: Iterable[dict[str, Any]],
+ ) -> None:
+     prepared = [
+         (
+             row.get("ocid"),
+             row.get("character_name"),
+             row.get("first_seen_at"),
+             row.get("last_seen_at"),
+         )
+         for row in rows
+     ]
+     conn.executemany(
+         """
+         INSERT INTO characters (ocid, character_name, first_seen_at, last_seen_at)
+         VALUES (?, ?, ?, ?)
+         ON CONFLICT(ocid) DO UPDATE SET
+             character_name = excluded.character_name,
+             last_seen_at = excluded.last_seen_at
+         """,
+         prepared,
+     )
+
+
+ def upsert_equipment_items(
+     conn: sqlite3.Connection,
+     run_id: str,
+     rows: Iterable[dict[str, Any]],
+ ) -> None:
+     prepared = [
+         (
+             run_id,
+             row.get("ocid"),
+             row.get("item_equipment_part"),
+             row.get("equipment_slot"),
+             row.get("item_name"),
+             row.get("item_icon_url"),
+             row.get("item_description"),
+             row.get("item_shape_name"),
+             row.get("item_shape_icon_url"),
+             row.get("raw_json"),
+         )
+         for row in rows
+     ]
+     conn.executemany(
+         """
+         INSERT INTO equipment_shape_items (
+             run_id,
+             ocid,
+             item_equipment_part,
+             equipment_slot,
+             item_name,
+             item_icon_url,
+             item_description,
+             item_shape_name,
+             item_shape_icon_url,
+             raw_json
+         )
+         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+         ON CONFLICT(run_id, ocid, item_equipment_part, equipment_slot) DO UPDATE SET
+             item_name = excluded.item_name,
+             item_icon_url = excluded.item_icon_url,
+             item_description = excluded.item_description,
+             item_shape_name = excluded.item_shape_name,
+             item_shape_icon_url = excluded.item_shape_icon_url,
+             raw_json = excluded.raw_json
+         """,
+         prepared,
+     )
+
+
+ def upsert_cash_items(
+     conn: sqlite3.Connection,
+     run_id: str,
+     rows: Iterable[dict[str, Any]],
+ ) -> None:
+     prepared = [
+         (
+             run_id,
+             row.get("ocid"),
+             row.get("preset_no"),
+             row.get("cash_item_equipment_part"),
+             row.get("cash_item_equipment_slot"),
+             row.get("cash_item_name"),
+             row.get("cash_item_icon_url"),
+             row.get("cash_item_description"),
+             row.get("cash_item_label"),
+             row.get("date_expire"),
+             row.get("date_option_expire"),
+             row.get("raw_json"),
+         )
+         for row in rows
+     ]
+     conn.executemany(
+         """
+         INSERT INTO cash_items (
+             run_id,
+             ocid,
+             preset_no,
+             cash_item_equipment_part,
+             cash_item_equipment_slot,
+             cash_item_name,
+             cash_item_icon_url,
+             cash_item_description,
+             cash_item_label,
+             date_expire,
+             date_option_expire,
+             raw_json
+         )
+         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+         ON CONFLICT(run_id, ocid, preset_no, cash_item_equipment_part, cash_item_equipment_slot)
+         DO UPDATE SET
+             cash_item_name = excluded.cash_item_name,
+             cash_item_icon_url = excluded.cash_item_icon_url,
+             cash_item_description = excluded.cash_item_description,
+             cash_item_label = excluded.cash_item_label,
+             date_expire = excluded.date_expire,
+             date_option_expire = excluded.date_option_expire,
+             raw_json = excluded.raw_json
+         """,
+         prepared,
+     )
+
+
+ def fetch_icon_assets(
+     conn: sqlite3.Connection,
+     urls: list[str],
+ ) -> dict[str, dict[str, Any]]:
+     if not urls:
+         return {}
+     placeholders = ",".join(["?"] * len(urls))
+     query = f"SELECT url, sha256, local_path, error FROM icon_assets WHERE url IN ({placeholders})"
+     cursor = conn.execute(query, urls)
+     result = {}
+     for row in cursor.fetchall():
+         result[row[0]] = {"sha256": row[1], "local_path": row[2], "error": row[3]}
+     return result
+
+
+ def upsert_icon_asset(conn: sqlite3.Connection, record: dict[str, Any]) -> None:
+     conn.execute(
+         """
+         INSERT INTO icon_assets (
+             url,
+             sha256,
+             local_path,
+             content_type,
+             byte_size,
+             fetched_at,
+             error
+         )
+         VALUES (?, ?, ?, ?, ?, ?, ?)
+         ON CONFLICT(url) DO UPDATE SET
+             sha256 = excluded.sha256,
+             local_path = excluded.local_path,
+             content_type = excluded.content_type,
+             byte_size = excluded.byte_size,
+             fetched_at = excluded.fetched_at,
+             error = excluded.error
+         """,
+         (
+             record.get("url"),
+             record.get("sha256"),
+             record.get("local_path"),
+             record.get("content_type"),
+             record.get("byte_size"),
+             record.get("fetched_at"),
+             record.get("error"),
+         ),
+     )
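The upserts in `db.py` rely on SQLite's `ON CONFLICT ... DO UPDATE` clause. The sketch below uses an in-memory database and a cut-down, hypothetical `items` table (not one of the tables in this commit) to show that behaviour, plus one caveat that may be worth checking against real data: SQLite treats NULLs as distinct in a UNIQUE constraint, so rows whose key columns are NULL would be inserted again rather than updated.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Cut-down stand-in for the tables in db.py (hypothetical schema).
conn.execute(
    "CREATE TABLE items (run_id TEXT NOT NULL, slot TEXT, name TEXT, UNIQUE(run_id, slot))"
)

UPSERT = """
    INSERT INTO items (run_id, slot, name) VALUES (?, ?, ?)
    ON CONFLICT(run_id, slot) DO UPDATE SET name = excluded.name
"""

# Same key twice: the second statement updates the existing row.
conn.execute(UPSERT, ("run1", "hat", "old"))
conn.execute(UPSERT, ("run1", "hat", "new"))

# NULL in a key column twice: no conflict is detected, so both rows are kept.
conn.execute(UPSERT, ("run1", None, "a"))
conn.execute(UPSERT, ("run1", None, "b"))

hat_rows = conn.execute("SELECT name FROM items WHERE slot = 'hat'").fetchall()
null_count = conn.execute("SELECT COUNT(*) FROM items WHERE slot IS NULL").fetchone()[0]
print(hat_rows, null_count)
conn.close()
```

If the API can return items without a part or slot, one option (an assumption, not something this commit does) is to store an empty string instead of NULL in the key columns so that repeated runs stay idempotent.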
src/pipeline/downloader.py ADDED
@@ -0,0 +1,163 @@
1
+ from __future__ import annotations
2
+
3
+ import asyncio
4
+ import hashlib
5
+ from dataclasses import dataclass
6
+ from pathlib import Path
7
+ from typing import Any, Optional
8
+
9
+ import httpx
10
+
11
+ from .utils import DownloadResult, RateLimiter, guess_extension, random_wait, utc_now_iso
12
+
13
+
14
+ @dataclass
15
+ class DownloadRecord:
16
+ url: str
17
+ sha256: Optional[str]
18
+ local_path: Optional[str]
19
+ content_type: Optional[str]
20
+ byte_size: Optional[int]
21
+ fetched_at: Optional[str]
22
+ error: Optional[str]
23
+
24
+
25
+ async def download_icons(
26
+ urls_by_category: dict[str, str],
27
+ output_root: Path,
28
+ icons_root: Path,
29
+ existing_assets: dict[str, dict[str, Any]],
30
+ rps: float,
31
+ concurrency: int,
32
+ max_attempts: int = 5,
33
+ ) -> tuple[list[DownloadRecord], DownloadResult]:
34
+ limiter = RateLimiter(rps)
35
+ semaphore = asyncio.Semaphore(concurrency)
36
+ results: list[DownloadRecord] = []
37
+ counters = DownloadResult()
38
+
39
+ async with httpx.AsyncClient() as client:
+         tasks = []
+         # URLs in task order, so a raised exception can be attributed to its URL.
+         task_urls: list[str] = []
+         for url, category in urls_by_category.items():
+             if not url:
+                 continue
+             existing = existing_assets.get(url)
+             if existing and existing.get("sha256"):
+                 counters.skipped += 1
+                 continue
+             task_urls.append(url)
+             tasks.append(
+                 asyncio.create_task(
+                     _download_one(
+                         client,
+                         limiter,
+                         semaphore,
+                         url,
+                         category,
+                         output_root,
+                         icons_root,
+                         max_attempts,
+                     )
+                 )
+             )
+         if tasks:
+             # gather() returns results in the same order as the tasks passed in.
+             completed = await asyncio.gather(*tasks, return_exceptions=True)
+             for url, item in zip(task_urls, completed):
+                 # CancelledError derives from BaseException, so check the base class.
+                 if isinstance(item, BaseException):
+                     counters.failed += 1
+                     results.append(
+                         DownloadRecord(
+                             url=url,
+                             sha256=None,
+                             local_path=None,
+                             content_type=None,
+                             byte_size=None,
+                             fetched_at=utc_now_iso(),
+                             error=str(item),
+                         )
+                     )
+                     continue
+                 record, status = item
+                 results.append(record)
+                 if status == "downloaded":
+                     counters.downloaded += 1
+                 elif status == "failed":
+                     counters.failed += 1
85
+ return results, counters
86
+
87
+
88
+ async def _download_one(
89
+ client: httpx.AsyncClient,
90
+ limiter: RateLimiter,
91
+ semaphore: asyncio.Semaphore,
92
+ url: str,
93
+ category: str,
94
+ output_root: Path,
95
+ icons_root: Path,
96
+ max_attempts: int,
97
+ ) -> tuple[DownloadRecord, str]:
98
+ attempt = 0
99
+ while True:
100
+ attempt += 1
101
+ try:
102
+ await limiter.acquire()
103
+ async with semaphore:
104
+ response = await client.get(url, timeout=30)
105
+ if 200 <= response.status_code < 300:
106
+ content = response.content
107
+ sha256 = hashlib.sha256(content).hexdigest()
108
+ extension = guess_extension(response.headers.get("content-type"), url)
109
+ target_dir = icons_root / category
110
+ target_dir.mkdir(parents=True, exist_ok=True)
111
+ filename = f"{sha256}{extension}"
112
+ path = target_dir / filename
113
+ if not path.exists():
114
+ path.write_bytes(content)
115
+ local_path = str(path.relative_to(output_root))
116
+ return (
117
+ DownloadRecord(
118
+ url=url,
119
+ sha256=sha256,
120
+ local_path=local_path,
121
+ content_type=response.headers.get("content-type"),
122
+ byte_size=len(content),
123
+ fetched_at=utc_now_iso(),
124
+ error=None,
125
+ ),
126
+ "downloaded",
127
+ )
128
+ if response.status_code == 429 or response.status_code >= 500:
129
+ raise RuntimeError(f"HTTP {response.status_code}")
130
+ return (
131
+ DownloadRecord(
132
+ url=url,
133
+ sha256=None,
134
+ local_path=None,
135
+ content_type=response.headers.get("content-type"),
136
+ byte_size=None,
137
+ fetched_at=utc_now_iso(),
138
+ error=f"HTTP {response.status_code}",
139
+ ),
140
+ "failed",
141
+ )
142
+ except (httpx.TimeoutException, httpx.TransportError, RuntimeError) as exc:
143
+ if attempt >= max_attempts:
144
+ return (
145
+ DownloadRecord(
146
+ url=url,
147
+ sha256=None,
148
+ local_path=None,
149
+ content_type=None,
150
+ byte_size=None,
151
+ fetched_at=utc_now_iso(),
152
+ error=str(exc),
153
+ ),
154
+ "failed",
155
+ )
156
+ await asyncio.sleep(_download_backoff(attempt))
157
+
158
+
159
+ def _download_backoff(attempt: int) -> float:
160
+ base = 1.0
161
+ max_wait = 20.0
162
+ wait = min(max_wait, base * (2 ** (attempt - 1)))
163
+ return wait + random_wait(0, 0.5)
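To make the retry timing in `_download_backoff` concrete, here is a small sketch of the same capped exponential formula with the random jitter left out so the values are deterministic. The name `backoff_without_jitter` is hypothetical and exists only for this illustration.

```python
def backoff_without_jitter(attempt: int, base: float = 1.0, max_wait: float = 20.0) -> float:
    """Capped exponential backoff: base * 2**(attempt - 1), never above max_wait."""
    return min(max_wait, base * (2 ** (attempt - 1)))


# Wait that would follow each of the first six failed attempts.
schedule = [backoff_without_jitter(attempt) for attempt in range(1, 7)]
```

With the default `max_attempts=5`, `_download_one` only sleeps after attempts 1 through 4, so the deterministic part of the waits is 1, 2, 4 and 8 seconds, and the 20-second cap is never reached. Each real wait adds up to 0.5 seconds of jitter from `random_wait(0, 0.5)`.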
src/pipeline/parsers.py ADDED
@@ -0,0 +1,102 @@
1
+ from __future__ import annotations
2
+
3
+ from typing import Any
4
+
5
+ from .utils import json_dumps, to_int
6
+
7
+
8
+ def extract_equipment_items(data: dict[str, Any], ocid: str) -> list[dict[str, Any]]:
9
+ items = []
10
+ for item in data.get("item_equipment", []) or []:
11
+ items.append(
12
+ {
13
+ "ocid": ocid,
14
+ "item_equipment_part": _coalesce(item.get("item_equipment_part")),
15
+ "equipment_slot": _coalesce(item.get("equipment_slot")),
16
+ "item_name": item.get("item_name"),
17
+ "item_icon_url": item.get("item_icon"),
18
+ "item_description": item.get("item_description"),
19
+ "item_shape_name": item.get("item_shape_name"),
20
+ "item_shape_icon_url": item.get("item_shape_icon"),
21
+ "raw_json": json_dumps(item),
22
+ }
23
+ )
24
+ return items
25
+
26
+
27
+ def extract_cash_items(
28
+ data: dict[str, Any],
29
+ ocid: str,
30
+ all_presets: bool,
31
+ ) -> list[dict[str, Any]]:
32
+ presets = _extract_presets(data)
33
+ if not presets:
34
+ return []
35
+
36
+ if all_presets:
37
+ selected = presets
38
+ else:
39
+ selected = _select_current_or_default_presets(data, presets)
40
+
41
+ items: list[dict[str, Any]] = []
42
+ for preset_no, preset_items in selected:
43
+ for item in preset_items:
44
+ items.append(
45
+ {
46
+ "ocid": ocid,
47
+ "preset_no": preset_no,
48
+ "cash_item_equipment_part": _coalesce(item.get("cash_item_equipment_part")),
49
+ "cash_item_equipment_slot": _coalesce(item.get("cash_item_equipment_slot")),
50
+ "cash_item_name": item.get("cash_item_name"),
51
+ "cash_item_icon_url": item.get("cash_item_icon"),
52
+ "cash_item_description": item.get("cash_item_description"),
53
+ "cash_item_label": item.get("cash_item_label"),
54
+ "date_expire": item.get("date_expire"),
55
+ "date_option_expire": item.get("date_option_expire"),
56
+ "raw_json": json_dumps(item),
57
+ }
58
+ )
59
+ return items
60
+
61
+
62
+ def _extract_presets(data: dict[str, Any]) -> list[tuple[int, list[dict[str, Any]]]]:
63
+ presets: list[tuple[int, list[dict[str, Any]]]] = []
64
+ for key, value in data.items():
65
+ if not key.startswith("cash_item_equipment_preset_"):
66
+ continue
67
+ if not isinstance(value, list):
68
+ continue
69
+ preset_no = _parse_preset_no(key)
70
+ if preset_no is None:
71
+ continue
72
+ presets.append((preset_no, value))
73
+ presets.sort(key=lambda item: item[0])
74
+ return presets
75
+
76
+
77
+ def _parse_preset_no(key: str) -> int | None:
78
+ try:
79
+ return int(key.rsplit("_", 1)[-1])
80
+ except (IndexError, ValueError):
81
+ return None
82
+
83
+
84
+ def _coalesce(value: Any) -> str:
85
+ return value if value is not None else ""
86
+
87
+
88
+ def _select_current_or_default_presets(
89
+ data: dict[str, Any],
90
+ presets: list[tuple[int, list[dict[str, Any]]]],
91
+ ) -> list[tuple[int, list[dict[str, Any]]]]:
92
+ current = to_int(data.get("preset_no"))
93
+ if current is not None:
94
+ for preset_no, items in presets:
95
+ if preset_no == current:
96
+ return [(preset_no, items)]
97
+
98
+ for preset_no, items in presets:
99
+ if preset_no == 1:
100
+ return [(preset_no, items)]
101
+
102
+ return presets
src/pipeline/pipeline.py ADDED
@@ -0,0 +1,378 @@
1
+ from __future__ import annotations
2
+
3
+ import asyncio
4
+ import json
5
+ import logging
6
+ import time
7
+ from pathlib import Path
8
+ from typing import Any, Iterable
9
+
10
+ from . import db
11
+ from .api import ApiClient
12
+ from .downloader import download_icons
13
+ from .parsers import extract_cash_items, extract_equipment_items
14
+ from .utils import (
15
+ PipelineReport,
16
+ compute_run_id,
17
+ ensure_dir,
18
+ get_env_or_none,
19
+ kst_yesterday_date,
20
+ safe_filename,
21
+ to_int,
22
+ utc_now_iso,
23
+ write_json,
24
+ )
25
+
26
+ logger = logging.getLogger("pipeline")
27
+
28
+
29
+ async def run_pipeline(
30
+ *,
31
+ api_key: str,
32
+ target_date: str | None,
33
+ start_rank: int,
34
+ end_rank: int,
35
+ download_icon_assets: bool,
36
+ output_dir: Path,
37
+ db_path: Path | None,
38
+ concurrency: int,
39
+ rps: float,
40
+ world_name: str | None,
41
+ world_type: int | None,
42
+ class_name: str | None,
43
+ all_presets: bool,
44
+ run_id_override: str | None,
45
+ ) -> PipelineReport:
46
+ start = time.monotonic()
47
+ resolved_date = target_date or kst_yesterday_date()
48
+
49
+ output_root = output_dir / resolved_date
50
+ raw_root = output_root / "raw"
51
+ raw_ocid = raw_root / "ocid"
52
+ raw_item = raw_root / "item_equipment"
53
+ raw_cash = raw_root / "cashitem_equipment"
54
+ icons_root = output_root / "icons"
55
+
56
+ ensure_dir(raw_ocid)
57
+ ensure_dir(raw_item)
58
+ ensure_dir(raw_cash)
59
+ ensure_dir(icons_root / "equipment_shape")
60
+ ensure_dir(icons_root / "cash")
61
+
62
+ resolved_db_path = db_path or Path(get_env_or_none("DB_PATH") or output_root / "db.sqlite")
63
+ conn = db.connect(resolved_db_path)
64
+ db.init_db(conn)
65
+
66
+ run_params = {
67
+ "start_rank": start_rank,
68
+ "end_rank": end_rank,
69
+ "world_name": world_name,
70
+ "world_type": world_type,
71
+ "class_name": class_name,
72
+ "all_presets": all_presets,
73
+ }
74
+ run_id = run_id_override or compute_run_id(resolved_date, run_params)
75
+ existing_run = db.fetch_run(conn, run_id)
76
+ if existing_run:
77
+ if existing_run["target_date"] != resolved_date:
78
+ raise ValueError(
79
+ f"run_id {run_id} target_date mismatch ({existing_run['target_date']} != {resolved_date})"
80
+ )
81
+ else:
82
+ db.insert_run(conn, run_id, resolved_date, utc_now_iso(), json.dumps(run_params, ensure_ascii=False))
83
+ conn.commit()
84
+
85
+ equipment_items: list[dict[str, Any]] = []
86
+ cash_items: list[dict[str, Any]] = []
87
+ ocid_results: list[dict[str, Any]] = []
88
+
89
+ async with ApiClient(api_key=api_key, concurrency=concurrency, rps=rps) as api:
90
+ ranking_entries, ranking_raw = await _fetch_ranking_entries(
91
+ api=api,
92
+ target_date=resolved_date,
93
+ start_rank=start_rank,
94
+ end_rank=end_rank,
95
+ world_name=world_name,
96
+ world_type=world_type,
97
+ class_name=class_name,
98
+ )
99
+ write_json(raw_root / "ranking_overall.json", ranking_raw)
100
+ db.upsert_ranking_entries(conn, run_id, ranking_entries)
101
+ conn.commit()
102
+
103
+ ocid_results = await _fetch_ocids(api, ranking_entries, raw_ocid)
104
+ now_iso = utc_now_iso()
105
+ character_rows = [
106
+ {
107
+ "ocid": row["ocid"],
108
+ "character_name": row["character_name"],
109
+ "first_seen_at": now_iso,
110
+ "last_seen_at": now_iso,
111
+ }
112
+ for row in ocid_results
113
+ if row.get("ocid")
114
+ ]
115
+ if character_rows:
116
+ db.upsert_characters(conn, character_rows)
117
+ conn.commit()
118
+
119
+ ocids = [row["ocid"] for row in ocid_results if row.get("ocid")]
120
+ equipment_items = await _fetch_equipment(api, ocids, resolved_date, raw_item)
121
+ if equipment_items:
122
+ db.upsert_equipment_items(conn, run_id, equipment_items)
123
+ conn.commit()
124
+
125
+ cash_items = await _fetch_cash_items(api, ocids, resolved_date, raw_cash, all_presets)
126
+ if cash_items:
127
+ db.upsert_cash_items(conn, run_id, cash_items)
128
+ conn.commit()
129
+
130
+ metrics = api.metrics
131
+
132
+ icon_results = None
133
+ download_counts = None
134
+ if download_icon_assets:
135
+ urls_by_category = _collect_icon_urls(equipment_items, cash_items)
136
+ existing = db.fetch_icon_assets(conn, list(urls_by_category.keys()))
137
+ icon_results, download_counts = await download_icons(
138
+ urls_by_category=urls_by_category,
139
+ output_root=output_root,
140
+ icons_root=icons_root,
141
+ existing_assets=existing,
142
+ rps=rps,
143
+ concurrency=concurrency,
144
+ )
145
+ for record in icon_results:
146
+ db.upsert_icon_asset(conn, record.__dict__)
147
+ conn.commit()
148
+
149
+ elapsed = time.monotonic() - start
150
+ report = PipelineReport(
151
+ run_id=run_id,
152
+ target_date=resolved_date,
153
+ start_rank=start_rank,
154
+ end_rank=end_rank,
155
+ ranking_count=len(ranking_entries),
156
+ ocid_count=len(ocid_results),
157
+ equipment_items_count=len(equipment_items),
158
+ cash_items_count=len(cash_items),
159
+ icons_downloaded=download_counts.downloaded if download_counts else 0,
160
+ icons_skipped=download_counts.skipped if download_counts else 0,
161
+ icons_failed=download_counts.failed if download_counts else 0,
162
+ rate_limit_hits=metrics.rate_limit_hits,
163
+ server_errors=metrics.server_errors,
164
+ data_preparing_hits=metrics.data_preparing_hits,
165
+ elapsed_seconds=elapsed,
166
+ )
167
+
168
+ report_path = output_root / "README_run.md"
169
+ report_path.write_text(report.to_markdown(), encoding="utf-8")
170
+ return report
171
+
172
+
173
+ async def _fetch_ranking_entries(
174
+ *,
175
+ api: ApiClient,
176
+ target_date: str,
177
+ start_rank: int,
178
+ end_rank: int,
179
+ world_name: str | None,
180
+ world_type: int | None,
181
+ class_name: str | None,
182
+ ) -> tuple[list[dict[str, Any]], dict[str, Any]]:
183
+ page = 1
184
+ collected: dict[int, dict[str, Any]] = {}
185
+ pages: list[dict[str, Any]] = []
186
+ max_pages = 50
187
+ target_count = end_rank - start_rank + 1
188
+
189
+ while len(collected) < target_count and page <= max_pages:
190
+ params = {"date": target_date, "page": page}
191
+ if world_name:
192
+ params["world_name"] = world_name
193
+ if world_type is not None:
194
+ params["world_type"] = world_type
195
+ if class_name:
196
+ params["character_class"] = class_name
197
+
198
+ data = await api.get("/maplestory/v1/ranking/overall", params=params)
199
+ pages.append({"page": page, "data": data})
200
+ ranking_list = data.get("ranking") or []
201
+ if not ranking_list:
202
+ break
203
+
204
+ new_count = 0
205
+ max_rank_in_page = None
206
+ for entry in ranking_list:
207
+ ranking = to_int(entry.get("ranking"))
208
+ if ranking is None:
209
+ continue
210
+ max_rank_in_page = ranking if max_rank_in_page is None else max(max_rank_in_page, ranking)
211
+ if ranking < start_rank or ranking > end_rank:
212
+ continue
213
+ if ranking not in collected:
214
+ collected[ranking] = {
215
+ "ranking": ranking,
216
+ "character_name": entry.get("character_name"),
217
+ "world_name": entry.get("world_name"),
218
+ "class_name": entry.get("class_name"),
219
+ "sub_class_name": entry.get("sub_class_name"),
220
+ "character_level": to_int(entry.get("character_level")),
221
+ "character_exp": to_int(entry.get("character_exp")),
222
+ "character_popularity": to_int(entry.get("character_popularity")),
223
+ "character_guildname": entry.get("character_guildname"),
224
+ }
225
+ new_count += 1
226
+ if len(collected) >= target_count:
227
+ break
228
+ if max_rank_in_page is not None and max_rank_in_page < start_rank:
229
+ page += 1
230
+ continue
231
+ if new_count == 0:
232
+ logger.warning("No new ranking entries found on page %s", page)
233
+ break
234
+ page += 1
235
+
236
+ entries = [collected[key] for key in sorted(collected.keys())]
237
+ raw = {
238
+ "target_date": target_date,
239
+ "start_rank": start_rank,
240
+ "end_rank": end_rank,
241
+ "pages": pages,
242
+ "collected_count": len(entries),
243
+ }
244
+ if len(entries) < target_count:
245
+ logger.warning("Ranking entries collected %s < requested %s", len(entries), target_count)
246
+ return entries, raw
247
+
248
+
249
+ async def _fetch_ocids(
250
+ api: ApiClient,
251
+ ranking_entries: Iterable[dict[str, Any]],
252
+ raw_dir: Path,
253
+ ) -> list[dict[str, Any]]:
254
+ tasks = []
255
+ for entry in ranking_entries:
256
+ character_name = entry.get("character_name")
257
+ rank = entry.get("ranking")
258
+ if not character_name:
259
+ continue
260
+ tasks.append(
261
+ asyncio.create_task(_fetch_single_ocid(api, character_name, rank, raw_dir))
262
+ )
263
+
264
+ results: list[dict[str, Any]] = []
265
+ if tasks:
266
+ completed = await asyncio.gather(*tasks, return_exceptions=True)
267
+ for item in completed:
268
+ if isinstance(item, Exception):
269
+ logger.warning("OCID fetch failed: %s", item)
270
+ continue
271
+ if item:
272
+ results.append(item)
273
+ return results
274
+
275
+
276
+ async def _fetch_single_ocid(
277
+ api: ApiClient,
278
+ character_name: str,
279
+ rank: int | None,
280
+ raw_dir: Path,
281
+ ) -> dict[str, Any] | None:
282
+ data = await api.get("/maplestory/v1/id", params={"character_name": character_name})
283
+ ocid = data.get("ocid")
284
+ filename = f"{rank:03d}" if rank is not None else "unknown"
285
+ filename = f"{filename}_{safe_filename(character_name)}.json"
286
+ write_json(raw_dir / filename, data)
287
+ if not ocid:
288
+ logger.warning("No OCID for %s", character_name)
289
+ return None
290
+ return {"ocid": ocid, "character_name": character_name}
291
+
292
+
293
+ async def _fetch_equipment(
294
+ api: ApiClient,
295
+ ocids: list[str],
296
+ target_date: str,
297
+ raw_dir: Path,
298
+ ) -> list[dict[str, Any]]:
299
+ tasks = [
300
+ asyncio.create_task(_fetch_single_equipment(api, ocid, target_date, raw_dir))
301
+ for ocid in ocids
302
+ ]
303
+ results: list[dict[str, Any]] = []
304
+ if tasks:
305
+ completed = await asyncio.gather(*tasks, return_exceptions=True)
306
+ for item in completed:
307
+ if isinstance(item, Exception):
308
+ logger.warning("Equipment fetch failed: %s", item)
309
+ continue
310
+ results.extend(item)
311
+ return results
312
+
313
+
314
+ async def _fetch_single_equipment(
315
+ api: ApiClient,
316
+ ocid: str,
317
+ target_date: str,
318
+ raw_dir: Path,
319
+ ) -> list[dict[str, Any]]:
320
+ data = await api.get(
321
+ "/maplestory/v1/character/item-equipment",
322
+ params={"ocid": ocid, "date": target_date},
323
+ )
324
+ write_json(raw_dir / f"{ocid}.json", data)
325
+ return extract_equipment_items(data, ocid)
326
+
327
+
328
+ async def _fetch_cash_items(
329
+ api: ApiClient,
330
+ ocids: list[str],
331
+ target_date: str,
332
+ raw_dir: Path,
333
+ all_presets: bool,
334
+ ) -> list[dict[str, Any]]:
335
+ tasks = [
336
+ asyncio.create_task(_fetch_single_cash_item(api, ocid, target_date, raw_dir, all_presets))
337
+ for ocid in ocids
338
+ ]
339
+ results: list[dict[str, Any]] = []
340
+ if tasks:
341
+ completed = await asyncio.gather(*tasks, return_exceptions=True)
342
+ for item in completed:
343
+ if isinstance(item, Exception):
344
+ logger.warning("Cash item fetch failed: %s", item)
345
+ continue
346
+ results.extend(item)
347
+ return results
348
+
349
+
350
+ async def _fetch_single_cash_item(
351
+ api: ApiClient,
352
+ ocid: str,
353
+ target_date: str,
354
+ raw_dir: Path,
355
+ all_presets: bool,
356
+ ) -> list[dict[str, Any]]:
357
+ data = await api.get(
358
+ "/maplestory/v1/character/cashitem-equipment",
359
+ params={"ocid": ocid, "date": target_date},
360
+ )
361
+ write_json(raw_dir / f"{ocid}.json", data)
362
+ return extract_cash_items(data, ocid, all_presets=all_presets)
363
+
364
+
365
+ def _collect_icon_urls(
366
+ equipment_items: Iterable[dict[str, Any]],
367
+ cash_items: Iterable[dict[str, Any]],
368
+ ) -> dict[str, str]:
369
+ urls: dict[str, str] = {}
370
+ for item in equipment_items:
371
+ url = item.get("item_shape_icon_url")
372
+ if url and url not in urls:
373
+ urls[url] = "equipment_shape"
374
+ for item in cash_items:
375
+ url = item.get("cash_item_icon_url")
376
+ if url and url not in urls:
377
+ urls[url] = "cash"
378
+ return urls
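`_collect_icon_urls` deduplicates by URL, and the first category seen wins, so a URL that appears in both lists is stored under `equipment_shape`. A minimal standalone sketch of that rule follows. The function name `collect_icon_urls` and the sample URLs are made up for the example, and `dict.setdefault` stands in for the explicit `url not in urls` check.

```python
from __future__ import annotations

from typing import Any, Iterable


def collect_icon_urls(
    equipment_items: Iterable[dict[str, Any]],
    cash_items: Iterable[dict[str, Any]],
) -> dict[str, str]:
    """Map each distinct icon URL to the first category it was seen in."""
    urls: dict[str, str] = {}
    for item in equipment_items:
        url = item.get("item_shape_icon_url")
        if url:
            # setdefault keeps an existing entry, so earlier categories win.
            urls.setdefault(url, "equipment_shape")
    for item in cash_items:
        url = item.get("cash_item_icon_url")
        if url:
            urls.setdefault(url, "cash")
    return urls


urls = collect_icon_urls(
    [{"item_shape_icon_url": "http://example.com/shared.png"}, {"item_shape_icon_url": None}],
    [
        {"cash_item_icon_url": "http://example.com/shared.png"},
        {"cash_item_icon_url": "http://example.com/cash.png"},
    ],
)
```

The category decides which subdirectory under `icons/` the downloader writes to, so a shared URL is stored once, in the equipment directory.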
src/pipeline/utils.py ADDED
@@ -0,0 +1,177 @@
1
+ from __future__ import annotations
2
+
3
+ import hashlib
4
+ import json
5
+ import mimetypes
6
+ import os
7
+ import random
8
+ import time
9
+ from dataclasses import dataclass
10
+ from datetime import datetime, timedelta, timezone
11
+ from pathlib import Path
12
+ from typing import Any, Optional
13
+ from urllib.parse import quote
14
+ from zoneinfo import ZoneInfo
15
+
16
+
17
+ def load_dotenv_if_available() -> bool:
18
+ try:
19
+ from dotenv import load_dotenv
20
+ except ImportError:
21
+ return False
22
+
23
+ env_path = Path(".env")
24
+ if env_path.exists():
25
+ load_dotenv(dotenv_path=env_path)
26
+ return True
27
+ return False
28
+
29
+
30
+ def kst_yesterday_date() -> str:
31
+ tz = ZoneInfo("Asia/Seoul")
32
+ now = datetime.now(tz)
33
+ yesterday = (now - timedelta(days=1)).date()
34
+ return yesterday.isoformat()
35
+
36
+
37
+ def utc_now_iso() -> str:
38
+ return datetime.now(timezone.utc).isoformat()
39
+
40
+
41
+ def safe_filename(value: str) -> str:
42
+ if not value:
43
+ return "unknown"
44
+ return quote(value, safe="-_.")
45
+
46
+
47
+ def json_dumps(value: Any) -> str:
48
+ return json.dumps(value, ensure_ascii=False, separators=(",", ":"))
49
+
50
+
51
+ def ensure_dir(path: Path) -> None:
52
+ path.mkdir(parents=True, exist_ok=True)
53
+
54
+
55
+ def write_json(path: Path, data: Any) -> None:
56
+ ensure_dir(path.parent)
57
+ path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
58
+
59
+
60
+ def compute_run_id(target_date: str, params: dict[str, Any]) -> str:
61
+ payload = json.dumps(params, sort_keys=True, ensure_ascii=True)
62
+ digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()[:12]
63
+ return f"{target_date}-{digest}"
64
+
65
+
66
+ def to_int(value: Any) -> Optional[int]:
67
+ if value is None:
68
+ return None
69
+ try:
70
+ return int(value)
71
+ except (TypeError, ValueError):
72
+ return None
73
+
74
+
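+ def guess_extension(content_type: Optional[str], url: str) -> str:
+     # Prefer the declared media type, ignoring parameters such as charset.
+     if content_type:
+         ext = mimetypes.guess_extension(content_type.split(";")[0].strip())
+         if ext:
+             return ext
+     # Fall back to the URL path only, so a query string is not mistaken for a suffix.
+     from urllib.parse import urlsplit
+
+     suffix = Path(urlsplit(url).path).suffix
+     if suffix:
+         return suffix
+     return ".bin"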
84
+
85
+
86
+ def random_wait(min_seconds: float, max_seconds: float) -> float:
87
+ return random.uniform(min_seconds, max_seconds)
88
+
89
+
+ @dataclass
+ class RateLimiter:
+     rps: float
+
+     def __post_init__(self) -> None:
+         # The lock is created lazily in acquire(), once an event loop is running.
+         self._lock = None
+         self._next_time: Optional[float] = None
+
+     async def acquire(self) -> None:
+         import asyncio
+
+         # A non-positive rate disables throttling.
+         if self.rps <= 0:
+             return
+         if self._lock is None:
+             self._lock = asyncio.Lock()
+         async with self._lock:
+             now = time.monotonic()
+             min_interval = 1 / self.rps
+             if self._next_time is None:
+                 self._next_time = now
+             sleep_for = self._next_time - now
+             if sleep_for > 0:
+                 await asyncio.sleep(sleep_for)
+                 now = time.monotonic()
+             # Reserve the next slot one interval after the later of now and the previous slot.
+             self._next_time = max(now, self._next_time) + min_interval
118
+
119
+
120
+ @dataclass
121
+ class DownloadResult:
122
+ downloaded: int = 0
123
+ skipped: int = 0
124
+ failed: int = 0
125
+
126
+
127
+ @dataclass
128
+ class ApiMetrics:
129
+ total_requests: int = 0
130
+ rate_limit_hits: int = 0
131
+ server_errors: int = 0
132
+ data_preparing_hits: int = 0
133
+ other_errors: int = 0
134
+
135
+
136
+ @dataclass
137
+ class PipelineReport:
138
+ run_id: str
139
+ target_date: str
140
+ start_rank: int
141
+ end_rank: int
142
+ ranking_count: int
143
+ ocid_count: int
144
+ equipment_items_count: int
145
+ cash_items_count: int
146
+ icons_downloaded: int
147
+ icons_skipped: int
148
+ icons_failed: int
149
+ rate_limit_hits: int
150
+ server_errors: int
151
+ data_preparing_hits: int
152
+ elapsed_seconds: float
153
+
154
+ def to_markdown(self) -> str:
155
+ return "\n".join(
156
+ [
157
+ f"Run ID: {self.run_id}",
158
+ f"Target date (KST): {self.target_date}",
159
+ f"Rank range: {self.start_rank}-{self.end_rank}",
160
+ f"Ranking entries: {self.ranking_count}",
161
+ f"OCIDs resolved: {self.ocid_count}",
162
+ f"Equipment shape items: {self.equipment_items_count}",
163
+ f"Cash items: {self.cash_items_count}",
164
+ f"Icons downloaded: {self.icons_downloaded}",
165
+ f"Icons skipped: {self.icons_skipped}",
166
+ f"Icons failed: {self.icons_failed}",
167
+ f"429 retries: {self.rate_limit_hits}",
168
+ f"5xx retries: {self.server_errors}",
169
+ f"Data preparing retries: {self.data_preparing_hits}",
170
+ f"Elapsed seconds: {self.elapsed_seconds:.2f}",
171
+ ]
172
+ )
173
+
174
+
175
+ def get_env_or_none(key: str) -> Optional[str]:
176
+ value = os.getenv(key)
177
+ return value if value else None
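`compute_run_id` in `utils.py` serializes the run parameters with `sort_keys=True` before hashing, so the ID depends on their values and not on dictionary order. The sketch below reproduces the function to illustrate this. The parameter dictionaries are made-up examples, not values from an actual run.

```python
import hashlib
import json
from typing import Any


def compute_run_id(target_date: str, params: dict[str, Any]) -> str:
    """Target date plus the first 12 hex digits of a SHA-256 over the sorted params."""
    payload = json.dumps(params, sort_keys=True, ensure_ascii=True)
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()[:12]
    return f"{target_date}-{digest}"


# Same parameters in a different key order give the same run ID.
run_a = compute_run_id("2025-01-01", {"start_rank": 1, "end_rank": 100})
run_b = compute_run_id("2025-01-01", {"end_rank": 100, "start_rank": 1})
# Changing a parameter value gives a different run ID.
run_c = compute_run_id("2025-01-01", {"start_rank": 1, "end_rank": 50})
```

This is what lets a re-run with identical arguments find its existing `runs` row via `db.fetch_run` instead of inserting a new one.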
tests/test_db_idempotent.py ADDED
@@ -0,0 +1,54 @@
1
+ from pathlib import Path
2
+
3
+ from pipeline import db
4
+
5
+
6
+ def test_db_idempotent_inserts(tmp_path: Path) -> None:
7
+ db_path = tmp_path / "test.sqlite"
8
+ conn = db.connect(db_path)
9
+ db.init_db(conn)
10
+
11
+ run_id = "2025-01-01-acde"
12
+ db.insert_run(conn, run_id, "2025-01-01", "2025-01-02T00:00:00Z", "{}")
13
+
14
+ equipment = [
15
+ {
16
+ "ocid": "ocid-1",
17
+ "item_equipment_part": "head",
18
+ "equipment_slot": "slot",
19
+ "item_name": "Hat",
20
+ "item_icon_url": "http://example.com/hat.png",
21
+ "item_description": "desc",
22
+ "item_shape_name": "Shape Hat",
23
+ "item_shape_icon_url": "http://example.com/shape.png",
24
+ "raw_json": "{}",
25
+ }
26
+ ]
27
+
28
+ cash = [
29
+ {
30
+ "ocid": "ocid-1",
31
+ "preset_no": 1,
32
+ "cash_item_equipment_part": "hat",
33
+ "cash_item_equipment_slot": "slot",
34
+ "cash_item_name": "Cash Hat",
35
+ "cash_item_icon_url": "http://example.com/cash.png",
36
+ "cash_item_description": "desc",
37
+ "cash_item_label": "label",
38
+ "date_expire": None,
39
+ "date_option_expire": None,
40
+ "raw_json": "{}",
41
+ }
42
+ ]
43
+
44
+ db.upsert_equipment_items(conn, run_id, equipment)
45
+ db.upsert_equipment_items(conn, run_id, equipment)
46
+ db.upsert_cash_items(conn, run_id, cash)
47
+ db.upsert_cash_items(conn, run_id, cash)
48
+ conn.commit()
49
+
50
+ eq_count = conn.execute("SELECT COUNT(*) FROM equipment_shape_items").fetchone()[0]
51
+ cash_count = conn.execute("SELECT COUNT(*) FROM cash_items").fetchone()[0]
52
+
53
+ assert eq_count == 1
54
+ assert cash_count == 1
tests/test_parsers.py ADDED
@@ -0,0 +1,96 @@
1
+ from pipeline.parsers import extract_cash_items, extract_equipment_items
2
+
3
+
4
+ def test_extract_equipment_items_maps_shape_icon():
5
+ data = {
6
+ "item_equipment": [
7
+ {
8
+ "item_equipment_part": "head",
9
+ "equipment_slot": "slot1",
10
+ "item_name": "Test Hat",
11
+ "item_icon": "http://example.com/icon.png",
12
+ "item_description": "desc",
13
+ "item_shape_name": "Shape Hat",
14
+ "item_shape_icon": "http://example.com/shape.png",
15
+ }
16
+ ]
17
+ }
18
+ items = extract_equipment_items(data, "ocid-1")
19
+ assert len(items) == 1
20
+ assert items[0]["item_shape_icon_url"] == "http://example.com/shape.png"
21
+ assert items[0]["item_icon_url"] == "http://example.com/icon.png"
22
+ assert items[0]["ocid"] == "ocid-1"
23
+
24
+
25
+ def test_extract_cash_items_selects_current_preset():
26
+ data = {
27
+ "preset_no": 2,
28
+ "cash_item_equipment_preset_1": [
29
+ {
30
+ "cash_item_equipment_part": "hat",
31
+ "cash_item_equipment_slot": "slot1",
32
+ "cash_item_name": "Hat 1",
33
+ "cash_item_icon": "http://example.com/hat1.png",
34
+ }
35
+ ],
36
+ "cash_item_equipment_preset_2": [
37
+ {
38
+ "cash_item_equipment_part": "hat",
39
+ "cash_item_equipment_slot": "slot2",
40
+ "cash_item_name": "Hat 2",
41
+ "cash_item_icon": "http://example.com/hat2.png",
42
+ }
43
+ ],
44
+ }
45
+ items = extract_cash_items(data, "ocid-1", all_presets=False)
46
+ assert len(items) == 1
47
+ assert items[0]["preset_no"] == 2
48
+ assert items[0]["cash_item_icon_url"] == "http://example.com/hat2.png"
49
+
50
+
51
+ def test_extract_cash_items_defaults_to_preset1():
52
+ data = {
53
+ "cash_item_equipment_preset_1": [
54
+ {
55
+ "cash_item_equipment_part": "hat",
56
+ "cash_item_equipment_slot": "slot1",
57
+ "cash_item_name": "Hat 1",
58
+ "cash_item_icon": "http://example.com/hat1.png",
59
+ }
60
+ ],
61
+ "cash_item_equipment_preset_2": [
62
+ {
63
+ "cash_item_equipment_part": "hat",
64
+ "cash_item_equipment_slot": "slot2",
65
+ "cash_item_name": "Hat 2",
66
+ "cash_item_icon": "http://example.com/hat2.png",
67
+ }
68
+ ],
69
+ }
70
+ items = extract_cash_items(data, "ocid-1", all_presets=False)
71
+ assert len(items) == 1
72
+ assert items[0]["preset_no"] == 1
73
+
74
+
75
+ def test_extract_cash_items_all_presets():
76
+ data = {
77
+ "cash_item_equipment_preset_1": [
78
+ {
79
+ "cash_item_equipment_part": "hat",
80
+ "cash_item_equipment_slot": "slot1",
81
+ "cash_item_name": "Hat 1",
82
+ "cash_item_icon": "http://example.com/hat1.png",
83
+ }
84
+ ],
85
+ "cash_item_equipment_preset_2": [
86
+ {
87
+ "cash_item_equipment_part": "hat",
88
+ "cash_item_equipment_slot": "slot2",
89
+ "cash_item_name": "Hat 2",
90
+ "cash_item_icon": "http://example.com/hat2.png",
91
+ }
92
+ ],
93
+ }
94
+ items = extract_cash_items(data, "ocid-1", all_presets=True)
95
+ assert len(items) == 2
96
+ assert {item["preset_no"] for item in items} == {1, 2}