File size: 1,960 Bytes
2299bb4 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 | """
Time utilities - timestamp helpers.
"""
from datetime import datetime, timezone, timedelta
from typing import Optional
import time
def utc_now() -> datetime:
"""Get current UTC time as timezone-aware datetime."""
return datetime.now(timezone.utc)
def utc_timestamp() -> int:
"""Get current UTC timestamp in seconds."""
return int(time.time())
def format_date(dt: datetime, fmt: str = "%Y-%m-%d") -> str:
"""Format datetime to string."""
if dt is None:
return ""
return dt.strftime(fmt)
def format_datetime(dt: datetime) -> str:
"""Format datetime for display."""
if dt is None:
return ""
return dt.strftime("%Y-%m-%d %H:%M:%S")
def parse_date(date_str: str) -> Optional[datetime]:
"""Parse date string to datetime."""
try:
return datetime.fromisoformat(date_str.replace("Z", "+00:00"))
except (ValueError, AttributeError):
return None
def time_ago(dt: datetime) -> str:
"""Get human-readable time ago string."""
if dt is None:
return "unknown"
now = utc_now()
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
diff = now - dt
seconds = diff.total_seconds()
if seconds < 60:
return "just now"
elif seconds < 3600:
minutes = int(seconds / 60)
return f"{minutes}m ago"
elif seconds < 86400:
hours = int(seconds / 3600)
return f"{hours}h ago"
elif seconds < 604800:
days = int(seconds / 86400)
return f"{days}d ago"
else:
return format_date(dt)
def get_date_path(dt: Optional[datetime] = None) -> str:
"""Get date path for storage (YYYY/MM/DD)."""
if dt is None:
dt = utc_now()
return f"{dt.year}/{dt.month:02d}/{dt.day:02d}"
def get_hour_bucket(dt: Optional[datetime] = None) -> int:
"""Get hour bucket (0-23) for batching."""
if dt is None:
dt = utc_now()
return dt.hour |