| | """ |
| | Skills Identification Agent |
| | Specialized in analyzing user prompts and identifying relevant expert skills based on market analysis |
| | """ |
| |
|
| | import logging |
| | from typing import Dict, Any, List, Tuple |
| | import json |
| | import re |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
class SkillsIdentificationAgent:
    """Identify expert skills relevant to a free-text user prompt.

    Two analyses are run for every input and then merged:

    1. a market analysis that maps the input onto the industry categories in
       ``market_categories``; and
    2. a skill classification that scores the input against
       ``skill_categories``.

    Each analysis asks ``llm_router`` first (when one was supplied) and falls
    back to keyword rules when the router is missing, raises, or returns no
    usable text.
    """

    # A skill must score strictly above this value to be reported.
    _SKILL_THRESHOLD = 0.2
    # Score added per matched keyword in the rule-based fallbacks.
    _MARKET_KEYWORD_WEIGHT = 0.2
    _SKILL_KEYWORD_WEIGHT = 0.3
    # Skills inherit this fraction of their category's relevance score.
    _SKILL_RELEVANCE_FACTOR = 0.8
    # Nominal processing times reported for LLM-backed results. These are
    # fixed constants, not measurements.
    _LLM_MARKET_TIME = 0.8
    _LLM_CLASSIFICATION_TIME = 0.3
    # Keywords this short must match a whole word, not just a word prefix.
    _SHORT_KEYWORD_LENGTH = 3

    # Keyword triggers for the rule-based market analysis. Keys must also
    # exist in ``market_categories``.
    _MARKET_KEYWORDS = {
        "IT and Software Development": ["code", "programming", "software", "tech", "ai", "machine learning", "data", "cyber", "cloud"],
        "Finance and Accounting": ["finance", "money", "investment", "banking", "accounting", "financial", "risk", "compliance"],
        "Healthcare and Medicine": ["health", "medical", "doctor", "nurse", "patient", "clinical", "medicine", "healthcare"],
        "Education and Teaching": ["teach", "education", "learn", "student", "school", "curriculum", "instruction"],
        "Engineering and Construction": ["engineer", "construction", "build", "project", "manufacturing", "design"],
        "Marketing and Sales": ["marketing", "sales", "customer", "advertising", "promotion", "brand"],
        "Consulting and Strategy": ["consulting", "strategy", "business", "management", "planning"],
        "Environmental and Sustainability": ["environment", "sustainable", "green", "renewable", "climate", "carbon"],
        "Arts and Humanities": ["art", "creative", "culture", "humanities", "design", "communication"],
    }

    # Keyword triggers for the rule-based skill classification.
    _SKILL_KEYWORDS = {
        "technical_programming": ["code", "programming", "software", "development", "python", "java"],
        "data_analysis": ["data", "analysis", "statistics", "analytics", "research"],
        "cybersecurity": ["security", "cyber", "hack", "protection", "vulnerability"],
        "financial_analysis": ["finance", "money", "investment", "financial", "economic"],
        "healthcare_technology": ["health", "medical", "healthcare", "clinical", "patient"],
        "educational_technology": ["education", "teach", "learn", "student", "curriculum"],
        "project_management": ["project", "manage", "planning", "coordination", "leadership"],
        "digital_marketing": ["marketing", "advertising", "promotion", "social media", "brand"],
        "environmental_science": ["environment", "sustainable", "green", "climate", "carbon"],
        "creative_design": ["design", "creative", "art", "visual", "graphic"],
    }

    def __init__(self, llm_router=None):
        """Create the agent.

        Args:
            llm_router: Optional object exposing an awaitable
                ``route_inference(task_type=..., prompt=..., max_tokens=...,
                temperature=...)``. When falsy, only the rule-based
                fallbacks are used.
        """
        self.llm_router = llm_router
        self.agent_id = "SKILLS_ID_001"
        self.specialization = "Expert skills identification and market analysis"

        # Per-category market share (%), growth rate (%) and representative
        # skills. Used both in the LLM prompt and in the rule-based analysis.
        self.market_categories = {
            "IT and Software Development": {
                "market_share": 25,
                "growth_rate": 25.0,
                "specialized_skills": [
                    "Cybersecurity", "Artificial Intelligence & Machine Learning",
                    "Cloud Computing", "Data Analytics & Big Data",
                    "Software Engineering", "Blockchain Technology", "Quantum Computing"
                ]
            },
            "Finance and Accounting": {
                "market_share": 20,
                "growth_rate": 6.8,
                "specialized_skills": [
                    "Financial Analysis & Modeling", "Risk Management",
                    "Regulatory Compliance", "Fintech Solutions",
                    "ESG Reporting", "Tax Preparation", "Investment Analysis"
                ]
            },
            "Healthcare and Medicine": {
                "market_share": 15,
                "growth_rate": 8.5,
                "specialized_skills": [
                    "Telemedicine Training", "Advanced Nursing Certifications",
                    "Healthcare Informatics", "Clinical Research",
                    "Medical Device Technology", "Public Health", "Mental Health Services"
                ]
            },
            "Education and Teaching": {
                "market_share": 10,
                "growth_rate": 3.2,
                "specialized_skills": [
                    "Instructional Design", "Educational Technology Integration",
                    "Digital Literacy Training", "Special Education",
                    "Career Coaching", "E-learning Development", "STEM Education"
                ]
            },
            "Engineering and Construction": {
                "market_share": 10,
                "growth_rate": 8.5,
                "specialized_skills": [
                    "Automation Engineering", "Sustainable Design",
                    "Project Management", "Environmental Engineering",
                    "Advanced Manufacturing", "Infrastructure Development", "Quality Control"
                ]
            },
            "Marketing and Sales": {
                "market_share": 10,
                "growth_rate": 7.1,
                "specialized_skills": [
                    "Digital Marketing", "Data Analytics",
                    "Customer Relationship Management", "Content Marketing",
                    "E-commerce Management", "Market Research", "Sales Strategy"
                ]
            },
            "Consulting and Strategy": {
                "market_share": 5,
                "growth_rate": 6.0,
                "specialized_skills": [
                    "Business Analysis", "Change Management",
                    "Strategic Planning", "Operations Research",
                    "Industry-Specific Knowledge", "Problem-Solving", "Leadership Development"
                ]
            },
            "Environmental and Sustainability": {
                "market_share": 5,
                "growth_rate": 15.0,
                "specialized_skills": [
                    "Renewable Energy Technologies", "Environmental Policy",
                    "Sustainability Reporting", "Ecological Conservation",
                    "Carbon Management", "Green Technology", "Circular Economy"
                ]
            },
            "Arts and Humanities": {
                "market_share": 5,
                "growth_rate": 2.5,
                "specialized_skills": [
                    "Creative Thinking", "Cultural Analysis",
                    "Communication", "Digital Media",
                    "Language Services", "Historical Research", "Philosophical Analysis"
                ]
            }
        }

        # Labels offered to the classification model.
        self.skill_categories = [
            "technical_programming", "data_analysis", "cybersecurity", "cloud_computing",
            "financial_analysis", "risk_management", "regulatory_compliance", "fintech",
            "healthcare_technology", "medical_research", "telemedicine", "nursing",
            "educational_technology", "curriculum_design", "online_learning", "teaching",
            "project_management", "engineering_design", "sustainable_engineering", "manufacturing",
            "digital_marketing", "sales_strategy", "customer_management", "market_research",
            "business_consulting", "strategic_planning", "change_management", "leadership",
            "environmental_science", "sustainability", "renewable_energy", "green_technology",
            "creative_design", "content_creation", "communication", "cultural_analysis"
        ]

    async def execute(self, user_input: str, context: Dict[str, Any] = None, **kwargs) -> Dict[str, Any]:
        """Run market analysis and skill classification, then merge them.

        Args:
            user_input: The user's prompt.
            context: Optional conversation context; forwarded to the market
                analysis prompt builder.
            **kwargs: Accepted for interface compatibility and ignored.

        Returns:
            A dict with ``agent_id``, ``market_analysis``,
            ``skill_classification``, ``identified_skills``,
            ``processing_time`` and ``confidence_score``. If anything raises,
            the canned result from ``_get_fallback_result`` is returned
            instead (it additionally carries ``error_handled``).
        """
        try:
            logger.info(f"{self.agent_id} processing user input: {user_input[:100]}...")

            market_analysis = await self._analyze_market_relevance(user_input, context)
            skill_classification = await self._classify_skills(user_input, context)

            # The extractor needs the raw input too, for its own rule-based
            # fallback when neither analysis produced a usable skill.
            combined_data = {
                "market_analysis": market_analysis,
                "skill_classification": skill_classification,
                "user_input": user_input,
                "context": context
            }

            result = {
                "agent_id": self.agent_id,
                "market_analysis": market_analysis,
                "skill_classification": skill_classification,
                "identified_skills": self._extract_high_probability_skills(combined_data),
                "processing_time": market_analysis.get("processing_time", 0) + skill_classification.get("processing_time", 0),
                "confidence_score": self._calculate_overall_confidence(market_analysis, skill_classification)
            }

            logger.info(f"{self.agent_id} completed with {len(result['identified_skills'])} skills identified")
            return result

        except Exception as e:
            # Top-level boundary: never propagate, but keep the traceback.
            logger.exception(f"{self.agent_id} error: {str(e)}")
            return self._get_fallback_result(user_input, context)

    @staticmethod
    def _tag_llm_result(parsed: Dict[str, Any], processing_time: float) -> Dict[str, Any]:
        """Stamp a parsed LLM result with its timing and method, in place.

        A result that the parser already marked as ``"fallback_parsing"``
        keeps that marker so callers can tell a real LLM answer from the
        parser's placeholder.
        """
        parsed["processing_time"] = processing_time
        if parsed.get("method") != "fallback_parsing":
            parsed["method"] = "llm_enhanced"
        return parsed

    async def _analyze_market_relevance(self, user_input: str, context: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze market relevance via the LLM router, else by keyword rules."""
        if self.llm_router:
            try:
                market_prompt = self._build_market_analysis_prompt(user_input, context)

                logger.info(f"{self.agent_id} calling reasoning_primary for market analysis")
                llm_response = await self.llm_router.route_inference(
                    task_type="general_reasoning",
                    prompt=market_prompt,
                    max_tokens=2000,
                    temperature=0.7
                )

                if isinstance(llm_response, str) and llm_response.strip():
                    parsed_analysis = self._parse_market_analysis_response(llm_response)
                    return self._tag_llm_result(parsed_analysis, self._LLM_MARKET_TIME)

                logger.warning(f"{self.agent_id} LLM market analysis returned no usable text")

            except Exception as e:
                # Best effort: any router/prompt failure degrades to rules.
                logger.error(f"{self.agent_id} LLM market analysis failed: {e}")

        return self._rule_based_market_analysis(user_input)

    async def _classify_skills(self, user_input: str, context: Dict[str, Any]) -> Dict[str, Any]:
        """Classify skills via the LLM router, else by keyword rules.

        ``context`` is accepted for signature symmetry but is not used by the
        classification prompt.
        """
        if self.llm_router:
            try:
                classification_prompt = self._build_classification_prompt(user_input)

                logger.info(f"{self.agent_id} calling classification_specialist for skill classification")
                llm_response = await self.llm_router.route_inference(
                    task_type="intent_classification",
                    prompt=classification_prompt,
                    max_tokens=512,
                    temperature=0.3
                )

                if isinstance(llm_response, str) and llm_response.strip():
                    parsed_classification = self._parse_classification_response(llm_response)
                    return self._tag_llm_result(parsed_classification, self._LLM_CLASSIFICATION_TIME)

                logger.warning(f"{self.agent_id} LLM classification returned no usable text")

            except Exception as e:
                # Best effort: any router/prompt failure degrades to rules.
                logger.error(f"{self.agent_id} LLM classification failed: {e}")

        return self._rule_based_skill_classification(user_input)

    def _build_market_analysis_prompt(self, user_input: str, context: Dict[str, Any] = None) -> str:
        """Build the market analysis prompt, optionally enriched with context.

        From ``context`` (when it is a dict) the following are used if
        present: ``session_context["summary"]``, ``user_context`` (both
        truncated to 300 characters) and the ``summary`` of the last two
        entries of ``interaction_contexts``. Values of unexpected types are
        stringified or skipped rather than raising.
        """
        market_data = "\n".join([
            f"- {category}: {data['market_share']}% market share, {data['growth_rate']}% growth rate"
            for category, data in self.market_categories.items()
        ])

        specialized_skills = "\n".join([
            f"- {category}: {', '.join(data['specialized_skills'][:3])}"
            for category, data in self.market_categories.items()
        ])

        context_info = ""
        if isinstance(context, dict):
            session_context = context.get('session_context')
            session_summary = session_context.get('summary', '') if isinstance(session_context, dict) else ""
            user_context = context.get('user_context', '')
            interaction_contexts = context.get('interaction_contexts')

            if session_summary:
                context_info = f"\n\nSession Context (session summary): {str(session_summary)[:300]}..."
            if user_context:
                context_info += f"\n\nUser Context (persona summary): {str(user_context)[:300]}..."

            if isinstance(interaction_contexts, (list, tuple)) and interaction_contexts:
                context_info += "\n\nRecent conversation context:"
                # Only the two most recent interactions are included.
                for idx, entry in enumerate(interaction_contexts[-2:], 1):
                    summary = entry.get('summary', '') if isinstance(entry, dict) else ""
                    if summary:
                        context_info += f"\n {idx}. {summary}"

        return f"""Analyze the following user input and identify the most relevant industry categories and specialized skills based on current market data.

User Input: "{user_input}"
{context_info}

Current Market Distribution:
{market_data}

Specialized Skills by Category (top 3 per category):
{specialized_skills}

Task:
1. Identify which industry categories are most relevant to the user's input (consider conversation context if provided)
2. Select 1-3 specialized skills from each relevant category that best match the user's needs
3. Provide market share percentages and growth rates for identified categories
4. Explain your reasoning for each selection
5. If conversation context is available, consider how previous topics might inform the skill identification

Respond in JSON format:
{{
    "relevant_categories": [
        {{
            "category": "category_name",
            "market_share": percentage,
            "growth_rate": percentage,
            "relevance_score": 0.0-1.0,
            "reasoning": "explanation"
        }}
    ],
    "selected_skills": [
        {{
            "skill": "skill_name",
            "category": "category_name",
            "relevance_score": 0.0-1.0,
            "reasoning": "explanation"
        }}
    ],
    "overall_analysis": "summary of findings"
}}"""

    def _build_classification_prompt(self, user_input: str) -> str:
        """Build the skill classification prompt listing ``skill_categories``."""
        skill_categories_str = ", ".join(self.skill_categories)

        return f"""Classify the following user input into relevant skill categories. For each category, provide a probability score (0.0-1.0) indicating how likely the input relates to that skill.

User Input: "{user_input}"

Available Skill Categories: {skill_categories_str}

Task: Provide probability scores for each skill category that passes a 20% threshold.

Respond in JSON format:
{{
    "skill_probabilities": {{
        "category_name": probability_score,
        ...
    }},
    "top_skills": [
        {{
            "skill": "category_name",
            "probability": score,
            "confidence": "high/medium/low"
        }}
    ],
    "classification_reasoning": "explanation of classification decisions"
}}"""

    @staticmethod
    def _extract_json_object(response: str) -> Dict[str, Any]:
        """Decode the JSON object that starts at the first ``{`` in ``response``.

        Text before the object and anything after its closing brace
        (explanations, code fences, stray braces) is ignored.

        Raises:
            ValueError: If there is no ``{`` or the text starting there is
                not valid JSON (``json.JSONDecodeError`` is a ``ValueError``).
        """
        start = response.find("{")
        if start == -1:
            raise ValueError("response contains no JSON object")
        parsed, _end = json.JSONDecoder().raw_decode(response, start)
        return parsed

    def _parse_market_analysis_response(self, response: str) -> Dict[str, Any]:
        """Parse the LLM's market analysis reply.

        Returns the decoded JSON object, or a generic placeholder marked
        ``"method": "fallback_parsing"`` when no object can be decoded.
        """
        try:
            return self._extract_json_object(response)
        except ValueError:
            logger.warning(f"{self.agent_id} Failed to parse market analysis JSON")

        return {
            "relevant_categories": [{"category": "General", "market_share": 10, "growth_rate": 5.0, "relevance_score": 0.7, "reasoning": "General analysis"}],
            "selected_skills": [{"skill": "General Analysis", "category": "General", "relevance_score": 0.7, "reasoning": "Broad applicability"}],
            "overall_analysis": "Market analysis completed with fallback parsing",
            "method": "fallback_parsing"
        }

    def _parse_classification_response(self, response: str) -> Dict[str, Any]:
        """Parse the LLM's skill classification reply.

        Returns the decoded JSON object, or a generic placeholder marked
        ``"method": "fallback_parsing"`` when no object can be decoded.
        """
        try:
            return self._extract_json_object(response)
        except ValueError:
            logger.warning(f"{self.agent_id} Failed to parse classification JSON")

        return {
            "skill_probabilities": {"general_analysis": 0.7},
            "top_skills": [{"skill": "general_analysis", "probability": 0.7, "confidence": "medium"}],
            "classification_reasoning": "Classification completed with fallback parsing",
            "method": "fallback_parsing"
        }

    @classmethod
    def _matched_keywords(cls, text: str, keywords: List[str]) -> List[str]:
        """Return the keywords found in ``text``, in their listed order.

        A keyword must start at a word boundary, so ``"teach"`` matches
        ``"teaching"`` while ``"ai"`` does not match inside ``"explain"``.
        Keywords of ``_SHORT_KEYWORD_LENGTH`` characters or fewer must match
        a whole word (an optional trailing ``s`` is allowed) because such
        short prefixes are too ambiguous, e.g. ``"art"`` vs. ``"article"``.

        ``text`` is expected to be lower-cased already.
        """
        hits = []
        for keyword in keywords:
            escaped = re.escape(keyword)
            if len(keyword) <= cls._SHORT_KEYWORD_LENGTH:
                pattern = rf"\b{escaped}s?\b"
            else:
                pattern = rf"\b{escaped}"
            if re.search(pattern, text):
                hits.append(keyword)
        return hits

    def _rule_based_market_analysis(self, user_input: str) -> Dict[str, Any]:
        """Keyword-based market analysis used when no LLM result is available.

        Each matched keyword adds ``_MARKET_KEYWORD_WEIGHT`` to a category's
        relevance (capped at 1.0). The first two specialized skills of every
        matched category are selected with ``_SKILL_RELEVANCE_FACTOR`` times
        the category's capped relevance.
        """
        text = user_input.lower()

        relevant_categories = []
        selected_skills = []

        for category, keywords in self._MARKET_KEYWORDS.items():
            matched = self._matched_keywords(text, keywords)
            if not matched:
                continue

            # Cap first, then derive the skill score from the capped value so
            # neither score can leave the 0.0-1.0 range.
            relevance = round(min(1.0, self._MARKET_KEYWORD_WEIGHT * len(matched)), 2)
            skill_relevance = round(relevance * self._SKILL_RELEVANCE_FACTOR, 2)

            category_data = self.market_categories[category]
            relevant_categories.append({
                "category": category,
                "market_share": category_data["market_share"],
                "growth_rate": category_data["growth_rate"],
                "relevance_score": relevance,
                "reasoning": f"Matched keywords: {matched}"
            })

            for skill in category_data["specialized_skills"][:2]:
                selected_skills.append({
                    "skill": skill,
                    "category": category,
                    "relevance_score": skill_relevance,
                    "reasoning": f"From {category} category"
                })

        return {
            "relevant_categories": relevant_categories,
            "selected_skills": selected_skills,
            "overall_analysis": f"Rule-based analysis identified {len(relevant_categories)} relevant categories",
            "processing_time": 0.1,
            "method": "rule_based"
        }

    def _rule_based_skill_classification(self, user_input: str) -> Dict[str, Any]:
        """Keyword-based skill classification used when no LLM result is available.

        Each matched keyword adds ``_SKILL_KEYWORD_WEIGHT`` to a skill's
        probability (capped at 1.0). Confidence is ``"high"`` above 0.6,
        ``"medium"`` above 0.4 and ``"low"`` otherwise.
        """
        text = user_input.lower()

        skill_probabilities = {}
        top_skills = []

        for skill, keywords in self._SKILL_KEYWORDS.items():
            matched = self._matched_keywords(text, keywords)
            if not matched:
                continue

            probability = round(min(1.0, self._SKILL_KEYWORD_WEIGHT * len(matched)), 2)
            if probability > 0.6:
                confidence = "high"
            elif probability > 0.4:
                confidence = "medium"
            else:
                confidence = "low"

            skill_probabilities[skill] = probability
            top_skills.append({
                "skill": skill,
                "probability": probability,
                "confidence": confidence
            })

        return {
            "skill_probabilities": skill_probabilities,
            "top_skills": top_skills,
            "classification_reasoning": f"Rule-based classification identified {len(top_skills)} relevant skills",
            "processing_time": 0.05,
            "method": "rule_based"
        }

    @staticmethod
    def _as_score(value: Any) -> float:
        """Coerce ``value`` to a float score; anything non-numeric becomes 0.0.

        0.0 never passes ``_SKILL_THRESHOLD``, so unusable scores simply drop
        the entry instead of raising on comparison.
        """
        if isinstance(value, bool):
            return 0.0
        try:
            score = float(value)
        except (TypeError, ValueError):
            return 0.0
        # NaN compares unequal to itself; treat it as "no score".
        return score if score == score else 0.0

    @classmethod
    def _collect_skills(cls, entries: Any, score_key: str, source: str, category: str = None) -> List[Dict[str, Any]]:
        """Normalize skill entries that score above ``_SKILL_THRESHOLD``.

        Entries come from LLM output and are therefore not trusted: anything
        that is not a dict with a non-empty string ``skill`` is skipped, and
        scores are coerced with ``_as_score``.

        Args:
            entries: Candidate list of skill dicts (any other type yields []).
            score_key: Key holding the entry's score.
            source: Value stored under ``"source"`` in each result.
            category: Fixed category label; when ``None`` the entry's own
                ``"category"`` is used (``"uncategorized"`` if absent).
        """
        collected = []
        if not isinstance(entries, (list, tuple)):
            return collected

        for entry in entries:
            if not isinstance(entry, dict):
                continue
            name = entry.get("skill")
            if not isinstance(name, str) or not name:
                continue
            score = cls._as_score(entry.get(score_key))
            if score <= cls._SKILL_THRESHOLD:
                continue
            collected.append({
                "skill": name,
                "category": category if category is not None else entry.get("category", "uncategorized"),
                "probability": score,
                "source": source
            })
        return collected

    def _extract_high_probability_skills(self, classification: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Merge skills from both analyses that exceed the 20% threshold.

        Args:
            classification: The combined data built by ``execute``: a dict
                with ``market_analysis``, ``skill_classification``,
                ``user_input`` and ``context``.

        Returns:
            One entry per skill name (the highest-scoring one), sorted by
            descending probability. When neither analysis yields a skill, the
            rule-based classifier is run on the user input as a last resort.
        """
        market_analysis = classification.get("market_analysis") or {}
        skill_classification = classification.get("skill_classification") or {}

        high_prob_skills = self._collect_skills(
            market_analysis.get("selected_skills") if isinstance(market_analysis, dict) else None,
            "relevance_score",
            "market_analysis",
        )
        high_prob_skills += self._collect_skills(
            skill_classification.get("top_skills") if isinstance(skill_classification, dict) else None,
            "probability",
            "skill_classification",
            category="classified",
        )

        if not high_prob_skills:
            logger.warning(f"{self.agent_id} No skills identified from LLM, using rule-based fallback")

            # Prefer the top-level user input; some callers may only carry it
            # inside the context dict.
            user_input = ""
            if "user_input" in classification:
                user_input = classification["user_input"]
            elif "context" in classification:
                context = classification["context"]
                if isinstance(context, dict) and "user_input" in context:
                    user_input = context["user_input"]

            if user_input:
                rule_based_result = self._rule_based_skill_classification(user_input)
                high_prob_skills = self._collect_skills(
                    rule_based_result.get("top_skills"),
                    "probability",
                    "rule_based_fallback",
                    category="rule_based",
                )

        # De-duplicate by skill name, keeping the highest-scoring entry
        # (the first one wins on ties).
        unique_skills = {}
        for skill in high_prob_skills:
            current = unique_skills.get(skill["skill"])
            if current is None or skill["probability"] > current["probability"]:
                unique_skills[skill["skill"]] = skill

        return sorted(unique_skills.values(), key=lambda x: x["probability"], reverse=True)

    def _calculate_overall_confidence(self, market_analysis: Dict[str, Any], skill_classification: Dict[str, Any]) -> float:
        """Return a 0.3 baseline plus 0.1 per category and per top skill, capped at 1.0."""
        categories = market_analysis.get("relevant_categories")
        top_skills = skill_classification.get("top_skills")

        # Missing or non-list values (possible in LLM output) count as empty.
        market_confidence = (len(categories) if isinstance(categories, (list, tuple)) else 0) * 0.1
        classification_confidence = (len(top_skills) if isinstance(top_skills, (list, tuple)) else 0) * 0.1

        return min(1.0, market_confidence + classification_confidence + 0.3)

    def _get_fallback_result(self, user_input: str, context: Dict[str, Any]) -> Dict[str, Any]:
        """Return the canned result used when ``execute`` fails.

        ``user_input`` and ``context`` are accepted but not used; the result
        is constant apart from ``agent_id``.
        """
        return {
            "agent_id": self.agent_id,
            "market_analysis": {
                "relevant_categories": [{"category": "General", "market_share": 10, "growth_rate": 5.0, "relevance_score": 0.5, "reasoning": "Fallback analysis"}],
                "selected_skills": [{"skill": "General Analysis", "category": "General", "relevance_score": 0.5, "reasoning": "Fallback skill"}],
                "overall_analysis": "Fallback analysis due to processing error",
                "processing_time": 0.01,
                "method": "fallback"
            },
            "skill_classification": {
                "skill_probabilities": {"general_analysis": 0.5},
                "top_skills": [{"skill": "general_analysis", "probability": 0.5, "confidence": "low"}],
                "classification_reasoning": "Fallback classification due to processing error",
                "processing_time": 0.01,
                "method": "fallback"
            },
            "identified_skills": [{"skill": "General Analysis", "category": "General", "probability": 0.5, "source": "fallback"}],
            "processing_time": 0.02,
            "confidence_score": 0.3,
            "error_handled": True
        }
| |
|
| | |
def create_skills_identification_agent(llm_router=None):
    """Factory: build a SkillsIdentificationAgent wired to ``llm_router``.

    ``llm_router`` is passed straight to the agent's constructor; with the
    default ``None`` the agent is created without a router.
    """
    agent = SkillsIdentificationAgent(llm_router)
    return agent
| |
|