File size: 1,960 Bytes
2299bb4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
"""
Time utilities - timestamp helpers.
"""
from datetime import datetime, timezone, timedelta
from typing import Optional
import time


def utc_now() -> datetime:
    """Return the current moment as a timezone-aware datetime in UTC."""
    return datetime.now(tz=timezone.utc)


def utc_timestamp() -> int:
    """Return the current Unix time, truncated to whole seconds."""
    epoch_seconds = time.time()
    # int() drops the fractional part rather than rounding.
    return int(epoch_seconds)


def format_date(dt: datetime, fmt: str = "%Y-%m-%d") -> str:
    """Render ``dt`` with ``strftime`` using ``fmt``.

    Args:
        dt: The datetime to render. ``None`` is tolerated.
        fmt: A ``strftime`` format string; defaults to ``YYYY-MM-DD``.

    Returns:
        The formatted string, or an empty string when ``dt`` is ``None``.
    """
    return "" if dt is None else dt.strftime(fmt)


def format_datetime(dt: datetime) -> str:
    """Render ``dt`` as ``YYYY-MM-DD HH:MM:SS`` for display.

    Returns an empty string when ``dt`` is ``None``.
    """
    display_format = "%Y-%m-%d %H:%M:%S"
    if dt is not None:
        return dt.strftime(display_format)
    return ""


def parse_date(date_str: str) -> Optional[datetime]:
    """Parse an ISO 8601 date/datetime string into a ``datetime``.

    A trailing ``Z`` (UTC designator) is rewritten to ``+00:00`` before
    parsing, because ``datetime.fromisoformat`` does not accept ``Z`` on
    older Python versions.

    Args:
        date_str: The text to parse. Non-string values (``None``, bytes,
            numbers, ...) are tolerated and yield ``None``.

    Returns:
        The parsed datetime (timezone-aware when the input carries an
        offset or ``Z``), or ``None`` when the input is not a string or is
        not valid ISO 8601.
    """
    # Reject non-strings up front: relying on AttributeError alone lets
    # objects such as bytes (whose .replace rejects str arguments) escape
    # with a TypeError instead of the best-effort None.
    if not isinstance(date_str, str):
        return None

    # Only a *trailing* "Z" is a UTC designator; replacing every "Z" could
    # turn malformed text (e.g. "...Z:00") into a parseable offset.
    iso_text = date_str
    if iso_text.endswith("Z"):
        iso_text = iso_text[:-1] + "+00:00"

    try:
        return datetime.fromisoformat(iso_text)
    except ValueError:
        return None


def time_ago(dt: datetime) -> str:
    """Describe how long ago ``dt`` occurred relative to the current UTC time.

    Naive datetimes are interpreted as UTC.

    Returns:
        * ``"unknown"`` when ``dt`` is ``None``;
        * ``"just now"`` when fewer than 60 seconds have elapsed (this also
          covers datetimes in the future, whose elapsed time is negative);
        * ``"<N>m ago"``, ``"<N>h ago"`` or ``"<N>d ago"`` for anything
          under an hour, a day, or seven days respectively;
        * otherwise the date as rendered by ``format_date``.
    """
    if dt is None:
        return "unknown"

    current = utc_now()
    # Attach UTC to naive values so the subtraction below is aware-vs-aware.
    moment = dt if dt.tzinfo is not None else dt.replace(tzinfo=timezone.utc)
    elapsed = (current - moment).total_seconds()

    if elapsed < 60:
        return "just now"
    if elapsed < 3600:
        whole_minutes = int(elapsed / 60)
        return f"{whole_minutes}m ago"
    if elapsed < 86400:
        whole_hours = int(elapsed / 3600)
        return f"{whole_hours}h ago"
    if elapsed < 604800:
        whole_days = int(elapsed / 86400)
        return f"{whole_days}d ago"
    # A week or older: fall back to the plain calendar date.
    return format_date(moment)


def get_date_path(dt: Optional[datetime] = None) -> str:
    """Build a ``year/MM/DD`` storage path fragment.

    Args:
        dt: The datetime to use; when omitted, the current UTC time from
            ``utc_now()`` is used.

    Returns:
        The year followed by zero-padded two-digit month and day, joined
        with ``/`` (e.g. ``2024/03/05``).
    """
    moment = utc_now() if dt is None else dt
    year, month, day = moment.year, moment.month, moment.day
    return f"{year}/{month:02d}/{day:02d}"


def get_hour_bucket(dt: Optional[datetime] = None) -> int:
    """Return the hour of day (0-23) used as a batching bucket.

    When ``dt`` is omitted, the current UTC time from ``utc_now()`` is used.
    The hour is read from ``dt`` as given; no timezone conversion is applied.
    """
    return (utc_now() if dt is None else dt).hour