from typing import Dict, List, Optional, Any
import re
import json
from datetime import datetime
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class TrendProcessor:
    """Normalize social-media trend data scraped from Later.com blog posts
    and Perplexity API responses into a uniform list-of-dict shape:
    {name, description, platform, number_of_posts, url, source}.
    """

    def __init__(self):
        # Structured in-memory log entries recorded via add_debug_log().
        self.debug_logs: List[Dict[str, str]] = []

    def add_debug_log(self, source: str, message: str, log_type: str = "info") -> None:
        """Record a structured debug entry and mirror it to the module logger.

        Args:
            source: Component/stage that produced the message.
            message: Human-readable log text.
            log_type: Free-form severity tag stored with the entry.
        """
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "source": source,
            "type": log_type,
            "message": message
        }
        self.debug_logs.append(log_entry)
        logger.info(f"{source}: {message}")

    def extract_post_count(self, text: Any) -> str:
        """Normalize a post-count value to a comma-grouped integer string.

        Accepts raw numbers, plain digit strings, comma-grouped numbers
        ("12,345"), and K/M/B (thousand/million/billion) suffixed values
        such as "1.5K" or "2 million".

        Returns:
            The count formatted like "1,500"; "0" when the input is empty
            or contains nothing numeric; on unexpected errors, the original
            input coerced to str (best-effort fallback).
        """
        if not text:
            return "0"
        try:
            # Handle numeric input directly.
            if isinstance(text, (int, float)):
                return f"{int(text):,}"

            cleaned = str(text).strip().lower()

            # Suffixed patterns are tried first so "1.5k" is not claimed by
            # the plain-number pattern as just "1.5". The multiplier is tied
            # to the pattern that actually matched — previously it was
            # inferred from ANY 'k'/'m'/'b' appearing anywhere in the string,
            # so e.g. "10 likes" was mis-parsed as 10,000.
            patterns = [
                (r'(\d+(?:\.\d+)?)\s*(?:k|thousand)', 1_000),
                (r'(\d+(?:\.\d+)?)\s*(?:m|million)', 1_000_000),
                (r'(\d+(?:\.\d+)?)\s*(?:b|billion)', 1_000_000_000),
                (r'(\d{1,3}(?:,\d{3})*(?:\.\d+)?)', 1),  # numbers with comma grouping
                (r'(\d+(?:\.\d+)?)', 1)                  # bare numbers
            ]

            for pattern, multiplier in patterns:
                match = re.search(pattern, cleaned)
                if match:
                    num = float(match.group(1).replace(',', '')) * multiplier
                    return f"{int(num):,}"

            return "0"

        except Exception as e:
            logger.error(f"Error extracting post count: {str(e)}")
            return str(text)

    def parse_later_blog_content(self, content: str, platform: str) -> List[Dict]:
        """Parse Later.com blog markdown into trend dicts.

        Splits the content on "##"/"###" headings that introduce a trend
        (either a "Trend:" prefix or a numbered heading), then pulls the
        trend name, the "**Recap:**" description, and the
        "**Current # of Posts:**" count out of each block.

        Returns:
            A list of trend dicts; [] on any parsing error (logged).
        """
        trends = []
        try:
            # Split content into per-trend blocks on markdown headings.
            blocks = re.split(r"(?:###|##)\s*\*?\*?(?:Trend:|[0-9]+\.)", content)

            for block in blocks[1:]:  # Skip the first block (intro text before any heading)
                # Trend name runs up to the closing "**"; an optional
                # "— date" segment after an em dash is captured but unused.
                header_match = re.match(r"\s*(.*?)(?:\s*—\s*(.*?))?\*\*", block)
                if not header_match:
                    continue
                trend_name = header_match.group(1).strip()
                # Ignore empty/too-short names (heading noise).
                if not trend_name or len(trend_name) < 3:
                    continue

                # Description: text after "**Recap:**" (or "**Trend Recap:**")
                # up to the next blank line, bold marker, or end of block.
                recap_match = re.search(
                    r"\*\*(?:Trend\s+)?Recap:\*\*\s*(.*?)(?=\n\n|\*\*|$)",
                    block, re.DOTALL)
                description = recap_match.group(1).strip() if recap_match else ""

                # Post count: rest of the line after "**Current # of Posts:**".
                posts_match = re.search(
                    r"\*\*Current\s*#?\s*of\s*Posts:\*\*\s*([^\n]+)", block)
                posts_count = posts_match.group(1).strip() if posts_match else "0"

                trend = {
                    "name": trend_name,
                    "description": description,
                    "platform": platform,
                    "number_of_posts": self.extract_post_count(posts_count),
                    "url": "",  # Will be filled later
                    "source": "Later.com"
                }
                trends.append(trend)
                logger.info(f"Parsed trend: {trend_name}")

            return trends

        except Exception as e:
            logger.error(f"Error parsing Later.com content: {str(e)}")
            return []

    def parse_perplexity_content(self, content: str, platform: str) -> List[Dict]:
        """Parse a Perplexity API response into trend dicts.

        Tries JSON first (a list of trend objects); otherwise falls back to
        scanning markdown-ish lines for trend indicators (numbered/quoted
        names, hashtags, "Trend:" prefixes, bullets), accumulating free text
        into the current trend's description and picking up post counts.

        Returns:
            A list of trend dicts; [] on any parsing error (logged).
        """
        trends = []
        try:
            # First try to parse as JSON.
            try:
                json_data = json.loads(content)
                if isinstance(json_data, list):
                    # _normalize_trend returns None for nameless entries;
                    # drop those (and any non-dict elements) so the declared
                    # List[Dict] return type actually holds.
                    normalized = [
                        self._normalize_trend(item, platform)
                        for item in json_data if isinstance(item, dict)
                    ]
                    return [t for t in normalized if t]
            except json.JSONDecodeError:
                pass

            # Not JSON: parse markdown-style content line by line.
            lines = content.split('\n')
            current_trend = None

            for line in lines:
                line = line.strip()
                if not line:
                    continue

                # Patterns that signal the start of a new trend, with the
                # capture group holding the trend name.
                trend_indicators = [
                    (r'(?:^|\n)[\d\.\-]+\.\s*["""](.+?)["""]', 1),  # Numbered trends
                    (r'["""](.+?)["""] trend', 1),                  # Quoted trend names
                    (r'#(\w+)', 1),                                 # Hashtags
                    (r'Trend:\s*(.+?)(?:\s*[-–]\s*|\s*$)', 1),      # "Trend:" prefix
                    (r'^[•\-]\s*(.+?)(?:\s*[-–]\s*|\s*$)', 1)       # Bullet points
                ]

                for pattern, group in trend_indicators:
                    match = re.search(pattern, line)
                    if match:
                        # Flush the previous trend before starting a new one.
                        if current_trend:
                            trends.append(current_trend)
                        trend_name = match.group(group).strip()
                        current_trend = {
                            "name": trend_name,
                            "description": "",
                            "platform": platform,
                            "number_of_posts": "0",
                            "url": "",
                            "source": "Perplexity"
                        }
                        break

                if current_trend:
                    # Look for post counts attached to engagement words.
                    post_matches = re.findall(
                        r'(\d+(?:\.\d+)?[KkMmBb]?)\s*(?:posts|videos|views|likes|shares)',
                        line)
                    if post_matches:
                        current_trend["number_of_posts"] = self.extract_post_count(post_matches[0])

                    # Append to description if this line is not itself a
                    # new-trend indicator line.
                    if not any(re.search(pattern, line) for pattern, _ in trend_indicators):
                        if current_trend["description"]:
                            current_trend["description"] += " "
                        current_trend["description"] += line

            # Flush the trailing trend.
            if current_trend:
                trends.append(current_trend)

            return trends

        except Exception as e:
            logger.error(f"Error parsing Perplexity content: {str(e)}")
            return []

    def _normalize_trend(self, trend: Dict, platform: str) -> Optional[Dict]:
        """Normalize a raw trend dict to the canonical shape.

        Returns:
            The normalized dict, or None when the trend has no name
            (callers must filter Nones out).
        """
        normalized = {
            "name": str(trend.get("name", "")).strip(),
            "description": str(trend.get("description", "")).strip(),
            "platform": platform,
            "number_of_posts": self.extract_post_count(trend.get("number_of_posts", "0")),
            "url": str(trend.get("url", "")),
            "source": str(trend.get("source", "Unknown"))
        }

        # A nameless trend is unusable — signal the caller to drop it.
        if not normalized["name"]:
            logger.warning("Skipping trend with empty name")
            return None

        # Truncate long descriptions at a word boundary.
        max_desc_length = 500
        if len(normalized["description"]) > max_desc_length:
            normalized["description"] = (
                normalized["description"][:max_desc_length].rsplit(' ', 1)[0] + '...')

        return normalized

    def _post_count_value(self, count: Any) -> int:
        """Numeric value of a post count, for comparisons; 0 if unparseable."""
        try:
            return int(self.extract_post_count(count).replace(',', ''))
        except ValueError:
            return 0

    def merge_duplicate_trends(self, trends: List[Dict]) -> List[Dict]:
        """Merge trends sharing a (case-insensitive) name, keeping the best data.

        The first occurrence's dict is kept (and mutated in place) as the
        winner; later duplicates contribute a longer description, a higher
        post count, or a URL the winner lacks.
        """
        merged: Dict[str, Dict] = {}
        for trend in trends:
            name = trend["name"].lower()
            if name in merged:
                existing = merged[name]
                # Keep the longer description.
                if len(trend["description"]) > len(existing["description"]):
                    existing["description"] = trend["description"]
                # Keep the higher post count, compared NUMERICALLY — the
                # previous string comparison ranked "900" above "10,000".
                if (self._post_count_value(trend["number_of_posts"])
                        > self._post_count_value(existing["number_of_posts"])):
                    existing["number_of_posts"] = trend["number_of_posts"]
                # Keep a valid URL when the winner has none.
                if trend["url"] and not existing["url"]:
                    existing["url"] = trend["url"]
            else:
                merged[name] = trend

        return list(merged.values())