Spaces:
Sleeping
Sleeping
| # alerts.py | |
| import os | |
| import time | |
| import logging | |
| from typing import List, Optional | |
| from email.mime.text import MIMEText | |
| from email.mime.multipart import MIMEMultipart | |
| import smtplib | |
| from dotenv import load_dotenv | |
| from db import fetch_high_risk_unnotified, mark_as_notified | |
| load_dotenv() | |
| # --- Detect Hugging Face Environment --- | |
| IS_HF = os.getenv("SPACE_ID") or os.getenv("HF_SPACE_ID") | |
| # --- Configuration (via env) --- | |
| ALERT_EMAIL = os.getenv("ALERT_EMAIL") | |
| ALERT_PASSWORD = os.getenv("ALERT_PASSWORD") | |
| ALERT_RECIPIENTS = os.getenv("ALERT_RECIPIENTS") | |
| SMTP_SERVER = os.getenv("ALERT_SMTP", "smtp.gmail.com") | |
| SMTP_PORT = int(os.getenv("ALERT_SMTP_PORT", "587")) | |
| SEND_RETRY = int(os.getenv("ALERT_SEND_RETRY", "2")) | |
| RETRY_DELAY = float(os.getenv("ALERT_RETRY_DELAY", "2.0")) | |
| BATCH_DELAY = float(os.getenv("ALERT_BATCH_DELAY", "0.5")) | |
| # If running in HF or credentials missing → use simulation mode | |
| SIMULATE_ALERTS = IS_HF or not (ALERT_EMAIL and ALERT_PASSWORD) | |
| if SIMULATE_ALERTS: | |
| logging.warning("⚠️ Running in simulation mode: Email alerts will not be sent (HF Space detected).") | |
| else: | |
| logging.info("✅ Real email alerts enabled.") | |
| def _parse_recipients(env_val: Optional[str]) -> List[str]: | |
| if not env_val: | |
| return [ALERT_EMAIL] | |
| return [r.strip() for r in env_val.split(",") if r.strip()] | |
| RECIPIENTS = _parse_recipients(ALERT_RECIPIENTS) | |
| # logger | |
| logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") | |
def compute_dynamic_risk(tweet: dict) -> float:
    """Return a weighted risk score for *tweet*, rounded to two decimals.

    The score combines four components:

    * content (40%)    - mean of ``drug_score`` and ``crime_score``
    * user (20%)       - followers (saturating at 1000) and verified flag
    * engagement (20%) - likes + retweets, saturating at 50
    * geo (20%)        - 1 when the user location mentions Karnataka keywords

    The result is multiplied by 100. It stays within 0-100 only if
    ``drug_score`` and ``crime_score`` are themselves in 0-1.
    NOTE(review): that range is assumed, not enforced -- confirm upstream.

    Missing keys and explicit ``None`` values are both treated as zero/empty.
    """
    def _number(key: str, cast):
        # `or 0` covers a missing key as well as a stored None (e.g. DB null).
        return cast(tweet.get(key) or 0)

    # 1. Content-based weight (equal split between the two model scores)
    content_score = 0.5 * _number("drug_score", float) + 0.5 * _number("crime_score", float)

    # 2. User influence weight: follower ratio clamped to [0, 1] so bad
    #    (negative) counts cannot lower the score.
    follower_ratio = min(max(_number("followers_count", int) / 1000, 0), 1)
    verified = 1 if tweet.get("verified", False) else 0
    user_score = follower_ratio * 0.5 + verified * 0.5

    # 3. Engagement weight, normalized to 0-1
    engagement = _number("like_count", int) + _number("retweet_count", int)
    engagement_score = min(max(engagement / 50, 0), 1)

    # 4. Geo relevance (substring match on the lowercased location)
    location = str(tweet.get("user_location") or "").lower()
    karnataka_keywords = ("bangalore", "bengaluru", "karnataka")
    geo_score = 1 if any(k in location for k in karnataka_keywords) else 0

    # Combine weights (adjust relative contributions here)
    risk_score = (
        0.4 * content_score
        + 0.2 * user_score
        + 0.2 * engagement_score
        + 0.2 * geo_score
    )
    return round(risk_score * 100, 2)
def assign_dynamic_risk_level(tweet: dict) -> str:
    """Translate the tweet's dynamic risk score into a risk label.

    Scores of 75 and above are "CRITICAL", 50 and above "HIGH", 25 and above
    "MEDIUM"; everything lower is "LOW".
    """
    score = compute_dynamic_risk(tweet)
    # Ordered from the highest floor down; the first floor reached wins.
    for floor, label in ((75, "CRITICAL"), (50, "HIGH"), (25, "MEDIUM")):
        if score >= floor:
            return label
    return "LOW"
def _tweet_score_for_sort(tweet: dict) -> float:
    """Return the sort key for a tweet: dynamic risk plus an engagement bonus.

    The bonus is 0.1 per like or retweet; missing or ``None`` counts are
    treated as zero.
    """
    base_risk = compute_dynamic_risk(tweet)  # computed first, as the primary term
    likes = int(tweet.get("like_count", 0) or 0)
    retweets = int(tweet.get("retweet_count", 0) or 0)
    return base_risk + (likes + retweets) * 0.1
def _select_top_tweets(tweets: List[dict], max_tweets: Optional[int], send_all: bool) -> List[dict]:
    """Rank tweets by sort score (highest first) and return the chosen slice.

    String ``risk_level`` values are upper-cased in place on the input dicts.
    All ranked tweets are returned when ``send_all`` is true or ``max_tweets``
    is ``None``; otherwise only the first ``max_tweets``.
    """
    if not tweets:
        return []

    # Normalize label casing so later comparisons do not miss "critical" etc.
    for entry in tweets:
        level = entry.get("risk_level")
        if isinstance(level, str):
            entry["risk_level"] = level.upper()

    ranked = list(tweets)
    ranked.sort(key=_tweet_score_for_sort, reverse=True)

    unlimited = send_all or max_tweets is None
    return ranked if unlimited else ranked[:max_tweets]
| def _format_tweet_html_block(tweet: dict) -> str: | |
| """Return an HTML block describing a tweet (for batched email).""" | |
| tweet_id = tweet.get("tweet_id", "N/A") | |
| user = tweet.get("username") or tweet.get("user") or tweet.get("user_name") or "N/A" | |
| content = tweet.get("content") or tweet.get("text") or "" | |
| timestamp = tweet.get("datetime") or tweet.get("timestamp") or "N/A" | |
| location = tweet.get("user_location") or tweet.get("location") or "N/A" | |
| risk = tweet.get("risk_level", "N/A") | |
| likes = tweet.get("like_count", 0) | |
| rts = tweet.get("retweet_count", 0) | |
| url = tweet.get("tweet_url") or f"https://x.com/{user}/status/{tweet_id}" if tweet_id != "N/A" else "N/A" | |
| # Bulk detection | |
| bulk_keywords = ["kg", "gram", "bulk", "kilos", "ounce", "pound"] | |
| bulk_indicator = "Yes" if any(k in content.lower() for k in bulk_keywords) else "No" | |
| # Contact detection (simple digit check) | |
| contact_indicator = "Yes" if any(c.isdigit() for c in content) else "No" | |
| html = f""" | |
| <div style="border:1px solid #ddd;padding:10px;margin-bottom:8px;border-radius:6px;"> | |
| <p><strong>Risk:</strong> <span style="color:#b22222">{risk}</span> | |
| <strong>User:</strong> @{user} <strong>Time:</strong> {timestamp}</p> | |
| <p><strong>Location:</strong> {location} <td>{bulk_indicator}</td><td>{contact_indicator}</td><td>{content}</td><strong>Likes:</strong> {likes} <strong>RTs:</strong> {rts}</p> | |
| <p style="background:#f7f7f7;padding:8px;border-radius:4px;">{content}</p> | |
| <p><a href="{url}">View Tweet</a> | Tweet ID: {tweet_id}</p> | |
| </div> | |
| """ | |
| return html | |
def _tweet_author(tweet: dict) -> str:
    """Return the first non-empty author field of *tweet*, or ``"N/A"``."""
    return str(
        tweet.get("username") or tweet.get("user") or tweet.get("user_name") or "N/A"
    )


def _tweet_engagement(tweet: dict) -> int:
    """Return likes + retweets, treating missing or ``None`` counts as zero."""
    return int(tweet.get("like_count") or 0) + int(tweet.get("retweet_count") or 0)


def _tweet_link_cell(tweet: dict) -> str:
    """Return an HTML "View" anchor for *tweet*, or ``"N/A"`` when no URL exists."""
    from html import escape

    # An explicit URL always wins; one is only synthesized when an id exists.
    url = tweet.get("tweet_url")
    tweet_id = tweet.get("tweet_id")
    if not url and tweet_id not in (None, "", "N/A"):
        url = f"https://x.com/{_tweet_author(tweet)}/status/{tweet_id}"
    if not url:
        return "N/A"
    return f'<a href="{escape(str(url), quote=True)}">View</a>'


def _critical_summary_html(tweets: List[dict]) -> str:
    """Return the "Top CRITICAL" summary table (max 10 rows), or "" if none."""
    from html import escape

    critical = [t for t in tweets if t.get("risk_level") == "CRITICAL"]
    if not critical:
        return ""
    top_critical = sorted(
        critical, key=lambda t: t.get("dynamic_risk_score") or 0, reverse=True
    )[:10]

    parts = ["""
<h3 style="color:#b22222;">Top CRITICAL Tweets Summary</h3>
<table border="1" cellpadding="5" cellspacing="0" style="border-collapse: collapse;">
<tr>
<th>User</th><th>Dynamic Risk</th><th>Followers</th><th>Verified</th><th>Engagement</th><th>Location</th><th>Link</th>
</tr>
"""]
    for t in top_critical:
        user = escape(_tweet_author(t))
        risk_score = escape(str(t.get("dynamic_risk_score", 0)))
        followers = escape(str(t.get("followers_count", 0)))
        verified = "Yes" if t.get("verified", False) else "No"
        location = escape(str(t.get("user_location") or "N/A"))
        parts.append(f"""
<tr>
<td>@{user}</td>
<td>{risk_score}</td>
<td>{followers}</td>
<td>{verified}</td>
<td>{_tweet_engagement(t)}</td>
<td>{location}</td>
<td>{_tweet_link_cell(t)}</td>
</tr>
""")
    parts.append("</table><br>")
    return "".join(parts)


def _detail_table_html(tweets: List[dict]) -> str:
    """Return the main HTML table with one row of metrics per tweet."""
    from html import escape

    geo_keywords = ("bangalore", "bengaluru", "karnataka")
    bulk_keywords = ("kg", "gram", "bulk", "kilos", "ounce", "pound")

    parts = ["""
<table border="1" cellpadding="5" cellspacing="0" style="border-collapse: collapse;">
<tr>
<th>Risk</th><th>User</th><th>Dynamic Risk</th><th>Followers</th>
<th>Verified</th><th>Engagement</th><th>Geo Score</th><th>Location</th>
<th>Bulk</th><th>Contact</th><th>Content</th><th>Link</th>
</tr>
"""]
    for t in tweets:
        content = str(t.get("content") or t.get("text") or "")
        location = str(t.get("user_location") or t.get("location") or "N/A")
        # Match keywords case-insensitively but display the location as stored.
        geo_score = 1 if any(k in location.lower() for k in geo_keywords) else 0
        bulk_indicator = "Yes" if any(k in content.lower() for k in bulk_keywords) else "No"
        # Contact detection is a simple "contains a digit" heuristic.
        contact_indicator = "Yes" if any(c.isdigit() for c in content) else "No"
        verified = "Yes" if t.get("verified", False) else "No"

        risk = escape(str(t.get("risk_level", "N/A")))
        user = escape(_tweet_author(t))
        dyn_risk = escape(str(t.get("dynamic_risk_score", 0)))
        followers = escape(str(t.get("followers_count", 0)))
        parts.append(f"""
<tr>
<td>{risk}</td>
<td>@{user}</td>
<td>{dyn_risk}</td>
<td>{followers}</td>
<td>{verified}</td>
<td>{_tweet_engagement(t)}</td>
<td>{geo_score}</td>
<td>{escape(location)}</td>
<td>{bulk_indicator}</td>
<td>{contact_indicator}</td>
<td>{escape(content)}</td>
<td>{_tweet_link_cell(t)}</td>
</tr>
""")
    parts.append("</table>")
    return "".join(parts)


def _plain_text_fallback(tweets: List[dict]) -> str:
    """Return one "risk | @user | score | first 100 chars" line per tweet."""
    lines = []
    for t in tweets:
        content = str(t.get("content") or t.get("text") or "")
        lines.append(
            f"{t.get('risk_level')} | @{_tweet_author(t)} | "
            f"{t.get('dynamic_risk_score')} | {content[:100]}"
        )
    return "\n".join(lines)


def _compose_batched_email(tweets: List[dict]) -> MIMEMultipart:
    """Build a multipart/alternative alert email covering *tweets*.

    The message carries a plain-text part followed by an HTML part. The HTML
    contains an optional summary of the top ten tweets whose ``risk_level`` is
    "CRITICAL" (ordered by ``dynamic_risk_score``) and a table with one row
    per tweet. Every tweet-derived value is HTML-escaped, since tweet text,
    handles and locations are untrusted input.

    ``From`` is only set when ``ALERT_EMAIL`` is configured and empty
    recipient entries are skipped, so the message can also be composed when
    the sender is not configured.
    """
    msg = MIMEMultipart("alternative")
    msg["Subject"] = f"🚨 {len(tweets)} High-Priority Drug Alerts"
    if ALERT_EMAIL:
        msg["From"] = ALERT_EMAIL
    msg["To"] = ", ".join(r for r in RECIPIENTS if r)

    summary_html = _critical_summary_html(tweets)
    table_html = _detail_table_html(tweets)
    html_text = f"""
<html>
<body>
<h2 style="color:#b22222;">High-Priority Drug Alerts</h2>
{summary_html}
{table_html}
<hr/>
<p>Generated by Karnataka Drug Crime Monitoring System</p>
</body>
</html>
"""
    # Attach plain text first: in multipart/alternative the last part is the
    # preferred rendering, so HTML-capable clients will show the HTML part.
    msg.attach(MIMEText(_plain_text_fallback(tweets), "plain"))
    msg.attach(MIMEText(html_text, "html"))
    return msg
# --- SMTP send with retries --- #
def _send_email_message(msg: MIMEMultipart, recipients: List[str], retry: int = SEND_RETRY) -> bool:
    """Send *msg* to *recipients* via SMTP (real or simulated).

    In simulation mode nothing is sent; the would-be delivery is logged and
    ``True`` is returned. Otherwise up to ``retry + 1`` attempts are made
    (always at least one), sleeping ``RETRY_DELAY`` seconds between attempts.

    Transport security by port:
      * 465   - implicit TLS (``smtplib.SMTP_SSL``)
      * 587   - STARTTLS is required
      * other - STARTTLS is used when the server advertises it

    Returns ``True`` on success and ``False`` on any failure; it never raises
    for delivery problems.
    """
    if SIMULATE_ALERTS:
        logging.info(f"[SIMULATED ALERT] Would send email to {recipients} with subject: {msg['Subject']}")
        return True

    targets = [r for r in recipients if r]
    if not targets:
        # sendmail() with no recipients can only fail; do not waste retries.
        logging.error("No alert recipients configured; email not sent.")
        return False

    # Serialize once; the payload is identical for every attempt.
    try:
        payload = msg.as_string()
    except Exception as e:  # boundary: a malformed message must not crash the caller
        logging.error(f"Could not serialize alert email: {e}")
        return False

    use_implicit_tls = SMTP_PORT == 465
    smtp_factory = smtplib.SMTP_SSL if use_implicit_tls else smtplib.SMTP
    total_attempts = max(retry, 0) + 1

    for attempt in range(1, total_attempts + 1):
        try:
            with smtp_factory(SMTP_SERVER, SMTP_PORT, timeout=20) as s:
                s.ehlo()
                if not use_implicit_tls and (SMTP_PORT == 587 or s.has_extn("starttls")):
                    s.starttls()
                    s.ehlo()  # re-identify over the encrypted channel
                s.login(ALERT_EMAIL, ALERT_PASSWORD)
                s.sendmail(ALERT_EMAIL, targets, payload)
            logging.info(f"✅ Email sent to {targets}")
            return True
        except smtplib.SMTPAuthenticationError as e:
            # Wrong credentials will not fix themselves on retry.
            logging.error(f"SMTP authentication failed; not retrying: {e}")
            return False
        except Exception as e:  # boundary: keep alerting best-effort
            logging.warning(f"Email send attempt {attempt} failed: {e}")
            if attempt < total_attempts:
                time.sleep(RETRY_DELAY)

    logging.error("Exceeded email send retries.")
    return False
# --- Public trigger function --- #
def _mark_tweets_notified(tweets: List[dict]) -> None:
    """Mark each tweet in *tweets* as notified, logging (not raising) failures."""
    for t in tweets:
        tid = t.get("tweet_id")
        if tid is None:
            logging.warning("Tweet without tweet_id cannot be marked as notified.")
            continue
        try:
            mark_as_notified(tid)
        except Exception as e:  # boundary: one DB failure must not stop the rest
            logging.error(f"Failed to mark {tid} as notified: {e}")


def trigger_alerts(max_tweets: Optional[int] = 10,
                   send_all: bool = False,
                   separate_emails: bool = False):
    """Fetch unnotified high-risk tweets, email them, and mark them notified.

    Args:
        max_tweets: Maximum number of top-ranked tweets to send; ``None``
            means no limit.
        send_all: When true, send every fetched tweet regardless of
            ``max_tweets``.
        separate_emails: When true, send one email per tweet (pausing
            ``BATCH_DELAY`` seconds between emails); otherwise send a single
            batched email.

    Each fetched tweet dict is updated in place with ``dynamic_risk_score``
    and ``risk_level``. Tweets are marked as notified immediately after the
    email containing them was sent, so an error later in the run cannot cause
    already-delivered alerts to be sent again.

    NOTE(review): in simulation mode the send step reports success, so tweets
    are still marked as notified although no email left the process -- confirm
    this is intended.
    """
    logging.info("Fetching high-risk unnotified tweets from DB...")
    tweets = fetch_high_risk_unnotified()
    if not tweets:
        logging.info("No unnotified high-risk tweets found.")
        return

    # --- Compute dynamic risk for all fetched tweets ---
    for t in tweets:
        t["dynamic_risk_score"] = compute_dynamic_risk(t)
        t["risk_level"] = assign_dynamic_risk_level(t)  # label derived from the score

    selected = _select_top_tweets(tweets, max_tweets, send_all)
    if not selected:
        logging.info("No tweets selected after filtering.")
        return

    # One batch per tweet, or a single batch with everything.
    batches = [[t] for t in selected] if separate_emails else [selected]

    sent_count = 0
    failure_ids = []
    for index, batch in enumerate(batches):
        if index:
            time.sleep(BATCH_DELAY)  # pause between emails, not after the last one
        msg = _compose_batched_email(batch)
        if _send_email_message(msg, RECIPIENTS):
            sent_count += len(batch)
            _mark_tweets_notified(batch)
        else:
            failure_ids.extend(t.get("tweet_id") for t in batch)

    logging.info(f"Alerts sent: {sent_count}; failures: {len(failure_ids)}")
    if failure_ids:
        logging.warning(f"Failed tweet IDs: {failure_ids}")
def compute_risk_probability(dynamic_score: float) -> float:
    """Scale a 0-100 dynamic risk score to a probability clamped to [0, 1]."""
    fraction = dynamic_score / 100
    # Cap at the upper bound first, then lift to the lower bound.
    capped = fraction if fraction < 1.0 else 1.0
    return capped if capped > 0.0 else 0.0
# --- Script entry point --- #
if __name__ == "__main__":
    # Running this module directly sends one batched email with the ten
    # top-ranked unnotified tweets. Alternative invocations:
    #   trigger_alerts(max_tweets=5)                         -> top 5, batched
    #   trigger_alerts(send_all=True)                        -> every pending tweet
    #   trigger_alerts(max_tweets=10, separate_emails=True)  -> one email per tweet
    trigger_alerts(max_tweets=10)