import base64
import hashlib
import io
import time
from datetime import datetime, date, timedelta

import pandas as pd
import plotly.express as px
import streamlit as st
from sqlalchemy import create_engine, Column, Integer, String, DateTime, ForeignKey, Boolean, Float, Text
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import sessionmaker, relationship, declarative_base

# --- Configuration & Setup ---

# Set Streamlit page configuration
st.set_page_config(
    page_title="Production-Ready Project Manager (Complete)",
    page_icon="🛠️",
    layout="wide",
    initial_sidebar_state="expanded"
)

# --- Database Core (SQLAlchemy ORM) ---
# Using SQLite for the demo, easily scalable to PostgreSQL.
DATABASE_URL = "sqlite:///project_manager_complete.db"
# IMPORTANT for SQLite in Streamlit/Colab: check_same_thread must be False
Engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
Base = declarative_base()


# --- Database Models ---

class User(Base):
    """Database model for application users and team members."""
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    username = Column(String, unique=True, nullable=False)
    email = Column(String, unique=True, nullable=False)
    password_hash = Column(String, nullable=False)
    role = Column(String, default="Team Member")  # Admin, Project Manager, Team Member, Viewer
    created_at = Column(DateTime, default=datetime.now)

    projects = relationship("Project", back_populates="manager")
    tasks_assigned = relationship("Task", back_populates="assigned_to_user")
    time_logs = relationship("TimeLog", back_populates="user")
    risks_owned = relationship("Risk", back_populates="owner")


class Project(Base):
    """Database model for projects."""
    __tablename__ = 'projects'

    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False)
    description = Column(String)
    category = Column(String, default="Software")
    status = Column(String, default="Planning")  # Planning, In Progress, Completed, Archived
    priority = Column(String, default="Medium")  # Low, Medium, High
    start_date = Column(DateTime, default=datetime.now)
    due_date = Column(DateTime)
    manager_id = Column(Integer, ForeignKey('users.id'))

    manager = relationship("User", back_populates="projects")
    # Child rows are removed together with the project.
    tasks = relationship("Task", back_populates="project", cascade="all, delete-orphan")
    sprints = relationship("Sprint", back_populates="project", cascade="all, delete-orphan")
    risks = relationship("Risk", back_populates="project", cascade="all, delete-orphan")


class Task(Base):
    """Database model for individual tasks."""
    __tablename__ = 'tasks'

    id = Column(Integer, primary_key=True)
    title = Column(String, nullable=False)
    description = Column(Text)
    status = Column(String, default="To Do")  # To Do, In Progress, In Review, Done
    priority = Column(String, default="Medium")  # Low, Medium, High, Urgent
    estimate_hours = Column(Float, default=0.0)
    dependency_task_id = Column(Integer, ForeignKey('tasks.id'), nullable=True)  # For WBS/Dependencies
    sprint_id = Column(Integer, ForeignKey('sprints.id'), nullable=True)  # For Sprint Planning
    project_id = Column(Integer, ForeignKey('projects.id'))
    assigned_to_id = Column(Integer, ForeignKey('users.id'))
    created_at = Column(DateTime, default=datetime.now)

    project = relationship("Project", back_populates="tasks")
    assigned_to_user = relationship("User", back_populates="tasks_assigned")
    time_logs = relationship("TimeLog", back_populates="task", cascade="all, delete-orphan")
    sprint = relationship("Sprint", back_populates="tasks")
    # Self-referencing relationship for dependencies: `depends_on` is the task
    # named by dependency_task_id; `dependent_tasks` is the reverse collection.
    depends_on = relationship("Task", remote_side=[id], backref="dependent_tasks")


class TimeLog(Base):
    """Database model for recording time spent on tasks."""
    __tablename__ = 'time_logs'

    id = Column(Integer, primary_key=True)
    hours = Column(Float, nullable=False)
    log_date = Column(DateTime, default=datetime.now)
    task_id = Column(Integer, ForeignKey('tasks.id'))
    user_id = Column(Integer, ForeignKey('users.id'))

    task = relationship("Task", back_populates="time_logs")
    user = relationship("User", back_populates="time_logs")


class Risk(Base):
    """Database model for the project risk register."""
    __tablename__ = 'risks'

    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False)
    description = Column(Text)
    probability = Column(String, default="Low")  # Low, Medium, High
    impact = Column(String, default="Low")  # Low, Medium, High
    mitigation_plan = Column(Text)
    status = Column(String, default="Open")  # Open, Managed, Closed
    project_id = Column(Integer, ForeignKey('projects.id'))
    owner_id = Column(Integer, ForeignKey('users.id'))

    project = relationship("Project", back_populates="risks")
    owner = relationship("User", back_populates="risks_owned")


class Sprint(Base):
    """Database model for sprints (iterations)."""
    __tablename__ = 'sprints'

    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False)
    start_date = Column(DateTime)
    end_date = Column(DateTime)
    project_id = Column(Integer, ForeignKey('projects.id'))
    status = Column(String, default="Planning")  # Planning, Active, Completed

    project = relationship("Project", back_populates="sprints")
    tasks = relationship("Task", back_populates="sprint")


# --- Database Manager & Seeding ---

class DatabaseManager:
    """Manages database connection and CRUD operations.

    One SQLAlchemy session is opened in ``__init__`` and reused by every
    method. Constructing the manager also creates the schema and seeds the
    demo users/data when they are missing.
    """

    def __init__(self):
        self.Session = sessionmaker(bind=Engine)
        self.session = self.Session()
        self._ensure_db_schema()
        self._seed_initial_data()

    def _ensure_db_schema(self):
        """Ensures the database schema is created."""
        Base.metadata.create_all(Engine)

    def _seed_initial_data(self):
        """Creates initial admin and demo users/data if they don't exist.

        Users are seeded when no ``admin`` user exists; demo projects, a
        sprint, tasks, a risk and a time log are seeded when the projects
        table is empty and all three demo users can be found.
        """
        def hash_password(password):
            # NOTE(review): unsalted SHA-256, kept because AuthService hashes
            # the same way and stored hashes must keep matching.
            return hashlib.sha256(password.encode()).hexdigest()

        if self.session.query(User).filter_by(username='admin').first() is None:
            admin = User(username='admin', email='admin@tool.com',
                         password_hash=hash_password('adminpass'), role='Admin')
            manager = User(username='manager', email='manager@tool.com',
                           password_hash=hash_password('managerpass'), role='Project Manager')
            member = User(username='member', email='member@tool.com',
                          password_hash=hash_password('memberpass'), role='Team Member')
            self.session.add_all([admin, manager, member])
            try:
                self.session.commit()
            except IntegrityError:
                # One of the demo usernames/e-mails is already taken (the
                # columns are unique); leave the existing rows untouched.
                self.session.rollback()
            else:
                st.toast("Initial users created: admin, manager, member", icon="🔑")

        if self.session.query(Project).count() == 0:
            admin = self.session.query(User).filter_by(username='admin').first()
            manager_user = self.session.query(User).filter_by(username='manager').first()
            member_user = self.session.query(User).filter_by(username='member').first()
            if admin is None or manager_user is None or member_user is None:
                # The demo rows reference all three users by id; without them
                # there is nothing sensible to seed.
                return

            proj_core = Project(name="Tool Core Development",
                                description="Develop the core features and database.",
                                category="Software", status="In Progress", priority="High",
                                due_date=datetime.now() + timedelta(days=90),
                                manager_id=manager_user.id)
            proj_marketing = Project(name="Q4 Marketing Strategy",
                                     description="Plan and execute holiday marketing campaign.",
                                     category="Marketing", status="Planning", priority="Medium",
                                     due_date=datetime.now() + timedelta(days=45),
                                     manager_id=admin.id)
            self.session.add_all([proj_core, proj_marketing])
            # Commit so the projects receive primary keys used below.
            self.session.commit()

            # Create Sprint
            sprint_1 = Sprint(name="Sprint 1: Auth & DB", project_id=proj_core.id,
                              start_date=datetime.now(),
                              end_date=datetime.now() + timedelta(days=14),
                              status="Active")
            self.session.add(sprint_1)
            self.session.commit()

            # Create Tasks
            task1 = Task(title="Implement Kanban View", status="In Progress", priority="Urgent",
                         estimate_hours=8, project_id=proj_core.id,
                         assigned_to_id=member_user.id, sprint_id=sprint_1.id)
            task2 = Task(title="Design Database Schema", status="Done", priority="High",
                         estimate_hours=12, project_id=proj_core.id, assigned_to_id=admin.id)
            task3 = Task(title="Launch User Testing Phase 1", status="To Do", priority="High",
                         estimate_hours=20, project_id=proj_marketing.id,
                         assigned_to_id=manager_user.id)
            self.session.add_all([task1, task2, task3])
            self.session.commit()

            # Set dependency: task1 depends on task2
            task1.dependency_task_id = task2.id
            self.session.commit()

            # Create Risk
            risk1 = Risk(name="Database Migration Failure",
                         description="Risk of data loss during production database switch.",
                         probability="High", impact="High",
                         mitigation_plan="Perform dry run migration and full backup.",
                         project_id=proj_core.id, owner_id=admin.id)
            self.session.add(risk1)

            # Create Time Log
            log1 = TimeLog(hours=4.5, task_id=task2.id, user_id=admin.id)
            self.session.add(log1)
            self.session.commit()
            st.toast("Demo data (Projects, Sprints, Tasks, Risks, Time Logs) created.", icon="🚀")

    # --- Generic CRUD Methods ---

    def create(self, entity):
        """Add ``entity`` and commit.

        Returns True on success. Returns False after rolling back when the
        commit violates a constraint (silently) or fails for another reason
        (the error is shown with ``st.error``).
        """
        try:
            self.session.add(entity)
            self.session.commit()
            return True
        except IntegrityError:
            self.session.rollback()
            return False
        except Exception as e:
            st.error(f"Database error: {e}")
            self.session.rollback()
            return False

    def read_all(self, model, project_id=None):
        """Return all rows of ``model``.

        When ``project_id`` is given and the model has a ``project_id``
        attribute, only rows of that project are returned.
        """
        query = self.session.query(model)
        if project_id is not None and hasattr(model, 'project_id'):
            query = query.filter(model.project_id == project_id)
        return query.all()

    def read_by_id(self, model, id):
        """Return the ``model`` row with primary key ``id``, or None."""
        # Session.get replaces the legacy Query.get.
        return self.session.get(model, id)

    def update(self, entity):
        """Merge ``entity`` into the session and commit.

        Returns True on success; on failure shows the error, rolls back and
        returns False.
        """
        try:
            self.session.merge(entity)
            self.session.commit()
            return True
        except Exception as e:
            st.error(f"Database error: {e}")
            self.session.rollback()
            return False

    def delete(self, entity):
        """Delete ``entity`` and commit.

        Returns True on success; on failure shows the error, rolls back and
        returns False.
        """
        try:
            self.session.delete(entity)
            self.session.commit()
            return True
        except Exception as e:
            st.error(f"Database error: {e}")
            self.session.rollback()
            return False

    def get_users_for_assignment(self):
        """Return a ``{username: user id}`` mapping of all users."""
        users = self.session.query(User).all()
        return {user.username: user.id for user in users}

    def get_user_id_by_username(self, username):
        """Return the id of the user named ``username``, or None."""
        user = self.session.query(User).filter_by(username=username).first()
        return user.id if user else None

    def get_total_logged_hours(self, task_id):
        """Return the sum of hours logged against ``task_id`` (0 if none)."""
        logs = self.session.query(TimeLog).filter(TimeLog.task_id == task_id).all()
        return sum(log.hours for log in logs)
Authentication Service --- class AuthService: """Handles user authentication and session state management.""" def __init__(self, db_manager): self.db = db_manager def hash_password(self, password): return hashlib.sha256(password.encode()).hexdigest() def authenticate(self, username, password): user = self.db.session.query(User).filter_by(username=username).first() if user and user.password_hash == self.hash_password(password): st.session_state.is_authenticated = True st.session_state.username = user.username st.session_state.user_role = user.role st.session_state.user_id = user.id st.session_state.current_page = "Dashboard" st.toast(f"Welcome back, {user.username} ({user.role})!", icon="👋") return True return False def check_role_access(self, required_roles): return st.session_state.get('user_role') in required_roles def logout(self): st.session_state.clear() st.session_state.is_authenticated = False st.session_state.current_page = "Login" st.experimental_rerun() # --- Streamlit UI Components & Utilities --- def init_session_state(): """Initialize necessary session state variables.""" if 'is_authenticated' not in st.session_state: st.session_state.is_authenticated = False st.session_state.current_page = "Login" st.session_state.username = None st.session_state.user_role = None st.session_state.user_id = None st.session_state.show_task_form = False # For Kanban task creation def get_db_and_auth(): """Utility to get initialized DB and Auth services.""" if 'db_manager' not in st.session_state: st.session_state.db_manager = DatabaseManager() if 'auth_service' not in st.session_state: st.session_state.auth_service = AuthService(st.session_state.db_manager) return st.session_state.db_manager, st.session_state.auth_service def draw_kpi_card(title, value, icon, color_code): """Draws a responsive, styled card for the dashboard.""" st.markdown(f"""

{icon} {title}

{value}

""", unsafe_allow_html=True) # --- Visualizations --- def get_gantt_chart(tasks): """Generates a Plotly Gantt chart from task data.""" if not tasks: return None df_data = [] for task in tasks: logged_hours = st.session_state.db_manager.get_total_logged_hours(task.id) # For simplicity, calculate end date based on logged time and a generic assumption # In a real app, this would use due_date and complex scheduling logic. start = task.created_at.date() end = start + timedelta(hours=task.estimate_hours) if task.estimate_hours > 0 else start + timedelta(days=1) df_data.append(dict( Task=task.title, Start=start, Finish=end.date(), Resource=task.assigned_to_user.username, Status=task.status, Priority=task.priority, Duration=task.estimate_hours, Logged=logged_hours )) df = pd.DataFrame(df_data) # Map status to color status_colors = { "To Do": "#ef4444", "In Progress": "#f97316", "In Review": "#6366f1", "Done": "#10b981" } fig = px.timeline( df, x_start="Start", x_end="Finish", y="Task", color="Status", color_discrete_map=status_colors, custom_data=['Resource', 'Priority', 'Duration', 'Logged'] ) fig.update_layout( title='Project Task Timeline (Gantt Chart)', xaxis_title="Timeline", yaxis_title="", hoverlabel=dict(bgcolor="white", font_size=12) ) fig.update_traces( hovertemplate="Task: %{y}
" + "Assigned: %{customdata[0]}
" + "Start: %{x}
" + "End: %{xend}
" + "Priority: %{customdata[1]}
" + "Estimated: %{customdata[2]} hrs
" + "Logged: %{customdata[3]} hrs" ) return fig # --- Page Functions --- def login_page(auth_service): """The login and registration page.""" st.title("🔑 Project Management Tool Access") col1, col2 = st.columns([1, 1], gap="large") with col1: st.subheader("Existing User Login") with st.form("login_form"): username = st.text_input("Username", key="login_user") password = st.text_input("Password", type="password", key="login_pass") submitted = st.form_submit_button("Login", type="primary") if submitted: if auth_service.authenticate(username, password): st.experimental_rerun() else: st.error("Invalid username or password.") with col2: st.subheader("New User Registration") st.info("Registration is open for Project Manager or Team Member roles.") with st.form("register_form"): new_username = st.text_input("Choose Username", key="reg_user") new_email = st.text_input("Email", key="reg_email") new_password = st.text_input("Set Password", type="password", key="reg_pass") confirm_password = st.text_input("Confirm Password", type="password", key="reg_confirm_pass") role_options = ["Team Member", "Project Manager"] new_role = st.selectbox("Your Role", role_options, index=0, key="reg_role") register_submitted = st.form_submit_button("Register") if register_submitted: if not all([new_username, new_email, new_password, confirm_password]): st.error("All fields are required.") elif new_password != confirm_password: st.error("Passwords do not match.") else: db_manager = st.session_state.db_manager hashed_pass = auth_service.hash_password(new_password) new_user = User( username=new_username, email=new_email, password_hash=hashed_pass, role=new_role ) if db_manager.create(new_user): st.success("Registration successful! 
Please log in.") else: st.error("Username or Email already exists.") def dashboard_page(db_manager): """The main user dashboard/landing page with KPIs and charts.""" st.title("📊 Personal & Portfolio Dashboard") st.markdown(f"### Welcome, {st.session_state.username} ({st.session_state.user_role})!") all_projects = db_manager.read_all(Project) all_tasks = db_manager.read_all(Task) # Filter tasks assigned to the current user user_tasks = [t for t in all_tasks if t.assigned_to_id == st.session_state.user_id] # Calculate key metrics total_projects = len(all_projects) projects_active = len([p for p in all_projects if p.status == 'In Progress']) my_tasks_open = len([t for t in user_tasks if t.status != 'Done']) my_tasks_done = len([t for t in user_tasks if t.status == 'Done']) total_hours_logged = sum(log.hours for log in db_manager.read_all(TimeLog) if log.user_id == st.session_state.user_id) # KPI Cards col1, col2, col3, col4, col5 = st.columns(5) with col1: draw_kpi_card("Total Projects", total_projects, "📦", '#3b82f6') with col2: draw_kpi_card("Active Projects", projects_active, "📈", '#10b981') with col3: draw_kpi_card("My Open Tasks", my_tasks_open, "🔥", '#f97316') with col4: draw_kpi_card("My Done Tasks", my_tasks_done, "✅", '#6366f1') with col5: draw_kpi_card("Hours Logged", f"{total_hours_logged:.1f}h", "⏱️", '#d946ef') st.markdown("---") col_chart, col_summary = st.columns([2, 1]) with col_chart: st.subheader("Team Task Status Breakdown") if all_tasks: task_status_df = pd.DataFrame([{'Status': t.status} for t in all_tasks]) status_counts = task_status_df['Status'].value_counts().reset_index() status_counts.columns = ['Status', 'Count'] fig = px.bar(status_counts, x='Status', y='Count', color='Status', color_discrete_map={"To Do": "#ef4444", "In Progress": "#f97316", "In Review": "#6366f1", "Done": "#10b981"}, title="Overall Task Status Distribution") st.plotly_chart(fig, use_container_width=True) else: st.info("No tasks to display.") with col_summary: 
st.subheader("Critical Project Info") # Display risks risks = db_manager.read_all(Risk) st.metric(label="Total Open Risks", value=len([r for r in risks if r.status == 'Open'])) # Display high priority tasks high_tasks = [t for t in all_tasks if t.priority in ['High', 'Urgent'] and t.status != 'Done'] st.metric(label="Urgent/High Priority Tasks", value=len(high_tasks)) st.markdown("**Urgent Tasks**") for t in high_tasks[:5]: st.markdown(f"- **{t.title}** ({t.project.name})") def projects_page(db_manager): """Page for Project CRUD and sub-feature navigation.""" st.title("📦 Project Management (CRUD)") if st.session_state.user_role not in ['Admin', 'Project Manager']: st.warning("You do not have permission to manage projects. Viewing mode only.") tab1, tab2 = st.tabs(["View/Edit Projects", "Create New Project"]) with tab1: projects = db_manager.read_all(Project) project_data = [] for p in projects: manager_username = p.manager.username if p.manager else "N/A" total_tasks = len(p.tasks) completed_tasks = len([t for t in p.tasks if t.status == 'Done']) project_data.append({ 'ID': p.id, 'Name': p.name, 'Manager': manager_username, 'Status': p.status, 'Priority': p.priority, 'Tasks': f"{completed_tasks}/{total_tasks}", 'Due Date': p.due_date.strftime('%Y-%m-%d') if p.due_date else 'N/A' }) df = pd.DataFrame(project_data) st.dataframe(df, use_container_width=True, hide_index=True) st.markdown("---") st.subheader("Edit/Delete Project") project_ids = [p.id for p in projects] if project_ids: selected_id = st.selectbox("Select Project ID to Edit/Delete", project_ids, key="proj_select_edit") selected_project = db_manager.read_by_id(Project, selected_id) if selected_project: with st.expander(f"Edit Project: {selected_project.name}", expanded=False): if st.session_state.user_role in ['Admin', 'Project Manager']: users_map = db_manager.get_users_for_assignment() current_manager_name = db_manager.read_by_id(User, selected_project.manager_id).username with 
st.form(f"edit_project_form_{selected_id}"): new_name = st.text_input("Project Name", value=selected_project.name) new_desc = st.text_area("Description", value=selected_project.description) new_status = st.selectbox("Status", ['Planning', 'In Progress', 'Completed', 'Archived'], index=['Planning', 'In Progress', 'Completed', 'Archived'].index(selected_project.status)) new_priority = st.selectbox("Priority", ['Low', 'Medium', 'High'], index=['Low', 'Medium', 'High'].index(selected_project.priority)) new_manager_name = st.selectbox("Project Manager", list(users_map.keys()), index=list(users_map.keys()).index(current_manager_name)) col_e1, col_e2 = st.columns(2) with col_e1: update_submitted = st.form_submit_button("Update Project", type="primary") with col_e2: delete_submitted = st.form_submit_button("Delete Project", type="danger") if update_submitted: selected_project.name, selected_project.description, selected_project.status, selected_project.priority, selected_project.manager_id = new_name, new_desc, new_status, new_priority, users_map[new_manager_name] if db_manager.update(selected_project): st.success(f"Project '{selected_project.name}' updated successfully!") time.sleep(1); st.experimental_rerun() else: st.error("Failed to update project.") if delete_submitted: if db_manager.delete(selected_project): st.success(f"Project '{selected_project.name}' and all associated data deleted.") time.sleep(1); st.experimental_rerun() else: st.error("Failed to delete project.") else: st.warning("Insufficient permissions to edit or delete projects.") else: st.info("No projects created yet.") with tab2: if st.session_state.user_role in ['Admin', 'Project Manager']: users_map = db_manager.get_users_for_assignment() with st.form("new_project_form"): project_name = st.text_input("Project Name (Required)") project_desc = st.text_area("Description") col_c1, col_c2, col_c3 = st.columns(3) with col_c1: project_manager_name = st.selectbox("Project Manager", list(users_map.keys())) 
with col_c2: project_priority = st.selectbox("Priority", ['Low', 'Medium', 'High']) with col_c3: project_due_date = st.date_input("Due Date", min_value=date.today()) create_submitted = st.form_submit_button("Create Project", type="primary") if create_submitted: if not project_name: st.error("Project Name is required.") else: new_project = Project(name=project_name, description=project_desc, priority=project_priority, due_date=datetime.combine(project_due_date, datetime.min.time()), manager_id=users_map[project_manager_name]) if db_manager.create(new_project): st.success(f"Project '{project_name}' created successfully!"); time.sleep(1); st.experimental_rerun() else: st.error("Failed to create project.") else: st.warning("You must be an Admin or Project Manager to create new projects.") def kanban_page(db_manager): """Page for Task Management, Kanban View, and Time Logging.""" st.title("📋 Kanban Board & Tasks") all_projects = db_manager.read_all(Project) project_options = {p.name: p.id for p in all_projects} col_sel, col_add = st.columns([3, 1]) with col_sel: selected_project_name = st.selectbox("Select Project for Kanban View", list(project_options.keys()) if project_options else ["No Projects Available"], key="kanban_project_select") if not selected_project_name or selected_project_name == "No Projects Available": st.info("Please create a project first on the 'Project Management' page.") return selected_project_id = project_options[selected_project_name] tasks = db_manager.read_all(Task, project_id=selected_project_id) sprint_options = db_manager.read_all(Sprint, project_id=selected_project_id) sprint_map = {s.name: s.id for s in sprint_options} # --- Task Creation Sidebar/Expander --- with col_add: if st.session_state.user_role in ['Admin', 'Project Manager']: if st.button("➕ Add New Task", use_container_width=True, type="primary"): st.session_state.show_task_form = True if st.session_state.get('show_task_form', False): with st.sidebar: st.subheader(f"Add Task to 
{selected_project_name}") users_map = db_manager.get_users_for_assignment() with st.form("new_task_form"): task_title = st.text_input("Task Title (Required)") task_desc = st.text_area("Description") col_i1, col_i2 = st.columns(2) with col_i1: task_assigned_name = st.selectbox("Assign To", list(users_map.keys())) with col_i2: task_priority = st.selectbox("Priority", ['Low', 'Medium', 'High', 'Urgent']) col_i3, col_i4 = st.columns(2) with col_i3: task_estimate = st.number_input("Estimate (Hours)", min_value=0.0, value=4.0, step=0.5) with col_i4: sprint_name = st.selectbox("Assign to Sprint (Optional)", ["Backlog"] + list(sprint_map.keys())) # Dependencies Mock dependency_tasks = ["None"] + [f"Task #{t.id}: {t.title}" for t in tasks if t.status != 'Done'] dependency_selection = st.selectbox("Depends on (Task Dependency)", dependency_tasks) col_b1, col_b2 = st.columns(2) with col_b1: create_task_submitted = st.form_submit_button("Create Task", type="primary") with col_b2: if st.form_submit_button("Cancel"): st.session_state.show_task_form = False; st.experimental_rerun() if create_task_submitted: if not task_title: st.error("Task Title is required.") else: dependency_id = int(dependency_selection.split(':')[0].replace('Task #', '')) if dependency_selection != "None" else None sprint_id = sprint_map.get(sprint_name) new_task = Task(title=task_title, description=task_desc, priority=task_priority, estimate_hours=task_estimate, project_id=selected_project_id, assigned_to_id=users_map[task_assigned_name], dependency_task_id=dependency_id, sprint_id=sprint_id) if db_manager.create(new_task): st.success(f"Task '{task_title}' created."); st.session_state.show_task_form = False; time.sleep(1); st.experimental_rerun() else: st.error("Failed to create task.") # --- Kanban Board Rendering --- KANBAN_STATUSES = ["To Do", "In Progress", "In Review", "Done"] tasks_by_status = {status: [] for status in KANBAN_STATUSES} for task in tasks: if task.status in KANBAN_STATUSES: 
tasks_by_status[task.status].append(task) cols = st.columns(len(KANBAN_STATUSES)) st.markdown(""" """, unsafe_allow_html=True) for i, status in enumerate(KANBAN_STATUSES): with cols[i]: header_class = f"status-{status.lower().replace(' ', '-')}" st.markdown(f'
{status} ({len(tasks_by_status[status])})
', unsafe_allow_html=True) task_container = st.container(height=600, border=False) for task in tasks_by_status[status]: logged_hours = db_manager.get_total_logged_hours(task.id) progress = logged_hours / task.estimate_hours if task.estimate_hours > 0 else 0 # --- Card Content --- card_content = f"""
📌 Task #{task.id} | Sprint: {task.sprint.name if task.sprint else 'Backlog'}
{task.title}

Assigned: {task.assigned_to_user.username}

Est: {task.estimate_hours:.1f}h | Logged: {logged_hours:.1f}h

{task.depends_on.title if task.depends_on else ''}

""" with task_container: st.markdown(card_content, unsafe_allow_html=True) if st.session_state.user_role in ['Admin', 'Project Manager', 'Team Member']: with st.form(f"update_task_{task.id}", clear_on_submit=False): col_up1, col_up2 = st.columns([2, 1]) with col_up1: new_status = st.selectbox("Status", KANBAN_STATUSES, index=KANBAN_STATUSES.index(task.status), label_visibility="collapsed", key=f"status_select_{task.id}") with col_up2: hours_to_log = st.number_input("Log (h)", min_value=0.0, max_value=24.0, value=0.0, step=0.5, key=f"log_input_{task.id}", label_visibility="collapsed") col_b1, col_b2 = st.columns(2) with col_b1: if st.form_submit_button("Move & Log", type="secondary", use_container_width=True): task.status = new_status if hours_to_log > 0: new_log = TimeLog(hours=hours_to_log, task_id=task.id, user_id=st.session_state.user_id) db_manager.create(new_log) if db_manager.update(task): st.toast(f"Task #{task.id} moved to {new_status} and {hours_to_log}h logged!", icon="👍") time.sleep(0.5); st.experimental_rerun() else: st.error("Failed to update task.") with col_b2: if st.form_submit_button("View Details"): st.info(f"Task Details for #{task.id}: {task.description or 'No detailed description.'}") st.info(f"Current Dependencies: {task.depends_on.title if task.depends_on else 'None'}") def sprint_page(db_manager): """Page for Sprint management and burndown chart analytics (mock).""" st.title("🏃 Sprint & Backlog Management") all_projects = db_manager.read_all(Project) project_options = {p.name: p.id for p in all_projects} selected_project_name = st.selectbox("Select Project for Sprint Planning", list(project_options.keys()) if project_options else ["No Projects Available"], key="sprint_project_select") if not selected_project_name or selected_project_name == "No Projects Available": return selected_project_id = project_options[selected_project_name] col_s1, col_s2 = st.columns([2, 1]) with col_s2: with st.expander("➕ Create New Sprint"): if 
st.session_state.user_role in ['Admin', 'Project Manager']: with st.form("new_sprint_form"): sprint_name = st.text_input("Sprint Name (e.g., Sprint 3)") sprint_start = st.date_input("Start Date", value=date.today()) sprint_end = st.date_input("End Date", value=date.today() + timedelta(days=14)) if st.form_submit_button("Create Sprint", type="primary"): new_sprint = Sprint(name=sprint_name, project_id=selected_project_id, start_date=datetime.combine(sprint_start, datetime.min.time()), end_date=datetime.combine(sprint_end, datetime.min.time())) if db_manager.create(new_sprint): st.success("Sprint created!"); time.sleep(0.5); st.experimental_rerun() else: st.error("Failed to create sprint.") else: st.warning("Only Project Managers can create sprints.") sprints = db_manager.read_all(Sprint, project_id=selected_project_id) with col_s1: st.subheader("Project Sprints") sprint_data = [] for s in sprints: tasks_count = len(s.tasks) completed_count = len([t for t in s.tasks if t.status == 'Done']) sprint_data.append({ 'ID': s.id, 'Name': s.name, 'Status': s.status, 'Tasks': f"{completed_count}/{tasks_count}", 'Duration': f"{s.start_date.strftime('%Y-%m-%d')} to {s.end_date.strftime('%Y-%m-%d')}" }) st.dataframe(pd.DataFrame(sprint_data), use_container_width=True, hide_index=True) st.markdown("---") st.subheader("Backlog & Task Assignment") # Tasks not assigned to a sprint are in the backlog (sprint_id is None) backlog_tasks = [t for t in db_manager.read_all(Task, project_id=selected_project_id) if not t.sprint] col_backlog, col_sprint_assign = st.columns(2) with col_backlog: st.markdown("#### Backlog Tasks") if backlog_tasks: backlog_df = pd.DataFrame([{'ID': t.id, 'Title': t.title, 'Est. Hrs': t.estimate_hours, 'Priority': t.priority} for t in backlog_tasks]) st.dataframe(backlog_df, use_container_width=True, hide_index=True) else: st.info("The backlog is empty! 
All tasks are assigned to sprints.") with col_sprint_assign: st.markdown("#### Assign Task to Sprint") if sprints and backlog_tasks: sprint_names = {s.id: s.name for s in sprints} backlog_titles = {t.id: t.title for t in backlog_tasks} with st.form("assign_task_form"): task_to_assign = st.selectbox("Select Task from Backlog", list(backlog_titles.keys()), format_func=lambda x: backlog_titles[x]) target_sprint = st.selectbox("Select Target Sprint", list(sprint_names.keys()), format_func=lambda x: sprint_names[x]) if st.form_submit_button("Move to Sprint"): task = db_manager.read_by_id(Task, task_to_assign) task.sprint_id = target_sprint if db_manager.update(task): st.success(f"Task '{task.title}' moved to sprint '{sprint_names[target_sprint]}'."); time.sleep(0.5); st.experimental_rerun() else: st.error("Failed to assign task.") elif sprints: st.info("No tasks in the backlog to assign.") else: st.warning("Create a sprint first to assign tasks.") def gantt_page(db_manager): """Page for Gantt Chart visualization (WBS mock included).""" st.title("📈 Timeline & Work Breakdown Structure (WBS)") all_projects = db_manager.read_all(Project) project_options = {p.name: p.id for p in all_projects} selected_project_name = st.selectbox("Select Project for Visualization", list(project_options.keys()) if project_options else ["No Projects Available"], key="gantt_project_select") if not selected_project_name or selected_project_name == "No Projects Available": return selected_project_id = project_options[selected_project_name] project_tasks = db_manager.read_all(Task, project_id=selected_project_id) # --- WBS/Dependencies Visualization Mock --- st.markdown("### Work Breakdown Structure (Dependency View)") if project_tasks: # Create a dictionary mapping task ID to task object tasks_map = {t.id: t for t in project_tasks} # Identify top-level tasks (tasks that no other task depends on, and have no dependency themselves) dependent_ids = {t.dependency_task_id for t in project_tasks if 
t.dependency_task_id} top_level_tasks = [t for t in project_tasks if t.id not in dependent_ids and not t.dependency_task_id] # Build dependency tree structure def render_wbs_node(task, level=0): prefix = "•" * (level + 1) status_emoji = "✅" if task.status == 'Done' else "🚧" st.markdown(f"#### {prefix} {status_emoji} **{task.title}** (Est: {task.estimate_hours}h, Prio: {task.priority})") # Find tasks that depend on the current task for dependent_task in [t for t in project_tasks if t.dependency_task_id == task.id]: render_wbs_node(dependent_task, level + 1) if top_level_tasks: st.info("Tasks are shown in order of completion (dependencies first).") for task in top_level_tasks: render_wbs_node(task) else: st.warning("No clear top-level tasks or dependencies found. All tasks might be independent.") else: st.info("No tasks available to generate WBS.") st.markdown("---") # --- Gantt Chart Visualization --- st.markdown("### Project Gantt Chart") fig = get_gantt_chart(project_tasks) if fig: st.plotly_chart(fig, use_container_width=True) else: st.info("No tasks available to render the Gantt chart.") def risk_register_page(db_manager): """Page for managing project risks.""" st.title("🚨 Risk Register") all_projects = db_manager.read_all(Project) project_options = {p.name: p.id for p in all_projects} users_map = db_manager.get_users_for_assignment() selected_project_name = st.selectbox("Select Project to Manage Risks", list(project_options.keys()) if project_options else ["No Projects Available"], key="risk_project_select") if not selected_project_name or selected_project_name == "No Projects Available": return selected_project_id = project_options[selected_project_name] if st.session_state.user_role not in ['Admin', 'Project Manager']: st.warning("You must be an Admin or Project Manager to manage risks.") tab1, tab2 = st.tabs(["View Risks", "Add New Risk"]) with tab1: risks = db_manager.read_all(Risk, project_id=selected_project_id) if risks: risk_data = [] for r in risks: 
def risk_register_page(db_manager):
    """Page for managing project risks.

    Shows the risks of the selected project to every user. Users whose
    ``st.session_state.user_role`` is 'Admin' or 'Project Manager' can also
    update a risk's mitigation plan/status and register new risks.

    Args:
        db_manager: Data-access object providing ``read_all``, ``read_by_id``,
            ``create``, ``update`` and ``get_users_for_assignment``.
    """
    # st.experimental_rerun is deprecated in favour of st.rerun; use the new
    # name when this Streamlit build provides it.
    rerun = getattr(st, "rerun", None) or st.experimental_rerun

    st.title("🚨 Risk Register")

    all_projects = db_manager.read_all(Project)
    project_options = {p.name: p.id for p in all_projects}
    # Used below as {username: user_id}.
    users_map = db_manager.get_users_for_assignment()

    selected_project_name = st.selectbox(
        "Select Project to Manage Risks",
        list(project_options.keys()) if project_options else ["No Projects Available"],
        key="risk_project_select",
    )

    if not selected_project_name or selected_project_name == "No Projects Available":
        return

    selected_project_id = project_options[selected_project_name]

    can_manage = st.session_state.user_role in ['Admin', 'Project Manager']
    if not can_manage:
        st.warning("You must be an Admin or Project Manager to manage risks.")

    tab1, tab2 = st.tabs(["View Risks", "Add New Risk"])

    with tab1:
        risks = db_manager.read_all(Risk, project_id=selected_project_id)
        if risks:
            risk_data = [
                {
                    'ID': r.id,
                    'Name': r.name,
                    'Probability': r.probability,
                    'Impact': r.impact,
                    'Status': r.status,
                    # A risk without an owner must not crash the whole table.
                    'Owner': r.owner.username if r.owner else 'Unassigned',
                }
                for r in risks
            ]
            st.dataframe(pd.DataFrame(risk_data), use_container_width=True, hide_index=True)

            # Risk Mitigation Action
            st.markdown("---")
            st.subheader("Mitigation & Closure")

            risk_ids = [r.id for r in risks]
            selected_risk_id = st.selectbox("Select Risk ID to Manage", risk_ids)

            if selected_risk_id and can_manage:
                selected_risk = db_manager.read_by_id(Risk, selected_risk_id)
                if selected_risk is None:
                    st.error("The selected risk no longer exists.")
                else:
                    status_choices = ['Open', 'Managed', 'Closed']
                    current_status = selected_risk.status or status_choices[0]
                    if current_status not in status_choices:
                        # Keep an unexpected stored status selectable instead of
                        # raising ValueError or silently overwriting it.
                        status_choices.append(current_status)

                    with st.form(f"mitigation_form_{selected_risk_id}"):
                        st.markdown(f"**Risk:** {selected_risk.name}")
                        # Key is per risk so text typed for one risk can never
                        # be carried over to another one.
                        new_plan = st.text_area(
                            "Mitigation Plan",
                            value=selected_risk.mitigation_plan or "",
                            key=f"mitigation_plan_edit_{selected_risk_id}",
                        )
                        new_status = st.selectbox(
                            "Status",
                            status_choices,
                            index=status_choices.index(current_status),
                        )

                        if st.form_submit_button("Update Risk"):
                            selected_risk.mitigation_plan = new_plan
                            selected_risk.status = new_status
                            if db_manager.update(selected_risk):
                                st.success("Risk updated successfully.")
                                time.sleep(0.5)
                                rerun()
                            else:
                                st.error("Failed to update risk.")
        else:
            st.info("No risks registered for this project yet.")

    with tab2:
        if can_manage:
            with st.form("add_risk_form"):
                risk_name = st.text_input("Risk Title (Required)")
                risk_desc = st.text_area("Detailed Description")
                col_r1, col_r2 = st.columns(2)
                with col_r1:
                    risk_prob = st.selectbox("Probability", ['Low', 'Medium', 'High'])
                with col_r2:
                    risk_impact = st.selectbox("Impact", ['Low', 'Medium', 'High'])

                risk_owner_name = st.selectbox("Risk Owner", list(users_map.keys()))

                if st.form_submit_button("Register Risk", type="primary"):
                    risk_title = (risk_name or "").strip()
                    # .get avoids a KeyError when no user could be selected.
                    owner_id = users_map.get(risk_owner_name)
                    if not risk_title:
                        st.error("Risk Title is required.")
                    elif owner_id is None:
                        st.error("A Risk Owner is required.")
                    else:
                        new_risk = Risk(
                            name=risk_title,
                            description=risk_desc,
                            probability=risk_prob,
                            impact=risk_impact,
                            project_id=selected_project_id,
                            owner_id=owner_id,
                        )
                        if db_manager.create(new_risk):
                            st.success("Risk successfully registered.")
                            time.sleep(0.5)
                            rerun()
                        else:
                            st.error("Failed to register risk.")
def reports_page(db_manager):
    """Page for generating and exporting data reports.

    Renders three tabs: a raw-table export (CSV and JSON download), a time
    tracking summary with a weekly bar chart, and a velocity chart built from
    the estimates of 'Done' tasks in 'Completed' sprints.

    Args:
        db_manager: Data-access object providing ``read_all(model)``.
    """
    st.title("📄 Reports and Analytics")

    tab1, tab2, tab3 = st.tabs(["Data Export", "Time Tracking Report", "Velocity Chart (Mock)"])

    # --- Data Export Tab ---
    with tab1:
        st.subheader("Data Export (CSV & JSON)")

        # Columns that must never leave the application in an export.
        sensitive_columns = ['password_hash']

        models = {'Projects': Project, 'Tasks': Task, 'Users': User, 'Time Logs': TimeLog, 'Risks': Risk}
        selected_model = st.selectbox("Select Data Entity to Export", list(models.keys()))

        data = db_manager.read_all(models[selected_model])

        if data:
            # Simple conversion to DataFrame from the already-loaded attributes.
            df_export = pd.DataFrame([item.__dict__ for item in data])
            df_export = df_export.drop(
                columns=['_sa_instance_state', *sensitive_columns], errors='ignore'
            )

            st.dataframe(df_export, use_container_width=True)

            file_stem = (
                f'{selected_model.lower().replace(" ", "_")}_report_'
                f'{datetime.now().strftime("%Y%m%d")}'
            )

            st.download_button(
                label=f"Download {selected_model} as CSV",
                data=df_export.to_csv(index=False).encode('utf-8'),
                file_name=f'{file_stem}.csv',
                mime='text/csv',
                type="primary",
            )

            # default_handler=str keeps the export working if a row carries a
            # value pandas cannot serialise natively.
            st.download_button(
                label=f"Download {selected_model} as JSON",
                data=df_export.to_json(
                    orient='records', date_format='iso', default_handler=str
                ).encode('utf-8'),
                file_name=f'{file_stem}.json',
                mime='application/json',
            )

            st.markdown(
                "> **PDF/Excel Export Note:** For PDF/Excel exports (`ReportLab`/`XlsxWriter`), "
                "you would use the `df_export` object,\n"
                "> install the required libraries (e.g., `!pip install openpyxl`), "
                "and implement the conversion logic here."
            )
        else:
            st.info("No data available for export.")

    # --- Time Tracking Report Tab ---
    with tab2:
        st.subheader("Team Time Tracking Summary")
        all_logs = db_manager.read_all(TimeLog)

        if all_logs:
            log_data = []
            for log in all_logs:
                # Any of these links may be missing; a single orphaned log
                # must not take the whole report down.
                task = log.task
                project = task.project if task else None
                log_data.append({
                    'User': log.user.username if log.user else 'Unknown',
                    'Task ID': task.id if task else None,
                    'Task Title': task.title if task else 'Unknown',
                    'Project': project.name if project else 'Unknown',
                    'Hours Logged': log.hours,
                    'Date': log.log_date.strftime('%Y-%m-%d') if log.log_date else None,
                })

            df_logs = pd.DataFrame(log_data)
            st.dataframe(
                df_logs.sort_values(by='Hours Logged', ascending=False),
                use_container_width=True,
                hide_index=True,
            )

            # Plot Weekly Logged Hours
            df_logs['Week'] = pd.to_datetime(df_logs['Date']).dt.to_period('W').astype(str)
            weekly_summary = df_logs.groupby(['User', 'Week'])['Hours Logged'].sum().reset_index()

            fig = px.bar(
                weekly_summary,
                x="Week",
                y="Hours Logged",
                color="User",
                title="Weekly Logged Hours by Team Member",
            )
            st.plotly_chart(fig, use_container_width=True)
        else:
            st.info("No time has been logged yet.")

    # --- Velocity Chart Tab (Mock) ---
    with tab3:
        st.subheader("Sprint Velocity Chart")
        st.warning("This is a functional mock. For true Burndown/Velocity, complex calculation of Story Points/Estimates within a sprint is required.")

        all_sprints = db_manager.read_all(Sprint) or []

        # Velocity is the sum of estimates of the 'Done' tasks of a completed
        # sprint; a missing estimate counts as zero hours.
        velocity_data = [
            {
                'Sprint': s.name,
                'Project': s.project.name if s.project else 'Unknown',
                'Velocity (Hours)': sum(
                    (t.estimate_hours or 0.0) for t in s.tasks if t.status == 'Done'
                ),
            }
            for s in all_sprints
            if s.status == 'Completed'
        ]

        if velocity_data:
            df_velocity = pd.DataFrame(velocity_data)
            fig_vel = px.bar(
                df_velocity,
                x="Sprint",
                y="Velocity (Hours)",
                color="Project",
                title="Sprint Velocity (Hours Completed)",
                labels={'Velocity (Hours)': 'Completed Hours'},
            )
            st.plotly_chart(fig_vel, use_container_width=True)
        else:
            # Shown both when there are no sprints at all and when none of
            # them is completed, so the tab is never left blank.
            st.info("No completed sprints with logged velocity data.")
def administration_page(db_manager):
    """Page for system settings and role management.

    Only rendered for sessions whose ``st.session_state.user_role`` is
    'Admin'. Lists all users and lets the admin change a user's role.

    Args:
        db_manager: Data-access object providing ``read_all``, ``read_by_id``
            and ``update``.
    """
    st.title("⚙️ Administration & Team Management")

    if st.session_state.user_role != 'Admin':
        st.error("Access Denied. Only Administrators can view this page.")
        return

    # st.experimental_rerun is deprecated in favour of st.rerun; use the new
    # name when this Streamlit build provides it.
    rerun = getattr(st, "rerun", None) or st.experimental_rerun

    role_choices = ['Admin', 'Project Manager', 'Team Member', 'Viewer']

    tab1, tab2 = st.tabs(["Role Management", "System Settings (Mock)"])

    with tab1:
        st.subheader("User Role and Team Management")
        users = db_manager.read_all(User)

        user_data = [
            {'ID': u.id, 'Username': u.username, 'Email': u.email, 'Role': u.role}
            for u in users
        ]
        st.dataframe(pd.DataFrame(user_data), use_container_width=True, hide_index=True)

        st.markdown("---")
        st.subheader("Update User Role")

        user_map = {u.username: u.id for u in users}
        admin_count = sum(1 for u in users if u.role == 'Admin')
        selected_username = st.selectbox("Select User", list(user_map.keys()))

        if selected_username:
            user_to_update = db_manager.read_by_id(User, user_map[selected_username])

            if user_to_update is None:
                st.error("The selected user no longer exists.")
            else:
                # Fall back to the model's default role when the stored value
                # is missing or not one of the known roles.
                current_role = (
                    user_to_update.role
                    if user_to_update.role in role_choices
                    else 'Team Member'
                )

                with st.form("update_role_form"):
                    new_role = st.selectbox(
                        "New Role",
                        role_choices,
                        index=role_choices.index(current_role),
                    )

                    if st.form_submit_button("Update Role", type="primary"):
                        demotes_last_admin = (
                            user_to_update.role == 'Admin'
                            and new_role != 'Admin'
                            and admin_count <= 1
                        )
                        if demotes_last_admin:
                            # Without any Admin nobody could open this page again.
                            st.error("Cannot change the role of the only remaining Admin.")
                        else:
                            user_to_update.role = new_role
                            if db_manager.update(user_to_update):
                                # Keep the current session in sync when admins
                                # change their own role.
                                if selected_username == st.session_state.get('username'):
                                    st.session_state.user_role = new_role
                                st.success(f"Role for {selected_username} updated to {new_role}.")
                                time.sleep(0.5)
                                rerun()
                            else:
                                st.error("Failed to update user role.")

    with tab2:
        st.subheader("System Settings and Audit Logs")
        st.info("This section is a placeholder for complex system configurations.")
        st.code(
            "\n"
            "# Example system settings structure\n"
            "SYSTEM_SETTINGS = {\n"
            "    'DEFAULT_SPRINT_DAYS': 14,\n"
            "    'MAX_PROJECTS_PER_MANAGER': 10,\n"
            "    'AUDIT_LOGGING_ENABLED': True\n"
            "}\n"
        )
# --- Main Application Logic ---

def main_app():
    """Entry point: draw the sidebar, then render the page the user selected.

    Unauthenticated sessions always get the login page. Authenticated
    sessions get the page stored in ``st.session_state.current_page``.
    """
    init_session_state()
    db_manager, auth_service = get_db_and_auth()
    session = st.session_state

    # --- Sidebar Navigation ---
    with st.sidebar:
        st.image("https://placehold.co/150x50/1e40af/ffffff?text=PM+TOOL", width=200)
        st.markdown("## Navigation")

        if not session.is_authenticated:
            st.info("Please log in to access the tool.")
        else:
            # Pages every authenticated user can open.
            nav_entries = [
                "Dashboard",
                "Project Management",
                "Kanban Board",
                "Sprint Management",
                "Gantt & WBS",
                "Risk Register",
                "Reports & Analytics",
            ]
            # The administration entry is offered to admins only.
            if session.user_role == 'Admin':
                nav_entries = nav_entries + ["Administration"]

            chosen_page = st.radio("Go to:", options=nav_entries, key="main_nav")
            session.current_page = chosen_page

            st.markdown("---")
            st.write(f"Logged in as: **{session.username}** ({session.user_role})")
            logout_clicked = st.button("Logout", type="secondary", use_container_width=True)
            if logout_clicked:
                auth_service.logout()

    # --- Content Routing ---
    if not session.is_authenticated:
        login_page(auth_service)
    else:
        page_renderers = {
            "Dashboard": dashboard_page,
            "Project Management": projects_page,
            "Kanban Board": kanban_page,
            "Sprint Management": sprint_page,
            "Gantt & WBS": gantt_page,
            "Risk Register": risk_register_page,
            "Reports & Analytics": reports_page,
            "Administration": administration_page,
        }
        # An unknown page name renders nothing, like the former elif chain.
        render_page = page_renderers.get(session.current_page)
        if render_page is not None:
            render_page(db_manager)

    # --- Footer ---
    st.markdown(""" """, unsafe_allow_html=True)


if __name__ == '__main__':
    main_app()