# scripts/filter_data.py
import os
import pandas as pd
from config import settings
from utils.helpers import logger, ensure_folder
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from scripts.db import save_to_mongo
def compute_dynamic_risk(df, keyword_col="matched_keywords", location_col="matched_locations"):
    """Compute a per-row risk score from corpus-wide keyword/location frequencies.

    Each matched keyword/location contributes its relative frequency across the
    whole DataFrame; unseen terms fall back to a small constant (0.01).

    Args:
        df: DataFrame whose ``keyword_col``/``location_col`` hold lists of strings.
        keyword_col: Column of matched-keyword lists.
        location_col: Column of matched-location lists.

    Returns:
        A copy of ``df`` with an added ``risk_score`` float column. The input
        DataFrame is NOT mutated (the original wrote into the caller's frame,
        which raised SettingWithCopyWarning when given a boolean-mask slice and
        could silently drop the column).
    """
    df = df.copy()  # work on a copy so a sliced input is never written through

    # Flatten all matches to compute corpus-wide relative frequencies.
    all_keywords = [kw for kws in df[keyword_col] for kw in kws]
    all_locations = [loc for locs in df[location_col] for loc in locs]
    kw_total = len(all_keywords)
    loc_total = len(all_locations)
    # dtype=object keeps pandas from warning on an empty list; when a total is 0
    # the corresponding value_counts() is empty, so no division ever occurs.
    keyword_freq = {k: v / kw_total for k, v in pd.Series(all_keywords, dtype=object).value_counts().items()}
    location_freq = {l: v / loc_total for l, v in pd.Series(all_locations, dtype=object).value_counts().items()}

    def risk_row(kws, locs):
        # Sum of relative frequencies; 0.01 floor for terms missing from the maps.
        kw_risk = sum(keyword_freq.get(k, 0.01) for k in kws)
        loc_risk = sum(location_freq.get(l, 0.01) for l in locs)
        return kw_risk + loc_risk

    df["risk_score"] = df.apply(lambda row: risk_row(row[keyword_col], row[location_col]), axis=1)
    return df
def _tag_and_enrich(df, text_col, analyzer):
    """Tag, filter, and enrich one DataFrame (posts or comments).

    Adds ``matched_keywords``/``matched_locations`` to ``df`` (in place, as the
    original code did), keeps only rows matching at least one keyword OR one
    location, then enriches the surviving rows with risk, coordinates,
    sentiment, slang mentions, and hashtags.

    Args:
        df: Raw DataFrame; ``text_col`` must hold lowercased text.
        text_col: Name of the lowercased text column to scan.
        analyzer: Shared :class:`SentimentIntensityAnalyzer` instance.

    Returns:
        The filtered, enriched DataFrame (a copy — safe to write columns to).
    """
    # Tag matched keywords and locations (substring match on lowercased text).
    df["matched_keywords"] = df[text_col].apply(
        lambda x: [kw for kw in settings.DRUG_KEYWORDS if kw.lower() in x]
    )
    df["matched_locations"] = df[text_col].apply(
        lambda x: [loc for loc in settings.LOCATIONS if loc.lower() in x]
    )

    # OR filter: keep rows with any keyword OR any location match.
    keyword_mask = df["matched_keywords"].apply(lambda x: len(x) > 0)
    location_mask = df["matched_locations"].apply(lambda x: len(x) > 0)
    # .copy() so the column assignments below write to a real frame, not a
    # view (the original triggered SettingWithCopyWarning here).
    filtered = df[keyword_mask | location_mask].copy()

    # Dynamic risk scoring (optional).
    if settings.AUTOMATED_RISK:
        filtered = compute_dynamic_risk(filtered)

    # Coordinates for matched locations (skip locations without known coords).
    filtered["coords"] = filtered["matched_locations"].apply(
        lambda locs: [settings.LOCATION_COORDS[l] for l in locs if l in settings.LOCATION_COORDS]
    )

    # VADER sentiment: compound score plus a coarse 3-way label.
    filtered["sentiment_score"] = filtered[text_col].apply(
        lambda x: analyzer.polarity_scores(str(x))["compound"]
    )
    filtered["sentiment_label"] = filtered["sentiment_score"].apply(
        lambda x: "Positive" if x > 0.05 else ("Negative" if x < -0.05 else "Neutral")
    )

    # Slang terms and hashtags mentioned in the text.
    filtered["slang_mentions"] = filtered[text_col].apply(
        lambda x: [word for word in settings.SLANG_DICT if word in x]
    )
    filtered["hashtags"] = filtered[text_col].apply(
        lambda x: [part[1:] for part in str(x).split() if part.startswith("#")]
    )
    return filtered


def filter_data(posts_file, comments_file=None):
    """Filter and enrich scraped Reddit posts (and optionally comments).

    Reads the raw CSVs, keeps rows that mention a drug keyword or a known
    location, enriches them (risk, coords, sentiment, slang, hashtags), writes
    filtered CSVs to ``settings.PROCESSED_DATA_PATH``, and persists both
    frames to MongoDB.

    Args:
        posts_file: Path to the raw posts CSV (needs ``title``/``text`` columns).
        comments_file: Optional path to the raw comments CSV (needs ``body``).

    Returns:
        Tuple ``(filtered_posts, filtered_comments)``; ``filtered_comments``
        is ``None`` when ``comments_file`` is not given.
    """
    ensure_folder(settings.PROCESSED_DATA_PATH)
    analyzer = SentimentIntensityAnalyzer()

    # --------------------
    # Filter Posts
    # --------------------
    df_posts = pd.read_csv(posts_file)
    df_posts["text_combined"] = (df_posts["title"].astype(str) + " " + df_posts["text"].astype(str)).str.lower()
    filtered_posts = _tag_and_enrich(df_posts, "text_combined", analyzer)

    posts_output_file = os.path.join(settings.PROCESSED_DATA_PATH, "reddit_posts_filtered.csv")
    filtered_posts.to_csv(posts_output_file, index=False)
    logger.info(f"Saved filtered posts to {posts_output_file}")

    # --------------------
    # Filter Comments (Optional)
    # --------------------
    filtered_comments = None
    if comments_file:
        df_comments = pd.read_csv(comments_file)
        df_comments["body_lower"] = df_comments["body"].astype(str).str.lower()
        filtered_comments = _tag_and_enrich(df_comments, "body_lower", analyzer)

        comments_output_file = os.path.join(settings.PROCESSED_DATA_PATH, "reddit_comments_filtered.csv")
        filtered_comments.to_csv(comments_output_file, index=False)
        logger.info(f"Saved filtered comments to {comments_output_file}")

    save_to_mongo(filtered_posts, filtered_comments)
    return filtered_posts, filtered_comments
if __name__ == "__main__":
    # Run the full pipeline on the raw Reddit dumps. os.path.join is used
    # (consistent with PROCESSED_DATA_PATH handling in filter_data) so a
    # RAW_DATA_PATH without a trailing separator still produces valid paths;
    # with a trailing separator the result is identical to the old f-strings.
    filter_data(
        os.path.join(settings.RAW_DATA_PATH, "reddit_posts.csv"),
        os.path.join(settings.RAW_DATA_PATH, "reddit_comments.csv"),
    )