Spaces:
Runtime error
Runtime error
| """ | |
| Utility functions for the Pharmaceutical Analytics application. | |
| This module contains helper functions for environment setup, agent initialization, | |
| and workflow execution. | |
| """ | |
| import os | |
| import streamlit as st | |
| import traceback | |
| import logging | |
| import threading | |
| import time | |
| from datetime import datetime | |
| from config import DB_PATH | |
| # Get logger | |
| logger = logging.getLogger("pharma-analytics") | |
| def setup_environment(): | |
| """Setup the demo environment""" | |
| st.sidebar.markdown("### Setting up environment...") | |
| # Create directories | |
| os.makedirs("data", exist_ok=True) | |
| os.makedirs("agents", exist_ok=True) | |
| os.makedirs("workflows", exist_ok=True) | |
| os.makedirs("utils", exist_ok=True) | |
| os.makedirs("ui", exist_ok=True) | |
| # Check for database file | |
| if not os.path.exists(DB_PATH): | |
| if st.session_state.get("initialization_attempted", False): | |
| st.sidebar.warning("Previous initialization attempt failed. Check logs.") | |
| return False | |
| st.sidebar.info("Database not found. Generating synthetic data...") | |
| st.session_state.initialization_attempted = True | |
| try: | |
| # Import needed modules | |
| from data.schema import create_schema | |
| from data.seed_data import create_seed_data | |
| # Create schema first | |
| create_schema(DB_PATH) | |
| st.sidebar.info("β Database schema created") | |
| # Add seed data | |
| create_seed_data(DB_PATH) | |
| st.sidebar.info("β Seed data created") | |
| # Generate full synthetic dataset | |
| from data.data_generator import generate_all_data | |
| generate_all_data() | |
| st.sidebar.success("β Synthetic data generated successfully") | |
| return True | |
| except Exception as e: | |
| error_msg = f"Error setting up environment: {e}" | |
| stack_trace = traceback.format_exc() | |
| logger.error(f"{error_msg}\n{stack_trace}") | |
| st.sidebar.error(error_msg) | |
| return False | |
| else: | |
| st.sidebar.success("β Database found") | |
| return True | |
| def initialize_agents(): | |
| """Initialize agents and return status""" | |
| try: | |
| logger.info("Initializing agents...") | |
| # We'll only import these modules when needed | |
| from agents.planning_agent import PlanningAgent, AnalysisPlan | |
| from agents.data_agent import DataAgent, DataRequest, DataSource | |
| from agents.analytics_agent import AnalyticsAgent, AnalysisRequest, AnalysisResult | |
| from agents.qa_agent import QAAgent, ValidationRequest, ValidationResult | |
| from agents.insights_agent import InsightsAgent, InsightRequest, InsightCard | |
| from workflows.sales_analysis import SalesAnalysisWorkflow, WorkflowState | |
| logger.info("All agents imported successfully") | |
| return True, { | |
| "PlanningAgent": PlanningAgent, | |
| "DataAgent": DataAgent, | |
| "AnalyticsAgent": AnalyticsAgent, | |
| "QAAgent": QAAgent, | |
| "InsightsAgent": InsightsAgent, | |
| "SalesAnalysisWorkflow": SalesAnalysisWorkflow | |
| } | |
| except Exception as e: | |
| error_msg = f"Failed to initialize agents: {e}" | |
| stack_trace = traceback.format_exc() | |
| logger.error(f"{error_msg}\n{stack_trace}") | |
| return False, error_msg | |
| def run_workflow_thread(workflow, alert): | |
| """Run the workflow in a separate thread""" | |
| try: | |
| # Log start of workflow | |
| logger.info(f"Starting workflow for alert: {alert}") | |
| # Update status | |
| st.session_state.status_queue.put(("info", "Planning analysis approach...")) | |
| st.session_state.current_step = "planning" | |
| # Run the workflow with more detailed logging | |
| logger.info("Invoking workflow run...") | |
| result = workflow.run_workflow(alert) | |
| # Log completion | |
| logger.info("Workflow completed successfully") | |
| # Store the result | |
| st.session_state.workflow_state = result | |
| # Update status | |
| st.session_state.status_queue.put(("success", "Analysis complete!")) | |
| st.session_state.current_step = "complete" | |
| except Exception as e: | |
| # Capture the full stack trace | |
| stack_trace = traceback.format_exc() | |
| error_msg = f"Error: {str(e)}\n\nStack trace:\n{stack_trace}" | |
| # Log the error | |
| logger.error(error_msg) | |
| # Update status with more details | |
| st.session_state.status_queue.put(("error", f"Error: {str(e)}")) | |
| st.session_state.current_step = "error" | |
| # Store error details for debugging | |
| if "error_details" not in st.session_state: | |
| st.session_state.error_details = [] | |
| st.session_state.error_details.append(error_msg) | |
| def start_workflow(alert_text): | |
| """Start the workflow with the given alert text""" | |
| try: | |
| # Create and start the workflow | |
| logger.info(f"Creating workflow with alert: {alert_text}") | |
| SalesAnalysisWorkflow = st.session_state.agents["SalesAnalysisWorkflow"] | |
| workflow = SalesAnalysisWorkflow(db_path=DB_PATH) | |
| # Start the workflow in a separate thread | |
| thread = threading.Thread( | |
| target=run_workflow_thread, | |
| args=(workflow, alert_text), | |
| daemon=True | |
| ) | |
| thread.start() | |
| logger.info("Workflow thread started") | |
| # Store the thread and mark alert as submitted | |
| st.session_state.workflow_thread = thread | |
| st.session_state.alert_submitted = True | |
| # Display initial status | |
| st.info("Starting analysis workflow...") | |
| st.session_state.logs.append({ | |
| "timestamp": datetime.now().isoformat(), | |
| "type": "info", | |
| "message": "Starting analysis workflow..." | |
| }) | |
| # Force page refresh | |
| time.sleep(1) | |
| return True | |
| except Exception as e: | |
| stack_trace = traceback.format_exc() | |
| error_msg = f"Error starting workflow: {str(e)}\n\nStack trace:\n{stack_trace}" | |
| logger.error(error_msg) | |
| st.error(f"Error starting workflow: {str(e)}") | |
| st.session_state.error_details.append(error_msg) | |
| return False |