swiftops-backend / src /app /tasks /timesheet_tasks.py
kamau1's picture
complete services
456b2e2
"""
Timesheet Background Tasks
This module contains background jobs for timesheet management:
1. Daily timesheet generation with ticket metrics calculation
2. Automatic attendance tracking for active workers
Schedule:
- Daily at 11:59 PM (end of day) to calculate metrics for the day
Architecture:
- Generates timesheets for all active workers in all projects
- Calculates daily ticket metrics from ticket_assignments
- Stores metrics in timesheet for fast payroll generation
- Idempotent: safe to re-run, updates existing records
Usage:
# Run daily job for today
python -m app.tasks.timesheet_tasks
# Run for specific date
python -m app.tasks.timesheet_tasks --date 2025-01-15
# Dry run (no commit)
python -m app.tasks.timesheet_tasks --dry-run
# Run for specific project only
python -m app.tasks.timesheet_tasks --project-id <uuid>
"""
import logging
import argparse
from datetime import datetime, date, timedelta
from typing import Optional, Dict, Any, List
from uuid import UUID
from sqlalchemy import func, and_, or_
from sqlalchemy.orm import Session
from app.core.database import SessionLocal
from app.models.timesheet import Timesheet
from app.models.project_team import ProjectTeam
from app.models.ticket_assignment import TicketAssignment
from app.models.ticket import Ticket
from app.models.enums import TimesheetStatus, AssignmentAction
from app.models.user import User
from app.models.project import Project
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler(
f'logs/timesheet_job_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'
)
]
)
logger = logging.getLogger(__name__)
def calculate_daily_ticket_metrics(
db: Session,
user_id: UUID,
work_date: date,
project_id: Optional[UUID] = None
) -> Dict[str, int]:
"""
Calculate daily ticket metrics for a user
Counts:
- tickets_assigned: New assignments for the day (action='assigned')
- tickets_completed: Tickets completed on this day (ticket.completed_at on this date)
- tickets_rescheduled: Tickets rescheduled on this day
- tickets_cancelled: Tickets cancelled on this day
- tickets_rejected: Tickets rejected by user on this day (action='rejected')
Args:
db: Database session
user_id: User to calculate metrics for
work_date: Date to calculate metrics for
project_id: Optional project filter
Returns:
Dictionary with ticket metrics
"""
# Date range for the day (00:00:00 to 23:59:59)
start_of_day = datetime.combine(work_date, datetime.min.time())
end_of_day = datetime.combine(work_date, datetime.max.time())
# Count tickets assigned (action='assigned' on this date)
tickets_assigned_query = db.query(func.count(TicketAssignment.id)).filter(
TicketAssignment.user_id == user_id,
TicketAssignment.action == AssignmentAction.ASSIGNED,
TicketAssignment.assigned_at >= start_of_day,
TicketAssignment.assigned_at <= end_of_day
)
if project_id:
tickets_assigned_query = tickets_assigned_query.join(Ticket).filter(
Ticket.project_id == project_id
)
tickets_assigned = tickets_assigned_query.scalar() or 0
# Count tickets completed (ticket.completed_at on this date)
# Need to check ticket_assignments for this user first
tickets_completed_query = db.query(func.count(Ticket.id.distinct())).join(
TicketAssignment,
and_(
TicketAssignment.ticket_id == Ticket.id,
TicketAssignment.user_id == user_id
)
).filter(
Ticket.completed_at >= start_of_day,
Ticket.completed_at <= end_of_day
)
if project_id:
tickets_completed_query = tickets_completed_query.filter(Ticket.project_id == project_id)
tickets_completed = tickets_completed_query.scalar() or 0
# Count tickets rejected (action='rejected' on this date)
tickets_rejected_query = db.query(func.count(TicketAssignment.id)).filter(
TicketAssignment.user_id == user_id,
TicketAssignment.action == AssignmentAction.REJECTED,
TicketAssignment.responded_at >= start_of_day,
TicketAssignment.responded_at <= end_of_day
)
if project_id:
tickets_rejected_query = tickets_rejected_query.join(Ticket).filter(
Ticket.project_id == project_id
)
tickets_rejected = tickets_rejected_query.scalar() or 0
# TODO: Add tickets_rescheduled and tickets_cancelled when those features are implemented
# For now, set to 0
tickets_rescheduled = 0
tickets_cancelled = 0
return {
'tickets_assigned': tickets_assigned,
'tickets_completed': tickets_completed,
'tickets_rescheduled': tickets_rescheduled,
'tickets_cancelled': tickets_cancelled,
'tickets_rejected': tickets_rejected
}
def generate_or_update_timesheet(
db: Session,
user_id: UUID,
project_id: UUID,
work_date: date,
dry_run: bool = False
) -> Dict[str, Any]:
"""
Generate or update timesheet for a user on a specific date
- If timesheet exists: Update ticket metrics
- If timesheet doesn't exist: Create new timesheet with PRESENT status
Args:
db: Database session
user_id: User ID
project_id: Project ID
work_date: Date to generate timesheet for
dry_run: If True, don't commit changes
Returns:
Dictionary with result details
"""
try:
# Get user details for logging
user = db.query(User).filter(User.id == user_id).first()
if not user:
return {
'success': False,
'user_id': str(user_id),
'error': 'User not found'
}
# Calculate daily ticket metrics
metrics = calculate_daily_ticket_metrics(db, user_id, work_date, project_id)
# Check if timesheet already exists
existing_timesheet = db.query(Timesheet).filter(
Timesheet.user_id == user_id,
Timesheet.work_date == work_date
).first()
if existing_timesheet:
# Update existing timesheet with new metrics
existing_timesheet.tickets_assigned = metrics['tickets_assigned']
existing_timesheet.tickets_completed = metrics['tickets_completed']
existing_timesheet.tickets_rescheduled = metrics['tickets_rescheduled']
existing_timesheet.tickets_cancelled = metrics['tickets_cancelled']
existing_timesheet.tickets_rejected = metrics['tickets_rejected']
action = 'updated'
timesheet = existing_timesheet
else:
# Create new timesheet with PRESENT status
timesheet = Timesheet(
user_id=user_id,
project_id=project_id,
work_date=work_date,
status=TimesheetStatus.PRESENT,
tickets_assigned=metrics['tickets_assigned'],
tickets_completed=metrics['tickets_completed'],
tickets_rescheduled=metrics['tickets_rescheduled'],
tickets_cancelled=metrics['tickets_cancelled'],
tickets_rejected=metrics['tickets_rejected']
)
db.add(timesheet)
action = 'created'
if not dry_run:
db.commit()
db.refresh(timesheet)
logger.info(
f"{action.capitalize()} timesheet for {user.name} ({user_id}) on {work_date}: "
f"assigned={metrics['tickets_assigned']}, completed={metrics['tickets_completed']}, "
f"rejected={metrics['tickets_rejected']}"
)
return {
'success': True,
'action': action,
'user_id': str(user_id),
'user_name': user.name,
'work_date': str(work_date),
'metrics': metrics
}
except Exception as e:
logger.error(f"Error processing timesheet for user {user_id} on {work_date}: {str(e)}")
if not dry_run:
db.rollback()
return {
'success': False,
'user_id': str(user_id),
'work_date': str(work_date),
'error': str(e)
}
def generate_daily_timesheets_job(
target_date: Optional[date] = None,
project_id: Optional[UUID] = None,
dry_run: bool = False
) -> Dict[str, Any]:
"""
Generate or update timesheets for all active workers
This is the main background job that runs daily at 11:59 PM.
For each active worker in each active project:
1. Calculate daily ticket metrics
2. Create or update timesheet with metrics
Args:
target_date: Date to generate timesheets for (defaults to today)
project_id: Optional project filter (process only this project)
dry_run: If True, don't commit changes (for testing)
Returns:
Summary dictionary with counts and errors
"""
if target_date is None:
target_date = date.today()
logger.info(f"Starting daily timesheet generation for {target_date}")
if project_id:
logger.info(f"Filtering by project: {project_id}")
if dry_run:
logger.warning("DRY RUN MODE - No changes will be committed")
db = SessionLocal()
try:
# Get all active project team members
query = db.query(ProjectTeam).join(User).join(Project).filter(
User.is_active == True,
Project.is_closed == False,
ProjectTeam.removed_at.is_(None)
)
if project_id:
query = query.filter(ProjectTeam.project_id == project_id)
team_members = query.all()
logger.info(f"Found {len(team_members)} active team members to process")
results = {
'date': str(target_date),
'total_processed': 0,
'created': 0,
'updated': 0,
'errors': 0,
'details': []
}
for member in team_members:
result = generate_or_update_timesheet(
db=db,
user_id=member.user_id,
project_id=member.project_id,
work_date=target_date,
dry_run=dry_run
)
results['total_processed'] += 1
if result['success']:
if result['action'] == 'created':
results['created'] += 1
else:
results['updated'] += 1
else:
results['errors'] += 1
results['details'].append(result)
logger.info(
f"Daily timesheet generation completed: "
f"{results['created']} created, {results['updated']} updated, "
f"{results['errors']} errors"
)
return results
except Exception as e:
logger.error(f"Fatal error in daily timesheet generation: {str(e)}")
return {
'success': False,
'error': str(e)
}
finally:
db.close()
if __name__ == '__main__':
"""
CLI interface for running timesheet generation tasks
Examples:
# Generate for today
python -m app.tasks.timesheet_tasks
# Generate for specific date
python -m app.tasks.timesheet_tasks --date 2025-01-15
# Dry run (no commit)
python -m app.tasks.timesheet_tasks --dry-run
# Specific project only
python -m app.tasks.timesheet_tasks --project-id <uuid>
"""
parser = argparse.ArgumentParser(description='Generate daily timesheets with ticket metrics')
parser.add_argument(
'--date',
type=str,
help='Date to generate timesheets for (YYYY-MM-DD). Defaults to today.'
)
parser.add_argument(
'--project-id',
type=str,
help='Optional: Process only this project (UUID)'
)
parser.add_argument(
'--dry-run',
action='store_true',
help='Run without committing changes (for testing)'
)
args = parser.parse_args()
# Parse date
target_date = None
if args.date:
try:
target_date = datetime.strptime(args.date, '%Y-%m-%d').date()
except ValueError:
logger.error(f"Invalid date format: {args.date}. Use YYYY-MM-DD")
exit(1)
# Parse project ID
project_id = None
if args.project_id:
try:
project_id = UUID(args.project_id)
except ValueError:
logger.error(f"Invalid project ID format: {args.project_id}")
exit(1)
# Run the job
result = generate_daily_timesheets_job(
target_date=target_date,
project_id=project_id,
dry_run=args.dry_run
)
# Print summary
if result.get('success') is False:
logger.error(f"Job failed: {result.get('error')}")
exit(1)
else:
print("\n" + "="*60)
print("DAILY TIMESHEET GENERATION SUMMARY")
print("="*60)
print(f"Date: {result['date']}")
print(f"Total Processed: {result['total_processed']}")
print(f"Created: {result['created']}")
print(f"Updated: {result['updated']}")
print(f"Errors: {result['errors']}")
print("="*60 + "\n")
exit(0 if result['errors'] == 0 else 1)