import asyncio
import os
import pickle
from datetime import datetime

import asyncpraw
import numpy as np
import torch
import torch.nn.functional as F
from dotenv import load_dotenv
from flask import Flask
from huggingface_hub import hf_hub_download
from transformers import AutoModelForSequenceClassification, AutoTokenizer

from models import db, DisasterPost
from ner_extractor import extract_entities

# 1. Config & Setup
# Subreddit(s) monitored by the scraper; the commented-out alternative below
# shows the "+"-joined multi-subreddit form.
SUBREDDITS = "AlistoSimulation"
# SUBREDDITS = "Philippines+NaturalDisasters+DisasterUpdatePH+Assistance+Typhoon+AlistoSimulation"

BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# Candidate .env locations, tried in order; the first existing file is loaded.
env_path_1 = os.path.join(BASE_DIR, '../.env')     # inside alisto_project
env_path_2 = os.path.join(BASE_DIR, '../../.env')  # in the main root

for _env_path, _env_label in (
    (env_path_1, "alisto_project folder"),
    (env_path_2, "Root folder"),
):
    if os.path.exists(_env_path):
        load_dotenv(_env_path)
        print(f"✅ Loaded .env from {_env_label}")
        break
else:
    # Reached only when neither candidate exists.
    print("⚠️ WARNING: No .env file found! Passwords will be missing.")

# Flask application object, created only to give the scraper an application
# context for database access.
app = Flask(__name__)
DB_PATH = os.path.join(BASE_DIR, 'alisto.db')
app.config.update(
    SQLALCHEMY_DATABASE_URI=f'sqlite:///{DB_PATH}',
    SQLALCHEMY_TRACK_MODIFICATIONS=False,
    # connection timeout handed to the DB driver for a more stable connection
    SQLALCHEMY_ENGINE_OPTIONS={'connect_args': {'timeout': 15}},
)
db.init_app(app)

# 2.
# Load Models
print("Loading ALISTO Brains from Cloud...")
MODEL_ID = "Quivara/alisto-brain"

# A. RoBERTa (The Context Expert)
# Tokenizer and classifier both live in the "roberta_model" subfolder of the
# hub repository. There is no fallback model: if loading fails, the script
# cannot classify anything, so it terminates.
try:
    tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, subfolder="roberta_model")
    roberta_model = AutoModelForSequenceClassification.from_pretrained(
        MODEL_ID, subfolder="roberta_model", num_labels=2
    )
    device = torch.device("cpu")
    roberta_model.to(device)
    roberta_model.eval()  # inference mode
    print(f"✅ Context Expert loaded from {MODEL_ID} (roberta_model folder)")
except Exception as e:
    print(f"❌ Error loading Model: {e}")
    # Exit with a non-zero status so supervisors/shells see the failure
    # (the bare `exit()` builtin reports success and depends on `site`).
    raise SystemExit(1) from e

# B. TF-IDF (The Gatekeeper)
# Optional fast pre-filter: when it cannot be loaded, `tfidf_model` is None.
try:
    print("Downloading Gatekeeper (TF-IDF)...")
    # TF-IDF is likely in the repository root, so no subfolder is passed.
    tfidf_path = hf_hub_download(repo_id=MODEL_ID, filename="tfidf_ensemble.pkl")
    # SECURITY NOTE(review): unpickling runs arbitrary code contained in the
    # file. This is only safe while the hub repository is fully trusted;
    # consider a safer serialization format for this artifact.
    with open(tfidf_path, 'rb') as f:
        tfidf_model = pickle.load(f)
    print("✅ Gatekeeper (TF-IDF) loaded")
except Exception as e:
    print(f"❌ Error loading TF-IDF: {e}")
    tfidf_model = None

# 3.
# Reference Lists (Kept from your original)
# list of Philippine locations used for basic geo-validation
PHILIPPINE_LOCATIONS = [
    "Philippines", "PH", "Luzon", "Visayas", "Mindanao",
    "Metro Manila", "NCR", "Manila", "Quezon City", "Makati", "Taguig",
    "Pasig", "Mandaluyong", "Marikina", "Las Pinas", "Las Piñas",
    "Muntinlupa", "Caloocan", "Paranaque", "Parañaque", "Valenzuela",
    "Pasay", "Malabon", "Navotas", "San Juan", "Pateros",
    "Cavite", "Naic", "Bacoor", "Imus", "Dasmarinas", "Dasmariñas",
    "General Trias", "Tagaytay", "Kawit", "Noveleta", "Rosario", "Tanza",
    "Silang", "Trece Martires",
    "Laguna", "Calamba", "Santa Rosa", "Binan", "Biñan", "San Pedro",
    "Cabuyao", "Los Banos", "Los Baños",
    "Rizal", "Antipolo", "Cainta", "Taytay", "San Mateo", "Binangonan",
    "Batangas", "Bulacan", "Pampanga", "Tarlac", "Cebu", "Iloilo",
    "Tacloban", "Davao", "Cagayan", "Bicol", "Albay", "Isabela",
]


def _log_rejection(post, reason):
    """Print the standard debug banner explaining why *post* was dropped."""
    print("\n------------------- DEBUG REJECTION -------------------")
    print(f"❌ REJECTED POST ID: {post.id} (Title: {post.title[:30]})")
    print(f"REASON: {reason}")
    print("---------------------------------------------------------\n")


async def _load_author(post):
    """Best-effort fetch of the author's profile before the credibility check.

    NOTE(review): asyncpraw appears to populate Redditor attributes such as
    ``created_utc`` and karma only after an explicit ``await author.load()``;
    without it the synchronous credibility check would hit its permissive
    fallback for every post. Confirm against the asyncpraw documentation.
    Failures here are logged and ignored so a flaky lookup never drops a post.
    """
    author = getattr(post, "author", None)
    load = getattr(author, "load", None)
    if load is None:
        return
    try:
        await load()
    except Exception as e:
        print(f"   ⚠️ Could not load author profile for post {post.id}: {e}")


# function to process a single Reddit submission through all filters and save it
async def process_post(post):
    """Run one Reddit submission through every filter and persist it if urgent.

    Pipeline, in order: duplicate check against the database, author
    credibility, keyword logic filter, AI urgency cascade, entity extraction,
    classification, and a single database insert. Any stage may reject the
    post, in which case nothing is saved.

    Args:
        post: submission object exposing ``id``, ``title``, ``selftext``,
            ``author`` and ``created_utc``.

    Returns:
        None. Errors are caught and printed so one bad post cannot stop the
        caller's loop.
    """
    from datetime import timezone

    try:
        full_text = f"{post.title} {post.selftext}"

        # A. Duplicates & credibility
        # skip posts whose reddit_id is already stored
        with app.app_context():
            already_saved = DisasterPost.query.filter_by(reddit_id=post.id).first()
        if already_saved:
            return

        # blocks posts from suspicious new/low-karma accounts
        await _load_author(post)
        if not is_credible_user(post):
            _log_rejection(post, "Credibility Check (Account too new/Low Karma)")
            return

        # B. Logic filter (first defense)
        # simple keyword checks to filter news/financial/irrelevant content
        is_bad, reason = is_news_or_irrelevant(full_text)
        if is_bad:
            _log_rejection(
                post,
                f"Logic Filter (Common Sense Layer) Categorized as: {reason}",
            )
            return

        # C. AI analysis: cascade check (TF-IDF then RoBERTa)
        is_urgent, score, source = predict_urgency(full_text)
        if not is_urgent:
            _log_rejection(
                post,
                f"AI Confidence too low Score: {score:.2%} (Source: {source})",
            )
            return

        # D. Entity extraction: location, contact number, contact person name
        ner_results = extract_entities(full_text)
        locations = ner_results.get('locations', [])
        contact_num = ner_results.get('contact', None)
        contact_person_name = ner_results.get('contact_person_name', None)

        # E. Final triage and data preparation
        location = locations[0] if locations else "Unknown Location"
        disaster_type = get_disaster_type(full_text)
        assistance_type = get_assistance_type(full_text)

        # High / Medium / Low, based on severity keywords
        dynamic_urgency = assign_dynamic_urgency(full_text)

        # author falls back to the Reddit username when no contact name was
        # explicitly extracted
        reddit_username = str(post.author) if post.author else "Unknown"
        final_author = contact_person_name if contact_person_name else reddit_username

        print(f"""------------------- ALERT SAVED -------------------\n🚨 ALERT ({score:.2%}): {disaster_type} in {location} Urgency: {dynamic_urgency} \n---------------------------------------------------------""")

        # F. Single database creation and commit
        # Stored as a naive UTC datetime, exactly as the deprecated
        # datetime.utcfromtimestamp() produced.
        created_at = datetime.fromtimestamp(
            post.created_utc, tz=timezone.utc
        ).replace(tzinfo=None)
        new_post = DisasterPost(
            reddit_id=post.id,
            title=post.title,
            content=post.selftext or post.title,
            author=final_author,
            location=location,
            contact_number=contact_num,
            disaster_type=disaster_type,
            assistance_type=assistance_type,
            urgency_level=dynamic_urgency,
            is_help_request=True,
            timestamp=created_at,
        )
        with app.app_context():
            db.session.add(new_post)
            db.session.commit()

    except Exception as e:
        # per-post boundary: report and move on to the next submission
        print(f"Post Processing Error for {post.id}: {e}")


def _contains_phrase(haystack, needle):
    """Return True if *needle* occurs in *haystack* as a whole word/phrase."""
    import re

    # Lookarounds instead of \b so needles that start or end with punctuation
    # still match correctly.
    return re.search(rf"(?<!\w){re.escape(needle)}(?!\w)", haystack) is not None


# validates if the extracted location is relevant to the Philippines
def check_for_philippine_location(location_list):
    """Return True if any extracted location refers to a known PH place.

    Matching is case-insensitive and works in both directions on whole
    words/phrases: "Marikina City" matches "Marikina", and "Trece" matches
    "Trece Martires". Raw substring matching is deliberately avoided: it made
    "Philadelphia" match "PH" and an empty string match everything.

    Args:
        location_list: iterable of location strings, or None/empty.

    Returns:
        bool: True on the first match; False for None, an empty list, or a
        list holding only blank or non-string entries.
    """
    if not location_list:
        return False

    known_locations = [loc.lower() for loc in PHILIPPINE_LOCATIONS]

    for extracted_loc in location_list:
        if not isinstance(extracted_loc, str):
            continue
        candidate = extracted_loc.strip().lower()
        if not candidate:
            continue
        for known_loc in known_locations:
            if _contains_phrase(candidate, known_loc) or _contains_phrase(known_loc, candidate):
                return True
    return False


# classifies the type of disaster based on severity keywords
def get_disaster_type(text):
    """Return the first disaster category whose keywords appear in *text*.

    Categories are tested in the order listed below, so a text mentioning
    several disasters gets the earliest matching category. Matching is a
    case-insensitive substring test, which also catches inflected forms
    ("flooding", "earthquake").

    NOTE(review): substring matching also yields false positives, e.g. "ash"
    inside "gcash" or "wind" inside "window"; tighten per keyword if this
    shows up in practice.

    Returns:
        str: category name, or "General Emergency" when nothing matches.
    """
    text_lower = text.lower()

    keyword_table = (
        ("Earthquake", ("quake", "lindol", "shake", "aftershock")),
        ("Landslide", ("landslide", "guho", "mudslide", "natabunan")),
        ("Volcano", ("volcano", "lava", "ash", "magma", "taal", "mayon")),
        ("Fire", ("fire", "sunog", "burn", "smoke")),
        ("Typhoon", ("typhoon", "bagyo", "storm", "wind", "signal", "ulysses", "odette")),
        ("Flood", ("flood", "baha", "water", "river", "drown", "lubog", "taas ng tubig")),
    )

    for dtype, keywords in keyword_table:
        if any(k in text_lower for k in keywords):
            return dtype
    return "General Emergency"


# classifies the specific type of assistance needed (e.g., Medical, Rescue,
# Food)
def get_assistance_type(text):
    """Determine the specific help needed using nested priority.

    Priority order: Rescue (upgraded to Medical when a critical medical
    keyword is also present), then Medical, Evacuation, Food/Water. Matching
    is a case-insensitive substring test.

    Returns:
        str: "Rescue", "Medical", "Evacuation", "Food/Water" or
        "General Assistance".
    """
    text = text.lower()

    # --- 1. IMMEDIATE RESCUE (Life Threatening) ---
    rescue_kw = [
        "rescue", "saklolo", "trapped", "stuck", "stranded", "bubong", "roof",
        "boat", "bangka", "drowning", "lunod", "di makalabas", "unable to leave",
    ]
    if any(k in text for k in rescue_kw):
        # a rescue situation with a critical injury is routed to Medical
        critical_medical_override_kw = [
            "bleeding", "unconscious", "head injury", "head wound",
            "severely bleeding", "stroke", "heart attack", "trauma",
        ]
        if any(k in text for k in critical_medical_override_kw):
            return "Medical"
        return "Rescue"

    # --- 2. MEDICAL (Specific Needs/Ambulance) ---
    # standalone medical needs when no rescue keyword was found
    medical_kw = [
        "medical", "doctor", "gamot", "medicine", "insulin", "dialysis",
        "hospital", "oxygen", "pregnant", "labor", "manganganak", "ambulance",
        "first aid", "pills", "medication",
    ]
    if any(k in text for k in medical_kw):
        return "Medical"

    # --- 3. EVACUATION (Shelter/Transport) ---
    evac_kw = [
        "evacuate", "evacuation", "shelter", "center", "likas", "tents",
        "matutuluyan", "alis", "transportation", "walang matutuluyan",
    ]
    if any(k in text for k in evac_kw):
        return "Evacuation"

    # --- 4. FOOD & WATER (Logistics) ---
    food_kw = [
        "food", "pagkain", "water", "tubig", "gutom", "hungry", "relief",
        "goods", "makakain", "inumin", "groceries", "supplies", "supply",
        "wala ng stock", "gatas", "milk", "formula", "baby supplies",
        "ubos na", "wala na", "stock", "stock ng",
    ]
    if any(k in text for k in food_kw):
        return "Food/Water"

    return "General Assistance"


# --- LOGIC FILTERS (The "Common Sense" Layer) ---
# runs simple logic checks to filter out news reports and non-urgent context
def is_news_or_irrelevant(text):
    """Keyword pre-filter that flags news, money requests and commentary.

    Checks run in this order:
      1. news/report indicators -> "News/Report"
      2. financial indicators, unless a life-threatening keyword is also
         present -> "Financial/Non-Urgent"
      3. non-urgent discussion phrases -> "Context/NotUrgent"

    Returns:
        tuple: ``(True, reason)`` when the text should be rejected,
        ``(False, None)`` otherwise.
    """
    text_lower = text.lower()

    # 1. NEWS & REPORTS
    news_indicators = [
        "breaking:", "just in:", "news:", "update:", "report:",
        "casualties", "death toll", "according to", "reported that",
        "suspension", "declared", "signal no", "public advisory",
        "weather update", "volcano alert", "mmda", "pagasa",
    ]
    # 2. MONEY / SELLING
    financial_indicators = [
        "gcash", "paypal", "budget", "loan", "selling", "fundraising",
        "donate", "send funds",
    ]
    # 3. IRRELEVANT CONTEXT
    irrelevant_contexts = [
        "how can i help", "where to donate", "thoughts and prayers",
        "keep safe", "god bless", "praying for", "discussion:", "opinion:",
    ]
    life_threatening_kw = ["trapped", "lubog", "roof", "rescue", "drowning", "stuck"]

    if any(ind in text_lower for ind in news_indicators):
        return True, "News/Report"

    # blocks financial requests unless life-threatening keywords are also present
    has_financial = any(ind in text_lower for ind in financial_indicators)
    is_life_death = any(k in text_lower for k in life_threatening_kw)
    if has_financial and not is_life_death:
        return True, "Financial/Non-Urgent"

    # blocks posts containing non-urgent discussion or commentary
    if any(ctx in text_lower for ctx in irrelevant_contexts):
        return True, "Context/NotUrgent"

    return False, None


# runs the two-stage AI classification check (TF-IDF then RoBERTa)
def predict_urgency(text, *, tfidf_reject_below=0.20, roberta_accept_above=0.4):
    """Classify *text* with the TF-IDF gatekeeper, then the RoBERTa model.

    Args:
        text: raw post text.
        tfidf_reject_below: when the TF-IDF model is loaded and its class-1
            probability is below this value (default 20%), the post is
            rejected without running RoBERTa.
        roberta_accept_above: RoBERTa class-1 probability that must be
            exceeded (default 40%) for the post to count as urgent.

    Returns:
        tuple: ``(is_urgent, confidence, source)`` where *source* is
        "TF-IDF Reject" or "RoBERTa".
    """
    # 1. Gatekeeper (TF-IDF)
    # `is not None` rather than truthiness: some estimator objects define
    # __len__, which would make a loaded model look falsy.
    if tfidf_model is not None:
        tfidf_probs = tfidf_model.predict_proba([text])[0]
        tfidf_conf = float(tfidf_probs[1])
        # if the fast model is sure it's junk, skip the heavy lifting
        if tfidf_conf < tfidf_reject_below:
            return False, tfidf_conf, "TF-IDF Reject"

    # 2. Context Expert (RoBERTa)
    # slower, context-aware model; input truncated to 128 tokens
    inputs = tokenizer(
        text, return_tensors="pt", truncation=True, padding=True, max_length=128
    )
    with torch.no_grad():
        outputs = roberta_model(**inputs)
    probs = F.softmax(outputs.logits, dim=-1)
    roberta_conf = probs[0][1].item()  # probability of 'Rescue Request'

    return (roberta_conf > roberta_accept_above), roberta_conf, "RoBERTa"


# assigns the final severity level (High, Medium, Low) based on severity keywords
def assign_dynamic_urgency(text):
    """Return "High", "Medium" or "Low" from severity keywords in *text*.

    High keywords are checked first, then Medium; anything else is Low.
    Matching is a case-insensitive substring test.
    """
    text_lower = text.lower()

    # 1. HIGH URGENCY (immediate life-threatening event or critical medical need)
    high_keywords = [
        "bleeding", "unconscious", "severely injured", "severe injury",
        "life threatening", "insulin", "oxygen", "ambulance",
        "urgent medicine", "doctor", "hospital",
        "trap", "trapped", "bubong", "collapsed", "di mapigilan", "drowning",
        "lampas tao", "lubog", "delikado", "baha na", "mamatay",
    ]
    if any(k in text_lower for k in high_keywords):
        return "High"

    # 2. MEDIUM URGENCY (time-sensitive, logistical crisis)
    medium_keywords = [
        "stranded", "running out", "evacuate", "kailangan agad", "lowbat",
        "paubos", "senior", "bedridden", "disabled", "gatas", "formula",
    ]
    if any(k in text_lower for k in medium_keywords):
        return "Medium"

    # 3. LOW URGENCY: passed the AI but lacks the severity indicators above
    return "Low"


# blocks posts from accounts created less than 2 days ago or with very negative karma
def is_credible_user(post):
    """Return False for posts from deleted, brand-new or low-karma authors.

    Rules:
      * no author (deleted/unknown) -> not credible
      * account younger than 2 whole days -> not credible
      * comment + link karma below -5 -> not credible

    If the author's data cannot be read, the post is allowed through
    (best effort) and the failure is printed so it does not go unnoticed.

    Args:
        post: object with an ``author`` exposing ``created_utc``,
            ``comment_karma`` and ``link_karma``.
    """
    from datetime import timezone

    try:
        author = post.author
        # checks if author is deleted or unknown
        if not author:
            return False

        # 1. Account age (must be at least 2 days old); timezone-aware
        #    arithmetic replaces the deprecated utcfromtimestamp()/utcnow().
        created_time = datetime.fromtimestamp(author.created_utc, tz=timezone.utc)
        account_age = datetime.now(timezone.utc) - created_time
        if account_age.days < 2:
            print(f" ⚠️ Blocked: Account too new ({account_age.days} days)")
            return False

        # 2. Karma (a small negative total down to -5 is tolerated)
        total_karma = author.comment_karma + author.link_karma
        if total_karma < -5:
            print(f" ⚠️ Blocked: Negative Karma ({total_karma})")
            return False

        return True
    except Exception as e:
        # allows posts to pass if user info could not be read
        print(f" ⚠️ Credibility check skipped (author data unavailable): {e}")
        return True


# 4. Main Scraper Loop
async def scrape_reddit():
    """Connect to Reddit, scan the newest posts, then stream new submissions.

    Reads credentials from the environment (REDDIT_CLIENT_ID,
    REDDIT_CLIENT_SECRET, REDDIT_USER_AGENT, REDDIT_USERNAME,
    REDDIT_PASSWORD). Returns immediately when the client id or secret is
    missing. The Reddit client is always closed on exit.
    """
    print("Connecting to Reddit API...")

    client_id = os.getenv("REDDIT_CLIENT_ID")
    client_secret = os.getenv("REDDIT_CLIENT_SECRET")
    if not client_id or not client_secret:
        print("❌ Error: Client ID or Secret missing in .env")
        return

    reddit = asyncpraw.Reddit(
        client_id=client_id,
        client_secret=client_secret,
        user_agent=os.getenv("REDDIT_USER_AGENT"),
        username=os.getenv("REDDIT_USERNAME"),
        password=os.getenv("REDDIT_PASSWORD"),
    )

    try:
        subreddit = await reddit.subreddit(SUBREDDITS)
        print(f"👁️  ALISTO ACTIVE: Monitoring r/{SUBREDDITS}...")

        # --- PHASE 1: FETCH LATEST EXISTING POSTS (last 5) ---
        print("🔍 Scanning last 5 posts for missed alerts...")
        async for post in subreddit.new(limit=5):
            await process_post(post)
        print("✅ Historical scan complete")

        # --- PHASE 2: START REAL-TIME STREAM (Forever Loop) ---
        print("📡 Starting real-time stream for new submissions...")
        # NOTE(review): with skip_existing=False the stream presumably yields
        # already-existing submissions first; process_post's reddit_id
        # duplicate check keeps those from being saved twice.
        async for post in subreddit.stream.submissions(skip_existing=False):
            await process_post(post)

    except Exception as e:
        print(f"Global Scraper Error: {e}")
    finally:
        await reddit.close()
        print("Scraper stopped")


# executes the main scraping loop when the script is run
if __name__ == "__main__":
    try:
        # asyncio.run creates and closes the event loop itself, and on Ctrl-C
        # cancels the task so scrape_reddit's `finally` can close the client.
        asyncio.run(scrape_reddit())
    except KeyboardInterrupt:
        print("\n🛑 Stopped by user")