File size: 4,181 Bytes
bcb6b46
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
"""
Utility functions for the Email Intelligence Platform
"""

import re
import hashlib
from typing import List, Dict, Any, Optional
from datetime import datetime, timezone


def clean_text(text: Optional[str]) -> str:
    """Clean and normalize text for analysis.

    The text is lowercased, every character that is not a word character,
    whitespace, or one of ``. , ! ? -`` is removed, and finally each run of
    whitespace is collapsed to a single space with the ends trimmed.

    Args:
        text: Raw text; ``None`` and the empty string are accepted.

    Returns:
        The normalized text, or ``""`` when ``text`` is falsy.
    """
    if not text:
        return ""

    # Convert to lowercase
    text = text.lower()

    # Remove special characters but keep basic punctuation
    text = re.sub(r'[^\w\s.,!?-]', '', text)

    # Collapse whitespace last: deleting a symbol that sat between two spaces
    # (or at either end) would otherwise leave a double or dangling space.
    return re.sub(r'\s+', ' ', text).strip()


def extract_email_address(sender_header: str) -> str:
    """Extract the email address from a sender header.

    Two shapes are recognized: ``"Name <email@domain.com>"``, where the text
    inside the first pair of angle brackets is returned, and a bare string
    containing ``@``, which is returned whole. In both cases the result is
    stripped of surrounding whitespace and lowercased.

    Args:
        sender_header: Raw header value; a falsy value is accepted.

    Returns:
        The lowercased address, or ``""`` when nothing usable is found.
    """
    if not sender_header:
        return ""

    # Try to extract email from format "Name <email@domain.com>"
    bracketed = re.search(r'<([^>]+)>', sender_header)
    if bracketed:
        # Strip here too so "< a@b.com >" matches the bare-address branch.
        return bracketed.group(1).strip().lower()

    # If no angle brackets, assume the whole string is an email
    if '@' in sender_header:
        return sender_header.strip().lower()

    return ""


def extract_domain(email_address: str) -> str:
    """Extract the domain from an email address.

    The domain is everything after the last ``@``, stripped of surrounding
    whitespace and lowercased.

    Args:
        email_address: Address to inspect; a falsy value is accepted.

    Returns:
        The lowercased domain, or ``""`` when the input is falsy or has no
        ``@``.
    """
    # Guard falsy input (including None) like the other helpers in this
    # module, instead of failing on the membership test.
    if not email_address:
        return ""

    _local, at_sign, domain = email_address.rpartition('@')
    if not at_sign:
        return ""
    return domain.strip().lower()


def generate_id(prefix: str, content: str) -> str:
    """Build an identifier of the form ``<prefix>_<8 hex chars>``.

    The hex part is the start of a SHA-256 digest computed over ``content``
    joined with the current UTC timestamp, so the same content hashed at a
    different instant produces a different ID.
    """
    now = datetime.now(timezone.utc).timestamp()
    digest = hashlib.sha256(f"{content}_{now}".encode()).hexdigest()
    return f"{prefix}_{digest[:8]}"


def extract_keywords(text: str, min_length: int = 4, max_keywords: int = 10) -> List[str]:
    """Return the most frequent keywords found in ``text``.

    The text is lowercased and split into word tokens. A token counts as a
    keyword when it has at least ``min_length`` characters, is not in the
    built-in stop-word list, and is purely alphabetic. Up to ``max_keywords``
    keywords are returned, most frequent first.
    """
    from collections import Counter

    if not text:
        return []

    # Common stop words to filter out
    ignored = {
        'the', 'and', 'for', 'are', 'but', 'not', 'you', 'all', 'can', 'had',
        'her', 'was', 'one', 'our', 'out', 'has', 'have', 'been', 'were', 'they',
        'this', 'that', 'with', 'from', 'your', 'will', 'would', 'could', 'should',
        'what', 'when', 'where', 'which', 'their', 'there', 'these', 'those'
    }

    def is_keyword(token: str) -> bool:
        return len(token) >= min_length and token not in ignored and token.isalpha()

    tokens = re.findall(r'\b\w+\b', text.lower())

    # Tally the surviving tokens and keep the top entries
    ranked = Counter(filter(is_keyword, tokens)).most_common(max_keywords)
    return [token for token, _count in ranked]


def format_timestamp(dt: Optional[datetime] = None) -> str:
    """Render a datetime as an ISO 8601 string.

    When ``dt`` is omitted (or ``None``), the current time in UTC is used.
    """
    moment = datetime.now(timezone.utc) if dt is None else dt
    return moment.isoformat()


def parse_json_safely(json_str: str, default: Any = None) -> Any:
    """Parse a JSON document, returning a fallback instead of raising.

    Args:
        json_str: The JSON text to parse. Values of other types (for example
            ``None`` or undecodable bytes) are tolerated and yield the
            fallback.
        default: Value returned when parsing fails. When left as ``None`` an
            empty dict is returned instead, so ``None`` itself cannot be
            requested as the fallback; any other value (including ``[]`` or
            ``0``) is returned as given.

    Returns:
        The decoded object, or the fallback described above on failure.
    """
    import json
    try:
        return json.loads(json_str)
    # ValueError covers json.JSONDecodeError and also UnicodeDecodeError,
    # which json.loads raises for bytes that are not valid UTF-8/16/32.
    # RecursionError covers pathologically deep nesting.
    except (ValueError, TypeError, RecursionError):
        return default if default is not None else {}


def truncate_text(text: str, max_length: int = 100, suffix: str = "...") -> str:
    """Truncate text so the result never exceeds ``max_length`` characters.

    Args:
        text: The text to shorten. Falsy values are returned unchanged.
        max_length: Maximum length of the returned string, suffix included.
        suffix: Marker appended to show that text was cut.

    Returns:
        ``text`` unchanged when it already fits; otherwise a prefix of
        ``text`` followed by ``suffix``. When ``max_length`` is too small to
        hold the suffix, the text is hard-cut to ``max_length`` characters
        with no suffix.
    """
    if not text or len(text) <= max_length:
        return text

    keep = max_length - len(suffix)
    if keep < 0:
        # No room for the suffix. A negative slice bound would count from the
        # end of the string and return far more than max_length characters.
        return text[:max(max_length, 0)]
    return text[:keep] + suffix


def calculate_confidence(scores: List[float]) -> float:
    """Return the arithmetic mean of ``scores``, or ``0.0`` when there are none."""
    return sum(scores) / len(scores) if scores else 0.0


def validate_email_format(email: str) -> bool:
    """Check whether ``email`` looks like ``local@domain.tld``.

    This is a syntactic check only: the local part may contain letters,
    digits and ``._%+-``, the domain letters, digits, dots and hyphens, and
    the final label must be at least two letters.

    Args:
        email: Candidate address. Non-string values are rejected.

    Returns:
        ``True`` when the whole string matches the pattern, else ``False``.
    """
    if not isinstance(email, str):
        return False

    pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    # fullmatch rather than match with '$': '$' also matches just before a
    # trailing newline, which would let "user@example.com\n" through.
    return re.fullmatch(pattern, email) is not None


def sanitize_html(html_content: str) -> str:
    """Remove potentially dangerous HTML constructs.

    Strips ``<script>`` and ``<style>`` elements together with their content,
    and removes ``on*=`` event-handler attributes whether the value is
    double-quoted, single-quoted, or unquoted.

    NOTE: this is a best-effort regex filter, not a full HTML sanitizer. It
    does not handle things like ``javascript:`` URLs, so it should not be the
    only defence for untrusted markup rendered in a browser.

    Args:
        html_content: HTML text; a falsy value is accepted.

    Returns:
        The filtered HTML, or ``""`` when ``html_content`` is falsy.
    """
    if not html_content:
        return ""

    # Remove script and style elements. The closing tag is matched loosely
    # ("</script >", "</script foo>") because browsers accept those forms.
    for tag in ('script', 'style'):
        html_content = re.sub(
            rf'<{tag}[^>]*>.*?</{tag}[^>]*>',
            '',
            html_content,
            flags=re.DOTALL | re.IGNORECASE,
        )

    # Remove event handlers. Each quoting style is matched on its own so a
    # value such as "alert('x')" is consumed whole instead of stopping at the
    # first inner quote, and unquoted values (onerror=alert(1)) are covered.
    html_content = re.sub(
        r'\s+on\w+\s*=\s*(?:"[^"]*"|\'[^\']*\'|[^\s"\'>]+)',
        '',
        html_content,
        flags=re.IGNORECASE,
    )

    return html_content