# app.py — Non-Billable Time Analysis dashboard
# (history: "Update app.py", commit 20d6af3, by Pratik333)
import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import re
from datetime import datetime
import os
import shutil
from pathlib import Path
import worklog_categorizer as wc
import time
import base64
from io import BytesIO
# Set page configuration
st.set_page_config(layout="wide", page_title="Non-Billable Time Analysis", page_icon="πŸ“Š")

# Color palette mirroring the original React implementation
COLORS = ['#0088FE', '#00C49F', '#FFBB28', '#FF8042', '#8884d8', '#82ca9d', '#ffc658',
          '#8dd1e1', '#a4de6c', '#d0ed57', '#bc80bd', '#ccebc5', '#ffed6f', '#bebada',
          '#fb8072', '#80b1d3', '#fdb462', '#b3de69']

# Seed every session-state key exactly once per browser session
if 'initialized' not in st.session_state:
    _state_defaults = {
        'initialized': True,
        'processed_data': None,
        'expanded_user': None,
        'sort_by': 'totalHours',
        'sort_order': 'desc',
        'selected_epics': [],
        'active_tab': 'team_analysis',
        'tech_user_filter': "",
        'categorized_df': None,
        'show_csv_data': False,
        'needs_rerun': False,
        'has_generated_results': False,
    }
    for _state_key, _state_value in _state_defaults.items():
        setattr(st.session_state, _state_key, _state_value)
def extract_month(date_range):
    """Pull the month abbreviation out of a date-range string.

    Accepts both "D/Mon/YY to ..." range strings and "D/Mon/YY at ..."
    single-date strings; anything else (including non-strings) yields None.
    """
    if not isinstance(date_range, str):
        return None
    match = re.match(r'^(\d+)/(\w+)/(\d+) (?:to|at)', date_range)
    return match.group(2) if match else None
def get_row_level(row):
    """Classify a row's depth in the export hierarchy.

    The export nests category > project > user > epic > issue > worklog;
    a row's level is the deepest column that is populated while the next
    one down is empty. Returns "unknown" when nothing matches.
    """
    # (level label, column that must be present, column that must be absent)
    hierarchy = [
        ("category", "Project Category", "Project"),
        ("project", "Project", "User"),
        ("user", "User", "Epic"),
        ("epic", "Epic", "Issue"),
        ("issue", "Issue", "Worklog"),
    ]
    for level, present_col, absent_col in hierarchy:
        if pd.notna(row.get(present_col)) and pd.isna(row.get(absent_col)):
            return level
    if pd.notna(row.get("Worklog")):
        return "worklog"
    return "unknown"
def clear_session_and_cache():
    """Reset the application by clearing cache, data files, and session state."""
    # Drop everything Streamlit has memoized
    st.cache_data.clear()
    # Best-effort removal of data files written by earlier runs
    try:
        for stale_file in ("categorized_data.csv", "uploaded_data.csv"):
            if os.path.exists(stale_file):
                os.remove(stale_file)
    except Exception as e:
        st.error(f"Error removing files: {e}")
    # Restore every dashboard setting to its initial value
    reset_values = {
        'processed_data': None,
        'expanded_user': None,
        'sort_by': 'totalHours',
        'sort_order': 'desc',
        'selected_epics': [],
        'active_tab': 'team_analysis',
        'tech_user_filter': "",
        'categorized_df': None,
        'show_csv_data': False,
        'has_generated_results': False,
    }
    for state_key, state_value in reset_values.items():
        setattr(st.session_state, state_key, state_value)
    # Ask main() to rerun the script on its next pass
    st.session_state.needs_rerun = True
def save_categorized_data(df, filename="categorized_data.csv"):
    """Persist the categorized DataFrame to a CSV file.

    Returns True on success; on failure surfaces the error in the UI
    and returns False.
    """
    try:
        df.to_csv(filename, index=False)
    except Exception as e:
        st.error(f"Error saving categorized data: {e}")
        return False
    return True
@st.cache_data
def process_data(raw_data, force_categorize=False, focus_users=None):
    """Process the raw worklog export into the aggregations the dashboard needs.

    Cached by Streamlit so repeated renders skip re-categorization.

    Args:
        raw_data: DataFrame of the uploaded time-logging CSV.
        force_categorize: re-run technology categorization even if a
            TechCategory column is already present.
        focus_users: optional list of user names whose worklogs are
            categorized first (with a shorter batch pause).

    Returns:
        Dict with the non-billable frame plus the derived team/epic/
        monthly/tech aggregations, unique epic and user lists, and the
        count of upskilling entries.
    """
    # This dashboard only analyses non-billable entries
    non_billable_data = raw_data[raw_data["Project Category"] == "Non-Billable"].copy()
    # Annotate each row with its hierarchy level and month
    non_billable_data["Level"] = non_billable_data.apply(get_row_level, axis=1)
    non_billable_data["Month"] = non_billable_data["Date"].apply(extract_month)
    # Categorize only when needed (missing column or explicit request)
    if "TechCategory" not in non_billable_data.columns or force_categorize:
        # Ensure the column exists even if the frame is empty, so later
        # filters on TechCategory never raise KeyError
        if "TechCategory" not in non_billable_data.columns:
            non_billable_data["TechCategory"] = "N/A"
        with st.spinner("Categorizing upskilling worklog entries by technology..."):
            # Rows still to be processed after the (optional) focus pass
            remaining_mask = pd.Series(True, index=non_billable_data.index)
            if focus_users and len(focus_users) > 0:
                st.info(f"Focusing on worklog categorization for {len(focus_users)} selected users")
                focus_mask = non_billable_data["User"].isin(focus_users)
                focus_data = non_billable_data[focus_mask].copy()
                if not focus_data.empty:
                    focus_data = wc.process_dataframe(
                        focus_data,
                        worklog_column="Worklog",
                        issue_column="Issue",
                        default_category="N/A",
                        batch_size=10,
                        pause_seconds=2,  # Shorter pause for focus users
                        show_progress=True
                    )
                    # Write the prioritized results back (aligned by index)
                    non_billable_data.loc[focus_mask, "TechCategory"] = focus_data["TechCategory"]
                    # BUG FIX: the original re-categorized the ENTIRE frame
                    # afterwards, redoing — and overwriting — the focus
                    # users' results. Only the remaining rows are processed.
                    remaining_mask = ~focus_mask
            remaining_data = non_billable_data[remaining_mask].copy()
            if not remaining_data.empty:
                remaining_data = wc.process_dataframe(
                    remaining_data,
                    worklog_column="Worklog",
                    issue_column="Issue",
                    default_category="N/A",
                    batch_size=10,
                    pause_seconds=5,
                    show_progress=True
                )
                non_billable_data.loc[remaining_mask, "TechCategory"] = remaining_data["TechCategory"]
        # Persist the categorized data so reruns can skip this step
        save_path = "categorized_data.csv"
        if save_categorized_data(non_billable_data, save_path):
            st.success(f"Saved categorized data to {save_path}")
    # Keep the categorized frame in session state for the download views
    st.session_state.categorized_df = non_billable_data
    # Derived aggregations for the dashboard sections
    team_data = process_team_members(non_billable_data)
    epic_data = process_top_epics(non_billable_data)
    monthly_data = process_monthly_data(non_billable_data)
    tech_category_data = process_tech_categories(non_billable_data)
    epics = sorted(non_billable_data["Epic"].dropna().unique())
    # Count upskilling entries
    upskilling_mask = non_billable_data["Issue"].apply(wc.is_upskilling_issue)
    upskilling_count = upskilling_mask.sum()
    # Get all unique users
    unique_users = sorted(non_billable_data["User"].dropna().unique())
    return {
        'non_billable_data': non_billable_data,
        'team_data': team_data,
        'epic_data': epic_data,
        'monthly_data': monthly_data,
        'unique_epics': epics,
        'unique_users': unique_users,
        'tech_category_data': tech_category_data,
        'upskilling_count': upskilling_count
    }
def process_tech_categories(data):
    """Aggregate upskilling hours by technology category.

    Rows whose TechCategory is the "N/A" placeholder are excluded.
    Returns {"overall": [...], "by_user": {user: [...]}, "by_month": [...]}
    where each [...] is a list of {"Category", "Hours"} records sorted by
    hours, descending.
    """
    tech_data = data[data["TechCategory"] != "N/A"].copy()
    if tech_data.empty:
        return {
            "overall": [],
            "by_user": {},
            "by_month": []
        }

    def summarize(frame):
        # Sum hours per category, highest first, with display column names
        grouped = frame.groupby("TechCategory")["Logged"].sum().reset_index()
        grouped = grouped.sort_values("Logged", ascending=False)
        grouped.columns = ["Category", "Hours"]
        return grouped

    overall = summarize(tech_data)
    by_user = {
        user: summarize(tech_data[tech_data["User"] == user]).to_dict('records')
        for user in tech_data["User"].dropna().unique()
    }
    by_month = []
    for month in ('Nov', 'Dec', 'Jan', 'Feb', 'Mar'):
        month_slice = tech_data[tech_data["Month"] == month]
        if not month_slice.empty:
            by_month.append({
                "Month": month,
                "Categories": summarize(month_slice).to_dict('records')
            })
    return {
        "overall": overall.to_dict('records'),
        "by_user": by_user,
        "by_month": by_month
    }
def process_team_members(data):
    """Build per-user summaries from the annotated non-billable frame.

    For every user-level row with non-zero logged hours, produces a dict
    with the user's total, epic breakdown, tech-category entries,
    upskilling issues, and monthly totals. Sorted by total hours,
    descending.
    """
    month_order = ('Nov', 'Dec', 'Jan', 'Feb', 'Mar')
    results = []
    for user in data[data["Level"] == "user"]["User"].dropna().unique():
        rows = data[data["User"] == user]
        # The user-level row carries the user's total hours
        total_rows = rows[rows["Level"] == "user"]
        total_hours = total_rows["Logged"].iloc[0] if not total_rows.empty else 0
        if total_hours == 0:
            # Users with no logged time are omitted entirely
            continue
        # Epic-level rows become the epic breakdown
        epic_breakdown = [
            {
                "Epic": r["Epic"] if pd.notna(r["Epic"]) else "No Epic",
                "Hours": r["Logged"] if pd.notna(r["Logged"]) else 0,
                "Project": r["Project"] if pd.notna(r["Project"]) else "No Project",
                "Month": r["Month"]
            }
            for _, r in rows[rows["Level"] == "epic"].iterrows()
        ]
        # Rows with a real tech category are the upskilling entries
        tech_rows = rows[rows["TechCategory"] != "N/A"]
        tech_categories = [
            {
                "TechCategory": r["TechCategory"],
                "Hours": r["Logged"] if pd.notna(r["Logged"]) else 0,
                "Issue": r["Issue"] if pd.notna(r["Issue"]) else "No Issue",
                "Worklog": r["Worklog"] if pd.notna(r["Worklog"]) else "No Worklog",
                "Month": r["Month"]
            }
            for _, r in tech_rows.iterrows()
        ]
        # Roll the tech rows up per issue
        upskilling_issues = []
        for issue_name in tech_rows["Issue"].unique():
            issue_rows = tech_rows[tech_rows["Issue"] == issue_name]
            upskilling_issues.append({
                "Issue": issue_name,
                "Hours": issue_rows["Logged"].sum(),
                "TechCategories": [str(c) for c in issue_rows["TechCategory"].unique().tolist()]
            })
        # Month-by-month totals with a per-epic split (months with no
        # hours are omitted)
        monthly_breakdown = {}
        for month in month_order:
            entries = [e for e in epic_breakdown if e["Month"] == month]
            month_total = sum(e["Hours"] for e in entries)
            if month_total > 0:
                per_epic = {}
                for e in entries:
                    per_epic[e["Epic"]] = per_epic.get(e["Epic"], 0) + e["Hours"]
                monthly_breakdown[month] = {
                    "total": month_total,
                    "epics": per_epic
                }
        results.append({
            "User": user,
            "TotalHours": total_hours,
            "EpicBreakdown": epic_breakdown,
            "TechCategories": tech_categories,
            "UpskillIssues": upskilling_issues,
            "MonthlyData": monthly_breakdown
        })
    # Heaviest users first
    results.sort(key=lambda entry: entry["TotalHours"], reverse=True)
    return results
def process_top_epics(data):
    """Total logged hours per epic (epic-level rows only).

    Rows without an epic are bucketed under "No Epic". Returns a list of
    {"Epic", "Hours"} records, highest hours first.
    """
    epic_rows = data[data["Level"] == "epic"].copy()
    epic_rows["Epic"] = epic_rows["Epic"].fillna("No Epic")
    totals = epic_rows.groupby("Epic", as_index=False)["Logged"].sum()
    totals.columns = ["Epic", "Hours"]
    return totals.sort_values("Hours", ascending=False).to_dict('records')
def process_monthly_data(data):
    """Total non-billable hours per month, in Nov→Mar chronological order.

    Only epic-level rows with a parsed month are counted. Returns a list
    of {"Month", "Hours"} records.
    """
    monthly_rows = data[(data["Level"] == "epic") & (data["Month"].notna())]
    totals = monthly_rows.groupby("Month", as_index=False)["Logged"].sum()
    totals.columns = ["Month", "Hours"]
    # Order by the tracking window's chronology; unknown months sink last
    rank = {name: pos for pos, name in enumerate(('Nov', 'Dec', 'Jan', 'Feb', 'Mar'))}
    totals["_rank"] = totals["Month"].map(lambda m: rank.get(m, 999))
    totals = totals.sort_values("_rank").drop("_rank", axis=1)
    return totals.to_dict('records')
def format_hours(hours):
    """Render an hours value for display: one decimal place, "-" for zero."""
    return "-" if hours == 0 else f"{hours:.1f}"
def get_filtered_data(team_data, search_term, selected_month, sort_by, sort_order, tech_category_filter=None):
    """Filter and sort team data based on current selections.

    Args:
        team_data: list of per-user summary dicts (from process_team_members).
        search_term: case-insensitive substring matched against user names.
        selected_month: month abbreviation to restrict to, or "All".
        sort_by: "name", "totalHours", or a specific epic name.
        sort_order: "desc" or "asc".
        tech_category_filter: optional technology category to restrict to.

    Returns:
        A new list of user dicts; the input list and its dicts are never
        mutated.
    """
    # BUG FIX: the original used team_data.copy() (a shallow list copy) and
    # then assigned into the shared per-user dicts below, silently
    # corrupting the cached team data for every later render. Copy each
    # dict so the month/tech adjustments stay local to this call.
    filtered_data = [dict(item) for item in team_data]
    # Name search (case-insensitive substring)
    if search_term:
        filtered_data = [item for item in filtered_data if search_term.lower() in item["User"].lower()]
    # Technology filter: keep only users with entries in that category and
    # record their hours within it for sorting
    if tech_category_filter and tech_category_filter != "All":
        filtered_data = [
            item for item in filtered_data
            if any(tc["TechCategory"] == tech_category_filter for tc in item["TechCategories"])
        ]
        for item in filtered_data:
            tech_hours = sum(tc["Hours"] for tc in item["TechCategories"] if tc["TechCategory"] == tech_category_filter)
            item["FilteredTechHours"] = tech_hours
    # Month filter: restrict totals and breakdowns to the selected month
    if selected_month != "All":
        filtered_data = [
            item for item in filtered_data
            if item.get("MonthlyData", {}).get(selected_month)
        ]
        for item in filtered_data:
            monthly_data = item["MonthlyData"][selected_month]
            item["TotalHours"] = monthly_data["total"]
            item["EpicBreakdown"] = [
                epic for epic in item["EpicBreakdown"]
                if epic["Month"] == selected_month
            ]
            item["TechCategories"] = [
                tc for tc in item["TechCategories"]
                if tc["Month"] == selected_month
            ]
    reverse_sort = (sort_order == "desc")
    if sort_by == "name":
        filtered_data.sort(key=lambda x: x["User"], reverse=reverse_sort)
    elif sort_by == "totalHours":
        # With a tech filter active, rank by hours inside that category
        if tech_category_filter and tech_category_filter != "All":
            filtered_data.sort(key=lambda x: x.get("FilteredTechHours", 0), reverse=reverse_sort)
        else:
            filtered_data.sort(key=lambda x: x["TotalHours"], reverse=reverse_sort)
    else:
        # Otherwise sort_by names a specific epic: rank by hours in it
        filtered_data.sort(
            key=lambda x: sum(e["Hours"] for e in x["EpicBreakdown"] if e["Epic"] == sort_by),
            reverse=reverse_sort
        )
    return filtered_data
def get_epic_totals(filtered_data, unique_epics):
    """Sum hours per epic across the given users.

    Every epic in unique_epics appears in the result (zero when unused);
    epics only present in the breakdowns are added as encountered.
    """
    totals = dict.fromkeys(unique_epics, 0)
    for member in filtered_data:
        for entry in member["EpicBreakdown"]:
            name = entry["Epic"]
            totals[name] = totals.get(name, 0) + entry["Hours"]
    return totals
def get_user_chart_data(user_data, selected_month):
    """Aggregate one user's epic hours for charting.

    Restricts to selected_month unless it is "All". Returns a list of
    {"name": epic, "value": hours} records (insertion order, unsorted).
    """
    totals = {}
    for entry in user_data["EpicBreakdown"]:
        if selected_month != "All" and entry["Month"] != selected_month:
            continue
        epic_name = entry["Epic"]
        totals[epic_name] = totals.get(epic_name, 0) + entry["Hours"]
    return [{"name": name, "value": value} for name, value in totals.items()]
def get_user_tech_categories(user_data, selected_month, min_percentage=1.0):
    """Aggregate one user's upskilling hours per technology category.

    Entries with a missing/placeholder category are ignored; categories
    contributing less than min_percentage of the user's kept total are
    dropped. Returns [{"name", "value"}] sorted by hours, descending.
    """
    totals = {}
    grand_total = 0
    for entry in user_data["TechCategories"]:
        if selected_month != "All" and entry["Month"] != selected_month:
            continue
        category = entry["TechCategory"]
        # Placeholder categories carry no signal — skip them
        if pd.isna(category) or category in ["nan", "null", "", None, "N/A"]:
            continue
        entry_hours = entry["Hours"]
        totals[category] = totals.get(category, 0) + entry_hours
        grand_total += entry_hours
    # Drop categories below the percentage threshold
    if grand_total > 0:
        totals = {
            name: hours for name, hours in totals.items()
            if (hours / grand_total * 100) >= min_percentage
        }
    return [
        {"name": name, "value": hours}
        for name, hours in sorted(totals.items(), key=lambda kv: kv[1], reverse=True)
    ]
def get_user_monthly_data(user_data):
    """Return the user's hours for each month of the Nov→Mar window.

    Months with no data yield 0, so the result always has five entries.
    """
    monthly = user_data["MonthlyData"]
    series = []
    for month in ('Nov', 'Dec', 'Jan', 'Feb', 'Mar'):
        series.append({
            "month": month,
            "hours": monthly.get(month, {}).get("total", 0)
        })
    return series
def get_user_tech_issues(user_data, tech_category, selected_month="All"):
    """Collect the issues a user logged under one technology category.

    Entries for the same issue are merged — hours summed, with the first
    occurrence's worklog and month kept. Sorted by hours, descending.
    """
    collected = []
    for entry in user_data["TechCategories"]:
        if entry["TechCategory"] != tech_category:
            continue
        if selected_month != "All" and entry["Month"] != selected_month:
            continue
        # Linear scan (not a dict) so issue matching uses == semantics
        match = None
        for record in collected:
            if record["issue"] == entry["Issue"]:
                match = record
                break
        if match is not None:
            match["hours"] += entry["Hours"]
        else:
            collected.append({
                "issue": entry["Issue"],
                "worklog": entry["Worklog"],
                "hours": entry["Hours"],
                "month": entry["Month"]
            })
    return sorted(collected, key=lambda record: record["hours"], reverse=True)
def display_categorized_data_view(df):
    """Render the categorized-data table with user/upskilling/technology filters."""
    st.header("View Categorized Data")
    if df is None:
        st.warning("No categorized data available. Please upload and process a CSV file first.")
        return
    # Three filter widgets side by side
    filter_user_col, filter_upskill_col, filter_tech_col = st.columns(3)
    with filter_user_col:
        user_options = ["All Users"] + sorted(df["User"].dropna().unique())
        selected_user = st.selectbox("Filter by User:", user_options)
    with filter_upskill_col:
        show_upskilling_only = st.checkbox("Show Upskilling Issues Only", value=True)
    with filter_tech_col:
        tech_options = [cat for cat in sorted(df["TechCategory"].dropna().unique()) if cat != "N/A"]
        selected_tech = st.selectbox("Filter by Technology:", ["All Technologies"] + tech_options)
    # Apply the selected filters in turn
    filtered_df = df.copy()
    if selected_user != "All Users":
        filtered_df = filtered_df[filtered_df["User"] == selected_user]
    if show_upskilling_only:
        filtered_df = filtered_df[filtered_df["Issue"].apply(wc.is_upskilling_issue)]
    if selected_tech != "All Technologies":
        filtered_df = filtered_df[filtered_df["TechCategory"] == selected_tech]
    # The table lists individual worklog entries only
    filtered_df = filtered_df[filtered_df["Level"] == "worklog"]
    if filtered_df.empty:
        st.info("No data matches the selected filters.")
    else:
        st.write(f"Showing {len(filtered_df)} records. Use the filters above to narrow down the results.")
        # Rename columns for display and sort by user, then hours
        display_df = filtered_df[["User", "Issue", "Worklog", "TechCategory", "Logged", "Month"]].copy()
        display_df.columns = ["User", "Issue", "Worklog", "Technology", "Hours", "Month"]
        display_df = display_df.sort_values(["User", "Hours"], ascending=[True, False])
        st.dataframe(display_df, use_container_width=True)
        # Download just what is currently filtered
        st.download_button(
            label="Download Filtered CSV",
            data=filtered_df.to_csv(index=False).encode('utf-8'),
            file_name="filtered_upskilling_data.csv",
            mime="text/csv",
        )
        # Download the complete categorized dataset
        st.download_button(
            label="Download Complete Categorized CSV",
            data=df.to_csv(index=False).encode('utf-8'),
            file_name="full_categorized_data.csv",
            mime="text/csv",
        )
def display_tech_category_analysis(team_data, tech_category_data, upskilling_count):
    """Render the "Upskilling Technology Analysis" section.

    Args:
        team_data: per-user summary dicts from process_team_members().
        tech_category_data: {"overall", "by_user", "by_month"} aggregates
            from process_tech_categories().
        upskilling_count: number of entries flagged as upskilling.
    """
    st.header("Upskilling Technology Analysis")
    # Info about upskilling issues
    st.info(f"Found {upskilling_count} upskilling-related entries in the data. Technology categories shown below represent only upskilling activities.")
    # Free-text user filter; keyed so the value survives reruns
    tech_user_filter = st.text_input(
        "Filter by Team Member Name:",
        key="tech_user_filter_input"
    )
    # Apply filter in-memory, without reprocessing data
    filtered_team_data = team_data
    if tech_user_filter:
        filtered_team_data = [user for user in team_data if tech_user_filter.lower() in user["User"].lower()]
        if not filtered_team_data:
            st.warning(f"No team members found matching '{tech_user_filter}'")
        else:
            st.success(f"Showing data for {len(filtered_team_data)} team members matching '{tech_user_filter}'")
    # Keep only users that have at least one real (non-placeholder) category
    upskilling_team_data = [
        user for user in filtered_team_data
        if any(tech["TechCategory"] not in ["nan", "null", "", None, "N/A"] for tech in user["TechCategories"])
    ]
    # Nothing usable — explain and bail out early
    if not tech_category_data["overall"] or not upskilling_team_data:
        st.warning("No technology categories found in upskilling data. This could be because there are no upskilling worklog entries or the categorization process failed.")
        return
    st.subheader("Overall Technology Distribution in Upskilling")
    # Convert to DataFrame for Plotly
    overall_df = pd.DataFrame(tech_category_data["overall"])
    # Drop placeholder category values before charting
    overall_df = overall_df[~overall_df["Category"].isin(["nan", "null", "", "N/A"])].copy()
    if not overall_df.empty:
        fig_tech = px.pie(
            overall_df,
            values="Hours",
            names="Category",
            color_discrete_sequence=COLORS,
            title="Hours by Technology Category in Upskilling Activities"
        )
        fig_tech.update_traces(
            textposition='inside',
            textinfo='percent+label',
            hovertemplate='%{label}: %{value:.1f} hours (%{percent})'
        )
        st.plotly_chart(fig_tech, use_container_width=True)
        # Companion table of the top categories (hours as 1-decimal strings)
        st.subheader("Top Technology Categories in Upskilling")
        top_tech_df = overall_df.head(10).copy()
        top_tech_df["Hours"] = top_tech_df["Hours"].map(lambda x: f"{x:.1f}")
        st.dataframe(top_tech_df, use_container_width=True)
    # Tech category filters
    st.subheader("Team Member Analysis by Technology")
    col1, col2 = st.columns(2)
    with col1:
        # All real category names (placeholders excluded)
        all_categories = [
            item["Category"] for item in tech_category_data["overall"]
            if item["Category"] not in ["nan", "null", "", "N/A"]
        ]
        tech_filter_options = ["All"] + all_categories
        if "selected_tech" not in st.session_state:
            st.session_state.selected_tech = "All"
        selected_tech = st.selectbox(
            "Filter by Technology Category:",
            options=tech_filter_options,
            key="tech_category_selector"
        )
    with col2:
        users_count = len(upskilling_team_data)  # Use filtered upskilling users count
        default_value = min(5, max(1, users_count))
        # A slider needs min < max, so skip the widget for a single user
        if users_count <= 1:
            st.write(f"Showing data for {users_count} user")
            min_users = users_count
        else:
            min_users = st.slider(
                "Minimum users to display:",
                min_value=1,
                max_value=max(2, users_count),
                value=default_value,
                key="min_users_slider"
            )
    # Branch: a specific technology is selected vs. the "All" overview
    if selected_tech != "All":
        # Users with at least one entry in the chosen category
        tech_filtered_data = [
            user for user in upskilling_team_data
            if any(tc["TechCategory"] == selected_tech for tc in user["TechCategories"])
        ]
        # Calculate hours for each user in this tech category
        # NOTE(review): this writes "TechHours" into the shared user dicts
        # (held in cached/session data) — consider copying before annotating
        for user in tech_filtered_data:
            user["TechHours"] = sum(
                tc["Hours"] for tc in user["TechCategories"]
                if tc["TechCategory"] == selected_tech
            )
        # Sort by tech category hours
        tech_filtered_data.sort(key=lambda x: x.get("TechHours", 0), reverse=True)
        if tech_filtered_data:
            # Bar chart of the top users by hours in this category
            top_users = tech_filtered_data[:min_users]
            user_tech_df = pd.DataFrame([
                {"User": user["User"], "Hours": user["TechHours"]}
                for user in top_users
            ])
            fig_users = px.bar(
                user_tech_df,
                x="User",
                y="Hours",
                title=f"Top Users for {selected_tech} in Upskilling",
                color_discrete_sequence=['#8884d8']
            )
            st.plotly_chart(fig_users, use_container_width=True)
            # Expandable per-user issue breakdown
            st.subheader(f"Team Members Upskilling in {selected_tech}")
            for i, user in enumerate(tech_filtered_data):
                with st.expander(f"{user['User']} - {format_hours(user['TechHours'])} hours"):
                    # Get issues for this user in this tech category
                    issues = get_user_tech_issues(user, selected_tech)
                    if issues:
                        st.write(f"### Upskilling Issues for {user['User']} in {selected_tech}")
                        # Create an issues table
                        issues_df = pd.DataFrame([
                            {
                                "Issue": issue["issue"],
                                "Worklog": issue["worklog"],
                                "Hours": format_hours(issue["hours"]),
                                "Month": issue["month"]
                            }
                            for issue in issues
                        ])
                        st.dataframe(issues_df, use_container_width=True)
                    else:
                        st.write("No detailed issue information available.")
        else:
            st.info(f"No team members found upskilling in {selected_tech}.")
    else:
        # "All" selected: overall tech distribution per team member
        st.subheader("Technology Distribution by Team Member (Upskilling Only)")
        # Display upskilling users only
        if not upskilling_team_data:
            st.info("No team members found with upskilling entries.")
            return
        # Show top users based on the slider selection
        display_users = upskilling_team_data[:min_users]
        # One tab per team member
        tabs = st.tabs([user["User"] for user in display_users])
        for i, tab in enumerate(tabs):
            user = display_users[i]
            with tab:
                # Get tech categories for this user
                user_tech = get_user_tech_categories(user, "All")
                if user_tech:
                    # Convert to DataFrame for Plotly
                    user_tech_df = pd.DataFrame(user_tech)
                    fig_user_tech = px.pie(
                        user_tech_df,
                        values="value",
                        names="name",
                        color_discrete_sequence=COLORS,
                        title=f"Upskilling Technology Distribution for {user['User']}"
                    )
                    fig_user_tech.update_traces(
                        textposition='inside',
                        textinfo='percent+label',
                        hovertemplate='%{label}: %{value:.1f} hours (%{percent})'
                    )
                    st.plotly_chart(fig_user_tech, use_container_width=True)
                    # Show breakdown of upskilling issues
                    if user["UpskillIssues"]:
                        st.subheader(f"Upskilling Issues for {user['User']}")
                        # Filter out issues whose categories are all placeholders
                        valid_issues = [
                            issue for issue in user["UpskillIssues"]
                            if any(tech not in ["nan", "null", "", None, "N/A"] for tech in issue["TechCategories"])
                        ]
                        if valid_issues:
                            issues_df = pd.DataFrame([
                                {
                                    "Issue": issue["Issue"],
                                    "Hours": format_hours(issue["Hours"]),
                                    "Technologies": ", ".join([
                                        str(tech) for tech in issue["TechCategories"]
                                        if tech not in ["nan", "null", "", None, "N/A"]
                                    ])
                                }
                                for issue in sorted(valid_issues, key=lambda x: x["Hours"], reverse=True)
                            ])
                            st.dataframe(issues_df, use_container_width=True)
                        else:
                            st.info("No upskilling issues with valid technology categories found.")
                else:
                    st.info(f"No upskilling technology categories found for {user['User']}.")
def display_team_epic_analysis(team_data, epic_data, monthly_data, unique_epics):
    """Render the team/epic analysis: sidebar filters, monthly overview,
    team member table with expandable detail, and epic distribution chart.

    Args:
        team_data: per-user summary dicts from process_team_members().
        epic_data: [{"Epic", "Hours"}] from process_top_epics().
        monthly_data: [{"Month", "Hours"}] from process_monthly_data().
        unique_epics: sorted list of all epic names in the data.
    """
    # Create filters sidebar
    st.sidebar.title("Filters")
    # Team Member Filters
    st.sidebar.header("Team Member Filters")
    search_term = st.sidebar.text_input("Search by Name:", "")
    display_count_options = [10, 20, 50]
    # Offer an "All" option only when there are more than 50 members
    if len(team_data) > 50:
        display_count_options.append(len(team_data))
    display_count = st.sidebar.selectbox(
        "Team Members to Display:",
        options=display_count_options,
        format_func=lambda x: f"All ({len(team_data)})" if x == len(team_data) else str(x),
        index=1  # Default to 20
    )
    month_options = ["All"] + ['Nov', 'Dec', 'Jan', 'Feb', 'Mar']
    selected_month = st.sidebar.selectbox(
        "Month:",
        options=month_options,
        index=0  # Default to "All"
    )
    # Epic Filters: bulk select/clear/top-5 buttons
    st.sidebar.header("Epic Filters")
    epic_col1, epic_col2, epic_col3 = st.sidebar.columns(3)
    with epic_col1:
        if st.button("Select All"):
            st.session_state.selected_epics = [epic["Epic"] for epic in epic_data]
    with epic_col2:
        if st.button("Clear All"):
            st.session_state.selected_epics = []
    with epic_col3:
        # epic_data is sorted by hours, so [:5] are the heaviest epics
        if st.button("Top 5 Epics"):
            st.session_state.selected_epics = [epic["Epic"] for epic in epic_data[:5]]
    # Per-epic checkboxes, kept in sync with session state
    st.sidebar.subheader("Select Epics")
    # Create a scrollable container for epics
    epic_container = st.sidebar.container()
    with epic_container:
        for i, epic in enumerate(epic_data):
            epic_name = epic["Epic"]
            epic_hours = epic["Hours"]
            epic_color = COLORS[i % len(COLORS)]  # NOTE(review): assigned but never used
            # Use checkbox for each epic
            checked = st.checkbox(
                f"{epic_name} ({int(epic_hours)}h)",
                value=epic_name in st.session_state.selected_epics,
                key=f"epic_{i}"
            )
            # Mirror the checkbox state into st.session_state.selected_epics
            if checked and epic_name not in st.session_state.selected_epics:
                st.session_state.selected_epics.append(epic_name)
            elif not checked and epic_name in st.session_state.selected_epics:
                st.session_state.selected_epics.remove(epic_name)
    # Monthly Overview Chart
    st.header("Monthly Non-Billable Hours Overview")
    # Convert to DataFrame for Plotly
    monthly_df = pd.DataFrame(monthly_data)
    if not monthly_df.empty:
        fig_monthly = px.bar(
            monthly_df,
            x="Month",
            y="Hours",
            title="",
            labels={"Hours": "Non-Billable Hours", "Month": "Month"},
            color_discrete_sequence=['#8884d8']
        )
        fig_monthly.update_layout(
            plot_bgcolor='white',
            margin=dict(l=20, r=30, t=10, b=20),
        )
        st.plotly_chart(fig_monthly, use_container_width=True)
    # Team Members Table
    st.header("Team Member Breakdown")
    st.write("Click on a team member to see their detailed breakdown.")
    # Sorting controls (sort field and direction, persisted in session state)
    sort_col1, sort_col2 = st.columns(2)
    with sort_col1:
        # Sortable by total hours, name, or any currently-selected epic
        sort_options = ["totalHours", "name"] + st.session_state.selected_epics
        sort_labels = {
            "totalHours": "Total Hours",
            "name": "Name"
        }
        for epic in st.session_state.selected_epics:
            sort_labels[epic] = epic
        new_sort_by = st.selectbox(
            "Sort by:",
            options=sort_options,
            format_func=lambda x: sort_labels[x],
            index=sort_options.index(st.session_state.sort_by)
        )
        if new_sort_by != st.session_state.sort_by:
            st.session_state.sort_by = new_sort_by
    with sort_col2:
        sort_order_options = ["desc", "asc"]
        sort_order_labels = {"desc": "Descending", "asc": "Ascending"}
        new_sort_order = st.selectbox(
            "Order:",
            options=sort_order_options,
            format_func=lambda x: sort_order_labels[x],
            index=sort_order_options.index(st.session_state.sort_order)
        )
        if new_sort_order != st.session_state.sort_order:
            st.session_state.sort_order = new_sort_order
    # Get filtered and sorted data
    filtered_team_data = get_filtered_data(
        team_data,
        search_term,
        selected_month,
        st.session_state.sort_by,
        st.session_state.sort_order
    )
    # Apply display count
    table_data = filtered_team_data[:display_count]
    # Calculate epic totals over the visible rows only
    epic_totals = get_epic_totals(table_data, unique_epics)
    # Create the table
    if not table_data:
        st.warning("No data matches your filters. Try adjusting your search criteria.")
    else:
        # Header: name + total + one column per selected epic
        header_cols = ["Team Member", "Total Hours"] + st.session_state.selected_epics
        header_col_sizes = [3] + [2] * (len(header_cols) - 1)
        # Create a styled header row with a sort-direction indicator
        header_row = st.columns(header_col_sizes)
        with header_row[0]:
            sort_icon = "β–Ό" if st.session_state.sort_by == "name" and st.session_state.sort_order == "desc" else "β–²" if st.session_state.sort_by == "name" and st.session_state.sort_order == "asc" else ""
            st.markdown(f"**Team Member {sort_icon}**")
        with header_row[1]:
            sort_icon = "β–Ό" if st.session_state.sort_by == "totalHours" and st.session_state.sort_order == "desc" else "β–²" if st.session_state.sort_by == "totalHours" and st.session_state.sort_order == "asc" else ""
            st.markdown(f"**Total Hours {sort_icon}**")
        for i, epic in enumerate(st.session_state.selected_epics):
            with header_row[i+2]:
                sort_icon = "β–Ό" if st.session_state.sort_by == epic and st.session_state.sort_order == "desc" else "β–²" if st.session_state.sort_by == epic and st.session_state.sort_order == "asc" else ""
                st.markdown(f"**{epic} {sort_icon}**")
        # Display each team member as a row
        for user_idx, user in enumerate(table_data):
            # Create a container for each row
            with st.container():
                # Use columns for table cells
                row_cols = st.columns(header_col_sizes)
                # User name cell - a button that toggles the expanded view
                with row_cols[0]:
                    is_expanded = st.session_state.expanded_user == user["User"]
                    expand_icon = "πŸ”½" if is_expanded else "πŸ”Ό"
                    if st.button(f"{user['User']} {expand_icon}", key=f"user_btn_{user_idx}"):
                        if is_expanded:
                            st.session_state.expanded_user = None
                        else:
                            st.session_state.expanded_user = user["User"]
                # Total hours cell
                with row_cols[1]:
                    st.write(format_hours(user["TotalHours"]))
                # One cell per selected epic with this user's hours in it
                for i, epic in enumerate(st.session_state.selected_epics):
                    with row_cols[i+2]:
                        epic_hours = sum(e["Hours"] for e in user["EpicBreakdown"] if e["Epic"] == epic)
                        st.write(format_hours(epic_hours))
                # Expanded user detail
                if st.session_state.expanded_user == user["User"]:
                    with st.expander("", expanded=True):
                        st.subheader(f"{user['User']} - Detailed Breakdown")
                        # Create tabs for different views
                        user_tab1, user_tab2 = st.tabs(["Epic Distribution", "Upskilling Technologies"])
                        with user_tab1:
                            # Create two columns for charts
                            chart_col1, chart_col2 = st.columns(2)
                            # Epic Distribution Chart
                            with chart_col1:
                                st.markdown("#### Epic Distribution")
                                # Get chart data
                                user_chart_data = get_user_chart_data(user, selected_month)
                                if user_chart_data:
                                    # Sort by value
                                    user_chart_data.sort(key=lambda x: x["value"], reverse=True)
                                    # Create DataFrame
                                    epic_df = pd.DataFrame(user_chart_data)
                                    fig_pie = px.pie(
                                        epic_df,
                                        values="value",
                                        names="name",
                                        color_discrete_sequence=COLORS,
                                    )
                                    fig_pie.update_traces(
                                        textposition='inside',
                                        textinfo='percent+label',
                                        hovertemplate='%{label}: %{value:.1f} hours (%{percent})'
                                    )
                                    fig_pie.update_layout(
                                        height=400,
                                        margin=dict(l=10, r=10, t=10, b=10)
                                    )
                                    st.plotly_chart(fig_pie, use_container_width=True)
                                else:
                                    st.info("No epic data available for the selected period.")
                            # Monthly Distribution Chart
                            with chart_col2:
                                st.markdown("#### Monthly Distribution")
                                # NOTE(review): this rebinding shadows the
                                # monthly_data parameter for the rest of the loop
                                monthly_data = get_user_monthly_data(user)
                                # Filter out zero hours
                                monthly_data = [m for m in monthly_data if m["hours"] > 0]
                                if monthly_data:
                                    # Create DataFrame
                                    monthly_df = pd.DataFrame(monthly_data)
                                    fig_bar = px.bar(
                                        monthly_df,
                                        x="month",
                                        y="hours",
                                        color_discrete_sequence=['#82ca9d']
                                    )
                                    fig_bar.update_layout(
                                        height=400,
                                        margin=dict(l=10, r=10, t=10, b=10),
                                        xaxis_title="Month",
                                        yaxis_title="Hours"
                                    )
                                    st.plotly_chart(fig_bar, use_container_width=True)
                                else:
                                    st.info("No monthly data available.")
                            # Epic Details Table
                            st.markdown("#### Epic Details")
                            user_epic_data = get_user_chart_data(user, selected_month)
                            if user_epic_data:
                                # Create data for table
                                epic_details = []
                                for item in user_epic_data:
                                    epic_name = item["name"]
                                    hours = item["value"]
                                    # Find the project this epic belongs to
                                    project = next((e["Project"] for e in user["EpicBreakdown"] if e["Epic"] == epic_name), "-")
                                    # Calculate percentage (guard against zero total)
                                    percent = (hours / user["TotalHours"] * 100) if user["TotalHours"] > 0 else 0
                                    epic_details.append({
                                        "Epic": epic_name,
                                        "Project": project,
                                        "Hours": hours,
                                        "% of Total": f"{percent:.1f}%"
                                    })
                                # Sort by hours descending
                                epic_details.sort(key=lambda x: x["Hours"], reverse=True)
                                # Create DataFrame
                                epic_df = pd.DataFrame(epic_details)
                                st.dataframe(epic_df, use_container_width=True)
                            else:
                                st.info("No epic details available for this user.")
                        with user_tab2:
                            # Technology Distribution
                            st.markdown("#### Upskilling Technology Distribution")
                            # Get tech categories for this user
                            user_tech_data = get_user_tech_categories(user, selected_month)
                            if user_tech_data:
                                # Create DataFrame for chart
                                tech_df = pd.DataFrame(user_tech_data)
                                fig_tech = px.pie(
                                    tech_df,
                                    values="value",
                                    names="name",
                                    color_discrete_sequence=COLORS,
                                )
                                fig_tech.update_traces(
                                    textposition='inside',
                                    textinfo='percent+label',
                                    hovertemplate='%{label}: %{value:.1f} hours (%{percent})'
                                )
                                st.plotly_chart(fig_tech, use_container_width=True)
                                # Upskilling issues
                                if user["UpskillIssues"]:
                                    st.markdown("#### Upskilling Issues")
                                    # Filter out issues whose categories are all placeholders
                                    valid_issues = [
                                        issue for issue in user["UpskillIssues"]
                                        if any(tech not in ["nan", "null", "", None, "N/A"] for tech in issue["TechCategories"])
                                    ]
                                    if valid_issues:
                                        issues_df = pd.DataFrame([
                                            {
                                                "Issue": issue["Issue"],
                                                "Hours": format_hours(issue["Hours"]),
                                                "Technologies": ", ".join([
                                                    str(tech) for tech in issue["TechCategories"]
                                                    if tech not in ["nan", "null", "", None, "N/A"]
                                                ])
                                            }
                                            for issue in sorted(valid_issues, key=lambda x: x["Hours"], reverse=True)
                                        ])
                                        st.dataframe(issues_df, use_container_width=True)
                                    else:
                                        st.info("No upskilling issues with valid technology categories found.")
                            else:
                                st.info("No upskilling technology data found for this user.")
        # Totals row across the visible users
        st.markdown("---")
        total_row = st.columns(header_col_sizes)
        with total_row[0]:
            st.markdown("**Total**")
        with total_row[1]:
            total_hours = sum(user["TotalHours"] for user in table_data)
            st.markdown(f"**{format_hours(total_hours)}**")
        for i, epic in enumerate(st.session_state.selected_epics):
            with total_row[i+2]:
                st.markdown(f"**{format_hours(epic_totals.get(epic, 0))}**")
    # Epic Distribution Chart
    st.header("Epic Distribution")
    # Filter epic data to only selected epics
    selected_epic_data = [epic for epic in epic_data if epic["Epic"] in st.session_state.selected_epics]
    if selected_epic_data:
        epic_df = pd.DataFrame(selected_epic_data)
        fig_pie = px.pie(
            epic_df,
            values="Hours",
            names="Epic",
            color_discrete_sequence=COLORS,
        )
        fig_pie.update_traces(
            textposition='inside',
            textinfo='percent+label',
            hovertemplate='%{label}: %{value:.1f} hours (%{percent})'
        )
        fig_pie.update_layout(
            height=500,
            margin=dict(l=20, r=20, t=20, b=20)
        )
        st.plotly_chart(fig_pie, use_container_width=True)
    else:
        st.info("Please select at least one epic to display the distribution chart.")
def main():
    """Render the Streamlit dashboard.

    Flow: (1) offer a cache-clear control, (2) honor a pending rerun flag,
    (3) if no processed data exists yet, show the CSV uploader and run the
    processing pipeline, otherwise (4) render the three analysis tabs from
    the results cached in ``st.session_state``.

    Relies on helpers defined elsewhere in this file: ``process_data``,
    ``clear_session_and_cache`` and the ``display_*`` view functions.
    """
    st.title("Non-Billable Time Analysis Dashboard")

    # Clear-cache control pinned top-left; the wide right column is left
    # unused on purpose so the button doesn't stretch across the page.
    clear_cache_col, _ = st.columns([1, 5])
    with clear_cache_col:
        if st.button("πŸ—‘οΈ Clear Cache", key="clear_cache_button"):
            clear_session_and_cache()
            st.success("Cache and files cleared successfully! Please reload the page.")
            st.stop()  # Stop execution to force a reload

    # A previous action flagged that state changed and a fresh run is needed.
    if st.session_state.needs_rerun:
        st.session_state.needs_rerun = False
        st.rerun()

    if st.session_state.processed_data is None:
        # No data yet: show the uploader (only visible in this state).
        uploaded_file = st.file_uploader("Upload Project Time Logging CSV", type=["csv"])
        if uploaded_file is None:
            st.info("πŸ‘‹ Welcome! Please upload a CSV file to begin analysis.")
            return
        try:
            raw_data = pd.read_csv(uploaded_file)
            # Persist a copy of the upload for later reference/debugging.
            with open("uploaded_data.csv", "wb") as f:
                f.write(uploaded_file.getvalue())

            # Let the user optionally prioritize users for tech categorization.
            unique_users = sorted(raw_data["User"].dropna().unique())
            st.subheader("Select Users for Focus Tech Categorization")
            focus_users = st.multiselect(
                "Select users to prioritize for tech categorization (optional):",
                options=unique_users,
                default=[],
            )

            # NOTE(review): this runs on the same rerun that first renders the
            # multiselect, so processing starts immediately with the default
            # (empty) selection before the user can pick anyone — consider
            # gating this call behind an explicit "Process" button.
            processed_data = process_data(raw_data, force_categorize=True, focus_users=focus_users)
            if processed_data is None:
                st.error("Error processing data")
                return
            st.session_state.processed_data = processed_data
            st.session_state.has_generated_results = True
            st.rerun()  # Refresh to show results
        except Exception as e:
            # Best-effort surface of any parse/processing failure to the UI.
            st.error(f"Error processing uploaded file: {e}")
            return
    else:
        # Unpack previously processed results cached in session state.
        results = st.session_state.processed_data
        team_data = results['team_data']
        epic_data = results['epic_data']
        monthly_data = results['monthly_data']
        unique_epics = results['unique_epics']
        tech_category_data = results['tech_category_data']
        upskilling_count = results['upskilling_count']

        # Default the epic filter to the top 5 epics on first render.
        if not st.session_state.selected_epics and epic_data:
            st.session_state.selected_epics = [epic["Epic"] for epic in epic_data[:5]]

        # Three analysis views, one per tab.
        tab1, tab2, tab3 = st.tabs([
            "πŸ“Š Team & Epic Analysis",
            "πŸ’» Upskilling Technology Analysis",
            "πŸ“‹ View & Download CSV Data"
        ])
        with tab1:
            st.session_state.active_tab = 'team_analysis'
            display_team_epic_analysis(team_data, epic_data, monthly_data, unique_epics)
        with tab2:
            st.session_state.active_tab = 'tech_analysis'
            display_tech_category_analysis(team_data, tech_category_data, upskilling_count)
        with tab3:
            st.session_state.active_tab = 'csv_view'
            display_categorized_data_view(st.session_state.categorized_df)
# Run the dashboard only when executed as a script, not when imported.
if __name__ == "__main__":
    main()