Spaces:
Running
Running
| """ | |
| Service for knowledge graph operations | |
| """ | |
| import logging | |
| import traceback | |
| import json | |
| from typing import Dict, List, Any, Optional | |
| import os | |
| from sqlalchemy.orm import Session | |
| from sqlalchemy import func | |
| from backend.database.utils import ( | |
| get_knowledge_graph, | |
| get_all_knowledge_graphs, | |
| get_knowledge_graph_by_id | |
| ) | |
| from backend.database.models import KnowledgeGraph, Entity, Relation | |
| from .base_service import BaseService | |
| from backend.server_config import DEFAULT_KNOWLEDGE_GRAPH, PROCESSING_STATUS_FILE | |
| # Module-level logger for this service, obtained through BaseService.get_logger under the name "knowledge_graph" | |
| logger = BaseService.get_logger("knowledge_graph") | |
class KnowledgeGraphService(BaseService):
    """
    Service for knowledge graph operations

    Provides functionalities for:
    - Retrieving knowledge graphs from the database
    - Fetching statistics about entities and relations
    - Getting platform-wide knowledge graph metrics

    Every method is a static method: the database session is passed in
    explicitly and no per-instance state is read, so the methods can be
    called either on the class or on an instance.
    """

    @staticmethod
    def get_all_graphs(session: Session) -> List[str]:
        """
        Get all available knowledge graphs from database

        Args:
            session: Database session

        Returns:
            List of knowledge graph filenames (records without a filename
            are skipped)
        """
        return [
            kg.filename
            for kg in get_all_knowledge_graphs(session)
            if kg.filename
        ]

    @staticmethod
    def get_graph_by_id(session: Session, graph_id: str) -> Dict[str, Any]:
        """
        Get a specific knowledge graph by ID

        Args:
            session: Database session
            graph_id: ``"latest"``, a database ID (anything ``int()``
                accepts), or a filename

        Returns:
            Dictionary containing the knowledge graph data, or a dictionary
            with a single ``"error"`` key when the record exists but its
            stored data is missing or cannot be parsed

        Raises:
            FileNotFoundError: If the knowledge graph is not found in the database
        """
        if graph_id == "latest":
            kg = KnowledgeGraphService.get_latest_graph(session)
            if not kg:
                raise FileNotFoundError("No latest knowledge graph found")
        else:
            # Only the int() conversion is guarded, so a ValueError raised
            # inside the database lookup is not mistaken for "not an integer".
            try:
                kg_id = int(graph_id)
            except ValueError:
                # Not an integer: treat the identifier as a filename
                kg = get_knowledge_graph(session, graph_id)
            else:
                kg = get_knowledge_graph_by_id(session, kg_id)

            # Log which knowledge graph we're using
            if kg:
                logger.info(f"Using knowledge graph with ID {kg.id} and filename {kg.filename}")

        if not kg:
            # Not found in database - don't try to fallback
            logger.warning(f"Knowledge graph '{graph_id}' not found in database")
            raise FileNotFoundError(f"Knowledge graph '{graph_id}' not found in database")

        logger.info(f"Retrieved knowledge graph '{graph_id}' from database")
        return KnowledgeGraphService._parse_graph_payload(kg)

    @staticmethod
    def _parse_graph_payload(kg: KnowledgeGraph) -> Dict[str, Any]:
        """
        Decode the graph stored on a knowledge graph record

        ``graph_data`` is preferred; it may already be a decoded object or a
        JSON string (TEXT column). ``content`` is used as a fallback.

        Args:
            kg: Knowledge graph record exposing ``graph_data`` and ``content``

        Returns:
            The decoded graph, or a dictionary with an ``"error"`` key
        """
        if kg.graph_data:
            if not isinstance(kg.graph_data, str):
                # Already decoded by the database layer
                return kg.graph_data

            # json.JSONDecodeError is a subclass of ValueError
            try:
                return json.loads(kg.graph_data)
            except (TypeError, ValueError):
                logger.warning(f"Could not parse graph_data for knowledge graph {kg.id}")

            # graph_data was unusable: fall back to content when present
            if kg.content:
                try:
                    return json.loads(kg.content)
                except (TypeError, ValueError):
                    logger.warning(f"Could not parse content for knowledge graph {kg.id}")

            # Always return an error payload instead of falling through to None
            return {"error": "Could not parse graph data"}

        if kg.content:
            try:
                return json.loads(kg.content)
            except (TypeError, ValueError):
                return {"error": "Could not parse graph content"}

        return {"error": "No graph data available"}

    @staticmethod
    def get_platform_stats(session: Session) -> Dict[str, Any]:
        """
        Get platform-wide statistics about knowledge graphs

        Args:
            session: Database session

        Returns:
            Dictionary containing statistics about knowledge graphs, entities, and relations:
            total counts, per-type entity and relation counts, and the five
            most recently created graphs
        """
        # Totals
        total_graphs = session.query(func.count(KnowledgeGraph.id)).scalar()
        total_entities = session.query(func.count(Entity.id)).scalar()
        total_relations = session.query(func.count(Relation.id)).scalar()

        # Entity Type Distribution
        entity_rows = (
            session.query(Entity.type, func.count(Entity.id))
            .group_by(Entity.type)
            .all()
        )
        entity_distribution = {
            entity_type: count for entity_type, count in entity_rows
        }

        # Relation Type Distribution
        relation_rows = (
            session.query(Relation.type, func.count(Relation.id))
            .group_by(Relation.type)
            .all()
        )
        relation_distribution = {
            relation_type: count for relation_type, count in relation_rows
        }

        # Recent Graphs (Top 5 by creation date)
        recent_rows = (
            session.query(KnowledgeGraph)
            .order_by(KnowledgeGraph.creation_timestamp.desc())
            .limit(5)
            .all()
        )
        recent_graphs = [
            {
                "filename": kg.filename,
                "creation_timestamp": kg.creation_timestamp.isoformat() if kg.creation_timestamp else None,
                "entity_count": kg.entity_count,
                "relation_count": kg.relation_count,
                "status": kg.status,
            }
            for kg in recent_rows
        ]

        return {
            "total_graphs": total_graphs,
            "total_entities": total_entities,
            "total_relations": total_relations,
            "entity_distribution": entity_distribution,
            "relation_distribution": relation_distribution,
            "recent_graphs": recent_graphs,
        }

    @staticmethod
    def get_latest_graph(session: Session) -> Optional[KnowledgeGraph]:
        """
        Get the most recently created knowledge graph from the database

        If the selected record has no status, it is set to ``'created'``,
        its ``update_timestamp`` is refreshed and the session is committed.

        Args:
            session: Database session

        Returns:
            The record with the greatest ``creation_timestamp``, or None when
            there are no knowledge graphs

        Raises:
            Exception: Any error from the lookup or commit is logged and re-raised
        """
        # datetime is not imported at module level, so import it here
        from datetime import datetime

        try:
            knowledge_graphs = list(get_all_knowledge_graphs(session))
            if not knowledge_graphs:
                return None

            # Records without a timestamp rank below every dated record; the
            # leading boolean keeps None from being compared to a datetime.
            latest_graph = max(
                knowledge_graphs,
                key=lambda kg: (kg.creation_timestamp is not None, kg.creation_timestamp),
            )

            # Log the current state
            logger.info(f"Latest knowledge graph has ID {latest_graph.id} and status '{latest_graph.status}'")

            # Always force at least 'created' status for a knowledge graph that doesn't have a status
            if not latest_graph.status:
                logger.info(f"Knowledge graph {latest_graph.id} has no status, setting to 'created'")
                latest_graph.status = 'created'
                latest_graph.update_timestamp = datetime.now()
                session.commit()

            return latest_graph
        except Exception as e:
            logger.error(f"Error getting latest knowledge graph: {str(e)}")
            raise

    @staticmethod
    def get_graph_model_by_id(session: Session, graph_id: Any) -> Optional[KnowledgeGraph]:
        """
        Get a knowledge graph model object by ID

        Args:
            session: Database session
            graph_id: Database ID passed through to ``get_knowledge_graph_by_id``

        Returns:
            The knowledge graph record, or None when the lookup yields nothing

        Raises:
            Exception: Any lookup error is logged and re-raised
        """
        try:
            return get_knowledge_graph_by_id(session, graph_id) or None
        except Exception as e:
            logger.error(f"Error getting knowledge graph by ID: {str(e)}")
            raise

    @staticmethod
    def get_graph_by_filename(session: Session, filename: str) -> Optional[KnowledgeGraph]:
        """
        Get a knowledge graph model object by filename

        Args:
            session: Database session
            filename: Filename passed through to ``get_knowledge_graph``

        Returns:
            The knowledge graph record, or None when the lookup yields nothing

        Raises:
            Exception: Any lookup error is logged and re-raised
        """
        try:
            return get_knowledge_graph(session, filename) or None
        except Exception as e:
            logger.error(f"Error getting knowledge graph by filename: {str(e)}")
            raise