|
|
|
|
|
""" |
|
|
Router Agent for GAIA Question Classification |
|
|
Analyzes questions and routes them to appropriate specialized agents |
|
|
""" |
|
|
|
|
|
import re |
|
|
import logging |
|
|
from typing import List, Dict, Any, Tuple |
|
|
from urllib.parse import urlparse |
|
|
|
|
|
from agents.state import GAIAAgentState, QuestionType, AgentRole, AgentResult |
|
|
from models.qwen_client import QwenClient, ModelTier |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
class RouterAgent: |
|
|
""" |
|
|
Router agent that classifies GAIA questions and determines processing strategy |
|
|
""" |
|
|
|
|
|
def __init__(self, llm_client: QwenClient): |
|
|
self.llm_client = llm_client |
|
|
|
|
|
def process(self, state: GAIAAgentState) -> GAIAAgentState: |
|
|
""" |
|
|
Enhanced router processing with improved classification and planning |
|
|
""" |
|
|
logger.info("🧭 Router: Starting enhanced multi-phase analysis") |
|
|
state.add_processing_step("Router: Enhanced multi-phase question analysis") |
|
|
|
|
|
try: |
|
|
|
|
|
classification_result = self._classify_question_enhanced(state.question) |
|
|
|
|
|
state.question_type = classification_result['question_type'] |
|
|
state.routing_decision = classification_result['reasoning'] |
|
|
|
|
|
|
|
|
agents = self._select_agents_for_type(classification_result) |
|
|
state.selected_agents = agents |
|
|
|
|
|
|
|
|
state.router_analysis = { |
|
|
'classification': classification_result, |
|
|
'selected_agents': [a.value for a in agents], |
|
|
'confidence': classification_result['confidence'] |
|
|
} |
|
|
|
|
|
logger.info(f"✅ Enhanced routing: {classification_result['type']} -> {[a.value for a in agents]}") |
|
|
|
|
|
return state |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"Enhanced router analysis failed: {str(e)}" |
|
|
logger.error(error_msg) |
|
|
state.add_error(error_msg) |
|
|
|
|
|
|
|
|
state.question_type = QuestionType.UNKNOWN |
|
|
state.selected_agents = [AgentRole.WEB_RESEARCHER, AgentRole.REASONING_AGENT, AgentRole.SYNTHESIZER] |
|
|
state.routing_decision = f"Enhanced routing failed, using fallback: {error_msg}" |
|
|
|
|
|
return state |
|
|
|
|
|
def route_question(self, state: GAIAAgentState) -> GAIAAgentState: |
|
|
""" |
|
|
Main routing function - analyzes question and determines processing strategy |
|
|
""" |
|
|
logger.info(f"🧭 Router: Analyzing question type and complexity") |
|
|
state.add_processing_step("Router: Analyzing question and selecting agents") |
|
|
|
|
|
try: |
|
|
|
|
|
question_types, primary_type = self._classify_question_types(state.question, state.file_name) |
|
|
state.question_types = question_types |
|
|
state.primary_question_type = primary_type |
|
|
|
|
|
|
|
|
llm_classification = self._get_llm_classification(state.question) |
|
|
|
|
|
|
|
|
final_types, final_primary = self._combine_classifications_legacy( |
|
|
question_types, primary_type, llm_classification |
|
|
) |
|
|
|
|
|
|
|
|
state.question_types = final_types |
|
|
state.primary_question_type = final_primary |
|
|
|
|
|
|
|
|
selected_agents = self._select_agents(final_types, final_primary, state.question) |
|
|
state.selected_agents = selected_agents |
|
|
|
|
|
logger.info(f"✅ Routing complete: {final_primary.value} -> {[a.value for a in selected_agents]}") |
|
|
state.add_processing_step(f"Router: Selected agents - {[a.value for a in selected_agents]}") |
|
|
|
|
|
return state |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"Router failed: {str(e)}" |
|
|
logger.error(error_msg) |
|
|
state.add_error(error_msg) |
|
|
|
|
|
|
|
|
state.selected_agents = [AgentRole.WEB_RESEARCHER] |
|
|
state.primary_question_type = QuestionType.WEB_RESEARCH |
|
|
|
|
|
return state |
|
|
|
|
|
def _classify_question_types(self, question: str, file_name: str = None) -> Tuple[List[QuestionType], QuestionType]: |
|
|
""" |
|
|
Enhanced classification that can detect multiple question types |
|
|
Returns: (all_detected_types, primary_type) |
|
|
""" |
|
|
|
|
|
question_lower = question.lower() |
|
|
detected_types = [] |
|
|
|
|
|
|
|
|
if file_name: |
|
|
file_ext = file_name.lower().split('.')[-1] if '.' in file_name else "" |
|
|
|
|
|
if file_ext in ['jpg', 'jpeg', 'png', 'gif', 'bmp', 'svg']: |
|
|
detected_types.append(QuestionType.FILE_PROCESSING) |
|
|
elif file_ext in ['mp3', 'wav', 'ogg', 'flac', 'm4a']: |
|
|
detected_types.append(QuestionType.FILE_PROCESSING) |
|
|
elif file_ext in ['xlsx', 'xls', 'csv']: |
|
|
detected_types.append(QuestionType.FILE_PROCESSING) |
|
|
elif file_ext in ['py', 'js', 'java', 'cpp', 'c']: |
|
|
detected_types.append(QuestionType.CODE_EXECUTION) |
|
|
else: |
|
|
detected_types.append(QuestionType.FILE_PROCESSING) |
|
|
|
|
|
|
|
|
url_patterns = { |
|
|
QuestionType.WIKIPEDIA: [ |
|
|
r'wikipedia\.org', r'featured article', r'promoted.*wikipedia', |
|
|
r'english wikipedia', r'wiki.*article' |
|
|
], |
|
|
QuestionType.YOUTUBE: [ |
|
|
r'youtube\.com', r'youtu\.be', r'watch\?v=', r'video.*youtube', |
|
|
r'https://www\.youtube\.com/watch' |
|
|
] |
|
|
} |
|
|
|
|
|
for question_type, patterns in url_patterns.items(): |
|
|
if any(re.search(pattern, question_lower) for pattern in patterns): |
|
|
detected_types.append(question_type) |
|
|
|
|
|
|
|
|
classification_patterns = { |
|
|
QuestionType.MATHEMATICAL: [ |
|
|
|
|
|
r'\bhow many\b', r'\bhow much\b', r'\bcount\b', r'\bnumber of\b', |
|
|
r'\btotal\b', r'\bsum\b', r'\baverage\b', r'\bmean\b', |
|
|
|
|
|
r'\bcalculate\b', r'\bcompute\b', r'\bsolve\b', |
|
|
|
|
|
r'\d+\s*[\+\-\*/]\s*\d+', r'\bsquare root\b', r'\bpercentage\b', |
|
|
|
|
|
r'\btable\b.*\bdefining\b', r'\bgiven.*table\b', r'\boperation table\b', |
|
|
|
|
|
r'\bequation\b', r'\bformula\b', r'\bratio\b', r'\bfactorial\b', |
|
|
|
|
|
r'\binterest\b', r'\bcompound\b', r'\bstatistics\b' |
|
|
], |
|
|
QuestionType.TEXT_MANIPULATION: [ |
|
|
|
|
|
r'\breverse\b', r'\bbackwards\b', r'\bencode\b', r'\bdecode\b', |
|
|
r'\btransform\b', r'\bconvert\b', r'\buppercase\b', r'\blowercase\b', |
|
|
r'\breplace\b', r'\bextract\b', r'\bopposite\b', |
|
|
|
|
|
r'[a-z]+\s+[a-z]+\s+[a-z]+.*\.', |
|
|
|
|
|
r'\.rewsna\b', r'\bword.*opposite\b' |
|
|
], |
|
|
QuestionType.CODE_EXECUTION: [ |
|
|
r'\bcode\b', r'\bprogram\b', r'\bscript\b', r'\bfunction\b', r'\balgorithm\b', |
|
|
r'\bexecute\b', r'\brun.*code\b', r'\bpython\b', r'\bjavascript\b', |
|
|
r'\battached.*code\b', r'\bfinal.*output\b', r'\bnumeric output\b' |
|
|
], |
|
|
QuestionType.REASONING: [ |
|
|
|
|
|
r'\bwhy\b', r'\bexplain\b', r'\banalyze\b', r'\breasoning\b', r'\blogic\b', |
|
|
r'\brelationship\b', r'\bcompare\b', r'\bcontrast\b', r'\bconclusion\b', |
|
|
|
|
|
r'\bexamine\b', r'\bidentify\b', r'\bdetermine\b', r'\bassess\b', |
|
|
r'\bevaluate\b', r'\binterpret\b' |
|
|
], |
|
|
QuestionType.WEB_RESEARCH: [ |
|
|
|
|
|
r'\bsearch\b', r'\bfind.*information\b', r'\bresearch\b', r'\blook up\b', |
|
|
r'\bwebsite\b', r'\bonline\b', r'\binternet\b', |
|
|
|
|
|
r'\bwho\s+(?:is|was|are|were|did|does)\b', |
|
|
r'\bwhat\s+(?:is|was|are|were)\b', r'\bwhen\s+(?:is|was|did|does)\b', |
|
|
r'\bwhere\s+(?:is|was|are|were)\b', |
|
|
|
|
|
r'\bauthor\b', r'\bpublished\b', r'\bhistory\b', r'\bhistorical\b', |
|
|
r'\bcentury\b', r'\byear\b', r'\bbiography\b', r'\bwinner\b', |
|
|
|
|
|
r'\bstudio albums\b', r'\brecipient\b', r'\bcompetition\b', r'\bspecimens\b' |
|
|
] |
|
|
} |
|
|
|
|
|
|
|
|
type_scores = {} |
|
|
for question_type, patterns in classification_patterns.items(): |
|
|
score = 0 |
|
|
for pattern in patterns: |
|
|
matches = re.findall(pattern, question_lower) |
|
|
score += len(matches) |
|
|
|
|
|
if question_type == QuestionType.MATHEMATICAL and pattern in [r'\bhow many\b', r'\bhow much\b']: |
|
|
score += 2 |
|
|
elif question_type == QuestionType.TEXT_MANIPULATION and any(special in pattern for special in ['opposite', 'reverse', 'backwards']): |
|
|
score += 1 |
|
|
if score > 0: |
|
|
type_scores[question_type] = score |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if re.search(r'\.rewsna\b|etirw\b|dnatsrednu\b', question_lower): |
|
|
type_scores[QuestionType.TEXT_MANIPULATION] = type_scores.get(QuestionType.TEXT_MANIPULATION, 0) + 3 |
|
|
|
|
|
|
|
|
if re.search(r'\bfinal.*output\b|\bnumeric.*output\b|\battached.*code\b', question_lower): |
|
|
type_scores[QuestionType.CODE_EXECUTION] = type_scores.get(QuestionType.CODE_EXECUTION, 0) + 4 |
|
|
|
|
|
|
|
|
if re.search(r'\b\d+.*\b(?:studio albums|between|and)\b.*\d+', question_lower): |
|
|
type_scores[QuestionType.MATHEMATICAL] = type_scores.get(QuestionType.MATHEMATICAL, 0) + 3 |
|
|
|
|
|
|
|
|
if re.search(r'\btable.*defining.*\*', question_lower) or '|*|' in question: |
|
|
type_scores[QuestionType.MATHEMATICAL] = type_scores.get(QuestionType.MATHEMATICAL, 0) + 4 |
|
|
|
|
|
|
|
|
if ('how many' in question_lower or 'how much' in question_lower) and \ |
|
|
any(term in question_lower for term in ['between', 'from', 'during', 'published', 'released']): |
|
|
type_scores[QuestionType.WEB_RESEARCH] = type_scores.get(QuestionType.WEB_RESEARCH, 0) + 3 |
|
|
type_scores[QuestionType.MATHEMATICAL] = type_scores.get(QuestionType.MATHEMATICAL, 0) + 3 |
|
|
|
|
|
|
|
|
if any(pattern in question_lower for pattern in ['who is', 'who was', 'who did', 'what is', 'when did', 'where', 'which']): |
|
|
type_scores[QuestionType.WEB_RESEARCH] = type_scores.get(QuestionType.WEB_RESEARCH, 0) + 2 |
|
|
|
|
|
|
|
|
if any(term in question_lower for term in ['image', 'picture', 'photo', 'file', 'attached', 'provided']): |
|
|
type_scores[QuestionType.FILE_PROCESSING] = type_scores.get(QuestionType.FILE_PROCESSING, 0) + 4 |
|
|
|
|
|
|
|
|
if any(term in question_lower for term in ['wikipedia', 'featured article', 'english wikipedia']): |
|
|
type_scores[QuestionType.WIKIPEDIA] = type_scores.get(QuestionType.WIKIPEDIA, 0) + 4 |
|
|
|
|
|
|
|
|
for qtype, score in type_scores.items(): |
|
|
if score > 0 and qtype not in detected_types: |
|
|
detected_types.append(qtype) |
|
|
|
|
|
|
|
|
if not detected_types: |
|
|
detected_types.append(QuestionType.WEB_RESEARCH) |
|
|
|
|
|
|
|
|
if type_scores: |
|
|
primary_type = max(type_scores.keys(), key=lambda t: type_scores[t]) |
|
|
else: |
|
|
primary_type = detected_types[0] if detected_types else QuestionType.WEB_RESEARCH |
|
|
|
|
|
return detected_types, primary_type |
|
|
|
|
|
def _assess_complexity(self, question: str) -> str: |
|
|
"""Assess question complexity with enhanced logic""" |
|
|
|
|
|
question_lower = question.lower() |
|
|
|
|
|
|
|
|
complex_indicators = [ |
|
|
'multi-step', 'multiple', 'several', 'complex', 'detailed', |
|
|
'analyze', 'explain why', 'reasoning', 'relationship', |
|
|
'compare and contrast', 'comprehensive', 'thorough', |
|
|
'between.*and', 'table.*defining', 'attached.*file' |
|
|
] |
|
|
|
|
|
|
|
|
simple_indicators = [ |
|
|
'what is', 'who is', 'when did', 'where is', 'yes or no', |
|
|
'true or false', 'simple', 'quick', 'name' |
|
|
] |
|
|
|
|
|
complex_score = sum(1 for indicator in complex_indicators if re.search(indicator, question_lower)) |
|
|
simple_score = sum(1 for indicator in simple_indicators if re.search(indicator, question_lower)) |
|
|
|
|
|
|
|
|
if len(question) > 200: |
|
|
complex_score += 1 |
|
|
if len(question.split()) > 30: |
|
|
complex_score += 1 |
|
|
if question.count('?') > 1: |
|
|
complex_score += 1 |
|
|
if '|' in question and '*' in question: |
|
|
complex_score += 2 |
|
|
if re.search(r'\d+.*between.*\d+', question_lower): |
|
|
complex_score += 1 |
|
|
|
|
|
|
|
|
if complex_score >= 3: |
|
|
return "complex" |
|
|
elif complex_score >= 1 and simple_score == 0: |
|
|
return "medium" |
|
|
elif simple_score >= 2 and complex_score == 0: |
|
|
return "simple" |
|
|
else: |
|
|
return "medium" |
|
|
|
|
|
def _select_agents_enhanced(self, question_types: List[QuestionType], primary_type: QuestionType, |
|
|
has_file: bool, complexity: str) -> List[AgentRole]: |
|
|
""" |
|
|
Enhanced agent selection that can choose multiple agents for complex workflows |
|
|
""" |
|
|
|
|
|
agents = [] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
agent_priorities = { |
|
|
QuestionType.FILE_PROCESSING: [AgentRole.FILE_PROCESSOR], |
|
|
QuestionType.CODE_EXECUTION: [AgentRole.CODE_EXECUTOR], |
|
|
QuestionType.WIKIPEDIA: [AgentRole.WEB_RESEARCHER], |
|
|
QuestionType.YOUTUBE: [AgentRole.WEB_RESEARCHER], |
|
|
QuestionType.WEB_RESEARCH: [AgentRole.WEB_RESEARCHER], |
|
|
QuestionType.MATHEMATICAL: [AgentRole.REASONING_AGENT], |
|
|
QuestionType.TEXT_MANIPULATION: [AgentRole.REASONING_AGENT], |
|
|
QuestionType.REASONING: [AgentRole.REASONING_AGENT] |
|
|
} |
|
|
|
|
|
|
|
|
for qtype in question_types: |
|
|
if qtype in agent_priorities: |
|
|
for agent in agent_priorities[qtype]: |
|
|
if agent not in agents: |
|
|
agents.append(agent) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if primary_type == QuestionType.CODE_EXECUTION: |
|
|
|
|
|
ordered_agents = [] |
|
|
if AgentRole.CODE_EXECUTOR not in ordered_agents: |
|
|
ordered_agents.append(AgentRole.CODE_EXECUTOR) |
|
|
|
|
|
for agent in agents: |
|
|
if agent != AgentRole.CODE_EXECUTOR and agent not in ordered_agents: |
|
|
ordered_agents.append(agent) |
|
|
agents = ordered_agents |
|
|
|
|
|
|
|
|
elif (QuestionType.WEB_RESEARCH in question_types and QuestionType.MATHEMATICAL in question_types): |
|
|
|
|
|
ordered_agents = [] |
|
|
if AgentRole.WEB_RESEARCHER not in ordered_agents: |
|
|
ordered_agents.append(AgentRole.WEB_RESEARCHER) |
|
|
if AgentRole.REASONING_AGENT not in ordered_agents: |
|
|
ordered_agents.append(AgentRole.REASONING_AGENT) |
|
|
agents = ordered_agents |
|
|
|
|
|
|
|
|
elif has_file and len(question_types) > 1: |
|
|
|
|
|
ordered_agents = [] |
|
|
if AgentRole.FILE_PROCESSOR not in ordered_agents: |
|
|
ordered_agents.append(AgentRole.FILE_PROCESSOR) |
|
|
|
|
|
for agent in agents: |
|
|
if agent != AgentRole.FILE_PROCESSOR and agent not in ordered_agents: |
|
|
ordered_agents.append(agent) |
|
|
agents = ordered_agents |
|
|
|
|
|
|
|
|
if complexity == "complex" and AgentRole.REASONING_AGENT not in agents: |
|
|
agents.append(AgentRole.REASONING_AGENT) |
|
|
|
|
|
|
|
|
if primary_type == QuestionType.UNKNOWN or not agents: |
|
|
agents = [AgentRole.WEB_RESEARCHER, AgentRole.REASONING_AGENT] |
|
|
|
|
|
|
|
|
agents.append(AgentRole.SYNTHESIZER) |
|
|
|
|
|
|
|
|
seen = set() |
|
|
unique_agents = [] |
|
|
for agent in agents: |
|
|
if agent not in seen: |
|
|
seen.add(agent) |
|
|
unique_agents.append(agent) |
|
|
|
|
|
return unique_agents |
|
|
|
|
|
def _estimate_cost(self, complexity: str, agents: List[AgentRole]) -> float: |
|
|
"""Estimate processing cost based on complexity and agents""" |
|
|
|
|
|
base_costs = { |
|
|
"simple": 0.005, |
|
|
"medium": 0.015, |
|
|
"complex": 0.035 |
|
|
} |
|
|
|
|
|
base_cost = base_costs.get(complexity, 0.015) |
|
|
|
|
|
|
|
|
agent_cost = len(agents) * 0.008 |
|
|
|
|
|
return base_cost + agent_cost |
|
|
|
|
|
def _get_routing_reasoning(self, primary_type: QuestionType, complexity: str, |
|
|
agents: List[AgentRole], all_types: List[QuestionType]) -> str: |
|
|
"""Generate human-readable reasoning for routing decision""" |
|
|
|
|
|
reasons = [] |
|
|
|
|
|
|
|
|
type_descriptions = { |
|
|
QuestionType.WIKIPEDIA: "References Wikipedia content", |
|
|
QuestionType.YOUTUBE: "Involves YouTube video analysis", |
|
|
QuestionType.FILE_PROCESSING: "Requires file processing", |
|
|
QuestionType.MATHEMATICAL: "Involves mathematical computation/counting", |
|
|
QuestionType.CODE_EXECUTION: "Requires code execution", |
|
|
QuestionType.TEXT_MANIPULATION: "Involves text transformation/manipulation", |
|
|
QuestionType.REASONING: "Requires logical reasoning/analysis", |
|
|
QuestionType.WEB_RESEARCH: "Needs web research for factual information" |
|
|
} |
|
|
|
|
|
if primary_type in type_descriptions: |
|
|
reasons.append(type_descriptions[primary_type]) |
|
|
|
|
|
|
|
|
if len(all_types) > 1: |
|
|
other_types = [t for t in all_types if t != primary_type] |
|
|
reasons.append(f"Also involves: {', '.join([t.value for t in other_types])}") |
|
|
|
|
|
|
|
|
if complexity == "complex": |
|
|
reasons.append("Complex multi-step reasoning required") |
|
|
elif complexity == "simple": |
|
|
reasons.append("Straightforward question") |
|
|
|
|
|
|
|
|
agent_names = [agent.value.replace('_', ' ') for agent in agents] |
|
|
if len(agents) > 2: |
|
|
reasons.append(f"Multi-agent workflow: {' → '.join(agent_names)}") |
|
|
else: |
|
|
reasons.append(f"Single-agent workflow: {', '.join(agent_names)}") |
|
|
|
|
|
return "; ".join(reasons) |
|
|
|
|
|
def _llm_enhanced_routing(self, state: GAIAAgentState) -> GAIAAgentState: |
|
|
"""Use LLM for enhanced routing analysis of complex/unknown questions""" |
|
|
|
|
|
prompt = f""" |
|
|
Analyze this GAIA benchmark question and provide routing guidance: |
|
|
|
|
|
Question: {state.question} |
|
|
File attached: {state.file_name if state.file_name else "None"} |
|
|
Detected types: {state.routing_decision.get('all_types', [])} |
|
|
Primary classification: {state.question_type.value} |
|
|
Current complexity: {state.complexity_assessment} |
|
|
Selected agents: {[a.value for a in state.selected_agents]} |
|
|
|
|
|
Does this question need: |
|
|
1. Web research to find factual information? |
|
|
2. Mathematical calculation or counting? |
|
|
3. Text manipulation or decoding? |
|
|
4. File processing or analysis? |
|
|
5. Logical reasoning or analysis? |
|
|
|
|
|
Should the agent selection be adjusted? If so, provide specific recommendations. |
|
|
Keep response concise and focused on routing decisions. |
|
|
""" |
|
|
|
|
|
try: |
|
|
|
|
|
tier = ModelTier.MAIN |
|
|
result = self.llm_client.generate(prompt, tier=tier, max_tokens=300) |
|
|
|
|
|
if result.success: |
|
|
state.add_processing_step("Router: Enhanced with LLM analysis") |
|
|
state.routing_decision["llm_analysis"] = result.response |
|
|
logger.info("✅ LLM enhanced routing completed") |
|
|
else: |
|
|
state.add_error(f"LLM routing enhancement failed: {result.error}") |
|
|
|
|
|
except Exception as e: |
|
|
state.add_error(f"LLM routing error: {str(e)}") |
|
|
logger.error(f"LLM routing failed: {e}") |
|
|
|
|
|
return state |
|
|
|
|
|
def _get_llm_classification(self, question: str) -> Dict[str, Any]: |
|
|
"""Use 72B model for intelligent question classification""" |
|
|
|
|
|
classification_prompt = f""" |
|
|
Analyze this GAIA benchmark question and classify it for agent routing. |
|
|
|
|
|
Question: {question} |
|
|
|
|
|
Determine: |
|
|
1. Primary question type (mathematical, text_manipulation, web_research, file_processing, reasoning, factual_lookup) |
|
|
2. Required capabilities (research, calculation, file_analysis, text_processing, logical_reasoning) |
|
|
3. Complexity level (simple, moderate, complex) |
|
|
4. Expected answer type (number, text, yes_no, name, location, list) |
|
|
|
|
|
Provide your analysis in this format: |
|
|
PRIMARY_TYPE: [type] |
|
|
CAPABILITIES: [cap1, cap2, cap3] |
|
|
COMPLEXITY: [level] |
|
|
ANSWER_TYPE: [type] |
|
|
REASONING: [brief explanation] |
|
|
""" |
|
|
|
|
|
|
|
|
result = self.llm_client.generate( |
|
|
classification_prompt, |
|
|
tier=ModelTier.COMPLEX, |
|
|
max_tokens=200 |
|
|
) |
|
|
|
|
|
if result.success: |
|
|
return self._parse_llm_classification(result.response) |
|
|
else: |
|
|
logger.warning("LLM classification failed, using pattern-based only") |
|
|
return {"primary_type": "unknown", "capabilities": [], "complexity": "moderate"} |
|
|
|
|
|
def _parse_llm_classification(self, response: str) -> Dict[str, Any]: |
|
|
"""Parse LLM classification response""" |
|
|
|
|
|
parsed = { |
|
|
"primary_type": "unknown", |
|
|
"capabilities": [], |
|
|
"complexity": "moderate", |
|
|
"answer_type": "text", |
|
|
"reasoning": "" |
|
|
} |
|
|
|
|
|
lines = response.split('\n') |
|
|
for line in lines: |
|
|
line = line.strip() |
|
|
if line.startswith("PRIMARY_TYPE:"): |
|
|
parsed["primary_type"] = line.split(":", 1)[1].strip().lower() |
|
|
elif line.startswith("CAPABILITIES:"): |
|
|
caps_text = line.split(":", 1)[1].strip() |
|
|
parsed["capabilities"] = [c.strip().lower() for c in caps_text.split(",")] |
|
|
elif line.startswith("COMPLEXITY:"): |
|
|
parsed["complexity"] = line.split(":", 1)[1].strip().lower() |
|
|
elif line.startswith("ANSWER_TYPE:"): |
|
|
parsed["answer_type"] = line.split(":", 1)[1].strip().lower() |
|
|
elif line.startswith("REASONING:"): |
|
|
parsed["reasoning"] = line.split(":", 1)[1].strip() |
|
|
|
|
|
return parsed |
|
|
|
|
|
def _combine_classifications_legacy(self, pattern_types: List[QuestionType], pattern_primary: QuestionType, |
|
|
llm_classification: Dict[str, Any]) -> Tuple[List[QuestionType], QuestionType]: |
|
|
"""Combine pattern-based and LLM-based classifications""" |
|
|
|
|
|
|
|
|
llm_type_mapping = { |
|
|
"mathematical": QuestionType.MATHEMATICAL, |
|
|
"text_manipulation": QuestionType.TEXT_MANIPULATION, |
|
|
"web_research": QuestionType.WEB_RESEARCH, |
|
|
"file_processing": QuestionType.FILE_PROCESSING, |
|
|
"reasoning": QuestionType.REASONING, |
|
|
"factual_lookup": QuestionType.WEB_RESEARCH, |
|
|
"code_execution": QuestionType.CODE_EXECUTION |
|
|
} |
|
|
|
|
|
llm_primary = llm_type_mapping.get(llm_classification["primary_type"], QuestionType.WEB_RESEARCH) |
|
|
|
|
|
|
|
|
combined_types = list(pattern_types) |
|
|
if llm_primary not in combined_types: |
|
|
combined_types.insert(0, llm_primary) |
|
|
|
|
|
|
|
|
if llm_classification["complexity"] in ["complex", "moderate"] and llm_primary != QuestionType.WEB_RESEARCH: |
|
|
final_primary = llm_primary |
|
|
else: |
|
|
final_primary = pattern_primary |
|
|
|
|
|
logger.info(f"🤖 Combined classification: Pattern={pattern_primary.value}, LLM={llm_primary.value}, Final={final_primary.value}") |
|
|
|
|
|
return combined_types, final_primary |
|
|
|
|
|
def _select_agents_for_type(self, classification_result: Dict[str, Any]) -> List[AgentRole]: |
|
|
"""Select appropriate agents based on enhanced classification""" |
|
|
|
|
|
question_type = classification_result['type'] |
|
|
confidence = classification_result['confidence'] |
|
|
|
|
|
|
|
|
if question_type == 'mathematical': |
|
|
agents = [AgentRole.WEB_RESEARCHER, AgentRole.REASONING_AGENT] |
|
|
elif question_type == 'text_manipulation': |
|
|
agents = [AgentRole.REASONING_AGENT] |
|
|
elif question_type == 'file_processing': |
|
|
agents = [AgentRole.FILE_PROCESSOR, AgentRole.REASONING_AGENT] |
|
|
elif question_type == 'web_research': |
|
|
agents = [AgentRole.WEB_RESEARCHER] |
|
|
elif question_type == 'reasoning': |
|
|
agents = [AgentRole.REASONING_AGENT, AgentRole.WEB_RESEARCHER] |
|
|
elif question_type == 'factual_lookup': |
|
|
agents = [AgentRole.WEB_RESEARCHER] |
|
|
else: |
|
|
|
|
|
agents = [AgentRole.WEB_RESEARCHER, AgentRole.REASONING_AGENT] |
|
|
|
|
|
|
|
|
agents.append(AgentRole.SYNTHESIZER) |
|
|
|
|
|
|
|
|
if confidence < 0.6: |
|
|
if AgentRole.WEB_RESEARCHER not in agents: |
|
|
agents.insert(-1, AgentRole.WEB_RESEARCHER) |
|
|
|
|
|
return agents |
|
|
|
|
|
def _analyze_question_structure(self, question: str) -> Dict[str, Any]: |
|
|
""" |
|
|
Phase 1: Analyze the structural components of the question |
|
|
""" |
|
|
structure = { |
|
|
'type': 'unknown', |
|
|
'complexity': 'medium', |
|
|
'components': [], |
|
|
'data_sources': [], |
|
|
'temporal_aspects': [], |
|
|
'quantitative_aspects': [] |
|
|
} |
|
|
|
|
|
question_lower = question.lower() |
|
|
|
|
|
|
|
|
if any(word in question_lower for word in ['how many', 'count', 'number of', 'quantity']): |
|
|
structure['type'] = 'quantitative' |
|
|
elif any(word in question_lower for word in ['who is', 'who was', 'who did', 'name of']): |
|
|
structure['type'] = 'identification' |
|
|
elif any(word in question_lower for word in ['where', 'location', 'place']): |
|
|
structure['type'] = 'location' |
|
|
elif any(word in question_lower for word in ['when', 'date', 'time', 'year']): |
|
|
structure['type'] = 'temporal' |
|
|
elif any(word in question_lower for word in ['what is', 'define', 'explain']): |
|
|
structure['type'] = 'definition' |
|
|
elif any(word in question_lower for word in ['calculate', 'compute', 'solve']): |
|
|
structure['type'] = 'mathematical' |
|
|
elif any(word in question_lower for word in ['compare', 'difference', 'versus']): |
|
|
structure['type'] = 'comparison' |
|
|
elif 'file' in question_lower or 'attached' in question_lower: |
|
|
structure['type'] = 'file_analysis' |
|
|
else: |
|
|
structure['type'] = 'complex_reasoning' |
|
|
|
|
|
|
|
|
if any(term in question_lower for term in ['wikipedia', 'article', 'page']): |
|
|
structure['data_sources'].append('wikipedia') |
|
|
if any(term in question_lower for term in ['video', 'youtube', 'watch']): |
|
|
structure['data_sources'].append('video') |
|
|
if any(term in question_lower for term in ['file', 'attached', 'document']): |
|
|
structure['data_sources'].append('file') |
|
|
if any(term in question_lower for term in ['recent', 'latest', 'current', '2024', '2025']): |
|
|
structure['data_sources'].append('web_search') |
|
|
|
|
|
|
|
|
import re |
|
|
years = re.findall(r'\b(?:19|20)\d{2}\b', question) |
|
|
dates = re.findall(r'\b(?:january|february|march|april|may|june|july|august|september|october|november|december)\s+\d{1,2},?\s+\d{4}\b', question_lower) |
|
|
structure['temporal_aspects'] = years + dates |
|
|
|
|
|
|
|
|
quantities = re.findall(r'\b\d+(?:\.\d+)?\b', question) |
|
|
structure['quantitative_aspects'] = quantities |
|
|
|
|
|
|
|
|
complexity_factors = [ |
|
|
len(question.split()) > 25, |
|
|
len(structure['data_sources']) > 1, |
|
|
len(structure['temporal_aspects']) > 1, |
|
|
'and' in question_lower and 'or' in question_lower, |
|
|
question.count('?') > 1, |
|
|
] |
|
|
|
|
|
if sum(complexity_factors) >= 3: |
|
|
structure['complexity'] = 'high' |
|
|
elif sum(complexity_factors) >= 1: |
|
|
structure['complexity'] = 'medium' |
|
|
else: |
|
|
structure['complexity'] = 'low' |
|
|
|
|
|
return structure |
|
|
|
|
|
def _analyze_information_needs(self, question: str, structural: Dict[str, Any]) -> Dict[str, Any]: |
|
|
""" |
|
|
Phase 2: Analyze what specific information is needed to answer the question |
|
|
""" |
|
|
needs = { |
|
|
'primary_need': 'factual_lookup', |
|
|
'information_types': [], |
|
|
'precision_required': 'medium', |
|
|
'verification_needed': False, |
|
|
'synthesis_complexity': 'simple' |
|
|
} |
|
|
|
|
|
|
|
|
if structural['type'] == 'quantitative': |
|
|
needs['primary_need'] = 'numerical_data' |
|
|
needs['precision_required'] = 'high' |
|
|
elif structural['type'] == 'identification': |
|
|
needs['primary_need'] = 'entity_identification' |
|
|
elif structural['type'] == 'mathematical': |
|
|
needs['primary_need'] = 'computation' |
|
|
needs['precision_required'] = 'high' |
|
|
elif structural['type'] == 'file_analysis': |
|
|
needs['primary_need'] = 'file_processing' |
|
|
elif structural['type'] == 'comparison': |
|
|
needs['primary_need'] = 'comparative_analysis' |
|
|
needs['verification_needed'] = True |
|
|
else: |
|
|
needs['primary_need'] = 'factual_lookup' |
|
|
|
|
|
|
|
|
if 'wikipedia' in structural['data_sources']: |
|
|
needs['information_types'].append('encyclopedic') |
|
|
if 'video' in structural['data_sources']: |
|
|
needs['information_types'].append('multimedia_content') |
|
|
if 'web_search' in structural['data_sources']: |
|
|
needs['information_types'].append('current_information') |
|
|
if 'file' in structural['data_sources']: |
|
|
needs['information_types'].append('document_analysis') |
|
|
|
|
|
|
|
|
if structural['complexity'] == 'high' or len(needs['information_types']) > 2: |
|
|
needs['synthesis_complexity'] = 'complex' |
|
|
elif len(needs['information_types']) > 1: |
|
|
needs['synthesis_complexity'] = 'moderate' |
|
|
|
|
|
return needs |
|
|
|
|
|
def _plan_execution_strategy(self, question: str, structural: Dict[str, Any], requirements: Dict[str, Any]) -> Dict[str, Any]: |
|
|
""" |
|
|
Phase 3: Plan the execution strategy based on analysis |
|
|
""" |
|
|
strategy = { |
|
|
'approach': 'sequential', |
|
|
'parallel_possible': False, |
|
|
'iterative_refinement': False, |
|
|
'fallback_needed': True, |
|
|
'verification_steps': [] |
|
|
} |
|
|
|
|
|
|
|
|
if requirements['primary_need'] == 'file_processing': |
|
|
strategy['approach'] = 'file_first' |
|
|
elif requirements['primary_need'] == 'computation': |
|
|
strategy['approach'] = 'reasoning_first' |
|
|
elif len(requirements['information_types']) > 2: |
|
|
strategy['approach'] = 'multi_source' |
|
|
strategy['parallel_possible'] = True |
|
|
elif 'current_information' in requirements['information_types']: |
|
|
strategy['approach'] = 'web_first' |
|
|
else: |
|
|
strategy['approach'] = 'knowledge_first' |
|
|
|
|
|
|
|
|
if (structural['complexity'] == 'high' or |
|
|
requirements['precision_required'] == 'high' or |
|
|
requirements['verification_needed']): |
|
|
strategy['iterative_refinement'] = True |
|
|
|
|
|
|
|
|
if requirements['verification_needed']: |
|
|
strategy['verification_steps'] = ['cross_reference', 'consistency_check'] |
|
|
if requirements['precision_required'] == 'high': |
|
|
strategy['verification_steps'].append('precision_validation') |
|
|
|
|
|
return strategy |
|
|
|
|
|
def _select_agent_sequence(self, strategy: Dict[str, Any], requirements: Dict[str, Any]) -> List[str]: |
|
|
""" |
|
|
Phase 4: Select the optimal sequence of agents based on strategy |
|
|
""" |
|
|
sequence = [] |
|
|
|
|
|
|
|
|
if strategy['approach'] == 'file_first': |
|
|
sequence = ['file_processor', 'reasoning_agent', 'synthesizer'] |
|
|
elif strategy['approach'] == 'reasoning_first': |
|
|
sequence = ['reasoning_agent', 'web_researcher', 'synthesizer'] |
|
|
elif strategy['approach'] == 'web_first': |
|
|
sequence = ['web_researcher', 'reasoning_agent', 'synthesizer'] |
|
|
elif strategy['approach'] == 'knowledge_first': |
|
|
sequence = ['web_researcher', 'reasoning_agent', 'synthesizer'] |
|
|
elif strategy['approach'] == 'multi_source': |
|
|
sequence = ['web_researcher', 'file_processor', 'reasoning_agent', 'synthesizer'] |
|
|
else: |
|
|
sequence = ['reasoning_agent', 'web_researcher', 'synthesizer'] |
|
|
|
|
|
|
|
|
if strategy['iterative_refinement']: |
|
|
|
|
|
if 'reasoning_agent' in sequence: |
|
|
sequence.remove('reasoning_agent') |
|
|
sequence.insert(-1, 'reasoning_agent') |
|
|
|
|
|
|
|
|
if 'synthesizer' in sequence: |
|
|
sequence.remove('synthesizer') |
|
|
sequence.append('synthesizer') |
|
|
|
|
|
return sequence |
|
|
|
|
|
def _classify_question_enhanced(self, question: str) -> Dict[str, Any]: |
|
|
"""Enhanced question classification using better pattern matching and LLM analysis""" |
|
|
|
|
|
question_lower = question.lower() |
|
|
|
|
|
|
|
|
pattern_classification = self._classify_by_enhanced_patterns(question_lower, question) |
|
|
|
|
|
|
|
|
llm_classification = self._classify_with_llm(question) |
|
|
|
|
|
|
|
|
final_classification = self._combine_classifications(pattern_classification, llm_classification, question) |
|
|
|
|
|
logger.info(f"🤖 Enhanced classification: Pattern={pattern_classification['type']}, LLM={llm_classification['type']}, Final={final_classification['type']}") |
|
|
|
|
|
return final_classification |
|
|
|
|
|
def _classify_by_enhanced_patterns(self, question_lower: str, original_question: str) -> Dict[str, Any]: |
|
|
"""Enhanced pattern-based classification with better accuracy""" |
|
|
|
|
|
|
|
|
mathematical_patterns = [ |
|
|
r'\bhow many\b', |
|
|
r'\bcount\b.*\b(of|the)\b', |
|
|
r'\bnumber of\b', |
|
|
r'\btotal\b.*\b(of|number)\b', |
|
|
r'\bcalculate\b', |
|
|
r'\bsum\b.*\bof\b', |
|
|
r'\bhow much\b', |
|
|
r'\bquantity\b' |
|
|
] |
|
|
|
|
|
if any(re.search(pattern, question_lower) for pattern in mathematical_patterns): |
|
|
|
|
|
temporal_indicators = ['between', 'from', 'during', 'in', r'\b(19|20)\d{2}\b'] |
|
|
has_temporal = any(re.search(indicator, question_lower) for indicator in temporal_indicators) |
|
|
|
|
|
return { |
|
|
'type': 'mathematical', |
|
|
'confidence': 0.9, |
|
|
'subtype': 'temporal_counting' if has_temporal else 'general_counting', |
|
|
'reasoning': 'Strong mathematical/counting indicators found' |
|
|
} |
|
|
|
|
|
|
|
|
text_manipulation_patterns = [ |
|
|
r'\bopposite\b', |
|
|
r'\breverse\b', |
|
|
r'\bbackwards\b', |
|
|
r'\bdecode\b', |
|
|
r'\btranslate\b', |
|
|
r'\bconvert\b', |
|
|
r'\.rewsna', |
|
|
r'\bcipher\b', |
|
|
r'\bencrypt\b' |
|
|
] |
|
|
|
|
|
if any(re.search(pattern, question_lower) for pattern in text_manipulation_patterns): |
|
|
return { |
|
|
'type': 'text_manipulation', |
|
|
'confidence': 0.85, |
|
|
'subtype': 'text_processing', |
|
|
'reasoning': 'Text manipulation patterns detected' |
|
|
} |
|
|
|
|
|
|
|
|
file_patterns = [ |
|
|
r'\battached\b.*\b(file|image|document|excel|csv|python|code)\b', |
|
|
r'\bfile\b.*\b(contains|attached|uploaded)\b', |
|
|
r'\b(image|photo|picture)\b.*\b(shows|contains|attached)\b', |
|
|
r'\bcode\b.*\b(attached|file|script)\b', |
|
|
r'\bspreadsheet\b', |
|
|
r'\b\.py\b|\b\.csv\b|\b\.xlsx\b|\b\.png\b|\b\.jpg\b' |
|
|
] |
|
|
|
|
|
if any(re.search(pattern, question_lower) for pattern in file_patterns): |
|
|
return { |
|
|
'type': 'file_processing', |
|
|
'confidence': 0.9, |
|
|
'subtype': 'file_analysis', |
|
|
'reasoning': 'File processing indicators found' |
|
|
} |
|
|
|
|
|
|
|
|
web_research_patterns = [ |
|
|
r'\bwikipedia\b.*\barticle\b', |
|
|
r'\bfeatured article\b', |
|
|
r'\bpromoted\b.*\b(in|during)\b.*\b(19|20)\d{2}\b', |
|
|
r'\bnominated\b.*\bby\b', |
|
|
r'\byoutube\b.*\bvideo\b', |
|
|
r'\bwatch\?v=\b', |
|
|
r'\bhttps?://\b', |
|
|
r'\bwebsite\b|\burl\b' |
|
|
] |
|
|
|
|
|
if any(re.search(pattern, question_lower) for pattern in web_research_patterns): |
|
|
return { |
|
|
'type': 'web_research', |
|
|
'confidence': 0.8, |
|
|
'subtype': 'specific_lookup', |
|
|
'reasoning': 'Web-specific content indicators found' |
|
|
} |
|
|
|
|
|
|
|
|
reasoning_patterns = [ |
|
|
r'\banalyze\b|\banalysis\b', |
|
|
r'\bcompare\b|\bcomparison\b', |
|
|
r'\bexplain\b|\bexplanation\b', |
|
|
r'\bwhy\b.*\b(is|are|was|were|do|does|did)\b', |
|
|
r'\bhow\b.*\b(does|do|did|can|could|would)\b', |
|
|
r'\bwhat.*difference\b', |
|
|
r'\bwhat.*relationship\b' |
|
|
] |
|
|
|
|
|
if any(re.search(pattern, question_lower) for pattern in reasoning_patterns): |
|
|
return { |
|
|
'type': 'reasoning', |
|
|
'confidence': 0.7, |
|
|
'subtype': 'analytical_reasoning', |
|
|
'reasoning': 'Reasoning/analysis patterns detected' |
|
|
} |
|
|
|
|
|
|
|
|
factual_patterns = [ |
|
|
r'\bwho\b.*\b(is|was|are|were)\b', |
|
|
r'\bwhat\b.*\b(is|was|are|were)\b', |
|
|
r'\bwhen\b.*\b(did|was|were|is|are)\b', |
|
|
r'\bwhere\b.*\b(is|was|are|were)\b', |
|
|
r'\bwhich\b.*\b(is|was|are|were)\b' |
|
|
] |
|
|
|
|
|
if any(re.search(pattern, question_lower) for pattern in factual_patterns): |
|
|
return { |
|
|
'type': 'factual_lookup', |
|
|
'confidence': 0.6, |
|
|
'subtype': 'general_factual', |
|
|
'reasoning': 'General factual question patterns' |
|
|
} |
|
|
|
|
|
|
|
|
return { |
|
|
'type': 'general', |
|
|
'confidence': 0.4, |
|
|
'subtype': 'unclassified', |
|
|
'reasoning': 'No specific patterns matched' |
|
|
} |
|
|
|
|
|
def _classify_with_llm(self, question: str) -> Dict[str, Any]: |
|
|
"""LLM-based classification for complex questions""" |
|
|
|
|
|
classification_prompt = f""" |
|
|
Analyze this question and classify it into one of these categories: |
|
|
|
|
|
Categories: |
|
|
- mathematical: Questions asking for counts, calculations, quantities |
|
|
- text_manipulation: Questions involving text reversal, encoding, word puzzles |
|
|
- file_processing: Questions about attached files, images, code, data |
|
|
- web_research: Questions requiring web search, Wikipedia lookup, current information |
|
|
- reasoning: Questions requiring analysis, comparison, logical deduction |
|
|
- factual_lookup: Simple fact-based questions about people, places, events |
|
|
|
|
|
Question: {question} |
|
|
|
|
|
Respond with just the category name and a brief reason (max 10 words). |
|
|
Format: category_name: reason |
|
|
|
|
|
Classification:""" |
|
|
|
|
|
try: |
|
|
llm_result = self.llm_client.generate( |
|
|
classification_prompt, |
|
|
tier=ModelTier.ROUTER, |
|
|
max_tokens=50 |
|
|
) |
|
|
|
|
|
if llm_result.success: |
|
|
response = llm_result.response.strip().lower() |
|
|
|
|
|
|
|
|
if ':' in response: |
|
|
category, reason = response.split(':', 1) |
|
|
category = category.strip() |
|
|
reason = reason.strip() |
|
|
else: |
|
|
category = response.split()[0] if response.split() else 'general' |
|
|
reason = 'llm classification' |
|
|
|
|
|
|
|
|
valid_categories = ['mathematical', 'text_manipulation', 'file_processing', 'web_research', 'reasoning', 'factual_lookup'] |
|
|
if category not in valid_categories: |
|
|
category = 'general' |
|
|
|
|
|
return { |
|
|
'type': category, |
|
|
'confidence': 0.7, |
|
|
'reasoning': f'LLM: {reason}' |
|
|
} |
|
|
else: |
|
|
return { |
|
|
'type': 'general', |
|
|
'confidence': 0.3, |
|
|
'reasoning': 'LLM classification failed' |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"LLM classification failed: {e}") |
|
|
return { |
|
|
'type': 'general', |
|
|
'confidence': 0.3, |
|
|
'reasoning': 'LLM classification error' |
|
|
} |
|
|
|
|
|
def _select_agents(self, question_types: List[QuestionType], primary_type: QuestionType, question: str) -> List[AgentRole]: |
|
|
"""Select agents based on combined classification""" |
|
|
|
|
|
agents = [] |
|
|
|
|
|
|
|
|
primary_agent_map = { |
|
|
QuestionType.MATHEMATICAL: AgentRole.REASONING_AGENT, |
|
|
QuestionType.TEXT_MANIPULATION: AgentRole.REASONING_AGENT, |
|
|
QuestionType.WEB_RESEARCH: AgentRole.WEB_RESEARCHER, |
|
|
QuestionType.FILE_PROCESSING: AgentRole.FILE_PROCESSOR, |
|
|
QuestionType.REASONING: AgentRole.REASONING_AGENT, |
|
|
QuestionType.CODE_EXECUTION: AgentRole.CODE_EXECUTOR |
|
|
} |
|
|
|
|
|
primary_agent = primary_agent_map.get(primary_type, AgentRole.WEB_RESEARCHER) |
|
|
if primary_agent not in agents: |
|
|
agents.append(primary_agent) |
|
|
|
|
|
|
|
|
for qtype in question_types: |
|
|
if qtype != primary_type: |
|
|
secondary_agent = primary_agent_map.get(qtype) |
|
|
if secondary_agent and secondary_agent not in agents: |
|
|
agents.append(secondary_agent) |
|
|
|
|
|
|
|
|
if AgentRole.SYNTHESIZER not in agents: |
|
|
agents.append(AgentRole.SYNTHESIZER) |
|
|
|
|
|
return agents |
|
|
|
|
|
def _combine_classifications(self, pattern_result: Dict[str, Any], llm_result: Dict[str, Any], question: str) -> Dict[str, Any]: |
|
|
"""Combine pattern and LLM classifications for final decision""" |
|
|
|
|
|
pattern_type = pattern_result['type'] |
|
|
pattern_confidence = pattern_result['confidence'] |
|
|
llm_type = llm_result['type'] |
|
|
llm_confidence = llm_result['confidence'] |
|
|
|
|
|
|
|
|
if pattern_confidence >= 0.8: |
|
|
final_type = pattern_type |
|
|
final_confidence = pattern_confidence |
|
|
reasoning = f"High confidence pattern match: {pattern_result['reasoning']}" |
|
|
|
|
|
|
|
|
elif pattern_type == llm_type: |
|
|
final_type = pattern_type |
|
|
final_confidence = min(0.95, (pattern_confidence + llm_confidence) / 2 + 0.1) |
|
|
reasoning = f"Pattern and LLM agree: {pattern_type}" |
|
|
|
|
|
|
|
|
elif pattern_confidence > llm_confidence: |
|
|
final_type = pattern_type |
|
|
final_confidence = pattern_confidence * 0.9 |
|
|
reasoning = f"Pattern-based: {pattern_result['reasoning']}" |
|
|
else: |
|
|
final_type = llm_type |
|
|
final_confidence = llm_confidence * 0.9 |
|
|
reasoning = f"LLM-based: {llm_result['reasoning']}" |
|
|
|
|
|
|
|
|
type_mapping = { |
|
|
'mathematical': QuestionType.MATHEMATICAL, |
|
|
'text_manipulation': QuestionType.TEXT_MANIPULATION, |
|
|
'file_processing': QuestionType.FILE_PROCESSING, |
|
|
'web_research': QuestionType.WEB_RESEARCH, |
|
|
'reasoning': QuestionType.REASONING, |
|
|
'factual_lookup': QuestionType.WEB_RESEARCH, |
|
|
'general': QuestionType.UNKNOWN |
|
|
} |
|
|
|
|
|
question_type = type_mapping.get(final_type, QuestionType.UNKNOWN) |
|
|
|
|
|
return { |
|
|
'type': final_type, |
|
|
'question_type': question_type, |
|
|
'confidence': final_confidence, |
|
|
'reasoning': reasoning, |
|
|
'pattern_result': pattern_result, |
|
|
'llm_result': llm_result |
|
|
} |