import streamlit as st
import pandas as pd
from datetime import datetime, date
import os
import sys
import random
import json
import tempfile
# Opt out of Streamlit's telemetry/usage statistics and request headless mode.
os.environ.update({
    'STREAMLIT_ANALYTICS_ENABLED': 'false',
    'STREAMLIT_METRICS_ENABLED': 'false',
    'STREAMLIT_BROWSER_GATHER_USAGE_STATS': '0',
    'STREAMLIT_SERVER_HEADLESS': 'true',
})

# Page configuration: wide layout with the sidebar expanded on first load.
_PAGE_SETTINGS = {
    "page_title": "Task Board",
    "page_icon": "📋",
    "layout": "wide",
    "initial_sidebar_state": "expanded",
}
st.set_page_config(**_PAGE_SETTINGS)

# Hook for the board's custom CSS (the markup body is currently a lone newline).
st.markdown("""
""", unsafe_allow_html=True)
# Base directories used when looking for a place to store data.
USER_HOME = os.path.expanduser("~")
TEMP_DIR = tempfile.gettempdir()
_WORKING_DIR = os.getcwd()

# Candidate task-file locations, listed from most to least preferred:
# local data directory, current directory, user's home, system temp directory.
TASKS_FILE_LOCATIONS = [
    os.path.join(_WORKING_DIR, "data", "tasks.json"),
    os.path.join(_WORKING_DIR, "tasks.json"),
    os.path.join(USER_HOME, "streamlit_tasks.json"),
    os.path.join(TEMP_DIR, "streamlit_tasks.json"),
]

# Relative path to the assignee list; it is resolved against the working directory.
ASSIGNEE_FILE = "Assignee.txt"
def get_writable_path(locations):
    """Return the first entry of *locations* that can be written to.

    Two passes are made over *locations*, in order:

    1. A file that already exists wins if both the file itself and its
       parent directory are writable.
    2. Otherwise the first location whose parent directory can be created
       (if missing) and accepts a new file is returned.

    Args:
        locations: Iterable of candidate file paths, most preferred first.

    Returns:
        The chosen path exactly as given in *locations*, or None when no
        candidate is writable (callers fall back to in-memory storage).
    """
    # Pass 1: prefer a file that already exists so stored data keeps being used.
    for loc in locations:
        if not os.path.exists(loc):
            continue
        # A bare filename has an empty dirname; treat it as the current directory.
        parent = os.path.dirname(loc) or os.curdir
        # Check the file as well as the directory: a read-only file inside a
        # writable directory would otherwise be selected and fail on save.
        if os.access(parent, os.W_OK) and os.access(loc, os.W_OK):
            return loc

    # Pass 2: find a directory in which a new file can be created.
    for loc in locations:
        parent = os.path.dirname(loc) or os.curdir
        try:
            os.makedirs(parent, exist_ok=True)
            # Probe with a uniquely named temporary file so that no existing
            # file can be clobbered and nothing is left behind on failure.
            with tempfile.NamedTemporaryFile(dir=parent, prefix=".write_test"):
                pass
        except OSError:
            # PermissionError is an OSError subclass; try the next candidate.
            continue
        return loc

    # Nothing was writable.
    return None
# Resolve the storage file once at import time; a None result means that
# get_writable_path found no writable candidate.
TASKS_FILE = get_writable_path(TASKS_FILE_LOCATIONS)

# Status label -> short style key (looked up as `status_class` when rendering).
STATUS_COLORS = dict((
    ("To Do", "todo"),
    ("In Progress", "in-progress"),
    ("Done", "done"),
    ("Backlog", "backlog"),
))
def create_empty_df():
    """Return a task DataFrame that has every expected column and no rows."""
    task_columns = ['ID', 'Title', 'Description', 'Assignee', 'Status']
    # The three date fields, followed by the list of attached document names.
    task_columns += ['Date Started', 'Date to Finish', 'Due Date']
    task_columns.append('Documents')
    return pd.DataFrame(columns=task_columns)
def _normalize_tasks_df(df):
    """Convert date columns to `date` objects and backfill a missing ID column."""
    for col in ('Date Started', 'Date to Finish', 'Due Date'):
        if col in df.columns:
            # errors="coerce" turns an unparseable value into NaT instead of
            # raising. Without it, one bad date made the whole load fail and
            # return an empty board, which the next save wrote over the file.
            df[col] = pd.to_datetime(df[col], errors="coerce").dt.date
    if 'ID' not in df.columns:
        df['ID'] = [f"#{i}" for i in range(1, len(df) + 1)]
    return df


def load_tasks():
    """Load the task table, preferring an in-memory snapshot over the file.

    The snapshot in ``st.session_state.in_memory_tasks`` (written by
    save_tasks when a file save fails) is used when it is present and
    non-empty. Otherwise TASKS_FILE is read as JSON. Date columns are
    converted back to ``date`` objects and a missing ``ID`` column is
    filled with ``#1``, ``#2``, ... on both paths.

    Returns:
        A DataFrame of tasks. An empty frame from create_empty_df() is
        returned when there is no file, the file holds no tasks, or
        loading fails (the error is printed, not raised).
    """
    # First check for in-memory tasks left behind by a failed save.
    if 'in_memory_tasks' in st.session_state:
        try:
            df = pd.DataFrame(st.session_state.in_memory_tasks)
            if not df.empty:
                return _normalize_tasks_df(df)
        except Exception as e:
            # Best effort: fall through to the file on any problem.
            print(f"Error loading in-memory tasks: {str(e)}")

    # Then try the selected file location.
    try:
        if not (TASKS_FILE and os.path.exists(TASKS_FILE)):
            return create_empty_df()
        with open(TASKS_FILE, 'r', encoding='utf-8') as f:
            tasks_data = json.load(f)
        df = pd.DataFrame(tasks_data)
        print(f"Loaded tasks from: {TASKS_FILE}")
        if df.empty:
            return create_empty_df()
        return _normalize_tasks_df(df)
    except Exception as e:
        print(f"Error loading tasks from files: {str(e)}")
        return create_empty_df()
def save_tasks():
    """Persist the session's task table to TASKS_FILE, falling back to memory.

    The tasks in ``st.session_state.tasks`` are converted to a list of
    records with date/datetime values rendered as ISO strings. When
    TASKS_FILE is set, the records are written there as indented JSON.
    If there is no file location or the write fails, the records are kept
    in ``st.session_state.in_memory_tasks`` and a warning is shown.

    Returns:
        True when the tasks were written to TASKS_FILE, otherwise False.
    """
    # Stays None until the records exist, so the error path below never
    # replaces a previous in-memory snapshot with a meaningless value.
    tasks_dict = None
    try:
        tasks_dict = st.session_state.tasks.to_dict(orient='records')

        # Date objects are not JSON serializable; store them as ISO strings.
        for task in tasks_dict:
            for key, value in task.items():
                if isinstance(value, (datetime, date)):
                    task[key] = value.isoformat()

        if TASKS_FILE:
            try:
                # Serialize completely BEFORE opening the file: opening with
                # 'w' truncates it, so a serialization error raised midway
                # would otherwise leave a corrupt or empty tasks file.
                payload = json.dumps(tasks_dict, indent=2)
                os.makedirs(os.path.dirname(TASKS_FILE), exist_ok=True)
                with open(TASKS_FILE, 'w') as f:
                    f.write(payload)
                print(f"Successfully saved tasks to: {TASKS_FILE}")
                # Reset the in-memory flag now that the file save succeeded.
                st.session_state.using_memory_storage = False
                return True
            except Exception as e:
                print(f"Error saving to {TASKS_FILE}: {str(e)}")
                # Fall through to in-memory storage.

        # Store in memory as fallback.
        st.session_state.in_memory_tasks = tasks_dict
        st.session_state.using_memory_storage = True
        if not TASKS_FILE:
            st.warning("No writable location found. Tasks stored in memory instead.")
        else:
            st.warning(f"Could not save to file: {TASKS_FILE}. Tasks stored in memory instead.")
        return False
    except Exception as e:
        print(f"Error saving tasks: {str(e)}")
        # Keep whatever records were built; skip if conversion itself failed.
        if tasks_dict is not None:
            st.session_state.in_memory_tasks = tasks_dict
        st.session_state.using_memory_storage = True
        st.warning(f"Error saving tasks: {str(e)}. Tasks stored in memory instead.")
        return False
def load_assignees():
    """Return the list of assignee names, creating a default file if needed.

    ASSIGNEE_FILE is tried first, then a fixed list of other locations.
    The first file that exists is parsed with parse_assignee_file. When
    none exists, a default file is written to the home directory (or the
    temp directory if home is not writable) and the default names are
    returned. Failures while writing the default file are printed and the
    default names are still returned.
    """
    fallback_names = ["Team Member 1", "Team Member 2", "Team Member 3"]

    # The direct path takes priority over every other location.
    if os.path.exists(ASSIGNEE_FILE):
        return parse_assignee_file(ASSIGNEE_FILE)

    # Other common locations, checked in this order.
    search_paths = (
        "./Assignee.txt",
        "/app/Assignee.txt",
        os.path.join(os.getcwd(), "Assignee.txt"),
        "/data/Assignee.txt",
        "../Assignee.txt",
        os.path.join(USER_HOME, "Assignee.txt"),
        os.path.join(TEMP_DIR, "Assignee.txt"),
    )
    found = next((path for path in search_paths if os.path.exists(path)), None)
    if found is not None:
        print(f"Found assignee file at: {found}")
        return parse_assignee_file(found)

    # No file anywhere: write a default one to a writable location.
    try:
        if os.access(USER_HOME, os.W_OK):
            target_dir = USER_HOME
        else:
            target_dir = TEMP_DIR
        default_location = os.path.join(target_dir, "Assignee.txt")
        with open(default_location, "w") as f:
            f.writelines(f"{name}\n" for name in fallback_names)
        print(f"Created default assignee file at: {default_location}")
        return fallback_names
    except Exception as e:
        print(f"Error creating default assignee file: {str(e)}")

    # The default file could not be written; still hand back the defaults.
    return list(fallback_names)
def parse_assignee_file(file_path):
    """Parse an assignee file and return the list of names it contains.

    Each non-blank line is either ``Name - Role`` or just ``Name``. Only a
    dash that has whitespace next to it (or that ends the line) is treated
    as the name/role separator, so hyphenated names such as
    ``Jean-Luc Picard - Captain`` are kept whole instead of being cut at
    the first hyphen. Lines that start with a dash carry no name and are
    skipped.

    Args:
        file_path: Path of the text file to read.

    Returns:
        The names in file order. The three default names are returned when
        the file yields no names or cannot be read; in those cases a
        warning/error is also shown through Streamlit.
    """
    # Local import: `re` is not among this module's top-level imports.
    import re

    default_assignees = ["Team Member 1", "Team Member 2", "Team Member 3"]
    try:
        with open(file_path, "r") as f:
            content = f.read()
        # Debug: print content to logs.
        print(f"Content of {file_path}:\n{content}")

        assignees = []
        for raw_line in content.split('\n'):
            line = raw_line.strip()
            # Skip blank lines and lines with nothing before the dash.
            if not line or line.startswith('-'):
                continue
            # Split at the first separator dash only; a dash surrounded by
            # non-space characters is part of the name.
            name_part = re.split(r"\s+-|-(?:\s+|$)", line, maxsplit=1)[0].strip()
            if name_part:
                assignees.append(name_part)

        # Log what we found.
        print(f"Found {len(assignees)} assignees: {assignees}")
        if not assignees:
            st.warning(f"No valid assignee names found in {file_path}. Using default assignees.")
            return default_assignees
        return assignees
    except Exception as e:
        st.error(f"Error reading assignee file: {str(e)}")
        print(f"Error reading {file_path}: {str(e)}")
        return default_assignees
def generate_task_id():
    """Return the next free task ID in the form ``#<n>``.

    The result is one more than the largest numeric ``#<n>`` ID currently
    in ``st.session_state.tasks``, or ``#1`` when there are no tasks or no
    numeric IDs. IDs that are not strings, do not start with ``#``, or do
    not have a numeric remainder are ignored. They no longer trigger a
    random ID, which could collide with an existing task.
    """
    tasks = st.session_state.tasks
    if len(tasks) == 0:
        return "#1"

    existing_ids = tasks['ID'].tolist() if 'ID' in tasks.columns else []
    numbers = []
    for task_id in existing_ids:
        if not (isinstance(task_id, str) and task_id.startswith('#')):
            continue
        try:
            numbers.append(int(task_id.replace('#', '')))
        except ValueError:
            # Non-numeric ID such as "#abc": it cannot clash with "#<n>".
            continue
    return f"#{max(numbers, default=0) + 1}"
def update_task(task_idx, field, value):
    """Set one field of the task at row label *task_idx*, then save all tasks.

    Always returns True; the result of save_tasks() is not inspected.
    """
    tasks = st.session_state.tasks
    # `.at` writes into the frame held in session state, so no reassignment
    # of st.session_state.tasks is needed.
    tasks.at[task_idx, field] = value
    save_tasks()
    return True
def delete_task(task_id):
    """Remove every task whose ID equals *task_id* and save the result.

    Returns True when at least one row was removed, False when no task
    has that ID (in which case nothing is saved).
    """
    tasks = st.session_state.tasks
    matching_rows = tasks.index[tasks['ID'] == task_id]
    if matching_rows.empty:
        return False
    # Renumber the remaining rows so row labels stay contiguous.
    st.session_state.tasks = tasks.drop(matching_rows).reset_index(drop=True)
    save_tasks()
    return True
# Session-state initialisation: each key is only set when it is missing, so
# values survive Streamlit reruns. Tasks are loaded first, before the
# in-memory snapshot key exists.
if 'tasks' not in st.session_state:
    st.session_state.tasks = load_tasks()

# selected_task_id: task shown in the detail view (None = board view).
# in_memory_tasks: records kept when saving to a file fails.
# using_memory_storage: flag tracking whether in-memory storage is in use.
for _state_key, _initial_value in (
    ('selected_task_id', None),
    ('in_memory_tasks', []),
    ('using_memory_storage', False),
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _initial_value

# Report where tasks are stored; without a file, memory is the only storage.
if not TASKS_FILE:
    print("No writable location found. Using in-memory storage only.")
    st.session_state.using_memory_storage = True
else:
    print(f"Using task storage location: {TASKS_FILE}")

# Names offered in the "Assignee" dropdown.
assignee_list = load_assignees()
# Application header (the markup body is currently a lone newline).
st.markdown("""
""", unsafe_allow_html=True)

# Sidebar: form for creating a new task.
with st.sidebar:
    st.markdown('', unsafe_allow_html=True)
    title = st.text_input("Task Title", key="new_title")
    description = st.text_area("Description", key="new_desc", height=100)

    # Assignee and status share one row.
    col1, col2 = st.columns(2)
    with col1:
        assignee = st.selectbox("Assignee", assignee_list, key="new_assignee")
    with col2:
        status = st.selectbox("Status", ["Backlog", "To Do", "In Progress", "Done"], key="new_status")

    # Start and finish dates share one row; both default to today.
    today = datetime.now().date()
    col3, col4 = st.columns(2)
    with col3:
        date_started = st.date_input("Date Started", value=today, key="new_date_start")
    with col4:
        date_to_finish = st.date_input("Date to Finish", value=today, key="new_date_finish")
    due_date = st.date_input("Due Date", key="new_due_date")

    add_clicked = st.button("Add Task", key="add_btn")
    if add_clicked and not title:
        # Basic validation: a title is the only required field.
        st.warning("Please enter a task title")
    elif add_clicked:
        try:
            new_row = pd.DataFrame([{
                'ID': generate_task_id(),
                'Title': title,
                'Description': description,
                'Assignee': assignee,
                'Status': status,
                'Date Started': date_started,
                'Date to Finish': date_to_finish,
                'Due Date': due_date,
            }])
            st.session_state.tasks = pd.concat(
                [st.session_state.tasks, new_row],
                ignore_index=True,
            )
            save_tasks()
            st.success("Task added successfully!")
            # Rerun so the board reflects the new task.
            st.rerun()
        except Exception as e:
            st.error(f"Error adding task: {str(e)}")
# Sidebar: quick status change for an existing task.
with st.sidebar:
    st.markdown('', unsafe_allow_html=True)

    # The controls only appear once at least one task exists.
    if not st.session_state.tasks.empty:
        task_options = [
            f"{row['ID']} - {row['Title']}"
            for _, row in st.session_state.tasks.iterrows()
        ]
        selected_task = st.selectbox("Select Task", task_options, key="status_update_task")

        # The option label starts with the task ID, followed by " - ".
        selected_id = None
        if selected_task:
            selected_id = selected_task.split(" - ")[0]

        if selected_id:
            matching_rows = st.session_state.tasks.index[
                st.session_state.tasks['ID'] == selected_id
            ]
            if not matching_rows.empty:
                task_idx = matching_rows[0]
                current_status = st.session_state.tasks.at[task_idx, 'Status']

                # Preselect the task's current status; unknown values fall
                # back to the first choice.
                status_choices = ["Backlog", "To Do", "In Progress", "Done"]
                if current_status in status_choices:
                    preselected = status_choices.index(current_status)
                else:
                    preselected = 0
                new_status = st.selectbox(
                    "New Status",
                    status_choices,
                    index=preselected,
                    key="new_status_select",
                )

                # Only save and rerun when the status actually changes.
                if st.button("Update Status", key="update_status_btn") and new_status != current_status:
                    update_task(task_idx, 'Status', new_status)
                    st.success(f"Updated task {selected_id} status to {new_status}")
                    st.rerun()
# MAIN CONTENT AREA
# Shows either the detail view of one task (when a task ID is selected via the
# `task_id` URL parameter or st.session_state.selected_task_id) or the board
# with one column per status.
# NOTE(review): several st.markdown calls below have single-quoted literals that
# span more than one line and whose markup appears to be missing; restore them
# from the original source before relying on this section.
# Get selected task ID from URL parameters
url_task_id = st.query_params.get('task_id', None)
# Update session state based on URL parameter
# NOTE(review): the "← Back to Board" button below clears only the session value,
# not the URL parameter (the delete and not-found paths do clear it). If `task_id`
# is still in the URL, this assignment selects the task again on the next run —
# confirm this is intended.
if url_task_id:
st.session_state.selected_task_id = url_task_id
# Use session state for selected task ID
selected_task_id = st.session_state.selected_task_id
# If a task is selected, show task details
if selected_task_id:
# Get task details
# task_df is a filtered selection of the session's task table; edits made to it
# below are copied back into st.session_state.tasks explicitly.
task_df = st.session_state.tasks[st.session_state.tasks['ID'] == selected_task_id]
if not task_df.empty:
# Back to board button
if st.button("← Back to Board", key="back_btn"):
st.session_state.selected_task_id = None
st.rerun()
# Get task data
task = task_df.iloc[0]
# Unknown status values fall back to the "backlog" style key.
status_class = STATUS_COLORS.get(task['Status'], "backlog")
# Create the task details container
st.markdown('', unsafe_allow_html=True)
# Task header with title and status
st.markdown(f"""
""", unsafe_allow_html=True)
# Task metadata in grid layout
st.markdown('
', unsafe_allow_html=True)
# Task progress visualization
st.markdown(f"""
""", unsafe_allow_html=True)
# Task actions
st.markdown('
', unsafe_allow_html=True)
st.markdown('
', unsafe_allow_html=True)
st.markdown('
Edit Task
', unsafe_allow_html=True)
# Edit description
# NOTE(review): new_description is not read again anywhere in this section, so
# edits typed here do not appear to be saved — confirm.
new_description = st.text_area("Edit Description",
value=task['Description'] if task['Description'] else '',
height=150,
key=f"edit_desc_{selected_task_id}")
# File upload section
uploaded_files = st.file_uploader("Upload Documents",
accept_multiple_files=True,
key=f"upload_{selected_task_id}")
if uploaded_files:
# Create a container to show upload status
upload_status = st.empty()
# Process each uploaded file
for uploaded_file in uploaded_files:
# Display upload progress
upload_status.info(f"Processing {uploaded_file.name}...")
try:
# Safely handle file upload
file_content = uploaded_file.getvalue() # Use getvalue() instead of read()
# Use TEMP_DIR for reliability
# NOTE(review): files are written to TEMP_DIR/task_docs_<id>, but the
# "Attached Documents" code further down looks in
# dirname(TASKS_FILE)/task_documents/task_<id>. The two directories differ,
# so uploaded files will not be found for download — confirm and align.
task_doc_dir = os.path.join(TEMP_DIR, f"task_docs_{selected_task_id.replace('#', '')}")
os.makedirs(task_doc_dir, exist_ok=True)
# Save file with unique name to avoid conflicts
# Only spaces and forward slashes are replaced; a file with the same name
# overwrites the earlier one.
safe_filename = uploaded_file.name.replace(" ", "_").replace("/", "_")
file_path = os.path.join(task_doc_dir, safe_filename)
# Write file in binary mode
with open(file_path, "wb") as f:
f.write(file_content)
# Update task with document reference
# Start a new list when the task has no list of documents yet; otherwise
# append the name if it is not already recorded.
if 'Documents' not in task or not isinstance(task['Documents'], list):
task_df.at[task_df.index[0], 'Documents'] = [safe_filename]
else:
docs = task['Documents'] if isinstance(task['Documents'], list) else []
if safe_filename not in docs:
docs.append(safe_filename)
task_df.at[task_df.index[0], 'Documents'] = docs
# Save the updated task data
# Copy the edited row back into the session's task table, then persist.
st.session_state.tasks.loc[task_df.index[0]] = task_df.iloc[0]
save_tasks()
upload_status.success(f"Uploaded {uploaded_file.name} successfully!")
except Exception as e:
# Provide a more user-friendly error message
upload_status.error(f"Error uploading {uploaded_file.name}. Please try a smaller file or a different filename.")
# Log the detailed error for debugging
print(f"Upload error details: {str(e)}")
# Clear status after all files processed
st.success("Upload processing complete")
# Display attached documents if any
# `task` was read before the upload handling above, so names added during this
# run are not part of this check.
if 'Documents' in task and isinstance(task['Documents'], list) and len(task['Documents']) > 0:
st.markdown('
', unsafe_allow_html=True)
st.markdown('
', unsafe_allow_html=True)
st.markdown('
Attached Documents
', unsafe_allow_html=True)
for doc in task['Documents']:
# Create the file path
# NOTE(review): TASKS_FILE is None when no writable location was found, and
# os.path.dirname(None) raises TypeError — confirm this case is handled.
doc_dir = os.path.join(os.path.dirname(TASKS_FILE), "task_documents")
task_doc_dir = os.path.join(doc_dir, f"task_{selected_task_id.replace('#', '')}")
file_path = os.path.join(task_doc_dir, doc)
# Display document with download option
# Documents whose file is missing on disk are silently skipped.
if os.path.exists(file_path):
col1, col2 = st.columns([3, 1])
with col1:
st.markdown(f"📄 **{doc}**")
with col2:
with open(file_path, "rb") as file:
st.download_button(
label="Download",
data=file,
file_name=doc,
key=f"download_{doc}_{selected_task_id}"
)
st.markdown('', unsafe_allow_html=True)
st.markdown('
', unsafe_allow_html=True)
# Create columns for actions
col1, col2 = st.columns(2)
with col1:
# Status update form
# Preselects the task's current status; unknown values fall back to index 0.
new_status = st.selectbox(
"Update Status",
["Backlog", "To Do", "In Progress", "Done"],
index=["Backlog", "To Do", "In Progress", "Done"].index(task['Status']) if task['Status'] in ["Backlog", "To Do", "In Progress", "Done"] else 0
)
if st.button("Save Status Change", use_container_width=True):
# Only save and rerun when the status actually changed.
if new_status != task['Status']:
task_idx = task_df.index[0]
update_task(task_idx, 'Status', new_status)
st.success(f"Updated task status to {new_status}")
st.rerun()
with col2:
if st.button("Delete Task", use_container_width=True):
if delete_task(selected_task_id):
st.success(f"Task {selected_task_id} deleted!")
# Return to main board
# Remove every URL parameter before rerunning.
for key in list(st.query_params.keys()):
del st.query_params[key]
st.rerun()
else:
st.error(f"Failed to delete task {selected_task_id}")
st.markdown('
', unsafe_allow_html=True)
# Close the task details container
st.markdown('', unsafe_allow_html=True)
else:
# The selected ID matches no task.
st.error(f"Task {selected_task_id} not found")
# Return to main board
if st.button("Back to Board"):
for key in list(st.query_params.keys()):
del st.query_params[key]
st.rerun()
else:
# Main board view showing all tasks by status
if st.session_state.tasks.empty:
st.info("No tasks yet. Add your first task using the sidebar.")
else:
# Group tasks by status for organized display - reordering to match desired layout
statuses = ["To Do", "In Progress", "Done", "Backlog"]
# Create container with reduced height
st.markdown('', unsafe_allow_html=True)
# Create columns for each status
cols = st.columns(len(statuses))
# Display each status column
for i, status_name in enumerate(statuses):
with cols[i]:
st.markdown(f'
{status_name}
', unsafe_allow_html=True)
# Filter tasks by current status
# Tasks whose status is not one of the four listed values appear in no column.
status_tasks = st.session_state.tasks[st.session_state.tasks['Status'] == status_name]
if status_tasks.empty:
st.markdown("
No tasks
", unsafe_allow_html=True)
else:
# Start the flex container for task cards
st.markdown('
', unsafe_allow_html=True)
# Render each task as a card
for idx, task in status_tasks.iterrows():
status_class = STATUS_COLORS.get(task['Status'], "backlog")
# Fall back to a row-position based ID when the task has none.
task_id = task['ID'] if 'ID' in task and pd.notna(task['ID']) else f"#{idx+1}"
# Format dates for display - shorter format
date_started_str = task['Date Started'].strftime('%m/%d') if 'Date Started' in task and pd.notna(task['Date Started']) else "Not set"
date_to_finish_str = task['Date to Finish'].strftime('%m/%d') if 'Date to Finish' in task and pd.notna(task['Date to Finish']) else "Not set"
# With this direct Streamlit approach:
with st.container():
col1, col2 = st.columns([4, 1])
with col1:
# NOTE(review): title and assignee are interpolated into markup rendered with
# unsafe_allow_html=True without escaping — confirm these values are trusted.
st.markdown(f"""
{task['Title']}
{task['Assignee']}
Start: {date_started_str}
Finish: {date_to_finish_str}
""", unsafe_allow_html=True)
with col2:
# "View" switches to the detail view of this task on the next run.
if st.button("View", key=f"view_{task_id}"):
st.session_state.selected_task_id = task_id
st.rerun()
# Add JavaScript for better interactivity
st.markdown("""
""", unsafe_allow_html=True)