""" Knowledge Graph Service Layer Safely integrates the SQLite knowledge graph with the UI """ import json import logging from typing import Dict, List, Any, Optional from datetime import datetime from pathlib import Path logger = logging.getLogger(__name__) # Try to import the knowledge graph, but don't fail if it's not available try: from knowledge_graph_direct import JobApplicationKnowledgeGraph KG_AVAILABLE = True except ImportError: logger.warning("Knowledge graph not available - running without it") KG_AVAILABLE = False class KnowledgeGraphService: """ Service layer for knowledge graph operations Provides a safe interface that won't break if KG is unavailable """ def __init__(self, db_path: str = "job_application_kg.db"): self.enabled = KG_AVAILABLE self.kg = None if self.enabled: try: self.kg = JobApplicationKnowledgeGraph(db_path) logger.info(f"Knowledge graph initialized at {db_path}") except Exception as e: logger.error(f"Failed to initialize knowledge graph: {e}") self.enabled = False def is_enabled(self) -> bool: """Check if knowledge graph is available and working""" return self.enabled and self.kg is not None def track_application( self, user_name: str, company: str, job_title: str, job_description: str, cv_text: str, cover_letter: str, skills_matched: List[str], score: float = 0.0 ) -> bool: """Track a job application in the knowledge graph""" if not self.is_enabled(): return False try: # Create entities self.kg.create_entity( name=user_name, entity_type="candidate", properties={ "last_application": datetime.now().isoformat(), "total_applications": 1 # Will increment } ) self.kg.create_entity( name=company, entity_type="company", properties={ "last_applied": datetime.now().isoformat() } ) job_id = f"{company}_{job_title}_{datetime.now().strftime('%Y%m%d')}" self.kg.create_entity( name=job_id, entity_type="job", properties={ "title": job_title, "company": company, "description": job_description[:500], # First 500 chars "match_score": score } ) # Create relations self.kg.create_relation( from_entity=user_name, to_entity=job_id, relation_type="applied_to", properties={ "date": datetime.now().isoformat(), "cv_length": len(cv_text), "cover_length": len(cover_letter), "match_score": score } ) # Track skills for skill in skills_matched: self.kg.create_entity( name=skill.lower(), entity_type="skill", properties={"category": "technical"} ) self.kg.create_relation( from_entity=user_name, to_entity=skill.lower(), relation_type="has_skill" ) self.kg.create_relation( from_entity=job_id, to_entity=skill.lower(), relation_type="requires_skill" ) # Add observation self.kg.add_observation( entity_name=user_name, observation=f"Applied to {job_title} at {company} on {datetime.now().strftime('%Y-%m-%d')}", confidence=1.0, source="application_tracker" ) logger.info(f"Tracked application: {user_name} -> {job_title} @ {company}") return True except Exception as e: logger.error(f"Failed to track application: {e}") return False def get_user_history(self, user_name: str) -> Dict[str, Any]: """Get application history for a user""" if not self.is_enabled(): return {"error": "Knowledge graph not available"} try: # Get all jobs the user applied to jobs = self.kg.find_related( entity_name=user_name, relation_type="applied_to", direction="out" ) # Get user's skills skills = self.kg.find_related( entity_name=user_name, relation_type="has_skill", direction="out" ) # Get observations observations = self.kg.get_observations(user_name) return { "user": user_name, "applications": jobs, "skills": skills, "observations": observations, "total_applications": len(jobs) } except Exception as e: logger.error(f"Failed to get user history: {e}") return {"error": str(e)} def get_company_insights(self, company: str) -> Dict[str, Any]: """Get insights about a company""" if not self.is_enabled(): return {"error": "Knowledge graph not available"} try: # Find all jobs at this company jobs = self.kg.search_entities( entity_type="job", query=company ) # Find all candidates who applied candidates = [] for job in jobs: applicants = self.kg.find_related( entity_name=job['name'], relation_type="applied_to", direction="in" ) candidates.extend(applicants) return { "company": company, "jobs_posted": len(jobs), "total_applicants": len(set(c['name'] for c in candidates)), "jobs": jobs } except Exception as e: logger.error(f"Failed to get company insights: {e}") return {"error": str(e)} def find_similar_jobs(self, job_id: str, limit: int = 5) -> List[Dict]: """Find jobs with similar skill requirements""" if not self.is_enabled(): return [] try: # Get skills for this job skills = self.kg.find_related( entity_name=job_id, relation_type="requires_skill", direction="out" ) if not skills: return [] # Find other jobs requiring similar skills similar_jobs = [] skill_names = [s['name'] for s in skills] for skill_name in skill_names: jobs = self.kg.find_related( entity_name=skill_name, relation_type="requires_skill", direction="in" ) similar_jobs.extend(jobs) # Count occurrences and sort job_counts = {} for job in similar_jobs: if job['name'] != job_id: # Exclude the original job job_counts[job['name']] = job_counts.get(job['name'], 0) + 1 # Sort by similarity (number of shared skills) sorted_jobs = sorted( job_counts.items(), key=lambda x: x[1], reverse=True )[:limit] return [ {"job_id": job_id, "similarity_score": score} for job_id, score in sorted_jobs ] except Exception as e: logger.error(f"Failed to find similar jobs: {e}") return [] def get_skill_trends(self) -> Dict[str, Any]: """Get trending skills from job postings""" if not self.is_enabled(): return {"error": "Knowledge graph not available"} try: # Get all skills skills = self.kg.search_entities(entity_type="skill") skill_stats = {} for skill in skills: # Count jobs requiring this skill jobs = self.kg.find_related( entity_name=skill['name'], relation_type="requires_skill", direction="in" ) # Count candidates with this skill candidates = self.kg.find_related( entity_name=skill['name'], relation_type="has_skill", direction="in" ) skill_stats[skill['name']] = { "demand": len(jobs), "supply": len(candidates), "gap": len(jobs) - len(candidates) } # Sort by demand top_skills = sorted( skill_stats.items(), key=lambda x: x[1]['demand'], reverse=True )[:10] return { "trending_skills": dict(top_skills), "total_skills": len(skills) } except Exception as e: logger.error(f"Failed to get skill trends: {e}") return {"error": str(e)} def visualize_graph_data(self) -> Dict[str, Any]: """Get graph data for visualization""" if not self.is_enabled(): return {"nodes": [], "edges": []} try: # Get all entities entities = [] for entity_type in ["candidate", "company", "job", "skill"]: entities.extend(self.kg.search_entities(entity_type=entity_type)) # Format nodes for visualization nodes = [] for entity in entities[:100]: # Limit to 100 for performance nodes.append({ "id": entity['name'], "label": entity['name'], "type": entity['type'], "properties": entity.get('properties', {}) }) # Get relations (edges) edges = [] # This would need to be implemented in knowledge_graph_direct.py # For now, return empty edges return { "nodes": nodes, "edges": edges, "stats": { "total_entities": len(entities), "candidates": len([e for e in entities if e['type'] == 'candidate']), "companies": len([e for e in entities if e['type'] == 'company']), "jobs": len([e for e in entities if e['type'] == 'job']), "skills": len([e for e in entities if e['type'] == 'skill']) } } except Exception as e: logger.error(f"Failed to get graph data: {e}") return {"nodes": [], "edges": [], "error": str(e)} # Global instance _kg_service = None def get_knowledge_graph_service() -> KnowledgeGraphService: """Get or create the global knowledge graph service""" global _kg_service if _kg_service is None: _kg_service = KnowledgeGraphService() return _kg_service