cryogenic22 committed on
Commit
ebcd35b
·
verified ·
1 Parent(s): f06e91c

Update agents/collection_agent.py

Browse files
Files changed (1) hide show
  1. agents/collection_agent.py +279 -84
agents/collection_agent.py CHANGED
@@ -1,3 +1,4 @@
 
1
  import re
2
  import json
3
  import logging
@@ -7,19 +8,39 @@ import urllib.parse
7
  from bs4 import BeautifulSoup
8
  import streamlit as st
9
  from datetime import datetime, timedelta
10
- from typing import Any, Optional, List, Dict, Callable
11
 
12
  logging.basicConfig(level=logging.INFO)
13
  logger = logging.getLogger(__name__)
14
 
15
  def robust_json_parse(text: str) -> Optional[Any]:
16
  text = text.strip()
 
 
 
 
 
17
  try:
18
  return json.loads(text)
19
  except json.JSONDecodeError:
20
- return None
 
 
 
 
 
 
 
 
 
 
 
 
 
21
 
22
  def convert_post_count_str_to_number(count_str: str) -> float:
 
 
 
23
  try:
24
  cleaned = count_str.replace(',', '')
25
  if cleaned and cleaned[-1].lower() in ['k', 'm', 'b']:
@@ -33,6 +54,12 @@ def convert_post_count_str_to_number(count_str: str) -> float:
33
  return 0.0
34
 
35
  def parse_trends_from_raw_text(text: str, platform: str) -> List[Dict]:
 
 
 
 
 
 
36
  trends = []
37
  blocks = re.split(r"### \*\*Trend:", text)
38
  for block in blocks[1:]:
@@ -90,40 +117,101 @@ class TrendCollectionAgent:
90
 
91
  async def collect_trends(self,
92
  platforms: Optional[List[str]] = None,
 
93
  progress_callback: Optional[Callable[[int, int, str], None]] = None
94
  ) -> List[Dict]:
 
 
 
95
  if platforms is None:
96
  platforms = self.default_platforms
97
- steps_per_platform = 2 # 1 for fetching, 1 for processing
98
- total_steps = len(platforms) * steps_per_platform
 
99
  current_step = 0
100
- combined_trends = []
101
  for platform in platforms:
102
- current_step += 1
103
- if progress_callback:
104
- progress_callback(current_step, total_steps, f"Fetching {platform} data...")
105
  try:
106
- query = f"What are the current top trending topics on {platform}? List them with descriptions and the current number of posts."
 
 
 
 
 
 
107
  perplexity_response = self.perplexity_client.search(query)
108
- perplexity_raw = str(perplexity_response)
109
- self.add_debug_log("perplexity", f"Raw perplexity response for {platform}: {perplexity_raw[:300]}...")
110
- parsed_trends = parse_trends_from_raw_text(perplexity_raw, platform)
111
- combined_trends.extend(parsed_trends)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
112
  except Exception as e:
113
- self.add_debug_log("collection", f"Error collecting {platform} trends: {str(e)}", "error")
114
  continue
115
- current_step += 1
116
- if progress_callback:
117
- progress_callback(current_step, total_steps, f"Processing {platform} data...")
118
- for trend in combined_trends:
119
- if trend.get("platform", "").lower() == platform.lower() and not trend.get("url"):
120
- trend["url"] = await self.fetch_trend_url(trend)
121
- return combined_trends
 
122
 
123
  async def fetch_trend_url(self, trend: Dict) -> str:
124
  """
125
- Enhanced function to fetch relevant URLs for social media trends.
126
- Uses multiple search queries and search engines with delays.
127
  """
128
  def is_valid_url(url: str, platform: str) -> bool:
129
  if not url:
@@ -203,6 +291,7 @@ class TrendCollectionAgent:
203
  clean_result = clean_url(url)
204
  self.add_debug_log("url_search", f"Found valid URL for '{trend_name}': {clean_result}")
205
  return clean_result
 
206
  all_urls = re.findall(r'href="(https?://[^"]+)"', html)
207
  for url in all_urls:
208
  if is_valid_url(url, platform):
@@ -215,84 +304,190 @@ class TrendCollectionAgent:
215
  self.add_debug_log("url_search", f"Critical error in fetch_trend_url: {str(e)}", "error")
216
  return trend.get("url", "")
217
 
218
- async def get_trend_sentiment(self, trend: Dict) -> str:
219
- """Analyze the sentiment of a trend."""
220
- prompt = f"""You are an expert in social media sentiment analysis.
221
- Analyze the following trend and classify its overall sentiment as Positive, Negative, or Neutral.
222
- Trend details:
223
- Name: {trend.get('name')}
224
- Description: {trend.get('description')}
225
- Posts: {trend.get('number_of_posts', '0')}
226
- Platform: {trend.get('platform')}
227
- Return your answer as one word (Positive, Negative, or Neutral) followed by a brief explanation.
228
- """
 
 
 
 
 
 
 
 
229
  try:
230
- response = await asyncio.to_thread(
231
- self.client_anthropic.messages.create,
232
- model="claude-3-5-sonnet-20241022",
233
- max_tokens=500,
234
- temperature=0.3,
235
- messages=[{"role": "user", "content": prompt}]
 
236
  )
237
- content = response.content[0].text if isinstance(response.content, list) else response.content
238
- return content.strip()
 
 
 
 
 
 
239
  except Exception as e:
240
- self.add_debug_log("sentiment", f"Error analyzing sentiment: {str(e)}", "error")
241
- return "Unknown"
 
 
 
 
 
 
 
 
 
 
 
 
242
 
243
- async def _scrape_reader_trends(self, platform: str) -> List[Dict]:
244
- collected_text = ""
245
- urls = st.session_state.source_urls.get(platform)
246
- if not urls:
247
- urls = [self.default_reader_sources.get(platform)]
248
- for url in urls:
249
- if not url.startswith("https://r.jina.ai/"):
250
- reader_url = "https://r.jina.ai/" + url
251
- else:
252
- reader_url = url
253
  try:
254
- async with aiohttp.ClientSession() as session:
255
- async with session.get(reader_url, timeout=10) as response:
256
- content = await response.text() if response.status == 200 else ""
257
- self.add_debug_log("reader", f"Fetched raw content from {reader_url} for {platform} (first 500 chars): {content[:500]}")
258
- collected_text += "\n" + content
259
- except Exception as e:
260
- self.add_debug_log("reader", f"Error fetching {reader_url}: {str(e)}", "error")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
261
  continue
262
- return parse_trends_from_raw_text(collected_text, platform)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
263
 
264
- async def get_bank_recommendations(self, trends: List[Dict]) -> str:
265
- prompt = f"""You are an expert in digital marketing for financial institutions.
266
- Given the following trend data, identify which trends would be most suitable for a bank to leverage in its marketing strategy.
267
- Provide the names of the trends along with a brief reason for each selection.
268
- Trends:
269
- {json.dumps(trends, indent=2)}
270
- Return your response as plain text.
271
- """
272
- try:
273
- response = await asyncio.to_thread(
274
- self.client_anthropic.messages.create,
275
- model="claude-3-5-sonnet-20241022",
276
- max_tokens=1000,
277
- temperature=0.3,
278
- messages=[{"role": "user", "content": prompt}]
279
- )
280
- content = response.content[0].text if isinstance(response.content, list) else response.content
281
- return content.strip()
282
- except Exception as e:
283
- self.add_debug_log("bank", f"Error retrieving bank recommendations: {str(e)}", "error")
284
- return "No recommendations available"
 
 
 
 
 
 
 
 
 
 
 
285
 
286
  if __name__ == "__main__":
287
  import os
288
  from utils.api_clients import initialize_api_clients
289
  client_anthropic, client_openai, perplexity_client = initialize_api_clients()
290
  agent = TrendCollectionAgent(client_anthropic, perplexity_client)
 
291
  async def test_collect():
292
  trends = await agent.collect_trends(
293
  platforms=["TikTok", "Instagram"],
294
- progress_callback=lambda current, total, msg: print(f"[{current}/{total}] {msg}")
295
  )
296
  print("Curated Trends:")
297
  print(json.dumps(trends, indent=2))
 
298
  asyncio.run(test_collect())
 
1
+ from typing import Dict, List, Optional, Callable, Any
2
  import re
3
  import json
4
  import logging
 
8
  from bs4 import BeautifulSoup
9
  import streamlit as st
10
  from datetime import datetime, timedelta
 
11
 
12
  logging.basicConfig(level=logging.INFO)
13
  logger = logging.getLogger(__name__)
14
 
15
def robust_json_parse(text: str) -> Optional[Any]:
    """Best-effort JSON extraction from LLM output.

    Handles three progressively messier cases:
      1. the text is valid JSON as-is,
      2. the text is a stringified Anthropic ``TextBlock(...)`` repr,
      3. a JSON array or object is embedded inside surrounding prose.

    Returns the parsed value, or ``None`` when nothing parseable is found.
    """
    candidate = text.strip()

    # Unwrap a stringified TextBlock(...) repr, keeping only its text payload.
    if "TextBlock(" in candidate:
        wrapper = re.search(
            r"TextBlock\(citations=None,\s*text='(.*?)',\s*type='text'\)",
            candidate,
            re.DOTALL,
        )
        if wrapper:
            candidate = wrapper.group(1)

    try:
        return json.loads(candidate)
    except json.JSONDecodeError:
        pass

    # Fall back to the first embedded JSON array, then the first object.
    for pattern in (r'(\[.*\])', r'(\{.*\})'):
        embedded = re.search(pattern, candidate, re.DOTALL)
        if embedded:
            try:
                return json.loads(embedded.group(1))
            except json.JSONDecodeError:
                continue
    return None
39
 
40
  def convert_post_count_str_to_number(count_str: str) -> float:
41
+ """
42
+ Converts a post count string (e.g., '1.2K', '2,000,000', '200') to a numerical value.
43
+ """
44
  try:
45
  cleaned = count_str.replace(',', '')
46
  if cleaned and cleaned[-1].lower() in ['k', 'm', 'b']:
 
54
  return 0.0
55
 
56
  def parse_trends_from_raw_text(text: str, platform: str) -> List[Dict]:
57
+ """
58
+ Parses raw markdown text containing trends.
59
+ Assumes each trend block begins with "### **Trend:".
60
+ Extracts trend name, discovery date, trend recap (as description),
61
+ and the current posts value.
62
+ """
63
  trends = []
64
  blocks = re.split(r"### \*\*Trend:", text)
65
  for block in blocks[1:]:
 
117
 
118
  async def collect_trends(self,
119
  platforms: Optional[List[str]] = None,
120
+ time_range: int = 60,
121
  progress_callback: Optional[Callable[[int, int, str], None]] = None
122
  ) -> List[Dict]:
123
+ """
124
+ Collect trends from Perplexity and then process them.
125
+ """
126
  if platforms is None:
127
  platforms = self.default_platforms
128
+
129
+ raw_trends = []
130
+ total_steps = len(platforms) * 2 # 2 steps per platform
131
  current_step = 0
132
+
133
  for platform in platforms:
 
 
 
134
  try:
135
+ if progress_callback:
136
+ progress_callback(current_step, total_steps, f"Analyzing {platform} trends...")
137
+
138
+ query = (
139
+ f"What are the current top trending topics on {platform} in the last {time_range} days? "
140
+ "List them with engagement metrics and descriptions."
141
+ )
142
  perplexity_response = self.perplexity_client.search(query)
143
+
144
+ current_step += 1
145
+ if progress_callback:
146
+ progress_callback(current_step, total_steps, f"Processing {platform} data...")
147
+
148
+ # Use the older curation prompt logic to extract trends.
149
+ curation_prompt = f"""Analyze this raw trend data and extract the key trends with their metrics.
150
+
151
+ The output should be a JSON array. Each trend object must have the following keys:
152
+ - name (string)
153
+ - description (string)
154
+ - engagement (integer; use 0 if not available)
155
+ - url (string)
156
+ - platform (string)
157
+ - audio_url (string; if available, else an empty string)
158
+ - number_of_posts (integer; if available, else 0)
159
+
160
+ For Instagram trends: Only include genuine trending topics and exclude aggregated Instagram reels or generic collections.
161
+
162
+ Raw data:
163
+ {perplexity_response}
164
+
165
+ Output only a valid JSON array.
166
+ """
167
+ try:
168
+ response = await asyncio.to_thread(
169
+ self.client_anthropic.messages.create,
170
+ model="claude-3-5-sonnet-20241022",
171
+ max_tokens=2000,
172
+ temperature=0,
173
+ messages=[{"role": "user", "content": curation_prompt}]
174
+ )
175
+ content = response.content[0].text if isinstance(response.content, list) else response.content
176
+ content = content.strip()
177
+ self.add_debug_log("curation", f"Raw curation response for {platform}: {content[:300]}...")
178
+ platform_trends = None
179
+ try:
180
+ platform_trends = json.loads(content)
181
+ except json.JSONDecodeError:
182
+ json_match = re.search(r'\[.*\]', content, re.DOTALL)
183
+ if json_match:
184
+ try:
185
+ platform_trends = json.loads(json_match.group())
186
+ except json.JSONDecodeError:
187
+ self.add_debug_log("curation", f"Failed to parse JSON for {platform}", "error")
188
+ if platform_trends and isinstance(platform_trends, list):
189
+ raw_trends.extend(platform_trends)
190
+ else:
191
+ self.add_debug_log("curation", f"Invalid trends format for {platform}", "error")
192
+ except Exception as e:
193
+ self.add_debug_log("curation", f"Error processing {platform} trends: {str(e)}", "error")
194
+ continue
195
+
196
+ current_step += 1
197
+ if progress_callback:
198
+ progress_callback(current_step, total_steps, f"Completed {platform} analysis")
199
+
200
  except Exception as e:
201
+ self.add_debug_log("collection", f"Error processing platform {platform}: {str(e)}", "error")
202
  continue
203
+
204
+ # For each collected trend, update the URL if missing.
205
+ for trend in raw_trends:
206
+ if not trend.get("url"):
207
+ trend["url"] = await self.fetch_trend_url(trend)
208
+
209
+ # (Optional) You can filter or sort trends here.
210
+ return raw_trends
211
 
212
  async def fetch_trend_url(self, trend: Dict) -> str:
213
  """
214
+ Fetch a URL for the given trend using multiple search queries and engines.
 
215
  """
216
  def is_valid_url(url: str, platform: str) -> bool:
217
  if not url:
 
291
  clean_result = clean_url(url)
292
  self.add_debug_log("url_search", f"Found valid URL for '{trend_name}': {clean_result}")
293
  return clean_result
294
+ # Fallback: use regex to extract all URLs
295
  all_urls = re.findall(r'href="(https?://[^"]+)"', html)
296
  for url in all_urls:
297
  if is_valid_url(url, platform):
 
304
  self.add_debug_log("url_search", f"Critical error in fetch_trend_url: {str(e)}", "error")
305
  return trend.get("url", "")
306
 
307
+ def _curate_trends(self, trends: List[Dict]) -> List[Dict]:
308
+ prepared_trends = self._prepare_for_json(trends)
309
+ prompt = f"""You are an expert in digital trends. Consolidate the following raw trend data into a curated JSON array.
310
+ Each trend object must have the following keys:
311
+ - name (string): a clearly defined trend name
312
+ - description (string): a concise description of the trend
313
+ - engagement (integer): the engagement value (or 0 if unavailable)
314
+ - url (string): the source URL
315
+ - platform (string): e.g., TikTok or Instagram
316
+ - audio_url (string): if available, else an empty string
317
+ - number_of_posts (integer): if available, else 0
318
+
319
+ Remove duplicates and only include trends that are relevant and published in the last 60 days.
320
+
321
+ Raw data:
322
+ {json.dumps(prepared_trends, indent=2)}
323
+
324
+ Output only a valid JSON array and nothing else."""
325
+ self.add_debug_log("curation", "Sending curation prompt to Claude LLM.")
326
  try:
327
+ response = asyncio.run(
328
+ asyncio.to_thread(
329
+ self.client_anthropic.messages.create,
330
+ model="claude-3-5-sonnet-20241022",
331
+ max_tokens=3000,
332
+ messages=[{"role": "user", "content": prompt}]
333
+ )
334
  )
335
+ content = str(response.content) if not isinstance(response.content, dict) else json.dumps(response.content)
336
+ self.add_debug_log("curation_raw", f"Raw curation response:\n{content}")
337
+ parsed = robust_json_parse(content)
338
+ if parsed is None:
339
+ self.add_debug_log("curation", "Robust JSON parsing failed for curation response.", "error")
340
+ parsed = self._parse_trends_from_text(content, "unknown")
341
+ curated_trends = parsed if isinstance(parsed, list) else []
342
+ return curated_trends
343
  except Exception as e:
344
+ self.add_debug_log("curation", f"Error during trend curation: {e}", "error")
345
+ return trends
346
+
347
+ def _prepare_for_json(self, trends: List[Dict]) -> List[Dict]:
348
+ prepared = []
349
+ for trend in trends:
350
+ new_trend = {}
351
+ for key, value in trend.items():
352
+ if isinstance(value, datetime):
353
+ new_trend[key] = value.isoformat()
354
+ else:
355
+ new_trend[key] = value
356
+ prepared.append(new_trend)
357
+ return prepared
358
 
359
+ def _extract_posts_count(self, text: str) -> int:
360
+ match = re.search(r'(\d+(?:\.\d+)?)\s*(?:posts|post)', text, re.IGNORECASE)
361
+ if match:
 
 
 
 
 
 
 
362
  try:
363
+ return int(float(match.group(1)))
364
+ except Exception:
365
+ return 0
366
+ return 0
367
+
368
+ def _parse_trends_from_text(self, text: str, platform: str) -> List[Dict]:
369
+ trends = []
370
+ current_trend = None
371
+ if isinstance(text, (list, dict)):
372
+ text = str(text)
373
+ published_time = None
374
+ pub_match = re.search(r'Published Time:\s*([\d\-T:+]+)', text)
375
+ if pub_match:
376
+ try:
377
+ published_time = datetime.fromisoformat(pub_match.group(1))
378
+ except Exception:
379
+ published_time = None
380
+ lines = text.split('\n')
381
+ trend_indicators = [
382
+ r'trend:?\s*"([^"]+)"',
383
+ r'trend:?\s*(.+?(?=\s*[\n\-•]|$))',
384
+ r'"([\w\s&]+)"\s*(?:trend|challenge)',
385
+ r'#(\w+)',
386
+ ]
387
+ def is_valid_trend_name(name: str) -> bool:
388
+ invalid_patterns = [
389
+ r'metrics$', r'statistics$', r'engagement$', r'trends$', r'analytics$',
390
+ r'strategy$', r'content$', r'marketing$', r'features?$', r'format$',
391
+ r'^type', r'^category', r'^format', r'^strategy', r'^content'
392
+ ]
393
+ name = name.lower().strip()
394
+ if any(re.search(pattern, name, re.IGNORECASE) for pattern in invalid_patterns):
395
+ return False
396
+ if len(name) < 3 or len(name.split()) > 8:
397
+ return False
398
+ if not re.search(r'[a-zA-Z]', name):
399
+ return False
400
+ return True
401
+ def extract_trend_name(line: str) -> Optional[str]:
402
+ for pattern in trend_indicators:
403
+ match = re.search(pattern, line, re.IGNORECASE)
404
+ if match:
405
+ name = match.group(1).strip()
406
+ if is_valid_trend_name(name):
407
+ return name
408
+ return None
409
+ for line in lines:
410
+ line = line.strip()
411
+ if not line:
412
  continue
413
+ trend_name = extract_trend_name(line)
414
+ if trend_name:
415
+ if current_trend:
416
+ trends.append(current_trend)
417
+ description = re.sub(f'{trend_name}|trend:|challenge:', '', line, flags=re.IGNORECASE).strip()
418
+ current_trend = {
419
+ 'name': trend_name,
420
+ 'description': description,
421
+ 'platform': platform,
422
+ 'discovery_date': published_time if published_time is not None else datetime.now(),
423
+ 'engagement': self._extract_numeric_value(line) or 0,
424
+ 'demographics': [],
425
+ 'regions': [],
426
+ 'content_type': self._determine_content_type({'platform': platform, 'description': description}),
427
+ 'audio_url': "",
428
+ 'number_of_posts': 0
429
+ }
430
+ elif current_trend:
431
+ lower_line = line.lower()
432
+ if 'engagement' in lower_line or any(word in lower_line for word in ['views', 'likes']):
433
+ current_trend['engagement'] = self._extract_numeric_value(line)
434
+ elif any(word in lower_line for word in ['demographic', 'audience']):
435
+ current_trend['demographics'].append(line)
436
+ elif any(word in lower_line for word in ['region', 'country']):
437
+ current_trend['regions'].append(line)
438
+ elif 'post' in lower_line:
439
+ posts_count = self._extract_posts_count(line)
440
+ current_trend['number_of_posts'] = max(current_trend.get('number_of_posts', 0), posts_count)
441
+ if current_trend:
442
+ trends.append(current_trend)
443
+ validated_trends = [trend for trend in trends if is_valid_trend_name(trend['name'])]
444
+ return validated_trends
445
 
446
+ def _extract_numeric_value(self, text: str) -> int:
447
+ matches = re.findall(r'(\d+(?:\.\d+)?)\s*([kmb])?(?:\s+(?:views|likes|shares|engagement))?', text.lower())
448
+ highest_value = 0
449
+ for value_str, suffix in matches:
450
+ try:
451
+ value = float(value_str)
452
+ if suffix:
453
+ multiplier = {'k': 1000, 'm': 1000000, 'b': 1000000000}.get(suffix.lower(), 1)
454
+ value *= multiplier
455
+ highest_value = max(highest_value, int(value))
456
+ except (ValueError, TypeError):
457
+ continue
458
+ return highest_value
459
+
460
+ def _determine_content_type(self, trend: Dict) -> str:
461
+ platform = trend.get('platform', '')
462
+ description = trend.get('description', '').lower()
463
+ if platform == 'TikTok':
464
+ if any(word in description for word in ['challenge', 'dance']):
465
+ return 'challenge'
466
+ if any(word in description for word in ['sound', 'audio', 'song']):
467
+ return 'sound'
468
+ return 'video'
469
+ elif platform == 'Instagram':
470
+ if any(word in description for word in ['reel', 'video']):
471
+ return 'video'
472
+ if any(word in description for word in ['photo', 'image', 'picture']):
473
+ return 'image'
474
+ if any(word in description for word in ['challenge', 'dance']):
475
+ return 'challenge'
476
+ return 'mixed'
477
+ return 'mixed'
478
 
479
if __name__ == "__main__":
    # Manual smoke test: collect trends for two platforms and print them.
    import os
    from utils.api_clients import initialize_api_clients

    client_anthropic, client_openai, perplexity_client = initialize_api_clients()
    agent = TrendCollectionAgent(client_anthropic, perplexity_client)

    async def test_collect():
        collected = await agent.collect_trends(
            platforms=["TikTok", "Instagram"],
            progress_callback=lambda step, total, msg: print(f"[{step}/{total}] {msg}")
        )
        print("Curated Trends:")
        print(json.dumps(collected, indent=2))

    asyncio.run(test_collect())