File size: 3,287 Bytes
6d8fa62
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
from __future__ import annotations

import hashlib
import logging
import re
from logging.handlers import RotatingFileHandler
from pathlib import Path
from urllib.parse import urlparse, unquote

from rich.logging import RichHandler


# Matches every run of one or more characters that are NOT ASCII letters,
# digits, ".", "_" or "-". extract_image_id() replaces each such run with "_".
_FILENAME_SAFE = re.compile(r"[^A-Za-z0-9._-]+")


def setup_logging(logs_root: Path, verbose: bool = False) -> logging.Logger:
    """Configure the ``image_processor`` logger with file and console output.

    Three handlers are attached:

    * ``success.log`` (rotating) receives records at INFO level only
      (INFO and above, but below WARNING).
    * ``failure.log`` (rotating) receives records at WARNING and above.
    * A Rich console handler receives DEBUG and above when *verbose* is
      true, otherwise INFO and above.

    Calling this function again replaces the previous handlers; the old
    ones are closed so their log files are not left open.

    Args:
        logs_root: Directory for the log files; created if missing.
        verbose: When true, the logger and console handler use DEBUG level.

    Returns:
        The configured ``image_processor`` logger (propagation disabled).
    """
    logs_root.mkdir(parents=True, exist_ok=True)

    logger = logging.getLogger("image_processor")
    logger.setLevel(logging.DEBUG if verbose else logging.INFO)
    # Detach AND close handlers from any earlier call: merely clearing the
    # list would leave the rotating log files open (leaked file handles).
    for stale_handler in list(logger.handlers):
        logger.removeHandler(stale_handler)
        stale_handler.close()
    logger.propagate = False

    success_handler = RotatingFileHandler(
        logs_root / "success.log", maxBytes=2_000_000, backupCount=3, encoding="utf-8"
    )
    success_handler.setLevel(logging.INFO)
    # Keep warnings/errors out of the success log; they go to failure.log.
    success_handler.addFilter(lambda r: r.levelno < logging.WARNING)

    failure_handler = RotatingFileHandler(
        logs_root / "failure.log", maxBytes=2_000_000, backupCount=3, encoding="utf-8"
    )
    failure_handler.setLevel(logging.WARNING)

    file_fmt = logging.Formatter(
        "%(asctime)s | %(levelname)s | %(name)s | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    success_handler.setFormatter(file_fmt)
    failure_handler.setFormatter(file_fmt)

    console_handler = RichHandler(
        rich_tracebacks=True,
        show_time=False,
        show_path=False,
        markup=True,
    )
    console_handler.setLevel(logging.DEBUG if verbose else logging.INFO)

    logger.addHandler(success_handler)
    logger.addHandler(failure_handler)
    logger.addHandler(console_handler)
    return logger


def is_valid_url(url: str) -> bool:
    """Return True if *url* is an http(s) URL that names a host.

    Surrounding whitespace is ignored. Non-string input, URLs that fail to
    parse, schemes other than http/https, and URLs without a host part
    (for example ``http://:8080/x``) are all reported as invalid.
    """
    if not isinstance(url, str):
        return False
    try:
        parsed = urlparse(url.strip())
    except ValueError:
        # urlparse raises ValueError for malformed input (e.g. an
        # unterminated IPv6 literal such as "http://[::1").
        return False
    # Check hostname rather than netloc: netloc is non-empty for
    # "http://:80" or "http://user@", which carry no host at all.
    return parsed.scheme in ("http", "https") and bool(parsed.hostname)


def extract_image_id(url: str) -> str:
    """Derive a filesystem-friendly identifier from an image URL.

    The identifier is the percent-decoded stem of the URL path's final
    component, with every run of characters outside ``[A-Za-z0-9._-]``
    replaced by ``_`` and leading/trailing ``.``, ``_`` and ``-`` removed.
    If that leaves nothing usable, a short hash of the full URL is
    returned instead.
    """
    stem = Path(unquote(urlparse(url).path)).stem
    if stem:
        cleaned = _FILENAME_SAFE.sub("_", stem).strip("._-")
        if cleaned:
            return cleaned
    # Either the path had no stem or sanitising stripped it to nothing.
    return _hash_id(url)


def _hash_id(value: str) -> str:
    """Return the first 12 hex characters of the SHA-1 of *value* (UTF-8)."""
    digest = hashlib.sha1(value.encode("utf-8"))
    hex_text = digest.hexdigest()
    return hex_text[:12]


def unique_path(directory: Path, stem: str, suffix: str = ".jpg") -> Path:
    """Pick a not-yet-existing file path inside *directory*.

    The directory is created if needed. ``<stem><suffix>`` is tried first;
    if it exists, ``<stem>-1<suffix>``, ``<stem>-2<suffix>``, ... are tried
    in order until a free name is found. The file itself is not created.
    """
    directory.mkdir(parents=True, exist_ok=True)
    target = directory / (stem + suffix)
    attempt = 0
    while target.exists():
        attempt += 1
        target = directory / f"{stem}-{attempt}{suffix}"
    return target


def human_size(num_bytes: int) -> str:
    """Format a byte count with one decimal place and a B/KB/MB/GB/TB unit.

    The value is divided by 1024 until it drops below 1024 or the largest
    unit (TB) is reached.
    """
    labels = ("B", "KB", "MB", "GB", "TB")
    value = float(num_bytes)
    step = 0
    largest = len(labels) - 1
    # "not value < 1024" (rather than ">=") keeps the comparison identical
    # for every float, including non-finite ones.
    while step < largest and not value < 1024:
        value /= 1024
        step += 1
    return f"{value:.1f} {labels[step]}"


def read_urls_from_file(path: Path) -> list[str]:
    """Read a list of URLs from a text file, one entry per line.

    Each line is stripped of surrounding whitespace; blank lines and lines
    whose first non-blank character is ``#`` are skipped. Entries are
    returned in file order and are not validated.

    Args:
        path: Text file to read (UTF-8, with or without a byte-order mark).

    Returns:
        The remaining non-empty, non-comment lines.
    """
    # "utf-8-sig" drops a leading BOM if present (str.strip() does not
    # remove it), and otherwise decodes exactly like "utf-8". Without this
    # the first URL of a BOM-prefixed file would be corrupted.
    with path.open("r", encoding="utf-8-sig") as fh:
        stripped_lines = (line.strip() for line in fh)
        return [
            entry
            for entry in stripped_lines
            if entry and not entry.startswith("#")
        ]