Event_Stacker / main.py
DinoPLayZ's picture
Update main.py
c903803 verified
raw
history blame
20.8 kB
import os
import time
import argparse
import requests
import random
import logging
import json
from datetime import datetime
from dotenv import load_dotenv
import threading
import asyncio
from fastapi import FastAPI, BackgroundTasks
import uvicorn
# Task 4 & 5 dependencies (Sendgrid API & Logging)
from urllib.error import HTTPError
# Task 4 & 5 dependencies (Sendgrid API & Logging)
from urllib.error import HTTPError
# Pull in a .env file from the working directory, when one exists, before
# any setting below is read from the environment.
load_dotenv()

# ==============================================================================
# LOGGING (Task 5)
# ==============================================================================
logging.basicConfig(
    format='[%(asctime)s] %(levelname)s: %(message)s',
    datefmt='%I:%M:%S %p',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# ==============================================================================
# CONFIGURATION
# ==============================================================================
# Every value in this section is read from the environment once, when this
# module-level code executes (populate your .env file or deployment secrets).

# Email settings. The recipient falls back to the sender address.
EMAIL_ADDRESS = os.environ.get("EMAIL_ADDRESS", "")
EMAIL_PASSWORD = os.environ.get("EMAIL_PASSWORD", "")
EMAIL_RECIPIENT = os.environ.get("EMAIL_RECIPIENT", EMAIL_ADDRESS)
BREVO_API_KEY = os.environ.get("BREVO_API_KEY", "")

# Seconds between two checks (default 60 secs = 1 min).
CHECK_INTERVAL_SECONDS = int(os.environ.get("CHECK_INTERVAL_SECONDS", "60"))

# Endpoint that is polled, plus the headers sent with every request.
BIP_API = "https://bip.bitsathy.ac.in/nova-api/student-activity-masters"
HEADERS = {
    "accept": "application/json",
    "x-requested-with": "XMLHttpRequest",
}

# ==============================================================================
# GLOBAL SESSION (Task 2)
# ==============================================================================
# One shared HTTP session that carries HEADERS on every request made with it.
SESSION = requests.Session()
SESSION.headers.update(HEADERS)

# State tracking for the loop
STATE_FILE = "state.txt"
LAST_EVENT_ID = LAST_EVENT_CODE = None
SESSION_EXPIRED = False
EXPIRED_XSRF = EXPIRED_BIP = None
# ==============================================================================
# STATE MANAGEMENT (Task 1)
# ==============================================================================
def load_state():
    """Restore ``LAST_EVENT_ID`` from ``STATE_FILE`` when that file exists.

    The file is expected to hold a single integer. A missing or empty file
    leaves ``LAST_EVENT_ID`` untouched; any failure while reading or
    converting the content is logged and otherwise ignored.
    """
    global LAST_EVENT_ID

    # Nothing persisted yet: keep whatever value is currently set.
    if not os.path.exists(STATE_FILE):
        return

    try:
        with open(STATE_FILE, "r") as state_file:
            stored = state_file.read().strip()
        if stored:
            LAST_EVENT_ID = int(stored)
            logger.info(f"Loaded LAST_EVENT_ID from state: {LAST_EVENT_ID}")
    except Exception as e:
        logger.error(f"Failed to read state file: {e}")
def save_state(event_id):
    """Persist ``event_id`` to ``STATE_FILE``, replacing any previous content.

    Best effort: a failure is logged and not propagated to the caller.
    """
    try:
        with open(STATE_FILE, "w") as state_file:
            text = str(event_id)
            state_file.write(text)
    except Exception as e:
        logger.error(f"Failed to write state file: {e}")
# Environment values (make sure to populate your .env file).
# These module-level settings are read from the environment when this
# section executes.

# Email settings (loaded from .env)
# The address mail is sent FROM; it is also the default recipient.
EMAIL_ADDRESS = os.getenv("EMAIL_ADDRESS", "")
# Gmail App Password (NOT your regular password)
EMAIL_PASSWORD = os.getenv("EMAIL_PASSWORD", "")
# Notification recipient
EMAIL_RECIPIENT = os.getenv("EMAIL_RECIPIENT", EMAIL_ADDRESS)

# App check interval in seconds (default 60 secs = 1 min).
# Read from the environment rather than hard-coding 60: a literal here
# overwrote the value configured through CHECK_INTERVAL_SECONDS, so the
# setting was silently ignored by the polling loop.
CHECK_INTERVAL_SECONDS = int(os.getenv("CHECK_INTERVAL_SECONDS", "60"))

BIP_API = "https://bip.bitsathy.ac.in/nova-api/student-activity-masters"
HEADERS = {
    "accept": "application/json",
    "x-requested-with": "XMLHttpRequest",
}

# State tracking for the loop
LAST_EVENT_ID = None
LAST_EVENT_CODE = None
SESSION_EXPIRED = False
EXPIRED_XSRF = None
EXPIRED_BIP = None
# ==============================================================================
# EMAIL NOTIFIER
# ==============================================================================
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
def send_email_message(subject: str, body: str, is_html=False):
    """Send one notification e-mail and report whether it was accepted.

    When ``BREVO_API_KEY`` is set the message goes through the Brevo HTTP
    API; otherwise it is sent through Gmail SMTP using ``EMAIL_ADDRESS`` and
    ``EMAIL_PASSWORD``.

    Args:
        subject: Subject line of the message.
        body: Message content.
        is_html: Send ``body`` as HTML when true, as plain text otherwise.

    Returns:
        True when the message was handed over successfully, False on any
        failure or when the SMTP credentials are not configured. Errors are
        logged, never raised.
    """
    # Cloud Safe Alternative (Brevo API)
    if BREVO_API_KEY:
        logger.debug("--> send_email_message called via Brevo API")
        try:
            url = "https://api.brevo.com/v3/smtp/email"
            headers = {
                "accept": "application/json",
                "api-key": BREVO_API_KEY,
                "content-type": "application/json"
            }
            payload = {
                "sender": {"name": "BIP Auto Notifier", "email": EMAIL_ADDRESS},
                "to": [{"email": EMAIL_RECIPIENT}],
                "subject": subject
            }
            # Brevo takes the body under a different key per content type.
            payload["htmlContent" if is_html else "textContent"] = body

            response = requests.post(url, json=payload, headers=headers, timeout=10)
            if response.status_code in [201, 200, 202]:
                logger.info(f"Brevo email sent successfully! Status: {response.status_code}")
                return True

            # Prefer the structured error; fall back to the raw response
            # text when the body is not the expected JSON object.
            try:
                error_data = response.json()
                logger.error(f"Brevo API Error (Status {response.status_code}): {error_data.get('message', 'Unknown error')} - Code: {error_data.get('code', 'Unknown')}")
            except Exception:
                logger.error(f"Brevo API Error: {response.status_code} - {response.text}")
            return False
        except Exception as e:
            logger.error(f"Brevo Network EXCEPTION: {e}")
            return False

    # Local Fallback (SMTP)
    logger.debug("--> send_email_message called via Standard SMTP")
    if not EMAIL_ADDRESS or not EMAIL_PASSWORD or not EMAIL_RECIPIENT:
        logger.warning(f"Email credentials not configured. The following alert '{subject}' would have been sent.")
        return False

    try:
        msg = MIMEMultipart()
        msg['From'] = EMAIL_ADDRESS
        msg['To'] = EMAIL_RECIPIENT
        msg['Subject'] = subject
        msg.attach(MIMEText(body, 'html' if is_html else 'plain'))

        # The context manager issues QUIT and closes the socket even when
        # starttls/login/sendmail raises, so a failed send no longer leaks
        # the connection. The timeout keeps an unresponsive server from
        # blocking the caller indefinitely.
        with smtplib.SMTP('smtp.gmail.com', 587, timeout=20) as server:
            server.starttls()
            server.login(EMAIL_ADDRESS, EMAIL_PASSWORD)
            server.sendmail(EMAIL_ADDRESS, EMAIL_RECIPIENT, msg.as_string())

        logger.info("SMTP Email sent successfully.")
        return True
    except Exception as e:
        logger.error(f"Network/SMTP EXCEPTION: {e}")
        return False
def _format_event_date(raw):
    """Turn a ``YYYY-MM-DD`` string into ``DD-MM-YYYY``.

    A falsy value becomes an empty string. A value in any other format is
    returned as text unchanged instead of raising, so one unexpected date
    cannot abort the whole alert.
    """
    if not raw:
        return ''
    try:
        return datetime.strptime(str(raw), "%Y-%m-%d").strftime("%d-%m-%Y")
    except ValueError:
        return str(raw)


def _html_field(ev, key, default='-'):
    """Return ``ev[key]`` as HTML-escaped text, or ``default`` if missing/None."""
    from html import escape

    value = ev.get(key)
    if value is None:
        value = default
    # quote=True also escapes quotes, which matters inside the href attribute.
    return escape(str(value), quote=True)


def send_event_alerts(events):
    """E-mail one HTML summary covering every event in ``events``.

    Args:
        events: Event dicts that may carry ``event_code``, ``event_name``,
            ``event_category``, ``start_date``, ``end_date``, ``location``,
            ``status``, ``maximum_count``, ``applied_count`` and ``web_url``.
            Nothing is sent when this is empty or None.

    Returns:
        None.
    """
    if not events:
        return

    from html import escape

    # <br> rather than "\n": the body is sent as HTML, where bare newlines
    # do not produce any visible spacing.
    parts = [f"πŸ“’ <b>{len(events)} New BIP Event Found!</b><br><br>"]
    for ev in events:
        start = escape(_format_event_date(ev.get('start_date', '')))
        end = escape(_format_event_date(ev.get('end_date', '')))
        # Field values come from the remote API, so each one is escaped
        # before it is placed into the markup.
        parts.append(
            f"<b>{_html_field(ev, 'event_code')} - "
            f"{_html_field(ev, 'event_name', 'Unknown')}</b><br><br>"
            f"{_html_field(ev, 'event_category')}<br><br>"
            f"{start} to {end}<br><br>"
            f"{_html_field(ev, 'location')}<br><br>"
            f"{_html_field(ev, 'status')}<br><br>"
            f"Seats: Max {_html_field(ev, 'maximum_count')} | "
            f"Applied {_html_field(ev, 'applied_count')}<br>"
            f"<a href='{_html_field(ev, 'web_url', '#')}'>View Event Here</a><br>"
            f"<hr>"
        )

    send_email_message("πŸ“’ New BIP Event(s) Found!", "".join(parts), is_html=True)
# ==============================================================================
# SCRAPER LOGIC
# ==============================================================================
def fetch_bip_events(xsrf_token, bip_session, page=1):
    """Fetch one page of results from ``BIP_API``, retrying on failure.

    Args:
        xsrf_token: Value sent as the ``XSRF-TOKEN`` cookie.
        bip_session: Value sent as the ``bip_session`` cookie.
        page: 1-based page number; ten items are requested per page.

    Returns:
        ``(data, None)`` on success, where ``data`` is the decoded JSON
        object holding a ``resources`` list, or ``(None, message)`` on
        failure. An HTML response or a 401/403 status is reported as an
        expired session immediately, without retrying.
    """
    logger.debug(f"--> fetch_bip_events(page={page})")
    auth_cookies = {"XSRF-TOKEN": xsrf_token, "bip_session": bip_session}
    query = {"perPage": 10, "page": page}

    # Task 3: Exponential Backoff Retry Logic
    max_retries = 3
    final_attempt = max_retries - 1
    for attempt in range(max_retries):
        is_final = attempt >= final_attempt
        try:
            # Task 2: go through the shared requests.Session
            r = SESSION.get(BIP_API, params=query, cookies=auth_cookies, timeout=20)

            # Session-expiry check happens before any status/JSON handling.
            got_html = "text/html" in r.headers.get("Content-Type", "")
            if got_html or r.status_code in [401, 403]:
                logger.warning(f"Session expired detected! Content-type: {r.headers.get('Content-Type')}, Status: {r.status_code}")
                return None, "Session expired or invalid cookies."

            r.raise_for_status()

            # Task 7: Response format validation
            data = r.json()
            well_formed = (
                isinstance(data, dict)
                and "resources" in data
                and isinstance(data["resources"], list)
            )
            if well_formed:
                return data, None
            if is_final:
                return None, "Invalid JSON structure received from BIP API."
            # Raising routes a malformed payload through the retry path below.
            raise Exception("Invalid JSON structure received from BIP API: missing or malformed 'resources' list")
        except Exception as e:
            if is_final:
                logger.error(f"Network/Request EXCEPTION in fetch_bip_events after {max_retries} attempts: {e}")
                return None, str(e)
            # Wait 2**attempt seconds plus a small random jitter.
            sleep_time = (2 ** attempt) + random.uniform(0.1, 1.0)
            logger.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {sleep_time:.2f}s...")
            time.sleep(sleep_time)

    return None, "Max retries exceeded."
def parse_event(resource):
    """Flatten one API resource into a plain ``{attribute: value}`` dict.

    Each entry of ``resource["fields"]`` that has a truthy ``attribute``
    contributes one key. The ``id`` key is always set, last, from
    ``resource["id"]["value"]``.
    """
    flat = {
        field.get("attribute"): field.get("value")
        for field in resource.get("fields", [])
        if field.get("attribute")
    }
    flat["id"] = resource["id"]["value"]
    return flat
def check_new_events(last_id, xsrf_token, bip_session):
    """Collect the events listed before ``last_id``, paging as needed.

    Up to 10 pages are read. Collection stops at the first event whose id
    equals ``last_id`` (compared as strings). When ``last_id`` is falsy,
    only the first event of the first page is returned, so an initial run
    does not report the entire backlog.

    Returns:
        ``(events, None)`` on success, or ``(None, err)`` when a page could
        not be fetched.
    """
    found = []
    for page in range(1, 11):  # Limit to 10 pages for safety
        data, err = fetch_bip_events(xsrf_token, bip_session, page)
        if err or not data:
            return None, err

        resources = data.get("resources", [])
        if not resources:
            break

        for resource in resources:
            event = parse_event(resource)
            # Reaching the last known event ends the scan.
            if last_id and str(event["id"]) == str(last_id):
                return found, None
            found.append(event)

        # First-ever run: only the latest event is needed to seed the ID.
        if found and not last_id:
            return [found[0]], None

    return found, None
# ==============================================================================
# SCHEDULER ENGINE
# ==============================================================================
def process_tick():
    """Run one check cycle: fetch, compare with the stored ID, notify.

    Re-reads the environment, terminates the process with ``os._exit(1)``
    when the tokens are missing or scraping fails (after sending an error
    e-mail in the latter case), and otherwise updates ``LAST_EVENT_ID`` /
    ``LAST_EVENT_CODE`` and the state file. The first successful run only
    records the newest event and sends a start-up e-mail; later runs send
    an alert for the new events.
    """
    global LAST_EVENT_ID, LAST_EVENT_CODE, SESSION_EXPIRED, EXPIRED_XSRF, EXPIRED_BIP
    logger.debug("--- process_tick starting ---")

    # Reload environment variables on every tick
    load_dotenv(override=True)
    xsrf = os.getenv("XSRF_TOKEN", "")
    bip = os.getenv("BIP_SESSION", "")
    if not (xsrf and bip):
        logger.warning("Skipping check: Please configure XSRF_TOKEN and BIP_SESSION in the deployment environment.")
        os._exit(1)

    # Task 1: Load state if we just started
    if LAST_EVENT_ID is None:
        load_state()

    new_events, err = check_new_events(LAST_EVENT_ID, xsrf, bip)
    if err:
        logger.error(f"Error scraping events: {err}")
        send_email_message(
            "⚠️ BIP Scraper Error",
            "⚠️ <b>Scraper Error!</b><br><br>"
            f"The notifier encountered an error and has paused checking. Error:<br>"
            f"<code>{err}</code><br><br>"
            "Please update the `XSRF_TOKEN` and `BIP_SESSION` variables in your Secret/Env configuration and restart the Space.",
            is_html=True
        )
        logger.error("Notifier is shutting down completely because of the scraping error.")
        os._exit(1)

    if not new_events:
        logger.info(f"EVENT ID : {LAST_EVENT_CODE}")
        return

    # No stored ID means this is the very first run: record, do not alert.
    first_run = LAST_EVENT_ID is None
    if not first_run:
        send_event_alerts(new_events)

    newest = new_events[0]
    LAST_EVENT_ID = newest["id"]
    LAST_EVENT_CODE = newest.get('event_code', LAST_EVENT_ID)
    save_state(LAST_EVENT_ID)

    if first_run:
        logger.info(f"EVENT ID : {LAST_EVENT_CODE} (Tracking started)")
        # Send the startup notification
        send_email_message(
            "πŸš€ BIP Notifier is Online!",
            f"You are receiving this because the <b>BIP Auto Notifier</b> script has successfully started tracking on the cloud.<br><br>"
            f"<b>Current Active Event:</b> {LAST_EVENT_CODE}<br>"
            f"The script is now monitoring in the background. You will receive alerts for any newer events.",
            is_html=True
        )
        return

    for ev in new_events:
        code = ev.get('event_code', ev['id'])
        logger.info(f"🚨 NEW EVENT ID : {code} (Alert Sent!)")
def list_all_events():
    """Fetch the first page of events from BIP and print one line per event."""
    logger.info("Fetching recent events from BIP...")
    load_dotenv(override=True)
    data, err = fetch_bip_events(
        os.getenv("XSRF_TOKEN", ""),
        os.getenv("BIP_SESSION", ""),
        page=1,
    )
    if err:
        logger.error(f"Error: {err}")
        return

    resources = data.get("resources", [])
    if not resources:
        logger.info("No events found.")
        return

    logger.info(f"\nFound {len(resources)} recent events:")
    rule = "-" * 60
    print(rule)
    # map() is lazy, so each resource is parsed right before it is printed.
    for ev in map(parse_event, resources):
        print(f"[{ev.get('id')}] {ev.get('event_code')} - {ev.get('event_name')} ({ev.get('status')})")
    print(rule)
def get_latest_event():
    """Fetch the first page of events and print the details of its first entry."""
    logger.info("Fetching the latest event...")
    load_dotenv(override=True)
    data, err = fetch_bip_events(
        os.getenv("XSRF_TOKEN", ""),
        os.getenv("BIP_SESSION", ""),
        page=1,
    )
    if err:
        logger.error(f"Error: {err}")
        return

    resources = data.get("resources", [])
    if not resources:
        logger.info("No events found.")
        return

    ev = parse_event(resources[0])
    rule = "-" * 60
    report = [
        "\n🌟 LATEST EVENT:",
        rule,
        f"ID: {ev.get('id')}",
        f"Code: {ev.get('event_code')}",
        f"Name: {ev.get('event_name')}",
        f"Dates: {ev.get('start_date')} to {ev.get('end_date')}",
        f"Location: {ev.get('location')}",
        f"Status: {ev.get('status')}",
        f"Link: {ev.get('web_url')}",
        rule,
    ]
    for line in report:
        print(line)
def test_email_alert():
    """Send a fixed test message through the configured e-mail channel and log the outcome."""
    logger.info("Sending test exact alert to Email...")
    delivered = send_email_message(
        "πŸ€– Test Alert",
        "πŸ€– <b>Test Alert from BIP CLI Notifier</b><br><br>Your currently configured email system is working perfectly!",
        is_html=True,
    )
    if not delivered:
        logger.error("❌ Failed to send test message. Check your .env configuration.")
        return
    logger.info("βœ… Test message sent successfully!")
def start_loop():
    """Run ``process_tick`` forever, once every ``CHECK_INTERVAL_SECONDS``.

    The loop ends on ``KeyboardInterrupt``; ``SESSION`` is closed on the way
    out. An unexpected exception raised by a single tick is logged with its
    traceback and the loop carries on, so one bad cycle does not silently
    end monitoring.
    """
    logger.info("=" * 60)
    logger.info("πŸš€ BIP CLI Notifier Started")
    logger.info("=" * 60)
    logger.info("Press Ctrl+C to stop the notifier at any time.\n")

    try:
        while True:
            try:
                process_tick()
            except Exception:
                # KeyboardInterrupt/SystemExit are not Exception subclasses
                # and still propagate to the outer handlers.
                logger.exception("Unexpected error in process_tick; retrying on the next interval.")

            # Polling loop: wait out the interval in short slices
            # (up to 5 secs each) until the target time is reached.
            target_time = time.time() + CHECK_INTERVAL_SECONDS
            while time.time() < target_time:
                remaining = target_time - time.time()
                if remaining > 0:
                    time.sleep(min(5, remaining))
    except KeyboardInterrupt:
        logger.info("\nπŸ›‘ Keyboard interrupt received.")
    finally:
        logger.info("Cleaning up resources...")
        try:
            SESSION.close()
        except Exception as e:
            # Closing is best effort, but record why it failed instead of
            # swallowing everything (including SystemExit) with a bare except.
            logger.debug(f"Ignoring error while closing session: {e}")
        logger.info("Notifier stopped gracefully. Goodbye!")
# ==============================================================================
# FASTAPI MICROSERVICE (Task 10)
# ==============================================================================
# ASGI application object; the route decorators further down attach to it.
app = FastAPI(title="BIP Auto Notifier")


def background_scraper_loop():
    """Thread target: run the main notification loop via ``start_loop``."""
    start_loop()


@app.on_event("startup")
async def startup_event():
    """On application start-up, launch the tracker loop in a daemon thread."""
    logger.info("FastAPI starting up. Launching background tracker thread...")
    tracker = threading.Thread(target=background_scraper_loop, daemon=True)
    tracker.start()
@app.get("/health")
async def health_check():
    """
    Task 6 & 10: Health endpoint.
    Reports a fixed "online" status together with the current values of
    SESSION_EXPIRED and LAST_EVENT_ID and the current local time.
    """
    snapshot = {
        "status": "online",
        "session_expired": SESSION_EXPIRED,
        "last_event_id": LAST_EVENT_ID,
    }
    snapshot["timestamp"] = datetime.now().isoformat()
    return snapshot
@app.get("/latest")
async def fetch_latest_api():
    """Return the first event currently listed on the portal.

    Returns:
        The parsed event dict, ``{"error": ...}`` when the fetch failed, or
        ``{"message": "No events found."}`` when the listing is empty.
    """
    load_dotenv(override=True)
    xsrf = os.getenv("XSRF_TOKEN", "")
    bip = os.getenv("BIP_SESSION", "")

    # fetch_bip_events is blocking (synchronous HTTP plus time.sleep between
    # retries). Running it in the default executor keeps the event loop free
    # to serve other requests while the fetch is in progress.
    loop = asyncio.get_running_loop()
    data, err = await loop.run_in_executor(None, fetch_bip_events, xsrf, bip, 1)
    if err:
        return {"error": err}

    resources = data.get("resources", [])
    if not resources:
        return {"message": "No events found."}
    return parse_event(resources[0])
@app.get("/")
async def root():
    """Return a static description of the service and its helper endpoints."""
    info = {
        "app": "BIP Auto Notifier",
        "status": "healthy",
        "message": "Background tracker running πŸš€",
    }
    info["health_endpoint"] = "/health"
    info["assistant_endpoint"] = "/assistant"
    return info
@app.get("/assistant")
async def internal_assistant():
    """Return a diagnostic snapshot of the module-level tracker state.

    The payload echoes the current values of the module globals (last event
    ID/code, session flag, check interval, API URL), whether an e-mail
    channel is configured, and a short advice string based on
    ``SESSION_EXPIRED``.
    """
    load_dotenv(override=True)

    if SESSION_EXPIRED:
        advice = "Session expired. Update XSRF_TOKEN and BIP_SESSION."
    else:
        advice = "System operating normally."

    # Either a Brevo key or a full SMTP address/password pair counts.
    smtp_ready = EMAIL_ADDRESS and EMAIL_PASSWORD
    email_configured = bool(BREVO_API_KEY or smtp_ready)

    return {
        "assistant": "BIP Auto Notifier Internal Assistant",
        "system_status": "running",
        "last_event_id": LAST_EVENT_ID,
        "last_event_code": LAST_EVENT_CODE,
        "session_expired": SESSION_EXPIRED,
        "check_interval_seconds": CHECK_INTERVAL_SECONDS,
        "email_configured": email_configured,
        "bip_api": BIP_API,
        "timestamp": datetime.now().isoformat(),
        "advice": advice,
    }
if __name__ == "__main__":
    # Command-line entry point: run a one-shot action when its flag is
    # given, otherwise serve the FastAPI app (whose start-up hook launches
    # the monitoring loop).
    logger.info("Starting BIP CLI Notifier...")

    parser = argparse.ArgumentParser(description="BIP Cloud Notifier CLI")
    cli_flags = (
        ("--list-all", "List all recent events and exit"),
        ("--latest", "Print details of the latest event and exit"),
        ("--test-alert", "Send a test message to Email and exit"),
        ("--run", "Start the continuous monitoring loop (via FastAPI)"),
    )
    for flag, help_text in cli_flags:
        parser.add_argument(flag, action="store_true", help=help_text)

    # Task 8: arguments are parsed exactly once
    args = parser.parse_args()
    logger.debug(f"Parsed arguments: {args}")

    if args.list_all:
        list_all_events()
    elif args.latest:
        get_latest_event()
    elif args.test_alert:
        test_email_alert()
    else:
        # --run and the no-flag default behave the same: launch FastAPI,
        # which internally starts the loop.
        port = int(os.getenv("PORT", 7860))
        uvicorn.run(app, host="0.0.0.0", port=port)