File size: 4,686 Bytes
da3fe02
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
from __future__ import annotations

import hashlib
import json
import mimetypes
import os
import random
import time
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Optional
from urllib.parse import quote
from zoneinfo import ZoneInfo


def load_dotenv_if_available() -> bool:
    """Load ``.env`` from the current directory when python-dotenv is usable.

    Returns:
        True if ``dotenv`` could be imported and a ``.env`` file was found and
        handed to ``load_dotenv``; False if the package is missing or the file
        does not exist.
    """
    try:
        from dotenv import load_dotenv
    except ImportError:
        # python-dotenv is optional; without it there is nothing to load.
        return False

    dotenv_file = Path(".env")
    if not dotenv_file.exists():
        return False

    load_dotenv(dotenv_path=dotenv_file)
    return True


def kst_yesterday_date() -> str:
    """Return yesterday's calendar date in Asia/Seoul as ``YYYY-MM-DD``."""
    seoul_today = datetime.now(ZoneInfo("Asia/Seoul")).date()
    return (seoul_today - timedelta(days=1)).isoformat()


def utc_now_iso() -> str:
    """Return the current UTC time as a timezone-aware ISO 8601 string."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()


def safe_filename(value: str) -> str:
    """Percent-encode *value* so it can be used as a single file name.

    Every character other than ASCII letters, digits and ``-_.~`` is
    percent-encoded, so path separators such as ``/`` never survive.

    Args:
        value: Arbitrary text to turn into a file name.

    Returns:
        ``"unknown"`` for an empty value, otherwise the encoded name. The
        special names ``"."`` and ``".."`` have their dots escaped as
        ``%2E`` so the result never refers to the current or parent directory.
    """
    if not value:
        return "unknown"
    encoded = quote(value, safe="-_.")
    # "." and ".." pass through quote() unchanged but name directories, not
    # files; escape them so the result is always a plain file name.
    if encoded in {".", ".."}:
        return encoded.replace(".", "%2E")
    return encoded


def json_dumps(value: Any) -> str:
    """Serialize *value* to compact JSON, keeping non-ASCII characters as-is."""
    compact_separators = (",", ":")  # no whitespace after "," or ":"
    return json.dumps(value, separators=compact_separators, ensure_ascii=False)


def ensure_dir(path: Path) -> None:
    """Create directory *path*, including missing parents.

    Does nothing if the directory already exists.
    """
    path.mkdir(exist_ok=True, parents=True)


def write_json(path: Path, data: Any) -> None:
    """Write *data* to *path* as UTF-8 JSON indented by two spaces.

    The parent directory is created first if it does not exist. Non-ASCII
    characters are written unescaped.
    """
    ensure_dir(path.parent)
    serialized = json.dumps(data, indent=2, ensure_ascii=False)
    path.write_text(serialized, encoding="utf-8")


def compute_run_id(target_date: str, params: dict[str, Any]) -> str:
    """Build a run identifier from the target date and a hash of *params*.

    *params* is serialized as JSON with sorted keys, so the identifier does
    not depend on key insertion order. The result is *target_date* followed
    by ``-`` and the first 12 hex characters of the SHA-256 of that JSON.
    """
    canonical = json.dumps(params, sort_keys=True, ensure_ascii=True)
    short_hash = hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:12]
    return f"{target_date}-{short_hash}"


def to_int(value: Any) -> Optional[int]:
    """Convert *value* to ``int``, returning ``None`` when it cannot be converted.

    Args:
        value: Anything ``int()`` might accept (numbers, numeric strings, ...).

    Returns:
        The integer value, or ``None`` if *value* is ``None`` or ``int()``
        rejects it.
    """
    if value is None:
        return None
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError covers infinite floats: int(float("inf")) raises it
        # rather than ValueError.
        return None


def guess_extension(content_type: Optional[str], url: str) -> str:
    """Pick a file extension from a Content-Type value, falling back to the URL.

    Args:
        content_type: Content-Type header value, possibly carrying parameters
            such as ``; charset=...``. May be ``None`` or empty.
        url: Source URL, used only when the content type yields no extension.

    Returns:
        An extension including the leading dot. Resolution order: the
        ``mimetypes`` guess for the content type, then the suffix of the URL's
        path component, then ``".bin"``.
    """
    from urllib.parse import urlsplit

    if content_type:
        # Drop parameters ("; charset=utf-8") before the MIME lookup.
        mime_type = content_type.split(";")[0].strip()
        ext = mimetypes.guess_extension(mime_type)
        if ext:
            return ext
    # Look only at the path component: taking the suffix of the raw URL would
    # include the query string ("a.png?x=1" -> ".png?x=1") and would treat a
    # bare host name as a file ("https://example.com" -> ".com").
    suffix = Path(urlsplit(url).path).suffix
    if suffix:
        return suffix
    return ".bin"


def random_wait(min_seconds: float, max_seconds: float) -> float:
    """Return a random duration drawn uniformly between the two bounds.

    This only computes the value; it does not sleep.
    """
    duration = random.uniform(min_seconds, max_seconds)
    return duration


@dataclass
class RateLimiter:
    """Async limiter that spaces ``acquire()`` calls ``1 / rps`` seconds apart.

    A non-positive ``rps`` disables limiting: ``acquire()`` returns at once.
    """

    rps: float

    def __post_init__(self) -> None:
        # The lock is created on first use inside acquire(), not here.
        self._lock = None
        # Monotonic timestamp of the earliest moment the next caller may go.
        self._next_time: Optional[float] = None

    async def acquire(self) -> None:
        """Wait until the next request slot is available, then reserve it."""
        if self.rps <= 0:
            return

        import asyncio

        if self._lock is None:
            self._lock = asyncio.Lock()

        # The lock is held while sleeping, so waiting callers go one at a time.
        async with self._lock:
            current = time.monotonic()
            interval = 1 / self.rps
            if self._next_time is None:
                self._next_time = current
            delay = self._next_time - current
            if delay > 0:
                await asyncio.sleep(delay)
                current = time.monotonic()
            # Reserve the following slot relative to whichever is later: the
            # planned slot or the actual current time.
            self._next_time = max(current, self._next_time) + interval


@dataclass
class DownloadResult:
    """Tally of download outcomes; every counter starts at zero."""

    downloaded: int = 0  # count of downloaded items
    skipped: int = 0  # count of skipped items
    failed: int = 0  # count of failed items


@dataclass
class ApiMetrics:
    """Counters for API request outcomes; every counter starts at zero."""

    total_requests: int = 0
    rate_limit_hits: int = 0  # presumably HTTP 429 responses; confirm with caller
    server_errors: int = 0  # presumably HTTP 5xx responses; confirm with caller
    data_preparing_hits: int = 0
    other_errors: int = 0


@dataclass
class PipelineReport:
    """Summary figures for one pipeline run, renderable as text."""

    run_id: str
    target_date: str
    start_rank: int
    end_rank: int
    ranking_count: int
    ocid_count: int
    equipment_items_count: int
    cash_items_count: int
    icons_downloaded: int
    icons_skipped: int
    icons_failed: int
    rate_limit_hits: int
    server_errors: int
    data_preparing_hits: int
    elapsed_seconds: float

    def to_markdown(self) -> str:
        """Return the report as newline-separated ``Label: value`` lines.

        Elapsed time is formatted with two decimal places.
        """
        rows = (
            ("Run ID", self.run_id),
            ("Target date (KST)", self.target_date),
            ("Rank range", f"{self.start_rank}-{self.end_rank}"),
            ("Ranking entries", self.ranking_count),
            ("OCIDs resolved", self.ocid_count),
            ("Equipment shape items", self.equipment_items_count),
            ("Cash items", self.cash_items_count),
            ("Icons downloaded", self.icons_downloaded),
            ("Icons skipped", self.icons_skipped),
            ("Icons failed", self.icons_failed),
            ("429 retries", self.rate_limit_hits),
            ("5xx retries", self.server_errors),
            ("Data preparing retries", self.data_preparing_hits),
            ("Elapsed seconds", f"{self.elapsed_seconds:.2f}"),
        )
        return "\n".join(f"{label}: {value}" for label, value in rows)


def get_env_or_none(key: str) -> Optional[str]:
    """Return environment variable *key*, or ``None`` if it is unset or empty."""
    return os.getenv(key) or None