| """ |
| Knowledge Graph Agent with GraphRAG |
| |
| Manages the user's knowledge graph using GraphRAG: |
| - Nodes: concepts, doubts, topics, resources |
| - Edges: relationships, dependencies, associations |
| - GraphRAG for retrieval and generation |
| """ |
|
|
| from typing import Dict, List, Any, Optional |
| from dataclasses import dataclass, field |
| from datetime import datetime |
| import json |
|
|
|
|
| @dataclass |
| class GraphNode: |
| """Knowledge graph node""" |
| node_id: str |
| node_type: str |
| label: str |
| properties: Dict = field(default_factory=dict) |
| embeddings: Optional[List[float]] = None |
| created_at: datetime = field(default_factory=datetime.now) |
|
|
|
|
| @dataclass |
| class GraphEdge: |
| """Knowledge graph edge""" |
| edge_id: str |
| source_id: str |
| target_id: str |
| relation_type: str |
| weight: float = 1.0 |
| properties: Dict = field(default_factory=dict) |
| created_at: datetime = field(default_factory=datetime.now) |
|
|
|
|
| @dataclass |
| class Ontology: |
| """Domain ontology for topic structure""" |
| entity_types: List[Dict] = field(default_factory=list) |
| relation_types: List[Dict] = field(default_factory=list) |
|
|
|
|
| class KnowledgeGraphAgent: |
| """ |
| Agent that manages the knowledge graph with GraphRAG capabilities. |
| |
| Features: |
| - Entity extraction from doubts and notes |
| - Relationship discovery |
| - Graph-based retrieval |
| - Path finding between concepts |
| - Ontology generation |
| """ |
| |
| def __init__(self, user_id: str, config: Optional[Dict] = None): |
| self.user_id = user_id |
| self.config = config or {} |
| |
| self.nodes: Dict[str, GraphNode] = {} |
| self.edges: Dict[str, GraphEdge] = {} |
| |
| self.graph_id = f"cf_graph_{user_id}_{datetime.now().timestamp()}" |
| |
| self._initialize_default_ontology() |
| |
| def _initialize_default_ontology(self): |
| """Initialize default learning ontology""" |
| self.ontology = Ontology( |
| entity_types=[ |
| {'name': 'Concept', 'description': 'A learning concept or topic'}, |
| {'name': 'Doubt', 'description': 'A question or confusion point'}, |
| {'name': 'Resource', 'description': 'Learning resource or material'}, |
| {'name': 'Topic', 'description': 'Main subject area'}, |
| {'name': 'Skill', 'description': 'Developed skill or competency'} |
| ], |
| relation_types=[ |
| {'name': 'prerequisite_of', 'description': 'Is prerequisite for'}, |
| {'name': 'related_to', 'description': 'Is related to'}, |
| {'name': 'part_of', 'description': 'Is part of'}, |
| {'name': 'helps_understand', 'description': 'Helps understand'}, |
| {'name': 'contrasts_with', 'description': 'Contrasts with'} |
| ] |
| ) |
| |
| def add_doubt_to_graph(self, doubt_data: Dict) -> GraphNode: |
| """Add a captured doubt to the knowledge graph""" |
| node_id = f"doubt_{doubt_data.get('id', datetime.now().timestamp())}" |
| |
| concept_tags = doubt_data.get('conceptTags', []) |
| |
| node = GraphNode( |
| node_id=node_id, |
| node_type='Doubt', |
| label=doubt_data.get('formattedTitle', doubt_data.get('rawText', '')), |
| properties={ |
| 'raw_text': doubt_data.get('rawText', ''), |
| 'summary': doubt_data.get('formattedSummary', ''), |
| 'doubt_type': doubt_data.get('doubtType', 'concept'), |
| 'concepts': concept_tags, |
| 'url': doubt_data.get('page', {}).get('url', ''), |
| 'mastered': doubt_data.get('mastered', False), |
| 'review_count': doubt_data.get('reviewCount', 0) |
| } |
| ) |
| |
| self.nodes[node_id] = node |
| |
| for concept in concept_tags: |
| self._ensure_concept_node(concept) |
| self._add_edge( |
| source=concept, |
| target=node_id, |
| relation='part_of' |
| ) |
| |
| return node |
| |
| def _ensure_concept_node(self, concept: str) -> GraphNode: |
| """Ensure a concept node exists in the graph""" |
| concept_id = f"concept_{concept.lower().replace(' ', '_')}" |
| |
| if concept_id in self.nodes: |
| return self.nodes[concept_id] |
| |
| node = GraphNode( |
| node_id=concept_id, |
| node_type='Concept', |
| label=concept, |
| properties={ |
| 'mastery_level': 0.0, |
| 'importance': 0.5, |
| 'last_reviewed': None |
| } |
| ) |
| |
| self.nodes[concept_id] = node |
| return node |
| |
| def _add_edge( |
| self, |
| source: str, |
| target: str, |
| relation: str, |
| weight: float = 1.0 |
| ) -> GraphEdge: |
| """Add an edge between nodes""" |
| edge_id = f"edge_{source}_{target}_{relation}" |
| |
| source_id = f"concept_{source.lower().replace(' ', '_')}" if not source.startswith('concept_') else source |
| target_id = f"concept_{target.lower().replace(' ', '_')}" if not target.startswith('concept_') else target |
| |
| if source_id not in self.nodes or target_id not in self.nodes: |
| return None |
| |
| edge = GraphEdge( |
| edge_id=edge_id, |
| source_id=source_id, |
| target_id=target_id, |
| relation_type=relation, |
| weight=weight |
| ) |
| |
| self.edges[edge_id] = edge |
| return edge |
| |
| def add_resource(self, resource_data: Dict) -> GraphNode: |
| """Add a learning resource to the graph""" |
| node_id = f"resource_{resource_data.get('id', datetime.now().timestamp())}" |
| |
| node = GraphNode( |
| node_id=node_id, |
| node_type='Resource', |
| label=resource_data.get('title', 'Untitled Resource'), |
| properties={ |
| 'url': resource_data.get('url', ''), |
| 'type': resource_data.get('type', 'webpage'), |
| 'topics': resource_data.get('topics', []), |
| 'difficulty': resource_data.get('difficulty', 0.5) |
| } |
| ) |
| |
| self.nodes[node_id] = node |
| |
| for topic in resource_data.get('topics', []): |
| self._ensure_concept_node(topic) |
| self._add_edge(topic, node_id, 'part_of') |
| |
| return node |
| |
| def add_topic(self, topic: str, parent: Optional[str] = None) -> GraphNode: |
| """Add a topic node to the graph""" |
| node = self._ensure_concept_node(topic) |
| |
| if parent: |
| self._ensure_concept_node(parent) |
| self._add_edge(topic, parent, 'prerequisite_of') |
| |
| return node |
| |
| def graphrag_retrieve( |
| self, |
| query: str, |
| top_k: int = 5 |
| ) -> List[Dict]: |
| """ |
| GraphRAG retrieval - find relevant nodes based on query. |
| |
| Uses: |
| - Keyword matching |
| - Graph traversal |
| - Relationship scoring |
| """ |
| results = [] |
| |
| query_lower = query.lower() |
| query_terms = query_lower.split() |
| |
| for node_id, node in self.nodes.items(): |
| score = 0.0 |
| |
| label_lower = node.label.lower() |
| for term in query_terms: |
| if term in label_lower: |
| score += 1.0 |
| if term in str(node.properties).lower(): |
| score += 0.5 |
| |
| if node.node_type == 'Doubt' and 'mastered' in node.properties: |
| if node.properties['mastered']: |
| score *= 0.8 |
| |
| if score > 0: |
| results.append({ |
| 'node': node, |
| 'score': score, |
| 'matched_terms': [t for t in query_terms if t in label_lower] |
| }) |
| |
| results.sort(key=lambda x: x['score'], reverse=True) |
| |
| return [{ |
| 'node_id': r['node'].node_id, |
| 'type': r['node'].node_type, |
| 'label': r['node'].label, |
| 'properties': r['node'].properties, |
| 'score': r['score'], |
| 'related': self._get_related_nodes(r['node'].node_id, limit=3) |
| } for r in results[:top_k]] |
| |
| def _get_related_nodes(self, node_id: str, limit: int = 3) -> List[Dict]: |
| """Get related nodes through graph traversal""" |
| related = [] |
| |
| for edge_id, edge in self.edges.items(): |
| if edge.source_id == node_id: |
| target = self.nodes.get(edge.target_id) |
| if target: |
| related.append({ |
| 'node_id': target.node_id, |
| 'type': target.node_type, |
| 'label': target.label, |
| 'relation': edge.relation_type |
| }) |
| elif edge.target_id == node_id: |
| source = self.nodes.get(edge.source_id) |
| if source: |
| related.append({ |
| 'node_id': source.node_id, |
| 'type': source.node_type, |
| 'label': source.label, |
| 'relation': edge.relation_type |
| }) |
| |
| return related[:limit] |
| |
| def find_learning_path( |
| self, |
| from_topic: str, |
| to_topic: str |
| ) -> List[str]: |
| """Find shortest path between two topics using BFS""" |
| from_id = f"concept_{from_topic.lower().replace(' ', '_')}" |
| to_id = f"concept_{to_topic.lower().replace(' ', '_')}" |
| |
| if from_id not in self.nodes or to_id not in self.nodes: |
| return [] |
| |
| queue = [(from_id, [from_id])] |
| visited = {from_id} |
| |
| while queue: |
| current, path = queue.pop(0) |
| |
| if current == to_id: |
| return [self.nodes[n].label for n in path] |
| |
| for edge_id, edge in self.edges.items(): |
| neighbor = None |
| if edge.source_id == current: |
| neighbor = edge.target_id |
| elif edge.target_id == current: |
| neighbor = edge.source_id |
| |
| if neighbor and neighbor not in visited: |
| visited.add(neighbor) |
| queue.append((neighbor, path + [neighbor])) |
| |
| return [] |
| |
| def get_topic_mastery(self) -> Dict[str, float]: |
| """Calculate mastery level for each topic""" |
| mastery = {} |
| |
| for node_id, node in self.nodes.items(): |
| if node.node_type == 'Concept': |
| related_doubts = self._get_doubt_count(node_id) |
| total_doubts = len([n for n in self.nodes.values() if n.node_type == 'Doubt']) |
| |
| if total_doubts > 0: |
| mastery[node.label] = 1.0 - (related_doubts / total_doubts) |
| else: |
| mastery[node.label] = 0.0 |
| |
| return mastery |
| |
| def _get_doubt_count(self, concept_id: str) -> int: |
| """Get number of doubts associated with a concept""" |
| count = 0 |
| for edge_id, edge in self.edges.items(): |
| if edge.source_id == concept_id and edge.relation_type == 'part_of': |
| target = self.nodes.get(edge.target_id) |
| if target and target.node_type == 'Doubt': |
| count += 1 |
| return count |
| |
| def get_graph_stats(self) -> Dict: |
| """Get graph statistics""" |
| node_types = {} |
| for node in self.nodes.values(): |
| node_types[node.node_type] = node_types.get(node.node_type, 0) + 1 |
| |
| relation_types = {} |
| for edge in self.edges.values(): |
| relation_types[edge.relation_type] = relation_types.get(edge.relation_type, 0) + 1 |
| |
| return { |
| 'graph_id': self.graph_id, |
| 'total_nodes': len(self.nodes), |
| 'total_edges': len(self.edges), |
| 'node_types': node_types, |
| 'relation_types': relation_types, |
| 'mastery': self.get_topic_mastery() |
| } |
| |
| def export_graph(self) -> Dict: |
| """Export graph for persistence""" |
| return { |
| 'graph_id': self.graph_id, |
| 'nodes': [ |
| { |
| 'node_id': n.node_id, |
| 'node_type': n.node_type, |
| 'label': n.label, |
| 'properties': n.properties |
| } |
| for n in self.nodes.values() |
| ], |
| 'edges': [ |
| { |
| 'edge_id': e.edge_id, |
| 'source_id': e.source_id, |
| 'target_id': e.target_id, |
| 'relation_type': e.relation_type, |
| 'weight': e.weight |
| } |
| for e in self.edges.values() |
| ], |
| 'ontology': { |
| 'entity_types': self.ontology.entity_types, |
| 'relation_types': self.ontology.relation_types |
| } |
| } |
| |
| def import_graph(self, graph_data: Dict): |
| """Import graph from persistence""" |
| self.graph_id = graph_data.get('graph_id', self.graph_id) |
| |
| self.nodes.clear() |
| self.edges.clear() |
| |
| for node_data in graph_data.get('nodes', []): |
| node = GraphNode( |
| node_id=node_data['node_id'], |
| node_type=node_data['node_type'], |
| label=node_data['label'], |
| properties=node_data.get('properties', {}) |
| ) |
| self.nodes[node.node_id] = node |
| |
| for edge_data in graph_data.get('edges', []): |
| edge = GraphEdge( |
| edge_id=edge_data['edge_id'], |
| source_id=edge_data['source_id'], |
| target_id=edge_data['target_id'], |
| relation_type=edge_data['relation_type'], |
| weight=edge_data.get('weight', 1.0) |
| ) |
| self.edges[edge.edge_id] = edge |
| |
| async def sync_to_zep(self): |
| """Sync graph to Zep Cloud for advanced GraphRAG""" |
| pass |
| |
| async def sync_to_graph(self): |
| """Sync current state""" |
| pass |
|
|