|
|
import asyncio |
|
|
from typing import Optional, List, Any |
|
|
from tenacity import ( |
|
|
retry, |
|
|
stop_after_attempt, |
|
|
wait_random_exponential, |
|
|
) |
|
|
from dashscope import Generation |
|
|
import dashscope |
|
|
|
|
|
from ..core.registry import register_model |
|
|
from .model_configs import AliyunLLMConfig |
|
|
from .base_model import BaseLLM |
|
|
from .model_utils import Cost, cost_manager |
|
|
from ..core.logging import logger |
|
|
import os |
|
|
|
|
|
@register_model(config_cls=AliyunLLMConfig, alias=["aliyun_llm"]) |
|
|
class AliyunLLM(BaseLLM): |
|
|
def init_model(self): |
|
|
""" |
|
|
Initialize the DashScope Generation client. |
|
|
""" |
|
|
config: AliyunLLMConfig = self.config |
|
|
if not config.aliyun_api_key: |
|
|
raise ValueError("Aliyun API key is required. You should set `aliyun_api_key` in AliyunLLMConfig") |
|
|
|
|
|
|
|
|
os.environ["DASHSCOPE_API_KEY"] = config.aliyun_api_key |
|
|
dashscope.api_key = config.aliyun_api_key |
|
|
|
|
|
|
|
|
self._client = Generation() |
|
|
self._default_ignore_fields = [ |
|
|
"llm_type", "output_response", "aliyun_api_key", "aliyun_access_key_id", |
|
|
"aliyun_access_key_secret", "model_name" |
|
|
] |
|
|
|
|
|
def formulate_messages(self, prompts: List[str], system_messages: Optional[List[str]] = None) -> List[List[dict]]: |
|
|
""" |
|
|
Format messages for the Aliyun model. |
|
|
|
|
|
Args: |
|
|
prompts (List[str]): List of user prompts. |
|
|
system_messages (Optional[List[str]]): Optional list of system messages. |
|
|
|
|
|
Returns: |
|
|
List[List[dict]]: Formatted messages for the model. |
|
|
""" |
|
|
if system_messages: |
|
|
assert len(prompts) == len(system_messages), f"the number of prompts ({len(prompts)}) is different from the number of system_messages ({len(system_messages)})" |
|
|
else: |
|
|
system_messages = [None] * len(prompts) |
|
|
|
|
|
messages_list = [] |
|
|
for prompt, system_message in zip(prompts, system_messages): |
|
|
messages = [] |
|
|
if system_message: |
|
|
messages.append({"role": "system", "content": system_message}) |
|
|
messages.append({"role": "user", "content": prompt}) |
|
|
messages_list.append(messages) |
|
|
return messages_list |
|
|
|
|
|
def update_completion_params(self, params1: dict, params2: dict) -> dict: |
|
|
""" |
|
|
Update completion parameters with new values. |
|
|
|
|
|
Args: |
|
|
params1 (dict): Base parameters. |
|
|
params2 (dict): New parameters to update with. |
|
|
|
|
|
Returns: |
|
|
dict: Updated parameters. |
|
|
""" |
|
|
config_params: list = self.config.get_config_params() |
|
|
for key, value in params2.items(): |
|
|
if key in self._default_ignore_fields: |
|
|
continue |
|
|
if key not in config_params: |
|
|
continue |
|
|
params1[key] = value |
|
|
return params1 |
|
|
|
|
|
def get_completion_params(self, **kwargs): |
|
|
""" |
|
|
Get completion parameters for the model. |
|
|
|
|
|
Returns: |
|
|
dict: Parameters for model completion. |
|
|
""" |
|
|
completion_params = self.config.get_set_params(ignore=self._default_ignore_fields) |
|
|
completion_params = self.update_completion_params(completion_params, kwargs) |
|
|
completion_params["model"] = self.config.model |
|
|
return completion_params |
|
|
|
|
|
def get_stream_output(self, response: Any, output_response: bool = True) -> str: |
|
|
""" |
|
|
Process streaming response from the model. |
|
|
|
|
|
Args: |
|
|
response: The streaming response from the model. |
|
|
output_response (bool): Whether to print the response. |
|
|
|
|
|
Returns: |
|
|
str: The complete response text. |
|
|
""" |
|
|
output = "" |
|
|
try: |
|
|
for chunk in response: |
|
|
if not hasattr(chunk, 'output') or chunk.output is None: |
|
|
error_msg = getattr(chunk, 'message', 'Invalid chunk format from model') |
|
|
raise ValueError(f"Model stream chunk error: {error_msg}") |
|
|
if hasattr(chunk.output, 'text'): |
|
|
content = chunk.output.text |
|
|
elif hasattr(chunk.output, 'choices') and chunk.output.choices: |
|
|
content = chunk.output.choices[0].message.content |
|
|
else: |
|
|
continue |
|
|
if content: |
|
|
if output_response: |
|
|
print(content, end="", flush=True) |
|
|
output += content |
|
|
except Exception as e: |
|
|
print(f"Error processing stream: {str(e)}") |
|
|
if not output: |
|
|
raise RuntimeError(f"Failed to process stream response: {str(e)}") |
|
|
if output_response: |
|
|
print("") |
|
|
return output |
|
|
|
|
|
async def get_stream_output_async(self, response: Any, output_response: bool = False) -> str: |
|
|
""" |
|
|
Process streaming response asynchronously. |
|
|
|
|
|
Args: |
|
|
response: The streaming response from the model. |
|
|
output_response (bool): Whether to print the response. |
|
|
|
|
|
Returns: |
|
|
str: The complete response text. |
|
|
""" |
|
|
output = "" |
|
|
try: |
|
|
async for chunk in response: |
|
|
if not hasattr(chunk, 'output') or chunk.output is None: |
|
|
error_msg = getattr(chunk, 'message', 'Invalid chunk format from model') |
|
|
raise ValueError(f"Model stream chunk error: {error_msg}") |
|
|
if hasattr(chunk.output, 'text'): |
|
|
content = chunk.output.text |
|
|
elif hasattr(chunk.output, 'choices') and chunk.output.choices: |
|
|
content = chunk.output.choices[0].message.content |
|
|
else: |
|
|
continue |
|
|
if content: |
|
|
if output_response: |
|
|
print(content, end="", flush=True) |
|
|
output += content |
|
|
except Exception as e: |
|
|
print(f"Error processing async stream: {str(e)}") |
|
|
if not output: |
|
|
raise RuntimeError(f"Failed to process async stream response: {str(e)}") |
|
|
if output_response: |
|
|
print("") |
|
|
return output |
|
|
|
|
|
def get_completion_output(self, response: Any, output_response: bool = True) -> str: |
|
|
""" |
|
|
Process non-streaming response from the model. |
|
|
|
|
|
Args: |
|
|
response: The response from the model. |
|
|
output_response (bool): Whether to print the response. |
|
|
|
|
|
Returns: |
|
|
str: The complete response text. |
|
|
""" |
|
|
try: |
|
|
if not hasattr(response, 'output') or response.output is None: |
|
|
error_msg = getattr(response, 'message', 'Invalid response format from model') |
|
|
raise ValueError(f"Model response error: {error_msg}") |
|
|
|
|
|
if hasattr(response.output, 'text'): |
|
|
output = response.output.text |
|
|
elif hasattr(response.output, 'choices') and response.output.choices: |
|
|
output = response.output.choices[0].message.content |
|
|
else: |
|
|
raise ValueError("Unexpected response format") |
|
|
|
|
|
if output_response: |
|
|
print(output) |
|
|
return output |
|
|
except Exception as e: |
|
|
raise RuntimeError(f"Error processing completion response: {str(e)}") |
|
|
|
|
|
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(5)) |
|
|
def single_generate(self, messages: List[dict], **kwargs) -> str: |
|
|
""" |
|
|
Generate a single response from the model. |
|
|
|
|
|
Args: |
|
|
messages (List[dict]): The conversation history. |
|
|
**kwargs: Additional parameters for generation. |
|
|
|
|
|
Returns: |
|
|
str: The generated response. |
|
|
""" |
|
|
stream = kwargs.get("stream", self.config.stream) |
|
|
output_response = kwargs.get("output_response", self.config.output_response) |
|
|
|
|
|
try: |
|
|
completion_params = self.get_completion_params(**kwargs) |
|
|
response = self._client.call(messages=messages, **completion_params) |
|
|
|
|
|
if response is None: |
|
|
raise RuntimeError("Received empty response from model") |
|
|
|
|
|
if stream: |
|
|
output = self.get_stream_output(response, output_response=output_response) |
|
|
cost = self._stream_cost(response) |
|
|
else: |
|
|
output = self.get_completion_output(response=response, output_response=output_response) |
|
|
cost = self._completion_cost(response) |
|
|
self._update_cost(cost=cost) |
|
|
return output |
|
|
except Exception as e: |
|
|
raise RuntimeError(f"Error during single_generate of AliyunLLM: {str(e)}") |
|
|
|
|
|
def batch_generate(self, batch_messages: List[List[dict]], **kwargs) -> List[str]: |
|
|
""" |
|
|
Generate responses for a batch of messages. |
|
|
|
|
|
Args: |
|
|
batch_messages (List[List[dict]]): List of conversation histories. |
|
|
**kwargs: Additional parameters for generation. |
|
|
|
|
|
Returns: |
|
|
List[str]: List of generated responses. |
|
|
""" |
|
|
if not isinstance(batch_messages, list) or not batch_messages: |
|
|
raise ValueError("batch_messages must be a non-empty list of message lists") |
|
|
return [self.single_generate(messages=one_messages, **kwargs) for one_messages in batch_messages] |
|
|
|
|
|
async def single_generate_async(self, messages: List[dict], **kwargs) -> str: |
|
|
""" |
|
|
Asynchronously generate a single response. |
|
|
|
|
|
Args: |
|
|
messages (List[dict]): The conversation history. |
|
|
**kwargs: Additional parameters for the generation. |
|
|
|
|
|
Returns: |
|
|
str: The generated response. |
|
|
""" |
|
|
stream = kwargs.get("stream", self.config.stream) |
|
|
output_response = kwargs.get("output_response", self.config.output_response) |
|
|
|
|
|
try: |
|
|
completion_params = self.get_completion_params(**kwargs) |
|
|
loop = asyncio.get_event_loop() |
|
|
response = await loop.run_in_executor( |
|
|
None, |
|
|
lambda: self._client.call(messages=messages, **completion_params) |
|
|
) |
|
|
|
|
|
if stream: |
|
|
output = await self.get_stream_output_async(response, output_response=output_response) |
|
|
cost = self._stream_cost(response) |
|
|
else: |
|
|
output = self.get_completion_output(response=response, output_response=output_response) |
|
|
cost = self._completion_cost(response) |
|
|
|
|
|
self._update_cost(cost=cost) |
|
|
return output |
|
|
|
|
|
except Exception as e: |
|
|
raise RuntimeError(f"Error during single_generate_async of AliyunLLM: {str(e)}") |
|
|
|
|
|
def _completion_cost(self, response: Any) -> Cost: |
|
|
"""cost""" |
|
|
try: |
|
|
if not response: |
|
|
return Cost(input_tokens=0, output_tokens=0, input_cost=0.0, output_cost=0.0) |
|
|
|
|
|
|
|
|
input_tokens = 0 |
|
|
output_tokens = 0 |
|
|
|
|
|
if hasattr(response, 'usage'): |
|
|
usage = response.usage |
|
|
if hasattr(usage, 'input_tokens'): |
|
|
input_tokens = usage.input_tokens |
|
|
elif hasattr(usage, 'prompt_tokens'): |
|
|
input_tokens = usage.prompt_tokens |
|
|
|
|
|
if hasattr(usage, 'output_tokens'): |
|
|
output_tokens = usage.output_tokens |
|
|
elif hasattr(usage, 'completion_tokens'): |
|
|
output_tokens = usage.completion_tokens |
|
|
|
|
|
|
|
|
if input_tokens == 0 and output_tokens == 0 and hasattr(response, 'output'): |
|
|
if hasattr(response.output, 'text'): |
|
|
output_tokens = len(response.output.text.split()) * 1.3 |
|
|
elif hasattr(response.output, 'choices') and response.output.choices: |
|
|
output_tokens = len(response.output.choices[0].message.content.split()) * 1.3 |
|
|
|
|
|
total_cost = self._estimate_cost(input_tokens, output_tokens) |
|
|
return Cost( |
|
|
input_tokens=input_tokens, |
|
|
output_tokens=output_tokens, |
|
|
input_cost=total_cost * 0.4, |
|
|
output_cost=total_cost * 0.6 |
|
|
) |
|
|
except Exception as e: |
|
|
logger.warning(f"Error computing completion cost: {str(e)}") |
|
|
return Cost(input_tokens=0, output_tokens=0, input_cost=0.0, output_cost=0.0) |
|
|
|
|
|
def _stream_cost(self, response: Any) -> Cost: |
|
|
"""cost""" |
|
|
try: |
|
|
if not response: |
|
|
return Cost(input_tokens=0, output_tokens=0, input_cost=0.0, output_cost=0.0) |
|
|
|
|
|
|
|
|
input_tokens = 0 |
|
|
output_tokens = 0 |
|
|
|
|
|
if hasattr(response, 'usage'): |
|
|
usage = response.usage |
|
|
if hasattr(usage, 'input_tokens'): |
|
|
input_tokens = usage.input_tokens |
|
|
elif hasattr(usage, 'prompt_tokens'): |
|
|
input_tokens = usage.prompt_tokens |
|
|
|
|
|
if hasattr(usage, 'output_tokens'): |
|
|
output_tokens = usage.output_tokens |
|
|
elif hasattr(usage, 'completion_tokens'): |
|
|
output_tokens = usage.completion_tokens |
|
|
|
|
|
|
|
|
if input_tokens == 0 and output_tokens == 0 and hasattr(response, 'output'): |
|
|
if hasattr(response.output, 'text'): |
|
|
output_tokens = len(response.output.text.split()) * 1.3 |
|
|
elif hasattr(response.output, 'choices') and response.output.choices: |
|
|
output_tokens = len(response.output.choices[0].message.content.split()) * 1.3 |
|
|
|
|
|
total_cost = self._estimate_cost(input_tokens, output_tokens) |
|
|
return Cost( |
|
|
input_tokens=input_tokens, |
|
|
output_tokens=output_tokens, |
|
|
input_cost=total_cost * 0.4, |
|
|
output_cost=total_cost * 0.6 |
|
|
) |
|
|
except Exception as e: |
|
|
logger.warning(f"Error computing stream cost: {str(e)}") |
|
|
return Cost(input_tokens=0, output_tokens=0, input_cost=0.0, output_cost=0.0) |
|
|
|
|
|
def _estimate_cost(self, input_tokens: int, output_tokens: int) -> float: |
|
|
"""cost |
|
|
|
|
|
""" |
|
|
model = self.config.model.lower() |
|
|
if "turbo" in model: |
|
|
input_cost = (input_tokens / 1000) * 0.0005 |
|
|
output_cost = (output_tokens / 1000) * 0.001 |
|
|
elif "max" in model: |
|
|
input_cost = (input_tokens / 1000) * 0.002 |
|
|
output_cost = (output_tokens / 1000) * 0.004 |
|
|
else: |
|
|
input_cost = (input_tokens / 1000) * 0.001 |
|
|
output_cost = (output_tokens / 1000) * 0.002 |
|
|
|
|
|
return input_cost + output_cost |
|
|
|
|
|
def _update_cost(self, cost: Cost): |
|
|
""" |
|
|
Update the cost manager with the new cost. |
|
|
|
|
|
Args: |
|
|
cost (Cost): The cost to update. |
|
|
""" |
|
|
try: |
|
|
cost_manager.update_cost(cost=cost, model=self.config.model) |
|
|
except Exception as e: |
|
|
logger.warning(f"Error updating cost: {str(e)}") |
|
|
|
|
|
|