Spaces:
Running
Running
| """ | |
| Utility functions for data normalization and deduplication. | |
| """ | |
| import hashlib | |
| import re | |
| from datetime import datetime | |
| from typing import Optional | |
| from urllib.parse import urlparse, urlunparse, parse_qsl, urlencode | |
| from bs4 import BeautifulSoup | |
def normalize_whitespace(text: str) -> str:
    """Collapse consecutive whitespace characters into single spaces.

    Leading/trailing whitespace is trimmed as a side effect of the
    split/join round-trip. Falsy input yields "".
    """
    return " ".join(text.split()) if text else ""
def strip_html(text: str) -> str:
    """Strip HTML markup from *text*, keeping only the visible text.

    Tag boundaries become single spaces; the result is stripped at both
    ends. Falsy input yields "".
    """
    if not text:
        return ""
    extracted = BeautifulSoup(text, "html.parser")
    return extracted.get_text(separator=" ", strip=True)
def clean_text(text: str) -> str:
    """Return *text* with HTML removed and whitespace normalized.

    Composes strip_html and normalize_whitespace, then trims the ends.
    Falsy input yields "".
    """
    return normalize_whitespace(strip_html(text)).strip() if text else ""
def canonical_title(title: str) -> str:
    """Reduce a title to a canonical form for fuzzy dedup comparison.

    Lowercases, replaces punctuation (anything that is neither a word
    character nor whitespace) with spaces, then collapses whitespace runs.
    Falsy input yields "".
    """
    if not title:
        return ""
    # Lowercase first, then swap punctuation for spaces so adjacent words
    # don't fuse together.
    depunctuated = re.sub(r"[^\w\s]", " ", title.lower())
    return " ".join(depunctuated.split())
def normalize_url(url: str) -> str:
    """
    Return a canonical form of *url* suitable for dedup comparison:

    - known tracking parameters (utm_*, fbclid, gclid, ...) are dropped
    - the fragment is dropped
    - the host is lowercased
    - the surviving query parameters are sorted by key (stable, so
      duplicate keys keep their original value order)

    On any parsing failure the URL is returned unchanged (best effort —
    a malformed URL is still a usable dedup key).
    """
    if not url:
        return ""
    tracking = {
        "utm_source", "utm_medium", "utm_campaign", "utm_term", "utm_content",
        "fbclid", "gclid", "ref", "source", "mc_cid", "mc_eid",
    }
    try:
        parts = urlparse(url)
        kept = sorted(
            (
                (key, value)
                for key, value in parse_qsl(parts.query)
                if key.lower() not in tracking
            ),
            key=lambda pair: pair[0],
        )
        return urlunparse((
            parts.scheme,
            parts.netloc.lower(),
            parts.path,
            parts.params,
            urlencode(kept),
            "",  # fragment intentionally discarded
        ))
    except Exception:
        # Best effort: keep the original rather than lose the record.
        return url
def generate_dedup_key(
    url: Optional[str] = None,
    title: Optional[str] = None,
    published_at: Optional[datetime] = None,
    source: Optional[str] = None
) -> str:
    """
    Build a stable 32-hex-char deduplication key for a news article.

    Preference order:
      1. hash of the normalized URL, when a URL is present;
      2. hash of canonical title + publication date (YYYY-MM-DD) + source;
      3. a random UUID-based key when no identifying field exists at all.
    """
    if url:
        canonical = normalize_url(url)
        if canonical:
            return hashlib.sha256(canonical.encode()).hexdigest()[:32]

    # Content-based fallback: combine whichever identifying fields exist.
    fields = []
    if title:
        fields.append(canonical_title(title))
    if published_at:
        fields.append(published_at.strftime("%Y-%m-%d"))
    if source:
        fields.append(source.lower().strip())

    if fields:
        digest = hashlib.sha256("|".join(fields).encode())
        return digest.hexdigest()[:32]

    # Nothing to key on — emit a random key so the record is never dropped.
    import uuid
    return uuid.uuid4().hex[:32]
def truncate_text(text: str, max_length: int = 500) -> str:
    """Truncate *text* to at most *max_length* characters on a word boundary.

    Appends "..." when truncation occurs; text that already fits is
    returned unchanged, and falsy input yields "". For *max_length* of 3
    or less the result is just "..." — previously `text[:max_length - 3]`
    produced a negative-index slice, returning nearly the whole string.
    """
    if not text or len(text) <= max_length:
        return text or ""
    # Room left for content once the 3-char ellipsis is accounted for;
    # clamp so small max_length values never slice with a negative index.
    budget = max(max_length - 3, 0)
    # Cut at the last space inside the budget so words aren't split;
    # rsplit returns the whole slice when it contains no space.
    head = text[:budget].rsplit(" ", 1)[0].rstrip()
    return head + "..."
def safe_parse_date(
    date_str: str,
    formats: Optional[list[str]] = None
) -> Optional[datetime]:
    """
    Parse *date_str* into a timezone-aware datetime.

    Tries dateutil first (most flexible), then a list of explicit
    strptime formats. Naive results are assumed to be UTC. Returns None
    when every strategy fails or the input is falsy.

    Args:
        date_str: The raw date/time string.
        formats: Optional strptime patterns to try; defaults to common
            ISO-8601 and slash-separated forms.

    Returns:
        A timezone-aware datetime, or None on failure.
    """
    from datetime import timezone  # stdlib UTC; avoids dateutil.tz dependency

    if not date_str:
        return None

    # dateutil is a third-party package — import it lazily and degrade
    # gracefully to the explicit formats when it is not installed
    # (previously an unconditional import crashed the whole call).
    try:
        from dateutil import parser as dateutil_parser
    except ImportError:
        dateutil_parser = None

    if dateutil_parser is not None:
        try:
            dt = dateutil_parser.parse(date_str)
            if dt.tzinfo is None:
                dt = dt.replace(tzinfo=timezone.utc)
            return dt
        except Exception:
            pass  # fall through to explicit formats

    # Explicit formats, most specific first.
    formats = formats or [
        "%Y-%m-%dT%H:%M:%SZ",
        "%Y-%m-%dT%H:%M:%S%z",
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%d",
        "%d/%m/%Y",
        "%m/%d/%Y",
    ]
    for fmt in formats:
        try:
            dt = datetime.strptime(date_str, fmt)
        except ValueError:
            continue
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        return dt
    return None