# sales_analytics / utils.py
# cryogenic22's picture
# Create utils.py
# 70a7cb7 verified
"""
Utility functions for the Pharmaceutical Analytics application.
This module contains helper functions for environment setup, agent initialization,
and workflow execution.
"""
import os
import streamlit as st
import traceback
import logging
import threading
import time
from datetime import datetime
from config import DB_PATH
# Module-level logger used by every function in this file; all records are
# emitted under the fixed logger name "pharma-analytics".
logger = logging.getLogger("pharma-analytics")
def setup_environment():
    """Prepare the demo environment and make sure the database exists.

    Creates the project directories (``data``, ``agents``, ``workflows``,
    ``utils``, ``ui``) when missing, then checks for a file at ``DB_PATH``.
    If none is found, the schema, the seed data and the full synthetic
    dataset are generated in that order, with progress reported in the
    Streamlit sidebar.

    Generation is attempted at most once per session: the
    ``initialization_attempted`` flag in ``st.session_state`` is set before
    the first attempt, and any later call that still finds no database
    returns ``False`` immediately.

    Returns:
        bool: ``True`` if the database already existed or was generated
        successfully; ``False`` if generation failed now or in an earlier
        attempt of this session.
    """
    # Real check mark (U+2713). The previous literals contained the mojibake
    # left behind when its UTF-8 bytes were decoded with the wrong code page.
    check = "\u2713"

    st.sidebar.markdown("### Setting up environment...")

    # exist_ok=True makes this safe to run on every script rerun.
    for directory in ("data", "agents", "workflows", "utils", "ui"):
        os.makedirs(directory, exist_ok=True)

    if os.path.exists(DB_PATH):
        st.sidebar.success(f"{check} Database found")
        return True

    # No database: do not retry generation if this session already tried.
    if st.session_state.get("initialization_attempted", False):
        st.sidebar.warning("Previous initialization attempt failed. Check logs.")
        return False

    st.sidebar.info("Database not found. Generating synthetic data...")
    st.session_state.initialization_attempted = True

    try:
        # Imported here so these modules are only loaded when generation is
        # actually required.
        from data.schema import create_schema
        from data.seed_data import create_seed_data

        create_schema(DB_PATH)
        st.sidebar.info(f"{check} Database schema created")

        create_seed_data(DB_PATH)
        st.sidebar.info(f"{check} Seed data created")

        from data.data_generator import generate_all_data

        generate_all_data()
        st.sidebar.success(f"{check} Synthetic data generated successfully")
        return True
    except Exception as e:
        # Top-level boundary for the setup step: log the full traceback and
        # show only the short message to the user.
        # NOTE(review): if a file already exists at DB_PATH when a later step
        # fails, the next call takes the "Database found" branch above --
        # confirm whether a partial database should be removed here.
        error_msg = f"Error setting up environment: {e}"
        stack_trace = traceback.format_exc()
        logger.error(f"{error_msg}\n{stack_trace}")
        st.sidebar.error(error_msg)
        return False
@st.cache_resource
def initialize_agents():
    """Import the agent and workflow classes and report whether that worked.

    The function is wrapped in ``st.cache_resource``, so Streamlit caches the
    returned tuple.
    NOTE(review): a failure tuple appears to be cached as well -- confirm that
    this is intended.

    Returns:
        tuple: ``(True, classes)`` on success, where ``classes`` maps each
        class name to the imported class object; ``(False, message)`` when
        any import (or logging call) raised, with ``message`` describing the
        error.
    """
    try:
        logger.info("Initializing agents...")

        # Deferred imports: the agent packages are only loaded when this
        # function first runs. The request/result names are not used below,
        # but importing them still fails fast if a module lacks them.
        from agents.planning_agent import (
            PlanningAgent,
            AnalysisPlan,
        )
        from agents.data_agent import (
            DataAgent,
            DataRequest,
            DataSource,
        )
        from agents.analytics_agent import (
            AnalyticsAgent,
            AnalysisRequest,
            AnalysisResult,
        )
        from agents.qa_agent import (
            QAAgent,
            ValidationRequest,
            ValidationResult,
        )
        from agents.insights_agent import (
            InsightsAgent,
            InsightRequest,
            InsightCard,
        )
        from workflows.sales_analysis import (
            SalesAnalysisWorkflow,
            WorkflowState,
        )

        logger.info("All agents imported successfully")

        # Keyword form keeps the same keys and insertion order as a literal.
        registry = dict(
            PlanningAgent=PlanningAgent,
            DataAgent=DataAgent,
            AnalyticsAgent=AnalyticsAgent,
            QAAgent=QAAgent,
            InsightsAgent=InsightsAgent,
            SalesAnalysisWorkflow=SalesAnalysisWorkflow,
        )
        return True, registry
    except Exception as exc:
        failure = f"Failed to initialize agents: {exc}"
        trace_text = traceback.format_exc()
        logger.error(f"{failure}\n{trace_text}")
        return False, failure
def run_workflow_thread(workflow, alert):
    """Thread target: run *workflow* for *alert* and publish progress.

    Progress is reported through ``st.session_state``: a ``(level, message)``
    tuple is put on ``status_queue`` and ``current_step`` is updated to
    ``"planning"``, ``"complete"`` or ``"error"``. On success the value
    returned by ``workflow.run_workflow(alert)`` is stored in
    ``workflow_state``. On failure the message plus traceback is logged and
    appended to ``error_details`` (created if missing).

    Args:
        workflow: Object exposing ``run_workflow(alert)``.
        alert: Alert passed through to the workflow unchanged.
    """

    def publish(level, text, step):
        # Queue the status message first, then move the step marker.
        st.session_state.status_queue.put((level, text))
        st.session_state.current_step = step

    try:
        logger.info(f"Starting workflow for alert: {alert}")
        publish("info", "Planning analysis approach...", "planning")

        logger.info("Invoking workflow run...")
        outcome = workflow.run_workflow(alert)
        logger.info("Workflow completed successfully")

        st.session_state.workflow_state = outcome
        publish("success", "Analysis complete!", "complete")
    except Exception as exc:
        # Short form goes to the status queue; the long form with the
        # traceback goes to the log and to error_details.
        trace_text = traceback.format_exc()
        summary = f"Error: {str(exc)}"
        details = f"{summary}\n\nStack trace:\n{trace_text}"
        logger.error(details)

        publish("error", summary, "error")

        if "error_details" not in st.session_state:
            st.session_state.error_details = []
        st.session_state.error_details.append(details)
def _attach_script_context(thread):
    """Attach the current Streamlit script-run context to *thread*, if possible."""
    try:
        from streamlit.runtime.scriptrunner import add_script_run_ctx
    except ImportError:
        # This Streamlit build does not expose the helper at this path;
        # leave the thread untouched rather than fail the whole start-up.
        return
    add_script_run_ctx(thread)


def start_workflow(alert_text):
    """Create the sales-analysis workflow and run it in a background thread.

    The workflow class is read from ``st.session_state.agents`` under the key
    ``"SalesAnalysisWorkflow"`` and instantiated with ``db_path=DB_PATH``. A
    daemon thread running ``run_workflow_thread`` is started, and the thread
    object plus ``alert_submitted = True`` are stored in session state. An
    initial log entry is appended to ``st.session_state.logs``.

    Args:
        alert_text: Alert text passed through to the workflow.

    Returns:
        bool: ``True`` when the thread was started and the session state was
        updated; ``False`` if any step raised. In that case the error is
        logged, shown with ``st.error`` and appended to
        ``st.session_state.error_details``.
    """
    try:
        logger.info(f"Creating workflow with alert: {alert_text}")
        SalesAnalysisWorkflow = st.session_state.agents["SalesAnalysisWorkflow"]
        workflow = SalesAnalysisWorkflow(db_path=DB_PATH)

        thread = threading.Thread(
            target=run_workflow_thread,
            args=(workflow, alert_text),
            daemon=True,
        )
        # run_workflow_thread reads and writes st.session_state; a thread
        # without the script-run context does not see this session's state,
        # so the context is attached before the thread starts.
        _attach_script_context(thread)
        thread.start()
        logger.info("Workflow thread started")

        st.session_state.workflow_thread = thread
        st.session_state.alert_submitted = True

        st.info("Starting analysis workflow...")
        # Create the log list on first use so a missing key cannot make this
        # function report failure after the thread is already running.
        if "logs" not in st.session_state:
            st.session_state.logs = []
        st.session_state.logs.append({
            "timestamp": datetime.now().isoformat(),
            "type": "info",
            "message": "Starting analysis workflow...",
        })

        # Short pause kept from the original implementation.
        # NOTE(review): the original comment said "Force page refresh", but
        # the sleep itself triggers no rerun -- confirm the caller does.
        time.sleep(1)
        return True
    except Exception as e:
        stack_trace = traceback.format_exc()
        error_msg = f"Error starting workflow: {str(e)}\n\nStack trace:\n{stack_trace}"
        logger.error(error_msg)
        st.error(f"Error starting workflow: {str(e)}")
        # Same guard as run_workflow_thread: without it a missing list would
        # raise inside this handler and hide the original error.
        if "error_details" not in st.session_state:
            st.session_state.error_details = []
        st.session_state.error_details.append(error_msg)
        return False