iLOVE2D's picture
Upload 2846 files
5374a2d verified
import asyncio
from typing import Optional, List, Any
from tenacity import (
retry,
stop_after_attempt,
wait_random_exponential,
)
from dashscope import Generation # aliyun DashScope SDK
import dashscope
from ..core.registry import register_model
from .model_configs import AliyunLLMConfig
from .base_model import BaseLLM
from .model_utils import Cost, cost_manager
from ..core.logging import logger
import os
@register_model(config_cls=AliyunLLMConfig, alias=["aliyun_llm"])
class AliyunLLM(BaseLLM):
def init_model(self):
"""
Initialize the DashScope Generation client.
"""
config: AliyunLLMConfig = self.config
if not config.aliyun_api_key:
raise ValueError("Aliyun API key is required. You should set `aliyun_api_key` in AliyunLLMConfig")
# API key
os.environ["DASHSCOPE_API_KEY"] = config.aliyun_api_key
dashscope.api_key = config.aliyun_api_key
# model
self._client = Generation()
self._default_ignore_fields = [
"llm_type", "output_response", "aliyun_api_key", "aliyun_access_key_id",
"aliyun_access_key_secret", "model_name"
]
def formulate_messages(self, prompts: List[str], system_messages: Optional[List[str]] = None) -> List[List[dict]]:
"""
Format messages for the Aliyun model.
Args:
prompts (List[str]): List of user prompts.
system_messages (Optional[List[str]]): Optional list of system messages.
Returns:
List[List[dict]]: Formatted messages for the model.
"""
if system_messages:
assert len(prompts) == len(system_messages), f"the number of prompts ({len(prompts)}) is different from the number of system_messages ({len(system_messages)})"
else:
system_messages = [None] * len(prompts)
messages_list = []
for prompt, system_message in zip(prompts, system_messages):
messages = []
if system_message:
messages.append({"role": "system", "content": system_message})
messages.append({"role": "user", "content": prompt})
messages_list.append(messages)
return messages_list
def update_completion_params(self, params1: dict, params2: dict) -> dict:
"""
Update completion parameters with new values.
Args:
params1 (dict): Base parameters.
params2 (dict): New parameters to update with.
Returns:
dict: Updated parameters.
"""
config_params: list = self.config.get_config_params()
for key, value in params2.items():
if key in self._default_ignore_fields:
continue
if key not in config_params:
continue
params1[key] = value
return params1
def get_completion_params(self, **kwargs):
"""
Get completion parameters for the model.
Returns:
dict: Parameters for model completion.
"""
completion_params = self.config.get_set_params(ignore=self._default_ignore_fields)
completion_params = self.update_completion_params(completion_params, kwargs)
completion_params["model"] = self.config.model
return completion_params
def get_stream_output(self, response: Any, output_response: bool = True) -> str:
"""
Process streaming response from the model.
Args:
response: The streaming response from the model.
output_response (bool): Whether to print the response.
Returns:
str: The complete response text.
"""
output = ""
try:
for chunk in response:
if not hasattr(chunk, 'output') or chunk.output is None:
error_msg = getattr(chunk, 'message', 'Invalid chunk format from model')
raise ValueError(f"Model stream chunk error: {error_msg}")
if hasattr(chunk.output, 'text'):
content = chunk.output.text
elif hasattr(chunk.output, 'choices') and chunk.output.choices:
content = chunk.output.choices[0].message.content
else:
continue
if content:
if output_response:
print(content, end="", flush=True)
output += content
except Exception as e:
print(f"Error processing stream: {str(e)}")
if not output:
raise RuntimeError(f"Failed to process stream response: {str(e)}")
if output_response:
print("")
return output
async def get_stream_output_async(self, response: Any, output_response: bool = False) -> str:
"""
Process streaming response asynchronously.
Args:
response: The streaming response from the model.
output_response (bool): Whether to print the response.
Returns:
str: The complete response text.
"""
output = ""
try:
async for chunk in response:
if not hasattr(chunk, 'output') or chunk.output is None:
error_msg = getattr(chunk, 'message', 'Invalid chunk format from model')
raise ValueError(f"Model stream chunk error: {error_msg}")
if hasattr(chunk.output, 'text'):
content = chunk.output.text
elif hasattr(chunk.output, 'choices') and chunk.output.choices:
content = chunk.output.choices[0].message.content
else:
continue
if content:
if output_response:
print(content, end="", flush=True)
output += content
except Exception as e:
print(f"Error processing async stream: {str(e)}")
if not output:
raise RuntimeError(f"Failed to process async stream response: {str(e)}")
if output_response:
print("")
return output
def get_completion_output(self, response: Any, output_response: bool = True) -> str:
"""
Process non-streaming response from the model.
Args:
response: The response from the model.
output_response (bool): Whether to print the response.
Returns:
str: The complete response text.
"""
try:
if not hasattr(response, 'output') or response.output is None:
error_msg = getattr(response, 'message', 'Invalid response format from model')
raise ValueError(f"Model response error: {error_msg}")
if hasattr(response.output, 'text'):
output = response.output.text
elif hasattr(response.output, 'choices') and response.output.choices:
output = response.output.choices[0].message.content
else:
raise ValueError("Unexpected response format")
if output_response:
print(output)
return output
except Exception as e:
raise RuntimeError(f"Error processing completion response: {str(e)}")
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(5))
def single_generate(self, messages: List[dict], **kwargs) -> str:
"""
Generate a single response from the model.
Args:
messages (List[dict]): The conversation history.
**kwargs: Additional parameters for generation.
Returns:
str: The generated response.
"""
stream = kwargs.get("stream", self.config.stream)
output_response = kwargs.get("output_response", self.config.output_response)
try:
completion_params = self.get_completion_params(**kwargs)
response = self._client.call(messages=messages, **completion_params)
if response is None:
raise RuntimeError("Received empty response from model")
if stream:
output = self.get_stream_output(response, output_response=output_response)
cost = self._stream_cost(response)
else:
output = self.get_completion_output(response=response, output_response=output_response)
cost = self._completion_cost(response)
self._update_cost(cost=cost)
return output
except Exception as e:
raise RuntimeError(f"Error during single_generate of AliyunLLM: {str(e)}")
def batch_generate(self, batch_messages: List[List[dict]], **kwargs) -> List[str]:
"""
Generate responses for a batch of messages.
Args:
batch_messages (List[List[dict]]): List of conversation histories.
**kwargs: Additional parameters for generation.
Returns:
List[str]: List of generated responses.
"""
if not isinstance(batch_messages, list) or not batch_messages:
raise ValueError("batch_messages must be a non-empty list of message lists")
return [self.single_generate(messages=one_messages, **kwargs) for one_messages in batch_messages]
async def single_generate_async(self, messages: List[dict], **kwargs) -> str:
"""
Asynchronously generate a single response.
Args:
messages (List[dict]): The conversation history.
**kwargs: Additional parameters for the generation.
Returns:
str: The generated response.
"""
stream = kwargs.get("stream", self.config.stream)
output_response = kwargs.get("output_response", self.config.output_response)
try:
completion_params = self.get_completion_params(**kwargs)
loop = asyncio.get_event_loop()
response = await loop.run_in_executor(
None,
lambda: self._client.call(messages=messages, **completion_params)
)
if stream:
output = await self.get_stream_output_async(response, output_response=output_response)
cost = self._stream_cost(response)
else:
output = self.get_completion_output(response=response, output_response=output_response)
cost = self._completion_cost(response)
self._update_cost(cost=cost)
return output
except Exception as e:
raise RuntimeError(f"Error during single_generate_async of AliyunLLM: {str(e)}")
def _completion_cost(self, response: Any) -> Cost:
"""cost"""
try:
if not response:
return Cost(input_tokens=0, output_tokens=0, input_cost=0.0, output_cost=0.0)
# tokens number
input_tokens = 0
output_tokens = 0
if hasattr(response, 'usage'):
usage = response.usage
if hasattr(usage, 'input_tokens'):
input_tokens = usage.input_tokens
elif hasattr(usage, 'prompt_tokens'):
input_tokens = usage.prompt_tokens
if hasattr(usage, 'output_tokens'):
output_tokens = usage.output_tokens
elif hasattr(usage, 'completion_tokens'):
output_tokens = usage.completion_tokens
#
if input_tokens == 0 and output_tokens == 0 and hasattr(response, 'output'):
if hasattr(response.output, 'text'):
output_tokens = len(response.output.text.split()) * 1.3
elif hasattr(response.output, 'choices') and response.output.choices:
output_tokens = len(response.output.choices[0].message.content.split()) * 1.3
total_cost = self._estimate_cost(input_tokens, output_tokens)
return Cost(
input_tokens=input_tokens,
output_tokens=output_tokens,
input_cost=total_cost * 0.4, #
output_cost=total_cost * 0.6 #
)
except Exception as e:
logger.warning(f"Error computing completion cost: {str(e)}")
return Cost(input_tokens=0, output_tokens=0, input_cost=0.0, output_cost=0.0)
def _stream_cost(self, response: Any) -> Cost:
"""cost"""
try:
if not response:
return Cost(input_tokens=0, output_tokens=0, input_cost=0.0, output_cost=0.0)
#
input_tokens = 0
output_tokens = 0
if hasattr(response, 'usage'):
usage = response.usage
if hasattr(usage, 'input_tokens'):
input_tokens = usage.input_tokens
elif hasattr(usage, 'prompt_tokens'):
input_tokens = usage.prompt_tokens
if hasattr(usage, 'output_tokens'):
output_tokens = usage.output_tokens
elif hasattr(usage, 'completion_tokens'):
output_tokens = usage.completion_tokens
#
if input_tokens == 0 and output_tokens == 0 and hasattr(response, 'output'):
if hasattr(response.output, 'text'):
output_tokens = len(response.output.text.split()) * 1.3 #
elif hasattr(response.output, 'choices') and response.output.choices:
output_tokens = len(response.output.choices[0].message.content.split()) * 1.3
total_cost = self._estimate_cost(input_tokens, output_tokens)
return Cost(
input_tokens=input_tokens,
output_tokens=output_tokens,
input_cost=total_cost * 0.4, #
output_cost=total_cost * 0.6 #
)
except Exception as e:
logger.warning(f"Error computing stream cost: {str(e)}")
return Cost(input_tokens=0, output_tokens=0, input_cost=0.0, output_cost=0.0)
def _estimate_cost(self, input_tokens: int, output_tokens: int) -> float:
"""cost
"""
model = self.config.model.lower()
if "turbo" in model:
input_cost = (input_tokens / 1000) * 0.0005
output_cost = (output_tokens / 1000) * 0.001
elif "max" in model:
input_cost = (input_tokens / 1000) * 0.002
output_cost = (output_tokens / 1000) * 0.004
else: # default
input_cost = (input_tokens / 1000) * 0.001
output_cost = (output_tokens / 1000) * 0.002
return input_cost + output_cost
def _update_cost(self, cost: Cost):
"""
Update the cost manager with the new cost.
Args:
cost (Cost): The cost to update.
"""
try:
cost_manager.update_cost(cost=cost, model=self.config.model)
except Exception as e:
logger.warning(f"Error updating cost: {str(e)}")