import streamlit as st import pandas as pd from datetime import datetime, date, timedelta from sqlalchemy import create_engine, Column, Integer, String, DateTime, ForeignKey, Boolean, Float, Text from sqlalchemy.orm import sessionmaker, relationship, declarative_base from sqlalchemy.exc import IntegrityError import hashlib import time import plotly.express as px import io import base64 # --- Configuration & Setup --- # Set Streamlit page configuration st.set_page_config( page_title="Production-Ready Project Manager (Complete)", page_icon="🛠️", layout="wide", initial_sidebar_state="expanded" ) # --- Database Core (SQLAlchemy ORM) --- # Using SQLite for the demo, easily scalable to PostgreSQL. DATABASE_URL = "sqlite:///project_manager_complete.db" # IMPORTANT for SQLite in Streamlit/Colab: check_same_thread must be False Engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False}) Base = declarative_base() # --- Database Models --- class User(Base): """Database model for application users and team members.""" __tablename__ = 'users' id = Column(Integer, primary_key=True) username = Column(String, unique=True, nullable=False) email = Column(String, unique=True, nullable=False) password_hash = Column(String, nullable=False) role = Column(String, default="Team Member") # Admin, Project Manager, Team Member, Viewer created_at = Column(DateTime, default=datetime.now) projects = relationship("Project", back_populates="manager") tasks_assigned = relationship("Task", back_populates="assigned_to_user") time_logs = relationship("TimeLog", back_populates="user") risks_owned = relationship("Risk", back_populates="owner") class Project(Base): """Database model for projects.""" __tablename__ = 'projects' id = Column(Integer, primary_key=True) name = Column(String, nullable=False) description = Column(String) category = Column(String, default="Software") status = Column(String, default="Planning") # Planning, In Progress, Completed, Archived priority = Column(String, default="Medium") # Low, Medium, High start_date = Column(DateTime, default=datetime.now) due_date = Column(DateTime) manager_id = Column(Integer, ForeignKey('users.id')) manager = relationship("User", back_populates="projects") tasks = relationship("Task", back_populates="project", cascade="all, delete-orphan") sprints = relationship("Sprint", back_populates="project", cascade="all, delete-orphan") risks = relationship("Risk", back_populates="project", cascade="all, delete-orphan") class Task(Base): """Database model for individual tasks.""" __tablename__ = 'tasks' id = Column(Integer, primary_key=True) title = Column(String, nullable=False) # FIX: Corrected the typo where 'Column' was reassigned as a variable description = Column(Text) status = Column(String, default="To Do") # To Do, In Progress, In Review, Done priority = Column(String, default="Medium") # Low, Medium, High, Urgent estimate_hours = Column(Float, default=0.0) dependency_task_id = Column(Integer, ForeignKey('tasks.id'), nullable=True) # For WBS/Dependencies sprint_id = Column(Integer, ForeignKey('sprints.id'), nullable=True) # For Sprint Planning project_id = Column(Integer, ForeignKey('projects.id')) assigned_to_id = Column(Integer, ForeignKey('users.id')) created_at = Column(DateTime, default=datetime.now) project = relationship("Project", back_populates="tasks") assigned_to_user = relationship("User", back_populates="tasks_assigned") time_logs = relationship("TimeLog", back_populates="task", cascade="all, delete-orphan") sprint = relationship("Sprint", back_populates="tasks") # Self-referencing relationship for dependencies depends_on = relationship("Task", remote_side=[id], backref="dependent_tasks") class TimeLog(Base): """Database model for recording time spent on tasks.""" __tablename__ = 'time_logs' id = Column(Integer, primary_key=True) hours = Column(Float, nullable=False) log_date = Column(DateTime, default=datetime.now) task_id = Column(Integer, ForeignKey('tasks.id')) user_id = Column(Integer, ForeignKey('users.id')) task = relationship("Task", back_populates="time_logs") user = relationship("User", back_populates="time_logs") class Risk(Base): """Database model for the project risk register.""" __tablename__ = 'risks' id = Column(Integer, primary_key=True) name = Column(String, nullable=False) description = Column(Text) probability = Column(String, default="Low") # Low, Medium, High impact = Column(String, default="Low") # Low, Medium, High mitigation_plan = Column(Text) status = Column(String, default="Open") # Open, Managed, Closed project_id = Column(Integer, ForeignKey('projects.id')) owner_id = Column(Integer, ForeignKey('users.id')) project = relationship("Project", back_populates="risks") owner = relationship("User", back_populates="risks_owned") class Sprint(Base): """Database model for sprints (iterations).""" __tablename__ = 'sprints' id = Column(Integer, primary_key=True) name = Column(String, nullable=False) start_date = Column(DateTime) end_date = Column(DateTime) project_id = Column(Integer, ForeignKey('projects.id')) status = Column(String, default="Planning") # Planning, Active, Completed project = relationship("Project", back_populates="sprints") tasks = relationship("Task", back_populates="sprint") # --- Database Manager & Seeding --- class DatabaseManager: """Manages database connection and CRUD operations.""" def __init__(self): self.Session = sessionmaker(bind=Engine) self.session = self.Session() self._ensure_db_schema() self._seed_initial_data() def _ensure_db_schema(self): """Ensures the database schema is created.""" Base.metadata.create_all(Engine) def _seed_initial_data(self): """Creates initial admin and demo users if they don't exist.""" def hash_password(password): return hashlib.sha256(password.encode()).hexdigest() if self.session.query(User).filter_by(username='admin').first() is None: admin = User(username='admin', email='admin@tool.com', password_hash=hash_password('adminpass'), role='Admin') manager = User(username='manager', email='manager@tool.com', password_hash=hash_password('managerpass'), role='Project Manager') member = User(username='member', email='member@tool.com', password_hash=hash_password('memberpass'), role='Team Member') self.session.add_all([admin, manager, member]) self.session.commit() st.toast("Initial users created: admin, manager, member", icon="🔑") if self.session.query(Project).count() == 0: admin = self.session.query(User).filter_by(username='admin').first() manager_user = self.session.query(User).filter_by(username='manager').first() member_user = self.session.query(User).filter_by(username='member').first() proj_core = Project(name="Tool Core Development", description="Develop the core features and database.", category="Software", status="In Progress", priority="High", due_date=datetime.now() + timedelta(days=90), manager_id=manager_user.id) proj_marketing = Project(name="Q4 Marketing Strategy", description="Plan and execute holiday marketing campaign.", category="Marketing", status="Planning", priority="Medium", due_date=datetime.now() + timedelta(days=45), manager_id=admin.id) self.session.add_all([proj_core, proj_marketing]) self.session.commit() # Create Sprint sprint_1 = Sprint(name="Sprint 1: Auth & DB", project_id=proj_core.id, start_date=datetime.now(), end_date=datetime.now() + timedelta(days=14), status="Active") self.session.add(sprint_1) self.session.commit() # Create Tasks task1 = Task(title="Implement Kanban View", status="In Progress", priority="Urgent", estimate_hours=8, project_id=proj_core.id, assigned_to_id=member_user.id, sprint_id=sprint_1.id) task2 = Task(title="Design Database Schema", status="Done", priority="High", estimate_hours=12, project_id=proj_core.id, assigned_to_id=admin.id) task3 = Task(title="Launch User Testing Phase 1", status="To Do", priority="High", estimate_hours=20, project_id=proj_marketing.id, assigned_to_id=manager_user.id) self.session.add_all([task1, task2, task3]) self.session.commit() # Set dependency: task1 depends on task2 task1.dependency_task_id = task2.id self.session.commit() # Create Risk risk1 = Risk(name="Database Migration Failure", description="Risk of data loss during production database switch.", probability="High", impact="High", mitigation_plan="Perform dry run migration and full backup.", project_id=proj_core.id, owner_id=admin.id) self.session.add(risk1) # Create Time Log log1 = TimeLog(hours=4.5, task_id=task2.id, user_id=admin.id) self.session.add(log1) self.session.commit() st.toast("Demo data (Projects, Sprints, Tasks, Risks, Time Logs) created.", icon="🚀") # --- Generic CRUD Methods --- def create(self, entity): try: self.session.add(entity) self.session.commit() return True except IntegrityError: self.session.rollback() return False except Exception as e: st.error(f"Database error: {e}") self.session.rollback() return False def read_all(self, model, project_id=None): if project_id and hasattr(model, 'project_id'): return self.session.query(model).filter(model.project_id == project_id).all() return self.session.query(model).all() def read_by_id(self, model, id): return self.session.query(model).get(id) def update(self, entity): try: self.session.merge(entity) self.session.commit() return True except Exception as e: st.error(f"Database error: {e}") self.session.rollback() return False def delete(self, entity): try: self.session.delete(entity) self.session.commit() return True except Exception as e: st.error(f"Database error: {e}") self.session.rollback() return False def get_users_for_assignment(self): users = self.session.query(User).all() return {user.username: user.id for user in users} def get_user_id_by_username(self, username): user = self.session.query(User).filter_by(username=username).first() return user.id if user else None def get_total_logged_hours(self, task_id): logs = self.session.query(TimeLog).filter(TimeLog.task_id == task_id).all() return sum(log.hours for log in logs) # --- Authentication Service --- class AuthService: """Handles user authentication and session state management.""" def __init__(self, db_manager): self.db = db_manager def hash_password(self, password): return hashlib.sha256(password.encode()).hexdigest() def authenticate(self, username, password): user = self.db.session.query(User).filter_by(username=username).first() if user and user.password_hash == self.hash_password(password): st.session_state.is_authenticated = True st.session_state.username = user.username st.session_state.user_role = user.role st.session_state.user_id = user.id st.session_state.current_page = "Dashboard" st.toast(f"Welcome back, {user.username} ({user.role})!", icon="👋") return True return False def check_role_access(self, required_roles): return st.session_state.get('user_role') in required_roles def logout(self): st.session_state.clear() st.session_state.is_authenticated = False st.session_state.current_page = "Login" st.experimental_rerun() # --- Streamlit UI Components & Utilities --- def init_session_state(): """Initialize necessary session state variables.""" if 'is_authenticated' not in st.session_state: st.session_state.is_authenticated = False st.session_state.current_page = "Login" st.session_state.username = None st.session_state.user_role = None st.session_state.user_id = None st.session_state.show_task_form = False # For Kanban task creation def get_db_and_auth(): """Utility to get initialized DB and Auth services.""" if 'db_manager' not in st.session_state: st.session_state.db_manager = DatabaseManager() if 'auth_service' not in st.session_state: st.session_state.auth_service = AuthService(st.session_state.db_manager) return st.session_state.db_manager, st.session_state.auth_service def draw_kpi_card(title, value, icon, color_code): """Draws a responsive, styled card for the dashboard.""" st.markdown(f"""
{icon} {title}
Assigned: {task.assigned_to_user.username}
Est: {task.estimate_hours:.1f}h | Logged: {logged_hours:.1f}h
{task.depends_on.title if task.depends_on else ''}
""" with task_container: st.markdown(card_content, unsafe_allow_html=True) if st.session_state.user_role in ['Admin', 'Project Manager', 'Team Member']: with st.form(f"update_task_{task.id}", clear_on_submit=False): col_up1, col_up2 = st.columns([2, 1]) with col_up1: new_status = st.selectbox("Status", KANBAN_STATUSES, index=KANBAN_STATUSES.index(task.status), label_visibility="collapsed", key=f"status_select_{task.id}") with col_up2: hours_to_log = st.number_input("Log (h)", min_value=0.0, max_value=24.0, value=0.0, step=0.5, key=f"log_input_{task.id}", label_visibility="collapsed") col_b1, col_b2 = st.columns(2) with col_b1: if st.form_submit_button("Move & Log", type="secondary", use_container_width=True): task.status = new_status if hours_to_log > 0: new_log = TimeLog(hours=hours_to_log, task_id=task.id, user_id=st.session_state.user_id) db_manager.create(new_log) if db_manager.update(task): st.toast(f"Task #{task.id} moved to {new_status} and {hours_to_log}h logged!", icon="👍") time.sleep(0.5); st.experimental_rerun() else: st.error("Failed to update task.") with col_b2: if st.form_submit_button("View Details"): st.info(f"Task Details for #{task.id}: {task.description or 'No detailed description.'}") st.info(f"Current Dependencies: {task.depends_on.title if task.depends_on else 'None'}") def sprint_page(db_manager): """Page for Sprint management and burndown chart analytics (mock).""" st.title("🏃 Sprint & Backlog Management") all_projects = db_manager.read_all(Project) project_options = {p.name: p.id for p in all_projects} selected_project_name = st.selectbox("Select Project for Sprint Planning", list(project_options.keys()) if project_options else ["No Projects Available"], key="sprint_project_select") if not selected_project_name or selected_project_name == "No Projects Available": return selected_project_id = project_options[selected_project_name] col_s1, col_s2 = st.columns([2, 1]) with col_s2: with st.expander("➕ Create New Sprint"): if st.session_state.user_role in ['Admin', 'Project Manager']: with st.form("new_sprint_form"): sprint_name = st.text_input("Sprint Name (e.g., Sprint 3)") sprint_start = st.date_input("Start Date", value=date.today()) sprint_end = st.date_input("End Date", value=date.today() + timedelta(days=14)) if st.form_submit_button("Create Sprint", type="primary"): new_sprint = Sprint(name=sprint_name, project_id=selected_project_id, start_date=datetime.combine(sprint_start, datetime.min.time()), end_date=datetime.combine(sprint_end, datetime.min.time())) if db_manager.create(new_sprint): st.success("Sprint created!"); time.sleep(0.5); st.experimental_rerun() else: st.error("Failed to create sprint.") else: st.warning("Only Project Managers can create sprints.") sprints = db_manager.read_all(Sprint, project_id=selected_project_id) with col_s1: st.subheader("Project Sprints") sprint_data = [] for s in sprints: tasks_count = len(s.tasks) completed_count = len([t for t in s.tasks if t.status == 'Done']) sprint_data.append({ 'ID': s.id, 'Name': s.name, 'Status': s.status, 'Tasks': f"{completed_count}/{tasks_count}", 'Duration': f"{s.start_date.strftime('%Y-%m-%d')} to {s.end_date.strftime('%Y-%m-%d')}" }) st.dataframe(pd.DataFrame(sprint_data), use_container_width=True, hide_index=True) st.markdown("---") st.subheader("Backlog & Task Assignment") # Tasks not assigned to a sprint are in the backlog (sprint_id is None) backlog_tasks = [t for t in db_manager.read_all(Task, project_id=selected_project_id) if not t.sprint] col_backlog, col_sprint_assign = st.columns(2) with col_backlog: st.markdown("#### Backlog Tasks") if backlog_tasks: backlog_df = pd.DataFrame([{'ID': t.id, 'Title': t.title, 'Est. Hrs': t.estimate_hours, 'Priority': t.priority} for t in backlog_tasks]) st.dataframe(backlog_df, use_container_width=True, hide_index=True) else: st.info("The backlog is empty! All tasks are assigned to sprints.") with col_sprint_assign: st.markdown("#### Assign Task to Sprint") if sprints and backlog_tasks: sprint_names = {s.id: s.name for s in sprints} backlog_titles = {t.id: t.title for t in backlog_tasks} with st.form("assign_task_form"): task_to_assign = st.selectbox("Select Task from Backlog", list(backlog_titles.keys()), format_func=lambda x: backlog_titles[x]) target_sprint = st.selectbox("Select Target Sprint", list(sprint_names.keys()), format_func=lambda x: sprint_names[x]) if st.form_submit_button("Move to Sprint"): task = db_manager.read_by_id(Task, task_to_assign) task.sprint_id = target_sprint if db_manager.update(task): st.success(f"Task '{task.title}' moved to sprint '{sprint_names[target_sprint]}'."); time.sleep(0.5); st.experimental_rerun() else: st.error("Failed to assign task.") elif sprints: st.info("No tasks in the backlog to assign.") else: st.warning("Create a sprint first to assign tasks.") def gantt_page(db_manager): """Page for Gantt Chart visualization (WBS mock included).""" st.title("📈 Timeline & Work Breakdown Structure (WBS)") all_projects = db_manager.read_all(Project) project_options = {p.name: p.id for p in all_projects} selected_project_name = st.selectbox("Select Project for Visualization", list(project_options.keys()) if project_options else ["No Projects Available"], key="gantt_project_select") if not selected_project_name or selected_project_name == "No Projects Available": return selected_project_id = project_options[selected_project_name] project_tasks = db_manager.read_all(Task, project_id=selected_project_id) # --- WBS/Dependencies Visualization Mock --- st.markdown("### Work Breakdown Structure (Dependency View)") if project_tasks: # Create a dictionary mapping task ID to task object tasks_map = {t.id: t for t in project_tasks} # Identify top-level tasks (tasks that no other task depends on, and have no dependency themselves) dependent_ids = {t.dependency_task_id for t in project_tasks if t.dependency_task_id} top_level_tasks = [t for t in project_tasks if t.id not in dependent_ids and not t.dependency_task_id] # Build dependency tree structure def render_wbs_node(task, level=0): prefix = "•" * (level + 1) status_emoji = "✅" if task.status == 'Done' else "🚧" st.markdown(f"#### {prefix} {status_emoji} **{task.title}** (Est: {task.estimate_hours}h, Prio: {task.priority})") # Find tasks that depend on the current task for dependent_task in [t for t in project_tasks if t.dependency_task_id == task.id]: render_wbs_node(dependent_task, level + 1) if top_level_tasks: st.info("Tasks are shown in order of completion (dependencies first).") for task in top_level_tasks: render_wbs_node(task) else: st.warning("No clear top-level tasks or dependencies found. All tasks might be independent.") else: st.info("No tasks available to generate WBS.") st.markdown("---") # --- Gantt Chart Visualization --- st.markdown("### Project Gantt Chart") fig = get_gantt_chart(project_tasks) if fig: st.plotly_chart(fig, use_container_width=True) else: st.info("No tasks available to render the Gantt chart.") def risk_register_page(db_manager): """Page for managing project risks.""" st.title("🚨 Risk Register") all_projects = db_manager.read_all(Project) project_options = {p.name: p.id for p in all_projects} users_map = db_manager.get_users_for_assignment() selected_project_name = st.selectbox("Select Project to Manage Risks", list(project_options.keys()) if project_options else ["No Projects Available"], key="risk_project_select") if not selected_project_name or selected_project_name == "No Projects Available": return selected_project_id = project_options[selected_project_name] if st.session_state.user_role not in ['Admin', 'Project Manager']: st.warning("You must be an Admin or Project Manager to manage risks.") tab1, tab2 = st.tabs(["View Risks", "Add New Risk"]) with tab1: risks = db_manager.read_all(Risk, project_id=selected_project_id) if risks: risk_data = [] for r in risks: risk_data.append({ 'ID': r.id, 'Name': r.name, 'Probability': r.probability, 'Impact': r.impact, 'Status': r.status, 'Owner': r.owner.username }) st.dataframe(pd.DataFrame(risk_data), use_container_width=True, hide_index=True) # Risk Mitigation Action st.markdown("---") st.subheader("Mitigation & Closure") risk_ids = [r.id for r in risks] selected_risk_id = st.selectbox("Select Risk ID to Manage", risk_ids) if selected_risk_id and st.session_state.user_role in ['Admin', 'Project Manager']: selected_risk = db_manager.read_by_id(Risk, selected_risk_id) with st.form(f"mitigation_form_{selected_risk_id}"): st.markdown(f"**Risk:** {selected_risk.name}") st.text_area("Mitigation Plan", value=selected_risk.mitigation_plan, key="mitigation_plan_edit") new_status = st.selectbox("Status", ['Open', 'Managed', 'Closed'], index=['Open', 'Managed', 'Closed'].index(selected_risk.status)) if st.form_submit_button("Update Risk"): selected_risk.mitigation_plan = st.session_state[f"mitigation_plan_edit"] selected_risk.status = new_status if db_manager.update(selected_risk): st.success("Risk updated successfully."); time.sleep(0.5); st.experimental_rerun() else: st.error("Failed to update risk.") else: st.info("No risks registered for this project yet.") with tab2: if st.session_state.user_role in ['Admin', 'Project Manager']: with st.form("add_risk_form"): risk_name = st.text_input("Risk Title (Required)") risk_desc = st.text_area("Detailed Description") col_r1, col_r2 = st.columns(2) with col_r1: risk_prob = st.selectbox("Probability", ['Low', 'Medium', 'High']) with col_r2: risk_impact = st.selectbox("Impact", ['Low', 'Medium', 'High']) risk_owner_name = st.selectbox("Risk Owner", list(users_map.keys())) if st.form_submit_button("Register Risk", type="primary"): if not risk_name: st.error("Risk Title is required.") else: new_risk = Risk(name=risk_name, description=risk_desc, probability=risk_prob, impact=risk_impact, project_id=selected_project_id, owner_id=users_map[risk_owner_name]) if db_manager.create(new_risk): st.success("Risk successfully registered."); time.sleep(0.5); st.experimental_rerun() else: st.error("Failed to register risk.") def reports_page(db_manager): """Page for generating and exporting data reports.""" st.title("📄 Reports and Analytics") tab1, tab2, tab3 = st.tabs(["Data Export", "Time Tracking Report", "Velocity Chart (Mock)"]) # --- Data Export Tab --- with tab1: st.subheader("Data Export (CSV & JSON)") def convert_df_to_csv(df): # Function to convert DataFrame to CSV for download return df.to_csv(index=False).encode('utf-8') models = {'Projects': Project, 'Tasks': Task, 'Users': User, 'Time Logs': TimeLog, 'Risks': Risk} selected_model = st.selectbox("Select Data Entity to Export", list(models.keys())) data = db_manager.read_all(models[selected_model]) if data: # Simple conversion to DataFrame, handling relationships for display df_export = pd.DataFrame([item.__dict__ for item in data]) df_export = df_export.drop(columns=['_sa_instance_state'], errors='ignore') st.dataframe(df_export, use_container_width=True) csv = convert_df_to_csv(df_export) st.download_button( label=f"Download {selected_model} as CSV", data=csv, file_name=f'{selected_model.lower().replace(" ", "_")}_report_{datetime.now().strftime("%Y%m%d")}.csv', mime='text/csv', type="primary" ) st.markdown(""" > **PDF/Excel Export Note:** For PDF/Excel exports (`ReportLab`/`XlsxWriter`), you would use the `df_export` object, > install the required libraries (e.g., `!pip install openpyxl`), and implement the conversion logic here. """) else: st.info("No data available for export.") # --- Time Tracking Report Tab --- with tab2: st.subheader("Team Time Tracking Summary") all_logs = db_manager.read_all(TimeLog) if all_logs: log_data = [] for log in all_logs: log_data.append({ 'User': log.user.username, 'Task ID': log.task.id, 'Task Title': log.task.title, 'Project': log.task.project.name, 'Hours Logged': log.hours, 'Date': log.log_date.strftime('%Y-%m-%d') }) df_logs = pd.DataFrame(log_data) st.dataframe(df_logs.sort_values(by='Hours Logged', ascending=False), use_container_width=True, hide_index=True) # Plot Weekly Logged Hours df_logs['Week'] = pd.to_datetime(df_logs['Date']).dt.to_period('W').astype(str) weekly_summary = df_logs.groupby(['User', 'Week'])['Hours Logged'].sum().reset_index() fig = px.bar(weekly_summary, x="Week", y="Hours Logged", color="User", title="Weekly Logged Hours by Team Member") st.plotly_chart(fig, use_container_width=True) else: st.info("No time has been logged yet.") # --- Velocity Chart Tab (Mock) --- with tab3: st.subheader("Sprint Velocity Chart") st.warning("This is a functional mock. For true Burndown/Velocity, complex calculation of Story Points/Estimates within a sprint is required.") all_sprints = db_manager.read_all(Sprint) if all_sprints: velocity_data = [] for s in all_sprints: if s.status == 'Completed': completed_tasks = [t for t in s.tasks if t.status == 'Done'] # Velocity is sum of estimates of completed tasks velocity = sum(t.estimate_hours for t in completed_tasks) velocity_data.append({'Sprint': s.name, 'Project': s.project.name, 'Velocity (Hours)': velocity}) if velocity_data: df_velocity = pd.DataFrame(velocity_data) fig_vel = px.bar(df_velocity, x="Sprint", y="Velocity (Hours)", color="Project", title="Sprint Velocity (Hours Completed)", labels={'Velocity (Hours)': 'Completed Hours'}) st.plotly_chart(fig_vel, use_container_width=True) else: st.info("No completed sprints with logged velocity data.") def administration_page(db_manager): """Page for system settings and role management.""" st.title("⚙️ Administration & Team Management") if not st.session_state.user_role == 'Admin': st.error("Access Denied. Only Administrators can view this page.") return tab1, tab2 = st.tabs(["Role Management", "System Settings (Mock)"]) with tab1: st.subheader("User Role and Team Management") users = db_manager.read_all(User) user_data = [{ 'ID': u.id, 'Username': u.username, 'Email': u.email, 'Role': u.role } for u in users] st.dataframe(pd.DataFrame(user_data), use_container_width=True, hide_index=True) st.markdown("---") st.subheader("Update User Role") user_map = {u.username: u.id for u in users} selected_username = st.selectbox("Select User", list(user_map.keys())) if selected_username: user_to_update = db_manager.read_by_id(User, user_map[selected_username]) with st.form("update_role_form"): new_role = st.selectbox("New Role", ['Admin', 'Project Manager', 'Team Member', 'Viewer'], index=['Admin', 'Project Manager', 'Team Member', 'Viewer'].index(user_to_update.role)) if st.form_submit_button("Update Role", type="primary"): user_to_update.role = new_role if db_manager.update(user_to_update): st.success(f"Role for {selected_username} updated to {new_role}.") time.sleep(0.5); st.experimental_rerun() else: st.error("Failed to update user role.") with tab2: st.subheader("System Settings and Audit Logs") st.info("This section is a placeholder for complex system configurations.") st.code(""" # Example system settings structure SYSTEM_SETTINGS = { 'DEFAULT_SPRINT_DAYS': 14, 'MAX_PROJECTS_PER_MANAGER': 10, 'AUDIT_LOGGING_ENABLED': True } """) # --- Main Application Logic --- def main_app(): """Main function to run the Streamlit app.""" init_session_state() db_manager, auth_service = get_db_and_auth() # --- Sidebar Navigation --- with st.sidebar: st.image("https://placehold.co/150x50/1e40af/ffffff?text=PM+TOOL", width=200) st.markdown("## Navigation") if st.session_state.is_authenticated: # Defined list of main pages main_pages = [ "Dashboard", "Project Management", "Kanban Board", "Sprint Management", "Gantt & WBS", "Risk Register", "Reports & Analytics" ] if st.session_state.user_role == 'Admin': main_pages.append("Administration") st.session_state.current_page = st.radio( "Go to:", options=main_pages, key="main_nav" ) st.markdown("---") st.write(f"Logged in as: **{st.session_state.username}** ({st.session_state.user_role})") if st.button("Logout", type="secondary", use_container_width=True): auth_service.logout() else: st.info("Please log in to access the tool.") # --- Content Routing --- if not st.session_state.is_authenticated: login_page(auth_service) elif st.session_state.current_page == "Dashboard": dashboard_page(db_manager) elif st.session_state.current_page == "Project Management": projects_page(db_manager) elif st.session_state.current_page == "Kanban Board": kanban_page(db_manager) elif st.session_state.current_page == "Sprint Management": sprint_page(db_manager) elif st.session_state.current_page == "Gantt & WBS": gantt_page(db_manager) elif st.session_state.current_page == "Risk Register": risk_register_page(db_manager) elif st.session_state.current_page == "Reports & Analytics": reports_page(db_manager) elif st.session_state.current_page == "Administration": administration_page(db_manager) # --- Footer --- st.markdown(""" """, unsafe_allow_html=True) if __name__ == '__main__': main_app()