import os
import uuid
import json
import pickle
import base64
import threading
import time
import schedule
import psycopg2
import requests
import gradio as gr
from dotenv import load_dotenv
from email.mime.text import MIMEText
from datetime import datetime
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
# ================= ENV =================
load_dotenv()
DB_URL = os.getenv("DB_URL")
GMAIL_USER = os.getenv("GMAIL_USER")
HF_URL = os.getenv("HF_URL")
ADMIN_SECRET = os.getenv("ADMIN_SECRET")
XSRF_TOKEN = os.getenv("XSRF_TOKEN")
BIP_SESSION = os.getenv("BIP_SESSION")
BIP_API = "https://bip.bitsathy.ac.in/nova-api/student-activity-masters"
TOKEN_FILE = "token.pkl"
# COOKIE_FILE = "bip_cookies.json"
STATE_FILE = "state.json"
NEW_EVENTS_FILE = "new_events.json"
PAGE1_LOG_FILE = "page1_logs.json"
SCOPES = ["https://www.googleapis.com/auth/gmail.send"]
MAX_PAGES = 50
REQUEST_TIMEOUT = 20
# ================= DB =================
def get_db():
try:
return psycopg2.connect(DB_URL, sslmode="require")
except Exception as e:
print(f"ā DB Connection error: {e}")
raise
# ================= COOKIES =================
# ================= COOKIES =================
def load_cookies():
"""Load cookies from environment variables only"""
if not XSRF_TOKEN or not BIP_SESSION:
raise Exception("XSRF_TOKEN and BIP_SESSION must be set in environment variables")
# Clean the values - remove any cookie header prefixes
xsrf_value = XSRF_TOKEN.strip()
session_value = BIP_SESSION.strip()
# Remove "XSRF-TOKEN=" prefix if present
if xsrf_value.startswith("XSRF-TOKEN="):
xsrf_value = xsrf_value[12:] # Remove "XSRF-TOKEN="
# Remove "bip_session=" prefix if present
if session_value.startswith("bip_session="):
session_value = session_value[12:] # Remove "bip_session="
# Remove any trailing cookie parts (like "; other_cookie=...")
if ";" in xsrf_value:
xsrf_value = xsrf_value.split(";")[0]
if ";" in session_value:
session_value = session_value.split(";")[0]
return {
"XSRF-TOKEN": xsrf_value,
"bip_session": session_value
}
# ================= GMAIL =================
def create_token():
if not os.path.exists("credentials.json"):
raise Exception("credentials.json missing")
flow = InstalledAppFlow.from_client_secrets_file(
"credentials.json",
SCOPES
)
creds = flow.run_local_server(port=0)
with open(TOKEN_FILE, "wb") as f:
pickle.dump(creds, f)
return creds
def get_gmail():
creds = None
if os.path.exists(TOKEN_FILE):
with open(TOKEN_FILE, "rb") as f:
creds = pickle.load(f)
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
with open(TOKEN_FILE, "wb") as f:
pickle.dump(creds, f)
if not creds:
creds = create_token()
return build("gmail", "v1", credentials=creds)
def send_email(to, subject, html):
try:
service = get_gmail()
msg = MIMEText(html, "html")
msg["To"] = to
msg["From"] = GMAIL_USER
msg["Subject"] = subject
raw = base64.urlsafe_b64encode(msg.as_bytes()).decode()
service.users().messages().send(
userId="me",
body={"raw": raw}
).execute()
print("ā
Sent:", to)
return True
except Exception as e:
print("ā Mail error:", e)
return False
# ================= EMAIL =================
def create_event_email(event, unsub):
return f"""
š¢ New BIP Event
{event['event_name']}
- Code: {event['event_code']}
- Organizer: {event['organizer']}
- Date: {event['start_date']}
- Location: {event['location']}
View
Unsubscribe
"""
# ================= BIP API =================
HEADERS = {
"accept": "application/json",
"x-requested-with": "XMLHttpRequest",
}
BASE_PARAMS = {
"perPage": 10
}
def fetch_page(page):
"""Fetch a specific page from BIP API"""
cookies = load_cookies()
params = BASE_PARAMS.copy()
params["page"] = page
r = requests.get(
BIP_API,
params=params,
headers=HEADERS,
cookies=cookies,
timeout=REQUEST_TIMEOUT
)
# Expired session
if "text/html" in r.headers.get("Content-Type", ""):
raise Exception("Session expired. Login again.")
r.raise_for_status()
return r.json()
def fetch_latest():
"""Legacy function - now uses fetch_page"""
return fetch_page(1)["resources"]
def parse_event(resource):
"""Parse event resource into clean data structure"""
data = {}
for f in resource.get("fields", []):
key = f.get("attribute")
val = f.get("value")
if key:
data[key] = val
data["id"] = resource["id"]["value"]
data["title"] = resource.get("title")
return data
def fetch_new_events(old_id):
"""Fetch all new events since the last known ID"""
page = 1
new_events = []
while page <= MAX_PAGES:
print(f"š Fetching page {page}...")
try:
data = fetch_page(page)
resources = data.get("resources", [])
except Exception as e:
print(f"ā Error fetching page {page}: {e}")
break
if not resources:
break
for res in resources:
ev = parse_event(res)
if ev["id"] == old_id:
return new_events
new_events.append(ev)
page += 1
return new_events
# ================= STATE MANAGEMENT =================
def load_state():
"""Load the last processed event state"""
try:
with open(STATE_FILE, "r") as f:
return json.load(f)
except:
return None
def save_state(latest_id):
"""Save the latest processed event ID"""
state = {
"latest_id": latest_id,
"last_updated": datetime.now().isoformat()
}
with open(STATE_FILE, "w") as f:
json.dump(state, f, indent=2)
def save_new_events(events):
"""Save new events to file"""
data = {
"timestamp": datetime.now().isoformat(),
"count": len(events),
"events": events
}
with open(NEW_EVENTS_FILE, "w") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
# ================= PAGE 1 LOGGER =================
def load_page1_logs():
"""Load page 1 historical logs"""
try:
with open(PAGE1_LOG_FILE, "r") as f:
return json.load(f)
except:
return []
def save_page1_logs(logs):
"""Save page 1 logs to file"""
with open(PAGE1_LOG_FILE, "w") as f:
json.dump(logs, f, indent=2, ensure_ascii=False)
def log_page1_to_file():
"""Save page 1 snapshot to file"""
try:
data = fetch_page(1)
resources = data.get("resources", [])
events = []
for res in resources:
events.append(parse_event(res))
logs = load_page1_logs()
entry = {
"timestamp": datetime.now().isoformat(),
"count": len(events),
"events": events
}
logs.append(entry)
# Keep only last 100 entries to prevent file from growing too large
if len(logs) > 100:
logs = logs[-100:]
save_page1_logs(logs)
print(f"š Page-1 snapshot saved ({len(events)} events)")
return True
except Exception as e:
print(f"ā Failed to log page 1: {e}")
return False
# ================= SUBSCRIBE =================
def subscribe(email):
if not email or not email.endswith("@bitsathy.ac.in"):
return "ā Use college email only"
try:
db = get_db()
cur = db.cursor()
# Check if user already exists and is verified
cur.execute("""
SELECT email_verified, unsubscribed
FROM users
WHERE email=%s
""", (email,))
existing_user = cur.fetchone()
if existing_user:
verified, unsubscribed = existing_user
if verified and not unsubscribed:
cur.close()
db.close()
return "ā
You're already subscribed to BIP alerts"
elif verified and unsubscribed:
# Re-subscribe previously unsubscribed user
cur.execute("""
UPDATE users
SET unsubscribed=false
WHERE email=%s
""", (email,))
db.commit()
cur.close()
db.close()
return "ā
Successfully re-subscribed to BIP alerts"
elif not verified:
# User exists but not verified - generate new token and resend
token = uuid.uuid4().hex
cur.execute("""
UPDATE users
SET verification_token=%s
WHERE email=%s
""", (token, email))
db.commit()
link = f"{HF_URL}?verify={token}"
if send_email(
email,
"Verify BIP Alerts",
f"Click here to verify"
):
result = "š© New verification email sent (previous one expired)"
else:
result = "ā Failed to send verification email"
cur.close()
db.close()
return result
# New user - create account and send verification
token = uuid.uuid4().hex
cur.execute("""
INSERT INTO users(email, verification_token)
VALUES(%s, %s)
""", (email, token))
db.commit()
link = f"{HF_URL}?verify={token}"
if send_email(
email,
"Verify BIP Alerts",
f"Click here to verify"
):
result = "š© Verification sent to your email"
else:
result = "ā Failed to send verification email"
cur.close()
db.close()
return result
except Exception as e:
print(f"ā Subscribe error: {e}")
return f"ā Error: {str(e)}"
# ================= VERIFY =================
def verify_user(token):
if not token:
return ""
try:
db = get_db()
cur = db.cursor()
cur.execute("""
UPDATE users
SET email_verified=true,
verification_token=NULL,
unsubscribed=false
WHERE verification_token=%s
""", (token,))
if cur.rowcount > 0:
result = "ā
Email verified successfully!"
else:
result = "ā Invalid verification token"
db.commit()
cur.close()
db.close()
return result
except Exception as e:
print(f"ā Verify error: {e}")
return f"ā Verification failed: {str(e)}"
# ================= UNSUBSCRIBE =================
def unsubscribe_user(token):
if not token:
return ""
try:
db = get_db()
cur = db.cursor()
cur.execute("""
UPDATE users
SET unsubscribed=true
WHERE verification_token=%s OR email IN (
SELECT email FROM users WHERE verification_token=%s
)
""", (token, token))
if cur.rowcount > 0:
result = "ā
Successfully unsubscribed from BIP alerts"
else:
result = "ā Invalid unsubscribe link"
db.commit()
cur.close()
db.close()
return result
except Exception as e:
print(f"ā Unsubscribe error: {e}")
return f"ā Unsubscribe failed: {str(e)}"
# ================= EMAIL STATUS =================
def check_email_status(email):
if not email or not email.endswith("@bitsathy.ac.in"):
return "ā Please enter a valid college email"
try:
db = get_db()
cur = db.cursor()
cur.execute("""
SELECT email_verified, unsubscribed, verification_token
FROM users
WHERE email=%s
""", (email,))
result = cur.fetchone()
cur.close()
db.close()
if not result:
return "š§ Email not found. Click Subscribe to register."
verified, unsubscribed, token = result
if unsubscribed:
return "š« Email is unsubscribed from alerts"
elif verified:
return "ā
Email is verified and subscribed to alerts"
else:
return "ā³ Email registered but not verified. Check your inbox."
except Exception as e:
print(f"ā Status check error: {e}")
return f"ā Database error: {str(e)}"
# ================= ENHANCED NOTIFIER =================
def check_events():
"""Enhanced event checker with state tracking"""
print("\nš Checking for new events...")
try:
# Save page 1 snapshot
log_page1_to_file()
state = load_state()
# First run
if not state:
print("š First run detected")
data = fetch_page(1)
resources = data.get("resources", [])
if not resources:
print("ā ļø No events found")
return "ā ļø No events found on first run"
first_event = parse_event(resources[0])
save_state(first_event["id"])
print("š Saved initial ID:", first_event["id"])
return f"š Initialized with event ID: {first_event['id']}"
old_id = state["latest_id"]
print("š Last known ID:", old_id)
new_events = fetch_new_events(old_id)
if not new_events:
print("ā³ No new events added")
return "ā³ No new events found"
latest = new_events[0]
print(f"\nš„ {len(new_events)} NEW EVENT(S) ADDED!")
print("š¢ Latest Event:", latest.get("event_name", "Unknown"))
# Save new events to file
save_new_events(new_events)
# Update state
save_state(latest["id"])
# Send notifications to users
return send_notifications_for_events(new_events)
except Exception as e:
error_msg = f"ā Event check error: {e}"
print(error_msg)
return error_msg
def send_notifications_for_events(new_events):
"""Send one combined email for all new events"""
if not new_events:
return "š No events to notify"
try:
db = get_db()
cur = db.cursor()
cur.execute("""
SELECT id,email,verification_token
FROM users
WHERE email_verified=true
AND unsubscribed=false
""")
users = cur.fetchall()
if not users:
cur.close()
db.close()
return "š No verified users"
# Build combined HTML
events_html = ""
for i, event in enumerate(new_events, 1):
events_html += f"""
{i}. {event['event_name']}
- Code: {event['event_code']}
- Event Name: {event['event_name']}
- Organizer: {event['organizer']}
- Date: {event['start_date']}
- Category: {event['event_category']}
- Location: {event['location']}
- View: Link
"""
subject = f"š¢ {len(new_events)} New BIP Events Added"
total_sent = 0
for uid, mail, token in users:
html = f"""
š¢ New BIP Events Alert
{len(new_events)} new events have been added:
{events_html}
Unsubscribe
"""
if send_email(mail, subject, html):
total_sent += 1
cur.close()
db.close()
return f"ā
Sent {total_sent} combined notifications"
except Exception as e:
print("ā Notification error:", e)
return f"ā Error: {e}"
def run_notifier():
"""Legacy function - now uses enhanced check_events"""
return check_events()
# ================= SCHEDULER =================
def scheduler_worker():
print("ā° Scheduler started")
# Initial run
try:
result = check_events()
print(f"Initial check: {result}")
except Exception as e:
print(f"ā Initial check failed: {e}")
# Schedule every 3 minutes
schedule.every(3).minutes.do(check_events)
while True:
try:
schedule.run_pending()
except Exception as e:
print(f"ā Scheduler error: {e}")
time.sleep(30)
# ================= ROUTER =================
def route_handler(req: gr.Request):
try:
params = req.query_params
if "verify" in params:
return verify_user(params["verify"])
if "unsubscribe" in params:
return unsubscribe_user(params["unsubscribe"])
return ""
except Exception as e:
print(f"ā Router error: {e}")
return f"ā Error: {str(e)}"
# ================= UI =================
with gr.Blocks(title="BIP Notifier", theme=gr.themes.Soft()) as app:
gr.Markdown("# š¢ BIP Event Email Alerts")
gr.Markdown("*Auto-checking every 3 minutes for new events*")
# ---------- Main Section ----------
with gr.Row():
with gr.Column():
email = gr.Textbox(
label="College Email (@bitsathy.ac.in)",
placeholder="your.name@bitsathy.ac.in"
)
with gr.Row():
sub_btn = gr.Button("Subscribe", variant="primary")
status_btn = gr.Button("Check Status", variant="secondary")
out = gr.Textbox(label="Status", interactive=False, lines=3)
sub_btn.click(subscribe, email, out)
status_btn.click(check_email_status, email, out)
# ---------- System Status ----------
with gr.Accordion("š System Status", open=False):
def get_system_status():
try:
state = load_state()
page1_logs = load_page1_logs()
status = "### Current State:\n"
if state:
status += f"- **Last Event ID**: {state.get('latest_id', 'N/A')}\n"
status += f"- **Last Updated**: {state.get('last_updated', 'N/A')}\n"
else:
status += "- **Status**: Not initialized\n"
status += f"- **Page 1 Logs**: {len(page1_logs)} entries\n"
if os.path.exists(NEW_EVENTS_FILE):
with open(NEW_EVENTS_FILE, 'r') as f:
new_events = json.load(f)
status += f"- **Last New Events**: {new_events.get('count', 0)} events\n"
else:
status += "- **Last New Events**: None\n"
return status
except Exception as e:
return f"ā Error loading status: {e}"
status_btn = gr.Button("Get System Status")
status_out = gr.Textbox(label="System Status", lines=5, interactive=False)
status_btn.click(get_system_status, None, status_out)
# ---------- Instructions ----------
with gr.Accordion("š Instructions", open=False):
gr.Markdown("""
### How to use:
1. Enter your college email (@bitsathy.ac.in)
2. Click **Subscribe**
3. Check your email and click the verification link
4. You'll receive alerts for new BIP events automatically
### Admin Setup:
1. Login to BIP portal in browser
2. Open Developer Tools (F12) ā Network tab
3. Make any request and copy XSRF-TOKEN and bip_session cookies
4. Paste them in Admin Controls above
### Files Created:
- `state.json`: Tracks last processed event
- `page1_logs.json`: Historical snapshots of page 1
- `new_events.json`: Latest batch of new events found
""")
# ---------- URL Handler ----------
app.load(route_handler, None, out)
# ================= MAIN =================
if __name__ == "__main__":
print("š BIP Event Notifier (Enhanced with File Tracking)")
print("=" * 50)
# Start scheduler in background
scheduler_thread = threading.Thread(
target=scheduler_worker,
daemon=True
)
scheduler_thread.start()
# Start Gradio app
app.launch(
share=False # Set
)