import os import time import argparse import requests from datetime import datetime from dotenv import load_dotenv import threading from http.server import BaseHTTPRequestHandler, HTTPServer # Load optional .env if present in same directory load_dotenv() # ============================================================================== # CONFIGURATION # ============================================================================== # Environment values (Make sure to populate your .env file) # We will reload these inside the loop so you can change them on the fly. # Telegram settings (Optional, loaded from .env or hardcoded here) TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "") TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID", "") # App check interval in seconds (default 60 secs = 1 min) CHECK_INTERVAL_SECONDS = 60 BIP_API = "https://bip.bitsathy.ac.in/nova-api/student-activity-masters" HEADERS = { "accept": "application/json", "x-requested-with": "XMLHttpRequest", } # State tracking for the loop LAST_EVENT_ID = None LAST_EVENT_CODE = None SESSION_EXPIRED = False EXPIRED_XSRF = None EXPIRED_BIP = None # ============================================================================== # TELEGRAM NOTIFIER # ============================================================================== TELEGRAM_OFFSET = None def update_env_file(new_xsrf, new_bip): env_path = ".env" if not os.path.exists(env_path): with open(env_path, "w") as f: if new_xsrf: f.write(f'XSRF_TOKEN="{new_xsrf}"\n') if new_bip: f.write(f'BIP_SESSION="{new_bip}"\n') return with open(env_path, "r") as f: lines = f.readlines() with open(env_path, "w") as f: xsrf_updated = False bip_updated = False for line in lines: if line.startswith("XSRF_TOKEN=") and new_xsrf is not None: f.write(f'XSRF_TOKEN="{new_xsrf}"\n') xsrf_updated = True elif line.startswith("BIP_SESSION=") and new_bip is not None: f.write(f'BIP_SESSION="{new_bip}"\n') bip_updated = True else: f.write(line) if new_xsrf is not None and not xsrf_updated: f.write(f'XSRF_TOKEN="{new_xsrf}"\n') if new_bip is not None and not bip_updated: f.write(f'BIP_SESSION="{new_bip}"\n') def check_telegram_messages(): global TELEGRAM_OFFSET, SESSION_EXPIRED, EXPIRED_XSRF, EXPIRED_BIP if not TELEGRAM_BOT_TOKEN: return False url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/getUpdates" params = {"timeout": 5} if TELEGRAM_OFFSET: params["offset"] = TELEGRAM_OFFSET try: r = requests.get(url, params=params, timeout=10) data = r.json() cookies_updated = False if data.get("ok"): for result in data.get("result", []): TELEGRAM_OFFSET = result["update_id"] + 1 msg = result.get("message", {}) text = msg.get("text", "").strip() chat_id = msg.get("chat", {}).get("id") # Only process messages from our authorized user if str(chat_id) == str(TELEGRAM_CHAT_ID): new_xsrf = None new_bip = None for line in text.replace("\r", "\n").split("\n"): line = line.strip() if line.startswith("XSRF_TOKEN=") or line.startswith("XSRF-TOKEN="): parts = line.split("=", 1) if len(parts) > 1: new_xsrf = parts[1].strip(' "\'') elif line.startswith("BIP_SESSION=") or line.startswith("bip_session="): parts = line.split("=", 1) if len(parts) > 1: new_bip = parts[1].strip(' "\'') if new_xsrf or new_bip: update_env_file(new_xsrf, new_bip) send_telegram_message("βœ… Cookies Received!\n\nI have updated the .env file and will now resume checking the BIP portal.") SESSION_EXPIRED = False EXPIRED_XSRF = None EXPIRED_BIP = None cookies_updated = True print(f"[{datetime.now().strftime('%I:%M:%S %p')}] πŸ”„ Cookies received via Telegram! Resuming checks...") return cookies_updated except Exception as e: return False def send_telegram_message(text: str): if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID: print("\n[!] Telegram not configured. The following alert would have been sent:\n") print(text) print("-" * 50) return False url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage" payload = { "chat_id": TELEGRAM_CHAT_ID, "text": text, "parse_mode": "HTML", "disable_web_page_preview": True } try: r = requests.post(url, json=payload, timeout=10) r.raise_for_status() return True except Exception as e: print(f"[!] Failed to send telegram message: {e}") return False def send_event_alerts(events): if not events: return msg = f"πŸ“’ {len(events)} New BIP Event Found!\n\n" for i, ev in enumerate(events, 1): # Format start date start = ev.get('start_date', '') if start: start = datetime.strptime(start, "%Y-%m-%d").strftime("%d-%m-%Y") # Format end date end = ev.get('end_date', '') if end: end = datetime.strptime(end, "%Y-%m-%d").strftime("%d-%m-%Y") msg += ( f"{ev.get('event_code', '-')} - " f"{ev.get('event_name', 'Unknown')}\n\n" f"{ev.get('event_category', '-')}\n\n" f"{start} to {end}\n\n" f"{ev.get('location', '-')}\n\n" f"{ev.get('status', '-')}\n\n" f"Seats: Max {ev.get('maximum_count', '-')} | " f"Applied {ev.get('applied_count', '-')}\n" f"View Event Here\n" f"────────────────\n" ) send_telegram_message(msg) # ============================================================================== # SCRAPER LOGIC # ============================================================================== def fetch_bip_events(xsrf_token, bip_session, page=1): cookies = { "XSRF-TOKEN": xsrf_token, "bip_session": bip_session } params = {"perPage": 10, "page": page} try: r = requests.get(BIP_API, params=params, headers=HEADERS, cookies=cookies, timeout=20) # Check if session expired based on HTML redirect instead of JSON if "text/html" in r.headers.get("Content-Type", "") or r.status_code == 401 or r.status_code == 403: return None, "Session expired or invalid cookies." r.raise_for_status() return r.json(), None except Exception as e: return None, str(e) def parse_event(resource): data = {} for f in resource.get("fields", []): key = f.get("attribute") val = f.get("value") if key: data[key] = val data["id"] = resource["id"]["value"] return data def check_new_events(last_id, xsrf_token, bip_session): """Fetches events and returns any newer than last_id. Automatically paginates if needed.""" new_events = [] page = 1 while page <= 10: # Limit to 10 pages for safety data, err = fetch_bip_events(xsrf_token, bip_session, page) if err or not data: return None, err resources = data.get("resources", []) if not resources: break for res in resources: ev = parse_event(res) # Stop if we reach the last known new event if last_id and str(ev["id"]) == str(last_id): return new_events, None new_events.append(ev) # First-ever run scenario: just return the latest event to set the initial ID and avoid sending 100s of alerts if not last_id and new_events: return [new_events[0]], None page += 1 return new_events, None # ============================================================================== # SCHEDULER ENGINE # ============================================================================== def process_tick(): global LAST_EVENT_ID, LAST_EVENT_CODE, SESSION_EXPIRED, EXPIRED_XSRF, EXPIRED_BIP # Reload environment variables on every tick so user can update .env without restarting load_dotenv(override=True) xsrf = os.getenv("XSRF_TOKEN", "") bip = os.getenv("BIP_SESSION", "") now = datetime.now() time_str = now.strftime('%I:%M:%S %p') # Optional logic: only run between 8AM and 6PM if now.hour < 8 or now.hour >= 18: print(f"[{time_str}] πŸŒ™ Out of hours (8 AM - 6 PM). Skipping check.") return if not xsrf or not bip: print(f"[{time_str}] ⏳ Skipping check: Please configure XSRF_TOKEN and BIP_SESSION in the .env file.") return # Unsilenced print block to report resuming checking! if SESSION_EXPIRED: if xsrf == EXPIRED_XSRF and bip == EXPIRED_BIP: print(f"{time_str.lower()} - ⏸️ Paused: Waiting for new cookies in .env to resume...") return else: print(f"{time_str.lower()} - πŸ”„ New cookies detected! Resuming checks...") SESSION_EXPIRED = False EXPIRED_XSRF = None EXPIRED_BIP = None new_events, err = check_new_events(LAST_EVENT_ID, xsrf, bip) if err: print(f"{time_str.lower()} - ❌ Error scraping events: {err}") send_telegram_message( "⚠️ Scraper Error!\n\n" f"The notifier encountered an error and has paused checking. Error:\n" f"{err}\n\n" "Please reply directly to this bot with your new cookies in this format to resume:\n\n" "XSRF_TOKEN=your_new_token_here\nBIP_SESSION=your_new_session_here" ) SESSION_EXPIRED = True EXPIRED_XSRF = xsrf EXPIRED_BIP = bip return if new_events: # If LAST_EVENT_ID is None, it's the very first startup run. Set ID without alerting. if LAST_EVENT_ID is None: LAST_EVENT_ID = new_events[0]["id"] LAST_EVENT_CODE = new_events[0].get('event_code', LAST_EVENT_ID) print(f"{time_str.lower()} - EVENT ID : {LAST_EVENT_CODE} (Tracking started)") else: send_event_alerts(new_events) LAST_EVENT_ID = new_events[0]["id"] LAST_EVENT_CODE = new_events[0].get('event_code', LAST_EVENT_ID) for ev in new_events: code = ev.get('event_code', ev['id']) print(f"{time_str.lower()} - 🚨 NEW EVENT ID : {code} (Alert Sent!)") else: # Just print the tracking status format exactly as requested on every 1-minute tick print(f"{time_str.lower()} - EVENT ID : {LAST_EVENT_CODE}") def list_all_events(): """Fetches the first page of events from BIP and prints them.""" print("Fetching recent events from BIP...") load_dotenv(override=True) xsrf = os.getenv("XSRF_TOKEN", "") bip = os.getenv("BIP_SESSION", "") data, err = fetch_bip_events(xsrf, bip, page=1) if err: print(f"❌ Error: {err}") return resources = data.get("resources", []) if not resources: print("No events found.") return print(f"\nFound {len(resources)} recent events:") print("-" * 60) for res in resources: ev = parse_event(res) print(f"[{ev.get('id')}] {ev.get('event_code')} - {ev.get('event_name')} ({ev.get('status')})") print("-" * 60) def get_latest_event(): """Fetches and prints only the single most recent event.""" print("Fetching the latest event...") load_dotenv(override=True) xsrf = os.getenv("XSRF_TOKEN", "") bip = os.getenv("BIP_SESSION", "") data, err = fetch_bip_events(xsrf, bip, page=1) if err: print(f"❌ Error: {err}") return resources = data.get("resources", []) if not resources: print("No events found.") return ev = parse_event(resources[0]) print("\n🌟 LATEST EVENT:") print("-" * 60) print(f"ID: {ev.get('id')}") print(f"Code: {ev.get('event_code')}") print(f"Name: {ev.get('event_name')}") print(f"Dates: {ev.get('start_date')} to {ev.get('end_date')}") print(f"Location: {ev.get('location')}") print(f"Status: {ev.get('status')}") print(f"Link: {ev.get('web_url')}") print("-" * 60) def test_telegram_alert(): """Sends a dummy test message to the configured Telegram chat.""" print("Sending test exact alert to Telegram...") success = send_telegram_message("πŸ€– Test Alert from BIP CLI Notifier\n\nYour Telegram integration is working perfectly!") if success: print("βœ… Test message sent successfully!") else: print("❌ Failed to send test message. Check your .env configuration.") def start_loop(): print("=" * 60) print("πŸš€ BIP CLI Notifier Started") print("=" * 60) print("Press Ctrl+C to stop the notifier at any time.\n") try: while True: process_tick() # Polling loop: Wait in exact intervals without stacking HTTP request delays target_time = time.time() + CHECK_INTERVAL_SECONDS while time.time() < target_time: cookies_updated = check_telegram_messages() if cookies_updated: # User sent new cookies, immediately break to run process_tick right now! break # Sleep safely for whatever is left over (up to 5 secs max per inner loop) remaining = target_time - time.time() if remaining > 0: time.sleep(min(5, remaining)) except KeyboardInterrupt: print("\n\nπŸ›‘ Notifier manually stopped by user. Goodbye!") def run_dummy_server(): """ Hugging Face Spaces requires a web server listening on port 7860 to pass health checks. This tiny server does nothing but say "I am alive" in the background. """ class DummyHandler(BaseHTTPRequestHandler): def do_GET(self): self.send_response(200) self.send_header('Content-type', 'text/html') self.end_headers() self.wfile.write(b"BIP Notifier is running in the background!") def log_message(self, format, *args): pass # Keep terminal clean from HTTP logs server = HTTPServer(('0.0.0.0', 7860), DummyHandler) server.serve_forever() if __name__ == "__main__": print("Starting BIP CLI Notifier...") parser = argparse.ArgumentParser(description="BIP Cloud Notifier CLI") parser.add_argument("--list-all", action="store_true", help="List all recent events and exit") parser.add_argument("--latest", action="store_true", help="Print details of the latest event and exit") parser.add_argument("--test-alert", action="store_true", help="Send a test message to Telegram and exit") parser.add_argument("--run", action="store_true", help="Start the continuous 1-minute monitoring loop") print("Parsed arguments:", parser.parse_args()) args = parser.parse_args() if args.list_all: list_all_events() elif args.latest: get_latest_event() elif args.test_alert: test_telegram_alert() elif args.run: threading.Thread(target=run_dummy_server, daemon=True).start() start_loop() else: # If no arguments provided, default to starting the loop just like before threading.Thread(target=run_dummy_server, daemon=True).start() start_loop()