Spaces:
Running
Running
| """ | |
| Nuclear Intelligence v3.0 - Enhanced Knowledge Graph | |
| ═══════════════════════════════════════════════════════════════════ | |
| Advanced graph with: | |
| - Entity relationships | |
| - Category management | |
| - Advanced search | |
| - Export capabilities | |
| - Statistics and analytics | |
| - Version control | |
| ═══════════════════════════════════════════════════════════════════ | |
| """ | |
| import os | |
| import json | |
| import hashlib | |
| from datetime import datetime | |
| from typing import Dict, Any, List, Optional, Set, Tuple | |
| from loguru import logger | |
| from collections import defaultdict | |
| class KnowledgeGraph: | |
| """Enhanced knowledge graph with advanced features""" | |
| def __init__(self, path: str = "knowledge_base/knowledge_graph.json"): | |
| self.path = path | |
| # Graph structure | |
| self.graph: Dict[str, Any] = { | |
| "entities": {}, # entity_id -> entity data | |
| "relationships": [], # relationship links | |
| "categories": {}, # category metadata | |
| "metadata": { | |
| "created": datetime.now().isoformat(), | |
| "version": "3.0", | |
| "last_updated": datetime.now().isoformat(), | |
| "total_entities": 0, | |
| "total_relationships": 0, | |
| } | |
| } | |
| # Index for fast lookups | |
| self._entity_index: Dict[str, Set[str]] = defaultdict(set) # word -> entity_ids | |
| self._category_index: Dict[str, Set[str]] = defaultdict(set) # category -> entity_ids | |
| self._tag_index: Dict[str, Set[str]] = defaultdict(set) # tag -> entity_ids | |
| self._load() | |
| def _load(self): | |
| """Load graph from disk""" | |
| if os.path.exists(self.path): | |
| try: | |
| with open(self.path, 'r', encoding='utf-8') as f: | |
| content = f.read().strip() | |
| if content: | |
| data = json.loads(content) | |
| if isinstance(data, dict) and "entities" in data: | |
| self.graph = data | |
| self._rebuild_indices() | |
| logger.info(f"📚 Loaded KG: {len(self.graph['entities'])} entities") | |
| else: | |
| logger.warning("Invalid KG format, starting fresh") | |
| except json.JSONDecodeError as e: | |
| logger.error(f"KG JSON error: {e}") | |
| self._backup_and_reset() | |
| except Exception as e: | |
| logger.error(f"KG load error: {e}") | |
| self._backup_and_reset() | |
| else: | |
| os.makedirs(os.path.dirname(self.path), exist_ok=True) | |
| self._save() | |
| def _backup_and_reset(self): | |
| """Backup corrupted file and reset""" | |
| try: | |
| backup_path = self.path + f".backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" | |
| if os.path.exists(self.path): | |
| os.rename(self.path, backup_path) | |
| logger.info(f"📦 Backed up corrupted KG to: {backup_path}") | |
| except: | |
| pass | |
| self.graph = self._get_empty_graph() | |
| self._save() | |
| def _get_empty_graph(self) -> Dict: | |
| return { | |
| "entities": {}, | |
| "relationships": [], | |
| "categories": {}, | |
| "metadata": { | |
| "created": datetime.now().isoformat(), | |
| "version": "3.0", | |
| "last_updated": datetime.now().isoformat(), | |
| } | |
| } | |
| def _save(self): | |
| """Save graph to disk atomically""" | |
| try: | |
| os.makedirs(os.path.dirname(self.path), exist_ok=True) | |
| self.graph["metadata"]["last_updated"] = datetime.now().isoformat() | |
| self.graph["metadata"]["total_entities"] = len(self.graph["entities"]) | |
| self.graph["metadata"]["total_relationships"] = len(self.graph["relationships"]) | |
| temp_path = self.path + ".tmp" | |
| with open(temp_path, 'w', encoding='utf-8') as f: | |
| json.dump(self.graph, f, indent=4, ensure_ascii=False) | |
| os.replace(temp_path, self.path) | |
| except Exception as e: | |
| logger.error(f"💾 Save error: {e}") | |
| def _rebuild_indices(self): | |
| """Rebuild search indices""" | |
| self._entity_index.clear() | |
| self._category_index.clear() | |
| self._tag_index.clear() | |
| for eid, entity in self.graph.get("entities", {}).items(): | |
| # Index by words in question | |
| question = entity.get("question", "").lower() | |
| words = set(question.split()) | |
| for word in words: | |
| if len(word) > 2: | |
| self._entity_index[word].add(eid) | |
| # Index by category | |
| meta = entity.get("metadata", {}) | |
| category = meta.get("category", "General") | |
| self._category_index[category].add(eid) | |
| # Index by keywords | |
| keywords = meta.get("keywords", []) | |
| for kw in keywords: | |
| self._tag_index[kw.lower()].add(eid) | |
| def _index_entity(self, entity_id: str, question: str, category: str, keywords: List[str]): | |
| """Index a new entity""" | |
| # Word index | |
| words = set(question.lower().split()) | |
| for word in words: | |
| if len(word) > 2: | |
| self._entity_index[word].add(entity_id) | |
| # Category index | |
| self._category_index[category].add(entity_id) | |
| # Tag index | |
| for kw in keywords: | |
| self._tag_index[kw.lower()].add(entity_id) | |
| def add_knowledge( | |
| self, | |
| question: str, | |
| answer: str, | |
| metadata: Dict[str, Any], | |
| entity_id: Optional[str] = None, | |
| ): | |
| """Add new knowledge to the graph""" | |
| # Generate entity ID | |
| if not entity_id: | |
| entity_id = hashlib.sha256(question.encode()).hexdigest()[:16] | |
| # Create entity | |
| entity = { | |
| "id": entity_id, | |
| "question": question, | |
| "answer_summary": answer[:500] + ("..." if len(answer) > 500 else ""), | |
| "answer_full": answer, | |
| "metadata": metadata, | |
| "created_at": datetime.now().isoformat(), | |
| "access_count": 0, | |
| "verified": metadata.get("accuracy", 0) >= 80, | |
| "version": "3.0", | |
| "relationships": [], | |
| } | |
| # Add to graph | |
| self.graph["entities"][entity_id] = entity | |
| # Create category relationship | |
| category = metadata.get("category", "General") | |
| rel_id = hashlib.sha256(f"{entity_id}_{category}".encode()).hexdigest()[:12] | |
| self.graph["relationships"].append({ | |
| "id": rel_id, | |
| "from": entity_id, | |
| "to": category, | |
| "type": "belongs_to_category", | |
| "created_at": datetime.now().isoformat(), | |
| }) | |
| # Create difficulty relationship | |
| difficulty = metadata.get("difficulty", 5) | |
| rel_id2 = hashlib.sha256(f"{entity_id}_difficulty".encode()).hexdigest()[:12] | |
| self.graph["relationships"].append({ | |
| "id": rel_id2, | |
| "from": entity_id, | |
| "to": f"difficulty_{difficulty}", | |
| "type": "has_difficulty", | |
| "created_at": datetime.now().isoformat(), | |
| }) | |
| # Update category metadata | |
| if category not in self.graph["categories"]: | |
| self.graph["categories"][category] = { | |
| "name": category, | |
| "count": 0, | |
| "avg_accuracy": 0, | |
| "created": datetime.now().isoformat(), | |
| } | |
| cat_data = self.graph["categories"][category] | |
| cat_data["count"] += 1 | |
| cat_data["avg_accuracy"] = ( | |
| (cat_data["avg_accuracy"] * (cat_data["count"] - 1) + metadata.get("accuracy", 0)) | |
| / cat_data["count"] | |
| ) | |
| # Index entity | |
| keywords = metadata.get("keywords", []) | |
| self._index_entity(entity_id, question, category, keywords) | |
| # Save | |
| self._save() | |
| logger.info(f"✅ Added entity: {question[:60]}... [{category}]") | |
| def update_entity(self, entity_id: str, updates: Dict) -> bool: | |
| """Update an existing entity""" | |
| if entity_id not in self.graph["entities"]: | |
| return False | |
| entity = self.graph["entities"][entity_id] | |
| entity.update(updates) | |
| entity["updated_at"] = datetime.now().isoformat() | |
| self._save() | |
| return True | |
| def delete_entity(self, entity_id: str) -> bool: | |
| """Delete an entity""" | |
| if entity_id not in self.graph["entities"]: | |
| return False | |
| # Remove entity | |
| del self.graph["entities"][entity_id] | |
| # Remove relationships | |
| self.graph["relationships"] = [ | |
| r for r in self.graph["relationships"] | |
| if r.get("from") != entity_id | |
| ] | |
| # Rebuild indices | |
| self._rebuild_indices() | |
| self._save() | |
| return True | |
| def search_entities( | |
| self, | |
| query: str, | |
| limit: int = 10, | |
| category: Optional[str] = None, | |
| min_accuracy: float = 0, | |
| ) -> List[Dict]: | |
| """Advanced entity search""" | |
| results = [] | |
| q_lower = query.lower() | |
| q_words = q_lower.split() | |
| # Score each entity | |
| entity_scores = {} | |
| for eid, entity in self.graph["entities"].items(): | |
| score = 0 | |
| question = entity.get("question", "").lower() | |
| answer = entity.get("answer_summary", "").lower() | |
| meta = entity.get("metadata", {}) | |
| # Exact match bonus | |
| if q_lower in question: | |
| score += 100 | |
| # Word match | |
| for word in q_words: | |
| if word in question: | |
| score += 20 | |
| if word in answer: | |
| score += 10 | |
| if word in str(meta.get("keywords", [])): | |
| score += 30 | |
| # Category filter | |
| if category and meta.get("category") != category: | |
| continue | |
| # Accuracy filter | |
| if meta.get("accuracy", 0) < min_accuracy: | |
| continue | |
| if score > 0: | |
| entity_scores[eid] = score | |
| # Sort by score | |
| sorted_ids = sorted(entity_scores.keys(), key=lambda x: entity_scores[x], reverse=True) | |
| # Get top results | |
| for eid in sorted_ids[:limit]: | |
| entity = self.graph["entities"][eid].copy() | |
| entity["_score"] = entity_scores[eid] | |
| entity["access_count"] = entity.get("access_count", 0) + 1 | |
| results.append(entity) | |
| return results | |
| def get_category_stats(self) -> Dict[str, int]: | |
| """Get category distribution""" | |
| stats = {} | |
| for rel in self.graph.get("relationships", []): | |
| if rel.get("type") == "belongs_to_category": | |
| cat = rel.get("to", "Unknown") | |
| stats[cat] = stats.get(cat, 0) + 1 | |
| return stats | |
| def get_recent_entities(self, limit: int = 10) -> List[Dict]: | |
| """Get most recently created entities""" | |
| entities = sorted( | |
| self.graph["entities"].values(), | |
| key=lambda x: x.get("created_at", ""), | |
| reverse=True | |
| ) | |
| return entities[:limit] | |
| def get_top_entities( | |
| self, | |
| by: str = "accuracy", | |
| limit: int = 10, | |
| category: Optional[str] = None, | |
| ) -> List[Dict]: | |
| """Get top entities by metric""" | |
| entities = list(self.graph["entities"].values()) | |
| # Filter by category | |
| if category: | |
| entities = [e for e in entities if e.get("metadata", {}).get("category") == category] | |
| # Sort | |
| if by == "accuracy": | |
| entities.sort(key=lambda x: x.get("metadata", {}).get("accuracy", 0), reverse=True) | |
| elif by == "novelty": | |
| entities.sort(key=lambda x: x.get("metadata", {}).get("novelty", 0), reverse=True) | |
| elif by == "access": | |
| entities.sort(key=lambda x: x.get("access_count", 0), reverse=True) | |
| elif by == "completeness": | |
| entities.sort(key=lambda x: x.get("metadata", {}).get("completeness", 0), reverse=True) | |
| return entities[:limit] | |
| def get_entities_by_category(self, category: str, limit: int = 50) -> List[Dict]: | |
| """Get all entities in a category""" | |
| return self.get_top_entities(by="accuracy", limit=limit, category=category) | |
| def get_difficulty_distribution(self) -> Dict[int, int]: | |
| """Get distribution of entities by difficulty""" | |
| dist = defaultdict(int) | |
| for entity in self.graph["entities"].values(): | |
| diff = entity.get("metadata", {}).get("difficulty", 5) | |
| dist[diff] += 1 | |
| return dict(sorted(dist.items())) | |
| def get_time_series(self, interval: str = "day", limit: int = 30) -> List[Dict]: | |
| """Get entity creation time series""" | |
| from collections import Counter | |
| timestamps = [] | |
| for entity in self.graph["entities"].values(): | |
| ts = entity.get("created_at", "")[:10] # YYYY-MM-DD | |
| if ts: | |
| timestamps.append(ts) | |
| counter = Counter(timestamps) | |
| return [{"date": k, "count": v} for k, v in sorted(counter.items(), reverse=True)[:limit]] | |
| def get_correlations(self) -> Dict: | |
| """Find entity correlations based on categories and keywords""" | |
| correlations = defaultdict(int) | |
| for entity in self.graph["entities"].values(): | |
| meta = entity.get("metadata", {}) | |
| keywords = meta.get("keywords", []) | |
| # Count keyword pairs | |
| for i, kw1 in enumerate(keywords): | |
| for kw2 in keywords[i+1:]: | |
| pair = tuple(sorted([kw1.lower(), kw2.lower()])) | |
| correlations[pair] += 1 | |
| # Return top correlations | |
| top = sorted(correlations.items(), key=lambda x: x[1], reverse=True)[:20] | |
| return {f"{k[0]}-{k[1]}": v for k, v in top} | |
| def export_json(self, path: Optional[str] = None) -> str: | |
| """Export graph as JSON""" | |
| path = path or self.path.replace(".json", "_export.json") | |
| with open(path, 'w', encoding='utf-8') as f: | |
| json.dump(self.graph, f, indent=4, ensure_ascii=False) | |
| return path | |
| def export_markdown(self, path: Optional[str] = None) -> str: | |
| """Export graph as Markdown""" | |
| lines = [ | |
| f"# Nuclear Intelligence Knowledge Graph v3.0", | |
| f"**Generated:** {datetime.now().isoformat()}", | |
| f"**Total Entities:** {len(self.graph['entities'])}", | |
| f"**Total Relationships:** {len(self.graph['relationships'])}", | |
| "", | |
| ] | |
| # Categories section | |
| stats = self.get_category_stats() | |
| lines.append("## Categories\n") | |
| for cat, count in sorted(stats.items(), key=lambda x: x[1], reverse=True): | |
| lines.append(f"- **{cat}**: {count} entities") | |
| # Top entities | |
| lines.append("\n## Top Entities (by Accuracy)\n") | |
| for entity in self.get_top_entities("accuracy", 20): | |
| meta = entity.get("metadata", {}) | |
| lines.append( | |
| f"### {entity['question'][:100]}...\n" | |
| f"- Category: {meta.get('category')}\n" | |
| f"- Accuracy: {meta.get('accuracy', 0):.1f}%\n" | |
| f"- Novelty: {meta.get('novelty', 0):.1f}%\n" | |
| f"- Difficulty: {meta.get('difficulty', 'N/A')}/10\n" | |
| f"- Created: {entity.get('created_at', '')[:10]}\n" | |
| ) | |
| path = path or self.path.replace(".json", "_export.md") | |
| with open(path, 'w', encoding='utf-8') as f: | |
| f.write("\n".join(lines)) | |
| return path | |
| def export_csv(self, path: Optional[str] = None) -> str: | |
| """Export entities as CSV""" | |
| import csv | |
| path = path or self.path.replace(".json", "_export.csv") | |
| with open(path, 'w', newline='', encoding='utf-8') as f: | |
| writer = csv.writer(f) | |
| writer.writerow([ | |
| "ID", "Question", "Category", "Difficulty", | |
| "Accuracy", "Novelty", "Usefulness", "Created" | |
| ]) | |
| for entity in self.graph["entities"].values(): | |
| meta = entity.get("metadata", {}) | |
| writer.writerow([ | |
| entity.get("id", ""), | |
| entity.get("question", "")[:200], | |
| meta.get("category", ""), | |
| meta.get("difficulty", ""), | |
| meta.get("accuracy", ""), | |
| meta.get("novelty", ""), | |
| meta.get("usefulness", ""), | |
| entity.get("created_at", "")[:10], | |
| ]) | |
| return path | |
| def get_stats(self) -> Dict[str, Any]: | |
| """Get comprehensive graph statistics""" | |
| entities = self.graph.get("entities", {}) | |
| # Calculate averages | |
| accuracies = [e.get("metadata", {}).get("accuracy", 0) for e in entities.values()] | |
| novelties = [e.get("metadata", {}).get("novelty", 0) for e in entities.values()] | |
| return { | |
| "total_entities": len(entities), | |
| "total_relationships": len(self.graph.get("relationships", [])), | |
| "category_distribution": self.get_category_stats(), | |
| "difficulty_distribution": self.get_difficulty_distribution(), | |
| "avg_accuracy": sum(accuracies) / max(len(accuracies), 1), | |
| "avg_novelty": sum(novelties) / max(len(novelties), 1), | |
| "verified_entities": sum(1 for e in entities.values() if e.get("verified")), | |
| "latest_update": self.graph.get("metadata", {}).get("last_updated", "N/A"), | |
| "version": self.graph.get("metadata", {}).get("version", "unknown"), | |
| } | |
| __all__ = ['KnowledgeGraph'] |