File size: 3,347 Bytes
e0324fc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# db.py
from pymongo import MongoClient, UpdateOne
import pandas as pd
from dotenv import load_dotenv
load_dotenv()
import os
import json
import sys

# MongoDB configuration
MONGO_URI = os.getenv("MONGO_URI")
if not MONGO_URI:
    print("❌ MONGO_URI not set in environment variables.")
    sys.exit(1)

DB_NAME = "drug_monitoring_twitter"
COLLECTION_NAME = "tweets"
FOLDER_PATH = "drug_analysis_data_3months"  # folder with scraper outputs

def get_db_collection():
    """Connect to MongoDB and return the collection"""
    client = MongoClient(MONGO_URI)
    db = client[DB_NAME]

    if COLLECTION_NAME not in db.list_collection_names():
        db.create_collection(COLLECTION_NAME)
        print(f"βœ… Created collection: {COLLECTION_NAME}")

    return db[COLLECTION_NAME]

def insert_all_from_folder(folder_path=FOLDER_PATH):
    """Insert all CSV/JSON files from scraper folder into MongoDB"""
    collection = get_db_collection()

    if not os.path.exists(folder_path):
        print(f"❌ Folder path does not exist: {folder_path}")
        return

    for file_name in os.listdir(folder_path):
        file_path = os.path.join(folder_path, file_name)
        operations = []

        try:
            if file_name.endswith(".csv"):
                df = pd.read_csv(file_path, encoding="utf-8")
                for _, row in df.iterrows():
                    doc = row.to_dict()
                    doc["notified"] = False
                    if "tweet_id" in doc:
                        operations.append(
                            UpdateOne({"tweet_id": doc["tweet_id"]}, {"$set": doc}, upsert=True)
                        )

            elif file_name.endswith(".json"):
                with open(file_path, "r", encoding="utf-8") as f:
                    data = json.load(f)
                if isinstance(data, list):
                    for tweet in data:
                        tweet["notified"] = False
                        if "tweet_id" in tweet:
                            operations.append(
                                UpdateOne({"tweet_id": tweet["tweet_id"]}, {"$set": tweet}, upsert=True)
                            )
                else:
                    # single JSON report
                    operations.append(
                        UpdateOne({"report_name": file_name}, {"$set": data}, upsert=True)
                    )

            if operations:
                result = collection.bulk_write(operations)
                print(f"βœ… {file_name} -> inserted/updated {result.upserted_count + result.modified_count} documents.")

        except Exception as e:
            print(f"❌ Failed to process {file_name}: {e}")

def fetch_high_risk_unnotified():
    """Get all HIGH or CRITICAL risk tweets that are not notified yet"""
    collection = get_db_collection()
    return list(collection.find({"risk_level": {"$in": ["HIGH", "CRITICAL"]}, "notified": False}))

def mark_as_notified(tweet_id):
    """Mark a tweet as notified after sending alert"""
    collection = get_db_collection()
    collection.update_one({"tweet_id": tweet_id}, {"$set": {"notified": True}})

if __name__ == "__main__":
    insert_all_from_folder()
    print("βœ… All scraper folder contents inserted/updated successfully.")