Spaces:
Running
Running
| import os | |
| import time | |
| import argparse | |
| import requests | |
| from datetime import datetime | |
| from dotenv import load_dotenv | |
| import threading | |
| from http.server import BaseHTTPRequestHandler, HTTPServer | |
# Pull in an optional .env file so the settings below can be supplied from it.
load_dotenv()

# ==============================================================================
# CONFIGURATION
# ==============================================================================
# XSRF_TOKEN and BIP_SESSION are intentionally not cached here: process_tick()
# re-reads them from the environment on every tick, so they can be changed
# while the notifier is running.

# Telegram credentials (optional; empty strings when not set).
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "")
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "")

# Seconds between two portal checks (60 s = 1 minute).
CHECK_INTERVAL_SECONDS = 60

# Endpoint queried for the event list, and the headers sent with each request.
BIP_API = "https://bip.bitsathy.ac.in/nova-api/student-activity-masters"
HEADERS = {
    "accept": "application/json",
    "x-requested-with": "XMLHttpRequest",
}

# Mutable state shared between ticks of the polling loop.
LAST_EVENT_ID = None      # id of the newest event already seen
LAST_EVENT_CODE = None    # code of that event, used in status output
SESSION_EXPIRED = False   # True while checks are paused after a scrape error
EXPIRED_XSRF = None       # cookie values that were in use when the error occurred
EXPIRED_BIP = None

# ==============================================================================
# TELEGRAM NOTIFIER
# ==============================================================================
# Offset passed to Telegram's getUpdates; None until the first update is seen.
TELEGRAM_OFFSET = None
def update_env_file(new_xsrf, new_bip, env_path=".env"):
    """Persist new cookie values into a dotenv file.

    Existing ``XSRF_TOKEN=`` / ``BIP_SESSION=`` lines are replaced in place;
    keys that are not present yet are appended. All other lines are kept.

    Args:
        new_xsrf: New XSRF token, or None to leave that key untouched.
        new_bip: New session cookie, or None to leave that key untouched.
        env_path: Path of the dotenv file to update (defaults to ``.env`` in
            the current working directory).
    """
    updates = {"XSRF_TOKEN": new_xsrf, "BIP_SESSION": new_bip}

    if not os.path.exists(env_path):
        # Fresh file: only write keys that actually carry a value.
        with open(env_path, "w") as f:
            f.writelines(f'{key}="{value}"\n' for key, value in updates.items() if value)
        return

    with open(env_path, "r") as f:
        lines = f.readlines()

    pending = {key: value for key, value in updates.items() if value is not None}
    replaced = set()
    output = []
    for line in lines:
        key = next((k for k in pending if line.startswith(f"{k}=")), None)
        if key is None:
            # Guarantee a line terminator so an appended key can never be
            # glued onto a final line that lacked a trailing newline.
            output.append(line if line.endswith("\n") else line + "\n")
        else:
            output.append(f'{key}="{pending[key]}"\n')
            replaced.add(key)

    output.extend(
        f'{key}="{value}"\n' for key, value in pending.items() if key not in replaced
    )

    # The new content is fully built before the file is opened for writing, so
    # a failure while processing cannot leave a truncated file behind.
    with open(env_path, "w") as f:
        f.writelines(output)
def check_telegram_messages():
    """Poll Telegram for replies that carry fresh cookies.

    Messages from the authorized chat (TELEGRAM_CHAT_ID) are scanned line by
    line for ``XSRF_TOKEN=`` / ``XSRF-TOKEN=`` and ``BIP_SESSION=`` /
    ``bip_session=`` entries. When at least one non-empty value is found, it is
    written to the .env file, the paused-session state is cleared and a
    confirmation is sent back.

    Returns:
        True if cookies were updated during this call, otherwise False.
    """
    global TELEGRAM_OFFSET, SESSION_EXPIRED, EXPIRED_XSRF, EXPIRED_BIP
    if not TELEGRAM_BOT_TOKEN:
        return False

    url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/getUpdates"
    params = {"timeout": 5}
    if TELEGRAM_OFFSET:
        params["offset"] = TELEGRAM_OFFSET

    cookies_updated = False
    try:
        r = requests.get(url, params=params, timeout=10)
        data = r.json()
        if not data.get("ok"):
            return False

        for result in data.get("result", []):
            # Advance the offset first so this update is not delivered again.
            TELEGRAM_OFFSET = result["update_id"] + 1
            msg = result.get("message") or {}
            text = (msg.get("text") or "").strip()
            chat_id = (msg.get("chat") or {}).get("id")

            # Only process messages from our authorized user
            if str(chat_id) != str(TELEGRAM_CHAT_ID):
                continue

            new_xsrf = None
            new_bip = None
            for line in text.replace("\r", "\n").split("\n"):
                line = line.strip()
                if line.startswith(("XSRF_TOKEN=", "XSRF-TOKEN=")):
                    value = line.split("=", 1)[1].strip(' "\'')
                    # An empty value must not wipe a cookie that is already stored.
                    if value:
                        new_xsrf = value
                elif line.startswith(("BIP_SESSION=", "bip_session=")):
                    value = line.split("=", 1)[1].strip(' "\'')
                    if value:
                        new_bip = value

            if new_xsrf or new_bip:
                update_env_file(new_xsrf, new_bip)
                send_telegram_message("β <b>Cookies Received!</b>\n\nI have updated the <code>.env</code> file and will now resume checking the BIP portal.")
                SESSION_EXPIRED = False
                EXPIRED_XSRF = None
                EXPIRED_BIP = None
                cookies_updated = True
                print(f"[{datetime.now().strftime('%I:%M:%S %p')}] π Cookies received via Telegram! Resuming checks...")
        return cookies_updated
    except Exception as e:
        # Best-effort polling: report the failure instead of hiding it. The
        # request URL contains the bot token, so strip it from the message.
        safe_error = str(e).replace(TELEGRAM_BOT_TOKEN, "<redacted>")
        print(f"[!] Failed to check Telegram messages: {safe_error}")
        # Cookies written before the failure still count as an update.
        return cookies_updated
def send_telegram_message(text: str):
    """Send an HTML-formatted message to the configured Telegram chat.

    When the bot token or chat id is missing, the message is printed to the
    console instead of being sent.

    Args:
        text: Message body; it is sent with ``parse_mode`` set to ``HTML``.

    Returns:
        True if the request succeeded, False if Telegram is not configured or
        the request failed.
    """
    if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
        print("\n[!] Telegram not configured. The following alert would have been sent:\n")
        print(text)
        print("-" * 50)
        return False

    url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
    payload = {
        "chat_id": TELEGRAM_CHAT_ID,
        "text": text,
        "parse_mode": "HTML",
        "disable_web_page_preview": True
    }
    try:
        r = requests.post(url, json=payload, timeout=10)
        r.raise_for_status()
        return True
    except Exception as e:
        # The error text can include the request URL, which embeds the bot
        # token; redact it so the secret never reaches the logs.
        safe_error = str(e).replace(TELEGRAM_BOT_TOKEN, "<redacted>")
        print(f"[!] Failed to send telegram message: {safe_error}")
        return False
def _format_event_block(ev):
    """Render one parsed event as an HTML snippet for a Telegram message."""
    import html  # local import keeps this block self-contained

    def field(key, default):
        # Missing or None values fall back to the default; everything is
        # escaped because the message is sent with parse_mode=HTML.
        value = ev.get(key)
        return html.escape(str(default if value is None else value))

    def date(key):
        raw = ev.get(key) or ""
        try:
            return datetime.strptime(str(raw), "%Y-%m-%d").strftime("%d-%m-%Y")
        except ValueError:
            # Unknown or empty date format: show the raw value rather than crash.
            return html.escape(str(raw))

    return (
        f"<b>{field('event_code', '-')} - "
        f"{field('event_name', 'Unknown')}</b>\n\n"
        f"{field('event_category', '-')}\n\n"
        f"{date('start_date')} to {date('end_date')}\n\n"
        f"{field('location', '-')}\n\n"
        f"{field('status', '-')}\n\n"
        f"Seats: Max {field('maximum_count', '-')} | "
        f"Applied {field('applied_count', '-')}\n"
        f"<a href='{field('web_url', '#')}'>View Event Here</a>\n"
        f"ββββββββββββββββ\n"
    )


def send_event_alerts(events):
    """Send a Telegram alert describing the given new events.

    All events go into one message when they fit. Longer batches are split
    across several messages so that each stays within Telegram's
    4096-character message limit.

    Args:
        events: List of parsed event dicts; nothing is sent when it is empty.
    """
    if not events:
        return

    max_length = 4096
    header = f"π’ <b>{len(events)} New BIP Event Found!</b>\n\n"

    chunk = header
    for ev in events:
        block = _format_event_block(ev)
        # Flush before overflowing, but never send the bare header on its own.
        if chunk and chunk != header and len(chunk) + len(block) > max_length:
            send_telegram_message(chunk)
            chunk = ""
        chunk += block
    if chunk:
        send_telegram_message(chunk)
| # ============================================================================== | |
| # SCRAPER LOGIC | |
| # ============================================================================== | |
def fetch_bip_events(xsrf_token, bip_session, page=1):
    """Request one page (10 items) of events from the BIP API.

    Args:
        xsrf_token: Value sent as the ``XSRF-TOKEN`` cookie.
        bip_session: Value sent as the ``bip_session`` cookie.
        page: 1-based page number to request.

    Returns:
        ``(payload, None)`` with the decoded JSON on success, or
        ``(None, message)`` when the request fails or the session looks
        expired.
    """
    try:
        response = requests.get(
            BIP_API,
            params={"perPage": 10, "page": page},
            headers=HEADERS,
            cookies={"XSRF-TOKEN": xsrf_token, "bip_session": bip_session},
            timeout=20,
        )
        # An HTML body instead of JSON, or a 401/403 status, is treated as an
        # expired or invalid session.
        content_type = response.headers.get("Content-Type", "")
        if "text/html" in content_type or response.status_code in (401, 403):
            return None, "Session expired or invalid cookies."
        response.raise_for_status()
        return response.json(), None
    except Exception as exc:
        return None, str(exc)
def parse_event(resource):
    """Flatten an API resource into a plain ``{attribute: value}`` dict.

    Fields without a truthy ``attribute`` are skipped. The resource's own id
    (``resource["id"]["value"]``) is stored under ``"id"`` and overrides any
    field of that name.
    """
    event = {
        field.get("attribute"): field.get("value")
        for field in resource.get("fields", [])
        if field.get("attribute")
    }
    event["id"] = resource["id"]["value"]
    return event
def check_new_events(last_id, xsrf_token, bip_session):
    """Collect events listed before ``last_id``, following pagination.

    At most 10 pages are walked. When ``last_id`` is falsy (first run), only
    the first event of the first page is returned so that tracking can start
    without alerting on everything.

    Returns:
        ``(events, None)`` on success, or ``(None, error)`` when a page could
        not be fetched.
    """
    max_pages = 10  # safety cap on pagination
    collected = []

    for page in range(1, max_pages + 1):
        payload, error = fetch_bip_events(xsrf_token, bip_session, page)
        if error or not payload:
            return None, error

        resources = payload.get("resources", [])
        if not resources:
            break

        for resource in resources:
            event = parse_event(resource)
            # Reaching the last known event means everything newer is collected.
            if last_id and str(event["id"]) == str(last_id):
                return collected, None
            collected.append(event)

        # First-ever run: hand back just the newest event to seed the tracker.
        if not last_id and collected:
            return [collected[0]], None

    return collected, None
| # ============================================================================== | |
| # SCHEDULER ENGINE | |
| # ============================================================================== | |
def process_tick():
    """Run one scheduler tick: check the portal and alert on new events.

    The tick is skipped outside 08:00-18:00 (by the host's local clock), when
    cookies are not configured, or while the session is paused and the cookies
    have not changed since the error. On a scrape error a Telegram alert is
    sent and checking pauses until different cookie values appear.
    """
    import html  # local import: needed only for escaping the error text

    global LAST_EVENT_ID, LAST_EVENT_CODE, SESSION_EXPIRED, EXPIRED_XSRF, EXPIRED_BIP

    # Reload environment variables on every tick so user can update .env without restarting
    load_dotenv(override=True)
    xsrf = os.getenv("XSRF_TOKEN", "")
    bip = os.getenv("BIP_SESSION", "")

    now = datetime.now()
    time_str = now.strftime('%I:%M:%S %p')

    # Optional logic: only run between 8AM and 6PM
    # NOTE(review): datetime.now() is naive local time, so the window follows
    # the host's timezone - confirm that matches the intended one.
    if now.hour < 8 or now.hour >= 18:
        print(f"[{time_str}] π Out of hours (8 AM - 6 PM). Skipping check.")
        return

    if not xsrf or not bip:
        print(f"[{time_str}] β³ Skipping check: Please configure XSRF_TOKEN and BIP_SESSION in the .env file.")
        return

    if SESSION_EXPIRED:
        if xsrf == EXPIRED_XSRF and bip == EXPIRED_BIP:
            print(f"{time_str.lower()} - βΈοΈ Paused: Waiting for new cookies in .env to resume...")
            return
        print(f"{time_str.lower()} - π New cookies detected! Resuming checks...")
        SESSION_EXPIRED = False
        EXPIRED_XSRF = None
        EXPIRED_BIP = None

    new_events, err = check_new_events(LAST_EVENT_ID, xsrf, bip)

    if err:
        print(f"{time_str.lower()} - β Error scraping events: {err}")
        # Exception text may contain '<', '>' or '&'; unescaped, it would make
        # Telegram reject the HTML message and the alert would never arrive.
        send_telegram_message(
            "β οΈ <b>Scraper Error!</b>\n\n"
            f"The notifier encountered an error and has paused checking. Error:\n"
            f"<code>{html.escape(str(err))}</code>\n\n"
            "Please reply directly to this bot with your new cookies in this format to resume:\n\n"
            "<code>XSRF_TOKEN=your_new_token_here\nBIP_SESSION=your_new_session_here</code>"
        )
        # Remember the failing cookies so the pause lasts until they change.
        SESSION_EXPIRED = True
        EXPIRED_XSRF = xsrf
        EXPIRED_BIP = bip
        return

    if not new_events:
        # Just print the tracking status format exactly as requested on every 1-minute tick
        print(f"{time_str.lower()} - EVENT ID : {LAST_EVENT_CODE}")
        return

    # If LAST_EVENT_ID is None, it's the very first startup run. Set ID without alerting.
    first_run = LAST_EVENT_ID is None
    if not first_run:
        send_event_alerts(new_events)

    LAST_EVENT_ID = new_events[0]["id"]
    LAST_EVENT_CODE = new_events[0].get('event_code', LAST_EVENT_ID)

    if first_run:
        print(f"{time_str.lower()} - EVENT ID : {LAST_EVENT_CODE} (Tracking started)")
    else:
        for ev in new_events:
            code = ev.get('event_code', ev['id'])
            print(f"{time_str.lower()} - π¨ NEW EVENT ID : {code} (Alert Sent!)")
def list_all_events():
    """Print a one-line summary for every event on the first page."""
    print("Fetching recent events from BIP...")

    # Pick up the latest cookies from .env before calling the API.
    load_dotenv(override=True)
    payload, error = fetch_bip_events(
        os.getenv("XSRF_TOKEN", ""),
        os.getenv("BIP_SESSION", ""),
        page=1,
    )
    if error:
        print(f"β Error: {error}")
        return

    resources = payload.get("resources", [])
    if not resources:
        print("No events found.")
        return

    divider = "-" * 60
    print(f"\nFound {len(resources)} recent events:")
    print(divider)
    for event in map(parse_event, resources):
        print(f"[{event.get('id')}] {event.get('event_code')} - {event.get('event_name')} ({event.get('status')})")
    print(divider)
def get_latest_event():
    """Print the details of the first event returned by the API."""
    print("Fetching the latest event...")

    # Pick up the latest cookies from .env before calling the API.
    load_dotenv(override=True)
    payload, error = fetch_bip_events(
        os.getenv("XSRF_TOKEN", ""),
        os.getenv("BIP_SESSION", ""),
        page=1,
    )
    if error:
        print(f"β Error: {error}")
        return

    resources = payload.get("resources", [])
    if not resources:
        print("No events found.")
        return

    latest = parse_event(resources[0])
    divider = "-" * 60
    details = [
        f"ID: {latest.get('id')}",
        f"Code: {latest.get('event_code')}",
        f"Name: {latest.get('event_name')}",
        f"Dates: {latest.get('start_date')} to {latest.get('end_date')}",
        f"Location: {latest.get('location')}",
        f"Status: {latest.get('status')}",
        f"Link: {latest.get('web_url')}",
    ]

    print("\nπ LATEST EVENT:")
    print(divider)
    for row in details:
        print(row)
    print(divider)
def test_telegram_alert():
    """Send a fixed test message to Telegram and report whether it worked."""
    print("Sending test exact alert to Telegram...")
    delivered = send_telegram_message("π€ <b>Test Alert from BIP CLI Notifier</b>\n\nYour Telegram integration is working perfectly!")
    outcome = (
        "β Test message sent successfully!"
        if delivered
        else "β Failed to send test message. Check your .env configuration."
    )
    print(outcome)
def start_loop():
    """Run the monitoring loop until interrupted with Ctrl+C.

    Each cycle runs one portal check and then, until CHECK_INTERVAL_SECONDS
    have elapsed, polls Telegram for cookie updates. Receiving new cookies
    ends the wait early so the next check runs immediately.
    """
    print("=" * 60)
    print("π BIP CLI Notifier Started")
    print("=" * 60)
    print("Press Ctrl+C to stop the notifier at any time.\n")

    try:
        while True:
            try:
                process_tick()
            except Exception as e:
                # One bad tick (unexpected payload, I/O error, ...) must not
                # stop a long-running notifier; report it and try again later.
                print(f"[!] Unexpected error during check, will retry next tick: {e}")

            # Monotonic clock: the wait is unaffected by system clock changes.
            deadline = time.monotonic() + CHECK_INTERVAL_SECONDS
            while time.monotonic() < deadline:
                if check_telegram_messages():
                    # User sent new cookies, immediately break to run process_tick right now!
                    break

                # Sleep safely for whatever is left over (up to 5 secs max per inner loop)
                remaining = deadline - time.monotonic()
                if remaining > 0:
                    time.sleep(min(5, remaining))
    except KeyboardInterrupt:
        print("\n\nπ Notifier manually stopped by user. Goodbye!")
def run_dummy_server():
    """
    Serve a fixed "alive" page on port 7860 forever.

    Hugging Face Spaces requires a web server listening on port 7860 to pass
    health checks; this one answers every GET with a static message.
    """
    alive_body = b"BIP Notifier is running in the background!"

    class DummyHandler(BaseHTTPRequestHandler):
        def do_GET(self):
            self.send_response(200)
            self.send_header('Content-type', 'text/html')
            self.end_headers()
            self.wfile.write(alive_body)

        def log_message(self, format, *args):
            # Suppress per-request logging to keep the terminal clean.
            return None

    bind_address = ('0.0.0.0', 7860)
    httpd = HTTPServer(bind_address, DummyHandler)
    httpd.serve_forever()
if __name__ == "__main__":
    print("Starting BIP CLI Notifier...")
    parser = argparse.ArgumentParser(description="BIP Cloud Notifier CLI")
    parser.add_argument("--list-all", action="store_true", help="List all recent events and exit")
    parser.add_argument("--latest", action="store_true", help="Print details of the latest event and exit")
    parser.add_argument("--test-alert", action="store_true", help="Send a test message to Telegram and exit")
    parser.add_argument("--run", action="store_true", help="Start the continuous 1-minute monitoring loop")

    # Parse once and reuse the result for both the log line and the dispatch.
    args = parser.parse_args()
    print("Parsed arguments:", args)

    if args.list_all:
        list_all_events()
    elif args.latest:
        get_latest_event()
    elif args.test_alert:
        test_telegram_alert()
    else:
        # --run and "no arguments" behave the same: start the health-check
        # server in a background daemon thread, then the monitoring loop.
        threading.Thread(target=run_dummy_server, daemon=True).start()
        start_loop()