# agents/trend_processor.py
from typing import Dict, List, Optional, Any
import re
import json
from datetime import datetime
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class TrendProcessor:
    """Normalize social-media trend data from Later.com blog posts or
    Perplexity API responses into a common list-of-dict shape:
    {"name", "description", "platform", "number_of_posts", "url", "source"}.
    """

    def __init__(self):
        # In-memory structured debug entries (see add_debug_log).
        self.debug_logs = []

    def add_debug_log(self, source: str, message: str, log_type: str = "info") -> None:
        """Append a structured debug entry and mirror it to the module logger.

        Args:
            source: Component that produced the message.
            message: Human-readable log text.
            log_type: Free-form severity tag stored with the entry.
        """
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "source": source,
            "type": log_type,
            "message": message
        }
        self.debug_logs.append(log_entry)
        logger.info(f"{source}: {message}")

    def _post_count_value(self, count: Any) -> int:
        """Numeric value of a post count, for comparisons ("1,500" -> 1500).

        Returns 0 when the count cannot be parsed into an integer.
        """
        try:
            return int(self.extract_post_count(count).replace(',', ''))
        except (ValueError, TypeError):
            return 0

    def extract_post_count(self, text: str) -> str:
        """Extract a post count from free-form text as a comma-grouped string.

        Examples: "1.5k" -> "1,500"; "2 million views" -> "2,000,000";
        numeric input is formatted directly; empty input returns "0".

        The K/M/B (thousand/million/billion) multiplier is applied only when
        the suffix directly follows the matched number, so unrelated letters
        elsewhere in the text (e.g. "back", "about") no longer inflate the
        count. On unexpected errors the raw input is returned as a string.
        """
        if not text:
            return "0"
        try:
            # Numeric input needs no parsing, only formatting.
            if isinstance(text, (int, float)):
                return f"{int(text):,}"
            text = str(text).strip().lower()
            # (pattern, multiplier) pairs, most specific first. The
            # comma-grouped pattern requires at least one ",ddd" group so a
            # long plain number like "1234567" is not truncated to "123".
            patterns = [
                (r'(\d+(?:\.\d+)?)\s*(?:k|thousand)\b', 1_000),
                (r'(\d+(?:\.\d+)?)\s*(?:m|million)\b', 1_000_000),
                (r'(\d+(?:\.\d+)?)\s*(?:b|billion)\b', 1_000_000_000),
                (r'(\d{1,3}(?:,\d{3})+(?:\.\d+)?)', 1),  # numbers with commas
                (r'(\d+(?:\.\d+)?)', 1)                  # plain numbers
            ]
            for pattern, multiplier in patterns:
                match = re.search(pattern, text)
                if match:
                    num = float(match.group(1).replace(',', '')) * multiplier
                    return f"{int(num):,}"
            return "0"
        except Exception as e:
            logger.error(f"Error extracting post count: {str(e)}")
            return str(text)

    def parse_later_blog_content(self, content: str, platform: str) -> List[Dict]:
        """Parse Later.com blog markdown into trend dicts via regex.

        Splits the content on "###"/"##" trend headers, then extracts the
        trend name, the "**Trend Recap:**" description and the
        "**Current # of Posts:**" count from each block.
        Returns [] on any parsing error.
        """
        trends = []
        try:
            # Split content into per-trend blocks on markdown headers.
            blocks = re.split(r"(?:###|##)\s*\*?\*?(?:Trend:|[0-9]+\.)", content)
            for block in blocks[1:]:  # Skip the first block (usually intro text)
                # Trend name, with an optional "— date" suffix before the closing **.
                header_match = re.match(r"\s*(.*?)(?:\s*—\s*(.*?))?\*\*", block)
                if not header_match:
                    continue
                trend_name = header_match.group(1).strip()
                # Discard headers too short to be real trend names.
                if not trend_name or len(trend_name) < 3:
                    continue
                # Trend description/recap, up to the next blank line or bold marker.
                recap_match = re.search(r"\*\*(?:Trend\s+)?Recap:\*\*\s*(.*?)(?=\n\n|\*\*|$)", block, re.DOTALL)
                description = recap_match.group(1).strip() if recap_match else ""
                # Post count line, if present.
                posts_match = re.search(r"\*\*Current\s*#?\s*of\s*Posts:\*\*\s*([^\n]+)", block)
                posts_count = posts_match.group(1).strip() if posts_match else "0"
                trend = {
                    "name": trend_name,
                    "description": description,
                    "platform": platform,
                    "number_of_posts": self.extract_post_count(posts_count),
                    "url": "",  # Will be filled later
                    "source": "Later.com"
                }
                trends.append(trend)
                logger.info(f"Parsed trend: {trend_name}")
            return trends
        except Exception as e:
            logger.error(f"Error parsing Later.com content: {str(e)}")
            return []

    def parse_perplexity_content(self, content: str, platform: str) -> List[Dict]:
        """Parse a Perplexity response (JSON or markdown-ish text) into trends.

        Tries JSON first; otherwise scans line-by-line for trend indicators
        (numbered/quoted names, hashtags, "Trend:" prefixes, bullets) and
        accumulates descriptions and post counts into the current trend.
        Returns [] on any parsing error.
        """
        trends = []
        try:
            # JSON fast path: a list of trend objects.
            try:
                json_data = json.loads(content)
                if isinstance(json_data, list):
                    normalized = [self._normalize_trend(item, platform) for item in json_data]
                    # _normalize_trend returns None for unnamed trends; drop those.
                    return [t for t in normalized if t]
            except json.JSONDecodeError:
                pass
            # Markdown-style fallback. The indicator patterns are loop-invariant.
            trend_indicators = [
                (r'(?:^|\n)[\d\.\-]+\.\s*["""](.+?)["""]', 1),  # Numbered trends
                (r'["""](.+?)["""] trend', 1),                   # Quoted trend names
                (r'#(\w+)', 1),                                  # Hashtags
                (r'Trend:\s*(.+?)(?:\s*[-–]\s*|\s*$)', 1),       # "Trend:" prefix
                (r'^[•\-]\s*(.+?)(?:\s*[-–]\s*|\s*$)', 1)        # Bullet points
            ]
            current_trend = None
            for line in content.split('\n'):
                line = line.strip()
                if not line:
                    continue
                # A new indicator starts a fresh trend and flushes the old one.
                for pattern, group in trend_indicators:
                    match = re.search(pattern, line)
                    if match:
                        if current_trend:
                            trends.append(current_trend)
                        trend_name = match.group(group).strip()
                        current_trend = {
                            "name": trend_name,
                            "description": "",
                            "platform": platform,
                            "number_of_posts": "0",
                            "url": "",
                            "source": "Perplexity"
                        }
                        break
                if current_trend:
                    # Pick up post counts mentioned alongside the trend.
                    post_matches = re.findall(r'(\d+(?:\.\d+)?[KkMmBb]?)\s*(?:posts|videos|views|likes|shares)', line)
                    if post_matches:
                        current_trend["number_of_posts"] = self.extract_post_count(post_matches[0])
                    # Non-indicator lines extend the running description.
                    if not any(re.search(pattern, line) for pattern, _ in trend_indicators):
                        if current_trend["description"]:
                            current_trend["description"] += " "
                        current_trend["description"] += line
            if current_trend:
                trends.append(current_trend)
            return trends
        except Exception as e:
            logger.error(f"Error parsing Perplexity content: {str(e)}")
            return []

    def _normalize_trend(self, trend: Dict, platform: str) -> Optional[Dict]:
        """Normalize a raw trend dict; return None when the name is empty."""
        normalized = {
            "name": str(trend.get("name", "")).strip(),
            "description": str(trend.get("description", "")).strip(),
            "platform": platform,
            "number_of_posts": self.extract_post_count(trend.get("number_of_posts", "0")),
            "url": str(trend.get("url", "")),
            "source": str(trend.get("source", "Unknown"))
        }
        if not normalized["name"]:
            logger.warning("Skipping trend with empty name")
            return None
        # Truncate long descriptions at a word boundary.
        max_desc_length = 500
        if len(normalized["description"]) > max_desc_length:
            normalized["description"] = normalized["description"][:max_desc_length].rsplit(' ', 1)[0] + '...'
        return normalized

    def merge_duplicate_trends(self, trends: List[Dict]) -> List[Dict]:
        """Merge trends with the same (case-insensitive) name.

        Keeps the longer description, the numerically higher post count and
        the first non-empty URL. Post counts are compared as numbers rather
        than as strings, so e.g. "1,000" correctly beats "900".
        NOTE: the dicts in `trends` may be mutated in place.
        """
        merged: Dict[str, Dict] = {}
        for trend in trends:
            key = trend["name"].lower()
            existing = merged.get(key)
            if existing is None:
                merged[key] = trend
                continue
            # Keep the longer description.
            if len(trend["description"]) > len(existing["description"]):
                existing["description"] = trend["description"]
            # Keep the higher post count (numeric comparison).
            if self._post_count_value(trend["number_of_posts"]) > self._post_count_value(existing["number_of_posts"]):
                existing["number_of_posts"] = trend["number_of_posts"]
            # Keep the first valid URL.
            if trend["url"] and not existing["url"]:
                existing["url"] = trend["url"]
        return list(merged.values())