# Reddit-Analysis / src/filter_data.py
# lawlevisan's picture
# Upload 4 files
# f6c54d5 verified
# scripts/filter_data.py
import os
import pandas as pd
from config import settings
from utils.helpers import logger, ensure_folder
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from scripts.db import save_to_mongo
def compute_dynamic_risk(df, keyword_col="matched_keywords", location_col="matched_locations"):
    """Attach a frequency-based ``risk_score`` column to *df*.

    Every keyword (and, separately, every location) is weighted by its share
    of all keyword (location) mentions found in *df*. A row's score is the sum
    of the weights of the terms it lists, keywords plus locations.

    Args:
        df: DataFrame whose ``keyword_col`` and ``location_col`` cells are
            iterables (e.g. lists) of terms. Modified in place.
        keyword_col: Name of the column holding the matched keywords.
        location_col: Name of the column holding the matched locations.

    Returns:
        The same DataFrame object, with a float ``risk_score`` column added.
        An empty frame gets an empty ``risk_score`` column.
    """

    def relative_freq(column):
        # Flatten every cell's terms into one list, then normalise the counts.
        terms = [term for cell in df[column] for term in cell]
        # dtype=object keeps an empty term list from being inferred as float.
        return pd.Series(terms, dtype=object).value_counts(normalize=True).to_dict()

    keyword_freq = relative_freq(keyword_col)
    location_freq = relative_freq(location_col)

    def weight(terms, freq):
        # 0.01 is the fallback weight for a term missing from the table.
        return sum(freq.get(term, 0.01) for term in terms)

    # Iterate the two columns directly instead of DataFrame.apply(axis=1):
    # on an empty frame apply() returns a DataFrame, which cannot be assigned
    # to a single column.
    scores = [
        weight(kws, keyword_freq) + weight(locs, location_freq)
        for kws, locs in zip(df[keyword_col], df[location_col])
    ]
    df["risk_score"] = pd.Series(scores, index=df.index, dtype=float)
    return df
def _label_sentiment(score):
    """Map a compound sentiment score to "Positive", "Negative" or "Neutral"."""
    if score > 0.05:
        return "Positive"
    if score < -0.05:
        return "Negative"
    return "Neutral"


def _term_matcher(terms):
    """Return a function listing the *terms* that occur (case-insensitively) in a lowercase text."""
    # Lower-case each term once instead of once per row.
    pairs = [(term, term.lower()) for term in terms]
    return lambda text: [term for term, needle in pairs if needle in text]


def _filter_and_enrich(df, text_col, analyzer):
    """Tag, filter and enrich one frame of posts or comments.

    Adds ``matched_keywords`` / ``matched_locations`` to *df*, keeps the rows
    matching at least one keyword OR one location, and adds ``risk_score``
    (only when ``settings.AUTOMATED_RISK`` is truthy), ``coords``,
    ``sentiment_score``, ``sentiment_label``, ``slang_mentions`` and
    ``hashtags`` to the kept rows.

    Args:
        df: Source frame; must already contain the lowercase text in *text_col*.
        text_col: Name of the lowercase text column to scan.
        analyzer: Object exposing ``polarity_scores(text)["compound"]``.

    Returns:
        A new DataFrame (a copy) holding only the matching, enriched rows.
    """
    df["matched_keywords"] = df[text_col].apply(_term_matcher(settings.DRUG_KEYWORDS))
    df["matched_locations"] = df[text_col].apply(_term_matcher(settings.LOCATIONS))

    # Comparing lengths yields a bool-dtype mask even for an empty frame.
    has_keyword = df["matched_keywords"].map(len) > 0
    has_location = df["matched_locations"].map(len) > 0
    # Copy so the column assignments below write to an independent frame
    # rather than to a slice of ``df``.
    kept = df[has_keyword | has_location].copy()

    if settings.AUTOMATED_RISK:
        if kept.empty:
            # Nothing to score; keep the column so the output schema is stable.
            kept["risk_score"] = pd.Series(dtype=float)
        else:
            kept = compute_dynamic_risk(kept, "matched_keywords", "matched_locations")

    # Coordinates for matched locations (locations without an entry are skipped).
    coords_by_location = settings.LOCATION_COORDS
    kept["coords"] = kept["matched_locations"].apply(
        lambda locs: [coords_by_location[loc] for loc in locs if loc in coords_by_location]
    )

    # Sentiment analysis
    kept["sentiment_score"] = kept[text_col].apply(
        lambda text: analyzer.polarity_scores(str(text))["compound"]
    )
    kept["sentiment_label"] = kept["sentiment_score"].apply(_label_sentiment)

    # Slang & hashtags. The text is lowercase, so slang terms are lower-cased
    # for the comparison too, the same way keywords and locations are.
    kept["slang_mentions"] = kept[text_col].apply(_term_matcher(settings.SLANG_DICT))
    kept["hashtags"] = kept[text_col].apply(
        lambda text: [part[1:] for part in str(text).split() if part.startswith("#")]
    )
    return kept


def filter_data(posts_file, comments_file=None):
    """Filter raw Reddit posts (and optionally comments) and save the results.

    Rows are kept when their text mentions at least one of
    ``settings.DRUG_KEYWORDS`` or ``settings.LOCATIONS``. Kept rows are
    enriched (risk, coordinates, sentiment, slang, hashtags), written as CSV
    under ``settings.PROCESSED_DATA_PATH`` and passed to ``save_to_mongo``.

    Args:
        posts_file: CSV of posts with ``title`` and ``text`` columns.
        comments_file: Optional CSV of comments with a ``body`` column.

    Returns:
        ``(filtered_posts, filtered_comments)``; ``filtered_comments`` is
        ``None`` when no *comments_file* is given.
    """
    ensure_folder(settings.PROCESSED_DATA_PATH)
    analyzer = SentimentIntensityAnalyzer()

    # --------------------
    # Filter Posts
    # --------------------
    df_posts = pd.read_csv(posts_file)
    # fillna("") first: astype(str) alone turns a missing title/text into the
    # literal "nan", which would then be scanned and scored as real text.
    title = df_posts["title"].fillna("").astype(str)
    text = df_posts["text"].fillna("").astype(str)
    df_posts["text_combined"] = (title + " " + text).str.lower()

    filtered_posts = _filter_and_enrich(df_posts, "text_combined", analyzer)

    # Save filtered posts
    posts_output_file = os.path.join(settings.PROCESSED_DATA_PATH, "reddit_posts_filtered.csv")
    filtered_posts.to_csv(posts_output_file, index=False)
    logger.info(f"Saved filtered posts to {posts_output_file}")

    # --------------------
    # Filter Comments (Optional)
    # --------------------
    filtered_comments = None
    if comments_file:
        df_comments = pd.read_csv(comments_file)
        df_comments["body_lower"] = df_comments["body"].fillna("").astype(str).str.lower()

        filtered_comments = _filter_and_enrich(df_comments, "body_lower", analyzer)

        comments_output_file = os.path.join(
            settings.PROCESSED_DATA_PATH, "reddit_comments_filtered.csv"
        )
        filtered_comments.to_csv(comments_output_file, index=False)
        logger.info(f"Saved filtered comments to {comments_output_file}")

    save_to_mongo(filtered_posts, filtered_comments)
    return filtered_posts, filtered_comments
if __name__ == "__main__":
    # Build the input paths with os.path.join (as the output paths are) so
    # RAW_DATA_PATH works with or without a trailing separator.
    filter_data(
        os.path.join(settings.RAW_DATA_PATH, "reddit_posts.csv"),
        os.path.join(settings.RAW_DATA_PATH, "reddit_comments.csv"),
    )