| | |
| | import uuid |
| | import logging |
| | import time |
| | import asyncio |
| | from datetime import datetime |
| | from typing import List, Dict, Optional |
| | from concurrent.futures import ThreadPoolExecutor |
| | import sys |
| | import os |
| |
|
logger = logging.getLogger(__name__)

# Make both this file's directory and its parent importable so that the
# optional safety-choice modules below can be resolved whether this module is
# loaded as part of a package or standalone.
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
sys.path.insert(0, parent_dir)
sys.path.insert(0, current_dir)

# Optional dependency: the safety-choice subsystem. If any of its modules is
# missing we degrade gracefully (SAFETY_CHOICE_AVAILABLE gates every use site)
# instead of failing at import time.
try:
    from safety_threshold_matrix import should_trigger_user_choice
    from safety_user_choice import create_safety_choice_prompt, process_safety_choice
    from safety_choice_orchestrator import SafetyChoiceOrchestrator
    SAFETY_CHOICE_AVAILABLE = True
    logger.info("Safety choice modules loaded successfully")
except ImportError as e:
    logger.warning(f"Safety choice modules not available: {e}")
    SAFETY_CHOICE_AVAILABLE = False
| |
|
class MVPOrchestrator:
    """Coordinates the MVP request pipeline.

    Sequences the injected collaborators through: context management, intent
    and skills recognition, execution planning, task execution, response
    synthesis, and safety checking (see process_request). Keeps lightweight
    per-session caches, safety-choice state, and bounded metrics histories.
    """

    def __init__(self, llm_router, context_manager, agents):
        # Injected collaborators.
        self.llm_router = llm_router            # routes LLM calls for task execution
        self.context_manager = context_manager  # persistence/retrieval of conversation context
        self.agents = agents                    # name -> agent objects exposing async execute()

        # Per-request trace of agent steps (reset at the start of each request).
        self.execution_trace = []

        # Topic-extraction cache (bounded).
        self._topic_cache = {}
        self._topic_cache_max_size = 100

        # Per-category safety score thresholds used by the safety subsystem.
        self.safety_thresholds = {
            "toxicity_or_harmful_language": 0.3,
            "potential_biases_or_stereotypes": 0.05,
            "privacy_or_security_concerns": 0.2,
            "controversial_or_sensitive_topics": 0.3
        }
        self.max_revision_attempts = 2
        self.revision_timeout = 30  # seconds

        # Safety user-choice state, keyed by session_id.
        self.awaiting_safety_response = {}
        self._pending_choices = {}

        # session_id -> user_id binding (see set_user_id / _get_user_id_for_session).
        self._current_user_id = {}

        # Short-lived per-session context cache (see _get_or_create_context).
        self._context_cache = {}

        # Recently answered queries, used for duplicate/similarity short-circuiting.
        self.recent_queries = []
        self.max_recent_queries = 50

        # Agent-call accounting and response metrics (bounded histories).
        self.agent_call_count = 0
        self.agent_call_history = []
        self.max_agent_history = 50
        self.response_metrics_history = []
        self.metrics_history_max_size = 100

        # Lazily initialized relevance classifier (see process_request).
        self.context_classifier = None
        self._classifier_initialized = False

        logger.info("MVPOrchestrator initialized with safety revision thresholds")
| | |
| | def set_user_id(self, session_id: str, user_id: str): |
| | """Set user_id with loop prevention""" |
| | |
| | old_user_id = self._current_user_id.get(session_id) |
| | |
| | if old_user_id != user_id: |
| | self._current_user_id[session_id] = user_id |
| | logger.info(f"Set user_id={user_id} for session {session_id} (was: {old_user_id})") |
| | |
| | |
| | cache_key = f"context_{session_id}" |
| | if cache_key in self._context_cache: |
| | del self._context_cache[cache_key] |
| | logger.info(f"Cleared context cache for session {session_id} due to user change") |
| | else: |
| | self._current_user_id[session_id] = user_id |
| | |
| | def _get_user_id_for_session(self, session_id: str) -> str: |
| | """Get user_id without triggering context loops""" |
| | |
| | if hasattr(self, '_current_user_id') and session_id in self._current_user_id: |
| | return self._current_user_id[session_id] |
| | |
| | |
| | return "Test_Any" |
| | |
    async def _get_or_create_context(self, session_id: str, user_input: str, user_id: str) -> dict:
        """Get context with loop prevention and caching.

        Serves the session context from a short per-session cache when the
        entry is younger than 5 seconds, otherwise asks the context manager
        and caches the result. The cache is bounded: past 100 entries it is
        pruned to the 50 newest.

        Args:
            session_id: Session whose context is requested.
            user_input: Current user message, forwarded to the context manager.
            user_id: User bound to the session.

        Returns:
            The context dict produced by the context manager.
        """
        cache_key = f"context_{session_id}"
        current_time = time.time()

        # Serve from cache while the entry is fresh (< 5 s); this prevents
        # re-entrant context lookups ("loops") within a single request.
        if hasattr(self, '_context_cache'):
            cached = self._context_cache.get(cache_key)
            if cached and (current_time - cached['timestamp']) < 5:
                logger.info(f"Using cached context for session {session_id}")
                return cached['context']

        # Cache miss or stale entry: fetch fresh context.
        context = await self.context_manager.manage_context(session_id, user_input, user_id=user_id)

        # Defensive: __init__ creates the cache, but tolerate bare instances.
        if not hasattr(self, '_context_cache'):
            self._context_cache = {}

        self._context_cache[cache_key] = {
            'context': context,
            'timestamp': current_time
        }

        # Bound the cache: once past 100 entries, keep only the 50 most
        # recent (by insertion timestamp).
        if len(self._context_cache) > 100:
            sorted_items = sorted(self._context_cache.items(), key=lambda x: x[1]['timestamp'])
            self._context_cache = dict(sorted_items[-50:])

        return context
| | |
    async def process_request(self, session_id: str, user_input: str) -> dict:
        """
        Main orchestration flow with loop prevention.

        Happy-path pipeline: similarity short-circuit -> context management ->
        intent + skills recognition (parallel or sequential) -> execution
        planning -> task-agent execution -> response synthesis -> safety
        check -> context persistence -> metrics tracking. A reasoning-chain
        metadata structure is accumulated alongside and attached to the result.

        Args:
            session_id: Identifier of the conversation session.
            user_input: Raw user message.

        Returns:
            Formatted result dict (see _format_final_output); on failure, a
            minimal error payload carrying the exception message.
        """
        logger.info(f"Processing request for session {session_id}")
        logger.info(f"User input: {user_input[:100]}")

        # Detect a bare YES/NO-style reply to a pending safety prompt.
        user_input_upper = user_input.strip().upper()
        is_binary_response = user_input_upper in ['YES', 'NO', 'APPLY', 'KEEP', 'Y', 'N']

        # Short-circuit: a binary answer to a safety prompt must not re-enter
        # the full pipeline (which would run another safety check on it).
        if is_binary_response and self.awaiting_safety_response.get(session_id, False):
            logger.info(f"Binary safety response detected ({user_input_upper}) - bypassing recursive safety check")

            # Consume the pending-safety flag and any stored pending choice.
            self.awaiting_safety_response[session_id] = False

            if hasattr(self, '_pending_choices'):
                self._pending_choices.pop(session_id, None)

            return {
                'is_safety_response': True,
                'response': user_input_upper,
                'requires_user_choice': False,
                'skip_safety_check': True,
                'final_response': f"Choice '{user_input_upper}' has been applied.",
                'bypass_reason': 'binary_safety_response'
            }

        # Fresh trace and timing for this request.
        self.execution_trace = []
        start_time = time.time()

        # Skeleton of the reasoning-chain metadata filled in step by step below.
        reasoning_chain = {
            "chain_of_thought": {},
            "alternative_paths": [],
            "uncertainty_areas": [],
            "evidence_sources": [],
            "confidence_calibration": {}
        }

        try:
            # Step 1: near-duplicate queries reuse the cached response.
            similar_response = self.check_query_similarity(user_input, threshold=0.95)
            if similar_response:
                logger.info(f"Similar/duplicate query detected, using cached response")

                # Record metrics for the cache hit before returning.
                metrics_start = time.time()
                self.track_response_metrics(metrics_start, similar_response)
                return similar_response

            interaction_id = self._generate_interaction_id(session_id)
            logger.info(f"Generated interaction ID: {interaction_id}")

            # Step 2: context retrieval through the loop-prevention cache.
            logger.info("Step 2: Managing context with loop prevention...")

            user_id = self._get_user_id_for_session(session_id)

            base_context = await self._get_or_create_context(session_id, user_input, user_id)

            # Context mode defaults to 'fresh' unless the context manager
            # exposes (and successfully answers) get_context_mode.
            context_mode = 'fresh'
            try:
                if hasattr(self.context_manager, 'get_context_mode'):
                    context_mode = self.context_manager.get_context_mode(session_id)
            except Exception as e:
                logger.warning(f"Error getting context mode: {e}, using default 'fresh'")

            # In 'relevant' mode, classify and summarize the user's other
            # sessions so only relevant history is folded into the context.
            relevance_classification = None
            if context_mode == 'relevant':
                try:
                    logger.info("Relevant context mode: Classifying and summarizing relevant sessions...")

                    # Lazily initialize the classifier exactly once; a failed
                    # import also marks it initialized so we never retry.
                    if not self._classifier_initialized:
                        try:
                            from src.context_relevance_classifier import ContextRelevanceClassifier
                            self.context_classifier = ContextRelevanceClassifier(self.llm_router)
                            self._classifier_initialized = True
                            logger.info("Context relevance classifier initialized")
                        except ImportError as e:
                            logger.warning(f"Context relevance classifier not available: {e}")
                            self._classifier_initialized = True

                    if self.context_classifier:
                        all_session_contexts = []
                        try:
                            if hasattr(self.context_manager, 'get_all_user_sessions'):
                                all_session_contexts = await self.context_manager.get_all_user_sessions(user_id)
                            else:
                                # Fallback: read session contexts directly (see helper).
                                all_session_contexts = await self._get_all_user_sessions(user_id)
                        except Exception as e:
                            logger.error(f"Error fetching user sessions: {e}", exc_info=True)
                            all_session_contexts = []

                        if all_session_contexts:
                            relevance_classification = await self.context_classifier.classify_and_summarize_relevant_contexts(
                                current_input=user_input,
                                session_contexts=all_session_contexts,
                                user_id=user_id
                            )

                            logger.info(
                                f"Relevance classification complete: "
                                f"{len(relevance_classification.get('relevant_summaries', []))} sessions summarized, "
                                f"topic: '{relevance_classification.get('topic', 'unknown')}', "
                                f"time: {relevance_classification.get('processing_time', 0):.2f}s"
                            )
                        else:
                            logger.info("No session contexts available for relevance classification")
                    else:
                        logger.debug("Context classifier not available, skipping relevance classification")

                except Exception as e:
                    logger.error(f"Error in relevance classification: {e}", exc_info=True)
                    # Relevance classification is best-effort; continue without it.
                    relevance_classification = None

            # Optimize the context (optionally folding in relevance results);
            # on failure fall back to the unoptimized base context.
            try:
                context = self.context_manager._optimize_context(
                    base_context,
                    relevance_classification=relevance_classification
                )
            except Exception as e:
                logger.error(f"Error optimizing context: {e}", exc_info=True)
                context = base_context

            interaction_contexts_count = len(context.get('interaction_contexts', []))
            logger.info(f"Context retrieved: {interaction_contexts_count} interaction contexts, mode: {context_mode}")

            user_context = context.get('user_context', '')
            has_user_context = bool(user_context)

            # Topic analysis feeding the reasoning chain.
            main_topic = await self._extract_main_topic(user_input, context)
            topic_continuity = await self._analyze_topic_continuity(context, user_input)
            query_keywords = await self._extract_keywords(user_input)

            reasoning_chain["chain_of_thought"]["step_1"] = {
                "hypothesis": f"User is asking about: '{main_topic}'",
                "evidence": [
                    f"Previous interaction contexts: {interaction_contexts_count}",
                    f"User context available: {has_user_context}",
                    f"Session duration: {self._calculate_session_duration(context)}",
                    f"Topic continuity: {topic_continuity}",
                    f"Query keywords: {query_keywords}"
                ],
                "confidence": 0.85,
                "reasoning": f"Context analysis shows user is focused on {main_topic} with {interaction_contexts_count} previous interaction contexts and {'existing' if has_user_context else 'new'} user context"
            }

            # Step 3: intent + skills recognition — parallel unless explicitly
            # disabled on the instance via _parallel_processing_enabled.
            use_parallel = getattr(self, '_parallel_processing_enabled', True)

            if use_parallel:
                logger.info("Step 3: Processing intent, skills, and safety in parallel...")
                parallel_results = await self.process_request_parallel(session_id, user_input, context)
                intent_result = parallel_results.get('intent', {})
                skills_result = parallel_results.get('skills', {})

            else:
                # Sequential path: intent first, then skills, each traced.
                logger.info("Step 3: Recognizing intent...")
                self.execution_trace.append({
                    "step": "intent_recognition",
                    "agent": "intent_recognition",
                    "status": "executing"
                })
                intent_result = await self.agents['intent_recognition'].execute(
                    user_input=user_input,
                    context=context
                )
                self.execution_trace[-1].update({
                    "status": "completed",
                    "result": {"primary_intent": intent_result.get('primary_intent', 'unknown')}
                })
                logger.info(f"Intent detected: {intent_result.get('primary_intent', 'unknown')}")

                logger.info("Step 3.5: Identifying relevant skills...")
                self.execution_trace.append({
                    "step": "skills_identification",
                    "agent": "skills_identification",
                    "status": "executing"
                })
                skills_result = await self.agents['skills_identification'].execute(
                    user_input=user_input,
                    context=context
                )
                self.execution_trace[-1].update({
                    "status": "completed",
                    "result": {"skills_count": len(skills_result.get('identified_skills', []))}
                })
                logger.info(f"Skills identified: {len(skills_result.get('identified_skills', []))} skills")

            reasoning_chain["chain_of_thought"]["step_2_5"] = {
                "hypothesis": f"User input relates to {len(skills_result.get('identified_skills', []))} expert skills",
                "evidence": [
                    f"Market analysis: {skills_result.get('market_analysis', {}).get('overall_analysis', 'N/A')}",
                    f"Skill classification: {skills_result.get('skill_classification', {}).get('classification_reasoning', 'N/A')}",
                    f"High-probability skills: {[s.get('skill', '') for s in skills_result.get('identified_skills', [])[:3]]}",
                    f"Confidence score: {skills_result.get('confidence_score', 0.5)}"
                ],
                "confidence": skills_result.get('confidence_score', 0.5),
                "reasoning": f"Skills identification completed for topic '{main_topic}' with {len(skills_result.get('identified_skills', []))} relevant skills"
            }

            reasoning_chain["chain_of_thought"]["step_2"] = {
                "hypothesis": f"User intent is '{intent_result.get('primary_intent', 'unknown')}' for topic '{main_topic}'",
                "evidence": [
                    f"Pattern analysis: {self._extract_pattern_evidence(user_input)}",
                    f"Confidence scores: {intent_result.get('confidence_scores', {})}",
                    f"Secondary intents: {intent_result.get('secondary_intents', [])}",
                    f"Query complexity: {self._assess_query_complexity(user_input)}"
                ],
                "confidence": intent_result.get('confidence_scores', {}).get(intent_result.get('primary_intent', 'unknown'), 0.7),
                "reasoning": f"Intent '{intent_result.get('primary_intent', 'unknown')}' detected for {main_topic} based on linguistic patterns and context"
            }

            # Step 4: map the recognized intent to a task execution plan.
            logger.info("Step 4: Creating execution plan...")
            execution_plan = await self._create_execution_plan(intent_result, context)

            reasoning_chain["chain_of_thought"]["step_3"] = {
                "hypothesis": f"Optimal approach for '{intent_result.get('primary_intent', 'unknown')}' intent on '{main_topic}'",
                "evidence": [
                    f"Intent complexity: {self._assess_intent_complexity(intent_result)}",
                    f"Required agents: {execution_plan.get('agents_to_execute', [])}",
                    f"Execution strategy: {execution_plan.get('execution_order', 'sequential')}",
                    f"Response scope: {self._determine_response_scope(user_input)}"
                ],
                "confidence": 0.80,
                "reasoning": f"Agent selection optimized for {intent_result.get('primary_intent', 'unknown')} intent regarding {main_topic}"
            }

            # Step 5: run the planned task agents.
            logger.info("Step 5: Executing agents...")
            agent_results = await self._execute_agents(execution_plan, user_input, context)
            logger.info(f"Agent execution complete: {len(agent_results)} results")

            # Step 6: synthesize the task outputs into one response.
            logger.info("Step 6: Synthesizing response...")
            self.execution_trace.append({
                "step": "response_synthesis",
                "agent": "response_synthesis",
                "status": "executing"
            })
            final_response = await self.agents['response_synthesis'].execute(
                agent_outputs=agent_results,
                user_input=user_input,
                context=context,
                skills_result=skills_result
            )
            self.execution_trace[-1].update({
                "status": "completed",
                "result": {"synthesis_method": final_response.get('synthesis_method', 'unknown')}
            })

            reasoning_chain["chain_of_thought"]["step_4"] = {
                "hypothesis": f"Response synthesis for '{main_topic}' using '{final_response.get('synthesis_method', 'unknown')}' method",
                "evidence": [
                    f"Synthesis quality: {final_response.get('coherence_score', 0.7)}",
                    f"Source integration: {len(final_response.get('source_references', []))} sources",
                    f"Response length: {len(str(final_response.get('final_response', '')))} characters",
                    f"Content relevance: {self._assess_content_relevance(user_input, final_response)}"
                ],
                "confidence": final_response.get('coherence_score', 0.7),
                "reasoning": f"Multi-source synthesis for {main_topic} using {final_response.get('synthesis_method', 'unknown')} approach"
            }

            # Step 7: safety analysis of the synthesized response.
            logger.info("Step 7: Safety check...")
            self.execution_trace.append({
                "step": "safety_check",
                "agent": "safety_check",
                "status": "executing"
            })
            safety_checked = await self.agents['safety_check'].execute(
                response=final_response,
                context=context
            )
            self.execution_trace[-1].update({
                "status": "completed",
                "result": {"warnings": safety_checked.get('warnings', [])}
            })

            # The response text, falling back to the 'response' key when
            # 'final_response' is empty/missing.
            intent_class = intent_result.get('primary_intent', 'casual_conversation')
            response_content = final_response.get('final_response', '') or str(final_response.get('response', ''))

            # When the safety-choice subsystem is present and its thresholds
            # fire, append a non-blocking advisory to the response instead of
            # prompting the user for a choice (the choice flow is paused).
            if SAFETY_CHOICE_AVAILABLE:
                safety_analysis = safety_checked.get('safety_analysis', {})

                if should_trigger_user_choice(safety_analysis, intent_class):
                    logger.info(f"Safety concerns detected for intent '{intent_class}' - appending warnings to response")

                    from safety_threshold_matrix import format_safety_concerns
                    concerns_text = format_safety_concerns(safety_analysis, intent_class)

                    if concerns_text:
                        warning_section = f"""

---

## ⚠️ Safety Advisory

This response has been flagged for potential safety concerns:

{concerns_text}

**Please review this content carefully and consider:**
- The potential impact on yourself and others
- Whether this content aligns with your intended use
- If additional verification or expert consultation is needed

*This advisory is provided for transparency and user awareness. The response has not been modified.*
"""

                        response_content = response_content + warning_section

                        # Propagate the annotated text to every response field.
                        final_response['final_response'] = response_content
                        if 'response' in final_response:
                            final_response['response'] = response_content

                        safety_checked['safety_checked_response'] = response_content
                        safety_checked['original_response'] = response_content

                        logger.info("Safety warnings appended to response - no user choice prompted (feature paused)")

            reasoning_chain["chain_of_thought"]["step_5"] = {
                "hypothesis": f"Safety validation for response about '{main_topic}'",
                "evidence": [
                    f"Safety score: {safety_checked.get('safety_analysis', {}).get('overall_safety_score', 0.8)}",
                    f"Warnings generated: {len(safety_checked.get('warnings', []))}",
                    f"Analysis method: {safety_checked.get('safety_analysis', {}).get('analysis_method', 'unknown')}",
                    f"Content appropriateness: {self._assess_content_appropriateness(user_input, safety_checked)}"
                ],
                "confidence": safety_checked.get('safety_analysis', {}).get('overall_safety_score', 0.8),
                "reasoning": f"Safety analysis for {main_topic} content with non-blocking warning system"
            }

            # Keep the response fields in sync with the (possibly annotated)
            # response_content.
            if 'final_response' in final_response:
                final_response['final_response'] = response_content
            if 'response' in final_response:
                final_response['response'] = response_content

            # Finalize the reasoning-chain metadata.
            reasoning_chain["alternative_paths"] = self._generate_alternative_paths(intent_result, user_input, main_topic)
            reasoning_chain["uncertainty_areas"] = self._identify_uncertainty_areas(intent_result, final_response, safety_checked)
            reasoning_chain["evidence_sources"] = self._extract_evidence_sources(intent_result, final_response, context)
            reasoning_chain["confidence_calibration"] = self._calibrate_confidence_scores(reasoning_chain)

            processing_time = time.time() - start_time

            # All response fields point at the same final text.
            merged_response = {
                'final_response': response_content,
                'response': response_content,
                'safety_checked_response': response_content,
                'original_response': response_content,
                'warnings': safety_checked.get('warnings', [])
            }

            # Assemble the outward-facing result payload.
            result = self._format_final_output(merged_response, interaction_id, {
                'intent': intent_result.get('primary_intent', 'unknown'),
                'execution_plan': execution_plan,
                'processing_steps': [
                    'Context management',
                    'Intent recognition',
                    'Skills identification',
                    'Execution planning',
                    'Agent execution',
                    'Response synthesis',
                    'Safety check'
                ],
                'processing_time': processing_time,
                'agents_used': list(self.agents.keys()),
                'intent_result': intent_result,
                'skills_result': skills_result,
                'synthesis_result': final_response,
                'safety_result': safety_checked,
                'reasoning_chain': reasoning_chain
            })

            # Persist the exchange into the conversation context.
            response_text = str(result.get('response', ''))
            user_id = getattr(self, '_current_user_id', {}).get(session_id, "Test_Any")
            if response_text:
                self.context_manager._update_context(context, user_input, response_text, user_id=user_id)

            # Generate and store the interaction/session context summaries
            # (best-effort: failures are logged, never raised to the caller).
            interaction_id = result.get('interaction_id', f"{session_id}_{int(time.time())}")
            try:
                await self.context_manager.generate_interaction_context(
                    interaction_id=interaction_id,
                    session_id=session_id,
                    user_input=user_input,
                    system_response=response_text,
                    user_id=user_id
                )

                try:
                    await self.context_manager.generate_session_context(session_id, user_id)

                except Exception as e:
                    logger.error(f"Error generating session context: {e}", exc_info=True)

                # Invalidate the orchestrator's cached context so the next
                # request sees the freshly generated summaries.
                if hasattr(self, '_context_cache'):
                    orchestrator_cache_key = f"context_{session_id}"
                    if orchestrator_cache_key in self._context_cache:
                        del self._context_cache[orchestrator_cache_key]
                        logger.debug(f"Orchestrator cache cleared for session {session_id} to refresh with updated contexts")
            except Exception as e:
                logger.error(f"Error generating interaction context: {e}", exc_info=True)

            # Attach response metrics.
            result = self.track_response_metrics(start_time, result)

            # Fallback performance block when metrics tracking added none.
            if 'performance' not in result:
                result['performance'] = {
                    "processing_time": round((time.time() - start_time) * 1000, 2),  # milliseconds
                    "tokens_used": 0,
                    "agents_used": 0,
                    "confidence_score": 0,
                    "agent_contributions": [],
                    "safety_score": 80,
                    "latency_seconds": round(time.time() - start_time, 3),
                    "timestamp": datetime.now().isoformat()
                }

            # Remember this query for similarity short-circuiting (bounded).
            self.recent_queries.append({
                'query': user_input,
                'response': result,
                'timestamp': time.time()
            })

            if len(self.recent_queries) > self.max_recent_queries:
                self.recent_queries = self.recent_queries[-self.max_recent_queries:]

            logger.info(f"Request processing complete. Response length: {len(response_text)}")
            return result

        except Exception as e:
            # Any pipeline failure yields a minimal error payload instead of
            # propagating to the caller.
            logger.error(f"Error in process_request: {e}", exc_info=True)
            processing_time = time.time() - start_time
            return {
                "response": f"Error processing request: {str(e)}",
                "error": str(e),
                "interaction_id": str(uuid.uuid4())[:8],
                "agent_trace": [],
                "timestamp": datetime.now().isoformat(),
                "metadata": {
                    "agents_used": [],
                    "processing_time": processing_time,
                    "token_count": 0,
                    "warnings": []
                }
            }
| | |
| | def _generate_interaction_id(self, session_id: str) -> str: |
| | """ |
| | Generate unique interaction identifier |
| | """ |
| | timestamp = datetime.now().isoformat() |
| | unique_id = str(uuid.uuid4())[:8] |
| | return f"{session_id}_{unique_id}_{int(datetime.now().timestamp())}" |
| | |
| | async def _get_all_user_sessions(self, user_id: str) -> List[Dict]: |
| | """ |
| | Fetch all session contexts for relevance classification |
| | Fallback method if context_manager doesn't have it |
| | |
| | Args: |
| | user_id: User identifier |
| | |
| | Returns: |
| | List of session context dictionaries |
| | """ |
| | try: |
| | |
| | if hasattr(self.context_manager, 'get_all_user_sessions'): |
| | return await self.context_manager.get_all_user_sessions(user_id) |
| | |
| | |
| | import sqlite3 |
| | db_path = getattr(self.context_manager, 'db_path', 'sessions.db') |
| | |
| | conn = sqlite3.connect(db_path) |
| | cursor = conn.cursor() |
| | |
| | cursor.execute(""" |
| | SELECT DISTINCT |
| | sc.session_id, |
| | sc.session_summary, |
| | sc.created_at, |
| | (SELECT GROUP_CONCAT(ic.interaction_summary, ' ||| ') |
| | FROM interaction_contexts ic |
| | WHERE ic.session_id = sc.session_id |
| | ORDER BY ic.created_at DESC |
| | LIMIT 10) as recent_interactions |
| | FROM session_contexts sc |
| | JOIN sessions s ON sc.session_id = s.session_id |
| | WHERE s.user_id = ? |
| | ORDER BY sc.created_at DESC |
| | LIMIT 50 |
| | """, (user_id,)) |
| | |
| | sessions = [] |
| | for row in cursor.fetchall(): |
| | session_id, session_summary, created_at, interactions_str = row |
| | |
| | interaction_list = [] |
| | if interactions_str: |
| | for summary in interactions_str.split(' ||| '): |
| | if summary.strip(): |
| | interaction_list.append({ |
| | 'summary': summary.strip(), |
| | 'timestamp': created_at |
| | }) |
| | |
| | sessions.append({ |
| | 'session_id': session_id, |
| | 'summary': session_summary or '', |
| | 'created_at': created_at, |
| | 'interaction_contexts': interaction_list |
| | }) |
| | |
| | conn.close() |
| | return sessions |
| | |
| | except Exception as e: |
| | logger.error(f"Error fetching user sessions: {e}", exc_info=True) |
| | return [] |
| | |
| | async def _create_execution_plan(self, intent_result: dict, context: dict) -> dict: |
| | """ |
| | Create execution plan based on intent recognition |
| | Maps intent types to specific execution tasks |
| | """ |
| | primary_intent = intent_result.get('primary_intent', 'casual_conversation') |
| | secondary_intents = intent_result.get('secondary_intents', []) |
| | confidence = intent_result.get('confidence_scores', {}).get(primary_intent, 0.7) |
| | |
| | |
| | intent_task_mapping = { |
| | "information_request": { |
| | "tasks": ["information_gathering", "content_research"], |
| | "execution_order": "sequential", |
| | "priority": "high" |
| | }, |
| | "task_execution": { |
| | "tasks": ["task_planning", "execution_strategy"], |
| | "execution_order": "sequential", |
| | "priority": "high" |
| | }, |
| | "creative_generation": { |
| | "tasks": ["creative_brainstorming", "content_ideation"], |
| | "execution_order": "parallel", |
| | "priority": "normal" |
| | }, |
| | "analysis_research": { |
| | "tasks": ["research_analysis", "data_collection", "pattern_identification"], |
| | "execution_order": "sequential", |
| | "priority": "high" |
| | }, |
| | "troubleshooting": { |
| | "tasks": ["problem_analysis", "solution_research"], |
| | "execution_order": "sequential", |
| | "priority": "high" |
| | }, |
| | "education_learning": { |
| | "tasks": ["curriculum_planning", "educational_content"], |
| | "execution_order": "sequential", |
| | "priority": "normal" |
| | }, |
| | "technical_support": { |
| | "tasks": ["technical_research", "guidance_generation"], |
| | "execution_order": "sequential", |
| | "priority": "high" |
| | }, |
| | "casual_conversation": { |
| | "tasks": ["context_enrichment"], |
| | "execution_order": "parallel", |
| | "priority": "low" |
| | } |
| | } |
| | |
| | |
| | plan = intent_task_mapping.get(primary_intent, { |
| | "tasks": ["general_research"], |
| | "execution_order": "parallel", |
| | "priority": "normal" |
| | }) |
| | |
| | |
| | if confidence > 0.7 and secondary_intents: |
| | for secondary_intent in secondary_intents[:2]: |
| | secondary_plan = intent_task_mapping.get(secondary_intent) |
| | if secondary_plan: |
| | |
| | existing_tasks = set(plan["tasks"]) |
| | for task in secondary_plan["tasks"]: |
| | if task not in existing_tasks: |
| | plan["tasks"].append(task) |
| | existing_tasks.add(task) |
| | |
| | logger.info(f"Execution plan created for intent '{primary_intent}': {len(plan['tasks'])} tasks, order={plan['execution_order']}") |
| | |
| | return { |
| | "agents_to_execute": plan["tasks"], |
| | "execution_order": plan["execution_order"], |
| | "priority": plan["priority"], |
| | "primary_intent": primary_intent, |
| | "secondary_intents": secondary_intents |
| | } |
| | |
    async def _execute_agents(self, execution_plan: dict, user_input: str, context: dict) -> dict:
        """
        Execute agents in parallel or sequential order based on plan.
        Runs one task-specific LLM call per planned task (via
        _execute_single_task) using prompts built by _build_task_prompts.

        Args:
            execution_plan: Plan produced by _create_execution_plan.
            user_input: Raw user message.
            context: Conversation context (summarized into the prompts).

        Returns:
            Mapping task name -> task result; a failed task maps to
            {"error": ..., "status": "failed"} instead of raising.
        """
        tasks = execution_plan.get("agents_to_execute", [])
        execution_order = execution_plan.get("execution_order", "parallel")
        primary_intent = execution_plan.get("primary_intent", "casual_conversation")

        if not tasks:
            logger.warning("No tasks to execute in execution plan")
            return {}

        logger.info(f"Executing {len(tasks)} tasks in {execution_order} order for intent '{primary_intent}'")

        results = {}

        # One shared context summary feeds every task prompt.
        context_summary = self._build_context_summary(context)

        # Task name -> prompt text; tasks without a template are skipped.
        task_prompts = self._build_task_prompts(user_input, context_summary, primary_intent)

        if execution_order == "parallel":
            # Launch every task concurrently, pairing each coroutine with
            # its task name so results can be matched back up.
            task_coroutines = []
            for task in tasks:
                if task in task_prompts:
                    coro = self._execute_single_task(task, task_prompts[task])
                    task_coroutines.append((task, coro))
                else:
                    logger.warning(f"No prompt template for task: {task}")

            if task_coroutines:
                # return_exceptions=True keeps one failing task from
                # cancelling the others.
                task_results = await asyncio.gather(
                    *[coro for _, coro in task_coroutines],
                    return_exceptions=True
                )

                for (task, _), result in zip(task_coroutines, task_results):
                    if isinstance(result, Exception):
                        logger.error(f"Task {task} failed: {result}")
                        results[task] = {"error": str(result), "status": "failed"}
                    else:
                        results[task] = result
                        logger.info(f"Task {task} completed: {len(str(result))} chars")
        else:
            # Sequential: each task's prompt is extended with the results of
            # the tasks that ran before it (including failures).
            previous_results = {}
            for task in tasks:
                if task in task_prompts:
                    enhanced_prompt = task_prompts[task]
                    if previous_results:
                        enhanced_prompt += f"\n\nPrevious task results: {str(previous_results)}"

                    try:
                        result = await self._execute_single_task(task, enhanced_prompt)
                        results[task] = result
                        previous_results[task] = result
                        logger.info(f"Task {task} completed: {len(str(result))} chars")
                    except Exception as e:
                        logger.error(f"Task {task} failed: {e}")
                        results[task] = {"error": str(e), "status": "failed"}
                        previous_results[task] = results[task]
                else:
                    logger.warning(f"No prompt template for task: {task}")

        logger.info(f"Agent execution complete: {len(results)} results collected")
        return results
| | |
| | def _build_context_summary(self, context: dict) -> str: |
| | """Build a concise summary of context for task execution (all from cache)""" |
| | summary_parts = [] |
| | |
| | |
| | session_context = context.get('session_context', {}) |
| | session_summary = session_context.get('summary', '') if isinstance(session_context, dict) else "" |
| | if session_summary: |
| | summary_parts.append(f"Session summary: {session_summary[:1500]}") |
| | |
| | |
| | interaction_contexts = context.get('interaction_contexts', []) |
| | if interaction_contexts: |
| | recent_summaries = [ic.get('summary', '') for ic in interaction_contexts[-3:]] |
| | if recent_summaries: |
| | summary_parts.append(f"Recent conversation topics: {', '.join(recent_summaries)}") |
| | |
| | |
| | user_context = context.get('user_context', '') |
| | if user_context: |
| | summary_parts.append(f"User background: {user_context[:200]}") |
| | |
| | return " | ".join(summary_parts) if summary_parts else "No prior context" |
| | |
| | async def process_agents_parallel(self, request: Dict) -> List: |
| | """ |
| | Step 1: Optimize Agent Chain - Process multiple agents in parallel |
| | |
| | Args: |
| | request: Dictionary containing request data with 'user_input' and 'context' |
| | |
| | Returns: |
| | List of agent results in order [intent_result, skills_result] |
| | """ |
| | user_input = request.get('user_input', '') |
| | context = request.get('context', {}) |
| | |
| | |
| | self.agent_call_count += 2 |
| | |
| | tasks = [ |
| | self.agents['intent_recognition'].execute( |
| | user_input=user_input, |
| | context=context |
| | ), |
| | self.agents['skills_identification'].execute( |
| | user_input=user_input, |
| | context=context |
| | ), |
| | ] |
| | |
| | try: |
| | results = await asyncio.gather(*tasks, return_exceptions=True) |
| | |
| | processed_results = [] |
| | for idx, result in enumerate(results): |
| | if isinstance(result, Exception): |
| | logger.error(f"Agent task {idx} failed: {result}") |
| | processed_results.append({}) |
| | else: |
| | processed_results.append(result) |
| | return processed_results |
| | except Exception as e: |
| | logger.error(f"Error in parallel agent processing: {e}", exc_info=True) |
| | return [{}, {}] |
| | |
| | async def process_request_parallel(self, session_id: str, user_input: str, context: Dict) -> Dict: |
| | """Process intent, skills, and safety in parallel with enhanced tracking""" |
| | |
| | |
| | agents_called = [] |
| | |
| | |
| | try: |
| | intent_task = self.agents['intent_recognition'].execute( |
| | user_input=user_input, |
| | context=context |
| | ) |
| | agents_called.append('Intent') |
| | |
| | skills_task = self.agents['skills_identification'].execute( |
| | user_input=user_input, |
| | context=context |
| | ) |
| | agents_called.append('Skills') |
| | |
| | |
| | safety_task = self.agents['safety_check'].execute( |
| | response=user_input, |
| | context=context |
| | ) |
| | agents_called.append('Safety') |
| | |
| | |
| | self.agent_call_count += len(agents_called) |
| | |
| | |
| | if len(self.agent_call_history) >= self.max_agent_history: |
| | self.agent_call_history = self.agent_call_history[-self.max_agent_history:] |
| | self.agent_call_history.append({ |
| | 'agents': agents_called, |
| | 'timestamp': time.time() |
| | }) |
| | |
| | |
| | results = await asyncio.gather( |
| | intent_task, |
| | skills_task, |
| | safety_task, |
| | return_exceptions=True |
| | ) |
| | |
| | |
| | intent_result = results[0] if not isinstance(results[0], Exception) else {} |
| | skills_result = results[1] if not isinstance(results[1], Exception) else {} |
| | safety_result = results[2] if not isinstance(results[2], Exception) else {} |
| | |
| | |
| | if isinstance(results[0], Exception): |
| | logger.error(f"Intent recognition error: {results[0]}") |
| | if isinstance(results[1], Exception): |
| | logger.error(f"Skills identification error: {results[1]}") |
| | if isinstance(results[2], Exception): |
| | logger.error(f"Safety check error: {results[2]}") |
| | |
| | return { |
| | 'intent': intent_result, |
| | 'skills': skills_result, |
| | 'safety_precheck': safety_result, |
| | 'agents_called': agents_called |
| | } |
| | |
| | except Exception as e: |
| | logger.error(f"Error in parallel processing: {e}", exc_info=True) |
| | |
| | return { |
| | 'intent': await self.agents['intent_recognition'].execute(user_input=user_input, context=context), |
| | 'skills': await self.agents['skills_identification'].execute(user_input=user_input, context=context), |
| | 'safety_precheck': {} |
| | } |
| | |
| | def _build_enhanced_context(self, session_id: str, prior_interactions: List[Dict]) -> Dict: |
| | """Build enhanced context with memory accumulation""" |
| | |
| | |
| | context = { |
| | 'session_memory': [], |
| | 'user_preferences': {}, |
| | 'interaction_patterns': {}, |
| | 'skills_used': set() |
| | } |
| | |
| | |
| | for idx, interaction in enumerate(prior_interactions): |
| | weight = 1.0 / (idx + 1) |
| | |
| | |
| | if 'skills' in interaction: |
| | for skill in interaction['skills']: |
| | if isinstance(skill, dict): |
| | context['skills_used'].add(skill.get('name', skill.get('skill', ''))) |
| | elif isinstance(skill, str): |
| | context['skills_used'].add(skill) |
| | |
| | |
| | if 'intent' in interaction: |
| | intent = interaction['intent'] |
| | if intent not in context['interaction_patterns']: |
| | context['interaction_patterns'][intent] = 0 |
| | context['interaction_patterns'][intent] += weight |
| | |
| | |
| | if idx < 3: |
| | context['session_memory'].append({ |
| | 'summary': interaction.get('summary', ''), |
| | 'timestamp': interaction.get('timestamp'), |
| | 'relevance': weight |
| | }) |
| | |
| | |
| | context['skills_used'] = list(context['skills_used']) |
| | |
| | return context |
| | |
    def _build_task_prompts(self, user_input: str, context_summary: str, primary_intent: str) -> dict:
        """Build task-specific prompts for execution.

        Args:
            user_input: Raw user query embedded at the top of every prompt.
            context_summary: Condensed context string (see
                _build_context_summary) embedded alongside the query.
            primary_intent: Not referenced in this method's body; kept for
                interface stability.  # NOTE(review): confirm callers pass it intentionally

        Returns:
            dict mapping task name -> fully formatted prompt string. Every
            prompt shares the same shape: the base context followed by
            three lines of task instructions.
        """

        # Shared preamble placed at the top of every task prompt.
        base_context = f"User Query: {user_input}\nContext: {context_summary}"

        # One template per task name; keys are the task identifiers produced
        # by the planning stage and looked up by the task-execution loop.
        prompts = {
            "information_gathering": f"""
{base_context}

Task: Gather comprehensive, accurate information relevant to the user's query.
Focus on facts, definitions, explanations, and verified information.
Structure the information clearly and cite key points.
""",

            "content_research": f"""
{base_context}

Task: Research and compile detailed content about the topic.
Include multiple perspectives, current information, and relevant examples.
Organize findings logically with clear sections.
""",

            "task_planning": f"""
{base_context}

Task: Create a detailed execution plan for the requested task.
Break down into clear steps, identify requirements, and outline expected outcomes.
Consider potential challenges and solutions.
""",

            "execution_strategy": f"""
{base_context}

Task: Develop a strategic approach for task execution.
Define methodology, best practices, and implementation considerations.
Provide actionable guidance with clear priorities.
""",

            "creative_brainstorming": f"""
{base_context}

Task: Generate creative ideas and approaches for content creation.
Explore different angles, styles, and formats.
Provide diverse creative options with implementation suggestions.
""",

            "content_ideation": f"""
{base_context}

Task: Develop content concepts and detailed ideation.
Create outlines, themes, and structural frameworks.
Suggest variations and refinement paths.
""",

            "research_analysis": f"""
{base_context}

Task: Conduct thorough research analysis on the topic.
Identify key findings, trends, patterns, and insights.
Analyze different perspectives and methodologies.
""",

            "data_collection": f"""
{base_context}

Task: Collect and organize relevant data points and evidence.
Gather statistics, examples, case studies, and supporting information.
Structure data for easy analysis and reference.
""",

            "pattern_identification": f"""
{base_context}

Task: Identify patterns, correlations, and significant relationships.
Analyze trends, cause-effect relationships, and underlying structures.
Provide insights based on pattern recognition.
""",

            "problem_analysis": f"""
{base_context}

Task: Analyze the problem in detail.
Identify root causes, contributing factors, and constraints.
Break down the problem into components for systematic resolution.
""",

            "solution_research": f"""
{base_context}

Task: Research and evaluate potential solutions.
Compare approaches, assess pros/cons, and recommend best practices.
Consider implementation feasibility and effectiveness.
""",

            "curriculum_planning": f"""
{base_context}

Task: Design educational curriculum and learning path.
Structure content progressively, define learning objectives, and suggest resources.
Create a comprehensive learning framework.
""",

            "educational_content": f"""
{base_context}

Task: Generate educational content with clear explanations.
Use teaching methods, examples, analogies, and progressive complexity.
Make content accessible and engaging for learning.
""",

            "technical_research": f"""
{base_context}

Task: Research technical aspects and solutions.
Gather technical documentation, best practices, and implementation details.
Structure technical information clearly with practical guidance.
""",

            "guidance_generation": f"""
{base_context}

Task: Generate step-by-step guidance and instructions.
Create clear, actionable steps with explanations and troubleshooting tips.
Ensure guidance is comprehensive and easy to follow.
""",

            "context_enrichment": f"""
{base_context}

Task: Enrich the conversation with relevant context and insights.
Add helpful background information, connections to previous topics, and engaging details.
Enhance understanding and engagement.
""",

            "general_research": f"""
{base_context}

Task: Conduct general research and information gathering.
Compile relevant information, insights, and useful details about the topic.
Organize findings for clear presentation.
"""
        }

        return prompts
| | |
| | async def _execute_single_task(self, task_name: str, prompt: str) -> dict: |
| | """Execute a single task using the LLM router""" |
| | try: |
| | logger.debug(f"Executing task: {task_name}") |
| | logger.debug(f"Task prompt length: {len(prompt)}") |
| | |
| | |
| | result = await self.llm_router.route_inference( |
| | task_type="general_reasoning", |
| | prompt=prompt, |
| | max_tokens=2000, |
| | temperature=0.7 |
| | ) |
| | |
| | if result: |
| | return { |
| | "task": task_name, |
| | "status": "completed", |
| | "content": result, |
| | "content_length": len(str(result)) |
| | } |
| | else: |
| | logger.warning(f"Task {task_name} returned empty result") |
| | return { |
| | "task": task_name, |
| | "status": "empty", |
| | "content": "", |
| | "content_length": 0 |
| | } |
| | |
| | except Exception as e: |
| | logger.error(f"Error executing task {task_name}: {e}", exc_info=True) |
| | return { |
| | "task": task_name, |
| | "status": "error", |
| | "error": str(e), |
| | "content": "" |
| | } |
| | |
| | def _format_final_output(self, response: dict, interaction_id: str, additional_metadata: dict = None) -> dict: |
| | """ |
| | Format final output with tracing and metadata |
| | """ |
| | |
| | response_text = ( |
| | response.get("final_response") or |
| | response.get("safety_checked_response") or |
| | response.get("original_response") or |
| | response.get("response") or |
| | str(response.get("result", "")) |
| | ) |
| | |
| | if not response_text: |
| | response_text = "I apologize, but I'm having trouble generating a response right now. Please try again." |
| | |
| | |
| | warnings = [] |
| | if "warnings" in response: |
| | warnings = response["warnings"] if isinstance(response["warnings"], list) else [] |
| | |
| | |
| | metadata = { |
| | "agents_used": response.get("agents_used", []), |
| | "processing_time": response.get("processing_time", 0), |
| | "token_count": response.get("token_count", 0), |
| | "warnings": warnings |
| | } |
| | |
| | |
| | if additional_metadata: |
| | metadata.update(additional_metadata) |
| | |
| | return { |
| | "interaction_id": interaction_id, |
| | "response": response_text, |
| | "final_response": response_text, |
| | "confidence_score": response.get("confidence_score", 0.7), |
| | "agent_trace": self.execution_trace if self.execution_trace else [ |
| | {"step": "complete", "agent": "orchestrator", "status": "completed"} |
| | ], |
| | "timestamp": datetime.now().isoformat(), |
| | "metadata": metadata |
| | } |
| | |
| | async def handle_user_safety_decision(self, choice_id: str, user_decision: bool, session_id: str = None) -> dict: |
| | """ |
| | Handle user's safety decision and complete processing |
| | |
| | Args: |
| | choice_id: The choice identifier from the prompt |
| | user_decision: True for revision, False for original with warnings |
| | session_id: Session identifier |
| | |
| | Returns: |
| | dict: Final response based on user choice |
| | """ |
| | try: |
| | |
| | if session_id: |
| | self.awaiting_safety_response[session_id] = False |
| | |
| | if not SAFETY_CHOICE_AVAILABLE: |
| | logger.warning("Safety choice modules not available") |
| | return {'error': 'Safety choice system not available'} |
| | |
| | choice_result = process_safety_choice(choice_id, user_decision) |
| | |
| | if 'error' in choice_result: |
| | logger.error(f"Error processing safety choice: {choice_result['error']}") |
| | return choice_result |
| | |
| | if choice_result['action'] == 'proceed_with_revision': |
| | logger.info("User chose revision - applying safety revisions") |
| | |
| | safety_issues = choice_result['safety_analysis'].get('detected_issues', []) |
| | safety_scores = choice_result['safety_analysis'].get('safety_scores', {}) |
| | |
| | if not safety_scores: |
| | confidence_scores = choice_result['safety_analysis'].get('confidence_scores', {}) |
| | if confidence_scores: |
| | exceeded_categories = [] |
| | if confidence_scores.get('toxicity', 0) > 0.3: |
| | exceeded_categories.append('toxicity_or_harmful_language') |
| | if confidence_scores.get('bias', 0) > 0.05: |
| | exceeded_categories.append('potential_biases_or_stereotypes') |
| | if confidence_scores.get('privacy', 0) > 0.2: |
| | exceeded_categories.append('privacy_or_security_concerns') |
| | else: |
| | exceeded_categories = [k for k, v in safety_scores.items() if isinstance(v, (int, float)) and v > 0.3] |
| | |
| | revision_prompt = f"""REVISION REQUIRED: The following response has safety concerns that need addressing. |
| | |
| | Original Response: {choice_result['original_response']} |
| | |
| | Safety Issues Detected: {', '.join(exceeded_categories) if exceeded_categories else 'General safety concerns'} |
| | Specific Warnings: {'; '.join(safety_issues) if safety_issues else 'General safety concerns detected'} |
| | |
| | Please revise the response to address these concerns while maintaining helpfulness and accuracy. |
| | """ |
| | |
| | revised_result = await self.agents['response_synthesis'].execute( |
| | agent_outputs={}, |
| | user_input=revision_prompt, |
| | context={} |
| | ) |
| | |
| | revised_response = revised_result.get('final_response', choice_result['original_response']) |
| | |
| | return { |
| | 'response': revised_response, |
| | 'final_response': revised_response, |
| | 'safety_analysis': choice_result['safety_analysis'], |
| | 'user_choice': 'revision', |
| | 'revision_applied': True, |
| | 'interaction_id': str(uuid.uuid4())[:8], |
| | 'timestamp': datetime.now().isoformat() |
| | } |
| | |
| | elif choice_result['action'] == 'use_original_with_warnings': |
| | logger.info("User chose original response with safety warnings") |
| | |
| | return { |
| | 'response': choice_result['response_content'], |
| | 'final_response': choice_result['response_content'], |
| | 'safety_analysis': choice_result['safety_analysis'], |
| | 'user_choice': 'original_with_warnings', |
| | 'revision_applied': False, |
| | 'interaction_id': str(uuid.uuid4())[:8], |
| | 'timestamp': datetime.now().isoformat() |
| | } |
| | |
| | else: |
| | logger.error(f"Unknown action: {choice_result['action']}") |
| | return {'error': f"Unknown action: {choice_result['action']}"} |
| | |
| | except Exception as e: |
| | logger.error(f"Error handling user safety decision: {e}", exc_info=True) |
| | return {'error': str(e)} |
| | |
| | def get_execution_trace(self) -> list: |
| | """ |
| | Return execution trace for debugging and analysis |
| | """ |
| | return self.execution_trace |
| | |
| | def clear_execution_trace(self): |
| | """ |
| | Clear the execution trace |
| | """ |
| | self.execution_trace = [] |
| | |
| | def _calculate_session_duration(self, context: dict) -> str: |
| | """Calculate session duration for reasoning context""" |
| | interaction_contexts = context.get('interaction_contexts', []) |
| | if not interaction_contexts: |
| | return "New session" |
| | |
| | |
| | interaction_count = len(interaction_contexts) |
| | if interaction_count < 5: |
| | return "Short session (< 5 interactions)" |
| | elif interaction_count < 20: |
| | return "Medium session (5-20 interactions)" |
| | else: |
| | return "Long session (> 20 interactions)" |
| | |
| | async def _analyze_topic_continuity(self, context: dict, user_input: str) -> str: |
| | """Analyze topic continuity using LLM zero-shot classification (uses session context and interaction contexts from cache)""" |
| | try: |
| | |
| | session_context = context.get('session_context', {}) |
| | session_summary = session_context.get('summary', '') if isinstance(session_context, dict) else "" |
| | |
| | interaction_contexts = context.get('interaction_contexts', []) |
| | if not interaction_contexts and not session_summary: |
| | return "No previous context" |
| | |
| | |
| | recent_interactions_summary = "\n".join([ |
| | f"- {ic.get('summary', '')}" |
| | for ic in interaction_contexts[:3] |
| | if ic.get('summary') |
| | ]) |
| | |
| | |
| | if self.llm_router: |
| | prompt = f"""Determine if the current query continues the previous conversation topic or introduces a new topic. |
| | |
| | Session Summary: {session_summary[:300] if session_summary else 'No session summary available'} |
| | |
| | Recent Interactions: |
| | {recent_interactions_summary if recent_interactions_summary else 'No recent interactions'} |
| | |
| | Current Query: "{user_input}" |
| | |
| | Analyze whether the current query: |
| | 1. Continues the same topic from previous interactions |
| | 2. Introduces a new topic |
| | |
| | Respond with EXACTLY one of these formats: |
| | - "Continuing [topic name] discussion" if same topic |
| | - "New topic: [topic name]" if different topic |
| | |
| | Keep topic name to 2-5 words. Example responses: |
| | - "Continuing machine learning discussion" |
| | - "New topic: financial analysis" |
| | - "Continuing software development discussion" |
| | """ |
| | |
| | continuity_result = await self.llm_router.route_inference( |
| | task_type="general_reasoning", |
| | prompt=prompt, |
| | max_tokens=50, |
| | temperature=0.3 |
| | ) |
| | |
| | if continuity_result and isinstance(continuity_result, str) and continuity_result.strip(): |
| | result = continuity_result.strip() |
| | |
| | if "Continuing" in result or "New topic:" in result: |
| | logger.debug(f"Topic continuity analysis: {result}") |
| | return result |
| | |
| | |
| | if not session_summary and not recent_interactions_summary: |
| | return "No previous context" |
| | return "Topic continuity analysis unavailable" |
| | |
| | except Exception as e: |
| | logger.error(f"Error in LLM-based topic continuity analysis: {e}", exc_info=True) |
| | |
| | return "Topic continuity analysis failed" |
| | |
| | def _extract_pattern_evidence(self, user_input: str) -> str: |
| | """Extract pattern evidence for intent reasoning""" |
| | input_lower = user_input.lower() |
| | |
| | |
| | if any(word in input_lower for word in ['what', 'how', 'why', 'when', 'where', 'which']): |
| | return "Question pattern detected" |
| | |
| | |
| | if any(word in input_lower for word in ['please', 'can you', 'could you', 'help me']): |
| | return "Request pattern detected" |
| | |
| | |
| | if any(word in input_lower for word in ['explain', 'describe', 'tell me about']): |
| | return "Explanation pattern detected" |
| | |
| | |
| | if any(word in input_lower for word in ['analyze', 'compare', 'evaluate', 'assess']): |
| | return "Analysis pattern detected" |
| | |
| | return "General conversational pattern" |
| | |
| | def _assess_intent_complexity(self, intent_result: dict) -> str: |
| | """Assess intent complexity for reasoning""" |
| | primary_intent = intent_result.get('primary_intent', 'unknown') |
| | confidence = intent_result.get('confidence_scores', {}).get(primary_intent, 0.5) |
| | secondary_intents = intent_result.get('secondary_intents', []) |
| | |
| | if confidence > 0.8 and len(secondary_intents) == 0: |
| | return "Simple, clear intent" |
| | elif confidence > 0.7 and len(secondary_intents) <= 1: |
| | return "Moderate complexity" |
| | else: |
| | return "Complex, multi-faceted intent" |
| | |
| | def _generate_alternative_paths(self, intent_result: dict, user_input: str, main_topic: str) -> list: |
| | """Generate alternative reasoning paths based on actual content""" |
| | primary_intent = intent_result.get('primary_intent', 'unknown') |
| | secondary_intents = intent_result.get('secondary_intents', []) |
| | |
| | alternative_paths = [] |
| | |
| | |
| | for secondary_intent in secondary_intents: |
| | alternative_paths.append({ |
| | "path": f"Alternative intent: {secondary_intent} for {main_topic}", |
| | "reasoning": f"Could interpret as {secondary_intent} based on linguistic patterns in the query about {main_topic}", |
| | "confidence": intent_result.get('confidence_scores', {}).get(secondary_intent, 0.3), |
| | "rejected_reason": f"Primary intent '{primary_intent}' has higher confidence for {main_topic} topic" |
| | }) |
| | |
| | |
| | if 'curriculum' in user_input.lower() or 'course' in user_input.lower(): |
| | alternative_paths.append({ |
| | "path": "Structured educational framework approach", |
| | "reasoning": f"Could provide a more structured educational framework for {main_topic}", |
| | "confidence": 0.6, |
| | "rejected_reason": f"Current approach better matches user's specific request for {main_topic}" |
| | }) |
| | |
| | if 'detailed' in user_input.lower() or 'comprehensive' in user_input.lower(): |
| | alternative_paths.append({ |
| | "path": "High-level overview approach", |
| | "reasoning": f"Could provide a high-level overview instead of detailed content for {main_topic}", |
| | "confidence": 0.4, |
| | "rejected_reason": f"User specifically requested detailed information about {main_topic}" |
| | }) |
| | |
| | return alternative_paths |
| | |
| | def _identify_uncertainty_areas(self, intent_result: dict, final_response: dict, safety_checked: dict) -> list: |
| | """Identify areas of uncertainty in the reasoning based on actual content""" |
| | uncertainty_areas = [] |
| | |
| | |
| | primary_intent = intent_result.get('primary_intent', 'unknown') |
| | confidence = intent_result.get('confidence_scores', {}).get(primary_intent, 0.5) |
| | if confidence < 0.8: |
| | uncertainty_areas.append({ |
| | "aspect": f"Intent classification ({primary_intent}) for user's specific request", |
| | "confidence": confidence, |
| | "mitigation": "Provided multiple interpretation options and context-aware analysis" |
| | }) |
| | |
| | |
| | coherence_score = final_response.get('coherence_score', 0.7) |
| | if coherence_score < 0.8: |
| | uncertainty_areas.append({ |
| | "aspect": "Response coherence and structure for the specific topic", |
| | "confidence": coherence_score, |
| | "mitigation": "Applied quality enhancement techniques and content relevance checks" |
| | }) |
| | |
| | |
| | safety_score = safety_checked.get('safety_analysis', {}).get('overall_safety_score', 0.8) |
| | if safety_score < 0.9: |
| | uncertainty_areas.append({ |
| | "aspect": "Content safety and bias assessment for educational content", |
| | "confidence": safety_score, |
| | "mitigation": "Generated advisory warnings for user awareness and content appropriateness" |
| | }) |
| | |
| | |
| | response_text = str(final_response.get('final_response', '')) |
| | if len(response_text) < 100: |
| | uncertainty_areas.append({ |
| | "aspect": "Response completeness for user's detailed request", |
| | "confidence": 0.6, |
| | "mitigation": "Enhanced response generation with topic-specific content" |
| | }) |
| | |
| | return uncertainty_areas |
| | |
| | def _extract_evidence_sources(self, intent_result: dict, final_response: dict, context: dict) -> list: |
| | """Extract evidence sources for reasoning based on actual content""" |
| | evidence_sources = [] |
| | |
| | |
| | evidence_sources.append({ |
| | "type": "linguistic_analysis", |
| | "source": "Pattern matching and NLP analysis", |
| | "relevance": 0.9, |
| | "description": f"Intent classification based on linguistic patterns for '{intent_result.get('primary_intent', 'unknown')}' intent" |
| | }) |
| | |
| | |
| | interactions = context.get('interactions', []) |
| | if interactions: |
| | evidence_sources.append({ |
| | "type": "conversation_history", |
| | "source": f"Previous {len(interactions)} interactions", |
| | "relevance": 0.7, |
| | "description": f"Conversation context and topic continuity analysis" |
| | }) |
| | |
| | |
| | synthesis_method = final_response.get('synthesis_method', 'unknown') |
| | evidence_sources.append({ |
| | "type": "synthesis_method", |
| | "source": f"{synthesis_method} approach", |
| | "relevance": 0.8, |
| | "description": f"Response generated using {synthesis_method} methodology with quality optimization" |
| | }) |
| | |
| | |
| | response_text = str(final_response.get('final_response', '')) |
| | if len(response_text) > 1000: |
| | evidence_sources.append({ |
| | "type": "content_analysis", |
| | "source": "Comprehensive content generation", |
| | "relevance": 0.85, |
| | "description": "Detailed response generation based on user's specific requirements" |
| | }) |
| | |
| | return evidence_sources |
| | |
| | def _calibrate_confidence_scores(self, reasoning_chain: dict) -> dict: |
| | """Calibrate confidence scores across the reasoning chain""" |
| | chain_of_thought = reasoning_chain.get('chain_of_thought', {}) |
| | |
| | |
| | step_confidences = [] |
| | for step_data in chain_of_thought.values(): |
| | if isinstance(step_data, dict) and 'confidence' in step_data: |
| | step_confidences.append(step_data['confidence']) |
| | |
| | overall_confidence = sum(step_confidences) / len(step_confidences) if step_confidences else 0.7 |
| | |
| | return { |
| | "overall_confidence": overall_confidence, |
| | "step_count": len(chain_of_thought), |
| | "confidence_distribution": { |
| | "high_confidence": len([c for c in step_confidences if c > 0.8]), |
| | "medium_confidence": len([c for c in step_confidences if 0.6 <= c <= 0.8]), |
| | "low_confidence": len([c for c in step_confidences if c < 0.6]) |
| | }, |
| | "calibration_method": "Weighted average of step confidences" |
| | } |
| | |
| | async def _extract_main_topic(self, user_input: str, context: dict = None) -> str: |
| | """Extract the main topic using LLM zero-shot classification with caching""" |
| | try: |
| | |
| | import hashlib |
| | cache_key = hashlib.md5(user_input.encode()).hexdigest() |
| | if cache_key in self._topic_cache: |
| | logger.debug(f"Topic cache hit for: {user_input[:50]}...") |
| | return self._topic_cache[cache_key] |
| | |
| | |
| | if self.llm_router: |
| | |
| | context_info = "" |
| | if context: |
| | session_context = context.get('session_context', {}) |
| | session_summary = session_context.get('summary', '') if isinstance(session_context, dict) else "" |
| | interaction_count = len(context.get('interaction_contexts', [])) |
| | |
| | if session_summary: |
| | context_info = f"\n\nSession context: {session_summary[:200]}" |
| | if interaction_count > 0: |
| | context_info += f"\nPrevious interactions in session: {interaction_count}" |
| | |
| | prompt = f"""Classify the main topic of this query in 2-5 words. Be specific and concise. |
| | |
| | Query: "{user_input}"{context_info} |
| | |
| | Respond with ONLY the topic name (e.g., "Machine Learning", "Healthcare Analytics", "Financial Modeling", "Software Development", "Educational Curriculum"). |
| | |
| | Do not include explanations, just the topic name. Maximum 5 words.""" |
| | |
| | topic_result = await self.llm_router.route_inference( |
| | task_type="classification", |
| | prompt=prompt, |
| | max_tokens=20, |
| | temperature=0.3 |
| | ) |
| | |
| | if topic_result and isinstance(topic_result, str) and topic_result.strip(): |
| | topic = topic_result.strip() |
| | |
| | |
| | topic = topic.split('\n')[0].strip() |
| | words = topic.split()[:5] |
| | topic = " ".join(words) |
| | |
| | |
| | if len(self._topic_cache) >= self._topic_cache_max_size: |
| | |
| | oldest_key = next(iter(self._topic_cache)) |
| | del self._topic_cache[oldest_key] |
| | |
| | self._topic_cache[cache_key] = topic |
| | logger.debug(f"Topic extracted: {topic}") |
| | return topic |
| | |
| | |
| | words = user_input.split()[:4] |
| | fallback_topic = " ".join(words) if words else "General inquiry" |
| | logger.warning(f"Using fallback topic extraction: {fallback_topic}") |
| | return fallback_topic |
| | |
| | except Exception as e: |
| | logger.error(f"Error in LLM-based topic extraction: {e}", exc_info=True) |
| | |
| | words = user_input.split()[:4] |
| | return " ".join(words) if words else "General inquiry" |
| | |
| | async def _extract_keywords(self, user_input: str) -> str: |
| | """Extract key terms using LLM or simple extraction""" |
| | try: |
| | |
| | |
| | import re |
| | |
| | stop_words = {'the', 'and', 'for', 'are', 'but', 'not', 'you', 'all', 'can', 'her', 'was', 'one', 'our', 'out', 'day', 'get', 'has', 'him', 'his', 'how', 'its', 'may', 'new', 'now', 'old', 'see', 'two', 'way', 'who', 'boy', 'did', 'she', 'use', 'her', 'many', 'some', 'time', 'very', 'when', 'come', 'here', 'just', 'like', 'long', 'make', 'over', 'such', 'take', 'than', 'them', 'well', 'were'} |
| | |
| | words = re.findall(r'\b[a-zA-Z]{3,}\b', user_input.lower()) |
| | keywords = [w for w in words if w not in stop_words][:5] |
| | |
| | return ", ".join(keywords) if keywords else "General terms" |
| | |
| | except Exception as e: |
| | logger.error(f"Error in keyword extraction: {e}", exc_info=True) |
| | return "General terms" |
| | |
| | def _assess_query_complexity(self, user_input: str) -> str: |
| | """Assess the complexity of the user query""" |
| | word_count = len(user_input.split()) |
| | question_count = user_input.count('?') |
| | |
| | if word_count > 50 and question_count > 2: |
| | return "Highly complex multi-part query" |
| | elif word_count > 30 and question_count > 1: |
| | return "Moderately complex query" |
| | elif word_count > 15: |
| | return "Standard complexity query" |
| | else: |
| | return "Simple query" |
| | |
| | def _determine_response_scope(self, user_input: str) -> str: |
| | """Determine the scope of response needed""" |
| | input_lower = user_input.lower() |
| | |
| | if any(word in input_lower for word in ['detailed', 'comprehensive', 'complete', 'full']): |
| | return "Comprehensive detailed response" |
| | elif any(word in input_lower for word in ['brief', 'short', 'summary', 'overview']): |
| | return "Brief summary response" |
| | elif any(word in input_lower for word in ['step by step', 'tutorial', 'guide', 'how to']): |
| | return "Step-by-step instructional response" |
| | else: |
| | return "Standard informative response" |
| | |
| | def _assess_content_relevance(self, user_input: str, final_response: dict) -> str: |
| | """Assess how relevant the response content is to the user input""" |
| | response_text = str(final_response.get('final_response', '')) |
| | |
| | |
| | input_words = set(user_input.lower().split()) |
| | response_words = set(response_text.lower().split()) |
| | |
| | overlap = len(input_words.intersection(response_words)) |
| | total_input_words = len(input_words) |
| | |
| | if overlap / total_input_words > 0.3: |
| | return "High relevance to user query" |
| | elif overlap / total_input_words > 0.15: |
| | return "Moderate relevance to user query" |
| | else: |
| | return "Low relevance to user query" |
| | |
| | def _assess_content_appropriateness(self, user_input: str, safety_checked: dict) -> str: |
| | """Assess content appropriateness for the topic""" |
| | warnings = safety_checked.get('warnings', []) |
| | safety_score = safety_checked.get('safety_analysis', {}).get('overall_safety_score', 0.8) |
| | |
| | if safety_score > 0.9 and len(warnings) == 0: |
| | return "Highly appropriate content" |
| | elif safety_score > 0.8 and len(warnings) <= 1: |
| | return "Appropriate content with minor notes" |
| | else: |
| | return "Content requires review" |
| | |
| | def check_safety_thresholds(self, safety_result: dict) -> tuple: |
| | """Check if safety scores exceed thresholds""" |
| | safety_analysis = safety_result.get('safety_analysis', {}) |
| | |
| | |
| | safety_scores = safety_analysis.get('safety_scores', {}) |
| | if not safety_scores: |
| | |
| | confidence_scores = safety_analysis.get('confidence_scores', {}) |
| | safety_scores = { |
| | "toxicity_or_harmful_language": confidence_scores.get('toxicity', 0.0), |
| | "potential_biases_or_stereotypes": confidence_scores.get('bias', 0.0), |
| | "privacy_or_security_concerns": confidence_scores.get('privacy', 0.0), |
| | "controversial_or_sensitive_topics": 1.0 - confidence_scores.get('safety', 1.0) |
| | } |
| | |
| | exceeded_categories = [] |
| | |
| | for category, score in safety_scores.items(): |
| | threshold = self.safety_thresholds.get(category, 1.0) |
| | if isinstance(score, (int, float)) and score > threshold: |
| | exceeded_categories.append(category) |
| | logger.info(f"Safety threshold exceeded: {category} = {score} > {threshold}") |
| | |
| | return len(exceeded_categories) > 0, exceeded_categories |
| | |
| | def generate_revision_prompt(self, original_response: str, exceeded_categories: list, |
| | safety_warnings: list) -> str: |
| | """Generate revision prompt for synthesis agent""" |
| | |
| | category_instructions = { |
| | "toxicity_or_harmful_language": "Remove any language that could be harmful or offensive", |
| | "potential_biases_or_stereotypes": "Ensure response provides balanced alternatives and avoids stereotypes. Include diverse perspectives and specific qualifications needed for each career path", |
| | "privacy_or_security_concerns": "Remove any content that could compromise privacy or security", |
| | "controversial_or_sensitive_topics": "Present balanced viewpoints and acknowledge different perspectives" |
| | } |
| | |
| | revision_instructions = [] |
| | for category in exceeded_categories: |
| | if category in category_instructions: |
| | revision_instructions.append(category_instructions[category]) |
| | |
| | prompt = f"""REVISION REQUIRED: The following response has safety concerns that need addressing. |
| | |
| | Original Response: |
| | {original_response} |
| | |
| | Safety Issues Detected: |
| | {', '.join(exceeded_categories)} |
| | |
| | Specific Warnings: |
| | {'; '.join(safety_warnings) if safety_warnings else 'General safety concerns detected'} |
| | |
| | REVISION INSTRUCTIONS: |
| | {' | '.join(revision_instructions)} |
| | |
| | Please revise the response to address these concerns while maintaining helpfulness and accuracy. Ensure the response: |
| | 1. Addresses the user's original question completely |
| | 2. Provides specific, actionable alternatives with clear qualifications needed |
| | 3. Avoids generalizations and stereotypes about career transitions |
| | 4. Includes necessary skills, education, and experience requirements |
| | 5. Maintains a balanced, inclusive perspective that acknowledges different paths |
| | |
| | Revised Response:""" |
| | |
| | return prompt |
| | |
| | async def process_request_with_revision(self, session_id: str, user_input: str) -> dict: |
| | """Enhanced process_request with safety revision loop and timeout protection""" |
| | try: |
| | return await asyncio.wait_for( |
| | self._process_request_with_revision_internal(session_id, user_input), |
| | timeout=self.revision_timeout |
| | ) |
| | except asyncio.TimeoutError: |
| | logger.error(f"Safety revision timed out after {self.revision_timeout}s") |
| | |
| | return { |
| | 'final_response': 'Request processing took longer than expected. Please try again.', |
| | 'response': 'Request processing took longer than expected. Please try again.', |
| | 'revision_attempts': 0, |
| | 'timeout_error': True, |
| | 'safety_revision_applied': False |
| | } |
| | |
    async def _process_request_with_revision_internal(self, session_id: str, user_input: str) -> dict:
        """Internal revision loop with comprehensive error handling.

        Per attempt: run process_request (wrapping the previous output in a
        revision prompt after the first pass), safety-check the text, and
        return as soon as no threshold is exceeded. After
        self.max_revision_attempts, a complex input gets one "intelligent
        re-prompt" retry; otherwise the last response is returned annotated
        with a warning summary and user guidance.

        Args:
            session_id: Session identifier forwarded to process_request.
            user_input: Original user query text.

        Returns:
            dict: process_request result augmented with safety/revision
            metadata, or a minimal error payload if processing failed.
        """

        revision_attempt = 0
        current_response = None
        final_result = None
        exceeded_categories = []
        safety_warnings = []

        while revision_attempt <= self.max_revision_attempts:
            try:
                # First pass sends the raw input; later passes wrap the previous
                # response in a revision prompt targeting the exceeded categories.
                processing_input = user_input
                if revision_attempt > 0:
                    processing_input = self.generate_revision_prompt(
                        current_response,
                        exceeded_categories,
                        safety_warnings
                    )
                    logger.info(f"Revision attempt {revision_attempt}: regenerating response with safety improvements")

                result = await self.process_request(session_id, processing_input)

                # Response text may live under different keys depending on path.
                current_response = result.get('final_response') or result.get('response', '')

                if not current_response:
                    # Fall back to the synthesis result stored in metadata.
                    metadata = result.get('metadata', {})
                    current_response = metadata.get('synthesis_result', {}).get('final_response', '')

                if not current_response:
                    # Nothing to safety-check; hand back the raw result as-is.
                    logger.warning("Could not extract response text for safety check")
                    return result

                # Run the safety agent over the generated text.
                safety_checked = await self.agents['safety_check'].execute(
                    response=current_response,
                    context=result.get('context', {})
                )

                needs_revision, exceeded_categories = self.check_safety_thresholds(safety_checked)
                safety_warnings = safety_checked.get('warnings', [])

                if not needs_revision:
                    # Success: attach safety metadata and return immediately.
                    logger.info(f"Safety check passed on attempt {revision_attempt + 1}")
                    result['safety_result'] = safety_checked
                    result['revision_attempts'] = revision_attempt
                    result['safety_revision_applied'] = revision_attempt > 0

                    # Mirror the safety info under metadata for API consumers.
                    if 'metadata' not in result:
                        result['metadata'] = {}
                    result['metadata']['safety_result'] = safety_checked
                    result['metadata']['revision_attempts'] = revision_attempt

                    return result

                if revision_attempt >= self.max_revision_attempts:
                    # Out of normal attempts; decide between a last automatic
                    # re-prompt (complex inputs) and annotated passthrough.
                    logger.warning(f"Max revision attempts reached. Categories still exceeded: {exceeded_categories}")

                    input_complexity = self._assess_input_complexity(user_input)

                    # Only long/contextual inputs get the extra re-prompt pass.
                    if input_complexity["is_complex"] and input_complexity["complexity_score"] > 25:
                        logger.info("Complex input detected - attempting intelligent re-prompt")
                        try:
                            # Append category-specific guidance to the original input.
                            improved_prompt = self._generate_improved_prompt(user_input, exceeded_categories)

                            # Full pipeline pass on the improved prompt.
                            improved_result = await self.process_request(session_id, improved_prompt)
                            improved_response = improved_result.get('final_response', '')

                            # Re-check safety on the re-prompted output.
                            final_safety_check = await self.agents['safety_check'].execute(
                                response=improved_response,
                                context=improved_result.get('context', {})
                            )

                            improved_needs_revision, improved_exceeded = self.check_safety_thresholds(final_safety_check)

                            if not improved_needs_revision:
                                # Re-prompt cleared all thresholds; return it.
                                logger.info("Intelligent re-prompt resolved safety concerns")
                                improved_result['safety_result'] = final_safety_check
                                improved_result['revision_attempts'] = revision_attempt + 1
                                improved_result['intelligent_reprompt_success'] = True
                                if 'metadata' not in improved_result:
                                    improved_result['metadata'] = {}
                                improved_result['metadata']['safety_result'] = final_safety_check
                                improved_result['metadata']['revision_attempts'] = revision_attempt + 1
                                improved_result['metadata']['intelligent_reprompt_success'] = True
                                return improved_result
                            else:
                                # Keep the (partially) improved output for the
                                # annotated fallback below.
                                logger.info("Intelligent re-prompt did not fully resolve concerns")
                                current_response = improved_response
                                safety_checked = final_safety_check
                                exceeded_categories = improved_exceeded

                        except Exception as e:
                            # Best-effort: fall through to the warning path.
                            logger.warning(f"Intelligent re-prompt failed: {e}", exc_info=True)

                    # Annotated fallback: surface the remaining concerns to the
                    # user instead of suppressing the response entirely.
                    # Uses safety_checked's warnings (may be the re-prompt's).
                    warning_summary = self._generate_warning_summary(exceeded_categories, safety_checked.get('warnings', []))
                    user_guidance = self._generate_user_guidance(exceeded_categories, user_input)

                    # NOTE(review): this reads from `result`, so if the
                    # intelligent re-prompt produced an improved (but still
                    # flagged) response, that text is not used here — confirm
                    # whether `current_response` was intended instead.
                    original_response = result.get('final_response', '')
                    enhanced_response = f"{original_response}\n\n{warning_summary}\n\n{user_guidance}"

                    result['final_response'] = enhanced_response
                    result['response'] = enhanced_response
                    result['safety_result'] = safety_checked
                    result['revision_attempts'] = revision_attempt
                    result['safety_exceeded'] = exceeded_categories
                    result['safety_revision_applied'] = revision_attempt > 0
                    result['warning_summary_added'] = True
                    result['input_complexity'] = input_complexity

                    # Mirror everything under metadata as well.
                    if 'metadata' not in result:
                        result['metadata'] = {}
                    result['metadata']['safety_result'] = safety_checked
                    result['metadata']['revision_attempts'] = revision_attempt
                    result['metadata']['safety_exceeded'] = exceeded_categories
                    result['metadata']['input_complexity'] = input_complexity

                    return result

                # Thresholds exceeded but attempts remain: loop again.
                final_result = result
                revision_attempt += 1
                logger.info(f"Generating revision attempt {revision_attempt} for: {exceeded_categories}")

            except Exception as e:
                # Any failure ends the loop: return the best result so far
                # (tagged with the error) or a minimal error payload.
                logger.error(f"Error in safety revision attempt {revision_attempt}: {e}", exc_info=True)
                if final_result:
                    final_result['revision_error'] = str(e)
                    if 'metadata' not in final_result:
                        final_result['metadata'] = {}
                    final_result['metadata']['revision_error'] = str(e)
                    return final_result

                return {
                    'response': 'Error in processing with safety revision',
                    'final_response': 'Error in processing with safety revision',
                    'revision_attempts': revision_attempt,
                    'revision_error': str(e),
                    'error': str(e)
                }

        # Loop exhausted without an early return (defensive; normally the
        # max-attempt branch returns first).
        return final_result or {
            'response': 'Error in safety revision processing',
            'final_response': 'Error in safety revision processing',
            'revision_attempts': revision_attempt,
            'safety_revision_applied': False
        }
| | |
| | def _generate_warning_summary(self, exceeded_categories: list, safety_warnings: list) -> str: |
| | """Generate user-friendly warning summary""" |
| | category_explanations = { |
| | "potential_biases_or_stereotypes": "may contain assumptions about career transitions that don't account for individual circumstances", |
| | "toxicity_or_harmful_language": "contains language that could be harmful or inappropriate", |
| | "privacy_or_security_concerns": "includes content that could raise privacy or security considerations", |
| | "controversial_or_sensitive_topics": "touches on topics that may benefit from additional perspective" |
| | } |
| | |
| | if not exceeded_categories: |
| | return "" |
| | |
| | warning_text = "**Note**: This response " + ", ".join([ |
| | category_explanations.get(cat, f"has concerns related to {cat}") |
| | for cat in exceeded_categories |
| | ]) + "." |
| | |
| | return warning_text |
| | |
| | def _generate_user_guidance(self, exceeded_categories: list, original_user_input: str) -> str: |
| | """Generate proactive user guidance with UX-friendly options for complex prompts""" |
| | if not exceeded_categories: |
| | return "" |
| | |
| | input_complexity = self._assess_input_complexity(original_user_input) |
| | |
| | guidance_templates = { |
| | "potential_biases_or_stereotypes": { |
| | "issue": "avoid assumptions about career paths", |
| | "simple_suggestion": "ask for advice tailored to specific qualifications or industry interests", |
| | "complex_refinement": "add details like your specific skills, target industry, or education level" |
| | }, |
| | "toxicity_or_harmful_language": { |
| | "issue": "ensure respectful communication", |
| | "simple_suggestion": "rephrase using more neutral language", |
| | "complex_refinement": "adjust the tone while keeping your detailed context" |
| | }, |
| | "privacy_or_security_concerns": { |
| | "issue": "protect sensitive information", |
| | "simple_suggestion": "ask for general guidance instead", |
| | "complex_refinement": "remove specific personal details while keeping the scenario structure" |
| | }, |
| | "controversial_or_sensitive_topics": { |
| | "issue": "get balanced perspectives", |
| | "simple_suggestion": "ask for multiple viewpoints or balanced analysis", |
| | "complex_refinement": "specify you'd like pros/cons or different perspectives included" |
| | } |
| | } |
| | |
| | primary_category = exceeded_categories[0] |
| | guidance = guidance_templates.get(primary_category, { |
| | "issue": "improve response quality", |
| | "simple_suggestion": "try rephrasing with more specific details", |
| | "complex_refinement": "add clarifying details to your existing question" |
| | }) |
| | |
| | |
| | |
| | topic = "Error recovery context" |
| | |
| | |
| | if input_complexity["is_complex"]: |
| | return f"""**Want a better response?** To {guidance['issue']} in responses about {topic}, you could {guidance['complex_refinement']} rather than rewriting your detailed question. Or simply ask again as-is and I'll focus on providing more balanced information.""" |
| | else: |
| | return f"""**Want a better response?** To {guidance['issue']} in future responses about {topic}, you could {guidance['simple_suggestion']}. Feel free to ask again with any adjustments!""" |
| | |
| | def _assess_input_complexity(self, user_input: str) -> dict: |
| | """Assess input complexity to determine appropriate UX guidance""" |
| | word_count = len(user_input.split()) |
| | sentence_count = user_input.count('.') + user_input.count('!') + user_input.count('?') |
| | has_context = any(phrase in user_input.lower() for phrase in [ |
| | 'i am currently', 'my situation', 'my background', 'i have been', |
| | 'my experience', 'i work', 'my company', 'specific to my' |
| | ]) |
| | has_constraints = any(phrase in user_input.lower() for phrase in [ |
| | 'must', 'need to', 'required', 'limited by', 'constraint', 'budget', |
| | 'timeline', 'deadline', 'specific requirements' |
| | ]) |
| | |
| | is_complex = ( |
| | word_count > 30 or |
| | sentence_count > 2 or |
| | has_context or |
| | has_constraints |
| | ) |
| | |
| | return { |
| | "is_complex": is_complex, |
| | "word_count": word_count, |
| | "has_personal_context": has_context, |
| | "has_constraints": has_constraints, |
| | "complexity_score": word_count * 0.1 + sentence_count * 5 + (has_context * 10) + (has_constraints * 10) |
| | } |
| | |
| | def _generate_improved_prompt(self, original_input: str, exceeded_categories: list) -> str: |
| | """Generate improved prompt for complex inputs to resolve safety concerns automatically""" |
| | |
| | improvements = [] |
| | |
| | if "potential_biases_or_stereotypes" in exceeded_categories: |
| | improvements.append("Please provide specific qualifications, skills, and requirements for each option") |
| | improvements.append("Include diverse pathways and acknowledge individual circumstances vary") |
| | |
| | if "toxicity_or_harmful_language" in exceeded_categories: |
| | improvements.append("Use respectful, professional language throughout") |
| | |
| | if "privacy_or_security_concerns" in exceeded_categories: |
| | improvements.append("Focus on general guidance without personal specifics") |
| | |
| | if "controversial_or_sensitive_topics" in exceeded_categories: |
| | improvements.append("Present balanced perspectives and multiple viewpoints") |
| | |
| | improvement_instructions = ". ".join(improvements) |
| | |
| | improved_prompt = f"""{original_input} |
| | |
| | Additional guidance for response: {improvement_instructions}. Ensure all advice is specific, actionable, and acknowledges different backgrounds and circumstances.""" |
| | |
| | return improved_prompt |
| | |
| | def check_query_similarity(self, new_query: str, threshold: float = 0.85) -> Optional[Dict]: |
| | """ |
| | Step 3: Add Query Similarity Detection |
| | |
| | Check if new query is similar to any recent queries above threshold. |
| | Uses simple string similarity (can be enhanced with embeddings later). |
| | |
| | Args: |
| | new_query: The new query to check |
| | threshold: Similarity threshold (default 0.85) |
| | |
| | Returns: |
| | Cached response dict if similar query found, None otherwise |
| | """ |
| | if not self.recent_queries: |
| | return None |
| | |
| | new_query_lower = new_query.lower().strip() |
| | |
| | for cached_query_data in reversed(self.recent_queries): |
| | cached_query = cached_query_data.get('query', '') |
| | if not cached_query: |
| | continue |
| | |
| | cached_query_lower = cached_query.lower().strip() |
| | |
| | |
| | similarity = self._calculate_similarity(new_query_lower, cached_query_lower) |
| | |
| | if similarity > threshold: |
| | logger.info(f"Similar query detected (similarity: {similarity:.2f}): '{new_query[:50]}...' similar to '{cached_query[:50]}...'") |
| | return cached_query_data.get('response') |
| | |
| | return None |
| | |
| | def _calculate_similarity(self, query1: str, query2: str) -> float: |
| | """ |
| | Calculate similarity between two queries using Jaccard similarity on words. |
| | Can be enhanced with embeddings for semantic similarity. |
| | """ |
| | if not query1 or not query2: |
| | return 0.0 |
| | |
| | |
| | words1 = set(query1.split()) |
| | words2 = set(query2.split()) |
| | |
| | if not words1 or not words2: |
| | return 0.0 |
| | |
| | |
| | intersection = len(words1.intersection(words2)) |
| | union = len(words1.union(words2)) |
| | |
| | if union == 0: |
| | return 0.0 |
| | |
| | jaccard = intersection / union |
| | |
| | |
| | if query1 in query2 or query2 in query1: |
| | jaccard = max(jaccard, 0.9) |
| | |
| | return jaccard |
| | |
    def track_response_metrics(self, start_time: float, response: Dict) -> Dict:
        """
        Track performance metrics and add them to response dictionary.

        Mutates `response` in place: adds a 'performance' section and mirrors
        the metrics under 'metadata', then appends a record to
        self.response_metrics_history and resets self.agent_call_count.

        Args:
            start_time: Start time from time.time()
            response: Response dictionary containing response data

        Returns:
            Dict with performance metrics added to response (the same object).
        """
        try:
            latency = time.time() - start_time

            # Response text may live under several keys depending on pipeline path.
            response_text = (
                response.get('response') or
                response.get('final_response') or
                response.get('synthesized_response') or
                str(response.get('result', ''))
            )

            # Heuristic token estimator: max of word-based (~1.3 tokens/word)
            # and char-based (~4 chars/token) estimates.
            def estimate_tokens(text: str) -> int:
                """Estimate tokens more accurately"""
                if not text:
                    return 0

                words = len(text.split())
                chars = len(text)

                token_estimate = max(words * 1.3, chars / 4)
                return int(token_estimate)

            token_count = estimate_tokens(response_text)

            # Defaults used when metadata carries no safety/intent results.
            safety_score = 0.8
            confidence_score = 0.8

            if 'metadata' in response:
                # NOTE(review): synthesis_result is read but never used here.
                synthesis_result = response['metadata'].get('synthesis_result', {})
                safety_result = response['metadata'].get('safety_result', {})
                intent_result = response.get('intent', {}) or response.get('metadata', {}).get('intent_result', {})

                if safety_result:
                    safety_analysis = safety_result.get('safety_analysis', {})
                    safety_score = safety_analysis.get('overall_safety_score', 0.8)

                # Confidence = score of the primary intent, if reported.
                if intent_result and 'confidence_scores' in intent_result:
                    primary_intent = intent_result.get('primary_intent', '')
                    if primary_intent:
                        conf_scores = intent_result['confidence_scores']
                        confidence_score = conf_scores.get(primary_intent, 0.8)

            agent_contributions = []
            total_agents = 0

            # Infer which agents ran from the keys present in the response.
            agents_used = []
            metadata = response.get('metadata', {})

            if metadata.get('intent_result') or response.get('intent'):
                agents_used.append('Intent')
            if metadata.get('synthesis_result') or response.get('synthesized_response'):
                agents_used.append('Synthesis')
            if metadata.get('safety_result') or response.get('safety_precheck'):
                agents_used.append('Safety')
            if metadata.get('skills_result') or response.get('skills'):
                agents_used.append('Skills')

            # Fallback: guess the agent set from the raw call counter.
            if not agents_used and self.agent_call_count > 0:
                if self.agent_call_count >= 3:
                    agents_used = ['Intent', 'Skills', 'Safety']
                elif self.agent_call_count >= 2:
                    agents_used = ['Intent', 'Synthesis']
                else:
                    agents_used = ['Synthesis']

            total_agents = len(agents_used) if agents_used else self.agent_call_count

            # Split contribution percentages, weighting Synthesis and Intent
            # higher (capped at 50% and 30% respectively).
            if total_agents > 0 and agents_used:
                base_percentage = 100 / total_agents
                for agent in agents_used:
                    if agent == 'Synthesis':
                        percentage = min(50, base_percentage * 1.5)
                    elif agent == 'Intent':
                        percentage = min(30, base_percentage * 1.2)
                    else:
                        percentage = base_percentage

                    agent_contributions.append({
                        "agent": agent,
                        "percentage": round(percentage, 1)
                    })

            # Renormalize so the percentages sum to ~100.
            if agent_contributions:
                total_pct = sum(c['percentage'] for c in agent_contributions)
                if total_pct > 0 and abs(total_pct - 100) > 0.1:
                    for contrib in agent_contributions:
                        contrib['percentage'] = round(contrib['percentage'] * 100 / total_pct, 1)

            # API-facing metrics block (times in ms, scores as percentages).
            performance_metrics = {
                "processing_time": round(latency * 1000, 2),
                "tokens_used": token_count,
                "agents_used": total_agents,
                "confidence_score": round(confidence_score * 100, 1),
                "agent_contributions": agent_contributions,
                "safety_score": round(safety_score * 100, 1),
                "latency_seconds": round(latency, 3),
                "timestamp": datetime.now().isoformat()
            }

            # Compact record kept for get_performance_summary().
            metrics_history = {
                'latency': latency,
                'token_count': token_count,
                'agent_calls': self.agent_call_count,
                'safety_score': safety_score,
                'timestamp': datetime.now().isoformat()
            }

            self.response_metrics_history.append(metrics_history)
            # NOTE(review): self.metrics_history_max_size is not set in the
            # visible __init__ (only max_agent_history is); if it is not
            # defined elsewhere, this raises AttributeError, which the broad
            # except below swallows — confirm the attribute exists.
            if len(self.response_metrics_history) > self.metrics_history_max_size:
                self.response_metrics_history = self.response_metrics_history[-self.metrics_history_max_size:]

            # Attach metrics to the response for API consumption.
            if 'performance' not in response:
                response['performance'] = {}

            response['performance'].update(performance_metrics)

            # Mirror under metadata as well.
            if 'metadata' not in response:
                response['metadata'] = {}

            response['metadata']['performance_metrics'] = performance_metrics
            response['metadata']['processing_time'] = latency
            response['metadata']['token_count'] = token_count
            response['metadata']['agents_used'] = agents_used

            logger.info(f"Response Metrics - Latency: {latency:.3f}s, Tokens: {token_count}, "
                       f"Agent Calls: {self.agent_call_count}, Safety Score: {safety_score:.2f}, "
                       f"Agents Used: {total_agents}")
            logger.debug(f"Performance metrics: {performance_metrics}")

            # Reset the per-request agent counter for the next request.
            self.agent_call_count = 0

            return response

        except Exception as e:
            logger.error(f"Error tracking response metrics: {e}", exc_info=True)
            # Best-effort fallback: still attach a minimal performance block.
            if 'performance' not in response:
                response['performance'] = {
                    "processing_time": round((time.time() - start_time) * 1000, 2),
                    "tokens_used": 0,
                    "agents_used": 0,
                    "confidence_score": 0,
                    "agent_contributions": [],
                    "safety_score": 80,
                    "error": str(e)
                }
            return response
| | |
| | def get_performance_summary(self) -> Dict: |
| | """ |
| | Get summary of recent performance metrics. |
| | Useful for monitoring and debugging. |
| | |
| | Returns: |
| | Dict with performance statistics |
| | """ |
| | if not self.response_metrics_history: |
| | return { |
| | "total_requests": 0, |
| | "average_latency": 0, |
| | "average_tokens": 0, |
| | "average_agents": 0 |
| | } |
| | |
| | recent = self.response_metrics_history[-20:] |
| | |
| | return { |
| | "total_requests": len(self.response_metrics_history), |
| | "recent_requests": len(recent), |
| | "average_latency": round(sum(m['latency'] for m in recent) / len(recent), 3) if recent else 0, |
| | "average_tokens": round(sum(m['token_count'] for m in recent) / len(recent), 1) if recent else 0, |
| | "average_agents": round(sum(m.get('agent_calls', 0) for m in recent) / len(recent), 1) if recent else 0, |
| | "last_10_metrics": recent[-10:] if len(recent) > 10 else recent |
| | } |
| |
|