from typing import List

from openai import OpenAI, Stream
from openai.types.chat import ChatCompletion
from tenacity import (
    retry,
    stop_after_attempt,
    wait_random_exponential,
)

from ..core.registry import register_model
from .model_configs import SiliconFlowConfig
from .model_utils import Cost, cost_manager
from .openai_model import OpenAILLM
from .siliconflow_model_cost import model_cost


@register_model(config_cls=SiliconFlowConfig, alias=["siliconflow"])
class SiliconFlowLLM(OpenAILLM):
    """SiliconFlow chat model, served through its OpenAI-compatible API."""

    def init_model(self):
        config: SiliconFlowConfig = self.config
        self._client = self._init_client(config)
        # Config fields that must not be forwarded as completion parameters.
        self._default_ignore_fields = ["llm_type", "siliconflow_key", "output_response"]

    def _init_client(self, config: SiliconFlowConfig):
        # SiliconFlow exposes an OpenAI-compatible endpoint, so the official
        # OpenAI client is reused with a custom base URL.
        client = OpenAI(api_key=config.siliconflow_key, base_url="https://api.siliconflow.cn/v1")
        return client

    @retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(5))
    def single_generate(self, messages: List[dict], **kwargs) -> str:
        # Per-call overrides fall back to the config defaults.
        stream = kwargs.get("stream", self.config.stream)
        output_response = kwargs.get("output_response", self.config.output_response)

        try:
            completion_params = self.get_completion_params(**kwargs)
            response = self._client.chat.completions.create(
                messages=messages,
                **completion_params,
            )
            if stream:
                # get_stream_output consumes the stream and stores the final
                # chunk (which may carry usage data) on self.response.
                output = self.get_stream_output(response, output_response=output_response)
                cost = self._completion_cost(self.response)
            else:
                output: str = response.choices[0].message.content
                cost = self._completion_cost(response)
                if output_response:
                    print(output)
            self._update_cost(cost=cost)
        except Exception as e:
            if "account balance is insufficient" in str(e):
                print("Warning: Account balance insufficient. Please recharge your account.")
                return ""
            raise RuntimeError(f"Error during single_generate of SiliconFlowLLM: {str(e)}")

        return output

    def _completion_cost(self, response: ChatCompletion) -> Cost:
        input_tokens = response.usage.prompt_tokens
        output_tokens = response.usage.completion_tokens
        return self._compute_cost(input_tokens=input_tokens, output_tokens=output_tokens)

    def _compute_cost(self, input_tokens: int, output_tokens: int) -> Cost:
        model: str = self.config.model

        # Models missing from the pricing table are still tracked for token
        # counts but billed at zero cost.
        if model not in model_cost:
            return Cost(input_tokens=input_tokens, output_tokens=output_tokens, input_cost=0.0, output_cost=0.0)

        if "token_cost" in model_cost[model]:
            # A single per-million-token price applies to both input and output.
            input_cost = input_tokens * model_cost[model]["token_cost"] / 1e6
            output_cost = output_tokens * model_cost[model]["token_cost"] / 1e6
        else:
            # Separate per-million-token prices for input and output.
            input_cost = input_tokens * model_cost[model]["input_token_cost"] / 1e6
            output_cost = output_tokens * model_cost[model]["output_token_cost"] / 1e6

        return Cost(input_tokens=input_tokens, output_tokens=output_tokens, input_cost=input_cost, output_cost=output_cost)
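    # Illustrative shape of the pricing table consumed above (values are per
    # million tokens; these entries are hypothetical, the real table lives in
    # siliconflow_model_cost.py):
    #
    #     model_cost = {
    #         "uniform-priced-model": {"token_cost": 2.0},
    #         "split-priced-model": {"input_token_cost": 1.0, "output_token_cost": 4.0},
    #     }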

    def get_cost(self) -> dict:
        cost_info = {}
        try:
            tokens = self.response.usage
            if tokens.prompt_tokens == -1:
                # Sentinel set by get_stream_output when the stream reported no usage.
                cost_info["note"] = "Token counts not available in stream mode"
                cost_info["prompt_tokens"] = 0
                cost_info["completion_tokens"] = 0
                cost_info["total_tokens"] = 0
            else:
                cost_info["prompt_tokens"] = tokens.prompt_tokens
                cost_info["completion_tokens"] = tokens.completion_tokens
                cost_info["total_tokens"] = tokens.total_tokens
        except Exception as e:
            print(f"Error during get_cost of SiliconFlow: {str(e)}")
            cost_info["error"] = str(e)

        return cost_info

    def get_stream_output(self, response: Stream, output_response: bool = True) -> str:
        output = ""
        last_chunk = None
        for chunk in response:
            content = chunk.choices[0].delta.content
            if content:
                if output_response:
                    print(content, end="", flush=True)
                output += content
            last_chunk = chunk

        if output_response:
            print("")

        # The final chunk carries usage statistics when the server reports them;
        # chunks always define a usage attribute, so check that it is populated.
        if last_chunk is not None and getattr(last_chunk, "usage", None) is not None:
            self.response = last_chunk
        else:
            # Fall back to a sentinel whose -1 counts mark token usage as
            # unknown; get_cost checks for this sentinel.
            self.response = type("StreamResponse", (), {
                "usage": type("StreamUsage", (), {
                    "prompt_tokens": -1,
                    "completion_tokens": -1,
                    "total_tokens": -1,
                }),
            })

        return output

    def _update_cost(self, cost: Cost):
        cost_manager.update_cost(cost=cost, model=self.config.model)
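

# Usage sketch (illustrative only): assumes SiliconFlowConfig exposes the fields
# referenced above (model, siliconflow_key, stream, output_response) and that the
# OpenAILLM base class accepts the config in its constructor.
#
#     config = SiliconFlowConfig(
#         model="deepseek-ai/DeepSeek-V3",  # hypothetical model id
#         siliconflow_key="sk-...",         # supply a real API key
#         stream=False,
#         output_response=False,
#     )
#     llm = SiliconFlowLLM(config)
#     reply = llm.single_generate(messages=[{"role": "user", "content": "Hello"}])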