|
|
import time |
|
|
from abc import ABC, abstractmethod |
|
|
from collections import deque |
|
|
from typing import Any |
|
|
|
|
|
# Length, in seconds, of the sliding window InputTokenRateLimiter counts tokens over.
SECONDS_IN_MINUTE = 60
|
|
|
|
|
class BaseAgent(ABC):
    """Abstract interface that every concrete agent implements."""

    @abstractmethod
    def run(self, question: str, file_name: str = "", file_content: str = "") -> Any:
        """Answer *question*, optionally using an attached file.

        Subclasses must override this method. The base body is only a
        placeholder: it prints a preview of the question and returns a fixed
        string. Subclasses may still reach it via ``super().run(...)``.

        Args:
            question: The question to answer.
            file_name: Name of an attached file, or "" when there is none.
            file_content: Contents of the attached file, or "" when there is none.

        Returns:
            The agent's answer. The placeholder returns "BaseAgent default answer".
        """
        preview = question[:50]
        print(f"Agent received question (first 50 chars): {preview}...")
        answer = "BaseAgent default answer"
        return answer
|
|
|
|
|
class InputTokenRateLimiter:
    """Ensures we don't exceed tokens-per-minute limits for LLMs.

    The class is a singleton: every ``InputTokenRateLimiter(...)`` call
    returns the same instance. Usage is recorded as ``(timestamp, tokens)``
    pairs in a sliding window of ``SECONDS_IN_MINUTE`` seconds.
    ``maybe_wait`` blocks, by polling, until a new request fits under
    ``max_tpm``.
    """

    _instance = None

    def __new__(cls, *args, **kwargs):
        # Constructor arguments are consumed by __init__, not here. Accepting
        # them is required: with a bare ``__new__(cls)`` signature,
        # ``InputTokenRateLimiter(max_tpm=...)`` raised TypeError.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, max_tpm=50000):
        """Configure the shared limiter.

        Args:
            max_tpm: Maximum number of tokens allowed within any window of
                ``SECONDS_IN_MINUTE`` seconds. Every construction (re)applies
                this value to the shared instance.
        """
        self.max_tpm = max_tpm
        # Create the usage history only once, so re-constructing the
        # singleton does not forget tokens that were already spent.
        if not hasattr(self, "_initialized"):
            self.token_window = deque()
            self._initialized = True

    def _update_queue(self, time_now):
        """Drop entries older than ``SECONDS_IN_MINUTE`` relative to *time_now*.

        *time_now* must come from ``time.monotonic()``, the clock used for
        every recorded entry.
        """
        window = self.token_window
        while window and time_now - window[0][0] > SECONDS_IN_MINUTE:
            window.popleft()

    def tokens_used_last_minute(self):
        """Evict stale entries and return the tokens recorded in the window."""
        self._update_queue(time.monotonic())
        return sum(tokens for _, tokens in self.token_window)

    def maybe_wait(self, tokens_expected_to_use):
        """Block until *tokens_expected_to_use* more tokens fit under ``max_tpm``.

        Polls every 0.5 s and prints a notice every 10th poll. A request larger
        than ``max_tpm`` can never fit. In that case this waits only until the
        window is empty and then returns, instead of spinning forever.

        Args:
            tokens_expected_to_use: Tokens the upcoming request will consume.
        """
        ctr = 0
        # tokens_used_last_minute() evicts stale entries before the emptiness
        # check, so an empty window means nothing more can be gained by waiting.
        while (
            self.tokens_used_last_minute() + tokens_expected_to_use > self.max_tpm
            and self.token_window
        ):
            if ctr % 10 == 0:
                print("Sleeping 0.5s to respect token limits...")
            time.sleep(0.5)
            ctr += 1

    def add_tokens(self, tokens):
        """Record that *tokens* were just used, then evict stale entries.

        Args:
            tokens: Number of tokens consumed by the request just made.
        """
        # A monotonic clock is immune to wall-clock adjustments (NTP, manual
        # changes), which could otherwise expire entries early or keep them
        # around too long.
        now = time.monotonic()
        self.token_window.append((now, tokens))
        self._update_queue(now)
|
|
|