""" Service for knowledge graph operations """ import logging import traceback import json from typing import Dict, List, Any, Optional import os from sqlalchemy.orm import Session from sqlalchemy import func from backend.database.utils import ( get_knowledge_graph, get_all_knowledge_graphs, get_knowledge_graph_by_id ) from backend.database.models import KnowledgeGraph, Entity, Relation from .base_service import BaseService from backend.server_config import DEFAULT_KNOWLEDGE_GRAPH, PROCESSING_STATUS_FILE # Use the logger from BaseService logger = BaseService.get_logger("knowledge_graph") class KnowledgeGraphService(BaseService): """ Service for knowledge graph operations Provides functionalities for: - Retrieving knowledge graphs from the database - Fetching statistics about entities and relations - Getting platform-wide knowledge graph metrics """ @staticmethod @BaseService.handle_errors @BaseService.with_db_session def get_all_graphs(session: Session) -> List[str]: """ Get all available knowledge graphs from database Args: session: Database session Returns: List of knowledge graph filenames """ # Fetch knowledge graphs from database knowledge_graphs = get_all_knowledge_graphs(session) # Extract filenames files = [kg.filename for kg in knowledge_graphs if kg.filename] return files @staticmethod @BaseService.handle_errors @BaseService.with_db_session def get_graph_by_id(session: Session, graph_id: str) -> Dict[str, Any]: """ Get a specific knowledge graph by ID Args: session: Database session graph_id: ID of the knowledge graph to retrieve Returns: Dictionary containing the knowledge graph data Raises: FileNotFoundError: If the knowledge graph is not found in the database """ # Special handling for "latest" if graph_id == "latest": # Get the latest knowledge graph kg = KnowledgeGraphService.get_latest_graph(session) if not kg: raise FileNotFoundError("No latest knowledge graph found") else: # Check if graph_id is an integer (database ID) try: kg_id = int(graph_id) # Use get_knowledge_graph_by_id for integer IDs kg = get_knowledge_graph_by_id(session, kg_id) except ValueError: # If not an integer, treat as filename kg = get_knowledge_graph(session, graph_id) # Log which knowledge graph we're using if kg: logger.info(f"Using knowledge graph with ID {kg.id} and filename {kg.filename}") if kg: # Return the knowledge graph content logger.info(f"Retrieved knowledge graph '{graph_id}' from database") # Handle the case where graph_data might be stored as a string (TEXT) instead of JSON if kg.graph_data: if isinstance(kg.graph_data, str): try: return json.loads(kg.graph_data) except: # If we can't parse it as JSON, fall back to content if kg.content: try: return json.loads(kg.content) except: return {"error": "Could not parse graph data"} else: # Already a dictionary return kg.graph_data elif kg.content: try: return json.loads(kg.content) except: return {"error": "Could not parse graph content"} else: return {"error": "No graph data available"} else: # Not found in database - don't try to fallback logger.warning(f"Knowledge graph '{graph_id}' not found in database") raise FileNotFoundError(f"Knowledge graph '{graph_id}' not found in database") @staticmethod @BaseService.handle_errors @BaseService.with_db_session def get_platform_stats(session: Session) -> Dict[str, Any]: """ Get platform-wide statistics about knowledge graphs Args: session: Database session Returns: Dictionary containing statistics about knowledge graphs, entities, and relations """ # Total Graphs total_graphs = session.query(func.count(KnowledgeGraph.id)).scalar() # Total Entities total_entities = session.query(func.count(Entity.id)).scalar() # Total Relations total_relations = session.query(func.count(Relation.id)).scalar() # Entity Type Distribution entity_dist = session.query(Entity.type, func.count(Entity.id)).group_by(Entity.type).all() entity_distribution = {type: count for type, count in entity_dist} # Relation Type Distribution relation_dist = session.query(Relation.type, func.count(Relation.id)).group_by(Relation.type).all() relation_distribution = {type: count for type, count in relation_dist} # Recent Graphs (Top 5 by creation date) recent_graphs_query = session.query(KnowledgeGraph).order_by(KnowledgeGraph.creation_timestamp.desc()).limit(5).all() recent_graphs = [ { "filename": kg.filename, "creation_timestamp": kg.creation_timestamp.isoformat() if kg.creation_timestamp else None, "entity_count": kg.entity_count, "relation_count": kg.relation_count, "status": kg.status } for kg in recent_graphs_query ] return { "total_graphs": total_graphs, "total_entities": total_entities, "total_relations": total_relations, "entity_distribution": entity_distribution, "relation_distribution": relation_distribution, "recent_graphs": recent_graphs } @staticmethod def get_latest_graph(session): """ Get the most recently created knowledge graph from the database """ try: # Import DB functions from backend.database.utils import get_all_knowledge_graphs from backend.database.models import KnowledgeGraph import time from datetime import datetime, timedelta # Get all knowledge graphs knowledge_graphs = get_all_knowledge_graphs(session) # Sort by creation timestamp, most recent first sorted_graphs = sorted(knowledge_graphs, key=lambda x: x.creation_timestamp, reverse=True) if not sorted_graphs: return None # Get the most recent one latest_graph = sorted_graphs[0] # Log the current state logger.info(f"Latest knowledge graph has ID {latest_graph.id} and status '{latest_graph.status}'") # Always force at least 'created' status for a knowledge graph that doesn't have a status if not latest_graph.status or latest_graph.status == '': logger.info(f"Knowledge graph {latest_graph.id} has no status, setting to 'created'") latest_graph.status = 'created' latest_graph.update_timestamp = datetime.now() session.commit() return latest_graph except Exception as e: logger.error(f"Error getting latest knowledge graph: {str(e)}") raise @staticmethod def get_graph_model_by_id(session, graph_id): """ Get a knowledge graph model object by ID """ try: # Import DB function from backend.database.utils import get_knowledge_graph_by_id # Get the knowledge graph graph = get_knowledge_graph_by_id(session, graph_id) if not graph: return None return graph except Exception as e: logger.error(f"Error getting knowledge graph by ID: {str(e)}") raise @staticmethod def get_graph_by_filename(session, filename): """ Get a knowledge graph model object by filename """ try: # Import DB function from backend.database.utils import get_knowledge_graph # Get the knowledge graph graph = get_knowledge_graph(session, filename) if not graph: return None return graph except Exception as e: logger.error(f"Error getting knowledge graph by filename: {str(e)}") raise