import os import time import argparse import requests import random import logging import json import traceback from datetime import datetime, timezone, timedelta from dotenv import load_dotenv import threading import asyncio from contextlib import asynccontextmanager from fastapi import FastAPI, BackgroundTasks, Request from fastapi.responses import HTMLResponse import uvicorn # Task 4 & 5 dependencies (Sendgrid API & Logging) from urllib.error import HTTPError # Load optional .env if present in same directory load_dotenv() # ============================================================================== # LOGGING (Task 5) # ============================================================================== IST = timezone(timedelta(hours=5, minutes=30)) logging.Formatter.converter = lambda *args: datetime.fromtimestamp(args[-1], tz=IST).timetuple() logging.basicConfig( level=logging.INFO, format='[%(asctime)s] %(levelname)s: %(message)s', datefmt='%I:%M:%S %p', handlers=[ logging.FileHandler("app.log"), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # ============================================================================== # CONFIGURATION # ============================================================================== # Environment values (Make sure to populate your .env file) # We will reload these inside the loop so you can change them on the fly. # Email settings EMAIL_ADDRESS = os.getenv("EMAIL_ADDRESS", "") EVENT_EMAIL_RECIPIENT = os.getenv("EVENT_EMAIL_RECIPIENT", os.getenv("EMAIL_RECIPIENT", EMAIL_ADDRESS)) WARNING_EMAIL_RECIPIENT = os.getenv("WARNING_EMAIL_RECIPIENT", os.getenv("EMAIL_RECIPIENT", EMAIL_ADDRESS)) EMAIL_PASSWORD = os.getenv("EMAIL_PASSWORD", "") BREVO_API_KEY = os.getenv("BREVO_API_KEY", "") # App check intervals in seconds # Read dynamically via os.getenv for NORMAL_INTERVAL, FAST_INTERVAL, FAST_DURATION BIP_API = "https://bip.bitsathy.ac.in/nova-api/student-activity-masters" HEADERS = { "accept": "application/json", "x-requested-with": "XMLHttpRequest", } # ============================================================================== # GLOBAL SESSION (Task 2) # ============================================================================== SESSION = requests.Session() SESSION.headers.update(HEADERS) # State tracking for the loop STATE_FILE = "state.txt" LAST_EVENT_ID = None LAST_EVENT_CODE = None SESSION_EXPIRED = False FAST_MODE_UNTIL = 0 # ============================================================================== # STATE MANAGEMENT (Task 1) # ============================================================================== def load_state(): global LAST_EVENT_ID, LAST_EVENT_CODE if os.path.exists(STATE_FILE): try: with open(STATE_FILE, "r") as f: content = f.read().strip() if content: # In case the file has multiple lines (id and code) parts = content.split(",") LAST_EVENT_ID = int(parts[0]) if len(parts) > 1: LAST_EVENT_CODE = parts[1] logger.info(f"Loaded LAST_EVENT_ID from state: {LAST_EVENT_ID}") except Exception as e: logger.error(f"Failed to read state file: {e}") def save_state(event_id, event_code=None): try: with open(STATE_FILE, "w") as f: if event_code: f.write(f"{event_id},{event_code}") else: f.write(str(event_id)) except Exception as e: logger.error(f"Failed to write state file: {e}") # ============================================================================== # EMAIL NOTIFIER # ============================================================================== import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart def send_email_message(subject: str, body: str, is_html=False, recipient=None): if recipient is None: recipient = WARNING_EMAIL_RECIPIENT # Cloud Safe Alternative (Brevo API) if BREVO_API_KEY: logger.debug("--> send_email_message called via Brevo API") try: url = "https://api.brevo.com/v3/smtp/email" headers = { "accept": "application/json", "api-key": BREVO_API_KEY, "content-type": "application/json" } recipient_list = [{"email": e.strip()} for e in recipient.split(",") if e.strip()] payload = { "sender": {"name": "BIP Auto Notifier", "email": EMAIL_ADDRESS}, "bcc": recipient_list, "subject": subject, "replyTo": {"name": "BIP Auto Notifier", "email": "noreply@bip.notifier"} } if is_html: payload["htmlContent"] = body else: payload["textContent"] = body response = requests.post(url, json=payload, headers=headers, timeout=10) if response.status_code in [201, 200, 202]: logger.info(f"Brevo email sent successfully! Status: {response.status_code}") return True else: try: error_data = response.json() logger.error(f"Brevo API Error (Status {response.status_code}): {error_data.get('message', 'Unknown error')} - Code: {error_data.get('code', 'Unknown')}") except Exception: logger.error(f"Brevo API Error: {response.status_code} - {response.text}") return False except Exception as e: logger.error(f"Brevo Network EXCEPTION: {e}") return False # Local Fallback (SMTP) logger.debug("--> send_email_message called via Standard SMTP") if not EMAIL_ADDRESS or not EMAIL_PASSWORD or not recipient: logger.warning(f"Email credentials not configured. The following alert '{subject}' would have been sent.") return False try: msg = MIMEMultipart() msg['From'] = EMAIL_ADDRESS msg['Bcc'] = recipient msg['Subject'] = subject if is_html: msg.attach(MIMEText(body, 'html')) else: msg.attach(MIMEText(body, 'plain')) recipient_list = [e.strip() for e in recipient.split(",") if e.strip()] server = smtplib.SMTP('smtp.gmail.com', 587) server.starttls() server.login(EMAIL_ADDRESS, EMAIL_PASSWORD) text = msg.as_string() server.sendmail(EMAIL_ADDRESS, recipient_list, text) server.quit() logger.info(f"SMTP Email sent successfully.") return True except Exception as e: logger.error(f"Network/SMTP EXCEPTION: {e}") return False def send_event_alerts(events): if not events: return def format_date(date_str): try: dt = datetime.strptime(date_str, "%Y-%m-%d") return dt.strftime('%d-%m-%Y') except: return date_str msg = """
šŸ“¢ Event Details
""" for i, ev in enumerate(events, 1): start = format_date(ev.get("start_date", "")) if ev.get("start_date") else "" end = format_date(ev.get("end_date", "")) if ev.get("end_date") else "" date_str = f"{start} to {end}" if start and end else (start or end or "-") # Code formatting without highlight code = ev.get("event_code", "-") # Count values (convert to ints to ensure accurate calculation) try: max_count = int(ev.get("maximum_count", 0)) except (ValueError, TypeError): max_count = 0 try: applied = int(ev.get("applied_count", 0)) except (ValueError, TypeError): applied = 0 try: balance = int(ev.get("ComputedField", 0)) except (ValueError, TypeError): balance = 0 msg += f"""
Code : {code}
Name : {ev.get('event_name', '-')}
Organizer : {ev.get('organizer', '-')}
Date : {date_str}
Category : {ev.get('event_category', '-')}
Location : {ev.get('location', '-')}
Logger URL : Link
View Link : View Event Here
Count : Max: {max_count} | Balance: {balance} | Applied: {applied}
""" if i < len(events): msg += "
" msg += """
Automated Notification from BIP Tracker
""" subject = "New Event Added" if len(events) == 1: event_name = events[0].get("event_name") if event_name: subject = f"New Event: {event_name}" send_email_message(subject, msg, is_html=True, recipient=EVENT_EMAIL_RECIPIENT) # ============================================================================== # SCRAPER LOGIC # ============================================================================== def fetch_bip_events(xsrf_token, bip_session, page=1): global SESSION_EXPIRED logger.debug(f"--> fetch_bip_events(page={page})") cookies = { "XSRF-TOKEN": xsrf_token, "bip_session": bip_session } params = {"perPage": 10, "page": page} # Task 3: Exponential Backoff Retry Logic max_retries = 3 for attempt in range(max_retries): try: # Task 2: Use global requests.Session instead of bare requests.get r = SESSION.get(BIP_API, params=params, cookies=cookies, timeout=20) # Check for session expiration if "text/html" in r.headers.get("Content-Type", "") or r.status_code in [401, 403]: logger.warning(f"Session expired detected! Content-type: {r.headers.get('Content-Type')}, Status: {r.status_code}") SESSION_EXPIRED = True return None, "Session expired or invalid cookies." else: SESSION_EXPIRED = False r.raise_for_status() # Task 7: Response format validation data = r.json() if not isinstance(data, dict) or "resources" not in data or not isinstance(data["resources"], list): if attempt < max_retries - 1: raise Exception("Invalid JSON structure received from BIP API: missing or malformed 'resources' list") return None, "Invalid JSON structure received from BIP API." return data, None except Exception as e: if attempt < max_retries - 1: sleep_time = (2 ** attempt) + random.uniform(0.1, 1.0) logger.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {sleep_time:.2f}s...") time.sleep(sleep_time) else: logger.error(f"Network/Request EXCEPTION in fetch_bip_events after {max_retries} attempts: {e}") return None, str(e) return None, "Max retries exceeded." def parse_event(resource): data = {} for f in resource.get("fields", []): key = f.get("attribute") val = f.get("value") if key: data[key] = val data["id"] = resource["id"]["value"] return data def check_new_events(last_id, xsrf_token, bip_session): """Fetches events and returns any newer than last_id. Automatically paginates if needed.""" new_events = [] page = 1 while page <= 10: # Limit to 10 pages for safety data, err = fetch_bip_events(xsrf_token, bip_session, page) if err or not data: return None, err resources = data.get("resources", []) if not resources: break for res in resources: ev = parse_event(res) # Stop if we reach the last known new event if last_id and str(ev["id"]) == str(last_id): return new_events, None new_events.append(ev) # First-ever run scenario: just return the latest event to set the initial ID and avoid sending 100s of alerts if not last_id and new_events: return [new_events[0]], None page += 1 return new_events, None # ============================================================================== # SCHEDULER ENGINE # ============================================================================== def get_current_interval(): """Returns the polling interval in seconds, or 0 if in inactive hours.""" global FAST_MODE_UNTIL fast_interval = int(os.getenv("FAST_INTERVAL", "30")) normal_interval = int(os.getenv("NORMAL_INTERVAL", "120")) now_ist = datetime.now(IST) current_hour = now_ist.hour if 8 <= current_hour < 17: if time.time() < FAST_MODE_UNTIL: return fast_interval else: return normal_interval return 0 def process_tick(): global LAST_EVENT_ID, LAST_EVENT_CODE, FAST_MODE_UNTIL logger.debug("--- process_tick starting ---") try: # Reload environment variables on every tick load_dotenv(override=True) xsrf = os.getenv("XSRF_TOKEN", "") bip = os.getenv("BIP_SESSION", "") if not xsrf or not bip: logger.warning("Skipping check: Please configure XSRF_TOKEN and BIP_SESSION in the deployment environment.") send_email_message( "āš ļø BIP Scraper Error", "āš ļø Deployment Configuration Error!

" "The application was started without the required XSRF_TOKEN or BIP_SESSION secrets.

" "Please configure these variables in your deployment settings to begin tracking.", is_html=True ) raise SystemExit(1) # Task 1: Load state if we just started if LAST_EVENT_ID is None: load_state() new_events, err = check_new_events(LAST_EVENT_ID, xsrf, bip) if err: logger.error(f"Error scraping events: {err}") send_email_message( "āš ļø BIP Scraper Error", "āš ļø Scraper Error!

" f"The notifier encountered an error and has paused checking. Error:
" f"{err}

" "Please update the `XSRF_TOKEN` and `BIP_SESSION` variables in your Secret/Env configuration and restart the Space.", is_html=True ) logger.error("Notifier is shutting down completely because of the scraping error.") raise SystemExit(1) if new_events: # If LAST_EVENT_ID is None, it's the very first startup run. Set ID without alerting. if LAST_EVENT_ID is None: LAST_EVENT_ID = new_events[0]["id"] LAST_EVENT_CODE = new_events[0].get('event_code', LAST_EVENT_ID) save_state(LAST_EVENT_ID, LAST_EVENT_CODE) logger.info(f"EVENT ID : {LAST_EVENT_CODE} (Tracking started)") # Send the startup notification send_email_message( "šŸš€ BIP Notifier is Online!", f"You are receiving this because the BIP Auto Notifier script has successfully started tracking on the cloud.

" f"Current Active Event: {LAST_EVENT_CODE}
" f"The script is now monitoring in the background. You will receive alerts for any newer events.", is_html=True ) else: fast_duration = int(os.getenv("FAST_DURATION", "120")) FAST_MODE_UNTIL = time.time() + fast_duration send_event_alerts(new_events) LAST_EVENT_ID = new_events[0]["id"] LAST_EVENT_CODE = new_events[0].get('event_code', LAST_EVENT_ID) save_state(LAST_EVENT_ID, LAST_EVENT_CODE) for ev in new_events: code = ev.get('event_code', ev['id']) logger.info(f"🚨 NEW EVENT ID : {code} (Alert Sent!)") else: display_code = LAST_EVENT_CODE if LAST_EVENT_CODE else LAST_EVENT_ID logger.info(f"EVENT ID : {display_code}") except Exception as e: logger.error(f"CRITICAL EXCEPTION in process_tick: {e}") error_details = traceback.format_exc() logger.error(error_details) send_email_message( "🚨 CRITICAL: BIP Notifier Tick Crashed", f"The notifier encountered an unexpected exception during process_tick data parsing.

" f"Error Traceback:
{error_details}

" f"The application has successfully caught this error, and is safely shutting down to prevent mail loops.", is_html=True ) raise SystemExit(1) def list_all_events(): """Fetches the first page of events from BIP and prints them.""" logger.info("Fetching recent events from BIP...") load_dotenv(override=True) xsrf = os.getenv("XSRF_TOKEN", "") bip = os.getenv("BIP_SESSION", "") data, err = fetch_bip_events(xsrf, bip, page=1) if err: logger.error(f"Error: {err}") return resources = data.get("resources", []) if not resources: logger.info("No events found.") return logger.info(f"\nFound {len(resources)} recent events:") print("-" * 60) for res in resources: ev = parse_event(res) print(f"[{ev.get('id')}] {ev.get('event_code')} - {ev.get('event_name')} ({ev.get('status')})") print("-" * 60) def get_latest_event(): """Fetches and prints only the single most recent event.""" logger.info("Fetching the latest event...") load_dotenv(override=True) xsrf = os.getenv("XSRF_TOKEN", "") bip = os.getenv("BIP_SESSION", "") data, err = fetch_bip_events(xsrf, bip, page=1) if err: logger.error(f"Error: {err}") return resources = data.get("resources", []) if not resources: logger.info("No events found.") return ev = parse_event(resources[0]) print("\n🌟 LATEST EVENT:") print("-" * 60) print(f"ID: {ev.get('id')}") print(f"Code: {ev.get('event_code')}") print(f"Name: {ev.get('event_name')}") print(f"Dates: {ev.get('start_date')} to {ev.get('end_date')}") print(f"Location: {ev.get('location')}") print(f"Status: {ev.get('status')}") print(f"Link: {ev.get('web_url')}") print("-" * 60) def test_email_alert(): """Sends a dummy test message to the configured Email.""" logger.info("Sending test exact alert to Event Email...") success1 = send_email_message("šŸ¤– Test Alert - Event Receivers", "šŸ¤– Test Alert from BIP CLI Notifier

Event emails are working perfectly!", is_html=True, recipient=EVENT_EMAIL_RECIPIENT) logger.info("Sending test exact alert to Warning Email...") success2 = send_email_message("šŸ¤– Test Alert - Warning Receivers", "šŸ¤– Test Alert from BIP CLI Notifier

Warning emails are working perfectly!", is_html=True, recipient=WARNING_EMAIL_RECIPIENT) if success1 and success2: logger.info("āœ… Test messages sent successfully!") else: logger.error("āŒ Failed to send one or more test messages. Check your .env configuration.") def test_real_event_alert(): """Fetches the actual latest event from BIP and sends it as a test alert.""" logger.info("Fetching the real latest event to send as a test alert...") load_dotenv(override=True) xsrf = os.getenv("XSRF_TOKEN", "") bip = os.getenv("BIP_SESSION", "") data, err = fetch_bip_events(xsrf, bip, page=1) if err: logger.error(f"Error fetching real event: {err}") return resources = data.get("resources", []) if not resources: logger.warning("No events found to test with.") return ev = parse_event(resources[0]) logger.info(f"Triggering send_event_alerts with real event data: {ev.get('event_code')}") send_event_alerts([ev]) logger.info("āœ… Real latest event alert sent successfully!") def start_loop(): global FAST_MODE_UNTIL logger.info("šŸš€ BIP CLI Notifier Started") sleep_notified = False try: while True: check_interval = get_current_interval() if check_interval > 0: fast_interval = int(os.getenv("FAST_INTERVAL", "30")) if check_interval == fast_interval: logger.info(f"⚔ Fast Mode Active ({fast_interval}s interval)") sleep_notified = False else: # Sleep until next 8 AM now_ist = datetime.now(IST) current_hour = now_ist.hour next_8am = now_ist.replace(hour=8, minute=0, second=0, microsecond=0) if current_hour >= 17: next_8am += timedelta(days=1) sleep_seconds = (next_8am - now_ist).total_seconds() sleep_hours = round(sleep_seconds / 3600, 1) logger.info(f"Inactive hours. Sleeping until 8 AM ({int(sleep_seconds)} sec)") if not sleep_notified: send_email_message( "šŸŒ™ BIP Notifier Sleeping", f"The BIP Auto Notifier has entered its scheduled inactive period.

" f"Resuming at: {next_8am.strftime('%I:%M %p (%d-%m-%Y)')}
" f"Sleeping for: ~{sleep_hours} hours

" f"You will not receive any scraping alerts until the notifier wakes up.", is_html=True ) sleep_notified = True FAST_MODE_UNTIL = 0 time.sleep(sleep_seconds) continue process_tick() # Polling loop: Wait in exact intervals without stacking HTTP request delays target_time = time.time() + check_interval while time.time() < target_time: # Sleep safely for whatever is left over (up to 5 secs max per inner loop) remaining = target_time - time.time() if remaining > 0: time.sleep(min(5, remaining)) except KeyboardInterrupt: logger.info("\nšŸ›‘ Keyboard interrupt received.") except Exception as e: logger.error(f"FATAL SYSTEM ERROR in start_loop: {e}") error_details = traceback.format_exc() logger.error(error_details) send_email_message( "🚨 CRITICAL: BIP Notifier Crashed", f"The notifier encountered an unexpected fatal system error and has violently shut down.

" f"Error Traceback:
{error_details}

" f"The application has been stopped to prevent instability. Please check your deployment logs.", is_html=True ) raise SystemExit(1) finally: logger.info("Cleaning up resources...") try: SESSION.close() except: pass logger.info("Notifier stopped gracefully. Goodbye!") # ============================================================================== # FASTAPI MICROSERVICE (Task 10) # ============================================================================== def background_scraper_loop(): """Runs the main notification loop within the FastAPI background.""" start_loop() @asynccontextmanager async def lifespan(app: FastAPI): # Startup threading.Thread(target=background_scraper_loop, daemon=True).start() yield # Shutdown (nothing to do here currently) app = FastAPI(title="BIP Auto Notifier", lifespan=lifespan) @app.get("/health") async def health_check(): """ Task 6 & 10: Improved Health Endpoint Returns status, active tracking state, and expiration errors. """ return { "status": "online", "session_expired": SESSION_EXPIRED, "last_event_id": LAST_EVENT_ID, "timestamp": datetime.now(IST).strftime("%I:%M %p (%d-%m-%Y)") } @app.get("/latest") async def fetch_latest_api(): """Fetches the latest currently available event on the portal.""" load_dotenv(override=True) xsrf = os.getenv("XSRF_TOKEN", "") bip = os.getenv("BIP_SESSION", "") data, err = fetch_bip_events(xsrf, bip, page=1) if err: return {"error": err} resources = data.get("resources", []) if not resources: return {"message": "No events found."} return parse_event(resources[0]) @app.get("/", response_class=HTMLResponse) async def root(request: Request): # Fetch real-time data internally without an HTTP request overhead health_data = await health_check() assistant_data = await internal_assistant() # Format nicely as JSON strings health_json = json.dumps(health_data, indent=2) assistant_json = json.dumps(assistant_data, indent=2) return f""" BIP Auto Notifier

BIP Auto Notifier

System is Healthy & Tracking šŸš€

Loading logs...
{assistant_json}
{health_json}
""" from fastapi.responses import HTMLResponse @app.get("/logs/data") async def get_logs_data(): """API Endpoint: Returns raw text for the frontend to poll.""" try: with open("app.log", "r") as f: lines = f.readlines() log_content = "".join(lines) log_content = log_content.replace("&", "&").replace("<", "<").replace(">", ">") return HTMLResponse(content=log_content) except Exception as e: return HTMLResponse(content=f"Error reading logs: {e}") async def internal_assistant(): load_dotenv(override=True) current_interval = get_current_interval() fast_interval = int(os.getenv("FAST_INTERVAL", "30")) if current_interval == fast_interval: status_msg = "Running (fast mode)" elif current_interval > 0: status_msg = "Running" else: status_msg = "Paused (inactive hours)" return { "assistant": "BIP Auto Notifier Internal Assistant", "system_status": status_msg, "last_event_id": LAST_EVENT_ID, "last_event_code": LAST_EVENT_CODE, "session_expired": SESSION_EXPIRED, "check_interval_seconds": current_interval, "email_configured": bool(BREVO_API_KEY or (EMAIL_ADDRESS and EMAIL_PASSWORD)), "bip_api": BIP_API, "timestamp": datetime.now(IST).strftime("%I:%M %p (%d-%m-%Y)"), "advice": ( "System operating normally." if not SESSION_EXPIRED else "Session expired. Update XSRF_TOKEN and BIP_SESSION." ) } if __name__ == "__main__": parser = argparse.ArgumentParser(description="BIP Cloud Notifier CLI") parser.add_argument("--list-all", action="store_true", help="List all recent events and exit") parser.add_argument("--latest", action="store_true", help="Print details of the latest event and exit") parser.add_argument("--test-alert", action="store_true", help="Send a test message to Email and exit") parser.add_argument("--test-real-event", action="store_true", help="Fetch the actual latest event and send it as an alert") parser.add_argument("--run", action="store_true", help="Start the continuous monitoring loop (via FastAPI)") # Task 8: Fix duplicate parsing args = parser.parse_args() logger.debug(f"Parsed arguments: {args}") if args.list_all: list_all_events() elif args.latest: get_latest_event() elif args.test_alert: test_email_alert() elif args.test_real_event: test_real_event_alert() elif args.run: # Launch FastAPI which internally starts the loop port = int(os.getenv("PORT", 7860)) uvicorn.run(app, host="0.0.0.0", port=port, log_level="warning") else: # Default behavior: run FastAPI port = int(os.getenv("PORT", 7860)) uvicorn.run(app, host="0.0.0.0", port=port, log_level="warning")