# AgentGraph / backend / services / knowledge_graph_service.py
# wu981526092's picture
# 🚀 Deploy AgentGraph: Complete agent monitoring and knowledge graph system
# c2ea5ed
"""
Service for knowledge graph operations
"""
import logging
import traceback
import json
from typing import Dict, List, Any, Optional
import os
from sqlalchemy.orm import Session
from sqlalchemy import func
from backend.database.utils import (
get_knowledge_graph,
get_all_knowledge_graphs,
get_knowledge_graph_by_id
)
from backend.database.models import KnowledgeGraph, Entity, Relation
from .base_service import BaseService
from backend.server_config import DEFAULT_KNOWLEDGE_GRAPH, PROCESSING_STATUS_FILE
# Module-level logger, obtained through BaseService.get_logger under the
# "knowledge_graph" name and used by every method of the service in this file.
logger = BaseService.get_logger("knowledge_graph")
class KnowledgeGraphService(BaseService):
    """
    Service for knowledge graph operations

    Provides functionalities for:
    - Retrieving knowledge graphs from the database
    - Fetching statistics about entities and relations
    - Getting platform-wide knowledge graph metrics

    All methods are static. The public, decorated methods receive their
    ``session`` argument as the first parameter (presumably injected by
    ``BaseService.with_db_session`` -- confirm against BaseService); the
    undecorated helpers expect the caller to pass an open session.
    """

    @staticmethod
    @BaseService.handle_errors
    @BaseService.with_db_session
    def get_all_graphs(session: Session) -> List[str]:
        """
        Get the filenames of all knowledge graphs stored in the database

        Args:
            session: Database session

        Returns:
            List of knowledge graph filenames; records whose filename is
            empty or missing are skipped
        """
        return [
            kg.filename
            for kg in get_all_knowledge_graphs(session)
            if kg.filename
        ]

    @staticmethod
    @BaseService.handle_errors
    @BaseService.with_db_session
    def get_graph_by_id(session: Session, graph_id: str) -> Dict[str, Any]:
        """
        Get a specific knowledge graph by ID

        ``graph_id`` is resolved in this order:
        1. the literal ``"latest"`` selects the most recently created graph;
        2. a value that parses as an integer is looked up as a database ID;
        3. anything else is looked up as a filename.

        Args:
            session: Database session
            graph_id: ID of the knowledge graph to retrieve

        Returns:
            Dictionary containing the knowledge graph data, or a dictionary
            with a single ``"error"`` key when the stored payload is missing
            or cannot be parsed

        Raises:
            FileNotFoundError: If the knowledge graph is not found in the database
        """
        if graph_id == "latest":
            kg = KnowledgeGraphService.get_latest_graph(session)
            if not kg:
                raise FileNotFoundError("No latest knowledge graph found")
        else:
            # Only the int() conversion is guarded: a ValueError raised by the
            # database lookup itself must not be mistaken for "not an integer".
            try:
                kg_id = int(graph_id)
            except ValueError:
                # Not an integer, treat as filename
                kg = get_knowledge_graph(session, graph_id)
            else:
                kg = get_knowledge_graph_by_id(session, kg_id)
            if kg:
                logger.info(f"Using knowledge graph with ID {kg.id} and filename {kg.filename}")

        if not kg:
            # Not found in database - there is deliberately no fallback
            logger.warning(f"Knowledge graph '{graph_id}' not found in database")
            raise FileNotFoundError(f"Knowledge graph '{graph_id}' not found in database")

        logger.info(f"Retrieved knowledge graph '{graph_id}' from database")
        return KnowledgeGraphService._decode_graph_payload(kg)

    @staticmethod
    def _decode_graph_payload(kg) -> Dict[str, Any]:
        """
        Turn the stored payload of a knowledge graph record into a dictionary

        ``kg.graph_data`` is preferred. It may already be a deserialized
        object or may be stored as a JSON string (TEXT column); in the latter
        case it is parsed, and ``kg.content`` is used as a fallback when
        parsing fails. When ``graph_data`` is empty, ``kg.content`` is parsed
        instead.

        Args:
            kg: Knowledge graph record exposing ``graph_data`` and ``content``

        Returns:
            The decoded graph data, or ``{"error": ...}`` describing why no
            data could be produced
        """
        # json.loads raises ValueError (JSONDecodeError) for malformed text
        # and TypeError for non-text input; nothing broader is swallowed.
        parse_errors = (ValueError, TypeError)

        graph_data = kg.graph_data
        if graph_data:
            if not isinstance(graph_data, str):
                # Already deserialized by the database layer
                return graph_data
            try:
                return json.loads(graph_data)
            except parse_errors:
                pass
            # graph_data was unusable, fall back to content when there is any
            if kg.content:
                try:
                    return json.loads(kg.content)
                except parse_errors:
                    pass
            # Always report the failure instead of implicitly returning None
            return {"error": "Could not parse graph data"}

        if kg.content:
            try:
                return json.loads(kg.content)
            except parse_errors:
                return {"error": "Could not parse graph content"}

        return {"error": "No graph data available"}

    @staticmethod
    @BaseService.handle_errors
    @BaseService.with_db_session
    def get_platform_stats(session: Session) -> Dict[str, Any]:
        """
        Get platform-wide statistics about knowledge graphs

        Args:
            session: Database session

        Returns:
            Dictionary with the keys ``total_graphs``, ``total_entities``,
            ``total_relations``, ``entity_distribution`` and
            ``relation_distribution`` (type -> count), and ``recent_graphs``
            (summaries of the 5 most recently created graphs)
        """
        # Overall totals
        total_graphs = session.query(func.count(KnowledgeGraph.id)).scalar()
        total_entities = session.query(func.count(Entity.id)).scalar()
        total_relations = session.query(func.count(Relation.id)).scalar()

        # Per-type counts; loop variables are named so that the builtin
        # ``type`` is not shadowed.
        entity_rows = (
            session.query(Entity.type, func.count(Entity.id))
            .group_by(Entity.type)
            .all()
        )
        entity_distribution = {
            entity_type: count for entity_type, count in entity_rows
        }

        relation_rows = (
            session.query(Relation.type, func.count(Relation.id))
            .group_by(Relation.type)
            .all()
        )
        relation_distribution = {
            relation_type: count for relation_type, count in relation_rows
        }

        # Recent Graphs (Top 5 by creation date)
        recent_records = (
            session.query(KnowledgeGraph)
            .order_by(KnowledgeGraph.creation_timestamp.desc())
            .limit(5)
            .all()
        )
        recent_graphs = [
            {
                "filename": kg.filename,
                "creation_timestamp": (
                    kg.creation_timestamp.isoformat()
                    if kg.creation_timestamp
                    else None
                ),
                "entity_count": kg.entity_count,
                "relation_count": kg.relation_count,
                "status": kg.status,
            }
            for kg in recent_records
        ]

        return {
            "total_graphs": total_graphs,
            "total_entities": total_entities,
            "total_relations": total_relations,
            "entity_distribution": entity_distribution,
            "relation_distribution": relation_distribution,
            "recent_graphs": recent_graphs,
        }

    @staticmethod
    def _recency_key(graph):
        """
        Sort key ordering graphs by creation time, oldest first

        The leading flag places records without a creation timestamp before
        all others and guarantees that ``None`` is never order-compared with
        a datetime, which would raise ``TypeError``.
        """
        created = graph.creation_timestamp
        return (created is not None, created)

    @staticmethod
    def get_latest_graph(session: Session) -> Optional[KnowledgeGraph]:
        """
        Get the most recently created knowledge graph from the database

        Side effect: when the selected graph has no status, its status is set
        to ``'created'``, its ``update_timestamp`` is refreshed and the
        session is committed.

        Args:
            session: Database session

        Returns:
            The knowledge graph with the greatest ``creation_timestamp``
            (records lacking a timestamp rank lowest), or ``None`` when the
            database holds no knowledge graphs

        Raises:
            Exception: Any error raised while querying or committing is
                logged and re-raised unchanged
        """
        # Kept as a function-scope import so this block does not rely on a
        # module-level datetime import.
        from datetime import datetime

        try:
            latest_graph = max(
                get_all_knowledge_graphs(session),
                key=KnowledgeGraphService._recency_key,
                default=None,
            )
            if latest_graph is None:
                return None

            # Log the current state
            logger.info(f"Latest knowledge graph has ID {latest_graph.id} and status '{latest_graph.status}'")

            # Always force at least 'created' status for a knowledge graph
            # that doesn't have a status (None and '' are both falsy)
            if not latest_graph.status:
                logger.info(f"Knowledge graph {latest_graph.id} has no status, setting to 'created'")
                latest_graph.status = 'created'
                latest_graph.update_timestamp = datetime.now()
                session.commit()

            return latest_graph
        except Exception as e:
            logger.error(f"Error getting latest knowledge graph: {str(e)}")
            raise

    @staticmethod
    def get_graph_model_by_id(session: Session, graph_id) -> Optional[KnowledgeGraph]:
        """
        Get a knowledge graph model object by ID

        Args:
            session: Database session
            graph_id: Database ID passed through to get_knowledge_graph_by_id

        Returns:
            The matching model object, or ``None`` when the lookup yields
            nothing

        Raises:
            Exception: Any lookup error is logged and re-raised unchanged
        """
        try:
            # Normalise any falsy lookup result to None
            return get_knowledge_graph_by_id(session, graph_id) or None
        except Exception as e:
            logger.error(f"Error getting knowledge graph by ID: {str(e)}")
            raise

    @staticmethod
    def get_graph_by_filename(session: Session, filename) -> Optional[KnowledgeGraph]:
        """
        Get a knowledge graph model object by filename

        Args:
            session: Database session
            filename: Filename passed through to get_knowledge_graph

        Returns:
            The matching model object, or ``None`` when the lookup yields
            nothing

        Raises:
            Exception: Any lookup error is logged and re-raised unchanged
        """
        try:
            # Normalise any falsy lookup result to None
            return get_knowledge_graph(session, filename) or None
        except Exception as e:
            logger.error(f"Error getting knowledge graph by filename: {str(e)}")
            raise