Spaces:
Running
Running
File size: 5,195 Bytes
7d6be93 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 | from __future__ import annotations
from datetime import datetime, timedelta, timezone
from typing import Any
from firebase_admin import firestore
# Fallback category used when a note carries no usable category of its own.
DEFAULT_CATEGORY = "Technology & AI"
def _as_datetime(value: Any) -> datetime | None:
    """Coerce a stored timestamp into a timezone-aware UTC ``datetime``.

    Accepts ``datetime`` instances and ISO-8601 strings (``Z`` is rewritten
    to an explicit ``+00:00`` offset before parsing). Naive values are
    treated as already being in UTC. Returns ``None`` for ``None``,
    unparseable strings and every other type.
    """
    if isinstance(value, str):
        try:
            # fromisoformat() needs an explicit offset instead of "Z".
            moment = datetime.fromisoformat(value.replace("Z", "+00:00"))
        except ValueError:
            return None
    elif isinstance(value, datetime):
        moment = value
    else:
        return None

    if moment.tzinfo is None:
        return moment.replace(tzinfo=timezone.utc)
    return moment.astimezone(timezone.utc)
def _as_int(value: Any, default: int = 0) -> int:
    """Convert ``value`` to an ``int``, returning ``default`` when impossible.

    ``None`` yields ``default``. Numeric strings with a fractional part
    (e.g. ``"12.7"``) are truncated toward zero, the same way floats are.
    Non-numeric input, NaN and infinities yield ``default`` instead of
    raising.
    """
    if value is None:
        return default
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int(float("inf")). ValueError also covers strings
        # such as "12.7", which get a second chance through float() below.
        pass
    try:
        return int(float(value))
    except (TypeError, ValueError, OverflowError):
        return default
def _categories_from_note(data: dict[str, Any]) -> list[str]:
    """Return the cleaned category labels of a note document.

    The first truthy value among ``category``, ``categories``,
    ``video_categories`` and ``videoCategories`` is used; it may be a single
    string or a list. Labels are stripped of surrounding whitespace. Blank
    labels, ``None`` items and the placeholder "uncategorized" (any case)
    are dropped, and repeated labels are collapsed so that one note counts
    at most once per category. Falls back to ``[DEFAULT_CATEGORY]`` when
    nothing usable remains.
    """
    raw = (
        data.get("category")
        or data.get("categories")
        or data.get("video_categories")
        or data.get("videoCategories")
    )
    if isinstance(raw, str):
        candidates = [raw]
    elif isinstance(raw, list):
        # Skip None explicitly: str(None) would yield a bogus "None" label.
        candidates = [str(item) for item in raw if item is not None]
    else:
        candidates = []

    categories: list[str] = []
    seen: set[str] = set()
    for candidate in candidates:
        label = candidate.strip()
        if not label or label.lower() == "uncategorized" or label in seen:
            continue
        seen.add(label)
        categories.append(label)
    return categories or [DEFAULT_CATEGORY]
def _key_points_count(data: dict[str, Any]) -> int:
    """Return how many key points a note holds.

    Reads ``keyPoints`` first and ``key_points`` when that is missing or
    empty; a value that is not a list counts as zero.
    """
    points = data.get("keyPoints") or data.get("key_points") or []
    if not isinstance(points, list):
        return 0
    return len(points)
def _note_duration_seconds(data: dict[str, Any]) -> int:
    """Return the note's video duration as a non-negative number of seconds.

    The fields ``videoDuration``, ``video_duration``, ``duration`` and
    ``durationSeconds`` are tried in that order and the first one holding a
    positive integer wins. A field that is present but null, zero, negative
    or unparseable no longer hides the fields after it. Returns 0 when no
    field has a usable value.

    NOTE(review): an earlier comment here said some legacy records store the
    duration in minutes, but no minutes-to-seconds conversion has ever been
    applied; every field is taken as seconds. Confirm against stored data.
    """
    for key in ("videoDuration", "video_duration", "duration", "durationSeconds"):
        seconds = _as_int(data.get(key))
        if seconds > 0:
            return seconds
    return 0
def _note_created_at(data: dict[str, Any]) -> datetime | None:
    """Return the note's creation time as a UTC ``datetime``, if it has one.

    Looks at ``createdAt`` and falls back to ``created_at`` when the former
    is missing or falsy; the value is normalised by ``_as_datetime``.
    """
    stamp = data.get("createdAt")
    if not stamp:
        stamp = data.get("created_at")
    return _as_datetime(stamp)
def _notes_for_user(db: firestore.Client, user_id: str) -> list[Any]:
    """Fetch every note document owned by ``user_id``.

    The ``notes`` collection is queried once per owner-field spelling
    (``userId``, then ``user_id``). Results are keyed by document id so a
    note matching both queries is returned only once.
    """
    unique_docs: dict[str, Any] = {}
    for owner_field in ("userId", "user_id"):
        query = db.collection("notes").where(owner_field, "==", user_id)
        unique_docs.update((doc.id, doc) for doc in query.stream())
    return list(unique_docs.values())
def _current_streak(note_dates: set[str], now: datetime) -> int:
    """Count consecutive days with at least one note, ending today or yesterday.

    ``note_dates`` holds ISO ``YYYY-MM-DD`` strings. A day without notes so
    far does not break the streak: when today is absent, counting starts
    from yesterday instead.
    """
    day = now.date()
    if day.isoformat() not in note_dates:
        day -= timedelta(days=1)
    streak = 0
    while day.isoformat() in note_dates:
        streak += 1
        day -= timedelta(days=1)
    return streak


def build_user_analytics(db: firestore.Client, user_id: str) -> dict[str, Any]:
    """Aggregate a user's notes into an analytics summary.

    Every note returned by ``_notes_for_user`` contributes to the totals
    (categories, favourites, key points, duration). Notes without a
    parseable creation time are still counted there, but are left out of
    the time-based figures (streak, this week, this month). All date
    arithmetic is done in UTC, with weeks starting on Monday.

    Args:
        db: Firestore client used to read the ``notes`` collection.
        user_id: Owner whose notes are aggregated.

    Returns:
        A dictionary with the keys ``userId``, ``totalVideos``,
        ``notesCount``, ``totalMinutes``, ``totalSavedHours``,
        ``favoriteCategory`` (the string ``"None"`` when there are no
        notes), ``currentStreak``, ``thisWeekVideos``,
        ``thisMonthSavedHours``, ``categoryCount``, ``favoriteNotesCount``,
        ``totalKeyPoints`` and ``lastUpdated``. The last one is the
        ``firestore.SERVER_TIMESTAMP`` sentinel, not a concrete time.
    """
    now = datetime.now(timezone.utc)
    # Monday 00:00 UTC of the current week (weekday() is 0 on Monday).
    start_of_week = datetime(
        now.year,
        now.month,
        now.day,
        tzinfo=timezone.utc,
    ) - timedelta(days=now.weekday())

    notes = _notes_for_user(db, user_id)

    category_count: dict[str, int] = {}
    favorite_notes_count = 0
    total_key_points = 0
    total_duration_seconds = 0
    this_week_videos = 0
    this_month_saved_hours = 0.0
    note_dates: set[str] = set()

    for note_doc in notes:
        data = note_doc.to_dict() or {}

        for category in _categories_from_note(data):
            category_count[category] = category_count.get(category, 0) + 1

        favorite_flag = data.get("isFavorite")
        if favorite_flag is None:
            # A null or missing camelCase flag must not hide the legacy one.
            favorite_flag = data.get("is_favorite", False)
        if favorite_flag:
            favorite_notes_count += 1

        total_key_points += _key_points_count(data)
        duration_seconds = _note_duration_seconds(data)
        total_duration_seconds += duration_seconds

        created_at = _note_created_at(data)
        if created_at is None:
            # Undated notes cannot take part in any time-based statistic.
            continue

        note_dates.add(created_at.date().isoformat())
        if created_at >= start_of_week:
            this_week_videos += 1
        if created_at.year == now.year and created_at.month == now.month:
            this_month_saved_hours += duration_seconds / 3600.0

    total_videos = len(notes)
    total_minutes = total_duration_seconds // 60
    # Ties go to the category that was encountered first.
    favorite_category = (
        max(category_count.items(), key=lambda item: item[1])[0]
        if category_count
        else "None"
    )

    return {
        "userId": user_id,
        "totalVideos": total_videos,
        "notesCount": total_videos,
        "totalMinutes": total_minutes,
        "totalSavedHours": round(total_duration_seconds / 3600.0, 2),
        "favoriteCategory": favorite_category,
        "currentStreak": _current_streak(note_dates, now),
        "thisWeekVideos": this_week_videos,
        "thisMonthSavedHours": round(this_month_saved_hours, 2),
        "categoryCount": category_count,
        "favoriteNotesCount": favorite_notes_count,
        "totalKeyPoints": total_key_points,
        "lastUpdated": firestore.SERVER_TIMESTAMP,
    }
def persist_user_analytics(db: firestore.Client, user_id: str) -> dict[str, Any]:
    """Recompute a user's analytics and store them in Firestore.

    The summary from ``build_user_analytics`` is written to the
    ``analytics`` collection under the document id ``user_id`` with
    ``merge=True``, and the same dictionary is returned to the caller.
    """
    summary = build_user_analytics(db, user_id)
    target = db.collection("analytics").document(user_id)
    target.set(summary, merge=True)
    return summary
|