# apify_scraper.py
# Updated version: Uses separate Apify tokens for Facebook and TikTok tasks
import requests
import time
import pandas as pd
import os
import json
import hashlib
from datetime import datetime, timedelta
# Create cache directory
CACHE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "cache")
os.makedirs(CACHE_DIR, exist_ok=True)
# Import configuration settings
try:
from .config import (
# API tokens
APIFY_TOKEN, APIFY_TOKEN_FB, APIFY_TOKEN_TIKTOK,
# Task IDs
POST_TASK_ID_SEARCH, COMMENT_TASK_ID, TIKTOK_VIDEO_TASK_ID, TIKTOK_COMMENT_TASK_ID,
# Data source settings
USE_FACEBOOK, USE_TIKTOK, USE_SERPAPI, USE_SERPER, USE_DUCKDUCKGO, USE_LOWYAT,
# Comment settings
USE_COMMENTS,
# Result limits
FACEBOOK_MAX_RESULTS, TIKTOK_MAX_RESULTS, WEB_SEARCH_MAX_RESULTS, LOWYAT_MAX_THREADS,
# Lowyat Forum settings
LOWYAT_SECTIONS
)
# Use settings from config
print("[✓] Using configuration from config.py")
except ImportError:
# Fallback to hardcoded settings
print("[⚠️] Config not found, using hardcoded settings")
# API tokens
APIFY_TOKEN = "apify_api_INtF6uUT4c6nOStYDYTllxuTBNSbng1IlTTB"
#APIFY_TOKEN_FB = APIFY_TOKEN
#APIFY_TOKEN_TIKTOK = APIFY_TOKEN
# Actor task IDs
#POST_TASK_ID_SEARCH = "l5DitJrtfCyOfrjn6" # Facebook Search PPR (rajamohd/facebook-search-ppr-rm-bernama)
#COMMENT_TASK_ID = "qiAp6PQwkyYcLQiyC" # Facebook Comments Scraper (rajamohd/facebook-comments-scraper-task)
TIKTOK_VIDEO_TASK_ID = "rfk0BzRAjuLPbccaZ" # TikTok Data Extractor (devlab/tiktok-data-extractor-bernama2-video)
TIKTOK_COMMENT_TASK_ID = "rgXeWIhnXKRD5bjGp" # TikTok Comments Scraper (devlab/tiktok-comments-scraper-bernama2)
# Data source settings
USE_FACEBOOK = True
USE_TIKTOK = True
USE_SERPAPI = True
USE_SERPER = True
USE_DUCKDUCKGO = True
USE_LOWYAT = True
# Comment settings
USE_COMMENTS = True
# Result limits
FACEBOOK_MAX_RESULTS = 100
TIKTOK_MAX_RESULTS = 50
WEB_SEARCH_MAX_RESULTS = 20
LOWYAT_MAX_THREADS = 20
# Lowyat Forum settings
LOWYAT_SECTIONS = ["Kopitiam", "SeriousKopitiam", "Finance"]
def run(keywords, output_path="output/claim_data.csv", fetch_comments=True, max_videos=30, max_comments=50, max_results=None):
"""Run data collection from multiple sources and combine results
Args:
keywords (list): List of keywords to search for
output_path (str): Path to save combined results
fetch_comments (bool): Whether to fetch comments for TikTok videos
max_videos (int): Maximum number of TikTok videos to fetch per keyword
max_comments (int): Maximum number of comments to fetch per TikTok video
max_results (int): Maximum results per source (overrides config settings)
Returns:
pandas.DataFrame: Combined results from all sources
"""
all_records = []
# Use config settings if max_results not specified
fb_max = max_results or FACEBOOK_MAX_RESULTS
tiktok_max = max_results or TIKTOK_MAX_RESULTS
web_max = max_results or WEB_SEARCH_MAX_RESULTS
# Create output directory if it doesn't exist
# Guard against a bare filename (empty dirname)
os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
# Create a summary of data sources
sources_enabled = []
if USE_FACEBOOK: sources_enabled.append("Facebook")
if USE_TIKTOK: sources_enabled.append("TikTok")
if USE_SERPAPI: sources_enabled.append("SerpApi")
if USE_SERPER: sources_enabled.append("Serper.dev")
if USE_DUCKDUCKGO: sources_enabled.append("DuckDuckGo")
if USE_LOWYAT: sources_enabled.append("Lowyat Forum")
print(f"[📊] Data collection enabled for: {', '.join(sources_enabled)}")
print(f"[🔍] Original Keywords: {', '.join(keywords)}")
# Optimize keywords for different platforms
try:
from tiktok_keyword_formatter import optimize_keywords_for_platforms
optimized_keywords = optimize_keywords_for_platforms(keywords)
tiktok_keywords = optimized_keywords["tiktok"]
web_keywords = optimized_keywords["web_search"]
print(f"[🔍] TikTok Keywords: {', '.join(tiktok_keywords)}")
print(f"[🔍] Web Search Keywords: {', '.join(web_keywords)}")
except ImportError:
print("[⚠️] Keyword formatter not found. Using original keywords for all platforms.")
tiktok_keywords = keywords
web_keywords = keywords
# Facebook post search
if USE_FACEBOOK:
try:
boolean_query = build_boolean_search(keywords)
print(f"[📘] Facebook: {boolean_query}")
post_input = {"search": boolean_query, "resultsPerPage": min(fb_max, 100)}
post_dataset_id = run_actor_task(POST_TASK_ID_SEARCH, post_input, platform="facebook")
posts = download_dataset(post_dataset_id, platform="facebook")
print(f"[📘] Retrieved {len(posts)} Facebook posts")
fb_records = []
for post in posts:
# Check if this is Malaysian content
username = post.get("username", "")
text = post.get("text", "")
post_url = post.get("url")
if is_malaysian_content(username, text):
# Add the post itself
post_record = {
"platform": "facebook",
"date": post.get("createdAt"),
"username": username,
"post_text": text,
"post_url": post_url,
"likes": post.get("likes", 0),
"shares": post.get("shares", 0),
"comments_count": post.get("commentsCount", 0),
"comment_text": "",
"combined_text": text
}
fb_records.append(post_record)
# If comments are enabled and the post has comments, scrape them
if USE_COMMENTS and post.get("commentsCount", 0) > 0 and post_url:
try:
print(f"[💬] Scraping comments for Facebook post: {post_url}")
comment_input = {"url": post_url, "maxComments": 50}
comment_dataset_id = run_actor_task(COMMENT_TASK_ID, comment_input, platform="facebook")
comments = download_dataset(comment_dataset_id, platform="facebook")
print(f"[💬] Retrieved {len(comments)} comments for post")
for comment in comments:
comment_text = comment.get("text", "")
comment_username = comment.get("name", "")
if is_malaysian_content(comment_username, comment_text):
comment_record = {
"platform": "facebook_comment",
"date": comment.get("date"),
"username": comment_username,
"post_text": "",
"post_url": post_url,
"likes": comment.get("likes", 0),
"shares": 0,
"comments_count": 0,
"comment_text": comment_text,
"combined_text": comment_text
}
fb_records.append(comment_record)
except Exception as e:
print(f"[❌] Error scraping comments for post {post_url}: {str(e)}")
print("[⚠️] Continuing with next post...")
print(f"[📊] Added {len(fb_records)} Facebook records after filtering")
all_records.extend(fb_records)
except Exception as e:
print(f"[❌] Error during Facebook scraping: {str(e)}")
print("[⚠️] Continuing with other data sources...")
# TikTok scraping
if USE_TIKTOK:
try:
print(f"[📽️] TikTok: Searching for {', '.join(tiktok_keywords)}")
tiktok_records = []
# Use only the top three most relevant keywords to limit Apify costs
top_keywords = tiktok_keywords[:3]
print(f"[📽️] Using top {len(top_keywords)} TikTok keywords: {', '.join(top_keywords)}")
# Per-keyword video limit comes from the max_videos parameter
videos_per_keyword = max_videos
total_videos_collected = 0
max_total_videos = max_videos * len(top_keywords) # Overall cap: max_videos per keyword
videos_with_comments_count = 0 # How many videos have had their comments scraped this run
for keyword in top_keywords:
try:
# Print detailed debugging information
print(f"[📽️] DEBUG: TikTok API Token: {APIFY_TOKEN_TIKTOK[:5]}...{APIFY_TOKEN_TIKTOK[-5:]}")
print(f"[📽️] DEBUG: TikTok Video Task ID: {TIKTOK_VIDEO_TASK_ID}")
print(f"[📽️] DEBUG: TikTok Comment Task ID: {TIKTOK_COMMENT_TASK_ID}")
# Limit videos per keyword to save costs
tiktok_input = {"searchQueries": [keyword], "maxVideos": videos_per_keyword}
print(f"[📽️] Requesting {videos_per_keyword} TikTok videos for: {keyword}")
print(f"[📽️] DEBUG: Full input payload: {tiktok_input}")
try:
tiktok_dataset_id = run_actor_task(TIKTOK_VIDEO_TASK_ID, tiktok_input, platform="tiktok")
print(f"[📽️] DEBUG: Successfully got dataset ID: {tiktok_dataset_id}")
videos = download_dataset(tiktok_dataset_id, platform="tiktok")
print(f"[📽️] Retrieved {len(videos)} TikTok videos for: {keyword}")
except Exception as e:
print(f"[❌] DETAILED ERROR in TikTok video extraction: {str(e)}")
print(f"[❌] Error type: {type(e).__name__}")
import traceback
print(f"[❌] Traceback: {traceback.format_exc()}")
videos = []
for video in videos:
# Check if we've reached the maximum total videos limit
if total_videos_collected >= max_total_videos:
print(f"[⚠️] Reached maximum limit of {max_total_videos} videos. Stopping collection.")
break
username = video.get("authorMeta", {}).get("userName", "") or video.get("authorMeta", {}).get("name", "")
caption = video.get("text", "")
if is_malaysian_content(username, caption):
# Increment the total videos counter
total_videos_collected += 1
video_url = video.get("webVideoUrl") or video.get("videoUrl")
clean_url = video_url.split("?")[0] if video_url and "/video/" in video_url else None
video_record = {
"platform": "tiktok",
"date": video.get("createTimeISO") or video.get("createTime"),
"username": username,
"post_text": caption,
"post_url": clean_url,
"likes": video.get("diggCount", 0),
"shares": video.get("shareCount", 0),
"comments_count": video.get("commentCount", 0),
"comment_text": "",
"combined_text": caption
}
tiktok_records.append(video_record)
# If comments are enabled and the video is popular enough, scrape them
min_comments_threshold = 5 # Minimum comment count before a video's comments are fetched
max_comments_to_scrape = max_comments # Per-video comment limit from the max_comments parameter
max_videos_with_comments = 10 # Cap on how many videos get their comments scraped per run
# videos_with_comments_count (initialised above) tracks how many videos we've scraped comments for
if (fetch_comments and
videos_with_comments_count < max_videos_with_comments and
video.get("commentCount", 0) >= min_comments_threshold and
clean_url and
video.get("diggCount", 0) > 10): # Low like-count threshold so most qualifying videos get comments
try:
print(f"[💬] Scraping comments for popular TikTok video ({run.videos_with_comments_count+1}/{max_videos_with_comments}): {clean_url}")
comment_input = {"postURLs": [clean_url], "commentsPerPost": max_comments_to_scrape}
print(f"[💬] DEBUG: Comment input payload: {comment_input}")
try:
comment_dataset_id = run_actor_task(TIKTOK_COMMENT_TASK_ID, comment_input, platform="tiktok")
print(f"[💬] DEBUG: Successfully got comment dataset ID: {comment_dataset_id}")
comments = download_dataset(comment_dataset_id, platform="tiktok")
videos_with_comments_count += 1
print(f"[💬] Retrieved {len(comments)} comments for video")
except Exception as e:
print(f"[❌] DETAILED ERROR in TikTok comment extraction: {str(e)}")
print(f"[❌] Error type: {type(e).__name__}")
import traceback
print(f"[❌] Traceback: {traceback.format_exc()}")
comments = []
for comment in comments:
comment_text = comment.get("text", "")
comment_username = comment.get("author", {}).get("uniqueId", "") or comment.get("author", {}).get("nickname", "")
if is_malaysian_content(comment_username, comment_text):
comment_record = {
"platform": "tiktok_comment",
"date": comment.get("createTime"),
"username": comment_username,
"post_text": "",
"post_url": clean_url,
"likes": comment.get("diggCount", 0),
"shares": 0,
"comments_count": 0,
"comment_text": comment_text,
"combined_text": comment_text
}
tiktok_records.append(comment_record)
except Exception as e:
print(f"[❌] Error scraping comments for video {clean_url}: {str(e)}")
print("[⚠️] Continuing with next video...")
# Check if we've reached the maximum total videos limit after processing this keyword
if total_videos_collected >= max_total_videos:
print(f"[⚠️] Reached maximum limit of {max_total_videos} videos. Stopping keyword search.")
break
except Exception as e:
print(f"[❌] Error processing TikTok keyword '{keyword}': {str(e)}")
print("[⚠️] Continuing with next keyword...")
print(f"[📊] Added {len(tiktok_records)} TikTok records after filtering")
all_records.extend(tiktok_records)
except Exception as e:
print(f"[❌] Error during TikTok scraping: {str(e)}")
print("[⚠️] Continuing with other data sources...")
# Web search (SerpApi, Serper.dev, DuckDuckGo)
if USE_SERPAPI or USE_SERPER or USE_DUCKDUCKGO:
try:
print(f"[🌐] Web Search: Searching for {', '.join(web_keywords)}")
web_search_output = f"output/{os.path.basename(output_path).split('.')[0]}_web.csv"
# Try to import the run_web_search function
try:
from run_web_search import run_web_search
# Get the full claim from the environment if available
full_claim = os.environ.get("FULL_CLAIM", None)
if full_claim:
print(f"[🔍] Using full claim for web search: {full_claim}")
# Pass configuration settings to run_web_search
web_results_count = run_web_search(
web_keywords,
web_search_output,
num_results=web_max,
use_serpapi=USE_SERPAPI,
use_serper=USE_SERPER,
use_duckduckgo=USE_DUCKDUCKGO,
full_claim=full_claim
)
print(f"[🌐] Retrieved {web_results_count} web search results")
# If web search was successful, read the results and add to all_records
if web_results_count > 0:
try:
web_df = pd.read_csv(web_search_output)
web_records = web_df.to_dict('records')
all_records.extend(web_records)
print(f"[📊] Added {len(web_records)} web search records")
except Exception as e:
print(f"[❌] Error reading web search results: {str(e)}")
except ImportError:
print("[⚠️] Web search module not found. Skipping web search.")
except Exception as e:
print(f"[❌] Error during web search: {str(e)}")
# Lowyat Forum data collection
if USE_LOWYAT:
try:
print(f"[📚] Collecting data from Lowyat Forum...")
# Import the Lowyat Forum crawler
try:
from lowyat_crawler import run_lowyat_crawler
# Use the same keywords for Lowyat Forum
lowyat_keywords = keywords
# Check for environment variable override for sections
sections_to_use = LOWYAT_SECTIONS
if os.environ.get("LOWYAT_SECTIONS"):
sections_to_use = os.environ.get("LOWYAT_SECTIONS").split(",")
print(f"[📚] Using Lowyat Forum sections from environment: {', '.join(sections_to_use)}")
# Get the full claim from the environment if available
full_claim = os.environ.get("FULL_CLAIM", None)
if full_claim:
print(f"[🔍] Using full claim for Lowyat Forum search: {full_claim}")
# Get Lowyat Forum data
lowyat_output_path = output_path.replace(".csv", "_lowyat.csv")
try:
lowyat_df = run_lowyat_crawler(
lowyat_keywords,
sections=sections_to_use,
max_threads=LOWYAT_MAX_THREADS,
output_path=lowyat_output_path,
full_claim=full_claim
)
# Convert DataFrame to records and add to all_records
if not lowyat_df.empty:
lowyat_records = lowyat_df.to_dict('records')
all_records.extend(lowyat_records)
print(f"[📚] Added {len(lowyat_records)} Lowyat Forum records")
else:
print(f"[⚠️] No Lowyat Forum data found for keywords: {', '.join(lowyat_keywords)}")
# Generate sample data for testing if needed
if os.environ.get("GENERATE_SAMPLE_LOWYAT_DATA", "false").lower() == "true":
print("[📚] Generating sample Lowyat Forum data for testing...")
# Create a sample dataframe with the claim
current_date = datetime.now().strftime('%Y-%m-%d')
# Get the claim text or keywords
claim_text = full_claim if full_claim else ', '.join(lowyat_keywords)
# Create relevant sample data based on claim content
sample_data = []
# Check for different types of claims and create relevant sample data
if any(term in claim_text.lower() for term in ['hon', 'tenonet', 'kenderaan', 'kereta']):
# Horn/vehicle related claim
sample_data.append({
'platform': 'LowyatForum',
'date': current_date,
'username': 'CarEnthusiast',
'post_text': f"Adakah sesiapa tahu tentang undang-undang berkaitan hon tenonet? Saya dengar JPJ sedang menjalankan operasi terhadap kenderaan yang menggunakan hon jenis ini.",
'post_url': 'https://forum.lowyat.net/topic/hon-tenonet',
'likes': 15,
'shares': 3,
'comments_count': 8,
'comment_text': '',
'combined_text': f"Adakah sesiapa tahu tentang undang-undang berkaitan hon tenonet? Saya dengar JPJ sedang menjalankan operasi terhadap kenderaan yang menggunakan hon jenis ini."
})
sample_data.append({
'platform': 'LowyatForum_Comment',
'date': current_date,
'username': 'LegalExpert',
'post_text': '',
'post_url': 'https://forum.lowyat.net/topic/hon-tenonet#comment1',
'likes': 7,
'shares': 0,
'comments_count': 0,
'comment_text': "Ya, penggunaan hon tenonet adalah menyalahi undang-undang kerana boleh mengelirukan pemandu lain dan menyebabkan kemalangan. Denda boleh mencecah RM2,000.",
'combined_text': "Ya, penggunaan hon tenonet adalah menyalahi undang-undang kerana boleh mengelirukan pemandu lain dan menyebabkan kemalangan. Denda boleh mencecah RM2,000."
})
elif any(term in claim_text.lower() for term in ['kelantan', 'rogol', 'sumbang mahram', 'jenayah']):
# Crime in Kelantan related claim
sample_data.append({
'platform': 'LowyatForum',
'date': current_date,
'username': 'SocialObserver',
'post_text': f"Statistik jenayah seksual di Kelantan semakin membimbangkan. Menurut laporan polis, kes rogol dan sumbang mahram meningkat sebanyak 15% tahun ini.",
'post_url': 'https://forum.lowyat.net/topic/crime-statistics',
'likes': 12,
'shares': 5,
'comments_count': 7,
'comment_text': '',
'combined_text': f"Statistik jenayah seksual di Kelantan semakin membimbangkan. Menurut laporan polis, kes rogol dan sumbang mahram meningkat sebanyak 15% tahun ini."
})
sample_data.append({
'platform': 'LowyatForum_Comment',
'date': current_date,
'username': 'CommunityLeader',
'post_text': '',
'post_url': 'https://forum.lowyat.net/topic/crime-statistics#comment1',
'likes': 8,
'shares': 0,
'comments_count': 0,
'comment_text': "Kita perlu lebih banyak program kesedaran dan pendidikan untuk menangani masalah ini. Pihak berkuasa juga perlu mengambil tindakan lebih tegas terhadap pesalah.",
'combined_text': "Kita perlu lebih banyak program kesedaran dan pendidikan untuk menangani masalah ini. Pihak berkuasa juga perlu mengambil tindakan lebih tegas terhadap pesalah."
})
elif any(term in claim_text.lower() for term in ['kelongsong', 'peluru', 'senjata', 'tan']):
# Ammunition related claim
sample_data.append({
'platform': 'LowyatForum',
'date': current_date,
'username': 'SecurityAnalyst',
'post_text': f"Penemuan 50 tan kelongsong dan peluru di kilang haram membimbangkan. Adakah ini menunjukkan ancaman keselamatan yang serius?",
'post_url': 'https://forum.lowyat.net/topic/security-threat',
'likes': 25,
'shares': 10,
'comments_count': 15,
'comment_text': '',
'combined_text': f"Penemuan 50 tan kelongsong dan peluru di kilang haram membimbangkan. Adakah ini menunjukkan ancaman keselamatan yang serius?"
})
sample_data.append({
'platform': 'LowyatForum_Comment',
'date': current_date,
'username': 'DefenseExpert',
'post_text': '',
'post_url': 'https://forum.lowyat.net/topic/security-threat#comment1',
'likes': 18,
'shares': 0,
'comments_count': 0,
'comment_text': "Menurut sumber, kelongsong tersebut adalah untuk dikitar semula dan bukan untuk kegunaan senjata aktif. Namun, ia tetap menyalahi undang-undang kerana tidak mempunyai permit yang sah.",
'combined_text': "Menurut sumber, kelongsong tersebut adalah untuk dikitar semula dan bukan untuk kegunaan senjata aktif. Namun, ia tetap menyalahi undang-undang kerana tidak mempunyai permit yang sah."
})
elif any(term in claim_text.lower() for term in ['minyak sawit', 'cukai', 'ekonomi']):
# Palm oil tax related claim
sample_data.append({
'platform': 'LowyatForum',
'date': current_date,
'username': 'EconomyWatcher',
'post_text': f"Adakah benar kerajaan akan mengenakan cukai khas terhadap minyak sawit mentah? Ini akan memberi kesan besar kepada industri dan ekonomi negara.",
'post_url': 'https://forum.lowyat.net/topic/palm-oil-tax',
'likes': 20,
'shares': 8,
'comments_count': 12,
'comment_text': '',
'combined_text': f"Adakah benar kerajaan akan mengenakan cukai khas terhadap minyak sawit mentah? Ini akan memberi kesan besar kepada industri dan ekonomi negara."
})
sample_data.append({
'platform': 'LowyatForum_Comment',
'date': current_date,
'username': 'IndustryInsider',
'post_text': '',
'post_url': 'https://forum.lowyat.net/topic/palm-oil-tax#comment1',
'likes': 15,
'shares': 0,
'comments_count': 0,
'comment_text': "Menurut sumber dari kementerian, cadangan cukai ini masih dalam peringkat kajian dan belum ada keputusan muktamad. Namun, jika dilaksanakan, ia akan memberi kesan kepada harga minyak masak.",
'combined_text': "Menurut sumber dari kementerian, cadangan cukai ini masih dalam peringkat kajian dan belum ada keputusan muktamad. Namun, jika dilaksanakan, ia akan memberi kesan kepada harga minyak masak."
})
else:
# Default generic sample data if no specific claim type is detected
sample_data.append({
'platform': 'LowyatForum',
'date': current_date,
'username': 'LowyatUser123',
'post_text': f"Discussing: {claim_text}",
'post_url': 'https://forum.lowyat.net/topic/sample',
'likes': 5,
'shares': 0,
'comments_count': 2,
'comment_text': '',
'combined_text': f"Discussing: {claim_text}"
})
sample_data.append({
'platform': 'LowyatForum_Comment',
'date': current_date,
'username': 'LowyatCommenter',
'post_text': '',
'post_url': 'https://forum.lowyat.net/topic/sample#comment1',
'likes': 2,
'shares': 0,
'comments_count': 0,
'comment_text': f"Commenting on: {claim_text}",
'combined_text': f"Commenting on: {claim_text}"
})
sample_df = pd.DataFrame(sample_data)
if lowyat_output_path:
sample_df.to_csv(lowyat_output_path, index=False)
all_records.extend(sample_data)
print(f"[📚] Added {len(sample_data)} sample Lowyat Forum records")
except Exception as e:
print(f"[⚠️] Error during Lowyat Forum crawling: {str(e)}")
print("[⚠️] Continuing without Lowyat Forum data...")
except ImportError:
print("[❌] Lowyat Forum crawler module not found. Skipping Lowyat Forum data collection.")
except Exception as e:
print(f"[❌] Error during Lowyat Forum data collection: {str(e)}")
print("[⚠️] Continuing with other data sources...")
# Save all records to CSV
if all_records:
df = pd.DataFrame(all_records)
df.to_csv(output_path, index=False)
print(f"[💾] Saved {len(df)} records to {output_path}")
# Print summary of data sources
source_counts = df['platform'].value_counts().to_dict()
print("\n[📊] Data collection summary:")
for source, count in source_counts.items():
# Use shorter display names for Lowyat Forum sources
display_source = source
if source == "LowyatForum":
display_source = "LF"
elif source == "LowyatForum_Comment":
display_source = "LF_Comment"
print(f" - {display_source}: {count} records")
return df
else:
# Create empty DataFrame and save to CSV
empty_df = pd.DataFrame(columns=["platform", "date", "username", "post_text", "post_url", "likes", "shares", "comments_count", "comment_text", "combined_text"])
empty_df.to_csv(output_path, index=False)
print(f"[⚠️] No records found. Saved empty DataFrame to {output_path}")
return empty_df
def run_actor_task(task_id, input_payload, platform="facebook", timeout=30, max_retries=3, use_cache=True, cache_ttl_hours=24):
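"""Run an Apify actor task and return the ID of its default dataset
Args:
task_id (str): Apify actor task ID
input_payload (dict): Task input sent as the run request body
platform (str): "facebook" or "tiktok"; selects which API token to use
timeout (int): Per-request timeout in seconds
max_retries (int): Number of attempts before giving up
use_cache (bool): Whether to reuse cached results for identical inputs
cache_ttl_hours (int): How long cached results remain valid
Returns:
str: Dataset ID of the completed run
"""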
# Generate a cache key based on task_id and input_payload
cache_key = f"{task_id}_{json.dumps(input_payload, sort_keys=True)}"
cache_hash = hashlib.md5(cache_key.encode()).hexdigest()
cache_file = os.path.join(CACHE_DIR, f"{cache_hash}.json")
# Check if we have a valid cached result
if use_cache and os.path.exists(cache_file):
try:
with open(cache_file, 'r') as f:
cache_data = json.load(f)
# Check if cache is still valid
cache_time = datetime.fromisoformat(cache_data.get('timestamp'))
cache_expiry = cache_time + timedelta(hours=cache_ttl_hours)
if datetime.now() < cache_expiry:
print(f"[💾] Using cached result for task {task_id} (expires {cache_expiry.isoformat()})")
return cache_data.get('dataset_id')
else:
print(f"[⏰] Cache expired for task {task_id}, fetching fresh data")
except Exception as e:
print(f"[⚠️] Error reading cache: {str(e)}")
token = APIFY_TOKEN_FB if platform == "facebook" else APIFY_TOKEN_TIKTOK
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
url = f"https://api.apify.com/v2/actor-tasks/{task_id}/runs"
# Try multiple times in case of network issues
for attempt in range(max_retries):
try:
print(f"[🔄] Attempt {attempt+1}/{max_retries} to run task {task_id}...")
print(f"[🔄] Input payload: {input_payload}")
response = requests.post(url, json=input_payload, headers=headers, timeout=timeout)
if response.status_code != 201:
print(f"[❌] Failed to run task: {response.text}")
if attempt < max_retries - 1:
print("[⏳] Retrying...")
time.sleep(5) # Wait 5 seconds before retrying
continue
raise Exception(f"Task run failed after {max_retries} attempts.")
run_id = response.json()["data"]["id"]
print(f"[🟢] Task {task_id} started: {run_id}")
status_url = f"https://api.apify.com/v2/actor-runs/{run_id}"
break # Success, exit the retry loop
except requests.exceptions.Timeout:
print(f"[❌] Request timed out after {timeout} seconds")
if attempt < max_retries - 1:
print("[⏳] Retrying...")
time.sleep(5) # Wait 5 seconds before retrying
else:
raise Exception(f"Task run timed out after {max_retries} attempts.")
except requests.exceptions.ConnectionError:
print(f"[❌] Connection error")
if attempt < max_retries - 1:
print("[⏳] Retrying...")
time.sleep(5) # Wait 5 seconds before retrying
else:
raise Exception(f"Connection error after {max_retries} attempts.")
except Exception as e:
print(f"[❌] Unexpected error: {str(e)}")
if attempt < max_retries - 1:
print("[⏳] Retrying...")
time.sleep(5) # Wait 5 seconds before retrying
else:
raise Exception(f"Unexpected error after {max_retries} attempts: {str(e)}")
while True:
status_data = requests.get(status_url, headers=headers, timeout=timeout).json()
if status_data["data"]["status"] in ["SUCCEEDED", "FAILED"]:
break
print("[⏳] Waiting for task run to complete...")
time.sleep(5)
if status_data["data"]["status"] == "SUCCEEDED":
dataset_id = status_data["data"]["defaultDatasetId"]
# Save result to cache
if use_cache:
try:
cache_data = {
"dataset_id": dataset_id,
"timestamp": datetime.now().isoformat(),
"task_id": task_id,
"platform": platform
}
with open(cache_file, 'w') as f:
json.dump(cache_data, f)
print(f"[💾] Saved result to cache: {cache_file}")
except Exception as e:
print(f"[⚠️] Error saving to cache: {str(e)}")
return dataset_id
else:
raise Exception("Task run failed.")
def is_malaysian_content(username, text):
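"""Heuristically check whether a post or comment looks like relevant Malaysian content
Args:
username (str): Author username
text (str): Post or comment text
Returns:
bool: True if the content appears relevant
"""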
# Check if content is relevant to the claim
user_lower = (username or "").lower()
text_lower = (text or "").lower()
# Get the full claim from environment if available
full_claim = os.environ.get("FULL_CLAIM", "")
claim_lower = full_claim.lower()
# Check if this is about sexual crimes in Kelantan
kelantan_sexual_crime = "kelantan" in claim_lower and ("rogol" in claim_lower or "sumbang mahram" in claim_lower)
if kelantan_sexual_crime:
# For the specific claim about sexual crimes in Kelantan, use very targeted filtering
kelantan_keywords = ["kelantan", "kelantanese"]
crime_keywords = ["rogol", "sumbang mahram", "jenayah seksual", "kes", "polis", "pdrm"]
# Must have at least one Kelantan reference AND one crime reference to be relevant
has_kelantan_ref = any(k in text_lower for k in kelantan_keywords)
has_crime_ref = any(k in text_lower for k in crime_keywords)
if has_kelantan_ref and has_crime_ref:
return True
# Check if username is from a relevant authority
authority_users = ["polis", "pdrm", "kelantan", "bukit aman", "bernama", "berita"]
if any(k in user_lower for k in authority_users):
return True
# More restrictive for this specific claim - return False if not matching criteria
return False
else:
# General Malaysian content detection for other claims
# Keywords for crime-related content
crime_keywords = [
"polis", "kelantan", "jenayah", "rogol", "sumbang mahram", "inses",
"kes", "statistik", "bimbang", "pdrm", "malaysia", "undang-undang",
"mahkamah", "hukuman", "tangkap", "siasat", "lapor", "mangsa", "suspek",
"tertuduh", "penderaan", "seksual", "cabul", "gangguan"
]
# Check if any crime keywords are in the text
if any(k in text_lower for k in crime_keywords):
return True
# Check if username looks Malaysian
malaysian_user_indicators = [
"my", "ms", "malaysia", "officialmy", "rakyat", "malay",
"dr", "dato", "yb", "ustaz", "cikgu", "polis", "kelantan"
]
if any(k in user_lower for k in malaysian_user_indicators):
return True
# Default to True to maximize data collection; callers are expected to filter further
return True
def download_dataset(dataset_id, platform="facebook", timeout=30, max_retries=3, use_cache=True, cache_ttl_hours=24):
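"""Download all items from an Apify dataset, with on-disk caching and retries
Args:
dataset_id (str): Apify dataset ID
platform (str): "facebook" or "tiktok"; selects which API token to use
timeout (int): Per-request timeout in seconds
max_retries (int): Number of attempts before giving up
use_cache (bool): Whether to reuse a cached copy of the dataset
cache_ttl_hours (int): How long cached datasets remain valid
Returns:
list: Dataset items as dictionaries (empty list if all retries fail)
"""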
# Check if we have a cached dataset
cache_file = os.path.join(CACHE_DIR, f"dataset_{dataset_id}.json")
if use_cache and os.path.exists(cache_file):
try:
with open(cache_file, 'r') as f:
cache_data = json.load(f)
# Check if cache is still valid
cache_time = datetime.fromisoformat(cache_data.get('timestamp'))
cache_expiry = cache_time + timedelta(hours=cache_ttl_hours)
if datetime.now() < cache_expiry:
print(f"[💾] Using cached dataset {dataset_id} (expires {cache_expiry.isoformat()})")
return cache_data.get('data', [])
else:
print(f"[⏰] Cache expired for dataset {dataset_id}, fetching fresh data")
except Exception as e:
print(f"[⚠️] Error reading dataset cache: {str(e)}")
token = APIFY_TOKEN_FB if platform == "facebook" else APIFY_TOKEN_TIKTOK
headers = {
"Authorization": f"Bearer {token}"
}
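# Dataset items endpoint; clean=true tells Apify to skip empty items and hidden metadata fields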
dataset_url = f"https://api.apify.com/v2/datasets/{dataset_id}/items?clean=true&format=json"
# Try multiple times in case of network issues
for attempt in range(max_retries):
try:
print(f"[🔄] Attempt {attempt+1}/{max_retries} to download dataset {dataset_id}...")
response = requests.get(dataset_url, headers=headers, timeout=timeout)
if response.status_code != 200:
print(f"[❌] Failed to download dataset: {response.text}")
if attempt < max_retries - 1:
print("[⏳] Retrying...")
time.sleep(5) # Wait 5 seconds before retrying
continue
raise Exception(f"Dataset download failed after {max_retries} attempts.")
data = response.json()
print(f"[✓] Downloaded {len(data)} items from dataset {dataset_id}")
# Save dataset to cache
if use_cache:
try:
cache_data = {
"data": data,
"timestamp": datetime.now().isoformat(),
"dataset_id": dataset_id,
"platform": platform
}
with open(cache_file, 'w') as f:
json.dump(cache_data, f)
print(f"[💾] Saved dataset to cache: {cache_file}")
except Exception as e:
print(f"[⚠️] Error saving dataset to cache: {str(e)}")
return data
except requests.exceptions.Timeout:
print(f"[❌] Request timed out after {timeout} seconds")
if attempt < max_retries - 1:
print("[⏳] Retrying...")
time.sleep(5) # Wait 5 seconds before retrying
else:
raise Exception(f"Dataset download timed out after {max_retries} attempts.")
except requests.exceptions.ConnectionError:
print(f"[❌] Connection error")
if attempt < max_retries - 1:
print("[⏳] Retrying...")
time.sleep(5) # Wait 5 seconds before retrying
else:
raise Exception(f"Connection error after {max_retries} attempts.")
except Exception as e:
print(f"[❌] Unexpected error: {str(e)}")
if attempt < max_retries - 1:
print("[⏳] Retrying...")
time.sleep(5) # Wait 5 seconds before retrying
else:
raise Exception(f"Unexpected error after {max_retries} attempts: {str(e)}")
# If we get here, all retries failed
return []
def build_boolean_search(keywords):
"""Build an optimized search query for social media platforms"""
search_terms = []
for kw in keywords:
# If keyword contains spaces (multi-word phrase), wrap in quotes
if " " in kw:
search_terms.append(f'"{kw}"')
else:
# For single words, don't use quotes to get broader results
search_terms.append(kw)
return " OR ".join(search_terms)