memory / app.py
play7284's picture
Update app.py
c50c4ad verified
import gradio as gr
import json
from dataclasses import dataclass, field, asdict
from typing import List, Dict, Any, Tuple, TypedDict, Annotated
# Data model (Entity, Relation, KnowledgeGraph) and the KnowledgeGraphManager that operates on it.
@dataclass
class Entity:
    """A single node of the knowledge graph."""

    # Name of the entity.
    name: str
    # Free-form category label for the entity.
    entityType: str
    # Text observations attached to the entity; every instance gets its own list.
    observations: List[str] = field(default_factory=list)
@dataclass
class Relation:
    """A directed, typed edge between two entities, referenced by name."""

    # Name of the entity the relation starts from.
    from_entity: str
    # Name of the entity the relation points to.
    to_entity: str
    # Label describing the kind of relation.
    relationType: str
@dataclass
class KnowledgeGraph:
    """Container holding the entities and relations of one graph."""

    # Entities indexed by name; a fresh dict is created per instance.
    entities: Dict[str, Entity] = field(default_factory=dict)
    # Relations in insertion order; a fresh list is created per instance.
    relations: List[Relation] = field(default_factory=list)
class KnowledgeGraphManager:
    """Owns one in-memory KnowledgeGraph and implements every operation on it.

    Entities live in a dict keyed by entity name; relations live in a list of
    directed Relation records. The graph exists only for the lifetime of the
    manager instance.
    """

    def __init__(self):
        """Create the manager with a fresh, empty graph."""
        self.graph = KnowledgeGraph()
        print("Initialized a new, empty in-memory knowledge graph.")

    @staticmethod
    def _relation_to_dict(relation: Relation) -> Dict:
        """Serialize a relation using the external 'from'/'to' key names."""
        return {
            "from": relation.from_entity,
            "to": relation.to_entity,
            "relationType": relation.relationType,
        }

    def _subgraph(self, selected: List[Entity]) -> Dict:
        """Serialize the given entities plus the relations joining two of them."""
        names = {entity.name for entity in selected}
        # A relation is included only when BOTH endpoints are in the selection.
        inner_relations = [
            relation
            for relation in self.graph.relations
            if relation.from_entity in names and relation.to_entity in names
        ]
        return {
            "entities": [asdict(entity) for entity in selected],
            "relations": [self._relation_to_dict(r) for r in inner_relations],
        }

    def create_entities(self, entities_data: List[Dict]) -> List[Entity]:
        """Add entities whose name is not already present in the graph.

        Args:
            entities_data: dicts of Entity constructor keyword arguments; each
                must contain at least 'name'.

        Returns:
            The entities that were actually created, in input order. Entries
            whose name already exists (in the graph or earlier in the same
            batch) are skipped.

        Raises:
            KeyError: if an entry has no 'name' key.
            TypeError: if an entry cannot be passed to the Entity constructor.
        """
        created: List[Entity] = []
        for data in entities_data:
            name = data['name']
            if name in self.graph.entities:
                continue
            entity = Entity(**data)
            self.graph.entities[name] = entity
            created.append(entity)
        return created

    def create_relations(self, relations_data: List[Dict]) -> List[Relation]:
        """Add relations that are not already present in the graph.

        Args:
            relations_data: dicts with 'from', 'to' and 'relationType' keys.
                The dicts are not modified.

        Returns:
            The relations that were actually created, in input order.
            Duplicates of existing relations, and duplicates within the batch
            itself, are skipped.

        Raises:
            KeyError: if an entry lacks 'from', 'to' or 'relationType'.
            TypeError: if an entry carries keys Relation does not accept.
        """
        seen = {(r.from_entity, r.to_entity, r.relationType) for r in self.graph.relations}
        created: List[Relation] = []
        for data in relations_data:
            key = (data['from'], data['to'], data['relationType'])
            if key in seen:
                continue
            # Rename the external keys on a copy so the caller's dict stays intact.
            kwargs = dict(data)
            kwargs['from_entity'] = kwargs.pop('from')
            kwargs['to_entity'] = kwargs.pop('to')
            relation = Relation(**kwargs)
            self.graph.relations.append(relation)
            created.append(relation)
            # Track the new triple so a repeat later in this batch is rejected.
            seen.add(key)
        return created

    def add_observations(self, observations_data: List[Dict]) -> List[Dict]:
        """Append new observations to existing entities.

        Args:
            observations_data: dicts with 'entityName' and 'contents' (a list
                of observation strings).

        Returns:
            One dict per input entry with 'entityName' and 'addedObservations'
            (the observations that were not yet present, each listed once).

        Raises:
            ValueError: if an entry names an entity that does not exist.
                Entries processed before the failing one remain applied.
        """
        results: List[Dict] = []
        for data in observations_data:
            entity = self.graph.entities.get(data['entityName'])
            if entity is None:
                raise ValueError(f"Entity with name {data['entityName']} not found")
            added: List[str] = []
            for observation in data['contents']:
                # Skip anything already stored or already queued from this entry.
                if observation in entity.observations or observation in added:
                    continue
                added.append(observation)
            entity.observations.extend(added)
            results.append({"entityName": entity.name, "addedObservations": added})
        return results

    def delete_entities(self, entity_names: List[str]) -> None:
        """Remove the named entities and every relation that touches them.

        Names that do not exist are ignored.
        """
        doomed = set(entity_names)
        self.graph.entities = {
            name: entity
            for name, entity in self.graph.entities.items()
            if name not in doomed
        }
        self.graph.relations = [
            relation
            for relation in self.graph.relations
            if relation.from_entity not in doomed and relation.to_entity not in doomed
        ]

    def delete_observations(self, deletions_data: List[Dict]) -> None:
        """Remove specific observations from entities.

        Args:
            deletions_data: dicts with 'entityName' and 'observations' (the
                observation strings to drop). Unknown entities are ignored.
        """
        for deletion in deletions_data:
            entity = self.graph.entities.get(deletion['entityName'])
            if entity is None:
                continue
            unwanted = set(deletion['observations'])
            entity.observations = [obs for obs in entity.observations if obs not in unwanted]

    def delete_relations(self, relations_data: List[Dict]) -> None:
        """Remove every relation matching one of the given triples.

        Args:
            relations_data: dicts with 'from', 'to' and 'relationType'; a
                missing key is treated as None and therefore matches nothing
                unless a stored relation holds None in that field.
        """
        doomed = {(r.get('from'), r.get('to'), r.get('relationType')) for r in relations_data}
        self.graph.relations = [
            relation
            for relation in self.graph.relations
            if (relation.from_entity, relation.to_entity, relation.relationType) not in doomed
        ]

    def read_graph(self) -> Dict:
        """Return the whole graph as plain dicts: {'entities': [...], 'relations': [...]}."""
        return {
            "entities": [asdict(entity) for entity in self.graph.entities.values()],
            "relations": [self._relation_to_dict(r) for r in self.graph.relations],
        }

    def search_nodes(self, query: str) -> Dict:
        """Return entities matching a case-insensitive substring query.

        An entity matches when the query occurs in its name, its type or any
        of its observations. Only relations between two matching entities are
        included in the result.
        """
        needle = query.lower()
        matches = [
            entity
            for entity in self.graph.entities.values()
            if needle in entity.name.lower()
            or needle in entity.entityType.lower()
            or any(needle in obs.lower() for obs in entity.observations)
        ]
        return self._subgraph(matches)

    def open_nodes(self, names: List[str]) -> Dict:
        """Return the named entities and the relations between them.

        Names that do not exist are ignored.
        """
        wanted = set(names)
        selected = [entity for name, entity in self.graph.entities.items() if name in wanted]
        return self._subgraph(selected)
# Module-level manager instance; the API functions below all operate on this one shared graph.
kg_manager = KnowledgeGraphManager()
class CreateEntitiesPayload(TypedDict):
    """Request body for create_entities."""

    # Entity descriptions to create.
    entities: List[Dict]
class CreateRelationsPayload(TypedDict):
    """Request body for create_relations."""

    # Relation descriptions to create.
    relations: List[Dict]
class AddObservationsPayload(TypedDict):
    """Request body for add_observations."""

    # Observation records to add.
    observations: List[Dict]
class DeleteEntitiesPayload(TypedDict):
    """Request body for delete_entities."""

    # Names of the entities to delete.
    entityNames: List[str]
class DeleteObservationsPayload(TypedDict):
    """Request body for delete_observations."""

    # Deletion instructions, one per entity.
    deletions: List[Dict]
class DeleteRelationsPayload(TypedDict):
    """Request body for delete_relations."""

    # Relation descriptions to delete.
    relations: List[Dict]
class SearchNodesPayload(TypedDict):
    """Request body for search_nodes."""

    # Search string.
    query: str
class OpenNodesPayload(TypedDict):
    """Request body for open_nodes."""

    # Names of the entities to retrieve.
    names: List[str]
def create_entities(
    payload: Annotated[CreateEntitiesPayload, "一个包含'entities'键的JSON对象。'entities'的值是一个实体对象的列表。"],
) -> str:
    """Create multiple new entities in the knowledge graph."""
    # Delegate to the shared manager, then render the created entities as
    # an indented JSON array of plain dicts.
    created = kg_manager.create_entities(payload['entities'])
    serialized = list(map(asdict, created))
    return json.dumps(serialized, indent=2)
def create_relations(
    payload: Annotated[CreateRelationsPayload, "一个包含'relations'键的JSON对象。'relations'的值是一个关系对象的列表。"],
) -> str:
    """Create multiple new relations between entities in the knowledge graph."""
    created = kg_manager.create_relations(payload['relations'])
    # Report each created relation with the external 'from'/'to' key names.
    serialized = []
    for relation in created:
        serialized.append(
            {
                "from": relation.from_entity,
                "to": relation.to_entity,
                "relationType": relation.relationType,
            }
        )
    return json.dumps(serialized, indent=2)
# The remaining API functions use the same Annotated[TypedDict, "description"] payload convention.
def add_observations(
    payload: Annotated[AddObservationsPayload, "一个包含'observations'键的JSON对象。'observations'的值是一个观察记录对象的列表。"],
) -> str:
    """Add new observations to entities that already exist in the knowledge graph."""
    # The manager returns one summary dict per request entry; emit them as
    # an indented JSON array.
    summary = kg_manager.add_observations(payload['observations'])
    return json.dumps(summary, indent=2)
def delete_entities(
    payload: Annotated[DeleteEntitiesPayload, "一个包含'entityNames'键的JSON对象。'entityNames'的值是一个实体名称的列表。"],
) -> str:
    """Delete multiple entities and their associated relations from the knowledge graph."""
    names = payload['entityNames']
    kg_manager.delete_entities(names)
    # NOTE(review): this plain-text message is routed to a gr.JSON output in
    # the UI wiring; confirm that component accepts non-JSON strings.
    return "Entities and their relations deleted successfully."
def delete_observations(
    payload: Annotated[DeleteObservationsPayload, "一个包含'deletions'键的JSON对象。'deletions'的值是一个删除指令的列表。"],
) -> str:
    """Delete specific observations from entities in the knowledge graph."""
    instructions = payload['deletions']
    kg_manager.delete_observations(instructions)
    # NOTE(review): this plain-text message is routed to a gr.JSON output in
    # the UI wiring; confirm that component accepts non-JSON strings.
    return "Observations deleted successfully."
def delete_relations(
    payload: Annotated[DeleteRelationsPayload, "一个包含'relations'键的JSON对象。'relations'的值是一个要删除的关系对象的列表。"],
) -> str:
    """Delete multiple relations from the knowledge graph."""
    targets = payload['relations']
    kg_manager.delete_relations(targets)
    # NOTE(review): this plain-text message is routed to a gr.JSON output in
    # the UI wiring; confirm that component accepts non-JSON strings.
    return "Relations deleted successfully."
def read_graph() -> Dict:
    """Read and return the entire knowledge graph. This function takes no arguments."""
    snapshot = kg_manager.read_graph()
    return snapshot
def search_nodes(
    payload: Annotated[SearchNodesPayload, "一个包含'query'键的JSON对象。'query'的值是用于搜索的字符串。"],
) -> Dict:
    """Search the knowledge graph for nodes matching a query string."""
    term = payload['query']
    return kg_manager.search_nodes(term)
def open_nodes(
    payload: Annotated[OpenNodesPayload, "一个包含'names'键的JSON对象。'names'的值是一个要检索的实体名称的列表。"],
) -> Dict:
    """Open specific nodes of the knowledge graph by name."""
    requested = payload['names']
    return kg_manager.open_nodes(requested)
# The visible page is only a status message. A hidden row holds one JSON
# input, one JSON output and one button, and every API function is attached
# to that button as a separately named endpoint.
with gr.Blocks() as app:
    gr.Markdown("MCP Server is running. This UI is intentionally blank.", visible=True)
    with gr.Row(visible=False):
        json_input = gr.JSON(value={})
        generic_output = gr.JSON()
        dummy_btn = gr.Button()

    # (handler, input component) pairs in registration order. read_graph
    # takes no payload, so it is registered without an input component.
    _endpoints = [
        (create_entities, json_input),
        (create_relations, json_input),
        (add_observations, json_input),
        (delete_entities, json_input),
        (delete_observations, json_input),
        (delete_relations, json_input),
        (read_graph, None),
        (search_nodes, json_input),
        (open_nodes, json_input),
    ]
    for _handler, _handler_input in _endpoints:
        # Each endpoint is published under the handler's own function name.
        dummy_btn.click(
            fn=_handler,
            inputs=_handler_input,
            outputs=generic_output,
            api_name=_handler.__name__,
        )

if __name__ == "__main__":
    # Start the Gradio app with its MCP server enabled.
    app.launch(mcp_server=True)