Spaces:
Runtime error
Runtime error
| """Utility functions for data processing and formatting""" | |
| from typing import Dict, Any, Union | |
| from datetime import datetime | |
| import re | |
def clean_text(text: str) -> str:
    """Clean and normalize text data.

    Drops every character that is not a word character, whitespace, or one
    of ``. , ! ? -``, then collapses each run of whitespace to a single
    space and trims both ends.

    Args:
        text: Raw input text. Any falsy value (``None``, ``""``) is accepted.

    Returns:
        The cleaned text, or ``""`` when ``text`` is falsy.
    """
    if not text:
        return ""
    # Remove disallowed characters first: deleting a symbol that sat between
    # two spaces (or at either end of the string) would otherwise leave
    # doubled or dangling whitespace in the result.
    text = re.sub(r'[^\w\s.,!?-]', '', text)
    # Collapse whitespace last so the output is fully normalized.
    return re.sub(r'\s+', ' ', text).strip()
def normalize_metrics(metrics: Dict[str, Any]) -> Dict[str, float]:
    """Normalize metric values to floats, using a 0-1 scale where one applies.

    Rules, applied per key:

    * ``engagement``, ``followers``, ``views``: divided by one million and
      clamped to the range ``[0.0, 1.0]`` (linear scaling, not logarithmic).
    * keys ending in ``_rate`` or ``_percentage``: values greater than 1 are
      treated as percentages and divided by 100; values of 1 or less are
      taken to be fractions already and passed through.
    * any other numeric value: converted to ``float`` unchanged.
    * non-numeric values: replaced by ``0.0``.

    Args:
        metrics: Mapping of metric name to raw value.

    Returns:
        A new dict with the same keys and ``float`` values.
    """
    count_keys = ('engagement', 'followers', 'views')
    normalized = {}
    for key, value in metrics.items():
        if not isinstance(value, (int, float)):
            normalized[key] = 0.0  # Default for non-numeric values
        elif key in count_keys:
            # Clamp on both sides so a negative count cannot escape the
            # documented 0-1 range.
            normalized[key] = max(0.0, min(1.0, value / 1000000))
        elif key.endswith(('_rate', '_percentage')):
            # float() keeps the declared return type even for int inputs
            # such as 0 or 1, which would otherwise pass through as ints.
            normalized[key] = float(value / 100 if value > 1 else value)
        else:
            normalized[key] = float(value)
    return normalized
def format_timestamp(timestamp: Union[str, datetime]) -> str:
    """Format a timestamp as ``YYYY-MM-DD HH:MM:SS UTC``.

    Args:
        timestamp: A ``datetime`` or an ISO 8601 string. A ``Z`` suffix in a
            string is accepted as a UTC designator.

    Returns:
        The formatted timestamp. Timezone-aware values are converted to UTC
        before formatting; naive values are formatted as-is and therefore
        assumed to already be in UTC. Returns ``""`` for an unparseable
        string or an unsupported input type.
    """
    # Function-scope import keeps this block self-contained; the module only
    # imports ``datetime`` from the datetime package.
    from datetime import timezone

    if isinstance(timestamp, str):
        try:
            # Rewrite 'Z' as an explicit offset: fromisoformat() only accepts
            # the 'Z' designator on newer Python versions.
            timestamp = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
        except ValueError:
            return ""
    if not isinstance(timestamp, datetime):
        # Same best-effort contract as for unparseable strings.
        return ""
    if timestamp.utcoffset() is not None:
        # The output is labelled "UTC", so an aware timestamp carrying any
        # other offset must be shifted to UTC first.
        timestamp = timestamp.astimezone(timezone.utc)
    return timestamp.strftime("%Y-%m-%d %H:%M:%S UTC")
def calculate_growth_rate(current: float, previous: float) -> float:
    """Return the percentage change from ``previous`` to ``current``.

    Args:
        current: The newer value.
        previous: The baseline value the change is measured against.

    Returns:
        The change as a percentage of ``previous``, rounded to two decimal
        places, or ``0.0`` when ``previous`` is falsy (zero or ``None``).
    """
    if previous:
        delta = current - previous
        return round(delta / previous * 100, 2)
    # No usable baseline: growth is undefined, so report no change.
    return 0.0
def extract_numeric_value(text: str) -> float:
    """Extract the first numeric value from text, honouring K/M/B suffixes.

    Commas are stripped and matching is case-insensitive, so ``"1.5K"``
    yields ``1500.0`` and ``"2,500 views"`` yields ``2500.0``. A suffix is
    only recognised when it immediately follows the number.

    Args:
        text: Text that may contain a number; any falsy value is accepted.

    Returns:
        The parsed value with the suffix multiplier applied, or ``0.0`` when
        ``text`` is falsy or contains no number.
    """
    if not text:
        return 0.0
    # Remove commas and convert to lowercase
    text = text.replace(',', '').lower().strip()
    # Match one well-formed decimal (digits with at most one dot, or a
    # leading-dot fraction). A looser character class such as [\d.]+ would
    # also match "..." or "1.2.3", which float() rejects with ValueError.
    match = re.search(r'(\d+\.?\d*|\.\d+)([kmb])?', text)
    if not match:
        return 0.0
    number = float(match.group(1))
    suffix = match.group(2)
    # Apply multiplier based on suffix
    multipliers = {'k': 1000, 'm': 1000000, 'b': 1000000000}
    if suffix:
        number *= multipliers.get(suffix, 1)
    return number
def categorize_trend(name: str, description: str = "") -> Dict[str, str]:
    """Categorize a trend based on keywords in its name and description.

    Categories are checked in declaration order and the first one with a
    matching keyword wins. Keywords are matched on word boundaries rather
    than as raw substrings, so ``'ai'`` no longer fires on "hair" and
    ``'app'`` no longer fires on "happy".

    Args:
        name: Trend name; ``None`` is treated as an empty string.
        description: Optional free-text description; ``None`` is treated as
            an empty string.

    Returns:
        A dict with ``'category'`` (the matched category or ``'Other'``) and
        ``'confidence'``: ``'high'`` when a keyword occurs in the name,
        ``'medium'`` when it occurs only in the description, ``'low'`` when
        nothing matched.
    """
    # Define category keywords
    categories = {
        'Technology': ['ai', 'tech', 'coding', 'programming', 'software', 'app'],
        'Lifestyle': ['fashion', 'beauty', 'fitness', 'wellness', 'health'],
        'Entertainment': ['music', 'dance', 'comedy', 'movie', 'game'],
        'Food': ['recipe', 'cooking', 'food', 'drink', 'diet'],
        'Education': ['learning', 'study', 'school', 'education', 'tutorial']
    }

    def _mentions(keyword: str, text: str) -> bool:
        """Report whether ``text`` contains ``keyword`` at a word boundary."""
        if len(keyword) <= 3:
            # Short keywords are too ambiguous as prefixes ("ai" -> "air"),
            # so require a whole word, allowing only a plural "s".
            pattern = rf'\b{re.escape(keyword)}s?\b'
        else:
            # Longer keywords may be a word prefix so inflected forms still
            # match ("tech" -> "technology", "game" -> "games").
            pattern = rf'\b{re.escape(keyword)}'
        return re.search(pattern, text) is not None

    name_text = (name or "").lower()
    full_text = f"{name_text} {(description or '').lower()}"
    # Find matching category
    for category, keywords in categories.items():
        if any(_mentions(keyword, full_text) for keyword in keywords):
            in_name = any(_mentions(keyword, name_text) for keyword in keywords)
            return {
                'category': category,
                'confidence': 'high' if in_name else 'medium'
            }
    return {
        'category': 'Other',
        'confidence': 'low'
    }
def generate_trend_id(trend_data: Dict[str, Any]) -> str:
    """Build a trend identifier from name, platform and the current month.

    The identifier has the form ``<name-slug>-<platform>-<YYYYMM>``; empty
    components are omitted. It is only as unique as that combination: the
    same name and platform within one calendar month give the same id.

    Args:
        trend_data: Mapping that may contain ``'name'`` and ``'platform'``.
            Missing keys and ``None`` values are treated as empty.

    Returns:
        The hyphen-joined, lower-cased identifier.
    """
    def _text(key: str) -> str:
        """Return the lower-cased, trimmed string stored under ``key``."""
        value = trend_data.get(key)
        # A key that is present but None would otherwise crash on .lower().
        return "" if value is None else str(value).strip().lower()

    components = [
        # split()/join collapses whitespace runs so padded or double-spaced
        # names do not produce leading, trailing or doubled hyphens.
        '-'.join(_text('name').split()),
        _text('platform'),
        datetime.now().strftime('%Y%m')
    ]
    return '-'.join(filter(None, components))