"""Streamlit dashboard for analyzing non-billable time and upskilling activity."""
| import streamlit as st | |
| import pandas as pd | |
| import plotly.express as px | |
| import plotly.graph_objects as go | |
| import re | |
| from datetime import datetime | |
| import os | |
| import shutil | |
| from pathlib import Path | |
| import worklog_categorizer as wc | |
| import time | |
| import base64 | |
| from io import BytesIO | |
# Set page configuration
st.set_page_config(layout="wide", page_title="Non-Billable Time Analysis", page_icon="π")

# Chart palette matching the original React implementation.
COLORS = ['#0088FE', '#00C49F', '#FFBB28', '#FF8042', '#8884d8', '#82ca9d', '#ffc658',
          '#8dd1e1', '#a4de6c', '#d0ed57', '#bc80bd', '#ccebc5', '#ffed6f', '#bebada',
          '#fb8072', '#80b1d3', '#fdb462', '#b3de69']

# Seed every session-state key the app relies on, exactly once per session.
if 'initialized' not in st.session_state:
    _defaults = {
        'initialized': True,
        'processed_data': None,
        'expanded_user': None,
        'sort_by': 'totalHours',
        'sort_order': 'desc',
        'selected_epics': [],
        'active_tab': 'team_analysis',
        'tech_user_filter': "",
        'categorized_df': None,
        'show_csv_data': False,
        'needs_rerun': False,
        'has_generated_results': False,
    }
    for _key, _value in _defaults.items():
        setattr(st.session_state, _key, _value)
def extract_month(date_range):
    """Pull the month abbreviation out of a 'D/Mon/YY to …' or 'D/Mon/YY at …' string.

    Returns the month token (e.g. "Jan") or None when the input is not a
    string or matches neither date format.
    """
    if not isinstance(date_range, str):
        return None
    # Range entries use "to", single-timestamp entries use "at".
    for pattern in (r'^(\d+)/(\w+)/(\d+) to', r'^(\d+)/(\w+)/(\d+) at'):
        match = re.match(pattern, date_range)
        if match:
            return match.group(2)
    return None
def get_row_level(row):
    """Classify a row's depth in the export hierarchy.

    The export nests category > project > user > epic > issue > worklog;
    a row's level is the deepest populated column whose child column is blank.
    Returns one of "category", "project", "user", "epic", "issue",
    "worklog", or "unknown".
    """
    # (populated column, blank child column, resulting level) — checked in order.
    cascade = (
        ("Project Category", "Project", "category"),
        ("Project", "User", "project"),
        ("User", "Epic", "user"),
        ("Epic", "Issue", "epic"),
        ("Issue", "Worklog", "issue"),
    )
    for present, child, level in cascade:
        if pd.notna(row.get(present)) and pd.isna(row.get(child)):
            return level
    if pd.notna(row.get("Worklog")):
        return "worklog"
    return "unknown"
def clear_session_and_cache():
    """Reset the application: drop Streamlit caches, delete exported CSVs,
    and restore every managed session-state key to its default value."""
    st.cache_data.clear()
    # Best-effort removal of files written by previous runs.
    try:
        for stale_file in ("categorized_data.csv", "uploaded_data.csv"):
            if os.path.exists(stale_file):
                os.remove(stale_file)
    except Exception as e:
        st.error(f"Error removing files: {e}")
    # Restore the defaults for all keys this app manages.
    defaults = {
        'processed_data': None,
        'expanded_user': None,
        'sort_by': 'totalHours',
        'sort_order': 'desc',
        'selected_epics': [],
        'active_tab': 'team_analysis',
        'tech_user_filter': "",
        'categorized_df': None,
        'show_csv_data': False,
        'has_generated_results': False,
    }
    for key, value in defaults.items():
        setattr(st.session_state, key, value)
    # Signal the main loop that a rerun is required now that state is cleared.
    st.session_state.needs_rerun = True
def save_categorized_data(df, filename="categorized_data.csv"):
    """Write `df` to `filename` as CSV (no index).

    Returns True on success; on failure, surfaces the error in the UI
    and returns False.
    """
    try:
        df.to_csv(filename, index=False)
    except Exception as e:
        st.error(f"Error saving categorized data: {e}")
        return False
    return True
def process_data(raw_data, force_categorize=False, focus_users=None):
    """Process the data and return various aggregations (cached to prevent reprocessing).

    Filters `raw_data` down to "Non-Billable" rows, tags each row with its
    hierarchy level and month, runs technology categorization when the
    "TechCategory" column is missing (or a re-run is forced), and builds the
    derived datasets the UI consumes.

    Args:
        raw_data: DataFrame of the uploaded worklog export.
        force_categorize: re-run categorization even if "TechCategory" exists.
        focus_users: optional list of user names whose worklogs are categorized
            first (with a shorter pause between batches).

    Returns:
        dict with keys: 'non_billable_data', 'team_data', 'epic_data',
        'monthly_data', 'unique_epics', 'unique_users', 'tech_category_data',
        'upskilling_count'.
    """
    # Filter for non-billable data
    non_billable_data = raw_data[raw_data["Project Category"] == "Non-Billable"].copy()
    # Add level and month info
    non_billable_data["Level"] = non_billable_data.apply(get_row_level, axis=1)
    non_billable_data["Month"] = non_billable_data["Date"].apply(extract_month)
    # Check if we need to categorize data
    if "TechCategory" not in non_billable_data.columns or force_categorize:
        # Process tech categories for upskilling worklog entries
        with st.spinner("Categorizing upskilling worklog entries by technology..."):
            # If specific users are provided, prioritize their worklog categorization
            if focus_users and len(focus_users) > 0:
                st.info(f"Focusing on worklog categorization for {len(focus_users)} selected users")
                # First, process focus users
                focus_mask = non_billable_data["User"].isin(focus_users)
                focus_data = non_billable_data[focus_mask].copy()
                if not focus_data.empty:
                    # Process worklogs for focus users first
                    focus_data = wc.process_dataframe(
                        focus_data,
                        worklog_column="Worklog",
                        issue_column="Issue",
                        default_category="N/A",
                        batch_size=10,
                        pause_seconds=2,  # Shorter pause for focus users
                        show_progress=True
                    )
                    # Update the categorized data for focus users
                    non_billable_data.loc[focus_mask, "TechCategory"] = focus_data["TechCategory"]
            # Process all remaining data
            non_billable_data = wc.process_dataframe(
                non_billable_data,
                worklog_column="Worklog",
                issue_column="Issue",
                default_category="N/A",
                batch_size=10,
                pause_seconds=5,
                show_progress=True
            )
        # Save the categorized data so future runs can skip categorization
        save_path = "categorized_data.csv"
        if save_categorized_data(non_billable_data, save_path):
            st.success(f"Saved categorized data to {save_path}")
    # Store the categorized DataFrame for download
    st.session_state.categorized_df = non_billable_data
    # Process derived data
    team_data = process_team_members(non_billable_data)
    epic_data = process_top_epics(non_billable_data)
    monthly_data = process_monthly_data(non_billable_data)
    tech_category_data = process_tech_categories(non_billable_data)
    epics = sorted(non_billable_data["Epic"].dropna().unique())
    # Count upskilling entries (classification delegated to worklog_categorizer)
    upskilling_mask = non_billable_data["Issue"].apply(wc.is_upskilling_issue)
    upskilling_count = upskilling_mask.sum()
    # Get all unique users
    unique_users = sorted(non_billable_data["User"].dropna().unique())
    return {
        'non_billable_data': non_billable_data,
        'team_data': team_data,
        'epic_data': epic_data,
        'monthly_data': monthly_data,
        'unique_epics': epics,
        'unique_users': unique_users,
        'tech_category_data': tech_category_data,
        'upskilling_count': upskilling_count
    }
def process_tech_categories(data):
    """Summarize upskilling hours by technology category.

    Ignores rows whose TechCategory is the "N/A" placeholder and returns a
    dict with three views: "overall" (records sorted by hours desc),
    "by_user" (user -> records), and "by_month" (Nov→Mar entries that have
    any categorized hours).
    """
    tech_data = data[data["TechCategory"] != "N/A"].copy()
    if tech_data.empty:
        return {
            "overall": [],
            "by_user": {},
            "by_month": []
        }

    def summarize(frame):
        # Hours per category, highest first, as a [Category, Hours] frame.
        grouped = frame.groupby("TechCategory")["Logged"].sum().reset_index()
        grouped.columns = ["Category", "Hours"]
        return grouped.sort_values("Hours", ascending=False)

    overall = summarize(tech_data)
    by_user = {
        user: summarize(tech_data[tech_data["User"] == user]).to_dict('records')
        for user in tech_data["User"].dropna().unique()
    }
    by_month = []
    for month in ['Nov', 'Dec', 'Jan', 'Feb', 'Mar']:
        month_frame = tech_data[tech_data["Month"] == month]
        if not month_frame.empty:
            by_month.append({
                "Month": month,
                "Categories": summarize(month_frame).to_dict('records')
            })
    return {
        "overall": overall.to_dict('records'),
        "by_user": by_user,
        "by_month": by_month
    }
def process_team_members(data):
    """Build per-user summaries from the leveled non-billable data.

    For each user-level row with nonzero logged hours, collects the epic
    breakdown, upskilling tech-category entries, upskilling issue rollups,
    and a per-month breakdown. Returns a list of dicts sorted by total
    hours descending.
    """
    month_order = ['Nov', 'Dec', 'Jan', 'Feb', 'Mar']
    team_data = []
    user_rows = data[data["Level"] == "user"]
    for user in user_rows["User"].dropna().unique():
        rows = data[data["User"] == user]
        # The user-level summary row carries the user's total logged hours.
        totals = rows[rows["Level"] == "user"]["Logged"]
        total_hours = totals.iloc[0] if len(totals) else 0
        if total_hours == 0:
            continue  # skip users with nothing logged
        # Epic-level rows become the epic breakdown.
        epic_breakdown = [
            {
                "Epic": r["Epic"] if pd.notna(r["Epic"]) else "No Epic",
                "Hours": r["Logged"] if pd.notna(r["Logged"]) else 0,
                "Project": r["Project"] if pd.notna(r["Project"]) else "No Project",
                "Month": r["Month"],
            }
            for _, r in rows[rows["Level"] == "epic"].iterrows()
        ]
        # Rows with a real TechCategory are the user's upskilling entries.
        upskill_rows = rows[rows["TechCategory"] != "N/A"]
        tech_categories = [
            {
                "TechCategory": r["TechCategory"],
                "Hours": r["Logged"] if pd.notna(r["Logged"]) else 0,
                "Issue": r["Issue"] if pd.notna(r["Issue"]) else "No Issue",
                "Worklog": r["Worklog"] if pd.notna(r["Worklog"]) else "No Worklog",
                "Month": r["Month"],
            }
            for _, r in upskill_rows.iterrows()
        ]
        # Roll upskilling hours up to the issue level.
        upskilling_issues = []
        for issue in upskill_rows["Issue"].unique():
            subset = upskill_rows[upskill_rows["Issue"] == issue]
            upskilling_issues.append({
                "Issue": issue,
                "Hours": subset["Logged"].sum(),
                "TechCategories": [str(cat) for cat in subset["TechCategory"].unique().tolist()],
            })
        # Per-month totals and per-epic hours, only for months with activity.
        monthly_breakdown = {}
        for month in month_order:
            month_items = [e for e in epic_breakdown if e["Month"] == month]
            month_total = sum(e["Hours"] for e in month_items)
            if month_total > 0:
                epic_hours = {}
                for e in month_items:
                    epic_hours[e["Epic"]] = epic_hours.get(e["Epic"], 0) + e["Hours"]
                monthly_breakdown[month] = {"total": month_total, "epics": epic_hours}
        team_data.append({
            "User": user,
            "TotalHours": total_hours,
            "EpicBreakdown": epic_breakdown,
            "TechCategories": tech_categories,
            "UpskillIssues": upskilling_issues,
            "MonthlyData": monthly_breakdown,
        })
    return sorted(team_data, key=lambda entry: entry["TotalHours"], reverse=True)
def process_top_epics(data):
    """Total logged hours per epic (epic-level rows only), highest first.

    Missing epic names are bucketed under "No Epic". Returns a list of
    {"Epic", "Hours"} records.
    """
    epic_rows = data[data["Level"] == "epic"]
    totals = (
        epic_rows.groupby(epic_rows["Epic"].fillna("No Epic"))["Logged"]
        .sum()
        .reset_index()
        .rename(columns={"Logged": "Hours"})
        .sort_values("Hours", ascending=False)
    )
    return totals.to_dict('records')
def process_monthly_data(data):
    """Total hours per month (epic-level rows with a month), in Nov→Mar order.

    Returns a list of {"Month", "Hours"} records; months outside the known
    sequence sort to the end.
    """
    month_order = ['Nov', 'Dec', 'Jan', 'Feb', 'Mar']
    monthly_rows = data[(data["Level"] == "epic") & (data["Month"].notna())]
    totals = monthly_rows.groupby("Month")["Logged"].sum().reset_index()
    totals.columns = ["Month", "Hours"]
    # Order by the app's fiscal month sequence rather than alphabetically.
    totals = totals.sort_values(
        "Month",
        key=lambda col: col.map(lambda m: month_order.index(m) if m in month_order else 999),
    )
    return totals.to_dict('records')
def format_hours(hours):
    """Render an hours value for display: "-" for zero, else one decimal place."""
    return "-" if hours == 0 else f"{hours:.1f}"
def get_filtered_data(team_data, search_term, selected_month, sort_by, sort_order, tech_category_filter=None):
    """Filter and sort team data based on current selections.

    Fix over the previous version: `team_data.copy()` was a *shallow* copy,
    so the month/tech branches mutated the caller's member dicts in place
    ("TotalHours", "EpicBreakdown", …), permanently corrupting the cached
    team data after any month filter. Adjusted members are now rebuilt as
    new dicts and the caller's data is left untouched.

    Args:
        team_data: list of member dicts from process_team_members().
        search_term: case-insensitive substring matched against the user name.
        selected_month: month abbreviation (e.g. "Jan") or "All".
        sort_by: "name", "totalHours", or an epic name.
        sort_order: "asc" or "desc".
        tech_category_filter: optional technology category name (or "All").

    Returns:
        A new sorted list; tech-filtered members carry "FilteredTechHours",
        month-filtered members carry month-restricted totals and breakdowns.
    """
    filtered_data = list(team_data)

    # Apply search filter
    if search_term:
        needle = search_term.lower()
        filtered_data = [item for item in filtered_data if needle in item["User"].lower()]

    # Apply tech category filter; annotate each match (on a copy) with its
    # hours in that category so sorting can use them.
    if tech_category_filter and tech_category_filter != "All":
        filtered_data = [
            {
                **item,
                "FilteredTechHours": sum(
                    tc["Hours"] for tc in item["TechCategories"]
                    if tc["TechCategory"] == tech_category_filter
                ),
            }
            for item in filtered_data
            if any(tc["TechCategory"] == tech_category_filter for tc in item["TechCategories"])
        ]

    # Apply month filter; restrict totals and breakdowns to that month on copies.
    if selected_month != "All":
        filtered_data = [
            {
                **item,
                "TotalHours": item["MonthlyData"][selected_month]["total"],
                "EpicBreakdown": [
                    epic for epic in item["EpicBreakdown"] if epic["Month"] == selected_month
                ],
                "TechCategories": [
                    tc for tc in item["TechCategories"] if tc["Month"] == selected_month
                ],
            }
            for item in filtered_data
            if item.get("MonthlyData", {}).get(selected_month)
        ]

    # Determine if we want ascending or descending
    reverse_sort = (sort_order == "desc")
    # Sort the data
    if sort_by == "name":
        filtered_data.sort(key=lambda x: x["User"], reverse=reverse_sort)
    elif sort_by == "totalHours":
        if tech_category_filter and tech_category_filter != "All":
            filtered_data.sort(key=lambda x: x.get("FilteredTechHours", 0), reverse=reverse_sort)
        else:
            filtered_data.sort(key=lambda x: x["TotalHours"], reverse=reverse_sort)
    else:
        # Sort by hours logged against the specific epic named in sort_by
        filtered_data.sort(
            key=lambda x: sum(e["Hours"] for e in x["EpicBreakdown"] if e["Epic"] == sort_by),
            reverse=reverse_sort,
        )
    return filtered_data
def get_epic_totals(filtered_data, unique_epics):
    """Sum hours per epic across all filtered users.

    Every epic in `unique_epics` is seeded at 0 so the table shows a column
    even when no filtered user logged against it; epics found only in the
    breakdowns are added as encountered.
    """
    totals = dict.fromkeys(unique_epics, 0)
    for member in filtered_data:
        for entry in member["EpicBreakdown"]:
            totals[entry["Epic"]] = totals.get(entry["Epic"], 0) + entry["Hours"]
    return totals
def get_user_chart_data(user_data, selected_month):
    """Sum one user's epic hours (optionally restricted to a month).

    Returns [{"name": epic, "value": hours}, ...] in first-seen epic order.
    """
    totals = {}
    for entry in user_data["EpicBreakdown"]:
        if selected_month != "All" and entry["Month"] != selected_month:
            continue
        epic_name = entry["Epic"]
        totals[epic_name] = totals.get(epic_name, 0) + entry["Hours"]
    return [{"name": epic, "value": hours} for epic, hours in totals.items()]
def get_user_tech_categories(user_data, selected_month, min_percentage=1.0):
    """Aggregate a user's upskilling hours by technology category.

    Skips blank/placeholder categories, optionally restricts to one month,
    and drops categories contributing less than `min_percentage` percent of
    the user's categorized hours. Returns [{"name", "value"}, ...] sorted
    by hours descending.
    """
    placeholder_values = ["nan", "null", "", None, "N/A"]
    totals = {}
    grand_total = 0
    for entry in user_data["TechCategories"]:
        if selected_month != "All" and entry["Month"] != selected_month:
            continue
        category = entry["TechCategory"]
        # Ignore missing/placeholder categories entirely.
        if pd.isna(category) or category in placeholder_values:
            continue
        hours = entry["Hours"]
        totals[category] = totals.get(category, 0) + hours
        grand_total += hours
    # Drop categories below the percentage threshold.
    if grand_total > 0:
        totals = {
            name: hours for name, hours in totals.items()
            if (hours / grand_total * 100) >= min_percentage
        }
    return [
        {"name": name, "value": hours}
        for name, hours in sorted(totals.items(), key=lambda kv: kv[1], reverse=True)
    ]
def get_user_monthly_data(user_data):
    """Hours per fiscal month (Nov→Mar) for one user; missing months read 0."""
    monthly = user_data["MonthlyData"]
    result = []
    for month in ('Nov', 'Dec', 'Jan', 'Feb', 'Mar'):
        result.append({
            "month": month,
            "hours": monthly.get(month, {}).get("total", 0),
        })
    return result
def get_user_tech_issues(user_data, tech_category, selected_month="All"):
    """List a user's issues for one tech category, hours accumulated per issue.

    The first matching entry supplies an issue's worklog/month; subsequent
    entries for the same issue only add hours. Returns records sorted by
    hours descending.
    """
    by_issue = {}
    for tech in user_data["TechCategories"]:
        if tech["TechCategory"] != tech_category:
            continue
        if selected_month != "All" and tech["Month"] != selected_month:
            continue
        entry = by_issue.get(tech["Issue"])
        if entry is None:
            by_issue[tech["Issue"]] = {
                "issue": tech["Issue"],
                "worklog": tech["Worklog"],
                "hours": tech["Hours"],
                "month": tech["Month"],
            }
        else:
            entry["hours"] += tech["Hours"]
    # Sort by hours
    return sorted(by_issue.values(), key=lambda item: item["hours"], reverse=True)
def display_categorized_data_view(df):
    """Display a view of the categorized data with filtering options.

    Renders user / upskilling-only / technology filters, shows the matching
    worklog-level rows, and offers CSV downloads of both the filtered subset
    and the complete categorized dataset.

    Args:
        df: the categorized DataFrame, or None when nothing has been
            processed yet (a warning is shown and the view exits early).
    """
    st.header("View Categorized Data")
    if df is None:
        st.warning("No categorized data available. Please upload and process a CSV file first.")
        return
    # Add filters for the data view
    col1, col2, col3 = st.columns(3)
    with col1:
        # Filter by user
        users = sorted(df["User"].dropna().unique())
        selected_user = st.selectbox("Filter by User:", ["All Users"] + list(users))
    with col2:
        # Filter by upskilling issues only (on by default)
        show_upskilling_only = st.checkbox("Show Upskilling Issues Only", value=True)
    with col3:
        # Filter by tech category; "N/A" is the not-categorized placeholder
        tech_categories = sorted(df["TechCategory"].dropna().unique())
        tech_categories = [cat for cat in tech_categories if cat != "N/A"]
        selected_tech = st.selectbox("Filter by Technology:", ["All Technologies"] + tech_categories)
    # Apply filters
    filtered_df = df.copy()
    if selected_user != "All Users":
        filtered_df = filtered_df[filtered_df["User"] == selected_user]
    if show_upskilling_only:
        # Upskilling classification is delegated to worklog_categorizer
        filtered_df = filtered_df[filtered_df["Issue"].apply(wc.is_upskilling_issue)]
    if selected_tech != "All Technologies":
        filtered_df = filtered_df[filtered_df["TechCategory"] == selected_tech]
    # Only show worklog level rows (the leaf rows of the export hierarchy)
    filtered_df = filtered_df[filtered_df["Level"] == "worklog"]
    # Select relevant columns to display
    display_columns = ["User", "Issue", "Worklog", "TechCategory", "Logged", "Month"]
    # Display the filtered data
    if filtered_df.empty:
        st.info("No data matches the selected filters.")
    else:
        st.write(f"Showing {len(filtered_df)} records. Use the filters above to narrow down the results.")
        # Rename columns for display
        display_df = filtered_df[display_columns].copy()
        display_df.columns = ["User", "Issue", "Worklog", "Technology", "Hours", "Month"]
        # Sort by user (asc) then hours (desc)
        display_df = display_df.sort_values(["User", "Hours"], ascending=[True, False])
        # Display as a table
        st.dataframe(display_df, use_container_width=True)
        # Add CSV download button for the filtered subset
        st.download_button(
            label="Download Filtered CSV",
            data=filtered_df.to_csv(index=False).encode('utf-8'),
            file_name="filtered_upskilling_data.csv",
            mime="text/csv",
        )
        # Download full categorized dataset (unfiltered)
        st.download_button(
            label="Download Complete Categorized CSV",
            data=df.to_csv(index=False).encode('utf-8'),
            file_name="full_categorized_data.csv",
            mime="text/csv",
        )
def display_tech_category_analysis(team_data, tech_category_data, upskilling_count):
    """Display tech category analysis section for upskilling issues.

    Shows the overall technology distribution pie chart, a top-10 category
    table, and a per-technology or per-member drill-down depending on the
    selected technology filter.

    Args:
        team_data: list of member dicts from process_team_members().
        tech_category_data: dict from process_tech_categories().
        upskilling_count: number of upskilling-related entries found.
    """
    st.header("Upskilling Technology Analysis")
    # Info about upskilling issues
    st.info(f"Found {upskilling_count} upskilling-related entries in the data. Technology categories shown below represent only upskilling activities.")
    # User filter for upskilling tech analysis with session state
    tech_user_filter = st.text_input(
        "Filter by Team Member Name:",
        key="tech_user_filter_input"
    )
    # Apply filter without reprocessing data
    filtered_team_data = team_data
    if tech_user_filter:
        filtered_team_data = [user for user in team_data if tech_user_filter.lower() in user["User"].lower()]
        if not filtered_team_data:
            st.warning(f"No team members found matching '{tech_user_filter}'")
        else:
            st.success(f"Showing data for {len(filtered_team_data)} team members matching '{tech_user_filter}'")
    # Filter to show only users with actual upskilling data (any real category)
    upskilling_team_data = [
        user for user in filtered_team_data
        if any(tech["TechCategory"] not in ["nan", "null", "", None, "N/A"] for tech in user["TechCategories"])
    ]
    # If no tech categories found, nothing useful to render
    if not tech_category_data["overall"] or not upskilling_team_data:
        st.warning("No technology categories found in upskilling data. This could be because there are no upskilling worklog entries or the categorization process failed.")
        return
    # Add overall tech category chart
    st.subheader("Overall Technology Distribution in Upskilling")
    # Convert to DataFrame for Plotly
    overall_df = pd.DataFrame(tech_category_data["overall"])
    # Filter out nan/null values from overall tech categories
    overall_df = overall_df[~overall_df["Category"].isin(["nan", "null", "", "N/A"])].copy()
    if not overall_df.empty:
        fig_tech = px.pie(
            overall_df,
            values="Hours",
            names="Category",
            color_discrete_sequence=COLORS,
            title="Hours by Technology Category in Upskilling Activities"
        )
        fig_tech.update_traces(
            textposition='inside',
            textinfo='percent+label',
            hovertemplate='%{label}: %{value:.1f} hours (%{percent})'
        )
        st.plotly_chart(fig_tech, use_container_width=True)
        # Show table of top categories
        st.subheader("Top Technology Categories in Upskilling")
        top_tech_df = overall_df.head(10).copy()
        top_tech_df["Hours"] = top_tech_df["Hours"].map(lambda x: f"{x:.1f}")
        st.dataframe(top_tech_df, use_container_width=True)
    # Tech category filters
    st.subheader("Team Member Analysis by Technology")
    col1, col2 = st.columns(2)
    with col1:
        # Get all unique tech categories (excluding nan/null placeholders)
        all_categories = [
            item["Category"] for item in tech_category_data["overall"]
            if item["Category"] not in ["nan", "null", "", "N/A"]
        ]
        tech_filter_options = ["All"] + all_categories
        if "selected_tech" not in st.session_state:
            st.session_state.selected_tech = "All"
        selected_tech = st.selectbox(
            "Filter by Technology Category:",
            options=tech_filter_options,
            key="tech_category_selector"
        )
    with col2:
        users_count = len(upskilling_team_data)  # Use filtered upskilling users count
        default_value = min(5, max(1, users_count))
        if users_count <= 1:
            # Slider needs at least two positions; with one user just show it
            st.write(f"Showing data for {users_count} user")
            min_users = users_count
        else:
            min_users = st.slider(
                "Minimum users to display:",
                min_value=1,
                max_value=max(2, users_count),
                value=default_value,
                key="min_users_slider"
            )
    # Filter team data by tech category
    if selected_tech != "All":
        tech_filtered_data = [
            user for user in upskilling_team_data
            if any(tc["TechCategory"] == selected_tech for tc in user["TechCategories"])
        ]
        # Calculate hours for each user in this tech category
        for user in tech_filtered_data:
            user["TechHours"] = sum(
                tc["Hours"] for tc in user["TechCategories"]
                if tc["TechCategory"] == selected_tech
            )
        # Sort by tech category hours
        tech_filtered_data.sort(key=lambda x: x.get("TechHours", 0), reverse=True)
        # Create bar chart of users by tech hours
        if tech_filtered_data:
            # Take top users by hours in this category
            top_users = tech_filtered_data[:min_users]
            user_tech_df = pd.DataFrame([
                {"User": user["User"], "Hours": user["TechHours"]}
                for user in top_users
            ])
            fig_users = px.bar(
                user_tech_df,
                x="User",
                y="Hours",
                title=f"Top Users for {selected_tech} in Upskilling",
                color_discrete_sequence=['#8884d8']
            )
            st.plotly_chart(fig_users, use_container_width=True)
            # Team member breakdown
            st.subheader(f"Team Members Upskilling in {selected_tech}")
            for i, user in enumerate(tech_filtered_data):
                with st.expander(f"{user['User']} - {format_hours(user['TechHours'])} hours"):
                    # Get issues for this user in this tech category
                    issues = get_user_tech_issues(user, selected_tech)
                    if issues:
                        st.write(f"### Upskilling Issues for {user['User']} in {selected_tech}")
                        # Create an issues table
                        issues_df = pd.DataFrame([
                            {
                                "Issue": issue["issue"],
                                "Worklog": issue["worklog"],
                                "Hours": format_hours(issue["hours"]),
                                "Month": issue["month"]
                            }
                            for issue in issues
                        ])
                        st.dataframe(issues_df, use_container_width=True)
                    else:
                        st.write("No detailed issue information available.")
        else:
            st.info(f"No team members found upskilling in {selected_tech}.")
    else:
        # Show overall tech distribution by team member
        st.subheader("Technology Distribution by Team Member (Upskilling Only)")
        # Display upskilling users only
        if not upskilling_team_data:
            st.info("No team members found with upskilling entries.")
            return
        # Show top users based on selection
        display_users = upskilling_team_data[:min_users]
        # Create tabs for each team member
        tabs = st.tabs([user["User"] for user in display_users])
        for i, tab in enumerate(tabs):
            user = display_users[i]
            with tab:
                # Get tech categories for this user
                user_tech = get_user_tech_categories(user, "All")
                if user_tech:
                    # Convert to DataFrame for Plotly
                    user_tech_df = pd.DataFrame(user_tech)
                    fig_user_tech = px.pie(
                        user_tech_df,
                        values="value",
                        names="name",
                        color_discrete_sequence=COLORS,
                        title=f"Upskilling Technology Distribution for {user['User']}"
                    )
                    fig_user_tech.update_traces(
                        textposition='inside',
                        textinfo='percent+label',
                        hovertemplate='%{label}: %{value:.1f} hours (%{percent})'
                    )
                    st.plotly_chart(fig_user_tech, use_container_width=True)
                    # Show breakdown of upskilling issues
                    if user["UpskillIssues"]:
                        st.subheader(f"Upskilling Issues for {user['User']}")
                        # Filter out issues with only nan/placeholder categories
                        valid_issues = [
                            issue for issue in user["UpskillIssues"]
                            if any(tech not in ["nan", "null", "", None, "N/A"] for tech in issue["TechCategories"])
                        ]
                        if valid_issues:
                            issues_df = pd.DataFrame([
                                {
                                    "Issue": issue["Issue"],
                                    "Hours": format_hours(issue["Hours"]),
                                    "Technologies": ", ".join([
                                        str(tech) for tech in issue["TechCategories"]
                                        if tech not in ["nan", "null", "", None, "N/A"]
                                    ])
                                }
                                for issue in sorted(valid_issues, key=lambda x: x["Hours"], reverse=True)
                            ])
                            st.dataframe(issues_df, use_container_width=True)
                        else:
                            st.info("No upskilling issues with valid technology categories found.")
                else:
                    st.info(f"No upskilling technology categories found for {user['User']}.")
| def display_team_epic_analysis(team_data, epic_data, monthly_data, unique_epics): | |
| """Display team and epic analysis section""" | |
| # Create filters sidebar | |
| st.sidebar.title("Filters") | |
| # Team Member Filters | |
| st.sidebar.header("Team Member Filters") | |
| search_term = st.sidebar.text_input("Search by Name:", "") | |
| display_count_options = [10, 20, 50] | |
| if len(team_data) > 50: | |
| display_count_options.append(len(team_data)) | |
| display_count = st.sidebar.selectbox( | |
| "Team Members to Display:", | |
| options=display_count_options, | |
| format_func=lambda x: f"All ({len(team_data)})" if x == len(team_data) else str(x), | |
| index=1 # Default to 20 | |
| ) | |
| month_options = ["All"] + ['Nov', 'Dec', 'Jan', 'Feb', 'Mar'] | |
| selected_month = st.sidebar.selectbox( | |
| "Month:", | |
| options=month_options, | |
| index=0 # Default to "All" | |
| ) | |
| # Epic Filters | |
| st.sidebar.header("Epic Filters") | |
| epic_col1, epic_col2, epic_col3 = st.sidebar.columns(3) | |
| with epic_col1: | |
| if st.button("Select All"): | |
| st.session_state.selected_epics = [epic["Epic"] for epic in epic_data] | |
| with epic_col2: | |
| if st.button("Clear All"): | |
| st.session_state.selected_epics = [] | |
| with epic_col3: | |
| if st.button("Top 5 Epics"): | |
| st.session_state.selected_epics = [epic["Epic"] for epic in epic_data[:5]] | |
| # Epic selection | |
| st.sidebar.subheader("Select Epics") | |
| # Create a scrollable container for epics | |
| epic_container = st.sidebar.container() | |
| with epic_container: | |
| for i, epic in enumerate(epic_data): | |
| epic_name = epic["Epic"] | |
| epic_hours = epic["Hours"] | |
| epic_color = COLORS[i % len(COLORS)] | |
| # Use checkbox for each epic | |
| checked = st.checkbox( | |
| f"{epic_name} ({int(epic_hours)}h)", | |
| value=epic_name in st.session_state.selected_epics, | |
| key=f"epic_{i}" | |
| ) | |
| # Update selected epics based on checkbox state | |
| if checked and epic_name not in st.session_state.selected_epics: | |
| st.session_state.selected_epics.append(epic_name) | |
| elif not checked and epic_name in st.session_state.selected_epics: | |
| st.session_state.selected_epics.remove(epic_name) | |
| # Monthly Overview Chart | |
| st.header("Monthly Non-Billable Hours Overview") | |
| # Convert to DataFrame for Plotly | |
| monthly_df = pd.DataFrame(monthly_data) | |
| if not monthly_df.empty: | |
| fig_monthly = px.bar( | |
| monthly_df, | |
| x="Month", | |
| y="Hours", | |
| title="", | |
| labels={"Hours": "Non-Billable Hours", "Month": "Month"}, | |
| color_discrete_sequence=['#8884d8'] | |
| ) | |
| fig_monthly.update_layout( | |
| plot_bgcolor='white', | |
| margin=dict(l=20, r=30, t=10, b=20), | |
| ) | |
| st.plotly_chart(fig_monthly, use_container_width=True) | |
| # Team Members Table | |
| st.header("Team Member Breakdown") | |
| st.write("Click on a team member to see their detailed breakdown.") | |
| # Sorting controls | |
| sort_col1, sort_col2 = st.columns(2) | |
| with sort_col1: | |
| sort_options = ["totalHours", "name"] + st.session_state.selected_epics | |
| sort_labels = { | |
| "totalHours": "Total Hours", | |
| "name": "Name" | |
| } | |
| for epic in st.session_state.selected_epics: | |
| sort_labels[epic] = epic | |
| new_sort_by = st.selectbox( | |
| "Sort by:", | |
| options=sort_options, | |
| format_func=lambda x: sort_labels[x], | |
| index=sort_options.index(st.session_state.sort_by) | |
| ) | |
| if new_sort_by != st.session_state.sort_by: | |
| st.session_state.sort_by = new_sort_by | |
| with sort_col2: | |
| sort_order_options = ["desc", "asc"] | |
| sort_order_labels = {"desc": "Descending", "asc": "Ascending"} | |
| new_sort_order = st.selectbox( | |
| "Order:", | |
| options=sort_order_options, | |
| format_func=lambda x: sort_order_labels[x], | |
| index=sort_order_options.index(st.session_state.sort_order) | |
| ) | |
| if new_sort_order != st.session_state.sort_order: | |
| st.session_state.sort_order = new_sort_order | |
| # Get filtered and sorted data | |
| filtered_team_data = get_filtered_data( | |
| team_data, | |
| search_term, | |
| selected_month, | |
| st.session_state.sort_by, | |
| st.session_state.sort_order | |
| ) | |
| # Apply display count | |
| table_data = filtered_team_data[:display_count] | |
| # Calculate epic totals | |
| epic_totals = get_epic_totals(table_data, unique_epics) | |
| # Create the table | |
| if not table_data: | |
| st.warning("No data matches your filters. Try adjusting your search criteria.") | |
| else: | |
| # Create table header | |
| header_cols = ["Team Member", "Total Hours"] + st.session_state.selected_epics | |
| header_col_sizes = [3] + [2] * (len(header_cols) - 1) | |
| # Create a styled header row | |
| header_row = st.columns(header_col_sizes) | |
| with header_row[0]: | |
| sort_icon = "βΌ" if st.session_state.sort_by == "name" and st.session_state.sort_order == "desc" else "β²" if st.session_state.sort_by == "name" and st.session_state.sort_order == "asc" else "" | |
| st.markdown(f"**Team Member {sort_icon}**") | |
| with header_row[1]: | |
| sort_icon = "βΌ" if st.session_state.sort_by == "totalHours" and st.session_state.sort_order == "desc" else "β²" if st.session_state.sort_by == "totalHours" and st.session_state.sort_order == "asc" else "" | |
| st.markdown(f"**Total Hours {sort_icon}**") | |
| for i, epic in enumerate(st.session_state.selected_epics): | |
| with header_row[i+2]: | |
| sort_icon = "βΌ" if st.session_state.sort_by == epic and st.session_state.sort_order == "desc" else "β²" if st.session_state.sort_by == epic and st.session_state.sort_order == "asc" else "" | |
| st.markdown(f"**{epic} {sort_icon}**") | |
| # Display each team member as a row | |
| for user_idx, user in enumerate(table_data): | |
| # Create a container for each row | |
| with st.container(): | |
| # Use columns for table cells | |
| row_cols = st.columns(header_col_sizes) | |
| # User name cell - clickable to expand | |
| with row_cols[0]: | |
| is_expanded = st.session_state.expanded_user == user["User"] | |
| expand_icon = "π½" if is_expanded else "πΌ" | |
| if st.button(f"{user['User']} {expand_icon}", key=f"user_btn_{user_idx}"): | |
| if is_expanded: | |
| st.session_state.expanded_user = None | |
| else: | |
| st.session_state.expanded_user = user["User"] | |
| # Total hours cell | |
| with row_cols[1]: | |
| st.write(format_hours(user["TotalHours"])) | |
| # Epic hours cells | |
| for i, epic in enumerate(st.session_state.selected_epics): | |
| with row_cols[i+2]: | |
| epic_hours = sum(e["Hours"] for e in user["EpicBreakdown"] if e["Epic"] == epic) | |
| st.write(format_hours(epic_hours)) | |
| # Expanded user detail | |
| if st.session_state.expanded_user == user["User"]: | |
| with st.expander("", expanded=True): | |
| st.subheader(f"{user['User']} - Detailed Breakdown") | |
| # Create tabs for different views | |
| user_tab1, user_tab2 = st.tabs(["Epic Distribution", "Upskilling Technologies"]) | |
| with user_tab1: | |
| # Create two columns for charts | |
| chart_col1, chart_col2 = st.columns(2) | |
| # Epic Distribution Chart | |
| with chart_col1: | |
| st.markdown("#### Epic Distribution") | |
| # Get chart data | |
| user_chart_data = get_user_chart_data(user, selected_month) | |
| if user_chart_data: | |
| # Sort by value | |
| user_chart_data.sort(key=lambda x: x["value"], reverse=True) | |
| # Create DataFrame | |
| epic_df = pd.DataFrame(user_chart_data) | |
| fig_pie = px.pie( | |
| epic_df, | |
| values="value", | |
| names="name", | |
| color_discrete_sequence=COLORS, | |
| ) | |
| fig_pie.update_traces( | |
| textposition='inside', | |
| textinfo='percent+label', | |
| hovertemplate='%{label}: %{value:.1f} hours (%{percent})' | |
| ) | |
| fig_pie.update_layout( | |
| height=400, | |
| margin=dict(l=10, r=10, t=10, b=10) | |
| ) | |
| st.plotly_chart(fig_pie, use_container_width=True) | |
| else: | |
| st.info("No epic data available for the selected period.") | |
| # Monthly Distribution Chart | |
| with chart_col2: | |
| st.markdown("#### Monthly Distribution") | |
| # Get monthly data | |
| monthly_data = get_user_monthly_data(user) | |
| # Filter out zero hours | |
| monthly_data = [m for m in monthly_data if m["hours"] > 0] | |
| if monthly_data: | |
| # Create DataFrame | |
| monthly_df = pd.DataFrame(monthly_data) | |
| fig_bar = px.bar( | |
| monthly_df, | |
| x="month", | |
| y="hours", | |
| color_discrete_sequence=['#82ca9d'] | |
| ) | |
| fig_bar.update_layout( | |
| height=400, | |
| margin=dict(l=10, r=10, t=10, b=10), | |
| xaxis_title="Month", | |
| yaxis_title="Hours" | |
| ) | |
| st.plotly_chart(fig_bar, use_container_width=True) | |
| else: | |
| st.info("No monthly data available.") | |
| # Epic Details Table | |
| st.markdown("#### Epic Details") | |
| user_epic_data = get_user_chart_data(user, selected_month) | |
| if user_epic_data: | |
| # Create data for table | |
| epic_details = [] | |
| for item in user_epic_data: | |
| epic_name = item["name"] | |
| hours = item["value"] | |
| # Find project for this epic | |
| project = next((e["Project"] for e in user["EpicBreakdown"] if e["Epic"] == epic_name), "-") | |
| # Calculate percentage | |
| percent = (hours / user["TotalHours"] * 100) if user["TotalHours"] > 0 else 0 | |
| epic_details.append({ | |
| "Epic": epic_name, | |
| "Project": project, | |
| "Hours": hours, | |
| "% of Total": f"{percent:.1f}%" | |
| }) | |
| # Sort by hours descending | |
| epic_details.sort(key=lambda x: x["Hours"], reverse=True) | |
| # Create DataFrame | |
| epic_df = pd.DataFrame(epic_details) | |
| st.dataframe(epic_df, use_container_width=True) | |
| else: | |
| st.info("No epic details available for this user.") | |
| with user_tab2: | |
| # Technology Distribution | |
| st.markdown("#### Upskilling Technology Distribution") | |
| # Get tech categories for this user | |
| user_tech_data = get_user_tech_categories(user, selected_month) | |
| if user_tech_data: | |
| # Create DataFrame for chart | |
| tech_df = pd.DataFrame(user_tech_data) | |
| fig_tech = px.pie( | |
| tech_df, | |
| values="value", | |
| names="name", | |
| color_discrete_sequence=COLORS, | |
| ) | |
| fig_tech.update_traces( | |
| textposition='inside', | |
| textinfo='percent+label', | |
| hovertemplate='%{label}: %{value:.1f} hours (%{percent})' | |
| ) | |
| st.plotly_chart(fig_tech, use_container_width=True) | |
| # Upskilling issues | |
| if user["UpskillIssues"]: | |
| st.markdown("#### Upskilling Issues") | |
| # Filter out issues with only nan values | |
| valid_issues = [ | |
| issue for issue in user["UpskillIssues"] | |
| if any(tech not in ["nan", "null", "", None, "N/A"] for tech in issue["TechCategories"]) | |
| ] | |
| if valid_issues: | |
| issues_df = pd.DataFrame([ | |
| { | |
| "Issue": issue["Issue"], | |
| "Hours": format_hours(issue["Hours"]), | |
| "Technologies": ", ".join([ | |
| str(tech) for tech in issue["TechCategories"] | |
| if tech not in ["nan", "null", "", None, "N/A"] | |
| ]) | |
| } | |
| for issue in sorted(valid_issues, key=lambda x: x["Hours"], reverse=True) | |
| ]) | |
| st.dataframe(issues_df, use_container_width=True) | |
| else: | |
| st.info("No upskilling issues with valid technology categories found.") | |
| else: | |
| st.info("No upskilling technology data found for this user.") | |
| # Totals row | |
| st.markdown("---") | |
| total_row = st.columns(header_col_sizes) | |
| with total_row[0]: | |
| st.markdown("**Total**") | |
| with total_row[1]: | |
| total_hours = sum(user["TotalHours"] for user in table_data) | |
| st.markdown(f"**{format_hours(total_hours)}**") | |
| for i, epic in enumerate(st.session_state.selected_epics): | |
| with total_row[i+2]: | |
| st.markdown(f"**{format_hours(epic_totals.get(epic, 0))}**") | |
| # Epic Distribution Chart | |
| st.header("Epic Distribution") | |
| # Filter epic data to only selected epics | |
| selected_epic_data = [epic for epic in epic_data if epic["Epic"] in st.session_state.selected_epics] | |
| if selected_epic_data: | |
| epic_df = pd.DataFrame(selected_epic_data) | |
| fig_pie = px.pie( | |
| epic_df, | |
| values="Hours", | |
| names="Epic", | |
| color_discrete_sequence=COLORS, | |
| ) | |
| fig_pie.update_traces( | |
| textposition='inside', | |
| textinfo='percent+label', | |
| hovertemplate='%{label}: %{value:.1f} hours (%{percent})' | |
| ) | |
| fig_pie.update_layout( | |
| height=500, | |
| margin=dict(l=20, r=20, t=20, b=20) | |
| ) | |
| st.plotly_chart(fig_pie, use_container_width=True) | |
| else: | |
| st.info("Please select at least one epic to display the distribution chart.") | |
def main():
    """Entry point: render the Non-Billable Time Analysis dashboard.

    Flow:
      1. Offer a cache-clear control at the top of the page.
      2. If no processed results are cached in ``st.session_state``, show the
         CSV uploader and run the processing pipeline on demand.
      3. Otherwise render the three analysis tabs from the cached results.
    """
    st.title("Non-Billable Time Analysis Dashboard")

    # Clear cache button at the top of the page.
    clear_cache_col, title_col = st.columns([1, 5])
    with clear_cache_col:
        if st.button("ποΈ Clear Cache", key="clear_cache_button"):
            clear_session_and_cache()
            st.success("Cache and files cleared successfully! Please reload the page.")
            st.stop()  # Halt this run so the next load starts from a clean state

    # Honor a rerun queued by an earlier interaction (e.g. cache clearing).
    if st.session_state.needs_rerun:
        st.session_state.needs_rerun = False
        st.rerun()

    if st.session_state.processed_data is None:
        # No results yet: show the uploader, then (once a file arrives) the
        # focus-user selector plus an explicit "process" button.
        uploaded_file = st.file_uploader("Upload Project Time Logging CSV", type=["csv"])
        if uploaded_file is None:
            st.info("π Welcome! Please upload a CSV file to begin analysis.")
            return

        try:
            raw_data = pd.read_csv(uploaded_file)
        except Exception as e:
            st.error(f"Error processing uploaded file: {e}")
            return

        # Validate the schema before indexing so a malformed CSV produces a
        # clear message instead of a KeyError traceback.
        if "User" not in raw_data.columns:
            st.error("Error processing uploaded file: the CSV is missing the required 'User' column.")
            return

        # Save uploaded file for reference.
        with open("uploaded_data.csv", "wb") as f:
            f.write(uploaded_file.getvalue())

        # Let the user choose focus users BEFORE processing starts.
        unique_users = sorted(raw_data["User"].dropna().unique())
        st.subheader("Select Users for Focus Tech Categorization")
        focus_users = st.multiselect(
            "Select users to prioritize for tech categorization (optional):",
            options=unique_users,
            default=[]
        )

        # BUG FIX: process_data() previously ran unconditionally in the same
        # script run that rendered the multiselect, so categorization always
        # started with an empty selection before the user could interact (and
        # re-ran on every widget change). Gate it behind an explicit button.
        if st.button("Generate Analysis", key="generate_analysis_button"):
            try:
                processed_data = process_data(
                    raw_data, force_categorize=True, focus_users=focus_users
                )
            except Exception as e:
                st.error(f"Error processing uploaded file: {e}")
                return
            if processed_data is None:
                st.error("Error processing data")
                return
            st.session_state.processed_data = processed_data
            st.session_state.has_generated_results = True
            st.rerun()  # Refresh to show results
        return

    # Results exist: unpack them from session state.
    processed = st.session_state.processed_data
    team_data = processed['team_data']
    epic_data = processed['epic_data']
    monthly_data = processed['monthly_data']
    unique_epics = processed['unique_epics']
    tech_category_data = processed['tech_category_data']
    upskilling_count = processed['upskilling_count']

    # Default to the top 5 epics when nothing is selected yet.
    if not st.session_state.selected_epics and epic_data:
        st.session_state.selected_epics = [epic["Epic"] for epic in epic_data[:5]]

    # Create tabs for the different analysis views.
    tab1, tab2, tab3 = st.tabs([
        "π Team & Epic Analysis",
        "π» Upskilling Technology Analysis",
        "π View & Download CSV Data"
    ])
    with tab1:
        st.session_state.active_tab = 'team_analysis'
        display_team_epic_analysis(team_data, epic_data, monthly_data, unique_epics)
    with tab2:
        st.session_state.active_tab = 'tech_analysis'
        display_tech_category_analysis(team_data, tech_category_data, upskilling_count)
    with tab3:
        st.session_state.active_tab = 'csv_view'
        display_categorized_data_view(st.session_state.categorized_df)
# Module entry point: run the dashboard only when executed as a script
# (not when imported by another module).
if __name__ == "__main__":
    main()