Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import hashlib | |
| import json | |
| import mimetypes | |
| import os | |
| import random | |
| import time | |
| from dataclasses import dataclass | |
| from datetime import datetime, timedelta, timezone | |
| from pathlib import Path | |
| from typing import Any, Optional | |
| from urllib.parse import quote | |
| from zoneinfo import ZoneInfo | |
| def load_dotenv_if_available() -> bool: | |
| try: | |
| from dotenv import load_dotenv | |
| except ImportError: | |
| return False | |
| env_path = Path(".env") | |
| if env_path.exists(): | |
| load_dotenv(dotenv_path=env_path) | |
| return True | |
| return False | |
| def kst_yesterday_date() -> str: | |
| tz = ZoneInfo("Asia/Seoul") | |
| now = datetime.now(tz) | |
| yesterday = (now - timedelta(days=1)).date() | |
| return yesterday.isoformat() | |
| def utc_now_iso() -> str: | |
| return datetime.now(timezone.utc).isoformat() | |
| def safe_filename(value: str) -> str: | |
| if not value: | |
| return "unknown" | |
| return quote(value, safe="-_.") | |
| def json_dumps(value: Any) -> str: | |
| return json.dumps(value, ensure_ascii=False, separators=(",", ":")) | |
| def ensure_dir(path: Path) -> None: | |
| path.mkdir(parents=True, exist_ok=True) | |
| def write_json(path: Path, data: Any) -> None: | |
| ensure_dir(path.parent) | |
| path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") | |
| def compute_run_id(target_date: str, params: dict[str, Any]) -> str: | |
| payload = json.dumps(params, sort_keys=True, ensure_ascii=True) | |
| digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()[:12] | |
| return f"{target_date}-{digest}" | |
| def to_int(value: Any) -> Optional[int]: | |
| if value is None: | |
| return None | |
| try: | |
| return int(value) | |
| except (TypeError, ValueError): | |
| return None | |
| def guess_extension(content_type: Optional[str], url: str) -> str: | |
| if content_type: | |
| ext = mimetypes.guess_extension(content_type.split(";")[0].strip()) | |
| if ext: | |
| return ext | |
| suffix = Path(url).suffix | |
| if suffix: | |
| return suffix | |
| return ".bin" | |
| def random_wait(min_seconds: float, max_seconds: float) -> float: | |
| return random.uniform(min_seconds, max_seconds) | |
| class RateLimiter: | |
| rps: float | |
| def __post_init__(self) -> None: | |
| self._lock = None | |
| self._next_time: Optional[float] = None | |
| async def acquire(self) -> None: | |
| if self.rps <= 0: | |
| return | |
| if self._lock is None: | |
| import asyncio | |
| self._lock = asyncio.Lock() | |
| async with self._lock: | |
| now = time.monotonic() | |
| min_interval = 1 / self.rps | |
| if self._next_time is None: | |
| self._next_time = now | |
| if now < self._next_time: | |
| sleep_for = self._next_time - now | |
| if sleep_for > 0: | |
| import asyncio | |
| await asyncio.sleep(sleep_for) | |
| now = time.monotonic() | |
| self._next_time = max(now, self._next_time) + min_interval | |
| class DownloadResult: | |
| downloaded: int = 0 | |
| skipped: int = 0 | |
| failed: int = 0 | |
| class ApiMetrics: | |
| total_requests: int = 0 | |
| rate_limit_hits: int = 0 | |
| server_errors: int = 0 | |
| data_preparing_hits: int = 0 | |
| other_errors: int = 0 | |
| class PipelineReport: | |
| run_id: str | |
| target_date: str | |
| start_rank: int | |
| end_rank: int | |
| ranking_count: int | |
| ocid_count: int | |
| equipment_items_count: int | |
| cash_items_count: int | |
| icons_downloaded: int | |
| icons_skipped: int | |
| icons_failed: int | |
| rate_limit_hits: int | |
| server_errors: int | |
| data_preparing_hits: int | |
| elapsed_seconds: float | |
| def to_markdown(self) -> str: | |
| return "\n".join( | |
| [ | |
| f"Run ID: {self.run_id}", | |
| f"Target date (KST): {self.target_date}", | |
| f"Rank range: {self.start_rank}-{self.end_rank}", | |
| f"Ranking entries: {self.ranking_count}", | |
| f"OCIDs resolved: {self.ocid_count}", | |
| f"Equipment shape items: {self.equipment_items_count}", | |
| f"Cash items: {self.cash_items_count}", | |
| f"Icons downloaded: {self.icons_downloaded}", | |
| f"Icons skipped: {self.icons_skipped}", | |
| f"Icons failed: {self.icons_failed}", | |
| f"429 retries: {self.rate_limit_hits}", | |
| f"5xx retries: {self.server_errors}", | |
| f"Data preparing retries: {self.data_preparing_hits}", | |
| f"Elapsed seconds: {self.elapsed_seconds:.2f}", | |
| ] | |
| ) | |
| def get_env_or_none(key: str) -> Optional[str]: | |
| value = os.getenv(key) | |
| return value if value else None | |