"""
Utility functions for the Pharmaceutical Analytics application.

This module contains helper functions for environment setup,
agent initialization, and workflow execution.
"""

import logging
import os
import threading
import time
import traceback
from datetime import datetime

import streamlit as st

from config import DB_PATH

# Get logger
logger = logging.getLogger("pharma-analytics")


def setup_environment():
    """Prepare the demo environment and make sure the database exists.

    Creates the working directories (no-op when they already exist) and,
    when no file is present at ``DB_PATH``, builds the schema, seed data and
    the full synthetic dataset. Progress is reported in the Streamlit sidebar.

    Returns:
        bool: True when the database file already exists or was generated
        successfully; False when generation failed now or when a previous
        attempt in this session was already recorded.
    """
    st.sidebar.markdown("### Setting up environment...")

    # Create directories
    for directory in ("data", "agents", "workflows", "utils", "ui"):
        os.makedirs(directory, exist_ok=True)

    # Nothing to generate when the database file is already present.
    if os.path.exists(DB_PATH):
        st.sidebar.success("✓ Database found")
        return True

    # The flag is set before generation starts, so finding it set while the
    # database is still missing means an earlier attempt did not complete.
    if st.session_state.get("initialization_attempted", False):
        st.sidebar.warning("Previous initialization attempt failed. Check logs.")
        return False

    st.sidebar.info("Database not found. Generating synthetic data...")
    st.session_state.initialization_attempted = True

    try:
        # Imported lazily so these modules are only loaded when data
        # generation is actually required.
        from data.schema import create_schema
        from data.seed_data import create_seed_data

        # Create schema first
        create_schema(DB_PATH)
        st.sidebar.info("✓ Database schema created")

        # Add seed data
        create_seed_data(DB_PATH)
        st.sidebar.info("✓ Seed data created")

        # Generate full synthetic dataset
        from data.data_generator import generate_all_data

        generate_all_data()
        st.sidebar.success("✓ Synthetic data generated successfully")

        return True
    except Exception as e:
        # Top-level boundary: log the full trace, show a short message in
        # the sidebar, and report failure instead of crashing the app.
        error_msg = f"Error setting up environment: {e}"
        stack_trace = traceback.format_exc()
        logger.error(f"{error_msg}\n{stack_trace}")
        st.sidebar.error(error_msg)
        return False


@st.cache_resource
def initialize_agents():
    """Import the agent and workflow classes and report the outcome.

    Returns:
        tuple: ``(True, mapping)`` on success, where ``mapping`` maps each
        class name (``"PlanningAgent"``, ``"DataAgent"``, ``"AnalyticsAgent"``,
        ``"QAAgent"``, ``"InsightsAgent"``, ``"SalesAnalysisWorkflow"``) to the
        imported class; ``(False, error_message)`` when any import fails.

    NOTE(review): the function is wrapped in ``st.cache_resource``, which
    caches return values; a ``(False, error_message)`` result is a normal
    return value and so is presumably cached as well - confirm whether a
    failed initialization should be retried on the next run.
    """
    try:
        logger.info("Initializing agents...")

        # We'll only import these modules when needed
        from agents.planning_agent import PlanningAgent, AnalysisPlan
        from agents.data_agent import DataAgent, DataRequest, DataSource
        from agents.analytics_agent import AnalyticsAgent, AnalysisRequest, AnalysisResult
        from agents.qa_agent import QAAgent, ValidationRequest, ValidationResult
        from agents.insights_agent import InsightsAgent, InsightRequest, InsightCard
        from workflows.sales_analysis import SalesAnalysisWorkflow, WorkflowState

        logger.info("All agents imported successfully")
        return True, {
            "PlanningAgent": PlanningAgent,
            "DataAgent": DataAgent,
            "AnalyticsAgent": AnalyticsAgent,
            "QAAgent": QAAgent,
            "InsightsAgent": InsightsAgent,
            "SalesAnalysisWorkflow": SalesAnalysisWorkflow
        }
    except Exception as e:
        error_msg = f"Failed to initialize agents: {e}"
        stack_trace = traceback.format_exc()
        logger.error(f"{error_msg}\n{stack_trace}")
        return False, error_msg


def run_workflow_thread(workflow, alert):
    """Run the workflow for ``alert`` and record progress in session state.

    Used as the ``target`` of the thread created by ``start_workflow``.

    Args:
        workflow: Object exposing ``run_workflow(alert)``.
        alert: Alert passed unchanged to ``workflow.run_workflow``.

    Side effects:
        Puts ``(level, message)`` tuples on ``st.session_state.status_queue``,
        sets ``st.session_state.current_step`` to ``"planning"``, then
        ``"complete"`` or ``"error"``, stores the result in
        ``st.session_state.workflow_state``, and appends failure details to
        ``st.session_state.error_details``.

    NOTE(review): this runs on a worker thread yet reads and writes
    ``st.session_state``; confirm that session state is reachable from that
    thread in the Streamlit version in use.
    """
    try:
        # Log start of workflow
        logger.info(f"Starting workflow for alert: {alert}")

        # Update status
        st.session_state.status_queue.put(("info", "Planning analysis approach..."))
        st.session_state.current_step = "planning"

        # Run the workflow with more detailed logging
        logger.info("Invoking workflow run...")
        result = workflow.run_workflow(alert)

        # Log completion
        logger.info("Workflow completed successfully")

        # Store the result
        st.session_state.workflow_state = result

        # Update status
        st.session_state.status_queue.put(("success", "Analysis complete!"))
        st.session_state.current_step = "complete"
    except Exception as e:
        # Capture the full stack trace
        stack_trace = traceback.format_exc()
        error_msg = f"Error: {str(e)}\n\nStack trace:\n{stack_trace}"

        # Log the error
        logger.error(error_msg)

        # Update status with more details
        st.session_state.status_queue.put(("error", f"Error: {str(e)}"))
        st.session_state.current_step = "error"

        # Store error details for debugging
        if "error_details" not in st.session_state:
            st.session_state.error_details = []
        st.session_state.error_details.append(error_msg)


def start_workflow(alert_text):
    """Start the analysis workflow for ``alert_text`` on a daemon thread.

    Looks up the ``SalesAnalysisWorkflow`` class in ``st.session_state.agents``,
    instantiates it with ``DB_PATH`` and runs ``run_workflow_thread`` in a
    background thread. The thread is stored in
    ``st.session_state.workflow_thread``, ``alert_submitted`` is set to True,
    and an initial entry is appended to ``st.session_state.logs``.

    Args:
        alert_text: Alert passed to the workflow.

    Returns:
        bool: True when the thread was started and the session state was
        updated; False when any step raised. Failures are logged, shown with
        ``st.error`` and appended to ``st.session_state.error_details``.
    """
    try:
        # Create and start the workflow
        logger.info(f"Creating workflow with alert: {alert_text}")
        SalesAnalysisWorkflow = st.session_state.agents["SalesAnalysisWorkflow"]
        workflow = SalesAnalysisWorkflow(db_path=DB_PATH)

        # Start the workflow in a separate thread
        thread = threading.Thread(
            target=run_workflow_thread,
            args=(workflow, alert_text),
            daemon=True
        )
        thread.start()
        logger.info("Workflow thread started")

        # Store the thread and mark alert as submitted
        st.session_state.workflow_thread = thread
        st.session_state.alert_submitted = True

        # Display initial status
        st.info("Starting analysis workflow...")
        st.session_state.logs.append({
            "timestamp": datetime.now().isoformat(),
            "type": "info",
            "message": "Starting analysis workflow..."
        })

        # Brief pause before returning. NOTE(review): sleeping alone does not
        # trigger a Streamlit rerun; presumably the caller reruns the page
        # after this returns - confirm.
        time.sleep(1)
        return True
    except Exception as e:
        stack_trace = traceback.format_exc()
        error_msg = f"Error starting workflow: {str(e)}\n\nStack trace:\n{stack_trace}"
        logger.error(error_msg)
        st.error(f"Error starting workflow: {str(e)}")

        # The list may not exist yet (e.g. the failure was a missing
        # session-state key); create it first so recording the error cannot
        # itself raise and mask the original failure.
        if "error_details" not in st.session_state:
            st.session_state.error_details = []
        st.session_state.error_details.append(error_msg)
        return False