File size: 5,801 Bytes
f6c54d5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# scripts/filter_data.py
import os
import pandas as pd
from config import settings
from utils.helpers import logger, ensure_folder
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from scripts.db import save_to_mongo

def compute_dynamic_risk(df, keyword_col="matched_keywords", location_col="matched_locations"):
    """Score each row by how common its matched terms are across the frame.

    Risk for a row is the sum of the relative frequencies (share of all
    occurrences in *df*) of its matched keywords plus its matched locations.
    Adds a ``risk_score`` column to *df* in place and returns *df*.

    Args:
        df: DataFrame whose *keyword_col* / *location_col* cells hold lists.
        keyword_col: column of matched-keyword lists.
        location_col: column of matched-location lists.
    """
    all_keywords = [kw for kws in df[keyword_col] for kw in kws]
    all_locations = [loc for locs in df[location_col] for loc in locs]

    def _rel_freq(values):
        # Relative frequency of each distinct value. An empty input would
        # divide by zero in the original code; return an empty table instead
        # (every lookup then falls through to the 0.01 floor, summing to 0
        # for rows with no matches).
        total = len(values)
        if not total:
            return {}
        return {k: v / total for k, v in pd.Series(values).value_counts().items()}

    keyword_freq = _rel_freq(all_keywords)
    location_freq = _rel_freq(all_locations)

    def _risk_row(kws, locs):
        # Terms missing from the frequency tables get a small 0.01 floor.
        kw_risk = sum(keyword_freq.get(k, 0.01) for k in kws)
        loc_risk = sum(location_freq.get(l, 0.01) for l in locs)
        return kw_risk + loc_risk

    df["risk_score"] = df.apply(
        lambda row: _risk_row(row[keyword_col], row[location_col]), axis=1
    )
    return df

def _tag_matches(df, text_col):
    """Annotate *df* in place with keyword/location hits found in *text_col*."""
    df["matched_keywords"] = df[text_col].apply(
        lambda x: [kw for kw in settings.DRUG_KEYWORDS if kw.lower() in x]
    )
    df["matched_locations"] = df[text_col].apply(
        lambda x: [loc for loc in settings.LOCATIONS if loc.lower() in x]
    )
    return df


def _keep_matches(df):
    """Return a copy of the rows matching at least one keyword OR location."""
    has_kw = df["matched_keywords"].apply(lambda x: len(x) > 0)
    has_loc = df["matched_locations"].apply(lambda x: len(x) > 0)
    # .copy() avoids pandas SettingWithCopyWarning (and possible silent
    # no-op writes) when columns are assigned to the filtered slice later.
    return df[has_kw | has_loc].copy()


def _enrich(df, text_col, analyzer):
    """Add risk, coords, sentiment, slang-mention and hashtag columns to *df*."""
    # Dynamic risk scoring (frequency-based) is opt-in via settings.
    if settings.AUTOMATED_RISK:
        df = compute_dynamic_risk(df)

    # Coordinates for matched locations; unknown locations are skipped.
    df["coords"] = df["matched_locations"].apply(
        lambda locs: [settings.LOCATION_COORDS[l] for l in locs if l in settings.LOCATION_COORDS]
    )

    # VADER compound score in [-1, 1], bucketed with the standard +/-0.05 cutoffs.
    df["sentiment_score"] = df[text_col].apply(
        lambda x: analyzer.polarity_scores(str(x))["compound"]
    )
    df["sentiment_label"] = df["sentiment_score"].apply(
        lambda s: "Positive" if s > 0.05 else ("Negative" if s < -0.05 else "Neutral")
    )

    # Slang terms (substring match) and '#'-prefixed hashtags.
    df["slang_mentions"] = df[text_col].apply(
        lambda x: [word for word in settings.SLANG_DICT if word in x]
    )
    df["hashtags"] = df[text_col].apply(
        lambda x: [part[1:] for part in str(x).split() if part.startswith("#")]
    )
    return df


def filter_data(posts_file, comments_file=None):
    """Filter raw Reddit CSVs down to drug/location-relevant rows and enrich them.

    Posts (and optionally comments) are tagged with matched keywords/locations,
    filtered to rows with at least one match, enriched with risk, coordinates,
    sentiment, slang mentions and hashtags, then written to
    ``settings.PROCESSED_DATA_PATH``.

    Args:
        posts_file: path to the raw posts CSV (needs ``title`` and ``text`` columns).
        comments_file: optional path to the raw comments CSV (needs ``body``).

    Returns:
        (filtered_posts, filtered_comments) — the second is None when
        *comments_file* is not given.
    """
    ensure_folder(settings.PROCESSED_DATA_PATH)
    analyzer = SentimentIntensityAnalyzer()

    # --------------------
    # Posts
    # --------------------
    df_posts = pd.read_csv(posts_file)
    df_posts["text_combined"] = (
        df_posts["title"].astype(str) + " " + df_posts["text"].astype(str)
    ).str.lower()
    filtered_posts = _enrich(
        _keep_matches(_tag_matches(df_posts, "text_combined")), "text_combined", analyzer
    )

    posts_output_file = os.path.join(settings.PROCESSED_DATA_PATH, "reddit_posts_filtered.csv")
    filtered_posts.to_csv(posts_output_file, index=False)
    logger.info(f"Saved filtered posts to {posts_output_file}")

    # --------------------
    # Comments (optional)
    # --------------------
    filtered_comments = None
    if comments_file:
        df_comments = pd.read_csv(comments_file)
        df_comments["body_lower"] = df_comments["body"].astype(str).str.lower()
        filtered_comments = _enrich(
            _keep_matches(_tag_matches(df_comments, "body_lower")), "body_lower", analyzer
        )

        comments_output_file = os.path.join(settings.PROCESSED_DATA_PATH, "reddit_comments_filtered.csv")
        filtered_comments.to_csv(comments_output_file, index=False)
        logger.info(f"Saved filtered comments to {comments_output_file}")

        # NOTE(review): preserved from the original — results reach Mongo only
        # when comments are supplied. Confirm whether posts-only runs should
        # also call save_to_mongo.
        save_to_mongo(filtered_posts, filtered_comments)

    return filtered_posts, filtered_comments

if __name__ == "__main__":
    # os.path.join works whether or not RAW_DATA_PATH carries a trailing
    # separator; the previous f-string concatenation broke without one.
    filter_data(
        os.path.join(settings.RAW_DATA_PATH, "reddit_posts.csv"),
        os.path.join(settings.RAW_DATA_PATH, "reddit_comments.csv"),
    )