# alerts.py
"""Email alerts for high-risk tweets.

Fetches high-risk, not-yet-notified tweets via ``db.fetch_high_risk_unnotified``,
re-scores them with a dynamic risk model, emails the top ones (or only logs what
would be sent when running in a Hugging Face Space or without SMTP credentials),
and marks the successfully sent tweets via ``db.mark_as_notified``.
"""
import logging
import math
import os
import re
import smtplib
import ssl
import time
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from html import escape
from typing import List, Optional

from dotenv import load_dotenv

from db import fetch_high_risk_unnotified, mark_as_notified

load_dotenv()

# Configure logging BEFORE the first module-level log call below: calling
# logging.info/warning on an unconfigured root logger installs a default
# handler, which would turn a later basicConfig() into a silent no-op.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")


def _env_int(name: str, default: int) -> int:
    """Read integer env var *name*; use *default* if it is unset, blank or malformed."""
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return default
    try:
        return int(raw)
    except ValueError:
        logging.warning(f"Invalid integer for {name}={raw!r}; using default {default}.")
        return default


def _env_float(name: str, default: float) -> float:
    """Read float env var *name*; use *default* if it is unset, blank or malformed."""
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return default
    try:
        return float(raw)
    except ValueError:
        logging.warning(f"Invalid number for {name}={raw!r}; using default {default}.")
        return default


# --- Detect Hugging Face Environment ---
IS_HF = bool(os.getenv("SPACE_ID") or os.getenv("HF_SPACE_ID"))

# --- Configuration (via env) ---
ALERT_EMAIL = os.getenv("ALERT_EMAIL")
ALERT_PASSWORD = os.getenv("ALERT_PASSWORD")
ALERT_RECIPIENTS = os.getenv("ALERT_RECIPIENTS")
SMTP_SERVER = os.getenv("ALERT_SMTP", "smtp.gmail.com")
SMTP_PORT = _env_int("ALERT_SMTP_PORT", 587)
SMTP_TIMEOUT = 20  # seconds, per SMTP connection
# Clamped: a negative retry count would skip sending entirely, and
# time.sleep() raises ValueError on negative delays.
SEND_RETRY = max(0, _env_int("ALERT_SEND_RETRY", 2))
RETRY_DELAY = max(0.0, _env_float("ALERT_RETRY_DELAY", 2.0))
BATCH_DELAY = max(0.0, _env_float("ALERT_BATCH_DELAY", 0.5))

# If running in HF or credentials missing → use simulation mode
SIMULATE_ALERTS = IS_HF or not (ALERT_EMAIL and ALERT_PASSWORD)

if SIMULATE_ALERTS:
    _simulation_reason = "HF Space detected" if IS_HF else "ALERT_EMAIL/ALERT_PASSWORD not set"
    logging.warning(f"⚠️ Running in simulation mode: Email alerts will not be sent ({_simulation_reason}).")
else:
    logging.info("✅ Real email alerts enabled.")


def _parse_recipients(env_val: Optional[str]) -> List[str]:
    """Split a comma-separated recipient list, ignoring blank entries.

    Falls back to ``[ALERT_EMAIL]`` when no recipient is listed, and to an
    empty list when ALERT_EMAIL is unset as well (never ``[None]``).
    """
    recipients = [r.strip() for r in (env_val or "").split(",") if r.strip()]
    if recipients:
        return recipients
    return [ALERT_EMAIL] if ALERT_EMAIL else []


RECIPIENTS = _parse_recipients(ALERT_RECIPIENTS)


# --- Scoring ---

_KARNATAKA_KEYWORDS = ("bangalore", "bengaluru", "karnataka")


def _as_float(value) -> float:
    """Coerce a possibly missing / None / non-numeric / non-finite field to float (0.0 on failure)."""
    try:
        number = float(value or 0)
    except (TypeError, ValueError):
        return 0.0
    return number if math.isfinite(number) else 0.0


def _as_int(value) -> int:
    """Coerce a possibly missing / None / non-numeric field to int (0 on failure)."""
    return int(_as_float(value))


def _engagement(tweet: dict) -> int:
    """Likes plus retweets, treating missing or None counts as 0."""
    return _as_int(tweet.get("like_count")) + _as_int(tweet.get("retweet_count"))


def _geo_score(tweet: dict) -> float:
    """1.0 if ``user_location`` mentions Bangalore/Bengaluru/Karnataka, else 0.0."""
    location = str(tweet.get("user_location") or "").lower()
    return 1.0 if any(k in location for k in _KARNATAKA_KEYWORDS) else 0.0


def compute_dynamic_risk(tweet: dict) -> float:
    """Return the dynamic risk score of *tweet*, scaled by 100 and rounded to 2 decimals.

    Weighted sum of four components:
      * 0.4 x content: mean of ``drug_score`` and ``crime_score``
      * 0.2 x user influence: followers (saturating at 1000) and ``verified``
      * 0.2 x engagement: likes + retweets (saturating at 50)
      * 0.2 x geo relevance: Karnataka keyword in ``user_location``

    Missing, None or non-numeric fields count as 0.
    NOTE(review): the result stays within 0-100 only if drug_score/crime_score
    are in [0, 1] — confirm against whatever writes those fields.
    """
    content_score = 0.5 * _as_float(tweet.get("drug_score")) + 0.5 * _as_float(tweet.get("crime_score"))

    followers = _as_int(tweet.get("followers_count"))
    verified = 1 if tweet.get("verified", False) else 0
    user_score = min(followers / 1000, 1) * 0.5 + verified * 0.5

    engagement_score = min(_engagement(tweet) / 50, 1)

    risk_score = (
        0.4 * content_score
        + 0.2 * user_score
        + 0.2 * engagement_score
        + 0.2 * _geo_score(tweet)
    )
    return round(risk_score * 100, 2)


def assign_dynamic_risk_level(tweet: dict, score: Optional[float] = None) -> str:
    """Map the dynamic risk score to CRITICAL (>=75), HIGH (>=50), MEDIUM (>=25) or LOW.

    *score* may be supplied when it was already computed for *tweet*, to avoid
    recomputing it.
    """
    if score is None:
        score = compute_dynamic_risk(tweet)
    if score >= 75:
        return "CRITICAL"
    if score >= 50:
        return "HIGH"
    if score >= 25:
        return "MEDIUM"
    return "LOW"


def _tweet_score_for_sort(tweet: dict) -> float:
    """Sort key: dynamic risk plus a small bonus of 0.1 per like/retweet."""
    return compute_dynamic_risk(tweet) + _engagement(tweet) * 0.1


def _select_top_tweets(tweets: List[dict], max_tweets: Optional[int], send_all: bool) -> List[dict]:
    """Return *tweets* sorted by descending sort key, truncated to *max_tweets*.

    All tweets are returned when *send_all* is true or *max_tweets* is None.
    Side effect: string ``risk_level`` values are upper-cased in place.
    """
    if not tweets:
        return []

    # normalize risk_level fields to uppercase to avoid mismatches
    for t in tweets:
        level = t.get("risk_level")
        if isinstance(level, str):
            t["risk_level"] = level.upper()

    tweets_sorted = sorted(tweets, key=_tweet_score_for_sort, reverse=True)
    if send_all or max_tweets is None:
        return tweets_sorted
    # A negative limit would otherwise slice from the end ([:-1] drops the last tweet).
    return tweets_sorted[:max(max_tweets, 0)]


# --- Email formatting ---

# Whole-token match so "instagram"/"telegram"/"program" do not count as "gram",
# while quantities such as "10kg" or "5 grams" still match (applied to lowercased text).
_BULK_RE = re.compile(r"(?<![a-z])(?:kg|gram|bulk|kilos|ounce|pound)s?(?![a-z])")
# Phone-number-like run: 7+ digits, optionally split by single spaces or dashes.
_CONTACT_RE = re.compile(r"\+?\d(?:[\s-]?\d){6,}")

_TABLE_HEADERS = (
    "Risk", "User", "Dynamic Risk", "Followers", "Verified", "Engagement",
    "Geo Score", "Location", "Bulk", "Contact", "Content", "Link",
)


def _tweet_user(tweet: dict) -> str:
    """Username from whichever of the known keys is populated, else "N/A"."""
    return tweet.get("username") or tweet.get("user") or tweet.get("user_name") or "N/A"


def _tweet_content(tweet: dict) -> str:
    """Tweet text from ``content`` or ``text``, else an empty string."""
    return str(tweet.get("content") or tweet.get("text") or "")


def _tweet_url(tweet: dict) -> Optional[str]:
    """Explicit ``tweet_url`` if present, else a URL built from username and id; None if neither."""
    url = tweet.get("tweet_url")
    if url:
        return str(url)
    tweet_id = tweet.get("tweet_id")
    if tweet_id in (None, "", "N/A"):
        return None
    return f"https://x.com/{_tweet_user(tweet)}/status/{tweet_id}"


def _format_tweet_html_block(tweet: dict) -> str:
    """Return one HTML table row (``<tr>``) describing *tweet* for the batched email.

    Cells follow ``_TABLE_HEADERS``. Every value is HTML-escaped because tweet
    text, usernames and locations are user-controlled.
    """
    content = _tweet_content(tweet)
    location = tweet.get("user_location") or tweet.get("location") or "N/A"
    dyn_risk = tweet.get("dynamic_risk_score")
    if dyn_risk is None:
        dyn_risk = compute_dynamic_risk(tweet)

    cells = (
        tweet.get("risk_level", "N/A"),
        f"@{_tweet_user(tweet)}",
        dyn_risk,
        _as_int(tweet.get("followers_count")),
        "Yes" if tweet.get("verified") else "No",
        _engagement(tweet),
        int(_geo_score(tweet)),
        location,
        "Yes" if _BULK_RE.search(content.lower()) else "No",
        "Yes" if _CONTACT_RE.search(content) else "No",
        content,
    )
    tds = "".join(f"<td>{escape(str(cell))}</td>" for cell in cells)

    url = _tweet_url(tweet)
    # Only emit real web links: a stored "javascript:..." URL must not become an href.
    if url and url.lower().startswith(("http://", "https://")):
        link = f'<a href="{escape(url)}">View</a>'
    else:
        link = "N/A"
    return f"<tr>{tds}<td>{link}</td></tr>\n"


def _compose_batched_email(tweets: List[dict]) -> MIMEMultipart:
    """Build a multipart/alternative email (plain text + HTML table) listing *tweets*.

    NOTE(review): the subject line and HTML layout were reconstructed from a
    garbled copy of this file — adjust wording if the original differed.
    """
    msg = MIMEMultipart("alternative")
    msg["Subject"] = f"[Drug Crime Alert] {len(tweets)} high-risk tweet(s) detected"
    if ALERT_EMAIL:
        msg["From"] = ALERT_EMAIL
    if RECIPIENTS:
        msg["To"] = ", ".join(RECIPIENTS)

    header_html = "".join(f"<th>{h}</th>" for h in _TABLE_HEADERS)
    rows_html = "".join(_format_tweet_html_block(t) for t in tweets)
    html_text = (
        "<html><body>\n"
        f"<h2>High-risk tweets detected ({len(tweets)})</h2>\n"
        '<table border="1" cellpadding="4" cellspacing="0">\n'
        f"<tr>{header_html}</tr>\n"
        f"{rows_html}"
        "</table>\n"
        "<p><small>Generated by Karnataka Drug Crime Monitoring System</small></p>\n"
        "</body></html>"
    )

    # Plain-text fallback
    plain_text = "\n".join(
        f"{t.get('risk_level')} | @{_tweet_user(t)} | {t.get('dynamic_risk_score')} | {_tweet_content(t)[:100]}"
        for t in tweets
    )

    # Attach plain first: clients prefer the LAST alternative they can render.
    msg.attach(MIMEText(plain_text, "plain"))
    msg.attach(MIMEText(html_text, "html"))
    return msg


# --- SMTP send with retries --- #

def _smtp_send(msg: MIMEMultipart, recipients: List[str]) -> None:
    """Open one SMTP session, authenticate and send *msg*; raises on any failure.

    Port 465 uses implicit TLS (SMTP_SSL). Otherwise STARTTLS is negotiated on
    port 587 or whenever the server advertises it. Server certificates are
    verified via ``ssl.create_default_context()``.
    """
    context = ssl.create_default_context()
    if SMTP_PORT == 465:
        server = smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT, timeout=SMTP_TIMEOUT, context=context)
    else:
        server = smtplib.SMTP(SMTP_SERVER, SMTP_PORT, timeout=SMTP_TIMEOUT)
    with server as s:
        s.ehlo()
        if SMTP_PORT != 465 and (SMTP_PORT == 587 or s.has_extn("starttls")):
            s.starttls(context=context)
            s.ehlo()
        s.login(ALERT_EMAIL, ALERT_PASSWORD)
        refused = s.sendmail(ALERT_EMAIL, recipients, msg.as_string())
    # sendmail() returns the recipients it could not deliver to when only some failed.
    if refused:
        logging.warning(f"Some recipients were refused: {refused}")


def _send_email_message(msg: MIMEMultipart, recipients: List[str], retry: int = SEND_RETRY) -> bool:
    """Send *msg* via SMTP (or just log it in simulation mode).

    Makes up to ``retry + 1`` attempts, sleeping RETRY_DELAY seconds between
    them. Authentication failures are not retried. Returns True on success
    (always True when simulating) and False otherwise.
    """
    if SIMULATE_ALERTS:
        logging.info(f"[SIMULATED ALERT] Would send email to {recipients} with subject: {msg['Subject']}")
        return True
    if not recipients:
        logging.error("No email recipients configured; cannot send alert.")
        return False

    attempts = max(retry, 0) + 1
    for attempt in range(1, attempts + 1):
        try:
            _smtp_send(msg, recipients)
        except smtplib.SMTPAuthenticationError as e:
            # Wrong credentials will not fix themselves; retrying only delays the failure.
            logging.error(f"SMTP authentication failed; not retrying: {e}")
            return False
        except (smtplib.SMTPException, OSError, UnicodeError) as e:
            # OSError covers socket errors, timeouts and TLS (ssl.SSLError) failures.
            logging.warning(f"Email send attempt {attempt} failed: {e}")
            if attempt < attempts:
                time.sleep(RETRY_DELAY)
        else:
            logging.info(f"✅ Email sent to {recipients}")
            return True

    logging.error("Exceeded email send retries.")
    return False


# --- Public trigger function --- #

def trigger_alerts(max_tweets: Optional[int] = 10, send_all: bool = False, separate_emails: bool = False):
    """Fetch, score, email and mark high-risk unnotified tweets.

    Args:
        max_tweets: email at most this many top-scoring tweets (ignored when
            *send_all* is true or when None).
        send_all: email every fetched tweet.
        separate_emails: send one email per tweet (spaced by BATCH_DELAY)
            instead of a single batched email.

    Only tweets whose email was sent (or simulated) successfully are passed to
    ``mark_as_notified``; the rest are logged and left unmarked.
    """
    logging.info("Fetching high-risk unnotified tweets from DB...")
    tweets = fetch_high_risk_unnotified()
    if not tweets:
        logging.info("No unnotified high-risk tweets found.")
        return

    # --- Compute dynamic risk for all fetched tweets ---
    for t in tweets:
        score = compute_dynamic_risk(t)
        t["dynamic_risk_score"] = score
        t["risk_level"] = assign_dynamic_risk_level(t, score)  # automatically set risk_level

    selected = _select_top_tweets(tweets, max_tweets, send_all)
    if not selected:
        logging.info("No tweets selected after filtering.")
        return

    # Compose and send emails (batch or separate)
    batches = [[t] for t in selected] if separate_emails else [selected]
    sent: List[dict] = []
    failed: List[dict] = []
    for index, batch in enumerate(batches):
        if index:
            time.sleep(BATCH_DELAY)  # throttle between consecutive emails
        if _send_email_message(_compose_batched_email(batch), RECIPIENTS):
            sent.extend(batch)
        else:
            failed.extend(batch)

    # Mark notified
    # TODO(review): tweets carry "_collection_name"; if mark_as_notified needs the
    # source collection to find the tweet, pass it here — confirm against db.py.
    for t in sent:
        tid = t.get("tweet_id")
        if tid is None:
            logging.warning("Sent tweet has no tweet_id; cannot mark it as notified.")
            continue
        try:
            mark_as_notified(tid)
        except Exception as e:  # best-effort: one DB failure must not stop the rest
            logging.error(f"Failed to mark {tid} as notified: {e}")

    logging.info(f"Alerts sent: {len(sent)}; failures: {len(failed)}")
    if failed:
        logging.warning(f"Failed tweet IDs: {[t.get('tweet_id') for t in failed]}")


def compute_risk_probability(dynamic_score: float) -> float:
    """Convert a dynamic risk score (0-100) into a probability clamped to [0, 1]."""
    return max(0.0, min(1.0, dynamic_score / 100))


# --- CLI usage example --- #
if __name__ == "__main__":
    # Example usages:
    # - send up to 5 top tweets (batched)        -> trigger_alerts(max_tweets=5)
    # - send all unnotified high-risk tweets     -> trigger_alerts(send_all=True)
    # - send one email per tweet                 -> trigger_alerts(max_tweets=10, separate_emails=True)
    # Default example: top 10 (batched)
    trigger_alerts(max_tweets=10)