trends_spotter_LATEST / utils /data_processing.py
cryogenic22's picture
Create data_processing.py
f495741 verified
"""Utility functions for data processing and formatting"""
from typing import Dict, Any, Union
from datetime import datetime
import re
def clean_text(text: str) -> str:
    """Clean and normalize text data.

    Drops every character that is not a word character, whitespace, or one
    of ``. , ! ? -``, then collapses whitespace runs into single spaces and
    trims both ends.

    Args:
        text: Raw text. Any falsy value (``None``, ``""``) is accepted.

    Returns:
        The cleaned text, or ``""`` for falsy input.
    """
    if not text:
        return ""
    # Strip disallowed characters first: removing a symbol that sits between
    # two spaces ("a @ b") would otherwise leave a double space behind.
    text = re.sub(r'[^\w\s.,!?-]', '', text)
    # Collapse whitespace last so the result has no runs and no padding.
    return re.sub(r'\s+', ' ', text).strip()
def normalize_metrics(metrics: Dict[str, Any]) -> Dict[str, float]:
    """Normalize metric values toward a 0-1 scale.

    Rules, applied per key:

    * ``engagement``, ``followers``, ``views``: divided by one million
      (linear scaling) and clamped to ``[0.0, 1.0]``.
    * keys ending in ``_rate`` or ``_percentage``: values whose magnitude
      exceeds 1 are treated as percentages and divided by 100; values
      already within ``[-1, 1]`` are kept. The result is not clamped, so a
      negative rate or a rate above 100% keeps its sign and magnitude.
    * any other numeric value: converted to ``float`` unchanged.
    * non-numeric values: ``0.0``.

    Args:
        metrics: Mapping of metric name to raw value.

    Returns:
        A new dict with the same keys and ``float`` values.
    """
    count_keys = ('engagement', 'followers', 'views')
    rate_suffixes = ('_rate', '_percentage')

    normalized: Dict[str, float] = {}
    for key, value in metrics.items():
        if not isinstance(value, (int, float)):
            normalized[key] = 0.0  # Default for non-numeric values
        elif key in count_keys:
            # Linear scaling to millions; the lower clamp keeps a bogus
            # negative count from producing a negative score.
            normalized[key] = max(0.0, min(1.0, value / 1000000))
        elif key.endswith(rate_suffixes):
            # Compare on magnitude so -50 (percent) is scaled like 50 is.
            normalized[key] = float(value / 100 if abs(value) > 1 else value)
        else:
            normalized[key] = float(value)
    return normalized
def format_timestamp(timestamp: Union[str, datetime]) -> str:
    """Format a timestamp as ``YYYY-MM-DD HH:MM:SS UTC``.

    Args:
        timestamp: Either a ``datetime`` or an ISO-8601 string. A trailing
            ``Z`` in a string is accepted as a UTC designator.

    Returns:
        The formatted timestamp, or ``""`` when a string cannot be parsed.
        Timezone-aware values are converted to UTC first; naive values are
        formatted as-is (they are taken to already be in UTC).
    """
    # Local import keeps this function self-contained; the module only
    # imports ``datetime`` from the ``datetime`` package.
    from datetime import timezone

    if isinstance(timestamp, str):
        try:
            # ``fromisoformat`` rejects "Z" on older Python versions.
            timestamp = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
        except ValueError:
            return ""
    # The output is labelled "UTC", so an aware value carrying another
    # offset must be shifted before formatting, not printed verbatim.
    if isinstance(timestamp, datetime) and timestamp.utcoffset() is not None:
        timestamp = timestamp.astimezone(timezone.utc)
    return timestamp.strftime("%Y-%m-%d %H:%M:%S UTC")
def calculate_growth_rate(current: float, previous: float) -> float:
    """Calculate the percentage growth from ``previous`` to ``current``.

    Args:
        current: The newer value.
        previous: The baseline value.

    Returns:
        The change as a percentage of the baseline's magnitude, rounded to
        two decimals. Returns ``0.0`` when ``previous`` is falsy (``0`` or
        ``None``), since growth from nothing is undefined.
    """
    if not previous:
        return 0.0
    # Divide by the magnitude of the baseline: with a negative baseline a
    # plain division flips the sign and reports an increase as a decline.
    return round((current - previous) / abs(previous) * 100, 2)
def extract_numeric_value(text: str) -> float:
    """Extract a numeric value from text with optional K/M/B suffixes.

    Commas are removed and the text is lower-cased before matching. The
    first number found is used; a ``k``, ``m`` or ``b`` immediately after
    it multiplies the value by a thousand, a million or a billion.

    Args:
        text: Text such as ``"1.5K"``, ``"2,300 views"`` or ``"3m"``.

    Returns:
        The parsed value, or ``0.0`` when ``text`` is falsy or holds no
        number.
    """
    if not text:
        return 0.0
    # Remove commas and convert to lowercase
    text = text.replace(',', '').lower().strip()
    # Match one well-formed decimal ("12", "12.", "12.5", ".5"). A looser
    # character class would also accept "." or "1.2.3", which float()
    # rejects with ValueError.
    match = re.search(r'(\d+(?:\.\d*)?|\.\d+)([kmb])?', text)
    if not match:
        return 0.0
    number = float(match.group(1))
    suffix = match.group(2)
    # Apply multiplier based on suffix
    multipliers = {'k': 1000, 'm': 1000000, 'b': 1000000000}
    if suffix:
        number *= multipliers.get(suffix, 1)
    return number
def categorize_trend(name: str, description: str = "") -> Dict[str, str]:
    """Assign a category and a confidence level to a trend.

    The name and description are joined and lower-cased, then checked
    against each category's keyword list in declaration order. Keywords are
    matched as plain substrings, so a short keyword such as ``ai`` also
    matches inside longer words. The first category with any hit wins.

    Args:
        name: Trend name.
        description: Optional free-text description.

    Returns:
        A dict with ``category`` and ``confidence`` keys. Confidence is
        ``high`` when a keyword of the winning category occurs in the name
        itself, ``medium`` when it occurs only in the description, and
        ``low`` (with category ``Other``) when nothing matches.
    """
    keyword_table = {
        'Technology': ['ai', 'tech', 'coding', 'programming', 'software', 'app'],
        'Lifestyle': ['fashion', 'beauty', 'fitness', 'wellness', 'health'],
        'Entertainment': ['music', 'dance', 'comedy', 'movie', 'game'],
        'Food': ['recipe', 'cooking', 'food', 'drink', 'diet'],
        'Education': ['learning', 'study', 'school', 'education', 'tutorial'],
    }
    haystack = f"{name} {description}".lower()

    for label, terms in keyword_table.items():
        matched = False
        for term in terms:
            if term in haystack:
                matched = True
                break
        if not matched:
            continue

        # Only look at the name once a category has matched.
        lowered_name = name.lower()
        if any(term in lowered_name for term in terms):
            level = 'high'
        else:
            level = 'medium'
        return {'category': label, 'confidence': level}

    return {'category': 'Other', 'confidence': 'low'}
def generate_trend_id(trend_data: Dict[str, Any]) -> str:
    """Generate a trend identifier of the form ``name-platform-YYYYMM``.

    The name is lower-cased with spaces replaced by hyphens, the platform
    is lower-cased, and the current year and month (from the local clock)
    are appended. Empty components are left out. Two trends with the same
    name and platform in the same month therefore share an identifier.

    Args:
        trend_data: Trend record; the ``name`` and ``platform`` keys are
            read if present.

    Returns:
        The hyphen-joined identifier.
    """
    # ``.get(key, '')`` still yields None when the key exists with a None
    # value, so fall back explicitly before calling string methods.
    name = str(trend_data.get('name') or '')
    platform = str(trend_data.get('platform') or '')
    components = [
        name.lower().replace(' ', '-'),
        platform.lower(),
        datetime.now().strftime('%Y%m'),
    ]
    return '-'.join(filter(None, components))