lawlevisan's picture
Upload 5 files
e0324fc verified
# db.py
from pymongo import MongoClient, UpdateOne
import pandas as pd
from dotenv import load_dotenv
load_dotenv()
import os
import json
import sys
# MongoDB configuration (the connection string is read from the environment,
# which load_dotenv() above may have populated from a .env file).
MONGO_URI = os.getenv("MONGO_URI")
if MONGO_URI in (None, ""):
    # Nothing in this module can work without a connection string, so abort.
    print("❌ MONGO_URI not set in environment variables.")
    raise SystemExit(1)

DB_NAME = "drug_monitoring_twitter"
COLLECTION_NAME = "tweets"
# Directory holding the scraper's CSV/JSON output files.
FOLDER_PATH = "drug_analysis_data_3months"
# Process-wide MongoClient, created lazily on first use by get_db_collection().
_client = None


def get_db_collection():
    """Return the tweets collection, creating it in MongoDB if it is missing.

    A single ``MongoClient`` is created on first use and reused by later calls.
    Previously, every call built a new client and its connection pool and never
    closed it, even though callers such as the per-tweet notification helpers
    call this function repeatedly.

    Returns:
        The ``COLLECTION_NAME`` collection of the ``DB_NAME`` database.
    """
    global _client
    if _client is None:
        _client = MongoClient(MONGO_URI)
    db = _client[DB_NAME]
    if COLLECTION_NAME not in db.list_collection_names():
        db.create_collection(COLLECTION_NAME)
        print(f"✅ Created collection: {COLLECTION_NAME}")
    return db[COLLECTION_NAME]
def _tweet_upsert(tweet):
    """Build an upsert for one tweet dict, keyed on its ``tweet_id``.

    Every field except ``notified`` is written with ``$set``. ``notified`` is
    only initialised to False when the document is first inserted
    (``$setOnInsert``). Re-running the import therefore does not reset tweets
    that were already marked as notified.
    """
    fields = {key: value for key, value in tweet.items() if key != "notified"}
    return UpdateOne(
        {"tweet_id": tweet["tweet_id"]},
        {"$set": fields, "$setOnInsert": {"notified": False}},
        upsert=True,
    )


def _operations_from_csv(file_path):
    """Return tweet upserts for every row of a CSV that has a ``tweet_id`` column."""
    df = pd.read_csv(file_path, encoding="utf-8")
    if "tweet_id" not in df.columns:
        return []
    # to_dict("records") keeps each column's dtype. iterrows() upcasts a row to
    # a common dtype, which can turn integer tweet IDs into (lossy) floats.
    return [_tweet_upsert(row) for row in df.to_dict(orient="records")]


def _operations_from_json(file_path, file_name):
    """Return upserts for a JSON file.

    A top-level list is treated as tweets. Entries that are not objects or lack
    ``tweet_id`` are skipped. A top-level object is stored as one report
    document matched on ``report_name`` == *file_name*.
    """
    with open(file_path, "r", encoding="utf-8") as f:
        data = json.load(f)
    if isinstance(data, list):
        return [
            _tweet_upsert(tweet)
            for tweet in data
            if isinstance(tweet, dict) and "tweet_id" in tweet
        ]
    if isinstance(data, dict):
        # single JSON report
        return [UpdateOne({"report_name": file_name}, {"$set": data}, upsert=True)]
    raise ValueError(f"unsupported top-level JSON type: {type(data).__name__}")


def insert_all_from_folder(folder_path=FOLDER_PATH):
    """Upsert the contents of every CSV/JSON file in *folder_path* into MongoDB.

    Tweets are matched on ``tweet_id``. Newly inserted tweets get
    ``notified=False``, and existing documents keep their current
    ``notified`` value. Files with other extensions are ignored.

    If one file fails, the error is printed and the remaining files are still
    processed.

    Args:
        folder_path: Directory containing the scraper output files.
    """
    if not os.path.isdir(folder_path):
        print(f"❌ Folder path does not exist: {folder_path}")
        return
    collection = get_db_collection()
    # sorted() gives a deterministic processing order; os.listdir's is arbitrary.
    for file_name in sorted(os.listdir(folder_path)):
        file_path = os.path.join(folder_path, file_name)
        extension = os.path.splitext(file_name)[1].lower()
        try:
            if extension == ".csv":
                operations = _operations_from_csv(file_path)
            elif extension == ".json":
                operations = _operations_from_json(file_path, file_name)
            else:
                continue
            if operations:
                result = collection.bulk_write(operations)
                changed = result.upserted_count + result.modified_count
                print(f"✅ {file_name} -> inserted/updated {changed} documents.")
        except Exception as e:  # per-file boundary: report and move on
            print(f"❌ Failed to process {file_name}: {e}")
def fetch_high_risk_unnotified():
    """Return, as a list, every tweet with risk_level HIGH or CRITICAL whose
    ``notified`` field is False."""
    query = {
        "risk_level": {"$in": ["HIGH", "CRITICAL"]},
        "notified": False,
    }
    matches = get_db_collection().find(query)
    return list(matches)
def mark_as_notified(tweet_id):
    """Set ``notified`` to True on the first document whose ``tweet_id`` matches."""
    target = {"tweet_id": tweet_id}
    change = {"$set": {"notified": True}}
    get_db_collection().update_one(target, change)
if __name__ == "__main__":
    # Script entry point: import everything in the default scraper folder.
    insert_all_from_folder()
    print("✅ All scraper folder contents inserted/updated successfully.")