# tracker-test / tasks.py
# umangchaudhry's picture
# Upload 11 files
# 64b6fa7 verified
import streamlit as st
import json
import html
from datetime import datetime, date
from config_manager import ConfigManager
class TasksManager:
    """Streamlit page for creating, viewing, filtering and editing tasks.

    Tasks are plain dicts persisted as a list in ``tasks.json`` through
    ``ConfigManager``. Vendor/item records from ``vendors.json`` are loaded
    into ``vendors_cache`` by :meth:`render` so that a task carrying a
    ``vendor_id`` can display the linked vendor's name and contact details.
    """

    # Records whose ``type`` equals this value (also the default when the key
    # is missing) expose their own contact fields; every other type is treated
    # as an item whose contact details live in ``seller_*`` fields.
    _VENDOR_TYPE = 'Vendor/Service'

    # Labels prepended to names in the assignee multiselect widgets.
    _WEDDING_PARTY_PREFIX = "Wedding Party: "
    _TASK_ASSIGNEE_PREFIX = "Task Assignee: "

    _PRIORITY_OPTIONS = ["Low", "Medium", "High", "Urgent"]
    # Priorities not listed here fall back to the green marker.
    _PRIORITY_ICONS = {"Urgent": "🔴", "High": "🔴", "Medium": "🟡"}

    def __init__(self):
        self.config_manager = ConfigManager()
        # Populated on every render() call; None until the first render.
        self.vendors_cache = None

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _new_task_id():
        """Return a timestamp-based task ID with microsecond resolution.

        Second-resolution IDs collide when two tasks are created within the
        same second, which in turn produces duplicate Streamlit widget keys.
        """
        return datetime.now().strftime("%Y%m%d_%H%M%S_%f")

    @staticmethod
    def _normalize_assignees(assigned_to):
        """Return a task's assignees as a list of stripped, non-empty names.

        Accepts both the legacy single-string format and the list format;
        anything else (including ``None``) yields an empty list. Non-string
        list entries are skipped instead of raising.
        """
        if isinstance(assigned_to, str):
            candidates = [assigned_to]
        elif isinstance(assigned_to, list):
            candidates = assigned_to
        else:
            return []
        return [name.strip() for name in candidates
                if isinstance(name, str) and name.strip()]

    @staticmethod
    def _format_assignees(assigned_to):
        """Return the display string for a task's assignees ("Unassigned" if none)."""
        if isinstance(assigned_to, str):
            return assigned_to or "Unassigned"
        if isinstance(assigned_to, list) and assigned_to:
            return ", ".join(str(name) for name in assigned_to)
        return "Unassigned"

    def _find_vendor(self, vendor_id):
        """Return the cached vendor/item record with the given ID, or None."""
        if not self.vendors_cache or not vendor_id:
            return None
        for vendor in self.vendors_cache:
            if vendor.get('id') == vendor_id:
                return vendor
        return None

    def _load_assignee_sources(self):
        """Return ``(wedding_party_names, task_assignees)`` from storage/config."""
        wedding_party = self.config_manager.load_json_data('wedding_party.json')
        wedding_party_names = [member.get('name', '') for member in wedding_party if member.get('name')]
        config = self.config_manager.load_config()
        custom_settings = config.get('custom_settings', {})
        task_assignees = custom_settings.get('task_assignees', []) or []
        return wedding_party_names, task_assignees

    @classmethod
    def _build_assignee_options(cls, wedding_party_names, task_assignees):
        """Return the prefixed option labels shown in the assignee multiselect."""
        options = [f"{cls._WEDDING_PARTY_PREFIX}{name}" for name in wedding_party_names]
        options.extend(f"{cls._TASK_ASSIGNEE_PREFIX}{name}" for name in task_assignees)
        return options

    @classmethod
    def _selected_option_names(cls, selected_options):
        """Strip the option prefix from each selected label and return the names.

        Only the leading prefix is removed, so a name that itself contains the
        prefix text is left intact. Labels without a known prefix are ignored.
        """
        names = []
        for option in selected_options:
            for prefix in (cls._WEDDING_PARTY_PREFIX, cls._TASK_ASSIGNEE_PREFIX):
                if option.startswith(prefix):
                    names.append(option[len(prefix):])
                    break
        return names

    # ------------------------------------------------------------------
    # Vendor lookups
    # ------------------------------------------------------------------

    def get_vendor_name(self, vendor_id):
        """Get vendor name by vendor ID (None when unknown or cache is empty)."""
        vendor = self._find_vendor(vendor_id)
        return vendor.get('name') if vendor is not None else None

    def get_vendor_contact_info(self, vendor_id):
        """Get vendor contact information by vendor ID.

        Returns a dict with the keys ``contact_person``, ``phone``, ``email``,
        ``website`` and ``address`` (empty strings for missing values), or
        None when the vendor is unknown or the cache is empty.
        """
        vendor = self._find_vendor(vendor_id)
        if vendor is None:
            return None
        if vendor.get('type', self._VENDOR_TYPE) == self._VENDOR_TYPE:
            # For vendors, return their own contact information.
            return {
                'contact_person': vendor.get('contact_person', ''),
                'phone': vendor.get('phone', ''),
                'email': vendor.get('email', ''),
                'website': vendor.get('website', ''),
                'address': vendor.get('address', ''),
            }
        # For items, return the seller's contact information.
        return {
            'contact_person': '',  # Items don't have a contact person
            'phone': vendor.get('seller_phone', ''),
            'email': vendor.get('seller_email', ''),
            'website': vendor.get('seller_website', ''),
            'address': '',  # Items don't have an address
        }

    # ------------------------------------------------------------------
    # Page rendering
    # ------------------------------------------------------------------

    def render(self, config):
        """Render the task page: creation form, filters and the task list.

        Args:
            config: Application config dict; ``custom_settings.custom_tags``
                and ``wedding_events`` are read from it.
        """
        st.markdown("## ✅ Task Management")

        # Load tasks and vendors
        tasks = self.config_manager.load_json_data('tasks.json')
        self.vendors_cache = self.config_manager.load_json_data('vendors.json')
        custom_settings = config.get('custom_settings', {})
        custom_tags = custom_settings.get('custom_tags', [])

        # Event-based groups first, then the general planning categories.
        # Events without a name are skipped and duplicates removed so the
        # select boxes never show the same entry twice.
        wedding_events = config.get('wedding_events', []) or []
        event_names = [event.get('name') for event in wedding_events if event.get('name')]
        general_groups = ['General Planning', 'Vendor Management', 'Vendor & Item Management',
                          'Wedding Party', 'Guest Management', 'Timeline']
        task_groups = list(dict.fromkeys(event_names + general_groups))

        # Task creation section
        with st.expander("➕ Add New Task", expanded=False):
            self.render_task_form(task_groups, custom_tags)

        # View toggle and filters
        col1, col2, col3, col4, col5 = st.columns(5)
        with col1:
            view_mode = st.radio("View Mode", ["Detailed View", "Checklist View"], horizontal=True)
        with col2:
            filter_group = st.selectbox("Filter by Group", ["All"] + task_groups)
        with col3:
            filter_status = st.selectbox("Filter by Status", ["All", "Completed", "Incomplete"])
        with col4:
            # Unique assignees across all tasks (single-string and list formats).
            assignees = set()
            for task in tasks:
                assignees.update(self._normalize_assignees(task.get('assigned_to', '')))
            assignee_list = sorted(assignees)
            filter_assignees = st.multiselect("Filter by Assignees", assignee_list, help="Select one or more assignees to filter tasks")
        with col5:
            sort_by = st.selectbox("Sort by", ["Due Date", "Created Date", "Title", "Group"])

        # Filter and sort tasks
        filtered_tasks = self.filter_tasks(tasks, filter_group, filter_status, filter_assignees)
        sorted_tasks = self.sort_tasks(filtered_tasks, sort_by)

        if not sorted_tasks:
            st.info("No tasks found. Create your first task above!")
            return

        st.markdown(f"### Tasks ({len(sorted_tasks)} total)")
        if view_mode == "Checklist View":
            self.render_checklist_view(sorted_tasks)
            return

        # Detailed view: group tasks by their group/category, keeping sort order.
        grouped_tasks = {}
        for task in sorted_tasks:
            grouped_tasks.setdefault(task.get('group', 'Uncategorized'), []).append(task)

        for group_name, group_tasks in grouped_tasks.items():
            st.markdown(f"## {group_name} ({len(group_tasks)} tasks)")
            for task in group_tasks:
                self.render_task_card(task, task_groups, custom_tags)

    def render_checklist_view(self, tasks):
        """Render tasks in a compact checklist format for easy reading."""
        # All tasks go into a single checklist without grouping.
        st.markdown("#### All Tasks")
        with st.container():
            for task in tasks:
                self.render_checklist_item(task)

    def render_checklist_item(self, task):
        """Render a single task as a checklist item with an interactive checkbox."""
        task_id = task.get('id', '')
        title = task.get('title', 'Untitled Task')
        description = task.get('description', '')
        due_date = task.get('due_date', '')
        tags = task.get('tags', [])
        completed = task.get('completed', False)
        vendor_id = task.get('vendor_id', '')
        vendor_name = self.get_vendor_name(vendor_id) if vendor_id else None
        assigned_to_display = self._format_assignees(task.get('assigned_to', ''))

        with st.container():
            col1, col2, col3, col4 = st.columns([0.3, 3.2, 1, 1])
            with col1:
                new_completed = st.checkbox(
                    " ",  # Single space as label to provide spacing
                    value=completed,
                    key=f"checklist_{task_id}",
                    help="Click to toggle completion status"
                )
                # Persist the change when the checkbox no longer matches storage.
                if new_completed != completed:
                    self.toggle_task_completion(task_id, new_completed)
            with col2:
                if completed:
                    st.markdown(f"~~**{title}**~~")
                else:
                    st.markdown(f"**{title}**")
                if description:
                    st.caption(f"📝 {description}")
                if tags:
                    st.caption(f"🏷️ {', '.join(tags)}")
            with col3:
                if due_date:
                    st.caption(f"📅 {due_date}")
            with col4:
                if assigned_to_display and assigned_to_display != "Unassigned":
                    st.caption(f"👤 {assigned_to_display}")
                if vendor_name:
                    st.caption(f"🏢 {vendor_name}")
        # Subtle separator between items.
        st.markdown("---")

    def render_task_form(self, task_groups, custom_tags):
        """Render the "new task" form and persist the task on submit."""
        with st.form("task_form"):
            col1, col2 = st.columns(2)
            with col1:
                title = st.text_input("Task Title *", placeholder="Enter task title")
                # Event-based groups first, then general categories.
                if task_groups:
                    group = st.selectbox("Event/Category", task_groups, help="Select the wedding event or general category this task relates to")
                else:
                    group = st.selectbox("Category", ["General Planning"])
                due_date = st.date_input("Due Date", value=None)
                priority = st.selectbox("Priority", self._PRIORITY_OPTIONS)
            with col2:
                description = st.text_area("Description", placeholder="Enter task description")

                wedding_party_names, task_assignees = self._load_assignee_sources()
                all_assignee_options = self._build_assignee_options(wedding_party_names, task_assignees)
                selected_assignees = st.multiselect("Assign to (select multiple people)", all_assignee_options, key="create_assignees")
                # Free-text field for a person not present in either list.
                custom_assignee = st.text_input("Additional Custom Assignee", placeholder="Enter additional assignee name (optional)", key="create_custom_assignee")

                assigned_to = self._selected_option_names(selected_assignees)
                if custom_assignee and custom_assignee.strip():
                    assigned_to.append(custom_assignee.strip())

                selected_tags = st.multiselect("Tags", custom_tags, default=[])

            submitted = st.form_submit_button("Create Task", type="primary")
            if submitted:
                clean_title = title.strip() if title else ''
                if not clean_title:
                    st.error("Please enter a task title")
                    return
                new_task = {
                    'id': self._new_task_id(),
                    'title': clean_title,
                    'description': description,
                    'group': group,
                    'due_date': due_date.isoformat() if due_date else None,
                    'priority': priority,
                    'assigned_to': assigned_to,
                    'tags': selected_tags,
                    'completed': False,
                    'created_date': datetime.now().isoformat(),
                    'completed_date': None
                }
                # Load existing tasks and add the new one.
                tasks = self.config_manager.load_json_data('tasks.json')
                tasks.append(new_task)
                if self.config_manager.save_json_data('tasks.json', tasks):
                    st.success("Task created successfully!")
                    st.rerun()
                else:
                    st.error("Error saving task")

    # ------------------------------------------------------------------
    # Filtering and sorting (pure functions of their arguments)
    # ------------------------------------------------------------------

    def filter_tasks(self, tasks, filter_group, filter_status, filter_assignees):
        """Return the tasks matching the group, status and assignee filters.

        Args:
            tasks: List of task dicts; not modified.
            filter_group: Group name, or "All" to disable the group filter.
            filter_status: "Completed", "Incomplete", or anything else for all.
            filter_assignees: Names to match; a task is kept when any of its
                assignees is in this collection. Empty disables the filter.

        Returns:
            A new list containing the matching tasks in their original order.
        """
        filtered = list(tasks)

        if filter_group != "All":
            filtered = [task for task in filtered if task.get('group') == filter_group]

        if filter_status == "Completed":
            filtered = [task for task in filtered if task.get('completed', False)]
        elif filter_status == "Incomplete":
            filtered = [task for task in filtered if not task.get('completed', False)]

        if filter_assignees:
            wanted = set(filter_assignees)
            filtered = [
                task for task in filtered
                if wanted.intersection(self._normalize_assignees(task.get('assigned_to', '')))
            ]

        return filtered

    def sort_tasks(self, tasks, sort_by):
        """Return tasks sorted by the chosen criterion.

        "Due Date" sorts ascending with undated tasks last, "Created Date"
        newest first, "Title" case-insensitively and "Group" alphabetically.
        Missing or ``None`` values sort as the empty string. Any other
        ``sort_by`` returns ``tasks`` unchanged.
        """
        if sort_by == "Due Date":
            return sorted(tasks, key=lambda x: x.get('due_date') or '9999-12-31')
        if sort_by == "Created Date":
            return sorted(tasks, key=lambda x: x.get('created_date') or '', reverse=True)
        if sort_by == "Title":
            return sorted(tasks, key=lambda x: (x.get('title') or '').lower())
        if sort_by == "Group":
            return sorted(tasks, key=lambda x: x.get('group') or '')
        return tasks

    # ------------------------------------------------------------------
    # Detailed card
    # ------------------------------------------------------------------

    def render_task_card(self, task, task_groups, custom_tags):
        """Render the detailed card for one task, with its action buttons."""
        task_id = task.get('id', '')
        title = task.get('title', 'Untitled Task')
        description = task.get('description', '')
        group = task.get('group', 'Uncategorized')
        due_date = task.get('due_date', '')
        priority = task.get('priority', 'Medium')
        tags = task.get('tags', [])
        completed = task.get('completed', False)
        vendor_id = task.get('vendor_id', '')
        vendor = self._find_vendor(vendor_id)
        vendor_name = vendor.get('name') if vendor is not None else None
        vendor_contact_info = self.get_vendor_contact_info(vendor_id) if vendor is not None else None
        assigned_to_display = self._format_assignees(task.get('assigned_to', ''))

        with st.container():
            # Header with completion status and title - the most prominent line.
            status_icon = "✅" if completed else "⏳"
            st.markdown(f"### {status_icon} {title}")

            col1, col2, col3 = st.columns(3)
            with col1:
                if due_date:
                    st.caption(f"📅 Due: {due_date}")
                else:
                    st.caption("📅 No due date")
                if vendor_name:
                    st.caption(f"🏢 Vendor: {vendor_name}")
            with col2:
                priority_icon = self._PRIORITY_ICONS.get(priority, "🟢")
                st.caption(f"{priority_icon} Priority: {priority}")
                if assigned_to_display and assigned_to_display != "Unassigned":
                    st.caption(f"👤 Assigned: {assigned_to_display}")
            with col3:
                st.caption(f"📁 Group: {group}")
                if tags:
                    st.caption(f"🏷️ Tags: {', '.join(tags)}")

            if description:
                st.caption(f"📝 {description}")

            if vendor_contact_info and vendor_name:
                # Decide vendor vs. item from the record's type, not from
                # whether a contact person happens to be filled in; otherwise
                # a vendor without a contact person is mislabelled as a seller
                # and its address is hidden.
                is_vendor = vendor.get('type', self._VENDOR_TYPE) == self._VENDOR_TYPE
                contact_type = "Vendor Contact" if is_vendor else "Seller Contact"

                def escaped(field):
                    # Values are user-entered and rendered with
                    # unsafe_allow_html=True, so they must be HTML-escaped.
                    return html.escape(str(vendor_contact_info[field]))

                contact_info_items = []
                if vendor_contact_info.get('contact_person'):
                    contact_info_items.append(f"**Contact Person:** {escaped('contact_person')}")
                if vendor_contact_info.get('phone'):
                    contact_info_items.append(f"**Phone:** {escaped('phone')}")
                if vendor_contact_info.get('email'):
                    contact_info_items.append(f"**Email:** {escaped('email')}")
                if vendor_contact_info.get('website'):
                    contact_info_items.append(f"**Website:** [{escaped('website')}]({escaped('website')})")
                if vendor_contact_info.get('address'):
                    contact_info_items.append(f"**Address:** {escaped('address')}")

                if contact_info_items:
                    st.markdown(f"**{contact_type} Information:**")
                    for info in contact_info_items:
                        st.markdown(f"<small>{info}</small>", unsafe_allow_html=True)

            # Add some spacing
            st.markdown("---")

        # Action buttons below the task card
        col1, col2, col3, col4 = st.columns([1, 1, 1, 1])
        with col1:
            if st.button("Edit", key=f"edit_{task_id}", help="Edit task", use_container_width=True):
                st.session_state[f"editing_task_{task_id}"] = True
        with col2:
            if st.button("Duplicate", key=f"duplicate_{task_id}", help="Duplicate task", use_container_width=True):
                self.duplicate_task(task_id)
        with col3:
            if completed:
                if st.button("Undo", key=f"undo_{task_id}", help="Mark incomplete", use_container_width=True):
                    self.toggle_task_completion(task_id, False)
            else:
                if st.button("Complete", key=f"complete_{task_id}", help="Mark complete", use_container_width=True):
                    self.toggle_task_completion(task_id, True)
        with col4:
            if st.button("Delete", key=f"delete_{task_id}", help="Delete task", use_container_width=True):
                self.delete_task(task_id)

        # Show the edit form outside the columns so it spans the full width.
        if st.session_state.get(f"editing_task_{task_id}", False):
            self.render_edit_task_form(task, task_groups, custom_tags)

    # ------------------------------------------------------------------
    # Persistence actions
    # ------------------------------------------------------------------

    def toggle_task_completion(self, task_id, completed):
        """Set a task's completion flag and timestamp, then rerun the page.

        On a failed save an error is shown and the page is NOT rerun:
        rerunning would re-trigger the checklist checkbox (still out of sync
        with storage) and loop indefinitely.
        """
        tasks = self.config_manager.load_json_data('tasks.json')
        for task in tasks:
            if task.get('id') == task_id:
                task['completed'] = completed
                task['completed_date'] = datetime.now().isoformat() if completed else None
                break
        if self.config_manager.save_json_data('tasks.json', tasks):
            st.rerun()
        else:
            st.error("Error saving task")

    def delete_task(self, task_id):
        """Remove the task with the given ID and rerun the page."""
        tasks = self.config_manager.load_json_data('tasks.json')
        remaining = [task for task in tasks if task.get('id') != task_id]
        if self.config_manager.save_json_data('tasks.json', remaining):
            # Drop any stale "editing" flag for the removed task.
            st.session_state.pop(f"editing_task_{task_id}", None)
            st.rerun()
        else:
            st.error("Error saving task")

    def duplicate_task(self, task_id):
        """Append an incomplete copy of the task (new ID, same title)."""
        tasks = self.config_manager.load_json_data('tasks.json')
        original_task = next((task for task in tasks if task.get('id') == task_id), None)
        if original_task is None:
            st.error("Task not found")
            return

        # Duplicate with a new ID and a reset completion state; the title is
        # carried over unchanged.
        duplicated_task = original_task.copy()
        duplicated_task['id'] = self._new_task_id()
        duplicated_task['title'] = f"{original_task.get('title', 'Untitled Task')}"
        duplicated_task['completed'] = False
        duplicated_task['completed_date'] = None
        duplicated_task['created_date'] = datetime.now().isoformat()

        tasks.append(duplicated_task)
        if self.config_manager.save_json_data('tasks.json', tasks):
            st.success("Task duplicated successfully!")
            st.rerun()
        else:
            st.error("Error saving duplicated task")

    def render_edit_task_form(self, task, task_groups, custom_tags):
        """Render the inline edit form for a task and persist changes on save."""
        task_id = task.get('id', '')
        st.markdown("### Edit Task")
        with st.form(f"edit_task_form_{task_id}"):
            col1, col2 = st.columns(2)
            with col1:
                title = st.text_input("Task Title *", value=task.get('title', ''), key=f"edit_title_{task_id}")
                # Event-based groups first, then general categories.
                if task_groups:
                    current_group = task.get('group', '')
                    group_index = task_groups.index(current_group) if current_group in task_groups else 0
                    group = st.selectbox("Event/Category", task_groups, index=group_index, key=f"edit_group_{task_id}")
                else:
                    group = st.selectbox("Category", ["General Planning"], key=f"edit_group_{task_id}")

                due_date_str = task.get('due_date', '')
                due_date = None
                if due_date_str:
                    try:
                        due_date = datetime.fromisoformat(due_date_str).date()
                    except (ValueError, TypeError):
                        # Unparseable stored date: start with an empty picker.
                        due_date = None
                due_date = st.date_input("Due Date", value=due_date, key=f"edit_due_date_{task_id}")

                priority_options = self._PRIORITY_OPTIONS
                current_priority = task.get('priority', 'Medium')
                # Unknown priorities fall back to "Medium" (index 1).
                priority_index = priority_options.index(current_priority) if current_priority in priority_options else 1
                priority = st.selectbox("Priority", priority_options, index=priority_index, key=f"edit_priority_{task_id}")
            with col2:
                description = st.text_area("Description", value=task.get('description', ''), key=f"edit_description_{task_id}")

                wedding_party_names, task_assignees = self._load_assignee_sources()
                all_assignee_options = self._build_assignee_options(wedding_party_names, task_assignees)

                # Split the current assignees into preselected options and
                # free-text names that are in neither list.
                initial_selected = []
                custom_assignees = []
                for assignee in self._normalize_assignees(task.get('assigned_to', '')):
                    if assignee in wedding_party_names:
                        initial_selected.append(f"{self._WEDDING_PARTY_PREFIX}{assignee}")
                    elif assignee in task_assignees:
                        initial_selected.append(f"{self._TASK_ASSIGNEE_PREFIX}{assignee}")
                    else:
                        custom_assignees.append(assignee)

                selected_assignees = st.multiselect("Assign to (select multiple people)", all_assignee_options, default=initial_selected, key=f"edit_assignees_{task_id}")
                custom_assignee_text = ", ".join(custom_assignees) if custom_assignees else ""
                custom_assignee = st.text_input("Additional Custom Assignees", value=custom_assignee_text, placeholder="Enter additional assignee names (comma-separated)", key=f"edit_custom_assignee_{task_id}")

                assigned_to = self._selected_option_names(selected_assignees)
                # Custom assignees are entered comma-separated.
                if custom_assignee and custom_assignee.strip():
                    assigned_to.extend(name.strip() for name in custom_assignee.split(',') if name.strip())

                # Only tags that still exist in custom_tags can be preselected.
                current_tags = task.get('tags', [])
                valid_current_tags = [tag for tag in current_tags if tag in custom_tags]
                selected_tags = st.multiselect("Tags", custom_tags, default=valid_current_tags, key=f"edit_tags_{task_id}")

            # Form buttons
            col1, col2 = st.columns(2)
            with col1:
                save_clicked = st.form_submit_button("Save Changes", type="primary")
            with col2:
                cancel_clicked = st.form_submit_button("Cancel")

            if save_clicked:
                clean_title = title.strip() if title else ''
                if not clean_title:
                    st.error("Please enter a task title")
                else:
                    updates = {
                        'title': clean_title,
                        'description': description,
                        'group': group,
                        'due_date': due_date.isoformat() if due_date else None,
                        'priority': priority,
                        'assigned_to': assigned_to,
                        'tags': selected_tags,
                    }
                    tasks = self.config_manager.load_json_data('tasks.json')
                    for index, stored in enumerate(tasks):
                        if stored.get('id') == task_id:
                            # Merge onto the stored record so fields this form
                            # does not manage (e.g. vendor_id) are preserved.
                            merged = {**stored, **updates}
                            merged.setdefault('completed', False)
                            merged.setdefault('created_date', datetime.now().isoformat())
                            merged.setdefault('completed_date', None)
                            tasks[index] = merged
                            break
                    else:
                        st.error("Task not found")
                        return

                    if self.config_manager.save_json_data('tasks.json', tasks):
                        st.success("Task updated successfully!")
                        st.session_state[f"editing_task_{task_id}"] = False
                        st.rerun()
                    else:
                        st.error("Error saving task")

            if cancel_clicked:
                st.session_state[f"editing_task_{task_id}"] = False
                st.rerun()