Spaces:
Sleeping
Sleeping
| # scripts/filter_data.py | |
| import os | |
| import pandas as pd | |
| from config import settings | |
| from utils.helpers import logger, ensure_folder | |
| from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer | |
| from scripts.db import save_to_mongo | |
def compute_dynamic_risk(df, keyword_col="matched_keywords", location_col="matched_locations"):
    """Add a ``risk_score`` column derived from keyword and location frequencies.

    Each keyword (resp. location) is weighted by its share of all keyword
    (resp. location) occurrences found in ``df``. A row's score is the sum of
    the weights of its keywords plus the sum of the weights of its locations.

    Args:
        df: DataFrame whose ``keyword_col`` and ``location_col`` cells are
            iterables (e.g. lists) of strings.
        keyword_col: Name of the column holding the matched keywords.
        location_col: Name of the column holding the matched locations.

    Returns:
        The same ``df`` object, mutated in place with a float ``risk_score``
        column. An empty frame simply gains an empty ``risk_score`` column.
    """
    # Local import keeps this block self-contained without touching the
    # file-level import list.
    from collections import Counter

    def relative_freq(column):
        """Map each item in ``column`` to its fraction of all occurrences."""
        counts = Counter(item for items in df[column] for item in items)
        total = sum(counts.values())
        # With no occurrences ``counts`` is empty, so no division happens.
        return {item: n / total for item, n in counts.items()}

    keyword_freq = relative_freq(keyword_col)
    location_freq = relative_freq(location_col)

    # Iterating the two columns directly (instead of ``df.apply(axis=1)``)
    # avoids the empty-frame case where ``apply`` returns a DataFrame that
    # cannot be assigned to a single column.
    scores = [
        sum([keyword_freq.get(k, 0.01) for k in kws])
        + sum([location_freq.get(loc, 0.01) for loc in locs])
        for kws, locs in zip(df[keyword_col], df[location_col])
    ]
    df["risk_score"] = pd.Series(scores, index=df.index, dtype=float)
    return df
def _matching_terms(text, terms):
    """Return every entry of ``terms`` whose lowercase form occurs in ``text``."""
    return [term for term in terms if term.lower() in text]


def _sentiment_label(score):
    """Map a VADER compound score to "Positive", "Negative" or "Neutral"."""
    if score > 0.05:
        return "Positive"
    if score < -0.05:
        return "Negative"
    return "Neutral"


def _extract_hashtags(text):
    """Return whitespace-delimited ``#tag`` tokens without the leading ``#``."""
    # A bare "#" (e.g. a markdown heading marker) is not a hashtag; skip it.
    return [tok[1:] for tok in str(text).split() if len(tok) > 1 and tok.startswith("#")]


def _filter_and_enrich(df, text_col, analyzer):
    """Keep rows mentioning a keyword or location and add the derived columns.

    ``df[text_col]`` must hold the already-lowercased text to search. Returns
    an independent copy of the matching rows with ``matched_keywords``,
    ``matched_locations``, optional ``risk_score``, ``coords``,
    ``sentiment_score``, ``sentiment_label``, ``slang_mentions`` and
    ``hashtags`` columns.
    """
    df["matched_keywords"] = df[text_col].apply(
        lambda text: _matching_terms(text, settings.DRUG_KEYWORDS)
    )
    df["matched_locations"] = df[text_col].apply(
        lambda text: _matching_terms(text, settings.LOCATIONS)
    )

    # OR filter: a row is kept when it matches a keyword or a location.
    has_keyword = df["matched_keywords"].apply(len).gt(0)
    has_location = df["matched_locations"].apply(len).gt(0)
    # .copy() so the column assignments below write to an independent frame
    # rather than to a slice of ``df`` (avoids SettingWithCopyWarning).
    kept = df[has_keyword | has_location].copy()

    if settings.AUTOMATED_RISK:
        kept = compute_dynamic_risk(kept)

    # Coordinates for matched locations; locations without an entry are skipped.
    kept["coords"] = kept["matched_locations"].apply(
        lambda locs: [settings.LOCATION_COORDS[loc] for loc in locs if loc in settings.LOCATION_COORDS]
    )

    kept["sentiment_score"] = kept[text_col].apply(
        lambda text: analyzer.polarity_scores(str(text))["compound"]
    )
    kept["sentiment_label"] = kept["sentiment_score"].apply(_sentiment_label)

    # The text is lowercased, so slang entries are lowercased for comparison
    # too, consistent with keyword and location matching.
    kept["slang_mentions"] = kept[text_col].apply(
        lambda text: _matching_terms(text, settings.SLANG_DICT)
    )
    kept["hashtags"] = kept[text_col].apply(_extract_hashtags)
    return kept


def filter_data(posts_file, comments_file=None):
    """Filter raw Reddit posts (and optionally comments) and persist the result.

    Rows are kept when their text mentions any of ``settings.DRUG_KEYWORDS`` or
    ``settings.LOCATIONS``. Kept rows are enriched with risk score (when
    ``settings.AUTOMATED_RISK`` is truthy), coordinates, sentiment, slang
    mentions and hashtags, written as CSV under
    ``settings.PROCESSED_DATA_PATH`` and passed to ``save_to_mongo``.

    Args:
        posts_file: Path to a CSV with at least ``title`` and ``text`` columns.
        comments_file: Optional path to a CSV with at least a ``body`` column.

    Returns:
        A ``(filtered_posts, filtered_comments)`` tuple; ``filtered_comments``
        is ``None`` when no ``comments_file`` is given.
    """
    ensure_folder(settings.PROCESSED_DATA_PATH)
    analyzer = SentimentIntensityAnalyzer()

    # --------------------
    # Filter Posts
    # --------------------
    df_posts = pd.read_csv(posts_file)
    # fillna("") so a missing title/text does not become the literal "nan".
    df_posts["text_combined"] = (
        df_posts["title"].fillna("").astype(str) + " " + df_posts["text"].fillna("").astype(str)
    ).str.lower()
    filtered_posts = _filter_and_enrich(df_posts, "text_combined", analyzer)

    posts_output_file = os.path.join(settings.PROCESSED_DATA_PATH, "reddit_posts_filtered.csv")
    filtered_posts.to_csv(posts_output_file, index=False)
    logger.info(f"Saved filtered posts to {posts_output_file}")

    # --------------------
    # Filter Comments (Optional)
    # --------------------
    filtered_comments = None
    if comments_file:
        df_comments = pd.read_csv(comments_file)
        df_comments["body_lower"] = df_comments["body"].fillna("").astype(str).str.lower()
        filtered_comments = _filter_and_enrich(df_comments, "body_lower", analyzer)

        comments_output_file = os.path.join(settings.PROCESSED_DATA_PATH, "reddit_comments_filtered.csv")
        filtered_comments.to_csv(comments_output_file, index=False)
        logger.info(f"Saved filtered comments to {comments_output_file}")

    save_to_mongo(filtered_posts, filtered_comments)
    return filtered_posts, filtered_comments
if __name__ == "__main__":
    # Script entry point: filter the raw Reddit dumps found under
    # settings.RAW_DATA_PATH. os.path.join is used so the paths are valid
    # whether or not RAW_DATA_PATH ends with a path separator.
    filter_data(
        os.path.join(settings.RAW_DATA_PATH, "reddit_posts.csv"),
        os.path.join(settings.RAW_DATA_PATH, "reddit_comments.csv"),
    )