"""
subject = "New Event Added"
if len(events) == 1:
event_name = events[0].get("event_name")
if event_name:
subject = f"New Event: {event_name}"
send_email_message(subject, msg, is_html=True, recipient=EVENT_EMAIL_RECIPIENT)
# ==============================================================================
# SCRAPER LOGIC
# ==============================================================================
def fetch_bip_events(xsrf_token, bip_session, page=1):
    """Fetch one page of events from the BIP API.

    Retries transient failures with exponential backoff (Task 3) and flags
    expired sessions via the SESSION_EXPIRED global.

    Returns:
        (data, None) on success, or (None, error_message) on failure.
    """
    global SESSION_EXPIRED
    logger.debug(f"--> fetch_bip_events(page={page})")
    cookies = {"XSRF-TOKEN": xsrf_token, "bip_session": bip_session}
    params = {"perPage": 10, "page": page}
    # Task 3: exponential backoff retry loop.
    max_retries = 3
    for attempt in range(max_retries):
        try:
            # Task 2: reuse the shared requests.Session for connection pooling.
            r = SESSION.get(BIP_API, params=params, cookies=cookies, timeout=20)
            # An HTML response (login page) or 401/403 means the cookies are stale.
            content_type = r.headers.get("Content-Type", "")
            if "text/html" in content_type or r.status_code in [401, 403]:
                logger.warning(f"Session expired detected! Content-type: {r.headers.get('Content-Type')}, Status: {r.status_code}")
                SESSION_EXPIRED = True
                return None, "Session expired or invalid cookies."
            SESSION_EXPIRED = False
            r.raise_for_status()
            # Task 7: validate the response shape before using it.
            data = r.json()
            structure_ok = isinstance(data, dict) and isinstance(data.get("resources"), list)
            if not structure_ok:
                if attempt < max_retries - 1:
                    # Trigger the retry path below.
                    raise Exception("Invalid JSON structure received from BIP API: missing or malformed 'resources' list")
                return None, "Invalid JSON structure received from BIP API."
            return data, None
        except Exception as e:
            if attempt == max_retries - 1:
                logger.error(f"Network/Request EXCEPTION in fetch_bip_events after {max_retries} attempts: {e}")
                return None, str(e)
            # Jittered exponential backoff before the next attempt.
            sleep_time = (2 ** attempt) + random.uniform(0.1, 1.0)
            logger.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {sleep_time:.2f}s...")
            time.sleep(sleep_time)
    return None, "Max retries exceeded."
def parse_event(resource):
    """Flatten a BIP API resource into a plain {attribute: value} dict.

    Fields without an "attribute" key are dropped; the resource id is always
    stored under "id" (overriding any field of the same name).
    """
    event = {
        field.get("attribute"): field.get("value")
        for field in resource.get("fields", [])
        if field.get("attribute")
    }
    event["id"] = resource["id"]["value"]
    return event
def check_new_events(last_id, xsrf_token, bip_session):
    """Collect events newer than last_id, paginating through the API as needed.

    Stops as soon as last_id is seen. On the very first run (no last_id) only
    the single newest event is returned so we don't flood alerts.

    Returns:
        (events, None) on success, or (None, error_message) on failure.
    """
    collected = []
    for page in range(1, 11):  # hard cap of 10 pages as a safety net
        data, err = fetch_bip_events(xsrf_token, bip_session, page)
        if err or not data:
            return None, err
        resources = data.get("resources", [])
        if not resources:
            break
        for res in resources:
            ev = parse_event(res)
            # Everything from here down is already known; stop collecting.
            if last_id and str(ev["id"]) == str(last_id):
                return collected, None
            collected.append(ev)
        # First-ever run: report just the newest event to seed the tracking ID.
        if not last_id and collected:
            return [collected[0]], None
    return collected, None
# ==============================================================================
# SCHEDULER ENGINE
# ==============================================================================
def get_current_interval():
    """Return the polling interval in seconds, or 0 during inactive hours.

    Active hours are 08:00-16:59 IST; within them, fast mode applies while
    time.time() is below FAST_MODE_UNTIL.
    """
    global FAST_MODE_UNTIL
    hour_now = datetime.now(IST).hour
    if not (8 <= hour_now < 17):
        return 0
    if time.time() < FAST_MODE_UNTIL:
        return int(os.getenv("FAST_INTERVAL", "30"))
    return int(os.getenv("NORMAL_INTERVAL", "120"))
def process_tick():
    """Run one polling cycle: reload config, fetch new events, alert, persist state.

    Raises SystemExit(1) on unrecoverable problems (missing credentials, a
    scraping error, or an unexpected exception) so the deployment stops instead
    of looping on a broken configuration.
    """
    global LAST_EVENT_ID, LAST_EVENT_CODE, FAST_MODE_UNTIL
    logger.debug("--- process_tick starting ---")
    try:
        # Reload environment variables on every tick so secrets can be rotated live.
        load_dotenv(override=True)
        xsrf = os.getenv("XSRF_TOKEN", "")
        bip = os.getenv("BIP_SESSION", "")
        if not xsrf or not bip:
            logger.warning("Skipping check: Please configure XSRF_TOKEN and BIP_SESSION in the deployment environment.")
            # Fix: these message literals previously contained raw (unescaped)
            # newlines inside the quotes, which is a syntax error in Python.
            send_email_message(
                "ā ļø BIP Scraper Error",
                "ā ļø Deployment Configuration Error!\n"
                "The application was started without the required XSRF_TOKEN or BIP_SESSION secrets.\n"
                "Please configure these variables in your deployment settings to begin tracking.",
                is_html=True
            )
            raise SystemExit(1)
        # Task 1: Load persisted state if we just started.
        if LAST_EVENT_ID is None:
            load_state()
        new_events, err = check_new_events(LAST_EVENT_ID, xsrf, bip)
        if err:
            logger.error(f"Error scraping events: {err}")
            send_email_message(
                "ā ļø BIP Scraper Error",
                "ā ļø Scraper Error!\n"
                f"The notifier encountered an error and has paused checking. Error: "
                f"{err}\n"
                "Please update the `XSRF_TOKEN` and `BIP_SESSION` variables in your Secret/Env configuration and restart the Space.",
                is_html=True
            )
            logger.error("Notifier is shutting down completely because of the scraping error.")
            raise SystemExit(1)
        if new_events:
            # If LAST_EVENT_ID is still None, it's the very first startup run:
            # set the tracking ID without sending event alerts.
            if LAST_EVENT_ID is None:
                LAST_EVENT_ID = new_events[0]["id"]
                LAST_EVENT_CODE = new_events[0].get('event_code', LAST_EVENT_ID)
                save_state(LAST_EVENT_ID, LAST_EVENT_CODE)
                logger.info(f"EVENT ID : {LAST_EVENT_CODE} (Tracking started)")
                # Send the startup notification.
                send_email_message(
                    "š BIP Notifier is Online!",
                    f"You are receiving this because the BIP Auto Notifier script has successfully started tracking on the cloud.\n"
                    f"Current Active Event: {LAST_EVENT_CODE} "
                    f"The script is now monitoring in the background. You will receive alerts for any newer events.",
                    is_html=True
                )
            else:
                # Activity detected: poll faster for FAST_DURATION seconds.
                fast_duration = int(os.getenv("FAST_DURATION", "120"))
                FAST_MODE_UNTIL = time.time() + fast_duration
                send_event_alerts(new_events)
                # new_events is newest-first, so index 0 is the latest event.
                LAST_EVENT_ID = new_events[0]["id"]
                LAST_EVENT_CODE = new_events[0].get('event_code', LAST_EVENT_ID)
                save_state(LAST_EVENT_ID, LAST_EVENT_CODE)
                for ev in new_events:
                    code = ev.get('event_code', ev['id'])
                    logger.info(f"šØ NEW EVENT ID : {code} (Alert Sent!)")
        else:
            display_code = LAST_EVENT_CODE if LAST_EVENT_CODE else LAST_EVENT_ID
            logger.info(f"EVENT ID : {display_code}")
    except Exception as e:
        # NOTE: the SystemExit raised above is NOT caught here (SystemExit does
        # not subclass Exception), so it propagates as intended.
        logger.error(f"CRITICAL EXCEPTION in process_tick: {e}")
        error_details = traceback.format_exc()
        logger.error(error_details)
        send_email_message(
            "šØ CRITICAL: BIP Notifier Tick Crashed",
            f"The notifier encountered an unexpected exception during process_tick data parsing.\n"
            f"Error Traceback:\n{error_details}\n"
            f"The application has successfully caught this error, and is safely shutting down to prevent mail loops.",
            is_html=True
        )
        raise SystemExit(1)
def list_all_events():
    """Fetch the first page of events from BIP and print a one-line summary each."""
    logger.info("Fetching recent events from BIP...")
    load_dotenv(override=True)
    data, err = fetch_bip_events(
        os.getenv("XSRF_TOKEN", ""), os.getenv("BIP_SESSION", ""), page=1
    )
    if err:
        logger.error(f"Error: {err}")
        return
    resources = data.get("resources", [])
    if not resources:
        logger.info("No events found.")
        return
    logger.info(f"\nFound {len(resources)} recent events:")
    divider = "-" * 60
    print(divider)
    for res in resources:
        ev = parse_event(res)
        print(f"[{ev.get('id')}] {ev.get('event_code')} - {ev.get('event_name')} ({ev.get('status')})")
    print(divider)
def get_latest_event():
    """Fetch page 1 from BIP and print details of only the newest event."""
    logger.info("Fetching the latest event...")
    load_dotenv(override=True)
    data, err = fetch_bip_events(
        os.getenv("XSRF_TOKEN", ""), os.getenv("BIP_SESSION", ""), page=1
    )
    if err:
        logger.error(f"Error: {err}")
        return
    resources = data.get("resources", [])
    if not resources:
        logger.info("No events found.")
        return
    # The API returns newest-first, so the first resource is the latest.
    ev = parse_event(resources[0])
    divider = "-" * 60
    print("\nš LATEST EVENT:")
    print(divider)
    print(f"ID: {ev.get('id')}")
    print(f"Code: {ev.get('event_code')}")
    print(f"Name: {ev.get('event_name')}")
    print(f"Dates: {ev.get('start_date')} to {ev.get('end_date')}")
    print(f"Location: {ev.get('location')}")
    print(f"Status: {ev.get('status')}")
    print(f"Link: {ev.get('web_url')}")
    print(divider)
def test_email_alert():
    """Send one dummy test message to each configured recipient and log the outcome."""
    logger.info("Sending test exact alert to Event Email...")
    # Fix: the message literals previously contained raw newlines inside the
    # quotes (a syntax error); they are now explicit "\n" escapes.
    success1 = send_email_message(
        "š¤ Test Alert - Event Receivers",
        "š¤ Test Alert from BIP CLI Notifier\nEvent emails are working perfectly!",
        is_html=True,
        recipient=EVENT_EMAIL_RECIPIENT
    )
    logger.info("Sending test exact alert to Warning Email...")
    success2 = send_email_message(
        "š¤ Test Alert - Warning Receivers",
        "š¤ Test Alert from BIP CLI Notifier\nWarning emails are working perfectly!",
        is_html=True,
        recipient=WARNING_EMAIL_RECIPIENT
    )
    if success1 and success2:
        logger.info("ā Test messages sent successfully!")
    else:
        logger.error("ā Failed to send one or more test messages. Check your .env configuration.")
def test_real_event_alert():
    """Pull the real newest event from BIP and push it through the alert pipeline."""
    logger.info("Fetching the real latest event to send as a test alert...")
    load_dotenv(override=True)
    data, err = fetch_bip_events(
        os.getenv("XSRF_TOKEN", ""), os.getenv("BIP_SESSION", ""), page=1
    )
    if err:
        logger.error(f"Error fetching real event: {err}")
        return
    resources = data.get("resources", [])
    if not resources:
        logger.warning("No events found to test with.")
        return
    latest = parse_event(resources[0])
    logger.info(f"Triggering send_event_alerts with real event data: {latest.get('event_code')}")
    send_event_alerts([latest])
    logger.info("ā Real latest event alert sent successfully!")
def start_loop():
    """Blocking scheduler loop: poll during active hours, sleep until 8 AM otherwise.

    Sends a one-off "sleeping" email when entering inactive hours, and a crash
    email (then SystemExit) on any fatal error. Always closes the shared HTTP
    session on the way out.
    """
    global FAST_MODE_UNTIL
    logger.info("š BIP CLI Notifier Started")
    sleep_notified = False
    try:
        while True:
            check_interval = get_current_interval()
            if check_interval > 0:
                fast_interval = int(os.getenv("FAST_INTERVAL", "30"))
                if check_interval == fast_interval:
                    logger.info(f"ā” Fast Mode Active ({fast_interval}s interval)")
                # Re-arm the one-shot sleep notification for the next night.
                sleep_notified = False
            else:
                # Inactive hours: sleep until the next 8 AM IST.
                now_ist = datetime.now(IST)
                current_hour = now_ist.hour
                next_8am = now_ist.replace(hour=8, minute=0, second=0, microsecond=0)
                if current_hour >= 17:
                    next_8am += timedelta(days=1)
                sleep_seconds = (next_8am - now_ist).total_seconds()
                logger.info(f"Inactive hours. Sleeping until 8 AM ({int(sleep_seconds)} sec)")
                if not sleep_notified:
                    # Fix: these literals previously contained raw newlines
                    # inside the quotes (a syntax error).
                    send_email_message(
                        "š BIP Notifier Sleeping",
                        f"The BIP Auto Notifier has entered its scheduled inactive period.\n"
                        f"You will not receive any scraping alerts until the notifier wakes up.",
                        is_html=True
                    )
                    sleep_notified = True
                FAST_MODE_UNTIL = 0
                time.sleep(sleep_seconds)
                continue
            process_tick()
            # Polling loop: wait in exact intervals without stacking HTTP request
            # delays on top of the interval.
            target_time = time.time() + check_interval
            while time.time() < target_time:
                # Sleep in short slices (max 5s) so the loop stays responsive.
                remaining = target_time - time.time()
                if remaining > 0:
                    time.sleep(min(5, remaining))
    except KeyboardInterrupt:
        logger.info("\nš Keyboard interrupt received.")
    except Exception as e:
        logger.error(f"FATAL SYSTEM ERROR in start_loop: {e}")
        error_details = traceback.format_exc()
        logger.error(error_details)
        send_email_message(
            "šØ CRITICAL: BIP Notifier Crashed",
            f"The notifier encountered an unexpected fatal system error and has violently shut down.\n"
            f"Error Traceback:\n{error_details}\n"
            f"The application has been stopped to prevent instability. Please check your deployment logs.",
            is_html=True
        )
        raise SystemExit(1)
    finally:
        logger.info("Cleaning up resources...")
        try:
            SESSION.close()
        except Exception:  # fix: was a bare except (would swallow SystemExit)
            pass
        logger.info("Notifier stopped gracefully. Goodbye!")
# ==============================================================================
# FASTAPI MICROSERVICE (Task 10)
# ==============================================================================
def background_scraper_loop():
    """Thread target: run the blocking notifier loop inside the FastAPI process."""
    start_loop()
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan hook: spawn the scraper daemon thread at startup."""
    worker = threading.Thread(target=background_scraper_loop, daemon=True)
    worker.start()
    yield
    # Shutdown: nothing to tear down (the thread is a daemon and dies with us).
# Module-level ASGI application; the lifespan hook starts the scraper thread.
app = FastAPI(title="BIP Auto Notifier", lifespan=lifespan)
@app.get("/health")
async def health_check():
    """Health endpoint (Tasks 6 & 10): liveness plus tracking/session state."""
    now_ist = datetime.now(IST)
    return {
        "status": "online",
        "session_expired": SESSION_EXPIRED,
        "last_event_id": LAST_EVENT_ID,
        "timestamp": now_ist.strftime("%I:%M %p (%d-%m-%Y)"),
    }
@app.get("/latest")
async def fetch_latest_api():
    """Return the newest event currently visible on the portal (or an error dict)."""
    load_dotenv(override=True)
    token = os.getenv("XSRF_TOKEN", "")
    session_cookie = os.getenv("BIP_SESSION", "")
    data, err = fetch_bip_events(token, session_cookie, page=1)
    if err:
        return {"error": err}
    resources = data.get("resources", [])
    if resources:
        return parse_event(resources[0])
    return {"message": "No events found."}
@app.get("/", response_class=HTMLResponse)
async def root(request: Request):
    """Dashboard page: embeds live health and assistant status as JSON.

    NOTE(review): the returned template appears to have lost its HTML markup
    (tags stripped during extraction?) — confirm against the original page.
    """
    # Fetch real-time data internally without an HTTP request overhead
    health_data = await health_check()
    assistant_data = await internal_assistant()
    # Format nicely as JSON strings
    health_json = json.dumps(health_data, indent=2)
    assistant_json = json.dumps(assistant_data, indent=2)
    return f"""
BIP Auto Notifier
BIP Auto Notifier
System is Healthy & Tracking š
Loading logs...
{assistant_json}
{health_json}
Cloud Notifier Interface
"""
from fastapi.responses import HTMLResponse
@app.get("/logs/data")
async def get_logs_data():
    """API Endpoint: return the raw log text for the frontend to poll.

    The text is HTML-escaped so it can be embedded safely in the page.
    """
    try:
        with open("app.log", "r") as f:
            log_content = f.read()
        # Fix: the escapes had degenerated into no-ops (replace("&", "&")).
        # Escape "&" first so the later entities aren't double-escaped.
        log_content = (
            log_content.replace("&", "&amp;")
            .replace("<", "&lt;")
            .replace(">", "&gt;")
        )
        return HTMLResponse(content=log_content)
    except Exception as e:
        # Best-effort endpoint: surface the failure in the response body.
        return HTMLResponse(content=f"Error reading logs: {e}")
async def internal_assistant():
    """Build the status payload shown on the dashboard and returned to callers."""
    load_dotenv(override=True)
    interval = get_current_interval()
    fast_interval = int(os.getenv("FAST_INTERVAL", "30"))
    if interval == fast_interval:
        system_status = "Running (fast mode)"
    elif interval > 0:
        system_status = "Running"
    else:
        system_status = "Paused (inactive hours)"
    advice = "System operating normally."
    if SESSION_EXPIRED:
        advice = "Session expired. Update XSRF_TOKEN and BIP_SESSION."
    return {
        "assistant": "BIP Auto Notifier Internal Assistant",
        "system_status": system_status,
        "last_event_id": LAST_EVENT_ID,
        "last_event_code": LAST_EVENT_CODE,
        "session_expired": SESSION_EXPIRED,
        "check_interval_seconds": interval,
        "email_configured": bool(BREVO_API_KEY or (EMAIL_ADDRESS and EMAIL_PASSWORD)),
        "bip_api": BIP_API,
        "timestamp": datetime.now(IST).strftime("%I:%M %p (%d-%m-%Y)"),
        "advice": advice,
    }
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="BIP Cloud Notifier CLI")
    parser.add_argument("--list-all", action="store_true", help="List all recent events and exit")
    parser.add_argument("--latest", action="store_true", help="Print details of the latest event and exit")
    parser.add_argument("--test-alert", action="store_true", help="Send a test message to Email and exit")
    parser.add_argument("--test-real-event", action="store_true", help="Fetch the actual latest event and send it as an alert")
    parser.add_argument("--run", action="store_true", help="Start the continuous monitoring loop (via FastAPI)")
    # Task 8: parse arguments exactly once.
    args = parser.parse_args()
    logger.debug(f"Parsed arguments: {args}")
    if args.list_all:
        list_all_events()
    elif args.latest:
        get_latest_event()
    elif args.test_alert:
        test_email_alert()
    elif args.test_real_event:
        test_real_event_alert()
    else:
        # Fix: --run and the no-flag default ran identical duplicated launch
        # code; both now fall through to this single FastAPI startup, which
        # starts the monitoring loop via the lifespan hook.
        port = int(os.getenv("PORT", 7860))
        uvicorn.run(app, host="0.0.0.0", port=port, log_level="warning")