File size: 4,845 Bytes
a3bb5d4
 
 
 
86a82e8
 
a3bb5d4
86a82e8
 
 
 
 
 
a3bb5d4
 
 
 
 
 
86a82e8
a3bb5d4
86a82e8
a3bb5d4
 
 
 
 
 
 
86a82e8
a3bb5d4
 
86a82e8
a3bb5d4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
"""
Task statistics router
Provides endpoints for calculating and retrieving task statistics
"""
from fastapi import APIRouter, Depends, HTTPException, status
from sqlmodel import Session, select, func
from datetime import datetime, timedelta
from uuid import UUID

from ..database import get_session_dep
from ..models.task import Task
from ..models.user import User
from ..utils.deps import get_current_user

router = APIRouter(prefix="/api", tags=["stats"])


@router.get("/{user_id}/tasks/stats")
async def get_task_stats(
    user_id: UUID,
    current_user: User = Depends(get_current_user),
    session: Session = Depends(get_session_dep)
):
    """
    Return aggregate task statistics for the authenticated user.

    Args:
        user_id: Identifier taken from the URL path.
        current_user: User resolved by the ``get_current_user`` dependency.
        session: Database session resolved by the ``get_session_dep`` dependency.

    Returns:
        A dict with the keys ``total``, ``completed``, ``pending``,
        ``completionRate`` (a percentage, 0 when there are no tasks),
        ``streak``, ``achievements`` and ``chartData``.

    Raises:
        HTTPException: 403 when ``user_id`` differs from the id of the
            authenticated user.
    """
    # A user may only read their own statistics.
    if current_user.id != user_id:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Unauthorized")

    # Load every task owned by the user; all figures below derive from this list.
    owned_tasks_query = select(Task).where(Task.user_id == user_id)
    user_tasks = session.exec(owned_tasks_query).all()

    total_count = len(user_tasks)
    done_count = sum(1 for task in user_tasks if task.completed)

    # Guard against division by zero for users without any tasks.
    if total_count > 0:
        rate = done_count / total_count * 100
    else:
        rate = 0

    current_streak = calculate_streak(user_tasks)
    unlocked = calculate_achievements(total_count, done_count, current_streak)
    last_week = calculate_chart_data(user_tasks)

    return {
        "total": total_count,
        "completed": done_count,
        "pending": total_count - done_count,
        "completionRate": rate,
        "streak": current_streak,
        "achievements": unlocked,
        "chartData": last_week
    }


def calculate_streak(tasks: list) -> int:
    """Return the number of consecutive days, ending today, with a completed task.

    A day counts once no matter how many tasks were completed on it. The
    streak is measured backwards from today's local date and stops at the
    first day without a completion, so it is 0 when nothing was completed
    today.

    Args:
        tasks: Objects exposing ``completed`` (truthy when done) and
            ``updated_at`` (a ``datetime`` or ``date``; falsy values are
            ignored). ``updated_at`` is used as the completion timestamp.

    Returns:
        The streak length in days (0 for empty input or no completed tasks).
    """
    # Collapse completions to the set of distinct calendar days so that
    # several tasks finished on the same day contribute a single day.
    completed_days = set()
    for task in tasks:
        if not (task.completed and task.updated_at):
            continue
        stamp = task.updated_at
        # datetime values expose .date(); plain date values are used as-is.
        completed_days.add(stamp.date() if hasattr(stamp, 'date') else stamp)

    # NOTE(review): datetime.now() is naive local time; if updated_at is
    # stored in another timezone the day boundaries may disagree - confirm.
    day = datetime.now().date()
    one_day = timedelta(days=1)

    # Walk back exactly one day at a time until a day has no completion.
    streak = 0
    while day in completed_days:
        streak += 1
        day -= one_day

    return streak


def calculate_achievements(total: int, completed: int, streak: int) -> list:
    """Build the list of achievements the given statistics have unlocked.

    Args:
        total: Total number of tasks.
        completed: Number of completed tasks.
        streak: Current completion streak in days.

    Returns:
        A list of dicts with the keys ``id``, ``name``, ``description``,
        ``icon`` and ``unlocked`` (always ``True``), one per achievement
        whose condition holds, in a fixed order.
    """
    # One row per achievement: (condition met?, id, name, description, icon).
    # Rows are listed in the order they must appear in the result.
    catalogue = (
        (completed >= 1, "first_task", "Getting Started",
         "Completed your first task", "Star"),
        (completed >= 5, "five_tasks", "Task Master",
         "Completed 5 tasks", "Trophy"),
        (completed >= 10, "ten_tasks", "Productivity Pro",
         "Completed 10 tasks", "Zap"),
        (streak >= 3, "three_day_streak", "On Fire",
         "3 day completion streak", "Flame"),
        # The total > 0 check short-circuits before the division can hit zero.
        (total > 0 and (completed / total) == 1.0, "perfect_score", "Perfect Score",
         "100% task completion rate", "Award"),
    )

    return [
        {
            "id": badge_id,
            "name": title,
            "description": blurb,
            "icon": icon_name,
            "unlocked": True,
        }
        for earned, badge_id, title, blurb, icon_name in catalogue
        if earned
    ]


def calculate_chart_data(tasks: list) -> list:
    """Count completed tasks per day for the seven days ending today.

    Args:
        tasks: Objects exposing ``completed`` (truthy when done) and
            ``updated_at`` (a ``datetime`` or ``date``; falsy values are
            ignored). ``updated_at`` is used as the completion timestamp.

    Returns:
        Seven dicts ordered oldest to newest, each with ``date`` (the
        abbreviated weekday name from ``strftime("%a")``), ``count`` (the
        number of completed tasks on that day) and ``isToday``.
    """
    today = datetime.now().date()
    # Oldest day first so the chart reads left to right.
    window = [today - timedelta(days=offset) for offset in range(6, -1, -1)]
    counts = {day: 0 for day in window}

    # Single pass over the tasks instead of one scan per charted day.
    for task in tasks:
        if not task.completed:
            continue
        stamp = task.updated_at
        if not stamp:
            continue
        # Normalise the same way calculate_streak does: datetime values
        # expose .date(); plain date values are used as-is rather than dropped.
        day = stamp.date() if hasattr(stamp, 'date') else stamp
        if day in counts:
            counts[day] += 1

    return [
        {
            "date": day.strftime("%a"),
            "count": counts[day],
            "isToday": day == today,
        }
        for day in window
    ]