"""
Chat service for handling different types of chat interactions
"""
import logging
from typing import Optional, List, Dict, Any, Union, Iterator, Type
import uuid
from typing import Tuple
from datetime import datetime
from lpm_kernel.api.services.user_llm_config_service import UserLLMConfigService
from lpm_kernel.api.domains.kernel2.dto.chat_dto import ChatRequest
from lpm_kernel.api.services.local_llm_service import local_llm_service
from lpm_kernel.api.domains.kernel2.services.message_builder import MultiTurnMessageBuilder
from lpm_kernel.api.domains.kernel2.services.prompt_builder import (
SystemPromptStrategy,
BasePromptStrategy,
RoleBasedStrategy,
KnowledgeEnhancedStrategy,
)
logger = logging.getLogger(__name__)


class ChatService:
    """Chat service for handling different types of chat interactions"""

    def __init__(self):
        """Initialize chat service"""
        # Base strategy chain; must contain at least one base strategy
        self.default_strategy_chain = [BasePromptStrategy, RoleBasedStrategy]

    def _get_strategy_chain(
        self,
        request: ChatRequest,
        strategy_chain: Optional[List[Type[SystemPromptStrategy]]] = None,
    ) -> List[Type[SystemPromptStrategy]]:
        """
        Get the strategy chain to use for message building

        Args:
            request: Chat request containing message and other parameters
            strategy_chain: Optional list of strategy classes to use

        Returns:
            List of strategy classes to use

        Raises:
            ValueError: If a custom strategy_chain is provided but is empty or
                contains no base strategy
        """
        # If a custom strategy chain is provided, validate and return it
        if strategy_chain is not None:
            if not strategy_chain:
                raise ValueError("Strategy chain cannot be empty")
            if not any(issubclass(s, BasePromptStrategy) for s in strategy_chain):
                raise ValueError("Strategy chain must contain at least one base strategy")
            return strategy_chain

        # Otherwise, start from the default strategy chain
        result_chain = self.default_strategy_chain.copy()

        # Add the knowledge enhancement strategy based on request parameters
        if request.enable_l0_retrieval or request.enable_l1_retrieval:
            result_chain.append(KnowledgeEnhancedStrategy)

        return result_chain
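
    # Illustrative example (not from the original source): with the default chain
    # and a request where enable_l1_retrieval is truthy, _get_strategy_chain
    # returns [BasePromptStrategy, RoleBasedStrategy, KnowledgeEnhancedStrategy].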

    def _build_messages(
        self,
        request: ChatRequest,
        strategy_chain: Optional[List[Type[SystemPromptStrategy]]] = None,
    ) -> List[Dict[str, str]]:
        """
        Build messages using the specified strategy chain

        Args:
            request: Chat request containing message and other parameters
            strategy_chain: Optional list of strategy classes to use. If None, uses the default chain

        Returns:
            List of message dictionaries
        """
        # Get and validate the strategy chain
        final_strategy_chain = self._get_strategy_chain(request, strategy_chain)

        # Build messages
        message_builder = MultiTurnMessageBuilder(request, strategy_chain=final_strategy_chain)
        messages = message_builder.build_messages()

        # Log debug information
        logger.info("Using strategy chain: %s", [s.__name__ for s in final_strategy_chain])
        logger.info("Final messages for LLM:")
        for msg in messages:
            logger.info(f"Role: {msg['role']}, Content: {msg['content']}")

        return messages
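
    # Illustrative example (assumed shape, inferred from the logging loop above):
    # build_messages() yields OpenAI-style dicts such as
    #     [{"role": "system", "content": "..."}, {"role": "user", "content": "..."}]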

    def _process_chat_response(
        self, chunk, full_response: Optional[Any], full_content: str
    ) -> Tuple[Any, str, Optional[str]]:
        """
        Process a response chunk in the custom chat_response format

        Args:
            chunk: Response data chunk
            full_response: Current complete response object
            full_content: Current accumulated content

        Returns:
            Tuple[Any, str, Optional[str]]: (updated response object, updated content, finish reason)
        """
        finish_reason = None
        logger.info(f"Processing custom format response: {chunk}")

        # Extract the content and completion flag
        if isinstance(chunk, dict):
            content = chunk.get("content", "")
            is_done = chunk.get("done", False)
        else:
            content = chunk.content if hasattr(chunk, 'content') else ""
            is_done = chunk.done if hasattr(chunk, 'done') else False

        if content:
            full_content += content
            logger.info(f"Added content from custom format, current length: {len(full_content)}")

        # Initialize the response object on the first chunk
        if not full_response:
            full_response = {
                "id": str(uuid.uuid4()),
                "object": "chat.completion.chunk",
                "created": int(datetime.now().timestamp()),
                "model": "models/lpm",
                "system_fingerprint": None,
                "choices": [
                    {
                        "index": 0,
                        "delta": {"content": ""},
                        "finish_reason": None,
                    }
                ],
            }

        # Check whether the stream has finished
        if is_done:
            finish_reason = "stop"
            logger.info("Got finish_reason from custom format: stop")

        return full_response, full_content, finish_reason
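
    # Illustrative example (assumed shape, inferred from the checks above): a
    # custom-format chunk looks like
    #     {"type": "chat_response", "content": "Hello", "done": False}
    # and the final chunk arrives with done=True, which maps to finish_reason="stop".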

    def _process_openai_response(
        self, chunk, full_response: Optional[Any], full_content: str
    ) -> Tuple[Any, str, Optional[str]]:
        """
        Process a response chunk in the OpenAI format

        Args:
            chunk: Response data chunk
            full_response: Current complete response object
            full_content: Current accumulated content

        Returns:
            Tuple[Any, str, Optional[str]]: (updated response object, updated content, finish reason)
        """
        finish_reason = None

        if not hasattr(chunk, 'choices'):
            logger.warning(f"Chunk has no choices attribute: {chunk}")
            return full_response, full_content, finish_reason

        choices = getattr(chunk, 'choices', None)
        if not choices:
            logger.warning("Chunk has empty choices")
            return full_response, full_content, finish_reason

        # Save basic information from the first response chunk
        if full_response is None:
            full_response = {
                "id": getattr(chunk, 'id', str(uuid.uuid4())),
                "object": "chat.completion.chunk",
                "created": int(datetime.now().timestamp()),
                "model": "models/lpm",
                "system_fingerprint": getattr(chunk, 'system_fingerprint', None),
                "choices": [
                    {
                        "index": 0,
                        "delta": {"content": ""},
                        "finish_reason": None,
                    }
                ],
            }

        # Collect content and finish reason
        choice = choices[0]
        if hasattr(choice, 'delta'):
            delta = choice.delta
            if hasattr(delta, 'content') and delta.content is not None:
                full_content += delta.content
                # logger.info(f"Added content from OpenAI format, current length: {len(full_content)}")

        if choice.finish_reason:
            finish_reason = choice.finish_reason
            logger.info(f"Got finish_reason: {finish_reason}")

        return full_response, full_content, finish_reason
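
    # Illustrative example (assumed shape, inferred from the attribute access
    # above): an OpenAI-style chunk exposes chunk.choices[0].delta.content for
    # the text delta, and the last chunk sets choices[0].finish_reason (e.g. "stop").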

    def collect_stream_response(self, response_iterator: Iterator[Dict[str, Any]]):
        """
        Collect a streaming response into a single complete response

        Args:
            response_iterator: Streaming response iterator

        Returns:
            Complete response dictionary, or None if no valid response or content
            was collected
        """
        logger.info("Starting to collect stream response")
        full_response = None
        full_content = ""
        finish_reason = None
        chunk_count = 0

        try:
            for chunk in response_iterator:
                if chunk is None:
                    logger.warning("Received None chunk, skipping")
                    continue

                chunk_count += 1
                # logger.info(f"Processing chunk #{chunk_count}: {chunk}")

                # Check whether this is a custom-format response
                is_chat_response = (
                    (hasattr(chunk, 'type') and chunk.type == 'chat_response')
                    or (isinstance(chunk, dict) and chunk.get("type") == "chat_response")
                )

                if is_chat_response:
                    full_response, full_content, chunk_finish_reason = self._process_chat_response(
                        chunk, full_response, full_content
                    )
                else:
                    full_response, full_content, chunk_finish_reason = self._process_openai_response(
                        chunk, full_response, full_content
                    )

                if chunk_finish_reason:
                    finish_reason = chunk_finish_reason

            # logger.info(f"Finished processing all chunks. Total chunks: {chunk_count}")
            # logger.info(f"Final content length: {len(full_content)}")
            # logger.info(f"Final finish_reason: {finish_reason}")

            if not full_response:
                logger.error("No valid response collected")
                return None

            if not full_content:
                logger.error("No content collected")
                return None

            # Update the response with the complete accumulated content
            full_response["choices"][0]["delta"]["content"] = full_content
            if finish_reason:
                full_response["choices"][0]["finish_reason"] = finish_reason

            # logger.info(f"Final response object: {full_response}")
            # logger.info(f"Final response content: {full_content}")
            return full_response

        except Exception as e:
            logger.error(f"Error collecting stream response: {str(e)}", exc_info=True)
            return None
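
    # Usage sketch (illustrative): collapse a streaming chat() call into a single
    # chunk-style dict; chat(..., stream=True) returns the raw iterator consumed here.
    #     final = chat_service.collect_stream_response(chat_service.chat(request, stream=True))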

    def chat(
        self,
        request: ChatRequest,
        strategy_chain: Optional[List[Type[SystemPromptStrategy]]] = None,
        stream: bool = True,
        json_response: bool = False,
        client: Optional[Any] = None,
        model_params: Optional[Dict[str, Any]] = None,
        context: Optional[Any] = None,
    ) -> Union[Dict[str, Any], Iterator[Dict[str, Any]]]:
        """
        Main chat method supporting both streaming and non-streaming responses

        Args:
            request: Chat request containing message and other parameters
            strategy_chain: Optional list of strategy classes to use
            stream: Whether to return a streaming response
            json_response: Whether to request a JSON-formatted response from the LLM
            client: Optional OpenAI client to use. If None, uses local_llm_service.client
            model_params: Optional model-specific parameters to override defaults
            context: Optional context to pass to strategies

        Returns:
            Either an iterator for streaming responses or a single response dictionary
        """
        logger.info(f"Chat request: {request}")

        # Build messages
        message_builder = MultiTurnMessageBuilder(request, strategy_chain=strategy_chain)
        messages = message_builder.build_messages(context)

        # Log debug information
        # logger.info("Using strategy chain: %s", [s.__name__ for s in strategy_chain] if strategy_chain else "default")
        logger.info("Final messages for LLM:")
        for msg in messages:
            logger.info(f"Role: {msg['role']}, Content: {msg['content']}")

        # Use the provided client, or default to local_llm_service.client
        current_client = client if client is not None else local_llm_service.client

        self.user_llm_config_service = UserLLMConfigService()
        self.user_llm_config = self.user_llm_config_service.get_available_llm()

        # Prepare API call parameters
        api_params = {
            "messages": messages,
            "temperature": request.temperature,
            "response_format": {"type": "text"},
            "seed": 42,  # Optional: fixed random seed for more consistent responses
            "tools": None,  # Optional: set if function calling or similar features are needed
            "tool_choice": None,  # Optional: set if function calling or similar features are needed
            "max_tokens": request.max_tokens,
            "stream": stream,
            "model": request.model or "models/lpm",
            "metadata": request.metadata,
        }

        # Request a JSON-formatted response if needed
        if json_response:
            api_params["response_format"] = {"type": "json_object"}

        # Apply custom model parameters if provided
        if model_params:
            api_params.update(model_params)

        logger.info(f"Current client base URL: {current_client.base_url}")
        # logger.info(f"Using model parameters: {api_params}")

        # Call the LLM API
        try:
            response = current_client.chat.completions.create(**api_params)
            if not stream:
                logger.info(f"Response: {response.json() if hasattr(response, 'json') else response}")
            return response
        except Exception as e:
            logger.error(f"Chat failed: {str(e)}", exc_info=True)
            raise


# Global chat service instance
chat_service = ChatService()
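
# Usage sketch (illustrative; the ChatRequest constructor fields shown are
# assumptions inferred from the attributes this module reads -- see chat_dto
# for the actual schema):
#
#     request = ChatRequest(
#         message="What did I work on last week?",  # hypothetical field
#         temperature=0.7,
#         max_tokens=2000,
#         enable_l0_retrieval=True,
#         enable_l1_retrieval=False,
#     )
#     # Non-streaming call returning a single response object
#     response = chat_service.chat(request, stream=False)
#     # Streaming call collapsed into one complete response
#     final = chat_service.collect_stream_response(chat_service.chat(request, stream=True))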