| import gradio as gr |
| import json |
| from dataclasses import dataclass, field, asdict |
| from typing import List, Dict, Any, Tuple, TypedDict, Annotated |
|
|
| |
| @dataclass |
| class Entity: name: str; entityType: str; observations: List[str] = field(default_factory=list) |
| @dataclass |
| class Relation: from_entity: str; to_entity: str; relationType: str |
| @dataclass |
| class KnowledgeGraph: entities: Dict[str, Entity] = field(default_factory=dict); relations: List[Relation] = field(default_factory=list) |
|
|
| class KnowledgeGraphManager: |
| def __init__(self): self.graph = KnowledgeGraph(); print("Initialized a new, empty in-memory knowledge graph.") |
| def create_entities(self, entities_data: List[Dict]) -> List[Entity]: |
| new_entities = []; [ (new_entity := Entity(**e_data), self.graph.entities.update({e_data['name']: new_entity}), new_entities.append(new_entity)) for e_data in entities_data if e_data['name'] not in self.graph.entities]; return new_entities |
| def create_relations(self, relations_data: List[Dict]) -> List[Relation]: |
| new_relations = []; existing = {(r.from_entity, r.to_entity, r.relationType) for r in self.graph.relations}; [ (r_data.update({'from_entity': r_data.pop('from'), 'to_entity': r_data.pop('to')}), (new_relation := Relation(**r_data), self.graph.relations.append(new_relation), new_relations.append(new_relation))) for r_data in relations_data if (r_data['from'], r_data['to'], r_data['relationType']) not in existing]; return new_relations |
| def add_observations(self, observations_data: List[Dict]) -> List[Dict]: |
| results = []; [ (entity := self.graph.entities.get(o_data['entityName']), ( (new_obs := [obs for obs in o_data['contents'] if obs not in entity.observations]), entity.observations.extend(new_obs), results.append({"entityName": entity.name, "addedObservations": new_obs}) ) if entity else (_ for _ in ()).throw(ValueError(f"Entity with name {o_data['entityName']} not found"))) for o_data in observations_data]; return results |
| def delete_entities(self, entity_names: List[str]) -> None: |
| to_delete = set(entity_names); self.graph.entities = {n: e for n, e in self.graph.entities.items() if n not in to_delete}; self.graph.relations = [r for r in self.graph.relations if r.from_entity not in to_delete and r.to_entity not in to_delete] |
| def delete_observations(self, deletions_data: List[Dict]) -> None: |
| for d in deletions_data: |
| if (entity := self.graph.entities.get(d['entityName'])): |
| obs_to_delete = set(d['observations']); entity.observations = [obs for obs in entity.observations if obs not in obs_to_delete] |
| def delete_relations(self, relations_data: List[Dict]) -> None: |
| to_delete = {(r.get('from'), r.get('to'), r.get('relationType')) for r in relations_data}; self.graph.relations = [r for r in self.graph.relations if (r.from_entity, r.to_entity, r.relationType) not in to_delete] |
| def read_graph(self) -> Dict: |
| return {"entities": [asdict(e) for e in self.graph.entities.values()], "relations": [{"from": r.from_entity, "to": r.to_entity, "relationType": r.relationType} for r in self.graph.relations]} |
| def search_nodes(self, query: str) -> Dict: |
| q_lower = query.lower(); f_entities = [e for e in self.graph.entities.values() if q_lower in e.name.lower() or q_lower in e.entityType.lower() or any(q_lower in obs.lower() for obs in e.observations)]; f_names = {e.name for e in f_entities}; f_relations = [r for r in self.graph.relations if r.from_entity in f_names and r.to_entity in f_names]; return {"entities": [asdict(e) for e in f_entities], "relations": [{"from": r.from_entity, "to": r.to_entity, "relationType": r.relationType} for r in f_relations]} |
| def open_nodes(self, names: List[str]) -> Dict: |
| names_set = set(names); f_entities = [e for n, e in self.graph.entities.items() if n in names_set]; f_names = {e.name for e in f_entities}; f_relations = [r for r in self.graph.relations if r.from_entity in f_names and r.to_entity in f_names]; return {"entities": [asdict(e) for e in f_entities], "relations": [{"from": r.from_entity, "to": r.to_entity, "relationType": r.relationType} for r in f_relations]} |
|
|
| kg_manager = KnowledgeGraphManager() |
|
|
| class CreateEntitiesPayload(TypedDict): entities: List[Dict] |
| class CreateRelationsPayload(TypedDict): relations: List[Dict] |
| class AddObservationsPayload(TypedDict): observations: List[Dict] |
| class DeleteEntitiesPayload(TypedDict): entityNames: List[str] |
| class DeleteObservationsPayload(TypedDict): deletions: List[Dict] |
| class DeleteRelationsPayload(TypedDict): relations: List[Dict] |
| class SearchNodesPayload(TypedDict): query: str |
| class OpenNodesPayload(TypedDict): names: List[str] |
|
|
| def create_entities(payload: Annotated[CreateEntitiesPayload, "一个包含'entities'键的JSON对象。'entities'的值是一个实体对象的列表。"]) -> str: |
| """在知识图谱中创建多个新实体。""" |
| new_entities = kg_manager.create_entities(payload['entities']) |
| return json.dumps([asdict(e) for e in new_entities], indent=2) |
|
|
| def create_relations(payload: Annotated[CreateRelationsPayload, "一个包含'relations'键的JSON对象。'relations'的值是一个关系对象的列表。"]) -> str: |
| """在知识图谱中的实体之间创建多个新关系。""" |
| new_relations = kg_manager.create_relations(payload['relations']) |
| return json.dumps([{"from": r.from_entity, "to": r.to_entity, "relationType": r.relationType} for r in new_relations], indent=2) |
|
|
| |
| def add_observations(payload: Annotated[AddObservationsPayload, "一个包含'observations'键的JSON对象。'observations'的值是一个观察记录对象的列表。"]) -> str: |
| """向知识图ppu中已存在的实体添加新的观察记录。""" |
| results = kg_manager.add_observations(payload['observations']) |
| return json.dumps(results, indent=2) |
| def delete_entities(payload: Annotated[DeleteEntitiesPayload, "一个包含'entityNames'键的JSON对象。'entityNames'的值是一个实体名称的列表。"]) -> str: |
| """从知识图谱中删除多个实体及其相关联的关系。""" |
| kg_manager.delete_entities(payload['entityNames']) |
| return "Entities and their relations deleted successfully." |
| def delete_observations(payload: Annotated[DeleteObservationsPayload, "一个包含'deletions'键的JSON对象。'deletions'的值是一个删除指令的列表。"]) -> str: |
| """从知识图谱中的实体删除特定的观察记录。""" |
| kg_manager.delete_observations(payload['deletions']) |
| return "Observations deleted successfully." |
| def delete_relations(payload: Annotated[DeleteRelationsPayload, "一个包含'relations'键的JSON对象。'relations'的值是一个要删除的关系对象的列表。"]) -> str: |
| """从知识图谱中删除多个关系。""" |
| kg_manager.delete_relations(payload['relations']) |
| return "Relations deleted successfully." |
| def read_graph() -> Dict: |
| """读取并返回整个知识图谱。此函数不需要参数。""" |
| return kg_manager.read_graph() |
| def search_nodes(payload: Annotated[SearchNodesPayload, "一个包含'query'键的JSON对象。'query'的值是用于搜索的字符串。"]) -> Dict: |
| """根据查询词在知识图谱中搜索节点。""" |
| return kg_manager.search_nodes(payload['query']) |
| def open_nodes(payload: Annotated[OpenNodesPayload, "一个包含'names'键的JSON对象。'names'的值是一个要检索的实体名称的列表。"]) -> Dict: |
| """通过名称打开知识图谱中的特定节点。""" |
| return kg_manager.open_nodes(payload['names']) |
|
|
|
|
| with gr.Blocks() as app: |
| gr.Markdown("MCP Server is running. This UI is intentionally blank.", visible=True) |
| with gr.Row(visible=False): |
| json_input = gr.JSON(value={}); generic_output = gr.JSON(); dummy_btn = gr.Button() |
| dummy_btn.click(fn=create_entities, inputs=json_input, outputs=generic_output, api_name="create_entities") |
| dummy_btn.click(fn=create_relations, inputs=json_input, outputs=generic_output, api_name="create_relations") |
| dummy_btn.click(fn=add_observations, inputs=json_input, outputs=generic_output, api_name="add_observations") |
| dummy_btn.click(fn=delete_entities, inputs=json_input, outputs=generic_output, api_name="delete_entities") |
| dummy_btn.click(fn=delete_observations, inputs=json_input, outputs=generic_output, api_name="delete_observations") |
| dummy_btn.click(fn=delete_relations, inputs=json_input, outputs=generic_output, api_name="delete_relations") |
| dummy_btn.click(fn=read_graph, inputs=None, outputs=generic_output, api_name="read_graph") |
| dummy_btn.click(fn=search_nodes, inputs=json_input, outputs=generic_output, api_name="search_nodes") |
| dummy_btn.click(fn=open_nodes, inputs=json_input, outputs=generic_output, api_name="open_nodes") |
|
|
| if __name__ == "__main__": |
| app.launch(mcp_server=True) |