| from __future__ import annotations |
|
|
| import hashlib |
| import logging |
| import re |
| from logging.handlers import RotatingFileHandler |
| from pathlib import Path |
| from urllib.parse import urlparse, unquote |
|
|
| from rich.logging import RichHandler |
|
|
|
|
| # Matches each run of characters other than ASCII letters, digits, ".", "_" and "-". |
| # extract_image_id() substitutes "_" for every such run when building an identifier. |
| _FILENAME_SAFE = re.compile(r"[^A-Za-z0-9._-]+") |
|
|
|
|
def setup_logging(logs_root: Path, verbose: bool = False) -> logging.Logger:
    """Configure the ``image_processor`` logger with rotating files and a Rich console.

    Three handlers are attached to the logger:

    * ``success.log`` receives records at INFO level and above that are below
      WARNING (in practice, INFO records).
    * ``failure.log`` receives records at WARNING level and above.
    * A ``RichHandler`` console handler shows DEBUG and above when *verbose*
      is true, otherwise INFO and above.

    Both log files rotate at 2,000,000 bytes and keep 3 backups. The function
    may be called more than once: handlers installed by an earlier call are
    closed and removed before the new ones are attached.

    Args:
        logs_root: Directory that holds the log files; created if missing.
        verbose: When true, the logger and the console handler emit DEBUG.

    Returns:
        The configured ``image_processor`` logger.
    """
    logs_root.mkdir(parents=True, exist_ok=True)

    logger = logging.getLogger("image_processor")
    logger.setLevel(logging.DEBUG if verbose else logging.INFO)

    # Close handlers left over from a previous call before dropping them;
    # merely clearing the list would leave their log files open.
    for stale_handler in list(logger.handlers):
        logger.removeHandler(stale_handler)
        stale_handler.close()

    # Keep records from also reaching handlers configured on ancestor loggers.
    logger.propagate = False

    success_handler = RotatingFileHandler(
        logs_root / "success.log", maxBytes=2_000_000, backupCount=3, encoding="utf-8"
    )
    success_handler.setLevel(logging.INFO)
    # WARNING and above belong in failure.log only.
    success_handler.addFilter(lambda r: r.levelno < logging.WARNING)

    failure_handler = RotatingFileHandler(
        logs_root / "failure.log", maxBytes=2_000_000, backupCount=3, encoding="utf-8"
    )
    failure_handler.setLevel(logging.WARNING)

    file_fmt = logging.Formatter(
        "%(asctime)s | %(levelname)s | %(name)s | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    success_handler.setFormatter(file_fmt)
    failure_handler.setFormatter(file_fmt)

    console_handler = RichHandler(
        rich_tracebacks=True,
        show_time=False,
        show_path=False,
        markup=True,
    )
    console_handler.setLevel(logging.DEBUG if verbose else logging.INFO)

    logger.addHandler(success_handler)
    logger.addHandler(failure_handler)
    logger.addHandler(console_handler)
    return logger
|
|
|
|
def is_valid_url(url: str) -> bool:
    """Return True when *url* is an http(s) URL that names a host.

    Surrounding whitespace is ignored. The check never raises: input that
    cannot be parsed, or that is not a string, yields False.

    Args:
        url: Candidate URL text.

    Returns:
        True if the scheme is ``http`` or ``https`` and a hostname is present,
        otherwise False.
    """
    try:
        parsed = urlparse(url.strip())
        # Test hostname rather than netloc: a netloc such as ":80" or "user@"
        # is non-empty yet contains no host to connect to.
        return parsed.scheme in ("http", "https") and bool(parsed.hostname)
    except (ValueError, AttributeError, TypeError):
        # ValueError: urlparse rejected the text (e.g. an unbalanced "[").
        # AttributeError/TypeError: *url* was not a string.
        return False
|
|
|
|
def extract_image_id(url: str) -> str:
    """Derive a filename-safe identifier from an image URL.

    The identifier is the stem of the percent-decoded URL path, with every
    run of characters outside ``A-Za-z0-9._-`` replaced by ``_`` and leading
    or trailing ``.``, ``_`` and ``-`` removed. If that leaves nothing, a
    short hash of the whole URL is returned instead.
    """
    decoded_path = unquote(urlparse(url).path)
    raw_stem = Path(decoded_path).stem

    if raw_stem:
        cleaned = _FILENAME_SAFE.sub("_", raw_stem).strip("._-")
        if cleaned:
            return cleaned

    # No stem at all, or nothing survived sanitising.
    return _hash_id(url)
|
|
|
|
| def _hash_id(value: str) -> str: |
| return hashlib.sha1(value.encode("utf-8")).hexdigest()[:12] |
|
|
|
|
def unique_path(directory: Path, stem: str, suffix: str = ".jpg") -> Path:
    """Pick a file path inside *directory* that is not taken yet.

    *directory* is created when missing. The first choice is
    ``<stem><suffix>``; if that exists, ``<stem>-1<suffix>``,
    ``<stem>-2<suffix>``, ... are tried in turn. The returned path itself is
    not created.
    """
    directory.mkdir(parents=True, exist_ok=True)

    attempt = 0
    file_name = f"{stem}{suffix}"
    while (directory / file_name).exists():
        attempt += 1
        file_name = f"{stem}-{attempt}{suffix}"
    return directory / file_name
|
|
|
|
def human_size(num_bytes: int) -> str:
    """Format a byte count with one decimal place and a binary-scaled unit.

    The value is divided by 1024 until its magnitude drops below 1024 or the
    largest unit (TB) is reached. Negative values (for example a size
    difference) are scaled by magnitude and keep their sign.

    Args:
        num_bytes: Number of bytes; may be negative.

    Returns:
        Text such as ``"512.0 B"``, ``"1.5 KB"`` or ``"-2.0 KB"``.
    """
    size = float(num_bytes)
    for unit in ("B", "KB", "MB", "GB"):
        # Compare on magnitude so negative sizes scale up as well.
        if abs(size) < 1024:
            return f"{size:.1f} {unit}"
        size /= 1024
    return f"{size:.1f} TB"
|
|
|
|
def read_urls_from_file(path: Path) -> list[str]:
    """Read the non-blank, non-comment lines of a text file.

    Each line is stripped of surrounding whitespace. Lines that are empty
    after stripping, or that start with ``#``, are skipped. The remaining
    lines are returned in file order; they are not validated as URLs here.

    Args:
        path: Text file encoded as UTF-8, with or without a leading BOM.

    Returns:
        The remaining lines, in file order.
    """
    # "utf-8-sig" reads plain UTF-8 and also drops a leading byte-order mark,
    # which would otherwise stick to the first line and survive strip().
    with path.open("r", encoding="utf-8-sig") as fh:
        stripped_lines = (line.strip() for line in fh)
        return [
            entry
            for entry in stripped_lines
            if entry and not entry.startswith("#")
        ]
|
|