import os
import time
# import shutil
import numpy as np
import networkx as nx
from textwrap import dedent
from dotenv import load_dotenv
from openai import AzureOpenAI
from huggingface_hub import InferenceClient
from lightrag import LightRAG
from lightrag.utils import EmbeddingFunc
from lightrag.kg.shared_storage import initialize_pipeline_status

# Load the environment variables
load_dotenv()
HF_API_TOKEN = os.environ["HF_TOKEN"]
HF_API_ENDPOINT = os.environ["HF_API_ENDPOINT"]
AZURE_OPENAI_API_VERSION = os.environ["AZURE_OPENAI_API_VERSION"]
AZURE_OPENAI_DEPLOYMENT = os.environ["AZURE_OPENAI_DEPLOYMENT"]
AZURE_OPENAI_API_KEY = os.environ["AZURE_OPENAI_API_KEY"]
AZURE_OPENAI_ENDPOINT = os.environ["AZURE_OPENAI_ENDPOINT"]
AZURE_EMBEDDING_DEPLOYMENT = os.environ["AZURE_EMBEDDING_DEPLOYMENT"]
AZURE_EMBEDDING_API_VERSION = os.environ["AZURE_EMBEDDING_API_VERSION"]
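
# A sketch of the .env file this script expects (all values below are
# placeholders, not real credentials or endpoints):
#
#   HF_TOKEN=hf_xxxxxxxxxxxxxxxx
#   HF_API_ENDPOINT=https://<your-endpoint>.endpoints.huggingface.cloud
#   AZURE_OPENAI_API_VERSION=<api-version>
#   AZURE_OPENAI_DEPLOYMENT=<chat-deployment-name>
#   AZURE_OPENAI_API_KEY=<azure-openai-key>
#   AZURE_OPENAI_ENDPOINT=https://<your-resource>.openai.azure.com
#   AZURE_EMBEDDING_DEPLOYMENT=<embedding-deployment-name>
#   AZURE_EMBEDDING_API_VERSION=<api-version>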

WORKING_DIR = "./sample"
GRAPHML_FILE = WORKING_DIR + "/graph_chunk_entity_relation.graphml"
MODEL_LIST = [
    "EmergentMethods/Phi-3-mini-128k-instruct-graph",
    "OpenAI/GPT-4.1-mini",
]

class LLMGraph:
    """
    A class to interact with LLMs for knowledge graph extraction.
    """

    async def initialize_rag(self, embedding_dimension=3072):
        """
        Initialize the LightRAG instance with the specified embedding dimension.
        The default of 3072 matches the output size of text-embedding-3-large.
        """
        if self.rag is None:
            self.rag = LightRAG(
                working_dir=WORKING_DIR,
                llm_model_func=self._llm_model_func,
                embedding_func=EmbeddingFunc(
                    embedding_dim=embedding_dimension,
                    max_token_size=8192,
                    func=self._embedding_func,
                ),
            )
            await self.rag.initialize_storages()
            await initialize_pipeline_status()
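
    # Note: initialize_rag() is a coroutine and must be awaited, e.g. via
    # asyncio.run(model.initialize_rag()), before the LightRAG extraction
    # path in extract() can be used.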

    # async def test_responses(self):
    #     """
    #     Test the LLM and embedding functions.
    #     """
    #     result = await self._llm_model_func("How are you?")
    #     print("Response from llm_model_func: ", result)
    #     result = await self._embedding_func(["How are you?"])
    #     print("Result of embedding_func: ", result.shape)
    #     print("Dimension of embedding: ", result.shape[1])
    #     return True

    def __init__(self):
        """
        Initialize the LLMGraph client. The LightRAG instance is created
        lazily by initialize_rag().
        """
        # Hugging Face Inference API endpoint for Phi-3-mini-128k-instruct-graph
        self.hf_client = InferenceClient(
            model=HF_API_ENDPOINT,
            token=HF_API_TOKEN,
        )
        self.rag = None  # Lazy loading of RAG instance

    def _generate(self, messages):
        """
        Generate a response from the model based on the provided messages.
        """
        # Use the chat_completion method of the Inference API
        response = self.hf_client.chat_completion(
            messages=messages,
            max_tokens=1024,
        )
        # Access the generated text
        generated_text = response.choices[0].message.content
        return generated_text

    def _get_messages(self, text):
        """
        Construct the message list for the chat model.
        """
        context = dedent("""\n
            A chat between a curious user and an artificial intelligence Assistant. The Assistant is an expert at identifying entities and relationships in text. The Assistant responds in JSON output only.

            The User provides text in the format:

            -------Text begin-------
            <User provided text>
            -------Text end-------

            The Assistant follows the following steps before replying to the User:

            1. **identify the most important entities** The Assistant identifies the most important entities in the text. These entities are listed in the JSON output under the key "nodes", they follow the structure of a list of dictionaries where each dict is:

            "nodes":[{"id": <entity N>, "type": <type>, "detailed_type": <detailed type>}, ...]

            where "type": <type> is a broad categorization of the entity. "detailed type": <detailed_type> is a very descriptive categorization of the entity.

            2. **determine relationships** The Assistant uses the text between -------Text begin------- and -------Text end------- to determine the relationships between the entities identified in the "nodes" list defined above. These relationships are called "edges" and they follow the structure of:

            "edges":[{"from": <entity 1>, "to": <entity 2>, "label": <relationship>}, ...]

            The <entity N> must correspond to the "id" of an entity in the "nodes" list.

            The Assistant never repeats the same node twice. The Assistant never repeats the same edge twice.

            The Assistant responds to the User in JSON only, according to the following JSON schema:

            {"type":"object","properties":{"nodes":{"type":"array","items":{"type":"object","properties":{"id":{"type":"string"},"type":{"type":"string"},"detailed_type":{"type":"string"}},"required":["id","type","detailed_type"],"additionalProperties":false}},"edges":{"type":"array","items":{"type":"object","properties":{"from":{"type":"string"},"to":{"type":"string"},"label":{"type":"string"}},"required":["from","to","label"],"additionalProperties":false}}},"required":["nodes","edges"],"additionalProperties":false}
        """)
        user_message = dedent(f"""\n
            -------Text begin-------
            {text}
            -------Text end-------
        """)
        messages = [
            {
                "role": "system",
                "content": context,
            },
            {
                "role": "user",
                "content": user_message,
            },
        ]
        return messages
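
    # For reference, a minimal Assistant reply matching the schema above might
    # look like this (illustrative values only):
    #   {"nodes": [{"id": "Alice", "type": "person", "detailed_type": "software engineer"},
    #              {"id": "Acme", "type": "organization", "detailed_type": "technology company"}],
    #    "edges": [{"from": "Alice", "to": "Acme", "label": "works at"}]}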

    def extract(self, text, model_name=MODEL_LIST[0]):
        """
        Extract a knowledge graph in structured format from text.
        """
        if model_name == MODEL_LIST[0]:
            # Use the Hugging Face Inference API with Phi-3-mini-128k-instruct-graph
            messages = self._get_messages(text)
            json_graph = self._generate(messages)
            return json_graph
        else:
            # Use LightRAG with Azure OpenAI
            if self.rag is None:
                raise RuntimeError("Call initialize_rag() before using the LightRAG extraction path.")
            self.rag.insert(text)  # Insert the text into the RAG storage
            # Wait for GRAPHML_FILE to be created
            while not os.path.exists(GRAPHML_FILE):
                time.sleep(0.1)  # Sleep for 100 ms before checking again
            # Read the knowledge graph and convert it to node-link dict format
            G = nx.read_graphml(GRAPHML_FILE)
            dict_graph = nx.node_link_data(G, edges="edges")
            return dict_graph
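
    # Example (illustrative sentence): extract("Alice works at Acme.") returns a
    # JSON string via the Phi-3 path; passing model_name=MODEL_LIST[1] returns a
    # node-link dict via the LightRAG path instead.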

    async def _llm_model_func(self, prompt, system_prompt=None, history_messages=[], **kwargs) -> str:
        """
        Call the Azure OpenAI chat completions endpoint with the given prompt
        and optional system prompt and history messages.
        """
        llm_client = AzureOpenAI(
            api_key=AZURE_OPENAI_API_KEY,
            api_version=AZURE_OPENAI_API_VERSION,
            azure_endpoint=AZURE_OPENAI_ENDPOINT,
        )
        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        if history_messages:
            messages.extend(history_messages)
        messages.append({"role": "user", "content": prompt})
        chat_completion = llm_client.chat.completions.create(
            model=AZURE_OPENAI_DEPLOYMENT,
            messages=messages,
            temperature=kwargs.get("temperature", 0),
            top_p=kwargs.get("top_p", 1),
            n=kwargs.get("n", 1),
        )
        return chat_completion.choices[0].message.content

    async def _embedding_func(self, texts: list[str]) -> np.ndarray:
        """
        Call the Azure OpenAI embeddings endpoint with the given texts.
        """
        emb_client = AzureOpenAI(
            api_key=AZURE_OPENAI_API_KEY,
            api_version=AZURE_EMBEDDING_API_VERSION,
            azure_endpoint=AZURE_OPENAI_ENDPOINT,
        )
        embedding = emb_client.embeddings.create(model=AZURE_EMBEDDING_DEPLOYMENT, input=texts)
        embeddings = [item.embedding for item in embedding.data]
        return np.array(embeddings)

# if __name__ == "__main__":
#     import asyncio  # needed for asyncio.run below
#
#     # Initialize the LLMGraph model
#     model = LLMGraph()
#     asyncio.run(model.initialize_rag())  # Ensure RAG is initialized
#     print("LLMGraph model initialized.")