File size: 4,905 Bytes
f64b002
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
"""
Utility functions for data normalization and deduplication.
"""

import hashlib
import re
from datetime import datetime
from typing import Optional
from urllib.parse import urlparse, urlunparse, parse_qsl, urlencode
from bs4 import BeautifulSoup


def normalize_whitespace(text: str) -> str:
    """Collapse every run of whitespace in *text* into a single space.

    Leading/trailing whitespace is removed as a side effect of the
    split/join round-trip. Falsy input (empty string, None) yields "".
    """
    return " ".join(text.split()) if text else ""


def strip_html(text: str) -> str:
    """Return the plain-text content of *text* with all HTML markup removed.

    Tag boundaries become single spaces so adjacent elements do not fuse
    into one word. Falsy input yields "".
    """
    if not text:
        return ""
    parsed = BeautifulSoup(text, "html.parser")
    return parsed.get_text(separator=" ", strip=True)


def clean_text(text: str) -> str:
    """Produce plain, single-spaced text: strip HTML, then collapse whitespace.

    Falsy input yields "".
    """
    if not text:
        return ""
    return normalize_whitespace(strip_html(text)).strip()


def canonical_title(title: str) -> str:
    """Reduce *title* to a canonical form for fuzzy dedup comparison.

    The result is lowercase, punctuation-free (every non-word,
    non-space character becomes a space), and single-spaced, so two
    near-identical headlines compare equal.
    """
    if not title:
        return ""
    # Lowercase first, then swap punctuation for spaces so words stay separated.
    depunctuated = re.sub(r"[^\w\s]", " ", title.lower())
    # Collapse the resulting runs of whitespace.
    return " ".join(depunctuated.split())


def normalize_url(url: str) -> str:
    """Canonicalize *url* so equivalent links compare equal.

    Applied transformations:
    - lowercase the host (netloc),
    - drop well-known tracking query parameters (utm_*, fbclid, ...),
    - sort the surviving query parameters by name,
    - strip the fragment.

    Returns "" for falsy input, and the original string unchanged if
    parsing fails for any reason (best-effort normalization).
    """
    if not url:
        return ""

    # Query-string keys that carry no identity, only analytics noise.
    drop_keys = frozenset({
        "utm_source", "utm_medium", "utm_campaign", "utm_term", "utm_content",
        "fbclid", "gclid", "ref", "source", "mc_cid", "mc_eid",
    })

    try:
        parts = urlparse(url)

        # Keep only non-tracking parameters; sort by key for a stable
        # ordering (stable sort preserves value order for repeated keys).
        kept = [
            pair for pair in parse_qsl(parts.query)
            if pair[0].lower() not in drop_keys
        ]
        kept.sort(key=lambda pair: pair[0])

        # Rebuild with a lowercased host and no fragment.
        return urlunparse((
            parts.scheme,
            parts.netloc.lower(),
            parts.path,
            parts.params,
            urlencode(kept),
            "",  # fragment intentionally discarded
        ))
    except Exception:
        # Best-effort: an unparseable URL is returned verbatim.
        return url


def generate_dedup_key(
    url: Optional[str] = None,
    title: Optional[str] = None,
    published_at: Optional[datetime] = None,
    source: Optional[str] = None
) -> str:
    """Build a 32-hex-char deduplication key for a news article.

    Strategy:
    1. Prefer a hash of the normalized URL when one is available.
    2. Otherwise hash the combination of canonical title, publication
       date (day precision), and lowercased source name.
    3. With no usable fields at all, fall back to a random key so the
       article is never accidentally merged with another.
    """
    if url:
        canonical = normalize_url(url)
        if canonical:
            return hashlib.sha256(canonical.encode()).hexdigest()[:32]

    # Content-based fallback: collect whichever identifying fields exist.
    fields = []
    if title:
        fields.append(canonical_title(title))
    if published_at:
        fields.append(published_at.strftime("%Y-%m-%d"))
    if source:
        fields.append(source.lower().strip())

    if fields:
        return hashlib.sha256("|".join(fields).encode()).hexdigest()[:32]

    # Last resort: no identifying information at all (shouldn't happen).
    import uuid
    return uuid.uuid4().hex[:32]


def truncate_text(text: str, max_length: int = 500) -> str:
    """Truncate *text* to at most *max_length* characters.

    Text that already fits is returned unchanged (falsy input becomes "").
    Otherwise the text is cut at the last word boundary that leaves room
    for a trailing "..." ellipsis.

    Args:
        text: The string to truncate.
        max_length: Maximum allowed length of the result.

    Returns:
        A string no longer than max_length characters.
    """
    if not text or len(text) <= max_length:
        return text or ""
    # BUGFIX: with max_length <= 3 the old slice text[:max_length - 3]
    # used a non-positive index and the appended "..." made the result
    # LONGER than max_length (e.g. ("hello", 2) -> "hell..."). There is
    # no room for an ellipsis at all, so hard-cut instead.
    if max_length <= 3:
        return text[:max(0, max_length)]
    # Cut to fit, then drop any trailing partial word before the ellipsis.
    return text[:max_length - 3].rsplit(" ", 1)[0] + "..."


def safe_parse_date(
    date_str: str,
    formats: Optional[list[str]] = None
) -> Optional[datetime]:
    """
    Try to parse a date string using multiple strategies.

    Attempts dateutil's flexible parser first (when the third-party
    package is installed), then falls back to a list of explicit
    strptime formats. Naive results are assumed to be UTC and are
    returned timezone-aware.

    Args:
        date_str: The raw date string to parse.
        formats: Optional strptime formats to try after dateutil; a
            sensible default list is used when omitted.

    Returns:
        A timezone-aware datetime, or None if every strategy fails.
    """
    from datetime import timezone

    if not date_str:
        return None

    # BUGFIX: the old code imported dateutil unconditionally, so a
    # missing third-party package crashed the function with ImportError
    # even though the stdlib strptime fallback below would have worked.
    try:
        from dateutil import parser as dateutil_parser
    except ImportError:
        dateutil_parser = None

    if dateutil_parser is not None:
        try:
            dt = dateutil_parser.parse(date_str)
            if dt.tzinfo is None:
                # Assume UTC when the source provides no timezone.
                dt = dt.replace(tzinfo=timezone.utc)
            return dt
        except (ValueError, OverflowError, TypeError):
            # dateutil couldn't make sense of it; try explicit formats.
            pass

    # Explicit formats, most specific first.
    formats = formats or [
        "%Y-%m-%dT%H:%M:%SZ",
        "%Y-%m-%dT%H:%M:%S%z",
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%d",
        "%d/%m/%Y",
        "%m/%d/%Y",
    ]

    for fmt in formats:
        try:
            dt = datetime.strptime(date_str, fmt)
        except ValueError:
            continue
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        return dt

    return None