""" Chat service for handling different types of chat interactions """ import logging from typing import Optional, List, Dict, Any, Union, Iterator, Type import uuid from typing import Tuple from datetime import datetime from lpm_kernel.api.services.user_llm_config_service import UserLLMConfigService from lpm_kernel.api.domains.kernel2.dto.chat_dto import ChatRequest from lpm_kernel.api.services.local_llm_service import local_llm_service from lpm_kernel.api.domains.kernel2.services.message_builder import MultiTurnMessageBuilder from lpm_kernel.api.domains.kernel2.services.prompt_builder import ( SystemPromptStrategy, BasePromptStrategy, RoleBasedStrategy, KnowledgeEnhancedStrategy, ) logger = logging.getLogger(__name__) class ChatService: """Chat service for handling different types of chat interactions""" def __init__(self): """Initialize chat service""" # Base strategy chain, must contain at least one base strategy self.default_strategy_chain = [BasePromptStrategy, RoleBasedStrategy] def _get_strategy_chain( self, request: ChatRequest, strategy_chain: Optional[List[Type[SystemPromptStrategy]]] = None, ) -> List[Type[SystemPromptStrategy]]: """ Get the strategy chain to use for message building Args: request: Chat request containing message and other parameters strategy_chain: Optional list of strategy classes to use Returns: List of strategy classes to use Raises: ValueError: If strategy_chain is empty or None and no default chain is available """ # If custom strategy chain is provided, validate and return if strategy_chain is not None: if not strategy_chain: raise ValueError("Strategy chain cannot be empty") if not any(issubclass(s, BasePromptStrategy) for s in strategy_chain): raise ValueError("Strategy chain must contain at least one base strategy") return strategy_chain # Use default strategy chain result_chain = self.default_strategy_chain.copy() # Add knowledge enhancement strategy based on request parameters if request.enable_l0_retrieval or request.enable_l1_retrieval: result_chain.append(KnowledgeEnhancedStrategy) return result_chain def _build_messages( self, request: ChatRequest, strategy_chain: Optional[List[Type[SystemPromptStrategy]]] = None, ) -> List[Dict[str, str]]: """ Build messages using the specified strategy chain Args: request: Chat request containing message and other parameters strategy_chain: Optional list of strategy classes to use. If None, uses default chain Returns: List of message dictionaries """ # Get and validate strategy chain final_strategy_chain = self._get_strategy_chain(request, strategy_chain) # Build messages message_builder = MultiTurnMessageBuilder(request, strategy_chain=final_strategy_chain) messages = message_builder.build_messages() # Log debug information logger.info("Using strategy chain: %s", [s.__name__ for s in final_strategy_chain]) logger.info("Final messages for LLM:") for msg in messages: logger.info(f"Role: {msg['role']}, Content: {msg['content']}") return messages def _process_chat_response(self, chunk, full_response: Optional[Any], full_content: str) -> Tuple[Any, str, Optional[str]]: """ Process custom chat_response format data Args: chunk: Response data chunk full_response: Current complete response object full_content: Current accumulated content Returns: Tuple[Any, str, Optional[str]]: (Updated response object, Updated content, Finish reason) """ finish_reason = None logger.info(f"Processing custom format response: {chunk}") # Get content content = "" if isinstance(chunk, dict): content = chunk.get("content", "") is_done = chunk.get("done", False) else: content = chunk.content if hasattr(chunk, 'content') else "" is_done = chunk.done if hasattr(chunk, 'done') else False if content: full_content += content logger.info(f"Added content from custom format, current length: {len(full_content)}") # Initialize response object (if needed) if not full_response: full_response = { "id": str(uuid.uuid4()), "object": "chat.completion.chunk", "created": int(datetime.now().timestamp()), "model": "models/lpm", "system_fingerprint": None, "choices": [ { "index": 0, "delta": { "content": "" }, "finish_reason": None } ] } # Check if completed if is_done: finish_reason = 'stop' logger.info("Got finish_reason from custom format: stop") return full_response, full_content, finish_reason def _process_openai_response(self, chunk, full_response: Optional[Any], full_content: str) -> Tuple[Any, str, Optional[str]]: """ Process OpenAI format response data Args: chunk: Response data chunk full_response: Current complete response object full_content: Current accumulated content Returns: Tuple[Any, str, Optional[str]]: (Updated response object, Updated content, Finish reason) """ finish_reason = None if not hasattr(chunk, 'choices'): logger.warning(f"Chunk has no choices attribute: {chunk}") return full_response, full_content, finish_reason choices = getattr(chunk, 'choices', None) if not choices: logger.warning("Chunk has empty choices") return full_response, full_content, finish_reason # Save basic information of the first response if full_response is None: full_response = { "id": getattr(chunk, 'id', str(uuid.uuid4())), "object": "chat.completion.chunk", "created": int(datetime.now().timestamp()), "model": "models/lpm", "system_fingerprint": getattr(chunk, 'system_fingerprint', None), "choices": [ { "index": 0, "delta": { "content": "" }, "finish_reason": None } ] } # Collect content and finish reason choice = choices[0] if hasattr(choice, 'delta'): delta = choice.delta if hasattr(delta, 'content') and delta.content is not None: full_content += delta.content # logger.info(f"Added content from OpenAI format, current length: {len(full_content)}") if choice.finish_reason: finish_reason = choice.finish_reason logger.info(f"Got finish_reason: {finish_reason}") return full_response, full_content, finish_reason def collect_stream_response(self, response_iterator: Iterator[Dict[str, Any]]): """ Collect streaming response into a complete response Args: response_iterator: Streaming response iterator Returns: Complete response dictionary """ logger.info("Starting to collect stream response") full_response = None full_content = "" finish_reason = None chunk_count = 0 try: for chunk in response_iterator: if chunk is None: logger.warning("Received None chunk, skipping") continue chunk_count += 1 # logger.info(f"Processing chunk #{chunk_count}: {chunk}") # Check if it's a custom format response is_chat_response = ( (hasattr(chunk, 'type') and chunk.type == 'chat_response') or (isinstance(chunk, dict) and chunk.get("type") == "chat_response") ) if is_chat_response: full_response, full_content, chunk_finish_reason = self._process_chat_response( chunk, full_response, full_content ) else: full_response, full_content, chunk_finish_reason = self._process_openai_response( chunk, full_response, full_content ) if chunk_finish_reason: finish_reason = chunk_finish_reason # logger.info(f"Finished processing all chunks. Total chunks: {chunk_count}") # logger.info(f"Final content length: {len(full_content)}") # logger.info(f"Final finish_reason: {finish_reason}") if not full_response: logger.error("No valid response collected") return None if not full_content: logger.error("No content collected") return None # Update response with complete content full_response["choices"][0]["delta"]["content"] = full_content if finish_reason: full_response["choices"][0]["finish_reason"] = finish_reason # logger.info(f"Final response object: {full_response}") # logger.info(f"Final response content: {full_content}") return full_response except Exception as e: logger.error(f"Error collecting stream response: {str(e)}", exc_info=True) return None def chat( self, request: ChatRequest, strategy_chain: Optional[List[Type[SystemPromptStrategy]]] = None, stream: bool = True, json_response: bool = False, client: Optional[Any] = None, model_params: Optional[Dict[str, Any]] = None, context: Optional[Any] = None, ) -> Union[Dict[str, Any], Iterator[Dict[str, Any]]]: """ Main chat method supporting both streaming and non-streaming responses Args: request: Chat request containing message and other parameters strategy_chain: Optional list of strategy classes to use stream: Whether to return a streaming response json_response: Whether to request JSON formatted response from LLM client: Optional OpenAI client to use. If None, uses local_llm_service.client model_params: Optional model specific parameters to override defaults context: Optional context to pass to strategies Returns: Either an iterator for streaming responses or a single response dictionary """ logger.info(f"Chat request: {request}") # Build messages message_builder = MultiTurnMessageBuilder(request, strategy_chain=strategy_chain) messages = message_builder.build_messages(context) # Log debug information # logger.info("Using strategy chain: %s", [s.__name__ for s in strategy_chain] if strategy_chain else "default") logger.info("Final messages for LLM:") for msg in messages: logger.info(f"Role: {msg['role']}, Content: {msg['content']}") # Use provided client or default local_llm_service.client current_client = client if client is not None else local_llm_service.client self.user_llm_config_service = UserLLMConfigService() self.user_llm_config = self.user_llm_config_service.get_available_llm() # Prepare API call parameters api_params = { "messages": messages, "temperature": request.temperature, "response_format": {"type": "text"}, "seed": 42, # Optional: Fixed random seed to get consistent responses "tools": None, # Optional: If function calling or similar features are needed "tool_choice": None, # Optional: If function calling or similar features are needed "max_tokens": request.max_tokens, "stream": stream, "model": request.model or "models/lpm", "metadata": request.metadata } # Add JSON format requirement (if needed) if json_response: api_params["response_format"] = {"type": "json_object"} # Update custom model parameters (if provided) if model_params: api_params.update(model_params) logger.info(f"Current client base URL: {current_client.base_url}") # logger.info(f"Using model parameters: {api_params}") # Call LLM API try: response = current_client.chat.completions.create(**api_params) if not stream: logger.info(f"Response: {response.json() if hasattr(response, 'json') else response}") return response except Exception as e: logger.error(f"Chat failed: {str(e)}", exc_info=True) raise # Global chat service instance chat_service = ChatService()