import argparse
import html
import os
import threading
import time
from datetime import datetime
from http.server import BaseHTTPRequestHandler, HTTPServer

import requests
from dotenv import load_dotenv
# Load optional .env if present in same directory
load_dotenv()
# ==============================================================================
# CONFIGURATION
# ==============================================================================
# Environment values (make sure to populate your .env file).
# XSRF_TOKEN and BIP_SESSION are re-read from .env on every tick (see
# process_tick), so they can be changed without restarting. The Telegram
# settings below are read only once, when this module is imported.
# Telegram settings (optional; loaded from .env, or replace the defaults here)
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "")
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID", "")
# Seconds between two portal checks (default 60 secs = 1 min)
CHECK_INTERVAL_SECONDS = 60
# Endpoint queried for the event list
BIP_API = "https://bip.bitsathy.ac.in/nova-api/student-activity-masters"
# Headers sent with every portal request
HEADERS = {
    "accept": "application/json",
    "x-requested-with": "XMLHttpRequest",
}
# State tracking for the loop
# Id / display code of the newest event seen so far; None until the first
# successful check.
LAST_EVENT_ID = None
LAST_EVENT_CODE = None
# True while checks are paused after a scrape error; the EXPIRED_* values hold
# the cookies that were in use when the error occurred.
SESSION_EXPIRED = False
EXPIRED_XSRF = None
EXPIRED_BIP = None
# ==============================================================================
# TELEGRAM NOTIFIER
# ==============================================================================
# Offset passed to getUpdates (last seen update_id + 1); None until an update
# has been received.
TELEGRAM_OFFSET = None
def update_env_file(new_xsrf, new_bip):
    """Persist new portal cookies to the ``.env`` file in the working directory.

    Existing ``XSRF_TOKEN=`` / ``BIP_SESSION=`` lines are rewritten in place,
    keys that are not present yet are appended, and every other line is kept.
    The file is created when it does not exist.

    Args:
        new_xsrf: New XSRF token, or ``None``/empty to leave the stored one
            untouched.
        new_bip: New session cookie, or ``None``/empty to leave the stored one
            untouched.
    """
    env_path = ".env"

    # Empty values are treated like None in both the create and the update
    # path, so a blank value can never overwrite a working credential.
    updates = {}
    if new_xsrf:
        updates["XSRF_TOKEN"] = new_xsrf
    if new_bip:
        updates["BIP_SESSION"] = new_bip
    if not updates:
        return

    existing = []
    if os.path.exists(env_path):
        with open(env_path, "r", encoding="utf-8") as f:
            existing = f.readlines()

    # Build the full new content before opening the file for writing, so the
    # old file is not truncated if something goes wrong half-way.
    output = []
    written = set()
    for line in existing:
        key = next((k for k in updates if line.startswith(f"{k}=")), None)
        if key is None:
            # A final line without a newline would otherwise be glued to the
            # first appended entry (e.g. ``FOO=barXSRF_TOKEN="..."``).
            output.append(line if line.endswith("\n") else line + "\n")
        else:
            output.append(f'{key}="{updates[key]}"\n')
            written.add(key)
    output.extend(
        f'{key}="{value}"\n' for key, value in updates.items() if key not in written
    )

    with open(env_path, "w", encoding="utf-8") as f:
        f.writelines(output)
def check_telegram_messages():
    """Poll Telegram for replies that carry fresh portal cookies.

    Calls ``getUpdates`` and scans each text message from the authorized chat
    (``TELEGRAM_CHAT_ID``) for lines of the form ``XSRF_TOKEN=...`` /
    ``XSRF-TOKEN=...`` and ``BIP_SESSION=...`` / ``bip_session=...``. When at
    least one non-empty value is found, it is written to ``.env``, the sender
    is notified and the "session expired" pause state is cleared.

    Returns:
        True when cookies were updated during this call, otherwise False
        (including when Telegram is not configured or the poll failed).
    """
    global TELEGRAM_OFFSET, SESSION_EXPIRED, EXPIRED_XSRF, EXPIRED_BIP
    if not TELEGRAM_BOT_TOKEN:
        return False
    url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/getUpdates"
    params = {"timeout": 5}
    if TELEGRAM_OFFSET is not None:
        params["offset"] = TELEGRAM_OFFSET
    cookies_updated = False
    try:
        r = requests.get(url, params=params, timeout=10)
        data = r.json()
        if not data.get("ok"):
            return False
        for result in data.get("result", []):
            # Acknowledge the update so it is not delivered again.
            TELEGRAM_OFFSET = result["update_id"] + 1
            msg = result.get("message") or {}
            text = (msg.get("text") or "").strip()
            chat_id = (msg.get("chat") or {}).get("id")
            # Only process messages from our authorized user
            if str(chat_id) != str(TELEGRAM_CHAT_ID):
                continue
            new_xsrf = None
            new_bip = None
            for line in text.replace("\r", "\n").split("\n"):
                key, sep, value = line.strip().partition("=")
                value = value.strip(' "\'')
                # Ignore lines without "=" and blank values, so an empty
                # "XSRF_TOKEN=" line cannot wipe a working token.
                if not sep or not value:
                    continue
                if key in ("XSRF_TOKEN", "XSRF-TOKEN"):
                    new_xsrf = value
                elif key in ("BIP_SESSION", "bip_session"):
                    new_bip = value
            if new_xsrf or new_bip:
                update_env_file(new_xsrf, new_bip)
                # NOTE(review): this literal was split across two lines by a
                # mis-decoded emoji; restored as the check-mark emoji.
                send_telegram_message("✅ Cookies Received!\n\nI have updated the .env file and will now resume checking the BIP portal.")
                SESSION_EXPIRED = False
                EXPIRED_XSRF = None
                EXPIRED_BIP = None
                cookies_updated = True
                print(f"[{datetime.now().strftime('%I:%M:%S %p')}] π Cookies received via Telegram! Resuming checks...")
    except Exception as e:
        # Best-effort poll: report the failure instead of hiding it. Only the
        # exception type is printed because requests error messages contain
        # the request URL, which embeds the bot token.
        print(f"[!] Failed to poll Telegram updates: {type(e).__name__}")
    # Cookies stored before a later failure still count as an update.
    return cookies_updated
def send_telegram_message(text: str):
    """Send *text* to the configured Telegram chat using HTML parse mode.

    When the bot token or chat id is missing, the message is printed to the
    console instead.

    Args:
        text: Message body; it is submitted with ``parse_mode="HTML"``.

    Returns:
        True when the request succeeded, False when Telegram is not
        configured or the request failed.
    """
    if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
        print("\n[!] Telegram not configured. The following alert would have been sent:\n")
        print(text)
        print("-" * 50)
        return False
    url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
    payload = {
        "chat_id": TELEGRAM_CHAT_ID,
        "text": text,
        "parse_mode": "HTML",
        "disable_web_page_preview": True,
    }
    try:
        r = requests.post(url, json=payload, timeout=10)
        r.raise_for_status()
        return True
    except Exception as e:
        # requests includes the request URL in its error messages, and the URL
        # contains the bot token; redact it before it reaches the logs.
        reason = str(e).replace(TELEGRAM_BOT_TOKEN, "<redacted>")
        print(f"[!] Failed to send telegram message: {reason}")
        return False
# Conservative per-message budget: Telegram limits message text to 4096
# characters, and the raw HTML is never shorter than the parsed text.
_TELEGRAM_MAX_CHARS = 4096


def _format_event_date(value):
    """Return a ``YYYY-MM-DD`` date as ``DD-MM-YYYY``; pass other values through."""
    if not value:
        return ""
    try:
        return datetime.strptime(str(value), "%Y-%m-%d").strftime("%d-%m-%Y")
    except ValueError:
        # Unexpected format: show it unchanged rather than crash.
        return str(value)


def _format_event_block(ev):
    """Build the HTML-safe alert text for a single parsed event."""

    def field(key, default="-"):
        # Messages are sent with parse_mode=HTML, so <, > and & in portal data
        # must be escaped or Telegram rejects the whole message.
        return html.escape(str(ev.get(key, default)))

    start = html.escape(_format_event_date(ev.get('start_date', '')))
    end = html.escape(_format_event_date(ev.get('end_date', '')))
    link = "View Event Here"
    web_url = ev.get('web_url')
    # Only emit an anchor for values that look like absolute URLs.
    if isinstance(web_url, str) and web_url.startswith(("http://", "https://")):
        link = f'<a href="{html.escape(web_url, quote=True)}">View Event Here</a>'
    return (
        f"{field('event_code')} - "
        f"{field('event_name', 'Unknown')}\n\n"
        f"{field('event_category')}\n\n"
        f"{start} to {end}\n\n"
        f"{field('location')}\n\n"
        f"{field('status')}\n\n"
        f"Seats: Max {field('maximum_count')} | "
        f"Applied {field('applied_count')}\n"
        f"{link}\n"
        f"ββββββββββββββββ\n"
    )


def send_event_alerts(events):
    """Send a Telegram alert describing every event in *events*.

    The events are rendered into one message; when that would exceed the
    Telegram size limit the text is split between events and sent as several
    messages. Nothing is sent for an empty list.

    Args:
        events: Parsed event dicts (as produced by ``parse_event``).
    """
    if not events:
        return
    header = f"π’ {len(events)} New BIP Event Found!\n\n"
    message = header
    for ev in events:
        block = _format_event_block(ev)
        # Flush before appending when this block would overflow the limit, but
        # never send an empty or header-only message.
        if message not in ("", header) and len(message) + len(block) > _TELEGRAM_MAX_CHARS:
            send_telegram_message(message)
            message = ""
        message += block
    send_telegram_message(message)
# ==============================================================================
# SCRAPER LOGIC
# ==============================================================================
def fetch_bip_events(xsrf_token, bip_session, page=1):
    """Request one page (10 entries) of events from the BIP API.

    Args:
        xsrf_token: Value sent as the ``XSRF-TOKEN`` cookie.
        bip_session: Value sent as the ``bip_session`` cookie.
        page: Page number to request.

    Returns:
        ``(payload, None)`` with the decoded JSON on success, or
        ``(None, message)`` when the session looks expired or the request
        failed.
    """
    query = {"perPage": 10, "page": page}
    session_cookies = {"XSRF-TOKEN": xsrf_token, "bip_session": bip_session}
    try:
        response = requests.get(
            BIP_API,
            params=query,
            headers=HEADERS,
            cookies=session_cookies,
            timeout=20,
        )
        # An HTML body (instead of JSON) or a 401/403 status is treated as an
        # expired or invalid session.
        content_type = response.headers.get("Content-Type", "")
        if "text/html" in content_type or response.status_code in (401, 403):
            return None, "Session expired or invalid cookies."
        response.raise_for_status()
        return response.json(), None
    except Exception as exc:
        return None, str(exc)
def parse_event(resource):
    """Flatten one API resource into a plain ``{attribute: value}`` dict.

    Fields without a truthy ``attribute`` are skipped. The resource id
    (``resource["id"]["value"]``) is stored under ``"id"`` and overrides any
    field that uses the same attribute name.
    """
    parsed = {
        field.get("attribute"): field.get("value")
        for field in resource.get("fields", [])
        if field.get("attribute")
    }
    parsed["id"] = resource["id"]["value"]
    return parsed
def check_new_events(last_id, xsrf_token, bip_session):
    """Collect the events listed ahead of *last_id*, paging as needed.

    Pages are read in order (at most 10) until the event whose id equals
    *last_id* is reached. When *last_id* is falsy (first run) only the first
    event of the first page is returned, so the caller can start tracking
    without a flood of alerts.

    Returns:
        ``(events, None)`` on success, or ``(None, error)`` when a page could
        not be fetched or came back empty/falsy.
    """
    max_pages = 10  # Safety limit on pagination
    collected = []
    for page_no in range(1, max_pages + 1):
        payload, error = fetch_bip_events(xsrf_token, bip_session, page_no)
        if error or not payload:
            return None, error
        page_items = payload.get("resources", [])
        if not page_items:
            break
        for item in page_items:
            event = parse_event(item)
            # Reaching the last known event ends the search.
            if last_id and str(event["id"]) == str(last_id):
                return collected, None
            collected.append(event)
        # First-ever run: hand back only the newest event to seed the tracker.
        if not last_id and collected:
            return collected[:1], None
    return collected, None
# ==============================================================================
# SCHEDULER ENGINE
# ==============================================================================
def process_tick():
    """Run one monitoring cycle: reload cookies, scrape, alert, update state.

    Steps:
      * re-read ``XSRF_TOKEN`` / ``BIP_SESSION`` from ``.env``;
      * skip outside 08:00-18:00 or when a cookie is missing;
      * while paused after an error, wait until the cookies differ from the
        ones that failed;
      * fetch new events; on error, notify via Telegram and pause;
      * on the first successful run only record the newest event, afterwards
        send alerts for new events and advance the tracked id.
    """
    global LAST_EVENT_ID, LAST_EVENT_CODE, SESSION_EXPIRED, EXPIRED_XSRF, EXPIRED_BIP
    # Reload environment variables on every tick so user can update .env without restarting
    load_dotenv(override=True)
    xsrf = os.getenv("XSRF_TOKEN", "")
    bip = os.getenv("BIP_SESSION", "")
    now = datetime.now()
    time_str = now.strftime('%I:%M:%S %p')
    # Only run between 8AM and 6PM. datetime.now() is naive, so the window is
    # evaluated in the host's local time.
    if now.hour < 8 or now.hour >= 18:
        print(f"[{time_str}] π Out of hours (8 AM - 6 PM). Skipping check.")
        return
    if not xsrf or not bip:
        print(f"[{time_str}] β³ Skipping check: Please configure XSRF_TOKEN and BIP_SESSION in the .env file.")
        return
    # While paused, resume only once the cookies differ from the failed ones.
    if SESSION_EXPIRED:
        if xsrf == EXPIRED_XSRF and bip == EXPIRED_BIP:
            print(f"{time_str.lower()} - βΈοΈ Paused: Waiting for new cookies in .env to resume...")
            return
        print(f"{time_str.lower()} - π New cookies detected! Resuming checks...")
        SESSION_EXPIRED = False
        EXPIRED_XSRF = None
        EXPIRED_BIP = None
    new_events, err = check_new_events(LAST_EVENT_ID, xsrf, bip)
    if err:
        print(f"{time_str.lower()} - β Error scraping events: {err}")
        # The alert is sent with parse_mode=HTML; error text frequently
        # contains "<...>" (e.g. object reprs in requests exceptions), which
        # Telegram would reject unless escaped.
        safe_err = html.escape(str(err))
        send_telegram_message(
            "β οΈ Scraper Error!\n\n"
            f"The notifier encountered an error and has paused checking. Error:\n"
            f"{safe_err}\n\n"
            "Please reply directly to this bot with your new cookies in this format to resume:\n\n"
            "XSRF_TOKEN=your_new_token_here\nBIP_SESSION=your_new_session_here"
        )
        # Remember the failing cookies so the pause lifts when they change.
        SESSION_EXPIRED = True
        EXPIRED_XSRF = xsrf
        EXPIRED_BIP = bip
        return
    if new_events:
        # If LAST_EVENT_ID is None, it's the very first startup run. Set ID without alerting.
        if LAST_EVENT_ID is None:
            LAST_EVENT_ID = new_events[0]["id"]
            LAST_EVENT_CODE = new_events[0].get('event_code', LAST_EVENT_ID)
            print(f"{time_str.lower()} - EVENT ID : {LAST_EVENT_CODE} (Tracking started)")
        else:
            send_event_alerts(new_events)
            # The first entry is the newest event; track it from now on.
            LAST_EVENT_ID = new_events[0]["id"]
            LAST_EVENT_CODE = new_events[0].get('event_code', LAST_EVENT_ID)
            for ev in new_events:
                code = ev.get('event_code', ev['id'])
                print(f"{time_str.lower()} - π¨ NEW EVENT ID : {code} (Alert Sent!)")
    else:
        # Nothing new: print the tracking status line on every tick.
        print(f"{time_str.lower()} - EVENT ID : {LAST_EVENT_CODE}")
def list_all_events():
    """Print a one-line summary for each event on the first BIP page."""
    print("Fetching recent events from BIP...")
    load_dotenv(override=True)
    payload, error = fetch_bip_events(
        os.getenv("XSRF_TOKEN", ""),
        os.getenv("BIP_SESSION", ""),
        page=1,
    )
    if error:
        print(f"β Error: {error}")
        return
    items = payload.get("resources", [])
    if not items:
        print("No events found.")
        return
    divider = "-" * 60
    print(f"\nFound {len(items)} recent events:")
    print(divider)
    for item in items:
        event = parse_event(item)
        print(f"[{event.get('id')}] {event.get('event_code')} - {event.get('event_name')} ({event.get('status')})")
    print(divider)
def get_latest_event():
    """Print the details of the first event returned by the BIP API."""
    print("Fetching the latest event...")
    load_dotenv(override=True)
    payload, error = fetch_bip_events(
        os.getenv("XSRF_TOKEN", ""),
        os.getenv("BIP_SESSION", ""),
        page=1,
    )
    if error:
        print(f"β Error: {error}")
        return
    items = payload.get("resources", [])
    if not items:
        print("No events found.")
        return
    event = parse_event(items[0])
    divider = "-" * 60
    detail_lines = [
        f"ID: {event.get('id')}",
        f"Code: {event.get('event_code')}",
        f"Name: {event.get('event_name')}",
        f"Dates: {event.get('start_date')} to {event.get('end_date')}",
        f"Location: {event.get('location')}",
        f"Status: {event.get('status')}",
        f"Link: {event.get('web_url')}",
    ]
    print("\nπ LATEST EVENT:")
    print(divider)
    for detail in detail_lines:
        print(detail)
    print(divider)
def test_telegram_alert():
    """Send a dummy test message to the configured Telegram chat and report the outcome."""
    print("Sending test exact alert to Telegram...")
    success = send_telegram_message("π€ Test Alert from BIP CLI Notifier\n\nYour Telegram integration is working perfectly!")
    if success:
        # NOTE(review): this literal was split across two lines by a
        # mis-decoded emoji; restored as the check-mark emoji.
        print("✅ Test message sent successfully!")
    else:
        print("β Failed to send test message. Check your .env configuration.")
def start_loop():
    """Run the monitor until it is interrupted with Ctrl+C.

    Each cycle runs ``process_tick`` and then waits ``CHECK_INTERVAL_SECONDS``
    while polling Telegram for new cookies; receiving cookies ends the wait
    early so the next check runs immediately.
    """
    print("=" * 60)
    print("π BIP CLI Notifier Started")
    print("=" * 60)
    print("Press Ctrl+C to stop the notifier at any time.\n")
    try:
        while True:
            try:
                process_tick()
            except Exception as e:
                # Top-level boundary of a long-running loop: an unexpected
                # failure in one check (e.g. a malformed API payload) is
                # reported and the next tick is still attempted.
                print(f"[!] Unexpected error during check: {type(e).__name__}: {e}")
            # The wait starts after the tick has finished, so slow HTTP
            # requests do not shorten the pause between checks.
            target_time = time.time() + CHECK_INTERVAL_SECONDS
            while time.time() < target_time:
                if check_telegram_messages():
                    # User sent new cookies, immediately break to run process_tick right now!
                    break
                # Sleep for whatever is left over (up to 5 secs max per inner loop)
                remaining = target_time - time.time()
                if remaining > 0:
                    time.sleep(min(5, remaining))
    except KeyboardInterrupt:
        print("\n\nπ Notifier manually stopped by user. Goodbye!")
def run_dummy_server():
    """
    Serve a static "alive" page on port 7860 until the process exits.

    Hugging Face Spaces requires a web server listening on port 7860 to pass
    health checks; this one answers every GET with a fixed message.
    """
    bind_address = ('0.0.0.0', 7860)

    class DummyHandler(BaseHTTPRequestHandler):
        def log_message(self, format, *args):
            # Keep terminal clean from HTTP logs
            return None

        def do_GET(self):
            self.send_response(200)
            self.send_header('Content-type', 'text/html')
            self.end_headers()
            self.wfile.write(b"BIP Notifier is running in the background!")

    HTTPServer(bind_address, DummyHandler).serve_forever()
if __name__ == "__main__":
    print("Starting BIP CLI Notifier...")
    parser = argparse.ArgumentParser(description="BIP Cloud Notifier CLI")
    cli_flags = (
        ("--list-all", "List all recent events and exit"),
        ("--latest", "Print details of the latest event and exit"),
        ("--test-alert", "Send a test message to Telegram and exit"),
        ("--run", "Start the continuous 1-minute monitoring loop"),
    )
    for flag, help_text in cli_flags:
        parser.add_argument(flag, action="store_true", help=help_text)
    args = parser.parse_args()
    print("Parsed arguments:", args)
    if args.list_all:
        list_all_events()
    elif args.latest:
        get_latest_event()
    elif args.test_alert:
        test_telegram_alert()
    else:
        # --run and "no arguments" behave the same: start the health-check
        # server in the background, then enter the monitoring loop.
        threading.Thread(target=run_dummy_server, daemon=True).start()
        start_loop()