Spaces:
Running
Running
File size: 4,686 Bytes
da3fe02 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 |
from __future__ import annotations
import hashlib
import json
import mimetypes
import os
import random
import time
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Optional
from urllib.parse import quote
from zoneinfo import ZoneInfo
def load_dotenv_if_available() -> bool:
    """Load variables from ``./.env`` when python-dotenv is installed.

    Returns:
        True when the ``dotenv`` package is importable and a ``.env`` file
        exists in the current working directory (the file is then handed to
        ``load_dotenv``). False when the package is missing or no such file
        exists.
    """
    try:
        # Imported lazily so a missing package is reported as False
        # instead of failing at module import time.
        from dotenv import load_dotenv
    except ImportError:
        return False

    env_path = Path(".env")
    if not env_path.exists():
        return False
    load_dotenv(dotenv_path=env_path)
    return True
def kst_yesterday_date() -> str:
    """Return yesterday's calendar date in Korea Standard Time.

    Returns:
        The date one day before "now" in the ``Asia/Seoul`` zone, formatted
        as ``YYYY-MM-DD``.
    """
    from zoneinfo import ZoneInfoNotFoundError

    try:
        tz = ZoneInfo("Asia/Seoul")
    except ZoneInfoNotFoundError:
        # No IANA database available (e.g. Windows without the tzdata
        # package). Korea currently observes a fixed UTC+9 offset with no
        # daylight saving time, so a fixed offset gives the same "now".
        tz = timezone(timedelta(hours=9))
    now = datetime.now(tz)
    return (now - timedelta(days=1)).date().isoformat()
def utc_now_iso() -> str:
    """Return the current UTC time as a timezone-aware ISO 8601 string."""
    return datetime.now(timezone.utc).isoformat()
def safe_filename(value: str) -> str:
    """Percent-encode ``value`` so it can be used as a single file name.

    Letters, digits and ``-``, ``_``, ``.``, ``~`` are kept; everything else
    (including ``/``) is percent-encoded as UTF-8.

    Args:
        value: Arbitrary text to turn into a file name.

    Returns:
        ``"unknown"`` for an empty/falsy value, otherwise the encoded name.
        Names made up solely of dots (``"."``, ``".."``, ...) have their dots
        encoded as ``%2E`` so the result never refers to a directory.
    """
    if not value:
        return "unknown"
    encoded = quote(value, safe="-_.")
    if not encoded.strip("."):
        # "." and ".." are directory references, not usable file names.
        return encoded.replace(".", "%2E")
    return encoded
def json_dumps(value: Any) -> str:
    """Serialize ``value`` to compact JSON.

    Non-ASCII characters are emitted as-is (not ``\\uXXXX`` escaped) and no
    whitespace is placed after ``,`` or ``:``.
    """
    return json.dumps(value, ensure_ascii=False, separators=(",", ":"))
def ensure_dir(path: Path) -> None:
    """Create directory ``path`` and any missing parents; no-op if it exists."""
    path.mkdir(parents=True, exist_ok=True)
def write_json(path: Path, data: Any) -> None:
    """Write ``data`` to ``path`` as indented UTF-8 JSON.

    The parent directory is created first if needed. Output uses a two-space
    indent and keeps non-ASCII characters unescaped.

    Args:
        path: Destination file; overwritten if it already exists.
        data: Any JSON-serializable value.
    """
    ensure_dir(path.parent)
    text = json.dumps(data, ensure_ascii=False, indent=2)
    path.write_text(text, encoding="utf-8")
def compute_run_id(target_date: str, params: dict[str, Any]) -> str:
    """Build a deterministic run identifier from a date and a parameter dict.

    Args:
        target_date: Used verbatim as the identifier's prefix; it is not part
            of the hashed payload.
        params: JSON-serializable parameters. Keys are sorted before hashing,
            so key order does not affect the result.

    Returns:
        ``"<target_date>-<first 12 hex chars of SHA-256 of the params JSON>"``.
    """
    payload = json.dumps(params, sort_keys=True, ensure_ascii=True)
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()[:12]
    return f"{target_date}-{digest}"
def to_int(value: Any) -> Optional[int]:
    """Convert ``value`` to ``int``, returning ``None`` when that is impossible.

    Args:
        value: Anything accepted by ``int()``; ``None`` is passed through.

    Returns:
        The converted integer, or ``None`` if ``value`` is ``None`` or cannot
        be converted (wrong type, unparsable text, NaN, or infinity).
    """
    if value is None:
        return None
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError is what int() raises for float("inf") / -inf.
        return None
def guess_extension(content_type: Optional[str], url: str) -> str:
    """Pick a file extension from a Content-Type header or, failing that, a URL.

    Args:
        content_type: Raw header value; parameters after ``;`` are ignored.
            May be ``None`` or empty.
        url: Resource URL (or plain path) used as a fallback.

    Returns:
        The extension including the leading dot. Preference order: the MIME
        type's registered extension, the suffix of the URL's path component,
        then ``".bin"``.
    """
    from urllib.parse import urlsplit

    if content_type:
        mime_type = content_type.split(";")[0].strip()
        ext = mimetypes.guess_extension(mime_type)
        if ext:
            return ext

    # Only the path component may contribute a suffix; otherwise the query
    # string ("a.png?size=64") or the host name ("example.com") leaks in.
    try:
        url_path = urlsplit(url).path
    except ValueError:
        # Malformed URL (e.g. unbalanced IPv6 brackets): use the raw text.
        url_path = url
    suffix = Path(url_path).suffix
    return suffix if suffix else ".bin"
def random_wait(min_seconds: float, max_seconds: float) -> float:
    """Return a random duration drawn uniformly between the two bounds.

    This only computes the value; it does not sleep.
    """
    return random.uniform(min_seconds, max_seconds)
@dataclass
class RateLimiter:
    """Async limiter that spaces ``acquire()`` calls ``1 / rps`` seconds apart.

    A non-positive ``rps`` disables limiting: ``acquire()`` returns at once.
    """

    rps: float  # permitted acquisitions per second; <= 0 means unlimited

    def __post_init__(self) -> None:
        # The lock is created on first use in acquire().
        # NOTE(review): presumably so the limiter can be constructed outside
        # a running event loop - confirm against callers.
        self._lock = None
        # time.monotonic() value before which the next acquire() must wait.
        self._next_time: Optional[float] = None

    async def acquire(self) -> None:
        """Wait until the next slot is free, then reserve the following one.

        The internal lock is held while sleeping, so concurrent callers are
        released one at a time.
        """
        if self.rps <= 0:
            return

        import asyncio

        if self._lock is None:
            self._lock = asyncio.Lock()

        async with self._lock:
            now = time.monotonic()
            if self._next_time is None:
                self._next_time = now
            delay = self._next_time - now
            if delay > 0:
                await asyncio.sleep(delay)
                now = time.monotonic()
            # Schedule the next slot from whichever is later, so a late
            # caller does not get a burst of back-to-back slots.
            self._next_time = max(now, self._next_time) + 1 / self.rps
@dataclass
class DownloadResult:
    """Outcome counters for a batch of downloads; every counter starts at 0."""

    downloaded: int = 0
    skipped: int = 0
    failed: int = 0
@dataclass
class ApiMetrics:
    """Mutable request/error counters for API calls; every counter starts at 0."""

    total_requests: int = 0
    rate_limit_hits: int = 0
    server_errors: int = 0
    data_preparing_hits: int = 0
    other_errors: int = 0
@dataclass
class PipelineReport:
    """Summary figures for one pipeline run, renderable as a text report."""

    run_id: str
    target_date: str  # labelled "Target date (KST)" in the rendered report
    start_rank: int
    end_rank: int
    ranking_count: int
    ocid_count: int
    equipment_items_count: int
    cash_items_count: int
    icons_downloaded: int
    icons_skipped: int
    icons_failed: int
    rate_limit_hits: int  # rendered as "429 retries"
    server_errors: int  # rendered as "5xx retries"
    data_preparing_hits: int
    elapsed_seconds: float

    def to_markdown(self) -> str:
        """Render the report as newline-separated ``Label: value`` lines.

        Returns:
            One line per metric, in a fixed order, with no trailing newline.
            ``elapsed_seconds`` is formatted with two decimal places.
        """
        lines = (
            f"Run ID: {self.run_id}",
            f"Target date (KST): {self.target_date}",
            f"Rank range: {self.start_rank}-{self.end_rank}",
            f"Ranking entries: {self.ranking_count}",
            f"OCIDs resolved: {self.ocid_count}",
            f"Equipment shape items: {self.equipment_items_count}",
            f"Cash items: {self.cash_items_count}",
            f"Icons downloaded: {self.icons_downloaded}",
            f"Icons skipped: {self.icons_skipped}",
            f"Icons failed: {self.icons_failed}",
            f"429 retries: {self.rate_limit_hits}",
            f"5xx retries: {self.server_errors}",
            f"Data preparing retries: {self.data_preparing_hits}",
            f"Elapsed seconds: {self.elapsed_seconds:.2f}",
        )
        return "\n".join(lines)
def get_env_or_none(key: str) -> Optional[str]:
    """Return environment variable ``key``, or ``None`` if unset or empty."""
    return os.getenv(key) or None
|