Spaces:
Running
Running
Ali Hashhash
feat: implement analytics service and API endpoints for user dashboard statistics
7d6be93 | from __future__ import annotations | |
| from datetime import datetime, timedelta, timezone | |
| from typing import Any | |
| from firebase_admin import firestore | |
| DEFAULT_CATEGORY = "Technology & AI" | |
| def _as_datetime(value: Any) -> datetime | None: | |
| if value is None: | |
| return None | |
| if isinstance(value, datetime): | |
| if value.tzinfo is None: | |
| return value.replace(tzinfo=timezone.utc) | |
| return value.astimezone(timezone.utc) | |
| if isinstance(value, str): | |
| parsed = None | |
| try: | |
| parsed = datetime.fromisoformat(value.replace("Z", "+00:00")) | |
| except ValueError: | |
| return None | |
| if parsed.tzinfo is None: | |
| return parsed.replace(tzinfo=timezone.utc) | |
| return parsed.astimezone(timezone.utc) | |
| return None | |
| def _as_int(value: Any, default: int = 0) -> int: | |
| try: | |
| return int(value or default) | |
| except (TypeError, ValueError): | |
| return default | |
| def _categories_from_note(data: dict[str, Any]) -> list[str]: | |
| raw = ( | |
| data.get("category") | |
| or data.get("categories") | |
| or data.get("video_categories") | |
| or data.get("videoCategories") | |
| ) | |
| if isinstance(raw, list): | |
| categories = [str(item).strip() for item in raw if str(item).strip()] | |
| elif isinstance(raw, str) and raw.strip(): | |
| categories = [raw.strip()] | |
| else: | |
| categories = [] | |
| categories = [ | |
| category for category in categories if category.lower() != "uncategorized" | |
| ] | |
| return categories or [DEFAULT_CATEGORY] | |
| def _key_points_count(data: dict[str, Any]) -> int: | |
| raw = data.get("keyPoints") or data.get("key_points") or [] | |
| return len(raw) if isinstance(raw, list) else 0 | |
| def _note_duration_seconds(data: dict[str, Any]) -> int: | |
| duration = _as_int(data.get("videoDuration", data.get("video_duration", 0))) | |
| # Some legacy records store duration in minutes. | |
| if duration == 0: | |
| duration = _as_int(data.get("duration", data.get("durationSeconds", 0))) | |
| return max(duration, 0) | |
| def _note_created_at(data: dict[str, Any]) -> datetime | None: | |
| return _as_datetime(data.get("createdAt") or data.get("created_at")) | |
| def _notes_for_user(db: firestore.Client, user_id: str) -> list[Any]: | |
| notes_by_id: dict[str, Any] = {} | |
| for field in ("userId", "user_id"): | |
| for note_doc in db.collection("notes").where(field, "==", user_id).stream(): | |
| notes_by_id[note_doc.id] = note_doc | |
| return list(notes_by_id.values()) | |
| def build_user_analytics(db: firestore.Client, user_id: str) -> dict[str, Any]: | |
| now = datetime.now(timezone.utc) | |
| start_of_week = datetime( | |
| now.year, | |
| now.month, | |
| now.day, | |
| tzinfo=timezone.utc, | |
| ) - timedelta(days=now.weekday()) | |
| notes = _notes_for_user(db, user_id) | |
| category_count: dict[str, int] = {} | |
| favorite_notes_count = 0 | |
| total_key_points = 0 | |
| total_duration_seconds = 0 | |
| this_week_videos = 0 | |
| this_month_saved_hours = 0.0 | |
| note_dates: set[str] = set() | |
| for note_doc in notes: | |
| data = note_doc.to_dict() or {} | |
| for category in _categories_from_note(data): | |
| category_count[category] = category_count.get(category, 0) + 1 | |
| if bool(data.get("isFavorite", data.get("is_favorite", False))): | |
| favorite_notes_count += 1 | |
| total_key_points += _key_points_count(data) | |
| duration_seconds = _note_duration_seconds(data) | |
| total_duration_seconds += duration_seconds | |
| created_at = _note_created_at(data) | |
| if created_at is None: | |
| continue | |
| note_dates.add(created_at.date().isoformat()) | |
| if created_at >= start_of_week: | |
| this_week_videos += 1 | |
| if created_at.year == now.year and created_at.month == now.month: | |
| this_month_saved_hours += duration_seconds / 3600.0 | |
| total_videos = len(notes) | |
| total_minutes = total_duration_seconds // 60 | |
| favorite_category = ( | |
| max(category_count.items(), key=lambda item: item[1])[0] | |
| if category_count | |
| else "None" | |
| ) | |
| current_streak = 0 | |
| check_date = now.date() | |
| if check_date.isoformat() not in note_dates: | |
| check_date -= timedelta(days=1) | |
| while check_date.isoformat() in note_dates: | |
| current_streak += 1 | |
| check_date -= timedelta(days=1) | |
| return { | |
| "userId": user_id, | |
| "totalVideos": total_videos, | |
| "notesCount": total_videos, | |
| "totalMinutes": total_minutes, | |
| "totalSavedHours": round(total_duration_seconds / 3600.0, 2), | |
| "favoriteCategory": favorite_category, | |
| "currentStreak": current_streak, | |
| "thisWeekVideos": this_week_videos, | |
| "thisMonthSavedHours": round(this_month_saved_hours, 2), | |
| "categoryCount": category_count, | |
| "favoriteNotesCount": favorite_notes_count, | |
| "totalKeyPoints": total_key_points, | |
| "lastUpdated": firestore.SERVER_TIMESTAMP, | |
| } | |
| def persist_user_analytics(db: firestore.Client, user_id: str) -> dict[str, Any]: | |
| analytics = build_user_analytics(db, user_id) | |
| db.collection("analytics").document(user_id).set(analytics, merge=True) | |
| return analytics | |