"""URL canonicalization and content-type helpers for crawl-style URL handling."""

from __future__ import annotations

import re
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit

# File extensions that are never fetched as pages: media, archives, office
# documents, and machine-readable data formats (csv/json/xml/svg are text on
# the wire but are intentionally excluded from HTML processing here).
BINARY_EXTENSIONS = {
    ".7z", ".avi", ".bin", ".bz2", ".csv", ".doc", ".docx", ".epub",
    ".gif", ".gz", ".ico", ".jpeg", ".jpg", ".json", ".m4a", ".m4v",
    ".mov", ".mp3", ".mp4", ".mpeg", ".ogg", ".pdf", ".png", ".ppt",
    ".pptx", ".rar", ".svg", ".tar", ".tgz", ".tif", ".tiff", ".wav",
    ".webm", ".webp", ".xls", ".xlsx", ".xml", ".xz", ".zip",
}

# Query keys that carry only analytics/tracking state; stripping them (plus
# any "utm_*" key) lets the same logical page collapse to one canonical URL.
TRACKING_QUERY_KEYS = {
    "fbclid", "gclid", "mc_cid", "mc_eid", "ref", "source", "spm", "yclid",
}


def normalize_url(raw_url: str) -> str | None:
    """Return a canonical form of *raw_url*, or ``None`` if it is unusable.

    Canonicalization performed:
      * scheme and host lowercased; only ``http``/``https`` accepted
      * leading/trailing dots stripped from the host
      * default ports (80 for http, 443 for https) dropped
      * runs of slashes in the path collapsed; empty path becomes ``/``
      * ``utm_*`` and known tracking query parameters removed (order of the
        remaining parameters preserved)
      * fragment discarded
      * userinfo (``user:pass@``) discarded

    Returns ``None`` for non-http(s) schemes, empty hosts, unparseable URLs,
    and non-numeric or out-of-range ports.
    """
    try:
        parts = urlsplit(raw_url.strip())
    except ValueError:
        return None

    scheme = parts.scheme.lower()
    if scheme not in {"http", "https"}:
        return None

    host = (parts.hostname or "").lower().strip(".")
    if not host:
        return None
    # urlsplit strips the brackets from IPv6 literals; restore them so the
    # rebuilt netloc is a valid authority component ("[::1]:8080", not
    # "::1:8080").
    if ":" in host:
        host = f"[{host}]"

    try:
        # .port raises ValueError for non-numeric or out-of-range ports.
        port = parts.port
    except ValueError:
        return None

    # Drop the port when absent (or 0, which is falsy) or when it is the
    # scheme's default.
    if not port or (scheme == "http" and port == 80) or (scheme == "https" and port == 443):
        netloc = host
    else:
        netloc = f"{host}:{port}"

    path = re.sub(r"/{2,}", "/", parts.path or "/")

    # Keep blank values ("?a=&b") so deliberate empty parameters survive.
    query_pairs: list[tuple[str, str]] = []
    for key, value in parse_qsl(parts.query, keep_blank_values=True):
        lowered = key.lower()
        if lowered.startswith("utm_") or lowered in TRACKING_QUERY_KEYS:
            continue
        query_pairs.append((key, value))
    query = urlencode(query_pairs, doseq=True)

    # Empty fragment component drops any "#..." from the output.
    return urlunsplit((scheme, netloc, path, query, ""))


def has_binary_extension(url: str) -> bool:
    """Return True if *url*'s path ends with a known non-HTML file extension.

    The check is case-insensitive and looks at the last ``.``-suffix of the
    path; a dot inside a directory segment yields a suffix containing ``/``,
    which can never match the extension set, so it safely returns False.
    """
    path = urlsplit(url).path.lower()
    if not path:
        return False
    dot_index = path.rfind(".")
    if dot_index == -1:
        return False
    return path[dot_index:] in BINARY_EXTENSIONS


def is_html_response(content_type: str, final_url: str) -> bool:
    """Heuristically decide whether a response should be parsed as HTML.

    A binary-looking URL extension always wins (returns False). A missing
    Content-Type header is treated optimistically as HTML. Otherwise accept
    html/xhtml, plus text/plain for servers that mislabel pages.
    """
    if has_binary_extension(final_url):
        return False
    if not content_type:
        return True
    lowered = content_type.lower()
    return (
        "text/html" in lowered
        or "application/xhtml+xml" in lowered
        or "text/plain" in lowered
    )