import asyncpraw
import asyncio
import os
import torch
import pickle
import torch.nn.functional as F
from datetime import datetime
from dotenv import load_dotenv
from flask import Flask
from models import db, DisasterPost
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from ner_extractor import extract_entities
from huggingface_hub import hf_hub_download
# 1. Config & Setup
# defines the subreddits to be monitored by the scraper
SUBREDDITS = "AlistoSimulation"
# SUBREDDITS = "Philippines+NaturalDisasters+DisasterUpdatePH+Assistance+Typhoon+AlistoSimulation"
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# loads environment variables from .env file
env_path_1 = os.path.join(BASE_DIR, '../.env') # Inside alisto_project
env_path_2 = os.path.join(BASE_DIR, '../../.env') # In the main root
if os.path.exists(env_path_1):
load_dotenv(env_path_1)
print("✅ Loaded .env from alisto_project folder")
elif os.path.exists(env_path_2):
load_dotenv(env_path_2)
print("✅ Loaded .env from Root folder")
else:
print("⚠️ WARNING: No .env file found! Passwords will be missing.")
# initializes the Flask application context for database access
app = Flask(__name__)
DB_PATH = os.path.join(BASE_DIR, 'alisto.db')
app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{DB_PATH}'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# sets a timeout for stable database connection
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {'connect_args': {'timeout': 15}}
db.init_app(app)
# 2. Load Models
print("Loading ALISTO Brains from Cloud...")
MODEL_ID = "Quivara/alisto-brain"
# A. RoBERTa (The Context Expert)
try:
    # Load the tokenizer from the roberta_model subfolder of the Hub repo
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, subfolder="roberta_model")
    # Load the classifier from the same subfolder
roberta_model = AutoModelForSequenceClassification.from_pretrained(MODEL_ID, subfolder="roberta_model", num_labels=2)
device = torch.device("cpu")
roberta_model.to(device)
roberta_model.eval()
print(f"✅ Context Expert loaded from {MODEL_ID} (roberta_model folder)")
except Exception as e:
print(f"❌ Error loading Model: {e}")
    # The classifier is required for triage, so abort rather than run without it
    exit()
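# Note: the classifier is binary (num_labels=2); downstream code treats index 1
# as the probability of a rescue request (see predict_urgency below).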
# B. TF-IDF (The Gatekeeper)
try:
print("Downloading Gatekeeper (TF-IDF)...")
    # the TF-IDF pickle is stored at the repo root, so no subfolder argument is needed
tfidf_path = hf_hub_download(repo_id=MODEL_ID, filename="tfidf_ensemble.pkl")
with open(tfidf_path, 'rb') as f:
tfidf_model = pickle.load(f)
print("✅ Gatekeeper (TF-IDF) loaded")
except Exception as e:
print(f"❌ Error loading TF-IDF: {e}")
tfidf_model = None
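# Design note: the TF-IDF model acts as a cheap first-pass filter so the heavier
# RoBERTa model only runs on posts that are not obvious junk (see predict_urgency).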
# 3. Reference Lists
# list of Philippine locations used for basic geo-validation
PHILIPPINE_LOCATIONS = [
"Philippines", "PH", "Luzon", "Visayas", "Mindanao", "Metro Manila", "NCR",
"Manila", "Quezon City", "Makati", "Taguig", "Pasig", "Mandaluyong",
"Marikina", "Las Pinas", "Las Piñas", "Muntinlupa", "Caloocan",
"Paranaque", "Parañaque", "Valenzuela", "Pasay", "Malabon",
"Navotas", "San Juan", "Pateros",
"Cavite", "Naic", "Bacoor", "Imus", "Dasmarinas", "Dasmariñas",
"General Trias", "Tagaytay", "Kawit", "Noveleta", "Rosario", "Tanza",
"Silang", "Trece Martires", "Laguna", "Calamba", "Santa Rosa", "Binan",
"Biñan", "San Pedro", "Cabuyao", "Los Banos", "Los Baños", "Rizal",
"Antipolo", "Cainta", "Taytay", "San Mateo", "Binangonan", "Batangas",
"Bulacan", "Pampanga", "Tarlac", "Cebu", "Iloilo", "Tacloban",
"Davao", "Cagayan", "Bicol", "Albay", "Isabela"
]
# function to process a single Reddit submission through all filters and save it
async def process_post(post):
"""handles logic for a single Reddit submission (filtering, AI, saving)"""
try:
full_text = f"{post.title} {post.selftext}"
        # A. Check for Duplicates & Credibility
# checks for existing post ID in the database
with app.app_context():
exists = DisasterPost.query.filter_by(reddit_id=post.id).first()
if exists: return
# blocks posts from suspicious new/low-karma accounts
        if not await is_credible_user(post):
print(f"\n------------------- DEBUG REJECTION -------------------")
print(f"❌ REJECTED POST ID: {post.id} (Title: {post.title[:30]})")
print(f"REASON: Credibility Check (Account too new/Low Karma)")
print(f"---------------------------------------------------------\n")
return
        # B. Logic Filter (First Defense)
# runs simple keyword checks to filter news/financial/irrelevant content
is_bad, reason = is_news_or_irrelevant(full_text)
if is_bad:
print(f"\n------------------- DEBUG REJECTION -------------------")
print(f"❌ REJECTED POST ID: {post.id} (Title: {post.title[:30]})")
print(f"REASON: Logic Filter (Common Sense Layer) Categorized as: {reason}")
print(f"---------------------------------------------------------\n")
return
        # C. AI Analysis
# runs the cascade AI check (TF-IDF then RoBERTa)
is_urgent, score, source = predict_urgency(full_text)
if not is_urgent:
print(f"\n------------------- DEBUG REJECTION -------------------")
print(f"❌ REJECTED POST ID: {post.id} (Title: {post.title[:30]})")
print(f"REASON: AI Confidence too low Score: {score:.2%} (Source: {source})")
print(f"---------------------------------------------------------\n")
return
# D. Entity Extraction
# extracts location, contact number, and contact person name
ner_results = extract_entities(full_text)
locations = ner_results.get('locations', [])
contact_num = ner_results.get('contact', None)
contact_person_name = ner_results.get('contact_person_name', None)
# E. Final Triage and Data Preparation
# assigns location and determines disaster/assistance type
location = locations[0] if locations else "Unknown Location"
disaster_type = get_disaster_type(full_text)
assistance_type = get_assistance_type(full_text)
        # 1. Calculate Dynamic Urgency
# assigns High, Medium, or Low urgency based on severity keywords
dynamic_urgency = assign_dynamic_urgency(full_text)
# 2. Finalize Author (Fallback Logic)
# defaults to Reddit username if no contact name is explicitly extracted
reddit_username = str(post.author) if post.author else "Unknown"
final_author = contact_person_name if contact_person_name else reddit_username
# 3. Print Final Alert Confirmation
print(f"""------------------- ALERT SAVED -------------------\n🚨 ALERT ({score:.2%}): {disaster_type} in {location} Urgency: {dynamic_urgency} \n---------------------------------------------------------""")
# F. Single Database Creation and Commit
# creates and commits the final DisasterPost object to the database
new_post = DisasterPost(
reddit_id=post.id,
title=post.title,
content=post.selftext or post.title,
author=final_author,
location=location,
contact_number=contact_num,
disaster_type=disaster_type,
assistance_type=assistance_type,
urgency_level=dynamic_urgency,
is_help_request=True,
timestamp=datetime.utcfromtimestamp(post.created_utc)
)
with app.app_context():
db.session.add(new_post)
db.session.commit()
except Exception as e:
print(f"Post Processing Error for {post.id}: {e}")
# validates if the extracted location is relevant to the Philippines
def check_for_philippine_location(location_list):
if not location_list: return False
ph_locations = [loc.lower() for loc in PHILIPPINE_LOCATIONS]
for extracted_loc in location_list:
# Check partial match (e.g., "Marikina City" matches "Marikina")
for known_loc in ph_locations:
if known_loc in extracted_loc.lower() or extracted_loc.lower() in known_loc:
return True
return False
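# Illustrative examples (not executed; behavior follows the partial-match logic above):
#   check_for_philippine_location(["Marikina City"])  -> True  ("Marikina" is a substring)
#   check_for_philippine_location(["Jakarta"])        -> False (no known location matches)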
# classifies the type of disaster via keyword matching on the post text
def get_disaster_type(text):
text_lower = text.lower()
mapping = {
"Earthquake": ["quake", "lindol", "shake", "aftershock"],
"Landslide": ["landslide", "guho", "mudslide", "natabunan"],
"Volcano": ["volcano", "lava", "ash", "magma", "taal", "mayon"],
"Fire": ["fire", "sunog", "burn", "smoke"],
"Typhoon": ["typhoon", "bagyo", "storm", "wind", "signal", "ulysses", "odette"],
"Flood": ["flood", "baha", "water", "river", "drown", "lubog", "taas ng tubig"]
}
for dtype, keywords in mapping.items():
if any(k in text_lower for k in keywords):
return dtype
return "General Emergency"
# classifies the specific type of assistance needed (e.g., Medical, Rescue, Food)
def get_assistance_type(text):
"""determines the specific help needed using Nested Priority"""
text = text.lower()
# --- 1. IMMEDIATE RESCUE (Life Threatening) ---
rescue_kw = [
"rescue", "saklolo", "trapped", "stuck", "stranded",
"bubong", "roof", "boat", "bangka", "drowning", "lunod",
"di makalabas", "unable to leave"
]
if any(k in text for k in rescue_kw):
critical_medical_override_kw = [
"bleeding", "unconscious", "head injury", "head wound",
"severely bleeding", "stroke", "heart attack", "trauma"
]
if any(k in text for k in critical_medical_override_kw):
return "Medical"
return "Rescue" # if no critical medical keywords found
# --- 2. MEDICAL (Specific Needs/Ambulance) ---
# handles standalone medical needs if no rescue keywords were found
medical_kw = [
"medical", "doctor", "gamot", "medicine", "insulin", "dialysis",
"hospital", "oxygen", "pregnant", "labor", "manganganak", "ambulance",
"first aid", "pills", "medication"
]
if any(k in text for k in medical_kw):
return "Medical"
# --- 3. EVACUATION (Shelter/Transport) ---
# classifies the need for temporary shelter or transport
evac_kw = [
"evacuate", "evacuation", "shelter", "center", "likas", "tents",
"matutuluyan", "alis", "transportation", "walang matutuluyan"
]
if any(k in text for k in evac_kw):
return "Evacuation"
# --- 4. FOOD & WATER (Logistics) ---
# classifies the need for essential supplies (food, water, formula)
food_kw = [
"food", "pagkain", "water", "tubig", "gutom", "hungry", "relief",
"goods", "makakain", "inumin", "groceries", "supplies", "supply", "wala ng stock",
"gatas", "milk", "formula", "baby supplies", "ubos na", "wala na", "stock", "stock ng"
]
if any(k in text for k in food_kw):
return "Food/Water"
return "General Assistance"
# --- LOGIC FILTERS (The "Common Sense" Layer) ---
# runs simple logic checks to filter out news reports and non-urgent context
def is_news_or_irrelevant(text):
text_lower = text.lower()
# 1. NEWS & REPORTS
news_indicators = [
"breaking:", "just in:", "news:", "update:", "report:",
"casualties", "death toll", "according to", "reported that",
"suspension", "declared", "signal no", "public advisory",
"weather update", "volcano alert", "mmda", "pagasa"
]
# 2. MONEY / SELLING
financial_indicators = [
"gcash", "paypal", "budget", "loan", "selling",
"fundraising", "donate", "send funds"
]
# 3. IRRELEVANT CONTEXT
irrelevant_contexts = [
"how can i help", "where to donate", "thoughts and prayers",
"keep safe", "god bless", "praying for", "discussion:", "opinion:"
]
# Logic Checks
if any(ind in text_lower for ind in news_indicators):
return True, "News/Report"
# blocks financial requests unless life-threatening keywords are also present
has_financial = any(ind in text_lower for ind in financial_indicators)
is_life_death = any(k in text_lower for k in ["trapped", "lubog", "roof", "rescue", "drowning", "stuck"])
if has_financial and not is_life_death:
return True, "Financial/Non-Urgent"
# blocks posts containing non-urgent discussion or commentary
if any(ctx in text_lower for ctx in irrelevant_contexts):
return True, "Context/NotUrgent"
return False, None
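# Illustrative examples (derived from the indicator lists above):
#   is_news_or_irrelevant("BREAKING: death toll rises")  -> (True, "News/Report")
#   is_news_or_irrelevant("gcash donations please")      -> (True, "Financial/Non-Urgent")
#   is_news_or_irrelevant("trapped on our roof, send gcash for boat rescue")
#                                                        -> (False, None)  # life-death override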
# runs the two-stage AI classification check (TF-IDF then RoBERTa)
def predict_urgency(text):
# 1. Gatekeeper (TF-IDF)
    # quickly rejects posts with extremely low urgency confidence (below 20%)
if tfidf_model:
tfidf_probs = tfidf_model.predict_proba([text])[0]
tfidf_conf = tfidf_probs[1]
# If the fast model is sure it's junk, skip the heavy lifting
if tfidf_conf < 0.20:
return False, tfidf_conf, "TF-IDF Reject"
# 2. Context Expert (RoBERTa)
# runs the slower, context-aware model for final classification
inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=128)
with torch.no_grad():
outputs = roberta_model(**inputs)
probs = F.softmax(outputs.logits, dim=-1)
roberta_conf = probs[0][1].item() # Probability of 'Rescue Request'
# final acceptance threshold (40%) for the RoBERTa model
return (roberta_conf > 0.4), roberta_conf, "RoBERTa"
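# Returns an (is_urgent, confidence, source) tuple, e.g. (False, 0.08, "TF-IDF Reject")
# or (True, 0.91, "RoBERTa"); the confidence values here are illustrative only,
# since actual scores depend on the trained weights.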
# assigns the final urgency level (High, Medium, Low) based on severity keywords
def assign_dynamic_urgency(text):
text_lower = text.lower()
# 1. HIGH URGENCY (Immediate Life-Threatening Event or Critical Medical Need)
high_keywords = [
"bleeding", "unconscious", "severely injured", "severe injury", "life threatening",
"insulin", "oxygen", "ambulance", "urgent medicine", "doctor", "hospital",
"trap", "trapped", "bubong", "collapsed", "di mapigilan", "drowning",
"lampas tao", "lubog", "delikado", "baha na", "mamatay"
]
if any(k in text_lower for k in high_keywords):
return "High"
# 2. MEDIUM URGENCY (Time-Sensitive, Logistical Crisis)
medium_keywords = [
"stranded", "running out", "evacuate", "kailangan agad", "lowbat",
"paubos", "senior", "bedridden", "disabled", "gatas", "formula"
]
if any(k in text_lower for k in medium_keywords):
return "Medium"
# 3. LOW URGENCY (General Supplies/Warning)
# posts that pass the AI but lack the above severity indicators fall here
return "Low"
# blocks posts from accounts created less than 2 days ago or with karma below -5
async def is_credible_user(post):
    try:
        author = post.author
        # checks if author is deleted or unknown
        if not author:
            return False
        # asyncpraw Redditor objects are lazy: attributes like created_utc are
        # only populated after an explicit fetch
        await author.load()
        # 1. Check Account Age (Must be older than 2 days)
        created_time = datetime.utcfromtimestamp(author.created_utc)
        account_age = datetime.utcnow() - created_time
        if account_age.days < 2:
            print(f" ⚠️ Blocked: Account too new ({account_age.days} days)")
            return False
        # 2. Check Karma (Must be above -5)
        total_karma = author.comment_karma + author.link_karma
        if total_karma < -5:
            print(f" ⚠️ Blocked: Negative Karma ({total_karma})")
            return False
        return True
    except Exception:
        # allows posts to pass if Reddit API fails to return user info
        return True
# 4. Main Scraper Loop
async def scrape_reddit():
print("Connecting to Reddit API...")
client_id = os.getenv("REDDIT_CLIENT_ID")
client_secret = os.getenv("REDDIT_CLIENT_SECRET")
# --------------------------------
if not client_id or not client_secret:
print("❌ Error: Client ID or Secret missing in .env")
return
    reddit = asyncpraw.Reddit(
        client_id=client_id,
        client_secret=client_secret,
        user_agent=os.getenv("REDDIT_USER_AGENT"),
        username=os.getenv("REDDIT_USERNAME"),
        password=os.getenv("REDDIT_PASSWORD")
    )
try:
subreddit = await reddit.subreddit(SUBREDDITS)
print(f"👁️  ALISTO ACTIVE: Monitoring r/{SUBREDDITS}...")
        # --- PHASE 1: FETCH LATEST EXISTING POSTS ---
        print("🔍 Scanning last 5 posts for missed alerts...")
# iterates over the last 5 posts asynchronously
async for post in subreddit.new(limit=5):
await process_post(post)
print("✅ Historical scan complete")
# --- PHASE 2: START REAL-TIME STREAM (Forever Loop) ---
print("📡 Starting real-time stream for new submissions...")
        # starts the continuous loop; skip_existing=False also replays a batch of
        # recent submissions first, which is safe because process_post deduplicates by reddit_id
        async for post in subreddit.stream.submissions(skip_existing=False):
await process_post(post)
except Exception as e:
print(f"Global Scraper Error: {e}")
finally:
await reddit.close()
print("Scraper stopped")
# executes the main scraping loop when the script is run
if __name__ == "__main__":
try:
        asyncio.run(scrape_reddit())
except KeyboardInterrupt:
print("\n🛑 Stopped by user")