Spaces:
Running
Running
Pulastya B
commited on
Commit
Β·
c206ce6
1
Parent(s):
74cd041
Fix TPM rate limiting: Add token budget management for Groq 12K/min limit
Browse files- Track actual tokens used per API call (response.usage.total_tokens)
- Reset token counter every 60 seconds
- Proactive pacing at 70% threshold (8.4K tokens) to prevent 429 errors
- Detailed logging: token usage per call and per minute budget status
- Remove duplicate pruning code, keep aggressive 4-exchange limit
- Emergency pruning at 8K tokens for extra safety
This resolves 429 TPM errors that occurred after just 2 requests despite all previous optimizations (dynamic prompts, result compression, conversation pruning).
- src/orchestrator.py +57 -6
src/orchestrator.py
CHANGED
|
@@ -244,13 +244,24 @@ class DataScienceCopilot:
|
|
| 244 |
self.tools_registry = TOOLS
|
| 245 |
self.tool_functions = self._build_tool_functions_map()
|
| 246 |
|
| 247 |
-
# Token tracking
|
| 248 |
self.total_tokens_used = 0
|
|
|
|
|
|
|
| 249 |
self.api_calls_made = 0
|
| 250 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 251 |
# Rate limiting for Gemini (10 RPM free tier)
|
| 252 |
self.last_api_call_time = 0
|
| 253 |
-
self.min_api_call_interval = 1.0 if self.provider == "gemini" else 0 # 1s = safe for Gemini API
|
| 254 |
|
| 255 |
# Ensure output directories exist
|
| 256 |
Path("./outputs").mkdir(exist_ok=True)
|
|
@@ -1575,13 +1586,46 @@ You are a DOER. Complete workflows based on user intent."""
|
|
| 1575 |
iteration += 1
|
| 1576 |
|
| 1577 |
try:
|
| 1578 |
-
#
|
|
|
|
|
|
|
| 1579 |
if len(messages) > 10:
|
| 1580 |
-
# Keep: system prompt, user
|
| 1581 |
messages = [messages[0], messages[1]] + messages[-8:]
|
| 1582 |
-
print(f"
|
| 1583 |
|
| 1584 |
-
#
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1585 |
if self.min_api_call_interval > 0:
|
| 1586 |
time_since_last_call = time.time() - self.last_api_call_time
|
| 1587 |
if time_since_last_call < self.min_api_call_interval:
|
|
@@ -1604,6 +1648,13 @@ You are a DOER. Complete workflows based on user intent."""
|
|
| 1604 |
|
| 1605 |
self.api_calls_made += 1
|
| 1606 |
self.last_api_call_time = time.time()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1607 |
response_message = response.choices[0].message
|
| 1608 |
tool_calls = response_message.tool_calls
|
| 1609 |
final_content = response_message.content
|
|
|
|
| 244 |
self.tools_registry = TOOLS
|
| 245 |
self.tool_functions = self._build_tool_functions_map()
|
| 246 |
|
| 247 |
+
# Token tracking and rate limiting
|
| 248 |
self.total_tokens_used = 0
|
| 249 |
+
self.tokens_this_minute = 0
|
| 250 |
+
self.minute_start_time = time.time()
|
| 251 |
self.api_calls_made = 0
|
| 252 |
|
| 253 |
+
# Provider-specific limits
|
| 254 |
+
if self.provider == "groq":
|
| 255 |
+
self.tpm_limit = 12000 # Tokens per minute
|
| 256 |
+
self.rpm_limit = 30 # Requests per minute
|
| 257 |
+
self.min_api_call_interval = 0.5 # Wait between calls
|
| 258 |
+
elif self.provider == "gemini":
|
| 259 |
+
self.tpm_limit = 32000 # More generous
|
| 260 |
+
self.rpm_limit = 15
|
| 261 |
+
self.min_api_call_interval = 1.0 # Gemini free tier: safer spacing
|
| 262 |
+
|
| 263 |
# Rate limiting for Gemini (10 RPM free tier)
|
| 264 |
self.last_api_call_time = 0
|
|
|
|
| 265 |
|
| 266 |
# Ensure output directories exist
|
| 267 |
Path("./outputs").mkdir(exist_ok=True)
|
|
|
|
| 1586 |
iteration += 1
|
| 1587 |
|
| 1588 |
try:
|
| 1589 |
+
# π AGGRESSIVE CONVERSATION PRUNING (LangChain pattern)
|
| 1590 |
+
# Keep only: system + user + last 4 exchanges (8 messages)
|
| 1591 |
+
# This prevents token bloat while maintaining context
|
| 1592 |
if len(messages) > 10:
|
| 1593 |
+
# Keep: system prompt [0], user query [1], last 4 exchanges
|
| 1594 |
messages = [messages[0], messages[1]] + messages[-8:]
|
| 1595 |
+
print(f"βοΈ Pruned conversation (keeping last 4 exchanges, ~4K tokens saved)")
|
| 1596 |
|
| 1597 |
+
# π Token estimation and warning
|
| 1598 |
+
estimated_tokens = sum(len(str(m.get('content', ''))) // 4 for m in messages)
|
| 1599 |
+
if estimated_tokens > 8000:
|
| 1600 |
+
# Emergency pruning - keep only last 2 exchanges
|
| 1601 |
+
messages = [messages[0], messages[1]] + messages[-4:]
|
| 1602 |
+
print(f"β οΈ Emergency pruning (conversation > 8K tokens)")
|
| 1603 |
+
|
| 1604 |
+
# π° Token budget management (Groq TPM limit)
|
| 1605 |
+
if self.provider == "groq":
|
| 1606 |
+
# Reset minute counter if needed
|
| 1607 |
+
elapsed = time.time() - self.minute_start_time
|
| 1608 |
+
if elapsed > 60:
|
| 1609 |
+
print(f"π Token budget reset (was {self.tokens_this_minute}/{self.tpm_limit})")
|
| 1610 |
+
self.tokens_this_minute = 0
|
| 1611 |
+
self.minute_start_time = time.time()
|
| 1612 |
+
|
| 1613 |
+
# Check if we're close to TPM limit (use 70% threshold to be safe)
|
| 1614 |
+
if self.tokens_this_minute + estimated_tokens > self.tpm_limit * 0.7:
|
| 1615 |
+
wait_time = 60 - elapsed
|
| 1616 |
+
if wait_time > 0:
|
| 1617 |
+
print(f"βΈοΈ Token budget: {self.tokens_this_minute}/{self.tpm_limit} used ({(self.tokens_this_minute/self.tpm_limit)*100:.0f}%)")
|
| 1618 |
+
print(f" Next request would use ~{estimated_tokens} tokens β exceeds safe limit")
|
| 1619 |
+
print(f" Waiting {wait_time:.0f}s for budget reset...")
|
| 1620 |
+
time.sleep(wait_time)
|
| 1621 |
+
self.tokens_this_minute = 0
|
| 1622 |
+
self.minute_start_time = time.time()
|
| 1623 |
+
print(f"β
Token budget reset complete")
|
| 1624 |
+
else:
|
| 1625 |
+
print(f"π° Token budget: {self.tokens_this_minute}/{self.tpm_limit} ({(self.tokens_this_minute/self.tpm_limit)*100:.0f}%)")
|
| 1626 |
+
|
| 1627 |
+
|
| 1628 |
+
# Rate limiting - wait if needed
|
| 1629 |
if self.min_api_call_interval > 0:
|
| 1630 |
time_since_last_call = time.time() - self.last_api_call_time
|
| 1631 |
if time_since_last_call < self.min_api_call_interval:
|
|
|
|
| 1648 |
|
| 1649 |
self.api_calls_made += 1
|
| 1650 |
self.last_api_call_time = time.time()
|
| 1651 |
+
|
| 1652 |
+
# Track tokens used (for TPM budget management)
|
| 1653 |
+
if hasattr(response, 'usage') and response.usage:
|
| 1654 |
+
tokens_used = response.usage.total_tokens
|
| 1655 |
+
self.tokens_this_minute += tokens_used
|
| 1656 |
+
print(f"π Tokens: {tokens_used} this call | {self.tokens_this_minute}/{self.tpm_limit} this minute")
|
| 1657 |
+
|
| 1658 |
response_message = response.choices[0].message
|
| 1659 |
tool_calls = response_message.tool_calls
|
| 1660 |
final_content = response_message.content
|