import streamlit as st import pandas as pd import plotly.express as px import plotly.graph_objects as go import re from datetime import datetime import os import shutil from pathlib import Path import worklog_categorizer as wc import time import base64 from io import BytesIO # Set page configuration st.set_page_config(layout="wide", page_title="Non-Billable Time Analysis", page_icon="📊") # Define colors to match the React implementation COLORS = ['#0088FE', '#00C49F', '#FFBB28', '#FF8042', '#8884d8', '#82ca9d', '#ffc658', '#8dd1e1', '#a4de6c', '#d0ed57', '#bc80bd', '#ccebc5', '#ffed6f', '#bebada', '#fb8072', '#80b1d3', '#fdb462', '#b3de69'] # Initialize session state variables if they don't exist if 'initialized' not in st.session_state: st.session_state.initialized = True st.session_state.processed_data = None st.session_state.expanded_user = None st.session_state.sort_by = 'totalHours' st.session_state.sort_order = 'desc' st.session_state.selected_epics = [] st.session_state.active_tab = 'team_analysis' st.session_state.tech_user_filter = "" st.session_state.categorized_df = None st.session_state.show_csv_data = False st.session_state.needs_rerun = False st.session_state.has_generated_results = False def extract_month(date_range): """Extract month from date ranges""" if not isinstance(date_range, str): return None date_match = re.match(r'^(\d+)/(\w+)/(\d+) to', date_range) if date_match: return date_match.group(2) single_date_match = re.match(r'^(\d+)/(\w+)/(\d+) at', date_range) if single_date_match: return single_date_match.group(2) return None def get_row_level(row): """Parse level in the data hierarchy""" if pd.notna(row.get("Project Category")) and pd.isna(row.get("Project")): return "category" if pd.notna(row.get("Project")) and pd.isna(row.get("User")): return "project" if pd.notna(row.get("User")) and pd.isna(row.get("Epic")): return "user" if pd.notna(row.get("Epic")) and pd.isna(row.get("Issue")): return "epic" if pd.notna(row.get("Issue")) and pd.isna(row.get("Worklog")): return "issue" if pd.notna(row.get("Worklog")): return "worklog" return "unknown" def clear_session_and_cache(): """Reset the application by clearing cache and session state""" # Clear all cached data st.cache_data.clear() # Remove data files if they exist try: if os.path.exists("categorized_data.csv"): os.remove("categorized_data.csv") if os.path.exists("uploaded_data.csv"): os.remove("uploaded_data.csv") except Exception as e: st.error(f"Error removing files: {e}") # Reset all important session state variables st.session_state.processed_data = None st.session_state.expanded_user = None st.session_state.sort_by = 'totalHours' st.session_state.sort_order = 'desc' st.session_state.selected_epics = [] st.session_state.active_tab = 'team_analysis' st.session_state.tech_user_filter = "" st.session_state.categorized_df = None st.session_state.show_csv_data = False st.session_state.has_generated_results = False # Mark that we need to rerun after clearing st.session_state.needs_rerun = True def save_categorized_data(df, filename="categorized_data.csv"): """Save the categorized data to a CSV file""" try: df.to_csv(filename, index=False) return True except Exception as e: st.error(f"Error saving categorized data: {e}") return False @st.cache_data def process_data(raw_data, force_categorize=False, focus_users=None): """Process the data and return various aggregations (cached to prevent reprocessing)""" # Filter for non-billable data non_billable_data = raw_data[raw_data["Project Category"] == "Non-Billable"].copy() # Add level and month info non_billable_data["Level"] = non_billable_data.apply(get_row_level, axis=1) non_billable_data["Month"] = non_billable_data["Date"].apply(extract_month) # Check if we need to categorize data if "TechCategory" not in non_billable_data.columns or force_categorize: # Process tech categories for upskilling worklog entries with st.spinner("Categorizing upskilling worklog entries by technology..."): # If specific users are provided, prioritize their worklog categorization if focus_users and len(focus_users) > 0: st.info(f"Focusing on worklog categorization for {len(focus_users)} selected users") # First, process focus users focus_mask = non_billable_data["User"].isin(focus_users) focus_data = non_billable_data[focus_mask].copy() if not focus_data.empty: # Process worklogs for focus users first focus_data = wc.process_dataframe( focus_data, worklog_column="Worklog", issue_column="Issue", default_category="N/A", batch_size=10, pause_seconds=2, # Shorter pause for focus users show_progress=True ) # Update the categorized data for focus users non_billable_data.loc[focus_mask, "TechCategory"] = focus_data["TechCategory"] # Process all remaining data non_billable_data = wc.process_dataframe( non_billable_data, worklog_column="Worklog", issue_column="Issue", default_category="N/A", batch_size=10, pause_seconds=5, show_progress=True ) # Save the categorized data save_path = "categorized_data.csv" if save_categorized_data(non_billable_data, save_path): st.success(f"Saved categorized data to {save_path}") # Store the categorized DataFrame for download st.session_state.categorized_df = non_billable_data # Process derived data team_data = process_team_members(non_billable_data) epic_data = process_top_epics(non_billable_data) monthly_data = process_monthly_data(non_billable_data) tech_category_data = process_tech_categories(non_billable_data) epics = sorted(non_billable_data["Epic"].dropna().unique()) # Count upskilling entries upskilling_mask = non_billable_data["Issue"].apply(wc.is_upskilling_issue) upskilling_count = upskilling_mask.sum() # Get all unique users unique_users = sorted(non_billable_data["User"].dropna().unique()) return { 'non_billable_data': non_billable_data, 'team_data': team_data, 'epic_data': epic_data, 'monthly_data': monthly_data, 'unique_epics': epics, 'unique_users': unique_users, 'tech_category_data': tech_category_data, 'upskilling_count': upskilling_count } def process_tech_categories(data): """Process data to get tech category breakdown for upskilling entries""" # Filter for rows with tech categories tech_data = data[data["TechCategory"] != "N/A"].copy() if tech_data.empty: return { "overall": [], "by_user": {}, "by_month": [] } # Overall tech category breakdown overall = tech_data.groupby("TechCategory")["Logged"].sum().reset_index() overall = overall.sort_values("Logged", ascending=False) overall.columns = ["Category", "Hours"] # Tech category by user by_user = {} for user in tech_data["User"].dropna().unique(): user_data = tech_data[tech_data["User"] == user] user_categories = user_data.groupby("TechCategory")["Logged"].sum().reset_index() user_categories = user_categories.sort_values("Logged", ascending=False) user_categories.columns = ["Category", "Hours"] by_user[user] = user_categories.to_dict('records') # Tech category by month month_order = ['Nov', 'Dec', 'Jan', 'Feb', 'Mar'] by_month = [] for month in month_order: month_data = tech_data[tech_data["Month"] == month] if not month_data.empty: month_categories = month_data.groupby("TechCategory")["Logged"].sum().reset_index() month_categories = month_categories.sort_values("Logged", ascending=False) month_categories.columns = ["Category", "Hours"] by_month.append({ "Month": month, "Categories": month_categories.to_dict('records') }) return { "overall": overall.to_dict('records'), "by_user": by_user, "by_month": by_month } def process_team_members(data): """Process data to get team member breakdown""" # Get unique users unique_users = data[data["Level"] == "user"]["User"].dropna().unique() # Process data for each user team_data = [] month_order = ['Nov', 'Dec', 'Jan', 'Feb', 'Mar'] for user in unique_users: user_data = data[data["User"] == user] # Get user total hours user_total_row = user_data[user_data["Level"] == "user"] user_total = user_total_row["Logged"].iloc[0] if not user_total_row.empty else 0 # Skip users with zero hours if user_total == 0: continue # Get epic breakdown epic_breakdown = [] for _, row in user_data[user_data["Level"] == "epic"].iterrows(): epic_breakdown.append({ "Epic": row["Epic"] if pd.notna(row["Epic"]) else "No Epic", "Hours": row["Logged"] if pd.notna(row["Logged"]) else 0, "Project": row["Project"] if pd.notna(row["Project"]) else "No Project", "Month": row["Month"] }) # Get tech categories for this user (upskilling only) tech_categories = [] upskilling_rows = user_data[user_data["TechCategory"] != "N/A"] for _, row in upskilling_rows.iterrows(): tech_categories.append({ "TechCategory": row["TechCategory"], "Hours": row["Logged"] if pd.notna(row["Logged"]) else 0, "Issue": row["Issue"] if pd.notna(row["Issue"]) else "No Issue", "Worklog": row["Worklog"] if pd.notna(row["Worklog"]) else "No Worklog", "Month": row["Month"] }) # Get upskilling issues for this user upskilling_issues = [] for issue in upskilling_rows["Issue"].unique(): issue_data = upskilling_rows[upskilling_rows["Issue"] == issue] upskilling_issues.append({ "Issue": issue, "Hours": issue_data["Logged"].sum(), "TechCategories": [str(cat) for cat in issue_data["TechCategory"].unique().tolist()] }) # Get monthly breakdown monthly_breakdown = {} for month in month_order: month_data = [item for item in epic_breakdown if item["Month"] == month] total = sum(item["Hours"] for item in month_data) if total > 0: epic_hours = {} for item in month_data: epic = item["Epic"] hours = item["Hours"] epic_hours[epic] = epic_hours.get(epic, 0) + hours monthly_breakdown[month] = { "total": total, "epics": epic_hours } team_data.append({ "User": user, "TotalHours": user_total, "EpicBreakdown": epic_breakdown, "TechCategories": tech_categories, "UpskillIssues": upskilling_issues, "MonthlyData": monthly_breakdown }) # Sort by total hours team_data.sort(key=lambda x: x["TotalHours"], reverse=True) return team_data def process_top_epics(data): """Process epic data to get hours by epic""" # Filter epic rows epic_rows = data[data["Level"] == "epic"] # Group by epic and sum hours epic_hours = epic_rows.groupby( epic_rows["Epic"].fillna("No Epic") )["Logged"].sum().reset_index() # Rename columns epic_hours.columns = ["Epic", "Hours"] # Sort by hours epic_hours = epic_hours.sort_values("Hours", ascending=False) return epic_hours.to_dict('records') def process_monthly_data(data): """Process data to get monthly totals""" # Filter epic rows with month data monthly_rows = data[(data["Level"] == "epic") & (data["Month"].notna())] # Group by month and sum hours monthly_hours = monthly_rows.groupby("Month")["Logged"].sum().reset_index() # Rename columns monthly_hours.columns = ["Month", "Hours"] # Sort by custom month order month_order = ['Nov', 'Dec', 'Jan', 'Feb', 'Mar'] monthly_hours["MonthOrder"] = monthly_hours["Month"].apply(lambda x: month_order.index(x) if x in month_order else 999) monthly_hours = monthly_hours.sort_values("MonthOrder") monthly_hours = monthly_hours.drop("MonthOrder", axis=1) return monthly_hours.to_dict('records') def format_hours(hours): """Format hours for display""" if hours == 0: return "-" return f"{hours:.1f}" def get_filtered_data(team_data, search_term, selected_month, sort_by, sort_order, tech_category_filter=None): """Filter and sort team data based on current selections""" filtered_data = team_data.copy() # Apply search filter if search_term: filtered_data = [item for item in filtered_data if search_term.lower() in item["User"].lower()] # Apply tech category filter if tech_category_filter and tech_category_filter != "All": filtered_data = [ item for item in filtered_data if any(tc["TechCategory"] == tech_category_filter for tc in item["TechCategories"]) ] # Adjust total hours based on the tech category filter for item in filtered_data: tech_hours = sum(tc["Hours"] for tc in item["TechCategories"] if tc["TechCategory"] == tech_category_filter) item["FilteredTechHours"] = tech_hours # Apply month filter if selected_month != "All": filtered_data = [ item for item in filtered_data if item.get("MonthlyData", {}).get(selected_month) ] # Adjust hours for selected month for item in filtered_data: monthly_data = item["MonthlyData"][selected_month] item["TotalHours"] = monthly_data["total"] item["EpicBreakdown"] = [ epic for epic in item["EpicBreakdown"] if epic["Month"] == selected_month ] item["TechCategories"] = [ tc for tc in item["TechCategories"] if tc["Month"] == selected_month ] # Determine if we want ascending or descending reverse_sort = (sort_order == "desc") # Sort the data if sort_by == "name": filtered_data.sort(key=lambda x: x["User"], reverse=reverse_sort) elif sort_by == "totalHours": if tech_category_filter and tech_category_filter != "All": filtered_data.sort(key=lambda x: x.get("FilteredTechHours", 0), reverse=reverse_sort) else: filtered_data.sort(key=lambda x: x["TotalHours"], reverse=reverse_sort) else: # Sort by specific epic filtered_data.sort( key=lambda x: sum(e["Hours"] for e in x["EpicBreakdown"] if e["Epic"] == sort_by), reverse=reverse_sort ) return filtered_data def get_epic_totals(filtered_data, unique_epics): """Calculate total hours by epic for filtered data""" totals = {epic: 0 for epic in unique_epics} for user in filtered_data: for epic in user["EpicBreakdown"]: totals[epic["Epic"]] = totals.get(epic["Epic"], 0) + epic["Hours"] return totals def get_user_chart_data(user_data, selected_month): """Get chart data for a specific user""" # Combine hours by epic epic_totals = {} for epic in user_data["EpicBreakdown"]: if selected_month == "All" or epic["Month"] == selected_month: epic_name = epic["Epic"] epic_totals[epic_name] = epic_totals.get(epic_name, 0) + epic["Hours"] # Convert to array and sort return [ {"name": name, "value": value} for name, value in epic_totals.items() ] def get_user_tech_categories(user_data, selected_month, min_percentage=1.0): """Get tech category data for a specific user focusing on actual technology categories""" # Combine hours by tech category tech_totals = {} total_hours = 0 # First try to process TechCategories from the user data for tech in user_data["TechCategories"]: if selected_month == "All" or tech["Month"] == selected_month: category = tech["TechCategory"] # Skip 'nan' or empty categories if pd.isna(category) or category in ["nan", "null", "", None, "N/A"]: continue hours = tech["Hours"] tech_totals[category] = tech_totals.get(category, 0) + hours total_hours += hours # Filter categories below the minimum percentage threshold if total_hours > 0: tech_totals = { k: v for k, v in tech_totals.items() if (v / total_hours * 100) >= min_percentage } # Convert to array and sort by hours (value) return [ {"name": name, "value": value} for name, value in sorted(tech_totals.items(), key=lambda x: x[1], reverse=True) ] def get_user_monthly_data(user_data): """Get monthly data for a specific user""" month_order = ['Nov', 'Dec', 'Jan', 'Feb', 'Mar'] return [ { "month": month, "hours": user_data["MonthlyData"].get(month, {}).get("total", 0) } for month in month_order ] def get_user_tech_issues(user_data, tech_category, selected_month="All"): """Get issues associated with a specific tech category for a user""" issues = [] for tech in user_data["TechCategories"]: if tech["TechCategory"] == tech_category: if selected_month == "All" or tech["Month"] == selected_month: # Check if this issue is already in the list existing = next((i for i in issues if i["issue"] == tech["Issue"]), None) if existing: existing["hours"] += tech["Hours"] else: issues.append({ "issue": tech["Issue"], "worklog": tech["Worklog"], "hours": tech["Hours"], "month": tech["Month"] }) # Sort by hours return sorted(issues, key=lambda x: x["hours"], reverse=True) def display_categorized_data_view(df): """Display a view of the categorized data with filtering options""" st.header("View Categorized Data") if df is None: st.warning("No categorized data available. Please upload and process a CSV file first.") return # Add filters for the data view col1, col2, col3 = st.columns(3) with col1: # Filter by user users = sorted(df["User"].dropna().unique()) selected_user = st.selectbox("Filter by User:", ["All Users"] + list(users)) with col2: # Filter by upskilling issues only show_upskilling_only = st.checkbox("Show Upskilling Issues Only", value=True) with col3: # Filter by tech category tech_categories = sorted(df["TechCategory"].dropna().unique()) tech_categories = [cat for cat in tech_categories if cat != "N/A"] selected_tech = st.selectbox("Filter by Technology:", ["All Technologies"] + tech_categories) # Apply filters filtered_df = df.copy() if selected_user != "All Users": filtered_df = filtered_df[filtered_df["User"] == selected_user] if show_upskilling_only: filtered_df = filtered_df[filtered_df["Issue"].apply(wc.is_upskilling_issue)] if selected_tech != "All Technologies": filtered_df = filtered_df[filtered_df["TechCategory"] == selected_tech] # Only show worklog level rows filtered_df = filtered_df[filtered_df["Level"] == "worklog"] # Select relevant columns to display display_columns = ["User", "Issue", "Worklog", "TechCategory", "Logged", "Month"] # Display the filtered data if filtered_df.empty: st.info("No data matches the selected filters.") else: st.write(f"Showing {len(filtered_df)} records. Use the filters above to narrow down the results.") # Rename columns for display display_df = filtered_df[display_columns].copy() display_df.columns = ["User", "Issue", "Worklog", "Technology", "Hours", "Month"] # Sort by user and hours display_df = display_df.sort_values(["User", "Hours"], ascending=[True, False]) # Display as a table st.dataframe(display_df, use_container_width=True) # Add CSV download button st.download_button( label="Download Filtered CSV", data=filtered_df.to_csv(index=False).encode('utf-8'), file_name="filtered_upskilling_data.csv", mime="text/csv", ) # Download full categorized dataset st.download_button( label="Download Complete Categorized CSV", data=df.to_csv(index=False).encode('utf-8'), file_name="full_categorized_data.csv", mime="text/csv", ) def display_tech_category_analysis(team_data, tech_category_data, upskilling_count): """Display tech category analysis section for upskilling issues""" st.header("Upskilling Technology Analysis") # Info about upskilling issues st.info(f"Found {upskilling_count} upskilling-related entries in the data. Technology categories shown below represent only upskilling activities.") # User filter for upskilling tech analysis with session state tech_user_filter = st.text_input( "Filter by Team Member Name:", key="tech_user_filter_input" ) # Apply filter without reprocessing data filtered_team_data = team_data if tech_user_filter: filtered_team_data = [user for user in team_data if tech_user_filter.lower() in user["User"].lower()] if not filtered_team_data: st.warning(f"No team members found matching '{tech_user_filter}'") else: st.success(f"Showing data for {len(filtered_team_data)} team members matching '{tech_user_filter}'") # Filter to show only users with actual upskilling data upskilling_team_data = [ user for user in filtered_team_data if any(tech["TechCategory"] not in ["nan", "null", "", None, "N/A"] for tech in user["TechCategories"]) ] # If no tech categories found if not tech_category_data["overall"] or not upskilling_team_data: st.warning("No technology categories found in upskilling data. This could be because there are no upskilling worklog entries or the categorization process failed.") return # Add overall tech category chart st.subheader("Overall Technology Distribution in Upskilling") # Convert to DataFrame for Plotly overall_df = pd.DataFrame(tech_category_data["overall"]) # Filter out nan/null values from overall tech categories overall_df = overall_df[~overall_df["Category"].isin(["nan", "null", "", "N/A"])].copy() if not overall_df.empty: fig_tech = px.pie( overall_df, values="Hours", names="Category", color_discrete_sequence=COLORS, title="Hours by Technology Category in Upskilling Activities" ) fig_tech.update_traces( textposition='inside', textinfo='percent+label', hovertemplate='%{label}: %{value:.1f} hours (%{percent})' ) st.plotly_chart(fig_tech, use_container_width=True) # Show table of top categories st.subheader("Top Technology Categories in Upskilling") top_tech_df = overall_df.head(10).copy() top_tech_df["Hours"] = top_tech_df["Hours"].map(lambda x: f"{x:.1f}") st.dataframe(top_tech_df, use_container_width=True) # Tech category filters st.subheader("Team Member Analysis by Technology") col1, col2 = st.columns(2) with col1: # Get all unique tech categories (excluding nan/null) all_categories = [ item["Category"] for item in tech_category_data["overall"] if item["Category"] not in ["nan", "null", "", "N/A"] ] tech_filter_options = ["All"] + all_categories if "selected_tech" not in st.session_state: st.session_state.selected_tech = "All" selected_tech = st.selectbox( "Filter by Technology Category:", options=tech_filter_options, key="tech_category_selector" ) with col2: users_count = len(upskilling_team_data) # Use filtered upskilling users count default_value = min(5, max(1, users_count)) if users_count <= 1: st.write(f"Showing data for {users_count} user") min_users = users_count else: min_users = st.slider( "Minimum users to display:", min_value=1, max_value=max(2, users_count), value=default_value, key="min_users_slider" ) # Filter team data by tech category if selected_tech != "All": tech_filtered_data = [ user for user in upskilling_team_data if any(tc["TechCategory"] == selected_tech for tc in user["TechCategories"]) ] # Calculate hours for each user in this tech category for user in tech_filtered_data: user["TechHours"] = sum( tc["Hours"] for tc in user["TechCategories"] if tc["TechCategory"] == selected_tech ) # Sort by tech category hours tech_filtered_data.sort(key=lambda x: x.get("TechHours", 0), reverse=True) # Create bar chart of users by tech hours if tech_filtered_data: # Take top users by hours in this category top_users = tech_filtered_data[:min_users] user_tech_df = pd.DataFrame([ {"User": user["User"], "Hours": user["TechHours"]} for user in top_users ]) fig_users = px.bar( user_tech_df, x="User", y="Hours", title=f"Top Users for {selected_tech} in Upskilling", color_discrete_sequence=['#8884d8'] ) st.plotly_chart(fig_users, use_container_width=True) # Team member breakdown st.subheader(f"Team Members Upskilling in {selected_tech}") for i, user in enumerate(tech_filtered_data): with st.expander(f"{user['User']} - {format_hours(user['TechHours'])} hours"): # Get issues for this user in this tech category issues = get_user_tech_issues(user, selected_tech) if issues: st.write(f"### Upskilling Issues for {user['User']} in {selected_tech}") # Create an issues table issues_df = pd.DataFrame([ { "Issue": issue["issue"], "Worklog": issue["worklog"], "Hours": format_hours(issue["hours"]), "Month": issue["month"] } for issue in issues ]) st.dataframe(issues_df, use_container_width=True) else: st.write("No detailed issue information available.") else: st.info(f"No team members found upskilling in {selected_tech}.") else: # Show overall tech distribution by team member st.subheader("Technology Distribution by Team Member (Upskilling Only)") # Display upskilling users only if not upskilling_team_data: st.info("No team members found with upskilling entries.") return # Show top users based on selection display_users = upskilling_team_data[:min_users] # Create tabs for each team member tabs = st.tabs([user["User"] for user in display_users]) for i, tab in enumerate(tabs): user = display_users[i] with tab: # Get tech categories for this user user_tech = get_user_tech_categories(user, "All") if user_tech: # Convert to DataFrame for Plotly user_tech_df = pd.DataFrame(user_tech) fig_user_tech = px.pie( user_tech_df, values="value", names="name", color_discrete_sequence=COLORS, title=f"Upskilling Technology Distribution for {user['User']}" ) fig_user_tech.update_traces( textposition='inside', textinfo='percent+label', hovertemplate='%{label}: %{value:.1f} hours (%{percent})' ) st.plotly_chart(fig_user_tech, use_container_width=True) # Show breakdown of upskilling issues if user["UpskillIssues"]: st.subheader(f"Upskilling Issues for {user['User']}") # Filter out issues with only nan values valid_issues = [ issue for issue in user["UpskillIssues"] if any(tech not in ["nan", "null", "", None, "N/A"] for tech in issue["TechCategories"]) ] if valid_issues: issues_df = pd.DataFrame([ { "Issue": issue["Issue"], "Hours": format_hours(issue["Hours"]), "Technologies": ", ".join([ str(tech) for tech in issue["TechCategories"] if tech not in ["nan", "null", "", None, "N/A"] ]) } for issue in sorted(valid_issues, key=lambda x: x["Hours"], reverse=True) ]) st.dataframe(issues_df, use_container_width=True) else: st.info("No upskilling issues with valid technology categories found.") else: st.info(f"No upskilling technology categories found for {user['User']}.") def display_team_epic_analysis(team_data, epic_data, monthly_data, unique_epics): """Display team and epic analysis section""" # Create filters sidebar st.sidebar.title("Filters") # Team Member Filters st.sidebar.header("Team Member Filters") search_term = st.sidebar.text_input("Search by Name:", "") display_count_options = [10, 20, 50] if len(team_data) > 50: display_count_options.append(len(team_data)) display_count = st.sidebar.selectbox( "Team Members to Display:", options=display_count_options, format_func=lambda x: f"All ({len(team_data)})" if x == len(team_data) else str(x), index=1 # Default to 20 ) month_options = ["All"] + ['Nov', 'Dec', 'Jan', 'Feb', 'Mar'] selected_month = st.sidebar.selectbox( "Month:", options=month_options, index=0 # Default to "All" ) # Epic Filters st.sidebar.header("Epic Filters") epic_col1, epic_col2, epic_col3 = st.sidebar.columns(3) with epic_col1: if st.button("Select All"): st.session_state.selected_epics = [epic["Epic"] for epic in epic_data] with epic_col2: if st.button("Clear All"): st.session_state.selected_epics = [] with epic_col3: if st.button("Top 5 Epics"): st.session_state.selected_epics = [epic["Epic"] for epic in epic_data[:5]] # Epic selection st.sidebar.subheader("Select Epics") # Create a scrollable container for epics epic_container = st.sidebar.container() with epic_container: for i, epic in enumerate(epic_data): epic_name = epic["Epic"] epic_hours = epic["Hours"] epic_color = COLORS[i % len(COLORS)] # Use checkbox for each epic checked = st.checkbox( f"{epic_name} ({int(epic_hours)}h)", value=epic_name in st.session_state.selected_epics, key=f"epic_{i}" ) # Update selected epics based on checkbox state if checked and epic_name not in st.session_state.selected_epics: st.session_state.selected_epics.append(epic_name) elif not checked and epic_name in st.session_state.selected_epics: st.session_state.selected_epics.remove(epic_name) # Monthly Overview Chart st.header("Monthly Non-Billable Hours Overview") # Convert to DataFrame for Plotly monthly_df = pd.DataFrame(monthly_data) if not monthly_df.empty: fig_monthly = px.bar( monthly_df, x="Month", y="Hours", title="", labels={"Hours": "Non-Billable Hours", "Month": "Month"}, color_discrete_sequence=['#8884d8'] ) fig_monthly.update_layout( plot_bgcolor='white', margin=dict(l=20, r=30, t=10, b=20), ) st.plotly_chart(fig_monthly, use_container_width=True) # Team Members Table st.header("Team Member Breakdown") st.write("Click on a team member to see their detailed breakdown.") # Sorting controls sort_col1, sort_col2 = st.columns(2) with sort_col1: sort_options = ["totalHours", "name"] + st.session_state.selected_epics sort_labels = { "totalHours": "Total Hours", "name": "Name" } for epic in st.session_state.selected_epics: sort_labels[epic] = epic new_sort_by = st.selectbox( "Sort by:", options=sort_options, format_func=lambda x: sort_labels[x], index=sort_options.index(st.session_state.sort_by) ) if new_sort_by != st.session_state.sort_by: st.session_state.sort_by = new_sort_by with sort_col2: sort_order_options = ["desc", "asc"] sort_order_labels = {"desc": "Descending", "asc": "Ascending"} new_sort_order = st.selectbox( "Order:", options=sort_order_options, format_func=lambda x: sort_order_labels[x], index=sort_order_options.index(st.session_state.sort_order) ) if new_sort_order != st.session_state.sort_order: st.session_state.sort_order = new_sort_order # Get filtered and sorted data filtered_team_data = get_filtered_data( team_data, search_term, selected_month, st.session_state.sort_by, st.session_state.sort_order ) # Apply display count table_data = filtered_team_data[:display_count] # Calculate epic totals epic_totals = get_epic_totals(table_data, unique_epics) # Create the table if not table_data: st.warning("No data matches your filters. Try adjusting your search criteria.") else: # Create table header header_cols = ["Team Member", "Total Hours"] + st.session_state.selected_epics header_col_sizes = [3] + [2] * (len(header_cols) - 1) # Create a styled header row header_row = st.columns(header_col_sizes) with header_row[0]: sort_icon = "▼" if st.session_state.sort_by == "name" and st.session_state.sort_order == "desc" else "▲" if st.session_state.sort_by == "name" and st.session_state.sort_order == "asc" else "" st.markdown(f"**Team Member {sort_icon}**") with header_row[1]: sort_icon = "▼" if st.session_state.sort_by == "totalHours" and st.session_state.sort_order == "desc" else "▲" if st.session_state.sort_by == "totalHours" and st.session_state.sort_order == "asc" else "" st.markdown(f"**Total Hours {sort_icon}**") for i, epic in enumerate(st.session_state.selected_epics): with header_row[i+2]: sort_icon = "▼" if st.session_state.sort_by == epic and st.session_state.sort_order == "desc" else "▲" if st.session_state.sort_by == epic and st.session_state.sort_order == "asc" else "" st.markdown(f"**{epic} {sort_icon}**") # Display each team member as a row for user_idx, user in enumerate(table_data): # Create a container for each row with st.container(): # Use columns for table cells row_cols = st.columns(header_col_sizes) # User name cell - clickable to expand with row_cols[0]: is_expanded = st.session_state.expanded_user == user["User"] expand_icon = "🔽" if is_expanded else "🔼" if st.button(f"{user['User']} {expand_icon}", key=f"user_btn_{user_idx}"): if is_expanded: st.session_state.expanded_user = None else: st.session_state.expanded_user = user["User"] # Total hours cell with row_cols[1]: st.write(format_hours(user["TotalHours"])) # Epic hours cells for i, epic in enumerate(st.session_state.selected_epics): with row_cols[i+2]: epic_hours = sum(e["Hours"] for e in user["EpicBreakdown"] if e["Epic"] == epic) st.write(format_hours(epic_hours)) # Expanded user detail if st.session_state.expanded_user == user["User"]: with st.expander("", expanded=True): st.subheader(f"{user['User']} - Detailed Breakdown") # Create tabs for different views user_tab1, user_tab2 = st.tabs(["Epic Distribution", "Upskilling Technologies"]) with user_tab1: # Create two columns for charts chart_col1, chart_col2 = st.columns(2) # Epic Distribution Chart with chart_col1: st.markdown("#### Epic Distribution") # Get chart data user_chart_data = get_user_chart_data(user, selected_month) if user_chart_data: # Sort by value user_chart_data.sort(key=lambda x: x["value"], reverse=True) # Create DataFrame epic_df = pd.DataFrame(user_chart_data) fig_pie = px.pie( epic_df, values="value", names="name", color_discrete_sequence=COLORS, ) fig_pie.update_traces( textposition='inside', textinfo='percent+label', hovertemplate='%{label}: %{value:.1f} hours (%{percent})' ) fig_pie.update_layout( height=400, margin=dict(l=10, r=10, t=10, b=10) ) st.plotly_chart(fig_pie, use_container_width=True) else: st.info("No epic data available for the selected period.") # Monthly Distribution Chart with chart_col2: st.markdown("#### Monthly Distribution") # Get monthly data monthly_data = get_user_monthly_data(user) # Filter out zero hours monthly_data = [m for m in monthly_data if m["hours"] > 0] if monthly_data: # Create DataFrame monthly_df = pd.DataFrame(monthly_data) fig_bar = px.bar( monthly_df, x="month", y="hours", color_discrete_sequence=['#82ca9d'] ) fig_bar.update_layout( height=400, margin=dict(l=10, r=10, t=10, b=10), xaxis_title="Month", yaxis_title="Hours" ) st.plotly_chart(fig_bar, use_container_width=True) else: st.info("No monthly data available.") # Epic Details Table st.markdown("#### Epic Details") user_epic_data = get_user_chart_data(user, selected_month) if user_epic_data: # Create data for table epic_details = [] for item in user_epic_data: epic_name = item["name"] hours = item["value"] # Find project for this epic project = next((e["Project"] for e in user["EpicBreakdown"] if e["Epic"] == epic_name), "-") # Calculate percentage percent = (hours / user["TotalHours"] * 100) if user["TotalHours"] > 0 else 0 epic_details.append({ "Epic": epic_name, "Project": project, "Hours": hours, "% of Total": f"{percent:.1f}%" }) # Sort by hours descending epic_details.sort(key=lambda x: x["Hours"], reverse=True) # Create DataFrame epic_df = pd.DataFrame(epic_details) st.dataframe(epic_df, use_container_width=True) else: st.info("No epic details available for this user.") with user_tab2: # Technology Distribution st.markdown("#### Upskilling Technology Distribution") # Get tech categories for this user user_tech_data = get_user_tech_categories(user, selected_month) if user_tech_data: # Create DataFrame for chart tech_df = pd.DataFrame(user_tech_data) fig_tech = px.pie( tech_df, values="value", names="name", color_discrete_sequence=COLORS, ) fig_tech.update_traces( textposition='inside', textinfo='percent+label', hovertemplate='%{label}: %{value:.1f} hours (%{percent})' ) st.plotly_chart(fig_tech, use_container_width=True) # Upskilling issues if user["UpskillIssues"]: st.markdown("#### Upskilling Issues") # Filter out issues with only nan values valid_issues = [ issue for issue in user["UpskillIssues"] if any(tech not in ["nan", "null", "", None, "N/A"] for tech in issue["TechCategories"]) ] if valid_issues: issues_df = pd.DataFrame([ { "Issue": issue["Issue"], "Hours": format_hours(issue["Hours"]), "Technologies": ", ".join([ str(tech) for tech in issue["TechCategories"] if tech not in ["nan", "null", "", None, "N/A"] ]) } for issue in sorted(valid_issues, key=lambda x: x["Hours"], reverse=True) ]) st.dataframe(issues_df, use_container_width=True) else: st.info("No upskilling issues with valid technology categories found.") else: st.info("No upskilling technology data found for this user.") # Totals row st.markdown("---") total_row = st.columns(header_col_sizes) with total_row[0]: st.markdown("**Total**") with total_row[1]: total_hours = sum(user["TotalHours"] for user in table_data) st.markdown(f"**{format_hours(total_hours)}**") for i, epic in enumerate(st.session_state.selected_epics): with total_row[i+2]: st.markdown(f"**{format_hours(epic_totals.get(epic, 0))}**") # Epic Distribution Chart st.header("Epic Distribution") # Filter epic data to only selected epics selected_epic_data = [epic for epic in epic_data if epic["Epic"] in st.session_state.selected_epics] if selected_epic_data: epic_df = pd.DataFrame(selected_epic_data) fig_pie = px.pie( epic_df, values="Hours", names="Epic", color_discrete_sequence=COLORS, ) fig_pie.update_traces( textposition='inside', textinfo='percent+label', hovertemplate='%{label}: %{value:.1f} hours (%{percent})' ) fig_pie.update_layout( height=500, margin=dict(l=20, r=20, t=20, b=20) ) st.plotly_chart(fig_pie, use_container_width=True) else: st.info("Please select at least one epic to display the distribution chart.") def main(): st.title("Non-Billable Time Analysis Dashboard") # Clear cache button at the top clear_cache_col, title_col = st.columns([1, 5]) with clear_cache_col: if st.button("🗑️ Clear Cache", key="clear_cache_button"): clear_session_and_cache() st.success("Cache and files cleared successfully! Please reload the page.") st.stop() # Stop execution to force a reload # Check if we need to reload due to cache clearing if st.session_state.needs_rerun: st.session_state.needs_rerun = False st.rerun() # Load data if st.session_state.processed_data is None: # File uploader (only show when no data is processed) uploaded_file = st.file_uploader("Upload Project Time Logging CSV", type=["csv"]) if uploaded_file is not None: # New file uploaded - process from scratch try: raw_data = pd.read_csv(uploaded_file) # Save uploaded file for reference with open("uploaded_data.csv", "wb") as f: f.write(uploaded_file.getvalue()) # Extract unique users for selection unique_users = sorted(raw_data["User"].dropna().unique()) # Allow user to select focus users for tech categorization st.subheader("Select Users for Focus Tech Categorization") focus_users = st.multiselect( "Select users to prioritize for tech categorization (optional):", options=unique_users, default=[] ) # Process data with focus on specific users processed_data = process_data(raw_data, force_categorize=True, focus_users=focus_users) if processed_data is not None: st.session_state.processed_data = processed_data st.session_state.has_generated_results = True st.rerun() # Refresh to show results else: st.error("Error processing data") return except Exception as e: st.error(f"Error processing uploaded file: {e}") return else: st.info("👋 Welcome! Please upload a CSV file to begin analysis.") return else: # Get processed data from session state non_billable_data = st.session_state.processed_data['non_billable_data'] team_data = st.session_state.processed_data['team_data'] epic_data = st.session_state.processed_data['epic_data'] monthly_data = st.session_state.processed_data['monthly_data'] unique_epics = st.session_state.processed_data['unique_epics'] tech_category_data = st.session_state.processed_data['tech_category_data'] upskilling_count = st.session_state.processed_data['upskilling_count'] # Set default selected epics if none are selected if not st.session_state.selected_epics and epic_data: st.session_state.selected_epics = [epic["Epic"] for epic in epic_data[:5]] # Top 5 epics # Create tabs for different analysis views tab1, tab2, tab3 = st.tabs([ "📊 Team & Epic Analysis", "💻 Upskilling Technology Analysis", "📋 View & Download CSV Data" ]) with tab1: st.session_state.active_tab = 'team_analysis' display_team_epic_analysis(team_data, epic_data, monthly_data, unique_epics) with tab2: st.session_state.active_tab = 'tech_analysis' display_tech_category_analysis(team_data, tech_category_data, upskilling_count) with tab3: st.session_state.active_tab = 'csv_view' display_categorized_data_view(st.session_state.categorized_df) if __name__ == "__main__": main()