Spaces:
Sleeping
Sleeping
| import os | |
| import time | |
| import argparse | |
| import requests | |
| import random | |
| import logging | |
| import json | |
| import traceback | |
| from datetime import datetime | |
| from dotenv import load_dotenv | |
| import threading | |
| import asyncio | |
| from fastapi import FastAPI, BackgroundTasks | |
| import uvicorn | |
| # Task 4 & 5 dependencies (Sendgrid API & Logging) | |
| from urllib.error import HTTPError | |
| # Load optional .env if present in same directory | |
| load_dotenv() | |
| # ============================================================================== | |
| # LOGGING (Task 5) | |
| # ============================================================================== | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='[%(asctime)s] %(levelname)s: %(message)s', | |
| datefmt='%I:%M:%S %p' | |
| ) | |
| logger = logging.getLogger(__name__) | |
| # ============================================================================== | |
| # CONFIGURATION | |
| # ============================================================================== | |
| # Environment values (Make sure to populate your .env file) | |
| # We will reload these inside the loop so you can change them on the fly. | |
| # Email settings | |
| EMAIL_ADDRESS = os.getenv("EMAIL_ADDRESS", "") | |
| EVENT_EMAIL_RECIPIENT = os.getenv("EVENT_EMAIL_RECIPIENT", os.getenv("EMAIL_RECIPIENT", EMAIL_ADDRESS)) | |
| WARNING_EMAIL_RECIPIENT = os.getenv("WARNING_EMAIL_RECIPIENT", os.getenv("EMAIL_RECIPIENT", EMAIL_ADDRESS)) | |
| EMAIL_PASSWORD = os.getenv("EMAIL_PASSWORD", "") | |
| BREVO_API_KEY = os.getenv("BREVO_API_KEY", "") | |
| # App check interval in seconds (default 60 secs = 1 min) | |
| CHECK_INTERVAL_SECONDS = int(os.getenv("CHECK_INTERVAL_SECONDS", "60")) | |
| BIP_API = "https://bip.bitsathy.ac.in/nova-api/student-activity-masters" | |
| HEADERS = { | |
| "accept": "application/json", | |
| "x-requested-with": "XMLHttpRequest", | |
| } | |
| # ============================================================================== | |
| # GLOBAL SESSION (Task 2) | |
| # ============================================================================== | |
| SESSION = requests.Session() | |
| SESSION.headers.update(HEADERS) | |
| # State tracking for the loop | |
| STATE_FILE = "state.txt" | |
| LAST_EVENT_ID = None | |
| LAST_EVENT_CODE = None | |
| SESSION_EXPIRED = False | |
| # ============================================================================== | |
| # STATE MANAGEMENT (Task 1) | |
| # ============================================================================== | |
| def load_state(): | |
| global LAST_EVENT_ID, LAST_EVENT_CODE | |
| if os.path.exists(STATE_FILE): | |
| try: | |
| with open(STATE_FILE, "r") as f: | |
| content = f.read().strip() | |
| if content: | |
| # In case the file has multiple lines (id and code) | |
| parts = content.split(",") | |
| LAST_EVENT_ID = int(parts[0]) | |
| if len(parts) > 1: | |
| LAST_EVENT_CODE = parts[1] | |
| logger.info(f"Loaded LAST_EVENT_ID from state: {LAST_EVENT_ID}") | |
| except Exception as e: | |
| logger.error(f"Failed to read state file: {e}") | |
| def save_state(event_id, event_code=None): | |
| try: | |
| with open(STATE_FILE, "w") as f: | |
| if event_code: | |
| f.write(f"{event_id},{event_code}") | |
| else: | |
| f.write(str(event_id)) | |
| except Exception as e: | |
| logger.error(f"Failed to write state file: {e}") | |
| # ============================================================================== | |
| # EMAIL NOTIFIER | |
| # ============================================================================== | |
| import smtplib | |
| from email.mime.text import MIMEText | |
| from email.mime.multipart import MIMEMultipart | |
| def send_email_message(subject: str, body: str, is_html=False, recipient=None): | |
| if recipient is None: | |
| recipient = WARNING_EMAIL_RECIPIENT | |
| # Cloud Safe Alternative (Brevo API) | |
| if BREVO_API_KEY: | |
| logger.debug("--> send_email_message called via Brevo API") | |
| try: | |
| url = "https://api.brevo.com/v3/smtp/email" | |
| headers = { | |
| "accept": "application/json", | |
| "api-key": BREVO_API_KEY, | |
| "content-type": "application/json" | |
| } | |
| recipient_list = [{"email": e.strip()} for e in recipient.split(",") if e.strip()] | |
| payload = { | |
| "sender": {"name": "BIP Auto Notifier", "email": EMAIL_ADDRESS}, | |
| "bcc": recipient_list, | |
| "subject": subject, | |
| "replyTo": {"name": "BIP Auto Notifier", "email": "noreply@bip.notifier"} | |
| } | |
| if is_html: | |
| payload["htmlContent"] = body | |
| else: | |
| payload["textContent"] = body | |
| response = requests.post(url, json=payload, headers=headers, timeout=10) | |
| if response.status_code in [201, 200, 202]: | |
| logger.info(f"Brevo email sent successfully! Status: {response.status_code}") | |
| return True | |
| else: | |
| try: | |
| error_data = response.json() | |
| logger.error(f"Brevo API Error (Status {response.status_code}): {error_data.get('message', 'Unknown error')} - Code: {error_data.get('code', 'Unknown')}") | |
| except Exception: | |
| logger.error(f"Brevo API Error: {response.status_code} - {response.text}") | |
| return False | |
| except Exception as e: | |
| logger.error(f"Brevo Network EXCEPTION: {e}") | |
| return False | |
| # Local Fallback (SMTP) | |
| logger.debug("--> send_email_message called via Standard SMTP") | |
| if not EMAIL_ADDRESS or not EMAIL_PASSWORD or not recipient: | |
| logger.warning(f"Email credentials not configured. The following alert '{subject}' would have been sent.") | |
| return False | |
| try: | |
| msg = MIMEMultipart() | |
| msg['From'] = EMAIL_ADDRESS | |
| msg['Bcc'] = recipient | |
| msg['Subject'] = subject | |
| if is_html: | |
| msg.attach(MIMEText(body, 'html')) | |
| else: | |
| msg.attach(MIMEText(body, 'plain')) | |
| recipient_list = [e.strip() for e in recipient.split(",") if e.strip()] | |
| server = smtplib.SMTP('smtp.gmail.com', 587) | |
| server.starttls() | |
| server.login(EMAIL_ADDRESS, EMAIL_PASSWORD) | |
| text = msg.as_string() | |
| server.sendmail(EMAIL_ADDRESS, recipient_list, text) | |
| server.quit() | |
| logger.info(f"SMTP Email sent successfully.") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Network/SMTP EXCEPTION: {e}") | |
| return False | |
| def send_event_alerts(events): | |
| if not events: | |
| return | |
| msg = f"๐ข <b>{len(events)} New BIP Event Found!</b><br><br>" | |
| for i, ev in enumerate(events, 1): | |
| # Format start date | |
| start = ev.get('start_date', '') | |
| if start: | |
| start = datetime.strptime(start, "%Y-%m-%d").strftime("%d-%m-%Y") | |
| # Format end date | |
| end = ev.get('end_date', '') | |
| if end: | |
| end = datetime.strptime(end, "%Y-%m-%d").strftime("%d-%m-%Y") | |
| msg += ( | |
| f"<b>{ev.get('event_code', '-')} - " | |
| f"{ev.get('event_name', 'Unknown')}</b><br><br>" | |
| f"{ev.get('event_category', '-')}<br><br>" | |
| f"{start} to {end}<br><br>" | |
| f"{ev.get('location', '-')}<br><br>" | |
| f"{ev.get('status', '-')}<br><br>" | |
| f"Seats: Max {ev.get('maximum_count', '-')} | " | |
| f"Applied {ev.get('applied_count', '-')}<br>" | |
| f"<a href='{ev.get('web_url', '#')}'>View Event Here</a><br>" | |
| f"<hr>" | |
| ) | |
| send_email_message("๐ข New BIP Event(s) Found!", msg, is_html=True, recipient=EVENT_EMAIL_RECIPIENT) | |
| # ============================================================================== | |
| # SCRAPER LOGIC | |
| # ============================================================================== | |
| def fetch_bip_events(xsrf_token, bip_session, page=1): | |
| global SESSION_EXPIRED | |
| logger.debug(f"--> fetch_bip_events(page={page})") | |
| cookies = { | |
| "XSRF-TOKEN": xsrf_token, | |
| "bip_session": bip_session | |
| } | |
| params = {"perPage": 10, "page": page} | |
| # Task 3: Exponential Backoff Retry Logic | |
| max_retries = 3 | |
| for attempt in range(max_retries): | |
| try: | |
| # Task 2: Use global requests.Session instead of bare requests.get | |
| r = SESSION.get(BIP_API, params=params, cookies=cookies, timeout=20) | |
| # Check for session expiration | |
| if "text/html" in r.headers.get("Content-Type", "") or r.status_code in [401, 403]: | |
| logger.warning(f"Session expired detected! Content-type: {r.headers.get('Content-Type')}, Status: {r.status_code}") | |
| SESSION_EXPIRED = True | |
| return None, "Session expired or invalid cookies." | |
| else: | |
| SESSION_EXPIRED = False | |
| r.raise_for_status() | |
| # Task 7: Response format validation | |
| data = r.json() | |
| if not isinstance(data, dict) or "resources" not in data or not isinstance(data["resources"], list): | |
| if attempt < max_retries - 1: | |
| raise Exception("Invalid JSON structure received from BIP API: missing or malformed 'resources' list") | |
| return None, "Invalid JSON structure received from BIP API." | |
| return data, None | |
| except Exception as e: | |
| if attempt < max_retries - 1: | |
| sleep_time = (2 ** attempt) + random.uniform(0.1, 1.0) | |
| logger.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {sleep_time:.2f}s...") | |
| time.sleep(sleep_time) | |
| else: | |
| logger.error(f"Network/Request EXCEPTION in fetch_bip_events after {max_retries} attempts: {e}") | |
| return None, str(e) | |
| return None, "Max retries exceeded." | |
| def parse_event(resource): | |
| data = {} | |
| for f in resource.get("fields", []): | |
| key = f.get("attribute") | |
| val = f.get("value") | |
| if key: | |
| data[key] = val | |
| data["id"] = resource["id"]["value"] | |
| return data | |
| def check_new_events(last_id, xsrf_token, bip_session): | |
| """Fetches events and returns any newer than last_id. Automatically paginates if needed.""" | |
| new_events = [] | |
| page = 1 | |
| while page <= 10: # Limit to 10 pages for safety | |
| data, err = fetch_bip_events(xsrf_token, bip_session, page) | |
| if err or not data: | |
| return None, err | |
| resources = data.get("resources", []) | |
| if not resources: | |
| break | |
| for res in resources: | |
| ev = parse_event(res) | |
| # Stop if we reach the last known new event | |
| if last_id and str(ev["id"]) == str(last_id): | |
| return new_events, None | |
| new_events.append(ev) | |
| # First-ever run scenario: just return the latest event to set the initial ID and avoid sending 100s of alerts | |
| if not last_id and new_events: | |
| return [new_events[0]], None | |
| page += 1 | |
| return new_events, None | |
| # ============================================================================== | |
| # SCHEDULER ENGINE | |
| # ============================================================================== | |
| def process_tick(): | |
| global LAST_EVENT_ID, LAST_EVENT_CODE | |
| logger.debug("--- process_tick starting ---") | |
| try: | |
| # Reload environment variables on every tick | |
| load_dotenv(override=True) | |
| xsrf = os.getenv("XSRF_TOKEN", "") | |
| bip = os.getenv("BIP_SESSION", "") | |
| if not xsrf or not bip: | |
| logger.warning("Skipping check: Please configure XSRF_TOKEN and BIP_SESSION in the deployment environment.") | |
| send_email_message( | |
| "โ ๏ธ BIP Scraper Error", | |
| "โ ๏ธ <b>Deployment Configuration Error!</b><br><br>" | |
| "The application was started without the required <code>XSRF_TOKEN</code> or <code>BIP_SESSION</code> secrets.<br><br>" | |
| "Please configure these variables in your deployment settings to begin tracking.", | |
| is_html=True | |
| ) | |
| raise SystemExit(1) | |
| # Task 1: Load state if we just started | |
| if LAST_EVENT_ID is None: | |
| load_state() | |
| new_events, err = check_new_events(LAST_EVENT_ID, xsrf, bip) | |
| if err: | |
| logger.error(f"Error scraping events: {err}") | |
| send_email_message( | |
| "โ ๏ธ BIP Scraper Error", | |
| "โ ๏ธ <b>Scraper Error!</b><br><br>" | |
| f"The notifier encountered an error and has paused checking. Error:<br>" | |
| f"<code>{err}</code><br><br>" | |
| "Please update the `XSRF_TOKEN` and `BIP_SESSION` variables in your Secret/Env configuration and restart the Space.", | |
| is_html=True | |
| ) | |
| logger.error("Notifier is shutting down completely because of the scraping error.") | |
| raise SystemExit(1) | |
| if new_events: | |
| # If LAST_EVENT_ID is None, it's the very first startup run. Set ID without alerting. | |
| if LAST_EVENT_ID is None: | |
| LAST_EVENT_ID = new_events[0]["id"] | |
| LAST_EVENT_CODE = new_events[0].get('event_code', LAST_EVENT_ID) | |
| save_state(LAST_EVENT_ID, LAST_EVENT_CODE) | |
| logger.info(f"EVENT ID : {LAST_EVENT_CODE} (Tracking started)") | |
| # Send the startup notification | |
| send_email_message( | |
| "๐ BIP Notifier is Online!", | |
| f"You are receiving this because the <b>BIP Auto Notifier</b> script has successfully started tracking on the cloud.<br><br>" | |
| f"<b>Current Active Event:</b> {LAST_EVENT_CODE}<br>" | |
| f"The script is now monitoring in the background. You will receive alerts for any newer events.", | |
| is_html=True | |
| ) | |
| else: | |
| send_event_alerts(new_events) | |
| LAST_EVENT_ID = new_events[0]["id"] | |
| LAST_EVENT_CODE = new_events[0].get('event_code', LAST_EVENT_ID) | |
| save_state(LAST_EVENT_ID, LAST_EVENT_CODE) | |
| for ev in new_events: | |
| code = ev.get('event_code', ev['id']) | |
| logger.info(f"๐จ NEW EVENT ID : {code} (Alert Sent!)") | |
| else: | |
| display_code = LAST_EVENT_CODE if LAST_EVENT_CODE else LAST_EVENT_ID | |
| logger.info(f"EVENT ID : {display_code}") | |
| except Exception as e: | |
| logger.error(f"CRITICAL EXCEPTION in process_tick: {e}") | |
| error_details = traceback.format_exc() | |
| logger.error(error_details) | |
| send_email_message( | |
| "๐จ CRITICAL: BIP Notifier Tick Crashed", | |
| f"<b>The notifier encountered an unexpected exception during process_tick data parsing.</b><br><br>" | |
| f"<b>Error Traceback:</b><br><pre>{error_details}</pre><br>" | |
| f"The application has successfully caught this error, and is safely shutting down to prevent mail loops.", | |
| is_html=True | |
| ) | |
| raise SystemExit(1) | |
| def list_all_events(): | |
| """Fetches the first page of events from BIP and prints them.""" | |
| logger.info("Fetching recent events from BIP...") | |
| load_dotenv(override=True) | |
| xsrf = os.getenv("XSRF_TOKEN", "") | |
| bip = os.getenv("BIP_SESSION", "") | |
| data, err = fetch_bip_events(xsrf, bip, page=1) | |
| if err: | |
| logger.error(f"Error: {err}") | |
| return | |
| resources = data.get("resources", []) | |
| if not resources: | |
| logger.info("No events found.") | |
| return | |
| logger.info(f"\nFound {len(resources)} recent events:") | |
| print("-" * 60) | |
| for res in resources: | |
| ev = parse_event(res) | |
| print(f"[{ev.get('id')}] {ev.get('event_code')} - {ev.get('event_name')} ({ev.get('status')})") | |
| print("-" * 60) | |
| def get_latest_event(): | |
| """Fetches and prints only the single most recent event.""" | |
| logger.info("Fetching the latest event...") | |
| load_dotenv(override=True) | |
| xsrf = os.getenv("XSRF_TOKEN", "") | |
| bip = os.getenv("BIP_SESSION", "") | |
| data, err = fetch_bip_events(xsrf, bip, page=1) | |
| if err: | |
| logger.error(f"Error: {err}") | |
| return | |
| resources = data.get("resources", []) | |
| if not resources: | |
| logger.info("No events found.") | |
| return | |
| ev = parse_event(resources[0]) | |
| print("\n๐ LATEST EVENT:") | |
| print("-" * 60) | |
| print(f"ID: {ev.get('id')}") | |
| print(f"Code: {ev.get('event_code')}") | |
| print(f"Name: {ev.get('event_name')}") | |
| print(f"Dates: {ev.get('start_date')} to {ev.get('end_date')}") | |
| print(f"Location: {ev.get('location')}") | |
| print(f"Status: {ev.get('status')}") | |
| print(f"Link: {ev.get('web_url')}") | |
| print("-" * 60) | |
| def test_email_alert(): | |
| """Sends a dummy test message to the configured Email.""" | |
| logger.info("Sending test exact alert to Event Email...") | |
| success1 = send_email_message("๐ค Test Alert - Event Receivers", "๐ค <b>Test Alert from BIP CLI Notifier</b><br><br>Event emails are working perfectly!", is_html=True, recipient=EVENT_EMAIL_RECIPIENT) | |
| logger.info("Sending test exact alert to Warning Email...") | |
| success2 = send_email_message("๐ค Test Alert - Warning Receivers", "๐ค <b>Test Alert from BIP CLI Notifier</b><br><br>Warning emails are working perfectly!", is_html=True, recipient=WARNING_EMAIL_RECIPIENT) | |
| if success1 and success2: | |
| logger.info("โ Test messages sent successfully!") | |
| else: | |
| logger.error("โ Failed to send one or more test messages. Check your .env configuration.") | |
| def test_real_event_alert(): | |
| """Fetches the actual latest event from BIP and sends it as a test alert.""" | |
| logger.info("Fetching the real latest event to send as a test alert...") | |
| load_dotenv(override=True) | |
| xsrf = os.getenv("XSRF_TOKEN", "") | |
| bip = os.getenv("BIP_SESSION", "") | |
| data, err = fetch_bip_events(xsrf, bip, page=1) | |
| if err: | |
| logger.error(f"Error fetching real event: {err}") | |
| return | |
| resources = data.get("resources", []) | |
| if not resources: | |
| logger.warning("No events found to test with.") | |
| return | |
| ev = parse_event(resources[0]) | |
| logger.info(f"Triggering send_event_alerts with real event data: {ev.get('event_code')}") | |
| send_event_alerts([ev]) | |
| logger.info("โ Real latest event alert sent successfully!") | |
| def start_loop(): | |
| logger.info("=" * 60) | |
| logger.info("๐ BIP CLI Notifier Started") | |
| logger.info("=" * 60) | |
| logger.info("Press Ctrl+C to stop the notifier at any time.\n") | |
| try: | |
| while True: | |
| process_tick() | |
| # Polling loop: Wait in exact intervals without stacking HTTP request delays | |
| target_time = time.time() + CHECK_INTERVAL_SECONDS | |
| while time.time() < target_time: | |
| # Sleep safely for whatever is left over (up to 5 secs max per inner loop) | |
| remaining = target_time - time.time() | |
| if remaining > 0: | |
| time.sleep(min(5, remaining)) | |
| except KeyboardInterrupt: | |
| logger.info("\n๐ Keyboard interrupt received.") | |
| except Exception as e: | |
| logger.error(f"FATAL SYSTEM ERROR in start_loop: {e}") | |
| error_details = traceback.format_exc() | |
| logger.error(error_details) | |
| send_email_message( | |
| "๐จ CRITICAL: BIP Notifier Crashed", | |
| f"<b>The notifier encountered an unexpected fatal system error and has violently shut down.</b><br><br>" | |
| f"<b>Error Traceback:</b><br><pre>{error_details}</pre><br>" | |
| f"The application has been stopped to prevent instability. Please check your deployment logs.", | |
| is_html=True | |
| ) | |
| raise SystemExit(1) | |
| finally: | |
| logger.info("Cleaning up resources...") | |
| try: | |
| SESSION.close() | |
| except: | |
| pass | |
| logger.info("Notifier stopped gracefully. Goodbye!") | |
| # ============================================================================== | |
| # FASTAPI MICROSERVICE (Task 10) | |
| # ============================================================================== | |
| app = FastAPI(title="BIP Auto Notifier") | |
| def background_scraper_loop(): | |
| """Runs the main notification loop within the FastAPI background.""" | |
| start_loop() | |
| async def startup_event(): | |
| logger.info("FastAPI starting up. Launching background tracker thread...") | |
| threading.Thread(target=background_scraper_loop, daemon=True).start() | |
| async def health_check(): | |
| """ | |
| Task 6 & 10: Improved Health Endpoint | |
| Returns status, active tracking state, and expiration errors. | |
| """ | |
| return { | |
| "status": "online", | |
| "session_expired": SESSION_EXPIRED, | |
| "last_event_id": LAST_EVENT_ID, | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| async def fetch_latest_api(): | |
| """Fetches the latest currently available event on the portal.""" | |
| load_dotenv(override=True) | |
| xsrf = os.getenv("XSRF_TOKEN", "") | |
| bip = os.getenv("BIP_SESSION", "") | |
| data, err = fetch_bip_events(xsrf, bip, page=1) | |
| if err: | |
| return {"error": err} | |
| resources = data.get("resources", []) | |
| if not resources: | |
| return {"message": "No events found."} | |
| return parse_event(resources[0]) | |
| async def root(): | |
| return { | |
| "app": "BIP Auto Notifier", | |
| "status": "healthy", | |
| "message": "Background tracker running ๐", | |
| "health_endpoint": "/health", | |
| "assistant_endpoint": "/assistant" | |
| } | |
| async def internal_assistant(): | |
| load_dotenv(override=True) | |
| return { | |
| "assistant": "BIP Auto Notifier Internal Assistant", | |
| "system_status": "running", | |
| "last_event_id": LAST_EVENT_ID, | |
| "last_event_code": LAST_EVENT_CODE, | |
| "session_expired": SESSION_EXPIRED, | |
| "check_interval_seconds": CHECK_INTERVAL_SECONDS, | |
| "email_configured": bool(BREVO_API_KEY or (EMAIL_ADDRESS and EMAIL_PASSWORD)), | |
| "bip_api": BIP_API, | |
| "timestamp": datetime.now().isoformat(), | |
| "advice": ( | |
| "System operating normally." | |
| if not SESSION_EXPIRED | |
| else "Session expired. Update XSRF_TOKEN and BIP_SESSION." | |
| ) | |
| } | |
| if __name__ == "__main__": | |
| logger.info("Starting BIP CLI Notifier...") | |
| parser = argparse.ArgumentParser(description="BIP Cloud Notifier CLI") | |
| parser.add_argument("--list-all", action="store_true", help="List all recent events and exit") | |
| parser.add_argument("--latest", action="store_true", help="Print details of the latest event and exit") | |
| parser.add_argument("--test-alert", action="store_true", help="Send a test message to Email and exit") | |
| parser.add_argument("--test-real-event", action="store_true", help="Fetch the actual latest event and send it as an alert") | |
| parser.add_argument("--run", action="store_true", help="Start the continuous monitoring loop (via FastAPI)") | |
| # Task 8: Fix duplicate parsing | |
| args = parser.parse_args() | |
| logger.debug(f"Parsed arguments: {args}") | |
| if args.list_all: | |
| list_all_events() | |
| elif args.latest: | |
| get_latest_event() | |
| elif args.test_alert: | |
| test_email_alert() | |
| elif args.test_real_event: | |
| test_real_event_alert() | |
| elif args.run: | |
| # Launch FastAPI which internally starts the loop | |
| port = int(os.getenv("PORT", 7860)) | |
| uvicorn.run(app, host="0.0.0.0", port=port) | |
| else: | |
| # Default behavior: run FastAPI | |
| port = int(os.getenv("PORT", 7860)) | |
| uvicorn.run(app, host="0.0.0.0", port=port) | |