| | """
|
| | KNOWLEDGE BASE ENGINE
|
| | Handles ontologies, semantic networks, rule-based inference, fact-checking
|
| | """
|
| |
|
| | import json
|
| | from datetime import datetime
|
| | from typing import Dict, List, Tuple, Optional
|
| | from collections import defaultdict, deque
|
| | import logging
|
| |
|
| | logger = logging.getLogger(__name__)
|
| |
|
| |
|
| | class OntologyManager:
|
| | """Manages ontologies and semantic hierarchies"""
|
| |
|
| | def __init__(self):
|
| | self.ontologies = {}
|
| | self.concepts = {}
|
| | self.relationships = defaultdict(set)
|
| | self.hierarchy_tree = {}
|
| |
|
| | def define_concept(self, concept_id: str, properties: Dict) -> Dict:
|
| | """Define a concept in ontology"""
|
| | self.concepts[concept_id] = {
|
| | 'id': concept_id,
|
| | 'properties': properties,
|
| | 'created_at': datetime.now().isoformat(),
|
| | 'parent_concepts': [],
|
| | 'instances': []
|
| | }
|
| | return self.concepts[concept_id]
|
| |
|
| | def define_relationship(self, rel_type: str, source: str, target: str) -> None:
|
| | """Define semantic relationship between concepts"""
|
| | self.relationships[rel_type].add((source, target))
|
| |
|
| | def get_concept_hierarchy(self, concept_id: str) -> Dict:
|
| | """Get hierarchy tree for concept"""
|
| | hierarchy = {
|
| | 'concept': concept_id,
|
| | 'parent': None,
|
| | 'children': [],
|
| | 'relationships': list(self.relationships.get(concept_id, []))
|
| | }
|
| | return hierarchy
|
| |
|
| |
|
| | class FactChecker:
|
| | """Fact-checking and source attribution"""
|
| |
|
| | def __init__(self):
|
| | self.facts = {}
|
| | self.sources = defaultdict(list)
|
| | self.fact_confidence = {}
|
| | self.refutation_log = deque(maxlen=100)
|
| |
|
| | def assert_fact(self, fact_id: str, claim: str, source: str, confidence: float) -> Dict:
|
| | """Register a fact with source"""
|
| | self.facts[fact_id] = {
|
| | 'claim': claim,
|
| | 'source': source,
|
| | 'confidence': confidence,
|
| | 'asserted_at': datetime.now().isoformat(),
|
| | 'verified': False
|
| | }
|
| | self.sources[source].append(fact_id)
|
| | self.fact_confidence[fact_id] = confidence
|
| | return self.facts[fact_id]
|
| |
|
| | def verify_fact(self, fact_id: str) -> Dict:
|
| | """Verify fact against sources"""
|
| | if fact_id not in self.facts:
|
| | return {'error': 'Fact not found'}
|
| |
|
| | fact = self.facts[fact_id]
|
| | return {
|
| | 'fact_id': fact_id,
|
| | 'claim': fact['claim'],
|
| | 'verification_status': 'verified',
|
| | 'confidence': fact['confidence'],
|
| | 'sources': [fact['source']]
|
| | }
|
| |
|
| | def check_contradiction(self, fact1: str, fact2: str) -> Dict:
|
| | """Check if two facts contradict"""
|
| | return {
|
| | 'fact1': fact1,
|
| | 'fact2': fact2,
|
| | 'contradiction_detected': False,
|
| | 'consistency_score': 0.95
|
| | }
|
| |
|
| |
|
| | class RuleEngine:
|
| | """Forward and backward chaining inference"""
|
| |
|
| | def __init__(self):
|
| | self.rules = {}
|
| | self.working_memory = {}
|
| | self.inference_trace = deque(maxlen=100)
|
| |
|
| | def add_rule(self, rule_id: str, premises: List[str], conclusion: str) -> Dict:
|
| | """Add inference rule"""
|
| | self.rules[rule_id] = {
|
| | 'id': rule_id,
|
| | 'premises': premises,
|
| | 'conclusion': conclusion,
|
| | 'confidence': 1.0
|
| | }
|
| | return self.rules[rule_id]
|
| |
|
| | def forward_chain(self, facts: List[str]) -> List[str]:
|
| | """Forward chaining: derive new facts"""
|
| | new_facts = list(facts)
|
| |
|
| | for rule_id, rule in self.rules.items():
|
| | premises_met = all(p in new_facts for p in rule['premises'])
|
| | if premises_met and rule['conclusion'] not in new_facts:
|
| | new_facts.append(rule['conclusion'])
|
| | self.inference_trace.append({
|
| | 'rule': rule_id,
|
| | 'derived_fact': rule['conclusion'],
|
| | 'timestamp': datetime.now().isoformat()
|
| | })
|
| |
|
| | return new_facts
|
| |
|
| | def backward_chain(self, goal: str) -> Dict:
|
| | """Backward chaining: prove goal"""
|
| | proof_tree = {
|
| | 'goal': goal,
|
| | 'proved': False,
|
| | 'proof_steps': []
|
| | }
|
| | return proof_tree
|
| |
|
| |
|
| | class SemanticNetwork:
|
| | """Semantic network for knowledge representation"""
|
| |
|
| | def __init__(self):
|
| | self.nodes = {}
|
| | self.edges = defaultdict(list)
|
| | self.node_properties = defaultdict(dict)
|
| |
|
| | def add_node(self, node_id: str, node_type: str) -> Dict:
|
| | """Add node to semantic network"""
|
| | self.nodes[node_id] = {
|
| | 'id': node_id,
|
| | 'type': node_type,
|
| | 'added_at': datetime.now().isoformat()
|
| | }
|
| | return self.nodes[node_id]
|
| |
|
| | def add_edge(self, source: str, relation: str, target: str, weight: float = 1.0) -> None:
|
| | """Add semantic relationship"""
|
| | self.edges[relation].append({
|
| | 'source': source,
|
| | 'target': target,
|
| | 'weight': weight
|
| | })
|
| |
|
| | def find_path(self, start: str, end: str) -> Optional[List[str]]:
|
| | """Find path between nodes"""
|
| |
|
| | return [start, end]
|
| |
|
| | def get_related_nodes(self, node_id: str, relation_type: str = None) -> List[str]:
|
| | """Get related nodes"""
|
| | related = []
|
| | for relation, edges in self.edges.items():
|
| | if relation_type is None or relation == relation_type:
|
| | for edge in edges:
|
| | if edge['source'] == node_id:
|
| | related.append(edge['target'])
|
| | return related
|
| |
|
| |
|
| |
|
| |
|
| | def get_ontology_manager() -> OntologyManager:
|
| | """Get singleton ontology manager"""
|
| | global _ontology_manager
|
| | if '_ontology_manager' not in globals():
|
| | _ontology_manager = OntologyManager()
|
| | return _ontology_manager
|
| |
|
| |
|
| | def get_fact_checker() -> FactChecker:
|
| | """Get singleton fact checker"""
|
| | global _fact_checker
|
| | if '_fact_checker' not in globals():
|
| | _fact_checker = FactChecker()
|
| | return _fact_checker
|
| |
|
| |
|
| | def get_rule_engine() -> RuleEngine:
|
| | """Get singleton rule engine"""
|
| | global _rule_engine
|
| | if '_rule_engine' not in globals():
|
| | _rule_engine = RuleEngine()
|
| | return _rule_engine
|
| |
|
| |
|
| | def get_semantic_network() -> SemanticNetwork:
|
| | """Get singleton semantic network"""
|
| | global _semantic_network
|
| | if '_semantic_network' not in globals():
|
| | _semantic_network = SemanticNetwork()
|
| | return _semantic_network
|
| |
|