# db.py
"""MongoDB persistence helpers for the drug-monitoring Twitter scraper.

Loads scraper output files (CSV / JSON) from a folder into a MongoDB
collection, and exposes helpers to fetch HIGH/CRITICAL-risk tweets that have
not been notified yet and to mark them as notified.
"""
import functools
import json
import os
import sys

import pandas as pd
from dotenv import load_dotenv
from pymongo import MongoClient, UpdateOne

load_dotenv()

# MongoDB configuration
MONGO_URI = os.getenv("MONGO_URI")
if not MONGO_URI:
    print("❌ MONGO_URI not set in environment variables.")
    sys.exit(1)

DB_NAME = "drug_monitoring_twitter"
COLLECTION_NAME = "tweets"
FOLDER_PATH = "drug_analysis_data_3months"  # folder with scraper outputs


@functools.lru_cache(maxsize=1)
def _get_client():
    """Return the process-wide MongoClient, creating it on first use.

    A MongoClient owns a connection pool and is designed to be created once
    and reused; constructing one per helper call would open a new pool each
    time without ever closing it.
    """
    return MongoClient(MONGO_URI)


def get_db_collection():
    """Connect to MongoDB and return the tweets collection.

    If the collection does not exist yet it is created explicitly and a
    message is printed.
    """
    db = _get_client()[DB_NAME]
    if COLLECTION_NAME not in db.list_collection_names():
        db.create_collection(COLLECTION_NAME)
        print(f"✅ Created collection: {COLLECTION_NAME}")
    return db[COLLECTION_NAME]


def _tweet_upsert(tweet):
    """Build an upsert keyed on ``tweet_id``; return None if the id is missing.

    ``notified`` is only initialised when the document is first inserted
    ($setOnInsert). Re-ingesting the same tweet must not flip an already
    alerted tweet back to ``notified: False``, or it would be alerted again.
    Any ``notified`` value present in the source data is ignored.
    """
    tweet_id = tweet.get("tweet_id")
    if tweet_id is None:
        # A None filter would match every document lacking a tweet_id
        # (e.g. report documents) and overwrite it.
        return None
    fields = {key: value for key, value in tweet.items() if key != "notified"}
    return UpdateOne(
        {"tweet_id": tweet_id},
        {"$set": fields, "$setOnInsert": {"notified": False}},
        upsert=True,
    )


def _tweet_operations(tweets):
    """Return upserts for every dict in *tweets* that carries a tweet_id."""
    operations = []
    for tweet in tweets:
        if not isinstance(tweet, dict):
            continue  # skip malformed items instead of failing the whole file
        op = _tweet_upsert(tweet)
        if op is not None:
            operations.append(op)
    return operations


def _csv_records(file_path):
    """Read a CSV file into a list of plain-Python dicts (missing cells -> None)."""
    df = pd.read_csv(file_path, encoding="utf-8")
    # Cast to object first: this turns numpy scalars into native Python
    # values BSON can encode, and lets None replace NaN in every column
    # (numeric columns would otherwise coerce None back to NaN).
    df = df.astype(object).where(df.notna(), None)
    return df.to_dict(orient="records")


def _json_operations(file_name, data):
    """Return the bulk operations for one parsed JSON file.

    A top-level list is treated as a list of tweets; anything else is stored
    as a single report document keyed by its file name.
    """
    if isinstance(data, list):
        return _tweet_operations(data)
    # single JSON report
    return [UpdateOne({"report_name": file_name}, {"$set": data}, upsert=True)]


def insert_all_from_folder(folder_path=FOLDER_PATH):
    """Insert all CSV/JSON files from scraper folder into MongoDB.

    Files are processed in sorted name order; other extensions are ignored.
    A failure in one file is reported and processing continues with the next.

    Args:
        folder_path: Directory containing the scraper output files.

    Returns:
        True if the folder exists and every file was processed without error,
        False otherwise.
    """
    if not os.path.exists(folder_path):
        print(f"❌ Folder path does not exist: {folder_path}")
        return False

    collection = get_db_collection()
    all_ok = True
    for file_name in sorted(os.listdir(folder_path)):
        file_path = os.path.join(folder_path, file_name)
        try:
            if file_name.endswith(".csv"):
                operations = _tweet_operations(_csv_records(file_path))
            elif file_name.endswith(".json"):
                with open(file_path, "r", encoding="utf-8") as f:
                    data = json.load(f)
                operations = _json_operations(file_name, data)
            else:
                continue

            if operations:
                result = collection.bulk_write(operations)
                print(f"✅ {file_name} -> inserted/updated {result.upserted_count + result.modified_count} documents.")
        except Exception as e:  # per-file boundary: report and keep going
            all_ok = False
            print(f"❌ Failed to process {file_name}: {e}")
    return all_ok


def fetch_high_risk_unnotified():
    """Get all HIGH or CRITICAL risk tweets that are not notified yet.

    Returns:
        A list of matching documents.
    """
    collection = get_db_collection()
    return list(collection.find({"risk_level": {"$in": ["HIGH", "CRITICAL"]}, "notified": False}))


def mark_as_notified(tweet_id):
    """Mark a tweet as notified after sending alert.

    Args:
        tweet_id: The tweet's id. It must have the same type as the value
            stored at ingest time; presumably int for CSV input — verify
            against callers.
    """
    collection = get_db_collection()
    collection.update_one({"tweet_id": tweet_id}, {"$set": {"notified": True}})


if __name__ == "__main__":
    if insert_all_from_folder():
        print("✅ All scraper folder contents inserted/updated successfully.")
    else:
        print("⚠️ Finished with errors; see messages above.")
        sys.exit(1)