File size: 3,874 Bytes
f495741
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
"""Utility functions for data processing and formatting"""
import re
from datetime import datetime, timezone
from typing import Dict, Any, Union

def clean_text(text: str) -> str:
    """Clean and normalize text data.

    Drops every character that is not a word character, whitespace, or one
    of ``. , ! ? -``, then collapses each run of whitespace into a single
    space and trims both ends.

    Args:
        text: Raw text. Any falsy value (``""``, ``None``) yields ``""``.

    Returns:
        The cleaned text, free of leading, trailing, or repeated whitespace.
    """
    if not text:
        return ""

    # Remove special characters first: deleting a symbol that sat between
    # two spaces ("a @ b") would otherwise leave a double space behind.
    text = re.sub(r'[^\w\s.,!?-]', '', text)

    # Collapse whitespace last so the result has no runs and no padding.
    return re.sub(r'\s+', ' ', text).strip()

def normalize_metrics(metrics: Dict[str, Any]) -> Dict[str, float]:
    """Normalize metric values, returning a float for every key.

    Rules, applied per key:

    * ``engagement``, ``followers``, ``views``: scaled linearly against one
      million and clamped to the range 0.0-1.0.
    * keys ending in ``_rate`` or ``_percentage``: values greater than 1 are
      treated as percentages and divided by 100; other values pass through.
    * any other numeric value: converted to ``float`` unchanged (not scaled).
    * non-numeric values: ``0.0``.

    ``bool`` values count as numeric because ``bool`` subclasses ``int``.

    Args:
        metrics: Mapping of metric name to raw value.

    Returns:
        A new dict with the same keys and float values.
    """
    count_keys = ('engagement', 'followers', 'views')
    normalized: Dict[str, float] = {}

    for key, value in metrics.items():
        if not isinstance(value, (int, float)):
            normalized[key] = 0.0  # Default for non-numeric values
        elif key in count_keys:
            # Linear scale against one million; clamp both ends so negative
            # counts cannot escape the 0-1 range.
            normalized[key] = max(0.0, min(1.0, value / 1000000))
        elif key.endswith(('_rate', '_percentage')):
            # float() keeps the declared return type even for int inputs
            # such as 0 or 1, which skip the division.
            normalized[key] = float(value / 100 if value > 1 else value)
        else:
            normalized[key] = float(value)

    return normalized

def format_timestamp(timestamp: Union[str, datetime]) -> str:
    """Format a timestamp as ``YYYY-MM-DD HH:MM:SS UTC``.

    Args:
        timestamp: An ISO 8601 string (a trailing ``Z`` is accepted) or a
            datetime object. Timezone-aware values are converted to UTC
            before formatting; naive values are formatted as-is and are
            therefore assumed to already be in UTC.

    Returns:
        The formatted string, or ``""`` when the input is ``None`` or a
        string that cannot be parsed.
    """
    if timestamp is None:
        return ""

    if isinstance(timestamp, str):
        try:
            # Rewrite the "Z" designator as an explicit offset, which
            # fromisoformat() does not accept on older Python versions.
            timestamp = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
        except ValueError:
            return ""

    # The output is labelled "UTC", so an aware value with another offset
    # must be shifted to UTC rather than printed with its local wall time.
    if isinstance(timestamp, datetime) and timestamp.utcoffset() is not None:
        timestamp = timestamp.astimezone(timezone.utc)

    return timestamp.strftime("%Y-%m-%d %H:%M:%S UTC")

def calculate_growth_rate(current: float, previous: float) -> float:
    """Return the percentage change from ``previous`` to ``current``.

    The result is rounded to two decimal places. A falsy ``previous``
    (zero or ``None``) has no meaningful baseline and yields ``0.0``.
    """
    if previous:
        delta = current - previous
        return round(delta / previous * 100, 2)
    return 0.0

def extract_numeric_value(text: str) -> float:
    """Extract the first numeric value from text, honoring K/M/B suffixes.

    Commas are removed and the text is lower-cased before matching, so
    ``"1,234"`` and ``"1.5M"`` are both understood. A ``k``, ``m`` or ``b``
    immediately after the number multiplies it by a thousand, a million or
    a billion respectively.

    Args:
        text: Text that may contain a number. Falsy input yields ``0.0``.

    Returns:
        The parsed value, or ``0.0`` when the text contains no number.
    """
    if not text:
        return 0.0

    # Remove commas and convert to lowercase
    text = text.replace(',', '').lower().strip()

    # Require at least one digit and allow at most one decimal point, so
    # stray punctuation ("now. 5k") or dotted versions ("1.2.3") cannot
    # hand float() an unparseable string.
    match = re.search(r'(\d+(?:\.\d*)?|\.\d+)([kmb])?', text)
    if not match:
        return 0.0

    multipliers = {'k': 1000, 'm': 1000000, 'b': 1000000000}
    # A missing suffix is None, which falls through to the neutral factor.
    return float(match.group(1)) * multipliers.get(match.group(2), 1)

def categorize_trend(name: str, description: str = "") -> Dict[str, str]:
    """Assign a trend to a category using keyword lookup.

    Keywords are matched as case-insensitive substrings of the combined
    name and description. Categories are tried in declaration order and
    the first one with a hit wins.

    Returns:
        A dict with ``category`` and ``confidence``. Confidence is
        ``'high'`` when a keyword of the winning category occurs in the
        name, ``'medium'`` when it occurs only in the description, and
        ``'low'`` (with category ``'Other'``) when nothing matches.
    """
    keyword_map = {
        'Technology': ['ai', 'tech', 'coding', 'programming', 'software', 'app'],
        'Lifestyle': ['fashion', 'beauty', 'fitness', 'wellness', 'health'],
        'Entertainment': ['music', 'dance', 'comedy', 'movie', 'game'],
        'Food': ['recipe', 'cooking', 'food', 'drink', 'diet'],
        'Education': ['learning', 'study', 'school', 'education', 'tutorial']
    }

    haystack = f"{name} {description}".lower()

    for label, words in keyword_map.items():
        if not any(word in haystack for word in words):
            continue

        # Lower-case the name only once a category has matched.
        lowered_name = name.lower()
        found_in_name = any(word in lowered_name for word in words)
        return {
            'category': label,
            'confidence': 'high' if found_in_name else 'medium'
        }

    return {'category': 'Other', 'confidence': 'low'}

def generate_trend_id(trend_data: Dict[str, Any]) -> str:
    """Build a trend identifier from name, platform and the current month.

    The identifier is ``<name>-<platform>-<YYYYMM>``: the name is
    lower-cased with spaces replaced by hyphens, the platform is
    lower-cased, and the month comes from the local clock at call time.
    Empty components are omitted. Two trends sharing name and platform in
    the same month therefore get the same identifier.

    Args:
        trend_data: Trend record. The ``name`` and ``platform`` keys are
            optional; a missing, ``None`` or otherwise falsy value is
            treated as empty.

    Returns:
        The hyphen-joined identifier.
    """
    # "or ''" covers keys that are present but hold None, which a .get()
    # default does not; str() tolerates non-string values.
    name = str(trend_data.get('name') or '')
    platform = str(trend_data.get('platform') or '')

    components = [
        name.lower().replace(' ', '-'),
        platform.lower(),
        datetime.now().strftime('%Y%m'),
    ]

    return '-'.join(filter(None, components))