# scripts/filter_data.py
"""Filter scraped Reddit posts/comments for drug-related content.

Tags each row with matched keywords/locations, computes an optional
frequency-based risk score, attaches coordinates, VADER sentiment,
slang mentions and hashtags, then writes filtered CSVs and pushes the
results to MongoDB.
"""
import os

import pandas as pd
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

from config import settings
from scripts.db import save_to_mongo
from utils.helpers import logger, ensure_folder


def compute_dynamic_risk(df, keyword_col="matched_keywords", location_col="matched_locations"):
    """Compute a per-row risk score from corpus-wide keyword & location frequencies.

    Each row's score is the sum of the relative frequencies (share of all
    matches in *df*) of its matched keywords plus its matched locations.
    Terms absent from the frequency tables get a small floor weight (0.01).

    Args:
        df: DataFrame whose *keyword_col* / *location_col* hold lists of matches.
        keyword_col: column containing per-row lists of matched keywords.
        location_col: column containing per-row lists of matched locations.

    Returns:
        The same DataFrame with a new ``risk_score`` float column.
    """
    all_keywords = [kw for kws in df[keyword_col] for kw in kws]
    all_locations = [loc for locs in df[location_col] for loc in locs]

    # Guard the denominators: with zero total matches the original code
    # raised ZeroDivisionError. dtype=object keeps empty Series construction
    # warning-free across pandas versions.
    kw_total = len(all_keywords) or 1
    loc_total = len(all_locations) or 1
    keyword_freq = {
        k: v / kw_total
        for k, v in pd.Series(all_keywords, dtype=object).value_counts().items()
    }
    location_freq = {
        l: v / loc_total
        for l, v in pd.Series(all_locations, dtype=object).value_counts().items()
    }

    def risk_row(kws, locs):
        # Unseen terms contribute a 0.01 floor instead of zero.
        kw_risk = sum(keyword_freq.get(k, 0.01) for k in kws)
        loc_risk = sum(location_freq.get(l, 0.01) for l in locs)
        return kw_risk + loc_risk

    df["risk_score"] = df.apply(
        lambda row: risk_row(row[keyword_col], row[location_col]), axis=1
    )
    return df


def _tag_matches(df, text_col):
    """Add ``matched_keywords`` / ``matched_locations`` list columns derived
    from the (already lower-cased) *text_col*. Matching is substring-based."""
    df["matched_keywords"] = df[text_col].apply(
        lambda x: [kw for kw in settings.DRUG_KEYWORDS if kw.lower() in x]
    )
    df["matched_locations"] = df[text_col].apply(
        lambda x: [loc for loc in settings.LOCATIONS if loc.lower() in x]
    )
    return df


def _enrich(df, text_col, analyzer):
    """Attach coords, VADER sentiment, slang-mention and hashtag columns.

    *df* must already carry ``matched_locations``; *text_col* is the
    lower-cased text used for sentiment/slang/hashtag extraction.
    """
    # Coordinates only for locations present in the configured lookup table.
    df["coords"] = df["matched_locations"].apply(
        lambda locs: [settings.LOCATION_COORDS[l] for l in locs if l in settings.LOCATION_COORDS]
    )
    df["sentiment_score"] = df[text_col].apply(
        lambda x: analyzer.polarity_scores(str(x))["compound"]
    )
    # Standard VADER thresholds: compound > 0.05 positive, < -0.05 negative.
    df["sentiment_label"] = df["sentiment_score"].apply(
        lambda x: "Positive" if x > 0.05 else ("Negative" if x < -0.05 else "Neutral")
    )
    df["slang_mentions"] = df[text_col].apply(
        lambda x: [word for word in settings.SLANG_DICT if word in x]
    )
    df["hashtags"] = df[text_col].apply(
        lambda x: [part[1:] for part in str(x).split() if part.startswith("#")]
    )
    return df


def filter_data(posts_file, comments_file=None):
    """Filter raw post/comment CSVs down to drug- or location-related rows.

    Args:
        posts_file: path to the raw posts CSV (needs ``title`` and ``text``).
        comments_file: optional path to the raw comments CSV (needs ``body``).

    Returns:
        Tuple ``(filtered_posts, filtered_comments)``; the second element is
        ``None`` when *comments_file* is not given. Side effects: writes
        filtered CSVs under PROCESSED_DATA_PATH and saves both frames to Mongo.
    """
    ensure_folder(settings.PROCESSED_DATA_PATH)
    # One analyzer instance shared by the posts and comments pipelines.
    analyzer = SentimentIntensityAnalyzer()

    # --------------------
    # Filter Posts
    # --------------------
    df_posts = pd.read_csv(posts_file)
    df_posts["text_combined"] = (
        df_posts["title"].astype(str) + " " + df_posts["text"].astype(str)
    ).str.lower()
    df_posts = _tag_matches(df_posts, "text_combined")

    # OR filter: keep rows matching at least one keyword OR one location.
    drug_mask = df_posts["matched_keywords"].apply(bool)
    location_mask = df_posts["matched_locations"].apply(bool)
    # .copy() so later column writes hit a real frame, not a slice view
    # (avoids SettingWithCopyWarning / silently lost writes).
    filtered_posts = df_posts[drug_mask | location_mask].copy()

    if settings.AUTOMATED_RISK:
        filtered_posts = compute_dynamic_risk(filtered_posts)
    filtered_posts = _enrich(filtered_posts, "text_combined", analyzer)

    posts_output_file = os.path.join(settings.PROCESSED_DATA_PATH, "reddit_posts_filtered.csv")
    filtered_posts.to_csv(posts_output_file, index=False)
    logger.info(f"Saved filtered posts to {posts_output_file}")

    # --------------------
    # Filter Comments (Optional)
    # --------------------
    filtered_comments = None
    if comments_file:
        df_comments = pd.read_csv(comments_file)
        df_comments["body_lower"] = df_comments["body"].astype(str).str.lower()
        df_comments = _tag_matches(df_comments, "body_lower")

        drug_mask_c = df_comments["matched_keywords"].apply(bool)
        location_mask_c = df_comments["matched_locations"].apply(bool)
        filtered_comments = df_comments[drug_mask_c | location_mask_c].copy()

        if settings.AUTOMATED_RISK:
            filtered_comments = compute_dynamic_risk(filtered_comments)
        filtered_comments = _enrich(filtered_comments, "body_lower", analyzer)

        comments_output_file = os.path.join(
            settings.PROCESSED_DATA_PATH, "reddit_comments_filtered.csv"
        )
        filtered_comments.to_csv(comments_output_file, index=False)
        logger.info(f"Saved filtered comments to {comments_output_file}")

    save_to_mongo(filtered_posts, filtered_comments)
    return filtered_posts, filtered_comments


if __name__ == "__main__":
    # os.path.join tolerates RAW_DATA_PATH with or without a trailing
    # separator (the old f-string concatenation required one).
    filter_data(
        os.path.join(settings.RAW_DATA_PATH, "reddit_posts.csv"),
        os.path.join(settings.RAW_DATA_PATH, "reddit_comments.csv"),
    )