"""
Utility functions for data normalization and deduplication.
"""
import hashlib
import re
from datetime import datetime
from typing import Optional
from urllib.parse import urlparse, urlunparse, parse_qsl, urlencode
from bs4 import BeautifulSoup
def normalize_whitespace(text: str) -> str:
    """Collapse any run of whitespace in *text* into a single space."""
    if not text:
        return ""
    tokens = text.split()
    return " ".join(tokens)
def strip_html(text: str) -> str:
    """Strip HTML markup, returning only the visible text content."""
    if not text:
        return ""
    parsed = BeautifulSoup(text, "html.parser")
    return parsed.get_text(separator=" ", strip=True)
def clean_text(text: str) -> str:
    """Remove HTML markup from *text*, then collapse and trim whitespace."""
    if not text:
        return ""
    # Strip tags first, then normalize the whitespace the stripping leaves.
    return normalize_whitespace(strip_html(text)).strip()
def canonical_title(title: str) -> str:
    """
    Reduce a title to a canonical form for fuzzy dedup comparison:
    lowercased, punctuation replaced by spaces, whitespace collapsed.
    """
    if not title:
        return ""
    lowered = title.lower()
    # Replace every non-alphanumeric, non-space character with a space.
    depunctuated = re.sub(r"[^\w\s]", " ", lowered)
    # Collapse runs of whitespace into single spaces.
    return " ".join(depunctuated.split())
def normalize_url(url: str) -> str:
    """
    Normalize a URL for deduplication:
    - Remove known tracking query parameters (utm_*, fbclid, etc.)
    - Remove the fragment
    - Lowercase the host (but NOT the userinfo, which is case-sensitive)
    - Sort remaining query parameters for a stable ordering

    Returns "" for empty input, and the original string unchanged if
    parsing fails (best-effort, never raises).
    """
    if not url:
        return ""
    try:
        parsed = urlparse(url)
        # Lowercase only the host; any "user:password@" prefix is
        # case-sensitive per RFC 3986 and must be preserved as-is.
        netloc = parsed.netloc
        if "@" in netloc:
            userinfo, _, host = netloc.rpartition("@")
            netloc = userinfo + "@" + host.lower()
        else:
            netloc = netloc.lower()
        # Marketing/click-tracking parameters that don't affect content.
        tracking_params = {
            "utm_source", "utm_medium", "utm_campaign", "utm_term", "utm_content",
            "fbclid", "gclid", "ref", "source", "mc_cid", "mc_eid"
        }
        # NOTE: parse_qsl drops blank values (e.g. "?a=") by default;
        # acceptable for dedup normalization.
        query_params = parse_qsl(parsed.query)
        filtered_params = [
            (k, v) for k, v in query_params
            if k.lower() not in tracking_params
        ]
        # Sort by key so equivalent URLs compare equal.
        filtered_params.sort(key=lambda x: x[0])
        query = urlencode(filtered_params)
        # Reconstruct without the fragment.
        return urlunparse((
            parsed.scheme,
            netloc,
            parsed.path,
            parsed.params,
            query,
            ""  # fragment removed
        ))
    except Exception:
        # Best-effort: return an unparseable URL unchanged so callers
        # can still hash it.
        return url
def generate_dedup_key(
    url: Optional[str] = None,
    title: Optional[str] = None,
    published_at: Optional[datetime] = None,
    source: Optional[str] = None
) -> str:
    """
    Generate a 32-hex-char deduplication key for a news article.

    Strategy:
    1. Prefer a hash of the normalized URL when a URL is available.
    2. Otherwise hash the combination of canonical title, publication
       date (day granularity) and lowercased source name.
    3. As a last resort (no fields at all), return a random key.
    """
    if url:
        canonical_url = normalize_url(url)
        if canonical_url:
            return hashlib.sha256(canonical_url.encode()).hexdigest()[:32]
    # Content-based fallback: build from whichever fields are present.
    components = []
    if title:
        components.append(canonical_title(title))
    if published_at:
        components.append(published_at.strftime("%Y-%m-%d"))
    if source:
        components.append(source.lower().strip())
    if not components:
        # Nothing to hash — emit a random key (shouldn't happen).
        import uuid
        return uuid.uuid4().hex[:32]
    return hashlib.sha256("|".join(components).encode()).hexdigest()[:32]
def truncate_text(text: str, max_length: int = 500) -> str:
    """
    Truncate text to at most max_length characters, breaking at the last
    word boundary and appending "..." when truncation occurs.

    Returns "" for falsy input. When max_length <= 3 there is no room
    for an ellipsis, so the text is hard-cut instead (the previous code
    computed text[:max_length - 3], a negative slice for max_length < 3,
    and could return a string far LONGER than max_length).
    """
    if not text or len(text) <= max_length:
        return text or ""
    if max_length <= 3:
        # No room for "..." — hard truncate; guard against negative slices.
        return text[: max(max_length, 0)]
    # Cut to size, then back up to the last space so words aren't split.
    return text[:max_length - 3].rsplit(" ", 1)[0] + "..."
def safe_parse_date(
    date_str: str,
    formats: Optional[list[str]] = None
) -> Optional[datetime]:
    """
    Try to parse a date string, returning a timezone-aware datetime.

    Parsing order:
    1. dateutil's flexible parser, if installed (it was previously a
       hard import that made the whole function raise ImportError when
       dateutil was missing, even though a stdlib fallback exists below)
    2. A list of explicit strptime formats (caller-supplied or defaults)

    Naive results are assumed to be UTC. Returns None when every
    strategy fails or when date_str is empty.
    """
    from datetime import timezone

    if not date_str:
        return None

    # dateutil is optional: degrade gracefully to the strptime formats
    # instead of raising when it is not installed.
    try:
        from dateutil import parser as dateutil_parser
    except ImportError:
        dateutil_parser = None

    if dateutil_parser is not None:
        try:
            dt = dateutil_parser.parse(date_str)
            # Assume UTC when the string carried no offset.
            if dt.tzinfo is None:
                dt = dt.replace(tzinfo=timezone.utc)
            return dt
        except Exception:
            pass  # fall through to the explicit formats

    # Explicit formats, tried in order; first success wins.
    formats = formats or [
        "%Y-%m-%dT%H:%M:%SZ",
        "%Y-%m-%dT%H:%M:%S%z",
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%d",
        "%d/%m/%Y",
        "%m/%d/%Y",
    ]
    for fmt in formats:
        try:
            dt = datetime.strptime(date_str, fmt)
            if dt.tzinfo is None:
                dt = dt.replace(tzinfo=timezone.utc)
            return dt
        except ValueError:
            continue
    return None
|