Spaces:
Sleeping
Sleeping
| from typing import List, Dict, Any, Optional | |
| import json | |
| import re | |
| from src.domain.models.conversation import ConversationContext, ConversationTurn, QueryClassification | |
| from src.application.services.conversation_manager import ConversationManager | |
| from src.infrastructure.providers.llm_provider import LLMClient | |
| from config import settings | |
| from config.conversation_config import conversation_config | |
class HistoryQueryHandler:
    """Answers user queries using previously stored conversation history."""

    def __init__(self, conversation_manager: ConversationManager):
        # Manager that owns per-session conversation state and search.
        self.conversation_manager = conversation_manager
        # Dedicated LLM client used to phrase history-grounded answers.
        self.llm_client = LLMClient()
| async def handle_history_query(self, user_message: str, session_id: str, | |
| classification: QueryClassification) -> str: | |
| """Handle a query that references conversation history.""" | |
| conversation = self.conversation_manager.get_or_create_conversation(session_id) | |
| # Find relevant historical context | |
| relevant_context = await self._find_relevant_context(user_message, conversation, classification) | |
| if not relevant_context: | |
| return await self._handle_no_context(user_message) | |
| # Generate response using LLM with historical context | |
| return await self._generate_history_response(user_message, relevant_context) | |
| async def _find_relevant_context(self, user_message: str, conversation: ConversationContext, | |
| classification: QueryClassification) -> Optional[Dict[str, Any]]: | |
| """Find relevant context from conversation history.""" | |
| context = { | |
| "relevant_turns": [], | |
| "entity_contexts": {}, | |
| "recent_summary": "", | |
| "confidence": classification.confidence | |
| } | |
| # 1. Search for relevant turns based on query content | |
| max_turns = conversation_config.history_handler.max_turns_in_prompt | |
| relevant_turns = self.conversation_manager.search_conversation_history( | |
| conversation.session_id, user_message, limit=max_turns | |
| ) | |
| context["relevant_turns"] = relevant_turns | |
| # 2. Get context for referenced entities | |
| for entity_ref in classification.referenced_entities: | |
| entity_context = self._find_entity_context(entity_ref, conversation) | |
| if entity_context: | |
| context["entity_contexts"][entity_ref] = entity_context | |
| # 3. Handle specific reference patterns | |
| context.update(await self._handle_specific_references(user_message, conversation)) | |
| # 4. Get recent conversation summary | |
| max_summary_turns = conversation_config.memory.max_context_summary_turns | |
| context["recent_summary"] = conversation.get_context_summary(max_turns=max_summary_turns) | |
| return context if (relevant_turns or context["entity_contexts"] or context.get("specific_data")) else None | |
| def _find_entity_context(self, entity_ref: str, conversation: ConversationContext) -> Optional[Dict[str, Any]]: | |
| """Find context for a specific entity reference.""" | |
| # Try to find entity by different patterns | |
| entity_patterns = [ | |
| ("user_handle", r'\b(user|member|handle)\b'), | |
| ("challenge_id", r'\b(challenge|contest)\b'), | |
| ("skill_name", r'\b(skill|technology)\b'), | |
| ] | |
| for entity_type, pattern in entity_patterns: | |
| if re.search(pattern, entity_ref, re.IGNORECASE): | |
| entity = conversation.find_entity(entity_type) | |
| if entity: | |
| context_str = self.conversation_manager.get_context_for_entity( | |
| conversation.session_id, entity_type, entity.name | |
| ) | |
| return { | |
| "entity": entity, | |
| "context": context_str, | |
| "type": entity_type | |
| } | |
| return None | |
| async def _handle_specific_references(self, user_message: str, conversation: ConversationContext) -> Dict[str, Any]: | |
| """Handle specific reference patterns in the user message.""" | |
| result = {} | |
| message_lower = user_message.lower() | |
| # "Last/Previous" references | |
| if re.search(r'\b(last|previous|recent)\b', message_lower): | |
| max_recent = conversation_config.memory.max_recent_turns_for_context | |
| recent_turns = conversation.get_recent_turns(max_recent) | |
| tool_turns = [turn for turn in recent_turns if turn.tool_used] | |
| if tool_turns: | |
| last_tool_turn = tool_turns[-1] | |
| result["specific_data"] = { | |
| "type": "last_result", | |
| "turn": last_tool_turn, | |
| "description": f"Last {last_tool_turn.tool_used} result" | |
| } | |
| # "That/It" references | |
| elif re.search(r'\b(that|it|this)\b', message_lower): | |
| # Use smaller window for direct references | |
| recent_turns = conversation.get_recent_turns(2) | |
| if recent_turns: | |
| last_turn = recent_turns[-1] | |
| result["specific_data"] = { | |
| "type": "reference", | |
| "turn": last_turn, | |
| "description": "Referenced item from recent conversation" | |
| } | |
| # Specific count/number questions | |
| elif re.search(r'\b(how many|count|total|number)\b', message_lower): | |
| # Look for turns with list results | |
| for turn in reversed(conversation.turns): | |
| if turn.tool_used and "query" in turn.tool_used: | |
| # Try to extract count information | |
| count_info = self._extract_count_from_turn(turn) | |
| if count_info: | |
| result["specific_data"] = { | |
| "type": "count", | |
| "turn": turn, | |
| "count_info": count_info, | |
| "description": f"Count information from {turn.tool_used}" | |
| } | |
| break | |
| return result | |
| def _extract_count_from_turn(self, turn: ConversationTurn) -> Optional[Dict[str, Any]]: | |
| """Extract count information from a conversation turn.""" | |
| if not turn.full_response: | |
| return None | |
| try: | |
| # Try to parse as JSON to get list length | |
| if turn.full_response.startswith('[') or turn.full_response.startswith('{'): | |
| data = json.loads(turn.full_response) | |
| if isinstance(data, list): | |
| return {"count": len(data), "items": "results"} | |
| elif isinstance(data, dict) and "result" in data: | |
| result = data["result"] | |
| if isinstance(result, list): | |
| return {"count": len(result), "items": "results"} | |
| except json.JSONDecodeError: | |
| pass | |
| # Try to extract count from response summary | |
| count_match = re.search(r'(\d+)\s*(challenges?|members?|skills?|results?)', | |
| turn.response_summary, re.IGNORECASE) | |
| if count_match: | |
| return { | |
| "count": int(count_match.group(1)), | |
| "items": count_match.group(2) | |
| } | |
| return None | |
| async def _generate_history_response(self, user_message: str, context: Dict[str, Any]) -> str: | |
| """Generate a response using LLM with historical context.""" | |
| if not settings.HF_TOKEN: | |
| return self._generate_simple_history_response(context) | |
| # Prepare context for LLM | |
| context_prompt = self._build_context_prompt(user_message, context) | |
| try: | |
| messages = [ | |
| { | |
| "role": "system", | |
| "content": "You are a helpful Topcoder assistant. Answer the user's question using only the provided conversation history and context. Be conversational and helpful." | |
| }, | |
| { | |
| "role": "user", | |
| "content": context_prompt | |
| } | |
| ] | |
| response = await self.llm_client.chat(messages) | |
| return response | |
| except Exception as e: | |
| print(f"LLM history response failed: {e}") | |
| return self._generate_simple_history_response(context) | |
| def _build_context_prompt(self, user_message: str, context: Dict[str, Any]) -> str: | |
| """Build a prompt for the LLM with historical context.""" | |
| prompt_parts = [ | |
| f"USER'S QUESTION: {user_message}", | |
| "", | |
| "CONVERSATION HISTORY:" | |
| ] | |
| # Add relevant turns | |
| if context["relevant_turns"]: | |
| for i, turn in enumerate(context["relevant_turns"], 1): | |
| prompt_parts.append(f"Turn {i}:") | |
| prompt_parts.append(f" User asked: {turn.user_message}") | |
| if turn.tool_used: | |
| prompt_parts.append(f" Tool used: {turn.tool_used}") | |
| if turn.tool_params: | |
| prompt_parts.append(f" Parameters: {json.dumps(turn.tool_params)}") | |
| prompt_parts.append(f" Result: {turn.response_summary}") | |
| else: | |
| max_chars = conversation_config.history_handler.max_response_chars_for_display | |
| prompt_parts.append(f" Response: {turn.full_response[:max_chars]}") | |
| prompt_parts.append("") | |
| # Add entity contexts | |
| if context["entity_contexts"]: | |
| prompt_parts.append("ENTITY CONTEXT:") | |
| for entity_ref, entity_context in context["entity_contexts"].items(): | |
| prompt_parts.append(f" {entity_ref}: {entity_context['context']}") | |
| prompt_parts.append("") | |
| # Add specific data | |
| if context.get("specific_data"): | |
| specific = context["specific_data"] | |
| prompt_parts.append("SPECIFIC REFERENCE:") | |
| prompt_parts.append(f" Type: {specific['type']}") | |
| prompt_parts.append(f" Description: {specific['description']}") | |
| if specific["type"] == "count" and "count_info" in specific: | |
| count_info = specific["count_info"] | |
| prompt_parts.append(f" Count: {count_info['count']} {count_info['items']}") | |
| elif "turn" in specific: | |
| turn = specific["turn"] | |
| prompt_parts.append(f" From: {turn.user_message}") | |
| prompt_parts.append(f" Result: {turn.response_summary}") | |
| prompt_parts.append("") | |
| prompt_parts.extend([ | |
| "INSTRUCTIONS:", | |
| "- Answer the user's question using only the information above", | |
| "- Be conversational and helpful", | |
| "- If the question can't be fully answered from the history, say so", | |
| "- Reference specific results when appropriate", | |
| "- If asking about counts or numbers, provide the specific count if available" | |
| ]) | |
| return "\n".join(prompt_parts) | |
| def _generate_simple_history_response(self, context: Dict[str, Any]) -> str: | |
| """Generate a simple response without LLM when no token available.""" | |
| if context.get("specific_data"): | |
| specific = context["specific_data"] | |
| if specific["type"] == "count" and "count_info" in specific: | |
| count_info = specific["count_info"] | |
| return f"Based on our previous search, there were {count_info['count']} {count_info['items']} found." | |
| elif specific["type"] == "last_result": | |
| turn = specific["turn"] | |
| return f"In our last {turn.tool_used} search: {turn.response_summary}" | |
| elif specific["type"] == "reference": | |
| turn = specific["turn"] | |
| if turn.tool_used: | |
| return f"From the previous {turn.tool_used} result: {turn.response_summary}" | |
| else: | |
| return turn.full_response[:300] + "..." if len(turn.full_response) > 300 else turn.full_response | |
| elif context["relevant_turns"]: | |
| last_relevant = context["relevant_turns"][-1] | |
| if last_relevant.tool_used: | |
| return f"From our previous {last_relevant.tool_used} search: {last_relevant.response_summary}" | |
| else: | |
| max_chars = conversation_config.history_handler.max_response_chars_for_display | |
| return f"As discussed earlier: {last_relevant.full_response[:max_chars]}..." | |
| return conversation_config.history_handler.no_context_message | |
| async def _handle_no_context(self, user_message: str) -> str: | |
| """Handle cases where no relevant context is found.""" | |
| # Check if this might be a misclassified query | |
| if any(word in user_message.lower() for word in ['new', 'different', 'other', 'more']): | |
| return conversation_config.history_handler.misclassified_query_message | |
| return conversation_config.history_handler.no_context_message |