|
|
""" |
|
|
Simplified Gradio MCP Server for Knowledge Graphs loaded from HuggingFace datasets. |
|
|
""" |
|
|
import os |
|
|
import sys |
|
|
import argparse |
|
|
import difflib |
|
|
import fnmatch |
|
|
import re |
|
|
from typing import Optional, List |
|
|
import gradio as gr |
|
|
|
|
|
|
|
|
# Optional Langfuse tracing. If the client cannot be imported or configured,
# tracing is switched off and ``observe`` becomes a no-op decorator so that the
# decorated tool functions below keep working unchanged.
try:
    from langfuse import get_client, observe

    langfuse = get_client()
    LANGFUSE_ENABLED = langfuse.auth_check()
    if LANGFUSE_ENABLED:
        print("β Langfuse client is authenticated and ready!")
    else:
        print("β οΈ Langfuse authentication failed. Tracing disabled.")
except Exception as e:
    # Broad on purpose: any import or setup failure must only disable tracing,
    # never prevent the server from starting.
    print(f"β οΈ Langfuse not available: {e}. Tracing disabled.")
    LANGFUSE_ENABLED = False

    def observe(*args, **kwargs):
        """No-op replacement for the Langfuse ``observe`` decorator.

        Accepts both the bare form (``@observe``) and the parameterised form
        (``@observe(as_type="tool")``). All arguments are ignored and the
        decorated function is returned untouched.
        """
        # Bare usage: the function itself arrives as the only positional argument.
        if len(args) == 1 and callable(args[0]) and not kwargs:
            return args[0]

        def decorator(func):
            return func

        return decorator
|
|
|
|
|
|
|
|
# Put the ``pedagogia_graph_code_repo`` directory that sits next to this file
# at the front of the import path before importing the graph library.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'pedagogia_graph_code_repo'))

from pedagogia_graph_code_repo.RepoKnowledgeGraphLib.RepoKnowledgeGraph import RepoKnowledgeGraph

# Module-level knowledge graph shared by the tool functions. It stays ``None``
# until ``initialize_knowledge_graph`` assigns it; the tools return an error
# string while it is unset.
knowledge_graph = None
|
|
|
|
|
|
|
|
def initialize_knowledge_graph(
    hf_dataset: str,
    hf_token: Optional[str] = None,
    index_nodes: bool = True,
    code_index_kwargs: Optional[dict] = None
):
    """Load the knowledge graph from a HuggingFace dataset into the module global.

    Args:
        hf_dataset: Dataset repository ID, passed on as ``repo_id``.
        hf_token: Optional access token, passed on as ``token``.
        index_nodes: Passed through to ``RepoKnowledgeGraph.from_hf_dataset``.
        code_index_kwargs: Optional settings passed through unchanged.
    """
    global knowledge_graph

    print(f"Loading knowledge graph from HuggingFace dataset: {hf_dataset}")

    # Fixed embedding configuration handed to the model service.
    embedding_settings = dict(
        embedder_type="sentence-transformers",
        embed_model_name="Salesforce/SFR-Embedding-Code-400M_R",
    )
    load_options = {
        "repo_id": hf_dataset,
        "index_nodes": index_nodes,
        "model_service_kwargs": embedding_settings,
        "code_index_kwargs": code_index_kwargs,
        "token": hf_token,
    }
    knowledge_graph = RepoKnowledgeGraph.from_hf_dataset(**load_options)
|
|
|
|
|
|
|
|
|
|
|
@observe(as_type="tool")
def get_node_info(node_id: str) -> str:
    """
    Get detailed information about a node in the knowledge graph.

    The report lists the node's class, name, type and description. Entity
    nodes additionally show their entity type, aliases and the chunks that
    declare or call them; all other nodes show their declared/called entities
    plus file, chunk or directory details.

    Args:
        node_id: The ID of the node to retrieve information for

    Returns:
        str: A formatted string with node information
    """
    if knowledge_graph is None:
        return "Error: Knowledge graph not initialized"

    def capped_bullets(items, cap):
        # One bullet per item up to ``cap``, then a count of what was left out.
        rows = [f" - {item}\n" for item in items[:cap]]
        if len(items) > cap:
            rows.append(f" ... and {len(items) - cap} more\n")
        return rows

    try:
        graph = knowledge_graph.graph
        if node_id not in graph:
            return f"Error: Node '{node_id}' not found in knowledge graph"

        node = graph.nodes[node_id]['data']
        kind = getattr(node, 'node_type', 'Unknown')
        class_name = node.__class__.__name__
        label = getattr(node, 'name', 'Unknown')
        about = getattr(node, 'description', None)

        out = [
            "Node Information:\nββββββββββββββββββββββββββββββββββββββββ\n",
            f"Node ID: {node_id}\nClass: {class_name}\nName: {label}\nType: {kind}\n",
            f"Description: {about or 'N/A'}\n",
        ]

        if class_name == 'EntityNode' or kind == 'entity':
            entity_kind = getattr(node, 'entity_type', 'Unknown')
            declaring = getattr(node, 'declaring_chunk_ids', [])
            calling = getattr(node, 'calling_chunk_ids', [])
            alias_list = getattr(node, 'aliases', [])

            out.append(f"\nEntity Type: {entity_kind}\n")
            out.append(f"Aliases: {', '.join(alias_list) if alias_list else 'None'}\n")
            out.append(f"Declared in {len(declaring)} chunk(s):\n")
            out.extend(capped_bullets(declaring, 5))
            out.append(f"Called in {len(calling)} chunk(s):\n")
            out.extend(capped_bullets(calling, 5))
            out.append(
                f"\nSummary: Entity {node_id} ({label}) β {entity_kind} declared in {len(declaring)} chunk(s) and called in {len(calling)} chunk(s).\n"
            )
            return "".join(out)

        # Every non-entity node: list what it declares and what it calls.
        declared = getattr(node, 'declared_entities', [])
        called = getattr(node, 'called_entities', [])

        out.append(f"\nDeclared Entities ({len(declared)}):\n")
        out.extend(capped_bullets(declared, 10))
        out.append(f"\nCalled Entities ({len(called)}):\n")
        out.extend(capped_bullets(called, 10))

        if kind in ('file', 'chunk'):
            body = getattr(node, 'content', None)
            out.append(f"\nContent:\n{body or 'N/A'}\n")
            if hasattr(node, 'path'):
                out.append(f"Path: {node.path}\n")
            if hasattr(node, 'language'):
                out.append(f"Language: {node.language}\n")
            if kind == 'chunk' and hasattr(node, 'order_in_file'):
                out.append(f"Order in File: {node.order_in_file}\n")
        elif kind == 'directory':
            if hasattr(node, 'path'):
                out.append(f"Path: {node.path}\n")

        out.append(
            f"\nSummary: Node {node_id} ({label}) β {kind} with {len(declared)} declared and {len(called)} called entities.\n"
        )
        return "".join(out)
    except Exception as e:
        return f"Error: {str(e)}"
|
|
|
|
|
|
|
|
@observe(as_type="tool")
def get_node_edges(node_id: str) -> str:
    """
    List all incoming and outgoing edges for a node.

    Each edge is shown with the node on its other end and the edge's
    ``relation`` attribute (``?`` when the attribute is absent). At most 20
    edges are printed per direction; the rest are summarised as a count.

    Args:
        node_id: The ID of the node whose edges to list

    Returns:
        str: A formatted string showing all edges
    """
    if knowledge_graph is None:
        return "Error: Knowledge graph not initialized"

    try:
        graph = knowledge_graph.graph
        if node_id not in graph:
            return f"Error: Node '{node_id}' not found in knowledge graph"

        # Keep only what is printed: the far endpoint and the relation label.
        incoming = [
            (src, attrs.get("relation", "?"))
            for src, _tgt, attrs in graph.in_edges(node_id, data=True)
        ]
        outgoing = [
            (tgt, attrs.get("relation", "?"))
            for _src, tgt, attrs in graph.out_edges(node_id, data=True)
        ]

        parts = [
            f"Node Edges for '{node_id}':\n",
            "ββββββββββββββββββββββββββββββββββββββββ\n",
            "\n",
            f"Incoming Edges ({len(incoming)}):\n",
        ]
        for origin, relation in incoming[:20]:
            parts.append(f" β {origin} [{relation}]\n")
        if len(incoming) > 20:
            parts.append(f" ... and {len(incoming) - 20} more\n")

        parts.append(f"\nOutgoing Edges ({len(outgoing)}):\n")
        for destination, relation in outgoing[:20]:
            parts.append(f" β {destination} [{relation}]\n")
        if len(outgoing) > 20:
            parts.append(f" ... and {len(outgoing) - 20} more\n")

        return "".join(parts)
    except Exception as e:
        return f"Error: {str(e)}"
|
|
|
|
|
|
|
|
def _format_entity_names(label, raw, max_names=10):
    """Render one entity metadata field ('Declared' or 'Called') as text.

    ``raw`` may be a list or a JSON-encoded string. Returns an empty string
    when there is nothing to show; a string that is not valid JSON is echoed
    verbatim.
    """
    import json

    if not raw or raw == '[]':
        return ""

    entities = raw
    if isinstance(raw, str):
        try:
            entities = json.loads(raw)
        except json.JSONDecodeError:
            return f" {label}: {raw}\n"

    if not isinstance(entities, list) or not entities:
        return ""

    # str() on the name keeps a non-string value from breaking str.join and
    # turning the whole search into an error.
    names = [
        str(entity.get('name', entity)) if isinstance(entity, dict) else str(entity)
        for entity in entities[:max_names]
    ]
    text = f" {label}: {', '.join(names)}\n"
    if len(entities) > max_names:
        text += f" ... and {len(entities) - max_names} more\n"
    return text


@observe(as_type="tool")
def search_nodes(query: str, limit: int = 10) -> str:
    """
    Search for nodes in the knowledge graph by query string.

    Runs the query against ``knowledge_graph.code_index`` and prints, for each
    hit, its ID, content and up to 10 declared and called entity names.

    Args:
        query: The search string to match against code index
        limit: Maximum number of results to return (default: 10)

    Returns:
        str: A formatted string with search results
    """
    if knowledge_graph is None:
        return "Error: Knowledge graph not initialized"

    try:
        # Tool callers may deliver numbers as strings.
        if isinstance(limit, str):
            try:
                limit = int(limit)
            except ValueError:
                return f"Error: 'limit' must be an integer, got '{limit}'"

        if limit <= 0:
            return "Error: limit must be a positive integer"

        results = knowledge_graph.code_index.query(query, n_results=limit)
        # The hits are read from the first list under "metadatas"; a missing,
        # None or empty entry is treated as "no hits" instead of raising.
        batches = results.get("metadatas") or [[]]
        metadatas = batches[0] or []

        if not metadatas:
            return f"No results found for '{query}'."

        parts = [
            f"Search Results for '{query}' ({len(metadatas)} results):\n",
            "ββββββββββββββββββββββββββββββββββββββββ\n\n",
        ]

        for i, res in enumerate(metadatas, 1):
            parts.append(f"{i}. ID: {res.get('id', 'N/A')}\n")
            content = res.get('content', '')
            if content:
                parts.append(f" Content: {content}\n")
            parts.append(_format_entity_names("Declared", res.get('declared_entities', '')))
            parts.append(_format_entity_names("Called", res.get('called_entities', '')))
            parts.append("\n")

        return "".join(parts)
    except Exception as e:
        return f"Error: {str(e)}"
|
|
|
|
|
|
|
|
@observe(as_type="tool")
def get_graph_stats() -> str:
    """
    Get overall statistics about the knowledge graph.

    Reports the total node and edge counts, then the number of nodes per
    ``node_type`` and of edges per ``relation``, each sorted by descending count.

    Returns:
        str: A formatted string with graph statistics
    """
    if knowledge_graph is None:
        return "Error: Knowledge graph not initialized"

    def ranked_rows(tally):
        # Highest count first; ties keep the order in which keys were first seen.
        return [
            f" - {key}: {tally[key]}\n"
            for key in sorted(tally, key=tally.get, reverse=True)
        ]

    try:
        graph = knowledge_graph.graph
        node_total = graph.number_of_nodes()
        edge_total = graph.number_of_edges()

        per_type = {}
        for _node_id, attrs in graph.nodes(data=True):
            kind = getattr(attrs['data'], 'node_type', 'Unknown')
            per_type[kind] = per_type.get(kind, 0) + 1

        per_relation = {}
        for _src, _dst, attrs in graph.edges(data=True):
            relation = attrs.get('relation', 'Unknown')
            per_relation[relation] = per_relation.get(relation, 0) + 1

        parts = [
            "Knowledge Graph Statistics:\n",
            "ββββββββββββββββββββββββββββββββββββββββ\n",
            "\n",
            f"Total Nodes: {node_total}\n",
            f"Total Edges: {edge_total}\n",
            "\n",
            "Node Types:\n",
        ]
        parts.extend(ranked_rows(per_type))
        parts.append("\nEdge Relations:\n")
        parts.extend(ranked_rows(per_relation))

        return "".join(parts)
    except Exception as e:
        return f"Error: {str(e)}"
|
|
|
|
|
|
|
|
@observe(as_type="tool")
def list_nodes_by_type(node_type: str, limit: int = 20) -> str:
    """
    List nodes of a specific type in the knowledge graph.

    Nodes are reported in the graph's iteration order; scanning stops as soon
    as ``limit`` matches have been collected.

    Args:
        node_type: The type of nodes to list (e.g., 'function', 'class', 'file')
        limit: Maximum number of nodes to return (default: 20)

    Returns:
        str: A formatted string with matching nodes
    """
    if knowledge_graph is None:
        return "Error: Knowledge graph not initialized"

    try:
        # Tool callers may deliver numbers as strings.
        if isinstance(limit, str):
            try:
                limit = int(limit)
            except ValueError:
                return f"Error: 'limit' must be an integer, got '{limit}'"

        # Same guard as the other limit-taking tools: a negative limit would
        # silently drop trailing matches when used as a slice bound, and zero
        # would be misreported as "no nodes found".
        if limit <= 0:
            return "Error: limit must be a positive integer"

        matching_nodes = []
        for node_id, attrs in knowledge_graph.graph.nodes(data=True):
            node = attrs['data']
            if getattr(node, 'node_type', None) != node_type:
                continue
            matching_nodes.append({
                "id": node_id,
                "name": getattr(node, 'name', 'Unknown'),
            })
            if len(matching_nodes) >= limit:
                break

        if not matching_nodes:
            return f"No nodes found of type '{node_type}'."

        parts = [
            f"Nodes of type '{node_type}' ({len(matching_nodes)} results):\n",
            "ββββββββββββββββββββββββββββββββββββββββ\n\n",
        ]
        for i, node in enumerate(matching_nodes, 1):
            parts.append(f"{i}. {node['name']}\n")
            parts.append(f" ID: {node['id']}\n\n")

        return "".join(parts)
    except Exception as e:
        return f"Error: {str(e)}"
|
|
|
|
|
|
|
|
@observe(as_type="tool")
def get_neighbors(node_id: str) -> str:
    """
    Get all nodes directly connected to a given node.

    Prints up to 20 neighbours with their name and type, plus the ``relation``
    of the connecting edge. The edge from the queried node to the neighbour is
    looked up first, then the reverse one.

    Args:
        node_id: The ID of the node whose neighbors to retrieve

    Returns:
        str: A formatted string showing all neighbors
    """
    if knowledge_graph is None:
        return "Error: Knowledge graph not initialized"

    try:
        if node_id not in knowledge_graph.graph:
            return f"Error: Node '{node_id}' not found in knowledge graph"

        neighbors = knowledge_graph.get_neighbors(node_id)
        if not neighbors:
            return f"No neighbors found for node '{node_id}'"

        parts = [
            f"Neighbors of '{node_id}' ({len(neighbors)} total):\n",
            "ββββββββββββββββββββββββββββββββββββββββ\n\n",
        ]

        for position, neighbor in enumerate(neighbors[:20], 1):
            graph = knowledge_graph.graph
            other_id = neighbor.id
            parts.append(f"{position}. {other_id}\n")
            parts.append(f" Name: {getattr(neighbor, 'name', 'Unknown')}\n")
            parts.append(f" Type: {neighbor.node_type}\n")

            if graph.has_edge(node_id, other_id):
                # Edge leaving the queried node.
                attrs = graph.get_edge_data(node_id, other_id)
                parts.append(f" β Relation: {attrs.get('relation', 'Unknown')}\n")
            elif graph.has_edge(other_id, node_id):
                # Edge arriving at the queried node.
                attrs = graph.get_edge_data(other_id, node_id)
                parts.append(f" β Relation: {attrs.get('relation', 'Unknown')}\n")
            parts.append("\n")

        hidden = len(neighbors) - 20
        if hidden > 0:
            parts.append(f"... and {hidden} more neighbors\n")

        return "".join(parts)
    except Exception as e:
        return f"Error: {str(e)}"
|
|
|
|
|
|
|
|
@observe(as_type="tool")
def go_to_definition(entity_name: str) -> str:
    """
    Find where an entity is declared or defined in the codebase.

    Looks the entity up in ``knowledge_graph.entities`` and prints its type(s),
    optional data type and up to 5 declaring chunks (file, order and content).
    Declaring chunk IDs that are not present in the graph are skipped.

    Args:
        entity_name: The name of the entity to find the definition for

    Returns:
        str: A formatted string with definition locations
    """
    if knowledge_graph is None:
        return "Error: Knowledge graph not initialized"

    try:
        if entity_name not in knowledge_graph.entities:
            return f"Error: Entity '{entity_name}' not found in knowledge graph"

        entity_info = knowledge_graph.entities[entity_name]
        declaring_chunks = entity_info.get('declaring_chunk_ids') or []

        if not declaring_chunks:
            return f"Entity '{entity_name}' found but no declarations identified."

        # Type lists can contain empty/None entries (list_all_entities guards
        # against them when filtering); joining those directly would raise.
        # Drop them and fall back to 'Unknown' when nothing usable remains.
        type_names = [str(t) for t in (entity_info.get('type') or []) if t]

        parts = [
            f"Definition(s) for '{entity_name}':\n",
            "ββββββββββββββββββββββββββββββββββββββββ\n\n",
            f"Type: {', '.join(type_names) if type_names else 'Unknown'}\n",
        ]
        if entity_info.get('dtype'):
            parts.append(f"Data Type: {entity_info['dtype']}\n")
        parts.append(f"\nDeclared in {len(declaring_chunks)} location(s):\n\n")

        graph = knowledge_graph.graph
        for i, chunk_id in enumerate(declaring_chunks[:5], 1):
            if chunk_id not in graph:
                continue
            chunk = graph.nodes[chunk_id]['data']
            parts.append(f"{i}. Chunk: {chunk_id}\n")
            parts.append(f" File: {chunk.path}\n")
            parts.append(f" Order: {chunk.order_in_file}\n")
            parts.append(f" Content:\n{chunk.content}\n\n")

        if len(declaring_chunks) > 5:
            parts.append(f"... and {len(declaring_chunks) - 5} more locations\n")

        return "".join(parts)
    except Exception as e:
        return f"Error: {str(e)}"
|
|
|
|
|
|
|
|
@observe(as_type="tool")
def find_usages(entity_name: str, limit: int = 20) -> str:
    """
    Find all usages or calls of an entity in the codebase.

    Prints the path, chunk order and content of the first ``limit`` chunks
    recorded as calling the entity. Chunk IDs missing from the graph are
    skipped.

    Args:
        entity_name: The name of the entity to find usages for
        limit: Maximum number of usages to return (default: 20)

    Returns:
        str: A formatted string with usage locations
    """
    if knowledge_graph is None:
        return "Error: Knowledge graph not initialized"

    try:
        # Tool callers may deliver numbers as strings.
        if isinstance(limit, str):
            try:
                limit = int(limit)
            except ValueError:
                return f"Error: 'limit' must be an integer, got '{limit}'"

        registry = knowledge_graph.entities
        if entity_name not in registry:
            return f"Error: Entity '{entity_name}' not found in knowledge graph"

        if limit <= 0:
            return "Error: limit must be a positive integer"

        usage_ids = registry[entity_name].get('calling_chunk_ids', [])
        if not usage_ids:
            return f"Entity '{entity_name}' found but no usages identified."

        graph = knowledge_graph.graph
        pieces = [
            f"Usages of '{entity_name}' ({len(usage_ids)} total):\n",
            "ββββββββββββββββββββββββββββββββββββββββ\n\n",
        ]

        for position, chunk_id in enumerate(usage_ids[:limit], 1):
            if chunk_id not in graph:
                continue
            chunk = graph.nodes[chunk_id]['data']
            pieces.append(f"{position}. {chunk.path} (chunk {chunk.order_in_file})\n")
            pieces.append(f" Content:\n{chunk.content}\n\n")

        remaining = len(usage_ids) - limit
        if remaining > 0:
            pieces.append(f"... and {remaining} more usages\n")

        return "".join(pieces)
    except Exception as e:
        return f"Error: {str(e)}"
|
|
|
|
|
|
|
|
@observe(as_type="tool")
def get_file_structure(file_path: str) -> str:
    """
    Get an overview of the structure of a file.

    Shows the file's language, its chunk count, up to 15 declared entities and
    up to 10 chunks, each with a description shortened to 80 characters.

    Args:
        file_path: The path of the file to get the structure for

    Returns:
        str: A formatted string with file structure
    """
    if knowledge_graph is None:
        return "Error: Knowledge graph not initialized"

    try:
        if file_path not in knowledge_graph.graph:
            return f"Error: File '{file_path}' not found in knowledge graph"

        file_node = knowledge_graph.graph.nodes[file_path]['data']
        chunks = knowledge_graph.get_chunks_of_file(file_path)

        out = [
            f"File Structure: {file_node.name}\n",
            "ββββββββββββββββββββββββββββββββββββββββ\n\n",
            f"Path: {file_path}\n",
            f"Language: {getattr(file_node, 'language', 'Unknown')}\n",
            f"Total Chunks: {len(chunks)}\n\n",
        ]

        entities = getattr(file_node, 'declared_entities', None)
        if entities:
            out.append(f"Declared Entities ({len(entities)}):\n")
            for entity in entities[:15]:
                # Dict entries are shown as "name (type)"; anything else as-is.
                if isinstance(entity, dict):
                    out.append(f" - {entity.get('name', '?')} ({entity.get('type', '?')})\n")
                else:
                    out.append(f" - {entity}\n")
            if len(entities) > 15:
                out.append(f" ... and {len(entities) - 15} more\n")

        out.append("\nChunks:\n")
        for chunk in chunks[:10]:
            out.append(f" [{chunk.order_in_file}] {chunk.id}\n")
            summary = chunk.description
            if summary:
                if len(summary) > 80:
                    summary = summary[:80] + "..."
                out.append(f" {summary}\n")

        if len(chunks) > 10:
            out.append(f" ... and {len(chunks) - 10} more chunks\n")

        return "".join(out)
    except Exception as e:
        return f"Error: {str(e)}"
|
|
|
|
|
|
|
|
@observe(as_type="tool")
def get_related_chunks(chunk_id: str, relation_type: str = "calls") -> str:
    """
    Get chunks related to a given chunk by a specific relationship.

    Follows the outgoing edges of the chunk whose ``relation`` attribute equals
    ``relation_type`` and prints up to 15 targets with their file path and, when
    the edge carries one, the entity name.

    Args:
        chunk_id: The ID of the chunk to find related chunks for
        relation_type: The type of relationship to filter by (default: 'calls')

    Returns:
        str: A formatted string with related chunks
    """
    if knowledge_graph is None:
        return "Error: Knowledge graph not initialized"

    try:
        graph = knowledge_graph.graph
        if chunk_id not in graph:
            return f"Error: Chunk '{chunk_id}' not found in knowledge graph"

        # (target id, target path, entity name) for every matching outgoing edge.
        related = [
            (
                target,
                getattr(graph.nodes[target]['data'], 'path', 'Unknown'),
                attrs.get('entity_name'),
            )
            for _source, target, attrs in graph.out_edges(chunk_id, data=True)
            if attrs.get('relation') == relation_type
        ]

        if not related:
            return f"No chunks found with '{relation_type}' relationship from '{chunk_id}'"

        out = [
            f"Chunks related to '{chunk_id}' via '{relation_type}' ({len(related)} total):\n",
            "ββββββββββββββββββββββββββββββββββββββββ\n\n",
        ]

        for position, (target, path, entity) in enumerate(related[:15], 1):
            out.append(f"{position}. {target}\n")
            out.append(f" File: {path}\n")
            if entity:
                out.append(f" Entity: {entity}\n")
            out.append("\n")

        extra = len(related) - 15
        if extra > 0:
            out.append(f"... and {extra} more\n")

        return "".join(out)
    except Exception as e:
        return f"Error: {str(e)}"
|
|
|
|
|
|
|
|
@observe(as_type="tool")
def list_all_entities(
    limit: int = 50,
    page: int = 1,
    entity_type: Optional[str] = None,
    declared_in_repo: Optional[bool] = None
) -> str:
    """
    List all entities tracked in the knowledge graph with filtering and pagination options.

    Shows entity types, declaration counts, and usage counts.

    Args:
        limit: Maximum number of entities to return per page (default: 50)
        page: Page number for pagination, 1-indexed (default: 1)
        entity_type: Filter by entity type ('class', 'function', 'method', 'variable', 'parameter', 'function_call', 'method_call')
        declared_in_repo: If True, only return entities with declarations. If False, only entities without declarations. If None, return all.

    Returns:
        str: A formatted string with all entities for the requested page
    """
    if knowledge_graph is None:
        return "Error: Knowledge graph not initialized"

    try:
        # Tool callers may deliver numbers and booleans as strings.
        if isinstance(limit, str):
            try:
                limit = int(limit)
            except ValueError:
                return f"Error: 'limit' must be an integer, got '{limit}'"

        # The page arithmetic below divides by ``limit``: zero would raise and
        # a negative value would produce negative page counts.
        if limit <= 0:
            return "Error: limit must be a positive integer"

        if isinstance(page, str):
            try:
                page = int(page)
            except ValueError:
                return f"Error: 'page' must be an integer, got '{page}'"

        if page < 1:
            return "Error: 'page' must be a positive integer (1 or greater)"

        # Empty or "null" means "no type filter".
        if entity_type == "" or entity_type == "null":
            entity_type = None

        if isinstance(declared_in_repo, str):
            flag = declared_in_repo.strip().lower()
            if flag in ("true", "1", "yes"):
                declared_in_repo = True
            elif flag in ("false", "0", "no"):
                declared_in_repo = False
            elif flag in ("none", "null", "all", ""):
                declared_in_repo = None
            else:
                # An unrecognised string used to fall through and act as True.
                return f"Error: 'declared_in_repo' must be true, false or empty, got '{declared_in_repo}'"

        if not knowledge_graph.entities:
            return "No entities found in the knowledge graph."

        wanted_type = entity_type.lower() if entity_type is not None else None
        filtered_entities = {}
        for entity_name, info in knowledge_graph.entities.items():
            if wanted_type is not None:
                entity_types = [t.lower() if t else '' for t in info.get('type', [])]
                if wanted_type not in entity_types:
                    continue

            if declared_in_repo is not None:
                has_declaration = len(info.get('declaring_chunk_ids', [])) > 0
                # Keep only entities whose declaration status matches the flag.
                if bool(declared_in_repo) != has_declaration:
                    continue

            filtered_entities[entity_name] = info

        if not filtered_entities:
            filter_desc = []
            if entity_type:
                filter_desc.append(f"type={entity_type}")
            if declared_in_repo is not None:
                filter_desc.append(f"declared_in_repo={declared_in_repo}")
            filter_text = f" (filtered by {', '.join(filter_desc)})" if filter_desc else ""
            return f"No entities found{filter_text}."

        total_entities = len(filtered_entities)
        total_pages = (total_entities + limit - 1) // limit  # ceiling division

        if page > total_pages:
            return f"Error: Page {page} does not exist. Total pages: {total_pages} (with {total_entities} entities at {limit} per page)"

        start_idx = (page - 1) * limit
        end_idx = start_idx + limit
        paginated_items = list(filtered_entities.items())[start_idx:end_idx]

        parts = [
            f"All Entities (Page {page}/{total_pages}, {total_entities} total):\n",
            "ββββββββββββββββββββββββββββββββββββββββ\n\n",
        ]

        for i, (entity_name, info) in enumerate(paginated_items, start=start_idx + 1):
            # Type lists may hold empty/None entries (handled in the filter
            # above); drop them so the join cannot raise.
            type_names = [str(t) for t in (info.get('type') or []) if t]
            parts.append(f"{i}. {entity_name}\n")
            parts.append(f" Types: {', '.join(type_names) if type_names else 'Unknown'}\n")
            parts.append(f" Declarations: {len(info.get('declaring_chunk_ids', []))}\n")
            parts.append(f" Usages: {len(info.get('calling_chunk_ids', []))}\n\n")

        parts.append("ββββββββββββββββββββββββββββββββββββββββ\n")
        parts.append(f"Showing {start_idx + 1}-{min(end_idx, total_entities)} of {total_entities} entities\n")
        parts.append(f"Page {page} of {total_pages}\n")

        if page < total_pages:
            parts.append(f"Use page={page + 1} to see the next page\n")

        if entity_type:
            parts.append(f"\n(Filtered by type={entity_type})\n")
        if declared_in_repo is not None:
            parts.append(f"(Filtered by declared_in_repo={declared_in_repo})\n")

        return "".join(parts)
    except Exception as e:
        return f"Error: {str(e)}"
|
|
|
|
|
|
|
|
@observe(as_type="tool")
def diff_chunks(node_id_1: str, node_id_2: str) -> str:
    """
    Show the diff between two code chunks or nodes.

    Produces a unified diff of the two nodes' ``content`` attributes, using the
    node IDs as the from/to file labels.

    Args:
        node_id_1: The ID of the first node/chunk
        node_id_2: The ID of the second node/chunk

    Returns:
        str: A formatted string with the diff
    """
    if knowledge_graph is None:
        return "Error: Knowledge graph not initialized"

    try:
        # Report the first of the two IDs that is unknown, in argument order.
        for candidate in (node_id_1, node_id_2):
            if candidate not in knowledge_graph.graph:
                return f"Error: Node '{candidate}' not found in knowledge graph"

        graph = knowledge_graph.graph
        before = getattr(graph.nodes[node_id_1]['data'], 'content', None)
        after = getattr(graph.nodes[node_id_2]['data'], 'content', None)

        if not (before and after):
            return "Error: One or both nodes have no content."

        delta_lines = list(
            difflib.unified_diff(
                before.splitlines(),
                after.splitlines(),
                fromfile=node_id_1,
                tofile=node_id_2,
                lineterm="",
            )
        )

        if delta_lines:
            return "\n".join(delta_lines)
        return "No differences found between the two chunks."
    except Exception as e:
        return f"Error: {str(e)}"
|
|
|
|
|
|
|
|
@observe(as_type="tool")
def print_tree(root_id: str = "root", max_depth: int = 3) -> str:
    """
    Show a tree view of the repository structure.

    Walks outgoing edges depth-first from the start node, printing one line per
    node down to ``max_depth`` and at most 20 children per node. If ``root_id``
    is not in the graph, the first node of type 'repo', 'directory' or 'file'
    is used instead.

    Args:
        root_id: The node ID to start the tree from (default: 'root')
        max_depth: Maximum depth to show (default: 3)

    Returns:
        str: A formatted string with the tree structure
    """
    if knowledge_graph is None:
        return "Error: Knowledge graph not initialized"

    try:
        # Tool callers may deliver numbers as strings.
        if isinstance(max_depth, str):
            try:
                max_depth = int(max_depth)
            except ValueError:
                return f"Error: 'max_depth' must be an integer, got '{max_depth}'"

        g = knowledge_graph.graph

        if root_id not in g:
            candidates = [
                nid
                for nid, attrs in g.nodes(data=True)
                if getattr(attrs['data'], 'node_type', None) in ('repo', 'directory', 'file')
            ]
            if not candidates:
                return f"Error: Node '{root_id}' not found and no suitable root found"
            root_id = candidates[0]

        rows = [
            f"Tree View (starting from '{root_id}', max depth: {max_depth}):\n",
            "ββββββββββββββββββββββββββββββββββββββββ\n\n",
        ]

        def walk(current, level):
            # Pre-order: emit the node itself, then descend into its children.
            if level > max_depth:
                return

            data = g.nodes[current]['data']
            label = getattr(data, 'name', current)
            kind = getattr(data, 'node_type', '?')
            rows.append(" " * level + f"- {label} ({kind})\n")

            successors = [dst for _src, dst in g.out_edges(current)]
            for successor in successors[:20]:
                walk(successor, level + 1)

            if len(successors) > 20:
                rows.append(" " * (level + 1) + f"... and {len(successors) - 20} more\n")

        walk(root_id, 0)
        return "".join(rows)
    except Exception as e:
        return f"Error: {str(e)}"
|
|
|
|
|
|
|
|
@observe(as_type="tool")
def entity_relationships(node_id: str) -> str:
    """
    Show all relationships for a given entity or node.

    Lists up to 20 incoming and 20 outgoing edges with their ``relation``
    attribute. A direction with no edges is omitted; when both are empty a
    single "no relationships" line is printed.

    Args:
        node_id: The node/entity ID to explore relationships for

    Returns:
        str: A formatted string with all relationships
    """
    if knowledge_graph is None:
        return "Error: Knowledge graph not initialized"

    try:
        graph = knowledge_graph.graph
        if node_id not in graph:
            return f"Error: Node '{node_id}' not found in knowledge graph"

        out = [
            f"Relationships for '{node_id}':\n",
            "ββββββββββββββββββββββββββββββββββββββββ\n\n",
        ]

        inbound = list(graph.in_edges(node_id, data=True))
        outbound = list(graph.out_edges(node_id, data=True))

        if not inbound and not outbound:
            out.append("No relationships found.\n")
            return "".join(out)

        if inbound:
            out.append(f"Incoming Relationships ({len(inbound)}):\n")
            for origin, _target, attrs in inbound[:20]:
                out.append(f" β {origin} [{attrs.get('relation', '?')}]\n")
            if len(inbound) > 20:
                out.append(f" ... and {len(inbound) - 20} more\n")
            out.append("\n")

        if outbound:
            out.append(f"Outgoing Relationships ({len(outbound)}):\n")
            for _origin, target, attrs in outbound[:20]:
                out.append(f" β {target} [{attrs.get('relation', '?')}]\n")
            if len(outbound) > 20:
                out.append(f" ... and {len(outbound) - 20} more\n")

        return "".join(out)
    except Exception as e:
        return f"Error: {str(e)}"
|
|
|
|
|
|
|
|
@observe(as_type="tool")
def search_by_type_and_name(node_type: str, name_query: str, limit: int = 10, fuzzy: bool = True) -> str:
    """
    Search for nodes/entities by type and name substring with fuzzy matching support.

    Every node with a non-empty name is tested against ``name_query``. A
    case-insensitive substring test is always applied. With ``fuzzy`` enabled
    a name also matches when the query's characters occur in it in order
    (not necessarily adjacent), so searching for 'Embedding' will find
    'BertEmbeddings', 'LlamaRotaryEmbedding', etc.

    For entities, searches by entity_type (e.g., 'class', 'function', 'method').
    For other nodes, searches by node_type (e.g., 'file', 'chunk', 'directory').
    Both type comparisons ignore case.

    Results are ordered: exact name match first, then substring matches, then
    fuzzy-only matches; alphabetically by name inside each group.

    Args:
        node_type: Type of node/entity (e.g., 'function', 'class', 'file', 'chunk', 'directory')
        name_query: Substring to match in the name (case-insensitive, supports partial matches)
        limit: Maximum results to return (default: 10)
        fuzzy: Enable fuzzy/partial matching (default: True). If False, requires exact substring match.

    Returns:
        str: A formatted string with matching nodes, or an "Error: ..." message
    """
    if knowledge_graph is None:
        return "Error: Knowledge graph not initialized"

    try:
        # Arguments may arrive as strings, so coerce them before use.
        if isinstance(limit, str):
            try:
                limit = int(limit)
            except ValueError:
                return f"Error: 'limit' must be an integer, got '{limit}'"

        if isinstance(fuzzy, str):
            fuzzy = fuzzy.lower() in ('true', '1', 'yes')

        if limit <= 0:
            return "Error: limit must be a positive integer"

        g = knowledge_graph.graph
        query_lower = name_query.lower()
        wanted_type = node_type.lower()

        # Subsequence pattern: each query character, in order, with anything
        # in between. Built once, outside the node loop.
        fuzzy_regex = None
        if fuzzy:
            fuzzy_pattern = '.*'.join(re.escape(c) for c in query_lower)
            fuzzy_regex = re.compile(fuzzy_pattern, re.IGNORECASE)

        def _score(name_lower):
            # 0 = exact, 1 = substring, 2 = fuzzy-only; lower sorts first.
            if name_lower == query_lower:
                return 0
            return 1 if query_lower in name_lower else 2

        matches = []
        for nid, n in g.nodes(data=True):
            node = n['data']
            node_name = getattr(node, 'name', '')
            if not node_name:
                continue

            name_lower = node_name.lower()
            is_substring = query_lower in name_lower
            if not is_substring and not (fuzzy_regex is not None and fuzzy_regex.search(node_name)):
                continue

            current_node_type = getattr(node, 'node_type', None)

            if current_node_type == 'entity':
                entity_type = getattr(node, 'entity_type', '')

                # Fall back to the graph-level entity registry when the node
                # itself carries no entity type.
                if not entity_type and nid in knowledge_graph.entities:
                    entity_types = knowledge_graph.entities[nid].get('type', [])
                    entity_type = entity_types[0] if entity_types else ''

                if entity_type and entity_type.lower() == wanted_type:
                    matches.append({
                        "id": nid,
                        "name": node_name,
                        "type": f"entity ({entity_type})",
                        "score": _score(name_lower),
                    })
            elif current_node_type == node_type or (
                isinstance(current_node_type, str) and current_node_type.lower() == wanted_type
            ):
                # Structural nodes are matched case-insensitively too, so that
                # 'File' and 'file' behave the same way entity types already do.
                matches.append({
                    "id": nid,
                    "name": node_name,
                    "type": current_node_type,
                    "score": _score(name_lower),
                })

        matches.sort(key=lambda x: (x['score'], x['name'].lower()))
        matches = matches[:limit]

        if not matches:
            return f"No matches for type '{node_type}' and name containing '{name_query}'."

        parts = [
            f"Matches for type '{node_type}' and name '{name_query}' ({len(matches)} results):\n",
            "ββββββββββββββββββββββββββββββββββββββββ\n\n",
        ]
        for i, match in enumerate(matches, 1):
            parts.append(f"{i}. {match['name']}\n")
            parts.append(f" ID: {match['id']}\n")
            parts.append(f" Type: {match['type']}\n\n")

        return "".join(parts)
    except Exception as e:
        return f"Error: {str(e)}"
|
|
|
|
|
|
|
|
@observe(as_type="tool")
def get_chunk_context(node_id: str) -> str:
    """
    Get the full content of a code chunk along with its surrounding chunks.

    Collects the previous chunk (if any), the requested chunk and the next
    chunk (if any), groups them with ``organize_chunks_by_file_name`` and
    returns the text produced by ``join_organized_chunks``.

    Args:
        node_id: The node/chunk ID to get context for

    Returns:
        str: The full content of surrounding code chunks, or an "Error: ..."
            message when the graph is not loaded, the node is unknown, or an
            exception occurs.
    """
    # Imported lazily, and before the guards below, exactly like the original:
    # an import failure propagates to the caller instead of becoming a message.
    from pedagogia_graph_code_repo.RepoKnowledgeGraphLib.utils.chunk_utils import (
        organize_chunks_by_file_name, join_organized_chunks
    )

    if knowledge_graph is None:
        return "Error: Knowledge graph not initialized"

    try:
        graph = knowledge_graph.graph
        if node_id not in graph:
            return f"Error: Node '{node_id}' not found in knowledge graph"

        centre = graph.nodes[node_id]['data']
        before = knowledge_graph.get_previous_chunk(node_id)
        after = knowledge_graph.get_next_chunk(node_id)

        # Neighbours are optional; the requested chunk is always included.
        window = [
            *([before] if before else []),
            centre,
            *([after] if after else []),
        ]

        return join_organized_chunks(organize_chunks_by_file_name(window))
    except Exception as e:
        return f"Error: {str(e)}"
|
|
|
|
|
|
|
|
@observe(as_type="tool")
def get_file_stats(path: str) -> str:
    """
    Get statistics for a file or directory.

    Reports, for every node whose ``path`` attribute equals ``path`` exactly:
    the line count of its content, its declared and called entities (first
    10 of each are listed) and the number of outgoing edges that lead to
    nodes of type 'chunk'.

    Args:
        path: The file or directory path to get statistics for

    Returns:
        str: A formatted string with file statistics, or an "Error: ..."
            message when the graph is not loaded or an exception occurs.
    """
    if knowledge_graph is None:
        return "Error: Knowledge graph not initialized"

    preview = 10

    try:
        graph = knowledge_graph.graph
        matching_ids = [
            nid
            for nid, attrs in graph.nodes(data=True)
            if getattr(attrs['data'], 'path', None) == path
        ]

        if not matching_ids:
            return f"No nodes found for path '{path}'."

        out = [
            f"Statistics for '{path}':\n",
            "ββββββββββββββββββββββββββββββββββββββββ\n\n",
        ]

        for nid in matching_ids:
            payload = graph.nodes[nid]['data']
            text = getattr(payload, 'content', '')
            declared = getattr(payload, 'declared_entities', [])
            called = getattr(payload, 'called_entities', [])
            chunk_children = [
                dst
                for _src, dst in graph.out_edges(nid)
                if getattr(graph.nodes[dst]['data'], 'node_type', None) == 'chunk'
            ]

            out.append(f"Node: {nid} ({getattr(payload, 'node_type', '?')})\n")
            out.append(f" Lines: {len(text.splitlines()) if text else 0}\n")
            out.append(f" Declared entities: {len(declared)}\n")

            if declared:
                for item in declared[:preview]:
                    # Declared entities may be dicts (name/type) or plain values.
                    if isinstance(item, dict):
                        out.append(f" - {item.get('name', '?')} ({item.get('type', '?')})\n")
                    else:
                        out.append(f" - {item}\n")
                if len(declared) > preview:
                    out.append(f" ... and {len(declared) - preview} more\n")

            out.append(f" Called entities: {len(called)}\n")
            if called:
                out.extend(f" - {item}\n" for item in called[:preview])
                if len(called) > preview:
                    out.append(f" ... and {len(called) - preview} more\n")

            out.append(f" Chunks: {len(chunk_children)}\n\n")

        return "".join(out)
    except Exception as e:
        return f"Error: {str(e)}"
|
|
|
|
|
|
|
|
@observe(as_type="tool")
def find_path(source_id: str, target_id: str, max_depth: int = 5) -> str:
    """
    Find the shortest path between two nodes in the knowledge graph.

    Delegates the search to ``knowledge_graph.find_path`` and renders the
    returned node sequence one node per line.

    Args:
        source_id: The ID of the source node
        target_id: The ID of the target node
        max_depth: Maximum depth to search for a path (default: 5)

    Returns:
        str: A formatted string showing the path, a "No path found ..."
            message, or an "Error: ..." message.
    """
    if knowledge_graph is None:
        return "Error: Knowledge graph not initialized"

    try:
        # The depth may arrive as a string; coerce it before use.
        if isinstance(max_depth, str):
            try:
                max_depth = int(max_depth)
            except ValueError:
                return f"Error: 'max_depth' must be an integer, got '{max_depth}'"

        outcome = knowledge_graph.find_path(source_id, target_id, max_depth)

        if "error" in outcome:
            return f"Error: {outcome['error']}"
        if not outcome.get("path"):
            return f"No path found from '{source_id}' to '{target_id}' within depth {max_depth}"

        pieces = [
            f"Path from '{source_id}' to '{target_id}':\n",
            "ββββββββββββββββββββββββββββββββββββββββ\n\n",
            f"Length: {outcome['length']}\n\n",
        ]

        hops = outcome['path']
        last_index = len(hops) - 1
        for position, hop in enumerate(hops):
            pieces.append(f"{position}. {hop}\n")
            # A connector line goes between hops, not after the final one.
            if position != last_index:
                pieces.append(" β\n")

        return "".join(pieces)
    except Exception as e:
        return f"Error: {str(e)}"
|
|
|
|
|
|
|
|
@observe(as_type="tool")
def get_subgraph(node_id: str, depth: int = 2, edge_types: Optional[str] = None) -> str:
    """
    Extract a subgraph around a node up to a specified depth.

    Optionally filters by edge types (comma-separated). Whitespace around
    each edge type is ignored and empty items are dropped, so
    'calls, contains' is treated the same as 'calls,contains'. If nothing
    remains after cleaning, no edge-type filter is applied.

    The extraction itself is delegated to ``knowledge_graph.get_subgraph``;
    this function only formats its result and lists at most 30 nodes.

    Args:
        node_id: The ID of the central node
        depth: The depth/radius of the subgraph to extract (default: 2)
        edge_types: Optional comma-separated list of edge types (e.g., 'calls,contains')

    Returns:
        str: A formatted string describing the subgraph, or an "Error: ..." message
    """
    if knowledge_graph is None:
        return "Error: Knowledge graph not initialized"

    max_listed = 30

    try:
        # The depth may arrive as a string; coerce it before use.
        if isinstance(depth, str):
            try:
                depth = int(depth)
            except ValueError:
                return f"Error: 'depth' must be an integer, got '{depth}'"

        edge_types_list = None
        if edge_types:
            # Strip each item: a stray space would otherwise become part of
            # the edge-type name and silently filter out every edge.
            cleaned = [item.strip() for item in edge_types.split(",")]
            edge_types_list = [item for item in cleaned if item] or None

        subgraph_result = knowledge_graph.get_subgraph(node_id, depth, edge_types_list)

        if "error" in subgraph_result:
            return f"Error: {subgraph_result['error']}"

        parts = [
            f"Subgraph around '{node_id}' (depth: {depth}):\n",
            "ββββββββββββββββββββββββββββββββββββββββ\n\n",
            f"Nodes: {subgraph_result['node_count']}\n",
            f"Edges: {subgraph_result['edge_count']}\n",
        ]

        if edge_types_list:
            parts.append(f"Filtered by edge types: {', '.join(edge_types_list)}\n")

        nodes = subgraph_result['nodes']
        parts.append("\nNodes in subgraph:\n")
        parts.extend(f" - {node}\n" for node in nodes[:max_listed])

        if len(nodes) > max_listed:
            parts.append(f" ... and {len(nodes) - max_listed} more\n")

        return "".join(parts)
    except Exception as e:
        return f"Error: {str(e)}"
|
|
|
|
|
|
|
|
@observe(as_type="tool")
def list_files_in_directory(directory_path: str = "", pattern: str = "*", recursive: bool = True, limit: int = 50) -> str:
    """
    List files in a directory with optional glob pattern matching.

    Scans the graph's nodes of type 'file' and keeps those located under
    ``directory_path`` whose path or name matches ``pattern``.

    Notes:
        * When ``directory_path`` is empty no directory filter is applied at
          all, so ``recursive`` has no effect in that case.
        * Scanning stops as soon as ``limit`` files have been collected; the
          collected files are then sorted by path. The result is therefore
          the first ``limit`` matches in graph order, not necessarily the
          alphabetically first ones.

    Args:
        directory_path: Path to the directory to list (empty string for root/all files)
        pattern: Glob pattern to filter files (e.g., '*.py', 'test_*.py', '**/*.js')
        recursive: Whether to search recursively in subdirectories (default: True)
        limit: Maximum number of files to return (default: 50)

    Returns:
        str: A formatted string with matching files, or an "Error: ..." message
    """
    if knowledge_graph is None:
        return "Error: Knowledge graph not initialized"

    try:
        # Arguments may arrive as strings, so coerce them before use.
        if isinstance(limit, str):
            try:
                limit = int(limit)
            except ValueError:
                return f"Error: 'limit' must be an integer, got '{limit}'"

        if isinstance(recursive, str):
            recursive = recursive.lower() in ('true', '1', 'yes')

        # The cap is checked after each append, so without this guard a
        # limit of 0 (or a negative one) would still return one file.
        if limit <= 0:
            return "Error: limit must be a positive integer"

        g = knowledge_graph.graph

        # Loop-invariant pieces of the directory / pattern filters.
        base_dir = directory_path.rstrip('/') if directory_path else ''
        dir_prefix = base_dir + '/'
        use_pattern = bool(pattern) and pattern != '*'
        nested_pattern = f'**/{pattern}'

        matching_files = []
        for nid, n in g.nodes(data=True):
            node = n['data']
            if getattr(node, 'node_type', None) != 'file':
                continue

            file_path = getattr(node, 'path', nid)
            file_name = getattr(node, 'name', '')

            if directory_path:
                if recursive:
                    # Anything below the directory, or the path itself.
                    if not file_path.startswith(dir_prefix) and file_path != directory_path:
                        continue
                else:
                    # Only files whose immediate parent is the directory.
                    if file_path.rpartition('/')[0] != base_dir:
                        continue

            if use_pattern and not (
                fnmatch.fnmatch(file_path, pattern)
                or fnmatch.fnmatch(file_name, pattern)
                or fnmatch.fnmatch(file_path, nested_pattern)
            ):
                continue

            matching_files.append({
                'path': file_path,
                'name': file_name,
                'language': getattr(node, 'language', 'Unknown'),
                # Tolerate nodes whose declared_entities attribute is None.
                'entity_count': len(getattr(node, 'declared_entities', []) or []),
            })

            if len(matching_files) >= limit:
                break

        matching_files.sort(key=lambda x: x['path'])

        filter_desc = f" in '{directory_path}'" if directory_path else ""
        pattern_desc = f" matching '{pattern}'" if use_pattern else ""

        if not matching_files:
            return f"No files found{filter_desc}{pattern_desc}."

        parts = [
            "Files",
            filter_desc,
            pattern_desc,
            f" ({len(matching_files)} results):\n",
            "ββββββββββββββββββββββββββββββββββββββββ\n\n",
        ]
        for i, f in enumerate(matching_files, 1):
            parts.append(f"{i}. {f['path']}\n")
            parts.append(f" Language: {f['language']}, Entities: {f['entity_count']}\n\n")

        return "".join(parts)
    except Exception as e:
        return f"Error: {str(e)}"
|
|
|
|
|
|
|
|
@observe(as_type="tool")
def find_classes_inheriting_from(base_class_name: str, limit: int = 20) -> str:
    """
    Find all classes that inherit from a given base class.

    Each entity node of type 'class' is checked in two ways:

    1. Its outgoing edges: an edge whose 'relation' is one of 'inherits',
       'extends', 'inherits_from' or 'base_class' and whose target name
       contains ``base_class_name`` (case-insensitive).
    2. If no such edge matched and the base name appears among the class's
       called entities, the content of its first declaring chunk is searched
       for a ``class <Name>(... <Base>`` header.

    A class is reported at most once; de-duplication is by node ID, so
    distinct classes that happen to share a name are all listed.

    Args:
        base_class_name: The name of the base class to find subclasses of
        limit: Maximum number of results to return (default: 20)

    Returns:
        str: A formatted string with classes inheriting from the base class,
            or an "Error: ..." message
    """
    if knowledge_graph is None:
        return "Error: Knowledge graph not initialized"

    inheritance_relations = ('inherits', 'extends', 'inherits_from', 'base_class')

    try:
        # The limit may arrive as a string; coerce it before use.
        if isinstance(limit, str):
            try:
                limit = int(limit)
            except ValueError:
                return f"Error: 'limit' must be an integer, got '{limit}'"

        # The cap is checked after a class has been examined, so a
        # non-positive limit could otherwise still yield a result.
        if limit <= 0:
            return "Error: limit must be a positive integer"

        g = knowledge_graph.graph
        base_lower = base_class_name.lower()
        escaped_base = re.escape(base_class_name)

        inheriting_classes = []
        seen_ids = set()

        for nid, n in g.nodes(data=True):
            node = n['data']
            if getattr(node, 'node_type', None) != 'entity':
                continue
            # `or ''` keeps one entity with entity_type=None from aborting the scan.
            if (getattr(node, 'entity_type', '') or '').lower() != 'class':
                continue

            class_name = getattr(node, 'name', '') or ''
            declaring_chunks = getattr(node, 'declaring_chunk_ids', []) or []

            # Strategy 1: explicit inheritance edges.
            for _, target, edge_data in g.out_edges(nid, data=True):
                relation = (edge_data.get('relation') or '').lower()
                if relation not in inheritance_relations:
                    continue
                target_name = getattr(g.nodes[target]['data'], 'name', '') or ''
                if base_lower in target_name.lower():
                    inheriting_classes.append({
                        'name': class_name,
                        'id': nid,
                        'base': target_name,
                        'file': declaring_chunks[0] if declaring_chunks else 'Unknown',
                    })
                    seen_ids.add(nid)
                    break

            # Strategy 2: fall back to scanning the declaring chunk's source.
            if nid not in seen_ids and declaring_chunks:
                called = getattr(node, 'called_entities', []) or []
                if any(base_lower in str(c).lower() for c in called):
                    chunk_id = declaring_chunks[0]
                    if chunk_id in g:
                        content = getattr(g.nodes[chunk_id]['data'], 'content', '') or ''
                        class_pattern = rf'class\s+{re.escape(class_name)}\s*\([^)]*{escaped_base}'
                        if re.search(class_pattern, content, re.IGNORECASE):
                            inheriting_classes.append({
                                'name': class_name,
                                'id': nid,
                                'base': base_class_name,
                                'file': chunk_id,
                            })
                            seen_ids.add(nid)

            if len(inheriting_classes) >= limit:
                break

        if not inheriting_classes:
            return f"No classes found inheriting from '{base_class_name}'.\n\nTip: Try searching for the base class name in code content using search_nodes."

        parts = [
            f"Classes inheriting from '{base_class_name}' ({len(inheriting_classes)} results):\n",
            "ββββββββββββββββββββββββββββββββββββββββ\n\n",
        ]
        for i, cls in enumerate(inheriting_classes, 1):
            parts.append(f"{i}. {cls['name']}\n")
            parts.append(f" ID: {cls['id']}\n")
            parts.append(f" Inherits from: {cls['base']}\n")
            parts.append(f" Defined in: {cls['file']}\n\n")

        return "".join(parts)
    except Exception as e:
        return f"Error: {str(e)}"
|
|
|
|
|
|
|
|
@observe(as_type="tool")
def find_files_importing(module_or_entity: str, limit: int = 30) -> str:
    """
    Find all files that import a specific module or entity.

    Each node of type 'file' is checked in two ways:

    1. Its called entities: any whose name contains ``module_or_entity``
       (case-insensitive) marks the file with match type 'called_entity'.
    2. Otherwise the first three chunks of the file are searched for
       ``import``, ``from ... import``, ``require(...)`` or ``use``
       statements mentioning the name (match type 'import_statement').
       This step only runs when the knowledge graph object exposes
       ``get_chunks_of_file``.

    Scanning stops once ``limit`` files have been collected, whichever way
    they matched; the collected files are then sorted by path.

    Args:
        module_or_entity: The name of the module or entity to find imports of
        limit: Maximum number of results to return (default: 30)

    Returns:
        str: A formatted string with files that import the specified
            module/entity, or an "Error: ..." message
    """
    if knowledge_graph is None:
        return "Error: Knowledge graph not initialized"

    try:
        # The limit may arrive as a string; coerce it before use.
        if isinstance(limit, str):
            try:
                limit = int(limit)
            except ValueError:
                return f"Error: 'limit' must be an integer, got '{limit}'"

        if limit <= 0:
            return "Error: limit must be a positive integer"

        g = knowledge_graph.graph
        search_term = module_or_entity.lower()

        # The patterns depend only on the search term: compile them once
        # instead of rebuilding them for every chunk of every file.
        escaped = re.escape(module_or_entity)
        import_patterns = [
            re.compile(source, re.IGNORECASE)
            for source in (
                rf'import\s+.*{escaped}',
                rf'from\s+.*{escaped}.*\s+import',
                rf'require\s*\(\s*["\'].*{escaped}',
                rf'use\s+.*{escaped}',
            )
        ]
        can_read_chunks = hasattr(knowledge_graph, 'get_chunks_of_file')

        importing_files = []
        seen_paths = set()

        for nid, n in g.nodes(data=True):
            node = n['data']
            if getattr(node, 'node_type', None) != 'file':
                continue

            file_path = getattr(node, 'path', nid)

            # Strategy 1: names recorded as called entities of the file.
            matched_entities = []
            for entity in getattr(node, 'called_entities', []) or []:
                if isinstance(entity, dict):
                    entity_str = str(entity.get('name') or '').lower()
                else:
                    entity_str = str(entity).lower()
                if search_term in entity_str:
                    matched_entities.append(entity_str)

            if matched_entities:
                importing_files.append({
                    'path': file_path,
                    'name': getattr(node, 'name', ''),
                    'matched_entities': matched_entities[:5],
                    'match_type': 'called_entity',
                })
                seen_paths.add(file_path)
            elif file_path not in seen_paths:
                # Strategy 2: look for an import statement near the top of
                # the file (first three chunks only).
                chunks = knowledge_graph.get_chunks_of_file(file_path) if can_read_chunks else []
                for chunk in chunks[:3]:
                    content = getattr(chunk, 'content', '') or ''
                    if any(p.search(content) for p in import_patterns):
                        importing_files.append({
                            'path': file_path,
                            'name': getattr(node, 'name', ''),
                            'matched_entities': [],
                            'match_type': 'import_statement',
                        })
                        seen_paths.add(file_path)
                        break

            # Checked on every path through the loop body. Previously the
            # called-entity branch jumped past this test, so `limit` was
            # never enforced for those matches.
            if len(importing_files) >= limit:
                break

        importing_files.sort(key=lambda x: x['path'])

        if not importing_files:
            return f"No files found importing '{module_or_entity}'.\n\nTip: Try searching for the module name in code content using search_nodes."

        parts = [
            f"Files importing '{module_or_entity}' ({len(importing_files)} results):\n",
            "ββββββββββββββββββββββββββββββββββββββββ\n\n",
        ]
        for i, f in enumerate(importing_files, 1):
            parts.append(f"{i}. {f['path']}\n")
            parts.append(f" Match type: {f['match_type']}\n")
            if f['matched_entities']:
                parts.append(f" Matched: {', '.join(f['matched_entities'][:3])}\n")
            parts.append("\n")

        return "".join(parts)
    except Exception as e:
        return f"Error: {str(e)}"
|
|
|
|
|
|
|
|
@observe(as_type="tool")
def get_concept_overview(concept: str, limit: int = 15) -> str:
    """
    Get a high-level overview of a concept across the codebase.

    Makes a single pass over the graph and sorts matching nodes into four
    buckets (all comparisons are case-insensitive substring tests):

    * classes: entity nodes of type 'class' whose name contains the concept
    * functions/methods: entity nodes of type 'function' or 'method' whose
      name contains the concept
    * files: file nodes whose path or name contains the concept
    * code snippets: chunk nodes whose content or description contains it

    The first three buckets hold at most ``limit`` items and the snippet
    bucket at most ``limit // 2``. The report prints up to 10 items per
    bucket (5 snippets) and summarises the rest as a count.

    Args:
        concept: The concept to search for (e.g., 'embedding', 'authentication', 'cache')
        limit: Maximum number of results per category (default: 15)

    Returns:
        str: A formatted overview of the concept across the codebase, or an
            "Error: ..." message
    """
    if knowledge_graph is None:
        return "Error: Knowledge graph not initialized"

    shown_per_bucket = 10
    shown_snippets = 5

    try:
        # The limit may arrive as a string; coerce it before use.
        if isinstance(limit, str):
            try:
                limit = int(limit)
            except ValueError:
                return f"Error: 'limit' must be an integer, got '{limit}'"

        g = knowledge_graph.graph
        concept_lower = concept.lower()
        chunk_limit = limit // 2

        related_classes = []
        related_functions = []
        related_files = []
        related_chunks = []

        for nid, n in g.nodes(data=True):
            # Nothing more can be collected once every bucket is full.
            if (
                len(related_classes) >= limit
                and len(related_functions) >= limit
                and len(related_files) >= limit
                and len(related_chunks) >= chunk_limit
            ):
                break

            node = n['data']
            node_type = getattr(node, 'node_type', None)
            # `or ''` throughout: an attribute that exists but is None must
            # not abort the whole overview.
            node_name = getattr(node, 'name', '') or ''
            name_match = concept_lower in node_name.lower()

            if node_type == 'entity':
                if not name_match:
                    continue
                entity_type = getattr(node, 'entity_type', '') or ''
                entity_kind = entity_type.lower()
                declaring = getattr(node, 'declaring_chunk_ids', [])
                declared_in = declaring[0] if declaring else 'Unknown'

                if entity_kind == 'class' and len(related_classes) < limit:
                    related_classes.append({
                        'name': node_name,
                        'id': nid,
                        'file': declared_in,
                    })
                elif entity_kind in ('function', 'method') and len(related_functions) < limit:
                    related_functions.append({
                        'name': node_name,
                        'id': nid,
                        'type': entity_type,
                        'file': declared_in,
                    })

            elif node_type == 'file' and len(related_files) < limit:
                file_path = getattr(node, 'path', '') or ''
                if name_match or concept_lower in file_path.lower():
                    related_files.append({
                        'path': file_path,
                        'name': node_name,
                        'entity_count': len(getattr(node, 'declared_entities', []) or []),
                    })

            elif node_type == 'chunk' and len(related_chunks) < chunk_limit:
                content = getattr(node, 'content', '') or ''
                description = getattr(node, 'description', '') or ''
                if concept_lower in content.lower() or concept_lower in description.lower():
                    related_chunks.append({
                        'id': nid,
                        'file': getattr(node, 'path', ''),
                        'content': content,
                    })

        total = len(related_classes) + len(related_functions) + len(related_files) + len(related_chunks)

        parts = [
            f"Concept Overview: '{concept}'\n",
            "ββββββββββββββββββββββββββββββββββββββββ\n\n",
            f"Found {total} related items across the codebase.\n\n",
        ]

        if related_classes:
            parts.append(f"π¦ Related Classes ({len(related_classes)}):\n")
            for cls in related_classes[:shown_per_bucket]:
                parts.append(f" β’ {cls['name']}\n")
                parts.append(f" File: {cls['file']}\n")
            if len(related_classes) > shown_per_bucket:
                parts.append(f" ... and {len(related_classes) - shown_per_bucket} more\n")
            parts.append("\n")

        if related_functions:
            parts.append(f"β‘ Related Functions/Methods ({len(related_functions)}):\n")
            for func in related_functions[:shown_per_bucket]:
                parts.append(f" β’ {func['name']} ({func['type']})\n")
                parts.append(f" File: {func['file']}\n")
            if len(related_functions) > shown_per_bucket:
                parts.append(f" ... and {len(related_functions) - shown_per_bucket} more\n")
            parts.append("\n")

        if related_files:
            parts.append(f"π Related Files ({len(related_files)}):\n")
            for f in related_files[:shown_per_bucket]:
                parts.append(f" β’ {f['path']}\n")
                parts.append(f" Entities: {f['entity_count']}\n")
            if len(related_files) > shown_per_bucket:
                parts.append(f" ... and {len(related_files) - shown_per_bucket} more\n")
            parts.append("\n")

        if related_chunks:
            parts.append(f"π Code Snippets ({len(related_chunks)}):\n")
            for chunk in related_chunks[:shown_snippets]:
                parts.append(f" β’ {chunk['id']}\n")
                parts.append(f" Content:\n{chunk['content']}\n\n")
            if len(related_chunks) > shown_snippets:
                parts.append(f" ... and {len(related_chunks) - shown_snippets} more\n")

        if total == 0:
            parts.append("No direct matches found.\n\n")
            parts.append("Suggestions:\n")
            parts.append(f" β’ Try searching with: search_nodes('{concept}')\n")
            parts.append(f" β’ Try partial name: search_by_type_and_name('class', '{concept[:4]}')\n")
            parts.append(f" β’ Check entity list: list_all_entities(entity_type='class')\n")

        return "".join(parts)
    except Exception as e:
        return f"Error: {str(e)}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_gradio_app():
    """Build the Gradio Blocks UI that exposes every knowledge-graph tool.

    Tabs group related tools. Most tools are laid out as a column of input
    widgets beside a result text box, followed by the tool's docstring
    rendered as Markdown.

    Returns:
        The assembled ``gr.Blocks`` application (not launched here).
    """

    def _show_tool_doc(tool):
        # Emit the tool's docstring verbatim inside a fenced block.
        text = tool.__doc__ or "No description available."
        gr.Markdown(f"**Description:**\n\n```\n{text.strip()}\n```")

    def _next_section(heading):
        # Horizontal rule followed by the heading of the next tool.
        gr.Markdown("---")
        gr.Markdown(heading)

    def _result_box(label, lines=20, max_lines=30):
        # Right-hand column holding the text box a tool writes its result to.
        with gr.Column():
            return gr.Textbox(label=label, lines=lines, max_lines=max_lines)

    # NOTE(review): the emoji / dash characters in the title, heading and tab
    # labels look mis-encoded (UTF-8 decoded with a single-byte code page).
    # They are reproduced unchanged here; confirm against the original file.
    with gr.Blocks(
        title="Transformers Knowledge Graph Explorer β Knowledge Graph MCP Server",
        theme=gr.themes.Soft(),
    ) as demo:
        gr.Markdown("""
        # π Transformers Knowledge Graph Explorer

        Explore and query the Hugging Face Transformers library codebase using a knowledge graph.
        """)

        with gr.Tab("π Graph Overview"):
            graph_stats_btn = gr.Button("Get Graph Statistics", variant="primary")
            graph_stats_box = gr.Textbox(label="Statistics", lines=20, max_lines=30)
            graph_stats_btn.click(fn=get_graph_stats, outputs=graph_stats_box)
            _show_tool_doc(get_graph_stats)

        with gr.Tab("π Search"):
            with gr.Row():
                with gr.Column():
                    query_box = gr.Textbox(label="Search Query", placeholder="Enter search query...")
                    query_limit = gr.Slider(1, 50, value=10, step=1, label="Max Results")
                    query_btn = gr.Button("Search", variant="primary")
                query_result = _result_box("Search Results")
            query_btn.click(fn=search_nodes, inputs=[query_box, query_limit], outputs=query_result)
            _show_tool_doc(search_nodes)

        with gr.Tab("π Node Info"):
            with gr.Row():
                with gr.Column():
                    node_id_box = gr.Textbox(label="Node ID", placeholder="Enter node ID...")
                    info_btn = gr.Button("Get Node Info", variant="primary")
                    edges_btn = gr.Button("Get Node Edges", variant="secondary")
                node_result = _result_box("Node Information")
            # Both buttons read the same node ID and write to the same box.
            info_btn.click(fn=get_node_info, inputs=node_id_box, outputs=node_result)
            edges_btn.click(fn=get_node_edges, inputs=node_id_box, outputs=node_result)
            _show_tool_doc(get_node_info)
            _show_tool_doc(get_node_edges)

        with gr.Tab("ποΈ Structure"):
            gr.Markdown("### Repository Tree")
            with gr.Row():
                with gr.Column():
                    root_box = gr.Textbox(label="Root Node ID", value="root", placeholder="root")
                    depth_slider = gr.Slider(1, 10, value=3, step=1, label="Max Depth")
                    show_tree_btn = gr.Button("Show Tree", variant="primary")
                tree_result = _result_box("Tree View", max_lines=40)
            show_tree_btn.click(fn=print_tree, inputs=[root_box, depth_slider], outputs=tree_result)
            _show_tool_doc(print_tree)

            _next_section("### File Structure")
            with gr.Row():
                with gr.Column():
                    structure_path = gr.Textbox(label="File Path", placeholder="Enter file path...")
                    structure_btn = gr.Button("Get File Structure", variant="primary")
                structure_result = _result_box("File Structure")
            structure_btn.click(fn=get_file_structure, inputs=structure_path, outputs=structure_result)
            _show_tool_doc(get_file_structure)

        with gr.Tab("π― Entities"):
            gr.Markdown("### List All Entities")
            with gr.Row():
                with gr.Column():
                    page_slider = gr.Slider(1, 100, value=1, step=1, label="Page")
                    per_page_slider = gr.Slider(10, 100, value=50, step=10, label="Per Page")
                    type_filter = gr.Dropdown(
                        choices=["", "class", "function", "method", "variable", "parameter"],
                        label="Filter by Type (optional)",
                        value="",
                    )
                    declared_filter = gr.Dropdown(
                        choices=["", "true", "false"],
                        label="Declared in Repo (optional)",
                        value="",
                    )
                    entities_btn = gr.Button("List Entities", variant="primary")
                entities_result = _result_box("Entities")
            # Input order is (per-page limit, page, type filter, declared flag).
            entities_btn.click(
                fn=list_all_entities,
                inputs=[per_page_slider, page_slider, type_filter, declared_filter],
                outputs=entities_result,
            )
            _show_tool_doc(list_all_entities)

            _next_section("### Go to Definition")
            with gr.Row():
                with gr.Column():
                    definition_name = gr.Textbox(label="Entity Name", placeholder="Enter entity name...")
                    definition_btn = gr.Button("Go to Definition", variant="primary")
                definition_result = _result_box("Definition", lines=15, max_lines=25)
            definition_btn.click(fn=go_to_definition, inputs=definition_name, outputs=definition_result)
            _show_tool_doc(go_to_definition)

            _next_section("### Find Usages")
            with gr.Row():
                with gr.Column():
                    usages_name = gr.Textbox(label="Entity Name", placeholder="Enter entity name...")
                    usages_limit = gr.Slider(1, 50, value=20, step=1, label="Max Results")
                    usages_btn = gr.Button("Find Usages", variant="primary")
                usages_result = _result_box("Usages", lines=15, max_lines=25)
            usages_btn.click(fn=find_usages, inputs=[usages_name, usages_limit], outputs=usages_result)
            _show_tool_doc(find_usages)

        with gr.Tab("π¬ Discovery"):
            gr.Markdown("### List Nodes by Type")
            with gr.Row():
                with gr.Column():
                    listing_type = gr.Dropdown(
                        choices=["file", "directory", "chunk", "entity", "function", "class", "method"],
                        label="Node Type",
                    )
                    listing_limit = gr.Slider(1, 100, value=20, step=1, label="Max Results")
                    listing_btn = gr.Button("List Nodes", variant="primary")
                listing_result = _result_box("Results")
            listing_btn.click(fn=list_nodes_by_type, inputs=[listing_type, listing_limit], outputs=listing_result)
            _show_tool_doc(list_nodes_by_type)

            _next_section("### Search by Type and Name")
            with gr.Row():
                with gr.Column():
                    typed_kind = gr.Dropdown(
                        choices=["file", "directory", "chunk", "entity", "function", "class", "method"],
                        label="Node Type",
                    )
                    typed_name = gr.Textbox(label="Name Contains", placeholder="Enter partial name...")
                    typed_btn = gr.Button("Search", variant="primary")
                typed_result = _result_box("Results")
            typed_btn.click(fn=search_by_type_and_name, inputs=[typed_kind, typed_name], outputs=typed_result)
            _show_tool_doc(search_by_type_and_name)

        with gr.Tab("π Relationships"):
            gr.Markdown("### Get Neighbors")
            with gr.Row():
                with gr.Column():
                    neighbors_node = gr.Textbox(label="Node ID", placeholder="Enter node ID...")
                    neighbors_btn = gr.Button("Get Neighbors", variant="primary")
                neighbors_result = _result_box("Neighbors")
            neighbors_btn.click(fn=get_neighbors, inputs=neighbors_node, outputs=neighbors_result)
            _show_tool_doc(get_neighbors)

            _next_section("### Entity Relationships")
            with gr.Row():
                with gr.Column():
                    relations_node = gr.Textbox(label="Node ID", placeholder="Enter node ID...")
                    relations_btn = gr.Button("Get Relationships", variant="primary")
                relations_result = _result_box("Relationships")
            relations_btn.click(fn=entity_relationships, inputs=relations_node, outputs=relations_result)
            _show_tool_doc(entity_relationships)

            _next_section("### Get Related Chunks")
            with gr.Row():
                with gr.Column():
                    chunk_source = gr.Textbox(label="Chunk ID", placeholder="Enter chunk ID...")
                    chunk_relation = gr.Dropdown(
                        choices=["calls", "contains", "declares", "uses"],
                        label="Relation Type",
                        value="calls",
                    )
                    chunk_related_btn = gr.Button("Get Related Chunks", variant="primary")
                chunk_related_result = _result_box("Related Chunks")
            chunk_related_btn.click(
                fn=get_related_chunks,
                inputs=[chunk_source, chunk_relation],
                outputs=chunk_related_result,
            )
            _show_tool_doc(get_related_chunks)

            _next_section("### Find Path Between Nodes")
            with gr.Row():
                with gr.Column():
                    route_start = gr.Textbox(label="Source Node ID", placeholder="Enter source node ID...")
                    route_end = gr.Textbox(label="Target Node ID", placeholder="Enter target node ID...")
                    route_depth = gr.Slider(1, 10, value=5, step=1, label="Max Depth")
                    route_btn = gr.Button("Find Path", variant="primary")
                route_result = _result_box("Path")
            route_btn.click(fn=find_path, inputs=[route_start, route_end, route_depth], outputs=route_result)
            _show_tool_doc(find_path)

            _next_section("### Find Classes Inheriting From")
            with gr.Row():
                with gr.Column():
                    parent_class = gr.Textbox(label="Base Class Name", placeholder="Enter base class...")
                    subclasses_btn = gr.Button("Find Subclasses", variant="primary")
                subclasses_result = _result_box("Inheriting Classes")
            subclasses_btn.click(fn=find_classes_inheriting_from, inputs=parent_class, outputs=subclasses_result)
            _show_tool_doc(find_classes_inheriting_from)

        with gr.Tab("π Context"):
            gr.Markdown("### Get Chunk Context")
            with gr.Row():
                with gr.Column():
                    context_chunk = gr.Textbox(label="Chunk ID", placeholder="Enter chunk ID...")
                    context_fetch_btn = gr.Button("Get Context", variant="primary")
                context_result = _result_box("Context", lines=25, max_lines=40)
            context_fetch_btn.click(fn=get_chunk_context, inputs=context_chunk, outputs=context_result)
            _show_tool_doc(get_chunk_context)

            _next_section("### Concept Overview")
            with gr.Row():
                with gr.Column():
                    concept_box = gr.Textbox(label="Concept", placeholder="e.g., embedding, authentication...")
                    overview_btn = gr.Button("Get Overview", variant="primary")
                overview_result = _result_box("Concept Overview", lines=25, max_lines=40)
            overview_btn.click(fn=get_concept_overview, inputs=concept_box, outputs=overview_result)
            _show_tool_doc(get_concept_overview)

            _next_section("### Get Subgraph")
            with gr.Row():
                with gr.Column():
                    center_node = gr.Textbox(label="Center Node ID", placeholder="Enter node ID...")
                    hop_depth = gr.Slider(1, 5, value=2, step=1, label="Depth")
                    edge_filter = gr.Textbox(
                        label="Edge Types (comma-separated, optional)",
                        placeholder="e.g., calls,contains",
                    )
                    extract_btn = gr.Button("Extract Subgraph", variant="primary")
                subgraph_result = _result_box("Subgraph")
            extract_btn.click(
                fn=get_subgraph,
                inputs=[center_node, hop_depth, edge_filter],
                outputs=subgraph_result,
            )
            _show_tool_doc(get_subgraph)

        with gr.Tab("π Files"):
            gr.Markdown("### List Files in Directory")
            with gr.Row():
                with gr.Column():
                    directory_box = gr.Textbox(label="Directory Path (empty for root)", placeholder="e.g., src/")
                    pattern_box = gr.Textbox(label="Pattern", value="*", placeholder="e.g., *.py")
                    recursive_toggle = gr.Checkbox(label="Recursive", value=True)
                    files_limit = gr.Slider(10, 100, value=50, step=10, label="Max Results")
                    files_btn = gr.Button("List Files", variant="primary")
                files_result = _result_box("Files")
            files_btn.click(
                fn=list_files_in_directory,
                inputs=[directory_box, pattern_box, recursive_toggle, files_limit],
                outputs=files_result,
            )
            _show_tool_doc(list_files_in_directory)

            _next_section("### Find Files Importing")
            with gr.Row():
                with gr.Column():
                    imported_name = gr.Textbox(label="Module/Entity Name", placeholder="e.g., torch, numpy...")
                    importers_limit = gr.Slider(10, 50, value=30, step=5, label="Max Results")
                    importers_btn = gr.Button("Find Files", variant="primary")
                importers_result = _result_box("Importing Files")
            importers_btn.click(
                fn=find_files_importing,
                inputs=[imported_name, importers_limit],
                outputs=importers_result,
            )
            _show_tool_doc(find_files_importing)

            _next_section("### Get File Stats")
            with gr.Row():
                with gr.Column():
                    file_stats_path = gr.Textbox(label="File Path", placeholder="Enter file path...")
                    file_stats_btn = gr.Button("Get Stats", variant="primary")
                file_stats_result = _result_box("Statistics")
            file_stats_btn.click(fn=get_file_stats, inputs=file_stats_path, outputs=file_stats_result)
            _show_tool_doc(get_file_stats)

        with gr.Tab("π Analysis"):
            gr.Markdown("### Diff Chunks")
            with gr.Row():
                with gr.Column():
                    left_node = gr.Textbox(label="First Node ID", placeholder="Enter first node ID...")
                    right_node = gr.Textbox(label="Second Node ID", placeholder="Enter second node ID...")
                    compare_btn = gr.Button("Show Diff", variant="primary")
                compare_result = _result_box("Diff Output", lines=25, max_lines=40)
            compare_btn.click(fn=diff_chunks, inputs=[left_node, right_node], outputs=compare_result)
            _show_tool_doc(diff_chunks)

    return demo
|
|
|
|
|
|
|
|
def main():
    """Parse command-line options, load the knowledge graph, and serve the UI.

    The dataset repo ID comes from ``--hf-dataset`` or, when the flag is
    omitted, from the ``HF_DATASET`` environment variable; the token likewise
    falls back to ``HF_TOKEN``. If no dataset is available from either source
    the parser reports a usage error and exits (status 2) before anything is
    loaded.

    Raises:
        SystemExit: On invalid arguments or when no dataset is specified.
    """
    parser = argparse.ArgumentParser(description="Knowledge Graph MCP Server from HuggingFace Dataset")

    parser.add_argument("--hf-dataset", type=str, default=os.environ.get("HF_DATASET"),
                        help="HuggingFace dataset repo ID (e.g., 'username/dataset-name') "
                             "(or set HF_DATASET env var)")

    parser.add_argument("--hf-token", type=str, default=os.environ.get("HF_TOKEN"),
                        help="HuggingFace API token for private datasets (or set HF_TOKEN env var)")

    parser.add_argument("--host", type=str, default="0.0.0.0", help="Host to bind to")
    parser.add_argument("--port", type=int, default=7860, help="Port to bind to")
    parser.add_argument("--share", action="store_true", help="Create a public link")

    parser.add_argument("--no-index", action="store_true", help="Skip indexing nodes")
    parser.add_argument("--code-index-type", type=str, default="keyword-only",
                        choices=["keyword-only", "embedding-only", "hybrid"],
                        help="Type of code index to use")
    parser.add_argument("--code-index-backend", type=str, default="lancedb",
                        choices=["lancedb", "weaviate"],
                        help="Backend for code index")

    args = parser.parse_args()

    # The default comes from the environment and may be None or empty; fail
    # here with a clear usage message instead of deep inside dataset loading.
    if not args.hf_dataset:
        parser.error("no dataset specified: pass --hf-dataset or set the HF_DATASET env var")

    code_index_kwargs = {
        "index_type": args.code_index_type,
        "backend": args.code_index_backend,
        # Embeddings are only requested for the embedding-based index types.
        "use_embed": args.code_index_type != "keyword-only",
    }

    print("Initializing knowledge graph from HuggingFace dataset...")
    initialize_knowledge_graph(
        hf_dataset=args.hf_dataset,
        hf_token=args.hf_token,
        index_nodes=not args.no_index,
        code_index_kwargs=code_index_kwargs
    )
    print("Knowledge graph initialized!")

    demo = create_gradio_app()
    demo.launch(
        server_name=args.host,
        server_port=args.port,
        share=args.share,
        mcp_server=True
    )
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Start the server only when run as a script, not when imported.
    main()
|
|
|