"""Utility functions for data processing and formatting""" from typing import Dict, Any, Union from datetime import datetime import re def clean_text(text: str) -> str: """Clean and normalize text data""" if not text: return "" # Remove extra whitespace text = re.sub(r'\s+', ' ', text.strip()) # Remove special characters but keep basic punctuation text = re.sub(r'[^\w\s.,!?-]', '', text) return text def normalize_metrics(metrics: Dict[str, Any]) -> Dict[str, float]: """Normalize various metric values to a 0-1 scale""" normalized = {} for key, value in metrics.items(): if isinstance(value, (int, float)): if key in ['engagement', 'followers', 'views']: # Log-scale normalization for large numbers normalized[key] = min(1.0, value / 1000000) # Normalize to millions elif key.endswith('_rate') or key.endswith('_percentage'): # Ensure percentage values are 0-1 normalized[key] = value / 100 if value > 1 else value else: # Default normalization normalized[key] = float(value) else: normalized[key] = 0.0 # Default for non-numeric values return normalized def format_timestamp(timestamp: Union[str, datetime]) -> str: """Format timestamp consistently""" if isinstance(timestamp, str): try: timestamp = datetime.fromisoformat(timestamp.replace('Z', '+00:00')) except ValueError: return "" return timestamp.strftime("%Y-%m-%d %H:%M:%S UTC") def calculate_growth_rate(current: float, previous: float) -> float: """Calculate growth rate between two values""" if not previous: return 0.0 return round((current - previous) / previous * 100, 2) def extract_numeric_value(text: str) -> float: """Extract numeric value from text with K/M/B suffixes""" if not text: return 0.0 # Remove commas and convert to lowercase text = text.replace(',', '').lower().strip() # Look for number with optional suffix match = re.search(r'([\d.]+)([kmb])?', text) if not match: return 0.0 number = float(match.group(1)) suffix = match.group(2) # Apply multiplier based on suffix multipliers = {'k': 1000, 'm': 1000000, 'b': 1000000000} if suffix: number *= multipliers.get(suffix, 1) return number def categorize_trend(name: str, description: str = "") -> Dict[str, str]: """Categorize trend based on name and description""" # Define category keywords categories = { 'Technology': ['ai', 'tech', 'coding', 'programming', 'software', 'app'], 'Lifestyle': ['fashion', 'beauty', 'fitness', 'wellness', 'health'], 'Entertainment': ['music', 'dance', 'comedy', 'movie', 'game'], 'Food': ['recipe', 'cooking', 'food', 'drink', 'diet'], 'Education': ['learning', 'study', 'school', 'education', 'tutorial'] } text = f"{name} {description}".lower() # Find matching category for category, keywords in categories.items(): if any(keyword in text for keyword in keywords): return { 'category': category, 'confidence': 'high' if any(keyword in name.lower() for keyword in keywords) else 'medium' } return { 'category': 'Other', 'confidence': 'low' } def generate_trend_id(trend_data: Dict[str, Any]) -> str: """Generate unique trend identifier""" components = [ trend_data.get('name', '').lower().replace(' ', '-'), trend_data.get('platform', '').lower(), datetime.now().strftime('%Y%m') ] return '-'.join(filter(None, components))