File size: 14,155 Bytes
01d5a5d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
"""
Chat service for handling different types of chat interactions
"""
import logging
from typing import Optional, List, Dict, Any, Union, Iterator, Type
import uuid
from typing import Tuple
from datetime import datetime

from lpm_kernel.api.services.user_llm_config_service import UserLLMConfigService
from lpm_kernel.api.domains.kernel2.dto.chat_dto import ChatRequest
from lpm_kernel.api.services.local_llm_service import local_llm_service
from lpm_kernel.api.domains.kernel2.services.message_builder import MultiTurnMessageBuilder
from lpm_kernel.api.domains.kernel2.services.prompt_builder import (
    SystemPromptStrategy,
    BasePromptStrategy,
    RoleBasedStrategy,
    KnowledgeEnhancedStrategy,
)

logger = logging.getLogger(__name__)


class ChatService:
    """Chat service for handling different types of chat interactions"""
    
    def __init__(self):
        """Initialize chat service"""
        # Base strategy chain, must contain at least one base strategy
        self.default_strategy_chain = [BasePromptStrategy, RoleBasedStrategy]
    
    def _get_strategy_chain(
        self,
        request: ChatRequest,
        strategy_chain: Optional[List[Type[SystemPromptStrategy]]] = None,
    ) -> List[Type[SystemPromptStrategy]]:
        """
        Get the strategy chain to use for message building
        
        Args:
            request: Chat request containing message and other parameters
            strategy_chain: Optional list of strategy classes to use
            
        Returns:
            List of strategy classes to use
            
        Raises:
            ValueError: If strategy_chain is empty or None and no default chain is available
        """
        # If custom strategy chain is provided, validate and return
        if strategy_chain is not None:
            if not strategy_chain:
                raise ValueError("Strategy chain cannot be empty")
            if not any(issubclass(s, BasePromptStrategy) for s in strategy_chain):
                raise ValueError("Strategy chain must contain at least one base strategy")
            return strategy_chain
            
        # Use default strategy chain
        result_chain = self.default_strategy_chain.copy()
        
        # Add knowledge enhancement strategy based on request parameters
        if request.enable_l0_retrieval or request.enable_l1_retrieval:
            result_chain.append(KnowledgeEnhancedStrategy)
            
        return result_chain
    
    def _build_messages(
        self,
        request: ChatRequest,
        strategy_chain: Optional[List[Type[SystemPromptStrategy]]] = None,
    ) -> List[Dict[str, str]]:
        """
        Build messages using the specified strategy chain
        
        Args:
            request: Chat request containing message and other parameters
            strategy_chain: Optional list of strategy classes to use. If None, uses default chain
            
        Returns:
            List of message dictionaries
        """
        # Get and validate strategy chain
        final_strategy_chain = self._get_strategy_chain(request, strategy_chain)
        
        # Build messages
        message_builder = MultiTurnMessageBuilder(request, strategy_chain=final_strategy_chain)
        messages = message_builder.build_messages()
        
        # Log debug information
        logger.info("Using strategy chain: %s", [s.__name__ for s in final_strategy_chain])
        logger.info("Final messages for LLM:")
        for msg in messages:
            logger.info(f"Role: {msg['role']}, Content: {msg['content']}")
            
        return messages
    
    def _process_chat_response(self, chunk, full_response: Optional[Any], full_content: str) -> Tuple[Any, str, Optional[str]]:
        """
        Process custom chat_response format data

        Args:
            chunk: Response data chunk
            full_response: Current complete response object
            full_content: Current accumulated content

        Returns:
            Tuple[Any, str, Optional[str]]: (Updated response object, Updated content, Finish reason)
        """
        finish_reason = None
        logger.info(f"Processing custom format response: {chunk}")
        
        # Get content
        content = ""
        if isinstance(chunk, dict):
            content = chunk.get("content", "")
            is_done = chunk.get("done", False)
        else:
            content = chunk.content if hasattr(chunk, 'content') else ""
            is_done = chunk.done if hasattr(chunk, 'done') else False
            
        if content:
            full_content += content
            logger.info(f"Added content from custom format, current length: {len(full_content)}")
            
        # Initialize response object (if needed)
        if not full_response:
            full_response = {
                "id": str(uuid.uuid4()),
                "object": "chat.completion.chunk",
                "created": int(datetime.now().timestamp()),
                "model": "models/lpm",
                "system_fingerprint": None,
                "choices": [
                    {
                        "index": 0,
                        "delta": {
                            "content": ""
                        },
                        "finish_reason": None
                    }
                ]
            }
            
        # Check if completed
        if is_done:
            finish_reason = 'stop'
            logger.info("Got finish_reason from custom format: stop")
            
        return full_response, full_content, finish_reason

    def _process_openai_response(self, chunk, full_response: Optional[Any], full_content: str) -> Tuple[Any, str, Optional[str]]:
        """
        Process OpenAI format response data

        Args:
            chunk: Response data chunk
            full_response: Current complete response object
            full_content: Current accumulated content

        Returns:
            Tuple[Any, str, Optional[str]]: (Updated response object, Updated content, Finish reason)
        """
        finish_reason = None
        
        if not hasattr(chunk, 'choices'):
            logger.warning(f"Chunk has no choices attribute: {chunk}")
            return full_response, full_content, finish_reason
            
        choices = getattr(chunk, 'choices', None)
        if not choices:
            logger.warning("Chunk has empty choices")
            return full_response, full_content, finish_reason
            
        # Save basic information of the first response
        if full_response is None:
            full_response = {
                "id": getattr(chunk, 'id', str(uuid.uuid4())),
                "object": "chat.completion.chunk",
                "created": int(datetime.now().timestamp()),
                "model": "models/lpm",
                "system_fingerprint": getattr(chunk, 'system_fingerprint', None),
                "choices": [
                    {
                        "index": 0,
                        "delta": {
                            "content": ""
                        },
                        "finish_reason": None
                    }
                ]
            }
        
        # Collect content and finish reason
        choice = choices[0]
        if hasattr(choice, 'delta'):
            delta = choice.delta
            if hasattr(delta, 'content') and delta.content is not None:
                full_content += delta.content
                # logger.info(f"Added content from OpenAI format, current length: {len(full_content)}")
            if choice.finish_reason:
                finish_reason = choice.finish_reason
                logger.info(f"Got finish_reason: {finish_reason}")
                
        return full_response, full_content, finish_reason

    def collect_stream_response(self, response_iterator: Iterator[Dict[str, Any]]):
        """
        Collect streaming response into a complete response

        Args:
            response_iterator: Streaming response iterator

        Returns:
            Complete response dictionary
        """
        logger.info("Starting to collect stream response")
        full_response = None
        full_content = ""
        finish_reason = None
        chunk_count = 0
        
        try:
            for chunk in response_iterator:
                if chunk is None:
                    logger.warning("Received None chunk, skipping")
                    continue
                    
                chunk_count += 1
                # logger.info(f"Processing chunk #{chunk_count}: {chunk}")
                
                # Check if it's a custom format response
                is_chat_response = (
                    (hasattr(chunk, 'type') and chunk.type == 'chat_response') or
                    (isinstance(chunk, dict) and chunk.get("type") == "chat_response")
                )
                
                if is_chat_response:
                    full_response, full_content, chunk_finish_reason = self._process_chat_response(
                        chunk, full_response, full_content
                    )
                else:
                    full_response, full_content, chunk_finish_reason = self._process_openai_response(
                        chunk, full_response, full_content
                    )
                    
                if chunk_finish_reason:
                    finish_reason = chunk_finish_reason
        
            # logger.info(f"Finished processing all chunks. Total chunks: {chunk_count}")
            # logger.info(f"Final content length: {len(full_content)}")
            # logger.info(f"Final finish_reason: {finish_reason}")
            
            if not full_response:
                logger.error("No valid response collected")
                return None
                
            if not full_content:
                logger.error("No content collected")
                return None
                
            # Update response with complete content
            full_response["choices"][0]["delta"]["content"] = full_content
            if finish_reason:
                full_response["choices"][0]["finish_reason"] = finish_reason
                
            # logger.info(f"Final response object: {full_response}")
            # logger.info(f"Final response content: {full_content}")
            return full_response
            
        except Exception as e:
            logger.error(f"Error collecting stream response: {str(e)}", exc_info=True)
            return None

    def chat(
            self,
            request: ChatRequest,
            strategy_chain: Optional[List[Type[SystemPromptStrategy]]] = None,
            stream: bool = True,
            json_response: bool = False,
            client: Optional[Any] = None,
            model_params: Optional[Dict[str, Any]] = None,
            context: Optional[Any] = None,
        ) -> Union[Dict[str, Any], Iterator[Dict[str, Any]]]:
        """
        Main chat method supporting both streaming and non-streaming responses
        
        Args:
            request: Chat request containing message and other parameters
            strategy_chain: Optional list of strategy classes to use
            stream: Whether to return a streaming response
            json_response: Whether to request JSON formatted response from LLM
            client: Optional OpenAI client to use. If None, uses local_llm_service.client
            model_params: Optional model specific parameters to override defaults
            context: Optional context to pass to strategies
            
        Returns:
            Either an iterator for streaming responses or a single response dictionary
        """
        logger.info(f"Chat request: {request}")
        # Build messages
        message_builder = MultiTurnMessageBuilder(request, strategy_chain=strategy_chain)
        messages = message_builder.build_messages(context)
        
        # Log debug information
        # logger.info("Using strategy chain: %s", [s.__name__ for s in strategy_chain] if strategy_chain else "default")
        logger.info("Final messages for LLM:")
        for msg in messages:
            logger.info(f"Role: {msg['role']}, Content: {msg['content']}")

        # Use provided client or default local_llm_service.client
        current_client = client if client is not None else local_llm_service.client
        
        self.user_llm_config_service = UserLLMConfigService()
        self.user_llm_config = self.user_llm_config_service.get_available_llm()
        
        # Prepare API call parameters
        api_params = {
            "messages": messages,
            "temperature": request.temperature,
            "response_format": {"type": "text"},
            "seed": 42,  # Optional: Fixed random seed to get consistent responses
            "tools": None,  # Optional: If function calling or similar features are needed
            "tool_choice": None,  # Optional: If function calling or similar features are needed
            "max_tokens": request.max_tokens,
            "stream": stream,
            "model": request.model or "models/lpm",
            "metadata": request.metadata
        }
        
        # Add JSON format requirement (if needed)
        if json_response:
            api_params["response_format"] = {"type": "json_object"}
            
        # Update custom model parameters (if provided)
        if model_params:
            api_params.update(model_params)

        logger.info(f"Current client base URL: {current_client.base_url}")
        # logger.info(f"Using model parameters: {api_params}")
        
        # Call LLM API
        try:
            response = current_client.chat.completions.create(**api_params)
            if not stream:
                logger.info(f"Response: {response.json() if hasattr(response, 'json') else response}")
            return response
            
        except Exception as e:
            logger.error(f"Chat failed: {str(e)}", exc_info=True)
            raise


# Global chat service instance
chat_service = ChatService()