# NOTE(review): removed console paste residue ("Spaces:" / "Runtime error"
# lines) that was not part of the module source.
from typing import Dict, List, Optional, Any
import re
import json
from datetime import datetime
import logging

# NOTE: calling basicConfig at import time configures the root logger as a
# side effect of importing this module — worth confirming this is intended
# if the module is used as a library.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class TrendProcessor:
    """Parse social-media trend data from multiple sources into one shape.

    Supported inputs: Later.com blog-post markdown and Perplexity API
    responses (JSON list or markdown text).  Every parsed trend is a dict
    with the keys ``name``, ``description``, ``platform``,
    ``number_of_posts`` (comma-grouped integer string), ``url`` and
    ``source``.
    """

    # Multipliers for magnitude suffixes recognized in post counts.
    _SUFFIX_MULTIPLIERS = {
        "k": 1_000,
        "thousand": 1_000,
        "m": 1_000_000,
        "million": 1_000_000,
        "b": 1_000_000_000,
        "billion": 1_000_000_000,
    }

    # A number (comma-grouped or decimal) immediately followed by a magnitude
    # suffix.  Binding the suffix to the matched number fixes the old bug
    # where a stray 'k'/'m'/'b' anywhere in the text (e.g. the word "like")
    # scaled an unrelated number by 1000x.
    _SUFFIXED_COUNT_RE = re.compile(
        r"(\d{1,3}(?:,\d{3})+(?:\.\d+)?|\d+(?:\.\d+)?)\s*"
        r"(k|thousand|m|million|b|billion)\b"
    )
    # Fallback: any bare number; comma-grouped form tried first.
    _PLAIN_COUNT_RE = re.compile(r"(\d{1,3}(?:,\d{3})+(?:\.\d+)?|\d+(?:\.\d+)?)")

    # Patterns that introduce a new trend in markdown-style Perplexity output.
    # Hoisted to a class constant (previously rebuilt for every input line).
    _TREND_INDICATORS = [
        (r'(?:^|\n)[\d\.\-]+\.\s*["""](.+?)["""]', 1),  # numbered trends
        (r'["""](.+?)["""] trend', 1),                  # quoted trend names
        (r'#(\w+)', 1),                                 # hashtags
        (r'Trend:\s*(.+?)(?:\s*[-–]\s*|\s*$)', 1),      # "Trend:" prefix
        (r'^[•\-]\s*(.+?)(?:\s*[-–]\s*|\s*$)', 1),      # bullet points
    ]

    def __init__(self):
        # Chronological list of entries recorded by add_debug_log().
        self.debug_logs: List[Dict[str, str]] = []

    def add_debug_log(self, source: str, message: str, log_type: str = "info"):
        """Record a timestamped debug entry and mirror it to the module logger."""
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "source": source,
            "type": log_type,
            "message": message,
        }
        self.debug_logs.append(log_entry)
        logger.info(f"{source}: {message}")

    def extract_post_count(self, text: Any) -> str:
        """Normalize a post-count value to a comma-grouped integer string.

        Accepts ints/floats, plain digit strings, comma-grouped numbers
        ("12,345") and magnitude suffixes ("1.5K", "2 million").  A suffix
        only scales the number it is attached to (fixes the old behavior
        where any 'k'/'m'/'b' in the text scaled the first number found).

        Returns "0" for empty/number-free input, and the original input
        coerced to str if an unexpected error occurs (legacy fallback).
        """
        if not text:
            return "0"
        try:
            # Numeric input needs no parsing — just format it.
            if isinstance(text, (int, float)):
                return f"{int(text):,}"
            text = str(text).strip().lower()
            # Prefer a suffixed number ("1.5k") over a bare one so that
            # e.g. "500 posts, 1.5k views" resolves to the scaled figure,
            # matching the old pattern-priority order.
            match = self._SUFFIXED_COUNT_RE.search(text)
            if match:
                num = float(match.group(1).replace(",", ""))
                num *= self._SUFFIX_MULTIPLIERS[match.group(2)]
            else:
                match = self._PLAIN_COUNT_RE.search(text)
                if not match:
                    return "0"
                num = float(match.group(1).replace(",", ""))
            return f"{int(num):,}"
        except Exception as e:
            logger.error(f"Error extracting post count: {str(e)}")
            return str(text)

    def _count_value(self, count: Any) -> int:
        """Numeric value of a (possibly formatted) post count, for comparisons."""
        try:
            return int(self.extract_post_count(count).replace(",", ""))
        except ValueError:
            # extract_post_count echoes unparseable input back; treat as zero.
            return 0

    def parse_later_blog_content(self, content: str, platform: str) -> List[Dict]:
        """Parse trends out of a Later.com blog post body.

        Expects sections introduced by "##"/"###" headers of the form
        "Trend: ..." or "1. ...", each optionally containing
        "**Trend Recap:**" and "**Current # of Posts:**" fields.

        Returns a list of trend dicts; an empty list on parse failure.
        """
        trends: List[Dict] = []
        try:
            # Each header starts a new trend block; blocks[0] is intro text.
            blocks = re.split(r"(?:###|##)\s*\*?\*?(?:Trend:|[0-9]+\.)", content)
            for block in blocks[1:]:
                # "Name — optional date**" header; skip malformed blocks.
                header_match = re.match(r"\s*(.*?)(?:\s*—\s*(.*?))?\*\*", block)
                if not header_match:
                    continue
                trend_name = header_match.group(1).strip()
                # Very short names are almost always parsing artifacts.
                if not trend_name or len(trend_name) < 3:
                    continue
                # "**Trend Recap:** ..." up to a blank line / next bold field.
                recap_match = re.search(
                    r"\*\*(?:Trend\s+)?Recap:\*\*\s*(.*?)(?=\n\n|\*\*|$)",
                    block,
                    re.DOTALL,
                )
                description = recap_match.group(1).strip() if recap_match else ""
                posts_match = re.search(
                    r"\*\*Current\s*#?\s*of\s*Posts:\*\*\s*([^\n]+)", block
                )
                posts_count = posts_match.group(1).strip() if posts_match else "0"
                trends.append({
                    "name": trend_name,
                    "description": description,
                    "platform": platform,
                    "number_of_posts": self.extract_post_count(posts_count),
                    "url": "",  # Will be filled later
                    "source": "Later.com",
                })
                logger.info(f"Parsed trend: {trend_name}")
            return trends
        except Exception as e:
            logger.error(f"Error parsing Later.com content: {str(e)}")
            return []

    def parse_perplexity_content(self, content: str, platform: str) -> List[Dict]:
        """Parse trends from a Perplexity API response.

        Tries JSON first (a list of raw trend dicts, run through
        _normalize_trend); otherwise scans the text line by line for trend
        indicators (numbered items, quoted names, hashtags, "Trend:"
        prefixes, bullets), accumulating descriptions and post counts for
        the current trend.

        Returns a list of trend dicts; an empty list on parse failure.
        """
        trends: List[Dict] = []
        try:
            try:
                json_data = json.loads(content)
                if isinstance(json_data, list):
                    normalized = (
                        self._normalize_trend(item, platform) for item in json_data
                    )
                    # _normalize_trend returns None for nameless entries;
                    # drop them (previously Nones leaked into the result).
                    return [t for t in normalized if t is not None]
            except json.JSONDecodeError:
                pass
            current_trend: Optional[Dict] = None
            for line in content.split('\n'):
                line = line.strip()
                if not line:
                    continue
                for pattern, group in self._TREND_INDICATORS:
                    match = re.search(pattern, line)
                    if match:
                        # A new indicator closes out the trend in progress.
                        if current_trend:
                            trends.append(current_trend)
                        current_trend = {
                            "name": match.group(group).strip(),
                            "description": "",
                            "platform": platform,
                            "number_of_posts": "0",
                            "url": "",
                            "source": "Perplexity",
                        }
                        break
                if current_trend:
                    # Attach the first "<n> posts/videos/..." figure on the line.
                    post_matches = re.findall(
                        r'(\d+(?:\.\d+)?[KkMmBb]?)\s*(?:posts|videos|views|likes|shares)',
                        line,
                    )
                    if post_matches:
                        current_trend["number_of_posts"] = self.extract_post_count(
                            post_matches[0]
                        )
                    # Lines that are not themselves trend headers extend the
                    # running description.
                    if not any(re.search(p, line) for p, _ in self._TREND_INDICATORS):
                        if current_trend["description"]:
                            current_trend["description"] += " "
                        current_trend["description"] += line
            if current_trend:
                trends.append(current_trend)
            return trends
        except Exception as e:
            logger.error(f"Error parsing Perplexity content: {str(e)}")
            return []

    def _normalize_trend(self, trend: Dict, platform: str) -> Optional[Dict]:
        """Coerce a raw trend dict into the canonical shape.

        Returns None (and logs a warning) when the trend has no usable name.
        Descriptions longer than 500 characters are truncated at a word
        boundary and suffixed with "...".
        """
        normalized = {
            "name": str(trend.get("name", "")).strip(),
            "description": str(trend.get("description", "")).strip(),
            "platform": platform,
            "number_of_posts": self.extract_post_count(
                trend.get("number_of_posts", "0")
            ),
            "url": str(trend.get("url", "")),
            "source": str(trend.get("source", "Unknown")),
        }
        if not normalized["name"]:
            logger.warning("Skipping trend with empty name")
            return None
        max_desc_length = 500
        if len(normalized["description"]) > max_desc_length:
            normalized["description"] = (
                normalized["description"][:max_desc_length].rsplit(' ', 1)[0] + '...'
            )
        return normalized

    def merge_duplicate_trends(self, trends: List[Dict]) -> List[Dict]:
        """Merge trends with the same (case-insensitive) name.

        Keeps the longest description, the highest post count, and the first
        non-empty URL.  Post counts are compared numerically — the old code
        compared the formatted strings, which ranked "9" above "10,000".
        """
        merged: Dict[str, Dict] = {}
        for trend in trends:
            name = trend["name"].lower()
            if name in merged:
                existing = merged[name]
                if len(trend["description"]) > len(existing["description"]):
                    existing["description"] = trend["description"]
                if self._count_value(trend["number_of_posts"]) > self._count_value(
                    existing["number_of_posts"]
                ):
                    existing["number_of_posts"] = trend["number_of_posts"]
                if trend["url"] and not existing["url"]:
                    existing["url"] = trend["url"]
            else:
                merged[name] = trend
        return list(merged.values())