# ido/services/analytics_service.py
# (removed GitHub page residue: author "Parthnuwal7", commit "Adding backend to HF spaces", 27d04ef)
"""
Analytics Service
Computes insights from preprocessed session data
"""
from collections import Counter
from typing import Optional
import json
from pathlib import Path
def load_session_events(token: str) -> tuple[list[dict], dict]:
    """Load the events list and stats dict for a preprocessed session.

    Looks for ``preprocessed_<token>.json`` under the ``storage`` directory
    one level above this module; returns ``([], {})`` when the file is absent.
    """
    session_file = Path(__file__).parent.parent / "storage" / f"preprocessed_{token}.json"
    if not session_file.exists():
        return [], {}
    with open(session_file, 'r', encoding='utf-8') as fh:
        payload = json.load(fh)
    return payload.get("events", []), payload.get("stats", {})
def get_session_summary(token: str) -> dict:
    """
    Get overall session summary.
    Returns:
    - total_events, total_watch, total_search, total_subscribe
    - unique_channels
    - date_range (first and last timestamp)
    - language_breakdown
    """
    events, stats = load_session_events(token)
    if not events:
        return {"error": "Session not found or empty"}

    # Unique channels are derived from watch events only.
    watch_events = [ev for ev in events if ev.get("type") == "watch"]
    unique_channels = {ev.get("channel_clean") for ev in watch_events if ev.get("channel_clean")}

    # Sorted UTC timestamps give us the first/last event of the session.
    ts_values = sorted(ev.get("timestamp_utc") for ev in events if ev.get("timestamp_utc"))
    first_ts = ts_values[0] if ts_values else None
    last_ts = ts_values[-1] if ts_values else None

    # Prefer precomputed stats; fall back to counts derived from the events.
    return {
        "total_events": stats.get("total_events", len(events)),
        "total_watch": stats.get("total_watch", len(watch_events)),
        "total_search": stats.get("total_search", 0),
        "total_subscribe": stats.get("total_subscribe", 0),
        "unique_channels": len(unique_channels),
        "date_range": {"first": first_ts, "last": last_ts},
        "language_breakdown": stats.get("language_breakdown", {}),
    }
def get_channel_analytics(token: str, top_n: int = 20, engagement_filter: str = "all") -> dict:
    """
    Get channel analytics.
    Args:
        token: Session token
        top_n: Number of top channels to return
        engagement_filter: "all" | "watch" (active) | "view" (passive)
    Returns:
        - top_channels: list of dicts with display name, count, percentage
        - total_unique_channels, total_count, other_count
        - view_distribution: histogram of per-channel view counts
        - engagement_summary: active vs passive totals
    """
    events, _ = load_session_events(token)
    if not events:
        return {"error": "Session not found or empty"}

    watch_events = [ev for ev in events if ev.get("type") == "watch"]

    # Map the filter keyword to an engagement value; anything else keeps all.
    engagement_value = {"watch": "active", "view": "passive"}.get(engagement_filter)
    if engagement_value is None:
        selected = watch_events
    else:
        selected = [ev for ev in watch_events if ev.get("engagement") == engagement_value]

    channel_counts = Counter(
        ev.get("channel_clean") for ev in selected if ev.get("channel_clean")
    )

    # First occurrence wins for the original-case display name.
    display_names = {}
    for ev in watch_events:
        clean, original = ev.get("channel_clean"), ev.get("channel")
        if clean and original:
            display_names.setdefault(clean, original)

    total_count = sum(channel_counts.values())

    top_channels_result = []
    for clean_name, hits in channel_counts.most_common(top_n):
        share = round((hits / total_count) * 100, 2) if total_count > 0 else 0
        top_channels_result.append({
            "channel": display_names.get(clean_name, clean_name),
            "channel_clean": clean_name,
            "count": hits,
            "percentage": share,
        })

    # Histogram of views-per-channel.
    # Buckets: 1, 2-5, 6-10, 11-20, 21-50, 51-100, 100+
    bucket_edges = [
        (1, "1"), (5, "2-5"), (10, "6-10"), (20, "11-20"),
        (50, "21-50"), (100, "51-100"),
    ]
    view_buckets = {label: 0 for _, label in bucket_edges}
    view_buckets["100+"] = 0
    for hits in channel_counts.values():
        for upper, label in bucket_edges:
            if hits <= upper:
                view_buckets[label] += 1
                break
        else:
            view_buckets["100+"] += 1

    # Convert to list format for frontend
    view_distribution = [{"bucket": label, "count": n} for label, n in view_buckets.items()]

    # Summary stats across both engagement types.
    active_count = sum(1 for ev in watch_events if ev.get("engagement") == "active")
    passive_count = sum(1 for ev in watch_events if ev.get("engagement") == "passive")

    return {
        "total_unique_channels": len(channel_counts),
        "total_count": total_count,
        "engagement_filter": engagement_filter,
        "top_channels": top_channels_result,
        "other_count": total_count - sum(c["count"] for c in top_channels_result),
        "view_distribution": view_distribution,
        "engagement_summary": {
            "total_watch": active_count,
            "total_view": passive_count,
            "total_all": len(watch_events),
        },
    }
def get_watch_patterns(token: str) -> dict:
    """
    Get watch time patterns.
    Returns:
    - hourly_distribution: Watches per hour (0-23)
    - daily_distribution: Watches per day of week (0-6)
    - peak_hour, peak_day
    - weekly_peak_days: For each week, which day had most watches
    - time_intervals: Grouped by time periods (morning, afternoon, etc.)
    - circular_activity: Average watches per hour for radial chart
    """
    events, _ = load_session_events(token)
    if not events:
        return {"error": "Session not found or empty"}
    watch_events = [e for e in events if e.get("type") == "watch"]
    # Count by hour (None hour_local values are skipped)
    hourly = Counter(e.get("hour_local") for e in watch_events if e.get("hour_local") is not None)
    # Count by day of week (0=Monday ... 6=Sunday, matching day_names below)
    daily = Counter(e.get("day_of_week") for e in watch_events if e.get("day_of_week") is not None)
    # Build full distributions (fill missing with 0)
    hourly_dist = [{"hour": h, "count": hourly.get(h, 0)} for h in range(24)]
    daily_dist = [{"day": d, "count": daily.get(d, 0)} for d in range(7)]
    # Find peaks (max keeps the first maximum, so earlier hours/days win ties)
    peak_hour = max(hourly_dist, key=lambda x: x["count"]) if hourly_dist else None
    peak_day = max(daily_dist, key=lambda x: x["count"]) if daily_dist else None
    day_names = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
    # === Weekly Peak Day Analysis ===
    # Group events by week (using ISO week number) and find peak day for each week
    from datetime import datetime
    weekly_data = {}  # {(year, week): {day_of_week: count}}
    for e in watch_events:
        ts = e.get("timestamp_local")
        dow = e.get("day_of_week")
        if ts and dow is not None:
            try:
                dt = datetime.fromisoformat(ts.replace('Z', '+00:00'))
                year, week, _ = dt.isocalendar()
                key = (year, week)
                if key not in weekly_data:
                    weekly_data[key] = Counter()
                weekly_data[key][dow] += 1
            except:
                # NOTE(review): bare except silently drops events with
                # unparseable timestamps; consider narrowing to ValueError.
                pass
    # Find peak day for each week
    weekly_peak_days = []
    peak_day_counter = Counter()  # Count how many weeks each day "wins"
    for (year, week), day_counts in sorted(weekly_data.items()):
        if day_counts:
            peak_dow = max(day_counts.keys(), key=lambda d: day_counts[d])
            peak_count = day_counts[peak_dow]
            weekly_peak_days.append({
                "year": year,
                "week": week,
                "peak_day": day_names[peak_dow],
                "peak_day_num": peak_dow,
                "count": peak_count
            })
            peak_day_counter[peak_dow] += 1
    # Overall winner: which day wins the most weeks
    overall_peak_day = None
    overall_peak_wins = 0
    if peak_day_counter:
        winner_dow = max(peak_day_counter.keys(), key=lambda d: peak_day_counter[d])
        overall_peak_day = day_names[winner_dow]
        overall_peak_wins = peak_day_counter[winner_dow]
    # === Time Intervals ===
    # Group hours into intervals
    intervals = {
        "Night (12AM-6AM)": range(0, 6),
        "Morning (6AM-12PM)": range(6, 12),
        "Afternoon (12PM-6PM)": range(12, 18),
        "Evening (6PM-12AM)": range(18, 24)
    }
    interval_counts = {}
    for name, hour_range in intervals.items():
        count = sum(hourly.get(h, 0) for h in hour_range)
        interval_counts[name] = count
    # The single-element inner `for count in [...]` is a trick to bind `count`
    # per interval inside the comprehension; "hours" renders e.g. "0-6" from
    # the range endpoints.
    time_intervals = [
        {"interval": name, "count": count, "hours": f"{list(hours)[0]}-{list(hours)[-1]+1}"}
        for name, hours in intervals.items()
        for count in [interval_counts[name]]
    ]
    peak_interval = max(time_intervals, key=lambda x: x["count"]) if time_intervals else None
    # === Circular Activity (Average per hour) ===
    # Calculate average watches per hour across all days in the dataset
    # NOTE(review): total_days is computed but never used below; the
    # percentages are shares of total watches, not per-day averages.
    total_days = len(weekly_data) * 7 if weekly_data else 1  # Approximate
    circular_activity = []
    total_watches = sum(hourly.values())
    for h in range(24):
        count = hourly.get(h, 0)
        # Percentage of total watches
        percentage = round((count / total_watches) * 100, 2) if total_watches > 0 else 0
        # Format hour label as 12-hour clock text
        if h == 0:
            label = "12 AM"
        elif h < 12:
            label = f"{h} AM"
        elif h == 12:
            label = "12 PM"
        else:
            label = f"{h-12} PM"
        circular_activity.append({
            "hour": h,
            "label": label,
            "count": count,
            "percentage": percentage
        })
    return {
        "hourly_distribution": hourly_dist,
        "daily_distribution": daily_dist,
        "peak_hour": peak_hour["hour"] if peak_hour else None,
        "peak_hour_count": peak_hour["count"] if peak_hour else 0,
        "peak_day": day_names[peak_day["day"]] if peak_day and peak_day["day"] is not None else None,
        "peak_day_count": peak_day["count"] if peak_day else 0,
        # New fields
        "weekly_peak_days": weekly_peak_days[-12:],  # Last 12 weeks
        "overall_peak_day": overall_peak_day,
        "overall_peak_wins": overall_peak_wins,
        "total_weeks": len(weekly_data),
        "time_intervals": time_intervals,
        "peak_interval": peak_interval["interval"] if peak_interval else None,
        "circular_activity": circular_activity
    }
def get_temporal_trends(token: str) -> dict:
    """
    Analyze how watching patterns change month-to-month.

    Args:
        token: Session token identifying the preprocessed data file.

    Returns:
        dict with:
        - monthly_stats: watch count, peak hour, peak day for each month
        - peak_hour_trend: how the peak hour shifts over months
        - peak_day_trend: how the peak day shifts over months
        - activity_trend: total watches per month
        - pattern_shifts: up to 10 significant month-over-month shifts
        - total_months, summary
    """
    events, _ = load_session_events(token)
    if not events:
        return {"error": "Session not found or empty"}
    watch_events = [e for e in events if e.get("type") == "watch"]
    if not watch_events:
        return {
            "monthly_stats": [],
            "peak_hour_trend": [],
            "peak_day_trend": [],
            "activity_trend": [],
            "summary": "No watch events found"
        }
    from collections import defaultdict
    from datetime import datetime

    day_names = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
    # Hoisted out of the loop (was rebuilt every iteration); index 0 is a
    # placeholder so month numbers 1-12 index directly.
    month_names = ["", "Jan", "Feb", "Mar", "Apr", "May", "Jun",
                   "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]

    def _hour_label(hour):
        """Format an hour (0-23) as a 12-hour clock label; 'N/A' for None."""
        if hour is None:
            return "N/A"
        if hour == 0:
            return "12 AM"
        if hour < 12:
            return f"{hour} AM"
        if hour == 12:
            return "12 PM"
        return f"{hour - 12} PM"

    # Group watch events by calendar month.
    monthly_data = defaultdict(lambda: {
        "watches": 0,
        "hourly": Counter(),
        "daily": Counter()
    })
    for event in watch_events:
        ts = event.get("timestamp_local") or event.get("timestamp_utc")
        hour = event.get("hour_local")
        dow = event.get("day_of_week")
        if not ts:
            continue
        try:
            # Accept full ISO timestamps or bare YYYY-MM-DD dates.
            if "T" in ts:
                dt = datetime.fromisoformat(ts.replace('Z', '+00:00'))
            else:
                dt = datetime.strptime(ts[:10], "%Y-%m-%d")
        except (ValueError, TypeError):
            # Narrowed from a bare `except:` so only malformed timestamps are
            # skipped instead of swallowing every exception.
            continue
        month_key = f"{dt.year}-{dt.month:02d}"
        monthly_data[month_key]["watches"] += 1
        if hour is not None:
            monthly_data[month_key]["hourly"][hour] += 1
        if dow is not None:
            monthly_data[month_key]["daily"][dow] += 1

    # Build per-month stats plus chart-ready trend series.
    monthly_stats = []
    peak_hour_trend = []
    peak_day_trend = []
    activity_trend = []
    for month_key in sorted(monthly_data.keys()):
        data = monthly_data[month_key]
        # Peak hour for the month (first maximum wins ties).
        peak_hour = None
        peak_hour_count = 0
        if data["hourly"]:
            peak_hour = max(data["hourly"].keys(), key=lambda h: data["hourly"][h])
            peak_hour_count = data["hourly"][peak_hour]
        # Peak day of week for the month.
        peak_day = None
        peak_day_name = None
        peak_day_count = 0
        if data["daily"]:
            peak_day = max(data["daily"].keys(), key=lambda d: data["daily"][d])
            peak_day_name = day_names[peak_day]
            peak_day_count = data["daily"][peak_day]
        peak_hour_label = _hour_label(peak_hour)
        # Human-readable month label, e.g. "Jan 2024".
        year, month = month_key.split("-")
        month_label = f"{month_names[int(month)]} {year}"
        monthly_stats.append({
            "month": month_key,
            "month_label": month_label,
            "total_watches": data["watches"],
            "peak_hour": peak_hour,
            "peak_hour_label": peak_hour_label,
            "peak_hour_count": peak_hour_count,
            "peak_day": peak_day,
            "peak_day_name": peak_day_name,
            "peak_day_count": peak_day_count
        })
        # Trend data for charts
        peak_hour_trend.append({
            "month": month_key,
            "month_label": month_label,
            "peak_hour": peak_hour,
            "peak_hour_label": peak_hour_label
        })
        peak_day_trend.append({
            "month": month_key,
            "month_label": month_label,
            "peak_day": peak_day,
            "peak_day_name": peak_day_name
        })
        activity_trend.append({
            "month": month_key,
            "month_label": month_label,
            "watches": data["watches"]
        })

    # Detect significant month-over-month shifts.
    shifts = []
    for i in range(1, len(monthly_stats)):
        prev = monthly_stats[i - 1]
        curr = monthly_stats[i]
        # Hour shift: significant if the peak hour moves by 4+ hours.
        if prev["peak_hour"] is not None and curr["peak_hour"] is not None:
            hour_diff = abs(curr["peak_hour"] - prev["peak_hour"])
            if hour_diff >= 4:
                shifts.append({
                    "type": "peak_hour",
                    "from_month": prev["month_label"],
                    "to_month": curr["month_label"],
                    "from_value": prev["peak_hour_label"],
                    "to_value": curr["peak_hour_label"],
                    "description": f"Peak hour shifted from {prev['peak_hour_label']} to {curr['peak_hour_label']}"
                })
        # Day shift: only flagged when it crosses the weekday/weekend boundary.
        if prev["peak_day"] is not None and curr["peak_day"] is not None:
            if prev["peak_day"] != curr["peak_day"]:
                prev_weekend = prev["peak_day"] >= 5
                curr_weekend = curr["peak_day"] >= 5
                if prev_weekend != curr_weekend:
                    shifts.append({
                        "type": "peak_day",
                        "from_month": prev["month_label"],
                        "to_month": curr["month_label"],
                        "from_value": prev["peak_day_name"],
                        "to_value": curr["peak_day_name"],
                        "description": f"Shifted from {'weekend' if prev_weekend else 'weekday'} to {'weekend' if curr_weekend else 'weekday'}"
                    })

    # Generate summary
    if monthly_stats:
        first = monthly_stats[0]
        last = monthly_stats[-1]
        summary = f"Tracked {len(monthly_stats)} months from {first['month_label']} to {last['month_label']}"
        if shifts:
            summary += f" | {len(shifts)} significant pattern shifts detected"
    else:
        summary = "No monthly data available"
    return {
        "monthly_stats": monthly_stats,
        "peak_hour_trend": peak_hour_trend,
        "peak_day_trend": peak_day_trend,
        "activity_trend": activity_trend,
        "pattern_shifts": shifts[:10],  # Top 10 shifts
        "total_months": len(monthly_stats),
        "summary": summary
    }
def get_search_analytics(token: str, top_n: int = 20) -> dict:
    """
    Get search analytics.
    Returns:
    - total_searches
    - top_search_terms
    - language_breakdown for searches
    """
    events, _ = load_session_events(token)
    if not events:
        return {"error": "Session not found or empty"}

    searches = [ev for ev in events if ev.get("type") == "search"]

    # Frequency of normalized search terms.
    term_counts = Counter(ev.get("text_clean") for ev in searches if ev.get("text_clean"))

    # First occurrence wins for the raw (display) form of each term.
    raw_by_clean = {}
    for ev in searches:
        clean, raw = ev.get("text_clean"), ev.get("text_raw")
        if clean and raw:
            raw_by_clean.setdefault(clean, raw)

    top_searches = [
        {
            "term": raw_by_clean.get(clean, clean),
            "term_clean": clean,
            "count": hits,
        }
        for clean, hits in term_counts.most_common(top_n)
    ]

    # Language breakdown for searches
    lang_counts = Counter(ev.get("language_type") for ev in searches if ev.get("language_type"))

    return {
        "total_searches": len(searches),
        "unique_searches": len(term_counts),
        "top_searches": top_searches,
        "language_breakdown": dict(lang_counts),
    }
def get_subscription_overlap(token: str) -> dict:
    """
    Analyze overlap between subscriptions and watch history.
    Returns:
    - total_subscriptions
    - subscribed_and_watched: Channels you're subscribed to AND watched
    - watched_not_subscribed: Channels you watch but aren't subscribed to
    - subscribed_not_watched: Channels you're subscribed to but haven't watched
    """
    events, _ = load_session_events(token)
    if not events:
        return {"error": "Session not found or empty"}

    # Normalized (lowercase) channel names per event type.
    subscribed = {
        ev.get("channel_clean")
        for ev in events
        if ev.get("type") == "subscribe" and ev.get("channel_clean")
    }
    watched = {
        ev.get("channel_clean")
        for ev in events
        if ev.get("type") == "watch" and ev.get("channel_clean")
    }

    # Set algebra gives the three overlap groups.
    both = subscribed & watched
    watch_only = watched - subscribed
    sub_only = subscribed - watched

    # Display names: last occurrence wins (plain assignment, matching the
    # original behavior).
    display = {}
    for ev in events:
        clean, original = ev.get("channel_clean"), ev.get("channel")
        if clean and original:
            display[clean] = original

    def _names(channel_set):
        # Cap at 20 display names per group.
        return [display.get(c, c) for c in list(channel_set)[:20]]

    overlap_pct = round(len(both) / len(subscribed) * 100, 1) if subscribed else 0

    return {
        "total_subscriptions": len(subscribed),
        "total_watched_channels": len(watched),
        "subscribed_and_watched": {
            "count": len(both),
            "percentage": overlap_pct,
            "channels": _names(both),
        },
        "watched_not_subscribed": {
            "count": len(watch_only),
            "channels": _names(watch_only),
        },
        "subscribed_not_watched": {
            "count": len(sub_only),
            "channels": _names(sub_only),
        },
    }
def get_behavior_anomalies(token: str) -> dict:
    """
    Detect deviations from normal watching patterns.
    Identifies:
    - Late night sessions (watching after midnight when normally don't)
    - Binge periods (unusually high watch counts)
    - Off-peak hour activity
    - Weekly pattern changes
    """
    from datetime import datetime
    events, _ = load_session_events(token)
    if not events:
        return {"error": "Session not found or empty"}
    watch_events = [e for e in events if e.get("type") == "watch"]
    if not watch_events:
        return {"anomalies": [], "late_night_sessions": [], "binge_days": []}
    # === Calculate baseline patterns ===
    # Average watches per hour across all data
    hourly_counts = Counter(e.get("hour_local") for e in watch_events if e.get("hour_local") is not None)
    total_watches = sum(hourly_counts.values())
    # Define "late night" as 12AM-5AM (hours 0-4)
    late_night_hours = {0, 1, 2, 3, 4}
    late_night_baseline = sum(hourly_counts.get(h, 0) for h in late_night_hours)
    late_night_percentage = (late_night_baseline / total_watches * 100) if total_watches > 0 else 0
    # === Group by date ===
    daily_data = {}  # {date_str: {hour: count, total: count}}
    for e in watch_events:
        ts = e.get("timestamp_local")
        hour = e.get("hour_local")
        if ts and hour is not None:
            try:
                dt = datetime.fromisoformat(ts.replace('Z', '+00:00'))
                date_str = dt.strftime("%Y-%m-%d")
                if date_str not in daily_data:
                    daily_data[date_str] = {"hours": Counter(), "total": 0}
                daily_data[date_str]["hours"][hour] += 1
                daily_data[date_str]["total"] += 1
            except:
                # NOTE(review): bare except silently drops events with
                # unparseable timestamps; consider narrowing to ValueError.
                pass
    # Calculate average daily watches
    if daily_data:
        avg_daily_watches = sum(d["total"] for d in daily_data.values()) / len(daily_data)
        # Population standard deviation of the daily watch totals.
        std_dev = (sum((d["total"] - avg_daily_watches) ** 2 for d in daily_data.values()) / len(daily_data)) ** 0.5
    else:
        avg_daily_watches = 0
        std_dev = 0
    # === Detect Late Night Sessions ===
    # Days where user watched significantly in late night hours (> usual)
    late_night_sessions = []
    for date_str, data in sorted(daily_data.items()):
        late_night_count = sum(data["hours"].get(h, 0) for h in late_night_hours)
        if late_night_count >= 3:  # At least 3 videos in late night
            late_night_sessions.append({
                "date": date_str,
                "late_night_count": late_night_count,
                "total_count": data["total"],
                "peak_hour": max(data["hours"].keys(), key=lambda h: data["hours"][h]) if data["hours"] else None
            })
    # === Detect Binge Days ===
    # Days with watch count > mean + 2*std_dev
    binge_threshold = avg_daily_watches + 2 * std_dev if std_dev > 0 else avg_daily_watches * 2
    binge_days = []
    for date_str, data in sorted(daily_data.items()):
        if data["total"] > binge_threshold and data["total"] >= 10:  # At least 10 videos
            binge_days.append({
                "date": date_str,
                "count": data["total"],
                "above_average_by": round(data["total"] - avg_daily_watches, 1),
                "multiplier": round(data["total"] / avg_daily_watches, 2) if avg_daily_watches > 0 else 0
            })
    # === Detect Weekly Pattern Shifts ===
    # Group by week and detect if hourly pattern changed significantly
    weekly_patterns = {}  # {(year, week): Counter of hours}
    for e in watch_events:
        ts = e.get("timestamp_local")
        hour = e.get("hour_local")
        if ts and hour is not None:
            try:
                dt = datetime.fromisoformat(ts.replace('Z', '+00:00'))
                year, week, _ = dt.isocalendar()
                key = (year, week)
                if key not in weekly_patterns:
                    weekly_patterns[key] = Counter()
                weekly_patterns[key][hour] += 1
            except:
                # NOTE(review): same bare-except concern as above.
                pass
    # Find weeks with unusual late-night activity
    unusual_weeks = []
    for (year, week), hour_counts in sorted(weekly_patterns.items()):
        week_total = sum(hour_counts.values())
        week_late_night = sum(hour_counts.get(h, 0) for h in late_night_hours)
        week_late_pct = (week_late_night / week_total * 100) if week_total > 0 else 0
        # If this week's late night % is significantly higher than baseline
        if week_late_pct > late_night_percentage * 1.5 and week_late_night >= 5:
            unusual_weeks.append({
                "year": year,
                "week": week,
                "late_night_count": week_late_night,
                "late_night_percentage": round(week_late_pct, 1),
                "baseline_percentage": round(late_night_percentage, 1),
                "total_watches": week_total
            })
    # === Detect Streaks (Consecutive Days) ===
    # NOTE(review): timedelta is imported here but never used in this function.
    from datetime import timedelta
    def find_streaks(date_list, max_gap=1):
        """Find streaks of dates with at most max_gap days between them."""
        # A gap of (max_gap + 1) calendar days between two dates means up to
        # max_gap fully-skipped days are tolerated inside a streak.
        if not date_list:
            return []
        sorted_dates = sorted([datetime.strptime(d, "%Y-%m-%d") for d in date_list])
        streaks = []
        current_streak = [sorted_dates[0]]
        for i in range(1, len(sorted_dates)):
            gap = (sorted_dates[i] - sorted_dates[i-1]).days
            if gap <= max_gap + 1:  # Allow gap of max_gap days
                current_streak.append(sorted_dates[i])
            else:
                if len(current_streak) >= 2:
                    streaks.append(current_streak)
                current_streak = [sorted_dates[i]]
        if len(current_streak) >= 2:
            streaks.append(current_streak)
        return streaks
    # Find binge streaks (consecutive binge days, allowing 1 day gap)
    binge_dates = [d["date"] for d in binge_days]
    binge_streaks = find_streaks(binge_dates, max_gap=1)
    binge_watching_periods = []
    for streak in binge_streaks:
        start = streak[0].strftime("%Y-%m-%d")
        end = streak[-1].strftime("%Y-%m-%d")
        days = (streak[-1] - streak[0]).days + 1
        # Sum videos from all binge days that fall inside the streak window
        # (lexicographic comparison works because dates are YYYY-MM-DD).
        total_videos = sum(d["count"] for d in binge_days if d["date"] >= start and d["date"] <= end)
        binge_watching_periods.append({
            "start_date": start,
            "end_date": end,
            "duration_days": days,
            "total_videos": total_videos,
            "avg_per_day": round(total_videos / len(streak), 1)
        })
    # Find late night mood (consecutive late night sessions, allowing 1 day gap)
    late_night_dates = [s["date"] for s in late_night_sessions]
    late_night_streaks = find_streaks(late_night_dates, max_gap=1)
    late_night_moods = []
    for streak in late_night_streaks:
        start = streak[0].strftime("%Y-%m-%d")
        end = streak[-1].strftime("%Y-%m-%d")
        days = (streak[-1] - streak[0]).days + 1
        total_late = sum(s["late_night_count"] for s in late_night_sessions if s["date"] >= start and s["date"] <= end)
        late_night_moods.append({
            "start_date": start,
            "end_date": end,
            "duration_days": days,
            "total_late_videos": total_late
        })
    # === Additional Pattern Analysis ===
    # 1. Weekend Warrior Detection
    weekend_watches = 0
    weekday_watches = 0
    for e in watch_events:
        dow = e.get("day_of_week")
        if dow is not None:
            if dow >= 5:  # Saturday=5, Sunday=6
                weekend_watches += 1
            else:
                weekday_watches += 1
    total_dow = weekend_watches + weekday_watches
    weekend_pct = (weekend_watches / total_dow * 100) if total_dow > 0 else 0
    # If weekend (2 days) has > 35% of watches, that's weekend warrior territory
    is_weekend_warrior = weekend_pct > 35
    # 2. Night Owl vs Morning Person
    night_hours = {20, 21, 22, 23, 0, 1, 2, 3, 4}  # 8PM - 5AM
    morning_hours = {5, 6, 7, 8, 9, 10, 11}  # 5AM - 12PM
    night_count = sum(hourly_counts.get(h, 0) for h in night_hours)
    morning_count = sum(hourly_counts.get(h, 0) for h in morning_hours)
    # Classified by a 1.5x dominance threshold either way.
    if night_count > morning_count * 1.5:
        chronotype = "Night Owl"
    elif morning_count > night_count * 1.5:
        chronotype = "Early Bird"
    else:
        chronotype = "Balanced"
    # 3. Inactive Periods (gaps of 3+ days with no watching)
    sorted_dates = sorted(daily_data.keys())
    inactive_periods = []
    for i in range(1, len(sorted_dates)):
        prev = datetime.strptime(sorted_dates[i-1], "%Y-%m-%d")
        curr = datetime.strptime(sorted_dates[i], "%Y-%m-%d")
        gap = (curr - prev).days
        if gap >= 4:  # 3+ days gap
            inactive_periods.append({
                "start": sorted_dates[i-1],
                "end": sorted_dates[i],
                "gap_days": gap - 1
            })
    # === Build anomaly summary ===
    # Assembly order matters: streak anomalies first, then single-day ones,
    # and the final list is truncated to 12 below.
    anomalies = []
    # Add binge watching periods
    for period in sorted(binge_watching_periods, key=lambda x: x["duration_days"], reverse=True)[:3]:
        anomalies.append({
            "type": "binge_streak",
            "date": f"{period['start_date']} to {period['end_date']}",
            "description": f"Binge watching period: {period['duration_days']} days, {period['total_videos']} videos",
            "severity": "high" if period["duration_days"] >= 5 else "medium"
        })
    # Add late night moods
    for mood in sorted(late_night_moods, key=lambda x: x["duration_days"], reverse=True)[:3]:
        anomalies.append({
            "type": "late_night_mood",
            "date": f"{mood['start_date']} to {mood['end_date']}",
            "description": f"Late night mood: {mood['duration_days']} consecutive nights",
            "severity": "high" if mood["duration_days"] >= 4 else "medium"
        })
    # Add top single-day anomalies
    for session in sorted(late_night_sessions, key=lambda x: x["late_night_count"], reverse=True)[:3]:
        anomalies.append({
            "type": "late_night",
            "date": session["date"],
            "description": f"Watched {session['late_night_count']} videos after midnight",
            "severity": "high" if session["late_night_count"] >= 10 else "medium"
        })
    for day in sorted(binge_days, key=lambda x: x["count"], reverse=True)[:3]:
        anomalies.append({
            "type": "binge",
            "date": day["date"],
            "description": f"Watched {day['count']} videos ({day['multiplier']}x above average)",
            "severity": "high" if day["multiplier"] >= 3 else "medium"
        })
    return {
        "baseline": {
            "avg_daily_watches": round(avg_daily_watches, 1),
            "std_dev": round(std_dev, 1),
            "late_night_baseline_pct": round(late_night_percentage, 1),
            "total_days": len(daily_data)
        },
        "anomalies": anomalies[:12],  # Top 12 anomalies
        "late_night_sessions": late_night_sessions[-20:],
        "binge_days": binge_days[-20:],
        "unusual_weeks": unusual_weeks[-10:],
        # New streak data
        "binge_watching_periods": binge_watching_periods,
        "late_night_moods": late_night_moods,
        # Behavior patterns
        "patterns": {
            "weekend_warrior": is_weekend_warrior,
            "weekend_pct": round(weekend_pct, 1),
            "chronotype": chronotype,
            "night_watches": night_count,
            "morning_watches": morning_count,
            "inactive_periods": inactive_periods[-5:]  # Last 5
        }
    }
def get_habit_formation(token: str, min_streak_days: int = 3) -> dict:
    """
    Detect habit formation patterns.
    Identifies:
    - Channels watched daily for consecutive days
    - Videos watched multiple times on different days
    - Content patterns that indicate habitual watching
    Args:
        token: Session token
        min_streak_days: Minimum consecutive days to count as a habit (default: 3)
    Returns:
    - channel_habits: Channels with daily watching streaks
    - video_habits: Videos watched on multiple days
    - content_habits: Topics/keywords watched daily
    - habit_strength: Overall habit formation score
    """
    events, _ = load_session_events(token)
    if not events:
        return {"error": "Session not found or empty"}
    # Get watch events with dates
    watch_events = [e for e in events if e.get("type") == "watch"]
    if not watch_events:
        return {
            "channel_habits": [],
            "video_habits": [],
            "content_habits": [],
            "habit_strength": 0,
            "summary": "No watch events found"
        }
    # Group events by date and channel
    # NOTE(review): timedelta is imported but never used in this function.
    from collections import defaultdict
    from datetime import datetime, timedelta
    # NOTE(review): channel_by_date and topic_by_date are populated below but
    # never read afterwards.
    channel_by_date = defaultdict(set)  # date -> set of channels
    date_by_channel = defaultdict(set)  # channel -> set of dates
    # Track videos by date (using text_clean as identifier since we don't have video_id)
    date_by_video = defaultdict(set)  # video title -> set of dates
    video_info = {}  # video title -> {channel, first_seen, total_watches}
    # Also track micro_topics by date
    topic_by_date = defaultdict(set)  # date -> set of topics
    date_by_topic = defaultdict(set)  # topic -> set of dates
    for event in watch_events:
        ts = event.get("timestamp_local") or event.get("timestamp_utc")
        if not ts:
            continue
        # Extract date (YYYY-MM-DD prefix of the timestamp)
        try:
            date_str = ts.split("T")[0] if "T" in ts else ts[:10]
        except:
            continue
        # Track channel
        channel = event.get("channel_clean")
        if channel:
            channel_by_date[date_str].add(channel)
            date_by_channel[channel].add(date_str)
        # Track video (using text_clean as identifier)
        video_title = event.get("text_clean") or event.get("text_raw")
        if video_title and len(video_title) > 5:  # Skip very short titles
            date_by_video[video_title].add(date_str)
            if video_title not in video_info:
                video_info[video_title] = {
                    "channel": channel or "Unknown",
                    "first_seen": date_str,
                    "total_watches": 0
                }
            video_info[video_title]["total_watches"] += 1
        # Track micro_topics if available
        topics = event.get("micro_topics", [])
        for topic in topics:
            topic_by_date[date_str].add(topic)
            date_by_topic[topic].add(date_str)
    # Find consecutive day streaks for each channel
    def find_daily_streaks(dates_set: set, min_days: int) -> list:
        """Find streaks of strictly consecutive days (gap of exactly 1 day)."""
        if len(dates_set) < min_days:
            return []
        sorted_dates = sorted(dates_set)
        streaks = []
        current_streak = [sorted_dates[0]]
        for i in range(1, len(sorted_dates)):
            try:
                prev_date = datetime.strptime(sorted_dates[i-1], "%Y-%m-%d")
                curr_date = datetime.strptime(sorted_dates[i], "%Y-%m-%d")
                if (curr_date - prev_date).days == 1:
                    current_streak.append(sorted_dates[i])
                else:
                    if len(current_streak) >= min_days:
                        streaks.append({
                            "start": current_streak[0],
                            "end": current_streak[-1],
                            "days": len(current_streak)
                        })
                    current_streak = [sorted_dates[i]]
            except:
                # NOTE(review): a malformed date silently resets the running
                # streak; consider narrowing to ValueError.
                current_streak = [sorted_dates[i]]
        # Don't forget the last streak
        if len(current_streak) >= min_days:
            streaks.append({
                "start": current_streak[0],
                "end": current_streak[-1],
                "days": len(current_streak)
            })
        return streaks
    # Find channel habits
    channel_habits = []
    for channel, dates in date_by_channel.items():
        streaks = find_daily_streaks(dates, min_streak_days)
        if streaks:
            # Calculate total days in habits
            total_habit_days = sum(s["days"] for s in streaks)
            longest_streak = max(s["days"] for s in streaks)
            channel_habits.append({
                "channel": channel,
                "total_days_watched": len(dates),
                "habit_streaks": streaks,
                "longest_streak": longest_streak,
                "total_habit_days": total_habit_days,
                "habit_score": min(100, total_habit_days * 10)  # Score 0-100
            })
    # Sort by longest streak, then total habit days
    channel_habits.sort(key=lambda x: (x["longest_streak"], x["total_habit_days"]), reverse=True)
    # Find video habits (videos watched on multiple different days)
    video_habits = []
    for video_title, dates in date_by_video.items():
        if len(dates) >= 2:  # Watched on at least 2 different days
            streaks = find_daily_streaks(dates, min_streak_days)
            info = video_info.get(video_title, {})
            video_habits.append({
                "title": video_title[:80] + "..." if len(video_title) > 80 else video_title,
                "channel": info.get("channel", "Unknown"),
                "days_watched": len(dates),
                "total_watches": info.get("total_watches", 0),
                "first_seen": info.get("first_seen", ""),
                "has_streak": len(streaks) > 0,
                "longest_streak": max((s["days"] for s in streaks), default=0)
            })
    # Sort video habits by days watched, then total watches
    video_habits.sort(key=lambda x: (x["days_watched"], x["total_watches"]), reverse=True)
    # Find content/topic habits (only for topics with >= 5 total occurrences)
    content_habits = []
    for topic, dates in date_by_topic.items():
        if len(dates) < 5:  # Skip rare topics
            continue
        streaks = find_daily_streaks(dates, min_streak_days)
        if streaks:
            total_habit_days = sum(s["days"] for s in streaks)
            longest_streak = max(s["days"] for s in streaks)
            content_habits.append({
                "topic": topic,
                "total_days": len(dates),
                "habit_streaks": streaks,
                "longest_streak": longest_streak,
                "total_habit_days": total_habit_days
            })
    # Sort content habits
    content_habits.sort(key=lambda x: (x["longest_streak"], x["total_habit_days"]), reverse=True)
    # Calculate overall habit strength
    total_channels_with_habits = len(channel_habits)
    max_channel_streak = max((h["longest_streak"] for h in channel_habits), default=0)
    # Habit strength: 0-100 score
    habit_strength = 0
    if total_channels_with_habits > 0:
        # Factors: number of habitual channels, longest streak, total habit days
        habit_strength = min(100, (
            total_channels_with_habits * 10 +
            max_channel_streak * 5 +
            sum(h["total_habit_days"] for h in channel_habits[:5])  # Top 5
        ))
    # Generate habit summary
    summary_parts = []
    if channel_habits:
        top_habit = channel_habits[0]
        summary_parts.append(
            f"Strongest habit: {top_habit['channel']} watched {top_habit['longest_streak']} days in a row"
        )
        if total_channels_with_habits > 1:
            summary_parts.append(f"{total_channels_with_habits} channels with daily habits")
    if video_habits:
        summary_parts.append(f"{len(video_habits)} rewatched videos")
    if content_habits:
        summary_parts.append(f"{len(content_habits)} recurring topics")
    return {
        "channel_habits": channel_habits,  # Return ALL channel habits
        "video_habits": video_habits[:30],  # Top 30 rewatched videos
        "content_habits": content_habits[:30],  # Top 30 topics
        "habit_strength": habit_strength,
        "total_channels_with_habits": total_channels_with_habits,
        "total_videos_rewatched": len(video_habits),
        "total_topics_with_habits": len(content_habits),
        "max_streak_days": max_channel_streak,
        "summary": " | ".join(summary_parts) if summary_parts else "No strong habits detected"
    }
def get_time_spent(token: str, break_threshold_minutes: int = 60, last_video_minutes: int = 5) -> dict:
    """
    Calculate approximate time spent on YouTube.

    Uses session detection: groups continuous watching periods separated by
    significant breaks (> break_threshold_minutes). A session's duration is the
    span from its first to its last event, plus a flat estimate for the final
    video (last_video_minutes), since watch events only record start times.

    Args:
        token: Session token
        break_threshold_minutes: Gap that ends a session (default 60 min)
        last_video_minutes: Duration estimate for the last video of each session (default 5 min)

    Returns:
        dict with:
        - total_minutes, total_hours
        - average_daily_minutes, total_days
        - sessions: {total_count, average_duration_minutes, longest_session_minutes}
        - summary: human-readable one-liner
        or {"error": ...} when the session is missing or empty.
    """
    events, _ = load_session_events(token)
    if not events:
        return {"error": "Session not found or empty"}
    from datetime import datetime, timedelta

    # Collect watch events that carry a parseable timestamp.
    watch_events = []
    for e in events:
        if e.get("type") != "watch":
            continue
        ts = e.get("timestamp_local") or e.get("timestamp_utc")
        if not ts:
            continue
        try:
            if "T" in ts:
                # ISO-8601, possibly Zulu-suffixed.
                dt = datetime.fromisoformat(ts.replace('Z', '+00:00'))
            else:
                # BUG FIX: original used "%Y-%m-%dT%H:%M:%S" in this branch,
                # which can never match a string lacking "T" — such events were
                # silently discarded. Space-separated datetimes are the intent.
                dt = datetime.strptime(ts[:19], "%Y-%m-%d %H:%M:%S")
        except (ValueError, TypeError):
            # Unparseable timestamp: skip this event (narrowed from bare except).
            continue
        # Normalize to naive so aware/naive values never get compared/subtracted,
        # which would raise TypeError during sorting or gap computation.
        if dt.tzinfo is not None:
            dt = dt.replace(tzinfo=None)
        watch_events.append({"timestamp": dt, "event": e})

    if not watch_events:
        return {
            "total_minutes": 0,
            "total_hours": 0,
            "average_daily_minutes": 0,
            "total_days": 0,
            "sessions": {
                "total_count": 0,
                "average_duration_minutes": 0,
                "longest_session_minutes": 0
            },
            "summary": "No watch events with timestamps"
        }

    # Chronological order is required for gap-based session detection.
    watch_events.sort(key=lambda x: x["timestamp"])

    # Detect sessions: consecutive events closer than break_threshold belong together.
    sessions = []
    break_threshold = timedelta(minutes=break_threshold_minutes)
    session_start = watch_events[0]["timestamp"]
    session_end = watch_events[0]["timestamp"]
    session_event_count = 1
    for i in range(1, len(watch_events)):
        current = watch_events[i]["timestamp"]
        previous = watch_events[i - 1]["timestamp"]
        if current - previous > break_threshold:
            # Close the current session; pad with the last-video estimate.
            duration = (session_end - session_start).total_seconds() / 60.0
            duration += last_video_minutes
            sessions.append({
                "start": session_start,
                "end": session_end,
                "duration_minutes": duration,
                "event_count": session_event_count
            })
            # Start a new session at the current event.
            session_start = current
            session_end = current
            session_event_count = 1
        else:
            session_end = current
            session_event_count += 1
    # Close the final (still-open) session.
    duration = (session_end - session_start).total_seconds() / 60.0
    duration += last_video_minutes
    sessions.append({
        "start": session_start,
        "end": session_end,
        "duration_minutes": duration,
        "event_count": session_event_count
    })

    # Totals and per-day average (days = distinct calendar dates with activity).
    total_minutes = sum(s["duration_minutes"] for s in sessions)
    total_hours = round(total_minutes / 60, 1)
    unique_days = {we["timestamp"].date() for we in watch_events}
    total_days = len(unique_days)
    average_daily = round(total_minutes / total_days, 1) if total_days > 0 else 0

    # Session stats
    session_durations = [s["duration_minutes"] for s in sessions]
    avg_session = round(sum(session_durations) / len(session_durations), 1) if sessions else 0
    longest_session = round(max(session_durations), 1) if sessions else 0

    # Human-readable summary; switch to "X days Y hours" past 24h.
    if total_hours >= 24:
        time_str = f"{int(total_hours // 24)} days {int(total_hours % 24)} hours"
    else:
        time_str = f"{total_hours} hours"
    summary = f"Spent approximately {time_str} on YouTube across {total_days} days ({len(sessions)} sessions)"

    return {
        "total_minutes": round(total_minutes, 1),
        "total_hours": total_hours,
        "average_daily_minutes": average_daily,
        "total_days": total_days,
        "sessions": {
            "total_count": len(sessions),
            "average_duration_minutes": avg_session,
            "longest_session_minutes": longest_session
        },
        "summary": summary
    }
def get_channel_distribution(token: str) -> dict:
    """
    Get channel distribution by view count bins.

    Returns:
        dict with:
        - bin_distribution: channels grouped by per-channel view count
          [1, 2-5, 6-10, 11-20, 21-50, 51-100, 100+], each entry carrying
          the bin label, channel_count, and video_count
        - temporal_by_bin: monthly breakdown of videos watched per bin
        - stats: total_channels, total_videos, single_view_channels
          (+ single_view_percentage)
        or {"error": ...} when the session is missing or empty.
    """
    events, _ = load_session_events(token)
    if not events:
        return {"error": "Session not found or empty"}
    from collections import defaultdict
    from datetime import datetime

    watch_events = [e for e in events if e.get("type") == "watch"]

    # Views per channel (events without a cleaned channel name are ignored).
    channel_counts = Counter(
        e.get("channel_clean") for e in watch_events
        if e.get("channel_clean")
    )

    # (min, max, label) view-count bins; the last bin is open-ended.
    bins = [
        (1, 1, "1"),
        (2, 5, "2-5"),
        (6, 10, "6-10"),
        (11, 20, "11-20"),
        (21, 50, "21-50"),
        (51, 100, "51-100"),
        (101, float('inf'), "100+")
    ]

    # Assign each channel to its bin and aggregate per-bin channel/video counts.
    bin_distribution = []
    channel_bin_map = {}  # channel -> bin label
    for min_val, max_val, label in bins:
        channels_in_bin = [
            ch for ch, count in channel_counts.items()
            if min_val <= count <= max_val
        ]
        video_count = sum(channel_counts[ch] for ch in channels_in_bin)
        bin_distribution.append({
            "bin": label,
            "channel_count": len(channels_in_bin),
            "video_count": video_count
        })
        for ch in channels_in_bin:
            channel_bin_map[ch] = label

    # Monthly breakdown: {month: {bin_label: videos watched}}
    monthly_data = defaultdict(lambda: defaultdict(int))
    for event in watch_events:
        channel = event.get("channel_clean")
        ts = event.get("timestamp_local") or event.get("timestamp_utc")
        if not channel or not ts:
            continue
        # Bins cover every count >= 1, so this fallback is effectively unreachable.
        bin_label = channel_bin_map.get(channel, "1")
        if not isinstance(ts, str):
            continue
        try:
            dt = datetime.fromisoformat(ts.replace('Z', '+00:00'))
        except ValueError:
            # Unparseable timestamp: skip this event (narrowed from bare except).
            continue
        month_key = f"{dt.year}-{dt.month:02d}"
        monthly_data[month_key][bin_label] += 1

    # Flatten into a month-sorted list with every bin present (0-filled).
    temporal_by_bin = []
    for month in sorted(monthly_data):
        month_entry = {"month": month, "bins": {}}
        for _, _, label in bins:
            month_entry["bins"][label] = monthly_data[month].get(label, 0)
        temporal_by_bin.append(month_entry)

    # Summary stats
    total_channels = len(channel_counts)
    total_videos = sum(channel_counts.values())
    single_view_channels = sum(1 for count in channel_counts.values() if count == 1)
    return {
        "bin_distribution": bin_distribution,
        "temporal_by_bin": temporal_by_bin,
        "stats": {
            "total_channels": total_channels,
            "total_videos": total_videos,
            "single_view_channels": single_view_channels,
            "single_view_percentage": round(single_view_channels / total_channels * 100, 1) if total_channels > 0 else 0
        }
    }
def get_full_analytics(token: str) -> dict:
    """Run every analytics computation for a session and bundle the results.

    Returns a dict keyed by section name; each value is the corresponding
    analytics function's result for the given session token.
    """
    sections = {
        "summary": get_session_summary,
        "channels": get_channel_analytics,
        "watch_patterns": get_watch_patterns,
        "searches": get_search_analytics,
        "subscription_overlap": get_subscription_overlap,
        "behavior_anomalies": get_behavior_anomalies,
        "habit_formation": get_habit_formation,
        "temporal_trends": get_temporal_trends,
        "time_spent": get_time_spent,
        "channel_distribution": get_channel_distribution,
    }
    # Dicts preserve insertion order, so the output keys match the original layout.
    return {name: compute(token) for name, compute in sections.items()}