VcRlAgent commited on
Commit
b3c30a9
·
1 Parent(s): bde3c17

Refactor for Headshort and Scene Generation using Instant-ID model hosted in Replicate

Browse files
__pycache__/rate_limiter.cpython-312.pyc ADDED
Binary file (6.8 kB). View file
 
rate_limiter.py CHANGED
@@ -0,0 +1,121 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import json
2
+ import os
3
+ from datetime import datetime, timezone, timedelta
4
+ from pathlib import Path
5
+ from typing import Tuple
6
+ import uuid
7
+ import hashlib
8
+
9
+
10
+ class RateLimiter:
11
+ def __init__(self, session_file: str, daily_limit: int, dev_limit: int):
12
+ self.session_file = Path(session_file)
13
+ self.daily_limit = daily_limit
14
+ self.dev_limit = dev_limit
15
+ self.is_dev_mode = os.getenv("DEV_MODE", "false").lower() == "true"
16
+
17
+ # Create session file if doesn't exist
18
+ if not self.session_file.exists():
19
+ self._save_data({})
20
+
21
+ def _load_data(self) -> dict:
22
+ """Load rate limit data from file"""
23
+ try:
24
+ with open(self.session_file, 'r') as f:
25
+ return json.load(f)
26
+ except (json.JSONDecodeError, FileNotFoundError):
27
+ return {}
28
+
29
+ def _save_data(self, data: dict):
30
+ """Save rate limit data to file"""
31
+ with open(self.session_file, 'w') as f:
32
+ json.dump(data, f, indent=2)
33
+
34
+ def _get_device_id(self, request: dict) -> str:
35
+ """Generate consistent device ID from request headers"""
36
+ # Use IP + User-Agent for fingerprinting
37
+ ip = request.get("client", {}).get("host", "unknown")
38
+ user_agent = request.get("headers", {}).get("user-agent", "unknown")
39
+
40
+ # Hash to create stable ID
41
+ fingerprint = f"{ip}:{user_agent}"
42
+ return hashlib.sha256(fingerprint.encode()).hexdigest()[:16]
43
+
44
+ def _get_next_reset(self) -> datetime:
45
+ """Get next midnight UTC"""
46
+ now = datetime.now(timezone.utc)
47
+ tomorrow = now + timedelta(days=1)
48
+ return tomorrow.replace(hour=0, minute=0, second=0, microsecond=0)
49
+
50
+ def _cleanup_expired(self, data: dict) -> dict:
51
+ """Remove expired entries"""
52
+ now = datetime.now(timezone.utc)
53
+ cleaned = {}
54
+
55
+ for device_id, info in data.items():
56
+ reset_time = datetime.fromisoformat(info["reset_time"])
57
+ if reset_time > now:
58
+ cleaned[device_id] = info
59
+
60
+ return cleaned
61
+
62
+ def check_limit(self, request: dict) -> Tuple[bool, int, datetime]:
63
+ """
64
+ Check if device has exceeded rate limit
65
+
66
+ Returns:
67
+ (allowed: bool, remaining: int, reset_time: datetime)
68
+ """
69
+ device_id = self._get_device_id(request)
70
+ data = self._load_data()
71
+ data = self._cleanup_expired(data)
72
+
73
+ limit = self.dev_limit if self.is_dev_mode else self.daily_limit
74
+ now = datetime.now(timezone.utc)
75
+
76
+ if device_id not in data:
77
+ # New device
78
+ reset_time = self._get_next_reset()
79
+ data[device_id] = {
80
+ "count": 0,
81
+ "reset_time": reset_time.isoformat()
82
+ }
83
+ self._save_data(data)
84
+
85
+ device_info = data[device_id]
86
+ reset_time = datetime.fromisoformat(device_info["reset_time"])
87
+
88
+ # Check if reset needed
89
+ if now >= reset_time:
90
+ device_info["count"] = 0
91
+ device_info["reset_time"] = self._get_next_reset().isoformat()
92
+ self._save_data(data)
93
+
94
+ current_count = device_info["count"]
95
+ remaining = max(0, limit - current_count)
96
+ allowed = current_count < limit
97
+
98
+ return allowed, remaining, reset_time
99
+
100
+ def increment(self, request: dict):
101
+ """Increment usage count for device"""
102
+ device_id = self._get_device_id(request)
103
+ data = self._load_data()
104
+
105
+ if device_id in data:
106
+ data[device_id]["count"] += 1
107
+ self._save_data(data)
108
+
109
+ def get_limit_message(self, remaining: int, reset_time: datetime) -> str:
110
+ """Generate user-friendly limit message"""
111
+ mode = "DEV" if self.is_dev_mode else "Standard"
112
+ limit = self.dev_limit if self.is_dev_mode else self.daily_limit
113
+
114
+ if remaining > 0:
115
+ return f"✅ {remaining}/{limit} generations remaining today ({mode} mode)"
116
+ else:
117
+ now = datetime.now(timezone.utc)
118
+ hours_left = int((reset_time - now).total_seconds() / 3600)
119
+ minutes_left = int(((reset_time - now).total_seconds() % 3600) / 60)
120
+
121
+ return f"❌ Daily limit reached ({limit}/{limit}). Resets in {hours_left}h {minutes_left}m (midnight UTC)"