Spaces:
Running
on
Zero
Running
on
Zero
| import os | |
| import asyncio | |
| import numpy as np | |
| from textwrap import dedent | |
| from dotenv import load_dotenv | |
| from openai import AzureOpenAI | |
| from huggingface_hub import InferenceClient | |
| from lightrag import LightRAG | |
| from lightrag.utils import EmbeddingFunc | |
| from lightrag.kg.shared_storage import initialize_pipeline_status | |
# load_dotenv() must run before any of the os.environ lookups below.
load_dotenv()

# Hugging Face inference endpoint credentials. Every lookup in this section is
# a hard requirement: a missing variable raises KeyError at import time.
api_token, endpoint_url = (
    os.environ[name] for name in ("HF_TOKEN", "HF_API_ENDPOINT")
)

# Azure OpenAI chat-completion and embedding settings (same lookup order as
# the names on the left-hand side).
(
    AZURE_OPENAI_API_VERSION,
    AZURE_OPENAI_DEPLOYMENT,
    AZURE_OPENAI_API_KEY,
    AZURE_OPENAI_ENDPOINT,
    AZURE_EMBEDDING_DEPLOYMENT,
    AZURE_EMBEDDING_API_VERSION,
) = (
    os.environ[name]
    for name in (
        "AZURE_OPENAI_API_VERSION",
        "AZURE_OPENAI_DEPLOYMENT",
        "AZURE_OPENAI_API_KEY",
        "AZURE_OPENAI_ENDPOINT",
        "AZURE_EMBEDDING_DEPLOYMENT",
        "AZURE_EMBEDDING_API_VERSION",
    )
)

# Directory passed to LightRAG as its working directory.
WORKING_DIR = "./cache"

# Supported model identifiers. The first entry selects the Azure OpenAI
# backend; any other entry selects the Hugging Face endpoint.
MODEL_LIST = [
    "OpenAI/GPT-4.1-mini",
    "EmergentMethods/Phi-3-mini-128k-instruct-graph",
]
class LLMGraph:
    """
    A class to interact with LLMs for knowledge graph extraction.

    Two backends are supported, selected by the ``model`` passed to
    ``__init__``:

    * ``MODEL_LIST[0]``: Azure OpenAI. Text is ingested through a lazily
      created LightRAG instance, which is configured to call
      ``_llm_model_func`` and ``_embedding_func``.
    * any other entry of ``MODEL_LIST``: a Hugging Face inference endpoint
      that is prompted to answer with the graph as JSON text.
    """

    def __init__(self, model="OpenAI/GPT-4.1-mini"):
        """
        Initialize the LLMGraph with a specified model.

        Args:
            model: One of the identifiers listed in ``MODEL_LIST``.

        Raises:
            ValueError: If ``model`` is not in ``MODEL_LIST``.
        """
        if model not in MODEL_LIST:
            raise ValueError(f"Model must be one of {MODEL_LIST}")

        self.model_name = model

        # Define every backend attribute up front so that an instance has the
        # same set of attributes whichever backend is selected.
        self.llm_client = None
        self.emb_client = None
        self.hf_client = None
        self.rag = None  # Created lazily by _get_rag()

        if model == MODEL_LIST[0]:
            # Use Azure OpenAI for GPT-4.1-mini. Chat and embeddings use
            # separate clients because they are given different API versions.
            self.llm_client = AzureOpenAI(
                api_key=AZURE_OPENAI_API_KEY,
                api_version=AZURE_OPENAI_API_VERSION,
                azure_endpoint=AZURE_OPENAI_ENDPOINT,
            )
            self.emb_client = AzureOpenAI(
                api_key=AZURE_OPENAI_API_KEY,
                api_version=AZURE_EMBEDDING_API_VERSION,
                azure_endpoint=AZURE_OPENAI_ENDPOINT,
            )
        else:
            # Use Hugging Face Inference API for Phi-3-mini-128k-instruct-graph
            self.hf_client = InferenceClient(
                model=endpoint_url,
                token=api_token,
            )

    async def _initialize_rag(self, embedding_dimension=3072):
        """
        Initialize the LightRAG instance with the specified embedding dimension.

        Args:
            embedding_dimension: Size of the vectors returned by
                ``_embedding_func``. NOTE(review): the default presumably
                matches the configured Azure embedding deployment - confirm.

        Returns:
            A LightRAG instance whose storages and pipeline status have been
            initialized.
        """
        rag = LightRAG(
            working_dir=WORKING_DIR,
            llm_model_func=self._llm_model_func,
            embedding_func=EmbeddingFunc(
                embedding_dim=embedding_dimension,
                max_token_size=8192,
                func=self._embedding_func,
            ),
        )
        await rag.initialize_storages()
        await initialize_pipeline_status()
        return rag

    async def _get_rag(self):
        """
        Get or initialize the RAG instance (lazy loading).

        Returns:
            The cached LightRAG instance, created on first use.
        """
        if self.rag is None:
            self.rag = await self._initialize_rag()
        return self.rag

    def _generate(self, messages):
        """
        Generate a response from the Hugging Face model for the given messages.

        This is a blocking call; coroutines should run it in a worker thread.

        Args:
            messages: Chat messages as a list of ``{"role", "content"}`` dicts.

        Returns:
            The text content of the first returned choice.
        """
        response = self.hf_client.chat_completion(
            messages=messages,
            max_tokens=1024,
        )
        return response.choices[0].message.content

    def _get_messages(self, text):
        """
        Construct the message list for the chat model.

        Args:
            text: The user-provided text to extract a graph from.

        Returns:
            A two-element list: the system message carrying the extraction
            instructions and JSON schema, followed by the user message that
            wraps ``text`` between the begin/end delimiters.
        """
        context = dedent("""\n
            A chat between a curious user and an artificial intelligence Assistant. The Assistant is an expert at identifying entities and relationships in text. The Assistant responds in JSON output only.
            The User provides text in the format:
            -------Text begin-------
            <User provided text>
            -------Text end-------
            The Assistant follows the following steps before replying to the User:
            1. **identify the most important entities** The Assistant identifies the most important entities in the text. These entities are listed in the JSON output under the key "nodes", they follow the structure of a list of dictionaries where each dict is:
            "nodes":[{"id": <entity N>, "type": <type>, "detailed_type": <detailed type>}, ...]
            where "type": <type> is a broad categorization of the entity. "detailed type": <detailed_type> is a very descriptive categorization of the entity.
            2. **determine relationships** The Assistant uses the text between -------Text begin------- and -------Text end------- to determine the relationships between the entities identified in the "nodes" list defined above. These relationships are called "edges" and they follow the structure of:
            "edges":[{"from": <entity 1>, "to": <entity 2>, "label": <relationship>}, ...]
            The <entity N> must correspond to the "id" of an entity in the "nodes" list.
            The Assistant never repeats the same node twice. The Assistant never repeats the same edge twice.
            The Assistant responds to the User in JSON only, according to the following JSON schema:
            {"type":"object","properties":{"nodes":{"type":"array","items":{"type":"object","properties":{"id":{"type":"string"},"type":{"type":"string"},"detailed_type":{"type":"string"}},"required":["id","type","detailed_type"],"additionalProperties":false}},"edges":{"type":"array","items":{"type":"object","properties":{"from":{"type":"string"},"to":{"type":"string"},"label":{"type":"string"}},"required":["from","to","label"],"additionalProperties":false}}},"required":["nodes","edges"],"additionalProperties":false}
            """)

        # Build the user message explicitly instead of running dedent() over
        # an f-string: dedent computes the common margin *after* the text is
        # interpolated, so multi-line or unindented user text would leave the
        # delimiters indented (or strip indentation from the user's text).
        user_message = (
            "\n\n"
            "-------Text begin-------\n"
            f"{text}\n"
            "-------Text end-------\n"
        )

        return [
            {"role": "system", "content": context},
            {"role": "user", "content": user_message},
        ]

    async def extract(self, text):
        """
        Extract knowledge graph from text.

        Args:
            text: The text to process.

        Returns:
            For the Hugging Face backend, the text generated by the model.
            For the Azure OpenAI backend, an empty string: the text is
            inserted into LightRAG and nothing is returned from that call.
        """
        if self.model_name == MODEL_LIST[0]:
            # Use LightRAG with Azure OpenAI. The synchronous rag.insert()
            # drives the event loop itself and cannot be used from inside a
            # running coroutine, so await the async variant instead.
            rag = await self._get_rag()
            await rag.ainsert(text)
            return ""

        # Use Hugging Face Inference API with Phi-3-mini-128k-instruct-graph.
        # The client call blocks, so keep it off the event loop thread.
        messages = self._get_messages(text)
        return await asyncio.to_thread(self._generate, messages)

    async def _llm_model_func(self, prompt, system_prompt=None, history_messages=None, **kwargs) -> str:
        """
        Call the Azure OpenAI chat completion endpoint.

        Args:
            prompt: The user prompt, appended as the last message.
            system_prompt: Optional system message placed first.
            history_messages: Optional earlier messages inserted between the
                system message and the prompt. Defaults to no history.
            **kwargs: ``temperature``, ``top_p`` and ``n`` are forwarded
                (defaults 0, 1 and 1); any other keyword is ignored.

        Returns:
            The text content of the first returned choice.
        """
        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        if history_messages:
            messages.extend(history_messages)
        messages.append({"role": "user", "content": prompt})

        # The client is synchronous; run it in a worker thread so concurrent
        # coroutines are not stalled while the request is in flight.
        chat_completion = await asyncio.to_thread(
            self.llm_client.chat.completions.create,
            model=AZURE_OPENAI_DEPLOYMENT,
            messages=messages,
            temperature=kwargs.get("temperature", 0),
            top_p=kwargs.get("top_p", 1),
            n=kwargs.get("n", 1),
        )
        return chat_completion.choices[0].message.content

    async def _embedding_func(self, texts: list[str]) -> np.ndarray:
        """
        Call the Azure OpenAI embeddings endpoint with the given texts.

        Args:
            texts: The strings to embed.

        Returns:
            An array built from one embedding per returned data item.
        """
        # Synchronous client call, dispatched to a worker thread for the same
        # reason as in _llm_model_func.
        response = await asyncio.to_thread(
            self.emb_client.embeddings.create,
            model=AZURE_EMBEDDING_DEPLOYMENT,
            input=texts,
        )
        return np.array([item.embedding for item in response.data])