gyubin02's picture
Initial commit
da3fe02
from __future__ import annotations
import hashlib
import json
import mimetypes
import os
import random
import time
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Optional
from urllib.parse import quote
from zoneinfo import ZoneInfo
def load_dotenv_if_available() -> bool:
    """Load ``./.env`` through python-dotenv when both are present.

    Returns:
        True if the ``dotenv`` package imported successfully and a ``.env``
        file exists in the current working directory (it is then handed to
        ``load_dotenv``); False in every other case.
    """
    try:
        from dotenv import load_dotenv
    except ImportError:
        # The dotenv package is treated as optional: no package, nothing loaded.
        return False

    dotenv_file = Path(".env")
    if not dotenv_file.exists():
        return False

    load_dotenv(dotenv_path=dotenv_file)
    return True
def kst_yesterday_date() -> str:
    """Return yesterday's date in the ``Asia/Seoul`` zone as ``YYYY-MM-DD``."""
    seoul_now = datetime.now(ZoneInfo("Asia/Seoul"))
    one_day_ago = seoul_now - timedelta(days=1)
    return one_day_ago.date().isoformat()
def utc_now_iso() -> str:
    """Return the current time in UTC as a timezone-aware ISO 8601 string."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
def safe_filename(value: str) -> str:
    """Percent-encode *value* for use as a file name.

    Args:
        value: Raw text to encode.

    Returns:
        ``"unknown"`` when *value* is empty or otherwise falsy; else *value*
        passed through ``urllib.parse.quote`` with ``-``, ``_`` and ``.``
        kept literal (so ``/`` is encoded as well).
    """
    return quote(value, safe="-_.") if value else "unknown"
def json_dumps(value: Any) -> str:
    """Serialize *value* to compact JSON, keeping non-ASCII characters as-is."""
    # No whitespace after "," or ":" keeps the output as short as possible.
    compact_separators = (",", ":")
    return json.dumps(value, ensure_ascii=False, separators=compact_separators)
def ensure_dir(path: Path) -> None:
    """Create directory *path* and any missing parents; do nothing if it exists."""
    path.mkdir(exist_ok=True, parents=True)
def write_json(path: Path, data: Any) -> None:
    """Write *data* to *path* as indented UTF-8 JSON.

    The parent directory is created first if needed. Non-ASCII characters are
    written unescaped and the output uses a two-space indent.
    """
    ensure_dir(path.parent)
    rendered = json.dumps(data, ensure_ascii=False, indent=2)
    path.write_text(rendered, encoding="utf-8")
def compute_run_id(target_date: str, params: dict[str, Any]) -> str:
    """Build a run identifier from the date and a hash of the parameters.

    Args:
        target_date: Text used verbatim as the identifier's prefix.
        params: JSON-serializable parameters; key order does not matter
            because keys are sorted before hashing.

    Returns:
        ``"<target_date>-<first 12 hex chars of SHA-256 of params>"``.
    """
    canonical = json.dumps(params, sort_keys=True, ensure_ascii=True)
    hasher = hashlib.sha256()
    hasher.update(canonical.encode("utf-8"))
    short_digest = hasher.hexdigest()[:12]
    return f"{target_date}-{short_digest}"
def to_int(value: Any) -> Optional[int]:
    """Convert *value* to ``int``, returning None when that is not possible.

    Args:
        value: Anything ``int()`` might accept (number, numeric string, ...).

    Returns:
        The integer value, or None if *value* is None or cannot be converted.
    """
    if value is None:
        return None
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError covers int(float("inf")); NaN and malformed strings
        # raise ValueError, unsupported types raise TypeError.
        return None
def guess_extension(content_type: Optional[str], url: str) -> str:
    """Choose a file extension from a Content-Type header, else from the URL.

    Args:
        content_type: Raw ``Content-Type`` value; parameters after ``;`` are
            ignored. May be None or empty.
        url: URL (or plain path) of the resource.

    Returns:
        The extension including its leading dot. The MIME type wins when
        ``mimetypes`` recognizes it; otherwise the suffix of the URL's path
        component is used; ``".bin"`` is the final fallback.
    """
    # Imported locally so this function does not depend on module-level
    # imports beyond the ones it already used.
    from pathlib import PurePosixPath
    from urllib.parse import urlsplit

    if content_type:
        mime_type = content_type.split(";")[0].strip()
        ext = mimetypes.guess_extension(mime_type)
        if ext:
            return ext

    # Only the path component may contribute a suffix: taking the suffix of
    # the whole URL would pick up query strings / fragments ("a.png?v=1.2")
    # or the host's TLD ("https://example.com" -> ".com").
    try:
        url_path = urlsplit(url).path
    except ValueError:
        # Malformed URL (e.g. bad IPv6 literal): best-effort manual strip.
        url_path = url.split("?", 1)[0].split("#", 1)[0]

    suffix = PurePosixPath(url_path).suffix
    return suffix or ".bin"
def random_wait(min_seconds: float, max_seconds: float) -> float:
    """Return a uniformly distributed random number between the two bounds.

    Despite the name, this only picks the value; it does not sleep.
    """
    delay = random.uniform(min_seconds, max_seconds)
    return delay
@dataclass
class RateLimiter:
    """Async throttle that spaces ``acquire`` calls ``1 / rps`` seconds apart.

    A non-positive ``rps`` disables throttling: ``acquire`` returns at once.
    """

    rps: float

    def __post_init__(self) -> None:
        # The asyncio lock is created on first use in ``acquire``, not here.
        self._lock = None
        # ``time.monotonic()`` value before which the next caller must wait.
        self._next_time: Optional[float] = None

    async def acquire(self) -> None:
        """Wait for this caller's slot, then reserve the following one."""
        if self.rps <= 0:
            return

        import asyncio

        if self._lock is None:
            self._lock = asyncio.Lock()

        async with self._lock:
            current = time.monotonic()
            interval = 1 / self.rps
            if self._next_time is None:
                # First caller: its slot is "now", so it does not wait.
                self._next_time = current

            remaining = self._next_time - current
            if remaining > 0:
                await asyncio.sleep(remaining)
                current = time.monotonic()

            # Schedule from whichever is later so a late caller does not
            # shorten the gap to the one after it.
            self._next_time = max(current, self._next_time) + interval
@dataclass
class DownloadResult:
    """Tally of download outcomes; every counter starts at zero."""

    # Number of items counted as downloaded.
    downloaded: int = 0
    # Number of items counted as skipped.
    skipped: int = 0
    # Number of items counted as failed.
    failed: int = 0
@dataclass
class ApiMetrics:
    """Mutable request/error counters; every counter starts at zero."""

    # Total number of requests counted.
    total_requests: int = 0
    # Occurrences counted as rate-limit hits.
    rate_limit_hits: int = 0
    # Occurrences counted as server errors.
    server_errors: int = 0
    # Occurrences counted as "data preparing" hits.
    data_preparing_hits: int = 0
    # Errors not covered by the categories above.
    other_errors: int = 0
@dataclass
class PipelineReport:
    """Summary figures for one pipeline run, renderable as a text report."""

    run_id: str
    target_date: str
    start_rank: int
    end_rank: int
    ranking_count: int
    ocid_count: int
    equipment_items_count: int
    cash_items_count: int
    icons_downloaded: int
    icons_skipped: int
    icons_failed: int
    rate_limit_hits: int
    server_errors: int
    data_preparing_hits: int
    elapsed_seconds: float

    def to_markdown(self) -> str:
        """Render the report as newline-separated ``Label: value`` lines.

        The start and end rank share one line, and the elapsed time is
        formatted with two decimal places.
        """
        report_lines = (
            f"Run ID: {self.run_id}",
            f"Target date (KST): {self.target_date}",
            f"Rank range: {self.start_rank}-{self.end_rank}",
            f"Ranking entries: {self.ranking_count}",
            f"OCIDs resolved: {self.ocid_count}",
            f"Equipment shape items: {self.equipment_items_count}",
            f"Cash items: {self.cash_items_count}",
            f"Icons downloaded: {self.icons_downloaded}",
            f"Icons skipped: {self.icons_skipped}",
            f"Icons failed: {self.icons_failed}",
            f"429 retries: {self.rate_limit_hits}",
            f"5xx retries: {self.server_errors}",
            f"Data preparing retries: {self.data_preparing_hits}",
            f"Elapsed seconds: {self.elapsed_seconds:.2f}",
        )
        return "\n".join(report_lines)
def get_env_or_none(key: str) -> Optional[str]:
    """Return environment variable *key*, or None if it is unset or empty."""
    return os.getenv(key) or None