# ABSA/src/utils/frontend_helpers.py
"""
Frontend helper utilities for integrating with new backend features.
Use these functions in your Streamlit app to:
- Log telemetry events
- Handle rate limiting
- Submit async jobs
- Track job status
"""
import logging
import time
import uuid
from typing import Any, Callable, Dict, Optional

import requests
import streamlit as st

logger = logging.getLogger(__name__)
def get_or_create_device_id() -> str:
"""
Get or create device ID for current session.
Stored in Streamlit session state.
"""
if "device_id" not in st.session_state:
st.session_state.device_id = str(uuid.uuid4())
return st.session_state.device_id
def get_user_id() -> Optional[str]:
"""
Get user ID if authenticated.
Returns None if user is not logged in.
"""
return st.session_state.get("user_id")
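# Note: this module only reads `user_id`; the login flow is expected to set it.
# A hypothetical auth handler might do:
#
#     st.session_state.user_id = authenticated_user.id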
def log_session_metadata(
api_url: str,
ip_address: str = "127.0.0.1",
user_agent: Optional[str] = None
) -> bool:
"""
Log session metadata (IP, location) with Redis gating.
Args:
api_url: Backend API base URL
ip_address: Client IP address
user_agent: User agent string
Returns:
True if logged, False if skipped or failed
"""
try:
device_id = get_or_create_device_id()
user_id = get_user_id()
payload = {
"device_id": device_id,
"user_id": user_id,
"ip_address": ip_address,
"user_agent": user_agent
}
response = requests.post(
f"{api_url}/log-session",
json=payload,
timeout=5
)
if response.status_code == 200:
data = response.json()
return data.get("logged", False)
return False
except Exception as e:
logger.error(f"Failed to log session metadata: {str(e)}")
return False
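# The 127.0.0.1 default is a placeholder; behind a reverse proxy, a caller can
# pass the real client IP. A minimal sketch (assumes a recent Streamlit with
# `st.context.headers` and an X-Forwarded-For header set by the proxy):
#
#     headers = st.context.headers
#     client_ip = headers.get("X-Forwarded-For", "127.0.0.1").split(",")[0].strip()
#     log_session_metadata(api_url, ip_address=client_ip,
#                          user_agent=headers.get("User-Agent"))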
def log_event(
api_url: str,
event_type: str,
metadata: Optional[Dict[str, Any]] = None
) -> bool:
"""
Log a telemetry event to MongoDB.
Args:
api_url: Backend API base URL
event_type: Event type (DASHBOARD_VIEW, ANALYSIS_REQUEST, etc.)
metadata: Optional event metadata
Returns:
True if logged successfully
"""
try:
device_id = get_or_create_device_id()
user_id = get_user_id()
payload = {
"event_type": event_type,
"device_id": device_id,
"user_id": user_id,
"metadata": metadata
}
response = requests.post(
f"{api_url}/log-event",
json=payload,
timeout=5
)
return response.status_code == 200
except Exception as e:
logger.error(f"Failed to log event: {str(e)}")
return False
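# Example call (metadata keys are illustrative; the set of accepted event
# types is defined by the backend):
#
#     log_event(api_url, "ANALYSIS_REQUEST", {"num_reviews": len(records)})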
def submit_analysis_job(
api_url: str,
data: list,
options: Optional[Dict[str, Any]] = None
) -> Optional[str]:
"""
Submit ABSA analysis job to async queue.
Args:
api_url: Backend API base URL
data: Review data (list of dicts)
options: Optional processing options
Returns:
Job ID if successful, None otherwise
"""
try:
device_id = get_or_create_device_id()
user_id = get_user_id()
        # Copy so we don't mutate the caller's dict when adding tracking info.
        options = {} if options is None else dict(options)
        options["device_id"] = device_id
payload = {
"data": data,
"options": options,
"user_id": user_id or "anonymous"
}
response = requests.post(
f"{api_url}/submit-job",
json=payload,
timeout=10
)
if response.status_code == 429:
# Rate limit hit
st.error("⚠️ Rate limit exceeded. Please wait a minute before trying again.")
return None
response.raise_for_status()
        # Use a distinct name so we don't shadow the `data` argument.
        resp_json = response.json()
        return resp_json.get("job_id")
    except requests.exceptions.HTTPError as e:
        if e.response is not None and e.response.status_code == 429:
            # FastAPI's `detail` may be a dict or a plain string; handle both.
            detail = e.response.json().get("detail", {})
            if isinstance(detail, dict):
                detail = detail.get("message", "Rate limit exceeded")
            st.error(f"⚠️ {detail}")
        else:
            st.error(f"Failed to submit job: {str(e)}")
        return None
except Exception as e:
st.error(f"Failed to submit job: {str(e)}")
logger.error(f"Job submission error: {str(e)}")
return None
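# A sketch of a full submission, assuming the backend accepts a list of review
# dicts (the exact record schema is defined by the backend, not here):
#
#     reviews = [{"review_text": "Battery life is great"},
#                {"review_text": "Screen scratches easily"}]
#     job_id = submit_analysis_job(api_url, reviews)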
def get_job_status(api_url: str, job_id: str) -> Optional[Dict[str, Any]]:
"""
Get status of submitted job.
Args:
api_url: Backend API base URL
job_id: Job identifier
Returns:
Job status dict or None
"""
try:
response = requests.get(
f"{api_url}/job-status/{job_id}",
timeout=5
)
if response.status_code == 404:
return None
response.raise_for_status()
return response.json()
except Exception as e:
logger.error(f"Failed to get job status: {str(e)}")
return None
def poll_job_until_complete(
api_url: str,
job_id: str,
    progress_callback: Optional[Callable[[str], None]] = None,
max_wait_seconds: int = 300
) -> Optional[Dict[str, Any]]:
"""
Poll job status until complete.
Args:
api_url: Backend API base URL
job_id: Job identifier
progress_callback: Optional callback function for progress updates
max_wait_seconds: Maximum time to wait
Returns:
Job result if completed, None otherwise
"""
    start_time = time.time()
while True:
if time.time() - start_time > max_wait_seconds:
if progress_callback:
progress_callback("Timeout waiting for job completion")
return None
status_data = get_job_status(api_url, job_id)
if not status_data:
if progress_callback:
progress_callback("Job not found")
return None
status = status_data.get("status")
if progress_callback:
progress_callback(f"Status: {status}")
if status == "DONE":
return status_data.get("result")
elif status == "FAILED":
if progress_callback:
progress_callback("Job failed")
return None
# Wait before polling again
time.sleep(2)
def initialize_telemetry(api_url: str) -> None:
"""
Initialize telemetry on app load.
Call this once at the start of your Streamlit app.
Args:
api_url: Backend API base URL
"""
# Log session metadata (gated by Redis)
log_session_metadata(api_url)
# Log dashboard view event
log_event(api_url, "DASHBOARD_VIEW")
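# Streamlit reruns the script on every interaction, so calling this at the top
# of the app would log a DASHBOARD_VIEW per rerun. A minimal guard to run it
# only once per session:
#
#     if not st.session_state.get("telemetry_initialized"):
#         initialize_telemetry(api_url)
#         st.session_state.telemetry_initialized = True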
# Example usage in Streamlit app:
"""
import streamlit as st
from utils.frontend_helpers import initialize_telemetry, submit_analysis_job, poll_job_until_complete
# At app startup
api_url = "http://localhost:7860"
initialize_telemetry(api_url)
# When a user uploads data (here df is a pandas DataFrame built from the upload)
if st.button("Analyze"):
# Prepare data
data = df.to_dict('records')
# Submit job
job_id = submit_analysis_job(api_url, data)
if job_id:
st.info(f"Job submitted: {job_id}")
# Show progress
progress_placeholder = st.empty()
def update_progress(msg):
progress_placeholder.write(msg)
# Poll for result
result = poll_job_until_complete(api_url, job_id, update_progress)
if result:
st.success("Analysis complete!")
st.json(result)
else:
st.error("Analysis failed or timed out")
"""