import json
import os
import random
import sys
import tempfile
from datetime import datetime, date

import pandas as pd
import streamlit as st

# Disable Streamlit's telemetry and usage statistics
os.environ['STREAMLIT_ANALYTICS_ENABLED'] = 'false'
os.environ['STREAMLIT_METRICS_ENABLED'] = 'false'
os.environ['STREAMLIT_BROWSER_GATHER_USAGE_STATS'] = '0'
os.environ['STREAMLIT_SERVER_HEADLESS'] = 'true'

# Set page configuration with reduced margins and padding
st.set_page_config(
    page_title="Task Board",
    page_icon="📋",
    layout="wide",
    initial_sidebar_state="expanded"
)

# Apply custom CSS for a horizontal board layout with expandable tasks
st.markdown(""" """, unsafe_allow_html=True)

# Set up paths with more reliable locations
USER_HOME = os.path.expanduser("~")
TEMP_DIR = tempfile.gettempdir()

# Candidate task files, in order of preference
TASKS_FILE_LOCATIONS = [
    os.path.join(os.getcwd(), "data", "tasks.json"),  # Local data directory
    os.path.join(os.getcwd(), "tasks.json"),          # Current directory
    os.path.join(USER_HOME, "streamlit_tasks.json"),  # User's home directory
    os.path.join(TEMP_DIR, "streamlit_tasks.json"),   # System temp directory
]
ASSIGNEE_FILE = "Assignee.txt"  # Direct path to Assignee.txt


def get_writable_path(locations):
    """Return the first usable file path from *locations*, or None.

    Two passes are made over *locations*, in order:

    1. A path that already exists wins if its directory is writable, so a
       previously saved file keeps being used.
    2. Otherwise the first path whose directory can be created (if missing)
       and accepts a newly created file is returned.

    Args:
        locations: Iterable of candidate file paths, most preferred first.

    Returns:
        The chosen path, or None when no candidate is usable (callers treat
        None as "keep everything in memory").
    """
    locations = list(locations)

    def parent_dir(path):
        # A bare filename has an empty dirname; it lives in the current
        # directory, which os.access / os.makedirs cannot handle as ''.
        return os.path.dirname(path) or os.curdir

    # First check if any existing location is writable
    for loc in locations:
        if os.path.exists(loc) and os.access(parent_dir(loc), os.W_OK):
            return loc

    # Then try to find a writable directory for a new file
    for loc in locations:
        dir_path = parent_dir(loc)
        try:
            os.makedirs(dir_path, exist_ok=True)
            # Probe with a uniquely named temporary file: it cannot clobber
            # a real file and is removed automatically when closed.
            with tempfile.TemporaryFile(dir=dir_path) as probe:
                probe.write(b"test")
            return loc
        except OSError:  # includes PermissionError
            continue

    # If all else fails, use in-memory storage
    return None


# Get the best writable path
TASKS_FILE = get_writable_path(TASKS_FILE_LOCATIONS)

# Status color mapping
# Short style key per task status.
# NOTE(review): nothing in this part of the script reads the mapping any
# more; presumably it fed markup that is no longer present. Confirm before
# removing.
STATUS_COLORS = {
    "To Do": "todo",
    "In Progress": "in-progress",
    "Done": "done",
    "Backlog": "backlog"
}

# Statuses offered in every status drop-down, in display order.
STATUS_OPTIONS = ["Backlog", "To Do", "In Progress", "Done"]
# Left-to-right column order of the board view.
BOARD_STATUSES = ["To Do", "In Progress", "Done", "Backlog"]
# Columns every task table is expected to have.
TASK_COLUMNS = [
    'ID', 'Title', 'Description', 'Assignee', 'Status',
    'Date Started', 'Date to Finish', 'Due Date',
    'Documents'  # Added Documents field
]
# Columns stored as ISO strings on disk and as datetime.date in memory.
DATE_COLUMNS = ['Date Started', 'Date to Finish', 'Due Date']
# Fallback assignees used whenever no usable assignee file is found.
DEFAULT_ASSIGNEES = ["Team Member 1", "Team Member 2", "Team Member 3"]


def escape_html(value):
    """Return *value* as text that is safe inside HTML-enabled markdown.

    Task fields and the ``task_id`` URL parameter are user-controlled, and
    they are rendered with ``unsafe_allow_html=True``.
    """
    import html  # local import keeps this block self-contained

    return html.escape(str(value))


def format_date(value, fmt):
    """Format a date-like *value* with *fmt*, or return "Not set".

    Anything that is missing (None/NaN/NaT) or has no ``strftime`` method is
    reported as "Not set" instead of raising.
    """
    if value is None or not hasattr(value, 'strftime'):
        return "Not set"
    if pd.isna(value):
        return "Not set"
    return value.strftime(fmt)


def status_index(status):
    """Return the position of *status* in STATUS_OPTIONS (0 if unknown)."""
    return STATUS_OPTIONS.index(status) if status in STATUS_OPTIONS else 0


def create_empty_df():
    """Create an empty DataFrame with the required columns"""
    return pd.DataFrame(columns=TASK_COLUMNS)


def _coerce_date_columns(df):
    """Convert the date columns of *df* to ``datetime.date`` values in place.

    Unparseable entries become NaT so that one bad value does not discard
    the whole task list.
    """
    for col in DATE_COLUMNS:
        if col in df.columns:
            df[col] = pd.to_datetime(df[col], errors='coerce').dt.date
    return df


def load_tasks():
    """Load tasks, preferring in-memory tasks left by a failed save.

    Returns:
        A DataFrame of tasks. An empty frame with the standard columns is
        returned when nothing is stored or loading fails.
    """
    # First check if we have in-memory tasks from a failed save
    if 'in_memory_tasks' in st.session_state:
        try:
            df = pd.DataFrame(st.session_state.in_memory_tasks)
            if not df.empty:
                return _coerce_date_columns(df)
        except Exception as e:
            print(f"Error loading in-memory tasks: {str(e)}")

    # If no tasks file exists, start with an empty dataframe
    if not (TASKS_FILE and os.path.exists(TASKS_FILE)):
        return create_empty_df()

    try:
        with open(TASKS_FILE, 'r') as f:
            tasks_data = json.load(f)
        df = pd.DataFrame(tasks_data)
        print(f"Loaded tasks from: {TASKS_FILE}")

        if df.empty:
            return create_empty_df()

        _coerce_date_columns(df)
        # Ensure ID column exists
        if 'ID' not in df.columns:
            df['ID'] = [f"#{i}" for i in range(1, len(df) + 1)]
        # Files written by older versions may lack some columns; the UI
        # indexes all of them, so add the missing ones as empty.
        for col in TASK_COLUMNS:
            if col not in df.columns:
                df[col] = None
        return df
    except Exception as e:
        print(f"Error loading tasks from files: {str(e)}")
        return create_empty_df()


def _serialise_tasks(tasks):
    """Return *tasks* as a list of JSON-ready dicts.

    Dates become ISO strings; missing dates and float NaN become None so the
    output is valid JSON.
    """
    records = tasks.to_dict(orient='records')
    for record in records:
        for key, value in record.items():
            if isinstance(value, (datetime, date)):
                record[key] = None if pd.isna(value) else value.isoformat()
            elif isinstance(value, float) and value != value:  # NaN
                record[key] = None
    return records


def save_tasks():
    """Save tasks to the JSON file, falling back to session memory.

    Returns:
        True when the tasks were written to TASKS_FILE, False when they could
        only be kept in ``st.session_state.in_memory_tasks`` (or not even
        serialised).
    """
    try:
        tasks_dict = _serialise_tasks(st.session_state.tasks)
    except Exception as e:
        # Nothing serialisable to stash; the live DataFrame in session state
        # is still intact.
        print(f"Error saving tasks: {str(e)}")
        st.session_state.using_memory_storage = True
        st.warning(f"Error saving tasks: {str(e)}. Tasks stored in memory instead.")
        return False

    # Check if we have a writable file path
    if TASKS_FILE:
        try:
            # Ensure the directory exists
            os.makedirs(os.path.dirname(TASKS_FILE) or os.curdir, exist_ok=True)
            with open(TASKS_FILE, 'w') as f:
                json.dump(tasks_dict, f, indent=2)
            print(f"Successfully saved tasks to: {TASKS_FILE}")
            # Reset in-memory flag if we succeeded in saving to file
            st.session_state.using_memory_storage = False
            return True
        except Exception as e:
            print(f"Error saving to {TASKS_FILE}: {str(e)}")
            # Fall back to in-memory storage below

    # Store in memory as fallback
    st.session_state.in_memory_tasks = tasks_dict
    st.session_state.using_memory_storage = True
    if not TASKS_FILE:
        st.warning("No writable location found. Tasks stored in memory instead.")
    else:
        st.warning(f"Could not save to file: {TASKS_FILE}. Tasks stored in memory instead.")
    return False


def load_assignees():
    """Load assignee names from the first Assignee.txt that can be found.

    If no file exists, a default file is created in the home or temp
    directory and the default names are returned.
    """
    # Check direct path first
    if os.path.exists(ASSIGNEE_FILE):
        return parse_assignee_file(ASSIGNEE_FILE)

    # If not found, check other common locations
    possible_locations = [
        "./Assignee.txt",
        "/app/Assignee.txt",
        os.path.join(os.getcwd(), "Assignee.txt"),
        "/data/Assignee.txt",
        "../Assignee.txt",
        os.path.join(USER_HOME, "Assignee.txt"),
        os.path.join(TEMP_DIR, "Assignee.txt")
    ]
    for location in possible_locations:
        if os.path.exists(location):
            print(f"Found assignee file at: {location}")
            return parse_assignee_file(location)

    # If file not found, create a default one in a writable location
    try:
        base_dir = USER_HOME if os.access(USER_HOME, os.W_OK) else TEMP_DIR
        default_location = os.path.join(base_dir, "Assignee.txt")
        with open(default_location, "w") as f:
            for assignee in DEFAULT_ASSIGNEES:
                f.write(f"{assignee}\n")
        print(f"Created default assignee file at: {default_location}")
    except Exception as e:
        print(f"Error creating default assignee file: {str(e)}")

    # Either way the caller gets the default list
    return list(DEFAULT_ASSIGNEES)


def parse_assignee_file(file_path):
    """Parse the assignee file and extract names.

    Each non-blank line is either "Name - Role" (the text before the first
    dash is the name) or just a name. The defaults are returned when the
    file yields no names or cannot be read.
    """
    try:
        with open(file_path, "r") as f:
            content = f.read()

        # Debug: Print content to logs
        print(f"Content of {file_path}:\n{content}")

        assignees = []
        for raw_line in content.split('\n'):
            line = raw_line.strip()
            if not line:
                continue
            if '-' in line:
                # Format: "Name - Role"
                name_part = line.split('-')[0].strip()
                if name_part:
                    assignees.append(name_part)
            else:
                # Just use the line as is if no dash
                assignees.append(line)

        # Log what we found
        print(f"Found {len(assignees)} assignees: {assignees}")

        if not assignees:
            st.warning(f"No valid assignee names found in {file_path}. Using default assignees.")
            return list(DEFAULT_ASSIGNEES)
        return assignees
    except Exception as e:
        st.error(f"Error reading assignee file: {str(e)}")
        print(f"Error reading {file_path}: {str(e)}")
        return list(DEFAULT_ASSIGNEES)


def generate_task_id():
    """Return the next free ID of the form "#<n>".

    IDs that are not "#" followed by an integer are ignored, so one odd ID
    can no longer cause a random (possibly duplicate) ID to be issued.
    """
    tasks = st.session_state.tasks
    if len(tasks) == 0 or 'ID' not in tasks.columns:
        return "#1"

    numbers = []
    for task_id in tasks['ID'].tolist():
        if isinstance(task_id, str) and task_id.startswith('#'):
            try:
                numbers.append(int(task_id.replace('#', '')))
            except ValueError:
                continue
    return f"#{max(numbers, default=0) + 1}"


def update_task(task_idx, field, value):
    """Update a task field and save changes"""
    st.session_state.tasks.at[task_idx, field] = value
    save_tasks()
    return True


def delete_task(task_id):
    """Delete a task by ID; return True if a task was removed."""
    tasks = st.session_state.tasks
    task_idx = tasks[tasks['ID'] == task_id].index
    if task_idx.empty:
        return False
    st.session_state.tasks = tasks.drop(task_idx).reset_index(drop=True)
    save_tasks()
    return True


def get_task_doc_dir(task_id):
    """Return the directory holding the uploaded documents of *task_id*.

    Upload and download both use this helper so they always agree.
    """
    return os.path.join(TEMP_DIR, f"task_docs_{str(task_id).replace('#', '')}")


def sanitize_filename(name):
    """Return *name* reduced to a single safe path component."""
    cleaned = name.replace(" ", "_").replace("/", "_").replace("\\", "_")
    # "", "." and ".." would resolve to a directory rather than a file.
    return cleaned if cleaned.strip(".") else "upload"


def get_task_documents(task_idx):
    """Return a copy of the document list stored for the task at *task_idx*."""
    tasks = st.session_state.tasks
    if 'Documents' not in tasks.columns:
        return []
    value = tasks.at[task_idx, 'Documents']
    return list(value) if isinstance(value, list) else []


def add_task_document(task_idx, filename):
    """Attach *filename* to the task at *task_idx* and save.

    Returns:
        True if the name was added, False if it was already attached.
    """
    tasks = st.session_state.tasks
    # A list can only be stored in an object-dtype column.
    if 'Documents' not in tasks.columns:
        tasks['Documents'] = None
    elif tasks['Documents'].dtype != object:
        tasks['Documents'] = tasks['Documents'].astype(object)

    docs = get_task_documents(task_idx)
    if filename in docs:
        return False
    docs.append(filename)
    tasks.at[task_idx, 'Documents'] = docs
    save_tasks()
    return True


def return_to_board():
    """Deselect the current task, clear the URL parameters and rerun.

    Both the session value and the ``task_id`` URL parameter must go;
    otherwise the next run selects the same task again.
    """
    st.session_state.selected_task_id = None
    for key in list(st.query_params.keys()):
        del st.query_params[key]
    st.rerun()


# Initialize session state ONLY ONCE
if 'tasks' not in st.session_state:
    st.session_state.tasks = load_tasks()
if 'selected_task_id' not in st.session_state:
    st.session_state.selected_task_id = None
if 'in_memory_tasks' not in st.session_state:
    st.session_state.in_memory_tasks = []
# Add a flag to track if we're using in-memory storage
if 'using_memory_storage' not in st.session_state:
    st.session_state.using_memory_storage = False

# Show storage location information
if TASKS_FILE:
    print(f"Using task storage location: {TASKS_FILE}")
else:
    print("No writable location found. Using in-memory storage only.")
    st.session_state.using_memory_storage = True

# Load assignees
assignee_list = load_assignees()

# Application Header
st.markdown("\n\n📋 Task Board\n\n", unsafe_allow_html=True)

# Sidebar for new tasks
with st.sidebar:
    st.markdown('', unsafe_allow_html=True)
    title = st.text_input("Task Title", key="new_title")
    description = st.text_area("Description", key="new_desc", height=100)

    # More compact layout for form
    col1, col2 = st.columns(2)
    with col1:
        assignee = st.selectbox("Assignee", assignee_list, key="new_assignee")
    with col2:
        status = st.selectbox("Status", STATUS_OPTIONS, key="new_status")

    # Added date fields in more compact layout
    col3, col4 = st.columns(2)
    with col3:
        date_started = st.date_input("Date Started", value=datetime.now().date(), key="new_date_start")
    with col4:
        date_to_finish = st.date_input("Date to Finish", value=datetime.now().date(), key="new_date_finish")
    due_date = st.date_input("Due Date", key="new_due_date")

    if st.button("Add Task", key="add_btn"):
        if not title.strip():  # Basic validation
            st.warning("Please enter a task title")
        else:
            task_added = False
            try:
                new_task = {
                    'ID': generate_task_id(),
                    'Title': title,
                    'Description': description,
                    'Assignee': assignee,
                    'Status': status,
                    'Date Started': date_started,
                    'Date to Finish': date_to_finish,
                    'Due Date': due_date,
                    'Documents': []
                }
                st.session_state.tasks = pd.concat([
                    st.session_state.tasks,
                    pd.DataFrame([new_task])
                ], ignore_index=True)
                save_tasks()
                task_added = True
            except Exception as e:
                st.error(f"Error adding task: {str(e)}")
            # Rerun outside the try block so the rerun signal can never be
            # mistaken for a failure.
            if task_added:
                st.success("Task added successfully!")
                st.rerun()

# Sidebar additional options for task status updates
with st.sidebar:
    st.markdown('', unsafe_allow_html=True)
    # Only show if tasks exist
    if not st.session_state.tasks.empty:
        # Dropdown to select task
        task_options = [
            f"{task['ID']} - {task['Title']}"
            for _, task in st.session_state.tasks.iterrows()
        ]
        selected_task = st.selectbox("Select Task", task_options, key="status_update_task")
        # Extract task ID
        selected_id = selected_task.split(" - ")[0] if selected_task else None
        if selected_id:
            # Find task index
            task_idx = st.session_state.tasks[st.session_state.tasks['ID'] == selected_id].index
            if not task_idx.empty:
                task_idx = task_idx[0]
                current_status = st.session_state.tasks.at[task_idx, 'Status']
                # Status selection
                new_status = st.selectbox(
                    "New Status",
                    STATUS_OPTIONS,
                    index=status_index(current_status),
                    key="new_status_select"
                )
                if st.button("Update Status", key="update_status_btn"):
                    if new_status != current_status:
                        update_task(task_idx, 'Status', new_status)
                        st.success(f"Updated task {selected_id} status to {new_status}")
                        st.rerun()

# MAIN CONTENT AREA
# Get selected task ID from URL parameters
url_task_id = st.query_params.get('task_id', None)
# Update session state based on URL parameter
if url_task_id:
    st.session_state.selected_task_id = url_task_id
# Use session state for selected task ID
selected_task_id = st.session_state.selected_task_id

if selected_task_id:
    # A task is selected: show its details
    task_df = st.session_state.tasks[st.session_state.tasks['ID'] == selected_task_id]
    if not task_df.empty:
        # Back to board button
        if st.button("← Back to Board", key="back_btn"):
            return_to_board()

        row_idx = task_df.index[0]
        task = task_df.iloc[0]
        safe_task_id = escape_html(selected_task_id)

        # Create the task details container
        st.markdown('\n', unsafe_allow_html=True)

        # Task header with title and status
        st.markdown(
            f"\n\n{escape_html(task['Title'])} {escape_html(task['Status'])}\n\n",
            unsafe_allow_html=True
        )

        # Task metadata in grid layout
        st.markdown('\n', unsafe_allow_html=True)
        # Task ID
        st.markdown(f"\nTask ID\n{safe_task_id}\n", unsafe_allow_html=True)
        # Assignee
        st.markdown(f"\nAssignee\n{escape_html(task['Assignee'])}\n", unsafe_allow_html=True)

        # Format dates for display
        date_started_str = format_date(task.get('Date Started'), '%m/%d/%Y')
        date_to_finish_str = format_date(task.get('Date to Finish'), '%m/%d/%Y')
        due_date_str = format_date(task.get('Due Date'), '%m/%d/%Y')

        # Date Started
        st.markdown(f"\nDate Started\n{date_started_str}\n", unsafe_allow_html=True)
        # Date to Finish
        st.markdown(f"\nDate to Finish\n{date_to_finish_str}\n", unsafe_allow_html=True)
        # Due Date
        st.markdown(f"\nDue Date\n{due_date_str}\n", unsafe_allow_html=True)
        st.markdown('\n', unsafe_allow_html=True)

        # Task progress visualization
        st.markdown("\n", unsafe_allow_html=True)

        # Task actions
        st.markdown('\n', unsafe_allow_html=True)
        st.markdown('\n', unsafe_allow_html=True)
        st.markdown('\n\nEdit Task\n\n', unsafe_allow_html=True)

        # Edit description (a missing description is NaN/None, not a string)
        current_description = task['Description'] if isinstance(task['Description'], str) else ''
        new_description = st.text_area(
            "Edit Description",
            value=current_description,
            height=150,
            key=f"edit_desc_{selected_task_id}"
        )
        # Without an explicit save the edited text would simply be dropped.
        if st.button("Save Description", key=f"save_desc_{selected_task_id}"):
            if new_description != current_description:
                update_task(row_idx, 'Description', new_description)
                st.success("Description updated")
                st.rerun()

        # File upload section
        uploaded_files = st.file_uploader(
            "Upload Documents",
            accept_multiple_files=True,
            key=f"upload_{selected_task_id}"
        )
        task_doc_dir = get_task_doc_dir(selected_task_id)
        if uploaded_files:
            # Create a container to show upload status
            upload_status = st.empty()
            for uploaded_file in uploaded_files:
                upload_status.info(f"Processing {uploaded_file.name}...")
                try:
                    os.makedirs(task_doc_dir, exist_ok=True)
                    safe_filename = sanitize_filename(uploaded_file.name)
                    # Write file in binary mode
                    with open(os.path.join(task_doc_dir, safe_filename), "wb") as f:
                        f.write(uploaded_file.getvalue())
                    # Record the document on the live task table so every
                    # file of a multi-file upload is kept.
                    add_task_document(row_idx, safe_filename)
                    upload_status.success(f"Uploaded {uploaded_file.name} successfully!")
                except Exception as e:
                    # Provide a more user-friendly error message
                    upload_status.error(f"Error uploading {uploaded_file.name}. Please try a smaller file or a different filename.")
                    # Log the detailed error for debugging
                    print(f"Upload error details: {str(e)}")
            st.success("Upload processing complete")

        # Display attached documents if any (read after the upload step so
        # files added in this run are listed too)
        documents = get_task_documents(row_idx)
        if documents:
            st.markdown('\n', unsafe_allow_html=True)
            st.markdown('\n', unsafe_allow_html=True)
            st.markdown('\n\nAttached Documents\n\n', unsafe_allow_html=True)
            for doc in documents:
                file_path = os.path.join(task_doc_dir, doc)
                # Display document with download option
                if os.path.exists(file_path):
                    col1, col2 = st.columns([3, 1])
                    with col1:
                        st.markdown(f"📄 **{doc}**")
                    with col2:
                        with open(file_path, "rb") as file:
                            st.download_button(
                                label="Download",
                                data=file,
                                file_name=doc,
                                key=f"download_{doc}_{selected_task_id}"
                            )
            st.markdown('\n', unsafe_allow_html=True)
            st.markdown('\n', unsafe_allow_html=True)

        # Create columns for actions
        col1, col2 = st.columns(2)
        with col1:
            # Status update form
            new_status = st.selectbox(
                "Update Status",
                STATUS_OPTIONS,
                index=status_index(task['Status'])
            )
            if st.button("Save Status Change", use_container_width=True):
                if new_status != task['Status']:
                    update_task(row_idx, 'Status', new_status)
                    st.success(f"Updated task status to {new_status}")
                    st.rerun()
        with col2:
            if st.button("Delete Task", use_container_width=True):
                if delete_task(selected_task_id):
                    st.success(f"Task {selected_task_id} deleted!")
                    # Return to main board
                    return_to_board()
                else:
                    st.error(f"Failed to delete task {selected_task_id}")

        st.markdown('\n', unsafe_allow_html=True)
        # Close the task details container
        st.markdown('', unsafe_allow_html=True)
    else:
        st.error(f"Task {selected_task_id} not found")
        # Return to main board
        if st.button("Back to Board"):
            return_to_board()
else:
    # Main board view showing all tasks by status
    if st.session_state.tasks.empty:
        st.info("No tasks yet. Add your first task using the sidebar.")
    else:
        # Create container with reduced height
        st.markdown('\n', unsafe_allow_html=True)

        # One column per status, in board order
        cols = st.columns(len(BOARD_STATUSES))
        for column, status_name in zip(cols, BOARD_STATUSES):
            with column:
                st.markdown(f'\n\n{status_name}\n\n', unsafe_allow_html=True)

                # Filter tasks by current status
                status_tasks = st.session_state.tasks[st.session_state.tasks['Status'] == status_name]
                if status_tasks.empty:
                    st.markdown("\nNo tasks\n", unsafe_allow_html=True)
                    continue

                # Start the flex container for task cards
                st.markdown('\n', unsafe_allow_html=True)

                # Render each task as a card
                for idx, task in status_tasks.iterrows():
                    task_id = task['ID'] if 'ID' in task and pd.notna(task['ID']) else f"#{idx+1}"
                    # Format dates for display - shorter format
                    date_started_str = format_date(task.get('Date Started'), '%m/%d')
                    date_to_finish_str = format_date(task.get('Date to Finish'), '%m/%d')

                    with st.container():
                        col1, col2 = st.columns([4, 1])
                        with col1:
                            st.markdown(
                                f"\n{escape_html(task_id)}"
                                f"\n{escape_html(task['Title'])}"
                                f"\n{escape_html(task['Assignee'])}"
                                f"\nStart: {date_started_str} Finish: {date_to_finish_str}\n",
                                unsafe_allow_html=True
                            )
                        with col2:
                            if st.button("View", key=f"view_{task_id}"):
                                st.session_state.selected_task_id = task_id
                                st.rerun()

# Add JavaScript for better interactivity
st.markdown(""" """, unsafe_allow_html=True)