""" Frontend helper utilities for integrating with new backend features. Use these functions in your Streamlit app to: - Log telemetry events - Handle rate limiting - Submit async jobs - Track job status """ import streamlit as st import requests import uuid from typing import Optional, Dict, Any import logging logger = logging.getLogger(__name__) def get_or_create_device_id() -> str: """ Get or create device ID for current session. Stored in Streamlit session state. """ if "device_id" not in st.session_state: st.session_state.device_id = str(uuid.uuid4()) return st.session_state.device_id def get_user_id() -> Optional[str]: """ Get user ID if authenticated. Returns None if user is not logged in. """ return st.session_state.get("user_id") def log_session_metadata( api_url: str, ip_address: str = "127.0.0.1", user_agent: Optional[str] = None ) -> bool: """ Log session metadata (IP, location) with Redis gating. Args: api_url: Backend API base URL ip_address: Client IP address user_agent: User agent string Returns: True if logged, False if skipped or failed """ try: device_id = get_or_create_device_id() user_id = get_user_id() payload = { "device_id": device_id, "user_id": user_id, "ip_address": ip_address, "user_agent": user_agent } response = requests.post( f"{api_url}/log-session", json=payload, timeout=5 ) if response.status_code == 200: data = response.json() return data.get("logged", False) return False except Exception as e: logger.error(f"Failed to log session metadata: {str(e)}") return False def log_event( api_url: str, event_type: str, metadata: Optional[Dict[str, Any]] = None ) -> bool: """ Log a telemetry event to MongoDB. Args: api_url: Backend API base URL event_type: Event type (DASHBOARD_VIEW, ANALYSIS_REQUEST, etc.) metadata: Optional event metadata Returns: True if logged successfully """ try: device_id = get_or_create_device_id() user_id = get_user_id() payload = { "event_type": event_type, "device_id": device_id, "user_id": user_id, "metadata": metadata } response = requests.post( f"{api_url}/log-event", json=payload, timeout=5 ) return response.status_code == 200 except Exception as e: logger.error(f"Failed to log event: {str(e)}") return False def submit_analysis_job( api_url: str, data: list, options: Optional[Dict[str, Any]] = None ) -> Optional[str]: """ Submit ABSA analysis job to async queue. Args: api_url: Backend API base URL data: Review data (list of dicts) options: Optional processing options Returns: Job ID if successful, None otherwise """ try: device_id = get_or_create_device_id() user_id = get_user_id() if options is None: options = {} # Add device_id to options for tracking options["device_id"] = device_id payload = { "data": data, "options": options, "user_id": user_id or "anonymous" } response = requests.post( f"{api_url}/submit-job", json=payload, timeout=10 ) if response.status_code == 429: # Rate limit hit st.error("⚠️ Rate limit exceeded. Please wait a minute before trying again.") return None response.raise_for_status() data = response.json() job_id = data.get("job_id") return job_id except requests.exceptions.HTTPError as e: if e.response.status_code == 429: error_data = e.response.json() st.error(f"⚠️ {error_data.get('detail', {}).get('message', 'Rate limit exceeded')}") else: st.error(f"Failed to submit job: {str(e)}") return None except Exception as e: st.error(f"Failed to submit job: {str(e)}") logger.error(f"Job submission error: {str(e)}") return None def get_job_status(api_url: str, job_id: str) -> Optional[Dict[str, Any]]: """ Get status of submitted job. Args: api_url: Backend API base URL job_id: Job identifier Returns: Job status dict or None """ try: response = requests.get( f"{api_url}/job-status/{job_id}", timeout=5 ) if response.status_code == 404: return None response.raise_for_status() return response.json() except Exception as e: logger.error(f"Failed to get job status: {str(e)}") return None def poll_job_until_complete( api_url: str, job_id: str, progress_callback=None, max_wait_seconds: int = 300 ) -> Optional[Dict[str, Any]]: """ Poll job status until complete. Args: api_url: Backend API base URL job_id: Job identifier progress_callback: Optional callback function for progress updates max_wait_seconds: Maximum time to wait Returns: Job result if completed, None otherwise """ import time start_time = time.time() while True: if time.time() - start_time > max_wait_seconds: if progress_callback: progress_callback("Timeout waiting for job completion") return None status_data = get_job_status(api_url, job_id) if not status_data: if progress_callback: progress_callback("Job not found") return None status = status_data.get("status") if progress_callback: progress_callback(f"Status: {status}") if status == "DONE": return status_data.get("result") elif status == "FAILED": if progress_callback: progress_callback("Job failed") return None # Wait before polling again time.sleep(2) def initialize_telemetry(api_url: str): """ Initialize telemetry on app load. Call this once at the start of your Streamlit app. Args: api_url: Backend API base URL """ # Log session metadata (gated by Redis) log_session_metadata(api_url) # Log dashboard view event log_event(api_url, "DASHBOARD_VIEW") # Example usage in Streamlit app: """ import streamlit as st from utils.frontend_helpers import initialize_telemetry, submit_analysis_job, poll_job_until_complete # At app startup api_url = "http://localhost:7860" initialize_telemetry(api_url) # When user uploads data if st.button("Analyze"): # Prepare data data = df.to_dict('records') # Submit job job_id = submit_analysis_job(api_url, data) if job_id: st.info(f"Job submitted: {job_id}") # Show progress progress_placeholder = st.empty() def update_progress(msg): progress_placeholder.write(msg) # Poll for result result = poll_job_until_complete(api_url, job_id, update_progress) if result: st.success("Analysis complete!") st.json(result) else: st.error("Analysis failed or timed out") """