# NOTE: "Spaces: Sleeping" below was a Hugging Face Spaces status banner
# captured when this file was exported from the web UI; it is not source code.
# Spaces: Sleeping / Sleeping
| # db.py | |
| from pymongo import MongoClient, UpdateOne | |
| import pandas as pd | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| import os | |
| import json | |
| import sys | |
# MongoDB configuration
MONGO_URI = os.getenv("MONGO_URI")  # connection string, supplied via .env / environment
if not MONGO_URI:
    # Fail fast: every function below needs a reachable MongoDB instance.
    print("β MONGO_URI not set in environment variables.")
    sys.exit(1)
DB_NAME = "drug_monitoring_twitter"  # target database name
COLLECTION_NAME = "tweets"  # collection holding one document per tweet (keyed on tweet_id)
FOLDER_PATH = "drug_analysis_data_3months"  # folder with scraper outputs
# Cached client: the original opened a new MongoClient (and its connection
# pool) on every call and never closed it, leaking connections each time a
# helper in this module touched the database. One shared client is the
# pymongo-recommended pattern.
_mongo_client = None


def get_db_collection():
    """Connect to MongoDB (once) and return the tweets collection.

    Returns:
        pymongo.collection.Collection: the ``COLLECTION_NAME`` collection
        inside ``DB_NAME``.
    """
    global _mongo_client
    if _mongo_client is None:
        _mongo_client = MongoClient(MONGO_URI)
    db = _mongo_client[DB_NAME]
    # MongoDB would create the collection lazily on first write; creating it
    # explicitly keeps the original behavior (and its one-time log line).
    if COLLECTION_NAME not in db.list_collection_names():
        db.create_collection(COLLECTION_NAME)
        print(f"β Created collection: {COLLECTION_NAME}")
    return db[COLLECTION_NAME]
def _csv_operations(file_path):
    """Build one upsert per CSV row, keyed on tweet_id.

    Rows lacking a ``tweet_id`` column value are skipped. Missing cells are
    stored as ``None`` rather than ``float('nan')`` — NaN is not
    JSON-representable and breaks equality queries in MongoDB (the original
    code inserted NaN verbatim).
    """
    df = pd.read_csv(file_path, encoding="utf-8")
    df = df.where(pd.notna(df), None)  # NaN -> None before building documents
    ops = []
    for _, row in df.iterrows():
        doc = row.to_dict()
        doc["notified"] = False  # new/updated tweets start un-notified
        if "tweet_id" in doc:
            ops.append(UpdateOne({"tweet_id": doc["tweet_id"]}, {"$set": doc}, upsert=True))
    return ops


def _json_operations(file_path, file_name):
    """Build upserts from a JSON file.

    A list is treated as a list of tweet dicts (upserted on ``tweet_id``);
    anything else is treated as a single report document keyed on its
    file name.
    """
    with open(file_path, "r", encoding="utf-8") as f:
        data = json.load(f)
    ops = []
    if isinstance(data, list):
        for tweet in data:
            tweet["notified"] = False
            if "tweet_id" in tweet:
                ops.append(UpdateOne({"tweet_id": tweet["tweet_id"]}, {"$set": tweet}, upsert=True))
    else:
        # single JSON report
        ops.append(UpdateOne({"report_name": file_name}, {"$set": data}, upsert=True))
    return ops


def insert_all_from_folder(folder_path=FOLDER_PATH):
    """Insert all CSV/JSON files from scraper folder into MongoDB.

    Each file is processed independently: a failure in one file is logged
    and does not abort the rest of the folder. Files that are neither CSV
    nor JSON are ignored.

    Args:
        folder_path: directory containing the scraper output files.
    """
    collection = get_db_collection()
    if not os.path.exists(folder_path):
        print(f"β Folder path does not exist: {folder_path}")
        return
    for file_name in os.listdir(folder_path):
        file_path = os.path.join(folder_path, file_name)
        try:
            if file_name.endswith(".csv"):
                operations = _csv_operations(file_path)
            elif file_name.endswith(".json"):
                operations = _json_operations(file_path, file_name)
            else:
                continue  # not a scraper output file
            if operations:
                # One bulk round-trip per file instead of per-document writes.
                result = collection.bulk_write(operations)
                print(f"β {file_name} -> inserted/updated {result.upserted_count + result.modified_count} documents.")
        except Exception as e:
            # Best-effort ingestion: log and move on to the next file.
            print(f"β Failed to process {file_name}: {e}")
def fetch_high_risk_unnotified():
    """Return every tweet whose risk level is HIGH or CRITICAL and which
    has not yet triggered a notification."""
    query = {
        "risk_level": {"$in": ["HIGH", "CRITICAL"]},
        "notified": False,
    }
    coll = get_db_collection()
    return list(coll.find(query))
def mark_as_notified(tweet_id):
    """Flip the ``notified`` flag on the tweet matching *tweet_id*,
    so it is excluded from future alert runs."""
    selector = {"tweet_id": tweet_id}
    change = {"$set": {"notified": True}}
    get_db_collection().update_one(selector, change)
if __name__ == "__main__":
    # Script entry point: ingest everything the scraper produced, then
    # report completion. (Failures per file are logged inside the call.)
    insert_all_from_folder()
    print("β All scraper folder contents inserted/updated successfully.")