# Twitter-Analysis / src/alerts.py
# Last commit by lawlevisan: "Update src/alerts.py" (283b076, verified)
# alerts.py
import html
import logging
import os
import smtplib
import ssl
import time
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import List, Optional

from dotenv import load_dotenv

from db import fetch_high_risk_unnotified, mark_as_notified
load_dotenv()

# --- Detect Hugging Face Environment ---
IS_HF = bool(os.getenv("SPACE_ID") or os.getenv("HF_SPACE_ID"))

# --- Configuration (via env) ---
ALERT_EMAIL = os.getenv("ALERT_EMAIL")
ALERT_PASSWORD = os.getenv("ALERT_PASSWORD")
ALERT_RECIPIENTS = os.getenv("ALERT_RECIPIENTS")  # comma-separated list of addresses
SMTP_SERVER = os.getenv("ALERT_SMTP", "smtp.gmail.com")
SMTP_PORT = int(os.getenv("ALERT_SMTP_PORT", "587"))
SEND_RETRY = int(os.getenv("ALERT_SEND_RETRY", "2"))  # extra attempts after the first failure
RETRY_DELAY = float(os.getenv("ALERT_RETRY_DELAY", "2.0"))  # seconds between SMTP retries
BATCH_DELAY = float(os.getenv("ALERT_BATCH_DELAY", "0.5"))  # seconds between per-tweet emails

# Configure the root logger BEFORE the first module-level logging call below.
# logging.warning()/logging.info() implicitly run basicConfig() with defaults
# when the root logger has no handlers, which would turn any later
# basicConfig() into a no-op (losing this format and the INFO level).
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# If running in HF or credentials missing → use simulation mode
SIMULATE_ALERTS = bool(IS_HF or not (ALERT_EMAIL and ALERT_PASSWORD))
if SIMULATE_ALERTS:
    # Report the actual reason; missing credentials are not an HF Space.
    logging.warning(
        "⚠️ Running in simulation mode: Email alerts will not be sent (%s).",
        "HF Space detected" if IS_HF else "ALERT_EMAIL/ALERT_PASSWORD not set",
    )
else:
    logging.info("✅ Real email alerts enabled.")
def _parse_recipients(env_val: Optional[str]) -> List[str]:
    """Parse a comma-separated recipient list into clean addresses.

    Blank entries and surrounding whitespace are dropped. When nothing usable
    remains (value unset, empty, or only commas/whitespace) the list falls
    back to ``ALERT_EMAIL``; if that is unset too, an empty list is returned,
    so the result never contains ``None``.
    """
    recipients = [r.strip() for r in (env_val or "").split(",") if r.strip()]
    if recipients:
        return recipients
    return [ALERT_EMAIL] if ALERT_EMAIL else []
# Recipients are resolved once, at import time, from ALERT_RECIPIENTS.
RECIPIENTS: List[str] = _parse_recipients(ALERT_RECIPIENTS)

# --- Logging ---
# basicConfig() only configures the root logger while it has no handlers.
# An earlier basicConfig() call, or an earlier module-level logging.info()/
# logging.warning() (these call basicConfig() implicitly), makes this a no-op.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
def compute_dynamic_risk(tweet: dict) -> float:
    """Return a dynamic risk score for *tweet*, rounded to 2 decimals.

    Weighted blend of four components, scaled by 100:

    * content (40%)    -- mean of ``drug_score`` and ``crime_score``
    * user (20%)       -- followers (saturating at 1000) and ``verified``
    * engagement (20%) -- likes + retweets (saturating at 50)
    * geo (20%)        -- 1 if ``user_location`` mentions Bangalore,
      Bengaluru or Karnataka (case-insensitive)

    NOTE(review): the result stays within 0–100 only if ``drug_score`` and
    ``crime_score`` are in [0, 1]; their scale is set upstream and not
    checked here.

    Missing keys and ``None``/empty values (e.g. NULL DB columns) count as
    0 / no location instead of raising.
    """
    def _num(key: str, cast):
        # `or 0` maps None/"" to 0; plain int()/float() would raise on them.
        return cast(tweet.get(key) or 0)

    # 1️⃣ Content-based weight
    drug_score = _num("drug_score", float)
    crime_score = _num("crime_score", float)
    content_score = 0.5 * drug_score + 0.5 * crime_score  # can adjust weights

    # 2️⃣ User influence weight
    followers = _num("followers_count", int)
    verified = 1 if tweet.get("verified", False) else 0
    user_score = min(followers / 1000, 1) * 0.5 + verified * 0.5  # normalize

    # 3️⃣ Engagement weight
    engagement = _num("like_count", int) + _num("retweet_count", int)
    engagement_score = min(engagement / 50, 1)  # normalize to 0-1

    # 4️⃣ Geo relevance
    location = str(tweet.get("user_location") or "").lower()
    karnataka_keywords = ("bangalore", "bengaluru", "karnataka")
    geo_score = 1 if any(k in location for k in karnataka_keywords) else 0

    # Combine weights (adjust relative contributions)
    risk_score = (
        0.4 * content_score
        + 0.2 * user_score
        + 0.2 * engagement_score
        + 0.2 * geo_score
    )
    # Scale to 0–100
    return round(risk_score * 100, 2)
def assign_dynamic_risk_level(tweet: dict) -> str:
    """Translate the tweet's dynamic risk score into a severity label.

    Inclusive lower bounds: 75 -> CRITICAL, 50 -> HIGH, 25 -> MEDIUM;
    every other score (including NaN) maps to LOW.
    """
    score = compute_dynamic_risk(tweet)
    bands = ((75, "CRITICAL"), (50, "HIGH"), (25, "MEDIUM"))
    for floor, label in bands:
        if score >= floor:
            return label
    return "LOW"
def _tweet_score_for_sort(tweet: dict) -> float:
    """Priority sort key: dynamic risk plus 0.1 per like/retweet.

    Missing or None counters count as 0.

    NOTE(review): the engagement bonus is unbounded (about 1000 extra
    interactions outweigh the whole 0–100 risk range), and engagement is
    already a component of compute_dynamic_risk. Kept as-is to preserve
    the current ordering.
    """
    base = compute_dynamic_risk(tweet)
    interactions = sum(
        int(tweet.get(field, 0) or 0) for field in ("like_count", "retweet_count")
    )
    return base + interactions * 0.1
def _select_top_tweets(tweets: List[dict], max_tweets: Optional[int], send_all: bool) -> List[dict]:
    """Return *tweets* ordered by priority, optionally truncated.

    Tweets are sorted by ``_tweet_score_for_sort`` in descending order. When
    ``send_all`` is true or ``max_tweets`` is None every tweet is returned;
    otherwise at most ``max_tweets``. A negative limit is treated as 0
    (plain slicing would silently drop items from the end instead).

    Side effect: string ``risk_level`` values on the input dicts are
    upper-cased in place.
    """
    if not tweets:
        return []

    # normalize risk_level fields to uppercase to avoid mismatches
    for t in tweets:
        level = t.get("risk_level")
        if isinstance(level, str):
            t["risk_level"] = level.upper()

    tweets_sorted = sorted(tweets, key=_tweet_score_for_sort, reverse=True)
    if send_all or max_tweets is None:
        return tweets_sorted
    return tweets_sorted[: max(0, max_tweets)]
def _format_tweet_html_block(tweet: dict) -> str:
    """Return an HTML ``<div>`` card describing one tweet (for batched email).

    Tweet text, user names and locations are untrusted input, so every
    tweet-derived value is HTML-escaped before it is embedded.
    """
    esc = html.escape

    tweet_id = tweet.get("tweet_id")
    if tweet_id is None or tweet_id == "":
        tweet_id = "N/A"
    user = tweet.get("username") or tweet.get("user") or tweet.get("user_name") or "N/A"
    content = str(tweet.get("content") or tweet.get("text") or "")
    timestamp = tweet.get("datetime") or tweet.get("timestamp") or "N/A"
    location = tweet.get("user_location") or tweet.get("location") or "N/A"
    risk = tweet.get("risk_level", "N/A")
    likes = tweet.get("like_count", 0)
    rts = tweet.get("retweet_count", 0)

    # An explicit tweet_url always wins; otherwise build one when an id exists.
    if tweet.get("tweet_url"):
        url = tweet["tweet_url"]
    elif tweet_id != "N/A":
        url = f"https://x.com/{user}/status/{tweet_id}"
    else:
        url = "N/A"

    # Bulk detection (quantity words)
    bulk_keywords = ("kg", "gram", "bulk", "kilos", "ounce", "pound")
    lowered = content.lower()
    bulk_indicator = "Yes" if any(k in lowered for k in bulk_keywords) else "No"
    # Contact detection (simple digit check)
    contact_indicator = "Yes" if any(c.isdigit() for c in content) else "No"

    return f"""
    <div style="border:1px solid #ddd;padding:10px;margin-bottom:8px;border-radius:6px;">
    <p><strong>Risk:</strong> <span style="color:#b22222">{esc(str(risk))}</span> &nbsp;
    <strong>User:</strong> @{esc(str(user))} &nbsp; <strong>Time:</strong> {esc(str(timestamp))}</p>
    <p><strong>Location:</strong> {esc(str(location))} &nbsp; <strong>Bulk:</strong> {bulk_indicator} &nbsp;
    <strong>Contact:</strong> {contact_indicator} &nbsp; <strong>Likes:</strong> {esc(str(likes))} &nbsp;
    <strong>RTs:</strong> {esc(str(rts))}</p>
    <p style="background:#f7f7f7;padding:8px;border-radius:4px;">{esc(content)}</p>
    <p><a href="{esc(str(url))}">View Tweet</a> | Tweet ID: {esc(str(tweet_id))}</p>
    </div>
    """
def _compose_batched_email(tweets: List[dict]) -> MIMEMultipart:
    """Build one multipart/alternative alert email (plain text + HTML) for *tweets*.

    The HTML part contains:
      * a "Top CRITICAL" summary table: up to 10 tweets whose ``risk_level``
        is "CRITICAL", highest ``dynamic_risk_score`` first (omitted if none);
      * a table with one row per tweet showing its metrics and indicators.

    ``From`` is ``ALERT_EMAIL`` and ``To`` is ``RECIPIENTS``.

    Tweet text, user names and locations are untrusted, so every
    tweet-derived value is HTML-escaped before being embedded. Missing or
    ``None`` like/retweet counts are treated as 0.
    """

    def esc(value) -> str:
        return html.escape(str(value))

    def user_of(t: dict) -> str:
        return t.get("username") or t.get("user") or t.get("user_name") or "N/A"

    def engagement_of(t: dict) -> int:
        # `or 0` tolerates None (e.g. NULL DB columns) as well as missing keys.
        return int(t.get("like_count") or 0) + int(t.get("retweet_count") or 0)

    def url_of(t: dict, user: str) -> str:
        # An explicit tweet_url always wins; otherwise build one if an id exists.
        if t.get("tweet_url"):
            return str(t["tweet_url"])
        tweet_id = t.get("tweet_id", "N/A")
        if tweet_id == "N/A":
            return "N/A"
        return f"https://x.com/{user}/status/{tweet_id}"

    msg = MIMEMultipart("alternative")
    msg["Subject"] = f"🚨 {len(tweets)} High-Priority Drug Alerts"
    msg["From"] = ALERT_EMAIL
    msg["To"] = ", ".join(RECIPIENTS)

    # --- Top CRITICAL summary ---
    critical_tweets = [t for t in tweets if t.get("risk_level") == "CRITICAL"]
    top_critical = sorted(
        critical_tweets,
        key=lambda t: t.get("dynamic_risk_score") or 0,
        reverse=True,
    )[:10]

    summary_html = ""
    if top_critical:
        summary_parts = ["""
        <h3 style="color:#b22222;">Top CRITICAL Tweets Summary</h3>
        <table border="1" cellpadding="5" cellspacing="0" style="border-collapse: collapse;">
        <tr>
        <th>User</th><th>Dynamic Risk</th><th>Followers</th><th>Verified</th><th>Engagement</th><th>Location</th><th>Link</th>
        </tr>
        """]
        for t in top_critical:
            user = user_of(t)
            risk_score = t.get("dynamic_risk_score", 0)
            followers = t.get("followers_count", 0)
            verified = "Yes" if t.get("verified", False) else "No"
            engagement = engagement_of(t)
            location = t.get("user_location") or "N/A"
            url = url_of(t, user)
            summary_parts.append(f"""
        <tr>
        <td>@{esc(user)}</td>
        <td>{esc(risk_score)}</td>
        <td>{esc(followers)}</td>
        <td>{verified}</td>
        <td>{engagement}</td>
        <td>{esc(location)}</td>
        <td><a href="{esc(url)}">View</a></td>
        </tr>
        """)
        summary_parts.append("</table><br>")
        summary_html = "".join(summary_parts)

    # --- Main email table with all metrics ---
    bulk_keywords = ("kg", "gram", "bulk", "kilos", "ounce", "pound")
    geo_keywords = ("bangalore", "bengaluru", "karnataka")
    table_parts = ["""
    <table border="1" cellpadding="5" cellspacing="0" style="border-collapse: collapse;">
    <tr>
    <th>Risk</th><th>User</th><th>Dynamic Risk</th><th>Followers</th>
    <th>Verified</th><th>Engagement</th><th>Geo Score</th><th>Location</th>
    <th>Bulk</th><th>Contact</th><th>Content</th><th>Link</th>
    </tr>
    """]
    for t in tweets:
        user = user_of(t)
        content = str(t.get("content") or t.get("text") or "")
        location = str(t.get("user_location") or t.get("location") or "N/A")
        risk = t.get("risk_level", "N/A")
        dyn_risk = t.get("dynamic_risk_score", 0)
        followers = t.get("followers_count", 0)
        verified = "Yes" if t.get("verified", False) else "No"
        engagement = engagement_of(t)
        # Geo match is case-insensitive; the displayed location keeps its case.
        geo_score = 1 if any(k in location.lower() for k in geo_keywords) else 0
        lowered = content.lower()
        bulk_indicator = "Yes" if any(k in lowered for k in bulk_keywords) else "No"
        # Contact detection: any digit in the text (coarse heuristic).
        contact_indicator = "Yes" if any(c.isdigit() for c in content) else "No"
        url = url_of(t, user)
        table_parts.append(f"""
    <tr>
    <td>{esc(risk)}</td>
    <td>@{esc(user)}</td>
    <td>{esc(dyn_risk)}</td>
    <td>{esc(followers)}</td>
    <td>{verified}</td>
    <td>{engagement}</td>
    <td>{geo_score}</td>
    <td>{esc(location)}</td>
    <td>{bulk_indicator}</td>
    <td>{contact_indicator}</td>
    <td>{esc(content)}</td>
    <td><a href="{esc(url)}">View</a></td>
    </tr>
    """)
    table_parts.append("</table>")
    table_html = "".join(table_parts)

    html_text = f"""
    <html>
    <body>
    <h2 style="color:#b22222;">High-Priority Drug Alerts</h2>
    {summary_html}
    {table_html}
    <hr/>
    <p>Generated by Karnataka Drug Crime Monitoring System</p>
    </body>
    </html>
    """

    # Plain-text fallback (same user fallback as the HTML; None-safe content).
    plain_text = "\n".join(
        f"{t.get('risk_level')} | @{user_of(t)} | {t.get('dynamic_risk_score')} | "
        f"{str(t.get('content') or t.get('text') or '')[:100]}"
        for t in tweets
    )

    msg.attach(MIMEText(plain_text, "plain"))
    msg.attach(MIMEText(html_text, "html"))
    return msg
# --- SMTP send with retries --- #
def _send_email_message(msg: MIMEMultipart, recipients: List[str], retry: int = SEND_RETRY) -> bool:
    """Send *msg* to *recipients* over SMTP (real or simulated), with retries.

    In simulation mode the message is only logged and True is returned.
    Otherwise up to ``retry + 1`` attempts are made (a negative ``retry``
    counts as 0), ``RETRY_DELAY`` seconds apart.

    Transport security: port 465 uses implicit TLS (``SMTP_SSL``); port 587
    upgrades with STARTTLS. Both use ``ssl.create_default_context()`` so the
    server certificate is verified. smtplib's default context skips
    verification, which would expose the login credentials to a
    man-in-the-middle. NOTE: servers with self-signed certificates will now
    be rejected.

    Authentication failures are not retried because the credentials cannot
    change between attempts. Returns True on success and False otherwise;
    send errors are logged, not raised.
    """
    if SIMULATE_ALERTS:
        logging.info(f"[SIMULATED ALERT] Would send email to {recipients} with subject: {msg['Subject']}")
        return True

    attempts = max(0, retry) + 1
    for attempt in range(1, attempts + 1):
        try:
            context = ssl.create_default_context()
            if SMTP_PORT == 465:
                server = smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT, timeout=20, context=context)
            else:
                server = smtplib.SMTP(SMTP_SERVER, SMTP_PORT, timeout=20)
            with server as s:
                s.ehlo()
                if SMTP_PORT == 587:
                    s.starttls(context=context)
                    s.ehlo()
                s.login(ALERT_EMAIL, ALERT_PASSWORD)
                s.sendmail(ALERT_EMAIL, recipients, msg.as_string())
            logging.info(f"✅ Email sent to {recipients}")
            return True
        except smtplib.SMTPAuthenticationError as e:
            logging.error(f"Email authentication failed; not retrying: {e}")
            return False
        except Exception as e:
            logging.warning(f"Email send attempt {attempt} failed: {e}")
            if attempt >= attempts:
                logging.error("Exceeded email send retries.")
                return False
            time.sleep(RETRY_DELAY)
    # Unreachable (attempts >= 1 and every iteration returns or retries);
    # kept so the function never implicitly returns None.
    return False
# --- Public trigger function --- #
def trigger_alerts(max_tweets: Optional[int] = 10,
                   send_all: bool = False,
                   separate_emails: bool = False):
    """Email alerts for high-risk tweets that have not been notified yet.

    1. Fetch unnotified tweets via ``fetch_high_risk_unnotified()``.
    2. Set ``dynamic_risk_score`` and ``risk_level`` on each tweet
       (``risk_level`` is overwritten with the dynamic label).
    3. Keep the top ``max_tweets`` (all of them when ``send_all``).
    4. Send one batched email, or one email per tweet when ``separate_emails``.
    5. Mark every successfully emailed tweet via ``mark_as_notified``.

    In per-tweet mode, a tweet whose email cannot be composed is logged and
    counted as a failure instead of aborting the loop. Otherwise tweets
    already emailed in this run would never be marked as notified and would
    be alerted again on the next run.
    """
    logging.info("Fetching high-risk unnotified tweets from DB...")
    tweets = fetch_high_risk_unnotified()
    if not tweets:
        logging.info("No unnotified high-risk tweets found.")
        return

    # --- Compute dynamic risk for all fetched tweets ---
    for t in tweets:
        t["dynamic_risk_score"] = compute_dynamic_risk(t)
        t["risk_level"] = assign_dynamic_risk_level(t)  # automatically set risk_level

    selected = _select_top_tweets(tweets, max_tweets, send_all)
    if not selected:
        logging.info("No tweets selected after filtering.")
        return

    # success_ids holds (tweet_id, collection name); failure_ids holds tweet_ids.
    success_ids, failure_ids = [], []
    if separate_emails:
        last_index = len(selected) - 1
        for index, t in enumerate(selected):
            try:
                msg = _compose_batched_email([t])
            except Exception:
                # Do not let one malformed tweet abort the loop: earlier
                # tweets were already emailed and must still be marked below.
                logging.exception(f"Failed to compose alert for tweet {t.get('tweet_id')}")
                failure_ids.append(t.get("tweet_id"))
                continue
            if _send_email_message(msg, RECIPIENTS):
                success_ids.append((t.get("tweet_id"), t.get("_collection_name")))
            else:
                failure_ids.append(t.get("tweet_id"))
            if index < last_index:
                time.sleep(BATCH_DELAY)  # throttle between per-tweet emails only
    else:
        msg = _compose_batched_email(selected)
        if _send_email_message(msg, RECIPIENTS):
            success_ids.extend((t.get("tweet_id"), t.get("_collection_name")) for t in selected)
        else:
            failure_ids.extend(t.get("tweet_id") for t in selected)

    # Mark notified
    for tid, _collection in success_ids:
        # NOTE(review): the collection name is collected but not passed to
        # mark_as_notified; confirm whether the DB layer needs it.
        try:
            mark_as_notified(tid)
        except Exception as e:
            logging.error(f"Failed to mark {tid} as notified: {e}")

    logging.info(f"Alerts sent: {len(success_ids)}; failures: {len(failure_ids)}")
    if failure_ids:
        logging.warning(f"Failed tweet IDs: {failure_ids}")
def compute_risk_probability(dynamic_score: float) -> float:
    """Map a 0–100 dynamic risk score onto a probability in [0, 1].

    Out-of-range scores are clamped, e.g. 150 -> 1.0 and -5 -> 0.0.
    """
    ratio = dynamic_score / 100
    capped_above = min(1.0, ratio)
    return max(0.0, capped_above)
# --- CLI usage example --- #
if __name__ == "__main__":
    # Other ways to invoke it:
    #   trigger_alerts(max_tweets=5)                         -> top 5 tweets, one batched email
    #   trigger_alerts(send_all=True)                        -> every unnotified high-risk tweet
    #   trigger_alerts(max_tweets=10, separate_emails=True)  -> one email per tweet
    # Default run: the top 10 tweets in a single batched email.
    trigger_alerts(max_tweets=10, send_all=False, separate_emails=False)